feat(go-api): Langfuse API key migration behavior (#16356)

## Summary

- Align Langfuse API key set/get/delete behavior with the Python
implementation.
- Improve DAO handling for Langfuse credential save/delete flows.
- Add tests for Langfuse service error handling and API key lifecycle
behavior.
This commit is contained in:
Hz_
2026-06-25 19:25:55 +08:00
committed by GitHub
parent 46b97bd1a1
commit e290a0d23e
10 changed files with 1105 additions and 1 deletions

View File

@@ -223,6 +223,7 @@ func startServer(config *server.Config) {
tenantService := service.NewTenantService()
chatService := service.NewChatService()
chatChannelService := service.NewChatChannelService()
langfuseService := service.NewLangfuseService()
chatSessionService := service.NewChatSessionService()
openaiChatService := service.NewOpenAIChatService()
systemService := service.NewSystemService()
@@ -248,6 +249,7 @@ func startServer(config *server.Config) {
llmHandler := handler.NewLLMHandler(llmService, userService)
chatHandler := handler.NewChatHandler(chatService, userService)
chatChannelHandler := handler.NewChatChannelHandler(chatChannelService)
langfuseHandler := handler.NewLangfuseHandler(langfuseService)
chatSessionHandler := handler.NewChatSessionHandler(chatSessionService, userService)
openaiChatHandler := handler.NewOpenAIChatHandler(openaiChatService)
connectorHandler := handler.NewConnectorHandler(connectorService, userService)
@@ -321,7 +323,7 @@ func startServer(config *server.Config) {
adminRuntimeHandler := handler.NewAdminRuntimeHandler(adminRuntimeSelector)
// Initialize router
r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, knowledgebaseHandler, chunkHandler, llmHandler, chatHandler, chatChannelHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, mcpHandler, skillSearchHandler, providerHandler, agentHandler, searchBotHandler, difyRetrievalHandler, pluginHandler, modelHandler, fileCommitHandler, adminRuntimeHandler, openaiChatHandler)
r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, knowledgebaseHandler, chunkHandler, llmHandler, chatHandler, chatChannelHandler, langfuseHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, mcpHandler, skillSearchHandler, providerHandler, agentHandler, searchBotHandler, difyRetrievalHandler, pluginHandler, modelHandler, fileCommitHandler, adminRuntimeHandler, openaiChatHandler)
// Create Gin engine
ginEngine := gin.New()

104
internal/dao/langfuse.go Normal file
View File

@@ -0,0 +1,104 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package dao
import (
"errors"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"ragflow/internal/entity"
)
// LangfuseDAO is the data access object for tenant Langfuse credentials.
type LangfuseDAO struct{}
// NewLangfuse creates a new Langfuse DAO.
func NewLangfuse() *LangfuseDAO {
return &LangfuseDAO{}
}
// GetByTenantID returns the Langfuse credentials row for a tenant.
// It returns (nil, nil) when no row exists, mirroring the Python
// TenantLangfuseService.filter_by_tenant behaviour (DoesNotExist -> None).
func (dao *LangfuseDAO) GetByTenantID(tenantID string) (*entity.TenantLangfuse, error) {
var row entity.TenantLangfuse
err := DB.Where("tenant_id = ?", tenantID).First(&row).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}
return &row, nil
}
// Create inserts a new Langfuse credentials row (mirrors save).
func (dao *LangfuseDAO) Create(row *entity.TenantLangfuse) error {
return DB.Create(row).Error
}
// UpdateByTenantID updates the Langfuse credentials row for a tenant
func (dao *LangfuseDAO) UpdateByTenantID(tenantID string, updates map[string]any) error {
res := DB.Model(&entity.TenantLangfuse{}).Where("tenant_id = ?", tenantID).Updates(updates)
if res.Error != nil {
return res.Error
}
if res.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}
return nil
}
// DeleteByTenantID deletes the Langfuse credentials row for a tenant
// (mirrors delete_model / delete_ty_tenant_id).
func (dao *LangfuseDAO) DeleteByTenantID(tenantID string) error {
res := DB.Where("tenant_id = ?", tenantID).Delete(&entity.TenantLangfuse{})
if res.Error != nil {
return res.Error
}
if res.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}
return nil
}
func (dao *LangfuseDAO) SaveByTenantID(row *entity.TenantLangfuse) error {
return DB.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "tenant_id"}},
DoUpdates: clause.Assignments(map[string]any{
"secret_key": row.SecretKey,
"public_key": row.PublicKey,
"host": row.Host,
}),
}).Create(row).Error
}
func (dao *LangfuseDAO) DeleteExistingByTenantID(tenantID string) error {
return DB.Transaction(func(tx *gorm.DB) error {
var row entity.TenantLangfuse
err := tx.Where("tenant_id = ?", tenantID).First(&row).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return gorm.ErrRecordNotFound
}
return err
}
return tx.Delete(&row).Error
})
}

