diff --git a/admin/client/parser.py b/admin/client/parser.py index 4d9f4b6eef..cdb20b491d 100644 --- a/admin/client/parser.py +++ b/admin/client/parser.py @@ -84,10 +84,10 @@ sql_command: login_user | list_user_chats | create_user_chat | drop_user_chat - | create_index - | drop_index - | create_doc_meta_index - | drop_doc_meta_index + | create_dataset_table + | drop_dataset_table + | create_metadata_table + | drop_metadata_table | list_user_model_providers | list_user_default_models | parse_dataset_docs @@ -102,6 +102,7 @@ sql_command: login_user | update_chunk | set_metadata | remove_tags + | remove_chunks | create_chat_session | drop_chat_session | list_chat_sessions @@ -143,6 +144,7 @@ ALTER: "ALTER"i ACTIVE: "ACTIVE"i ADMIN: "ADMIN"i PASSWORD: "PASSWORD"i +DATASET_TABLE: "DATASET TABLE"i DATASET: "DATASET"i DATASETS: "DATASETS"i OF: "OF"i @@ -187,7 +189,8 @@ IMPORT: "IMPORT"i INTO: "INTO"i IN: "IN"i WITH: "WITH"i -VECTOR_SIZE: "VECTOR_SIZE"i +VECTOR: "VECTOR"i +SIZE: "SIZE"i PARSER: "PARSER"i PIPELINE: "PIPELINE"i SEARCH: "SEARCH"i @@ -210,13 +213,12 @@ LICENSE: "LICENSE"i CHECK: "CHECK"i CONFIG: "CONFIG"i INDEX: "INDEX"i -DOC_META: "DOC_META"i +TABLE: "TABLE"i CHUNK: "CHUNK"i CHUNKS: "CHUNKS"i GET: "GET"i INSERT: "INSERT"i PAGE: "PAGE"i -SIZE: "SIZE"i KEYWORDS: "KEYWORDS"i AVAILABLE: "AVAILABLE"i FILE: "FILE"i @@ -345,10 +347,6 @@ list_user_agents: LIST AGENTS ";" list_user_chats: LIST CHATS ";" create_user_chat: CREATE CHAT quoted_string ";" drop_user_chat: DROP CHAT quoted_string ";" -create_index: CREATE INDEX FOR DATASET quoted_string VECTOR_SIZE NUMBER ";" -drop_index: DROP INDEX FOR DATASET quoted_string ";" -create_doc_meta_index: CREATE INDEX DOC_META ";" -drop_doc_meta_index: DROP INDEX DOC_META ";" create_chat_session: CREATE CHAT quoted_string SESSION ";" drop_chat_session: DROP CHAT quoted_string SESSION quoted_string ";" list_chat_sessions: LIST CHAT quoted_string SESSIONS ";" @@ -359,18 +357,23 @@ import_docs_into_dataset: IMPORT quoted_string INTO DATASET quoted_string ";" search_on_datasets: SEARCH quoted_string ON DATASETS quoted_string ";" get_chunk: GET CHUNK quoted_string ";" list_chunks: LIST CHUNKS OF DOCUMENT quoted_string ("PAGE" NUMBER)? ("SIZE" NUMBER)? ("KEYWORDS" quoted_string)? ("AVAILABLE" NUMBER)? ";" +set_metadata: SET METADATA OF DOCUMENT quoted_string TO quoted_string ";" +remove_tags: REMOVE TAGS quoted_string (COMMA quoted_string)* FROM DATASET quoted_string ";" +remove_chunks: REMOVE CHUNKS quoted_string (COMMA quoted_string)* FROM DOCUMENT quoted_string ";" + | REMOVE ALL CHUNKS FROM DOCUMENT quoted_string ";" parse_dataset_docs: PARSE quoted_string OF DATASET quoted_string ";" parse_dataset_sync: PARSE DATASET quoted_string SYNC ";" parse_dataset_async: PARSE DATASET quoted_string ASYNC ";" -update_chunk: UPDATE CHUNK quoted_string OF DATASET quoted_string SET quoted_string ";" -set_metadata: SET METADATA OF DOCUMENT quoted_string TO quoted_string ";" -remove_tags: REMOVE TAGS quoted_string (COMMA quoted_string)* FROM DATASET quoted_string ";" - -// Internal CLI for GO +// Internal CLI only for GO +create_dataset_table: CREATE DATASET TABLE quoted_string VECTOR SIZE NUMBER ";" +drop_dataset_table: DROP DATASET TABLE quoted_string ";" +create_metadata_table: CREATE METADATA TABLE ";" +drop_metadata_table: DROP METADATA TABLE ";" insert_dataset_from_file: INSERT DATASET FROM FILE quoted_string ";" insert_metadata_from_file: INSERT METADATA FROM FILE quoted_string ";" +update_chunk: UPDATE CHUNK quoted_string OF DATASET quoted_string SET quoted_string ";" identifier_list: identifier (COMMA identifier)* @@ -690,30 +693,29 @@ class RAGFlowCLITransformer(Transformer): chat_name = items[2].children[0].strip("'\"") return {"type": "drop_user_chat", "chat_name": chat_name} - def create_index(self, items): - # items: CREATE, INDEX, FOR, DATASET, quoted_string, VECTOR_SIZE, NUMBER, ";" + def create_dataset_table(self, items): dataset_name = None vector_size = None for i, item in enumerate(items): if hasattr(item, 'data') and item.data == 'quoted_string': dataset_name = item.children[0].strip("'\"") if hasattr(item, 'type') and item.type == 'NUMBER': - if i > 0 and items[i-1].type == 'VECTOR_SIZE': + if i > 0 and items[i-1].type == 'SIZE' and items[i-2].type == 'VECTOR': vector_size = int(item) - return {"type": "create_index", "dataset_name": dataset_name, "vector_size": vector_size} + return {"type": "create_dataset_table", "dataset_name": dataset_name, "vector_size": vector_size} - def drop_index(self, items): + def drop_dataset_table(self, items): dataset_name = None for item in items: if hasattr(item, 'data') and item.data == 'quoted_string': dataset_name = item.children[0].strip("'\"") - return {"type": "drop_index", "dataset_name": dataset_name} + return {"type": "drop_dataset_table", "dataset_name": dataset_name} - def create_doc_meta_index(self, items): - return {"type": "create_doc_meta_index"} + def create_metadata_table(self, items): + return {"type": "create_metadata_table"} - def drop_doc_meta_index(self, items): - return {"type": "drop_doc_meta_index"} + def drop_metadata_table(self, items): + return {"type": "drop_metadata_table"} def list_user_model_providers(self, items): return {"type": "list_user_model_providers"} @@ -825,6 +827,35 @@ class RAGFlowCLITransformer(Transformer): break return {"type": "remove_tags", "dataset_name": dataset_name, "tags": tags} + def remove_chunks(self, items): + # Handle two cases: + # 1. REMOVE CHUNKS quoted_string (COMMA quoted_string)* FROM DOCUMENT quoted_string ";" + # 2. REMOVE ALL CHUNKS FROM DOCUMENT quoted_string ";" + + # Check if it's "REMOVE ALL CHUNKS" + for item in items: + if hasattr(item, 'type') and item.type == 'ALL': + # Find doc_id + for j, inner_item in enumerate(items): + if hasattr(inner_item, 'type') and inner_item.type == 'DOCUMENT': + doc_id = items[j + 1].children[0].strip("'\"") + return {"type": "remove_chunks", "doc_id": doc_id, "delete_all": True} + + # Otherwise, we have chunk_ids + chunk_ids = [] + doc_id = None + for i, item in enumerate(items): + if hasattr(item, 'type') and item.type == 'DOCUMENT': + doc_id = items[i + 1].children[0].strip("'\"") + elif hasattr(item, 'children') and item.children: + val = item.children[0].strip("'\"") + # Skip if it's "FROM" or "DOCUMENT" + if val.upper() in ['FROM', 'DOCUMENT']: + continue + chunk_ids.append(val) + + return {"type": "remove_chunks", "doc_id": doc_id, "chunk_ids": chunk_ids} + def list_chunks(self, items): doc_id = items[4].children[0].strip("'\"") result = {"type": "list_chunks", "doc_id": doc_id} diff --git a/admin/client/ragflow_client.py b/admin/client/ragflow_client.py index 0a8adc1699..3b0d8556e4 100644 --- a/admin/client/ragflow_client.py +++ b/admin/client/ragflow_client.py @@ -1029,7 +1029,7 @@ class RAGFlowClient: else: print(f"Fail to create chat {chat_name}, code: {res_json['code']}, message: {res_json['message']}") - def create_index(self, command): + def create_dataset_table(self, command): if self.server_type != "user": print("This command is only allowed in USER mode") return @@ -1045,15 +1045,15 @@ class RAGFlowClient: # Build payload payload = {"kb_id": dataset_id, "vector_size": vector_size} # Call API - response = self.http_client.request("POST", "/kb/index", json_body=payload, + response = self.http_client.request("POST", "/kb/doc_engine_table", json_body=payload, use_api_base=False, auth_kind="web") res_json = response.json() if response.status_code == 200 and res_json.get("code") == 0: - print(f"Success to create index for dataset: {dataset_name}") + print(f"Success to create table for dataset: {dataset_name}") else: - print(f"Fail to create index for dataset {dataset_name}, code: {res_json.get('code')}, message: {res_json.get('message')}") + print(f"Fail to create table for dataset {dataset_name}, code: {res_json.get('code')}, message: {res_json.get('message')}") - def drop_index(self, command): + def drop_dataset_table(self, command): if self.server_type != "user": print("This command is only allowed in USER mode") return @@ -1062,41 +1062,41 @@ class RAGFlowClient: dataset_id = self._get_dataset_id(dataset_name) if dataset_id is None: return - # Call API to delete index + # Call API to delete table payload = {"kb_id": dataset_id} - response = self.http_client.request("DELETE", "/kb/index", json_body=payload, + response = self.http_client.request("DELETE", "/kb/doc_engine_table", json_body=payload, use_api_base=False, auth_kind="web") res_json = response.json() if response.status_code == 200 and res_json.get("code") == 0: - print(f"Success to drop index for dataset: {dataset_name}") + print(f"Success to drop table for dataset: {dataset_name}") else: - print(f"Fail to drop index for dataset {dataset_name}, code: {res_json.get('code')}, message: {res_json.get('message')}") + print(f"Fail to drop table for dataset {dataset_name}, code: {res_json.get('code')}, message: {res_json.get('message')}") - def create_doc_meta_index(self, command): + def create_metadata_table(self, command): if self.server_type != "user": print("This command is only allowed in USER mode") return - # Call API to create doc meta index - response = self.http_client.request("POST", "/tenant/doc_meta_index", + # Call API to create metadata table + response = self.http_client.request("POST", "/tenant/doc_engine_metadata_table", use_api_base=False, auth_kind="web") res_json = response.json() if response.status_code == 200 and res_json.get("code") == 0: - print("Success to create doc meta index") + print("Success to create metadata table") else: - print(f"Fail to create doc meta index, code: {res_json.get('code')}, message: {res_json.get('message')}") + print(f"Fail to create metadata table, code: {res_json.get('code')}, message: {res_json.get('message')}") - def drop_doc_meta_index(self, command): + def drop_metadata_table(self, command): if self.server_type != "user": print("This command is only allowed in USER mode") return - # Call API to delete doc meta index - response = self.http_client.request("DELETE", "/tenant/doc_meta_index", + # Call API to delete metadata table + response = self.http_client.request("DELETE", "/tenant/doc_engine_metadata_table", use_api_base=False, auth_kind="web") res_json = response.json() if response.status_code == 200 and res_json.get("code") == 0: - print("Success to drop doc meta index") + print("Success to drop metadata table") else: - print(f"Fail to drop doc meta index, code: {res_json.get('code')}, message: {res_json.get('message')}") + print(f"Fail to drop metadata table, code: {res_json.get('code')}, message: {res_json.get('message')}") def drop_user_chat(self, command): if self.server_type != "user": @@ -1548,9 +1548,13 @@ class RAGFlowClient: print(f"Invalid JSON body: {e}") return - # Call PUT /datasets/{dataset_id}/documents/{doc_id}/chunks/{chunk_id} - path = f"/datasets/{dataset_id}/documents/{doc_id}/chunks/{chunk_id}" - response = self.http_client.request("PUT", path, json_body=payload, use_api_base=True, auth_kind="api") + # Add IDs to payload + payload["dataset_id"] = dataset_id + payload["document_id"] = doc_id + payload["chunk_id"] = chunk_id + + # Call POST /v1/chunk/update + response = self.http_client.request("POST", "/chunk/update", json_body=payload, use_api_base=False, auth_kind="web") res_json = response.json() if response.status_code == 200: if res_json.get("code") == 0: @@ -1583,7 +1587,7 @@ class RAGFlowClient: else: print(f"Fail to set metadata, code: {res_json.get('code')}, message: {res_json.get('message')}") else: - print(f"Fail to set metadata, HTTP {response.status_code}") + print(f"Fail to set metadata, HTTP {response.status_code}: {res_json.get('message', 'no message')}") def remove_tags(self, command_dict): if self.server_type != "user": @@ -1613,6 +1617,31 @@ class RAGFlowClient: else: print(f"Fail to remove tags, HTTP {response.status_code}") + def remove_chunks(self, command_dict): + if self.server_type != "user": + print("This command is only allowed in USER mode") + return + + doc_id = command_dict["doc_id"] + payload = {"doc_id": doc_id} + + if command_dict.get("delete_all"): + payload["delete_all"] = True + elif command_dict.get("chunk_ids"): + payload["chunk_ids"] = command_dict["chunk_ids"] + + response = self.http_client.request("POST", "/chunk/rm", json_body=payload, + use_api_base=False, auth_kind="web") + res_json = response.json() + if response.status_code == 200: + if res_json.get("code") == 0: + deleted_count = res_json.get("data", 0) + print(f"Success to remove chunks from document {doc_id}: {deleted_count} chunks deleted") + else: + print(f"Fail to remove chunks, code: {res_json.get('code')}, message: {res_json.get('message')}") + else: + print(f"Fail to remove chunks, HTTP {response.status_code}") + def list_chunks(self, command_dict): if self.server_type != "user": print("This command is only allowed in USER mode") @@ -1976,14 +2005,14 @@ def run_command(client: RAGFlowClient, command_dict: dict): client.create_user_chat(command_dict) case "drop_user_chat": client.drop_user_chat(command_dict) - case "create_index": - client.create_index(command_dict) - case "drop_index": - client.drop_index(command_dict) - case "create_doc_meta_index": - client.create_doc_meta_index(command_dict) - case "drop_doc_meta_index": - client.drop_doc_meta_index(command_dict) + case "create_dataset_table": + client.create_dataset_table(command_dict) + case "drop_dataset_table": + client.drop_dataset_table(command_dict) + case "create_metadata_table": + client.create_metadata_table(command_dict) + case "drop_metadata_table": + client.drop_metadata_table(command_dict) case "create_chat_session": client.create_chat_session(command_dict) case "drop_chat_session": @@ -2016,6 +2045,8 @@ def run_command(client: RAGFlowClient, command_dict: dict): return client.set_metadata(command_dict) case "remove_tags": return client.remove_tags(command_dict) + case "remove_chunks": + return client.remove_chunks(command_dict) case "list_chunks": return client.list_chunks(command_dict) case "meta": @@ -2077,10 +2108,6 @@ LIST METADATA OF DATASETS [, ]* LIST METADATA SUMMARY OF DATASET DOCUMENTS [, ]* GET CHUNK LIST CHUNKS OF DOCUMENT [PAGE ] [SIZE ] [KEYWORDS ] [AVAILABLE <0|1>] -CREATE INDEX FOR DATASET VECTOR_SIZE -DROP INDEX FOR DATASET -CREATE INDEX DOC_META -DROP INDEX DOC_META Meta Commands: \\?, \\h, \\help Show this help diff --git a/internal/cli/client.go b/internal/cli/client.go index c368fc5383..1a6cc0326d 100644 --- a/internal/cli/client.go +++ b/internal/cli/client.go @@ -208,14 +208,6 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) { return c.UnsetToken(cmd) case "show_version": return c.ShowServerVersion(cmd) - case "create_index": - return c.CreateIndex(cmd) - case "drop_index": - return c.DropIndex(cmd) - case "create_doc_meta_index": - return c.CreateDocMetaIndex(cmd) - case "drop_doc_meta_index": - return c.DropDocMetaIndex(cmd) case "list_available_providers": return c.ListAvailableProviders(cmd) case "show_provider": @@ -256,6 +248,27 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) { return c.UseModel(cmd) case "show_current_model": return c.ShowCurrentModel(cmd) + // Dataset, metadata commands + case "create_dataset_table": + return c.CreateDatasetInDocEngine(cmd) + case "drop_dataset_table": + return c.DropDatasetInDocEngine(cmd) + case "create_metadata_table": + return c.CreateMetadataInDocEngine(cmd) + case "drop_metadata_table": + return c.DropMetadataInDocEngine(cmd) + case "insert_dataset_from_file": + return c.InsertDatasetFromFile(cmd) + case "insert_metadata_from_file": + return c.InsertMetadataFromFile(cmd) + case "update_chunk": + return c.UpdateChunk(cmd) + case "set_meta": + return c.SetMeta(cmd) + case "rm_tags": + return c.RmTags(cmd) + case "remove_chunks": + return c.RemoveChunks(cmd) // ContextEngine commands case "context_list": return c.ContextList(cmd) @@ -267,16 +280,6 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) { return c.CEList(cmd) case "ce_search": return c.CESearch(cmd) - case "insert_dataset_from_file": - return c.InsertDatasetFromFile(cmd) - case "insert_metadata_from_file": - return c.InsertMetadataFromFile(cmd) - case "update_chunk": - return c.UpdateChunk(cmd) - case "set_meta": - return c.SetMeta(cmd) - case "rm_tags": - return c.RmTags(cmd) // TODO: Implement other commands default: return nil, fmt.Errorf("command '%s' would be executed with API", cmd.Type) diff --git a/internal/cli/lexer.go b/internal/cli/lexer.go index 19ae0ce1a5..afa990206f 100644 --- a/internal/cli/lexer.go +++ b/internal/cli/lexer.go @@ -53,6 +53,22 @@ func (l *Lexer) peekChar() byte { return l.input[l.readPos] } +func (l *Lexer) peekToken() string { + // Skip whitespace starting from readPos + skipPos := l.readPos + for skipPos < len(l.input) && (l.input[skipPos] == ' ' || l.input[skipPos] == '\t' || l.input[skipPos] == '\n' || l.input[skipPos] == '\r') { + skipPos++ + } + + // Read identifier starting from skipPos + start := skipPos + for skipPos < len(l.input) && (isLetter(l.input[skipPos]) || isDigit(l.input[skipPos]) || l.input[skipPos] == '_' || l.input[skipPos] == '-' || l.input[skipPos] == '.') { + skipPos++ + } + + return l.input[start:skipPos] +} + func (l *Lexer) skipWhitespace() { for l.ch == ' ' || l.ch == '\t' || l.ch == '\n' || l.ch == '\r' { l.readChar() @@ -206,6 +222,18 @@ func (l *Lexer) lookupIdent(ident string) Token { case "PASSWORD": return Token{Type: TokenPassword, Value: ident} case "DATASET": + // Check if followed by TABLE for compound token + if strings.ToUpper(l.peekToken()) == "TABLE" { + // Skip whitespace to TABLE + for l.ch == ' ' || l.ch == '\t' || l.ch == '\n' || l.ch == '\r' { + l.readChar() + } + // Skip past TABLE + for isLetter(l.ch) || isDigit(l.ch) || l.ch == '_' || l.ch == '-' || l.ch == '.' { + l.readChar() + } + return Token{Type: TokenDatasetTable, Value: "DATASET TABLE"} + } return Token{Type: TokenDataset, Value: ident} case "DATASETS": return Token{Type: TokenDatasets, Value: ident} @@ -325,10 +353,14 @@ func (l *Lexer) lookupIdent(ident string) Token { return Token{Type: TokenTokens, Value: ident} case "INDEX": return Token{Type: TokenIndex, Value: ident} - case "VECTOR_SIZE": - return Token{Type: TokenVectorSize, Value: ident} - case "DOC_META": - return Token{Type: TokenDocMeta, Value: ident} + case "VECTOR": + return Token{Type: TokenVector, Value: ident} + case "SIZE": + return Token{Type: TokenSize, Value: ident} + case "METADATA": + return Token{Type: TokenMetadata, Value: ident} + case "TABLE": + return Token{Type: TokenTable, Value: ident} case "AVAILABLE": return Token{Type: TokenAvailable, Value: ident} case "NAME": @@ -345,8 +377,6 @@ func (l *Lexer) lookupIdent(ident string) Token { return Token{Type: TokenInsert, Value: ident} case "FILE": return Token{Type: TokenFile, Value: ident} - case "METADATA": - return Token{Type: TokenMetadata, Value: ident} case "USE": return Token{Type: TokenUse, Value: ident} case "UPDATE": @@ -355,6 +385,8 @@ func (l *Lexer) lookupIdent(ident string) Token { return Token{Type: TokenRemove, Value: ident} case "CHUNK": return Token{Type: TokenChunk, Value: ident} + case "CHUNKS": + return Token{Type: TokenChunks, Value: ident} case "DOCUMENT": return Token{Type: TokenDocument, Value: ident} case "TAGS": diff --git a/internal/cli/types.go b/internal/cli/types.go index 5132455d4e..a3c879a33c 100644 --- a/internal/cli/types.go +++ b/internal/cli/types.go @@ -47,6 +47,7 @@ const ( TokenPassword TokenDataset TokenDatasets + TokenDatasetTable TokenOf TokenAgents TokenRole @@ -103,7 +104,8 @@ const ( TokenTokens TokenUnset TokenIndex - TokenVectorSize + TokenVector + TokenSize TokenDocMeta TokenName // For ALTER PROVIDER NAME TokenInstance @@ -117,9 +119,11 @@ const ( TokenInsert TokenFile TokenMetadata + TokenTable TokenUpdate TokenRemove TokenChunk + TokenChunks TokenDocument TokenTag TokenLog diff --git a/internal/cli/user_command.go b/internal/cli/user_command.go index 8f36c9ff65..c628e82b4e 100644 --- a/internal/cli/user_command.go +++ b/internal/cli/user_command.go @@ -759,8 +759,8 @@ func (c *RAGFlowClient) UnsetToken(cmd *Command) (ResponseIf, error) { return &result, nil } -// CreateIndex creates an index for a dataset -func (c *RAGFlowClient) CreateIndex(cmd *Command) (ResponseIf, error) { +// CreateDataset creates a table for a dataset +func (c *RAGFlowClient) CreateDataset(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { return nil, fmt.Errorf("this command is only allowed in USER mode") } @@ -786,13 +786,13 @@ func (c *RAGFlowClient) CreateIndex(cmd *Command) (ResponseIf, error) { "vector_size": vectorSize, } - resp, err := c.HTTPClient.Request("POST", "/kb/index", false, "web", nil, payload) + resp, err := c.HTTPClient.Request("POST", "/kb/doc_engine_table", false, "web", nil, payload) if err != nil { - return nil, fmt.Errorf("failed to create index: %w", err) + return nil, fmt.Errorf("failed to create table: %w", err) } if resp.StatusCode != 200 { - return nil, fmt.Errorf("failed to create index: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + return nil, fmt.Errorf("failed to create table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) } resJSON, err := resp.JSON() @@ -808,16 +808,73 @@ func (c *RAGFlowClient) CreateIndex(cmd *Command) (ResponseIf, error) { var result SimpleResponse result.Code = int(code) if result.Code == 0 { - result.Message = fmt.Sprintf("Success to create index for dataset: %s", datasetName) + result.Message = fmt.Sprintf("Success to create table for dataset: %s", datasetName) } else { - result.Message = fmt.Sprintf("Failed to create index: %v", resJSON) + result.Message = fmt.Sprintf("Failed to create table: %v", resJSON) } result.Duration = 0 return &result, nil } -// DropIndex drops an index for a dataset -func (c *RAGFlowClient) DropIndex(cmd *Command) (ResponseIf, error) { +// CreateDatasetInDocEngine creates a table for a dataset in doc engine +func (c *RAGFlowClient) CreateDatasetInDocEngine(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + datasetName, ok := cmd.Params["dataset_name"].(string) + if !ok { + return nil, fmt.Errorf("dataset_name not provided") + } + + vectorSize, ok := cmd.Params["vector_size"].(int) + if !ok { + return nil, fmt.Errorf("vector_size not provided") + } + + // Get dataset ID by name + datasetID, err := c.getDatasetID(datasetName) + if err != nil { + return nil, err + } + + payload := map[string]interface{}{ + "kb_id": datasetID, + "vector_size": vectorSize, + } + + resp, err := c.HTTPClient.Request("POST", "/kb/doc_engine_table", false, "web", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to create table: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to create table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + resJSON, err := resp.JSON() + if err != nil { + return nil, fmt.Errorf("invalid JSON response: %w", err) + } + + code, ok := resJSON["code"].(float64) + if !ok { + return nil, fmt.Errorf("invalid response format: code is not a number") + } + + var result SimpleResponse + result.Code = int(code) + if result.Code == 0 { + result.Message = fmt.Sprintf("Success to create table for dataset: %s", datasetName) + } else { + result.Message = fmt.Sprintf("Failed to create table: %v", resJSON) + } + result.Duration = 0 + return &result, nil +} + +// DropDatasetInDocEngine drops a table for a dataset in doc engine +func (c *RAGFlowClient) DropDatasetInDocEngine(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { return nil, fmt.Errorf("this command is only allowed in USER mode") } @@ -837,13 +894,13 @@ func (c *RAGFlowClient) DropIndex(cmd *Command) (ResponseIf, error) { "kb_id": datasetID, } - resp, err := c.HTTPClient.Request("DELETE", "/kb/index", false, "web", nil, payload) + resp, err := c.HTTPClient.Request("DELETE", "/kb/doc_engine_table", false, "web", nil, payload) if err != nil { - return nil, fmt.Errorf("failed to drop index: %w", err) + return nil, fmt.Errorf("failed to drop dataset: %w", err) } if resp.StatusCode != 200 { - return nil, fmt.Errorf("failed to drop index: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + return nil, fmt.Errorf("failed to drop dataset: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) } resJSON, err := resp.JSON() @@ -859,27 +916,27 @@ func (c *RAGFlowClient) DropIndex(cmd *Command) (ResponseIf, error) { var result SimpleResponse result.Code = int(code) if result.Code == 0 { - result.Message = fmt.Sprintf("Success to drop index for dataset: %s", datasetName) + result.Message = fmt.Sprintf("Success to drop table for dataset: %s", datasetName) } else { - result.Message = fmt.Sprintf("Failed to drop index: %v", resJSON) + result.Message = fmt.Sprintf("Failed to drop table for dataset: %s: %v", datasetName, resJSON) } result.Duration = 0 return &result, nil } -// CreateDocMetaIndex creates the document metadata index for the tenant -func (c *RAGFlowClient) CreateDocMetaIndex(cmd *Command) (ResponseIf, error) { +// CreateMetadataInDocEngine creates the document metadata table for the tenant +func (c *RAGFlowClient) CreateMetadataInDocEngine(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { return nil, fmt.Errorf("this command is only allowed in USER mode") } - resp, err := c.HTTPClient.Request("POST", "/tenant/doc_meta_index", false, "web", nil, nil) + resp, err := c.HTTPClient.Request("POST", "/tenant/doc_engine_metadata_table", false, "web", nil, nil) if err != nil { - return nil, fmt.Errorf("failed to create doc meta index: %w", err) + return nil, fmt.Errorf("failed to create metadata table: %w", err) } if resp.StatusCode != 200 { - return nil, fmt.Errorf("failed to create doc meta index: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + return nil, fmt.Errorf("failed to create metadata table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) } resJSON, err := resp.JSON() @@ -895,27 +952,27 @@ func (c *RAGFlowClient) CreateDocMetaIndex(cmd *Command) (ResponseIf, error) { var result SimpleResponse result.Code = int(code) if result.Code == 0 { - result.Message = "Success to create doc meta index" + result.Message = "Success to create metadata table" } else { - result.Message = fmt.Sprintf("Failed to create doc meta index: %v", resJSON) + result.Message = fmt.Sprintf("Failed to create metadata table: %v", resJSON) } result.Duration = 0 return &result, nil } -// DropDocMetaIndex drops the document metadata index for the tenant -func (c *RAGFlowClient) DropDocMetaIndex(cmd *Command) (ResponseIf, error) { +// DropMetadataInDocEngine drops the document metadata table for the tenant +func (c *RAGFlowClient) DropMetadataInDocEngine(cmd *Command) (ResponseIf, error) { if c.ServerType != "user" { return nil, fmt.Errorf("this command is only allowed in USER mode") } - resp, err := c.HTTPClient.Request("DELETE", "/tenant/doc_meta_index", false, "web", nil, nil) + resp, err := c.HTTPClient.Request("DELETE", "/tenant/doc_engine_metadata_table", false, "web", nil, nil) if err != nil { - return nil, fmt.Errorf("failed to drop doc meta index: %w", err) + return nil, fmt.Errorf("failed to drop metadata table: %w", err) } if resp.StatusCode != 200 { - return nil, fmt.Errorf("failed to drop doc meta index: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + return nil, fmt.Errorf("failed to drop metadata table: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) } resJSON, err := resp.JSON() @@ -931,9 +988,9 @@ func (c *RAGFlowClient) DropDocMetaIndex(cmd *Command) (ResponseIf, error) { var result SimpleResponse result.Code = int(code) if result.Code == 0 { - result.Message = "Success to drop doc meta index" + result.Message = "Success to drop metadata table" } else { - result.Message = fmt.Sprintf("Failed to drop doc meta index: %v", resJSON) + result.Message = fmt.Sprintf("Failed to drop metadata table: %v", resJSON) } result.Duration = 0 return &result, nil @@ -1729,8 +1786,12 @@ func (c *RAGFlowClient) UpdateChunk(cmd *Command) (ResponseIf, error) { return nil, fmt.Errorf("invalid JSON body: %w", err) } - path := fmt.Sprintf("/datasets/%s/documents/%s/chunks/%s", datasetID, docID, chunkID) - resp, err := c.HTTPClient.Request("PUT", path, true, "api", nil, payload) + // Add IDs to payload + payload["dataset_id"] = datasetID + payload["document_id"] = docID + payload["chunk_id"] = chunkID + + resp, err := c.HTTPClient.Request("POST", "/chunk/update", false, "web", nil, payload) if err != nil { return nil, fmt.Errorf("failed to update chunk: %w", err) } @@ -1865,3 +1926,64 @@ func (c *RAGFlowClient) RmTags(cmd *Command) (ResponseIf, error) { result.Duration = 0 return &result, nil } + +// RemoveChunks removes chunks from a document +func (c *RAGFlowClient) RemoveChunks(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + docID, ok := cmd.Params["doc_id"].(string) + if !ok { + return nil, fmt.Errorf("doc_id not provided") + } + + payload := map[string]interface{}{ + "doc_id": docID, + } + + // Check if delete_all is set + if deleteAll, ok := cmd.Params["delete_all"].(bool); ok && deleteAll { + payload["delete_all"] = true + } else if chunkIDs, ok := cmd.Params["chunk_ids"].([]string); ok { + payload["chunk_ids"] = chunkIDs + } + + resp, err := c.HTTPClient.Request("POST", "/chunk/rm", false, "web", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to remove chunks: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to remove chunks: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + resJSON, err := resp.JSON() + if err != nil { + return nil, fmt.Errorf("invalid JSON response: %w", err) + } + + code, ok := resJSON["code"].(float64) + if !ok { + return nil, fmt.Errorf("invalid response format: code is not a number") + } + + var result SimpleResponse + result.Code = int(code) + if result.Code == 0 { + deletedCount := int64(0) + switch data := resJSON["data"].(type) { + case float64: + deletedCount = int64(data) + case map[string]interface{}: + if count, ok := data["deleted_count"].(float64); ok { + deletedCount = int64(count) + } + } + result.Message = fmt.Sprintf("Success to remove chunks from document %s: %d chunks deleted", docID, deletedCount) + } else { + result.Message = fmt.Sprintf("Failed to remove chunks: %v", resJSON) + } + result.Duration = 0 + return &result, nil +} diff --git a/internal/cli/user_parser.go b/internal/cli/user_parser.go index 7f7552c5e3..f74dae494b 100644 --- a/internal/cli/user_parser.go +++ b/internal/cli/user_parser.go @@ -511,8 +511,10 @@ func (p *Parser) parseCreateCommand() (*Command, error) { return p.parseCreateChat() case TokenToken: return p.parseCreateToken() - case TokenIndex: - return p.parseCreateIndex() + case TokenDatasetTable: + return p.parseCreateDatasetTable() + case TokenMetadata: + return p.parseCreateMetadataTable() case TokenProvider: return p.parseCreateProviderInstance() default: @@ -541,30 +543,10 @@ func (p *Parser) parseCreateToken() (*Command, error) { return NewCommand("create_token"), nil } -func (p *Parser) parseCreateIndex() (*Command, error) { - // CREATE INDEX FOR DATASET 'name' VECTOR_SIZE N - // CREATE INDEX DOC_META - p.nextToken() // consume INDEX - - // Check if creating doc meta index - if p.curToken.Type == TokenDocMeta { - p.nextToken() - if p.curToken.Type == TokenSemicolon { - p.nextToken() - } - return NewCommand("create_doc_meta_index"), nil - } - - // Otherwise, must be CREATE INDEX FOR DATASET 'name' VECTOR_SIZE N - if p.curToken.Type != TokenFor { - return nil, fmt.Errorf("expected FOR or DOC_META after INDEX, got %s", p.curToken.Value) - } - p.nextToken() - - if p.curToken.Type != TokenDataset { - return nil, fmt.Errorf("expected DATASET after FOR, got %s", p.curToken.Value) - } - p.nextToken() +// Internal CLI for GO +// parseCreateDatasetTable parses: CREATE DATASET TABLE 'name' VECTOR SIZE N +func (p *Parser) parseCreateDatasetTable() (*Command, error) { + p.nextToken() // consume DATASET TABLE compound token datasetName, err := p.parseQuotedString() if err != nil { @@ -572,8 +554,12 @@ func (p *Parser) parseCreateIndex() (*Command, error) { } p.nextToken() - if p.curToken.Type != TokenVectorSize { - return nil, fmt.Errorf("expected VECTOR_SIZE after dataset name, got %s", p.curToken.Value) + if p.curToken.Type != TokenVector { + return nil, fmt.Errorf("expected VECTOR after dataset name, got %s", p.curToken.Value) + } + p.nextToken() + if p.curToken.Type != TokenSize { + return nil, fmt.Errorf("expected SIZE after VECTOR, got %s", p.curToken.Value) } p.nextToken() @@ -590,12 +576,30 @@ func (p *Parser) parseCreateIndex() (*Command, error) { p.nextToken() } - cmd := NewCommand("create_index") + cmd := NewCommand("create_dataset_table") cmd.Params["dataset_name"] = datasetName cmd.Params["vector_size"] = vectorSize return cmd, nil } +// Internal CLI for GO +// parseCreateMetadataTable parses: CREATE METADATA TABLE +func (p *Parser) parseCreateMetadataTable() (*Command, error) { + // CREATE METADATA TABLE + p.nextToken() // consume METADATA + + if p.curToken.Type != TokenTable { + return nil, fmt.Errorf("expected TABLE after METADATA, got %s", p.curToken.Value) + } + p.nextToken() + + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + + return NewCommand("create_metadata_table"), nil +} + func (p *Parser) parseCreateUser() (*Command, error) { p.nextToken() // consume USER userName, err := p.parseQuotedString() @@ -801,8 +805,10 @@ func (p *Parser) parseDropCommand() (*Command, error) { return p.parseDropChat() case TokenToken: return p.parseDropToken() - case TokenIndex: - return p.parseDropIndex() + case TokenDatasetTable: + return p.parseDropDatasetTable() + case TokenMetadata: + return p.parseDropMetadataTable() case TokenInstance: return p.parseDropInstance() default: @@ -827,6 +833,8 @@ func (p *Parser) parseRemoveCommand() (*Command, error) { switch p.curToken.Type { case TokenTag: return p.parseRemoveTags() + case TokenChunks, TokenAll: + return p.parseRemoveChunk() default: return nil, fmt.Errorf("unknown REMOVE target: %s", p.curToken.Value) } @@ -863,30 +871,10 @@ func (p *Parser) parseDropToken() (*Command, error) { return cmd, nil } -func (p *Parser) parseDropIndex() (*Command, error) { - // DROP INDEX FOR DATASET 'name' OR DROP INDEX DOC_META - p.nextToken() // consume INDEX - - // Check if dropping doc meta index - if p.curToken.Type == TokenDocMeta { - p.nextToken() - if p.curToken.Type == TokenSemicolon { - p.nextToken() - } - cmd := NewCommand("drop_doc_meta_index") - return cmd, nil - } - - // Otherwise, must be DROP INDEX FOR DATASET 'name' - if p.curToken.Type != TokenFor { - return nil, fmt.Errorf("expected FOR or DOC_META after INDEX, got %s", p.curToken.Value) - } - p.nextToken() - - if p.curToken.Type != TokenDataset { - return nil, fmt.Errorf("expected DATASET after FOR, got %s", p.curToken.Value) - } - p.nextToken() +// Internal CLI for GO +// parseDropDatasetTable parses: DROP DATASET TABLE 'name' +func (p *Parser) parseDropDatasetTable() (*Command, error) { + p.nextToken() // consume DATASET TABLE datasetName, err := p.parseQuotedString() if err != nil { @@ -898,11 +886,29 @@ func (p *Parser) parseDropIndex() (*Command, error) { p.nextToken() } - cmd := NewCommand("drop_index") + cmd := NewCommand("drop_dataset_table") cmd.Params["dataset_name"] = datasetName return cmd, nil } +// Internal CLI for GO +// parseDropMetadataTable parses: DROP METADATA TABLE +func (p *Parser) parseDropMetadataTable() (*Command, error) { + // DROP METADATA TABLE + p.nextToken() // consume METADATA + + if p.curToken.Type != TokenTable { + return nil, fmt.Errorf("expected TABLE after METADATA, got %s", p.curToken.Value) + } + p.nextToken() + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + + cmd := NewCommand("drop_metadata_table") + return cmd, nil +} + func (p *Parser) parseDropUser() (*Command, error) { p.nextToken() // consume USER userName, err := p.parseQuotedString() @@ -2428,15 +2434,21 @@ func (p *Parser) parseUnsetCommand() (*Command, error) { return NewCommand("unset_token"), nil } -// parseUpdateCommand parses UPDATE CHUNK command -// UPDATE CHUNK 'chunk_id' OF DATASET 'dataset_name' SET '{"content": "..."}' +// Internal +// parseUpdateCommand parses: UPDATE CHUNK 'chunk_id' OF DATASET 'dataset_name' SET '{"content": "..."}' func (p *Parser) parseUpdateCommand() (*Command, error) { p.nextToken() // consume UPDATE - if p.curToken.Type != TokenChunk { - return nil, fmt.Errorf("expected CHUNK after UPDATE") + if p.curToken.Type == TokenChunk { + return p.parseUpdateChunk() } - p.nextToken() + + return nil, fmt.Errorf("unknown UPDATE target: %s", p.curToken.Value) +} + +// parseUpdateChunk parses: UPDATE CHUNK 'chunk_id' OF DATASET 'dataset_name' SET '{"content": "..."}' +func (p *Parser) parseUpdateChunk() (*Command, error) { + p.nextToken() // consume CHUNK // Parse chunk_id chunkID, err := p.parseQuotedString() @@ -2588,3 +2600,74 @@ func (p *Parser) parseRemoveTags() (*Command, error) { return cmd, nil } + + +// parseRemoveChunk parses: +// - REMOVE CHUNKS 'chunk_id1', 'chunk_id2' FROM DOCUMENT 'doc_id'; +// - REMOVE ALL CHUNKS FROM DOCUMENT 'doc_id'; +func (p *Parser) parseRemoveChunk() (*Command, error) { + cmd := NewCommand("remove_chunks") + + // Check if ALL CHUNKS - if we came here from TokenAll case, curToken is already ALL + if p.curToken.Type == TokenAll { + p.nextToken() // consume ALL + if p.curToken.Type != TokenChunks { + return nil, fmt.Errorf("expected CHUNKS after ALL") + } + p.nextToken() // consume CHUNKS + cmd.Params["delete_all"] = true + } else { + // curToken is TokenChunks, consume it first + p.nextToken() + // Multiple chunks: REMOVE CHUNKS 'id1', 'id2' FROM DOCUMENT 'doc_id' + // Parse first chunk ID + chunkID, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected chunk_id: %w", err) + } + chunkIDs := []string{chunkID} + + // Parse additional chunk IDs separated by commas + for { + p.nextToken() + if p.curToken.Type == TokenComma { + p.nextToken() + chunkID, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected chunk_id after comma: %w", err) + } + chunkIDs = append(chunkIDs, chunkID) + } else { + break + } + } + cmd.Params["chunk_ids"] = chunkIDs + } + + // Expect FROM + if p.curToken.Type != TokenFrom { + return nil, fmt.Errorf("expected FROM after chunk(s)") + } + p.nextToken() + + // Expect DOCUMENT + if p.curToken.Type != TokenDocument { + return nil, fmt.Errorf("expected DOCUMENT after FROM") + } + p.nextToken() + + // Parse doc_id + docID, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected doc_id: %w", err) + } + cmd.Params["doc_id"] = docID + p.nextToken() + + // Semicolon is optional + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + + return cmd, nil +} diff --git a/internal/engine/elasticsearch/document.go b/internal/engine/elasticsearch/document.go deleted file mode 100644 index 393a81d399..0000000000 --- a/internal/engine/elasticsearch/document.go +++ /dev/null @@ -1,238 +0,0 @@ -// -// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package elasticsearch - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - - "github.com/elastic/go-elasticsearch/v8/esapi" -) - -// IndexDocument indexes a single document -func (e *elasticsearchEngine) IndexDocument(ctx context.Context, indexName, docID string, doc interface{}) error { - if indexName == "" { - return fmt.Errorf("index name cannot be empty") - } - if docID == "" { - return fmt.Errorf("document id cannot be empty") - } - if doc == nil { - return fmt.Errorf("document cannot be nil") - } - - // Serialize document - data, err := json.Marshal(doc) - if err != nil { - return fmt.Errorf("failed to marshal document: %w", err) - } - - // Index document - req := esapi.IndexRequest{ - Index: indexName, - DocumentID: docID, - Body: bytes.NewReader(data), - Refresh: "true", - } - - res, err := req.Do(ctx, e.client) - if err != nil { - return fmt.Errorf("failed to index document: %w", err) - } - defer res.Body.Close() - - if res.IsError() { - return fmt.Errorf("elasticsearch returned error: %s", res.Status()) - } - - return nil -} - -// BulkIndex indexes documents in bulk -func (e *elasticsearchEngine) BulkIndex(ctx context.Context, indexName string, docs []interface{}) (interface{}, error) { - if indexName == "" { - return nil, fmt.Errorf("index name cannot be empty") - } - if len(docs) == 0 { - return nil, fmt.Errorf("documents cannot be empty") - } - - // Build bulk request - var buf bytes.Buffer - for _, doc := range docs { - docMap, ok := doc.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("document must be map[string]interface{}") - } - - docID, hasID := docMap["_id"] - if !hasID { - return nil, fmt.Errorf("document missing _id field") - } - - // Delete _id field to avoid duplication - delete(docMap, "_id") - - // Add index operation - meta := map[string]interface{}{ - "_index": indexName, - "_id": docID, - } - metaData, _ := json.Marshal(meta) - docData, _ := json.Marshal(docMap) - - buf.Write(metaData) - buf.WriteByte('\n') - buf.Write(docData) - buf.WriteByte('\n') - } - - // Execute bulk request - req := esapi.BulkRequest{ - Body: &buf, - Refresh: "true", - } - - res, err := req.Do(ctx, e.client) - if err != nil { - return nil, fmt.Errorf("bulk index failed: %w", err) - } - defer res.Body.Close() - - if res.IsError() { - return nil, fmt.Errorf("elasticsearch returned error: %s", res.Status()) - } - - // Parse response - var result map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&result); err != nil { - return nil, fmt.Errorf("failed to parse response: %w", err) - } - - // Check for errors - if errors, ok := result["errors"].(bool); ok && errors { - // Get error details - if items, ok := result["items"].([]interface{}); ok && len(items) > 0 { - for _, item := range items { - if itemMap, ok := item.(map[string]interface{}); ok { - for _, op := range itemMap { - if opMap, ok := op.(map[string]interface{}); ok { - if errInfo, ok := opMap["error"].(map[string]interface{}); ok { - if reason, ok := errInfo["reason"].(string); ok { - return nil, fmt.Errorf("bulk index error: %s", reason) - } - } - } - } - } - } - } - return nil, fmt.Errorf("bulk index has errors") - } - - response := &BulkResponse{ - Took: int64(result["took"].(float64)), - Errors: result["errors"].(bool), - Indexed: len(docs), - } - - return response, nil -} - -// BulkResponse bulk operation response -type BulkResponse struct { - Took int64 - Errors bool - Indexed int -} - -// GetDocument gets a document -func (e *elasticsearchEngine) GetDocument(ctx context.Context, indexName, docID string) (interface{}, error) { - if indexName == "" { - return nil, fmt.Errorf("index name cannot be empty") - } - if docID == "" { - return nil, fmt.Errorf("document id cannot be empty") - } - - // Get document - req := esapi.GetRequest{ - Index: indexName, - DocumentID: docID, - } - - res, err := req.Do(ctx, e.client) - if err != nil { - return nil, fmt.Errorf("failed to get document: %w", err) - } - defer res.Body.Close() - - if res.StatusCode == 404 { - return nil, fmt.Errorf("document not found") - } - - if res.IsError() { - return nil, fmt.Errorf("elasticsearch returned error: %s", res.Status()) - } - - // Parse response - var result map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&result); err != nil { - return nil, fmt.Errorf("failed to parse response: %w", err) - } - - if found, ok := result["found"].(bool); !ok || !found { - return nil, fmt.Errorf("document not found") - } - - return result["_source"], nil -} - -// DeleteDocument deletes a document -func (e *elasticsearchEngine) DeleteDocument(ctx context.Context, indexName, docID string) error { - if indexName == "" { - return fmt.Errorf("index name cannot be empty") - } - if docID == "" { - return fmt.Errorf("document id cannot be empty") - } - - // Delete document - req := esapi.DeleteRequest{ - Index: indexName, - DocumentID: docID, - Refresh: "true", - } - - res, err := req.Do(ctx, e.client) - if err != nil { - return fmt.Errorf("failed to delete document: %w", err) - } - defer res.Body.Close() - - if res.StatusCode == 404 { - return fmt.Errorf("document not found") - } - - if res.IsError() { - return fmt.Errorf("elasticsearch returned error: %s", res.Status()) - } - - return nil -} diff --git a/internal/engine/elasticsearch/index.go b/internal/engine/elasticsearch/index.go index 047b14f443..b0190697d1 100644 --- a/internal/engine/elasticsearch/index.go +++ b/internal/engine/elasticsearch/index.go @@ -26,8 +26,8 @@ import ( "github.com/elastic/go-elasticsearch/v8/esapi" ) -// CreateIndex creates an index -func (e *elasticsearchEngine) CreateIndex(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error { +// CreateDataset creates an index +func (e *elasticsearchEngine) CreateDataset(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error { // Elasticsearch doesn't support vector_size or parser_id in the same way // Build mapping for ES (if needed) // TODO @@ -40,7 +40,7 @@ func (e *elasticsearchEngine) CreateIndex(ctx context.Context, indexName, datase } // Check if index already exists - exists, err := e.IndexExists(ctx, indexName) + exists, err := e.TableExists(ctx, indexName) if err != nil { return fmt.Errorf("failed to check index existence: %w", err) } @@ -88,14 +88,14 @@ func (e *elasticsearchEngine) CreateIndex(ctx context.Context, indexName, datase return nil } -// DeleteIndex deletes an index -func (e *elasticsearchEngine) DeleteIndex(ctx context.Context, indexName string) error { +// DropTable deletes an index +func (e *elasticsearchEngine) DropTable(ctx context.Context, indexName string) error { if indexName == "" { return fmt.Errorf("index name cannot be empty") } // Check if index exists - exists, err := e.IndexExists(ctx, indexName) + exists, err := e.TableExists(ctx, indexName) if err != nil { return fmt.Errorf("failed to check index existence: %w", err) } @@ -121,8 +121,8 @@ func (e *elasticsearchEngine) DeleteIndex(ctx context.Context, indexName string) return nil } -// IndexExists checks if index exists -func (e *elasticsearchEngine) IndexExists(ctx context.Context, indexName string) (bool, error) { +// TableExists checks if index exists +func (e *elasticsearchEngine) TableExists(ctx context.Context, indexName string) (bool, error) { if indexName == "" { return false, fmt.Errorf("index name cannot be empty") } @@ -146,8 +146,8 @@ func (e *elasticsearchEngine) IndexExists(ctx context.Context, indexName string) return false, fmt.Errorf("elasticsearch returned error: %s", res.Status()) } -// CreateDocMetaIndex creates the document metadata index -func (e *elasticsearchEngine) CreateDocMetaIndex(ctx context.Context, indexName string) error { +// CreateMetadata creates the document metadata index +func (e *elasticsearchEngine) CreateMetadata(ctx context.Context, indexName string) error { // TODO return nil } @@ -176,3 +176,9 @@ func (e *elasticsearchEngine) UpdateMetadata(ctx context.Context, docID string, // TODO return nil } + +// Delete deletes rows from either a dataset index or metadata index +func (e *elasticsearchEngine) Delete(ctx context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error) { + // TODO + return 0, nil +} diff --git a/internal/engine/engine.go b/internal/engine/engine.go index baca4475d2..6ea188f8db 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -41,29 +41,23 @@ type DocEngine interface { // Search Search(ctx context.Context, req interface{}) (interface{}, error) - // Index operations - CreateIndex(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error - DeleteIndex(ctx context.Context, indexName string) error - IndexExists(ctx context.Context, indexName string) (bool, error) - - // Insert operations + // Dataset operations + CreateDataset(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error InsertDataset(ctx context.Context, documents []map[string]interface{}, indexName string, knowledgebaseID string) ([]string, error) - InsertMetadata(ctx context.Context, documents []map[string]interface{}, tenantID string) ([]string, error) - - // Update operations UpdateDataset(ctx context.Context, condition map[string]interface{}, newValue map[string]interface{}, tableNamePrefix string, knowledgebaseID string) error - UpdateMetadata(ctx context.Context, docID string, kbID string, metaFields map[string]interface{}, tenantID string) error - - // Document operations - IndexDocument(ctx context.Context, indexName, docID string, doc interface{}) error - BulkIndex(ctx context.Context, indexName string, docs []interface{}) (interface{}, error) - DeleteDocument(ctx context.Context, indexName, docID string) error // Chunk operations GetChunk(ctx context.Context, indexName, chunkID string, kbIDs []string) (interface{}, error) - // Doc metadata index operations (per-tenant) - CreateDocMetaIndex(ctx context.Context, indexName string) error + // Document metadata operations + CreateMetadata(ctx context.Context, indexName string) error + InsertMetadata(ctx context.Context, documents []map[string]interface{}, tenantID string) ([]string, error) + UpdateMetadata(ctx context.Context, docID string, kbID string, metaFields map[string]interface{}, tenantID string) error + + // Operations for both dataset and metadata tables + Delete(ctx context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error) + DropTable(ctx context.Context, indexName string) error + TableExists(ctx context.Context, indexName string) (bool, error) // Health check Ping(ctx context.Context) error diff --git a/internal/engine/infinity/common.go b/internal/engine/infinity/common.go new file mode 100644 index 0000000000..0837fe080d --- /dev/null +++ b/internal/engine/infinity/common.go @@ -0,0 +1,289 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package infinity + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strings" + + infinity "github.com/infiniflow/infinity-go-sdk" + "ragflow/internal/logger" +) + +// Delete deletes rows from either a dataset table or metadata table. +// If indexName starts with "ragflow_doc_meta_", it's a metadata table. +// Otherwise, it's a dataset table: {indexName}_{datasetID} +func (e *infinityEngine) Delete(ctx context.Context, condition map[string]interface{}, indexName string, datasetID string) (int64, error) { + var tableName string + if strings.HasPrefix(indexName, "ragflow_doc_meta_") { + tableName = indexName + } else { + tableName = fmt.Sprintf("%s_%s", indexName, datasetID) + } + + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return 0, fmt.Errorf("failed to get database: %w", err) + } + + table, err := db.GetTable(tableName) + if err != nil { + logger.Warn(fmt.Sprintf("Table %s does not exist, skipping delete", tableName)) + return 0, nil + } + + // Get table columns for building filter + clmns := make(map[string]struct { + Type string + Default interface{} + }) + colsResp, err := table.ShowColumns() + if err != nil { + return 0, fmt.Errorf("failed to get columns: %w", err) + } + result, ok := colsResp.(*infinity.QueryResult) + if ok { + if nameArr, ok := result.Data["name"]; ok { + if typeArr, ok := result.Data["type"]; ok { + if defArr, ok := result.Data["default"]; ok { + for i := 0; i < len(nameArr); i++ { + colName, _ := nameArr[i].(string) + colType, _ := typeArr[i].(string) + var colDefault interface{} + if i < len(defArr) { + colDefault = defArr[i] + } + clmns[colName] = struct { + Type string + Default interface{} + }{colType, colDefault} + } + } + } + } + } + + // Build filter from condition + filter := buildFilterFromCondition(condition, clmns) + + delResp, err := table.Delete(filter) + if err != nil { + return 0, fmt.Errorf("failed to delete: %w", err) + } + + return delResp.DeletedRows, nil +} + +// DropTable deletes a table/index +func (e *infinityEngine) DropTable(ctx context.Context, indexName string) error { + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return fmt.Errorf("Failed to get database: %w", err) + } + + _, err = db.DropTable(indexName, infinity.ConflictTypeIgnore) + if err != nil { + return fmt.Errorf("Failed to drop table: %w", err) + } + return nil +} + +// TableExists checks if table/index exists +func (e *infinityEngine) TableExists(ctx context.Context, indexName string) (bool, error) { + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return false, fmt.Errorf("Failed to get database: %w", err) + } + + _, err = db.GetTable(indexName) + if err != nil { + // Check if error is "table not found" + errLower := strings.ToLower(err.Error()) + if strings.Contains(errLower, "not found") || strings.Contains(errLower, "notexist") || strings.Contains(errLower, "doesn't exist") { + return false, nil + } + return false, err + } + return true, nil +} + +// fieldInfo represents a field in the infinity mapping schema +type fieldInfo struct { + Type string `json:"type"` + Default interface{} `json:"default"` + Analyzer interface{} `json:"analyzer"` // string or []string + IndexType interface{} `json:"index_type"` // string or map + Comment string `json:"comment"` +} + +// orderedFields preserves the order of fields as defined in JSON +type orderedFields struct { + Keys []string + Fields map[string]fieldInfo +} + +func (o *orderedFields) UnmarshalJSON(data []byte) error { + // Parse JSON manually to preserve key order + // Look for key names by scanning the JSON string + // This is a simple approach: find {"key": value, "key2": value2...} + o.Fields = make(map[string]fieldInfo) + o.Keys = make([]string, 0) + + // Use a streaming JSON parser approach + dec := json.NewDecoder(bytes.NewReader(data)) + tok, err := dec.Token() + if err != nil { + return err + } + if delim, ok := tok.(json.Delim); ok && delim == '{' { + for dec.More() { + // Read key + tok, err := dec.Token() + if err != nil { + return err + } + key, ok := tok.(string) + if !ok { + continue + } + o.Keys = append(o.Keys, key) + + // Read value into fieldInfo + var field fieldInfo + if err := dec.Decode(&field); err != nil { + return err + } + o.Fields[key] = field + } + } + return nil +} + +// existsCondition builds a NOT EXISTS or field!='' condition +func existsCondition(field string, tableColumns map[string]struct { + Type string + Default interface{} +}) string { + col, colOk := tableColumns[field] + if !colOk { + logger.Warn(fmt.Sprintf("Column '%s' not found in table columns", field)) + return fmt.Sprintf("%s!=null", field) + } + if strings.Contains(strings.ToLower(col.Type), "char") { + if col.Default != nil { + return fmt.Sprintf(" %s!='%v' ", field, col.Default) + } + return fmt.Sprintf(" %s!='' ", field) + } + if col.Default != nil { + return fmt.Sprintf("%s!=%v", field, col.Default) + } + return fmt.Sprintf("%s!=null", field) +} + +func buildFilterFromCondition(condition map[string]interface{}, tableColumns map[string]struct { + Type string + Default interface{} +}) string { + var conditions []string + + for k, v := range condition { + if v == nil { + continue + } + if strVal, ok := v.(string); ok && strVal == "" { + continue + } + + // Handle must_not conditions -> NOT (...) + if k == "must_not" { + if mustNotMap, ok := v.(map[string]interface{}); ok { + for kk, vv := range mustNotMap { + if kk == "exists" { + if existsField, ok := vv.(string); ok { + conditions = append(conditions, fmt.Sprintf("NOT (%s)", existsCondition(existsField, tableColumns))) + } + } + } + } + continue + } + + // Handle keyword fields -> filter_fulltext with converted field name + if fieldKeyword(k) { + if listVal, ok := v.([]interface{}); ok { + var orConds []string + for _, item := range listVal { + if strItem, ok := item.(string); ok { + strItem = strings.ReplaceAll(strItem, "'", "''") + orConds = append(orConds, fmt.Sprintf("filter_fulltext('%s', '%s')", convertMatchingField(k), strItem)) + } + } + if len(orConds) > 0 { + conditions = append(conditions, "("+strings.Join(orConds, " OR ")+")") + } + } else if strVal, ok := v.(string); ok { + strVal = strings.ReplaceAll(strVal, "'", "''") + conditions = append(conditions, fmt.Sprintf("filter_fulltext('%s', '%s')", convertMatchingField(k), strVal)) + } + continue + } + + // Handle list values (IN condition) + if listVal, ok := v.([]interface{}); ok { + var inVals []string + for _, item := range listVal { + if strItem, ok := item.(string); ok { + strItem = strings.ReplaceAll(strItem, "'", "''") + inVals = append(inVals, fmt.Sprintf("'%s'", strItem)) + } else { + inVals = append(inVals, fmt.Sprintf("%v", item)) + } + } + if len(inVals) > 0 { + conditions = append(conditions, fmt.Sprintf("%s IN (%s)", k, strings.Join(inVals, ", "))) + } + continue + } + + // Handle exists condition + if k == "exists" { + if existsField, ok := v.(string); ok { + conditions = append(conditions, existsCondition(existsField, tableColumns)) + } + continue + } + + // Handle string values + if strVal, ok := v.(string); ok { + strVal = strings.ReplaceAll(strVal, "'", "''") + conditions = append(conditions, fmt.Sprintf("%s='%s'", k, strVal)) + continue + } + + // Handle other values + conditions = append(conditions, fmt.Sprintf("%s=%v", k, v)) + } + + if len(conditions) == 0 { + return "1=1" + } + return strings.Join(conditions, " AND ") +} diff --git a/internal/engine/infinity/index.go b/internal/engine/infinity/dataset.go similarity index 58% rename from internal/engine/infinity/index.go rename to internal/engine/infinity/dataset.go index 75bb8ddc8e..c671ddab32 100644 --- a/internal/engine/infinity/index.go +++ b/internal/engine/infinity/dataset.go @@ -17,7 +17,6 @@ package infinity import ( - "bytes" "context" "encoding/json" "fmt" @@ -34,62 +33,10 @@ import ( "go.uber.org/zap" ) -// fieldInfo represents a field in the infinity mapping schema -type fieldInfo struct { - Type string `json:"type"` - Default interface{} `json:"default"` - Analyzer interface{} `json:"analyzer"` // string or []string - IndexType interface{} `json:"index_type"` // string or map - Comment string `json:"comment"` -} - -// orderedFields preserves the order of fields as defined in JSON -type orderedFields struct { - Keys []string - Fields map[string]fieldInfo -} - -func (o *orderedFields) UnmarshalJSON(data []byte) error { - // Parse JSON manually to preserve key order - // Look for key names by scanning the JSON string - // This is a simple approach: find {"key": value, "key2": value2...} - o.Fields = make(map[string]fieldInfo) - o.Keys = make([]string, 0) - - // Use a streaming JSON parser approach - dec := json.NewDecoder(bytes.NewReader(data)) - tok, err := dec.Token() - if err != nil { - return err - } - if delim, ok := tok.(json.Delim); ok && delim == '{' { - for dec.More() { - // Read key - tok, err := dec.Token() - if err != nil { - return err - } - key, ok := tok.(string) - if !ok { - continue - } - o.Keys = append(o.Keys, key) - - // Read value into fieldInfo - var field fieldInfo - if err := dec.Decode(&field); err != nil { - return err - } - o.Fields[key] = field - } - } - return nil -} - -// CreateIndex creates a table/index in Infinity +// CreateDataset creates a table in Infinity // indexName is the table name prefix (e.g., "ragflow_") // The full table name is built as "{indexName}_{datasetID}" -func (e *infinityEngine) CreateIndex(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error { +func (e *infinityEngine) CreateDataset(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error { vecSize := vectorSize // Build full table name: {indexName}_{datasetID} @@ -114,6 +61,15 @@ func (e *infinityEngine) CreateIndex(ctx context.Context, indexName, datasetID s return fmt.Errorf("Failed to get database: %w", err) } + // Check if table already exists + exists, err := e.TableExists(ctx, tableName) + if err != nil { + return fmt.Errorf("Failed to check if table exists: %w", err) + } + if exists { + return fmt.Errorf("table '%s' already exists", tableName) + } + // Build column definitions (preserving JSON order) var columns infinity.TableSchema for _, fieldName := range schema.Keys { @@ -242,355 +198,6 @@ func (e *infinityEngine) CreateIndex(ctx context.Context, indexName, datasetID s return nil } -// DeleteIndex deletes a table/index -func (e *infinityEngine) DeleteIndex(ctx context.Context, indexName string) error { - db, err := e.client.conn.GetDatabase(e.client.dbName) - if err != nil { - return fmt.Errorf("Failed to get database: %w", err) - } - - _, err = db.DropTable(indexName, infinity.ConflictTypeIgnore) - if err != nil { - return fmt.Errorf("Failed to drop table: %w", err) - } - logger.Debug("Infinity deleted table", zap.String("tableName", indexName)) - return nil -} - -// IndexExists checks if table/index exists -func (e *infinityEngine) IndexExists(ctx context.Context, indexName string) (bool, error) { - db, err := e.client.conn.GetDatabase(e.client.dbName) - if err != nil { - return false, fmt.Errorf("Failed to get database: %w", err) - } - - _, err = db.GetTable(indexName) - if err != nil { - // Check if error is "table not found" - if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "NotFound") { - return false, nil - } - return false, err - } - return true, nil -} - -// CreateDocMetaIndex creates the document metadata table/index -func (e *infinityEngine) CreateDocMetaIndex(ctx context.Context, indexName string) error { - // Get database - db, err := e.client.conn.GetDatabase(e.client.dbName) - if err != nil { - return fmt.Errorf("Failed to get database: %w", err) - } - - // Use configured doc_meta mapping file - fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", e.docMetaMappingFileName) - - schemaData, err := os.ReadFile(fpMapping) - if err != nil { - return fmt.Errorf("Failed to read mapping file: %w", err) - } - - var schema map[string]fieldInfo - if err := json.Unmarshal(schemaData, &schema); err != nil { - return fmt.Errorf("Failed to parse mapping file: %w", err) - } - - // Build column definitions - var columns infinity.TableSchema - for fieldName, fieldInfo := range schema { - col := infinity.ColumnDefinition{ - Name: fieldName, - DataType: fieldInfo.Type, - Default: fieldInfo.Default, - // Comment: fieldInfo.Comment, - } - columns = append(columns, &col) - } - - // Create table - _, err = db.CreateTable(indexName, columns, infinity.ConflictTypeIgnore) - if err != nil { - return fmt.Errorf("Failed to create doc meta table: %w", err) - } - logger.Debug("Infinity created doc meta table", zap.String("tableName", indexName)) - - // Get table for creating indexes - table, err := db.GetTable(indexName) - if err != nil { - return fmt.Errorf("Failed to get table: %w", err) - } - - // Create secondary index on id - _, err = table.CreateIndex( - fmt.Sprintf("idx_%s_id", indexName), - infinity.NewIndexInfo("id", infinity.IndexTypeSecondary, nil), - infinity.ConflictTypeIgnore, - "", - ) - if err != nil { - return fmt.Errorf("Failed to create secondary index on id: %w", err) - } - - // Create secondary index on kb_id - _, err = table.CreateIndex( - fmt.Sprintf("idx_%s_kb_id", indexName), - infinity.NewIndexInfo("kb_id", infinity.IndexTypeSecondary, nil), - infinity.ConflictTypeIgnore, - "", - ) - if err != nil { - return fmt.Errorf("Failed to create secondary index on kb_id: %w", err) - } - - return nil -} - -// TransformChunkFields transforms chunk field name for insert/update -// It handles field name conversions and value transformations: -// - docnm_kwd -> docnm -// - title_kwd/title_sm_tks -> docnm (if docnm_kwd not set) -// - important_kwd -> important_keywords (+ important_kwd_empty_count) -// - content_with_weight/content_ltks/content_sm_ltks -> content -// - authors_tks/authors_sm_tks -> authors -// - question_kwd -> questions (joined with \n), question_tks -> questions (if question_kwd not set) -// - kb_id: list -> str (first element) -// - position_int: list -> hex_joined string -// - page_num_int, top_int: list -> hex string -// - *_feas fields -> JSON string -// - keyword fields with list values -> ### joined string -// - chunk_data: dict -> JSON string -// - Missing embeddings filled with zeros if embeddingCols provided -func TransformChunkFields(chunk map[string]interface{}, embeddingCols [][2]interface{}) map[string]interface{} { - d := make(map[string]interface{}) - - for k, v := range chunk { - switch k { - case "docnm_kwd": - d["docnm"] = v - case "title_kwd": - if _, exists := chunk["docnm_kwd"]; !exists { - d["docnm"] = utility.ConvertToString(v) - } - case "title_sm_tks": - if _, exists := chunk["docnm_kwd"]; !exists { - d["docnm"] = utility.ConvertToString(v) - } - case "important_kwd": - if list, ok := v.([]interface{}); ok { - emptyCount := 0 - tokens := make([]string, 0) - for _, item := range list { - if str, ok := item.(string); ok { - if str == "" { - emptyCount++ - } else { - tokens = append(tokens, str) - } - } - } - d["important_keywords"] = strings.Join(tokens, ",") - d["important_kwd_empty_count"] = emptyCount - } else { - d["important_keywords"] = utility.ConvertToString(v) - } - case "important_tks": - if _, exists := chunk["important_kwd"]; !exists { - d["important_keywords"] = v - } - case "content_with_weight": - d["content"] = v - case "content_ltks": - if _, exists := chunk["content_with_weight"]; !exists { - d["content"] = v - } - case "content_sm_ltks": - if _, exists := chunk["content_with_weight"]; !exists { - d["content"] = v - } - case "authors_tks": - d["authors"] = v - case "authors_sm_tks": - if _, exists := chunk["authors_tks"]; !exists { - d["authors"] = v - } - case "question_kwd": - d["questions"] = strings.Join(utility.ConvertToStringSlice(v), "\n") - case "tag_kwd": - d["tag_kwd"] = strings.Join(utility.ConvertToStringSlice(v), "###") - case "question_tks": - if _, exists := chunk["question_kwd"]; !exists { - d["questions"] = utility.ConvertToString(v) - } - case "kb_id": - if list, ok := v.([]interface{}); ok && len(list) > 0 { - d["kb_id"] = list[0] - } else { - d["kb_id"] = v - } - case "position_int": - if list, ok := v.([]interface{}); ok { - d["position_int"] = utility.ConvertPositionIntArrayToHex(list) - } else { - d["position_int"] = v - } - case "page_num_int", "top_int": - if list, ok := v.([]interface{}); ok { - d[k] = utility.ConvertIntArrayToHex(list) - } else { - d[k] = v - } - case "chunk_data": - d["chunk_data"] = utility.ConvertMapToJSONString(v) - default: - // Check for *_feas fields - if strings.HasSuffix(k, "_feas") { - jsonBytes, _ := json.Marshal(v) - d[k] = string(jsonBytes) - } else if fieldKeyword(k) { - // keyword fields with list values -> ### joined - if list, ok := v.([]interface{}); ok { - d[k] = strings.Join(utility.ConvertToStringSlice(list), "###") - } else { - d[k] = v - } - } else { - d[k] = v - } - } - } - - // Remove intermediate token fields - for _, key := range []string{"docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", - "content_with_weight", "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", - "question_kwd", "question_tks"} { - delete(d, key) - } - - // Fill missing embedding columns with zeros if embedding info provided - for _, ec := range embeddingCols { - name, size := ec[0].(string), ec[1].(int) - if _, exists := d[name]; !exists { - zeros := make([]float64, size) - for i := range zeros { - zeros[i] = 0 - } - d[name] = zeros - } - } - - return d -} - -// existsCondition builds a NOT EXISTS or field!='' condition -func existsCondition(field string, tableColumns map[string]struct { - Type string - Default interface{} -}) string { - col, colOk := tableColumns[field] - if !colOk { - logger.Warn(fmt.Sprintf("Column '%s' not found in table columns", field)) - return fmt.Sprintf("%s!=null", field) - } - if strings.Contains(strings.ToLower(col.Type), "char") { - if col.Default != nil { - return fmt.Sprintf(" %s!='%v' ", field, col.Default) - } - return fmt.Sprintf(" %s!='' ", field) - } - if col.Default != nil { - return fmt.Sprintf("%s!=%v", field, col.Default) - } - return fmt.Sprintf("%s!=null", field) -} - -func buildFilterFromCondition(condition map[string]interface{}, tableColumns map[string]struct { - Type string - Default interface{} -}) string { - var conditions []string - - for k, v := range condition { - if v == nil || v == "" { - continue - } - - // Handle must_not conditions -> NOT (...) - if k == "must_not" { - if mustNotMap, ok := v.(map[string]interface{}); ok { - for kk, vv := range mustNotMap { - if kk == "exists" { - if existsField, ok := vv.(string); ok { - conditions = append(conditions, fmt.Sprintf("NOT (%s)", existsCondition(existsField, tableColumns))) - } - } - } - } - continue - } - - // Handle keyword fields -> filter_fulltext with converted field name - if fieldKeyword(k) { - if listVal, ok := v.([]interface{}); ok { - var orConds []string - for _, item := range listVal { - if strItem, ok := item.(string); ok { - strItem = strings.ReplaceAll(strItem, "'", "''") - orConds = append(orConds, fmt.Sprintf("filter_fulltext('%s', '%s')", convertMatchingField(k), strItem)) - } - } - if len(orConds) > 0 { - conditions = append(conditions, "("+strings.Join(orConds, " OR ")+")") - } - } else if strVal, ok := v.(string); ok { - strVal = strings.ReplaceAll(strVal, "'", "''") - conditions = append(conditions, fmt.Sprintf("filter_fulltext('%s', '%s')", convertMatchingField(k), strVal)) - } - continue - } - - // Handle list values (IN condition) - if listVal, ok := v.([]interface{}); ok { - var inVals []string - for _, item := range listVal { - if strItem, ok := item.(string); ok { - strItem = strings.ReplaceAll(strItem, "'", "''") - inVals = append(inVals, fmt.Sprintf("'%s'", strItem)) - } else { - inVals = append(inVals, fmt.Sprintf("%v", item)) - } - } - if len(inVals) > 0 { - conditions = append(conditions, fmt.Sprintf("%s IN (%s)", k, strings.Join(inVals, ", "))) - } - continue - } - - // Handle exists condition - if k == "exists" { - if existsField, ok := v.(string); ok { - conditions = append(conditions, existsCondition(existsField, tableColumns)) - } - continue - } - - // Handle string values - if strVal, ok := v.(string); ok { - strVal = strings.ReplaceAll(strVal, "'", "''") - conditions = append(conditions, fmt.Sprintf("%s='%s'", k, strVal)) - continue - } - - // Handle other values - conditions = append(conditions, fmt.Sprintf("%s=%v", k, v)) - } - - if len(conditions) == 0 { - return "1=1" - } - return strings.Join(conditions, " AND ") -} - // InsertDataset inserts chunks into a dataset table // Table name format: {tableNamePrefix}_{knowledgebaseID} // Auto-create the table if it doesn't exist @@ -638,7 +245,7 @@ func (e *infinityEngine) InsertDataset(ctx context.Context, chunks []map[string] } // Create table - if err := e.CreateIndex(ctx, tableNamePrefix, knowledgebaseID, vectorSize, parserID); err != nil { + if err := e.CreateDataset(ctx, tableNamePrefix, knowledgebaseID, vectorSize, parserID); err != nil { return nil, fmt.Errorf("Failed to create table: %w", err) } @@ -855,150 +462,142 @@ func (e *infinityEngine) UpdateDataset(ctx context.Context, condition map[string return nil } -// InsertMetadata inserts document metadata into tenant's metadata table -// Table name format: ragflow_doc_meta_{tenant_id} -// Auto-create the table if it doesn't exist -// Replace existing metadata with same id and kb_id -func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) { - tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) - logger.Info("InfinityConnection.InsertMetadata called", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata))) +// TransformChunkFields transforms chunk field name for insert/update +// It handles field name conversions and value transformations: +// - docnm_kwd -> docnm +// - title_kwd/title_sm_tks -> docnm (if docnm_kwd not set) +// - important_kwd -> important_keywords (+ important_kwd_empty_count) +// - content_with_weight/content_ltks/content_sm_ltks -> content +// - authors_tks/authors_sm_tks -> authors +// - question_kwd -> questions (joined with \n), question_tks -> questions (if question_kwd not set) +// - kb_id: list -> str (first element) +// - position_int: list -> hex_joined string +// - page_num_int, top_int: list -> hex string +// - *_feas fields -> JSON string +// - keyword fields with list values -> ### joined string +// - chunk_data: dict -> JSON string +// - Missing embeddings filled with zeros if embeddingCols provided +func TransformChunkFields(chunk map[string]interface{}, embeddingCols [][2]interface{}) map[string]interface{} { + d := make(map[string]interface{}) - db, err := e.client.conn.GetDatabase(e.client.dbName) - if err != nil { - return nil, fmt.Errorf("Failed to get database: %w", err) - } - - table, err := db.GetTable(tableName) - if err != nil { - // Table doesn't exist, try to create it - errMsg := strings.ToLower(err.Error()) - if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") { - return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err) - } - - // Create metadata table - if createErr := e.CreateDocMetaIndex(ctx, tableName); createErr != nil { - return nil, fmt.Errorf("Failed to create metadata table: %w", createErr) - } - - table, err = db.GetTable(tableName) - if err != nil { - return nil, fmt.Errorf("Failed to get table after creation: %w", err) - } - } - - // Transform metadata - convert meta_fields map to JSON string - insertMetadata := make([]map[string]interface{}, len(metadata)) - for i, m := range metadata { - d := make(map[string]interface{}) - for k, v := range m { - if k == "meta_fields" { - d["meta_fields"] = utility.ConvertMapToJSONString(v) + for k, v := range chunk { + switch k { + case "docnm_kwd": + d["docnm"] = v + case "title_kwd": + if _, exists := chunk["docnm_kwd"]; !exists { + d["docnm"] = utility.ConvertToString(v) + } + case "title_sm_tks": + if _, exists := chunk["docnm_kwd"]; !exists { + d["docnm"] = utility.ConvertToString(v) + } + case "important_kwd": + if list, ok := v.([]interface{}); ok { + emptyCount := 0 + tokens := make([]string, 0) + for _, item := range list { + if str, ok := item.(string); ok { + if str == "" { + emptyCount++ + } else { + tokens = append(tokens, str) + } + } + } + d["important_keywords"] = strings.Join(tokens, ",") + d["important_kwd_empty_count"] = emptyCount + } else { + d["important_keywords"] = utility.ConvertToString(v) + } + case "important_tks": + if _, exists := chunk["important_kwd"]; !exists { + d["important_keywords"] = v + } + case "content_with_weight": + d["content"] = v + case "content_ltks": + if _, exists := chunk["content_with_weight"]; !exists { + d["content"] = v + } + case "content_sm_ltks": + if _, exists := chunk["content_with_weight"]; !exists { + d["content"] = v + } + case "authors_tks": + d["authors"] = v + case "authors_sm_tks": + if _, exists := chunk["authors_tks"]; !exists { + d["authors"] = v + } + case "question_kwd": + d["questions"] = strings.Join(utility.ConvertToStringSlice(v), "\n") + case "tag_kwd": + d["tag_kwd"] = strings.Join(utility.ConvertToStringSlice(v), "###") + case "question_tks": + if _, exists := chunk["question_kwd"]; !exists { + d["questions"] = utility.ConvertToString(v) + } + case "kb_id": + if list, ok := v.([]interface{}); ok && len(list) > 0 { + d["kb_id"] = list[0] + } else { + d["kb_id"] = v + } + case "position_int": + if list, ok := v.([]interface{}); ok { + d["position_int"] = utility.ConvertPositionIntArrayToHex(list) + } else { + d["position_int"] = v + } + case "page_num_int", "top_int": + if list, ok := v.([]interface{}); ok { + d[k] = utility.ConvertIntArrayToHex(list) + } else { + d[k] = v + } + case "chunk_data": + d["chunk_data"] = utility.ConvertMapToJSONString(v) + default: + // Check for *_feas fields + if strings.HasSuffix(k, "_feas") { + jsonBytes, _ := json.Marshal(v) + d[k] = string(jsonBytes) + } else if fieldKeyword(k) { + // keyword fields with list values -> ### joined + if list, ok := v.([]interface{}); ok { + d[k] = strings.Join(utility.ConvertToStringSlice(list), "###") + } else { + d[k] = v + } } else { d[k] = v } } - insertMetadata[i] = d } - // Delete existing metadata with same id and kb_id, then insert new - if len(insertMetadata) > 0 { - idList := make([]string, len(insertMetadata)) - for i, m := range insertMetadata { - docID := fmt.Sprintf("'%v'", m["id"]) - kbID := fmt.Sprintf("'%v'", m["kb_id"]) - idList[i] = fmt.Sprintf("(id = %s AND kb_id = %s)", docID, kbID) + // Remove intermediate token fields + for _, key := range []string{"docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", + "content_with_weight", "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", + "question_kwd", "question_tks"} { + delete(d, key) + } + + // Fill missing embedding columns with zeros if embedding info provided + for _, ec := range embeddingCols { + name, ok1 := ec[0].(string) + size, ok2 := ec[1].(int) + if !ok1 || !ok2 { + continue } - filter := strings.Join(idList, " OR ") - logger.Debug(fmt.Sprintf("Deleting existing metadata with filter: %s", filter)) - delResp, delErr := table.Delete(filter) - if delErr != nil { - logger.Warn(fmt.Sprintf("Failed to delete existing metadata: %v", delErr)) - } else if delResp.DeletedRows > 0 { - logger.Info(fmt.Sprintf("Deleted %d existing metadata entries", delResp.DeletedRows)) + if _, exists := d[name]; !exists { + zeros := make([]float64, size) + for i := range zeros { + zeros[i] = 0 + } + d[name] = zeros } } - // Insert metadata - _, err = table.Insert(insertMetadata) - if err != nil { - return nil, fmt.Errorf("Failed to insert metadata: %w", err) - } - - logger.Info("InfinityConnection.InsertMetadata result", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata))) - return []string{}, nil -} - -// UpdateMetadata updates document metadata in tenant's metadata table -// Table name format: ragflow_doc_meta_{tenant_id} -func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, kbID string, metaFields map[string]interface{}, tenantID string) error { - tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) - logger.Info("InfinityConnection.UpdateMetadata called", zap.String("tableName", tableName), zap.String("docID", docID), zap.String("kbID", kbID)) - - db, err := e.client.conn.GetDatabase(e.client.dbName) - if err != nil { - return fmt.Errorf("failed to get database: %w", err) - } - - table, err := db.GetTable(tableName) - if err != nil { - return fmt.Errorf("failed to get metadata table %s: %w", tableName, err) - } - - // Query existing metadata using the chainable API - filter := fmt.Sprintf("id = '%s' AND kb_id = '%s'", docID, kbID) - - // Use chainable API: Output().Filter().Limit().Offset() - queryTable := table.Output([]string{"id", "kb_id", "meta_fields"}).Filter(filter).Limit(1).Offset(0) - - // Execute query - result, err := queryTable.ToResult() - if err != nil { - logger.Warn(fmt.Sprintf("Failed to query existing metadata: %v", err)) - // If query fails, just insert new metadata - } else { - // Get results - rows, ok := result.([]map[string]interface{}) - if ok && len(rows) > 0 { - existingMetaFieldsVal := rows[0]["meta_fields"] - - // Parse existing meta_fields if it's a string - var existingMetaFields map[string]interface{} - if existingMetaFieldsVal != nil { - switch v := existingMetaFieldsVal.(type) { - case string: - if err := json.Unmarshal([]byte(v), &existingMetaFields); err != nil { - logger.Warn(fmt.Sprintf("Failed to parse existing meta_fields: %v", err)) - existingMetaFields = make(map[string]interface{}) - } - case map[string]interface{}: - existingMetaFields = v - } - } - - // Merge new meta_fields with existing - if existingMetaFields == nil { - existingMetaFields = make(map[string]interface{}) - } - for k, v := range metaFields { - existingMetaFields[k] = v - } - metaFields = existingMetaFields - } - } - - // Prepare updated metadata - updatedFields := map[string]interface{}{ - "meta_fields": utility.ConvertMapToJSONString(metaFields), - } - - // Update metadata - logger.Info(fmt.Sprintf("INFINITY metadata update: table=%s, filter=%s, newValue=%v", tableName, filter, updatedFields)) - _, err = table.Update(filter, updatedFields) - if err != nil { - return fmt.Errorf("failed to update metadata: %w", err) - } - - logger.Info("InfinityConnection.UpdateMetadata completes", zap.String("tableName", tableName), zap.String("docID", docID)) - return nil + return d } diff --git a/internal/engine/infinity/document.go b/internal/engine/infinity/document.go deleted file mode 100644 index f56545e83e..0000000000 --- a/internal/engine/infinity/document.go +++ /dev/null @@ -1,47 +0,0 @@ -// -// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -package infinity - -import ( - "context" - "fmt" -) - -// IndexDocument indexes a single document -func (e *infinityEngine) IndexDocument(ctx context.Context, tableName, docID string, doc interface{}) error { - return fmt.Errorf("infinity insert not implemented: waiting for official Go SDK") -} - -// BulkIndex indexes documents in bulk -func (e *infinityEngine) BulkIndex(ctx context.Context, tableName string, docs []interface{}) (interface{}, error) { - return nil, fmt.Errorf("infinity bulk insert not implemented: waiting for official Go SDK") -} - -// BulkResponse bulk operation response -type BulkResponse struct { - Inserted int -} - -// GetDocument gets a document -func (e *infinityEngine) GetDocument(ctx context.Context, tableName, docID string) (interface{}, error) { - return nil, fmt.Errorf("infinity get document not implemented: waiting for official Go SDK") -} - -// DeleteDocument deletes a document -func (e *infinityEngine) DeleteDocument(ctx context.Context, tableName, docID string) error { - return fmt.Errorf("infinity delete not implemented: waiting for official Go SDK") -} diff --git a/internal/engine/infinity/metadata.go b/internal/engine/infinity/metadata.go new file mode 100644 index 0000000000..afb6679993 --- /dev/null +++ b/internal/engine/infinity/metadata.go @@ -0,0 +1,286 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package infinity + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + infinity "github.com/infiniflow/infinity-go-sdk" + "ragflow/internal/logger" + "ragflow/internal/utility" + + "go.uber.org/zap" +) + +// CreateMetadata creates the document metadata table/index +func (e *infinityEngine) CreateMetadata(ctx context.Context, indexName string) error { + // Get database + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return fmt.Errorf("Failed to get database: %w", err) + } + + // Check if table already exists + exists, err := e.TableExists(ctx, indexName) + if err != nil { + return fmt.Errorf("Failed to check if table exists: %w", err) + } + if exists { + return fmt.Errorf("metadata table '%s' already exists", indexName) + } + + // Use configured doc_meta mapping file + fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", e.docMetaMappingFileName) + + schemaData, err := os.ReadFile(fpMapping) + if err != nil { + return fmt.Errorf("Failed to read mapping file: %w", err) + } + + var schema map[string]fieldInfo + if err := json.Unmarshal(schemaData, &schema); err != nil { + return fmt.Errorf("Failed to parse mapping file: %w", err) + } + + // Build column definitions + var columns infinity.TableSchema + for fieldName, fieldInfo := range schema { + col := infinity.ColumnDefinition{ + Name: fieldName, + DataType: fieldInfo.Type, + Default: fieldInfo.Default, + // Comment: fieldInfo.Comment, + } + columns = append(columns, &col) + } + + // Create table + _, err = db.CreateTable(indexName, columns, infinity.ConflictTypeIgnore) + if err != nil { + return fmt.Errorf("Failed to create doc meta table: %w", err) + } + logger.Debug("Infinity created doc meta table", zap.String("tableName", indexName)) + + // Get table for creating indexes + table, err := db.GetTable(indexName) + if err != nil { + return fmt.Errorf("Failed to get table: %w", err) + } + + // Create secondary index on id + _, err = table.CreateIndex( + fmt.Sprintf("idx_%s_id", indexName), + infinity.NewIndexInfo("id", infinity.IndexTypeSecondary, nil), + infinity.ConflictTypeIgnore, + "", + ) + if err != nil { + return fmt.Errorf("Failed to create secondary index on id: %w", err) + } + + // Create secondary index on kb_id + _, err = table.CreateIndex( + fmt.Sprintf("idx_%s_kb_id", indexName), + infinity.NewIndexInfo("kb_id", infinity.IndexTypeSecondary, nil), + infinity.ConflictTypeIgnore, + "", + ) + if err != nil { + return fmt.Errorf("Failed to create secondary index on kb_id: %w", err) + } + + return nil +} + +// InsertMetadata inserts document metadata into tenant's metadata table +// Table name format: ragflow_doc_meta_{tenant_id} +// Auto-create the table if it doesn't exist +// Replace existing metadata with same id and kb_id +func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) { + tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) + logger.Info("InfinityConnection.InsertMetadata called", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata))) + + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return nil, fmt.Errorf("Failed to get database: %w", err) + } + + table, err := db.GetTable(tableName) + if err != nil { + // Table doesn't exist, try to create it + errMsg := strings.ToLower(err.Error()) + if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") { + return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err) + } + + // Create metadata table + if createErr := e.CreateMetadata(ctx, tableName); createErr != nil { + return nil, fmt.Errorf("Failed to create metadata table: %w", createErr) + } + + table, err = db.GetTable(tableName) + if err != nil { + return nil, fmt.Errorf("Failed to get table after creation: %w", err) + } + } + + // Transform metadata - convert meta_fields map to JSON string + insertMetadata := make([]map[string]interface{}, len(metadata)) + for i, m := range metadata { + d := make(map[string]interface{}) + for k, v := range m { + if k == "meta_fields" { + d["meta_fields"] = utility.ConvertMapToJSONString(v) + } else { + d[k] = v + } + } + insertMetadata[i] = d + } + + // Delete existing metadata with same id and kb_id, then insert new + if len(insertMetadata) > 0 { + idList := make([]string, len(insertMetadata)) + for i, m := range insertMetadata { + // Escape single quotes in values to prevent SQL injection + docID := fmt.Sprintf("'%s'", strings.ReplaceAll(fmt.Sprintf("%v", m["id"]), "'", "''")) + kbID := fmt.Sprintf("'%s'", strings.ReplaceAll(fmt.Sprintf("%v", m["kb_id"]), "'", "''")) + idList[i] = fmt.Sprintf("(id = %s AND kb_id = %s)", docID, kbID) + } + filter := strings.Join(idList, " OR ") + logger.Debug(fmt.Sprintf("Deleting existing metadata with filter: %s", filter)) + delResp, delErr := table.Delete(filter) + if delErr != nil { + logger.Warn(fmt.Sprintf("Failed to delete existing metadata: %v", delErr)) + } else if delResp.DeletedRows > 0 { + logger.Info(fmt.Sprintf("Deleted %d existing metadata entries", delResp.DeletedRows)) + } + } + + // Insert metadata + _, err = table.Insert(insertMetadata) + if err != nil { + return nil, fmt.Errorf("Failed to insert metadata: %w", err) + } + + logger.Info("InfinityConnection.InsertMetadata result", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata))) + return []string{}, nil +} + +// UpdateMetadata updates or inserts document metadata in tenant's metadata table. +// If a row with the given docID and kbID exists, it merges the new metadata with existing. +// If no row exists, it inserts a new row. +// Table name format: ragflow_doc_meta_{tenant_id} +func (e *infinityEngine) UpdateMetadata(ctx context.Context, docID string, kbID string, metaFields map[string]interface{}, tenantID string) error { + tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) + logger.Info("InfinityConnection.UpdateMetadata called", zap.String("tableName", tableName), zap.String("docID", docID), zap.String("kbID", kbID)) + + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return fmt.Errorf("failed to get database: %w", err) + } + + table, err := db.GetTable(tableName) + if err != nil { + return fmt.Errorf("failed to get metadata table %s: %w", tableName, err) + } + + // Build filter to find existing row by docID and kbID + escapedDocID := strings.ReplaceAll(docID, "'", "''") + escapedKbID := strings.ReplaceAll(kbID, "'", "''") + filter := fmt.Sprintf("id = '%s' AND kb_id = '%s'", escapedDocID, escapedKbID) + + // Query existing metadata using the chainable API + queryTable := table.Output([]string{"id", "kb_id", "meta_fields"}).Filter(filter).Limit(1).Offset(0) + + // Execute query to check if row exists + result, err := queryTable.ToResult() + rowExists := false + if err != nil { + logger.Warn(fmt.Sprintf("Failed to query existing metadata: %v", err)) + // If query fails, treat as not exists and insert + } else { + // Get results - ToResult returns *infinity.QueryResult + qr, ok := result.(*infinity.QueryResult) + // Check if id column has any rows - len(qr.Data["id"]) > 0 means there are rows + if ok && qr != nil && len(qr.Data["id"]) > 0 { + rowExists = true + // Get meta_fields from the first row + if metaFieldsData, exists := qr.Data["meta_fields"]; exists && len(metaFieldsData) > 0 { + existingMetaFieldsVal := metaFieldsData[0] + + // Parse existing meta_fields if it's a string + var existingMetaFields map[string]interface{} + if existingMetaFieldsVal != nil { + switch v := existingMetaFieldsVal.(type) { + case string: + if err := json.Unmarshal([]byte(v), &existingMetaFields); err != nil { + logger.Warn(fmt.Sprintf("Failed to parse existing meta_fields: %v", err)) + existingMetaFields = make(map[string]interface{}) + } + case map[string]interface{}: + existingMetaFields = v + } + } + + // Merge new meta_fields with existing (new values override existing) + if existingMetaFields == nil { + existingMetaFields = make(map[string]interface{}) + } + for k, v := range metaFields { + existingMetaFields[k] = v + } + metaFields = existingMetaFields + } + } + } + + // Prepare updated metadata as JSON string + updatedFields := map[string]interface{}{ + "meta_fields": utility.ConvertMapToJSONString(metaFields), + } + + if rowExists { + // Row exists: update it with merged metadata + logger.Info(fmt.Sprintf("UpdateMetadata: updating existing row, table=%s, filter=%s, newValue=%v", tableName, filter, updatedFields)) + _, err = table.Update(filter, updatedFields) + if err != nil { + return fmt.Errorf("failed to update metadata: %w", err) + } + } else { + // Row doesn't exist: insert new row + insertFields := map[string]interface{}{ + "id": docID, + "kb_id": kbID, + "meta_fields": utility.ConvertMapToJSONString(metaFields), + } + logger.Info(fmt.Sprintf("UpdateMetadata: inserting new row, table=%s, newValue=%v", tableName, insertFields)) + _, err = table.Insert(insertFields) + if err != nil { + return fmt.Errorf("failed to insert metadata: %w", err) + } + } + + logger.Info("InfinityConnection.UpdateMetadata completes", zap.String("tableName", tableName), zap.String("docID", docID)) + return nil +} + diff --git a/internal/handler/chunk.go b/internal/handler/chunk.go index e7beda5c16..d5967ff802 100644 --- a/internal/handler/chunk.go +++ b/internal/handler/chunk.go @@ -254,12 +254,9 @@ func (h *ChunkHandler) List(c *gin.Context) { // @Tags chunks // @Accept json // @Produce json -// @Param dataset_id path string true "Dataset ID" -// @Param document_id path string true "Document ID" -// @Param chunk_id path string true "Chunk ID" // @Param request body service.UpdateChunkRequest true "update chunk" // @Success 200 {object} map[string]interface{} -// @Router /v1/datasets/{dataset_id}/documents/{document_id}/chunks/{chunk_id} [put] +// @Router /v1/chunk/update [post] func (h *ChunkHandler) UpdateChunk(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { @@ -267,20 +264,7 @@ func (h *ChunkHandler) UpdateChunk(c *gin.Context) { return } - // Get path parameters - datasetID := c.Param("dataset_id") - documentID := c.Param("document_id") - chunkID := c.Param("chunk_id") - - if datasetID == "" || documentID == "" || chunkID == "" { - c.JSON(http.StatusBadRequest, gin.H{ - "code": 400, - "message": "dataset_id, document_id, and chunk_id are required", - }) - return - } - - // Validate allowed update fields + // Validate allowed update fields and get IDs from body var rawBody map[string]interface{} if err := json.NewDecoder(c.Request.Body).Decode(&rawBody); err != nil { c.JSON(http.StatusBadRequest, gin.H{ @@ -290,7 +274,35 @@ func (h *ChunkHandler) UpdateChunk(c *gin.Context) { return } - // Allowed fields for update + // Get required ID fields + datasetID, ok := rawBody["dataset_id"].(string) + if !ok || datasetID == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "dataset_id is required", + }) + return + } + chunkID, ok := rawBody["chunk_id"].(string) + if !ok || chunkID == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "chunk_id is required", + }) + return + } + + // Get document_id from request + documentID, ok := rawBody["document_id"].(string) + if !ok || documentID == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "doc_id is required", + }) + return + } + + // Allowed fields for update (exclude ID fields) allowedFields := map[string]bool{ "content": true, "important_keywords": true, @@ -301,7 +313,7 @@ func (h *ChunkHandler) UpdateChunk(c *gin.Context) { "tag_feas": true, } for field := range rawBody { - if !allowedFields[field] { + if field != "dataset_id" && field != "document_id" && field != "chunk_id" && !allowedFields[field] { c.JSON(http.StatusBadRequest, gin.H{ "code": 400, "message": "Update field '" + field + "' is not supported. Updatable fields: content, important_keywords, questions, available, positions, tag_kwd, tag_feas", @@ -366,3 +378,52 @@ func (h *ChunkHandler) UpdateChunk(c *gin.Context) { "message": "chunk updated successfully", }) } + +// Remove handles chunk removal requests +// @Summary Remove Chunks +// @Description Remove chunks from a document +// @Tags chunks +// @Accept json +// @Produce json +// @Param request body service.RemoveChunksRequest true "remove chunks request" +// @Success 200 {object} map[string]interface{} +// @Router /v1/chunk/rm [post] +func (h *ChunkHandler) Remove(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req service.RemoveChunksRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": err.Error(), + }) + return + } + + if req.DocID == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "doc_id is required", + }) + return + } + + deletedCount, err := h.chunkService.RemoveChunks(&req, user.ID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 500, + "message": err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "data": deletedCount, + "message": "success", + }) +} diff --git a/internal/handler/document.go b/internal/handler/document.go index 66a08911a5..a4152c07dc 100644 --- a/internal/handler/document.go +++ b/internal/handler/document.go @@ -22,6 +22,7 @@ import ( "net/http" "ragflow/internal/common" "strconv" + "strings" "github.com/gin-gonic/gin" @@ -461,10 +462,18 @@ func (h *DocumentHandler) SetMeta(c *gin.Context) { err := h.documentService.SetDocumentMetadata(req.DocID, meta) if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{ - "code": 1, - "message": "Failed to set metadata: " + err.Error(), - }) + errMsg := err.Error() + if strings.Contains(errMsg, "no such document") || strings.Contains(errMsg, "document not found") { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 1, + "message": errMsg, + }) + } else { + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 1, + "message": "Failed to set metadata: " + errMsg, + }) + } return } diff --git a/internal/handler/kb.go b/internal/handler/kb.go index 75b5473636..1644dfa50f 100644 --- a/internal/handler/kb.go +++ b/internal/handler/kb.go @@ -654,24 +654,24 @@ func (h *KnowledgebaseHandler) GetBasicInfo(c *gin.Context) { jsonResponse(c, common.CodeSuccess, map[string]interface{}{}, "success") } -// CreateIndex handles the create index request for a knowledge base -// @Summary Create Index -// @Description Create the Infinity index/table for a knowledge base +// CreateDatasetInDocEngine handles the create dataset request for a knowledge base +// @Summary Create Dataset in Doc Engine +// @Description Create the Infinity table for a knowledge base // @Tags knowledgebase // @Accept json // @Produce json // @Security ApiKeyAuth -// @Param request body service.CreateIndexRequest true "create index request" +// @Param request body service.CreateDatasetTableRequest true "create dataset request" // @Success 200 {object} map[string]interface{} -// @Router /v1/kb/index [post] -func (h *KnowledgebaseHandler) CreateIndex(c *gin.Context) { +// @Router /v1/kb/doc_engine_table [post] +func (h *KnowledgebaseHandler) CreateDatasetInDocEngine(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { jsonError(c, errorCode, errorMessage) return } - var req service.CreateIndexRequest + var req service.CreateDatasetTableRequest if err := c.ShouldBindJSON(&req); err != nil { jsonError(c, common.CodeDataError, err.Error()) return @@ -683,7 +683,7 @@ func (h *KnowledgebaseHandler) CreateIndex(c *gin.Context) { return } - result, code, err := h.kbService.CreateIndex(&req) + result, code, err := h.kbService.CreateDatasetInDocEngine(&req) if err != nil { jsonError(c, code, err.Error()) return @@ -692,29 +692,29 @@ func (h *KnowledgebaseHandler) CreateIndex(c *gin.Context) { jsonResponse(c, common.CodeSuccess, result, "success") } -// DeleteIndexRequest represents the request for deleting an index -type DeleteIndexRequest struct { +// DeleteDatasetInDocEngineRequest represents the request for deleting a dataset table +type DeleteDatasetInDocEngineRequest struct { KBID string `json:"kb_id" binding:"required"` } -// DeleteIndex handles the delete index request for a knowledge base -// @Summary Delete Index -// @Description Delete the Infinity index/table for a knowledge base +// DeleteDatasetInDocEngine handles the delete dataset request for a knowledge base +// @Summary Delete Dataset in Doc Engine +// @Description Delete the Infinity table for a knowledge base // @Tags knowledgebase // @Accept json // @Produce json // @Security ApiKeyAuth -// @Param request body DeleteIndexRequest true "delete index request" +// @Param request body DeleteDatasetInDocEngineRequest true "delete dataset request" // @Success 200 {object} map[string]interface{} -// @Router /v1/kb/index [delete] -func (h *KnowledgebaseHandler) DeleteIndex(c *gin.Context) { +// @Router /v1/kb/doc_engine_table [delete] +func (h *KnowledgebaseHandler) DeleteDatasetInDocEngine(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { jsonError(c, errorCode, errorMessage) return } - var req DeleteIndexRequest + var req DeleteDatasetInDocEngineRequest if err := c.ShouldBindJSON(&req); err != nil { jsonError(c, common.CodeDataError, err.Error()) return @@ -726,7 +726,7 @@ func (h *KnowledgebaseHandler) DeleteIndex(c *gin.Context) { return } - code, err := h.kbService.DeleteIndex(req.KBID) + code, err := h.kbService.DeleteDatasetInDocEngine(req.KBID) if err != nil { jsonError(c, code, err.Error()) return diff --git a/internal/handler/tenant.go b/internal/handler/tenant.go index 1060aabc84..79ffdda090 100644 --- a/internal/handler/tenant.go +++ b/internal/handler/tenant.go @@ -117,16 +117,16 @@ func (h *TenantHandler) TenantList(c *gin.Context) { }) } -// CreateDocMetaIndex handles the create doc meta index request -// @Summary Create Doc Meta Index -// @Description Create the document metadata index for a tenant +// CreateMetadataInDocEngine handles the create doc meta table request +// @Summary Create Doc Meta Table +// @Description Create the document metadata table for a tenant // @Tags tenants // @Accept json // @Produce json // @Security ApiKeyAuth // @Success 200 {object} map[string]interface{} -// @Router /v1/tenant/doc_meta_index [post] -func (h *TenantHandler) CreateDocMetaIndex(c *gin.Context) { +// @Router /v1/tenant/doc_engine_metadata_table [post] +func (h *TenantHandler) CreateMetadataInDocEngine(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { jsonError(c, errorCode, errorMessage) @@ -136,7 +136,7 @@ func (h *TenantHandler) CreateDocMetaIndex(c *gin.Context) { // Use user.ID as tenant ID (user IS the tenant in user mode) tenantID := user.ID - code, err := h.tenantService.CreateDocMetaIndex(tenantID) + code, err := h.tenantService.CreateMetadataInDocEngine(tenantID) if err != nil { jsonError(c, code, err.Error()) return @@ -149,16 +149,16 @@ func (h *TenantHandler) CreateDocMetaIndex(c *gin.Context) { }) } -// DeleteDocMetaIndex handles the delete doc meta index request -// @Summary Delete Doc Meta Index -// @Description Delete the document metadata index for a tenant +// DeleteMetadataInDocEngine handles the delete doc meta table request +// @Summary Delete Metadata In Doc Engine +// @Description Delete the document metadata table for a tenant // @Tags tenants // @Accept json // @Produce json // @Security ApiKeyAuth // @Success 200 {object} map[string]interface{} -// @Router /v1/tenant/doc_meta_index [delete] -func (h *TenantHandler) DeleteDocMetaIndex(c *gin.Context) { +// @Router /v1/tenant/doc_engine_metadata_table [delete] +func (h *TenantHandler) DeleteMetadataInDocEngine(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { jsonError(c, errorCode, errorMessage) @@ -168,7 +168,7 @@ func (h *TenantHandler) DeleteDocMetaIndex(c *gin.Context) { // Use user.ID as tenant ID (user IS the tenant in user mode) tenantID := user.ID - code, err := h.tenantService.DeleteDocMetaIndex(tenantID) + code, err := h.tenantService.DeleteMetadataInDocEngine(tenantID) if err != nil { jsonError(c, code, err.Error()) return diff --git a/internal/router/router.go b/internal/router/router.go index 684aa3068f..adb255d798 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -159,12 +159,6 @@ func (r *Router) Setup(engine *gin.Engine) { datasets.DELETE("", r.datasetsHandler.DeleteDatasets) } - // RESTful dataset chunk routes - datasetChunks := v1.Group("/datasets/:dataset_id/documents/:document_id/chunks") - { - datasetChunks.PUT("/:chunk_id", r.chunkHandler.UpdateChunk) - } - // Author routes authors := v1.Group("/authors") { @@ -264,9 +258,9 @@ func (r *Router) Setup(engine *gin.Engine) { kb.GET("/tags", r.knowledgebaseHandler.ListTagsFromKbs) kb.GET("/get_meta", r.knowledgebaseHandler.GetMeta) kb.GET("/basic_info", r.knowledgebaseHandler.GetBasicInfo) - kb.POST("/index", r.knowledgebaseHandler.CreateIndex) - kb.DELETE("/index", r.knowledgebaseHandler.DeleteIndex) - kb.POST("/insert_from_file", r.knowledgebaseHandler.InsertDatasetFromFile) + kb.POST("/doc_engine_table", r.knowledgebaseHandler.CreateDatasetInDocEngine) // Internal API only for GO + kb.DELETE("/doc_engine_table", r.knowledgebaseHandler.DeleteDatasetInDocEngine) // Internal API only for GO + kb.POST("/insert_from_file", r.knowledgebaseHandler.InsertDatasetFromFile) // Internal API only for GO // KB ID specific routes kbByID := kb.Group("/:kb_id") @@ -282,9 +276,9 @@ func (r *Router) Setup(engine *gin.Engine) { // Tenant routes (per-tenant resources) tenant := authorized.Group("/v1/tenant") { - tenant.POST("/doc_meta_index", r.tenantHandler.CreateDocMetaIndex) - tenant.DELETE("/doc_meta_index", r.tenantHandler.DeleteDocMetaIndex) - tenant.POST("/insert_metadata_from_file", r.tenantHandler.InsertMetadataFromFile) + tenant.POST("/doc_engine_metadata_table", r.tenantHandler.CreateMetadataInDocEngine) // Internal API only for GO + tenant.DELETE("/doc_engine_metadata_table", r.tenantHandler.DeleteMetadataInDocEngine) // Internal API only for GO + tenant.POST("/insert_metadata_from_file", r.tenantHandler.InsertMetadataFromFile) // Internal API only for GO } // Document routes @@ -301,6 +295,8 @@ func (r *Router) Setup(engine *gin.Engine) { chunk.POST("/retrieval_test", r.chunkHandler.RetrievalTest) chunk.GET("/get", r.chunkHandler.Get) chunk.POST("/list", r.chunkHandler.List) + chunk.POST("/update", r.chunkHandler.UpdateChunk) // Internal API only for GO + chunk.POST("/rm", r.chunkHandler.Remove) } // LLM routes diff --git a/internal/service/chunk.go b/internal/service/chunk.go index 8efd88bf33..4cc98cf6a8 100644 --- a/internal/service/chunk.go +++ b/internal/service/chunk.go @@ -1009,3 +1009,83 @@ func (s *ChunkService) UpdateChunk(req *UpdateChunkRequest, userID string) error return nil } + +// RemoveChunksRequest request for removing chunks +type RemoveChunksRequest struct { + DocID string `json:"doc_id"` + ChunkIDs []string `json:"chunk_ids,omitempty"` + DeleteAll bool `json:"delete_all,omitempty"` +} + +// RemoveChunks removes chunks from the dataset table. +// If ChunkIDs is empty and DeleteAll is true, removes all chunks for the document. +// Otherwise removes only the specified chunks. +func (s *ChunkService) RemoveChunks(req *RemoveChunksRequest, userID string) (int64, error) { + if s.docEngine == nil { + return 0, fmt.Errorf("doc engine not initialized") + } + + if req.DocID == "" { + return 0, fmt.Errorf("doc_id is required") + } + + ctx := context.Background() + + // Get user's tenants + tenants, err := s.userTenantDAO.GetByUserID(userID) + if err != nil { + return 0, fmt.Errorf("failed to get user tenants: %w", err) + } + if len(tenants) == 0 { + return 0, fmt.Errorf("user has no accessible tenants") + } + + // Verify document exists and belongs to a dataset (do this first to get doc.KbID) + docDAO := dao.NewDocumentDAO() + doc, err := docDAO.GetByID(req.DocID) + if err != nil || doc == nil { + return 0, fmt.Errorf("document not found") + } + + // Find the tenant that owns this document + var targetTenantID string + for _, tenant := range tenants { + kb, err := s.kbDAO.GetByIDAndTenantID(doc.KbID, tenant.TenantID) + if err == nil && kb != nil { + targetTenantID = tenant.TenantID + break + } + } + if targetTenantID == "" { + return 0, fmt.Errorf("user does not have access to this document") + } + + indexName := fmt.Sprintf("ragflow_%s", targetTenantID) + + // Build condition + condition := make(map[string]interface{}) + switch { + case len(req.ChunkIDs) > 0 && req.DeleteAll: + return 0, fmt.Errorf("chunk_ids and delete_all are mutually exclusive") + case len(req.ChunkIDs) > 0: + // Delete specific chunks - convert []string to []interface{} for buildFilterFromCondition + chunkIDsIf := make([]interface{}, len(req.ChunkIDs)) + for i, id := range req.ChunkIDs { + chunkIDsIf[i] = id + } + condition["id"] = chunkIDsIf + condition["doc_id"] = req.DocID + case req.DeleteAll: + // Delete all chunks for this document + condition["doc_id"] = req.DocID + default: + return 0, fmt.Errorf("either chunk_ids or delete_all must be provided") + } + + deletedCount, err := s.docEngine.Delete(ctx, condition, indexName, doc.KbID) + if err != nil { + return 0, fmt.Errorf("failed to delete chunks: %w", err) + } + + return deletedCount, nil +} diff --git a/internal/service/kb.go b/internal/service/kb.go index 81f454f1cd..56b46fc06c 100644 --- a/internal/service/kb.go +++ b/internal/service/kb.go @@ -191,23 +191,23 @@ func (s *KnowledgebaseService) CreateKB(req *CreateKBRequest, tenantID string) ( return &CreateKBResponse{KBID: kbID}, common.CodeSuccess, nil } -// CreateIndexRequest represents the request for creating an index -type CreateIndexRequest struct { +// CreateDatasetTableRequest represents the request for creating a dataset table +type CreateDatasetTableRequest struct { KBID string `json:"kb_id" binding:"required"` VectorSize int `json:"vector_size" binding:"required"` ParserID string `json:"parser_id,omitempty"` } -// CreateIndexResponse represents the response for creating an index -type CreateIndexResponse struct { +// CreateDatasetInDocEngineResponse represents the response for creating a dataset table +type CreateDatasetInDocEngineResponse struct { KBID string `json:"kb_id"` - IndexName string `json:"index_name"` + TableName string `json:"table_name"` VectorSize int `json:"vector_size"` } -// CreateIndex creates an index in the document engine for a knowledge base -func (s *KnowledgebaseService) CreateIndex(req *CreateIndexRequest) (*CreateIndexResponse, common.ErrorCode, error) { - // Get KB to find tenant_id for building index name +// CreateDatasetInDocEngine creates a table in the document engine for a knowledge base +func (s *KnowledgebaseService) CreateDatasetInDocEngine(req *CreateDatasetTableRequest) (*CreateDatasetInDocEngineResponse, common.ErrorCode, error) { + // Get KB to find tenant_id for building table name kb, err := s.kbDAO.GetByID(req.KBID) if err != nil { return nil, common.CodeDataError, fmt.Errorf("knowledge base not found: %s", req.KBID) @@ -219,38 +219,38 @@ func (s *KnowledgebaseService) CreateIndex(req *CreateIndexRequest) (*CreateInde return nil, common.CodeDataError, fmt.Errorf("vector_size must be positive") } - // Build index name prefix: ragflow_ - indexName := fmt.Sprintf("ragflow_%s", kb.TenantID) + // Build table name prefix: ragflow_ + tableName := fmt.Sprintf("ragflow_%s", kb.TenantID) - // Call document engine to create index - // Full table name will be built as "{indexName}_{kb_id}" - err = s.docEngine.CreateIndex(context.Background(), indexName, req.KBID, vecSize, req.ParserID) + // Call document engine to create table + // Full table name will be built as "{tableName}_{kb_id}" + err = s.docEngine.CreateDataset(context.Background(), tableName, req.KBID, vecSize, req.ParserID) if err != nil { - return nil, common.CodeServerError, fmt.Errorf("failed to create index: %w", err) + return nil, common.CodeServerError, fmt.Errorf("failed to create dataset: %w", err) } - return &CreateIndexResponse{ + return &CreateDatasetInDocEngineResponse{ KBID: req.KBID, - IndexName: indexName, + TableName: tableName, VectorSize: vecSize, }, common.CodeSuccess, nil } -// DeleteIndex deletes the index in the document engine for a knowledge base -func (s *KnowledgebaseService) DeleteIndex(kbID string) (common.ErrorCode, error) { - // Get KB to find tenant_id for building index name +// DeleteDatasetInDocEngine deletes the table in the document engine for a knowledge base +func (s *KnowledgebaseService) DeleteDatasetInDocEngine(kbID string) (common.ErrorCode, error) { + // Get KB to find tenant_id for building table name kb, err := s.kbDAO.GetByID(kbID) if err != nil { return common.CodeDataError, fmt.Errorf("knowledge base not found: %s", kbID) } - // Build index name: ragflow__ - indexName := fmt.Sprintf("ragflow_%s_%s", kb.TenantID, kbID) + // Build table name: ragflow__ + tableName := fmt.Sprintf("ragflow_%s_%s", kb.TenantID, kbID) - // Call document engine to delete index - err = s.docEngine.DeleteIndex(context.Background(), indexName) + // Call document engine to delete table + err = s.docEngine.DropTable(context.Background(), tableName) if err != nil { - return common.CodeServerError, fmt.Errorf("failed to delete index: %w", err) + return common.CodeServerError, fmt.Errorf("failed to delete table: %w", err) } return common.CodeSuccess, nil diff --git a/internal/service/tenant.go b/internal/service/tenant.go index e2c7a9c8e6..e66bf5687d 100644 --- a/internal/service/tenant.go +++ b/internal/service/tenant.go @@ -257,29 +257,29 @@ func (s *TenantService) GetTenantList(userID string) ([]*TenantListItem, error) return result, nil } -// CreateDocMetaIndex creates the document metadata index for a tenant -func (s *TenantService) CreateDocMetaIndex(tenantID string) (common.ErrorCode, error) { - // Build index name: ragflow_doc_meta_ - indexName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) +// CreateMetadataInDocEngine creates the document metadata table for a tenant +func (s *TenantService) CreateMetadataInDocEngine(tenantID string) (common.ErrorCode, error) { + // Build table name: ragflow_doc_meta_ + tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) - // Call document engine to create doc meta index - err := s.docEngine.CreateDocMetaIndex(context.Background(), indexName) + // Call document engine to create doc meta table + err := s.docEngine.CreateMetadata(context.Background(), tableName) if err != nil { - return common.CodeServerError, fmt.Errorf("failed to create doc meta index: %w", err) + return common.CodeServerError, fmt.Errorf("failed to create metadata table: %w", err) } return common.CodeSuccess, nil } -// DeleteDocMetaIndex deletes the document metadata index for a tenant -func (s *TenantService) DeleteDocMetaIndex(tenantID string) (common.ErrorCode, error) { - // Build index name: ragflow_doc_meta_ - indexName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) +// DeleteMetadataInDocEngine deletes the document metadata table for a tenant +func (s *TenantService) DeleteMetadataInDocEngine(tenantID string) (common.ErrorCode, error) { + // Build table name: ragflow_doc_meta_ + tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) - // Call document engine to delete doc meta index - err := s.docEngine.DeleteIndex(context.Background(), indexName) + // Call document engine to delete doc meta table + err := s.docEngine.DropTable(context.Background(), tableName) if err != nil { - return common.CodeServerError, fmt.Errorf("failed to delete doc meta index: %w", err) + return common.CodeServerError, fmt.Errorf("failed to delete doc meta table: %w", err) } return common.CodeSuccess, nil