mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
### What problem does this PR solve?

Closes #15428

The hybrid score in `rag/nlp/search.py` (`rerank_by_model`) blends reranker similarity with token similarity on a fixed `[0, 1]` scale:

```python
return tkweight * np.array(tksim) + vtweight * vtsim + rank_fea  # tkweight=0.3, vtweight=0.7
```

The reranker implementations did not agree on that scale. Only three of roughly 17 providers normalized their output, and `NvidiaRerank` returned raw, unbounded logits. Weighted at `0.7`, a negative logit could push a genuinely relevant chunk below pure keyword matches, and its magnitude swamped `tksim`, which lives in `[0, 1]`. The practical effect was that the same query produced differently scaled scores depending on the configured reranker, and logit-based providers degraded retrieval quality instead of improving it.

This PR enforces a single scoring contract in one place:

- `Base.similarity` is now the only public entry point. It short-circuits empty input and guarantees a normalized result. Each provider implements its raw scoring in `_compute_rank`, which removes sixteen duplicated empty-input guards and the three scattered normalization calls.
- Normalization is range-aware. Providers that already return calibrated `[0, 1]` relevance scores (Cohere, Jina, Voyage, and others) keep their absolute magnitudes, so `similarity_threshold` filtering and the reported `vector_similarity` stay meaningful. Only out-of-range output such as NVIDIA logits is min-max rescaled into `[0, 1]`.
- The twelve leftover `[DEBUG ...]` prints in `rerank_by_model`, introduced in #14231, are removed. They ran on every retrieval, added per-chunk overhead, and leaked queries, keywords, and document content to stdout and logs.

A new regression suite in `test/unit_test/rag/llm/test_rerank_normalization.py` covers logit rescaling (positive, negative, and flat batches), preservation of already-calibrated scores, ordering, empty-input handling, and the per-provider HTTP path.
It also asserts that no provider overrides `similarity()`, so the contract cannot silently drift.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
982 lines
41 KiB
Python
982 lines
41 KiB
Python
#
|
||
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
|
||
#
|
||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at
|
||
#
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
#
|
||
import json
|
||
import logging
|
||
import re
|
||
import math
|
||
from collections import OrderedDict, defaultdict
|
||
from dataclasses import dataclass
|
||
|
||
from rag.nlp import rag_tokenizer, query
|
||
import numpy as np
|
||
from common.doc_store.doc_store_base import MatchDenseExpr, FusionExpr, OrderByExpr, DocStoreConnection
|
||
from common.string_utils import remove_redundant_spaces
|
||
from common.float_utils import get_float
|
||
from common.constants import PAGERANK_FLD, TAG_FLD
|
||
from common.tag_feature_utils import parse_tag_features
|
||
from common import settings
|
||
|
||
from common.misc_utils import thread_pool_exec
|
||
|
||
def index_name(uid):
    """Build the per-tenant doc-store index name, e.g. ``"abc" -> "ragflow_abc"``."""
    name = f"ragflow_{uid}"
    return name
|
||
|
||
|
||
class Dealer:
    """Search/retrieval front end over a document-store connection."""

    def __init__(self, dataStore: DocStoreConnection):
        """Bind the dealer to a doc store and create its full-text query builder.

        Args:
            dataStore: connection that every search, aggregation and SQL call uses.
        """
        self.dataStore = dataStore
        self.qryr = query.FulltextQueryer()
|
||
|
||
@dataclass
|
||
class SearchResult:
|
||
total: int
|
||
ids: list[str]
|
||
query_vector: list[float] | None = None
|
||
field: dict | None = None
|
||
highlight: dict | None = None
|
||
aggregation: list | dict | None = None
|
||
keywords: list[str] | None = None
|
||
group_docs: list[list] | None = None
|
||
|
||
async def get_vector(self, txt, emb_mdl, topk=10, similarity=0.1):
    """Embed ``txt`` and wrap the vector in a cosine dense-match expression.

    Args:
        txt: query text passed to ``emb_mdl.encode_queries`` (run in the thread pool).
        emb_mdl: embedding model.
        topk: forwarded to ``MatchDenseExpr`` as its top-k argument.
        similarity: stored in the expression's extra options as ``"similarity"``.

    Returns:
        ``MatchDenseExpr`` over column ``q_<dim>_vec``.

    Raises:
        Exception: when the model returns something that is not one-dimensional.
    """
    query_vec, _ = await thread_pool_exec(emb_mdl.encode_queries, txt)
    shape = np.array(query_vec).shape
    if len(shape) > 1:
        raise Exception(
            f"Dealer.get_vector returned array's shape {shape} doesn't match expectation(exact one dimension).")
    floats = list(map(get_float, query_vec))
    column = f"q_{len(floats)}_vec"
    options = {"similarity": similarity}
    return MatchDenseExpr(column, floats, 'float', 'cosine', topk, options)
|
||
|
||
async def _existing_doc_ids(self, doc_ids: list[str]) -> set[str]:
    """Return the subset of ``doc_ids`` that still has a row in the document table.

    Duplicate ids are collapsed (order kept) before the lookup, which runs
    in the thread pool.
    """
    if not doc_ids:
        return set()

    wanted = list(dict.fromkeys(doc_ids))

    def _query_existing():
        # Imported lazily as in the original; presumably to avoid an import
        # cycle at module load - TODO confirm.
        from api.db.services.document_service import DocumentService

        rows = DocumentService.get_by_ids(wanted).dicts()
        return set(row["id"] for row in rows)

    return await thread_pool_exec(_query_existing)
|
||
|
||
async def _prune_deleted_chunks(self, sres: SearchResult) -> SearchResult:
    """Drop hits whose parent document no longer exists in the database.

    Temporary safety net: some delete paths can leave stale chunks in the doc
    store when the DB row is removed but the vector record is not fully
    cleaned up. Filtering them here keeps chat/retrieval from surfacing
    content of deleted documents. Keep this as a fallback, not as the primary
    delete mechanism.

    Returns:
        ``sres`` itself when nothing is stale; otherwise a new ``SearchResult``
        holding only the surviving chunks, with ``total`` set to their count.
    """
    referenced = [doc.get("doc_id") for doc in sres.field.values() if doc and doc.get("doc_id")]
    if not referenced:
        return sres

    alive = await self._existing_doc_ids(referenced)
    if len(alive) == len(set(referenced)):
        # Every referenced document still exists.
        return sres

    kept_ids = []
    kept_fields = {}
    kept_highlight = {} if sres.highlight else sres.highlight
    dropped = 0
    for cid in sres.ids:
        doc = sres.field.get(cid)
        if doc and doc.get("doc_id") in alive:
            kept_ids.append(cid)
            kept_fields[cid] = doc
            if sres.highlight and cid in sres.highlight:
                kept_highlight[cid] = sres.highlight[cid]
        else:
            dropped += 1

    if dropped:
        logging.warning("Pruned %s stale chunks whose documents no longer exist.", dropped)

    return self.SearchResult(
        total=len(kept_ids),
        ids=kept_ids,
        query_vector=sres.query_vector,
        field=kept_fields,
        highlight=kept_highlight,
        aggregation=sres.aggregation,
        keywords=sres.keywords,
        group_docs=sres.group_docs,
    )
