From 7cdc74bbe5e0e9e0ea3cbdd8b4264ca7f16b49b2 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Mon, 18 May 2026 14:21:56 +0800 Subject: [PATCH] Refactor: Drop the vector fetch for ES (#14970) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary - Stop pulling chunk vectors (`q_*_vec`) back from Elasticsearch in the main retrieval path. ES already knows them; shipping them was pure bandwidth/memory overhead. - Recover the per-chunk cosine similarity via a second KNN-only ES call filtered by the candidate chunk ids. The new `_score` is merged with locally computed term similarity using the user-configured `vector_similarity_weight`. - Lazily fetch the chunk embedding only for the chunks `insert_citations` actually needs. ## Details **`rag/nlp/search.py`** - `Dealer.search`: no longer appends `q_*_vec` to the ES select list. OceanBase still gets it (its rerank path is unchanged). - New `Dealer._knn_scores(sres, idx_names, kb_ids)`: a `MatchDenseExpr` over the cached query vector filtered by `id IN sres.ids`, returning `{chunk_id: cosine_score}` via ES `_score`. - New `Dealer.rerank_with_knn(...)`: term similarity from `qryr.token_similarity` plus the ES-supplied KNN score, combined with `tkweight`/`vtweight` and the existing rank-feature bonus. - New `Dealer.fetch_chunk_vectors(chunk_ids, tenant_ids, kb_ids, dim)`: on-demand vector fetch for citation use. - `Dealer.retrieval` routes Infinity → unchanged, OceanBase → existing local `rerank`, ES → new KNN-score path. **`common/doc_store/es_conn_base.py`** - New `get_scores(res)` helper returning `{_id: _score}` directly from hit headers (ES doesn't surface `_score` through `get_fields`). **`api/db/services/dialog_service.py`** - New top-level `_hydrate_chunk_vectors(...)` helper. On ES it back-fills `ck["vector"]` from `fetch_chunk_vectors` right before `insert_citations`. No-op on Infinity / OB (their chunks already carry vectors). - Both `decorate_answer` closures became `async` and are `await`-ed at all call sites in `async_chat` and `async_ask`. ## Backend behavior | Backend | Returns chunk vec in main search | Sim source | Vectors for citations | |---|---|---|---| | ES | No | second KNN call (`_score`) merged with term sim | fetched on demand | | Infinity | No (unchanged) | normalized `_score` | already on chunks | | OceanBase | Yes (kept) | local hybrid rerank | already on chunks | ## Test plan --- api/db/services/dialog_service.py | 63 +++++++++++-- common/doc_store/es_conn_base.py | 15 ++++ rag/nlp/search.py | 142 +++++++++++++++++++++++++++++- rag/utils/es_conn.py | 3 +- 4 files changed, 212 insertions(+), 11 deletions(-) diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index 07dcd14b5e..f7a5befc3f 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -65,6 +65,53 @@ def _chunk_kb_id_for_doc(row_dict, kb_ids, doc_id): return kb_ids[0] return row_dict.get("kb_id") or row_dict.get("kb_id_kwd") + +async def _hydrate_chunk_vectors(retriever, chunks, tenant_ids, kb_ids): + """ + Citation prep: on the ES backend the main retrieval call deliberately + skips fetching the chunk embedding. insert_citations needs it, so we + pull the vectors for just the candidate chunks right before computing + answer-vs-chunk similarity. Chunks without an ES chunk_id (e.g. web + search results) keep whatever placeholder they were given. Other + backends still carry vectors in the chunk, so we skip the round-trip. + """ + if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE: + return + if not chunks: + return + dim = 0 + for ck in chunks: + v = ck.get("vector") + if isinstance(v, list) and v: + dim = len(v) + break + if not dim: + return + # Skip chunks that already have a non-zero vector (e.g. parent chunks + # produced by retrieval_by_children copy the child vector inline). + chunk_ids = [] + for ck in chunks: + cid = ck.get("chunk_id") + if not cid: + continue + v = ck.get("vector") or [] + if any(x for x in v): + continue + chunk_ids.append(cid) + if not chunk_ids: + return + try: + vectors = await retriever.fetch_chunk_vectors(chunk_ids, tenant_ids, kb_ids, dim) + except Exception as e: # noqa: BLE001 - degrade gracefully on hydrate failure + logger.warning("fetch_chunk_vectors failed; citations will use placeholders: %s", e) + return + if not vectors: + return + for ck in chunks: + cid = ck.get("chunk_id") + if cid and cid in vectors: + ck["vector"] = vectors[cid] + def _normalize_internet_flag(value): if isinstance(value, bool): return value @@ -735,7 +782,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs): if "max_tokens" in gen_conf: gen_conf["max_tokens"] = min(gen_conf["max_tokens"], max_tokens - used_token_count) - def decorate_answer(answer): + async def decorate_answer(answer): nonlocal embd_mdl, prompt_config, knowledges, kwargs, kbinfos, prompt, retrieval_ts, questions, langfuse_tracer refs = [] @@ -749,6 +796,9 @@ async def async_chat(dialog, messages, stream=True, **kwargs): idx = set([]) normalized_answer = normalize_arabic_digits(answer) or "" if embd_mdl and not CITATION_MARKER_PATTERN.search(normalized_answer): + # Main retrieval no longer ships chunk vectors back from ES. + # Pull them on demand for the chunks we are about to cite. + await _hydrate_chunk_vectors(retriever, kbinfos.get("chunks", []), tenant_ids, dialog.kb_ids) answer, idx = retriever.insert_citations( answer, [ck["content_ltks"] for ck in kbinfos["chunks"]], @@ -841,7 +891,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs): yield {"answer": value, "reference": {}, "audio_binary": tts(tts_mdl, value), "final": False} full_answer = last_state.full_text if last_state else "" if full_answer: - final = decorate_answer(_extract_visible_answer(thought + full_answer)) + final = await decorate_answer(_extract_visible_answer(thought + full_answer)) final["final"] = True final["audio_binary"] = None yield final @@ -852,7 +902,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs): answer = await chat_mdl.async_chat(prompt + prompt4citation, msg[1:], gen_conf, images=image_files) user_content = msg[-1].get("content", "[content not available]") logging.debug("User: {}|Assistant: {}".format(user_content, answer)) - res = decorate_answer(answer) + res = await decorate_answer(answer) res["audio_binary"] = tts(tts_mdl, answer) yield res @@ -1542,8 +1592,11 @@ async def async_ask(question, kb_ids, tenant_id, chat_llm_name=None, search_conf msg = [{"role": "user", "content": question}] - def decorate_answer(answer): + async def decorate_answer(answer): nonlocal knowledges, kbinfos, sys_prompt + # Main retrieval no longer ships chunk vectors back from ES. Pull + # them on demand for the chunks we are about to cite. + await _hydrate_chunk_vectors(retriever, kbinfos.get("chunks", []), tenant_ids, kb_ids) answer, idx = retriever.insert_citations(answer, [ck["content_ltks"] for ck in kbinfos["chunks"]], [ck["vector"] for ck in kbinfos["chunks"]], embd_mdl, tkweight=0.7, vtweight=0.3) idx = set([kbinfos["chunks"][int(i)]["doc_id"] for i in idx]) recall_docs = [d for d in kbinfos["doc_aggs"] if d["doc_id"] in idx] @@ -1570,7 +1623,7 @@ async def async_ask(question, kb_ids, tenant_id, chat_llm_name=None, search_conf continue yield {"answer": value, "reference": {}, "final": False} full_answer = last_state.full_text if last_state else "" - final = decorate_answer(_extract_visible_answer(full_answer)) + final = await decorate_answer(_extract_visible_answer(full_answer)) final["final"] = True yield final diff --git a/common/doc_store/es_conn_base.py b/common/doc_store/es_conn_base.py index 88615649f5..daa5f17770 100644 --- a/common/doc_store/es_conn_base.py +++ b/common/doc_store/es_conn_base.py @@ -302,6 +302,21 @@ class ESConnectionBase(DocStoreConnection): def get_doc_ids(self, res): return [d["_id"] for d in res["hits"]["hits"]] + def get_scores(self, res) -> dict[str, float]: + """ + Map hit `_id` to its raw `_score`. Used to recover the cosine + similarity returned by a KNN-only search without pulling the + chunk vectors out of the index. + """ + out = {} + for d in res.get("hits", {}).get("hits", []): + doc_id = d.get("_id") + if doc_id is None: + continue + score = d.get("_score") + out[doc_id] = float(score) if score is not None else 0.0 + return out + def _get_source(self, res): rr = [] for d in res["hits"]["hits"]: diff --git a/rag/nlp/search.py b/rag/nlp/search.py index 87c1c6682a..980dba04d9 100644 --- a/rag/nlp/search.py +++ b/rag/nlp/search.py @@ -180,7 +180,13 @@ class Dealer: else: matchDense = await self.get_vector(qst, emb_mdl, topk, req.get("similarity", 0.1)) q_vec = matchDense.embedding_data - if not settings.DOC_ENGINE_INFINITY: + # ES path no longer fetches chunk vectors here. The clean + # cosine score is recovered later via a second KNN-only call + # in retrieval(); chunk vectors are fetched on demand for + # citations (see Dealer.fetch_chunk_vectors). OceanBase + # still relies on local rerank against chunk vectors, so + # keep pulling them for that backend. + if settings.DOC_ENGINE_OCEANBASE: src.append(f"q_{len(q_vec)}_vec") fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05,0.95"}) @@ -358,6 +364,113 @@ class Dealer: rank_fea.append(nor / np.sqrt(denor) / q_denor) return np.array(rank_fea) * 10. + pageranks + async def _knn_scores(self, sres: "Dealer.SearchResult", + idx_names: str | list[str], + kb_ids: list[str]) -> dict[str, float]: + """ + Second-pass ES call that returns the cosine similarity between the + query embedding and each candidate chunk's embedding, filtered to the + chunk ids the original search already surfaced. We rely on ES to do + the vector math so the chunk vectors never leave the engine. + """ + if not sres.ids or not sres.query_vector: + return {} + dim = len(sres.query_vector) + matchDense = MatchDenseExpr( + f"q_{dim}_vec", + sres.query_vector, + "float", + "cosine", + len(sres.ids), + {"similarity": 0.0}, + ) + condition = {"id": list(sres.ids)} + res = await thread_pool_exec( + self.dataStore.search, + [], # no _source fields needed; we only want _id and _score + [], + condition, + [matchDense], + OrderByExpr(), + 0, + len(sres.ids), + idx_names, + kb_ids, + ) + return self.dataStore.get_scores(res) + + async def fetch_chunk_vectors(self, chunk_ids: list[str], + tenant_ids: str | list[str], + kb_ids: list[str], + dim: int) -> dict[str, list[float]]: + """ + Citation-time helper: fetch only the embedding vectors for an + explicit set of chunk ids. Used by callers that need to compute + answer-vs-chunk similarity locally (e.g. insert_citations) so the + main retrieval path can keep skipping vector transport. + """ + if not chunk_ids: + return {} + if isinstance(tenant_ids, str): + idx_names = [index_name(tid) for tid in tenant_ids.split(",")] + else: + idx_names = [index_name(tid) for tid in tenant_ids] + vec_field = f"q_{dim}_vec" + res = await thread_pool_exec( + self.dataStore.search, + [vec_field], + [], + {"id": list(chunk_ids)}, + [], + OrderByExpr(), + 0, + len(chunk_ids), + idx_names, + kb_ids, + ) + fields = self.dataStore.get_fields(res, [vec_field]) + out: dict[str, list[float]] = {} + zero = [0.0] * dim + for cid, doc in fields.items(): + v = doc.get(vec_field) + if isinstance(v, str): + v = [get_float(x) for x in v.split("\t")] + if not isinstance(v, list) or len(v) != dim: + v = zero + out[cid] = v + return out + + def rerank_with_knn(self, sres, query, knn_scores: dict[str, float], + tkweight=0.3, vtweight=0.7, + cfield="content_ltks", + rank_feature: dict | None = None): + """ + Merge ES-side KNN cosine similarity with locally computed term + similarity using the user-configured weights. Replaces the older + local-only rerank() for the ES path, which depended on shipping + chunk vectors back to the application. + """ + _, keywords = self.qryr.question(query) + + for i in sres.ids: + if isinstance(sres.field[i].get("important_kwd", []), str): + sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]] + ins_tw = [] + for i in sres.ids: + content_ltks = list(OrderedDict.fromkeys(sres.field[i][cfield].split())) + title_tks = [t for t in sres.field[i].get("title_tks", "").split() if t] + question_tks = [t for t in sres.field[i].get("question_tks", "").split() if t] + important_kwd = sres.field[i].get("important_kwd", []) + tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6 + ins_tw.append(tks) + + tksim = np.array(self.qryr.token_similarity(keywords, ins_tw), dtype=np.float64) + vtsim = np.array([knn_scores.get(chunk_id, 0.0) for chunk_id in sres.ids], + dtype=np.float64) + rank_fea = self._rank_feature_scores(rank_feature, sres) + sim = tkweight * tksim + vtweight * vtsim + rank_fea + return sim, tksim, vtsim + def rerank(self, sres, query, tkweight=0.3, vtweight=0.7, cfield="content_ltks", rank_feature: dict | None = None @@ -491,7 +604,8 @@ class Dealer: if isinstance(tenant_ids, str): tenant_ids = tenant_ids.split(",") - sres = await self.search(req, [index_name(tid) for tid in tenant_ids], kb_ids, embd_mdl, highlight, + idx_names = [index_name(tid) for tid in tenant_ids] + sres = await self.search(req, idx_names, kb_ids, embd_mdl, highlight, rank_feature=rank_feature) # Temporary retrieval-side guard: prune chunks whose parent document no # longer exists before reranking and returning results. @@ -516,8 +630,9 @@ class Dealer: sim = [s if s is not None else 0.0 for s in sim] tsim = sim vsim = sim - else: - # ElasticSearch doesn't normalize each way score before fusion. + elif settings.DOC_ENGINE_OCEANBASE: + # OceanBase still returns chunk vectors in the result; use + # the historical local rerank that depends on them. sim, tsim, vsim = self.rerank( sres, question, @@ -525,6 +640,20 @@ class Dealer: vector_similarity_weight, rank_feature=rank_feature, ) + else: + # ES path: ask ES for the clean cosine score via a second + # KNN-only call filtered by the candidate ids, then merge it + # with locally computed term similarity using the user's + # weight. Chunk vectors stay in the index. + knn_scores = await self._knn_scores(sres, idx_names, kb_ids) + sim, tsim, vsim = self.rerank_with_knn( + sres, + question, + knn_scores, + 1 - vector_similarity_weight, + vector_similarity_weight, + rank_feature=rank_feature, + ) sim_np = np.array(sim, dtype=np.float64) if sim_np.size == 0: @@ -559,6 +688,11 @@ class Dealer: did = chunk.get("doc_id", "") position_int = chunk.get("position_int", []) + # Chunk vectors are no longer fetched during the main retrieval + # call. Fall back to whatever the chunk happens to carry (Infinity + # path) and otherwise emit a zero placeholder so the downstream + # shape stays stable. Citation callers refill this via + # Dealer.fetch_chunk_vectors when needed. d = { "chunk_id": id, "content_ltks": chunk["content_ltks"], diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index 1c80515d68..eed2e67c27 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -69,8 +69,7 @@ class ESConnection(ESConnectionBase): index=index_names, body=query, timeout="600s", - track_total_hits=track_total_hits, - _source=True, + track_total_hits=track_total_hits ) def _search_with_search_after(self, index_names: list[str], query: dict, offset: int, limit: int):