diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py
index 51356befad..1c80515d68 100644
--- a/rag/utils/es_conn.py
+++ b/rag/utils/es_conn.py
@@ -324,7 +324,7 @@ class ESConnection(ESConnectionBase):
             try:
                 res = []
                 r = self.es.bulk(index=index_name, operations=operations,
-                                 refresh=False, timeout="60s")
+                                 refresh="wait_for", timeout="60s")
                 if re.search(r"False", str(r["errors"]), re.IGNORECASE):
                     return res
 
diff --git a/rag/utils/opensearch_conn.py b/rag/utils/opensearch_conn.py
index cb8b70ac2d..f2348b7346 100644
--- a/rag/utils/opensearch_conn.py
+++ b/rag/utils/opensearch_conn.py
@@ -327,7 +327,7 @@ class OSConnection(DocStoreConnection):
             try:
                 res = []
                 r = self.os.bulk(index=(indexName), body=operations,
-                                 refresh=False, timeout=60)
+                                 refresh="wait_for", timeout=60)
                 if re.search(r"False", str(r["errors"]), re.IGNORECASE):
                     return res
 
diff --git a/test/testcases/test_http_api/test_file_management_within_dataset/test_parse_documents.py b/test/testcases/test_http_api/test_file_management_within_dataset/test_parse_documents.py
index 5b9e5ad314..4411cd43cc 100644
--- a/test/testcases/test_http_api/test_file_management_within_dataset/test_parse_documents.py
+++ b/test/testcases/test_http_api/test_file_management_within_dataset/test_parse_documents.py
@@ -16,7 +16,7 @@
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
 import pytest
-from common import bulk_upload_documents, list_documents, parse_documents
+from common import bulk_upload_documents, delete_documents, list_chunks, list_documents, parse_documents
 from configs import INVALID_API_TOKEN
 from libs.auth import RAGFlowHttpApiAuth
 from utils import wait_for
@@ -165,6 +165,41 @@ class TestDocumentsParse:
 
         validate_document_details(HttpApiAuth, dataset_id, document_ids)
 
+    @pytest.mark.p2
+    def test_chunks_retrievable_after_parse_status_done(self, HttpApiAuth, add_dataset_func, ragflow_tmp_dir):
+        """Verify chunks are listable as soon as a document reports run == "DONE".
+
+        Each iteration uploads one document, parses it, waits for DONE and
+        immediately lists its chunks; the document is deleted afterwards.
+        """
+        dataset_id = add_dataset_func
+
+        @wait_for(30, 0.1, "Document parsing timeout")
+        def wait_until_done(ids):
+            # Require every target to be listed AND done: a document missing
+            # from the listing must not count as finished.
+            r = list_documents(HttpApiAuth, dataset_id)
+            done_ids = {doc["id"] for doc in r["data"]["docs"] if doc.get("run") == "DONE"}
+            return set(ids) <= done_ids
+
+        # if there is a bug it can be non-deterministic, so repeat 10 times
+        iterations = 10
+        for _ in range(iterations):
+            document_ids = bulk_upload_documents(HttpApiAuth, dataset_id, 1, ragflow_tmp_dir)
+
+            res = parse_documents(HttpApiAuth, dataset_id, {"document_ids": document_ids})
+            assert res["code"] == 0, f"parse_documents failed: {res}"
+
+            wait_until_done(document_ids)
+
+            for document_id in document_ids:
+                res = list_chunks(HttpApiAuth, dataset_id, document_id)
+                assert res["code"] == 0, f"list_chunks failed: {res}"
+                assert res["data"]["doc"]["chunk_count"] > 0, f"Document {document_id} has run=DONE but chunk_count is 0"
+                assert len(res["data"]["chunks"]) > 0, f"Document {document_id} has run=DONE but no chunks returned"
+
+            delete_documents(HttpApiAuth, dataset_id, {"ids": document_ids})
+
 
 @pytest.mark.p3
 def test_parse_100_files(HttpApiAuth, add_dataset_func, tmp_path):