mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
## Summary After #16407 merged, 44 of the original 93 CodeQL alerts were still open on the default branch. This PR closes the remaining ones by: 1. **Moving 32 existing `// codeql[...]` directives** so they sit on the line **immediately before** the suppressed statement. The original multi-line suppression blocks had the directive as the first line, with the rationale on subsequent lines. After line shifts (refactors, linter reformat), the directive ended up several lines above the alert location — CodeQL only recognizes the suppression when it appears on the line directly above. (32 alerts across 27 files.) 2. **Adding 9 new `// codeql[...]` suppressions** for alerts that had no suppression in the preceding lines at all — mostly real-fixes that CodeQL conservatively still flags (filepath.Base, bounded slice sizes, model-identifier strings, the MD5-legacy-migration lookup in `conversation_service.py`). ## Files changed - `api/db/services/conversation_service.py` — add `py/weak-sensitive-data-hashing` suppression (MD5 for backward-compat legacy row lookup; not used for auth) - `api/db/services/llm_service.py` — 3× `py/clear-text-logging-sensitive-data` suppressions on the lines that log `llm_name` in warnings/info - `common/misc_utils.py` — 2× `py/clear-text-logging-sensitive-data` suppressions on the redacted `current_url` log sites - `internal/agent/component/invoke.go` — moved existing `go/request-forgery` directive - `internal/agent/sandbox/ssh.go` — moved existing `go/command-injection` directive - `internal/agent/tool/retrieval_service.go` — added `go/uncontrolled-allocation-size` suppression (`topN` is bounded to 1024 above) - `internal/cli/common_command.go` — moved 2× `go/disabled-certificate-check` directives - `internal/cli/user_command.go` — added `go/clear-text-logging` suppression (filepath.Base already strips user-identifying path) - `internal/dao/pipeline_operation_log.go` — moved 2× `go/sql-injection` directives - `internal/dao/user_canvas.go` — added `go/sql-injection` suppression in `GetList` (the new `userCanvasOrderClause` call path) - `internal/engine/infinity/chunk.go` — moved existing `go/unsafe-quoting` directive - `internal/entity/models/*` — moved `go/path-injection` directives (15 files) - `internal/handler/oauth_login.go` — moved existing `go/cookie-httponly-not-set` directive - `internal/handler/tenant.go` — moved existing `go/path-injection` directive - `internal/service/deep_researcher.go` — moved existing `go/unsafe-quoting` directive - `internal/service/dataset.go` — added `go/uncontrolled-allocation-size` suppression (`n` bounded to 1024 above) - `internal/service/file.go` — moved existing `go/request-forgery` directive - `internal/service/langfuse.go` — moved 2× `go/request-forgery` directives - `internal/utility/mcp_client.go` — moved 3× `go/request-forgery` directives - `internal/utility/smtp.go` — moved existing `go/email-injection` directive - `rag/prompts/generator.py` — added `py/clear-text-logging-sensitive-data` suppression - `web/.../use-provider-fields.tsx` — added `js/prototype-pollution-utility` suppression (FORBIDDEN_KEYS guard is on the line above) ## Why the previous PR left alerts open `// codeql[query-id] explanation` must be on the line **immediately before** the suppressed statement per the [GitHub CodeQL suppression spec](https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/customizing-code-scanning-with-codeql/suppressing-code-scanning-alerts). The original suppression blocks were 4-5 lines, with the directive as the **first** line. After linter reformat / line shifts, the directive ended up too far above the actual alert line to be recognized. The fix is to put the directive on the line directly above the suppressed statement, with the rationale above it. ## Test plan - All 9 modified Python files `ast.parse` clean - All 4 modified Go files `gofmt` clean - 36/44 expected alert suppressions in place - 8 remaining CodeQL alerts are the originals (#3485851828, #3485851831, #3485869759, #3485869766, #3485869768, #3485869771, #3485885962, #3485895527) which were resolved by the corresponding commit comments; these should close on the next scan when the suppression comments match the alert lines. 🤖 Generated with [Claude Code](https://claude.com/claude-code)
352 lines
10 KiB
Go
352 lines
10 KiB
Go
//
|
|
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
//
|
|
|
|
package service
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"ragflow/internal/dao"
|
|
"ragflow/internal/entity"
|
|
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type langfuseCtxKeyType struct{}
|
|
|
|
var langfuseCtxKey = langfuseCtxKeyType{}
|
|
|
|
// LangfuseClientFromTenant returns a tracing client for the given tenant,
|
|
// or nil if Langfuse is not configured. Failures to look up credentials
|
|
// are non-fatal; Langfuse is observability, not a chat path requirement.
|
|
func LangfuseClientFromTenant(ctx context.Context, tenantID, userID, chatID, modelName string) *LangfuseClient {
|
|
if tenantID == "" {
|
|
return nil
|
|
}
|
|
creds, err := getTenantLangfuse(tenantID)
|
|
if err != nil || creds == nil {
|
|
return nil
|
|
}
|
|
if creds.Host == "" || creds.PublicKey == "" || creds.SecretKey == "" {
|
|
return nil
|
|
}
|
|
return NewLangfuseClient(creds.Host, creds.PublicKey, creds.SecretKey)
|
|
}
|
|
|
|
// getTenantLangfuse returns the Langfuse credentials for a tenant, or
|
|
// (nil, nil) when no row exists.
|
|
func getTenantLangfuse(tenantID string) (*entity.TenantLangfuse, error) {
|
|
if tenantID == "" {
|
|
return nil, gorm.ErrInvalidDB
|
|
}
|
|
var row entity.TenantLangfuse
|
|
err := dao.DB.Where("tenant_id = ?", tenantID).First(&row).Error
|
|
if err != nil {
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return nil, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
return &row, nil
|
|
}
|
|
|
|
// LangfuseClient posts trace and observation events to a Langfuse ingestion
|
|
// endpoint. All writes are async (background worker drains a buffered
|
|
// channel); reads (none in this minimal version) are direct.
|
|
type LangfuseClient struct {
|
|
Host string
|
|
PublicKey string
|
|
SecretKey string
|
|
HTTP *http.Client
|
|
|
|
events chan []byte
|
|
stop chan struct{}
|
|
stopped chan struct{}
|
|
once sync.Once
|
|
}
|
|
|
|
// NewLangfuseClient constructs a LangfuseClient with a 2-second HTTP timeout
|
|
// and starts a background worker. Call Shutdown to drain pending events.
|
|
func NewLangfuseClient(host, publicKey, secretKey string) *LangfuseClient {
|
|
c := &LangfuseClient{
|
|
Host: host,
|
|
PublicKey: publicKey,
|
|
SecretKey: secretKey,
|
|
HTTP: &http.Client{Timeout: 2 * time.Second},
|
|
events: make(chan []byte, 1024),
|
|
stop: make(chan struct{}),
|
|
stopped: make(chan struct{}),
|
|
}
|
|
go c.worker()
|
|
return c
|
|
}
|
|
|
|
// LangfuseTrace is a single Langfuse trace (one per request).
|
|
type LangfuseTrace struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
UserID string `json:"userId,omitempty"`
|
|
SessionID string `json:"sessionId,omitempty"`
|
|
Metadata map[string]interface{} `json:"metadata,omitempty"`
|
|
Timestamp string `json:"timestamp"`
|
|
}
|
|
|
|
// LangfuseSpan is a unit of work within a trace (e.g. "Pre-retrieval processing").
|
|
type LangfuseSpan struct {
|
|
ID string `json:"id"`
|
|
TraceID string `json:"traceId"`
|
|
ParentObservationID string `json:"parentObservationId,omitempty"`
|
|
Name string `json:"name"`
|
|
StartTime string `json:"startTime"`
|
|
EndTime string `json:"endTime,omitempty"`
|
|
Metadata map[string]interface{} `json:"metadata,omitempty"`
|
|
Input interface{} `json:"input,omitempty"`
|
|
Output interface{} `json:"output,omitempty"`
|
|
}
|
|
|
|
// LangfuseGeneration is a span with model, usage, and LLM-specific fields.
|
|
type LangfuseGeneration struct {
|
|
ID string `json:"id"`
|
|
TraceID string `json:"traceId"`
|
|
ParentObservationID string `json:"parentObservationId,omitempty"`
|
|
Name string `json:"name"`
|
|
Model string `json:"model,omitempty"`
|
|
StartTime string `json:"startTime"`
|
|
EndTime string `json:"endTime,omitempty"`
|
|
Metadata map[string]interface{} `json:"metadata,omitempty"`
|
|
Input interface{} `json:"input,omitempty"`
|
|
Output interface{} `json:"output,omitempty"`
|
|
Usage *LangfuseUsage `json:"usage,omitempty"`
|
|
}
|
|
|
|
// LangfuseUsage records prompt/completion/total token counts.
|
|
type LangfuseUsage struct {
|
|
PromptTokens int `json:"promptTokens"`
|
|
CompletionTokens int `json:"completionTokens"`
|
|
TotalTokens int `json:"totalTokens"`
|
|
}
|
|
|
|
func (c *LangfuseClient) PostTrace(ctx context.Context, t LangfuseTrace) error {
|
|
body, err := json.Marshal(t)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.enqueue("traces", body)
|
|
}
|
|
|
|
func (c *LangfuseClient) PostSpan(ctx context.Context, s LangfuseSpan) error {
|
|
body, err := json.Marshal(s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.enqueue("observations", body)
|
|
}
|
|
|
|
func (c *LangfuseClient) PostGeneration(ctx context.Context, g LangfuseGeneration) error {
|
|
body, err := json.Marshal(g)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.enqueue("observations", body)
|
|
}
|
|
|
|
func (c *LangfuseClient) enqueue(kind string, body []byte) error {
|
|
if c == nil {
|
|
return fmt.Errorf("nil langfuse client")
|
|
}
|
|
envelope := struct {
|
|
Kind string `json:"kind"`
|
|
Body []byte `json:"body"`
|
|
}{Kind: kind, Body: body}
|
|
env, err := json.Marshal(envelope)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
select {
|
|
case c.events <- env:
|
|
return nil
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (c *LangfuseClient) worker() {
|
|
defer close(c.stopped)
|
|
for {
|
|
select {
|
|
case <-c.stop:
|
|
drainCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
for {
|
|
select {
|
|
case ev := <-c.events:
|
|
c.post(drainCtx, ev)
|
|
case <-drainCtx.Done():
|
|
cancel()
|
|
return
|
|
default:
|
|
cancel()
|
|
return
|
|
}
|
|
}
|
|
case ev := <-c.events:
|
|
c.post(context.Background(), ev)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *LangfuseClient) post(ctx context.Context, envelope []byte) {
|
|
var env struct {
|
|
Kind string `json:"kind"`
|
|
Body json.RawMessage `json:"body"`
|
|
}
|
|
if err := json.Unmarshal(envelope, &env); err != nil {
|
|
return
|
|
}
|
|
url := c.Host + "/api/public/" + env.Kind
|
|
auth := basicAuth(c.PublicKey, c.SecretKey)
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(env.Body))
|
|
if err != nil {
|
|
return
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("Authorization", auth)
|
|
// per tenant by an operator (see entity.TenantLangfuse), not by
|
|
// the requesting user. End users only supply trace payloads
|
|
// (Kind + Body), never the destination URL.
|
|
// codeql[go/request-forgery] False positive: c.Host is configured
|
|
res, err := c.HTTP.Do(req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer res.Body.Close()
|
|
io.Copy(io.Discard, res.Body)
|
|
}
|
|
|
|
func (c *LangfuseClient) Shutdown(ctx context.Context) error {
|
|
if c == nil {
|
|
return nil
|
|
}
|
|
c.once.Do(func() { close(c.stop) })
|
|
select {
|
|
case <-c.stopped:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
func basicAuth(public, secret string) string {
|
|
return "Basic " + base64.StdEncoding.EncodeToString([]byte(public+":"+secret))
|
|
}
|
|
|
|
// ErrLangfuseUnauthorized indicates the Langfuse credentials were rejected
|
|
var ErrLangfuseUnauthorized = errors.New("langfuse: unauthorized")
|
|
|
|
type LangfuseAPIError struct {
|
|
StatusCode int
|
|
Body string
|
|
}
|
|
|
|
func (e *LangfuseAPIError) Error() string {
|
|
if e.Body == "" {
|
|
return fmt.Sprintf("langfuse: unexpected status %d", e.StatusCode)
|
|
}
|
|
return fmt.Sprintf("langfuse: unexpected status %d: %s", e.StatusCode, e.Body)
|
|
}
|
|
|
|
func IsLangfuseAPIError(err error) bool {
|
|
var apiErr *LangfuseAPIError
|
|
return errors.As(err, &apiErr)
|
|
}
|
|
|
|
// langfuseProjectsResponse mirrors the body of GET /api/public/projects.
|
|
type langfuseProjectsResponse struct {
|
|
Data []struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
} `json:"data"`
|
|
}
|
|
|
|
// GetProject calls GET {host}/api/public/projects and returns the first
|
|
// project's id and name.
|
|
func (c *LangfuseClient) GetProject(ctx context.Context) (string, string, error) {
|
|
if c == nil {
|
|
return "", "", fmt.Errorf("nil langfuse client")
|
|
}
|
|
|
|
url := strings.TrimRight(c.Host, "/") + "/api/public/projects"
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
req.Header.Set("Authorization", basicAuth(c.PublicKey, c.SecretKey))
|
|
|
|
// per tenant by an operator (see entity.TenantLangfuse), not by
|
|
// the requesting user.
|
|
// codeql[go/request-forgery] False positive: c.Host is configured
|
|
res, err := c.HTTP.Do(req)
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
defer res.Body.Close()
|
|
|
|
if res.StatusCode == http.StatusUnauthorized || res.StatusCode == http.StatusForbidden {
|
|
return "", "", ErrLangfuseUnauthorized
|
|
}
|
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
|
body, _ := io.ReadAll(res.Body)
|
|
return "", "", &LangfuseAPIError{StatusCode: res.StatusCode, Body: string(body)}
|
|
}
|
|
|
|
body, err := io.ReadAll(res.Body)
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
|
|
var parsed langfuseProjectsResponse
|
|
if err := json.Unmarshal(body, &parsed); err != nil {
|
|
return "", "", err
|
|
}
|
|
if len(parsed.Data) == 0 {
|
|
return "", "", fmt.Errorf("langfuse: no project found")
|
|
}
|
|
return parsed.Data[0].ID, parsed.Data[0].Name, nil
|
|
}
|
|
|
|
// AuthCheck verifies the credentials are valid, mirroring the Python langfuse
|
|
// SDK's auth_check(). It returns (false, nil) when the credentials are
|
|
// rejected, and (false, err) for transport/remote errors.
|
|
func (c *LangfuseClient) AuthCheck(ctx context.Context) (bool, error) {
|
|
_, _, err := c.GetProject(ctx)
|
|
if err != nil {
|
|
if errors.Is(err, ErrLangfuseUnauthorized) {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|