|
||
|
||
def get_filters(self, req):
    """Build the doc-store filter condition from a search request.

    ``kb_ids``/``doc_ids`` are mapped to the ``kb_id``/``doc_id`` fields. A
    fixed list of other keys is copied under the same name. Keys that are
    missing or ``None`` are left out.
    """
    renamed = [("kb_ids", "kb_id"), ("doc_ids", "doc_id")]
    # TODO(yzc): `available_int` is nullable however infinity doesn't support nullable columns.
    same_name = [(k, k) for k in ("knowledge_graph_kwd", "available_int", "entity_kwd", "from_entity_kwd",
                                  "to_entity_kwd", "removed_kwd")]
    condition = dict()
    for req_key, field_name in renamed + same_name:
        if req_key not in req or req[req_key] is None:
            continue
        condition[field_name] = req[req_key]
    return condition
|
||
|
||
async def search(self, req, idx_names: str | list[str],
                 kb_ids: list[str],
                 emb_mdl=None,
                 highlight: bool | list | None = None,
                 rank_feature: dict | None = None
                 ):
    """Run a paged chunk search against the doc store.

    Without a ``question`` in ``req`` this is a plain filtered listing,
    ordered by position when ``req["sort"]`` is set. With a question it runs
    a full-text match, fused with a dense-vector match when ``emb_mdl`` is
    given; the fused search is retried once with looser settings when it
    returns nothing.

    Args:
        req: request dict; reads ``page`` (1-based), ``topk``, ``size``,
            ``fields``, ``question``, ``sort``, ``similarity`` and the filter
            keys understood by ``get_filters``. It is not modified.
        idx_names: index name(s) to search.
        kb_ids: knowledge-base ids forwarded to the doc store.
        emb_mdl: optional embedding model enabling the dense match.
        highlight: ``None``/``False`` disables highlighting, ``True``
            highlights ``content_ltks``/``title_tks``, a list names the fields.
        rank_feature: optional rank-feature weights forwarded to the doc store.

    Returns:
        SearchResult with ids, fields (plus ``_score``), highlights, the
        ``docnm_kwd`` aggregation, expanded keywords and the query vector.
    """
    if highlight is None:
        highlight = False

    filters = self.get_filters(req)
    orderBy = OrderByExpr()

    pg = int(req.get("page", 1)) - 1
    topk = int(req.get("topk", 1024))
    ps = int(req.get("size", topk))
    offset, limit = pg * ps, ps

    # Copy: the OceanBase branch appends the vector column, and that must not
    # leak into a caller-supplied req["fields"] list.
    src = list(req.get("fields",
                       ["docnm_kwd", "content_ltks", "kb_id", "img_id", "title_tks", "important_kwd", "position_int",
                        "doc_id", "chunk_order_int", "page_num_int", "top_int", "create_timestamp_flt", "knowledge_graph_kwd",
                        "question_kwd", "question_tks", "doc_type_kwd",
                        "available_int", "content_with_weight", "mom_id", PAGERANK_FLD, TAG_FLD, "row_id()"]))
    kwds = set()

    qst = req.get("question", "")
    q_vec = []
    if not qst:
        if req.get("sort"):
            orderBy.asc("chunk_order_int")
            orderBy.asc("page_num_int")
            orderBy.asc("top_int")
            orderBy.desc("create_timestamp_flt")
        # Run the blocking store call off the event loop, like every other
        # search call in this method.
        res = await thread_pool_exec(self.dataStore.search, src, [], filters, [], orderBy, offset, limit,
                                     idx_names, kb_ids)
        total = self.dataStore.get_total(res)
        logging.debug("Dealer.search TOTAL: {}".format(total))
    else:
        highlightFields = ["content_ltks", "title_tks"]
        if not highlight:
            highlightFields = []
        elif isinstance(highlight, list):
            highlightFields = highlight
        matchText, keywords = self.qryr.question(qst, min_match=0.3)
        if emb_mdl is None:
            matchExprs = [matchText]
            res = await thread_pool_exec(self.dataStore.search, src, highlightFields, filters, matchExprs, orderBy,
                                         offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
            total = self.dataStore.get_total(res)
            logging.debug("Dealer.search TOTAL: {}".format(total))
        else:
            matchDense = await self.get_vector(qst, emb_mdl, topk, req.get("similarity", 0.1))
            q_vec = matchDense.embedding_data
            # ES path no longer fetches chunk vectors here. The clean
            # cosine score is recovered later via a second KNN-only call
            # in retrieval(); chunk vectors are fetched on demand for
            # citations (see Dealer.fetch_chunk_vectors). OceanBase
            # still relies on local rerank against chunk vectors, so
            # keep pulling them for that backend.
            if settings.DOC_ENGINE_OCEANBASE:
                src.append(f"q_{len(q_vec)}_vec")

            fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05,0.95"})
            matchExprs = [matchText, matchDense, fusionExpr]

            res = await thread_pool_exec(self.dataStore.search, src, highlightFields, filters, matchExprs, orderBy,
                                         offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
            total = self.dataStore.get_total(res)
            logging.debug("Dealer.search TOTAL: {}".format(total))

            # If result is empty, try again with lower min_match
            if total == 0:
                if filters.get("doc_id"):
                    res = await thread_pool_exec(self.dataStore.search, src, [], filters, [], orderBy, offset, limit,
                                                 idx_names, kb_ids)
                    total = self.dataStore.get_total(res)
                else:
                    matchText, _ = self.qryr.question(qst, min_match=0.1)
                    matchDense.extra_options["similarity"] = 0.17
                    res = await thread_pool_exec(self.dataStore.search, src, highlightFields, filters,
                                                 [matchText, matchDense, fusionExpr],
                                                 orderBy, offset, limit, idx_names, kb_ids,
                                                 rank_feature=rank_feature)
                    total = self.dataStore.get_total(res)
                    logging.debug("Dealer.search 2 TOTAL: {}".format(total))

        # Expand each keyword with its fine-grained tokens (2+ chars, no duplicates).
        for k in keywords:
            kwds.add(k)
            for kk in rag_tokenizer.fine_grained_tokenize(k).split():
                if len(kk) < 2:
                    continue
                if kk in kwds:
                    continue
                kwds.add(kk)

    logging.debug(f"TOTAL: {total}")
    ids = self.dataStore.get_doc_ids(res)
    keywords = list(kwds)
    highlights = self.dataStore.get_highlight(res, keywords, "content_with_weight")
    aggs = self.dataStore.get_aggregation(res, "docnm_kwd")
    return self.SearchResult(
        total=total,
        ids=ids,
        query_vector=q_vec,
        aggregation=aggs,
        highlight=highlights,
        field=self.dataStore.get_fields(res, src + ["_score"]),
        keywords=keywords
    )
|
||
|
||
@staticmethod
def trans2floats(txt):
    """Parse a tab-separated string of numbers into a list of floats (via ``get_float``)."""
    fields = txt.split("\t")
    return list(map(get_float, fields))
|
||
|
||
def insert_citations(self, answer, chunks, chunk_v,
                     embd_mdl, tkweight=0.1, vtweight=0.9):
    """Append ``[ID:n]`` citation markers to the sentences of ``answer``.

    The answer is split into sentences; fenced code blocks are kept whole.
    Each piece of 5+ characters is embedded with ``embd_mdl`` and scored
    against every chunk with the hybrid token/vector similarity. A piece is
    cited when 99% of its best score reaches the current threshold. The
    threshold starts at 0.63 and is relaxed by x0.8 per round, while above
    0.3, until some piece gets a citation. Up to four chunks within 1% of
    the best score are cited per piece, best first. Each chunk id is appended
    at most once per answer.

    Args:
        answer: generated answer text.
        chunks: chunk texts, index-aligned with ``chunk_v``.
        chunk_v: chunk embedding vectors. Not modified; vectors whose
            dimension differs from the answer embedding are treated as zero
            vectors on a private copy.
        embd_mdl: model whose ``encode(pieces)`` returns ``(vectors, _)``.
        tkweight: weight of the token similarity in the hybrid score.
        vtweight: weight of the vector similarity in the hybrid score.

    Returns:
        ``(answer_with_citations, cited)`` where ``cited`` is the set of cited
        chunk indices as strings.
    """
    assert len(chunks) == len(chunk_v)
    if not chunks:
        return answer, set([])

    pieces = re.split(r"(```)", answer)
    if len(pieces) >= 3:
        # Keep fenced code blocks intact; split only the text between them.
        i = 0
        pieces_ = []
        while i < len(pieces):
            if pieces[i] == "```":
                st = i
                i += 1
                while i < len(pieces) and pieces[i] != "```":
                    i += 1
                if i < len(pieces):
                    i += 1
                pieces_.append("".join(pieces[st: i]) + "\n")
            else:
                # Sentence boundary regex includes Arabic punctuation (، ؛ ؟ ۔)
                pieces_.extend(
                    re.split(
                        r"([^\|][;。?!!،؛؟۔\n]|[a-z\u0600-\u06FF][.?;!،؛؟][ \n])",
                        pieces[i]))
                i += 1
        pieces = pieces_
    else:
        # Sentence boundary regex includes Arabic punctuation (، ؛ ؟ ۔)
        pieces = re.split(r"([^\|][;。?!!،؛؟۔\n]|[a-z\u0600-\u06FF][.?;!،؛؟][ \n])", answer)
    for i in range(1, len(pieces)):
        if re.match(r"([^\|][;。?!!،؛؟۔\n]|[a-z\u0600-\u06FF][.?;!،؛؟][ \n])", pieces[i]):
            # A captured delimiter starts with the last character of the
            # preceding sentence; hand that character back to it.
            pieces[i - 1] += pieces[i][0]
            pieces[i] = pieces[i][1:]

    # Only pieces of 5+ characters are matched; idx maps them back into pieces.
    idx = []
    pieces_ = []
    for i, t in enumerate(pieces):
        if len(t) < 5:
            continue
        idx.append(i)
        pieces_.append(t)
    logging.debug("{} => {}".format(answer, pieces_))
    if not pieces_:
        return answer, set([])

    ans_v, _ = embd_mdl.encode(pieces_)
    # Work on a copy so zero-filling mismatched vectors never mutates the caller's list.
    chunk_v = list(chunk_v)
    for i in range(len(chunk_v)):
        if len(ans_v[0]) != len(chunk_v[i]):
            # Log BEFORE replacing the vector so the message shows the real mismatch.
            logging.warning(
                "The dimension of query and chunk do not match: {} vs. {}".format(len(ans_v[0]), len(chunk_v[i])))
            chunk_v[i] = [0.0] * len(ans_v[0])

    assert len(ans_v[0]) == len(chunk_v[0]), "The dimension of query and chunk do not match: {} vs. {}".format(
        len(ans_v[0]), len(chunk_v[0]))

    chunks_tks = [rag_tokenizer.tokenize(self.qryr.rmWWW(ck)).split()
                  for ck in chunks]
    cites = {}
    thr = 0.63
    while thr > 0.3 and len(cites.keys()) == 0 and pieces_ and chunks_tks:
        for i, piece in enumerate(pieces_):
            sim, tksim, vtsim = self.qryr.hybrid_similarity(ans_v[i],
                                                            chunk_v,
                                                            rag_tokenizer.tokenize(
                                                                self.qryr.rmWWW(piece)).split(),
                                                            chunks_tks,
                                                            tkweight, vtweight)
            mx = np.max(sim) * 0.99
            logging.debug("{} SIM: {}".format(piece, mx))
            if mx < thr:
                continue
            # Chunks within 1% of the best score, best first. The sort is
            # stable, so ties keep chunk order and the choice is deterministic
            # (a set of strings would depend on hash randomization).
            hits = sorted((ii for ii in range(len(chunk_v)) if sim[ii] > mx),
                          key=lambda ii, s=sim: s[ii], reverse=True)
            cites[idx[i]] = [str(ii) for ii in hits[:4]]
        thr *= 0.8

    res = ""
    seted = set([])
    for i, p in enumerate(pieces):
        res += p
        # cites is keyed by positions taken from idx, so this also skips short pieces.
        if i not in cites:
            continue
        for c in cites[i]:
            assert int(c) < len(chunk_v)
        for c in cites[i]:
            if c in seted:
                continue
            res += f" [ID:{c}]"
            seted.add(c)

    return res, seted
|
||
|
||
def _rank_feature_scores(self, query_rfea, search_res):
    """Per-chunk rank-feature boost: pagerank plus tag-feature similarity.

    For every chunk in ``search_res.ids`` the boost is its ``PAGERANK_FLD``
    value plus ``10 *`` a cosine-style similarity. That similarity compares
    the query tag weights ``query_rfea`` (the pagerank key is excluded from
    the query norm) with the chunk's parsed ``TAG_FLD`` features. Chunks
    without usable tag features contribute only their pagerank.

    Args:
        query_rfea: ``{tag: weight}`` for the query, or a falsy value.
        search_res: result whose ``field[chunk_id]`` dicts hold the features.

    Returns:
        numpy float array aligned with ``search_res.ids``.
    """
    # A missing or null pagerank counts as 0. None would otherwise turn into
    # NaN in the float array and poison every downstream score comparison.
    pageranks = np.array(
        [search_res.field[chunk_id].get(PAGERANK_FLD) or 0 for chunk_id in search_res.ids],
        dtype=float,
    )

    if not query_rfea:
        return pageranks

    q_denor = np.sqrt(np.sum([s * s for t, s in query_rfea.items() if t != PAGERANK_FLD]))
    if q_denor == 0:
        return pageranks

    rank_fea = []
    for chunk_id in search_res.ids:
        raw_tags = search_res.field[chunk_id].get(TAG_FLD)
        if not raw_tags:
            rank_fea.append(0)
            continue
        tag_feas = parse_tag_features(raw_tags, allow_json_string=True, allow_python_literal=True)
        if not tag_feas:
            rank_fea.append(0)
            continue
        nor, denor = 0, 0
        for t, sc in tag_feas.items():
            if t in query_rfea:
                nor += query_rfea[t] * sc
            denor += sc * sc
        rank_fea.append(0 if denor == 0 else nor / np.sqrt(denor) / q_denor)
    return np.array(rank_fea) * 10. + pageranks
|
||
|
||
async def _knn_scores(self, sres: "Dealer.SearchResult",
|
||
idx_names: str | list[str],
|
||
kb_ids: list[str]) -> dict[str, float]:
|
||
"""
|
||
Second-pass ES call that returns the cosine similarity between the
|
||
query embedding and each candidate chunk's embedding, filtered to the
|
||
chunk ids the original search already surfaced. We rely on ES to do
|
||
the vector math so the chunk vectors never leave the engine.
|
||
"""
|
||
if not sres.ids or not sres.query_vector:
|
||
return {}
|
||
dim = len(sres.query_vector)
|
||
matchDense = MatchDenseExpr(
|
||
f"q_{dim}_vec",
|
||
sres.query_vector,
|
||
"float",
|
||
"cosine",
|
||
len(sres.ids),
|
||
{"similarity": 0.0},
|
||
)
|
||
condition = {"id": list(sres.ids)}
|
||
res = await thread_pool_exec(
|
||
self.dataStore.search,
|
||
[], # no _source fields needed; we only want _id and _score
|
||
[],
|
||
condition,
|
||
[matchDense],
|
||
OrderByExpr(),
|
||
0,
|
||
len(sres.ids),
|
||
idx_names,
|
||
kb_ids,
|
||
)
|
||
return self.dataStore.get_scores(res)
|
||
|
||
async def fetch_chunk_vectors(self, chunk_ids: list[str],
|
||
tenant_ids: str | list[str],
|
||
kb_ids: list[str],
|
||
dim: int) -> dict[str, list[float]]:
|
||
"""
|
||
Citation-time helper: fetch only the embedding vectors for an
|
||
explicit set of chunk ids. Used by callers that need to compute
|
||
answer-vs-chunk similarity locally (e.g. insert_citations) so the
|
||
main retrieval path can keep skipping vector transport.
|
||
"""
|
||
if not chunk_ids:
|
||
return {}
|
||
if isinstance(tenant_ids, str):
|
||
idx_names = [index_name(tid) for tid in tenant_ids.split(",")]
|
||
else:
|
||
idx_names = [index_name(tid) for tid in tenant_ids]
|
||
vec_field = f"q_{dim}_vec"
|
||
res = await thread_pool_exec(
|
||
self.dataStore.search,
|
||
[vec_field],
|
||
[],
|
||
{"id": list(chunk_ids)},
|
||
[],
|
||
OrderByExpr(),
|
||
0,
|
||
len(chunk_ids),
|
||
idx_names,
|
||
kb_ids,
|
||
)
|
||
fields = self.dataStore.get_fields(res, [vec_field])
|
||
out: dict[str, list[float]] = {}
|
||
zero = [0.0] * dim
|
||
for cid, doc in fields.items():
|
||
v = doc.get(vec_field)
|
||
if isinstance(v, str):
|
||
v = [get_float(x) for x in v.split("\t")]
|
||
if not isinstance(v, list) or len(v) != dim:
|
||
v = zero
|
||
out[cid] = v
|
||
return out
|
||
|
||
def rerank_with_knn(self, sres, query, knn_scores: dict[str, float],
|
||
tkweight=0.3, vtweight=0.7,
|
||
cfield="content_ltks",
|
||
rank_feature: dict | None = None):
|
||
"""
|
||
Merge ES-side KNN cosine similarity with locally computed term
|
||
similarity using the user-configured weights. Replaces the older
|
||
local-only rerank() for the ES path, which depended on shipping
|
||
chunk vectors back to the application.
|
||
"""
|
||
_, keywords = self.qryr.question(query)
|
||
|
||
for i in sres.ids:
|
||
if isinstance(sres.field[i].get("important_kwd", []), str):
|
||
sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]
|
||
ins_tw = []
|
||
for i in sres.ids:
|
||
content_ltks = list(OrderedDict.fromkeys(sres.field[i][cfield].split()))
|
||
title_tks = [t for t in sres.field[i].get("title_tks", "").split() if t]
|
||
question_tks = [t for t in sres.field[i].get("question_tks", "").split() if t]
|
||
important_kwd = sres.field[i].get("important_kwd", [])
|
||
tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6
|
||
ins_tw.append(tks)
|
||
|
||
tksim = np.array(self.qryr.token_similarity(keywords, ins_tw), dtype=np.float64)
|
||
vtsim = np.array([knn_scores.get(chunk_id, 0.0) for chunk_id in sres.ids],
|
||
dtype=np.float64)
|
||
rank_fea = self._rank_feature_scores(rank_feature, sres)
|
||
sim = tkweight * tksim + vtweight * vtsim + rank_fea
|
||
return sim, tksim, vtsim
|
||
|
||
def rerank(self, sres, query, tkweight=0.3,
|
||
vtweight=0.7, cfield="content_ltks",
|
||
rank_feature: dict | None = None
|
||
):
|
||
_, keywords = self.qryr.question(query)
|
||
vector_size = len(sres.query_vector)
|
||
vector_column = f"q_{vector_size}_vec"
|
||
zero_vector = [0.0] * vector_size
|
||
ins_embd = []
|
||
for chunk_id in sres.ids:
|
||
vector = sres.field[chunk_id].get(vector_column, zero_vector)
|
||
if isinstance(vector, str):
|
||
vector = [get_float(v) for v in vector.split("\t")]
|
||
ins_embd.append(vector)
|
||
if not ins_embd:
|
||
return [], [], []
|
||
|
||
for i in sres.ids:
|
||
if isinstance(sres.field[i].get("important_kwd", []), str):
|
||
sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]
|
||
ins_tw = []
|
||
for i in sres.ids:
|
||
content_ltks = list(OrderedDict.fromkeys(sres.field[i][cfield].split()))
|
||
title_tks = [t for t in sres.field[i].get("title_tks", "").split() if t]
|
||
question_tks = [t for t in sres.field[i].get("question_tks", "").split() if t]
|
||
important_kwd = sres.field[i].get("important_kwd", [])
|
||
tks = content_ltks + title_tks * 2 + important_kwd * 5 + question_tks * 6
|
||
ins_tw.append(tks)
|
||
|
||
## For rank feature(tag_fea) scores.
|
||
rank_fea = self._rank_feature_scores(rank_feature, sres)
|
||
|
||
sim, tksim, vtsim = self.qryr.hybrid_similarity(sres.query_vector,
|
||
ins_embd,
|
||
keywords,
|
||
ins_tw, tkweight, vtweight)
|
||
|
||
return sim + rank_fea, tksim, vtsim
|
||
|
||
def rerank_by_model(self, rerank_mdl, sres, query, tkweight=0.3,
|
||
vtweight=0.7, cfield="content_ltks",
|
||
rank_feature: dict | None = None):
|
||
_, keywords = self.qryr.question(query)
|
||
|
||
for i in sres.ids:
|
||
if isinstance(sres.field[i].get("important_kwd", []), str):
|
||
sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]
|
||
ins_tw = []
|
||
for i in sres.ids:
|
||
#content_ltks = list(OrderedDict.fromkeys(sres.field[i][cfield].split()))
|
||
content_ltks = sres.field[i][cfield].split()
|
||
title_tks = [t for t in sres.field[i].get("title_tks", "").split() if t]
|
||
important_kwd = sres.field[i].get("important_kwd", [])
|
||
tks = content_ltks + title_tks + important_kwd
|
||
ins_tw.append(tks)
|
||
|
||
docs = [remove_redundant_spaces(" ".join(tks)) for tks in ins_tw]
|
||
|
||
tksim = self.qryr.token_similarity(keywords, ins_tw)
|
||
# rerank_mdl.similarity() returns scores normalized to [0, 1] for every
|
||
# provider (see RerankModel.Base.similarity), so the blend below stays
|
||
# on a single scale regardless of the configured reranker.
|
||
vtsim, _ = rerank_mdl.similarity(query, docs)
|
||
## For rank feature(tag_fea) scores.
|
||
rank_fea = self._rank_feature_scores(rank_feature, sres)
|
||
|
||
return tkweight * np.array(tksim) + vtweight * vtsim + rank_fea, tksim, vtsim
|
||
|
||
def hybrid_similarity(self, ans_embd, ins_embd, ans, inst):
    """Hybrid similarity between an answer and candidate chunks.

    Tokenizes the raw strings ``ans`` and ``inst`` with ``rag_tokenizer`` and
    delegates to the queryer's ``hybrid_similarity`` using its default weights.
    """
    ans_tokens = rag_tokenizer.tokenize(ans).split()
    inst_tokens = rag_tokenizer.tokenize(inst).split()
    return self.qryr.hybrid_similarity(ans_embd, ins_embd, ans_tokens, inst_tokens)
