From 115b730d07bca1cf0863c6bb3c632b0033a08332 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Fri, 12 Jun 2026 17:58:36 +0800 Subject: [PATCH] Go: parse ingestion DSL (#15938) PR #15938 ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai --- internal/cli/cli.go | 6 + internal/cli/response.go | 27 + internal/cli/user_command.go | 48 +- internal/ingestion/chunk/expression.go | 649 ++++++++++++++++++++++++ internal/ingestion/chunk/postprocess.go | 491 ++++++++++++++++++ internal/ingestion/chunk/preprocess.go | 133 +++++ internal/ingestion/chunk/split.go | 186 +++++++ internal/ingestion/chunk/type.go | 55 ++ internal/ingestion/chunk_engine.go | 153 ++++++ internal/ingestion/chunk_engine_test.go | 346 +++++++++++++ internal/ingestion/ingestion_service.go | 16 + 11 files changed, 2103 insertions(+), 7 deletions(-) create mode 100644 internal/ingestion/chunk/expression.go create mode 100644 internal/ingestion/chunk/postprocess.go create mode 100644 internal/ingestion/chunk/preprocess.go create mode 100644 internal/ingestion/chunk/split.go create mode 100644 internal/ingestion/chunk/type.go create mode 100644 internal/ingestion/chunk_engine.go create mode 100644 internal/ingestion/chunk_engine_test.go diff --git a/internal/cli/cli.go b/internal/cli/cli.go index f2234ac96d..2cd3d6a71b 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -1209,6 +1209,12 @@ func (c *CLI) handleMetaCommand(cmd *Command) error { c.running = false case "?", "h", "help": c.printHelp() + case "pwd": + dir, err := os.Getwd() + if err != nil { + return fmt.Errorf("get working directory: %w", err) + } + fmt.Println(dir) default: return fmt.Errorf("unknown meta command: \\%s", command) } diff --git a/internal/cli/response.go b/internal/cli/response.go index 054ceb4430..4f8654a2c4 100644 --- a/internal/cli/response.go +++ b/internal/cli/response.go @@ -529,6 +529,33 @@ func (r *TaskResponse) PrintOut() { } } +type ExplainResponse struct { + Code int `json:"code"` + Message string `json:"message"` + Duration float64 + OutputFormat OutputFormat +} + +func (r *ExplainResponse) Type() string { + return "explain" +} + +func (r *ExplainResponse) TimeCost() float64 { + return r.Duration +} + +func (r *ExplainResponse) SetOutputFormat(format OutputFormat) { + r.OutputFormat = format +} + +func (r *ExplainResponse) PrintOut() { + if r.Code == 0 { + fmt.Printf("\n%s\n", r.Message) + } else { + fmt.Printf("ERROR %d\n", r.Code) + } +} + // FileSystemResponse wraps the raw text output from executeFilesystem(). type FileSystemResponse struct { Output string diff --git a/internal/cli/user_command.go b/internal/cli/user_command.go index 858e72ebef..e6d84d902a 100644 --- a/internal/cli/user_command.go +++ b/internal/cli/user_command.go @@ -28,6 +28,7 @@ import ( "os" "os/exec" "path/filepath" + "ragflow/internal/ingestion" "strings" "time" ) @@ -3445,27 +3446,60 @@ func (c *CLI) ChunkCommand(cmd *Command) (ResponseIf, error) { return nil, fmt.Errorf("this command is only allowed in USER mode") } + var result ExplainResponse + start := time.Now() + filename, ok := cmd.Params["filename"].(string) if !ok { return nil, fmt.Errorf("filename not provided") } - dsl, ok := cmd.Params["dsl"].(string) + dslFilename, ok := cmd.Params["dsl"].(string) if !ok { return nil, fmt.Errorf("dsl not provided") } + dsl, err := os.ReadFile(dslFilename) + if err != nil { + return nil, fmt.Errorf("failed to read dsl file: %w", err) + } + explain, ok := cmd.Params["explain"].(bool) if !ok { explain = false } - if explain { - fmt.Printf("Explain chunk file: %s, DSL: %s\n", filename, dsl) - } else { - // TODO: not implemented - fmt.Printf("Chunk file: %s, DSL: %s\n", filename, dsl) + engine := ingestion.NewChunkEngine() + plan, err := engine.Compile(string(dsl)) + if err != nil { + return nil, fmt.Errorf("compile failed: %w", err) } - var result SimpleResponse + if explain { + + explanation, err := engine.Explain(plan) + if err != nil { + return nil, fmt.Errorf("explain error: %w", err) + } + + result.Message = explanation + } else { + fileToChunking, err := os.ReadFile(filename) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + chunkContext, err := engine.Execute(plan, string(fileToChunking)) + if err != nil { + return nil, fmt.Errorf("chunking error: %w", err) + } + + for _, resultChunk := range chunkContext.ResultChunks { + fmt.Printf("Chunk index: %d\n", resultChunk.Index) + fmt.Printf("Chunk size: %d\n", resultChunk.Size) + fmt.Printf("Chunk content: \n%s\n", resultChunk.Content) + } + } + + result.Duration = time.Since(start).Seconds() result.Code = 0 result.Message = fmt.Sprintf("Success to chunk %s", filename) return &result, nil diff --git a/internal/ingestion/chunk/expression.go b/internal/ingestion/chunk/expression.go new file mode 100644 index 0000000000..35176f81f8 --- /dev/null +++ b/internal/ingestion/chunk/expression.go @@ -0,0 +1,649 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package chunk + +import ( + "fmt" + "math" + "regexp" + "strconv" + "unicode" +) + +// --------------------------------------------------------------------------- +// Token types +// --------------------------------------------------------------------------- + +type tokenType int + +const ( + tokenEOF tokenType = iota + tokenIdentifier + tokenString + tokenNumber + tokenTrue + tokenFalse + tokenEq + tokenNeq + tokenGt + tokenLt + tokenGte + tokenLte + tokenAnd + tokenOr + tokenNot + tokenLParen + tokenRParen +) + +var keywords = map[string]tokenType{ + "AND": tokenAnd, + "OR": tokenOr, + "NOT": tokenNot, + "true": tokenTrue, + "false": tokenFalse, + "TRUE": tokenTrue, + "FALSE": tokenFalse, +} + +type token struct { + typ tokenType + raw string +} + +// --------------------------------------------------------------------------- +// Lexer +// --------------------------------------------------------------------------- + +type lexer struct { + input []rune + pos int +} + +func newLexer(input string) *lexer { + return &lexer{input: []rune(input)} +} + +func (l *lexer) skipWhitespace() { + for l.pos < len(l.input) && unicode.IsSpace(l.input[l.pos]) { + l.pos++ + } +} + +func (l *lexer) next() token { + l.skipWhitespace() + if l.pos >= len(l.input) { + return token{typ: tokenEOF, raw: ""} + } + + ch := l.input[l.pos] + + // Single-quoted string + if ch == '\'' { + l.pos++ // skip opening ' + start := l.pos + for l.pos < len(l.input) && l.input[l.pos] != '\'' { + l.pos++ + } + raw := string(l.input[start:l.pos]) + if l.pos < len(l.input) { + l.pos++ // skip closing ' + } + return token{typ: tokenString, raw: raw} + } + + // Operators + if l.pos+1 < len(l.input) { + next := l.input[l.pos+1] + switch string([]rune{ch, next}) { + case ">=": + l.pos += 2 + return token{typ: tokenGte, raw: ">="} + case "<=": + l.pos += 2 + return token{typ: tokenLte, raw: "<="} + case "!=": + l.pos += 2 + return token{typ: tokenNeq, raw: "!="} + } + } + switch ch { + case '=': + l.pos++ + return token{typ: tokenEq, raw: "="} + case '>': + l.pos++ + return token{typ: tokenGt, raw: ">"} + case '<': + l.pos++ + return token{typ: tokenLt, raw: "<"} + case '(': + l.pos++ + return token{typ: tokenLParen, raw: "("} + case ')': + l.pos++ + return token{typ: tokenRParen, raw: ")"} + } + + // Number + if unicode.IsDigit(ch) || (ch == '-' && l.pos+1 < len(l.input) && unicode.IsDigit(l.input[l.pos+1])) { + start := l.pos + if l.input[l.pos] == '-' { + l.pos++ + } + for l.pos < len(l.input) && (unicode.IsDigit(l.input[l.pos]) || l.input[l.pos] == '.') { + l.pos++ + } + return token{typ: tokenNumber, raw: string(l.input[start:l.pos])} + } + + // Identifier / keyword + if unicode.IsLetter(ch) || ch == '_' { + start := l.pos + for l.pos < len(l.input) && (unicode.IsLetter(l.input[l.pos]) || unicode.IsDigit(l.input[l.pos]) || l.input[l.pos] == '_') { + l.pos++ + } + raw := string(l.input[start:l.pos]) + if kw, ok := keywords[raw]; ok { + return token{typ: kw, raw: raw} + } + return token{typ: tokenIdentifier, raw: raw} + } + + // Unknown + l.pos++ + return token{typ: tokenIdentifier, raw: string(ch)} +} + +func (l *lexer) peek() token { + pos := l.pos + tok := l.next() + l.pos = pos + return tok +} + +// --------------------------------------------------------------------------- +// AST nodes +// --------------------------------------------------------------------------- + +type Expr interface { + String() string +} + +type binaryExpr struct { + left Expr + op tokenType + right Expr +} + +func (e binaryExpr) String() string { + ops := map[tokenType]string{ + tokenEq: "=", + tokenNeq: "!=", + tokenGt: ">", + tokenLt: "<", + tokenGte: ">=", + tokenLte: "<=", + tokenAnd: "AND", + tokenOr: "OR", + } + return fmt.Sprintf("(%s %s %s)", e.left, ops[e.op], e.right) +} + +type unaryExpr struct { + op tokenType + right Expr +} + +func (e unaryExpr) String() string { + return fmt.Sprintf("(NOT %s)", e.right) +} + +type identifierExpr struct { + name string +} + +func (e identifierExpr) String() string { + return e.name +} + +type stringExpr struct { + value string +} + +func (e stringExpr) String() string { + return "'" + e.value + "'" +} + +type numberExpr struct { + value float64 +} + +func (e numberExpr) String() string { + return strconv.FormatFloat(e.value, 'f', -1, 64) +} + +type boolExpr struct { + value bool +} + +func (e boolExpr) String() string { + return strconv.FormatBool(e.value) +} + +// --------------------------------------------------------------------------- +// Recursive-descent parser +// --------------------------------------------------------------------------- + +type parser struct { + lex *lexer + cur token + peeked bool +} + +func newParser(input string) *parser { + p := &parser{lex: newLexer(input)} + p.advance() + return p +} + +func (p *parser) advance() { + if p.peeked { + p.peeked = false + return + } + p.cur = p.lex.next() +} + +func (p *parser) peek() token { + if !p.peeked { + p.peeked = true + p.cur = p.lex.next() + } + return p.cur +} + +func (p *parser) expect(typ tokenType) token { + tok := p.cur + if tok.typ != typ { + panic(fmt.Sprintf("expected token %d but got %d (%q)", typ, tok.typ, tok.raw)) + } + p.advance() + return tok +} + +func (p *parser) parse() Expr { + return p.parseOr() +} + +// or_expr → and_expr ("OR" and_expr)* +func (p *parser) parseOr() Expr { + e := p.parseAnd() + for p.cur.typ == tokenOr { + op := p.cur.typ + p.advance() + right := p.parseAnd() + e = binaryExpr{left: e, op: op, right: right} + } + return e +} + +// and_expr → not_expr ("AND" not_expr)* +func (p *parser) parseAnd() Expr { + e := p.parseNot() + for p.cur.typ == tokenAnd { + op := p.cur.typ + p.advance() + right := p.parseNot() + e = binaryExpr{left: e, op: op, right: right} + } + return e +} + +// not_expr → "NOT" not_expr | primary +func (p *parser) parseNot() Expr { + if p.cur.typ == tokenNot { + op := p.cur.typ + p.advance() + right := p.parseNot() + return unaryExpr{op: op, right: right} + } + return p.parsePrimary() +} + +// primary → comparison | "(" expression ")" +func (p *parser) parsePrimary() Expr { + if p.cur.typ == tokenLParen { + p.advance() + e := p.parseOr() + p.expect(tokenRParen) + return e + } + return p.parseComparison() +} + +// comparison → IDENTIFIER OP value | value +// comparison → IDENTIFIER OP value +func (p *parser) parseComparison() Expr { + if p.cur.typ == tokenIdentifier { + id := p.cur.raw + p.advance() + switch p.cur.typ { + case tokenEq, tokenNeq, tokenGt, tokenLt, tokenGte, tokenLte: + op := p.cur.typ + p.advance() + right := p.parseValue() + return binaryExpr{left: identifierExpr{name: id}, op: op, right: right} + default: + // identifier alone – treat as boolean check + return binaryExpr{ + left: identifierExpr{name: id}, + op: tokenEq, + right: boolExpr{value: true}, + } + } + } + return p.parseValue() +} + +// value → STRING | NUMBER | BOOLEAN +func (p *parser) parseValue() Expr { + switch p.cur.typ { + case tokenString: + v := stringExpr{value: p.cur.raw} + p.advance() + return v + case tokenNumber: + f, _ := strconv.ParseFloat(p.cur.raw, 64) + p.advance() + return numberExpr{value: f} + case tokenTrue: + p.advance() + return boolExpr{value: true} + case tokenFalse: + p.advance() + return boolExpr{value: false} + default: + // treat as identifier (e.g. bare variable reference) + id := identifierExpr{name: p.cur.raw} + p.advance() + return id + } +} + +// --------------------------------------------------------------------------- +// Evaluator +// --------------------------------------------------------------------------- + +var reMediaURL = regexp.MustCompile(`(?i)https?://[^\s]*\.(jpg|jpeg|png|gif|bmp|webp|svg|mp4|avi|mov|wmv|flv|mkv|m4v|mp3|wav|ogg|aac)`) +var reImageURL = regexp.MustCompile(`(?i)https?://[^\s]*\.(jpg|jpeg|png|gif|bmp|webp|svg)`) +var reVideoURL = regexp.MustCompile(`(?i)https?://[^\s]*\.(mp4|avi|mov|wmv|flv|mkv|m4v)`) +var reAnyURL = regexp.MustCompile(`(?i)https?://[^\s]+`) + +// buildExprContext builds a variable context from a chunk's content and metadata. +// It auto-detects media/image/video URLs and language hints. +func buildExprContext(chunk ContentProvider, metadata map[string]interface{}) map[string]interface{} { + vars := make(map[string]interface{}) + content := chunk.GetContent() + + // Pre-populate from metadata + for k, v := range metadata { + vars[k] = v + } + + // Auto-detect URL presence + vars["has_media_url"] = reMediaURL.MatchString(content) + vars["has_image_url"] = reImageURL.MatchString(content) + vars["has_video_url"] = reVideoURL.MatchString(content) + vars["has_url"] = reAnyURL.MatchString(content) + vars["length"] = len([]rune(content)) + + return vars +} + +// ContentProvider allows evaluating expressions against any type that has content. +type ContentProvider interface { + GetContent() string +} + +// Evaluate parses and evaluates a boolean expression against a variable map. +func Evaluate(exprStr string, vars map[string]interface{}) (bool, error) { + p := newParser(exprStr) + ast := p.parse() + res, err := eval(ast, vars) + if err != nil { + return false, fmt.Errorf("evaluate %q: %w", exprStr, err) + } + b, ok := toBool(res) + if !ok { + return false, fmt.Errorf("evaluate %q: result %v (%T) is not a boolean", exprStr, res, res) + } + return b, nil +} + +// CompileExpression parses an expression string into a reusable AST. +func CompileExpression(exprStr string) (Expr, error) { + defer func() { + if r := recover(); r != nil { + panic(fmt.Sprintf("compile expression %q: %v", exprStr, r)) + } + }() + p := newParser(exprStr) + return p.parse(), nil +} + +// EvalCompiled evaluates a pre-compiled expression AST against variables. +func EvalCompiled(ast interface{}, vars map[string]interface{}) (bool, error) { + e, ok := ast.(Expr) + if !ok { + return false, fmt.Errorf("invalid AST type: %T", ast) + } + res, err := eval(e, vars) + if err != nil { + return false, err + } + b, ok := toBool(res) + if !ok { + return false, fmt.Errorf("result %v (%T) is not boolean", res, res) + } + return b, nil +} + +func eval(e Expr, vars map[string]interface{}) (interface{}, error) { + switch n := e.(type) { + case binaryExpr: + return evalBinary(n, vars) + case unaryExpr: + return evalUnary(n, vars) + case identifierExpr: + v, ok := vars[n.name] + if !ok { + return nil, fmt.Errorf("undefined variable: %s", n.name) + } + return v, nil + case stringExpr: + return n.value, nil + case numberExpr: + return n.value, nil + case boolExpr: + return n.value, nil + default: + return nil, fmt.Errorf("unknown expression type: %T", e) + } +} + +func evalBinary(e binaryExpr, vars map[string]interface{}) (interface{}, error) { + left, err := eval(e.left, vars) + if err != nil { + return nil, err + } + right, err := eval(e.right, vars) + if err != nil { + return nil, err + } + + switch e.op { + case tokenAnd: + l, ok := toBool(left) + if !ok { + return false, fmt.Errorf("AND requires boolean left operand") + } + if !l { + return false, nil + } + r, ok := toBool(right) + if !ok { + return false, fmt.Errorf("AND requires boolean right operand") + } + return r, nil + + case tokenOr: + l, ok := toBool(left) + if !ok { + return false, fmt.Errorf("OR requires boolean left operand") + } + if l { + return true, nil + } + r, ok := toBool(right) + if !ok { + return false, fmt.Errorf("OR requires boolean right operand") + } + return r, nil + + case tokenEq: + return compareEq(left, right), nil + case tokenNeq: + return !compareEq(left, right), nil + case tokenGt, tokenLt, tokenGte, tokenLte: + return compareOrder(left, right, e.op) + default: + return false, fmt.Errorf("unknown binary op %d", e.op) + } +} + +func evalUnary(e unaryExpr, vars map[string]interface{}) (interface{}, error) { + right, err := eval(e.right, vars) + if err != nil { + return nil, err + } + b, ok := toBool(right) + if !ok { + return false, fmt.Errorf("NOT requires boolean operand") + } + return !b, nil +} + +func toBool(v interface{}) (bool, bool) { + switch vv := v.(type) { + case bool: + return vv, true + case string: + return vv == "true" || vv == "TRUE" || vv == "1", true + case float64: + return vv != 0, true + case int: + return vv != 0, true + } + return false, false +} + +func compareEq(a, b interface{}) bool { + // Normalise numeric types + af, aIsNum := toFloat(a) + bf, bIsNum := toFloat(b) + if aIsNum && bIsNum { + return af == bf + } + // Fall back to string comparison + return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b) +} + +func toFloat(v interface{}) (float64, bool) { + switch vv := v.(type) { + case float64: + return vv, true + case int: + return float64(vv), true + case string: + f, err := strconv.ParseFloat(vv, 64) + return f, err == nil + } + return 0, false +} + +func compareOrder(a, b interface{}, op tokenType) (bool, error) { + af, aOK := toFloat(a) + bf, bOK := toFloat(b) + if aOK && bOK { + switch op { + case tokenGt: + return af > bf, nil + case tokenLt: + return af < bf, nil + case tokenGte: + return af >= bf, nil + case tokenLte: + return af <= bf, nil + } + } + // String fallback + sa := fmt.Sprintf("%v", a) + sb := fmt.Sprintf("%v", b) + switch op { + case tokenGt: + return sa > sb, nil + case tokenLt: + return sa < sb, nil + case tokenGte: + return sa >= sb, nil + case tokenLte: + return sa <= sb, nil + } + return false, fmt.Errorf("unsupported comparison op %d between %T and %T", op, a, b) +} + +// --------------------------------------------------------------------------- +// Language heuristics +// --------------------------------------------------------------------------- + +// DetectLanguage returns a best-effort language code ('zh', 'en', etc.) +// based on the proportion of CJK characters. +func DetectLanguage(text string) string { + cjk := 0 + total := 0 + for _, r := range text { + if unicode.Is(unicode.Han, r) { + cjk++ + } + if unicode.IsLetter(r) { + total++ + } + } + if total > 0 && float64(cjk)/float64(total) > 0.3 { + return "zh" + } + return "en" +} + +// RuneCount returns the number of runes in text. +func RuneCount(text string) int { + return len([]rune(text)) +} + +// Ensure math is used (for NaN etc.) +var _ = math.NaN diff --git a/internal/ingestion/chunk/postprocess.go b/internal/ingestion/chunk/postprocess.go new file mode 100644 index 0000000000..72e9357af1 --- /dev/null +++ b/internal/ingestion/chunk/postprocess.go @@ -0,0 +1,491 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package chunk + +import ( + "fmt" + "strings" +) + +// --------------------------------------------------------------------------- +// Overlap condition +// --------------------------------------------------------------------------- + +type overlapConfig struct { + Size int `json:"size"` + Unit string `json:"unit,omitempty"` // "char" (default) or "sentence" +} + +type overlapCondition struct { + Name string + Condition Expr // pre-compiled expression AST from CompileExpression + OverlapConfig overlapConfig +} + +type mergeConfig struct { + TargetSize int `json:"target_size"` + Strategy string `json:"strategy"` // "greedy" +} + +type filterConfig struct { + MinLength int `json:"min_length"` + MaxLength int `json:"max_length"` +} + +type metadataConfig struct { + IncludeIndex bool `json:"include_index"` + CustomFields map[string]string `json:"custom_fields,omitempty"` +} + +// --------------------------------------------------------------------------- +// PostprocessOperator +// --------------------------------------------------------------------------- + +type PostprocessOperator struct { + merge *mergeConfig + overlap struct { + unit string // "char" (default) or "sentence" + conditions []overlapCondition + defaultCfg overlapConfig + } + filter *filterConfig + addMetadata *metadataConfig +} + +func NewPostprocessOperator(config map[string]interface{}) (*PostprocessOperator, error) { + op := &PostprocessOperator{} + + // Merge + if m, ok := config["merge"].(map[string]interface{}); ok { + op.merge = &mergeConfig{} + if ts, ok := m["target_size"].(float64); ok { + op.merge.TargetSize = int(ts) + } else { + op.merge.TargetSize = 500 + } + if s, ok := m["strategy"].(string); ok { + op.merge.Strategy = s + } else { + op.merge.Strategy = "greedy" + } + } + + // Overlap + if ov, ok := config["overlap"].(map[string]interface{}); ok { + if u, ok := ov["unit"].(string); ok { + op.overlap.unit = u + } else { + op.overlap.unit = "char" + } + + // Default + if d, ok := ov["default"].(map[string]interface{}); ok { + op.overlap.defaultCfg = parseOverlapConfig(d) + } + + // Conditions + if conds, ok := ov["conditions"].([]interface{}); ok { + for _, ci := range conds { + c, ok := ci.(map[string]interface{}) + if !ok { + continue + } + cond := overlapCondition{} + if n, ok := c["name"].(string); ok { + cond.Name = n + } + if exprStr, ok := c["if"].(string); ok { + expression, err := CompileExpression(exprStr) + if err == nil { + cond.Condition = expression + } + } + if thenMap, ok := c["then"].(map[string]interface{}); ok { + cond.OverlapConfig = parseOverlapConfig(thenMap) + } + op.overlap.conditions = append(op.overlap.conditions, cond) + } + } + } + + // Filter + if f, ok := config["filter"].(map[string]interface{}); ok { + op.filter = &filterConfig{} + if v, ok := f["min_length"].(float64); ok { + op.filter.MinLength = int(v) + } + if v, ok := f["max_length"].(float64); ok { + op.filter.MaxLength = int(v) + } + } + + // Add metadata + if am, ok := config["add_metadata"].(map[string]interface{}); ok { + op.addMetadata = &metadataConfig{} + if inc, ok := am["include_index"].(bool); ok { + op.addMetadata.IncludeIndex = inc + } + if cf, ok := am["custom_fields"].(map[string]interface{}); ok { + m := make(map[string]string, len(cf)) + for k, v := range cf { + m[k] = fmt.Sprintf("%v", v) + } + op.addMetadata.CustomFields = m + } + } + + return op, nil +} + +func (o *PostprocessOperator) Prepare(chunkCtx *ChunkContext) error { + + return nil +} + +func (o *PostprocessOperator) Execute(chunkCtx *ChunkContext) error { + chunks := chunkCtx.SplitChunks + if len(chunks) == 0 { + return nil + } + + // 1. Merge + if o.merge != nil { + chunks = o.mergeChunks(chunks) + } + + // 2. Overlap + chunks = o.applyOverlap(chunks) + + // 3. Filter + if o.filter != nil { + chunks = o.filterChunks(chunks) + } + + // 4. Add metadata + if o.addMetadata != nil { + chunks = o.addChunkMetadata(chunks) + } + + // Re-index + for i := range chunks { + chunks[i].Index = i + chunks[i].Size = len(chunks[i].GetContent()) + } + + chunkCtx.ResultChunks = chunks + return nil +} + +func (o *PostprocessOperator) Finish(chunkCtx *ChunkContext) error { + return nil +} + +func (o *PostprocessOperator) String() string { + var buf strings.Builder + buf.WriteString("postprocess:\n") + + if o.merge != nil { + fmt.Fprintf(&buf, " merge:\n") + fmt.Fprintf(&buf, " target_size: %d\n", o.merge.TargetSize) + fmt.Fprintf(&buf, " strategy: %q\n", o.merge.Strategy) + } + + fmt.Fprintf(&buf, " overlap:\n") + fmt.Fprintf(&buf, " unit: %q\n", o.overlap.unit) + fmt.Fprintf(&buf, " default:\n") + fmt.Fprintf(&buf, " size: %d\n", o.overlap.defaultCfg.Size) + if o.overlap.defaultCfg.Unit != "" { + fmt.Fprintf(&buf, " unit: %q\n", o.overlap.defaultCfg.Unit) + } + if len(o.overlap.conditions) > 0 { + fmt.Fprintf(&buf, " conditions:\n") + for _, c := range o.overlap.conditions { + fmt.Fprintf(&buf, " - name: %q\n", c.Name) + fmt.Fprintf(&buf, " condition: %q\n", c.Condition.String()) + fmt.Fprintf(&buf, " then:\n") + fmt.Fprintf(&buf, " size: %d\n", c.OverlapConfig.Size) + if c.OverlapConfig.Unit != "" { + fmt.Fprintf(&buf, " unit: %q\n", c.OverlapConfig.Unit) + } + } + } + + if o.filter != nil { + fmt.Fprintf(&buf, " filter:\n") + fmt.Fprintf(&buf, " min_length: %d\n", o.filter.MinLength) + fmt.Fprintf(&buf, " max_length: %d\n", o.filter.MaxLength) + } + + if o.addMetadata != nil { + fmt.Fprintf(&buf, " add_metadata:\n") + fmt.Fprintf(&buf, " include_index: %t\n", o.addMetadata.IncludeIndex) + if len(o.addMetadata.CustomFields) > 0 { + fmt.Fprintf(&buf, " custom_fields:\n") + for k, v := range o.addMetadata.CustomFields { + fmt.Fprintf(&buf, " %s: %q\n", k, v) + } + } + } + + return buf.String() +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func parseOverlapConfig(m map[string]interface{}) overlapConfig { + cfg := overlapConfig{} + if size, ok := m["size"].(float64); ok { + cfg.Size = int(size) + } + if u, ok := m["unit"].(string); ok { + cfg.Unit = u + } + return cfg +} + +// mergeChunks greedily merges small chunks into larger ones up to target_size. +func (o *PostprocessOperator) mergeChunks(chunks []ChunkData) []ChunkData { + target := o.merge.TargetSize + if target <= 0 { + target = 500 + } + + var merged []ChunkData + var buf strings.Builder + var bufMeta map[string]interface{} + firstIndex := 0 + + for i, c := range chunks { + // If this single chunk already exceeds target, flush first then add + if len([]rune(c.Content)) >= target { + if buf.Len() > 0 { + merged = append(merged, ChunkData{ + Content: buf.String(), + Index: firstIndex, + Metadata: bufMeta, + }) + buf.Reset() + bufMeta = nil + } + merged = append(merged, c) + firstIndex = i + 1 + continue + } + + if buf.Len() == 0 { + buf.WriteString(c.Content) + bufMeta = c.Metadata + firstIndex = c.Index + } else { + nextLen := len([]rune(c.Content)) + // If adding this chunk would exceed target, flush current and start new + if buf.Len()+nextLen+1 > target { + merged = append(merged, ChunkData{ + Content: buf.String(), + Index: firstIndex, + Metadata: bufMeta, + }) + buf.Reset() + buf.WriteString(c.Content) + bufMeta = c.Metadata + firstIndex = c.Index + } else { + buf.WriteString(" ") + buf.WriteString(c.Content) + // Merge metadata (last wins for overlapping keys) + if c.Metadata != nil && bufMeta == nil { + bufMeta = make(map[string]interface{}) + } + for k, v := range c.Metadata { + bufMeta[k] = v + } + } + } + } + + // Flush remaining + if buf.Len() > 0 { + merged = append(merged, ChunkData{ + Content: buf.String(), + Index: firstIndex, + Metadata: bufMeta, + }) + } + + return merged +} + +// applyOverlap evaluates conditions on each chunk and prepends overlap from the previous chunk. +func (o *PostprocessOperator) applyOverlap(chunks []ChunkData) []ChunkData { + if len(chunks) <= 1 { + return chunks + } + + result := make([]ChunkData, len(chunks)) + copy(result, chunks) + + for i := 1; i < len(chunks); i++ { + // Determine overlap size for chunks[i] + cfg := o.resolveOverlapConfig(chunks[i]) + overlapSize := cfg.Size + if overlapSize <= 0 { + continue + } + + prevContent := result[i-1].Content + prevRunes := []rune(prevContent) + if len(prevRunes) == 0 { + continue + } + + // Which unit? + unit := cfg.Unit + if unit == "" { + unit = o.overlap.unit + } + + var overlapText string + switch unit { + case "sentence": + // Take last N sentences + sentences := splitSentencesGeneric(prevContent) + if len(sentences) < overlapSize { + overlapText = prevContent + } else { + overlapText = strings.Join(sentences[len(sentences)-overlapSize:], " ") + } + default: // "char" + if overlapSize >= len(prevRunes) { + overlapText = prevContent + } else { + overlapText = string(prevRunes[len(prevRunes)-overlapSize:]) + } + } + + result[i].Content = overlapText + result[i].Content + } + + return result +} + +// resolveOverlapConfig evaluates overlap conditions for a chunk. +func (o *PostprocessOperator) resolveOverlapConfig(chunk ChunkData) overlapConfig { + vars := buildExprContext(&chunk, chunk.Metadata) + + for _, cond := range o.overlap.conditions { + if cond.Condition == nil { + continue + } + result, err := EvalCompiled(cond.Condition, vars) + if err != nil { + continue + } + if result { + cfg := cond.OverlapConfig + if cfg.Unit == "" { + cfg.Unit = o.overlap.unit + } + return cfg + } + } + + cfg := o.overlap.defaultCfg + if cfg.Unit == "" { + cfg.Unit = o.overlap.unit + } + return cfg +} + +// filterChunks removes chunks outside the length bounds. +func (o *PostprocessOperator) filterChunks(chunks []ChunkData) []ChunkData { + filtered := make([]ChunkData, 0, len(chunks)) + for _, c := range chunks { + l := len([]rune(c.Content)) + if o.filter.MinLength > 0 && l < o.filter.MinLength { + continue + } + if o.filter.MaxLength > 0 && l > o.filter.MaxLength { + continue + } + filtered = append(filtered, c) + } + return filtered +} + +// addChunkMetadata enriches chunks with metadata. +func (o *PostprocessOperator) addChunkMetadata(chunks []ChunkData) []ChunkData { + result := make([]ChunkData, len(chunks)) + for i, c := range chunks { + if c.Metadata == nil { + c.Metadata = make(map[string]interface{}) + } + if o.addMetadata.IncludeIndex { + c.Metadata["index"] = i + } + for field, action := range o.addMetadata.CustomFields { + switch action { + case "auto_detect": + switch field { + case "has_media_url": + c.Metadata[field] = reMediaURL.MatchString(c.Content) + case "has_image_url": + c.Metadata[field] = reImageURL.MatchString(c.Content) + case "has_video_url": + c.Metadata[field] = reVideoURL.MatchString(c.Content) + case "language": + c.Metadata[field] = DetectLanguage(c.Content) + case "length": + c.Metadata[field] = RuneCount(c.Content) + default: + // Unknown auto-detect field — check for URLs generically + c.Metadata[field] = reAnyURL.MatchString(c.Content) + } + default: + c.Metadata[field] = action + } + } + result[i] = c + } + return result +} + +// splitSentencesGeneric splits text into sentences using common punctuation. +var sentenceBoundaries = []rune{'。', '!', '?', '.', '!', '?'} + +func splitSentencesGeneric(text string) []string { + runes := []rune(text) + boundSet := make(map[rune]bool) + for _, r := range sentenceBoundaries { + boundSet[r] = true + } + + var sentences []string + var buf strings.Builder + for _, r := range runes { + buf.WriteRune(r) + if boundSet[r] { + sentences = append(sentences, strings.TrimSpace(buf.String())) + buf.Reset() + } + } + if buf.Len() > 0 { + sentences = append(sentences, strings.TrimSpace(buf.String())) + } + return sentences +} diff --git a/internal/ingestion/chunk/preprocess.go b/internal/ingestion/chunk/preprocess.go new file mode 100644 index 0000000000..617980e5fe --- /dev/null +++ b/internal/ingestion/chunk/preprocess.go @@ -0,0 +1,133 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package chunk + +import ( + "fmt" + "regexp" + "strings" +) + +type PreprocessOperator struct { + normalizeNewlines bool + stripWhitespace bool + removeEmptyLines bool + softLineBreakMerging bool +} + +func NewPreprocessOperator(config map[string]interface{}) (*PreprocessOperator, error) { + operator := &PreprocessOperator{} + if v, ok := config["normalize_newlines"]; ok { + operator.normalizeNewlines, ok = v.(bool) + if !ok { + return nil, fmt.Errorf("preprocess: normalize_newlines must be bool") + } + } + if v, ok := config["strip_whitespace"]; ok { + operator.stripWhitespace, ok = v.(bool) + if !ok { + return nil, fmt.Errorf("preprocess: strip_whitespace must be bool") + } + } + if v, ok := config["remove_empty_lines"]; ok { + operator.removeEmptyLines, ok = v.(bool) + if !ok { + return nil, fmt.Errorf("preprocess: remove_empty_lines must be bool") + } + } + if v, ok := config["soft_line_break_merging"]; ok { + operator.softLineBreakMerging, ok = v.(bool) + if !ok { + return nil, fmt.Errorf("preprocess: soft_line_break_merging must be bool") + } + } + return operator, nil +} + +func (o *PreprocessOperator) Prepare(chunkCtx *ChunkContext) error { + return nil +} + +func (o *PreprocessOperator) Execute(chunkCtx *ChunkContext) error { + text := chunkCtx.Origin + + if o.normalizeNewlines { + // \r\n → \n, \r → \n + text = strings.ReplaceAll(text, "\r\n", "\n") + text = strings.ReplaceAll(text, "\r", "\n") + // Collapse multiple \n into one + re := regexp.MustCompile(`\n{2,}`) + text = re.ReplaceAllString(text, "\n") + } + + if o.stripWhitespace { + // Trim leading/trailing whitespace on each line + lines := strings.Split(text, "\n") + for i, line := range lines { + lines[i] = strings.TrimSpace(line) + } + text = strings.Join(lines, "\n") + } + + if o.removeEmptyLines { + lines := strings.Split(text, "\n") + filtered := make([]string, 0, len(lines)) + for _, line := range lines { + if strings.TrimSpace(line) != "" { + filtered = append(filtered, line) + } + } + text = strings.Join(filtered, "\n") + } + + if o.softLineBreakMerging { + lines := strings.Split(text, "\n") + var merged []string + var current strings.Builder + + sentenceEnd := regexp.MustCompile(`[.!?][\s]*$`) + + for i, line := range lines { + if current.Len() > 0 { + current.WriteString(" ") + } + current.WriteString(line) + + if i == len(lines)-1 || sentenceEnd.MatchString(line) { + merged = append(merged, current.String()) + current.Reset() + } + } + text = strings.Join(merged, "\n") + } + + chunkCtx.TextAfterPreprocess = text + return nil +} + +func (o *PreprocessOperator) Finish(chunkCtx *ChunkContext) error { + return nil +} + +func (o *PreprocessOperator) String() string { + var buf strings.Builder + buf.WriteString("preprocess:\n") + fmt.Fprintf(&buf, " normalize_newlines: %t\n", o.normalizeNewlines) + fmt.Fprintf(&buf, " strip_whitespace: %t\n", o.stripWhitespace) + fmt.Fprintf(&buf, " remove_empty_lines: %t\n", o.removeEmptyLines) + return buf.String() +} diff --git a/internal/ingestion/chunk/split.go b/internal/ingestion/chunk/split.go new file mode 100644 index 0000000000..314ab8e1ed --- /dev/null +++ b/internal/ingestion/chunk/split.go @@ -0,0 +1,186 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package chunk + +import ( + "fmt" + "strings" + "unicode/utf8" +) + +type SplitOperator struct { + strategy string + boundaries []string + keepSeparators bool +} + +func NewSplitOperator(config map[string]interface{}) (*SplitOperator, error) { + op := &SplitOperator{} + + if v, ok := config["strategy"]; ok { + if s, ok := v.(string); ok { + op.strategy = s + } + } + + if params, ok := config["params"].(map[string]interface{}); ok { + if b, ok := params["boundaries"]; ok { + if boundStrs, ok := b.([]interface{}); ok { + for _, bs := range boundStrs { + if s, ok := bs.(string); ok { + op.boundaries = append(op.boundaries, s) + } + } + } + } + if ks, ok := params["keep_separators"]; ok { + if b, ok := ks.(bool); ok { + op.keepSeparators = b + } + } + } + + return op, nil +} + +func (o *SplitOperator) Prepare(ctx *ChunkContext) error { + return nil +} + +func (o *SplitOperator) Execute(ctx *ChunkContext) error { + text := ctx.TextAfterPreprocess + + if o.strategy == "" { + o.strategy = "sentence" + } + + switch o.strategy { + case "sentence": + ctx.SplitChunks = o.splitSentences(text) + case "char": + ctx.SplitChunks = o.splitByChar(text) + case "paragraph": + ctx.SplitChunks = o.splitByParagraph(text) + default: + ctx.SplitChunks = o.splitSentences(text) + } + + return nil +} + +func (o *SplitOperator) Finish(ctx *ChunkContext) error { + return nil +} + +func (o *SplitOperator) String() string { + var buf strings.Builder + buf.WriteString("split:\n") + fmt.Fprintf(&buf, " strategy: %q\n", o.strategy) + fmt.Fprintf(&buf, " boundaries:\n") + for _, r := range o.boundaries { + fmt.Fprintf(&buf, " - %q\n", r) + } + fmt.Fprintf(&buf, " keep_separators: %t\n", o.keepSeparators) + return buf.String() +} + +// splitSentences splits text at multi-rune boundaries, optionally keeping separators. +func (o *SplitOperator) splitSentences(text string) []ChunkData { + if len(o.boundaries) == 0 { + o.boundaries = []string{"。", "!", "?", "\n"} + } + + var chunks []ChunkData + var buf strings.Builder + i := 0 + + for i < len(text) { + // Try to match any boundary at current position (first match wins) + matchedBound := "" + for _, bound := range o.boundaries { + if bound != "" && i+len(bound) <= len(text) && text[i:i+len(bound)] == bound { + matchedBound = bound + break + } + } + + if matchedBound != "" { + if o.keepSeparators { + buf.WriteString(matchedBound) + } + if buf.Len() > 0 { + chunks = append(chunks, ChunkData{ + Content: buf.String(), + Index: len(chunks), + Metadata: map[string]interface{}{ + "language": DetectLanguage(buf.String()), + }, + }) + buf.Reset() + } + i += len(matchedBound) + } else { + r, size := utf8.DecodeRuneInString(text[i:]) + buf.WriteRune(r) + i += size + } + } + + // flush remaining text + if buf.Len() > 0 { + chunks = append(chunks, ChunkData{ + Content: buf.String(), + Index: len(chunks), + Metadata: map[string]interface{}{ + "language": DetectLanguage(buf.String()), + }, + }) + } + + return chunks +} + +func (o *SplitOperator) splitByChar(text string) []ChunkData { + var chunks []ChunkData + for len(text) > 0 { + r, size := utf8.DecodeRuneInString(text) + chunks = append(chunks, ChunkData{ + Content: string(r), + Index: len(chunks), + Metadata: map[string]interface{}{"language": DetectLanguage(text)}, + }) + text = text[size:] + } + return chunks +} + +func (o *SplitOperator) splitByParagraph(text string) []ChunkData { + paragraphs := strings.Split(text, "\n") + chunks := make([]ChunkData, 0, len(paragraphs)) + for i, p := range paragraphs { + trimmed := strings.TrimSpace(p) + if trimmed == "" { + continue + } + chunks = append(chunks, ChunkData{ + Content: trimmed, + Index: i, + Metadata: map[string]interface{}{"language": DetectLanguage(trimmed)}, + }) + } + return chunks +} diff --git a/internal/ingestion/chunk/type.go b/internal/ingestion/chunk/type.go new file mode 100644 index 0000000000..1d85abbb6c --- /dev/null +++ b/internal/ingestion/chunk/type.go @@ -0,0 +1,55 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package chunk + +// Operator defines the interface for all chunking pipeline stages. +type Operator interface { + // Prepare configures the operator from a DSL stage config map. + Prepare(ctx *ChunkContext) error + // Execute runs the operator on the shared context. + Execute(ctx *ChunkContext) error + // Finish performs any cleanup. + Finish(ctx *ChunkContext) error + + String() string +} + +// ChunkData represents a single chunk produced by the pipeline. +type ChunkData struct { + Content string `json:"content"` + Size int `json:"size"` + Index int `json:"index,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +func (c *ChunkData) GetContent() string { + if c == nil { + return "" + } + return c.Content +} + +// ChunkContext flows through the pipeline, carrying text and chunks. +type ChunkContext struct { + Origin string // raw text + + TextAfterPreprocess string // text after preprocess operator + + SplitChunks []ChunkData // chunks after split operator + + ResultChunks []ChunkData // final or intermediate chunks +} diff --git a/internal/ingestion/chunk_engine.go b/internal/ingestion/chunk_engine.go new file mode 100644 index 0000000000..e97795f9c8 --- /dev/null +++ b/internal/ingestion/chunk_engine.go @@ -0,0 +1,153 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package ingestion + +import ( + "encoding/json" + "fmt" + "strings" + + "ragflow/internal/ingestion/chunk" +) + +/* + DSL reference — see comment block above for the full JSON structure. + + Pipeline stages: + 1. "preprocess" → chunk.PreprocessOperator + 2. "split" → chunk.SplitOperator + 3. "postprocess" → chunk.PostprocessOperator +*/ + +// ChunkPlan holds the ordered pipeline operators. +type ChunkPlan struct { + Operators []chunk.Operator + Version string + Description string + Name string +} + +// ChunkEngine parses DSL JSON into a plan and executes it. +type ChunkEngine struct{} + +func NewChunkEngine() *ChunkEngine { + return &ChunkEngine{} +} + +// --------------------------------------------------------------------------- +// Compile — compile DSL JSON into an ordered operator list +// --------------------------------------------------------------------------- + +func (e *ChunkEngine) Compile(dsl string) (*ChunkPlan, error) { + var parsed map[string]interface{} + if err := json.Unmarshal([]byte(dsl), &parsed); err != nil { + return nil, fmt.Errorf("compile DSL: %w", err) + } + + plan := &ChunkPlan{} + + pipelineRaw, ok := parsed["pipeline"].([]interface{}) + if !ok || len(pipelineRaw) == 0 { + return plan, nil + } + + plan.Name, ok = parsed["name"].(string) + if !ok { + plan.Name = "No name" + } + plan.Description, ok = parsed["description"].(string) + if !ok { + plan.Description = "No description" + } + plan.Version, ok = parsed["version"].(string) + if !ok { + plan.Version = "1.0" + } + + for i, operatorRaw := range pipelineRaw { + var operator map[string]interface{} + operator, ok = operatorRaw.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("pipeline[%d]: expected object", i) + } + + var op chunk.Operator + var err error + operatorName, _ := operator["operator"].(string) + switch operatorName { + case "preprocess": + op, err = chunk.NewPreprocessOperator(operator) + if err != nil { + return nil, fmt.Errorf("create preprocess operator[%d]: %w", i, err) + } + case "split": + op, err = chunk.NewSplitOperator(operator) + if err != nil { + return nil, fmt.Errorf("create split operator[%d]: %w", i, err) + } + case "postprocess": + op, err = chunk.NewPostprocessOperator(operator) + if err != nil { + return nil, fmt.Errorf("create postprocess operator[%d]: %w", i, err) + } + default: + return nil, fmt.Errorf("pipeline[%d]: unknown operator %s", i, operatorName) + } + delete(operator, "operator") + + plan.Operators = append(plan.Operators, op) + } + + return plan, nil +} + +// --------------------------------------------------------------------------- +// Execute — run the pipeline operators on input text +// --------------------------------------------------------------------------- + +func (e *ChunkEngine) Execute(plan *ChunkPlan, text string) (*chunk.ChunkContext, error) { + chunkContext := &chunk.ChunkContext{Origin: text} + + for i, op := range plan.Operators { + if err := op.Prepare(chunkContext); err != nil { + return nil, fmt.Errorf("re-prepare operator[%d]: %w", i, err) + } + + if err := op.Execute(chunkContext); err != nil { + return nil, fmt.Errorf("execute operator[%d]: %w", i, err) + } + + if err := op.Finish(chunkContext); err != nil { + return nil, fmt.Errorf("finish operator[%d]: %w", i, err) + } + } + + return chunkContext, nil +} + +// --------------------------------------------------------------------------- +// Explain — describe the plan in human-readable form +// --------------------------------------------------------------------------- + +func (e *ChunkEngine) Explain(plan *ChunkPlan) (string, error) { + var buf strings.Builder + buf.WriteString("Chunk Pipeline Plan:\n") + for i, op := range plan.Operators { + buf.WriteString(fmt.Sprintf(" [%d] %s\n", i, op.String())) + } + return buf.String(), nil +} diff --git a/internal/ingestion/chunk_engine_test.go b/internal/ingestion/chunk_engine_test.go new file mode 100644 index 0000000000..51fbf72fb4 --- /dev/null +++ b/internal/ingestion/chunk_engine_test.go @@ -0,0 +1,346 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package ingestion + +import ( + "fmt" + "strings" + "testing" + + "ragflow/internal/ingestion/chunk" +) + +// The full DSL example from chunk_engine.go L22-L95. +var mediaAwareDSL = `{ + "version": "1.0", + "name": "media_aware_chunking", + "description": "Disable overlap when encountering image/video URLs", + + "pipeline": [ + { + "operator": "preprocess", + "normalize_newlines": true, + "strip_whitespace": true, + "remove_empty_lines": true + }, + + { + "operator": "split", + "strategy": "sentence", + "params": { + "boundaries": [". ", "! ", "? ", "\n"], + "keep_separators": true + } + }, + + { + "operator": "postprocess", + "merge": { + "target_size": 500, + "strategy": "greedy" + }, + "overlap": { + "enabled": true, + "unit": "char", + "conditions": [ + { + "name": "Contains media URL", + "if": "has_media_url = true", + "then": {"size": 0} + }, + { + "name": "Contains image URL", + "if": "has_image_url = true", + "then": {"size": 0} + }, + { + "name": "Contains video URL", + "if": "has_video_url = true", + "then": {"size": 0} + }, + { + "name": "Normal English long sentence", + "if": "language = 'en' AND length > 50 AND has_media_url = false", + "then": {"size": 1, "unit": "sentence"} + }, + { + "name": "Normal English short sentence", + "if": "language = 'en' AND length <= 50 AND has_media_url = false", + "then": {"size": 30} + } + ], + "default": {"size": 50} + }, + "filter": { + "min_length": 10, + "max_length": 1200 + }, + "add_metadata": { + "include_index": true, + "custom_fields": { + "has_media_url": "auto_detect" + } + } + } + ] +}` + +var minimalDSL = `{ + "pipeline": [ + {"operator": "preprocess", "normalize_newlines": true}, + {"operator": "split", "strategy": "sentence", "params": {"boundaries": ["\n"], "keep_separators": false}}, + {"operator": "postprocess", "filter": {"min_length": 1}} + ] +}` + +// --------------------------------------------------------------------------- +// Plan success tests +// --------------------------------------------------------------------------- + +func TestPlan_FullDSL(t *testing.T) { + engine := NewChunkEngine() + plan, err := engine.Compile(mediaAwareDSL) + if err != nil { + t.Fatalf("Compile(mediaAwareDSL) unexpected error: %v", err) + } + if plan == nil { + t.Fatal("Plan returned nil") + } + + // Must have exactly 3 operators + if len(plan.Operators) != 3 { + t.Fatalf("expected 3 operators, got %d", len(plan.Operators)) + } + + // Verify operator types in order + expectedTypes := []string{ + "*chunk.PreprocessOperator", + "*chunk.SplitOperator", + "*chunk.PostprocessOperator", + } + for i, op := range plan.Operators { + typ := fmt.Sprintf("%T", op) + if typ != expectedTypes[i] { + t.Errorf("operator[%d]: expected %s, got %s", i, expectedTypes[i], typ) + } + } + + // Verify operators implement Operator interface + for i, op := range plan.Operators { + var iface chunk.Operator = op + _ = iface // compile-time check + if op == nil { + t.Errorf("operator[%d] is nil", i) + } + } +} + +func TestPlan_MinimalDSL(t *testing.T) { + engine := NewChunkEngine() + plan, err := engine.Compile(minimalDSL) + if err != nil { + t.Fatalf("Compile(minimalDSL) unexpected error: %v", err) + } + if plan == nil { + t.Fatal("Plan returned nil") + } + if len(plan.Operators) != 3 { + t.Fatalf("expected 3 operators, got %d", len(plan.Operators)) + } +} + +// --------------------------------------------------------------------------- +// Plan error tests +// --------------------------------------------------------------------------- + +func TestPlan_InvalidJSON(t *testing.T) { + engine := NewChunkEngine() + invalid := `{bad json}` + plan, err := engine.Compile(invalid) + if err == nil { + t.Fatal("expected error for invalid JSON, got nil") + } + if plan != nil { + t.Fatal("expected nil plan on error") + } +} + +func TestPlan_UnknownOperator(t *testing.T) { + engine := NewChunkEngine() + dsl := `{"pipeline": [{"operator": "unknown_operator"}]}` + plan, err := engine.Compile(dsl) + if err == nil { + t.Fatal("expected error for unknown operator, got nil") + } + if !strings.Contains(err.Error(), "unknown_operator") { + t.Errorf("error should mention unknown operator, got: %v", err) + } + if plan != nil { + t.Fatal("expected nil plan on error") + } +} + +func TestPlan_EmptyPipeline(t *testing.T) { + engine := NewChunkEngine() + dsl := `{"pipeline": []}` + plan, err := engine.Compile(dsl) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if plan == nil { + t.Fatal("plan should not be nil") + } + if len(plan.Operators) != 0 { + t.Fatalf("expected 0 operators, got %d", len(plan.Operators)) + } +} + +func TestPlan_MissingPipeline(t *testing.T) { + engine := NewChunkEngine() + dsl := `{"version": "1.0"}` + plan, err := engine.Compile(dsl) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if plan == nil { + t.Fatal("plan should not be nil") + } + if len(plan.Operators) != 0 { + t.Fatalf("expected 0 operators, got %d", len(plan.Operators)) + } +} + +// --------------------------------------------------------------------------- +// Plan + Execute integration test +// --------------------------------------------------------------------------- + +func TestPlan_Execute_FullPipeline(t *testing.T) { + engine := NewChunkEngine() + plan, err := engine.Compile(mediaAwareDSL) + if err != nil { + t.Fatalf("Plan error: %v", err) + } + + inputText := `这是第一句话。这是第二句话!这是第三句话?\n这是第四句话。` + ctx, err := engine.Execute(plan, inputText) + if err != nil { + t.Fatalf("Execute error: %v", err) + } + if ctx == nil { + t.Fatal("Execute returned nil context") + } + if len(ctx.ResultChunks) == 0 { + t.Fatal("expected at least one chunk after execution") + } + for i, c := range ctx.ResultChunks { + if c.Content == "" { + t.Errorf("chunk[%d] has empty content", i) + } + println(c.Content) + if c.Metadata == nil { + t.Errorf("chunk[%d] has nil metadata", i) + } + } +} + +func TestPlan_Execute_MinimalPipeline(t *testing.T) { + engine := NewChunkEngine() + plan, err := engine.Compile(minimalDSL) + if err != nil { + t.Fatalf("Plan error: %v", err) + } + + inputText := "Hello world.\nGoodbye world." + ctx, err := engine.Execute(plan, inputText) + if err != nil { + t.Fatalf("Execute error: %v", err) + } + if ctx == nil { + t.Fatal("Execute returned nil context") + } + if len(ctx.ResultChunks) == 0 { + t.Fatal("expected at least one chunk after execution") + } +} + +// --------------------------------------------------------------------------- +// Edge cases +// --------------------------------------------------------------------------- + +func TestPlan_Explain(t *testing.T) { + engine := NewChunkEngine() + plan, err := engine.Compile(mediaAwareDSL) + if err != nil { + t.Fatalf("Plan error: %v", err) + } + + _, err = engine.Explain(plan) + if err != nil { + t.Fatalf("Explain error: %v", err) + } + //fmt.Println(explanation) +} + +func TestPlan_ReuseEngine(t *testing.T) { + engine := NewChunkEngine() + + // First plan + plan1, err := engine.Compile(mediaAwareDSL) + if err != nil { + t.Fatalf("first Plan error: %v", err) + } + + // Second plan from the same engine + plan2, err := engine.Compile(minimalDSL) + if err != nil { + t.Fatalf("second Plan error: %v", err) + } + + if len(plan1.Operators) != len(plan2.Operators) { + t.Errorf("plan1 has %d operators, plan2 has %d", len(plan1.Operators), len(plan2.Operators)) + } +} + +// Benchmark +func BenchmarkPlan_FullDSL(b *testing.B) { + engine := NewChunkEngine() + dsl := mediaAwareDSL + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := engine.Compile(dsl) + if err != nil { + b.Fatalf("Plan error: %v", err) + } + } +} + +func BenchmarkPlan_Execute_FullDSL(b *testing.B) { + engine := NewChunkEngine() + dsl := mediaAwareDSL + plan, err := engine.Compile(dsl) + if err != nil { + b.Fatalf("Plan error: %v", err) + } + inputText := strings.Repeat("这是第一句话。这是第二句话!这是第三句话?\n", 100) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := engine.Execute(plan, inputText) + if err != nil { + b.Fatalf("Execute error: %v", err) + } + } +} diff --git a/internal/ingestion/ingestion_service.go b/internal/ingestion/ingestion_service.go index 1947332cd0..f0beb1620c 100644 --- a/internal/ingestion/ingestion_service.go +++ b/internal/ingestion/ingestion_service.go @@ -1,3 +1,19 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + package ingestion import (