View File

@@ -0,0 +1,118 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package dao
import (
"testing"
"github.com/glebarez/sqlite"
"gorm.io/gorm"
"ragflow/internal/entity"
)
// setupLangfuseTestDB initializes an in-memory SQLite database for Langfuse DAO tests.
func setupLangfuseTestDB(t *testing.T) *gorm.DB {
t.Helper()
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{
TranslateError: true,
})
if err != nil {
t.Fatalf("failed to open sqlite: %v", err)
}
if err := db.AutoMigrate(&entity.TenantLangfuse{}); err != nil {
t.Fatalf("failed to migrate: %v", err)
}
return db
}
func TestLangfuseDAO_GetByTenantID_NotFound(t *testing.T) {
db := setupLangfuseTestDB(t)
pushDB(t, db)
dao := NewLangfuse()
row, err := dao.GetByTenantID("missing")
if err != nil {
t.Fatalf("expected nil error for missing row, got %v", err)
}
if row != nil {
t.Fatalf("expected nil row for missing tenant, got %+v", row)
}
}
func TestLangfuseDAO_CRUD(t *testing.T) {
db := setupLangfuseTestDB(t)
pushDB(t, db)
dao := NewLangfuse()
// 1. Create
row := &entity.TenantLangfuse{
TenantID: "tenant-1",
SecretKey: "sk-1",
PublicKey: "pk-1",
Host: "https://cloud.langfuse.com",
}
if err := dao.Create(row); err != nil {
t.Fatalf("failed to create: %v", err)
}
// 2. GetByTenantID
got, err := dao.GetByTenantID("tenant-1")
if err != nil {
t.Fatalf("failed to get: %v", err)
}
if got == nil {
t.Fatalf("expected a row, got nil")
}
if got.SecretKey != "sk-1" || got.PublicKey != "pk-1" || got.Host != "https://cloud.langfuse.com" {
t.Fatalf("unexpected row: %+v", got)
}
// BeforeCreate hook should have populated timestamps.
if got.CreateTime == nil || got.UpdateTime == nil {
t.Fatalf("expected timestamps to be populated, got %+v", got)
}
// 3. UpdateByTenantID
updates := map[string]any{
"secret_key": "sk-2",
"public_key": "pk-2",
"host": "https://eu.langfuse.com",
}
if err := dao.UpdateByTenantID("tenant-1", updates); err != nil {
t.Fatalf("failed to update: %v", err)
}
got, err = dao.GetByTenantID("tenant-1")
if err != nil {
t.Fatalf("failed to get after update: %v", err)
}
if got.SecretKey != "sk-2" || got.PublicKey != "pk-2" || got.Host != "https://eu.langfuse.com" {
t.Fatalf("update not applied: %+v", got)
}
// 4. DeleteByTenantID
if err := dao.DeleteByTenantID("tenant-1"); err != nil {
t.Fatalf("failed to delete: %v", err)
}
got, err = dao.GetByTenantID("tenant-1")
if err != nil {
t.Fatalf("expected nil error after delete, got %v", err)
}
if got != nil {
t.Fatalf("expected row to be deleted, got %+v", got)
}
}

View File

