Refactor: Drop the vector fetch for ES (#14970)

## Summary
- Stop pulling chunk vectors (`q_*_vec`) back from Elasticsearch in the
main retrieval path. ES already knows them; shipping them was pure
bandwidth/memory overhead.
- Recover the per-chunk cosine similarity via a second KNN-only ES call
filtered by the candidate chunk ids. The new `_score` is merged with
locally computed term similarity using the user-configured
`vector_similarity_weight`.
- Lazily fetch the chunk embedding only for the chunks
`insert_citations` actually needs.

## Details
**`rag/nlp/search.py`**
- `Dealer.search`: no longer appends `q_*_vec` to the ES select list.
OceanBase still gets it (its rerank path is unchanged).
- New `Dealer._knn_scores(sres, idx_names, kb_ids)`: a `MatchDenseExpr`
over the cached query vector filtered by `id IN sres.ids`, returning
`{chunk_id: cosine_score}` via ES `_score`.
- New `Dealer.rerank_with_knn(...)`: term similarity from
`qryr.token_similarity` plus the ES-supplied KNN score, combined with
`tkweight`/`vtweight` and the existing rank-feature bonus.
- New `Dealer.fetch_chunk_vectors(chunk_ids, tenant_ids, kb_ids, dim)`:
on-demand vector fetch for citation use.
- `Dealer.retrieval` routes Infinity → unchanged, OceanBase → existing
local `rerank`, ES → new KNN-score path.

**`common/doc_store/es_conn_base.py`**
- New `get_scores(res)` helper returning `{_id: _score}` directly from
hit headers (ES doesn't surface `_score` through `get_fields`).

**`api/db/services/dialog_service.py`**
- New top-level `_hydrate_chunk_vectors(...)` helper. On ES it
back-fills `ck["vector"]` from `fetch_chunk_vectors` right before
`insert_citations`. No-op on Infinity / OB (their chunks already carry
vectors).
- Both `decorate_answer` closures became `async` and are `await`-ed at
all call sites in `async_chat` and `async_ask`.

## Backend behavior
| Backend | Returns chunk vec in main search | Sim source | Vectors for
citations |
|---|---|---|---|
| ES | No | second KNN call (`_score`) merged with term sim | fetched on
demand |
| Infinity | No (unchanged) | normalized `_score` | already on chunks |
| OceanBase | Yes (kept) | local hybrid rerank | already on chunks |

## Test plan
This commit is contained in:
Kevin Hu
2026-05-18 14:21:56 +08:00
committed by GitHub
parent 9f2fb4611f
commit 7cdc74bbe5
4 changed files with 212 additions and 11 deletions

View File

@@ -65,6 +65,53 @@ def _chunk_kb_id_for_doc(row_dict, kb_ids, doc_id):
return kb_ids[0]
return row_dict.get("kb_id") or row_dict.get("kb_id_kwd")
async def _hydrate_chunk_vectors(retriever, chunks, tenant_ids, kb_ids):
"""
Citation prep: on the ES backend the main retrieval call deliberately
skips fetching the chunk embedding. insert_citations needs it, so we
pull the vectors for just the candidate chunks right before computing
answer-vs-chunk similarity. Chunks without an ES chunk_id (e.g. web
search results) keep whatever placeholder they were given. Other
backends still carry vectors in the chunk, so we skip the round-trip.
"""
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
return
if not chunks:
return
dim = 0
for ck in chunks:
v = ck.get("vector")
if isinstance(v, list) and v:
dim = len(v)
break
if not dim:
return
# Skip chunks that already have a non-zero vector (e.g. parent chunks
# produced by retrieval_by_children copy the child vector inline).
chunk_ids = []
for ck in chunks:
cid = ck.get("chunk_id")
if not cid:
continue
v = ck.get("vector") or []
if any(x for x in v):
continue
chunk_ids.append(cid)
if not chunk_ids:
return
try:
vectors = await retriever.fetch_chunk_vectors(chunk_ids, tenant_ids, kb_ids, dim)
except Exception as e: # noqa: BLE001 - degrade gracefully on hydrate failure
logger.warning("fetch_chunk_vectors failed; citations will use placeholders: %s", e)
return
if not vectors:
return
for ck in chunks:
cid = ck.get("chunk_id")
if cid and cid in vectors:
ck["vector"] = vectors[cid]
def _normalize_internet_flag(value):
if isinstance(value, bool):
return value
@@ -735,7 +782,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs):
if "max_tokens" in gen_conf:
gen_conf["max_tokens"] = min(gen_conf["max_tokens"], max_tokens - used_token_count)
def decorate_answer(answer):
async def decorate_answer(answer):
nonlocal embd_mdl, prompt_config, knowledges, kwargs, kbinfos, prompt, retrieval_ts, questions, langfuse_tracer
refs = []
@@ -749,6 +796,9 @@ async def async_chat(dialog, messages, stream=True, **kwargs):
idx = set([])
normalized_answer = normalize_arabic_digits(answer) or ""
if embd_mdl and not CITATION_MARKER_PATTERN.search(normalized_answer):
# Main retrieval no longer ships chunk vectors back from ES.
# Pull them on demand for the chunks we are about to cite.
await _hydrate_chunk_vectors(retriever, kbinfos.get("chunks", []), tenant_ids, dialog.kb_ids)
answer, idx = retriever.insert_citations(
answer,
[ck["content_ltks"] for ck in kbinfos["chunks"]],
@@ -841,7 +891,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs):
yield {"answer": value, "reference": {}, "audio_binary": tts(tts_mdl, value), "final": False}
full_answer = last_state.full_text if last_state else ""
if full_answer:
final = decorate_answer(_extract_visible_answer(thought + full_answer))
final = await decorate_answer(_extract_visible_answer(thought + full_answer))
final["final"] = True
final["audio_binary"] = None
yield final
@@ -852,7 +902,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs):
answer = await chat_mdl.async_chat(prompt + prompt4citation, msg[1:], gen_conf, images=image_files)
user_content = msg[-1].get("content", "[content not available]")
logging.debug("User: {}|Assistant: {}".format(user_content, answer))
res = decorate_answer(answer)
res = await decorate_answer(answer)
res["audio_binary"] = tts(tts_mdl, answer)
yield res
@@ -1542,8 +1592,11 @@ async def async_ask(question, kb_ids, tenant_id, chat_llm_name=None, search_conf
msg = [{"role": "user", "content": question}]
def decorate_answer(answer):
async def decorate_answer(answer):
nonlocal knowledges, kbinfos, sys_prompt
# Main retrieval no longer ships chunk vectors back from ES. Pull
# them on demand for the chunks we are about to cite.
await _hydrate_chunk_vectors(retriever, kbinfos.get("chunks", []), tenant_ids, kb_ids)
answer, idx = retriever.insert_citations(answer, [ck["content_ltks"] for ck in kbinfos["chunks"]], [ck["vector"] for ck in kbinfos["chunks"]], embd_mdl, tkweight=0.7, vtweight=0.3)
idx = set([kbinfos["chunks"][int(i)]["doc_id"] for i in idx])
recall_docs = [d for d in kbinfos["doc_aggs"] if d["doc_id"] in idx]
@@ -1570,7 +1623,7 @@ async def async_ask(question, kb_ids, tenant_id, chat_llm_name=None, search_conf
continue
yield {"answer": value, "reference": {}, "final": False}
full_answer = last_state.full_text if last_state else ""
final = decorate_answer(_extract_visible_answer(full_answer))
final = await decorate_answer(_extract_visible_answer(full_answer))
final["final"] = True
yield final

View File

@@ -302,6 +302,21 @@ class ESConnectionBase(DocStoreConnection):
def get_doc_ids(self, res):
return [d["_id"] for d in res["hits"]["hits"]]
def get_scores(self, res) -> dict[str, float]:
"""
Map hit `_id` to its raw `_score`. Used to recover the cosine
similarity returned by a KNN-only search without pulling the
chunk vectors out of the index.
"""
out = {}
for d in res.get("hits", {}).get("hits", []):
doc_id = d.get("_id")
if doc_id is None:
continue
score = d.get("_score")
out[doc_id] = float(score) if score is not None else 0.0
return out
def _get_source(self, res):
rr = []
for d in res["hits"]["hits"]:

View File

@@ -180,7 +180,13 @@ class Dealer:
else:
matchDense = await self.get_vector(qst, emb_mdl, topk, req.get("similarity", 0.1))
q_vec = matchDense.embedding_data
if not settings.DOC_ENGINE_INFINITY:
# ES path no longer fetches chunk vectors here. The clean
# cosine score is recovered later via a second KNN-only call
# in retrieval(); chunk vectors are fetched on demand for
# citations (see Dealer.fetch_chunk_vectors). OceanBase
# still relies on local rerank against chunk vectors, so
# keep pulling them for that backend.
if settings.DOC_ENGINE_OCEANBASE:
src.append(f"q_{len(q_vec)}_vec")
fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05,0.95"})
@@ -358,6 +364,113 @@ class Dealer:
rank_fea.append(nor / np.sqrt(denor) / q_denor)
return np.array(rank_fea) * 10. + pageranks
async def _knn_scores(self, sres: "Dealer.SearchResult",
idx_names: str | list[str],
kb_ids: list[str]) -> dict[str, float]:
"""
Second-pass ES call that returns the cosine similarity between the
query embedding and each candidate chunk's embedding, filtered to the
chunk ids the original search already surfaced. We rely on ES to do
the vector math so the chunk vectors never leave the engine.
"""
if not sres.ids or not sres.query_vector:
return {}
dim = len(sres.query_vector)
matchDense = MatchDenseExpr(
f"q_{dim}_vec",
sres.query_vector,
"float",
"cosine",
len(sres.ids),
{"similarity": 0.0},
)
condition = {"id": list(sres.ids)}
res = await thread_pool_exec(
self.dataStore.search,
[], # no _source fields needed; we only want _id and _score
[],
condition,
[matchDense],
OrderByExpr(),
0,
len(sres.ids),
idx_names,
kb_ids,
)
return self.dataStore.get_scores(res)
async def fetch_chunk_vectors(self, chunk_ids: list[str],
tenant_ids: str | list[str],
kb_ids: list[str],
dim: int) -> dict[str, list[float]]:
"""
Citation-time helper: fetch only the embedding vectors for an
explicit set of chunk ids. Used by callers that need to compute
answer-vs-chunk similarity locally (e.g. insert_citations) so the
main retrieval path can keep skipping vector transport.
"""
if not chunk_ids:
return {}
if isinstance(tenant_ids, str):
idx_names = [index_name(tid) for tid in tenant_ids.split(",")]
else:
idx_names = [index_name(tid) for tid in tenant_ids]
vec_field = f"q_{dim}_vec"
res = await thread_pool_exec(
self.dataStore.search,
[vec_field],
[],
{"id": list(chunk_ids)},
[],
OrderByExpr(),
0,
len(chunk_ids),
idx_names,
kb_ids,
)
fields = self.dataStore.get_fields(res, [vec_field])
out: dict[str, list[float]] = {}
zero = [0.0] * dim
for cid, doc in fields.items():
v = doc.get(vec_field)
if isinstance(v, str):
v = [get_float(x) for x in v.split("\t")]
if not isinstance(v, list) or len(v) != dim:
v = zero
out[cid] = v
return out
def rerank_with_knn(self, sres, query, knn_scores: dict[str, float],
tkweight=0.3, vtweight=0.7,
cfield="content_ltks",
rank_feature: dict | None = None):
"""
Merge ES-side KNN cosine similarity with locally computed term
similarity using the user-configured weights. Replaces the older
local-only rerank() for the ES path, which depended on shipping
chunk vectors back to the application.
"""
_, keywords = self.qryr.question(query)
for i in sres.ids:
if isinstance(sres.field[i].get("important_kwd", []), str):
sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]
ins_tw = []
for i in sres.ids:
content_ltks = list(OrderedDict.fromkeys(sres.field[i][cfield].split()))
title_tks = [t for t in sres.field[i].get("title_tks", "").split() if t]
question_tks = [t for t in sres.field[i].get("question_tks", "").split() if t]
important_kwd = sres.field[i].get("important_kwd", [])
tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6
ins_tw.append(tks)
tksim = np.array(self.qryr.token_similarity(keywords, ins_tw), dtype=np.float64)
vtsim = np.array([knn_scores.get(chunk_id, 0.0) for chunk_id in sres.ids],
dtype=np.float64)
rank_fea = self._rank_feature_scores(rank_feature, sres)
sim = tkweight * tksim + vtweight * vtsim + rank_fea
return sim, tksim, vtsim
def rerank(self, sres, query, tkweight=0.3,
vtweight=0.7, cfield="content_ltks",
rank_feature: dict | None = None
@@ -491,7 +604,8 @@ class Dealer:
if isinstance(tenant_ids, str):
tenant_ids = tenant_ids.split(",")
sres = await self.search(req, [index_name(tid) for tid in tenant_ids], kb_ids, embd_mdl, highlight,
idx_names = [index_name(tid) for tid in tenant_ids]
sres = await self.search(req, idx_names, kb_ids, embd_mdl, highlight,
rank_feature=rank_feature)
# Temporary retrieval-side guard: prune chunks whose parent document no
# longer exists before reranking and returning results.
@@ -516,8 +630,9 @@ class Dealer:
sim = [s if s is not None else 0.0 for s in sim]
tsim = sim
vsim = sim
else:
# ElasticSearch doesn't normalize each way score before fusion.
elif settings.DOC_ENGINE_OCEANBASE:
# OceanBase still returns chunk vectors in the result; use
# the historical local rerank that depends on them.
sim, tsim, vsim = self.rerank(
sres,
question,
@@ -525,6 +640,20 @@ class Dealer:
vector_similarity_weight,
rank_feature=rank_feature,
)
else:
# ES path: ask ES for the clean cosine score via a second
# KNN-only call filtered by the candidate ids, then merge it
# with locally computed term similarity using the user's
# weight. Chunk vectors stay in the index.
knn_scores = await self._knn_scores(sres, idx_names, kb_ids)
sim, tsim, vsim = self.rerank_with_knn(
sres,
question,
knn_scores,
1 - vector_similarity_weight,
vector_similarity_weight,
rank_feature=rank_feature,
)
sim_np = np.array(sim, dtype=np.float64)
if sim_np.size == 0:
@@ -559,6 +688,11 @@ class Dealer:
did = chunk.get("doc_id", "")
position_int = chunk.get("position_int", [])
# Chunk vectors are no longer fetched during the main retrieval
# call. Fall back to whatever the chunk happens to carry (Infinity
# path) and otherwise emit a zero placeholder so the downstream
# shape stays stable. Citation callers refill this via
# Dealer.fetch_chunk_vectors when needed.
d = {
"chunk_id": id,
"content_ltks": chunk["content_ltks"],

View File

@@ -69,8 +69,7 @@ class ESConnection(ESConnectionBase):
index=index_names,
body=query,
timeout="600s",
track_total_hits=track_total_hits,
_source=True,
track_total_hits=track_total_hits
)
def _search_with_search_after(self, index_names: list[str], query: dict, offset: int, limit: int):