From ebf36950e475f997e6d19f27308366c6926cf9ee Mon Sep 17 00:00:00 2001 From: qinling0210 <88864212+qinling0210@users.noreply.github.com> Date: Thu, 26 Mar 2026 11:54:10 +0800 Subject: [PATCH] Implement Create/Drop Index/Metadata index in GO (#13791) ### What problem does this PR solve? Implement Create/Drop Index/Metadata index in GO New API handling in GO: POST/kb/index DELETE /kb/index POST /tenant/doc_meta_index DELETE /tenant/doc_meta_index CREATE INDEX FOR DATASET 'dataset_name' VECTOR_SIZE 1024; DROP INDEX FOR DATASET 'dataset_name'; CREATE INDEX DOC_META; DROP INDEX DOC_META; ### Type of change - [x] Refactoring --- admin/client/parser.py | 36 +++ admin/client/ragflow_client.py | 81 +++++++ internal/cli/cli.go | 4 + internal/cli/client.go | 8 + internal/cli/lexer.go | 6 + internal/cli/parser.go | 2 +- internal/cli/types.go | 3 + internal/cli/user_command.go | 180 ++++++++++++++ internal/cli/user_parser.go | 99 ++++++++ internal/engine/elasticsearch/index.go | 27 ++- internal/engine/engine.go | 9 +- internal/engine/infinity/client.go | 110 ++++++++- internal/engine/infinity/index.go | 319 ++++++++++++++++++++++++- internal/handler/kb.go | 81 +++++++ internal/handler/tenant.go | 64 +++++ internal/router/router.go | 9 + internal/server/config.go | 8 +- internal/service/kb.go | 69 ++++++ internal/service/tenant.go | 34 +++ internal/utility/path.go | 46 ++++ 20 files changed, 1165 insertions(+), 30 deletions(-) create mode 100644 internal/utility/path.go diff --git a/admin/client/parser.py b/admin/client/parser.py index 7465919305..a096de7e90 100644 --- a/admin/client/parser.py +++ b/admin/client/parser.py @@ -84,6 +84,10 @@ sql_command: login_user | list_user_chats | create_user_chat | drop_user_chat + | create_index + | drop_index + | create_doc_meta_index + | drop_doc_meta_index | list_user_model_providers | list_user_default_models | parse_dataset_docs @@ -176,6 +180,7 @@ IMPORT: "IMPORT"i INTO: "INTO"i IN: "IN"i WITH: "WITH"i +VECTOR_SIZE: "VECTOR_SIZE"i PARSER: "PARSER"i PIPELINE: "PIPELINE"i SEARCH: "SEARCH"i @@ -197,6 +202,8 @@ FINGERPRINT: "FINGERPRINT"i LICENSE: "LICENSE"i CHECK: "CHECK"i CONFIG: "CONFIG"i +INDEX: "INDEX"i +DOC_META: "DOC_META"i CHUNK: "CHUNK"i CHUNKS: "CHUNKS"i GET: "GET"i @@ -323,6 +330,10 @@ list_user_agents: LIST AGENTS ";" list_user_chats: LIST CHATS ";" create_user_chat: CREATE CHAT quoted_string ";" drop_user_chat: DROP CHAT quoted_string ";" +create_index: CREATE INDEX FOR DATASET quoted_string VECTOR_SIZE NUMBER ";" +drop_index: DROP INDEX FOR DATASET quoted_string ";" +create_doc_meta_index: CREATE INDEX DOC_META ";" +drop_doc_meta_index: DROP INDEX DOC_META ";" create_chat_session: CREATE CHAT quoted_string SESSION ";" drop_chat_session: DROP CHAT quoted_string SESSION quoted_string ";" list_chat_sessions: LIST CHAT quoted_string SESSIONS ";" @@ -650,6 +661,31 @@ class RAGFlowCLITransformer(Transformer): chat_name = items[2].children[0].strip("'\"") return {"type": "drop_user_chat", "chat_name": chat_name} + def create_index(self, items): + # items: CREATE, INDEX, FOR, DATASET, quoted_string, VECTOR_SIZE, NUMBER, ";" + dataset_name = None + vector_size = None + for i, item in enumerate(items): + if hasattr(item, 'data') and item.data == 'quoted_string': + dataset_name = item.children[0].strip("'\"") + if hasattr(item, 'type') and item.type == 'NUMBER': + if i > 0 and items[i-1].type == 'VECTOR_SIZE': + vector_size = int(item) + return {"type": "create_index", "dataset_name": dataset_name, "vector_size": vector_size} + + def drop_index(self, items): + dataset_name = None + for item in items: + if hasattr(item, 'data') and item.data == 'quoted_string': + dataset_name = item.children[0].strip("'\"") + return {"type": "drop_index", "dataset_name": dataset_name} + + def create_doc_meta_index(self, items): + return {"type": "create_doc_meta_index"} + + def drop_doc_meta_index(self, items): + return {"type": "drop_doc_meta_index"} + def list_user_model_providers(self, items): return {"type": "list_user_model_providers"} diff --git a/admin/client/ragflow_client.py b/admin/client/ragflow_client.py index e45ec99c38..03d9b8dedd 100644 --- a/admin/client/ragflow_client.py +++ b/admin/client/ragflow_client.py @@ -1080,6 +1080,75 @@ class RAGFlowClient: else: print(f"Fail to create chat {chat_name}, code: {res_json['code']}, message: {res_json['message']}") + def create_index(self, command): + if self.server_type != "user": + print("This command is only allowed in USER mode") + return + dataset_name = command["dataset_name"] + vector_size = command.get("vector_size") + if not vector_size: + print("vector_size is required") + return + # Get dataset ID by name + dataset_id = self._get_dataset_id(dataset_name) + if dataset_id is None: + return + # Build payload + payload = {"kb_id": dataset_id, "vector_size": vector_size} + # Call API + response = self.http_client.request("POST", "/kb/index", json_body=payload, + use_api_base=False, auth_kind="web") + res_json = response.json() + if response.status_code == 200 and res_json.get("code") == 0: + print(f"Success to create index for dataset: {dataset_name}") + else: + print(f"Fail to create index for dataset {dataset_name}, code: {res_json.get('code')}, message: {res_json.get('message')}") + + def drop_index(self, command): + if self.server_type != "user": + print("This command is only allowed in USER mode") + return + dataset_name = command["dataset_name"] + # Get dataset ID by name + dataset_id = self._get_dataset_id(dataset_name) + if dataset_id is None: + return + # Call API to delete index + payload = {"kb_id": dataset_id} + response = self.http_client.request("DELETE", "/kb/index", json_body=payload, + use_api_base=False, auth_kind="web") + res_json = response.json() + if response.status_code == 200 and res_json.get("code") == 0: + print(f"Success to drop index for dataset: {dataset_name}") + else: + print(f"Fail to drop index for dataset {dataset_name}, code: {res_json.get('code')}, message: {res_json.get('message')}") + + def create_doc_meta_index(self, command): + if self.server_type != "user": + print("This command is only allowed in USER mode") + return + # Call API to create doc meta index + response = self.http_client.request("POST", "/tenant/doc_meta_index", + use_api_base=False, auth_kind="web") + res_json = response.json() + if response.status_code == 200 and res_json.get("code") == 0: + print("Success to create doc meta index") + else: + print(f"Fail to create doc meta index, code: {res_json.get('code')}, message: {res_json.get('message')}") + + def drop_doc_meta_index(self, command): + if self.server_type != "user": + print("This command is only allowed in USER mode") + return + # Call API to delete doc meta index + response = self.http_client.request("DELETE", "/tenant/doc_meta_index", + use_api_base=False, auth_kind="web") + res_json = response.json() + if response.status_code == 200 and res_json.get("code") == 0: + print("Success to drop doc meta index") + else: + print(f"Fail to drop doc meta index, code: {res_json.get('code')}, message: {res_json.get('message')}") + def drop_user_chat(self, command): if self.server_type != "user": print("This command is only allowed in USER mode") @@ -1804,6 +1873,14 @@ def run_command(client: RAGFlowClient, command_dict: dict): client.create_user_chat(command_dict) case "drop_user_chat": client.drop_user_chat(command_dict) + case "create_index": + client.create_index(command_dict) + case "drop_index": + client.drop_index(command_dict) + case "create_doc_meta_index": + client.create_doc_meta_index(command_dict) + case "drop_doc_meta_index": + client.drop_doc_meta_index(command_dict) case "create_chat_session": client.create_chat_session(command_dict) case "drop_chat_session": @@ -1887,6 +1964,10 @@ LIST METADATA OF DATASETS [, ]* LIST METADATA SUMMARY OF DATASET DOCUMENTS [, ]* GET CHUNK LIST CHUNKS OF DOCUMENT [PAGE ] [SIZE ] [KEYWORDS ] [AVAILABLE <0|1>] +CREATE INDEX FOR DATASET VECTOR_SIZE +DROP INDEX FOR DATASET +CREATE INDEX DOC_META +DROP INDEX DOC_META Meta Commands: \\?, \\h, \\help Show this help diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 8ca48fb6dd..7c77ad68f1 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -564,6 +564,10 @@ Commands (User Mode): SET TOKEN 'token_value'; - Set and validate API token SHOW TOKEN; - Show current API token UNSET TOKEN; - Remove current API token + CREATE INDEX FOR DATASET 'name' VECTOR_SIZE N; - Create index for dataset + DROP INDEX FOR DATASET 'name'; - Drop index for dataset + CREATE INDEX DOC_META; - Create doc meta index + DROP INDEX DOC_META; - Drop doc meta index Commands (Admin Mode): LIST USERS; - List all users diff --git a/internal/cli/client.go b/internal/cli/client.go index e550453689..10795baadf 100644 --- a/internal/cli/client.go +++ b/internal/cli/client.go @@ -372,6 +372,14 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) { return c.UnsetToken(cmd) case "show_version": return c.ShowServerVersion(cmd) + case "create_index": + return c.CreateIndex(cmd) + case "drop_index": + return c.DropIndex(cmd) + case "create_doc_meta_index": + return c.CreateDocMetaIndex(cmd) + case "drop_doc_meta_index": + return c.DropDocMetaIndex(cmd) // TODO: Implement other commands default: return nil, fmt.Errorf("command '%s' would be executed with API", cmd.Type) diff --git a/internal/cli/lexer.go b/internal/cli/lexer.go index 1b4ad20dcd..f0cbeee6ed 100644 --- a/internal/cli/lexer.go +++ b/internal/cli/lexer.go @@ -293,6 +293,12 @@ func (l *Lexer) lookupIdent(ident string) Token { return Token{Type: TokenToken, Value: ident} case "TOKENS": return Token{Type: TokenTokens, Value: ident} + case "INDEX": + return Token{Type: TokenIndex, Value: ident} + case "VECTOR_SIZE": + return Token{Type: TokenVectorSize, Value: ident} + case "DOC_META": + return Token{Type: TokenDocMeta, Value: ident} default: return Token{Type: TokenIdentifier, Value: ident} } diff --git a/internal/cli/parser.go b/internal/cli/parser.go index 730d8892e2..d4afadf9aa 100644 --- a/internal/cli/parser.go +++ b/internal/cli/parser.go @@ -208,7 +208,7 @@ func (p *Parser) expectSemicolon() error { } func isKeyword(tokenType int) bool { - return tokenType >= TokenLogin && tokenType <= TokenPing + return tokenType >= TokenLogin && tokenType <= TokenDocMeta } // Helper functions for parsing diff --git a/internal/cli/types.go b/internal/cli/types.go index 06c646eca8..5d90d54394 100644 --- a/internal/cli/types.go +++ b/internal/cli/types.go @@ -98,6 +98,9 @@ const ( TokenToken TokenTokens TokenUnset + TokenIndex + TokenVectorSize + TokenDocMeta // Literals TokenIdentifier diff --git a/internal/cli/user_command.go b/internal/cli/user_command.go index 32f39c06e4..ed108b242e 100644 --- a/internal/cli/user_command.go +++ b/internal/cli/user_command.go @@ -546,3 +546,183 @@ func (c *RAGFlowClient) UnsetToken(cmd *Command) (ResponseIf, error) { result.Duration = 0 return &result, nil } + +// CreateIndex creates an index for a dataset +func (c *RAGFlowClient) CreateIndex(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + datasetName, ok := cmd.Params["dataset_name"].(string) + if !ok { + return nil, fmt.Errorf("dataset_name not provided") + } + + vectorSize, ok := cmd.Params["vector_size"].(int) + if !ok { + return nil, fmt.Errorf("vector_size not provided") + } + + // Get dataset ID by name + datasetID, err := c.getDatasetID(datasetName) + if err != nil { + return nil, err + } + + payload := map[string]interface{}{ + "kb_id": datasetID, + "vector_size": vectorSize, + } + + resp, err := c.HTTPClient.Request("POST", "/kb/index", false, "web", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to create index: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to create index: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + resJSON, err := resp.JSON() + if err != nil { + return nil, fmt.Errorf("invalid JSON response: %w", err) + } + + code, ok := resJSON["code"].(float64) + if !ok { + return nil, fmt.Errorf("invalid response format: code is not a number") + } + + var result SimpleResponse + result.Code = int(code) + if result.Code == 0 { + result.Message = fmt.Sprintf("Success to create index for dataset: %s", datasetName) + } else { + result.Message = fmt.Sprintf("Failed to create index: %v", resJSON) + } + result.Duration = 0 + return &result, nil +} + +// DropIndex drops an index for a dataset +func (c *RAGFlowClient) DropIndex(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + datasetName, ok := cmd.Params["dataset_name"].(string) + if !ok { + return nil, fmt.Errorf("dataset_name not provided") + } + + // Get dataset ID by name + datasetID, err := c.getDatasetID(datasetName) + if err != nil { + return nil, err + } + + payload := map[string]interface{}{ + "kb_id": datasetID, + } + + resp, err := c.HTTPClient.Request("DELETE", "/kb/index", false, "web", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to drop index: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to drop index: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + resJSON, err := resp.JSON() + if err != nil { + return nil, fmt.Errorf("invalid JSON response: %w", err) + } + + code, ok := resJSON["code"].(float64) + if !ok { + return nil, fmt.Errorf("invalid response format: code is not a number") + } + + var result SimpleResponse + result.Code = int(code) + if result.Code == 0 { + result.Message = fmt.Sprintf("Success to drop index for dataset: %s", datasetName) + } else { + result.Message = fmt.Sprintf("Failed to drop index: %v", resJSON) + } + result.Duration = 0 + return &result, nil +} + +// CreateDocMetaIndex creates the document metadata index for the tenant +func (c *RAGFlowClient) CreateDocMetaIndex(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + resp, err := c.HTTPClient.Request("POST", "/tenant/doc_meta_index", false, "web", nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to create doc meta index: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to create doc meta index: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + resJSON, err := resp.JSON() + if err != nil { + return nil, fmt.Errorf("invalid JSON response: %w", err) + } + + code, ok := resJSON["code"].(float64) + if !ok { + return nil, fmt.Errorf("invalid response format: code is not a number") + } + + var result SimpleResponse + result.Code = int(code) + if result.Code == 0 { + result.Message = "Success to create doc meta index" + } else { + result.Message = fmt.Sprintf("Failed to create doc meta index: %v", resJSON) + } + result.Duration = 0 + return &result, nil +} + +// DropDocMetaIndex drops the document metadata index for the tenant +func (c *RAGFlowClient) DropDocMetaIndex(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + resp, err := c.HTTPClient.Request("DELETE", "/tenant/doc_meta_index", false, "web", nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to drop doc meta index: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to drop doc meta index: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + resJSON, err := resp.JSON() + if err != nil { + return nil, fmt.Errorf("invalid JSON response: %w", err) + } + + code, ok := resJSON["code"].(float64) + if !ok { + return nil, fmt.Errorf("invalid response format: code is not a number") + } + + var result SimpleResponse + result.Code = int(code) + if result.Code == 0 { + result.Message = "Success to drop doc meta index" + } else { + result.Message = fmt.Sprintf("Failed to drop doc meta index: %v", resJSON) + } + result.Duration = 0 + return &result, nil +} diff --git a/internal/cli/user_parser.go b/internal/cli/user_parser.go index cae5b6e575..4190597fc7 100644 --- a/internal/cli/user_parser.go +++ b/internal/cli/user_parser.go @@ -432,6 +432,8 @@ func (p *Parser) parseCreateCommand() (*Command, error) { return p.parseCreateChat() case TokenToken: return p.parseCreateToken() + case TokenIndex: + return p.parseCreateIndex() default: return nil, fmt.Errorf("unknown CREATE target: %s", p.curToken.Value) } @@ -448,6 +450,61 @@ func (p *Parser) parseCreateToken() (*Command, error) { return NewCommand("create_token"), nil } +func (p *Parser) parseCreateIndex() (*Command, error) { + // CREATE INDEX FOR DATASET 'name' VECTOR_SIZE N + // CREATE INDEX DOC_META + p.nextToken() // consume INDEX + + // Check if creating doc meta index + if p.curToken.Type == TokenDocMeta { + p.nextToken() + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + return NewCommand("create_doc_meta_index"), nil + } + + // Otherwise, must be CREATE INDEX FOR DATASET 'name' VECTOR_SIZE N + if p.curToken.Type != TokenFor { + return nil, fmt.Errorf("expected FOR or DOC_META after INDEX, got %s", p.curToken.Value) + } + p.nextToken() + + if p.curToken.Type != TokenDataset { + return nil, fmt.Errorf("expected DATASET after FOR, got %s", p.curToken.Value) + } + p.nextToken() + + datasetName, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected dataset name, got %s", p.curToken.Value) + } + + p.nextToken() + if p.curToken.Type != TokenVectorSize { + return nil, fmt.Errorf("expected VECTOR_SIZE after dataset name, got %s", p.curToken.Value) + } + p.nextToken() + + if p.curToken.Type != TokenNumber { + return nil, fmt.Errorf("expected vector size number, got %s", p.curToken.Value) + } + vectorSize, err := strconv.Atoi(p.curToken.Value) + if err != nil { + return nil, fmt.Errorf("invalid vector size: %s", p.curToken.Value) + } + + p.nextToken() + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + + cmd := NewCommand("create_index") + cmd.Params["dataset_name"] = datasetName + cmd.Params["vector_size"] = vectorSize + return cmd, nil +} + func (p *Parser) parseCreateUser() (*Command, error) { p.nextToken() // consume USER userName, err := p.parseQuotedString() @@ -620,6 +677,8 @@ func (p *Parser) parseDropCommand() (*Command, error) { return p.parseDropChat() case TokenToken: return p.parseDropToken() + case TokenIndex: + return p.parseDropIndex() default: return nil, fmt.Errorf("unknown DROP target: %s", p.curToken.Value) } @@ -656,6 +715,46 @@ func (p *Parser) parseDropToken() (*Command, error) { return cmd, nil } +func (p *Parser) parseDropIndex() (*Command, error) { + // DROP INDEX FOR DATASET 'name' OR DROP INDEX DOC_META + p.nextToken() // consume INDEX + + // Check if dropping doc meta index + if p.curToken.Type == TokenDocMeta { + p.nextToken() + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + cmd := NewCommand("drop_doc_meta_index") + return cmd, nil + } + + // Otherwise, must be DROP INDEX FOR DATASET 'name' + if p.curToken.Type != TokenFor { + return nil, fmt.Errorf("expected FOR or DOC_META after INDEX, got %s", p.curToken.Value) + } + p.nextToken() + + if p.curToken.Type != TokenDataset { + return nil, fmt.Errorf("expected DATASET after FOR, got %s", p.curToken.Value) + } + p.nextToken() + + datasetName, err := p.parseQuotedString() + if err != nil { + return nil, fmt.Errorf("expected dataset name, got %s", p.curToken.Value) + } + + p.nextToken() + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + + cmd := NewCommand("drop_index") + cmd.Params["dataset_name"] = datasetName + return cmd, nil +} + func (p *Parser) parseDropUser() (*Command, error) { p.nextToken() // consume USER userName, err := p.parseQuotedString() diff --git a/internal/engine/elasticsearch/index.go b/internal/engine/elasticsearch/index.go index 795c41bf04..6834b583da 100644 --- a/internal/engine/elasticsearch/index.go +++ b/internal/engine/elasticsearch/index.go @@ -27,7 +27,14 @@ import ( ) // CreateIndex creates an index -func (e *elasticsearchEngine) CreateIndex(ctx context.Context, indexName string, mapping interface{}) error { +func (e *elasticsearchEngine) CreateIndex(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error { + // Elasticsearch doesn't support vector_size or parser_id in the same way + // Build mapping for ES (if needed) + // TODO + mapping := map[string]interface{}{ + "dataset_id": datasetID, + } + if indexName == "" { return fmt.Errorf("index name cannot be empty") } @@ -44,15 +51,11 @@ func (e *elasticsearchEngine) CreateIndex(ctx context.Context, indexName string, // Prepare request body var body io.Reader if mapping != nil { - if str, ok := mapping.(string); ok { - body = bytes.NewBufferString(str) - } else { - data, err := json.Marshal(mapping) - if err != nil { - return fmt.Errorf("failed to marshal mapping: %w", err) - } - body = bytes.NewReader(data) + data, err := json.Marshal(mapping) + if err != nil { + return fmt.Errorf("failed to marshal mapping: %w", err) } + body = bytes.NewReader(data) } // Create index @@ -142,3 +145,9 @@ func (e *elasticsearchEngine) IndexExists(ctx context.Context, indexName string) return false, fmt.Errorf("elasticsearch returned error: %s", res.Status()) } + +// CreateDocMetaIndex creates the document metadata index +func (e *elasticsearchEngine) CreateDocMetaIndex(ctx context.Context, indexName string) error { + // TODO + return nil +} diff --git a/internal/engine/engine.go b/internal/engine/engine.go index f6cd56e3cd..722dd6b4ff 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -42,7 +42,7 @@ type DocEngine interface { Search(ctx context.Context, req interface{}) (interface{}, error) // Index operations - CreateIndex(ctx context.Context, indexName string, mapping interface{}) error + CreateIndex(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error DeleteIndex(ctx context.Context, indexName string) error IndexExists(ctx context.Context, indexName string) (bool, error) @@ -51,8 +51,11 @@ type DocEngine interface { BulkIndex(ctx context.Context, indexName string, docs []interface{}) (interface{}, error) DeleteDocument(ctx context.Context, indexName, docID string) error - // Chunk operations - GetChunk(ctx context.Context, indexName, chunkID string, kbIDs []string) (interface{}, error) + // Chunk operations + GetChunk(ctx context.Context, indexName, chunkID string, kbIDs []string) (interface{}, error) + + // Doc metadata index operations (per-tenant) + CreateDocMetaIndex(ctx context.Context, indexName string) error // Health check Ping(ctx context.Context) error diff --git a/internal/engine/infinity/client.go b/internal/engine/infinity/client.go index 99e6aadfb5..f3281d24ed 100644 --- a/internal/engine/infinity/client.go +++ b/internal/engine/infinity/client.go @@ -19,11 +19,14 @@ package infinity import ( "context" "fmt" - "ragflow/internal/server" + "reflect" "strconv" "strings" + "time" infinity "github.com/infiniflow/infinity-go-sdk" + "ragflow/internal/server" + "ragflow/internal/logger" ) // infinityClient Infinity SDK client wrapper @@ -48,21 +51,80 @@ func NewInfinityClient(cfg *server.InfinityConfig) (*infinityClient, error) { } } - conn, err := infinity.Connect(infinity.NetworkAddress{IP: host, Port: port}) + // Retry connecting for up to 120 seconds (24 attempts * 5 seconds) + logger.Info("Connecting to Infinity") + var conn *infinity.InfinityConnection + var err error + for i := 0; i < 24; i++ { + conn, err = infinity.Connect(infinity.NetworkAddress{IP: host, Port: port}) + if err == nil { + break + } + if i < 23 { + time.Sleep(5 * time.Second) + } + } if err != nil { - return nil, fmt.Errorf("failed to connect to Infinity: %w", err) + return nil, fmt.Errorf("Failed to connect to Infinity after 120s: %w", err) } - return &infinityClient{ + client := &infinityClient{ conn: conn, dbName: cfg.DBName, - }, nil + } + + return client, nil +} + +// WaitForHealthy blocks until Infinity is healthy or timeout +func (c *infinityClient) WaitForHealthy(ctx context.Context, timeout time.Duration) error { + logger.Info("Waiting for Infinity to be healthy") + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + res, err := c.conn.ShowCurrentNode() + if err != nil { + time.Sleep(5 * time.Second) + continue + } + // Use reflection to access ErrorCode and ServerStatus fields + // since ShowCurrentNodeResponse is in an internal package + v := reflect.ValueOf(res) + if v.Kind() != reflect.Ptr { + time.Sleep(5 * time.Second) + continue + } + v = v.Elem() + errorCode := v.FieldByName("ErrorCode") + serverStatus := v.FieldByName("ServerStatus") + if !errorCode.IsValid() || !serverStatus.IsValid() { + time.Sleep(5 * time.Second) + continue + } + // ErrorCode 0 means OK, ServerStatus "started" or "alive" means healthy + if errorCode.Int() == 0 { + status := serverStatus.String() + if status == "started" || status == "alive" { + logger.Info("Infinity is healthy") + return nil + } + } + time.Sleep(5 * time.Second) + } + return fmt.Errorf("Infinity not healthy after %v", timeout) } // Engine Infinity engine implementation using Go SDK type infinityEngine struct { - config *server.InfinityConfig - client *infinityClient + config *server.InfinityConfig + client *infinityClient + mappingFileName string + docMetaMappingFileName string } // NewEngine creates an Infinity engine @@ -77,9 +139,30 @@ func NewEngine(cfg interface{}) (*infinityEngine, error) { return nil, err } + mappingFileName := infConfig.MappingFileName + if mappingFileName == "" { + mappingFileName = "infinity_mapping.json" + } + docMetaMappingFileName := infConfig.DocMetaMappingFileName + if docMetaMappingFileName == "" { + docMetaMappingFileName = "doc_meta_infinity_mapping.json" + } + engine := &infinityEngine{ - config: infConfig, - client: client, + config: infConfig, + client: client, + mappingFileName: mappingFileName, + docMetaMappingFileName: docMetaMappingFileName, + } + + // Wait for Infinity to be healthy + if err := client.WaitForHealthy(context.Background(), 120*time.Second); err != nil { + return nil, fmt.Errorf("Infinity not healthy: %w", err) + } + + // MigrateDB creates the database if it doesn't exist + if err := engine.MigrateDB(context.Background()); err != nil { + return nil, fmt.Errorf("failed to migrate database: %w", err) } return engine, nil @@ -109,3 +192,12 @@ func (e *infinityEngine) Close() error { } return nil } + +// MigrateDB creates the database if it doesn't exist +func (e *infinityEngine) MigrateDB(ctx context.Context) error { + _, err := e.client.conn.CreateDatabase(e.client.dbName, infinity.ConflictTypeIgnore, "") + if err != nil { + return fmt.Errorf("failed to create database: %w", err) + } + return nil +} diff --git a/internal/engine/infinity/index.go b/internal/engine/infinity/index.go index f4bab3dfb4..32d9068da0 100644 --- a/internal/engine/infinity/index.go +++ b/internal/engine/infinity/index.go @@ -17,21 +17,330 @@ package infinity import ( + "bytes" "context" + "encoding/json" "fmt" + "os" + "path/filepath" + "regexp" + "strings" + + infinity "github.com/infiniflow/infinity-go-sdk" + "ragflow/internal/logger" + "ragflow/internal/utility" + + "go.uber.org/zap" ) -// CreateIndex creates a table/index -func (e *infinityEngine) CreateIndex(ctx context.Context, indexName string, mapping interface{}) error { - return fmt.Errorf("infinity create table not implemented: waiting for official Go SDK") +// fieldInfo represents a field in the infinity mapping schema +type fieldInfo struct { + Type string `json:"type"` + Default interface{} `json:"default"` + Analyzer interface{} `json:"analyzer"` // string or []string + IndexType interface{} `json:"index_type"` // string or map + Comment string `json:"comment"` +} + +// orderedFields preserves the order of fields as defined in JSON +type orderedFields struct { + Keys []string + Fields map[string]fieldInfo +} + +func (o *orderedFields) UnmarshalJSON(data []byte) error { + // Parse JSON manually to preserve key order + // Look for key names by scanning the JSON string + // This is a simple approach: find {"key": value, "key2": value2...} + o.Fields = make(map[string]fieldInfo) + o.Keys = make([]string, 0) + + // Use a streaming JSON parser approach + dec := json.NewDecoder(bytes.NewReader(data)) + tok, err := dec.Token() + if err != nil { + return err + } + if delim, ok := tok.(json.Delim); ok && delim == '{' { + for dec.More() { + // Read key + tok, err := dec.Token() + if err != nil { + return err + } + key, ok := tok.(string) + if !ok { + continue + } + o.Keys = append(o.Keys, key) + + // Read value into fieldInfo + var field fieldInfo + if err := dec.Decode(&field); err != nil { + return err + } + o.Fields[key] = field + } + } + return nil +} + +// CreateIndex creates a table/index in Infinity +// indexName is the table name prefix (e.g., "ragflow_") +// The full table name is built as "{indexName}_{datasetID}" +func (e *infinityEngine) CreateIndex(ctx context.Context, indexName, datasetID string, vectorSize int, parserID string) error { + vecSize := vectorSize + + // Build full table name: {indexName}_{datasetID} + tableName := fmt.Sprintf("%s_%s", indexName, datasetID) + + // Use configured schema + fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", e.mappingFileName) + + schemaData, err := os.ReadFile(fpMapping) + if err != nil { + return fmt.Errorf("Failed to read mapping file: %w", err) + } + + var schema orderedFields + if err := json.Unmarshal(schemaData, &schema); err != nil { + return fmt.Errorf("Failed to parse mapping file: %w", err) + } + + // Get database + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return fmt.Errorf("Failed to get database: %w", err) + } + + // Build column definitions (preserving JSON order) + var columns infinity.TableSchema + for _, fieldName := range schema.Keys { + fieldInfo := schema.Fields[fieldName] + col := infinity.ColumnDefinition{ + Name: fieldName, + DataType: fieldInfo.Type, + Default: fieldInfo.Default, + // Comment: fieldInfo.Comment, + } + columns = append(columns, &col) + } + + // Add vector column + vectorColName := fmt.Sprintf("q_%d_vec", vecSize) + columns = append(columns, &infinity.ColumnDefinition{ + Name: vectorColName, + DataType: fmt.Sprintf("vector,%d,float", vecSize), + }) + + // Add chunk_data column for table parser + if parserID == "table" { + columns = append(columns, &infinity.ColumnDefinition{ + Name: "chunk_data", + DataType: "json", + Default: "{}", + }) + } + + // Create table + table, err := db.CreateTable(tableName, columns, infinity.ConflictTypeIgnore) + if err != nil { + return fmt.Errorf("Failed to create table: %w", err) + } + logger.Debug("Infinity created table", zap.String("tableName", tableName)) + + // Create HNSW index on vector column + _, err = table.CreateIndex( + "q_vec_idx", + infinity.NewIndexInfo(vectorColName, infinity.IndexTypeHnsw, map[string]string{ + "M": "16", + "ef_construction": "50", + "metric": "cosine", + "encode": "lvq", + }), + infinity.ConflictTypeIgnore, + "", + ) + if err != nil { + return fmt.Errorf("Failed to create HNSW index: %w", err) + } + + // Create full-text indexes for varchar fields with analyzers + for _, fieldName := range schema.Keys { + fieldInfo := schema.Fields[fieldName] + if fieldInfo.Type != "varchar" || fieldInfo.Analyzer == nil { + continue + } + + analyzers := []string{} + switch a := fieldInfo.Analyzer.(type) { + case string: + analyzers = []string{a} + case []interface{}: + for _, v := range a { + if s, ok := v.(string); ok { + analyzers = append(analyzers, s) + } + } + } + + for _, analyzer := range analyzers { + indexNameFt := fmt.Sprintf("ft_%s_%s", + regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(fieldName, "_"), + regexp.MustCompile(`[^a-zA-Z0-9]`).ReplaceAllString(analyzer, "_"), + ) + _, err = table.CreateIndex( + indexNameFt, + infinity.NewIndexInfo(fieldName, infinity.IndexTypeFullText, map[string]string{"ANALYZER": analyzer}), + infinity.ConflictTypeIgnore, + "", + ) + if err != nil { + return fmt.Errorf("Failed to create fulltext index %s: %w", indexNameFt, err) + } + } + } + + // Create secondary indexes for fields with index_type + for _, fieldName := range schema.Keys { + fieldInfo := schema.Fields[fieldName] + if fieldInfo.IndexType == nil { + continue + } + + indexTypeStr := "" + params := map[string]string{} + + switch it := fieldInfo.IndexType.(type) { + case string: + indexTypeStr = it + case map[string]interface{}: + if t, ok := it["type"].(string); ok { + indexTypeStr = t + } + if card, ok := it["cardinality"].(string); ok { + params["cardinality"] = card + } + } + + if indexTypeStr == "secondary" { + indexNameSec := fmt.Sprintf("sec_%s", fieldName) + _, err = table.CreateIndex( + indexNameSec, + infinity.NewIndexInfo(fieldName, infinity.IndexTypeSecondary, params), + infinity.ConflictTypeIgnore, + "", + ) + if err != nil { + return fmt.Errorf("Failed to create secondary index %s: %w", indexNameSec, err) + } + } + } + + _ = table // suppress unused variable warning + return nil } // DeleteIndex deletes a table/index func (e *infinityEngine) DeleteIndex(ctx context.Context, indexName string) error { - return fmt.Errorf("infinity drop table not implemented: waiting for official Go SDK") + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return fmt.Errorf("Failed to get database: %w", err) + } + + _, err = db.DropTable(indexName, infinity.ConflictTypeIgnore) + if err != nil { + return fmt.Errorf("Failed to drop table: %w", err) + } + logger.Debug("Infinity deleted table", zap.String("tableName", indexName)) + return nil } // IndexExists checks if table/index exists func (e *infinityEngine) IndexExists(ctx context.Context, indexName string) (bool, error) { - return false, fmt.Errorf("infinity check table existence not implemented: waiting for official Go SDK") + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return false, fmt.Errorf("Failed to get database: %w", err) + } + + _, err = db.GetTable(indexName) + if err != nil { + // Check if error is "table not found" + if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "NotFound") { + return false, nil + } + return false, err + } + return true, nil +} + +// CreateDocMetaIndex creates the document metadata table/index +func (e *infinityEngine) CreateDocMetaIndex(ctx context.Context, indexName string) error { + // Get database + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return fmt.Errorf("Failed to get database: %w", err) + } + + // Use configured doc_meta mapping file + fpMapping := filepath.Join(utility.GetProjectRoot(), "conf", e.docMetaMappingFileName) + + schemaData, err := os.ReadFile(fpMapping) + if err != nil { + return fmt.Errorf("Failed to read mapping file: %w", err) + } + + var schema map[string]fieldInfo + if err := json.Unmarshal(schemaData, &schema); err != nil { + return fmt.Errorf("Failed to parse mapping file: %w", err) + } + + // Build column definitions + var columns infinity.TableSchema + for fieldName, fieldInfo := range schema { + col := infinity.ColumnDefinition{ + Name: fieldName, + DataType: fieldInfo.Type, + Default: fieldInfo.Default, + // Comment: fieldInfo.Comment, + } + columns = append(columns, &col) + } + + // Create table + _, err = db.CreateTable(indexName, columns, infinity.ConflictTypeIgnore) + if err != nil { + return fmt.Errorf("Failed to create doc meta table: %w", err) + } + logger.Debug("Infinity created doc meta table", zap.String("tableName", indexName)) + + // Get table for creating indexes + table, err := db.GetTable(indexName) + if err != nil { + return fmt.Errorf("Failed to get table: %w", err) + } + + // Create secondary index on id + _, err = table.CreateIndex( + fmt.Sprintf("idx_%s_id", indexName), + infinity.NewIndexInfo("id", infinity.IndexTypeSecondary, nil), + infinity.ConflictTypeIgnore, + "", + ) + if err != nil { + return fmt.Errorf("Failed to create secondary index on id: %w", err) + } + + // Create secondary index on kb_id + _, err = table.CreateIndex( + fmt.Sprintf("idx_%s_kb_id", indexName), + infinity.NewIndexInfo("kb_id", infinity.IndexTypeSecondary, nil), + infinity.ConflictTypeIgnore, + "", + ) + if err != nil { + return fmt.Errorf("Failed to create secondary index on kb_id: %w", err) + } + + return nil } diff --git a/internal/handler/kb.go b/internal/handler/kb.go index ab37158e33..765102ed18 100644 --- a/internal/handler/kb.go +++ b/internal/handler/kb.go @@ -622,3 +622,84 @@ func (h *KnowledgebaseHandler) GetBasicInfo(c *gin.Context) { jsonResponse(c, common.CodeSuccess, map[string]interface{}{}, "success") } + +// CreateIndex handles the create index request for a knowledge base +// @Summary Create Index +// @Description Create the Infinity index/table for a knowledge base +// @Tags knowledgebase +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param request body service.CreateIndexRequest true "create index request" +// @Success 200 {object} map[string]interface{} +// @Router /v1/kb/index [post] +func (h *KnowledgebaseHandler) CreateIndex(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req service.CreateIndexRequest + if err := c.ShouldBindJSON(&req); err != nil { + jsonError(c, common.CodeDataError, err.Error()) + return + } + + // Check authorization + if !h.kbService.Accessible(req.KBID, user.ID) { + jsonError(c, common.CodeAuthenticationError, "No authorization.") + return + } + + result, code, err := h.kbService.CreateIndex(&req) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + jsonResponse(c, common.CodeSuccess, result, "success") +} + +// DeleteIndexRequest represents the request for deleting an index +type DeleteIndexRequest struct { + KBID string `json:"kb_id" binding:"required"` +} + +// DeleteIndex handles the delete index request for a knowledge base +// @Summary Delete Index +// @Description Delete the Infinity index/table for a knowledge base +// @Tags knowledgebase +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param request body DeleteIndexRequest true "delete index request" +// @Success 200 {object} map[string]interface{} +// @Router /v1/kb/index [delete] +func (h *KnowledgebaseHandler) DeleteIndex(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req DeleteIndexRequest + if err := c.ShouldBindJSON(&req); err != nil { + jsonError(c, common.CodeDataError, err.Error()) + return + } + + // Check authorization + if !h.kbService.Accessible(req.KBID, user.ID) { + jsonError(c, common.CodeAuthenticationError, "No authorization.") + return + } + + code, err := h.kbService.DeleteIndex(req.KBID) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + jsonResponse(c, common.CodeSuccess, nil, "success") +} diff --git a/internal/handler/tenant.go b/internal/handler/tenant.go index bb43ffb98a..80503c7a19 100644 --- a/internal/handler/tenant.go +++ b/internal/handler/tenant.go @@ -113,3 +113,67 @@ func (h *TenantHandler) TenantList(c *gin.Context) { "data": tenantList, }) } + +// CreateDocMetaIndex handles the create doc meta index request +// @Summary Create Doc Meta Index +// @Description Create the document metadata index for a tenant +// @Tags tenants +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Success 200 {object} map[string]interface{} +// @Router /v1/tenant/doc_meta_index [post] +func (h *TenantHandler) CreateDocMetaIndex(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + // Use user.ID as tenant ID (user IS the tenant in user mode) + tenantID := user.ID + + code, err := h.tenantService.CreateDocMetaIndex(tenantID) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeSuccess, + "message": "success", + "data": nil, + }) +} + +// DeleteDocMetaIndex handles the delete doc meta index request +// @Summary Delete Doc Meta Index +// @Description Delete the document metadata index for a tenant +// @Tags tenants +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Success 200 {object} map[string]interface{} +// @Router /v1/tenant/doc_meta_index [delete] +func (h *TenantHandler) DeleteDocMetaIndex(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + // Use user.ID as tenant ID (user IS the tenant in user mode) + tenantID := user.ID + + code, err := h.tenantService.DeleteDocMetaIndex(tenantID) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeSuccess, + "message": "success", + "data": nil, + }) +} diff --git a/internal/router/router.go b/internal/router/router.go index e7ddb8958c..6f6f3721ac 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -177,6 +177,8 @@ func (r *Router) Setup(engine *gin.Engine) { kb.GET("/tags", r.knowledgebaseHandler.ListTagsFromKbs) kb.GET("/get_meta", r.knowledgebaseHandler.GetMeta) kb.GET("/basic_info", r.knowledgebaseHandler.GetBasicInfo) + kb.POST("/index", r.knowledgebaseHandler.CreateIndex) + kb.DELETE("/index", r.knowledgebaseHandler.DeleteIndex) // KB ID specific routes kbByID := kb.Group("/:kb_id") @@ -189,6 +191,13 @@ func (r *Router) Setup(engine *gin.Engine) { } } + // Tenant routes (per-tenant resources) + tenant := authorized.Group("/v1/tenant") + { + tenant.POST("/doc_meta_index", r.tenantHandler.CreateDocMetaIndex) + tenant.DELETE("/doc_meta_index", r.tenantHandler.DeleteDocMetaIndex) + } + // Document routes doc := authorized.Group("/v1/document") { diff --git a/internal/server/config.go b/internal/server/config.go index 8892761ed5..3aaed8119a 100644 --- a/internal/server/config.go +++ b/internal/server/config.go @@ -135,9 +135,11 @@ type ElasticsearchConfig struct { // InfinityConfig Infinity configuration type InfinityConfig struct { - URI string `mapstructure:"uri"` - PostgresPort int `mapstructure:"postgres_port"` - DBName string `mapstructure:"db_name"` + URI string `mapstructure:"uri"` + PostgresPort int `mapstructure:"postgres_port"` + DBName string `mapstructure:"db_name"` + MappingFileName string `mapstructure:"mapping_file_name"` + DocMetaMappingFileName string `mapstructure:"doc_meta_mapping_file_name"` } type StorageType string diff --git a/internal/service/kb.go b/internal/service/kb.go index ba71bc321d..042b0b7296 100644 --- a/internal/service/kb.go +++ b/internal/service/kb.go @@ -17,10 +17,12 @@ package service import ( + "context" "errors" "fmt" "ragflow/internal/common" "ragflow/internal/dao" + "ragflow/internal/engine" "ragflow/internal/model" "ragflow/internal/utility" "strings" @@ -36,6 +38,7 @@ type KnowledgebaseService struct { userDAO *dao.UserDAO tenantDAO *dao.TenantDAO connectorDAO *dao.ConnectorDAO + docEngine engine.DocEngine } // NewKnowledgebaseService creates a new knowledge base service @@ -46,6 +49,7 @@ func NewKnowledgebaseService() *KnowledgebaseService { userDAO: dao.NewUserDAO(), tenantDAO: dao.NewTenantDAO(), connectorDAO: dao.NewConnectorDAO(), + docEngine: engine.Get(), } } @@ -186,6 +190,71 @@ func (s *KnowledgebaseService) CreateKB(req *CreateKBRequest, tenantID string) ( return &CreateKBResponse{KBID: kbID}, common.CodeSuccess, nil } +// CreateIndexRequest represents the request for creating an index +type CreateIndexRequest struct { + KBID string `json:"kb_id" binding:"required"` + VectorSize int `json:"vector_size" binding:"required"` + ParserID string `json:"parser_id,omitempty"` +} + +// CreateIndexResponse represents the response for creating an index +type CreateIndexResponse struct { + KBID string `json:"kb_id"` + IndexName string `json:"index_name"` + VectorSize int `json:"vector_size"` +} + +// CreateIndex creates an index in the document engine for a knowledge base +func (s *KnowledgebaseService) CreateIndex(req *CreateIndexRequest) (*CreateIndexResponse, common.ErrorCode, error) { + // Get KB to find tenant_id for building index name + kb, err := s.kbDAO.GetByID(req.KBID) + if err != nil { + return nil, common.CodeDataError, fmt.Errorf("knowledge base not found: %s", req.KBID) + } + + // vector_size is required + vecSize := req.VectorSize + if vecSize <= 0 { + return nil, common.CodeDataError, fmt.Errorf("vector_size must be positive") + } + + // Build index name prefix: ragflow_ + indexName := fmt.Sprintf("ragflow_%s", kb.TenantID) + + // Call document engine to create index + // Full table name will be built as "{indexName}_{kb_id}" + err = s.docEngine.CreateIndex(context.Background(), indexName, req.KBID, vecSize, req.ParserID) + if err != nil { + return nil, common.CodeServerError, fmt.Errorf("failed to create index: %w", err) + } + + return &CreateIndexResponse{ + KBID: req.KBID, + IndexName: indexName, + VectorSize: vecSize, + }, common.CodeSuccess, nil +} + +// DeleteIndex deletes the index in the document engine for a knowledge base +func (s *KnowledgebaseService) DeleteIndex(kbID string) (common.ErrorCode, error) { + // Get KB to find tenant_id for building index name + kb, err := s.kbDAO.GetByID(kbID) + if err != nil { + return common.CodeDataError, fmt.Errorf("knowledge base not found: %s", kbID) + } + + // Build index name: ragflow__ + indexName := fmt.Sprintf("ragflow_%s_%s", kb.TenantID, kbID) + + // Call document engine to delete index + err = s.docEngine.DeleteIndex(context.Background(), indexName) + if err != nil { + return common.CodeServerError, fmt.Errorf("failed to delete index: %w", err) + } + + return common.CodeSuccess, nil +} + // UpdateKB updates an existing knowledge base // This matches the Python update endpoint in kb_app.py func (s *KnowledgebaseService) UpdateKB(req *UpdateKBRequest, userID string) (map[string]interface{}, common.ErrorCode, error) { diff --git a/internal/service/tenant.go b/internal/service/tenant.go index 5a024b36c4..e4c3858561 100644 --- a/internal/service/tenant.go +++ b/internal/service/tenant.go @@ -17,15 +17,20 @@ package service import ( + "context" + "fmt" "time" + "ragflow/internal/common" "ragflow/internal/dao" + "ragflow/internal/engine" ) // TenantService tenant service type TenantService struct { tenantDAO *dao.TenantDAO userTenantDAO *dao.UserTenantDAO + docEngine engine.DocEngine } // NewTenantService create tenant service @@ -33,6 +38,7 @@ func NewTenantService() *TenantService { return &TenantService{ tenantDAO: dao.NewTenantDAO(), userTenantDAO: dao.NewUserTenantDAO(), + docEngine: engine.Get(), } } @@ -118,3 +124,31 @@ func (s *TenantService) GetTenantList(userID string) ([]*TenantListItem, error) return result, nil } + +// CreateDocMetaIndex creates the document metadata index for a tenant +func (s *TenantService) CreateDocMetaIndex(tenantID string) (common.ErrorCode, error) { + // Build index name: ragflow_doc_meta_ + indexName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) + + // Call document engine to create doc meta index + err := s.docEngine.CreateDocMetaIndex(context.Background(), indexName) + if err != nil { + return common.CodeServerError, fmt.Errorf("failed to create doc meta index: %w", err) + } + + return common.CodeSuccess, nil +} + +// DeleteDocMetaIndex deletes the document metadata index for a tenant +func (s *TenantService) DeleteDocMetaIndex(tenantID string) (common.ErrorCode, error) { + // Build index name: ragflow_doc_meta_ + indexName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) + + // Call document engine to delete doc meta index + err := s.docEngine.DeleteIndex(context.Background(), indexName) + if err != nil { + return common.CodeServerError, fmt.Errorf("failed to delete doc meta index: %w", err) + } + + return common.CodeSuccess, nil +} diff --git a/internal/utility/path.go b/internal/utility/path.go new file mode 100644 index 0000000000..fdeb68c8e5 --- /dev/null +++ b/internal/utility/path.go @@ -0,0 +1,46 @@ +/* +Copyright 2026 The InfiniFlow Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utility + +import ( + "os" + "path/filepath" + "runtime" +) + +// GetProjectRoot returns the project root directory by finding go.mod marker +func GetProjectRoot() string { + // Try environment variable first + if confDir := os.Getenv("RAGFLOW_CONF_DIR"); confDir != "" { + return confDir + } + + // Find project root by looking for go.mod + _, curFile, _, _ := runtime.Caller(0) + dir := filepath.Dir(curFile) + for { + if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { + return dir + } + parent := filepath.Dir(dir) + if parent == dir { + // Reached filesystem root, fallback to hardcoded path + return filepath.Dir(filepath.Dir(filepath.Dir(filepath.Dir(curFile)))) + } + dir = parent + } +}