diff --git a/admin/client/parser.py b/admin/client/parser.py index a096de7e90..69d44f14e7 100644 --- a/admin/client/parser.py +++ b/admin/client/parser.py @@ -97,6 +97,8 @@ sql_command: login_user | search_on_datasets | get_chunk | list_chunks + | insert_dataset_from_file + | insert_metadata_from_file | create_chat_session | drop_chat_session | list_chat_sessions @@ -207,10 +209,12 @@ DOC_META: "DOC_META"i CHUNK: "CHUNK"i CHUNKS: "CHUNKS"i GET: "GET"i +INSERT: "INSERT"i PAGE: "PAGE"i SIZE: "SIZE"i KEYWORDS: "KEYWORDS"i AVAILABLE: "AVAILABLE"i +FILE: "FILE"i login_user: LOGIN USER quoted_string ";" list_services: LIST SERVICES ";" @@ -349,6 +353,10 @@ parse_dataset_docs: PARSE quoted_string OF DATASET quoted_string ";" parse_dataset_sync: PARSE DATASET quoted_string SYNC ";" parse_dataset_async: PARSE DATASET quoted_string ASYNC ";" +// Internal CLI for GO +insert_dataset_from_file: INSERT DATASET FROM FILE quoted_string ";" +insert_metadata_from_file: INSERT METADATA FROM FILE quoted_string ";" + identifier_list: identifier ("," identifier)* identifier: WORD @@ -750,6 +758,14 @@ class RAGFlowCLITransformer(Transformer): chunk_id = items[2].children[0].strip("'\"") return {"type": "get_chunk", "chunk_id": chunk_id} + def insert_dataset_from_file(self, items): + file_path = items[4].children[0].strip("'\"") + return {"type": "insert_dataset_from_file", "file_path": file_path} + + def insert_metadata_from_file(self, items): + file_path = items[4].children[0].strip("'\"") + return {"type": "insert_metadata_from_file", "file_path": file_path} + def list_chunks(self, items): doc_id = items[4].children[0].strip("'\"") result = {"type": "list_chunks", "doc_id": doc_id} diff --git a/admin/client/ragflow_client.py b/admin/client/ragflow_client.py index 03d9b8dedd..cae5f2e327 100644 --- a/admin/client/ragflow_client.py +++ b/admin/client/ragflow_client.py @@ -1520,6 +1520,48 @@ class RAGFlowClient: else: print(f"Fail to get chunk, code: {res_json['code']}, message: {res_json['message']}") + # Internal + def insert_dataset_from_file(self, command_dict): + if self.server_type != "user": + print("This command is only allowed in USER mode") + return + + file_path = command_dict["file_path"] + payload = {"file_path": file_path} + response = self.http_client.request("POST", "/kb/insert_from_file", json_body=payload, + use_api_base=False, auth_kind="web") + res_json = response.json() + if response.status_code == 200: + if res_json["code"] == 0: + print(f"Success to insert dataset from file: {file_path}") + if res_json.get("data"): + self._print_key_value(res_json["data"]) + else: + print(f"Fail to insert dataset from file, code: {res_json['code']}, message: {res_json['message']}") + else: + print(f"Fail to insert dataset from file, code: {res_json['code']}, message: {res_json['message']}") + + # Internal + def insert_metadata_from_file(self, command_dict): + if self.server_type != "user": + print("This command is only allowed in USER mode") + return + + file_path = command_dict["file_path"] + payload = {"file_path": file_path} + response = self.http_client.request("POST", "/tenant/insert_metadata_from_file", json_body=payload, + use_api_base=False, auth_kind="web") + res_json = response.json() + if response.status_code == 200: + if res_json["code"] == 0: + print(f"Success to insert metadata from file: {file_path}") + if res_json.get("data"): + self._print_key_value(res_json["data"]) + else: + print(f"Fail to insert metadata from file, code: {res_json['code']}, message: {res_json['message']}") + else: + print(f"Fail to insert metadata from file, code: {res_json['code']}, message: {res_json['message']}") + def list_chunks(self, command_dict): if self.server_type != "user": print("This command is only allowed in USER mode") @@ -1903,6 +1945,10 @@ def run_command(client: RAGFlowClient, command_dict: dict): return client.search_on_datasets(command_dict) case "get_chunk": return client.get_chunk(command_dict) + case "insert_dataset_from_file": + return client.insert_dataset_from_file(command_dict) + case "insert_metadata_from_file": + return client.insert_metadata_from_file(command_dict) case "list_chunks": return client.list_chunks(command_dict) case "meta": diff --git a/go.mod b/go.mod index 1f28e819a9..9f06faffc6 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,8 @@ require ( github.com/spf13/viper v1.18.2 go.uber.org/zap v1.27.1 golang.org/x/crypto v0.47.0 + golang.org/x/term v0.41.0 + gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/mysql v1.5.2 gorm.io/gorm v1.25.5 ) @@ -98,12 +100,10 @@ require ( golang.org/x/exp v0.0.0-20231226003508-02704c960a9b // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sys v0.42.0 // indirect - golang.org/x/term v0.41.0 // indirect golang.org/x/text v0.33.0 // indirect google.golang.org/protobuf v1.32.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/ini.v1 v1.67.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/infiniflow/infinity-go-sdk => github.com/infiniflow/infinity/go v0.0.0-20260317024756-4aff48d0d843 +replace github.com/infiniflow/infinity-go-sdk => github.com/infiniflow/infinity/go v0.0.0-20260331112649-9bcd52a3d364 diff --git a/go.sum b/go.sum index b2d5571faa..fe150a81b9 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/infiniflow/infinity/go v0.0.0-20260317024756-4aff48d0d843 h1:s5g1APIXv4c6hwVL4+DwT5JDvRpegZpxxh2ltZzgeGE= -github.com/infiniflow/infinity/go v0.0.0-20260317024756-4aff48d0d843/go.mod h1:hw3z5AwNFsGy1cdrE0Mfjot2y9jqVHTxBufUx9VzZ+0= +github.com/infiniflow/infinity/go v0.0.0-20260331112649-9bcd52a3d364 h1:0v5TjSirmCAUX3oaIV8Rd9d5B+kHPdymveETUU8OcC0= +github.com/infiniflow/infinity/go v0.0.0-20260331112649-9bcd52a3d364/go.mod h1:hw3z5AwNFsGy1cdrE0Mfjot2y9jqVHTxBufUx9VzZ+0= github.com/iromli/go-itsdangerous v0.0.0-20220223194502-9c8bef8dac6a h1:Inib12UR9HAfBubrGNraPjKt/Cu8xPbTJbC50+0wP5U= github.com/iromli/go-itsdangerous v0.0.0-20220223194502-9c8bef8dac6a/go.mod h1:8N0Hlye5Lzw+H/yHWpZMkT0QLA+iOHG7KLdvAm95DZg= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= @@ -226,8 +226,6 @@ golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= -golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= diff --git a/internal/cli/client.go b/internal/cli/client.go index 28c0dc324f..e609fb8000 100644 --- a/internal/cli/client.go +++ b/internal/cli/client.go @@ -223,6 +223,10 @@ func (c *RAGFlowClient) ExecuteUserCommand(cmd *Command) (ResponseIf, error) { return c.CEList(cmd) case "ce_search": return c.CESearch(cmd) + case "insert_dataset_from_file": + return c.InsertDatasetFromFile(cmd) + case "insert_metadata_from_file": + return c.InsertMetadataFromFile(cmd) // TODO: Implement other commands default: return nil, fmt.Errorf("command '%s' would be executed with API", cmd.Type) diff --git a/internal/cli/lexer.go b/internal/cli/lexer.go index 8add64d6fd..bfb97de9e6 100644 --- a/internal/cli/lexer.go +++ b/internal/cli/lexer.go @@ -305,6 +305,14 @@ func (l *Lexer) lookupIdent(ident string) Token { return Token{Type: TokenAvailable, Value: ident} case "NAME": return Token{Type: TokenName, Value: ident} + case "POOL": + return Token{Type: TokenPool, Value: ident} + case "INSERT": + return Token{Type: TokenInsert, Value: ident} + case "FILE": + return Token{Type: TokenFile, Value: ident} + case "METADATA": + return Token{Type: TokenMetadata, Value: ident} default: return Token{Type: TokenIdentifier, Value: ident} } diff --git a/internal/cli/parser.go b/internal/cli/parser.go index 769c0b6c98..98088e782f 100644 --- a/internal/cli/parser.go +++ b/internal/cli/parser.go @@ -166,6 +166,8 @@ func (p *Parser) parseUserCommand() (*Command, error) { return p.parseGenerateCommand() case TokenImport: return p.parseImportCommand() + case TokenInsert: + return p.parseInsertCommand() case TokenSearch: return p.parseSearchCommand() case TokenParse: @@ -217,7 +219,7 @@ func (p *Parser) expectSemicolon() error { } func isKeyword(tokenType int) bool { - return tokenType >= TokenLogin && tokenType <= TokenDocMeta + return tokenType >= TokenLogin && tokenType <= TokenMetadata } // isCECommand checks if the given string is a ContextEngine command diff --git a/internal/cli/types.go b/internal/cli/types.go index 9726f5e8b3..b3bd2e6c12 100644 --- a/internal/cli/types.go +++ b/internal/cli/types.go @@ -104,6 +104,9 @@ const ( TokenVectorSize TokenDocMeta TokenName // For ALTER PROVIDER NAME + TokenInsert + TokenFile + TokenMetadata // Literals TokenIdentifier diff --git a/internal/cli/user_command.go b/internal/cli/user_command.go index 4614067960..b5bef94b3c 100644 --- a/internal/cli/user_command.go +++ b/internal/cli/user_command.go @@ -941,3 +941,93 @@ func (c *RAGFlowClient) CESearch(cmd *Command) (ResponseIf, error) { return &response, nil } + +// InsertDatasetFromFile inserts dataset chunks from a JSON file +func (c *RAGFlowClient) InsertDatasetFromFile(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + filePath, ok := cmd.Params["file_path"].(string) + if !ok { + return nil, fmt.Errorf("file_path not provided") + } + + payload := map[string]interface{}{ + "file_path": filePath, + } + + resp, err := c.HTTPClient.Request("POST", "/kb/insert_from_file", false, "web", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to insert dataset from file: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to insert dataset from file: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + resJSON, err := resp.JSON() + if err != nil { + return nil, fmt.Errorf("invalid JSON response: %w", err) + } + + code, ok := resJSON["code"].(float64) + if !ok { + return nil, fmt.Errorf("invalid response format: code is not a number") + } + + var result SimpleResponse + result.Code = int(code) + if result.Code == 0 { + result.Message = fmt.Sprintf("Success to insert dataset from file: %s", filePath) + } else { + result.Message = fmt.Sprintf("Failed to insert dataset from file: %v", resJSON) + } + result.Duration = 0 + return &result, nil +} + +// InsertMetadataFromFile inserts metadata from a JSON file +func (c *RAGFlowClient) InsertMetadataFromFile(cmd *Command) (ResponseIf, error) { + if c.ServerType != "user" { + return nil, fmt.Errorf("this command is only allowed in USER mode") + } + + filePath, ok := cmd.Params["file_path"].(string) + if !ok { + return nil, fmt.Errorf("file_path not provided") + } + + payload := map[string]interface{}{ + "file_path": filePath, + } + + resp, err := c.HTTPClient.Request("POST", "/tenant/insert_metadata_from_file", false, "web", nil, payload) + if err != nil { + return nil, fmt.Errorf("failed to insert metadata from file: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to insert metadata from file: HTTP %d, body: %s", resp.StatusCode, string(resp.Body)) + } + + resJSON, err := resp.JSON() + if err != nil { + return nil, fmt.Errorf("invalid JSON response: %w", err) + } + + code, ok := resJSON["code"].(float64) + if !ok { + return nil, fmt.Errorf("invalid response format: code is not a number") + } + + var result SimpleResponse + result.Code = int(code) + if result.Code == 0 { + result.Message = fmt.Sprintf("Success to insert metadata from file: %s", filePath) + } else { + result.Message = fmt.Sprintf("Failed to insert metadata from file: %v", resJSON) + } + result.Duration = 0 + return &result, nil +} diff --git a/internal/cli/user_parser.go b/internal/cli/user_parser.go index 05456818bc..6411c8e5f7 100644 --- a/internal/cli/user_parser.go +++ b/internal/cli/user_parser.go @@ -32,6 +32,21 @@ func (p *Parser) parseLoginUser() (*Command, error) { cmd.Params["email"] = email p.nextToken() + // Optional: WITH PASSWORD 'password' + if p.curToken.Type == TokenWith { + p.nextToken() + if p.curToken.Type != TokenPassword { + return nil, fmt.Errorf("expected PASSWORD after WITH") + } + p.nextToken() + password, err := p.parseQuotedString() + if err != nil { + return nil, err + } + cmd.Params["password"] = password + p.nextToken() + } + // Semicolon is optional for UNSET TOKEN if p.curToken.Type == TokenSemicolon { p.nextToken() @@ -1531,6 +1546,88 @@ func (p *Parser) parseImportCommand() (*Command, error) { return cmd, nil } +// parseInsertCommand parses INSERT command and dispatches to specific handler +func (p *Parser) parseInsertCommand() (*Command, error) { + p.nextToken() // consume INSERT + + // Expect DATASET or METADATA + if p.curToken.Type == TokenDataset { + return p.parseInsertDatasetFromFile() + } + if p.curToken.Type == TokenMetadata { + return p.parseInsertMetadataFromFile() + } + return nil, fmt.Errorf("expected DATASET or METADATA after INSERT, got %s", p.curToken.Value) +} + +// Internal CLI for GO +// parseInsertDatasetFromFile parses: INSERT DATASET FROM FILE "file_path" +func (p *Parser) parseInsertDatasetFromFile() (*Command, error) { + p.nextToken() // consume DATASET + + // Expect FROM + if p.curToken.Type != TokenFrom { + return nil, fmt.Errorf("expected FROM, got %s", p.curToken.Value) + } + p.nextToken() + + // Expect FILE + if p.curToken.Type != TokenFile { + return nil, fmt.Errorf("expected FILE, got %s", p.curToken.Value) + } + p.nextToken() + + // Get file path (quoted string) + filePath, err := p.parseQuotedString() + if err != nil { + return nil, err + } + + cmd := NewCommand("insert_dataset_from_file") + cmd.Params["file_path"] = filePath + + p.nextToken() + // Semicolon is optional + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + return cmd, nil +} + +// Internal CLI for GO +// parseInsertMetadataFromFile parses: INSERT INTO METADATA FROM FILE "file_path" +func (p *Parser) parseInsertMetadataFromFile() (*Command, error) { + p.nextToken() // consume METADATA + + // Expect FROM + if p.curToken.Type != TokenFrom { + return nil, fmt.Errorf("expected FROM, got %s", p.curToken.Value) + } + p.nextToken() + + // Expect FILE + if p.curToken.Type != TokenFile { + return nil, fmt.Errorf("expected FILE, got %s", p.curToken.Value) + } + p.nextToken() + + // Get file path (quoted string) + filePath, err := p.parseQuotedString() + if err != nil { + return nil, err + } + + cmd := NewCommand("insert_metadata_from_file") + cmd.Params["file_path"] = filePath + + p.nextToken() + // Semicolon is optional + if p.curToken.Type == TokenSemicolon { + p.nextToken() + } + return cmd, nil +} + func (p *Parser) parseSearchCommand() (*Command, error) { p.nextToken() // consume SEARCH question, err := p.parseQuotedString() @@ -1687,6 +1784,8 @@ func (p *Parser) parseUserStatement() (*Command, error) { return p.parseParseCommand() case TokenImport: return p.parseImportCommand() + case TokenInsert: + return p.parseInsertCommand() case TokenSearch: return p.parseSearchCommand() default: diff --git a/internal/engine/elasticsearch/index.go b/internal/engine/elasticsearch/index.go index 6834b583da..028a587506 100644 --- a/internal/engine/elasticsearch/index.go +++ b/internal/engine/elasticsearch/index.go @@ -151,3 +151,15 @@ func (e *elasticsearchEngine) CreateDocMetaIndex(ctx context.Context, indexName // TODO return nil } + +// InsertDataset inserts documents into a dataset index +func (e *elasticsearchEngine) InsertDataset(ctx context.Context, documents []map[string]interface{}, indexName string, knowledgebaseID string) ([]string, error) { + // TODO + return []string{}, nil +} + +// InsertMetadata inserts documents into tenant's metadata index +func (e *elasticsearchEngine) InsertMetadata(ctx context.Context, documents []map[string]interface{}, tenantID string) ([]string, error) { + // TODO + return []string{}, nil +} diff --git a/internal/engine/engine.go b/internal/engine/engine.go index 722dd6b4ff..07effc5792 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -46,6 +46,10 @@ type DocEngine interface { DeleteIndex(ctx context.Context, indexName string) error IndexExists(ctx context.Context, indexName string) (bool, error) + // Insert operations + InsertDataset(ctx context.Context, documents []map[string]interface{}, indexName string, knowledgebaseID string) ([]string, error) + InsertMetadata(ctx context.Context, documents []map[string]interface{}, tenantID string) ([]string, error) + // Document operations IndexDocument(ctx context.Context, indexName, docID string, doc interface{}) error BulkIndex(ctx context.Context, indexName string, docs []interface{}) (interface{}, error) diff --git a/internal/engine/infinity/index.go b/internal/engine/infinity/index.go index 32d9068da0..4008f7f192 100644 --- a/internal/engine/infinity/index.go +++ b/internal/engine/infinity/index.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "regexp" + "strconv" "strings" infinity "github.com/infiniflow/infinity-go-sdk" @@ -344,3 +345,321 @@ func (e *infinityEngine) CreateDocMetaIndex(ctx context.Context, indexName strin return nil } + +// InsertDataset inserts chunks into a dataset table +// Table name format: {tableNamePrefix}_{knowledgebaseID} +// Auto-create the table if it doesn't exist +// Transform chunks before insert: +// - docnm_kwd -> docnm +// - title_kwd/title_sm_tks -> docnm (if docnm_kwd not set) +// - content_with_weight/content_ltks/content_sm_ltks -> content +// - important_kwd -> important_keywords (+ important_kwd_empty_count) +// - question_kwd -> questions (joined with \n) +// - kb_id: list -> str (first element) +// - position_int: list -> hex_joined string +// - chunk_data: dict -> JSON string +// - meta_fields: dict -> JSON string +// - *_feas fields -> JSON string +// - keyword fields with list values -> ### joined string +// - Missing embeddings filled with zeros +// Delete existing rows with matching IDs before insert +func (e *infinityEngine) InsertDataset(ctx context.Context, chunks []map[string]interface{}, tableNamePrefix string, knowledgebaseID string) ([]string, error) { + tableName := fmt.Sprintf("%s_%s", tableNamePrefix, knowledgebaseID) + logger.Info("InfinityConnection.InsertDataset called", zap.String("tableName", tableName), zap.Int("chunkCount", len(chunks))) + + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return nil, fmt.Errorf("Failed to get database: %w", err) + } + + table, err := db.GetTable(tableName) + if err != nil { + // Table doesn't exist, try to create it + errMsg := strings.ToLower(err.Error()) + if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") { + return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err) + } + + // Infer vector size from chunks + vectorSize := 0 + vectorPattern := regexp.MustCompile(`q_(\d+)_vec`) + for _, chunk := range chunks { + for key := range chunk { + matches := vectorPattern.FindStringSubmatch(key) + if len(matches) >= 2 { + vectorSize, _ = strconv.Atoi(matches[1]) + break + } + } + if vectorSize > 0 { + break + } + } + if vectorSize == 0 { + return nil, fmt.Errorf("cannot infer vector size from chunks") + } + + // Determine parser_id from chunk structure + parserID := "" + if chunkData, ok := chunks[0]["chunk_data"].(map[string]interface{}); ok && chunkData != nil { + parserID = "table" + } + + // Create table + if err := e.CreateIndex(ctx, tableNamePrefix, knowledgebaseID, vectorSize, parserID); err != nil { + return nil, fmt.Errorf("Failed to create table: %w", err) + } + + table, err = db.GetTable(tableName) + if err != nil { + return nil, fmt.Errorf("Failed to get table after creation: %w", err) + } + } + + // Get embedding columns and their sizes + var embeddingCols [][2]interface{} + colsResp, err := table.ShowColumns() + if err != nil { + return nil, fmt.Errorf("Failed to get columns: %w", err) + } + result, ok := colsResp.(*infinity.QueryResult) + if !ok { + return nil, fmt.Errorf("unexpected response type: %T", colsResp) + } + + // ShowColumns returns a result set where Data contains arrays of column values + re := regexp.MustCompile(`Embedding\([a-z]+,(\d+)\)`) + if nameArr, ok := result.Data["name"]; ok { + if typeArr, ok := result.Data["type"]; ok { + for i := 0; i < len(nameArr); i++ { + colName, _ := nameArr[i].(string) + colType, _ := typeArr[i].(string) + matches := re.FindStringSubmatch(colType) + if len(matches) >= 2 { + size, _ := strconv.Atoi(matches[1]) + embeddingCols = append(embeddingCols, [2]interface{}{colName, size}) + } + } + } + } + + // Transform chunks + insertChunks := make([]map[string]interface{}, len(chunks)) + for i, chunk := range chunks { + d := make(map[string]interface{}) + + for k, v := range chunk { + switch k { + case "docnm_kwd": + d["docnm"] = v + case "title_kwd": + if _, exists := chunk["docnm_kwd"]; !exists { + d["docnm"] = utility.ConvertToString(v) + } + case "title_sm_tks": + if _, exists := chunk["docnm_kwd"]; !exists { + d["docnm"] = utility.ConvertToString(v) + } + case "important_kwd": + if list, ok := v.([]interface{}); ok { + emptyCount := 0 + tokens := make([]string, 0) + for _, item := range list { + if str, ok := item.(string); ok { + if str == "" { + emptyCount++ + } else { + tokens = append(tokens, str) + } + } + } + d["important_keywords"] = strings.Join(tokens, ",") + d["important_kwd_empty_count"] = emptyCount + } else { + d["important_keywords"] = utility.ConvertToString(v) + } + case "important_tks": + if _, exists := chunk["important_kwd"]; !exists { + d["important_keywords"] = v + } + case "content_with_weight": + d["content"] = v + case "content_ltks": + if _, exists := chunk["content_with_weight"]; !exists { + d["content"] = v + } + case "content_sm_ltks": + if _, exists := chunk["content_with_weight"]; !exists { + d["content"] = v + } + case "authors_tks": + d["authors"] = v + case "authors_sm_tks": + if _, exists := chunk["authors_tks"]; !exists { + d["authors"] = v + } + case "question_kwd": + d["questions"] = strings.Join(utility.ConvertToStringSlice(v), "\n") + case "question_tks": + if _, exists := chunk["question_kwd"]; !exists { + d["questions"] = utility.ConvertToString(v) + } + case "kb_id": + if list, ok := v.([]interface{}); ok && len(list) > 0 { + d["kb_id"] = list[0] + } else { + d["kb_id"] = v + } + case "position_int": + if list, ok := v.([]interface{}); ok { + d["position_int"] = utility.ConvertPositionIntArrayToHex(list) + } else { + d["position_int"] = v + } + case "page_num_int", "top_int": + if list, ok := v.([]interface{}); ok { + d[k] = utility.ConvertIntArrayToHex(list) + } else { + d[k] = v + } + case "chunk_data": + d["chunk_data"] = utility.ConvertMapToJSONString(v) + default: + // Check for *_feas fields + if strings.HasSuffix(k, "_feas") { + jsonBytes, _ := json.Marshal(v) + d[k] = string(jsonBytes) + } else if fieldKeyword(k) { + // keyword fields with list values -> ### joined + if list, ok := v.([]interface{}); ok { + d[k] = strings.Join(utility.ConvertToStringSlice(list), "###") + } else { + d[k] = v + } + } else { + d[k] = v + } + } + } + + // Remove intermediate token fields + for _, key := range []string{"docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", + "content_with_weight", "content_ltks", "content_sm_ltks", "authors_tks", "authors_sm_tks", + "question_kwd", "question_tks"} { + delete(d, key) + } + + // Fill missing embedding columns with zeros (raw slice, matching Python SDK) + for _, ec := range embeddingCols { + name, size := ec[0].(string), ec[1].(int) + if _, exists := d[name]; !exists { + zeros := make([]float64, size) + for i := range zeros { + zeros[i] = 0 + } + d[name] = zeros + } + } + + insertChunks[i] = d + } + + // Delete existing rows with matching IDs + if len(insertChunks) > 0 { + idList := make([]string, len(insertChunks)) + for i, chunk := range insertChunks { + idList[i] = fmt.Sprintf("'%v'", chunk["id"]) + } + filter := fmt.Sprintf("id IN (%s)", strings.Join(idList, ", ")) + logger.Debug(fmt.Sprintf("Deleting existing rows with filter: %s", filter)) + delResp, delErr := table.Delete(filter) + if delErr != nil { + logger.Warn(fmt.Sprintf("Failed to delete existing rows: %v", delErr)) + } else { + logger.Info(fmt.Sprintf("Deleted %d existing rows", delResp.DeletedRows)) + } + } + + // Insert chunks to dataset + _, err = table.Insert(insertChunks) + if err != nil { + return nil, fmt.Errorf("Failed to insert chunks to dataset: %w", err) + } + + logger.Info("InfinityConnection.InsertDataset result", zap.String("tableName", tableName), zap.Int("count", len(insertChunks))) + return []string{}, nil +} + +// InsertMetadata inserts document metadata into tenant's metadata table +// Table name format: ragflow_doc_meta_{tenant_id} +// Auto-create the table if it doesn't exist +// Replace existing metadata with same id and kb_id +func (e *infinityEngine) InsertMetadata(ctx context.Context, metadata []map[string]interface{}, tenantID string) ([]string, error) { + tableName := fmt.Sprintf("ragflow_doc_meta_%s", tenantID) + logger.Info("InfinityConnection.InsertMetadata called", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata))) + + db, err := e.client.conn.GetDatabase(e.client.dbName) + if err != nil { + return nil, fmt.Errorf("Failed to get database: %w", err) + } + + table, err := db.GetTable(tableName) + if err != nil { + // Table doesn't exist, try to create it + errMsg := strings.ToLower(err.Error()) + if !strings.Contains(errMsg, "not found") && !strings.Contains(errMsg, "doesn't exist") { + return nil, fmt.Errorf("Failed to get table %s: %w", tableName, err) + } + + // Create metadata table + if createErr := e.CreateDocMetaIndex(ctx, tableName); createErr != nil { + return nil, fmt.Errorf("Failed to create metadata table: %w", createErr) + } + + table, err = db.GetTable(tableName) + if err != nil { + return nil, fmt.Errorf("Failed to get table after creation: %w", err) + } + } + + // Transform metadata - convert meta_fields map to JSON string + insertMetadata := make([]map[string]interface{}, len(metadata)) + for i, m := range metadata { + d := make(map[string]interface{}) + for k, v := range m { + if k == "meta_fields" { + d["meta_fields"] = utility.ConvertMapToJSONString(v) + } else { + d[k] = v + } + } + insertMetadata[i] = d + } + + // Delete existing metadata with same id and kb_id, then insert new + if len(insertMetadata) > 0 { + idList := make([]string, len(insertMetadata)) + for i, m := range insertMetadata { + docID := fmt.Sprintf("'%v'", m["id"]) + kbID := fmt.Sprintf("'%v'", m["kb_id"]) + idList[i] = fmt.Sprintf("(id = %s AND kb_id = %s)", docID, kbID) + } + filter := strings.Join(idList, " OR ") + logger.Debug(fmt.Sprintf("Deleting existing metadata with filter: %s", filter)) + delResp, delErr := table.Delete(filter) + if delErr != nil { + logger.Warn(fmt.Sprintf("Failed to delete existing metadata: %v", delErr)) + } else if delResp.DeletedRows > 0 { + logger.Info(fmt.Sprintf("Deleted %d existing metadata entries", delResp.DeletedRows)) + } + } + + // Insert metadata + _, err = table.Insert(insertMetadata) + if err != nil { + return nil, fmt.Errorf("Failed to insert metadata: %w", err) + } + + logger.Info("InfinityConnection.InsertMetadata result", zap.String("tableName", tableName), zap.Int("metaCount", len(metadata))) + return []string{}, nil +} diff --git a/internal/handler/kb.go b/internal/handler/kb.go index 765102ed18..e9d347933c 100644 --- a/internal/handler/kb.go +++ b/internal/handler/kb.go @@ -17,8 +17,11 @@ package handler import ( + "encoding/json" "net/http" + "os" "ragflow/internal/common" + "ragflow/internal/engine" "ragflow/internal/service" "strconv" "strings" @@ -703,3 +706,92 @@ func (h *KnowledgebaseHandler) DeleteIndex(c *gin.Context) { jsonResponse(c, common.CodeSuccess, nil, "success") } + +// InsertDatasetFromFileRequest request for inserting chunks into dataset from file +type InsertDatasetFromFileRequest struct { + FilePath string `json:"file_path" binding:"required"` +} + +// @Summary Insert chunks into dataset from file +// @Description Internal: Insert into dataset table from a JSON file (table name extracted from file) +// @Tags knowledgebase +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param request body InsertDatasetFromFileRequest true "insert dataset request" +// @Success 200 {object} map[string]interface{} +// @Router /v1/kb/insert_from_file [post] +func (h *KnowledgebaseHandler) InsertDatasetFromFile(c *gin.Context) { + _, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req InsertDatasetFromFileRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": err.Error(), + }) + return + } + + if req.FilePath == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "file_path is required", + }) + return + } + + // Read the JSON file + data, err := os.ReadFile(req.FilePath) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "failed to read file: " + err.Error(), + }) + return + } + + // Parse JSON - format: {"table_name": ..., "knowledgebase_id": ..., "chunks": [...]} + var debugFormat struct { + TableNamePrefix string `json:"table_name"` + KnowledgebaseID string `json:"knowledgebase_id"` + Chunks []map[string]interface{} `json:"chunks"` + } + + if err := json.Unmarshal(data, &debugFormat); err != nil || debugFormat.Chunks == nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "invalid JSON format: expected {\"table_name\": ..., \"knowledgebase_id\": ..., \"chunks\": [...]}", + }) + return + } + + if len(debugFormat.Chunks) == 0 { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "no chunks found in file", + }) + return + } + + // Get the document engine and insert + docEngine := engine.Get() + result, err := docEngine.InsertDataset(c.Request.Context(), debugFormat.Chunks, debugFormat.TableNamePrefix, debugFormat.KnowledgebaseID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 500, + "message": "failed to insert into dataset: " + err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "data": result, + "message": "success", + }) +} \ No newline at end of file diff --git a/internal/handler/tenant.go b/internal/handler/tenant.go index 80503c7a19..1060aabc84 100644 --- a/internal/handler/tenant.go +++ b/internal/handler/tenant.go @@ -17,11 +17,14 @@ package handler import ( + "encoding/json" "net/http" + "os" "github.com/gin-gonic/gin" "ragflow/internal/common" + "ragflow/internal/engine" "ragflow/internal/service" ) @@ -177,3 +180,93 @@ func (h *TenantHandler) DeleteDocMetaIndex(c *gin.Context) { "data": nil, }) } + +// InsertMetadataFromFileRequest request for inserting metadata from file +type InsertMetadataFromFileRequest struct { + FilePath string `json:"file_path" binding:"required"` +} + +// @Summary Insert document metadata from JSON file +// @Description Internal: Insert metadata into tenant's metadata table from a JSON file +// @Tags tenants +// @Accept json +// @Produce json +// @Security ApiKeyAuth +// @Param request body InsertMetadataFromFileRequest true "insert metadata request" +// @Success 200 {object} map[string]interface{} +// @Router /v1/tenant/insert_metadata_from_file [post] +func (h *TenantHandler) InsertMetadataFromFile(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req InsertMetadataFromFileRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": err.Error(), + }) + return + } + + if req.FilePath == "" { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "file_path is required", + }) + return + } + + // Read the JSON file + data, err := os.ReadFile(req.FilePath) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "failed to read file: " + err.Error(), + }) + return + } + + // Parse JSON - format: {"chunks": [...]} + var inputFormat struct { + Chunks []map[string]interface{} `json:"chunks"` + } + + if err := json.Unmarshal(data, &inputFormat); err != nil || inputFormat.Chunks == nil { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "invalid JSON format: expected {\"chunks\": [...]}", + }) + return + } + + if len(inputFormat.Chunks) == 0 { + c.JSON(http.StatusBadRequest, gin.H{ + "code": 400, + "message": "no chunks found in file", + }) + return + } + + // Use user.ID as tenant ID (user IS the tenant in user mode) + tenantID := user.ID + + // Get the document engine and insert + docEngine := engine.Get() + result, err := docEngine.InsertMetadata(c.Request.Context(), inputFormat.Chunks, tenantID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "code": 500, + "message": "failed to insert metadata: " + err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "data": result, + "message": "success", + }) +} diff --git a/internal/router/router.go b/internal/router/router.go index ea9305b261..3aecefe765 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -213,6 +213,7 @@ func (r *Router) Setup(engine *gin.Engine) { kb.GET("/basic_info", r.knowledgebaseHandler.GetBasicInfo) kb.POST("/index", r.knowledgebaseHandler.CreateIndex) kb.DELETE("/index", r.knowledgebaseHandler.DeleteIndex) + kb.POST("/insert_from_file", r.knowledgebaseHandler.InsertDatasetFromFile) // KB ID specific routes kbByID := kb.Group("/:kb_id") @@ -230,6 +231,7 @@ func (r *Router) Setup(engine *gin.Engine) { { tenant.POST("/doc_meta_index", r.tenantHandler.CreateDocMetaIndex) tenant.DELETE("/doc_meta_index", r.tenantHandler.DeleteDocMetaIndex) + tenant.POST("/insert_metadata_from_file", r.tenantHandler.InsertMetadataFromFile) } // Document routes diff --git a/internal/utility/convert.go b/internal/utility/convert.go index 87c52dc35f..5d88969d18 100644 --- a/internal/utility/convert.go +++ b/internal/utility/convert.go @@ -17,6 +17,7 @@ package utility import ( + "encoding/json" "fmt" "os" "strconv" @@ -142,6 +143,26 @@ func ConvertHexToPositionIntArray(hexStr string) interface{} { return result } +// ConvertPositionIntArrayToHex converts position_int list (2D) to hex string +// e.g. [[1,2],[3,4]] -> "0000000100000002_0000000300000004" +func ConvertPositionIntArrayToHex(list []interface{}) string { + var hexParts []string + for _, item := range list { + if inner, ok := item.([]interface{}); ok { + for _, num := range inner { + if n, ok := num.(float64); ok { + hexParts = append(hexParts, fmt.Sprintf("%08x", int64(n))) + } else if n, ok := num.(int64); ok { + hexParts = append(hexParts, fmt.Sprintf("%08x", n)) + } else if n, ok := num.(int); ok { + hexParts = append(hexParts, fmt.Sprintf("%08x", n)) + } + } + } + } + return strings.Join(hexParts, "_") +} + // ConvertHexToIntArray converts hex string to int array (split by "_") func ConvertHexToIntArray(hexStr string) interface{} { if hexStr == "" { @@ -167,6 +188,22 @@ func ConvertHexToIntArray(hexStr string) interface{} { return result } +// ConvertIntArrayToHex converts int array to hex string +// e.g. [1, 2] -> "00000001_00000002" +func ConvertIntArrayToHex(list []interface{}) string { + var hexParts []string + for _, num := range list { + if n, ok := num.(float64); ok { + hexParts = append(hexParts, fmt.Sprintf("%08x", int64(n))) + } else if n, ok := num.(int64); ok { + hexParts = append(hexParts, fmt.Sprintf("%08x", n)) + } else if n, ok := num.(int); ok { + hexParts = append(hexParts, fmt.Sprintf("%08x", n)) + } + } + return strings.Join(hexParts, "_") +} + // IsEmpty checks if value is empty (nil, empty array, or empty string) func IsEmpty(v interface{}) bool { if v == nil { @@ -217,3 +254,71 @@ func ToFloat64(val interface{}) (float64, bool) { return 0, false } } + +// ConvertToStringSlice converts an interface{} to []string +// e.g. []interface{}{"a", "b", "c"} -> []string{"a", "b", "c"} +// e.g. "hello" -> []string{"hello"} +func ConvertToStringSlice(v interface{}) []string { + if v == nil { + return nil + } + switch val := v.(type) { + case []interface{}: + result := make([]string, 0, len(val)) + for _, item := range val { + if s, ok := item.(string); ok { + result = append(result, s) + } else { + result = append(result, fmt.Sprintf("%v", item)) + } + } + return result + case []string: + return val + case string: + return []string{val} + default: + return nil + } +} + +// ConvertToString converts an interface{} to space-separated string +// For []interface{}, joins elements with space; for other types, returns string representation +// e.g. []interface{}{"a", "b", "c"} -> "a b c" +// e.g. "hello" -> "hello" +func ConvertToString(v interface{}) string { + if v == nil { + return "" + } + switch val := v.(type) { + case []interface{}: + parts := make([]string, 0, len(val)) + for _, item := range val { + if s, ok := item.(string); ok { + parts = append(parts, s) + } else { + parts = append(parts, fmt.Sprintf("%v", item)) + } + } + return strings.Join(parts, " ") + default: + return fmt.Sprintf("%v", v) + } +} + +// ConvertMapToJSONString converts a map to JSON string for Infinity JSON columns +// If v is a map[string]interface{}, marshals it to JSON string +// If v is nil, returns "{}" +// Otherwise returns v as-is +// +// e.g. map[string]interface{}{"key": "value"}) -> `"{\"key\":\"value\"}"` +func ConvertMapToJSONString(v interface{}) interface{} { + if v == nil { + return "{}" + } + if m, ok := v.(map[string]interface{}); ok { + jsonBytes, _ := json.Marshal(m) + return string(jsonBytes) + } + return v +} \ No newline at end of file diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py index 4eeba7083b..4a29404c2a 100644 --- a/rag/utils/infinity_conn.py +++ b/rag/utils/infinity_conn.py @@ -319,6 +319,20 @@ class InfinityConnection(InfinityConnectionBase): return res_fields.get(chunk_id, None) def insert(self, documents: list[dict], index_name: str, knowledgebase_id: str = None) -> list[str]: + ''' + # Save input to file to test inserting from file in GO + import datetime + import os + debug_file = os.path.join("/var/infinity/tmp", f"insert_{index_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.json") + with open(debug_file, 'w') as f: + json.dump({ + "table_name": index_name, + "knowledgebase_id": knowledgebase_id, + "chunks": documents + }, f, indent=2) + self.logger.debug(f"Saved insert input to {debug_file}") + ''' + inf_conn = self.connPool.get_conn() try: db_instance = inf_conn.get_database(self.dbName)