|
||
|
||
@staticmethod
|
||
def _rerank_window(page_size: int, top: int = 0) -> int:
|
||
"""Candidate-window size shared by retrieval's block fetch and slice.
|
||
|
||
``retrieval`` reuses this value BOTH as the backend block size and as
|
||
the modulus for extracting a single page from a (re)ranked block::
|
||
|
||
req["page"] = global_offset // window # which block to fetch
|
||
begin = global_offset % window # where the page starts
|
||
|
||
For those two to agree the window MUST be an exact multiple of
|
||
``page_size``; otherwise blocks and pages drift apart and deep
|
||
pagination silently drops results and returns short pages.
|
||
|
||
The window targets a provider-friendly pool of ~64 candidates, bounded
|
||
by ``top`` when given (i.e. when an external reranker is active), and is
|
||
always rounded UP to a whole number of pages to preserve the invariant.
|
||
"""
|
||
if page_size <= 1:
|
||
return min(30, top) if top > 0 else 30
|
||
window = math.ceil(64 / page_size) * page_size
|
||
if top > 0:
|
||
window = min(window, math.ceil(top / page_size) * page_size)
|
||
return window
|
||
|
||
async def retrieval(
        self,
        question,
        embd_mdl,
        tenant_ids,
        kb_ids,
        page,
        page_size,
        similarity_threshold=0.2,
        vector_similarity_weight=0.3,
        top=1024,
        doc_ids=None,
        aggs=True,
        rerank_mdl=None,
        highlight=False,
        rank_feature: dict | None = {PAGERANK_FLD: 10},
        trace_id=None,
):
    """Retrieve, score, threshold and paginate chunks for ``question``.

    Candidates are fetched in blocks of ``_rerank_window`` hits and scored by
    one of four paths: the external reranker, Infinity's fused score, the
    OceanBase local rerank, or ES KNN + term similarity. Hits are then
    filtered by ``similarity_threshold`` and sliced down to the requested page.

    Args:
        question: user query; an empty value returns an empty result.
        embd_mdl: embedding model used by ``search``.
        tenant_ids: tenant id(s), a list or a comma-separated string.
        kb_ids: knowledge-base ids to search.
        page: 1-based page number (values below 1 are treated as 1).
        page_size: number of chunks per page.
        similarity_threshold: minimum final score kept (not applied when
            ``vector_similarity_weight`` <= 0).
        vector_similarity_weight: weight of the vector/reranker score; the
            term weight is ``1 - vector_similarity_weight``.
        top: dense-search top-k; also caps the window when reranking.
        doc_ids: optional document-id filter.
        aggs: when true, ``doc_aggs`` counts thresholded chunks per document.
        rerank_mdl: optional external reranker.
        highlight: request highlight snippets.
        rank_feature: rank-feature weights (pagerank weight 10 by default).
            NOTE: the default dict object is shared across calls and must
            never be mutated.
        trace_id: only used for debug logging.

    Returns:
        ``{"total": int, "chunks": [dict, ...], "doc_aggs": [dict, ...]}``.
        ``total`` counts the thresholded hits within the fetched block, and
        ``doc_aggs`` is always a list.
    """
    # doc_aggs is a list on every return path (previously the empty-question
    # path returned a dict here).
    ranks = {"total": 0, "chunks": [], "doc_aggs": []}
    if not question:
        return ranks

    # Candidate window for block-based pagination. It MUST stay a multiple
    # of page_size so the block fetched (global_offset // RERANK_LIMIT) and
    # the in-block page slice (global_offset % RERANK_LIMIT) stay aligned;
    # see _rerank_window. When an external reranker is active the pool is
    # also bounded by top.
    RERANK_LIMIT = self._rerank_window(page_size, top if rerank_mdl else 0)
    page = max(page, 1)
    global_offset = (page - 1) * page_size
    req = {
        "kb_ids": kb_ids,
        "doc_ids": doc_ids,
        "page": global_offset // RERANK_LIMIT + 1,
        "size": RERANK_LIMIT,
        "question": question,
        "vector": True,
        "topk": top,
        "similarity": similarity_threshold,
        "available_int": 1,
    }
    logging.debug(f"[Search] global_offset={global_offset}, rerank_limit={RERANK_LIMIT}, page_size={page_size}, page={page}")

    if isinstance(tenant_ids, str):
        tenant_ids = tenant_ids.split(",")

    idx_names = [index_name(tid) for tid in tenant_ids]
    sres = await self.search(req, idx_names, kb_ids, embd_mdl, highlight,
                             rank_feature=rank_feature)
    # Temporary retrieval-side guard: prune chunks whose parent document no
    # longer exists before reranking and returning results.
    sres = await self._prune_deleted_chunks(sres)
    if sres.total == 0:
        return ranks

    term_similarity_weight = 1 - vector_similarity_weight
    logging.debug(
        "[Search] retrieval weights: trace_id=%s kb_count=%s similarity_threshold=%s "
        "vector_similarity_weight=%s full_text_weight=%s rerank_enabled=%s",
        trace_id,
        len(kb_ids),
        similarity_threshold,
        vector_similarity_weight,
        term_similarity_weight,
        bool(rerank_mdl),
    )

    if rerank_mdl and sres.total > 0:
        sim, tsim, vsim = self.rerank_by_model(
            rerank_mdl,
            sres,
            question,
            term_similarity_weight,
            vector_similarity_weight,
            rank_feature=rank_feature,
        )
    elif settings.DOC_ENGINE_INFINITY:
        # Don't need rerank here since Infinity normalizes each way score before fusion.
        sim = [sres.field[cid].get("_score", 0.0) for cid in sres.ids]
        sim = [s if s is not None else 0.0 for s in sim]
        tsim = sim
        vsim = sim
    elif settings.DOC_ENGINE_OCEANBASE:
        # OceanBase still returns chunk vectors in the result; use
        # the historical local rerank that depends on them.
        sim, tsim, vsim = self.rerank(
            sres,
            question,
            term_similarity_weight,
            vector_similarity_weight,
            rank_feature=rank_feature,
        )
    else:
        # ES path: ask ES for the clean cosine score via a second
        # KNN-only call filtered by the candidate ids, then merge it
        # with locally computed term similarity using the user's
        # weight. Chunk vectors stay in the index.
        knn_scores = await self._knn_scores(sres, idx_names, kb_ids)
        sim, tsim, vsim = self.rerank_with_knn(
            sres,
            question,
            knn_scores,
            term_similarity_weight,
            vector_similarity_weight,
            rank_feature=rank_feature,
        )

    sim_np = np.array(sim, dtype=np.float64)
    if sim_np.size == 0:
        return ranks

    # Use stable sort for deterministic ordering when scores are tied
    sorted_idx = np.argsort(sim_np * -1, kind='stable')

    # When vector_similarity_weight is 0, similarity_threshold is not meaningful for term-only scores.
    post_threshold = 0.0 if vector_similarity_weight <= 0 else similarity_threshold

    valid_idx = [int(i) for i in sorted_idx if sim_np[i] >= post_threshold]
    ranks["total"] = len(valid_idx)
    if not valid_idx:
        return ranks

    begin = global_offset % RERANK_LIMIT
    page_idx = valid_idx[begin:begin + page_size]

    dim = len(sres.query_vector)
    vector_column = f"q_{dim}_vec"

    for i in page_idx:
        chunk_id = sres.ids[i]
        chunk = sres.field[chunk_id]
        # Chunk vectors are no longer fetched during the main retrieval
        # call. Fall back to whatever the chunk happens to carry (Infinity
        # path) and otherwise emit a zero placeholder so the downstream
        # shape stays stable. Citation callers refill this via
        # Dealer.fetch_chunk_vectors when needed. The placeholder is a fresh
        # list per chunk so results never share one mutable object.
        vector = chunk[vector_column] if vector_column in chunk else [0.0] * dim
        d = {
            "chunk_id": chunk_id,
            "content_ltks": chunk["content_ltks"],
            "content_with_weight": chunk["content_with_weight"],
            "doc_id": chunk.get("doc_id", ""),
            "docnm_kwd": chunk.get("docnm_kwd", ""),
            "kb_id": chunk["kb_id"],
            "important_kwd": chunk.get("important_kwd", []),
            "tag_kwd": chunk.get("tag_kwd", []),
            "image_id": chunk.get("img_id", ""),
            "similarity": float(sim_np[i]),
            "vector_similarity": float(vsim[i]),
            "term_similarity": float(tsim[i]),
            "vector": vector,
            "positions": chunk.get("position_int", []),
            "doc_type_kwd": chunk.get("doc_type_kwd", ""),
            "mom_id": chunk.get("mom_id", ""),
            "row_id": chunk.get("row_id()"),
        }
        if highlight and sres.highlight:
            if chunk_id in sres.highlight:
                d["highlight"] = remove_redundant_spaces(sres.highlight[chunk_id])
            else:
                d["highlight"] = d["content_with_weight"]
        ranks["chunks"].append(d)

    if aggs:
        # Count every thresholded hit in the block (not just this page) per document name.
        per_doc = {}
        for i in valid_idx:
            chunk = sres.field[sres.ids[i]]
            dnm = chunk.get("docnm_kwd", "")
            did = chunk.get("doc_id", "")
            if dnm not in per_doc:
                per_doc[dnm] = {"doc_id": did, "count": 0}
            per_doc[dnm]["count"] += 1

        ranks["doc_aggs"] = [
            {
                "doc_name": k,
                "doc_id": v["doc_id"],
                "count": v["count"],
            }
            for k, v in sorted(
                per_doc.items(),
                key=lambda x: x[1]["count"] * -1,
            )
        ]

    return ranks
