diff --git a/cmd/server_main.go b/cmd/server_main.go index 82a746de1e..52b7f285eb 100644 --- a/cmd/server_main.go +++ b/cmd/server_main.go @@ -223,6 +223,7 @@ func startServer(config *server.Config) { tenantService := service.NewTenantService() chatService := service.NewChatService() chatChannelService := service.NewChatChannelService() + langfuseService := service.NewLangfuseService() chatSessionService := service.NewChatSessionService() openaiChatService := service.NewOpenAIChatService() systemService := service.NewSystemService() @@ -248,6 +249,7 @@ func startServer(config *server.Config) { llmHandler := handler.NewLLMHandler(llmService, userService) chatHandler := handler.NewChatHandler(chatService, userService) chatChannelHandler := handler.NewChatChannelHandler(chatChannelService) + langfuseHandler := handler.NewLangfuseHandler(langfuseService) chatSessionHandler := handler.NewChatSessionHandler(chatSessionService, userService) openaiChatHandler := handler.NewOpenAIChatHandler(openaiChatService) connectorHandler := handler.NewConnectorHandler(connectorService, userService) @@ -321,7 +323,7 @@ func startServer(config *server.Config) { adminRuntimeHandler := handler.NewAdminRuntimeHandler(adminRuntimeSelector) // Initialize router - r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, knowledgebaseHandler, chunkHandler, llmHandler, chatHandler, chatChannelHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, mcpHandler, skillSearchHandler, providerHandler, agentHandler, searchBotHandler, difyRetrievalHandler, pluginHandler, modelHandler, fileCommitHandler, adminRuntimeHandler, openaiChatHandler) + r := router.NewRouter(authHandler, userHandler, tenantHandler, documentHandler, datasetsHandler, systemHandler, knowledgebaseHandler, chunkHandler, llmHandler, chatHandler, chatChannelHandler, langfuseHandler, chatSessionHandler, connectorHandler, searchHandler, fileHandler, memoryHandler, mcpHandler, skillSearchHandler, providerHandler, agentHandler, searchBotHandler, difyRetrievalHandler, pluginHandler, modelHandler, fileCommitHandler, adminRuntimeHandler, openaiChatHandler) // Create Gin engine ginEngine := gin.New() diff --git a/internal/dao/langfuse.go b/internal/dao/langfuse.go new file mode 100644 index 0000000000..5e4e0dee6d --- /dev/null +++ b/internal/dao/langfuse.go @@ -0,0 +1,104 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package dao + +import ( + "errors" + + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "ragflow/internal/entity" +) + +// LangfuseDAO is the data access object for tenant Langfuse credentials. +type LangfuseDAO struct{} + +// NewLangfuse creates a new Langfuse DAO. +func NewLangfuse() *LangfuseDAO { + return &LangfuseDAO{} +} + +// GetByTenantID returns the Langfuse credentials row for a tenant. +// It returns (nil, nil) when no row exists, mirroring the Python +// TenantLangfuseService.filter_by_tenant behaviour (DoesNotExist -> None). +func (dao *LangfuseDAO) GetByTenantID(tenantID string) (*entity.TenantLangfuse, error) { + var row entity.TenantLangfuse + err := DB.Where("tenant_id = ?", tenantID).First(&row).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return nil, err + } + return &row, nil +} + +// Create inserts a new Langfuse credentials row (mirrors save). +func (dao *LangfuseDAO) Create(row *entity.TenantLangfuse) error { + return DB.Create(row).Error +} + +// UpdateByTenantID updates the Langfuse credentials row for a tenant +func (dao *LangfuseDAO) UpdateByTenantID(tenantID string, updates map[string]any) error { + res := DB.Model(&entity.TenantLangfuse{}).Where("tenant_id = ?", tenantID).Updates(updates) + if res.Error != nil { + return res.Error + } + if res.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + return nil +} + +// DeleteByTenantID deletes the Langfuse credentials row for a tenant +// (mirrors delete_model / delete_ty_tenant_id). +func (dao *LangfuseDAO) DeleteByTenantID(tenantID string) error { + res := DB.Where("tenant_id = ?", tenantID).Delete(&entity.TenantLangfuse{}) + if res.Error != nil { + return res.Error + } + if res.RowsAffected == 0 { + return gorm.ErrRecordNotFound + } + return nil +} + +func (dao *LangfuseDAO) SaveByTenantID(row *entity.TenantLangfuse) error { + return DB.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "tenant_id"}}, + DoUpdates: clause.Assignments(map[string]any{ + "secret_key": row.SecretKey, + "public_key": row.PublicKey, + "host": row.Host, + }), + }).Create(row).Error +} + +func (dao *LangfuseDAO) DeleteExistingByTenantID(tenantID string) error { + return DB.Transaction(func(tx *gorm.DB) error { + var row entity.TenantLangfuse + err := tx.Where("tenant_id = ?", tenantID).First(&row).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return gorm.ErrRecordNotFound + } + return err + } + return tx.Delete(&row).Error + }) +} diff --git a/internal/dao/langfuse_test.go b/internal/dao/langfuse_test.go new file mode 100644 index 0000000000..f6808ed8d0 --- /dev/null +++ b/internal/dao/langfuse_test.go @@ -0,0 +1,118 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package dao + +import ( + "testing" + + "github.com/glebarez/sqlite" + "gorm.io/gorm" + + "ragflow/internal/entity" +) + +// setupLangfuseTestDB initializes an in-memory SQLite database for Langfuse DAO tests. +func setupLangfuseTestDB(t *testing.T) *gorm.DB { + t.Helper() + + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + TranslateError: true, + }) + if err != nil { + t.Fatalf("failed to open sqlite: %v", err) + } + if err := db.AutoMigrate(&entity.TenantLangfuse{}); err != nil { + t.Fatalf("failed to migrate: %v", err) + } + return db +} + +func TestLangfuseDAO_GetByTenantID_NotFound(t *testing.T) { + db := setupLangfuseTestDB(t) + pushDB(t, db) + dao := NewLangfuse() + + row, err := dao.GetByTenantID("missing") + if err != nil { + t.Fatalf("expected nil error for missing row, got %v", err) + } + if row != nil { + t.Fatalf("expected nil row for missing tenant, got %+v", row) + } +} + +func TestLangfuseDAO_CRUD(t *testing.T) { + db := setupLangfuseTestDB(t) + pushDB(t, db) + dao := NewLangfuse() + + // 1. Create + row := &entity.TenantLangfuse{ + TenantID: "tenant-1", + SecretKey: "sk-1", + PublicKey: "pk-1", + Host: "https://cloud.langfuse.com", + } + if err := dao.Create(row); err != nil { + t.Fatalf("failed to create: %v", err) + } + + // 2. GetByTenantID + got, err := dao.GetByTenantID("tenant-1") + if err != nil { + t.Fatalf("failed to get: %v", err) + } + if got == nil { + t.Fatalf("expected a row, got nil") + } + if got.SecretKey != "sk-1" || got.PublicKey != "pk-1" || got.Host != "https://cloud.langfuse.com" { + t.Fatalf("unexpected row: %+v", got) + } + // BeforeCreate hook should have populated timestamps. + if got.CreateTime == nil || got.UpdateTime == nil { + t.Fatalf("expected timestamps to be populated, got %+v", got) + } + + // 3. UpdateByTenantID + updates := map[string]any{ + "secret_key": "sk-2", + "public_key": "pk-2", + "host": "https://eu.langfuse.com", + } + if err := dao.UpdateByTenantID("tenant-1", updates); err != nil { + t.Fatalf("failed to update: %v", err) + } + got, err = dao.GetByTenantID("tenant-1") + if err != nil { + t.Fatalf("failed to get after update: %v", err) + } + if got.SecretKey != "sk-2" || got.PublicKey != "pk-2" || got.Host != "https://eu.langfuse.com" { + t.Fatalf("update not applied: %+v", got) + } + + // 4. DeleteByTenantID + if err := dao.DeleteByTenantID("tenant-1"); err != nil { + t.Fatalf("failed to delete: %v", err) + } + got, err = dao.GetByTenantID("tenant-1") + if err != nil { + t.Fatalf("expected nil error after delete, got %v", err) + } + if got != nil { + t.Fatalf("expected row to be deleted, got %+v", got) + } +} diff --git a/internal/entity/llm.go b/internal/entity/llm.go index 57c97af311..0590f44e5c 100644 --- a/internal/entity/llm.go +++ b/internal/entity/llm.go @@ -62,6 +62,18 @@ func (TenantLangfuse) TableName() string { return "tenant_langfuse" } +// LangfuseInfoResponse is the GET /langfuse/api-key payload: the stored +// credentials enriched with the resolved Langfuse project id/name. Field +// order mirrors the Python filter_by_tenant_with_info dict plus project info. +type LangfuseInfoResponse struct { + TenantID string `json:"tenant_id"` + Host string `json:"host"` + SecretKey string `json:"secret_key"` + PublicKey string `json:"public_key"` + ProjectID string `json:"project_id"` + ProjectName string `json:"project_name"` +} + // MyLLM represents LLM information for a tenant with factory details type MyLLM struct { ID string `gorm:"column:id" json:"id"` diff --git a/internal/handler/langfuse.go b/internal/handler/langfuse.go new file mode 100644 index 0000000000..72403f19c2 --- /dev/null +++ b/internal/handler/langfuse.go @@ -0,0 +1,122 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package handler + +import ( + "github.com/gin-gonic/gin" + + "ragflow/internal/common" + "ragflow/internal/entity" + "ragflow/internal/service" +) + +// LangfuseService is the behaviour the handler depends on (interface enables +// mocking in tests). +type LangfuseService interface { + SetAPIKey(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) + GetAPIKey(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) + DeleteAPIKey(tenantID string) (bool, common.ErrorCode, string, error) +} + +// LangfuseHandler handles /langfuse/api-key HTTP requests. +type LangfuseHandler struct { + langfuseService LangfuseService +} + +// NewLangfuseHandler creates a new Langfuse handler. +func NewLangfuseHandler(langfuseService LangfuseService) *LangfuseHandler { + return &LangfuseHandler{langfuseService: langfuseService} +} + +// NewLangfuse keeps a zero-arg constructor consistent with other handlers. +func NewLangfuse() *LangfuseHandler { + return NewLangfuseHandler(service.NewLangfuseService()) +} + +// SetLangfuseRequest is the POST/PUT body. Empty-value validation happens in +// the service layer to reproduce the Python "Missing required fields" message. +type SetLangfuseRequest struct { + SecretKey string `json:"secret_key"` + PublicKey string `json:"public_key"` + Host string `json:"host"` +} + +// SetAPIKey handles POST/PUT /langfuse/api-key. +func (h *LangfuseHandler) SetAPIKey(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req SetLangfuseRequest + if err := c.ShouldBindJSON(&req); err != nil { + jsonError(c, common.CodeDataError, "Invalid request: "+err.Error()) + return + } + + row, code, err := h.langfuseService.SetAPIKey(user.ID, req.SecretKey, req.PublicKey, req.Host) + if err != nil { + jsonError(c, code, err.Error()) + return + } + + // Echo back the stored keys, matching the Python langfuse_keys payload. + jsonResponse(c, common.CodeSuccess, gin.H{ + "tenant_id": row.TenantID, + "secret_key": row.SecretKey, + "public_key": row.PublicKey, + "host": row.Host, + }, "success") +} + +// GetAPIKey handles GET /langfuse/api-key. +func (h *LangfuseHandler) GetAPIKey(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + data, code, message, err := h.langfuseService.GetAPIKey(user.ID) + if err != nil { + jsonError(c, code, message) + return + } + jsonResponse(c, code, data, message) +} + +// DeleteAPIKey handles DELETE /langfuse/api-key. +func (h *LangfuseHandler) DeleteAPIKey(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + ok, code, message, err := h.langfuseService.DeleteAPIKey(user.ID) + if err != nil { + jsonError(c, code, message) + return + } + // No record: mirror get_json_result(message=...) with data=nil. + if message != "" { + jsonResponse(c, common.CodeSuccess, nil, message) + return + } + jsonResponse(c, common.CodeSuccess, ok, "success") +} diff --git a/internal/handler/langfuse_test.go b/internal/handler/langfuse_test.go new file mode 100644 index 0000000000..1aa43a6502 --- /dev/null +++ b/internal/handler/langfuse_test.go @@ -0,0 +1,262 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package handler + +import ( + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/gin-gonic/gin" + + "ragflow/internal/common" + "ragflow/internal/entity" +) + +type fakeLangfuseService struct { + setFn func(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) + getFn func(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) + deleteFn func(tenantID string) (bool, common.ErrorCode, string, error) +} + +func (f fakeLangfuseService) SetAPIKey(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) { + if f.setFn == nil { + return nil, common.CodeServerError, errors.New("unexpected SetAPIKey call") + } + return f.setFn(tenantID, secretKey, publicKey, host) +} + +func (f fakeLangfuseService) GetAPIKey(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) { + if f.getFn == nil { + return nil, common.CodeServerError, "", errors.New("unexpected GetAPIKey call") + } + return f.getFn(tenantID) +} + +func (f fakeLangfuseService) DeleteAPIKey(tenantID string) (bool, common.ErrorCode, string, error) { + if f.deleteFn == nil { + return false, common.CodeServerError, "", errors.New("unexpected DeleteAPIKey call") + } + return f.deleteFn(tenantID) +} + +func serveLangfuse(method, target, body string, h func(c *gin.Context)) *httptest.ResponseRecorder { + gin.SetMode(gin.TestMode) + router := gin.New() + router.Handle(method, target, func(c *gin.Context) { + c.Set("user", &entity.User{ID: "tenant-1"}) + h(c) + }) + + resp := httptest.NewRecorder() + var req *http.Request + if body == "" { + req = httptest.NewRequest(method, target, nil) + } else { + req = httptest.NewRequest(method, target, strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + } + router.ServeHTTP(resp, req) + return resp +} + +func decode(t *testing.T, resp *httptest.ResponseRecorder) map[string]interface{} { + t.Helper() + var payload map[string]interface{} + if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil { + t.Fatalf("unmarshal response: %v (body=%s)", err, resp.Body.String()) + } + return payload +} + +func TestLangfuseHandler_SetAPIKey_Success(t *testing.T) { + var gotTenant, gotSecret, gotPublic, gotHost string + h := &LangfuseHandler{langfuseService: fakeLangfuseService{ + setFn: func(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) { + gotTenant, gotSecret, gotPublic, gotHost = tenantID, secretKey, publicKey, host + return &entity.TenantLangfuse{TenantID: tenantID, SecretKey: secretKey, PublicKey: publicKey, Host: host}, common.CodeSuccess, nil + }, + }} + + body := `{"secret_key":"sk","public_key":"pk","host":"https://a.langfuse.com"}` + resp := serveLangfuse(http.MethodPost, "/api/v1/langfuse/api-key", body, h.SetAPIKey) + + if gotTenant != "tenant-1" || gotSecret != "sk" || gotPublic != "pk" || gotHost != "https://a.langfuse.com" { + t.Fatalf("service args: tenant=%q secret=%q public=%q host=%q", gotTenant, gotSecret, gotPublic, gotHost) + } + payload := decode(t, resp) + if payload["code"] != float64(common.CodeSuccess) { + t.Fatalf("payload=%v", payload) + } + data, ok := payload["data"].(map[string]interface{}) + if !ok { + t.Fatalf("expected data object, got %v", payload["data"]) + } + if data["secret_key"] != "sk" || data["public_key"] != "pk" || data["host"] != "https://a.langfuse.com" || data["tenant_id"] != "tenant-1" { + t.Fatalf("unexpected data: %v", data) + } +} + +func TestLangfuseHandler_SetAPIKey_ServiceError(t *testing.T) { + h := &LangfuseHandler{langfuseService: fakeLangfuseService{ + setFn: func(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) { + return nil, common.CodeDataError, errors.New("Invalid Langfuse keys") + }, + }} + + body := `{"secret_key":"sk","public_key":"pk","host":"host"}` + resp := serveLangfuse(http.MethodPost, "/api/v1/langfuse/api-key", body, h.SetAPIKey) + + payload := decode(t, resp) + if payload["code"] != float64(common.CodeDataError) { + t.Fatalf("payload=%v", payload) + } + if payload["message"] != "Invalid Langfuse keys" { + t.Fatalf("message=%v", payload["message"]) + } +} + +func TestLangfuseHandler_SetAPIKey_BindFailureStopsEarly(t *testing.T) { + called := false + h := &LangfuseHandler{langfuseService: fakeLangfuseService{ + setFn: func(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) { + called = true + return nil, common.CodeSuccess, nil + }, + }} + + resp := serveLangfuse(http.MethodPost, "/api/v1/langfuse/api-key", `{not-json`, h.SetAPIKey) + + if called { + t.Fatal("service should not be called when binding fails") + } + payload := decode(t, resp) + if payload["code"] != float64(common.CodeDataError) { + t.Fatalf("payload=%v", payload) + } +} + +func TestLangfuseHandler_GetAPIKey_Success(t *testing.T) { + h := &LangfuseHandler{langfuseService: fakeLangfuseService{ + getFn: func(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) { + return &entity.LangfuseInfoResponse{ + TenantID: tenantID, Host: "host", SecretKey: "sk", PublicKey: "pk", + ProjectID: "proj-1", ProjectName: "My Project", + }, common.CodeSuccess, "success", nil + }, + }} + + resp := serveLangfuse(http.MethodGet, "/api/v1/langfuse/api-key", "", h.GetAPIKey) + + payload := decode(t, resp) + if payload["code"] != float64(common.CodeSuccess) || payload["message"] != "success" { + t.Fatalf("payload=%v", payload) + } + data, ok := payload["data"].(map[string]interface{}) + if !ok { + t.Fatalf("expected data object, got %v", payload["data"]) + } + if data["project_id"] != "proj-1" || data["project_name"] != "My Project" { + t.Fatalf("unexpected data: %v", data) + } +} + +func TestLangfuseHandler_GetAPIKey_NoRecord(t *testing.T) { + h := &LangfuseHandler{langfuseService: fakeLangfuseService{ + getFn: func(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) { + return nil, common.CodeSuccess, "Have not record any Langfuse keys.", nil + }, + }} + + resp := serveLangfuse(http.MethodGet, "/api/v1/langfuse/api-key", "", h.GetAPIKey) + + payload := decode(t, resp) + if payload["code"] != float64(common.CodeSuccess) { + t.Fatalf("payload=%v", payload) + } + if payload["message"] != "Have not record any Langfuse keys." { + t.Fatalf("message=%v", payload["message"]) + } + if payload["data"] != nil { + t.Fatalf("expected nil data, got %v", payload["data"]) + } +} + +func TestLangfuseHandler_GetAPIKey_Unauthorized(t *testing.T) { + h := &LangfuseHandler{langfuseService: fakeLangfuseService{ + getFn: func(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) { + return nil, common.CodeDataError, "Invalid Langfuse keys loaded", errors.New("unauthorized") + }, + }} + + resp := serveLangfuse(http.MethodGet, "/api/v1/langfuse/api-key", "", h.GetAPIKey) + + payload := decode(t, resp) + if payload["code"] != float64(common.CodeDataError) { + t.Fatalf("payload=%v", payload) + } + if payload["message"] != "Invalid Langfuse keys loaded" { + t.Fatalf("message=%v", payload["message"]) + } +} + +func TestLangfuseHandler_DeleteAPIKey_Success(t *testing.T) { + var gotTenant string + h := &LangfuseHandler{langfuseService: fakeLangfuseService{ + deleteFn: func(tenantID string) (bool, common.ErrorCode, string, error) { + gotTenant = tenantID + return true, common.CodeSuccess, "", nil + }, + }} + + resp := serveLangfuse(http.MethodDelete, "/api/v1/langfuse/api-key", "", h.DeleteAPIKey) + + if gotTenant != "tenant-1" { + t.Fatalf("tenant=%q", gotTenant) + } + payload := decode(t, resp) + if payload["code"] != float64(common.CodeSuccess) { + t.Fatalf("payload=%v", payload) + } + if payload["data"] != true { + t.Fatalf("expected data true, got %v", payload["data"]) + } +} + +func TestLangfuseHandler_DeleteAPIKey_NoRecord(t *testing.T) { + h := &LangfuseHandler{langfuseService: fakeLangfuseService{ + deleteFn: func(tenantID string) (bool, common.ErrorCode, string, error) { + return false, common.CodeSuccess, "Have not record any Langfuse keys.", nil + }, + }} + + resp := serveLangfuse(http.MethodDelete, "/api/v1/langfuse/api-key", "", h.DeleteAPIKey) + + payload := decode(t, resp) + if payload["code"] != float64(common.CodeSuccess) { + t.Fatalf("payload=%v", payload) + } + if payload["message"] != "Have not record any Langfuse keys." { + t.Fatalf("message=%v", payload["message"]) + } + if payload["data"] != nil { + t.Fatalf("expected nil data, got %v", payload["data"]) + } +} diff --git a/internal/router/router.go b/internal/router/router.go index 41195cc99f..124fc1d3db 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -35,6 +35,7 @@ type Router struct { llmHandler *handler.LLMHandler chatHandler *handler.ChatHandler chatChannelHandler *handler.ChatChannelHandler + langfuseHandler *handler.LangfuseHandler openaiChatHandler *handler.OpenAIChatHandler chatSessionHandler *handler.ChatSessionHandler connectorHandler *handler.ConnectorHandler @@ -66,6 +67,7 @@ func NewRouter( llmHandler *handler.LLMHandler, chatHandler *handler.ChatHandler, chatChannelHandler *handler.ChatChannelHandler, + langfuseHandler *handler.LangfuseHandler, chatSessionHandler *handler.ChatSessionHandler, connectorHandler *handler.ConnectorHandler, searchHandler *handler.SearchHandler, @@ -95,6 +97,7 @@ func NewRouter( llmHandler: llmHandler, chatHandler: chatHandler, chatChannelHandler: chatChannelHandler, + langfuseHandler: langfuseHandler, openaiChatHandler: openaiChatHandler, chatSessionHandler: chatSessionHandler, connectorHandler: connectorHandler, @@ -655,6 +658,15 @@ func (r *Router) Setup(engine *gin.Engine) { chanChannel.DELETE("/:channel_id", r.chatChannelHandler.DeleteChatChannel) } + // Langfuse tracing keys + langfuse := v1.Group("/langfuse") + { + langfuse.POST("/api-key", r.langfuseHandler.SetAPIKey) + langfuse.PUT("/api-key", r.langfuseHandler.SetAPIKey) + langfuse.GET("/api-key", r.langfuseHandler.GetAPIKey) + langfuse.DELETE("/api-key", r.langfuseHandler.DeleteAPIKey) + } + // Chat session (conversation) routes session := authorized.Group("/v1/conversation") { diff --git a/internal/service/langfuse.go b/internal/service/langfuse.go index 0193562189..ca17ac080a 100644 --- a/internal/service/langfuse.go +++ b/internal/service/langfuse.go @@ -25,6 +25,7 @@ import ( "fmt" "io" "net/http" + "strings" "sync" "time" @@ -256,3 +257,88 @@ func (c *LangfuseClient) Shutdown(ctx context.Context) error { func basicAuth(public, secret string) string { return "Basic " + base64.StdEncoding.EncodeToString([]byte(public+":"+secret)) } + +// ErrLangfuseUnauthorized indicates the Langfuse credentials were rejected +var ErrLangfuseUnauthorized = errors.New("langfuse: unauthorized") + +type LangfuseAPIError struct { + StatusCode int + Body string +} + +func (e *LangfuseAPIError) Error() string { + if e.Body == "" { + return fmt.Sprintf("langfuse: unexpected status %d", e.StatusCode) + } + return fmt.Sprintf("langfuse: unexpected status %d: %s", e.StatusCode, e.Body) +} + +func IsLangfuseAPIError(err error) bool { + var apiErr *LangfuseAPIError + return errors.As(err, &apiErr) +} + +// langfuseProjectsResponse mirrors the body of GET /api/public/projects. +type langfuseProjectsResponse struct { + Data []struct { + ID string `json:"id"` + Name string `json:"name"` + } `json:"data"` +} + +// GetProject calls GET {host}/api/public/projects and returns the first +// project's id and name. +func (c *LangfuseClient) GetProject(ctx context.Context) (string, string, error) { + if c == nil { + return "", "", fmt.Errorf("nil langfuse client") + } + + url := strings.TrimRight(c.Host, "/") + "/api/public/projects" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return "", "", err + } + req.Header.Set("Authorization", basicAuth(c.PublicKey, c.SecretKey)) + + res, err := c.HTTP.Do(req) + if err != nil { + return "", "", err + } + defer res.Body.Close() + + if res.StatusCode == http.StatusUnauthorized || res.StatusCode == http.StatusForbidden { + return "", "", ErrLangfuseUnauthorized + } + if res.StatusCode < 200 || res.StatusCode >= 300 { + body, _ := io.ReadAll(res.Body) + return "", "", &LangfuseAPIError{StatusCode: res.StatusCode, Body: string(body)} + } + + body, err := io.ReadAll(res.Body) + if err != nil { + return "", "", err + } + + var parsed langfuseProjectsResponse + if err := json.Unmarshal(body, &parsed); err != nil { + return "", "", err + } + if len(parsed.Data) == 0 { + return "", "", fmt.Errorf("langfuse: no project found") + } + return parsed.Data[0].ID, parsed.Data[0].Name, nil +} + +// AuthCheck verifies the credentials are valid, mirroring the Python langfuse +// SDK's auth_check(). It returns (false, nil) when the credentials are +// rejected, and (false, err) for transport/remote errors. +func (c *LangfuseClient) AuthCheck(ctx context.Context) (bool, error) { + _, _, err := c.GetProject(ctx) + if err != nil { + if errors.Is(err, ErrLangfuseUnauthorized) { + return false, nil + } + return false, err + } + return true, nil +} diff --git a/internal/service/langfuse_service.go b/internal/service/langfuse_service.go new file mode 100644 index 0000000000..8c60220a30 --- /dev/null +++ b/internal/service/langfuse_service.go @@ -0,0 +1,141 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package service + +import ( + "context" + "errors" + "fmt" + + "ragflow/internal/common" + "ragflow/internal/dao" + "ragflow/internal/entity" + + "gorm.io/gorm" +) + +// langfuseVerifier abstracts Langfuse credential verification so the business +// logic can be unit-tested without performing real network calls. +type langfuseVerifier interface { + // AuthCheck mirrors the Python langfuse SDK auth_check(). + AuthCheck(ctx context.Context, host, publicKey, secretKey string) (bool, error) + // GetProject mirrors api.projects.get().dict()["data"][0] (id, name). + GetProject(ctx context.Context, host, publicKey, secretKey string) (string, string, error) +} + +// defaultLangfuseVerifier uses a real LangfuseClient for verification. +type defaultLangfuseVerifier struct{} + +func (defaultLangfuseVerifier) AuthCheck(ctx context.Context, host, publicKey, secretKey string) (bool, error) { + client := NewLangfuseClient(host, publicKey, secretKey) + defer client.Shutdown(context.Background()) + return client.AuthCheck(ctx) +} + +func (defaultLangfuseVerifier) GetProject(ctx context.Context, host, publicKey, secretKey string) (string, string, error) { + client := NewLangfuseClient(host, publicKey, secretKey) + defer client.Shutdown(context.Background()) + return client.GetProject(ctx) +} + +// LangfuseService implements the /langfuse/api-key business logic, mirroring +// the Python TenantLangfuseService + langfuse_api handlers. +type LangfuseService struct { + langfuseDAO *dao.LangfuseDAO + verifier langfuseVerifier +} + +// NewLangfuseService creates a LangfuseService with the default (live) verifier. +func NewLangfuseService() *LangfuseService { + return &LangfuseService{ + langfuseDAO: dao.NewLangfuse(), + verifier: defaultLangfuseVerifier{}, + } +} + +// SetAPIKey validates and stores (insert or update) the Langfuse credentials +// for a tenant. +func (s *LangfuseService) SetAPIKey(tenantID, secretKey, publicKey, host string) (*entity.TenantLangfuse, common.ErrorCode, error) { + if secretKey == "" || publicKey == "" || host == "" { + return nil, common.CodeDataError, errors.New("Missing required fields") + } + + ok, err := s.verifier.AuthCheck(context.Background(), host, publicKey, secretKey) + if err != nil { + return nil, common.CodeServerError, err + } + if !ok { + return nil, common.CodeDataError, errors.New("Invalid Langfuse keys") + } + + row := &entity.TenantLangfuse{ + TenantID: tenantID, + SecretKey: secretKey, + PublicKey: publicKey, + Host: host, + } + + if err := s.langfuseDAO.SaveByTenantID(row); err != nil { + return nil, common.CodeServerError, err + } + + return row, common.CodeSuccess, nil +} + +// GetAPIKey returns the stored credentials enriched with the Langfuse project +// id/name. +func (s *LangfuseService) GetAPIKey(tenantID string) (*entity.LangfuseInfoResponse, common.ErrorCode, string, error) { + row, err := s.langfuseDAO.GetByTenantID(tenantID) + if err != nil { + return nil, common.CodeServerError, "", err + } + if row == nil { + return nil, common.CodeSuccess, "Have not record any Langfuse keys.", nil + } + + projectID, projectName, err := s.verifier.GetProject(context.Background(), row.Host, row.PublicKey, row.SecretKey) + if err != nil { + if errors.Is(err, ErrLangfuseUnauthorized) { + return nil, common.CodeDataError, "Invalid Langfuse keys loaded", err + } + if IsLangfuseAPIError(err) { + return nil, common.CodeSuccess, fmt.Sprintf("Error from Langfuse: %s", err.Error()), nil + } + return nil, common.CodeServerError, "", err + } + + info := &entity.LangfuseInfoResponse{ + TenantID: row.TenantID, + Host: row.Host, + SecretKey: row.SecretKey, + PublicKey: row.PublicKey, + ProjectID: projectID, + ProjectName: projectName, + } + return info, common.CodeSuccess, "success", nil +} + +// DeleteAPIKey removes the stored credentials for a tenant. +func (s *LangfuseService) DeleteAPIKey(tenantID string) (bool, common.ErrorCode, string, error) { + if err := s.langfuseDAO.DeleteExistingByTenantID(tenantID); err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return false, common.CodeSuccess, "Have not record any Langfuse keys.", nil + } + return false, common.CodeServerError, "", err + } + return true, common.CodeSuccess, "", nil +} diff --git a/internal/service/langfuse_service_test.go b/internal/service/langfuse_service_test.go new file mode 100644 index 0000000000..9a64a5e2c7 --- /dev/null +++ b/internal/service/langfuse_service_test.go @@ -0,0 +1,245 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package service + +import ( + "context" + "errors" + "testing" + + "github.com/glebarez/sqlite" + "gorm.io/gorm" + + "ragflow/internal/common" + "ragflow/internal/dao" + "ragflow/internal/entity" +) + +func setupLangfuseServiceTestDB(t *testing.T) *gorm.DB { + t.Helper() + + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + TranslateError: true, + }) + if err != nil { + t.Fatalf("failed to open sqlite: %v", err) + } + if err := db.AutoMigrate(&entity.TenantLangfuse{}); err != nil { + t.Fatalf("failed to migrate test schema: %v", err) + } + + origDB := dao.DB + dao.DB = db + t.Cleanup(func() { dao.DB = origDB }) + return db +} + +// stubLangfuseVerifier is a controllable langfuseVerifier for tests. +type stubLangfuseVerifier struct { + authOK bool + authErr error + projID string + projName string + projErr error +} + +func (s stubLangfuseVerifier) AuthCheck(_ context.Context, _, _, _ string) (bool, error) { + return s.authOK, s.authErr +} + +func (s stubLangfuseVerifier) GetProject(_ context.Context, _, _, _ string) (string, string, error) { + return s.projID, s.projName, s.projErr +} + +func newLangfuseServiceForTest(v langfuseVerifier) *LangfuseService { + return &LangfuseService{langfuseDAO: dao.NewLangfuse(), verifier: v} +} + +func TestLangfuseService_SetAPIKey_MissingFields(t *testing.T) { + setupLangfuseServiceTestDB(t) + svc := newLangfuseServiceForTest(stubLangfuseVerifier{authOK: true}) + + _, code, err := svc.SetAPIKey("tenant-1", "", "pk", "host") + if code != common.CodeDataError || err == nil || err.Error() != "Missing required fields" { + t.Fatalf("expected Missing required fields/CodeDataError, got code=%d err=%v", code, err) + } +} + +func TestLangfuseService_SetAPIKey_InvalidKeys(t *testing.T) { + setupLangfuseServiceTestDB(t) + svc := newLangfuseServiceForTest(stubLangfuseVerifier{authOK: false}) + + _, code, err := svc.SetAPIKey("tenant-1", "sk", "pk", "host") + if code != common.CodeDataError || err == nil || err.Error() != "Invalid Langfuse keys" { + t.Fatalf("expected Invalid Langfuse keys/CodeDataError, got code=%d err=%v", code, err) + } +} + +func TestLangfuseService_SetAPIKey_VerifierError(t *testing.T) { + setupLangfuseServiceTestDB(t) + svc := newLangfuseServiceForTest(stubLangfuseVerifier{authErr: errors.New("network down")}) + + _, code, err := svc.SetAPIKey("tenant-1", "sk", "pk", "host") + if code != common.CodeServerError || err == nil || err.Error() != "network down" { + t.Fatalf("expected verifier error/CodeServerError, got code=%d err=%v", code, err) + } +} + +func TestLangfuseService_SetAPIKey_CreateThenUpdate(t *testing.T) { + db := setupLangfuseServiceTestDB(t) + svc := newLangfuseServiceForTest(stubLangfuseVerifier{authOK: true}) + + // Create + row, code, err := svc.SetAPIKey("tenant-1", "sk-1", "pk-1", "https://a.langfuse.com") + if err != nil || code != common.CodeSuccess { + t.Fatalf("create failed: code=%d err=%v", code, err) + } + if row.SecretKey != "sk-1" || row.Host != "https://a.langfuse.com" { + t.Fatalf("unexpected row: %+v", row) + } + + var count int64 + db.Model(&entity.TenantLangfuse{}).Where("tenant_id = ?", "tenant-1").Count(&count) + if count != 1 { + t.Fatalf("expected 1 row after create, got %d", count) + } + + // Update (same tenant) should not create a second row + _, code, err = svc.SetAPIKey("tenant-1", "sk-2", "pk-2", "https://b.langfuse.com") + if err != nil || code != common.CodeSuccess { + t.Fatalf("update failed: code=%d err=%v", code, err) + } + db.Model(&entity.TenantLangfuse{}).Where("tenant_id = ?", "tenant-1").Count(&count) + if count != 1 { + t.Fatalf("expected still 1 row after update, got %d", count) + } + stored, _ := dao.NewLangfuse().GetByTenantID("tenant-1") + if stored == nil || stored.SecretKey != "sk-2" || stored.Host != "https://b.langfuse.com" { + t.Fatalf("update not persisted: %+v", stored) + } +} + +func TestLangfuseService_GetAPIKey_NoRecord(t *testing.T) { + setupLangfuseServiceTestDB(t) + svc := newLangfuseServiceForTest(stubLangfuseVerifier{}) + + data, code, message, err := svc.GetAPIKey("tenant-1") + if err != nil || code != common.CodeSuccess || data != nil { + t.Fatalf("unexpected: code=%d data=%v err=%v", code, data, err) + } + if message != "Have not record any Langfuse keys." { + t.Fatalf("unexpected message: %q", message) + } +} + +func TestLangfuseService_GetAPIKey_Unauthorized(t *testing.T) { + setupLangfuseServiceTestDB(t) + if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "host"}); err != nil { + t.Fatalf("seed failed: %v", err) + } + svc := newLangfuseServiceForTest(stubLangfuseVerifier{projErr: ErrLangfuseUnauthorized}) + + data, code, message, err := svc.GetAPIKey("tenant-1") + if data != nil || code != common.CodeDataError || err == nil { + t.Fatalf("unexpected: code=%d data=%v err=%v", code, data, err) + } + if message != "Invalid Langfuse keys loaded" { + t.Fatalf("unexpected message: %q", message) + } +} + +func TestLangfuseService_GetAPIKey_ApiError(t *testing.T) { + setupLangfuseServiceTestDB(t) + if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "host"}); err != nil { + t.Fatalf("seed failed: %v", err) + } + svc := newLangfuseServiceForTest(stubLangfuseVerifier{projErr: &LangfuseAPIError{StatusCode: 500, Body: "boom"}}) + + data, code, message, err := svc.GetAPIKey("tenant-1") + if data != nil || code != common.CodeSuccess || err != nil { + t.Fatalf("unexpected: code=%d data=%v err=%v", code, data, err) + } + if message != "Error from Langfuse: langfuse: unexpected status 500: boom" { + t.Fatalf("unexpected message: %q", message) + } +} + +func TestLangfuseService_GetAPIKey_NonAPIError(t *testing.T) { + setupLangfuseServiceTestDB(t) + if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "host"}); err != nil { + t.Fatalf("seed failed: %v", err) + } + svc := newLangfuseServiceForTest(stubLangfuseVerifier{projErr: errors.New("json parse failed")}) + + data, code, message, err := svc.GetAPIKey("tenant-1") + if data != nil || code != common.CodeServerError || message != "" || err == nil { + t.Fatalf("unexpected: code=%d message=%q data=%v err=%v", code, message, data, err) + } +} + +func TestLangfuseService_GetAPIKey_Success(t *testing.T) { + setupLangfuseServiceTestDB(t) + if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "https://a.langfuse.com"}); err != nil { + t.Fatalf("seed failed: %v", err) + } + svc := newLangfuseServiceForTest(stubLangfuseVerifier{projID: "proj-1", projName: "My Project"}) + + data, code, message, err := svc.GetAPIKey("tenant-1") + if err != nil || code != common.CodeSuccess || message != "success" { + t.Fatalf("unexpected: code=%d message=%q err=%v", code, message, err) + } + if data == nil { + t.Fatalf("expected data, got nil") + } + if data.TenantID != "tenant-1" || data.Host != "https://a.langfuse.com" || + data.SecretKey != "sk" || data.PublicKey != "pk" || + data.ProjectID != "proj-1" || data.ProjectName != "My Project" { + t.Fatalf("unexpected data: %+v", data) + } +} + +func TestLangfuseService_DeleteAPIKey_NoRecord(t *testing.T) { + setupLangfuseServiceTestDB(t) + svc := newLangfuseServiceForTest(stubLangfuseVerifier{}) + + ok, code, message, err := svc.DeleteAPIKey("tenant-1") + if ok || code != common.CodeSuccess || err != nil { + t.Fatalf("unexpected: ok=%v code=%d err=%v", ok, code, err) + } + if message != "Have not record any Langfuse keys." { + t.Fatalf("unexpected message: %q", message) + } +} + +func TestLangfuseService_DeleteAPIKey_Success(t *testing.T) { + db := setupLangfuseServiceTestDB(t) + if err := dao.NewLangfuse().Create(&entity.TenantLangfuse{TenantID: "tenant-1", SecretKey: "sk", PublicKey: "pk", Host: "host"}); err != nil { + t.Fatalf("seed failed: %v", err) + } + svc := newLangfuseServiceForTest(stubLangfuseVerifier{}) + + ok, code, message, err := svc.DeleteAPIKey("tenant-1") + if !ok || code != common.CodeSuccess || message != "" || err != nil { + t.Fatalf("unexpected: ok=%v code=%d message=%q err=%v", ok, code, message, err) + } + + var count int64 + db.Model(&entity.TenantLangfuse{}).Where("tenant_id = ?", "tenant-1").Count(&count) + if count != 0 { + t.Fatalf("expected row deleted, count=%d", count) + } +}