@@ -62,6 +62,18 @@ func (TenantLangfuse) TableName() string {
return "tenant_langfuse"
}
// LangfuseInfoResponse is the GET /langfuse/api-key payload: the stored
// credentials enriched with the resolved Langfuse project id/name. Field
// order mirrors the Python filter_by_tenant_with_info dict plus project info.
type LangfuseInfoResponse struct {
TenantID string `json:"tenant_id"`
Host string `json:"host"`
SecretKey string `json:"secret_key"`
PublicKey string `json:"public_key"`
ProjectID string `json:"project_id"`
ProjectName string `json:"project_name"`
}
// MyLLM represents LLM information for a tenant with factory details
type MyLLM struct {
ID string `gorm:"column:id" json:"id"`

View File

@@ -0,0 +1,122 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package handler
import (
"github.com/gin-gonic/gin"
"ragflow/internal/common"
"ragflow/internal/entity"
"ragflow/internal/service"
)
// LangfuseService is the behaviour the handler depends on (interface enables
// mocking in tests).
type LangfuseService interface {
SetAPIKey(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error)
GetAPIKey(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error)
DeleteAPIKey(tenantID string) (bool, common.ErrorCode, string, error)
}
// LangfuseHandler handles /langfuse/api-key HTTP requests.
type LangfuseHandler struct {
langfuseService LangfuseService
}
// NewLangfuseHandler creates a new Langfuse handler.
func NewLangfuseHandler(langfuseService LangfuseService) *LangfuseHandler {
return &LangfuseHandler{langfuseService: langfuseService}
}
// NewLangfuse keeps a zero-arg constructor consistent with other handlers.
func NewLangfuse() *LangfuseHandler {
return NewLangfuseHandler(service.NewLangfuseService())
}
// SetLangfuseRequest is the POST/PUT body. Empty-value validation happens in
// the service layer to reproduce the Python "Missing required fields" message.
type SetLangfuseRequest struct {
SecretKey string `json:"secret_key"`
PublicKey string `json:"public_key"`
Host string `json:"host"`
}
// SetAPIKey handles POST/PUT /langfuse/api-key.
func (h *LangfuseHandler) SetAPIKey(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
return
}
var req SetLangfuseRequest
if err := c.ShouldBindJSON(&req); err != nil {
jsonError(c, common.CodeDataError, "Invalid request: "+err.Error())
return
}
row, code, err := h.langfuseService.SetAPIKey(user.ID, req.SecretKey, req.PublicKey, req.Host)
if err != nil {
jsonError(c, code, err.Error())
return
}
// Echo back the stored keys, matching the Python langfuse_keys payload.
jsonResponse(c, common.CodeSuccess, gin.H{
"tenant_id": row.TenantID,
"secret_key": row.SecretKey,
"public_key": row.PublicKey,
"host": row.Host,
}, "success")
}
// GetAPIKey handles GET /langfuse/api-key.
func (h *LangfuseHandler) GetAPIKey(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
return
}
data, code, message, err := h.langfuseService.GetAPIKey(user.ID)
if err != nil {
jsonError(c, code, message)
return
}
jsonResponse(c, code, data, message)
}
// DeleteAPIKey handles DELETE /langfuse/api-key.
func (h *LangfuseHandler) DeleteAPIKey(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
return
}
ok, code, message, err := h.langfuseService.DeleteAPIKey(user.ID)
if err != nil {
jsonError(c, code, message)
return
}
// No record: mirror get_json_result(message=...) with data=nil.
if message != "" {
jsonResponse(c, common.CodeSuccess, nil, message)
return
}
jsonResponse(c, common.CodeSuccess, ok, "success")
}

View File