|
||
|
||
def sql_retrieval(self, sql, fetch_size=128, format="json"):
    """Run raw SQL on the doc store and return its result unchanged.

    ``fetch_size`` and ``format`` are passed straight through to ``dataStore.sql``.
    """
    return self.dataStore.sql(sql, fetch_size, format)
|
||
|
||
def chunk_list(self, doc_id: str, tenant_id: str,
               kb_ids: list[str], max_count=1024,
               offset=0,
               fields=("docnm_kwd", "content_with_weight", "img_id"),
               sort_by_position: bool = False,
               retrieve_all: bool = False):
    """Return the chunks of one document, each with its chunk id under ``"id"``.

    Rows are fetched in batches of 128 starting at ``offset``. By default
    paging stops at position ``max_count``, which acts as an absolute end
    offset (the historical cap). When ``retrieve_all`` is True, paging
    continues until the doc store returns fewer rows than requested.

    Args:
        doc_id: document whose chunks are listed.
        tenant_id: tenant owning the index.
        kb_ids: knowledge-base ids forwarded to the doc store.
        max_count: end offset for paging when ``retrieve_all`` is False.
        offset: first row to fetch.
        fields: fields to return; duplicates are dropped and order is kept.
        sort_by_position: order by page, position and top, adding those
            fields when missing.
        retrieve_all: ignore ``max_count`` and read until the store runs dry.
    """
    condition = {"doc_id": doc_id}

    # Order-preserving de-duplication. A set() round-trip would make the
    # field order vary between processes because string hashing is randomized.
    fields = list(dict.fromkeys(fields or []))
    orderBy = OrderByExpr()
    if sort_by_position:
        for need in ("page_num_int", "position_int", "top_int"):
            if need not in fields:
                fields.append(need)
        orderBy.asc("page_num_int")
        orderBy.asc("position_int")
        orderBy.asc("top_int")

    res = []
    bs = 128
    p = offset
    while retrieve_all or p < max_count:
        limit = bs if retrieve_all else min(bs, max_count - p)
        if limit <= 0:
            break
        es_res = self.dataStore.search(fields, [], condition, [], orderBy, p, limit, index_name(tenant_id),
                                       kb_ids)
        dict_chunks = self.dataStore.get_fields(es_res, fields)
        for chunk_id, doc in dict_chunks.items():
            doc["id"] = chunk_id
        res.extend(dict_chunks.values())
        # A short (or empty) batch means the store has no more rows.
        if len(dict_chunks) < limit:
            break
        p += limit
    return res
