3 세대 진화 — Elasticsearch → Sia → OpenSearch
1세대(~2019): Elasticsearch를 SRE 마인드로 운영, 6개월마다 클러스터 2배 증설. 2세대(2019~2024): 자체 엔진 Sia — Lucene Core + BSL 람다 + gRPC + Kafka pull. 3세대(2024+): Project Sunrise로 OpenSearch 이전. OpenSearch Foundation 창립 멤버.
"Elasticsearch를 블랙박스로 쓰던 시절부터, 자체 엔진 Sia를 만들었다가, 다시 OpenSearch 커뮤니티로 돌아온 10년의 궤적 — 그리고 그 사이에 쌓인 람다 아키텍처 · Pull-based 인제스천 · Vector 파이프라인."
Uber search platform은 10+년 동안 세 세대를 거쳤다 — Elasticsearch(블랙박스 운영) → Sia(custom Lucene + BSL 람다 + Kafka pull) → OpenSearch(Project Sunrise, 2024+). 자체 엔진을 만든 결정과 다시 포기한 결정 모두 분명한 trigger가 있었고, 그 사이에 BSL 람다 아키텍처 · Pull-based Kafka 인제스천 · Vector 파이프라인 · gRPC 같은 재사용 가능한 패턴이 축적됐다.
1세대(~2019): Elasticsearch를 SRE 마인드로 운영, 6개월마다 클러스터 2배 증설. 2세대(2019~2024): 자체 엔진 Sia — Lucene Core + BSL 람다 + gRPC + Kafka pull. 3세대(2024+): Project Sunrise로 OpenSearch 이전. OpenSearch Foundation 창립 멤버.
AI/ML 가속화로 Sia 유지 비용이 OpenSearch 이전 비용을 초과한 시점이 trigger. Lucene이 vector search(HNSW)를 추가할 때마다 BSL에서 별도 재구현 필요 — 커뮤니티 진화 속도를 따라잡을 수 없었다. "오픈 생태계와 함께 혁신하는 것이 가장 지속 가능한 길"이라는 결론.
Sia의 핵심 아키텍처. Live(메모리 NRT 레이어) → Snapshot(~30분마다 flush, Lucene segment) → Base(주간 컴팩션). Elasticsearch NRT 한계를 메우면서 메모리 압박을 통제. 다만 Lucene 신규 operator마다 BSL 위에서 재구현하는 비용이 후일 Sunrise의 trigger가 됐다.
Push 모델은 트래픽 스파이크에서 backpressure를 클라이언트로 떠넘기고 우선순위 제어가 불가능. Sia/OpenSearch 모두 Kafka를 source of truth로 두고 StreamPoller가 자체 페이스로 끌어옴. translog는 no-op으로 대체. 재생·pause·offset 리셋이 모두 일급 기능.
1.5B 문서 × 400-dim. Spark bulk indexing으로 인덱스 빌드 12.5h → 2.5h (-79%). 핵심 튜닝: refresh_interval=-1, flush_threshold_size 1024MB, _source/doc_values 비활성화로 인덱스 11TB→4TB. Blue/Green으로 별도 클러스터에 새 인덱스 빌드 후 traffic 스왑.
Uber 인프라는 gRPC native이지만 OpenSearch는 REST/JSON 전용 → 번역 레이어 비용. Uber가 OpenSearch에 Search · Bulk gRPC 엔드포인트를 기여. M3 p99 write 34ms→13.6ms, Delivery vector p50 83ms→38ms. Spark bulk job runtime -20~35%.
각 세대 전환에는 분명한 trigger가 있었다. Elasticsearch → Sia는 NRT 한계(Live Index 필요)와 push 모델의 backpressure가 trigger. Sia → OpenSearch는 Lucene 커뮤니티 진화 속도(특히 vector search)를 따라잡을 수 없어서.
Sia는 "더 만들었다"가 아니라 "고립이 비용이 되는 시점"에 정확히 종결됐다. 자체 시스템을 만드는 결정만큼 그것을 버리는 결정도 분명한 trigger가 필요하다.
"이 시스템 계속 유지할 가치가 있나"를 분기마다 자문해야 한다. 매몰 비용 무시는 의외로 어렵다.
Sia의 핵심 아키텍처. Elasticsearch의 NRT가 충분히 빠르지 않아 직접 만든 3-layer. Live가 메모리에서 즉시 검색 가능한 in-flight 데이터를 들고 있고, 주기적으로 Snapshot(Lucene segment)으로 flush, 주간으로 Base 컴팩션.
결과: 메모리 압박을 통제하면서 NRT 보장. 반대 면: Lucene이 새 operator를 추가할 때마다 BSL 3개 레이어 모두에서 재구현 필요. 이 비용 누적이 Sunrise의 trigger가 됐다.
"진짜 real-time이 필요한가?"를 분리. Uber는 fulfillment 매칭만 진짜 real-time이 필요했고, 나머지는 NRT로 충분하다는 걸 뒤늦게 깨달았다.
Push 모델(Elasticsearch 기본)은 트래픽 스파이크에서 cluster가 요청을 reject → backpressure가 클라이언트로 떠넘겨진다. Pull 모델은 Kafka가 source of truth가 되고 cluster는 자체 페이스로 끌어온다.
StreamPoller가 read한 메시지를 BlockingQueue에 넣고, 워커들이 doc-id 기준 파티셔닝으로 병렬 인덱싱. BatchStartPointer 체크포인트로 commit마다 안전한 offset 저장 — 장애 복구가 일급 기능. translog는 no-op으로 대체 (Kafka가 이미 durable).
pause/resume per-index, offset 리셋으로 backlog skip, 재생까지 — 모두 인덱스 단위 API. push 모델에서는 거의 불가능했던 것들.
리전마다 자체 OpenSearch 클러스터를 두고, 모두가 같은 글로벌 Kafka 토픽을 독립적으로 consume한다. 리전별 데이터 사본이 완전 동일 — 어떤 리전에서 쿼리해도 일관된 글로벌 view.
Uber 내부의 cross-replicated Kafka 인프라가 리전별 토픽을 글로벌 aggregated 토픽으로 자동 미러링. 장애 시 traffic을 다른 리전으로 즉시 fail-over 가능 — pull 모델의 결정적 장점.
Push 모델에서는 cross-region replication을 cluster 레벨에서 별도 구현해야 함. Pull 모델은 "각자 같은 큐를 읽으면 끝" — 복잡도 한 단계 축소.
1.5B 문서 × 400-dim 임베딩의 batch 인덱싱.
Spark + OpenSearch bulk API로 인덱스 빌드 12.5h → 2.5h (-79%).
핵심 튜닝: refresh_interval=-1, flush_threshold_size 518→1024MB,
_source/doc_values 비활성화로 인덱스 11TB → 4TB.
배포는 항상 blue/green — 새 인덱스를 별도 클러스터에 통째로 빌드한 뒤 traffic을 한 번에 스왑. in-flight 쿼리에 0 downtime, 롤백도 alias 한 줄.
HNSW 그래프는 메모리 잔류가 필수 — circuit breaker 트리거되면 query time이 ms → 수십초로 폭주. knn.memory.circuit_breaker.limit 모니터링 필수.
Sia/OpenSearch 모두 read와 write 경로를 물리적으로 분리한다. Search Gateway가 OpenSearch-호환 API를 노출하고, 내부 구현(LucenePlus 등)을 추상화. Ingester는 Kafka에서 끌어와 force-merge 후 원격 저장소(HDFS/GCS)에 업로드. Searcher는 그 저장소에서 인덱스를 읽어 쿼리만 처리.
이 분리 덕분에 인제스천 부하가 쿼리 latency에 절대 영향을 주지 않는다. ingester 스케일링과 searcher 스케일링도 독립적.
"write와 read가 같은 노드에 있다"는 OLTP 사고를 검색 인프라에서는 의도적으로 깨야 한다. 분리하면 캐싱·복제·재시작 전략이 모두 단순해진다.
가장 반직관적 결정 — "우리가 만들었던 Sia를 버리고 OpenSearch로 돌아갔다". 5년간 키운 시스템을 포기하는 것이 새로 만드는 것보다 어렵다. 하지만 Lucene/오픈 생태계의 진화 속도가 Uber의 fork 유지 속도를 능가하는 임계점이 왔을 때, 매몰 비용을 무시하고 결정한 것이 Project Sunrise의 본질. "가장 지속 가능한 길은 오픈 생태계와 함께 혁신하는 것".
정의. 2024년 Uber Search Platform 팀이 시작한 전략적 이니셔티브 — 자체 엔진 Sia를 OpenSearch로 단계적 이전하면서 검색 플랫폼 전체를 다시 그리는 작업.
trigger. Lucene이 vector search(HNSW)를 추가했을 때 BSL 위에서 별도 재구현 필요. 매번 Lucene 신규 operator마다 같은 비용 발생. Sia 유지 비용 > OpenSearch 마이그레이션 비용의 임계점.
핵심 결정. ① Sia의 BSL 람다 대신 OpenSearch native NRT. ② OpenSearch-compatible Search Gateway로 클라이언트 영향 최소화. ③ true real-time과 NRT 분리 — fulfillment 매칭만 진짜 real-time 필요. ④ Sia에서 검증된 패턴(Kafka pull, blue-green 등)은 OpenSearch에 기여.
철학. "가장 지속 가능한 길은 오픈 생태계와 함께 혁신하는 것". Uber는 OpenSearch Software Foundation 창립 멤버로 합류.
정의. Sia 인덱스의 3-layer 람다 아키텍처. Live(메모리 in-flight) → Snapshot(~30분마다 flush한 Lucene segment) → Base(주간 컴팩션). 각 layer가 시간 범위가 다른 데이터를 보유.
왜 필요했나. Elasticsearch의 NRT가 fulfillment 매칭에 충분히 빠르지 않았다. Live 레이어가 ms 단위 가시성을 제공하면서, Snapshot·Base가 메모리 부담을 분산.
비용. Lucene 신규 operator마다 3개 레이어 모두에서 재구현 필요. 특히 vector search(HNSW) 추가 시 BSL 안에서 HNSW를 다시 만들어야 했다. 이 누적 비용이 결국 Sunrise의 trigger.
교훈. "real-time이 진짜 필요한가?"를 분리. Uber는 대부분의 use case가 NRT로 충분하다는 걸 후에 깨달았다 — 처음부터 모두에게 real-time을 약속한 것이 BSL을 강제했다.
Uber 내부 코드는 비공개. 여기서는 글에서 묘사된 패턴들을 Python/PySpark + OpenSearch 클라이언트로 재구성한 참고 구현. 실제 배포는 Java/JVM 기반이지만 아이디어는 그대로.
import threading, queue, time from dataclasses import dataclass from kafka import KafkaConsumer @dataclass class Message: offset: int key: str payload: dict class StreamPoller: """Kafka에서 자체 페이스로 끌어와 BlockingQueue로 흘리는 worker. backpressure는 큐가 full일 때 자연 발생 — Kafka는 그대로 대기.""" def __init__(self, topic: str, shard_id: int, queue_size: int = 1000): self.consumer = KafkaConsumer(topic, group_id=f"os-shard-{shard_id}", auto_offset_reset="earliest", enable_auto_commit=False) self.queue = queue.Queue(maxsize=queue_size) self.start_pointer = 0 # BatchStartPointer self.in_flight_offsets = set() def poll_loop(self): for msg in self.consumer: # 큐가 full이면 put이 block → poller도 block # → Kafka에서 더 안 받음 (= 자연 backpressure) self.queue.put(Message(msg.offset, msg.key.decode(), msg.value)) self.in_flight_offsets.add(msg.offset) def commit(self, processed_offsets: set): # BatchStartPointer = 모든 worker가 처리 완료한 최소 offset self.in_flight_offsets -= processed_offsets new_start = (min(self.in_flight_offsets) if self.in_flight_offsets else max(processed_offsets) + 1) self.start_pointer = new_start self.consumer.commit({...: new_start})
import schedule, time from typing import List from lucene import IndexWriter, DirectoryReader class BSLIndex: """Base / Snapshot / Live 3-layer 인덱스 — Sia 패턴 재현.""" def __init__(self): self.live = InMemoryLuceneIndex() # 메모리 NRT self.snapshots: List[DirectoryReader] = [] self.base: DirectoryReader = None # ─── Layer 1: Live (always write here) ─── def index_document(self, doc: dict): self.live.add(doc) # ms 단위 가시성 # ─── Layer 2: ~30min마다 Live → Snapshot flush ─── def flush_live_to_snapshot(self): snapshot = self.live.flush_to_disk() # 표준 Lucene segment self.snapshots.append(snapshot) self.live.reset() # 메모리 해제 # ─── Layer 3: weekly snapshot들 → Base compaction ─── def build_base(self): # 누적된 snapshot들 + 기존 base를 force-merge merged = force_merge([self.base] + self.snapshots) self.base = merged self.snapshots = [] # 컴팩션 후 비움 # ─── 쿼리: 3개 layer 모두 보고 union ─── def search(self, query: str) -> list: results = [] results += self.live.search(query) # NRT for snap in self.snapshots: results += snap.search(query) # recent if self.base: results += self.base.search(query) # historical return merge_topk(results) # 스케줄러 (Sia는 cron으로 운영했을 것) schedule.every(30).minutes.do(idx.flush_live_to_snapshot) schedule.every().week.do(idx.build_base)
from pyspark.sql import SparkSession spark = (SparkSession.builder .appName("vector-bulk-ingest") .config("spark.executor.cores", 16) # 충분한 cores .config("spark.executor.instances", 100) # 병렬 폭 .config("spark.sql.shuffle.partitions", 800) .getOrCreate()) # 핵심: 인덱스 설정을 bulk-friendly로 — Uber가 강조한 값들 INDEX_SETTINGS = { "index": { "refresh_interval": "-1", # 1s → off, 요청별 refresh "translog": { "flush_threshold_size": "1024mb", # 518 → 1024 MB "sync_interval": "120s", # 매 요청 → 120s }, "merge": { "policy": { "floor_segment": "10m", # 1m → 10m "segments_per_tier": 15, # 10 → 15 "max_merge_at_once": 15, # 10 → 15 } }, "knn.algo_param.index_thread_qty": 8, # native KNN 빌드 스레드 }, "mappings": { "_source": {"enabled": False}, # 인덱스 크기 -50%+ "properties": { "id": {"type": "keyword", "doc_values": False}, "vec": { "type": "knn_vector", "dimension": 400, "method": {"name": "hnsw", "engine": "faiss"}, } } } } (spark.read.table("hive.embeddings_table") .repartition(800) .write .format("opensearch") # spark-opensearch-connector .option("opensearch.resource", "vec-v2-green") .option("opensearch.batch.size.bytes", "50mb") .option("opensearch.batch.write.refresh", "false") .mode("append") .save()) # 빌드 완료 후 명시적 force_merge → 단일 segment os_client.indices.forcemerge(index="vec-v2-green", max_num_segments=1)
from opensearchpy import OpenSearch client = OpenSearch([{"host": "prod.cluster", "port": 9200}]) ALIAS = "vec-current" # 쿼리는 항상 alias로 def deploy_green(green_index: str): """새로 빌드한 green 인덱스를 검증 후 traffic 스왑.""" # ─── Gate 1: 완전성 (doc count가 비슷한가) ─── blue = resolve_current_index(ALIAS) blue_count = client.count(index=blue)["count"] green_count = client.count(index=green_index)["count"] assert green_count >= blue_count * 0.99, \ f"green {green_count} too small vs blue {blue_count}" # ─── Gate 2: 매핑 하위 호환 (필드 추가만, 삭제·타입변경 금지) ─── assert mapping_compatible(blue, green_index), "mapping breaking change" # ─── Gate 3: recall regression (sample 쿼리 결과 비교) ─── for q in load_eval_queries(n=100): r_blue = client.search(index=blue, body=q)["hits"]["hits"] r_green = client.search(index=green_index, body=q)["hits"]["hits"] recall_at_10 = overlap([h["_id"] for h in r_blue[:10]], [h["_id"] for h in r_green[:10]]) assert recall_at_10 > 0.95, f"recall regression on {q}" # ─── 원자적 swap — 한 줄, downtime 0 ─── client.indices.update_aliases(body={ "actions": [ {"remove": {"index": blue, "alias": ALIAS}}, {"add": {"index": green_index, "alias": ALIAS}}, ] }) # 24h 후 blue 인덱스 삭제 (rollback 여유) schedule_delete(blue, after="24h")
schedule_delete(after=24h)로 즉시 삭제 안 함 — rollback 여유. 운영 핵심은 "한 번에 빠르게"보다 "검증된 안전".import grpc from opensearch_pb2 import BulkRequest, IndexAction, Document from opensearch_pb2_grpc import BulkServiceStub # OpenSearch gRPC endpoint — Uber가 contribute한 native API channel = grpc.secure_channel("opensearch.internal:9300", grpc.ssl_channel_credentials()) bulk_stub = BulkServiceStub(channel) def bulk_index(docs: list[dict], index: str): """REST/JSON 거치지 않고 Protobuf 그대로 — translation layer 0.""" actions = [] for d in docs: doc_pb = Document( id=d["id"], # vector는 Protobuf의 packed repeated float — bytes 효율 88% 절감 vector=d["vec"], metadata={k: v for k, v in d.items() if k not in ["id", "vec"]}, ) actions.append(IndexAction(index=index, document=doc_pb)) request = BulkRequest(actions=actions, refresh=False) response = bulk_stub.Bulk(request, timeout=30.0) if response.errors: for item in response.items: if item.error: log_error(item.id, item.error.message) return response.took_ms # Spark batch job에서 사용: runtime -20~35% # M3 metrics: p99 write 34.1ms → 13.6ms (-60%) # Delivery vector: p50 83ms → 38ms (-53%)
위 5개 코드는 독립적으로 도입 가능한 패턴들이다. 한꺼번에 다 적용하지 말고 가장 큰 통증부터: ① backpressure 문제면 Pull-based, ② 모델 업데이트 disruption이면 Blue-Green, ③ vector workload면 Spark bulk + 설정 튜닝, ④ gRPC native 인프라면 마지막에 gRPC endpoint. 각 단계의 ROI를 측정한 뒤 다음으로.