From 6fb8c31c22430d24bb3f8584fd46ac4081b213ac Mon Sep 17 00:00:00 2001 From: as-ondewo Date: Mon, 11 May 2026 10:04:08 +0200 Subject: [PATCH] Fix: Document parse status set to DONE before chunks are retrievable (#13352) ### What problem does this PR solve? The document parse status was set to DONE before the document chunks were actually retrievable from Elasticsearch/Opensearch because it did not wait for the index refresh. This meant that it was possible that the document parse status returned by the API was DONE but when trying to retrieve chunks there were none. Since the index refreshes every 1 second this was quite likely to happen when wait for document parsing by polling with a short interval and then immediately trying to retrieve chunks once the status was DONE. I fixed this bug and added a test case that would have caught it. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- rag/utils/es_conn.py | 2 +- rag/utils/opensearch_conn.py | 2 +- .../test_parse_documents.py | 33 ++++++++++++++++++- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index 51356befad..1c80515d68 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -324,7 +324,7 @@ class ESConnection(ESConnectionBase): try: res = [] r = self.es.bulk(index=index_name, operations=operations, - refresh=False, timeout="60s") + refresh="wait_for", timeout="60s") if re.search(r"False", str(r["errors"]), re.IGNORECASE): return res diff --git a/rag/utils/opensearch_conn.py b/rag/utils/opensearch_conn.py index cb8b70ac2d..f2348b7346 100644 --- a/rag/utils/opensearch_conn.py +++ b/rag/utils/opensearch_conn.py @@ -327,7 +327,7 @@ class OSConnection(DocStoreConnection): try: res = [] r = self.os.bulk(index=(indexName), body=operations, - refresh=False, timeout=60) + refresh="wait_for", timeout=60) if re.search(r"False", str(r["errors"]), re.IGNORECASE): return res diff --git a/test/testcases/test_http_api/test_file_management_within_dataset/test_parse_documents.py b/test/testcases/test_http_api/test_file_management_within_dataset/test_parse_documents.py index 5b9e5ad314..4411cd43cc 100644 --- a/test/testcases/test_http_api/test_file_management_within_dataset/test_parse_documents.py +++ b/test/testcases/test_http_api/test_file_management_within_dataset/test_parse_documents.py @@ -16,7 +16,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed import pytest -from common import bulk_upload_documents, list_documents, parse_documents +from common import bulk_upload_documents, delete_documents, list_chunks, list_documents, parse_documents from configs import INVALID_API_TOKEN from libs.auth import RAGFlowHttpApiAuth from utils import wait_for @@ -165,6 +165,37 @@ class TestDocumentsParse: validate_document_details(HttpApiAuth, dataset_id, document_ids) + @pytest.mark.p2 + def test_chunks_retrievable_after_parse_status_done(self, HttpApiAuth, add_dataset_func, ragflow_tmp_dir): + @wait_for(30, 0.1, "Document parsing timeout") + def wait_until_done(ids): + r = list_documents(HttpApiAuth, dataset_id) + target_ids = set(ids) + for doc in r["data"]["docs"]: + if doc["id"] in target_ids and doc.get("run") != "DONE": + return False + return True + + dataset_id = add_dataset_func + + # if there is a bug it can be non-deterministic, so repeat 10 times + iterations = 10 + for i in range(1, iterations + 1): + document_ids = bulk_upload_documents(HttpApiAuth, dataset_id, 1, ragflow_tmp_dir) + + res = parse_documents(HttpApiAuth, dataset_id, {"document_ids": document_ids}) + assert res["code"] == 0, f"parse_documents failed: {res}" + + wait_until_done(document_ids) + + for document_id in document_ids: + res = list_chunks(HttpApiAuth, dataset_id, document_id) + assert res["code"] == 0, f"list_chunks failed: {res}" + assert res["data"]["doc"]["chunk_count"] > 0, f"Document {document_id} has run=DONE but chunk_count is 0" + assert len(res["data"]["chunks"]) > 0, f"Document {document_id} has run=DONE but no chunks returned" + + delete_documents(HttpApiAuth, dataset_id, {"ids": document_ids}) + @pytest.mark.p3 def test_parse_100_files(HttpApiAuth, add_dataset_func, tmp_path):