|
||
|
||
def all_tags(self, tenant_id: str, kb_ids: list[str], S=1000):
    """Return the ``tag_kwd`` aggregation over the tenant's index for ``kb_ids``.

    Returns ``[]`` when ``kb_ids`` is empty (previously an IndexError) or
    when the index does not exist. Existence is checked against the first
    knowledge base only. ``S`` is unused here.
    """
    if not kb_ids:
        return []
    idx_nm = index_name(tenant_id)
    if not self.dataStore.index_exist(idx_nm, kb_ids[0]):
        return []
    res = self.dataStore.search([], [], {}, [], OrderByExpr(), 0, 0, idx_nm, kb_ids, ["tag_kwd"])
    return self.dataStore.get_aggregation(res, "tag_kwd")
|
||
|
||
def all_tags_in_portion(self, tenant_id: str, kb_ids: list[str], S=1000):
    """Return every tag's add-one-smoothed share of all tag occurrences.

    Each ``(tag, count)`` pair from the ``tag_kwd`` aggregation maps to
    ``(count + 1) / (total + S)``, where ``total`` is the summed count and
    ``S`` is the smoothing constant.

    Returns:
        Dict of tag -> smoothed share (empty when no tags are aggregated).
    """
    search_res = self.dataStore.search([], [], {}, [], OrderByExpr(), 0, 0, index_name(tenant_id), kb_ids, ["tag_kwd"])
    tag_counts = self.dataStore.get_aggregation(search_res, "tag_kwd")
    denominator = np.sum([count for _, count in tag_counts]) + S
    return {tag: (count + 1) / denominator for tag, count in tag_counts}
