feat[Go]: implement create_connector API (#15285)

### What problem does this PR solve?

implement create_connector API

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Haruko386
2026-05-27 15:54:11 +08:00
committed by GitHub
parent 2c099bbb95
commit 82318dee5d
5 changed files with 193 additions and 7 deletions

View File

@@ -16,7 +16,10 @@
package entity
import "time"
import (
"encoding/json"
"time"
)
// Connector connector model
type Connector struct {
@@ -39,6 +42,53 @@ func (Connector) TableName() string {
return "connector"
}
// MarshalJSON formats connector timestamps to match the Python API contract.
func (c Connector) MarshalJSON() ([]byte, error) {
type connectorJSON struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
Name string `json:"name"`
Source string `json:"source"`
InputType string `json:"input_type"`
Config JSONMap `json:"config"`
RefreshFreq int64 `json:"refresh_freq"`
PruneFreq int64 `json:"prune_freq"`
TimeoutSecs int64 `json:"timeout_secs"`
IndexingStart *string `json:"indexing_start"`
Status string `json:"status"`
CreateTime *int64 `json:"create_time,omitempty"`
CreateDate *string `json:"create_date,omitempty"`
UpdateTime *int64 `json:"update_time,omitempty"`
UpdateDate *string `json:"update_date,omitempty"`
}
return json.Marshal(connectorJSON{
ID: c.ID,
TenantID: c.TenantID,
Name: c.Name,
Source: c.Source,
InputType: c.InputType,
Config: c.Config,
RefreshFreq: c.RefreshFreq,
PruneFreq: c.PruneFreq,
TimeoutSecs: c.TimeoutSecs,
IndexingStart: formatConnectorTime(c.IndexingStart),
Status: c.Status,
CreateTime: c.CreateTime,
CreateDate: formatConnectorTime(c.CreateDate),
UpdateTime: c.UpdateTime,
UpdateDate: formatConnectorTime(c.UpdateDate),
})
}
func formatConnectorTime(value *time.Time) *string {
if value == nil {
return nil
}
formatted := value.Format("2006-01-02T15:04:05")
return &formatted
}
// Connector2Kb connector to knowledge base mapping model
type Connector2Kb struct {
ID string `gorm:"column:id;primaryKey;size:32" json:"id"`

View File

@@ -20,6 +20,7 @@ import (
"net/http"
"ragflow/internal/common"
"ragflow/internal/entity"
"strings"
"github.com/gin-gonic/gin"
@@ -28,6 +29,7 @@ import (
type connectorService interface {
ListConnectors(userID string) (*service.ListConnectorsResponse, error)
CreateConnector(userID string, req *service.CreateConnectorRequest) (*entity.Connector, error)
GetConnector(connectorID string, userID string) (*entity.Connector, common.ErrorCode, error)
}
@@ -105,3 +107,70 @@ func (h *ConnectorHandler) GetConnector(c *gin.Context) {
"message": "success",
})
}
// CreateConnector create connector
// @Summary create Connectors
// @Description create a connectors for the current user
// @Tags connector
// @Accept json
// @Produce json
// @Success 200 {object} service.ListConnectorsResponse
// @Router /connector/ [post]
func (h *ConnectorHandler) CreateConnector(c *gin.Context) {
user, errorCode, errorMessage := GetUser(c)
if errorCode != common.CodeSuccess {
jsonError(c, errorCode, errorMessage)
return
}
var req service.CreateConnectorRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"code": common.CodeBadRequest,
"data": nil,
"message": "Invalid request body: " + err.Error(),
})
return
}
if strings.TrimSpace(req.Name) == "" {
c.JSON(http.StatusOK, gin.H{
"code": common.CodeDataError,
"data": nil,
"message": "name is required",
})
return
}
if strings.TrimSpace(req.Source) == "" {
c.JSON(http.StatusOK, gin.H{
"code": common.CodeDataError,
"data": nil,
"message": "source is required",
})
return
}
if req.Config == nil {
c.JSON(http.StatusOK, gin.H{
"code": common.CodeDataError,
"data": nil,
"message": "config is required",
})
return
}
connector, err := h.connectorService.CreateConnector(user.ID, &req)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"code": common.CodeServerError,
"data": nil,
"message": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"code": common.CodeSuccess,
"data": connector,
"message": "success",
})
}

View File