@@ -0,0 +1,262 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package handler
import (
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/gin-gonic/gin"
"ragflow/internal/common"
"ragflow/internal/entity"
)
type fakeLangfuseService struct {
setFn func(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error)
getFn func(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error)
deleteFn func(tenantID string) (bool, common.ErrorCode, string, error)
}
func (f fakeLangfuseService) SetAPIKey(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) {
if f.setFn == nil {
return nil, common.CodeServerError, errors.New("unexpected SetAPIKey call")
}
return f.setFn(tenantID, secretKey, publicKey, host)
}
func (f fakeLangfuseService) GetAPIKey(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) {
if f.getFn == nil {
return nil, common.CodeServerError, "", errors.New("unexpected GetAPIKey call")
}
return f.getFn(tenantID)
}
func (f fakeLangfuseService) DeleteAPIKey(tenantID string) (bool, common.ErrorCode, string, error) {
if f.deleteFn == nil {
return false, common.CodeServerError, "", errors.New("unexpected DeleteAPIKey call")
}
return f.deleteFn(tenantID)
}
func serveLangfuse(method, target, body string, h func(c *gin.Context)) *httptest.ResponseRecorder {
gin.SetMode(gin.TestMode)
router := gin.New()
router.Handle(method, target, func(c *gin.Context) {
c.Set("user", &entity.User{ID: "tenant-1"})
h(c)
})
resp := httptest.NewRecorder()
var req *http.Request
if body == "" {
req = httptest.NewRequest(method, target, nil)
} else {
req = httptest.NewRequest(method, target, strings.NewReader(body))
req.Header.Set("Content-Type", "application/json")
}
router.ServeHTTP(resp, req)
return resp
}
func decode(t *testing.T, resp *httptest.ResponseRecorder) map[string]interface{} {
t.Helper()
var payload map[string]interface{}
if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil {
t.Fatalf("unmarshal response: %v (body=%s)", err, resp.Body.String())
}
return payload
}
func TestLangfuseHandler_SetAPIKey_Success(t *testing.T) {
var gotTenant, gotSecret, gotPublic, gotHost string
h := &LangfuseHandler{langfuseService: fakeLangfuseService{
setFn: func(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) {
gotTenant, gotSecret, gotPublic, gotHost = tenantID, secretKey, publicKey, host
return &entity.TenantLangfuse{TenantID: tenantID, SecretKey: secretKey, PublicKey: publicKey, Host: host}, common.CodeSuccess, nil
},
}}
body := `{"secret_key":"sk","public_key":"pk","host":"https://a.langfuse.com"}`
resp := serveLangfuse(http.MethodPost, "/api/v1/langfuse/api-key", body, h.SetAPIKey)
if gotTenant != "tenant-1" || gotSecret != "sk" || gotPublic != "pk" || gotHost != "https://a.langfuse.com" {
t.Fatalf("service args: tenant=%q secret=%q public=%q host=%q", gotTenant, gotSecret, gotPublic, gotHost)
}
payload := decode(t, resp)
if payload["code"] != float64(common.CodeSuccess) {
t.Fatalf("payload=%v", payload)
}
data, ok := payload["data"].(map[string]interface{})
if !ok {
t.Fatalf("expected data object, got %v", payload["data"])
}
if data["secret_key"] != "sk" || data["public_key"] != "pk" || data["host"] != "https://a.langfuse.com" || data["tenant_id"] != "tenant-1" {
t.Fatalf("unexpected data: %v", data)
}
}
func TestLangfuseHandler_SetAPIKey_ServiceError(t *testing.T) {
h := &LangfuseHandler{langfuseService: fakeLangfuseService{
setFn: func(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) {
return nil, common.CodeDataError, errors.New("Invalid Langfuse keys")
},
}}
body := `{"secret_key":"sk","public_key":"pk","host":"host"}`
resp := serveLangfuse(http.MethodPost, "/api/v1/langfuse/api-key", body, h.SetAPIKey)
payload := decode(t, resp)
if payload["code"] != float64(common.CodeDataError) {
t.Fatalf("payload=%v", payload)
}
if payload["message"] != "Invalid Langfuse keys" {
t.Fatalf("message=%v", payload["message"])
}
}
func TestLangfuseHandler_SetAPIKey_BindFailureStopsEarly(t *testing.T) {
called := false
h := &LangfuseHandler{langfuseService: fakeLangfuseService{
setFn: func(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) {
called = true
return nil, common.CodeSuccess, nil
},
}}
resp := serveLangfuse(http.MethodPost, "/api/v1/langfuse/api-key", `{not-json`, h.SetAPIKey)
if called {
t.Fatal("service should not be called when binding fails")
}
payload := decode(t, resp)
if payload["code"] != float64(common.CodeDataError) {
t.Fatalf("payload=%v", payload)
}
}
func TestLangfuseHandler_GetAPIKey_Success(t *testing.T) {
h := &LangfuseHandler{langfuseService: fakeLangfuseService{
getFn: func(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) {
return &entity.LangfuseInfoResponse{
TenantID: tenantID, Host: "host", SecretKey: "sk", PublicKey: "pk",
ProjectID: "proj-1", ProjectName: "My Project",
}, common.CodeSuccess, "success", nil
},
}}
resp := serveLangfuse(http.MethodGet, "/api/v1/langfuse/api-key", "", h.GetAPIKey)
payload := decode(t, resp)
if payload["code"] != float64(common.CodeSuccess) || payload["message"] != "success" {
t.Fatalf("payload=%v", payload)
}
data, ok := payload["data"].(map[string]interface{})
if !ok {
t.Fatalf("expected data object, got %v", payload["data"])
}
if data["project_id"] != "proj-1" || data["project_name"] != "My Project" {
t.Fatalf("unexpected data: %v", data)
}
}
func TestLangfuseHandler_GetAPIKey_NoRecord(t *testing.T) {
h := &LangfuseHandler{langfuseService: fakeLangfuseService{
getFn: func(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) {
return nil, common.CodeSuccess, "Have not record any Langfuse keys.", nil
},
}}
resp := serveLangfuse(http.MethodGet, "/api/v1/langfuse/api-key", "", h.GetAPIKey)
payload := decode(t, resp)
if payload["code"] != float64(common.CodeSuccess) {
t.Fatalf("payload=%v", payload)
}
if payload["message"] != "Have not record any Langfuse keys." {
t.Fatalf("message=%v", payload["message"])
}
if payload["data"] != nil {
t.Fatalf("expected nil data, got %v", payload["data"])
}
}
func TestLangfuseHandler_GetAPIKey_Unauthorized(t *testing.T) {
h := &LangfuseHandler{langfuseService: fakeLangfuseService{
getFn: func(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) {
return nil, common.CodeDataError, "Invalid Langfuse keys loaded", errors.New("unauthorized")
},
}}
resp := serveLangfuse(http.MethodGet, "/api/v1/langfuse/api-key", "", h.GetAPIKey)
payload := decode(t, resp)
if payload["code"] != float64(common.CodeDataError) {
t.Fatalf("payload=%v", payload)
}
if payload["message"] != "Invalid Langfuse keys loaded" {
t.Fatalf("message=%v", payload["message"])
}
}
func TestLangfuseHandler_DeleteAPIKey_Success(t *testing.T) {
var gotTenant string
h := &LangfuseHandler{langfuseService: fakeLangfuseService{
deleteFn: func(tenantID string) (bool, common.ErrorCode, string, error) {
gotTenant = tenantID
return true, common.CodeSuccess, "", nil
},
}}
resp := serveLangfuse(http.MethodDelete, "/api/v1/langfuse/api-key", "", h.DeleteAPIKey)
if gotTenant != "tenant-1" {
t.Fatalf("tenant=%q", gotTenant)
}
payload := decode(t, resp)
if payload["code"] != float64(common.CodeSuccess) {
t.Fatalf("payload=%v", payload)
}
if payload["data"] != true {
t.Fatalf("expected data true, got %v", payload["data"])
}
}
func TestLangfuseHandler_DeleteAPIKey_NoRecord(t *testing.T) {
h := &LangfuseHandler{langfuseService: fakeLangfuseService{
deleteFn: func(tenantID string) (bool, common.ErrorCode, string, error) {
return false, common.CodeSuccess, "Have not record any Langfuse keys.", nil
},
}}
resp := serveLangfuse(http.MethodDelete, "/api/v1/langfuse/api-key", "", h.DeleteAPIKey)
payload := decode(t, resp)
if payload["code"] != float64(common.CodeSuccess) {
t.Fatalf("payload=%v", payload)
}
if payload["message"] != "Have not record any Langfuse keys." {
t.Fatalf("message=%v", payload["message"])
}
if payload["data"] != nil {
t.Fatalf("expected nil data, got %v", payload["data"])
}
}