|
||
|
||
def tag_content(self, tenant_id: str, kb_ids: list[str], doc, all_tags, topn_tags=3, keywords_topn=30, S=1000):
    """Store the most characteristic tags of ``doc`` under ``doc[TAG_FLD]``.

    A keyword query built from the document's ``title_tks`` + ``content_ltks``
    (plus any ``important_kwd``) is run against the tenant index, and the
    hits' ``tag_kwd`` values are aggregated. Each tag scores
    ``0.1 * (count + 1) / (total + S)`` divided by its weight in ``all_tags``
    (default 0.0001, floored at 1e-6), rounded to an integer. The
    ``topn_tags`` highest-scoring tags with a positive score are stored,
    with dots in tag names replaced by underscores.

    ``all_tags`` is presumably the output of ``all_tags_in_portion`` (verify
    against callers).

    Returns:
        False when the search yields no tag aggregation (``doc`` untouched),
        True otherwise.
    """
    idx_nm = index_name(tenant_id)
    content_tks = doc["title_tks"] + " " + doc["content_ltks"]
    match_txt = self.qryr.paragraph(content_tks, doc.get("important_kwd", []), keywords_topn)
    search_res = self.dataStore.search([], [], {}, [match_txt], OrderByExpr(), 0, 0, idx_nm, kb_ids, ["tag_kwd"])
    tag_counts = self.dataStore.get_aggregation(search_res, "tag_kwd")
    if not tag_counts:
        return False

    denominator = np.sum([count for _, count in tag_counts]) + S
    scored = [
        (tag, round(0.1 * (count + 1) / denominator / max(1e-6, all_tags.get(tag, 0.0001))))
        for tag, count in tag_counts
    ]
    # reverse=True keeps ties in aggregation order (sort is stable).
    best = sorted(scored, key=lambda pair: pair[1], reverse=True)[:topn_tags]
    doc[TAG_FLD] = {tag.replace(".", "_"): score for tag, score in best if score > 0}
    return True
