diff --git a/api/db/joint_services/user_account_service.py b/api/db/joint_services/user_account_service.py index f9a25b498c..6f992576a7 100644 --- a/api/db/joint_services/user_account_service.py +++ b/api/db/joint_services/user_account_service.py @@ -173,7 +173,7 @@ def delete_user_data(user_id: str) -> dict: if doc_ids: for doc in doc_ids: try: - DocMetadataService.delete_document_metadata(doc["id"], doc["kb_id"], tenant_id=None, skip_empty_check=True) + DocMetadataService.delete_document_metadata(doc["id"], doc["kb_id"], tenant_id=None) except Exception as e: logging.warning(f"Failed to delete metadata for document {doc['id']}: {e}") diff --git a/api/db/services/doc_metadata_service.py b/api/db/services/doc_metadata_service.py index f2ee29e6da..dbb16a5a94 100644 --- a/api/db/services/doc_metadata_service.py +++ b/api/db/services/doc_metadata_service.py @@ -160,7 +160,7 @@ class DocMetadataService: def _search_metadata(cls, kb_id: str, condition: Dict = None): """ Common search logic for metadata queries. - Uses pagination internally to retrieve ALL data from the index. + Uses pagination internally to retrieve data from the index. Args: kb_id: Knowledge base ID @@ -188,7 +188,10 @@ class DocMetadataService: if condition is None: condition = {"kb_id": kb_id} + # Add sort by id for ES to enable search_after on large data order_by = OrderByExpr() + if not settings.DOC_ENGINE_INFINITY: + order_by.asc("id") page_size = 1000 all_results = [] @@ -474,7 +477,7 @@ class DocMetadataService: # For Infinity or as fallback: use delete+insert logging.debug(f"[update_document_metadata] Using delete+insert method for doc_id: {doc_id}") - cls.delete_document_metadata(doc_id, kb_id, tenant_id, skip_empty_check=True) + cls.delete_document_metadata(doc_id, kb_id, tenant_id) return cls.insert_document_metadata(doc_id, processed_meta) except Exception as e: @@ -483,7 +486,7 @@ class DocMetadataService: @classmethod @DB.connection_context() - def delete_document_metadata(cls, doc_id: str, kb_id: str, tenant_id: str = None, skip_empty_check: bool = False) -> bool: + def delete_document_metadata(cls, doc_id: str, kb_id: str, tenant_id: str = None) -> bool: """ Delete document metadata from ES/Infinity. Also drops the metadata table if it becomes empty (efficiently). @@ -493,7 +496,6 @@ class DocMetadataService: doc_id: Document ID kb_id: Knowledge base ID tenant_id: Tenant ID, if not provided, get it from kb_id - skip_empty_check: If True, skip checking/dropping empty table (for bulk deletions) Returns: True if successful (or no metadata to delete), False otherwise @@ -529,9 +531,6 @@ class DocMetadataService: logging.debug(f"[METADATA DELETE] Get result: {existing_metadata is not None}") if not existing_metadata: logging.debug(f"[METADATA DELETE] Document {doc_id} has no metadata in table, skipping deletion") - # Only check/drop table if not skipped (tenant deletion will handle it) - if not skip_empty_check: - cls._drop_empty_metadata_table(index_name, tenant_id) return True # No metadata to delete is success except Exception as e: # If get fails, document might not exist in metadata table, which is fine @@ -548,14 +547,6 @@ class DocMetadataService: kb_id # Pass actual kb_id (delete() will handle metadata tables correctly) ) logging.debug(f"[METADATA DELETE] Deleted count: {deleted_count}") - - # Only check if table should be dropped if not skipped (for bulk operations) - # Note: delete operation already uses refresh=True, so data is immediately available - if not skip_empty_check: - # Check by querying the actual metadata table (not MySQL) - cls._drop_empty_metadata_table(index_name, tenant_id) - - logging.debug(f"Successfully deleted metadata for document {doc_id}") return True except Exception as e: @@ -782,21 +773,18 @@ class DocMetadataService: Dictionary mapping doc_id to meta_fields dict """ try: - results = cls._search_metadata(kb_id, condition={"kb_id": kb_id}) + condition = {"kb_id": kb_id} + if doc_ids: + condition["id"] = doc_ids + results = cls._search_metadata(kb_id, condition=condition) if not results: return {} # Build mapping: doc_id -> meta_fields meta_mapping = {} - # If doc_ids is provided, create a set for efficient lookup - doc_ids_set = set(doc_ids) if doc_ids else None - - # Use helper to iterate over results in any format + # Use helper to iterate over results for doc_id, doc in cls._iter_search_results(results): - # Filter by doc_ids if provided - if doc_ids_set is not None and doc_id not in doc_ids_set: - continue # Extract metadata (handles both JSON strings and dicts) doc_meta = cls._extract_metadata(doc) @@ -850,13 +838,13 @@ class DocMetadataService: return "string" try: - results = cls._search_metadata(kb_id, condition={"kb_id": kb_id}) + condition = {"kb_id": kb_id} + if doc_ids: + condition["id"] = doc_ids + results = cls._search_metadata(kb_id, condition=condition) if not results: return {} - # If doc_ids are provided, we'll filter after the search - doc_ids_set = set(doc_ids) if doc_ids else None - # Aggregate metadata summary = {} type_counter = {} @@ -865,9 +853,6 @@ class DocMetadataService: # Use helper to iterate over results in any format for doc_id, doc in cls._iter_search_results(results): - # Check doc_ids filter - if doc_ids_set and doc_id not in doc_ids_set: - continue doc_meta = cls._extract_metadata(doc) @@ -1029,22 +1014,17 @@ class DocMetadataService: return changed try: - results = cls._search_metadata(kb_id, condition=None) + results = cls._search_metadata(kb_id, condition={"kb_id": kb_id, "id": doc_ids}) if not results: results = [] # Treat as empty list if None updated_docs = 0 - doc_ids_set = set(doc_ids) found_doc_ids = set() logging.debug(f"[batch_update_metadata] Searching for doc_ids: {doc_ids}") - # Use helper to iterate over results in any format + # Use helper to iterate over results for doc_id, doc in cls._iter_search_results(results): - # Filter to only process requested doc_ids - if doc_id not in doc_ids_set: - continue - found_doc_ids.add(doc_id) # Get current metadata @@ -1066,13 +1046,14 @@ class DocMetadataService: logging.debug(f"[batch_update_metadata] Updating doc_id: {doc_id}, meta: {meta}") # If metadata is empty, delete the row entirely instead of keeping empty metadata if not meta: - cls.delete_document_metadata(doc_id, kb_id, tenant_id=None, skip_empty_check=True) + cls.delete_document_metadata(doc_id, kb_id, tenant_id=None) else: cls.update_document_metadata(doc_id, meta) updated_docs += 1 # Handle documents that don't have metadata rows yet # These documents weren't in the search results, so we need to insert new metadata for them + doc_ids_set = set(doc_ids) missing_doc_ids = doc_ids_set - found_doc_ids if missing_doc_ids and updates: logging.debug(f"[batch_update_metadata] Inserting new metadata for documents without metadata rows: {missing_doc_ids}") diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index 6f88c9a44e..6a3d35eec6 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -264,7 +264,8 @@ class ESConnection(ESConnectionBase): assert "id" in d d_copy = copy.deepcopy(d) d_copy["kb_id"] = knowledgebase_id - meta_id = d_copy.pop("id", "") + # Use id as _id for uniqueness, also keep "id" as a regular field for sorting + meta_id = d_copy.get("id", "") operations.append( {"index": {"_index": index_name, "_id": meta_id}}) operations.append(d_copy)