View File

@@ -35,6 +35,7 @@ type Router struct {
llmHandler *handler.LLMHandler
chatHandler *handler.ChatHandler
chatChannelHandler *handler.ChatChannelHandler
langfuseHandler *handler.LangfuseHandler
openaiChatHandler *handler.OpenAIChatHandler
chatSessionHandler *handler.ChatSessionHandler
connectorHandler *handler.ConnectorHandler
@@ -66,6 +67,7 @@ func NewRouter(
llmHandler *handler.LLMHandler,
chatHandler *handler.ChatHandler,
chatChannelHandler *handler.ChatChannelHandler,
langfuseHandler *handler.LangfuseHandler,
chatSessionHandler *handler.ChatSessionHandler,
connectorHandler *handler.ConnectorHandler,
searchHandler *handler.SearchHandler,
@@ -95,6 +97,7 @@ func NewRouter(
llmHandler: llmHandler,
chatHandler: chatHandler,
chatChannelHandler: chatChannelHandler,
langfuseHandler: langfuseHandler,
openaiChatHandler: openaiChatHandler,
chatSessionHandler: chatSessionHandler,
connectorHandler: connectorHandler,
@@ -655,6 +658,15 @@ func (r *Router) Setup(engine *gin.Engine) {
chanChannel.DELETE("/:channel_id", r.chatChannelHandler.DeleteChatChannel)
}
// Langfuse tracing keys
langfuse := v1.Group("/langfuse")
{
langfuse.POST("/api-key", r.langfuseHandler.SetAPIKey)
langfuse.PUT("/api-key", r.langfuseHandler.SetAPIKey)
langfuse.GET("/api-key", r.langfuseHandler.GetAPIKey)
langfuse.DELETE("/api-key", r.langfuseHandler.DeleteAPIKey)
}
// Chat session (conversation) routes
session := authorized.Group("/v1/conversation")
{

View File

@@ -25,6 +25,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
@@ -256,3 +257,88 @@ func (c *LangfuseClient) Shutdown(ctx context.Context) error {
func basicAuth(public, secret string) string {
return "Basic " + base64.StdEncoding.EncodeToString([]byte(public+":"+secret))
}
// ErrLangfuseUnauthorized indicates the Langfuse credentials were rejected
var ErrLangfuseUnauthorized = errors.New("langfuse: unauthorized")
type LangfuseAPIError struct {
StatusCode int
Body string
}
func (e *LangfuseAPIError) Error() string {
if e.Body == "" {
return fmt.Sprintf("langfuse: unexpected status %d", e.StatusCode)
}
return fmt.Sprintf("langfuse: unexpected status %d: %s", e.StatusCode, e.Body)
}
func IsLangfuseAPIError(err error) bool {
var apiErr *LangfuseAPIError
return errors.As(err, &apiErr)
}
// langfuseProjectsResponse mirrors the body of GET /api/public/projects.
type langfuseProjectsResponse struct {
Data []struct {
ID string `json:"id"`
Name string `json:"name"`
} `json:"data"`
}
// GetProject calls GET {host}/api/public/projects and returns the first
// project's id and name.
func (c *LangfuseClient) GetProject(ctx context.Context) (string, string, error) {
if c == nil {
return "", "", fmt.Errorf("nil langfuse client")
}
url := strings.TrimRight(c.Host, "/") + "/api/public/projects"
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return "", "", err
}
req.Header.Set("Authorization", basicAuth(c.PublicKey, c.SecretKey))
res, err := c.HTTP.Do(req)
if err != nil {
return "", "", err
}
defer res.Body.Close()
if res.StatusCode == http.StatusUnauthorized || res.StatusCode == http.StatusForbidden {
return "", "", ErrLangfuseUnauthorized
}
if res.StatusCode < 200 || res.StatusCode >= 300 {
body, _ := io.ReadAll(res.Body)
return "", "", &LangfuseAPIError{StatusCode: res.StatusCode, Body: string(body)}
}
body, err := io.ReadAll(res.Body)
if err != nil {
return "", "", err
}
var parsed langfuseProjectsResponse
if err := json.Unmarshal(body, &parsed); err != nil {
return "", "", err
}
if len(parsed.Data) == 0 {
return "", "", fmt.Errorf("langfuse: no project found")
}
return parsed.Data[0].ID, parsed.Data[0].Name, nil
}
// AuthCheck verifies the credentials are valid, mirroring the Python langfuse
// SDK's auth_check(). It returns (false, nil) when the credentials are
// rejected, and (false, err) for transport/remote errors.
func (c *LangfuseClient) AuthCheck(ctx context.Context) (bool, error) {
_, _, err := c.GetProject(ctx)
if err != nil {
if errors.Is(err, ErrLangfuseUnauthorized) {
return false, nil
}
return false, err
}
return true, nil
}

View File

@@ -0,0 +1,141 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package service
import (
"context"
"errors"
"fmt"
"ragflow/internal/common"
"ragflow/internal/dao"
"ragflow/internal/entity"
"gorm.io/gorm"
)
// langfuseVerifier abstracts Langfuse credential verification so the business
// logic can be unit-tested without performing real network calls.
type langfuseVerifier interface {
// AuthCheck mirrors the Python langfuse SDK auth_check().
AuthCheck(ctx context.Context, host, publicKey, secretKey string) (bool, error)
// GetProject mirrors api.projects.get().dict()["data"][0] (id, name).
GetProject(ctx context.Context, host, publicKey, secretKey string) (string, string, error)
}
// defaultLangfuseVerifier uses a real LangfuseClient for verification.
type defaultLangfuseVerifier struct{}
func (defaultLangfuseVerifier) AuthCheck(ctx context.Context, host, publicKey, secretKey string) (bool, error) {
client := NewLangfuseClient(host, publicKey, secretKey)
defer client.Shutdown(context.Background())
return client.AuthCheck(ctx)
}
func (defaultLangfuseVerifier) GetProject(ctx context.Context, host, publicKey, secretKey string) (string, string, error) {
client := NewLangfuseClient(host, publicKey, secretKey)
defer client.Shutdown(context.Background())
return client.GetProject(ctx)
}
// LangfuseService implements the /langfuse/api-key business logic, mirroring
// the Python TenantLangfuseService + langfuse_api handlers.
type LangfuseService struct {
langfuseDAO *dao.LangfuseDAO
verifier langfuseVerifier
}
// NewLangfuseService creates a LangfuseService with the default (live) verifier.
func NewLangfuseService() *LangfuseService {
return &LangfuseService{
langfuseDAO: dao.NewLangfuse(),
verifier: defaultLangfuseVerifier{},
}
}
// SetAPIKey validates and stores (insert or update) the Langfuse credentials
// for a tenant.
func (s *LangfuseService) SetAPIKey(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) {
if secretKey == "" || publicKey == "" || host == "" {
return nil, common.CodeDataError, errors.New("Missing required fields")
}
ok, err := s.verifier.AuthCheck(context.Background(), host, publicKey, secretKey)
if err != nil {
return nil, common.CodeServerError, err
}
if !ok {
return nil, common.CodeDataError, errors.New("Invalid Langfuse keys")
}
row := &entity.TenantLangfuse{
TenantID: tenantID,
SecretKey: secretKey,
PublicKey: publicKey,
Host: host,
}
if err := s.langfuseDAO.SaveByTenantID(row); err != nil {
return nil, common.CodeServerError, err
}
return row, common.CodeSuccess, nil
}
// GetAPIKey returns the stored credentials enriched with the Langfuse project
// id/name.
func (s *LangfuseService) GetAPIKey(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) {
row, err := s.langfuseDAO.GetByTenantID(tenantID)
if err != nil {
return nil, common.CodeServerError, "", err
}
if row == nil {
return nil, common.CodeSuccess, "Have not record any Langfuse keys.", nil
}
projectID, projectName, err := s.verifier.GetProject(context.Background(), row.Host, row.PublicKey, row.SecretKey)
if err != nil {
if errors.Is(err, ErrLangfuseUnauthorized) {
return nil, common.CodeDataError, "Invalid Langfuse keys loaded", err
}
if IsLangfuseAPIError(err) {
return nil, common.CodeSuccess, fmt.Sprintf("Error from Langfuse: %s", err.Error()), nil
}
return nil, common.CodeServerError, "", err
}
info := &entity.LangfuseInfoResponse{
TenantID: row.TenantID,
Host: row.Host,
SecretKey: row.SecretKey,
PublicKey: row.PublicKey,
ProjectID: projectID,
ProjectName: projectName,
}
return info, common.CodeSuccess, "success", nil
}
// DeleteAPIKey removes the stored credentials for a tenant.
func (s *LangfuseService) DeleteAPIKey(tenantID string) (bool, common.ErrorCode, string, error) {
if err := s.langfuseDAO.DeleteExistingByTenantID(tenantID); err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return false, common.CodeSuccess, "Have not record any Langfuse keys.", nil
}
return false, common.CodeServerError, "", err
}
return true, common.CodeSuccess, "", nil
}

View File

@@ -0,0 +1,245 @@
//
// Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package service
import (
"context"
"errors"
"testing"
"github.com/glebarez/sqlite"
"gorm.io/gorm"
"ragflow/internal/common"
"ragflow/internal/dao"
"ragflow/internal/entity"
)
func setupLangfuseServiceTestDB(t *testing.T) *gorm.DB {
t.Helper()
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{
TranslateError: true,
})
if err != nil {
t.Fatalf("failed to open sqlite: %v", err)
}
if err := db.AutoMigrate(&entity.TenantLangfuse{}); err != nil {
t.Fatalf("failed to migrate test schema: %v", err)
}
origDB := dao.DB
dao.DB = db
t.Cleanup(func() { dao.DB = origDB })
return db
}
// stubLangfuseVerifier is a controllable langfuseVerifier for tests.
type stubLangfuseVerifier struct {
authOK bool
authErr error
projID string
projName string
projErr error
}
func (s stubLangfuseVerifier) AuthCheck(_ context.Context, _, _, _ string) (bool, error) {
return s.authOK, s.authErr
}
func (s stubLangfuseVerifier) GetProject(_ context.Context, _, _, _ string) (string, string, error) {
return s.projID, s.projName, s.projErr
}
func newLangfuseServiceForTest(v langfuseVerifier) *LangfuseService {
return &LangfuseService{langfuseDAO: dao.NewLangfuse(), verifier: v}
}
func TestLangfuseService_SetAPIKey_MissingFields(t *testing.T) {
setupLangfuseServiceTestDB(t)
svc := newLangfuseServiceForTest(stubLangfuseVerifier{authOK: true})
_, code, err := svc.SetAPIKey("tenant-1", "", "pk", "host")
if code != common.CodeDataError || err == nil || err.Error() != "Missing required fields" {
t.Fatalf("expected Missing required fields/CodeDataError, got code=%d err=%v", code, err)
}
}
func TestLangfuseService_SetAPIKey_InvalidKeys(t *testing.T) {
setupLangfuseServiceTestDB(t)
svc := newLangfuseServiceForTest(stubLangfuseVerifier{authOK: false})
_, code, err := svc.SetAPIKey("tenant-1", "sk", "pk", "host")
if code != common.CodeDataError || err == nil || err.Error() != "Invalid Langfuse keys" {
t.Fatalf("expected Invalid Langfuse keys/CodeDataError, got code=%d err=%v", code, err)
}
}
func TestLangfuseService_SetAPIKey_VerifierError(t *testing.T) {
setupLangfuseServiceTestDB(t)
svc := newLangfuseServiceForTest(stubLangfuseVerifier{authErr: errors.New("network down")})
_, code, err := svc.SetAPIKey("tenant-1", "sk", "pk", "host")
if code != common.CodeServerError || err == nil || err.Error() != "network down" {
t.Fatalf("expected verifier error/CodeServerError, got code=%d err=%v", code, err)
}
}
func TestLangfuseService_SetAPIKey_CreateThenUpdate(t *testing.T) {
db := setupLangfuseServiceTestDB(t)
svc := newLangfuseServiceForTest(stubLangfuseVerifier{authOK: true})
// Create
row, code, err := svc.SetAPIKey("tenant-1", "sk-1", "pk-1", "https://a.langfuse.com")
if err != nil || code != common.CodeSuccess {
t.Fatalf("create failed: code=%d err=%v", code, err)
}
if row.SecretKey != "sk-1" || row.Host != "https://a.langfuse.com" {
t.Fatalf("unexpected row: %+v", row)
}
var count int64
db.Model(&entity.TenantLangfuse{}).Where("tenant_id = ?", "tenant-1").Count(&count)
if count != 1 {
t.Fatalf("expected 1 row after create, got %d", count)
}
// Update (same tenant) should not create a second row
_, code, err = svc.SetAPIKey("tenant-1", "sk-2", "pk-2", "https://b.langfuse.com")
if err != nil || code != common.CodeSuccess {
t.Fatalf("update failed: code=%d err=%v", code, err)
}
db.Model(&entity.TenantLangfuse{}).Where("tenant_id = ?", "tenant-1").Count(&count)
if count != 1 {
t.Fatalf("expected still 1 row after update, got %d", count)
}
stored, _ := dao.NewLangfuse().GetByTenantID("tenant-1")
if stored == nil || stored.SecretKey != "sk-2" || stored.Host != "https://b.langfuse.com" {
t.Fatalf("update not persisted: %+v", stored)
}
}
func TestLangfuseService_GetAPIKey_NoRecord(t *testing.T) {
setupLangfuseServiceTestDB(t)
svc := newLangfuseServiceForTest(stubLangfuseVerifier{})
data, code, message, err := svc.GetAPIKey("tenant-1")
if err != nil || code != common.CodeSuccess || data != nil {
t.Fatalf("unexpected: code=%d data=%v err=%v", code, data, err)
}
if message != "Have not record any Langfuse keys." {
t.Fatalf("unexpected message: %q", message)
}
}
func TestLangfuseService_GetAPIKey_Unauthorized(t *testing.T) {
setupLangfuseServiceTestDB(t)
if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "host"}); err != nil {
t.Fatalf("seed failed: %v", err)
}
svc := newLangfuseServiceForTest(stubLangfuseVerifier{projErr: ErrLangfuseUnauthorized})
data, code, message, err := svc.GetAPIKey("tenant-1")
if data != nil || code != common.CodeDataError || err == nil {
t.Fatalf("unexpected: code=%d data=%v err=%v", code, data, err)
}
if message != "Invalid Langfuse keys loaded" {
t.Fatalf("unexpected message: %q", message)
}
}
func TestLangfuseService_GetAPIKey_ApiError(t *testing.T) {
setupLangfuseServiceTestDB(t)
if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "host"}); err != nil {
t.Fatalf("seed failed: %v", err)
}
svc := newLangfuseServiceForTest(stubLangfuseVerifier{projErr: &LangfuseAPIError{StatusCode: 500, Body: "boom"}})
data, code, message, err := svc.GetAPIKey("tenant-1")
if data != nil || code != common.CodeSuccess || err != nil {
t.Fatalf("unexpected: code=%d data=%v err=%v", code, data, err)
}
if message != "Error from Langfuse: langfuse: unexpected status 500: boom" {
t.Fatalf("unexpected message: %q", message)
}
}
func TestLangfuseService_GetAPIKey_NonAPIError(t *testing.T) {
setupLangfuseServiceTestDB(t)
if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "host"}); err != nil {
t.Fatalf("seed failed: %v", err)
}
svc := newLangfuseServiceForTest(stubLangfuseVerifier{projErr: errors.New("json parse failed")})
data, code, message, err := svc.GetAPIKey("tenant-1")
if data != nil || code != common.CodeServerError || message != "" || err == nil {
t.Fatalf("unexpected: code=%d message=%q data=%v err=%v", code, message, data, err)
}
}
func TestLangfuseService_GetAPIKey_Success(t *testing.T) {
setupLangfuseServiceTestDB(t)
if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "https://a.langfuse.com"}); err != nil {
t.Fatalf("seed failed: %v", err)
}
svc := newLangfuseServiceForTest(stubLangfuseVerifier{projID: "proj-1", projName: "My Project"})
data, code, message, err := svc.GetAPIKey("tenant-1")
if err != nil || code != common.CodeSuccess || message != "success" {
t.Fatalf("unexpected: code=%d message=%q err=%v", code, message, err)
}
if data == nil {
t.Fatalf("expected data, got nil")
}
if data.TenantID != "tenant-1" || data.Host != "https://a.langfuse.com" ||
data.SecretKey != "sk" || data.PublicKey != "pk" ||
data.ProjectID != "proj-1" || data.ProjectName != "My Project" {
t.Fatalf("unexpected data: %+v", data)
}
}
func TestLangfuseService_DeleteAPIKey_NoRecord(t *testing.T) {
setupLangfuseServiceTestDB(t)
svc := newLangfuseServiceForTest(stubLangfuseVerifier{})
ok, code, message, err := svc.DeleteAPIKey("tenant-1")
if ok || code != common.CodeSuccess || err != nil {
t.Fatalf("unexpected: ok=%v code=%d err=%v", ok, code, err)
}
if message != "Have not record any Langfuse keys." {
t.Fatalf("unexpected message: %q", message)
}
}
func TestLangfuseService_DeleteAPIKey_Success(t *testing.T) {
db := setupLangfuseServiceTestDB(t)
if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "host"}); err != nil {
t.Fatalf("seed failed: %v", err)
}
svc := newLangfuseServiceForTest(stubLangfuseVerifier{})
ok, code, message, err := svc.DeleteAPIKey("tenant-1")
if !ok || code != common.CodeSuccess || message != "" || err != nil {
t.Fatalf("unexpected: ok=%v code=%d message=%q err=%v", ok, code, message, err)
}
var count int64
db.Model(&entity.TenantLangfuse{}).Where("tenant_id = ?", "tenant-1").Count(&count)
if count != 0 {
t.Fatalf("expected row deleted, count=%d", count)
}
}