|
||
|
||
def tag_query(self, question: str, tenant_ids: str | list[str], kb_ids: list[str], all_tags, topn_tags=3, S=1000):
    """Score the tags most associated with a free-text question.

    The question is turned into a match expression (``min_match=0.0``) and run
    against the tenant index(es); hits' ``tag_kwd`` values are aggregated and
    each tag scores ``0.1 * (count + 1) / (total + S)`` divided by its weight
    in ``all_tags`` (default 0.0001, floored at 1e-6), rounded to an integer.

    Args:
        tenant_ids: A single tenant id, or a list of them (one index each).

    Returns:
        ``{}`` when no tags are aggregated; otherwise up to ``topn_tags``
        entries mapping tag (dots replaced by underscores) to its score,
        floored at 1.
    """
    indices = index_name(tenant_ids) if isinstance(tenant_ids, str) else [index_name(tid) for tid in tenant_ids]
    match_expr, _ = self.qryr.question(question, min_match=0.0)
    search_res = self.dataStore.search([], [], {}, [match_expr], OrderByExpr(), 0, 0, indices, kb_ids, ["tag_kwd"])
    tag_counts = self.dataStore.get_aggregation(search_res, "tag_kwd")
    if not tag_counts:
        return {}

    denominator = np.sum([count for _, count in tag_counts]) + S
    scored = [
        (tag, round(0.1 * (count + 1) / denominator / max(1e-6, all_tags.get(tag, 0.0001))))
        for tag, count in tag_counts
    ]
    # reverse=True keeps ties in aggregation order (sort is stable).
    best = sorted(scored, key=lambda pair: pair[1], reverse=True)[:topn_tags]
    return {tag.replace(".", "_"): max(1, score) for tag, score in best}
