mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
Fix "Result window is too large" during meta data search (#13521)
### What problem does this PR solve? Fix https://github.com/infiniflow/ragflow/issues/13210#issuecomment-3982878498 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@@ -173,7 +173,7 @@ def delete_user_data(user_id: str) -> dict:
|
||||
if doc_ids:
|
||||
for doc in doc_ids:
|
||||
try:
|
||||
DocMetadataService.delete_document_metadata(doc["id"], doc["kb_id"], tenant_id=None, skip_empty_check=True)
|
||||
DocMetadataService.delete_document_metadata(doc["id"], doc["kb_id"], tenant_id=None)
|
||||
except Exception as e:
|
||||
logging.warning(f"Failed to delete metadata for document {doc['id']}: {e}")
|
||||
|
||||
|
||||
@@ -160,7 +160,7 @@ class DocMetadataService:
|
||||
def _search_metadata(cls, kb_id: str, condition: Dict = None):
|
||||
"""
|
||||
Common search logic for metadata queries.
|
||||
Uses pagination internally to retrieve ALL data from the index.
|
||||
Uses pagination internally to retrieve data from the index.
|
||||
|
||||
Args:
|
||||
kb_id: Knowledge base ID
|
||||
@@ -188,7 +188,10 @@ class DocMetadataService:
|
||||
if condition is None:
|
||||
condition = {"kb_id": kb_id}
|
||||
|
||||
# Add sort by id for ES to enable search_after on large data
|
||||
order_by = OrderByExpr()
|
||||
if not settings.DOC_ENGINE_INFINITY:
|
||||
order_by.asc("id")
|
||||
|
||||
page_size = 1000
|
||||
all_results = []
|
||||
@@ -474,7 +477,7 @@ class DocMetadataService:
|
||||
|
||||
# For Infinity or as fallback: use delete+insert
|
||||
logging.debug(f"[update_document_metadata] Using delete+insert method for doc_id: {doc_id}")
|
||||
cls.delete_document_metadata(doc_id, kb_id, tenant_id, skip_empty_check=True)
|
||||
cls.delete_document_metadata(doc_id, kb_id, tenant_id)
|
||||
return cls.insert_document_metadata(doc_id, processed_meta)
|
||||
|
||||
except Exception as e:
|
||||
@@ -483,7 +486,7 @@ class DocMetadataService:
|
||||
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def delete_document_metadata(cls, doc_id: str, kb_id: str, tenant_id: str = None, skip_empty_check: bool = False) -> bool:
|
||||
def delete_document_metadata(cls, doc_id: str, kb_id: str, tenant_id: str = None) -> bool:
|
||||
"""
|
||||
Delete document metadata from ES/Infinity.
|
||||
Also drops the metadata table if it becomes empty (efficiently).
|
||||
@@ -493,7 +496,6 @@ class DocMetadataService:
|
||||
doc_id: Document ID
|
||||
kb_id: Knowledge base ID
|
||||
tenant_id: Tenant ID, if not provided, get it from kb_id
|
||||
skip_empty_check: If True, skip checking/dropping empty table (for bulk deletions)
|
||||
|
||||
Returns:
|
||||
True if successful (or no metadata to delete), False otherwise
|
||||
@@ -529,9 +531,6 @@ class DocMetadataService:
|
||||
logging.debug(f"[METADATA DELETE] Get result: {existing_metadata is not None}")
|
||||
if not existing_metadata:
|
||||
logging.debug(f"[METADATA DELETE] Document {doc_id} has no metadata in table, skipping deletion")
|
||||
# Only check/drop table if not skipped (tenant deletion will handle it)
|
||||
if not skip_empty_check:
|
||||
cls._drop_empty_metadata_table(index_name, tenant_id)
|
||||
return True # No metadata to delete is success
|
||||
except Exception as e:
|
||||
# If get fails, document might not exist in metadata table, which is fine
|
||||
@@ -548,14 +547,6 @@ class DocMetadataService:
|
||||
kb_id # Pass actual kb_id (delete() will handle metadata tables correctly)
|
||||
)
|
||||
logging.debug(f"[METADATA DELETE] Deleted count: {deleted_count}")
|
||||
|
||||
# Only check if table should be dropped if not skipped (for bulk operations)
|
||||
# Note: delete operation already uses refresh=True, so data is immediately available
|
||||
if not skip_empty_check:
|
||||
# Check by querying the actual metadata table (not MySQL)
|
||||
cls._drop_empty_metadata_table(index_name, tenant_id)
|
||||
|
||||
logging.debug(f"Successfully deleted metadata for document {doc_id}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
@@ -782,21 +773,18 @@ class DocMetadataService:
|
||||
Dictionary mapping doc_id to meta_fields dict
|
||||
"""
|
||||
try:
|
||||
results = cls._search_metadata(kb_id, condition={"kb_id": kb_id})
|
||||
condition = {"kb_id": kb_id}
|
||||
if doc_ids:
|
||||
condition["id"] = doc_ids
|
||||
results = cls._search_metadata(kb_id, condition=condition)
|
||||
if not results:
|
||||
return {}
|
||||
|
||||
# Build mapping: doc_id -> meta_fields
|
||||
meta_mapping = {}
|
||||
|
||||
# If doc_ids is provided, create a set for efficient lookup
|
||||
doc_ids_set = set(doc_ids) if doc_ids else None
|
||||
|
||||
# Use helper to iterate over results in any format
|
||||
# Use helper to iterate over results
|
||||
for doc_id, doc in cls._iter_search_results(results):
|
||||
# Filter by doc_ids if provided
|
||||
if doc_ids_set is not None and doc_id not in doc_ids_set:
|
||||
continue
|
||||
|
||||
# Extract metadata (handles both JSON strings and dicts)
|
||||
doc_meta = cls._extract_metadata(doc)
|
||||
@@ -850,13 +838,13 @@ class DocMetadataService:
|
||||
return "string"
|
||||
|
||||
try:
|
||||
results = cls._search_metadata(kb_id, condition={"kb_id": kb_id})
|
||||
condition = {"kb_id": kb_id}
|
||||
if doc_ids:
|
||||
condition["id"] = doc_ids
|
||||
results = cls._search_metadata(kb_id, condition=condition)
|
||||
if not results:
|
||||
return {}
|
||||
|
||||
# If doc_ids are provided, we'll filter after the search
|
||||
doc_ids_set = set(doc_ids) if doc_ids else None
|
||||
|
||||
# Aggregate metadata
|
||||
summary = {}
|
||||
type_counter = {}
|
||||
@@ -865,9 +853,6 @@ class DocMetadataService:
|
||||
|
||||
# Use helper to iterate over results in any format
|
||||
for doc_id, doc in cls._iter_search_results(results):
|
||||
# Check doc_ids filter
|
||||
if doc_ids_set and doc_id not in doc_ids_set:
|
||||
continue
|
||||
|
||||
doc_meta = cls._extract_metadata(doc)
|
||||
|
||||
@@ -1029,22 +1014,17 @@ class DocMetadataService:
|
||||
return changed
|
||||
|
||||
try:
|
||||
results = cls._search_metadata(kb_id, condition=None)
|
||||
results = cls._search_metadata(kb_id, condition={"kb_id": kb_id, "id": doc_ids})
|
||||
if not results:
|
||||
results = [] # Treat as empty list if None
|
||||
|
||||
updated_docs = 0
|
||||
doc_ids_set = set(doc_ids)
|
||||
found_doc_ids = set()
|
||||
|
||||
logging.debug(f"[batch_update_metadata] Searching for doc_ids: {doc_ids}")
|
||||
|
||||
# Use helper to iterate over results in any format
|
||||
# Use helper to iterate over results
|
||||
for doc_id, doc in cls._iter_search_results(results):
|
||||
# Filter to only process requested doc_ids
|
||||
if doc_id not in doc_ids_set:
|
||||
continue
|
||||
|
||||
found_doc_ids.add(doc_id)
|
||||
|
||||
# Get current metadata
|
||||
@@ -1066,13 +1046,14 @@ class DocMetadataService:
|
||||
logging.debug(f"[batch_update_metadata] Updating doc_id: {doc_id}, meta: {meta}")
|
||||
# If metadata is empty, delete the row entirely instead of keeping empty metadata
|
||||
if not meta:
|
||||
cls.delete_document_metadata(doc_id, kb_id, tenant_id=None, skip_empty_check=True)
|
||||
cls.delete_document_metadata(doc_id, kb_id, tenant_id=None)
|
||||
else:
|
||||
cls.update_document_metadata(doc_id, meta)
|
||||
updated_docs += 1
|
||||
|
||||
# Handle documents that don't have metadata rows yet
|
||||
# These documents weren't in the search results, so we need to insert new metadata for them
|
||||
doc_ids_set = set(doc_ids)
|
||||
missing_doc_ids = doc_ids_set - found_doc_ids
|
||||
if missing_doc_ids and updates:
|
||||
logging.debug(f"[batch_update_metadata] Inserting new metadata for documents without metadata rows: {missing_doc_ids}")
|
||||
|
||||
@@ -264,7 +264,8 @@ class ESConnection(ESConnectionBase):
|
||||
assert "id" in d
|
||||
d_copy = copy.deepcopy(d)
|
||||
d_copy["kb_id"] = knowledgebase_id
|
||||
meta_id = d_copy.pop("id", "")
|
||||
# Use id as _id for uniqueness, also keep "id" as a regular field for sorting
|
||||
meta_id = d_copy.get("id", "")
|
||||
operations.append(
|
||||
{"index": {"_index": index_name, "_id": meta_id}})
|
||||
operations.append(d_copy)
|
||||
|
||||
Reference in New Issue
Block a user