@@ -25,6 +25,10 @@ func (s fakeConnectorService) ListConnectors(string) (*service.ListConnectorsRes
return &service.ListConnectorsResponse{Connectors: []*dao.ConnectorListItem{}}, nil
}
func (s fakeConnectorService) CreateConnector(string, *service.CreateConnectorRequest) (*entity.Connector, error) {
return s.connector, s.err
}
func (s fakeConnectorService) GetConnector(string, string) (*entity.Connector, common.ErrorCode, error) {
if s.err != nil {
return nil, s.code, s.err
@@ -32,6 +36,13 @@ func (s fakeConnectorService) GetConnector(string, string) (*entity.Connector, c
return s.connector, common.CodeSuccess, nil
}
func (s fakeConnectorService) UpdateConnector(string, string, *service.UpdateConnectorRequest) (*entity.Connector, common.ErrorCode, error) {
if s.err != nil {
return nil, s.code, s.err
}
return s.connector, common.CodeSuccess, nil
}
func TestConnectorHandlerGetConnector(t *testing.T) {
gin.SetMode(gin.TestMode)

View File

@@ -308,6 +308,7 @@ func (r *Router) Setup(engine *gin.Engine) {
connector := v1.Group("/connectors")
{
connector.GET("/", r.connectorHandler.ListConnectors)
connector.POST("/", r.connectorHandler.CreateConnector)
connector.GET("/:connector_id", r.connectorHandler.GetConnector)
}
@@ -357,12 +358,12 @@ func (r *Router) Setup(engine *gin.Engine) {
// Tenant routes (per-tenant resources)
tenant := v1.Group("/tenant")
{
tenant.POST("/chunk_store", r.tenantHandler.CreateChunkStore) // Internal API only for GO
tenant.DELETE("/chunk_store", r.tenantHandler.DeleteChunkStore) // Internal API only for GO
tenant.POST("/metadata_store", r.tenantHandler.CreateMetadataStore) // Internal API only for GO
tenant.DELETE("/metadata_store", r.tenantHandler.DeleteMetadataStore) // Internal API only for GO
tenant.POST("/insert_chunks_from_file", r.tenantHandler.InsertChunksFromFile) // Internal API only for GO
tenant.POST("/insert_metadata_from_file", r.tenantHandler.InsertMetadataFromFile) // Internal API only for GO
tenant.POST("/chunk_store", r.tenantHandler.CreateChunkStore) // Internal API only for GO
tenant.DELETE("/chunk_store", r.tenantHandler.DeleteChunkStore) // Internal API only for GO
tenant.POST("/metadata_store", r.tenantHandler.CreateMetadataStore) // Internal API only for GO
tenant.DELETE("/metadata_store", r.tenantHandler.DeleteMetadataStore) // Internal API only for GO
tenant.POST("/insert_chunks_from_file", r.tenantHandler.InsertChunksFromFile) // Internal API only for GO
tenant.POST("/insert_metadata_from_file", r.tenantHandler.InsertMetadataFromFile) // Internal API only for GO
}
// Document routes

View File

@@ -26,6 +26,13 @@ import (
"gorm.io/gorm"
)
const (
connectorInputTypePoll = "poll"
connectorStatusUnstarted = "0"
defaultConnectorFreq = 5
defaultConnectorTimeout = 60 * 29
)
// ConnectorService connector service
type ConnectorService struct {
connectorDAO *dao.ConnectorDAO
@@ -45,6 +52,54 @@ type ListConnectorsResponse struct {
Connectors []*dao.ConnectorListItem `json:"connectors"`
}
// CreateConnectorRequest creates a connector with Python-compatible defaults.
type CreateConnectorRequest struct {
Name string `json:"name"`
Source string `json:"source"`
Config entity.JSONMap `json:"config"`
RefreshFreq *int64 `json:"refresh_freq,omitempty"`
PruneFreq *int64 `json:"prune_freq,omitempty"`
TimeoutSecs *int64 `json:"timeout_secs,omitempty"`
}
// CreateConnector creates a connector owned by the current user.
// Equivalent to Python's create_connector endpoint.
func (s *ConnectorService) CreateConnector(userID string, req *CreateConnectorRequest) (*entity.Connector, error) {
refreshFreq := int64(defaultConnectorFreq)
if req.RefreshFreq != nil {
refreshFreq = *req.RefreshFreq
}
pruneFreq := int64(defaultConnectorFreq)
if req.PruneFreq != nil {
pruneFreq = *req.PruneFreq
}
timeoutSecs := int64(defaultConnectorTimeout)
if req.TimeoutSecs != nil {
timeoutSecs = *req.TimeoutSecs
}
connector := &entity.Connector{
ID: common.GenerateUUID(),
TenantID: userID,
Name: req.Name,
Source: req.Source,
InputType: connectorInputTypePoll,
Config: req.Config,
RefreshFreq: refreshFreq,
PruneFreq: pruneFreq,
TimeoutSecs: timeoutSecs,
Status: connectorStatusUnstarted,
}
if err := s.connectorDAO.Create(connector); err != nil {
return nil, err
}
return s.connectorDAO.GetByID(connector.ID)
}
// GetConnector returns one connector when the user can access its tenant.
func (s *ConnectorService) GetConnector(connectorID string, userID string) (*entity.Connector, common.ErrorCode, error) {
if connectorID == "" {