Go: parse ingestion DSL (#15938)

PR #15938

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2026-06-12 17:58:36 +08:00
committed by GitHub
parent 89aac82663
commit 115b730d07
11 changed files with 2103 additions and 7 deletions

View File

@@ -1209,6 +1209,12 @@ func (c *CLI) handleMetaCommand(cmd *Command) error {
c.running = false
case "?", "h", "help":
c.printHelp()
case "pwd":
dir, err := os.Getwd()
if err != nil {
return fmt.Errorf("get working directory: %w", err)
}
fmt.Println(dir)
default:
return fmt.Errorf("unknown meta command: \\%s", command)
}

View File

@@ -529,6 +529,33 @@ func (r *TaskResponse) PrintOut() {
}
}
type ExplainResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Duration float64
OutputFormat OutputFormat
}
func (r *ExplainResponse) Type() string {
return "explain"
}
func (r *ExplainResponse) TimeCost() float64 {
return r.Duration
}
func (r *ExplainResponse) SetOutputFormat(format OutputFormat) {
r.OutputFormat = format
}
func (r *ExplainResponse) PrintOut() {
if r.Code == 0 {
fmt.Printf("\n%s\n", r.Message)
} else {
fmt.Printf("ERROR %d\n", r.Code)
}
}
// FileSystemResponse wraps the raw text output from executeFilesystem().
type FileSystemResponse struct {
Output string

View File

@@ -28,6 +28,7 @@ import (
"os"
"os/exec"
"path/filepath"
"ragflow/internal/ingestion"
"strings"
"time"
)
@@ -3445,27 +3446,60 @@ func (c *CLI) ChunkCommand(cmd *Command) (ResponseIf, error) {
return nil, fmt.Errorf("this command is only allowed in USER mode")
}
var result ExplainResponse
start := time.Now()
filename, ok := cmd.Params["filename"].(string)
if !ok {
return nil, fmt.Errorf("filename not provided")
}
dsl, ok := cmd.Params["dsl"].(string)
dslFilename, ok := cmd.Params["dsl"].(string)
if !ok {
return nil, fmt.Errorf("dsl not provided")
}
dsl, err := os.ReadFile(dslFilename)
if err != nil {
return nil, fmt.Errorf("failed to read dsl file: %w", err)
}
explain, ok := cmd.Params["explain"].(bool)
if !ok {
explain = false
}
if explain {
fmt.Printf("Explain chunk file: %s, DSL: %s\n", filename, dsl)
} else {
// TODO: not implemented
fmt.Printf("Chunk file: %s, DSL: %s\n", filename, dsl)
engine := ingestion.NewChunkEngine()
plan, err := engine.Compile(string(dsl))
if err != nil {
return nil, fmt.Errorf("compile failed: %w", err)
}
var result SimpleResponse
if explain {
explanation, err := engine.Explain(plan)
if err != nil {
return nil, fmt.Errorf("explain error: %w", err)
}
result.Message = explanation
} else {
fileToChunking, err := os.ReadFile(filename)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
chunkContext, err := engine.Execute(plan, string(fileToChunking))
if err != nil {
return nil, fmt.Errorf("chunking error: %w", err)
}
for _, resultChunk := range chunkContext.ResultChunks {
fmt.Printf("Chunk index: %d\n", resultChunk.Index)
fmt.Printf("Chunk size: %d\n", resultChunk.Size)
fmt.Printf("Chunk content: \n%s\n", resultChunk.Content)
}
}
result.Duration = time.Since(start).Seconds()
result.Code = 0
result.Message = fmt.Sprintf("Success to chunk %s", filename)
return &result, nil

View File

@@ -0,0 +1,649 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package chunk
import (
"fmt"
"math"
"regexp"
"strconv"
"unicode"
)
// ---------------------------------------------------------------------------
// Token types
// ---------------------------------------------------------------------------
type tokenType int
const (
tokenEOF tokenType = iota
tokenIdentifier
tokenString
tokenNumber
tokenTrue
tokenFalse
tokenEq
tokenNeq
tokenGt
tokenLt
tokenGte
tokenLte
tokenAnd
tokenOr
tokenNot
tokenLParen
tokenRParen
)
var keywords = map[string]tokenType{
"AND": tokenAnd,
"OR": tokenOr,
"NOT": tokenNot,
"true": tokenTrue,
"false": tokenFalse,
"TRUE": tokenTrue,
"FALSE": tokenFalse,
}
type token struct {
typ tokenType
raw string
}
// ---------------------------------------------------------------------------
// Lexer
// ---------------------------------------------------------------------------
type lexer struct {
input []rune
pos int
}
func newLexer(input string) *lexer {
return &lexer{input: []rune(input)}
}
func (l *lexer) skipWhitespace() {
for l.pos < len(l.input) && unicode.IsSpace(l.input[l.pos]) {
l.pos++
}
}
func (l *lexer) next() token {
l.skipWhitespace()
if l.pos >= len(l.input) {
return token{typ: tokenEOF, raw: ""}
}
ch := l.input[l.pos]
// Single-quoted string
if ch == '\'' {
l.pos++ // skip opening '
start := l.pos
for l.pos < len(l.input) && l.input[l.pos] != '\'' {
l.pos++
}
raw := string(l.input[start:l.pos])
if l.pos < len(l.input) {
l.pos++ // skip closing '
}
return token{typ: tokenString, raw: raw}
}
// Operators
if l.pos+1 < len(l.input) {
next := l.input[l.pos+1]
switch string([]rune{ch, next}) {
case ">=":
l.pos += 2
return token{typ: tokenGte, raw: ">="}
case "<=":
l.pos += 2
return token{typ: tokenLte, raw: "<="}
case "!=":
l.pos += 2
return token{typ: tokenNeq, raw: "!="}
}
}
switch ch {
case '=':
l.pos++
return token{typ: tokenEq, raw: "="}
case '>':
l.pos++
return token{typ: tokenGt, raw: ">"}
case '<':
l.pos++
return token{typ: tokenLt, raw: "<"}
case '(':
l.pos++
return token{typ: tokenLParen, raw: "("}
case ')':
l.pos++
return token{typ: tokenRParen, raw: ")"}
}
// Number
if unicode.IsDigit(ch) || (ch == '-' && l.pos+1 < len(l.input) && unicode.IsDigit(l.input[l.pos+1])) {
start := l.pos
if l.input[l.pos] == '-' {
l.pos++
}
for l.pos < len(l.input) && (unicode.IsDigit(l.input[l.pos]) || l.input[l.pos] == '.') {
l.pos++
}
return token{typ: tokenNumber, raw: string(l.input[start:l.pos])}
}
// Identifier / keyword
if unicode.IsLetter(ch) || ch == '_' {
start := l.pos
for l.pos < len(l.input) && (unicode.IsLetter(l.input[l.pos]) || unicode.IsDigit(l.input[l.pos]) || l.input[l.pos] == '_') {
l.pos++
}
raw := string(l.input[start:l.pos])
if kw, ok := keywords[raw]; ok {
return token{typ: kw, raw: raw}
}
return token{typ: tokenIdentifier, raw: raw}
}
// Unknown
l.pos++
return token{typ: tokenIdentifier, raw: string(ch)}
}
func (l *lexer) peek() token {
pos := l.pos
tok := l.next()
l.pos = pos
return tok
}
// ---------------------------------------------------------------------------
// AST nodes
// ---------------------------------------------------------------------------
type Expr interface {
String() string
}
type binaryExpr struct {
left Expr
op tokenType
right Expr
}
func (e binaryExpr) String() string {
ops := map[tokenType]string{
tokenEq: "=",
tokenNeq: "!=",
tokenGt: ">",
tokenLt: "<",
tokenGte: ">=",
tokenLte: "<=",
tokenAnd: "AND",
tokenOr: "OR",
}
return fmt.Sprintf("(%s %s %s)", e.left, ops[e.op], e.right)
}
type unaryExpr struct {
op tokenType
right Expr
}
func (e unaryExpr) String() string {
return fmt.Sprintf("(NOT %s)", e.right)
}
type identifierExpr struct {
name string
}
func (e identifierExpr) String() string {
return e.name
}
type stringExpr struct {
value string
}
func (e stringExpr) String() string {
return "'" + e.value + "'"
}
type numberExpr struct {
value float64
}
func (e numberExpr) String() string {
return strconv.FormatFloat(e.value, 'f', -1, 64)
}
type boolExpr struct {
value bool
}
func (e boolExpr) String() string {
return strconv.FormatBool(e.value)
}
// ---------------------------------------------------------------------------
// Recursive-descent parser
// ---------------------------------------------------------------------------
type parser struct {
lex *lexer
cur token
peeked bool
}
func newParser(input string) *parser {
p := &parser{lex: newLexer(input)}
p.advance()
return p
}
func (p *parser) advance() {
if p.peeked {
p.peeked = false
return
}
p.cur = p.lex.next()
}
func (p *parser) peek() token {
if !p.peeked {
p.peeked = true
p.cur = p.lex.next()
}
return p.cur
}
func (p *parser) expect(typ tokenType) token {
tok := p.cur
if tok.typ != typ {
panic(fmt.Sprintf("expected token %d but got %d (%q)", typ, tok.typ, tok.raw))
}
p.advance()
return tok
}
func (p *parser) parse() Expr {
return p.parseOr()
}
// or_expr → and_expr ("OR" and_expr)*
func (p *parser) parseOr() Expr {
e := p.parseAnd()
for p.cur.typ == tokenOr {
op := p.cur.typ
p.advance()
right := p.parseAnd()
e = binaryExpr{left: e, op: op, right: right}
}
return e
}
// and_expr → not_expr ("AND" not_expr)*
func (p *parser) parseAnd() Expr {
e := p.parseNot()
for p.cur.typ == tokenAnd {
op := p.cur.typ
p.advance()
right := p.parseNot()
e = binaryExpr{left: e, op: op, right: right}
}
return e
}
// not_expr → "NOT" not_expr | primary
func (p *parser) parseNot() Expr {
if p.cur.typ == tokenNot {
op := p.cur.typ
p.advance()
right := p.parseNot()
return unaryExpr{op: op, right: right}
}
return p.parsePrimary()
}
// primary → comparison | "(" expression ")"
func (p *parser) parsePrimary() Expr {
if p.cur.typ == tokenLParen {
p.advance()
e := p.parseOr()
p.expect(tokenRParen)
return e
}
return p.parseComparison()
}
// comparison → IDENTIFIER OP value | value
// comparison → IDENTIFIER OP value
func (p *parser) parseComparison() Expr {
if p.cur.typ == tokenIdentifier {
id := p.cur.raw
p.advance()
switch p.cur.typ {
case tokenEq, tokenNeq, tokenGt, tokenLt, tokenGte, tokenLte:
op := p.cur.typ
p.advance()
right := p.parseValue()
return binaryExpr{left: identifierExpr{name: id}, op: op, right: right}
default:
// identifier alone treat as boolean check
return binaryExpr{
left: identifierExpr{name: id},
op: tokenEq,
right: boolExpr{value: true},
}
}
}
return p.parseValue()
}
// value → STRING | NUMBER | BOOLEAN
func (p *parser) parseValue() Expr {
switch p.cur.typ {
case tokenString:
v := stringExpr{value: p.cur.raw}
p.advance()
return v
case tokenNumber:
f, _ := strconv.ParseFloat(p.cur.raw, 64)
p.advance()
return numberExpr{value: f}
case tokenTrue:
p.advance()
return boolExpr{value: true}
case tokenFalse:
p.advance()
return boolExpr{value: false}
default:
// treat as identifier (e.g. bare variable reference)
id := identifierExpr{name: p.cur.raw}
p.advance()
return id
}
}
// ---------------------------------------------------------------------------
// Evaluator
// ---------------------------------------------------------------------------
var reMediaURL = regexp.MustCompile(`(?i)https?://[^\s]*\.(jpg|jpeg|png|gif|bmp|webp|svg|mp4|avi|mov|wmv|flv|mkv|m4v|mp3|wav|ogg|aac)`)
var reImageURL = regexp.MustCompile(`(?i)https?://[^\s]*\.(jpg|jpeg|png|gif|bmp|webp|svg)`)
var reVideoURL = regexp.MustCompile(`(?i)https?://[^\s]*\.(mp4|avi|mov|wmv|flv|mkv|m4v)`)
var reAnyURL = regexp.MustCompile(`(?i)https?://[^\s]+`)
// buildExprContext builds a variable context from a chunk's content and metadata.
// It auto-detects media/image/video URLs and language hints.
func buildExprContext(chunk ContentProvider, metadata map[string]interface{}) map[string]interface{} {
vars := make(map[string]interface{})
content := chunk.GetContent()
// Pre-populate from metadata
for k, v := range metadata {
vars[k] = v
}
// Auto-detect URL presence
vars["has_media_url"] = reMediaURL.MatchString(content)
vars["has_image_url"] = reImageURL.MatchString(content)
vars["has_video_url"] = reVideoURL.MatchString(content)
vars["has_url"] = reAnyURL.MatchString(content)
vars["length"] = len([]rune(content))
return vars
}
// ContentProvider allows evaluating expressions against any type that has content.
type ContentProvider interface {
GetContent() string
}
// Evaluate parses and evaluates a boolean expression against a variable map.
func Evaluate(exprStr string, vars map[string]interface{}) (bool, error) {
p := newParser(exprStr)
ast := p.parse()
res, err := eval(ast, vars)
if err != nil {
return false, fmt.Errorf("evaluate %q: %w", exprStr, err)
}
b, ok := toBool(res)
if !ok {
return false, fmt.Errorf("evaluate %q: result %v (%T) is not a boolean", exprStr, res, res)
}
return b, nil
}
// CompileExpression parses an expression string into a reusable AST.
func CompileExpression(exprStr string) (Expr, error) {
defer func() {
if r := recover(); r != nil {
panic(fmt.Sprintf("compile expression %q: %v", exprStr, r))
}
}()
p := newParser(exprStr)
return p.parse(), nil
}
// EvalCompiled evaluates a pre-compiled expression AST against variables.
func EvalCompiled(ast interface{}, vars map[string]interface{}) (bool, error) {
e, ok := ast.(Expr)
if !ok {
return false, fmt.Errorf("invalid AST type: %T", ast)
}
res, err := eval(e, vars)
if err != nil {
return false, err
}
b, ok := toBool(res)
if !ok {
return false, fmt.Errorf("result %v (%T) is not boolean", res, res)
}
return b, nil
}
func eval(e Expr, vars map[string]interface{}) (interface{}, error) {
switch n := e.(type) {
case binaryExpr:
return evalBinary(n, vars)
case unaryExpr:
return evalUnary(n, vars)
case identifierExpr:
v, ok := vars[n.name]
if !ok {
return nil, fmt.Errorf("undefined variable: %s", n.name)
}
return v, nil
case stringExpr:
return n.value, nil
case numberExpr:
return n.value, nil
case boolExpr:
return n.value, nil
default:
return nil, fmt.Errorf("unknown expression type: %T", e)
}
}
func evalBinary(e binaryExpr, vars map[string]interface{}) (interface{}, error) {
left, err := eval(e.left, vars)
if err != nil {
return nil, err
}
right, err := eval(e.right, vars)
if err != nil {
return nil, err
}
switch e.op {
case tokenAnd:
l, ok := toBool(left)
if !ok {
return false, fmt.Errorf("AND requires boolean left operand")
}
if !l {
return false, nil
}
r, ok := toBool(right)
if !ok {
return false, fmt.Errorf("AND requires boolean right operand")
}
return r, nil
case tokenOr:
l, ok := toBool(left)
if !ok {
return false, fmt.Errorf("OR requires boolean left operand")
}
if l {
return true, nil
}
r, ok := toBool(right)
if !ok {
return false, fmt.Errorf("OR requires boolean right operand")
}
return r, nil
case tokenEq:
return compareEq(left, right), nil
case tokenNeq:
return !compareEq(left, right), nil
case tokenGt, tokenLt, tokenGte, tokenLte:
return compareOrder(left, right, e.op)
default:
return false, fmt.Errorf("unknown binary op %d", e.op)
}
}
func evalUnary(e unaryExpr, vars map[string]interface{}) (interface{}, error) {
right, err := eval(e.right, vars)
if err != nil {
return nil, err
}
b, ok := toBool(right)
if !ok {
return false, fmt.Errorf("NOT requires boolean operand")
}
return !b, nil
}
func toBool(v interface{}) (bool, bool) {
switch vv := v.(type) {
case bool:
return vv, true
case string:
return vv == "true" || vv == "TRUE" || vv == "1", true
case float64:
return vv != 0, true
case int:
return vv != 0, true
}
return false, false
}
func compareEq(a, b interface{}) bool {
// Normalise numeric types
af, aIsNum := toFloat(a)
bf, bIsNum := toFloat(b)
if aIsNum && bIsNum {
return af == bf
}
// Fall back to string comparison
return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b)
}
func toFloat(v interface{}) (float64, bool) {
switch vv := v.(type) {
case float64:
return vv, true
case int:
return float64(vv), true
case string:
f, err := strconv.ParseFloat(vv, 64)
return f, err == nil
}
return 0, false
}
func compareOrder(a, b interface{}, op tokenType) (bool, error) {
af, aOK := toFloat(a)
bf, bOK := toFloat(b)
if aOK && bOK {
switch op {
case tokenGt:
return af > bf, nil
case tokenLt:
return af < bf, nil
case tokenGte:
return af >= bf, nil
case tokenLte:
return af <= bf, nil
}
}
// String fallback
sa := fmt.Sprintf("%v", a)
sb := fmt.Sprintf("%v", b)
switch op {
case tokenGt:
return sa > sb, nil
case tokenLt:
return sa < sb, nil
case tokenGte:
return sa >= sb, nil
case tokenLte:
return sa <= sb, nil
}
return false, fmt.Errorf("unsupported comparison op %d between %T and %T", op, a, b)
}
// ---------------------------------------------------------------------------
// Language heuristics
// ---------------------------------------------------------------------------
// DetectLanguage returns a best-effort language code ('zh', 'en', etc.)
// based on the proportion of CJK characters.
func DetectLanguage(text string) string {
cjk := 0
total := 0
for _, r := range text {
if unicode.Is(unicode.Han, r) {
cjk++
}
if unicode.IsLetter(r) {
total++
}
}
if total > 0 && float64(cjk)/float64(total) > 0.3 {
return "zh"
}
return "en"
}
// RuneCount returns the number of runes in text.
func RuneCount(text string) int {
return len([]rune(text))
}
// Ensure math is used (for NaN etc.)
var _ = math.NaN

View File

@@ -0,0 +1,491 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package chunk
import (
"fmt"
"strings"
)
// ---------------------------------------------------------------------------
// Overlap condition
// ---------------------------------------------------------------------------
type overlapConfig struct {
Size int `json:"size"`
Unit string `json:"unit,omitempty"` // "char" (default) or "sentence"
}
type overlapCondition struct {
Name string
Condition Expr // pre-compiled expression AST from CompileExpression
OverlapConfig overlapConfig
}
type mergeConfig struct {
TargetSize int `json:"target_size"`
Strategy string `json:"strategy"` // "greedy"
}
type filterConfig struct {
MinLength int `json:"min_length"`
MaxLength int `json:"max_length"`
}
type metadataConfig struct {
IncludeIndex bool `json:"include_index"`
CustomFields map[string]string `json:"custom_fields,omitempty"`
}
// ---------------------------------------------------------------------------
// PostprocessOperator
// ---------------------------------------------------------------------------
type PostprocessOperator struct {
merge *mergeConfig
overlap struct {
unit string // "char" (default) or "sentence"
conditions []overlapCondition
defaultCfg overlapConfig
}
filter *filterConfig
addMetadata *metadataConfig
}
func NewPostprocessOperator(config map[string]interface{}) (*PostprocessOperator, error) {
op := &PostprocessOperator{}
// Merge
if m, ok := config["merge"].(map[string]interface{}); ok {
op.merge = &mergeConfig{}
if ts, ok := m["target_size"].(float64); ok {
op.merge.TargetSize = int(ts)
} else {
op.merge.TargetSize = 500
}
if s, ok := m["strategy"].(string); ok {
op.merge.Strategy = s
} else {
op.merge.Strategy = "greedy"
}
}
// Overlap
if ov, ok := config["overlap"].(map[string]interface{}); ok {
if u, ok := ov["unit"].(string); ok {
op.overlap.unit = u
} else {
op.overlap.unit = "char"
}
// Default
if d, ok := ov["default"].(map[string]interface{}); ok {
op.overlap.defaultCfg = parseOverlapConfig(d)
}
// Conditions
if conds, ok := ov["conditions"].([]interface{}); ok {
for _, ci := range conds {
c, ok := ci.(map[string]interface{})
if !ok {
continue
}
cond := overlapCondition{}
if n, ok := c["name"].(string); ok {
cond.Name = n
}
if exprStr, ok := c["if"].(string); ok {
expression, err := CompileExpression(exprStr)
if err == nil {
cond.Condition = expression
}
}
if thenMap, ok := c["then"].(map[string]interface{}); ok {
cond.OverlapConfig = parseOverlapConfig(thenMap)
}
op.overlap.conditions = append(op.overlap.conditions, cond)
}
}
}
// Filter
if f, ok := config["filter"].(map[string]interface{}); ok {
op.filter = &filterConfig{}
if v, ok := f["min_length"].(float64); ok {
op.filter.MinLength = int(v)
}
if v, ok := f["max_length"].(float64); ok {
op.filter.MaxLength = int(v)
}
}
// Add metadata
if am, ok := config["add_metadata"].(map[string]interface{}); ok {
op.addMetadata = &metadataConfig{}
if inc, ok := am["include_index"].(bool); ok {
op.addMetadata.IncludeIndex = inc
}
if cf, ok := am["custom_fields"].(map[string]interface{}); ok {
m := make(map[string]string, len(cf))
for k, v := range cf {
m[k] = fmt.Sprintf("%v", v)
}
op.addMetadata.CustomFields = m
}
}
return op, nil
}
func (o *PostprocessOperator) Prepare(chunkCtx *ChunkContext) error {
return nil
}
func (o *PostprocessOperator) Execute(chunkCtx *ChunkContext) error {
chunks := chunkCtx.SplitChunks
if len(chunks) == 0 {
return nil
}
// 1. Merge
if o.merge != nil {
chunks = o.mergeChunks(chunks)
}
// 2. Overlap
chunks = o.applyOverlap(chunks)
// 3. Filter
if o.filter != nil {
chunks = o.filterChunks(chunks)
}
// 4. Add metadata
if o.addMetadata != nil {
chunks = o.addChunkMetadata(chunks)
}
// Re-index
for i := range chunks {
chunks[i].Index = i
chunks[i].Size = len(chunks[i].GetContent())
}
chunkCtx.ResultChunks = chunks
return nil
}
func (o *PostprocessOperator) Finish(chunkCtx *ChunkContext) error {
return nil
}
func (o *PostprocessOperator) String() string {
var buf strings.Builder
buf.WriteString("postprocess:\n")
if o.merge != nil {
fmt.Fprintf(&buf, " merge:\n")
fmt.Fprintf(&buf, " target_size: %d\n", o.merge.TargetSize)
fmt.Fprintf(&buf, " strategy: %q\n", o.merge.Strategy)
}
fmt.Fprintf(&buf, " overlap:\n")
fmt.Fprintf(&buf, " unit: %q\n", o.overlap.unit)
fmt.Fprintf(&buf, " default:\n")
fmt.Fprintf(&buf, " size: %d\n", o.overlap.defaultCfg.Size)
if o.overlap.defaultCfg.Unit != "" {
fmt.Fprintf(&buf, " unit: %q\n", o.overlap.defaultCfg.Unit)
}
if len(o.overlap.conditions) > 0 {
fmt.Fprintf(&buf, " conditions:\n")
for _, c := range o.overlap.conditions {
fmt.Fprintf(&buf, " - name: %q\n", c.Name)
fmt.Fprintf(&buf, " condition: %q\n", c.Condition.String())
fmt.Fprintf(&buf, " then:\n")
fmt.Fprintf(&buf, " size: %d\n", c.OverlapConfig.Size)
if c.OverlapConfig.Unit != "" {
fmt.Fprintf(&buf, " unit: %q\n", c.OverlapConfig.Unit)
}
}
}
if o.filter != nil {
fmt.Fprintf(&buf, " filter:\n")
fmt.Fprintf(&buf, " min_length: %d\n", o.filter.MinLength)
fmt.Fprintf(&buf, " max_length: %d\n", o.filter.MaxLength)
}
if o.addMetadata != nil {
fmt.Fprintf(&buf, " add_metadata:\n")
fmt.Fprintf(&buf, " include_index: %t\n", o.addMetadata.IncludeIndex)
if len(o.addMetadata.CustomFields) > 0 {
fmt.Fprintf(&buf, " custom_fields:\n")
for k, v := range o.addMetadata.CustomFields {
fmt.Fprintf(&buf, " %s: %q\n", k, v)
}
}
}
return buf.String()
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
func parseOverlapConfig(m map[string]interface{}) overlapConfig {
cfg := overlapConfig{}
if size, ok := m["size"].(float64); ok {
cfg.Size = int(size)
}
if u, ok := m["unit"].(string); ok {
cfg.Unit = u
}
return cfg
}
// mergeChunks greedily merges small chunks into larger ones up to target_size.
func (o *PostprocessOperator) mergeChunks(chunks []ChunkData) []ChunkData {
target := o.merge.TargetSize
if target <= 0 {
target = 500
}
var merged []ChunkData
var buf strings.Builder
var bufMeta map[string]interface{}
firstIndex := 0
for i, c := range chunks {
// If this single chunk already exceeds target, flush first then add
if len([]rune(c.Content)) >= target {
if buf.Len() > 0 {
merged = append(merged, ChunkData{
Content: buf.String(),
Index: firstIndex,
Metadata: bufMeta,
})
buf.Reset()
bufMeta = nil
}
merged = append(merged, c)
firstIndex = i + 1
continue
}
if buf.Len() == 0 {
buf.WriteString(c.Content)
bufMeta = c.Metadata
firstIndex = c.Index
} else {
nextLen := len([]rune(c.Content))
// If adding this chunk would exceed target, flush current and start new
if buf.Len()+nextLen+1 > target {
merged = append(merged, ChunkData{
Content: buf.String(),
Index: firstIndex,
Metadata: bufMeta,
})
buf.Reset()
buf.WriteString(c.Content)
bufMeta = c.Metadata
firstIndex = c.Index
} else {
buf.WriteString(" ")
buf.WriteString(c.Content)
// Merge metadata (last wins for overlapping keys)
if c.Metadata != nil && bufMeta == nil {
bufMeta = make(map[string]interface{})
}
for k, v := range c.Metadata {
bufMeta[k] = v
}
}
}
}
// Flush remaining
if buf.Len() > 0 {
merged = append(merged, ChunkData{
Content: buf.String(),
Index: firstIndex,
Metadata: bufMeta,
})
}
return merged
}
// applyOverlap evaluates conditions on each chunk and prepends overlap from the previous chunk.
func (o *PostprocessOperator) applyOverlap(chunks []ChunkData) []ChunkData {
if len(chunks) <= 1 {
return chunks
}
result := make([]ChunkData, len(chunks))
copy(result, chunks)
for i := 1; i < len(chunks); i++ {
// Determine overlap size for chunks[i]
cfg := o.resolveOverlapConfig(chunks[i])
overlapSize := cfg.Size
if overlapSize <= 0 {
continue
}
prevContent := result[i-1].Content
prevRunes := []rune(prevContent)
if len(prevRunes) == 0 {
continue
}
// Which unit?
unit := cfg.Unit
if unit == "" {
unit = o.overlap.unit
}
var overlapText string
switch unit {
case "sentence":
// Take last N sentences
sentences := splitSentencesGeneric(prevContent)
if len(sentences) < overlapSize {
overlapText = prevContent
} else {
overlapText = strings.Join(sentences[len(sentences)-overlapSize:], " ")
}
default: // "char"
if overlapSize >= len(prevRunes) {
overlapText = prevContent
} else {
overlapText = string(prevRunes[len(prevRunes)-overlapSize:])
}
}
result[i].Content = overlapText + result[i].Content
}
return result
}
// resolveOverlapConfig evaluates overlap conditions for a chunk.
func (o *PostprocessOperator) resolveOverlapConfig(chunk ChunkData) overlapConfig {
vars := buildExprContext(&chunk, chunk.Metadata)
for _, cond := range o.overlap.conditions {
if cond.Condition == nil {
continue
}
result, err := EvalCompiled(cond.Condition, vars)
if err != nil {
continue
}
if result {
cfg := cond.OverlapConfig
if cfg.Unit == "" {
cfg.Unit = o.overlap.unit
}
return cfg
}
}
cfg := o.overlap.defaultCfg
if cfg.Unit == "" {
cfg.Unit = o.overlap.unit
}
return cfg
}
// filterChunks removes chunks outside the length bounds.
func (o *PostprocessOperator) filterChunks(chunks []ChunkData) []ChunkData {
filtered := make([]ChunkData, 0, len(chunks))
for _, c := range chunks {
l := len([]rune(c.Content))
if o.filter.MinLength > 0 && l < o.filter.MinLength {
continue
}
if o.filter.MaxLength > 0 && l > o.filter.MaxLength {
continue
}
filtered = append(filtered, c)
}
return filtered
}
// addChunkMetadata enriches chunks with metadata.
func (o *PostprocessOperator) addChunkMetadata(chunks []ChunkData) []ChunkData {
result := make([]ChunkData, len(chunks))
for i, c := range chunks {
if c.Metadata == nil {
c.Metadata = make(map[string]interface{})
}
if o.addMetadata.IncludeIndex {
c.Metadata["index"] = i
}
for field, action := range o.addMetadata.CustomFields {
switch action {
case "auto_detect":
switch field {
case "has_media_url":
c.Metadata[field] = reMediaURL.MatchString(c.Content)
case "has_image_url":
c.Metadata[field] = reImageURL.MatchString(c.Content)
case "has_video_url":
c.Metadata[field] = reVideoURL.MatchString(c.Content)
case "language":
c.Metadata[field] = DetectLanguage(c.Content)
case "length":
c.Metadata[field] = RuneCount(c.Content)
default:
// Unknown auto-detect field — check for URLs generically
c.Metadata[field] = reAnyURL.MatchString(c.Content)
}
default:
c.Metadata[field] = action
}
}
result[i] = c
}
return result
}
// splitSentencesGeneric splits text into sentences using common punctuation.
var sentenceBoundaries = []rune{'。', '', '', '.', '!', '?'}
func splitSentencesGeneric(text string) []string {
runes := []rune(text)
boundSet := make(map[rune]bool)
for _, r := range sentenceBoundaries {
boundSet[r] = true
}
var sentences []string
var buf strings.Builder
for _, r := range runes {
buf.WriteRune(r)
if boundSet[r] {
sentences = append(sentences, strings.TrimSpace(buf.String()))
buf.Reset()
}
}
if buf.Len() > 0 {
sentences = append(sentences, strings.TrimSpace(buf.String()))
}
return sentences
}

View File

@@ -0,0 +1,133 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package chunk
import (
"fmt"
"regexp"
"strings"
)
type PreprocessOperator struct {
normalizeNewlines bool
stripWhitespace bool
removeEmptyLines bool
softLineBreakMerging bool
}
func NewPreprocessOperator(config map[string]interface{}) (*PreprocessOperator, error) {
operator := &PreprocessOperator{}
if v, ok := config["normalize_newlines"]; ok {
operator.normalizeNewlines, ok = v.(bool)
if !ok {
return nil, fmt.Errorf("preprocess: normalize_newlines must be bool")
}
}
if v, ok := config["strip_whitespace"]; ok {
operator.stripWhitespace, ok = v.(bool)
if !ok {
return nil, fmt.Errorf("preprocess: strip_whitespace must be bool")
}
}
if v, ok := config["remove_empty_lines"]; ok {
operator.removeEmptyLines, ok = v.(bool)
if !ok {
return nil, fmt.Errorf("preprocess: remove_empty_lines must be bool")
}
}
if v, ok := config["soft_line_break_merging"]; ok {
operator.softLineBreakMerging, ok = v.(bool)
if !ok {
return nil, fmt.Errorf("preprocess: soft_line_break_merging must be bool")
}
}
return operator, nil
}
func (o *PreprocessOperator) Prepare(chunkCtx *ChunkContext) error {
return nil
}
func (o *PreprocessOperator) Execute(chunkCtx *ChunkContext) error {
text := chunkCtx.Origin
if o.normalizeNewlines {
// \r\n → \n, \r → \n
text = strings.ReplaceAll(text, "\r\n", "\n")
text = strings.ReplaceAll(text, "\r", "\n")
// Collapse multiple \n into one
re := regexp.MustCompile(`\n{2,}`)
text = re.ReplaceAllString(text, "\n")
}
if o.stripWhitespace {
// Trim leading/trailing whitespace on each line
lines := strings.Split(text, "\n")
for i, line := range lines {
lines[i] = strings.TrimSpace(line)
}
text = strings.Join(lines, "\n")
}
if o.removeEmptyLines {
lines := strings.Split(text, "\n")
filtered := make([]string, 0, len(lines))
for _, line := range lines {
if strings.TrimSpace(line) != "" {
filtered = append(filtered, line)
}
}
text = strings.Join(filtered, "\n")
}
if o.softLineBreakMerging {
lines := strings.Split(text, "\n")
var merged []string
var current strings.Builder
sentenceEnd := regexp.MustCompile(`[.!?][\s]*$`)
for i, line := range lines {
if current.Len() > 0 {
current.WriteString(" ")
}
current.WriteString(line)
if i == len(lines)-1 || sentenceEnd.MatchString(line) {
merged = append(merged, current.String())
current.Reset()
}
}
text = strings.Join(merged, "\n")
}
chunkCtx.TextAfterPreprocess = text
return nil
}
func (o *PreprocessOperator) Finish(chunkCtx *ChunkContext) error {
return nil
}
func (o *PreprocessOperator) String() string {
var buf strings.Builder
buf.WriteString("preprocess:\n")
fmt.Fprintf(&buf, " normalize_newlines: %t\n", o.normalizeNewlines)
fmt.Fprintf(&buf, " strip_whitespace: %t\n", o.stripWhitespace)
fmt.Fprintf(&buf, " remove_empty_lines: %t\n", o.removeEmptyLines)
return buf.String()
}

View File

@@ -0,0 +1,186 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package chunk
import (
"fmt"
"strings"
"unicode/utf8"
)
type SplitOperator struct {
strategy string
boundaries []string
keepSeparators bool
}
func NewSplitOperator(config map[string]interface{}) (*SplitOperator, error) {
op := &SplitOperator{}
if v, ok := config["strategy"]; ok {
if s, ok := v.(string); ok {
op.strategy = s
}
}
if params, ok := config["params"].(map[string]interface{}); ok {
if b, ok := params["boundaries"]; ok {
if boundStrs, ok := b.([]interface{}); ok {
for _, bs := range boundStrs {
if s, ok := bs.(string); ok {
op.boundaries = append(op.boundaries, s)
}
}
}
}
if ks, ok := params["keep_separators"]; ok {
if b, ok := ks.(bool); ok {
op.keepSeparators = b
}
}
}
return op, nil
}
func (o *SplitOperator) Prepare(ctx *ChunkContext) error {
return nil
}
func (o *SplitOperator) Execute(ctx *ChunkContext) error {
text := ctx.TextAfterPreprocess
if o.strategy == "" {
o.strategy = "sentence"
}
switch o.strategy {
case "sentence":
ctx.SplitChunks = o.splitSentences(text)
case "char":
ctx.SplitChunks = o.splitByChar(text)
case "paragraph":
ctx.SplitChunks = o.splitByParagraph(text)
default:
ctx.SplitChunks = o.splitSentences(text)
}
return nil
}
func (o *SplitOperator) Finish(ctx *ChunkContext) error {
return nil
}
func (o *SplitOperator) String() string {
var buf strings.Builder
buf.WriteString("split:\n")
fmt.Fprintf(&buf, " strategy: %q\n", o.strategy)
fmt.Fprintf(&buf, " boundaries:\n")
for _, r := range o.boundaries {
fmt.Fprintf(&buf, " - %q\n", r)
}
fmt.Fprintf(&buf, " keep_separators: %t\n", o.keepSeparators)
return buf.String()
}
// splitSentences splits text at multi-rune boundaries, optionally keeping separators.
func (o *SplitOperator) splitSentences(text string) []ChunkData {
if len(o.boundaries) == 0 {
o.boundaries = []string{"。", "", "", "\n"}
}
var chunks []ChunkData
var buf strings.Builder
i := 0
for i < len(text) {
// Try to match any boundary at current position (first match wins)
matchedBound := ""
for _, bound := range o.boundaries {
if bound != "" && i+len(bound) <= len(text) && text[i:i+len(bound)] == bound {
matchedBound = bound
break
}
}
if matchedBound != "" {
if o.keepSeparators {
buf.WriteString(matchedBound)
}
if buf.Len() > 0 {
chunks = append(chunks, ChunkData{
Content: buf.String(),
Index: len(chunks),
Metadata: map[string]interface{}{
"language": DetectLanguage(buf.String()),
},
})
buf.Reset()
}
i += len(matchedBound)
} else {
r, size := utf8.DecodeRuneInString(text[i:])
buf.WriteRune(r)
i += size
}
}
// flush remaining text
if buf.Len() > 0 {
chunks = append(chunks, ChunkData{
Content: buf.String(),
Index: len(chunks),
Metadata: map[string]interface{}{
"language": DetectLanguage(buf.String()),
},
})
}
return chunks
}
func (o *SplitOperator) splitByChar(text string) []ChunkData {
var chunks []ChunkData
for len(text) > 0 {
r, size := utf8.DecodeRuneInString(text)
chunks = append(chunks, ChunkData{
Content: string(r),
Index: len(chunks),
Metadata: map[string]interface{}{"language": DetectLanguage(text)},
})
text = text[size:]
}
return chunks
}
func (o *SplitOperator) splitByParagraph(text string) []ChunkData {
paragraphs := strings.Split(text, "\n")
chunks := make([]ChunkData, 0, len(paragraphs))
for i, p := range paragraphs {
trimmed := strings.TrimSpace(p)
if trimmed == "" {
continue
}
chunks = append(chunks, ChunkData{
Content: trimmed,
Index: i,
Metadata: map[string]interface{}{"language": DetectLanguage(trimmed)},
})
}
return chunks
}

View File

@@ -0,0 +1,55 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package chunk
// Operator defines the interface for all chunking pipeline stages.
type Operator interface {
// Prepare configures the operator from a DSL stage config map.
Prepare(ctx *ChunkContext) error
// Execute runs the operator on the shared context.
Execute(ctx *ChunkContext) error
// Finish performs any cleanup.
Finish(ctx *ChunkContext) error
String() string
}
// ChunkData represents a single chunk produced by the pipeline.
type ChunkData struct {
Content string `json:"content"`
Size int `json:"size"`
Index int `json:"index,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
func (c *ChunkData) GetContent() string {
if c == nil {
return ""
}
return c.Content
}
// ChunkContext flows through the pipeline, carrying text and chunks.
type ChunkContext struct {
Origin string // raw text
TextAfterPreprocess string // text after preprocess operator
SplitChunks []ChunkData // chunks after split operator
ResultChunks []ChunkData // final or intermediate chunks
}

View File

@@ -0,0 +1,153 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package ingestion
import (
"encoding/json"
"fmt"
"strings"
"ragflow/internal/ingestion/chunk"
)
/*
DSL reference — see comment block above for the full JSON structure.
Pipeline stages:
1. "preprocess" → chunk.PreprocessOperator
2. "split" → chunk.SplitOperator
3. "postprocess" → chunk.PostprocessOperator
*/
// ChunkPlan holds the ordered pipeline operators.
type ChunkPlan struct {
Operators []chunk.Operator
Version string
Description string
Name string
}
// ChunkEngine parses DSL JSON into a plan and executes it.
type ChunkEngine struct{}
func NewChunkEngine() *ChunkEngine {
return &ChunkEngine{}
}
// ---------------------------------------------------------------------------
// Compile — compile DSL JSON into an ordered operator list
// ---------------------------------------------------------------------------
func (e *ChunkEngine) Compile(dsl string) (*ChunkPlan, error) {
var parsed map[string]interface{}
if err := json.Unmarshal([]byte(dsl), &parsed); err != nil {
return nil, fmt.Errorf("compile DSL: %w", err)
}
plan := &ChunkPlan{}
pipelineRaw, ok := parsed["pipeline"].([]interface{})
if !ok || len(pipelineRaw) == 0 {
return plan, nil
}
plan.Name, ok = parsed["name"].(string)
if !ok {
plan.Name = "No name"
}
plan.Description, ok = parsed["description"].(string)
if !ok {
plan.Description = "No description"
}
plan.Version, ok = parsed["version"].(string)
if !ok {
plan.Version = "1.0"
}
for i, operatorRaw := range pipelineRaw {
var operator map[string]interface{}
operator, ok = operatorRaw.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("pipeline[%d]: expected object", i)
}
var op chunk.Operator
var err error
operatorName, _ := operator["operator"].(string)
switch operatorName {
case "preprocess":
op, err = chunk.NewPreprocessOperator(operator)
if err != nil {
return nil, fmt.Errorf("create preprocess operator[%d]: %w", i, err)
}
case "split":
op, err = chunk.NewSplitOperator(operator)
if err != nil {
return nil, fmt.Errorf("create split operator[%d]: %w", i, err)
}
case "postprocess":
op, err = chunk.NewPostprocessOperator(operator)
if err != nil {
return nil, fmt.Errorf("create postprocess operator[%d]: %w", i, err)
}
default:
return nil, fmt.Errorf("pipeline[%d]: unknown operator %s", i, operatorName)
}
delete(operator, "operator")
plan.Operators = append(plan.Operators, op)
}
return plan, nil
}
// ---------------------------------------------------------------------------
// Execute — run the pipeline operators on input text
// ---------------------------------------------------------------------------
func (e *ChunkEngine) Execute(plan *ChunkPlan, text string) (*chunk.ChunkContext, error) {
chunkContext := &chunk.ChunkContext{Origin: text}
for i, op := range plan.Operators {
if err := op.Prepare(chunkContext); err != nil {
return nil, fmt.Errorf("re-prepare operator[%d]: %w", i, err)
}
if err := op.Execute(chunkContext); err != nil {
return nil, fmt.Errorf("execute operator[%d]: %w", i, err)
}
if err := op.Finish(chunkContext); err != nil {
return nil, fmt.Errorf("finish operator[%d]: %w", i, err)
}
}
return chunkContext, nil
}
// ---------------------------------------------------------------------------
// Explain — describe the plan in human-readable form
// ---------------------------------------------------------------------------
func (e *ChunkEngine) Explain(plan *ChunkPlan) (string, error) {
var buf strings.Builder
buf.WriteString("Chunk Pipeline Plan:\n")
for i, op := range plan.Operators {
buf.WriteString(fmt.Sprintf(" [%d] %s\n", i, op.String()))
}
return buf.String(), nil
}

View File

@@ -0,0 +1,346 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package ingestion
import (
"fmt"
"strings"
"testing"
"ragflow/internal/ingestion/chunk"
)
// The full DSL example from chunk_engine.go L22-L95.
var mediaAwareDSL = `{
"version": "1.0",
"name": "media_aware_chunking",
"description": "Disable overlap when encountering image/video URLs",
"pipeline": [
{
"operator": "preprocess",
"normalize_newlines": true,
"strip_whitespace": true,
"remove_empty_lines": true
},
{
"operator": "split",
"strategy": "sentence",
"params": {
"boundaries": [". ", "! ", "? ", "\n"],
"keep_separators": true
}
},
{
"operator": "postprocess",
"merge": {
"target_size": 500,
"strategy": "greedy"
},
"overlap": {
"enabled": true,
"unit": "char",
"conditions": [
{
"name": "Contains media URL",
"if": "has_media_url = true",
"then": {"size": 0}
},
{
"name": "Contains image URL",
"if": "has_image_url = true",
"then": {"size": 0}
},
{
"name": "Contains video URL",
"if": "has_video_url = true",
"then": {"size": 0}
},
{
"name": "Normal English long sentence",
"if": "language = 'en' AND length > 50 AND has_media_url = false",
"then": {"size": 1, "unit": "sentence"}
},
{
"name": "Normal English short sentence",
"if": "language = 'en' AND length <= 50 AND has_media_url = false",
"then": {"size": 30}
}
],
"default": {"size": 50}
},
"filter": {
"min_length": 10,
"max_length": 1200
},
"add_metadata": {
"include_index": true,
"custom_fields": {
"has_media_url": "auto_detect"
}
}
}
]
}`
var minimalDSL = `{
"pipeline": [
{"operator": "preprocess", "normalize_newlines": true},
{"operator": "split", "strategy": "sentence", "params": {"boundaries": ["\n"], "keep_separators": false}},
{"operator": "postprocess", "filter": {"min_length": 1}}
]
}`
// ---------------------------------------------------------------------------
// Plan success tests
// ---------------------------------------------------------------------------
func TestPlan_FullDSL(t *testing.T) {
engine := NewChunkEngine()
plan, err := engine.Compile(mediaAwareDSL)
if err != nil {
t.Fatalf("Compile(mediaAwareDSL) unexpected error: %v", err)
}
if plan == nil {
t.Fatal("Plan returned nil")
}
// Must have exactly 3 operators
if len(plan.Operators) != 3 {
t.Fatalf("expected 3 operators, got %d", len(plan.Operators))
}
// Verify operator types in order
expectedTypes := []string{
"*chunk.PreprocessOperator",
"*chunk.SplitOperator",
"*chunk.PostprocessOperator",
}
for i, op := range plan.Operators {
typ := fmt.Sprintf("%T", op)
if typ != expectedTypes[i] {
t.Errorf("operator[%d]: expected %s, got %s", i, expectedTypes[i], typ)
}
}
// Verify operators implement Operator interface
for i, op := range plan.Operators {
var iface chunk.Operator = op
_ = iface // compile-time check
if op == nil {
t.Errorf("operator[%d] is nil", i)
}
}
}
func TestPlan_MinimalDSL(t *testing.T) {
engine := NewChunkEngine()
plan, err := engine.Compile(minimalDSL)
if err != nil {
t.Fatalf("Compile(minimalDSL) unexpected error: %v", err)
}
if plan == nil {
t.Fatal("Plan returned nil")
}
if len(plan.Operators) != 3 {
t.Fatalf("expected 3 operators, got %d", len(plan.Operators))
}
}
// ---------------------------------------------------------------------------
// Plan error tests
// ---------------------------------------------------------------------------
func TestPlan_InvalidJSON(t *testing.T) {
engine := NewChunkEngine()
invalid := `{bad json}`
plan, err := engine.Compile(invalid)
if err == nil {
t.Fatal("expected error for invalid JSON, got nil")
}
if plan != nil {
t.Fatal("expected nil plan on error")
}
}
func TestPlan_UnknownOperator(t *testing.T) {
engine := NewChunkEngine()
dsl := `{"pipeline": [{"operator": "unknown_operator"}]}`
plan, err := engine.Compile(dsl)
if err == nil {
t.Fatal("expected error for unknown operator, got nil")
}
if !strings.Contains(err.Error(), "unknown_operator") {
t.Errorf("error should mention unknown operator, got: %v", err)
}
if plan != nil {
t.Fatal("expected nil plan on error")
}
}
func TestPlan_EmptyPipeline(t *testing.T) {
engine := NewChunkEngine()
dsl := `{"pipeline": []}`
plan, err := engine.Compile(dsl)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if plan == nil {
t.Fatal("plan should not be nil")
}
if len(plan.Operators) != 0 {
t.Fatalf("expected 0 operators, got %d", len(plan.Operators))
}
}
func TestPlan_MissingPipeline(t *testing.T) {
engine := NewChunkEngine()
dsl := `{"version": "1.0"}`
plan, err := engine.Compile(dsl)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if plan == nil {
t.Fatal("plan should not be nil")
}
if len(plan.Operators) != 0 {
t.Fatalf("expected 0 operators, got %d", len(plan.Operators))
}
}
// ---------------------------------------------------------------------------
// Plan + Execute integration test
// ---------------------------------------------------------------------------
func TestPlan_Execute_FullPipeline(t *testing.T) {
engine := NewChunkEngine()
plan, err := engine.Compile(mediaAwareDSL)
if err != nil {
t.Fatalf("Plan error: %v", err)
}
inputText := `这是第一句话。这是第二句话!这是第三句话?\n这是第四句话。`
ctx, err := engine.Execute(plan, inputText)
if err != nil {
t.Fatalf("Execute error: %v", err)
}
if ctx == nil {
t.Fatal("Execute returned nil context")
}
if len(ctx.ResultChunks) == 0 {
t.Fatal("expected at least one chunk after execution")
}
for i, c := range ctx.ResultChunks {
if c.Content == "" {
t.Errorf("chunk[%d] has empty content", i)
}
println(c.Content)
if c.Metadata == nil {
t.Errorf("chunk[%d] has nil metadata", i)
}
}
}
func TestPlan_Execute_MinimalPipeline(t *testing.T) {
engine := NewChunkEngine()
plan, err := engine.Compile(minimalDSL)
if err != nil {
t.Fatalf("Plan error: %v", err)
}
inputText := "Hello world.\nGoodbye world."
ctx, err := engine.Execute(plan, inputText)
if err != nil {
t.Fatalf("Execute error: %v", err)
}
if ctx == nil {
t.Fatal("Execute returned nil context")
}
if len(ctx.ResultChunks) == 0 {
t.Fatal("expected at least one chunk after execution")
}
}
// ---------------------------------------------------------------------------
// Edge cases
// ---------------------------------------------------------------------------
func TestPlan_Explain(t *testing.T) {
engine := NewChunkEngine()
plan, err := engine.Compile(mediaAwareDSL)
if err != nil {
t.Fatalf("Plan error: %v", err)
}
_, err = engine.Explain(plan)
if err != nil {
t.Fatalf("Explain error: %v", err)
}
//fmt.Println(explanation)
}
func TestPlan_ReuseEngine(t *testing.T) {
engine := NewChunkEngine()
// First plan
plan1, err := engine.Compile(mediaAwareDSL)
if err != nil {
t.Fatalf("first Plan error: %v", err)
}
// Second plan from the same engine
plan2, err := engine.Compile(minimalDSL)
if err != nil {
t.Fatalf("second Plan error: %v", err)
}
if len(plan1.Operators) != len(plan2.Operators) {
t.Errorf("plan1 has %d operators, plan2 has %d", len(plan1.Operators), len(plan2.Operators))
}
}
// Benchmark
func BenchmarkPlan_FullDSL(b *testing.B) {
engine := NewChunkEngine()
dsl := mediaAwareDSL
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := engine.Compile(dsl)
if err != nil {
b.Fatalf("Plan error: %v", err)
}
}
}
func BenchmarkPlan_Execute_FullDSL(b *testing.B) {
engine := NewChunkEngine()
dsl := mediaAwareDSL
plan, err := engine.Compile(dsl)
if err != nil {
b.Fatalf("Plan error: %v", err)
}
inputText := strings.Repeat("这是第一句话。这是第二句话!这是第三句话?\n", 100)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := engine.Execute(plan, inputText)
if err != nil {
b.Fatalf("Execute error: %v", err)
}
}
}

View File

@@ -1,3 +1,19 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package ingestion
import (