|
||
|
||
async def retrieval_by_toc(self, query: str, chunks: list[dict], tenant_ids: list[str], chat_mdl, topn: int = 6):
    """Boost and extend retrieved chunks using the best document's table of contents.

    The document whose chunks have the largest summed ``similarity`` is
    chosen. Its TOC entries (chunks filtered by ``toc_kwd == "toc"``, whose
    ``content_with_weight`` is JSON) are passed to ``relevant_chunks_with_toc``,
    which yields ``(chunk_id, score)`` pairs. A score is added to the
    ``similarity`` of a chunk already in ``chunks``; other chunk ids are fetched
    from the doc store and appended with that score.

    Returns:
        ``[]`` for empty ``chunks``. ``chunks`` itself, unchanged, when no TOC
        can be loaded or no relevant ids come back. Otherwise at most ``topn``
        chunks sorted by descending ``similarity``. On that last path
        ``chunks`` and its dicts are modified in place.
    """
    # Imported here rather than at module top to avoid a circular import.
    from rag.prompts.generator import relevant_chunks_with_toc
    if not chunks:
        return []

    idx_nms = [index_name(tid) for tid in tenant_ids]

    # Sum similarity per document and remember each document's kb (last wins).
    doc_scores, doc_kb = {}, {}
    for ck in chunks:
        did = ck["doc_id"]
        doc_scores[did] = doc_scores.get(did, 0) + ck["similarity"]
        doc_kb[did] = ck["kb_id"]
    best_doc = sorted(doc_scores.items(), key=lambda item: item[1] * -1.)[0][0]
    kb_ids = [doc_kb[best_doc]]

    toc_res = self.dataStore.search(["content_with_weight"], [], {"doc_id": best_doc, "toc_kwd": "toc"}, [],
                                    OrderByExpr(), 0, 128, idx_nms,
                                    kb_ids)
    toc = []
    for toc_chunk in self.dataStore.get_fields(toc_res, ["content_with_weight"]).values():
        try:
            toc.extend(json.loads(toc_chunk["content_with_weight"]))
        except Exception as e:
            # Best effort: a malformed TOC chunk is logged and skipped.
            logging.exception(e)
    if not toc:
        return chunks

    selected = await relevant_chunks_with_toc(query, toc, chat_mdl, topn * 2)
    if not selected:
        return chunks

    # Placeholder vector length until a fetched chunk reveals the real one.
    vector_size = 1024
    position = {ck["chunk_id"]: i for i, ck in enumerate(chunks)}
    for cid, sim in selected:
        if cid in position:
            chunks[position[cid]]["similarity"] += sim
            continue
        fetched = self.dataStore.get(cid, idx_nms[0], kb_ids)
        if not fetched:
            continue
        entry = {
            "chunk_id": cid,
            "content_ltks": fetched["content_ltks"],
            "content_with_weight": fetched["content_with_weight"],
            "doc_id": best_doc,
            "docnm_kwd": fetched.get("docnm_kwd", ""),
            "kb_id": fetched["kb_id"],
            "important_kwd": fetched.get("important_kwd", []),
            "image_id": fetched.get("img_id", ""),
            "similarity": sim,
            "vector_similarity": sim,
            "term_similarity": sim,
            "vector": [0.0] * vector_size,
            "positions": fetched.get("position_int", []),
            "doc_type_kwd": fetched.get("doc_type_kwd", "")
        }
        # The first field ending in "_vec" (if any) supplies the embedding.
        vec_key = next((k for k in fetched.keys() if k.endswith("_vec")), None)
        if vec_key is not None:
            entry["vector"] = fetched[vec_key]
            vector_size = len(fetched[vec_key])
        chunks.append(entry)

    return sorted(chunks, key=lambda x: x["similarity"] * -1)[:topn]
|
||
|
||
def retrieval_by_children(self, chunks: list[dict], tenant_ids: list[str]):
    """Replace child chunks with their parent ("mom") chunks.

    Chunks whose ``mom_id`` is a non-blank string are grouped by that id. Each
    group is replaced by one parent chunk fetched from the doc store (first
    tenant index only). If the parent is missing, the children are kept as-is
    and a warning is logged.

    The synthesized parent chunk uses:
    - the children's ``content_ltks`` joined with spaces;
    - their concatenated ``important_kwd``;
    - the mean of their ``similarity`` for all three similarity fields;
    - the first child's ``*_vec`` field as ``vector``. If the first child has
      no such field, ``vector`` is a zero vector of the last known size
      (initially 1024).

    The input list is not modified.

    Returns:
        ``[]`` for empty input. ``chunks`` itself when no chunk has a parent.
        Otherwise a new list sorted by descending ``similarity``.
    """
    if not chunks:
        return []

    idx_nms = [index_name(tid) for tid in tenant_ids]

    # One pass split: O(n), unlike popping from the caller's list by index,
    # and the caller's list is left untouched.
    merged = []
    mom_chunks = defaultdict(list)
    for ck in chunks:
        mom_id = ck.get("mom_id")
        if isinstance(mom_id, str) and mom_id.strip():
            mom_chunks[mom_id].append(ck)
        else:
            merged.append(ck)

    if not mom_chunks:
        return chunks

    vector_size = 1024
    for mom_id, children in mom_chunks.items():
        parent = self.dataStore.get(mom_id, idx_nms[0], [ck["kb_id"] for ck in children])
        if parent is None:
            logging.warning(
                "Parent chunk '%s' not found in the index; falling back to %d child chunk(s).",
                mom_id, len(children),
            )
            merged.extend(children)
            continue
        mean_sim = np.mean([ck["similarity"] for ck in children])
        # NOTE(review): vector/term similarity reuse the blended similarity
        # rather than the children's own component scores; confirm intended.
        d = {
            "chunk_id": mom_id,
            "content_ltks": " ".join([ck["content_ltks"] for ck in children]),
            "content_with_weight": parent["content_with_weight"],
            "doc_id": parent["doc_id"],
            "docnm_kwd": parent.get("docnm_kwd", ""),
            "kb_id": parent["kb_id"],
            "important_kwd": [kwd for ck in children for kwd in ck.get("important_kwd", [])],
            "image_id": parent.get("img_id", ""),
            "similarity": mean_sim,
            "vector_similarity": mean_sim,
            "term_similarity": mean_sim,
            "vector": [0.0] * vector_size,
            "positions": parent.get("position_int", []),
            "doc_type_kwd": parent.get("doc_type_kwd", "")
        }
        first = children[0]
        vec_key = next((k for k in first.keys() if k.endswith("_vec")), None)
        if vec_key is not None:
            d["vector"] = first[vec_key]
            vector_size = len(first[vec_key])
        merged.append(d)

    return sorted(merged, key=lambda x: x["similarity"] * -1)
|