Feature big query connector (#15871)

### What problem does this PR solve?

This PR adds Google BigQuery as a first-class data source connector in
RAGFlow.

It enables users to ingest and sync BigQuery data using the same
row-to-document model used by relational database connectors: selected
content columns become document text, metadata columns become document
metadata, an optional ID column provides stable document IDs, and an
optional timestamp column enables cursor-based incremental sync.

The connector supports service-account JSON credentials, table mode,
custom query mode, GoogleSQL queries, cursor-based incremental sync,
deleted-row pruning support, configurable query limits such as
`maximum_bytes_billed`, dry-run validation, batch loading, stable
document IDs, and BigQuery-aware value serialization.
This commit is contained in:
Attili-sys
2026-06-29 17:08:40 +03:00
committed by GitHub
parent 1087a25f22
commit 5fc254eb2e
18 changed files with 1854 additions and 49 deletions

View File

@@ -184,49 +184,98 @@ async def test_connector(connector_id):
"""Validate connector configuration without persisting changes or triggering sync.
For the REST API connector, this uses `RestAPIConnector.validate_config`
against the existing saved configuration.
against the existing saved configuration. For BigQuery, it runs `SELECT 1`
plus a free dry-run of the configured base query under `maximum_bytes_billed`
so bad credentials, wrong location, or runaway scans surface before scheduled
syncs run (and incur cost).
"""
if not ConnectorService.accessible(connector_id, current_user.id):
return _connector_auth_error(connector_id, current_user.id)
from common.data_source.rest_api_connector import RestAPIConnector
from common.data_source.exceptions import ConnectorMissingCredentialError, ConnectorValidationError
ok, conn = ConnectorService.get_by_id(connector_id)
if not ok:
return get_data_error_result(message="Can't find this Connector!")
if conn.source != DocumentSource.REST_API:
return get_json_result(
code=RetCode.ARGUMENT_ERROR,
message="Test endpoint currently supports only REST API connectors.",
data=False,
)
config = conn.config or {}
credentials = config.get("credentials") or {}
try:
await asyncio.to_thread(
RestAPIConnector.validate_config,
config=config,
credentials=credentials,
)
except (ConnectorValidationError, ConnectorMissingCredentialError) as exc:
return get_json_result(
code=RetCode.DATA_ERROR,
message=str(exc),
data=False,
)
except Exception as exc:
logging.exception("REST API connector validation failed: %s", exc)
return get_json_result(
code=RetCode.SERVER_ERROR,
message="REST API connector validation failed, please check logs.",
data=False,
)
if conn.source == DocumentSource.REST_API:
from common.data_source.rest_api_connector import RestAPIConnector
return get_json_result(data=True)
try:
await asyncio.to_thread(
RestAPIConnector.validate_config,
config=config,
credentials=credentials,
)
except (ConnectorValidationError, ConnectorMissingCredentialError) as exc:
return get_json_result(
code=RetCode.DATA_ERROR,
message=str(exc),
data=False,
)
except Exception as exc:
logging.exception("REST API connector validation failed: %s", exc)
return get_json_result(
code=RetCode.SERVER_ERROR,
message="REST API connector validation failed, please check logs.",
data=False,
)
return get_json_result(data=True)
if conn.source == DocumentSource.BIGQUERY:
from common.data_source.bigquery_connector import BigQueryConnector
def _validate_bigquery():
connector_kwargs = {
"project_id": config.get("project_id", ""),
"dataset_id": config.get("dataset_id") or None,
"table_id": config.get("table_id") or None,
"location": config.get("location") or None,
"query": config.get("query", ""),
"content_columns": config.get("content_columns", ""),
"metadata_columns": config.get("metadata_columns", ""),
"id_column": config.get("id_column") or None,
"timestamp_column": config.get("timestamp_column") or None,
"use_query_cache": config.get("use_query_cache", True),
}
if config.get("page_size") is not None:
connector_kwargs["page_size"] = int(config["page_size"])
if config.get("maximum_bytes_billed") is not None:
connector_kwargs["maximum_bytes_billed"] = int(config["maximum_bytes_billed"])
if config.get("job_timeout_ms") is not None:
connector_kwargs["job_timeout_ms"] = int(config["job_timeout_ms"])
connector = BigQueryConnector(**connector_kwargs)
connector.load_credentials(credentials)
connector.validate_connector_settings()
try:
await asyncio.to_thread(_validate_bigquery)
except (ConnectorValidationError, ConnectorMissingCredentialError) as exc:
return get_json_result(
code=RetCode.DATA_ERROR,
message=str(exc),
data=False,
)
except Exception as exc:
logging.exception("BigQuery connector validation failed: %s", exc)
return get_json_result(
code=RetCode.SERVER_ERROR,
message="BigQuery connector validation failed, please check logs.",
data=False,
)
return get_json_result(data=True)
return get_json_result(
code=RetCode.ARGUMENT_ERROR,
message="Test endpoint currently supports only REST API and BigQuery connectors.",
data=False,
)
WEB_FLOW_TTL_SECS = 15 * 60

View File

@@ -337,9 +337,14 @@ async def create_provider_instance(tenant_id: str, provider_id_or_name: str, ins
api_key_str = ""
if api_key:
api_key_str = api_key if isinstance(api_key, str) else json.dumps(api_key)
success, msg = await verify_api_key(provider_name, api_key, base_url, region, model_info)
if not success:
return False, msg
# Only verify when there are models to probe. Generic providers such as
# "OpenAI-API-Compatible" may start empty and receive custom models later.
factory_entry = next((f for f in FACTORY_LLM_INFOS if f["name"] == provider_name), None)
if (factory_entry and factory_entry.get("llm")) or model_info:
success, msg = await verify_api_key(provider_name, api_key, base_url, region, model_info)
if not success:
return False, msg
extra_fields = {}
if base_url:

View File

@@ -154,6 +154,7 @@ class FileSource(StrEnum):
SEAFILE = "seafile"
MYSQL = "mysql"
POSTGRESQL = "postgresql"
BIGQUERY = "bigquery"
DINGTALK_AI_TABLE = "dingtalk_ai_table"
ONEDRIVE = "onedrive"
OUTLOOK = "outlook"

View File

@@ -47,6 +47,7 @@ from .imap_connector import ImapConnector
from .zendesk_connector import ZendeskConnector
from .seafile_connector import SeaFileConnector
from .rdbms_connector import RDBMSConnector
from .bigquery_connector import BigQueryConnector
from .webdav_connector import WebDAVConnector
from .rest_api_connector import RestAPIConnector
from .config import BlobType, DocumentSource
@@ -94,6 +95,7 @@ __all__ = [
"ZendeskConnector",
"SeaFileConnector",
"RDBMSConnector",
"BigQueryConnector",
"WebDAVConnector",
"DingTalkAITableConnector",
"RestAPIConnector",

View File

@@ -0,0 +1,677 @@
"""Google BigQuery data source connector for importing query/table rows into documents.
This connector shares the user-facing row model of ``RDBMSConnector`` (MySQL/PostgreSQL):
selected content columns become document text, selected metadata columns become metadata,
an optional ID column produces stable document IDs, and an optional timestamp column enables
cursor-based incremental sync plus deleted-row pruning.
Unlike the RDBMS connector, BigQuery is a Google Cloud query-job service. This implementation
therefore uses the official ``google-cloud-bigquery`` client, service-account authentication,
explicit billing controls (``maximum_bytes_billed``, ``job_timeout_ms``), dry-run validation,
parameterized cursors with a resolved column type, and BigQuery-aware value serialization.
"""
import base64
import copy
import hashlib
import json
import logging
from datetime import date, datetime, time, timezone
from decimal import Decimal
from typing import Any, Dict, Generator, List, Optional, Tuple
from common.data_source.config import DocumentSource, INDEX_BATCH_SIZE
from common.data_source.exceptions import (
ConnectorMissingCredentialError,
ConnectorValidationError,
)
from common.data_source.interfaces import (
LoadConnector,
PollConnector,
SecondsSinceUnixEpoch,
SlimConnectorWithPermSync,
)
from common.data_source.models import Document, SlimDocument
try:
from google.cloud import bigquery
from google.oauth2 import service_account
except ImportError: # pragma: no cover - import guarded at runtime
bigquery = None
service_account = None
# Marker keys used to round-trip non-JSON-native cursor values through the
# connector config (which is persisted as JSON).
_CURSOR_TYPE_KEY = "__ragflow_bq_cursor_type__"
# Default cost guard: 1 GiB. Users can raise this explicitly.
DEFAULT_MAXIMUM_BYTES_BILLED = 1024 * 1024 * 1024
# Maps a BigQuery field type to the ScalarQueryParameter type used for cursors.
_CURSOR_PARAM_TYPE_MAP = {
"TIMESTAMP": "TIMESTAMP",
"DATETIME": "DATETIME",
"DATE": "DATE",
"TIME": "TIME",
"INTEGER": "INT64",
"INT64": "INT64",
"NUMERIC": "NUMERIC",
"BIGNUMERIC": "BIGNUMERIC",
"FLOAT": "FLOAT64",
"FLOAT64": "FLOAT64",
}
class BigQueryConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
"""Import rows from a BigQuery table or custom query into documents.
The flow mirrors ``RDBMSConnector``:
1. Authenticate with a service account and build a BigQuery client.
2. Read rows from a single configured table or a single custom SQL query.
3. Build document content from the selected content columns.
4. Copy the selected metadata columns into document metadata.
5. Use the configured ID column as the stable document ID, or hash the content.
6. For incremental sync, treat the timestamp column as an ordered cursor and filter by it.
7. For deleted-file sync, read a slim snapshot of current row IDs.
"Empty query means all tables" is intentionally NOT supported here: scanning every table
is dangerous and expensive in BigQuery. Either a custom ``query`` or both ``dataset_id``
and ``table_id`` must be provided.
"""
def __init__(
self,
project_id: str,
dataset_id: Optional[str] = None,
table_id: Optional[str] = None,
location: Optional[str] = None,
query: str = "",
content_columns: str = "",
metadata_columns: Optional[str] = None,
id_column: Optional[str] = None,
timestamp_column: Optional[str] = None,
batch_size: int = INDEX_BATCH_SIZE,
page_size: int = 1000,
maximum_bytes_billed: Optional[int] = DEFAULT_MAXIMUM_BYTES_BILLED,
job_timeout_ms: Optional[int] = None,
use_query_cache: bool = True,
) -> None:
"""Initialize the BigQuery connector.
Args:
project_id: GCP project that owns the query jobs.
dataset_id: Dataset id (table mode).
table_id: Table id (table mode).
location: Default location for the client and query jobs (e.g. "US", "EU").
query: Custom GoogleSQL query (custom query mode). Takes precedence over table mode.
content_columns: Comma-separated column names used for document content.
metadata_columns: Comma-separated column names used as metadata (optional).
id_column: Column used as the stable document ID (optional; hash fallback otherwise).
timestamp_column: Column used for incremental cursor sync (optional).
batch_size: Number of documents per yielded batch.
page_size: BigQuery result page size (a fetch hint, not the batch size).
maximum_bytes_billed: Hard cost guard passed to every query job.
job_timeout_ms: Optional per-job timeout in milliseconds.
use_query_cache: Whether to allow BigQuery's query result cache.
"""
self.project_id = (project_id or "").strip()
self.dataset_id = (dataset_id or "").strip()
self.table_id = (table_id or "").strip()
self.location = (location or "").strip()
self.query = (query or "").strip()
self.content_columns = [c.strip() for c in (content_columns or "").split(",") if c.strip()]
self.metadata_columns = [c.strip() for c in (metadata_columns or "").split(",") if c.strip()]
self.id_column = id_column.strip() if id_column else None
self.timestamp_column = timestamp_column.strip() if timestamp_column else None
self.batch_size = batch_size
self.page_size = page_size
self.maximum_bytes_billed = maximum_bytes_billed
self.job_timeout_ms = job_timeout_ms
self.use_query_cache = use_query_cache
self._client = None
self._credentials: Dict[str, Any] = {}
self._cursor_param_type: Optional[str] = None
self._sync_connector_id: str | None = None
self._sync_config: Dict[str, Any] | None = None
self._pending_sync_cursor_value: Any = None
self._pending_sync_cursor_id: Any = None
# ------------------------------------------------------------------ #
# Credentials & client
# ------------------------------------------------------------------ #
def load_credentials(self, credentials: Dict[str, Any]) -> Dict[str, Any] | None:
"""Load BigQuery service-account credentials.
Accepts ``service_account_json`` as either a dict or a JSON string.
"""
logging.debug("Loading credentials for BigQuery project: %s", self.project_id)
raw = (credentials or {}).get("service_account_json")
if not raw:
raise ConnectorMissingCredentialError("BigQuery: missing service_account_json")
if isinstance(raw, str):
try:
service_account_info = json.loads(raw)
except json.JSONDecodeError as exc:
raise ConnectorMissingCredentialError(
f"BigQuery: service_account_json is not valid JSON: {exc}"
)
elif isinstance(raw, dict):
service_account_info = raw
else:
raise ConnectorMissingCredentialError(
"BigQuery: service_account_json must be a JSON string or object"
)
self._credentials = {"service_account_info": service_account_info}
return None
def _get_client(self):
"""Create and cache a BigQuery client from the loaded service account."""
if self._client is not None:
return self._client
if bigquery is None or service_account is None:
raise ConnectorValidationError(
"BigQuery client not installed. Please install google-cloud-bigquery."
)
service_account_info = self._credentials.get("service_account_info")
if not service_account_info:
raise ConnectorMissingCredentialError("BigQuery credentials not loaded.")
try:
creds = service_account.Credentials.from_service_account_info(service_account_info)
except Exception as exc:
raise ConnectorValidationError(f"Failed to build BigQuery credentials: {exc}")
try:
self._client = bigquery.Client(
project=self.project_id or None,
credentials=creds,
location=self.location or None,
)
except Exception as exc:
raise ConnectorValidationError(f"Failed to create BigQuery client: {exc}")
return self._client
# ------------------------------------------------------------------ #
# Query construction
# ------------------------------------------------------------------ #
def _build_base_query(self) -> str:
"""Return the single base query (custom query takes precedence over table mode)."""
if self.query:
return self.query.rstrip(";")
if self.dataset_id and self.table_id:
return f"SELECT * FROM `{self.project_id}.{self.dataset_id}.{self.table_id}`"
raise ConnectorValidationError(
"BigQuery requires either a custom query or both dataset_id and table_id."
)
@staticmethod
def _wrap_query(base_query: str, select_clause: str = "*") -> str:
return f"SELECT {select_clause} FROM ({base_query}) AS ragflow_src"
def _build_query_job_config(
self,
query_parameters: Optional[List[Any]] = None,
dry_run: bool = False,
):
config = bigquery.QueryJobConfig()
config.use_legacy_sql = False
config.use_query_cache = self.use_query_cache
if self.maximum_bytes_billed:
config.maximum_bytes_billed = int(self.maximum_bytes_billed)
if self.job_timeout_ms:
config.job_timeout_ms = int(self.job_timeout_ms)
if query_parameters:
config.query_parameters = query_parameters
if dry_run:
config.dry_run = True
config.use_query_cache = False
return config
def _resolve_schema(self, client: Any, dry_run_job: Any = None) -> List[Any]:
if not self.query and self.dataset_id and self.table_id:
table = client.get_table(f"{self.project_id}.{self.dataset_id}.{self.table_id}")
return table.schema
else:
if dry_run_job is None:
dry_run_job = client.query(
self._wrap_query(self._build_base_query()),
job_config=self._build_query_job_config(dry_run=True),
location=self.location or None,
)
return dry_run_job.schema or []
def _get_cursor_column_field_type(self) -> str:
"""Resolve the BigQuery field type of the timestamp column."""
client = self._get_client()
schema = self._resolve_schema(client)
for field in schema:
if field.name == self.timestamp_column:
return field.field_type
raise ConnectorValidationError(
f"BigQuery timestamp column '{self.timestamp_column}' was not found in the schema."
)
def _resolve_cursor_param_type(self) -> str:
if self._cursor_param_type is not None:
return self._cursor_param_type
field_type = (self._get_cursor_column_field_type() or "").upper()
param_type = _CURSOR_PARAM_TYPE_MAP.get(field_type)
if param_type is None:
raise ConnectorValidationError(
f"BigQuery timestamp column type '{field_type}' is not supported as a cursor."
)
self._cursor_param_type = param_type
return param_type
def _make_cursor_param(self, name: str, value: Any, param_type: str):
return bigquery.ScalarQueryParameter(name, param_type, value)
def _build_time_filtered_query(
self,
base_query: str,
start: Any = None,
end: Any = None,
start_id: Any = None,
) -> Tuple[str, List[Any]]:
wrapped = self._wrap_query(base_query)
if not self.timestamp_column or (start is None and end is None):
return wrapped, []
param_type = self._resolve_cursor_param_type()
conditions: List[str] = []
params: List[Any] = []
if start is not None:
if self.id_column and start_id is not None:
conditions.append(
f"(ragflow_src.{self.timestamp_column} > @start_cursor OR "
f"(ragflow_src.{self.timestamp_column} = @start_cursor AND ragflow_src.{self.id_column} > @start_cursor_id))"
)
params.append(self._make_cursor_param("start_cursor", start, param_type))
params.append(self._make_cursor_param("start_cursor_id", start_id, "STRING"))
else:
conditions.append(f"ragflow_src.{self.timestamp_column} >= @start_cursor")
params.append(self._make_cursor_param("start_cursor", start, param_type))
if end is not None:
conditions.append(f"ragflow_src.{self.timestamp_column} <= @end_cursor")
params.append(self._make_cursor_param("end_cursor", end, param_type))
if conditions:
wrapped = f"{wrapped} WHERE {' AND '.join(conditions)}"
return wrapped, params
def _build_max_timestamp_query(self, base_query: str) -> str:
if self.id_column:
# We need both the max timestamp and the max id for that timestamp
return (
f"SELECT ragflow_src.{self.timestamp_column}, MAX(ragflow_src.{self.id_column}) "
f"FROM ({base_query}) AS ragflow_src "
f"WHERE ragflow_src.{self.timestamp_column} = ("
f" SELECT MAX({self.timestamp_column}) FROM ({base_query})"
f") "
f"GROUP BY ragflow_src.{self.timestamp_column}"
)
return (
f"SELECT MAX(ragflow_src.{self.timestamp_column}), NULL "
f"FROM ({base_query}) AS ragflow_src"
)
def _build_slim_query(self, base_query: str) -> str:
columns = [self.id_column] if self.id_column else self.content_columns
select_clause = ", ".join(f"ragflow_src.{column}" for column in columns)
return self._wrap_query(base_query, select_clause)
# ------------------------------------------------------------------ #
# Cursor (de)serialization
# ------------------------------------------------------------------ #
@staticmethod
def serialize_cursor_value(value: Any) -> Any:
# Connector config is JSON, so datetime/date must be wrapped. Other
# scalar cursors (int/float/str) round-trip natively.
if isinstance(value, datetime):
return {_CURSOR_TYPE_KEY: "datetime", "value": value.isoformat()}
if isinstance(value, date):
return {_CURSOR_TYPE_KEY: "date", "value": value.isoformat()}
if isinstance(value, time):
return {_CURSOR_TYPE_KEY: "time", "value": value.isoformat()}
if isinstance(value, Decimal):
return {_CURSOR_TYPE_KEY: "decimal", "value": str(value)}
return value
@staticmethod
def deserialize_cursor_value(value: Any) -> Any:
if isinstance(value, dict) and _CURSOR_TYPE_KEY in value:
kind = value.get(_CURSOR_TYPE_KEY)
if kind == "datetime":
return datetime.fromisoformat(value["value"])
if kind == "date":
return date.fromisoformat(value["value"])
if kind == "time":
return time.fromisoformat(value["value"])
if kind == "decimal":
return Decimal(value["value"])
return value
# ------------------------------------------------------------------ #
# Value rendering
# ------------------------------------------------------------------ #
@staticmethod
def _render_content_value(value: Any) -> Optional[str]:
"""Render a value for document content. Returns None to skip the value."""
if isinstance(value, (bytes, bytearray)):
# Binary content is not meaningful as document text; skip it.
return None
if isinstance(value, (datetime, date, time)):
return value.isoformat()
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False, default=str)
return str(value)
@staticmethod
def _render_metadata_value(value: Any) -> str:
if isinstance(value, (bytes, bytearray)):
return base64.b64encode(bytes(value)).decode("ascii")
if isinstance(value, (datetime, date, time)):
return value.isoformat()
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False, default=str)
return str(value)
def _build_content(self, row_dict: Dict[str, Any]) -> str:
content_parts = []
for col in self.content_columns:
if col not in row_dict or row_dict[col] is None:
continue
rendered = self._render_content_value(row_dict[col])
if rendered is None:
continue
content_parts.append(f"{col}】:\n{rendered}")
return "\n\n".join(content_parts)
def _id_prefix(self) -> str:
if not self.query and self.dataset_id and self.table_id:
return f"bigquery:{self.project_id}:{self.dataset_id}.{self.table_id}"
return f"bigquery:{self.project_id}:query"
def _build_document_id_from_row(self, row_dict: Dict[str, Any]) -> str:
prefix = self._id_prefix()
if self.id_column and self.id_column in row_dict and row_dict[self.id_column] is not None:
return f"{prefix}:{row_dict[self.id_column]}"
content = self._build_content(row_dict)
content_hash = hashlib.md5(content.encode()).hexdigest()
return f"{prefix}:{content_hash}"
def _row_to_document(self, row_dict: Dict[str, Any]) -> Document:
content = self._build_content(row_dict)
metadata = {}
for col in self.metadata_columns:
if col not in row_dict or row_dict[col] is None:
continue
metadata[col] = self._render_metadata_value(row_dict[col])
doc_updated_at = datetime.now(timezone.utc)
if self.timestamp_column and row_dict.get(self.timestamp_column) is not None:
ts_value = row_dict[self.timestamp_column]
if isinstance(ts_value, datetime):
if ts_value.tzinfo is None:
doc_updated_at = ts_value.replace(tzinfo=timezone.utc)
else:
doc_updated_at = ts_value.astimezone(timezone.utc)
elif isinstance(ts_value, date):
doc_updated_at = datetime(
ts_value.year, ts_value.month, ts_value.day, tzinfo=timezone.utc
)
first_content_col = self.content_columns[0] if self.content_columns else "record"
semantic_id = (
str(row_dict.get(first_content_col, "bigquery_record"))
.replace("\n", " ")
.replace("\r", " ")
.strip()[:100]
)
blob = content.encode("utf-8")
return Document(
id=self._build_document_id_from_row(row_dict),
blob=blob,
source=DocumentSource.BIGQUERY,
semantic_identifier=semantic_id,
extension=".txt",
doc_updated_at=doc_updated_at,
size_bytes=len(blob),
metadata=metadata if metadata else None,
)
# ------------------------------------------------------------------ #
# Query execution
# ------------------------------------------------------------------ #
def _yield_rows_from_query(
self,
query: str,
query_parameters: Optional[List[Any]] = None,
) -> Generator[list[Document], None, None]:
client = self._get_client()
logging.info("Executing BigQuery query: %s...", query[:200])
job = client.query(
query,
job_config=self._build_query_job_config(query_parameters=query_parameters),
location=self.location or None,
)
result = job.result(page_size=self.page_size)
batch: list[Document] = []
for row in result:
try:
doc = self._row_to_document(dict(row))
batch.append(doc)
if len(batch) >= self.batch_size:
yield batch
batch = []
except Exception as exc:
logging.warning("Error converting BigQuery row to document: %s", exc)
continue
if batch:
yield batch
def _yield_slim_documents_from_query(
self,
query: str,
) -> Generator[list[SlimDocument], None, None]:
client = self._get_client()
logging.debug("Executing BigQuery slim query: %s...", query[:200])
job = client.query(
query,
job_config=self._build_query_job_config(),
location=self.location or None,
)
result = job.result(page_size=self.page_size)
batch: list[SlimDocument] = []
for row in result:
batch.append(SlimDocument(id=self._build_document_id_from_row(dict(row))))
if len(batch) >= self.batch_size:
yield batch
batch = []
if batch:
yield batch
def _yield_documents(
self,
start: Any = None,
end: Any = None,
start_id: Any = None,
) -> Generator[list[Document], None, None]:
base_query = self._build_base_query()
query, params = self._build_time_filtered_query(base_query, start, end, start_id)
yield from self._yield_rows_from_query(query, params)
def get_max_cursor_value(self) -> Tuple[Any, Any]:
if not self.timestamp_column:
return None, None
client = self._get_client()
query = self._build_max_timestamp_query(self._build_base_query())
logging.debug("Executing BigQuery max timestamp query: %s...", query[:200])
job = client.query(
query,
job_config=self._build_query_job_config(),
location=self.location or None,
)
row = next(iter(job.result()), None)
if row is None or row[0] is None:
return None, None
return row[0], row[1]
# ------------------------------------------------------------------ #
# LoadConnector / PollConnector
# ------------------------------------------------------------------ #
def load_from_state(self) -> Generator[list[Document], None, None]:
"""Load all rows from the configured table/query (full sync)."""
logging.debug("Loading all records from BigQuery project: %s", self.project_id)
return self._yield_documents()
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> Generator[list[Document], None, None]:
"""Poll for new/updated rows. Provided for interface completeness.
Orchestration drives full/incremental sync via ``load_from_state`` /
``load_from_cursor_range``; this falls back to a full sync without a
timestamp column.
"""
if not self.timestamp_column:
logging.warning(
"No timestamp column configured for incremental sync. Falling back to full sync."
)
return self.load_from_state()
start_dt = datetime.fromtimestamp(start, tz=timezone.utc) if start else None
end_dt = datetime.fromtimestamp(end, tz=timezone.utc) if end else None
return self._yield_documents(start_dt, end_dt)
def load_from_cursor_range(
self,
start_value: Any = None,
end_value: Any = None,
start_id: Any = None,
) -> Generator[list[Document], None, None]:
if end_value is None:
return iter(())
if start_value is not None and end_value < start_value:
return iter(())
return self._yield_documents(start_value, end_value, start_id)
def retrieve_all_slim_docs_perm_sync(
self,
callback: Any = None,
) -> Generator[list[SlimDocument], None, None]:
del callback
yield from self._yield_slim_documents_from_query(
self._build_slim_query(self._build_base_query())
)
# ------------------------------------------------------------------ #
# Sync-state persistence (success-only cursor)
# ------------------------------------------------------------------ #
def prepare_sync_state(self, connector_id: str, config: Dict[str, Any]) -> None:
self._sync_connector_id = connector_id
self._sync_config = copy.deepcopy(config)
if not self.timestamp_column:
self._pending_sync_cursor_value = None
self._pending_sync_cursor_id = None
return
self._pending_sync_cursor_value, self._pending_sync_cursor_id = self.get_max_cursor_value()
def get_saved_sync_cursor_value(self) -> Any:
if self._sync_config is None:
return None
return self.deserialize_cursor_value(self._sync_config.get("sync_cursor_value"))
def get_saved_sync_cursor_id(self) -> Any:
if self._sync_config is None:
return None
return self._sync_config.get("sync_cursor_id")
def persist_sync_state(self) -> None:
if not self.timestamp_column or self._sync_connector_id is None or self._sync_config is None:
return
from api.db.services.connector_service import ConnectorService
updated_conf = copy.deepcopy(self._sync_config)
updated_conf["sync_cursor_value"] = self.serialize_cursor_value(
self._pending_sync_cursor_value
)
updated_conf["sync_cursor_id"] = self._pending_sync_cursor_id
ConnectorService.update_by_id(self._sync_connector_id, {"config": updated_conf})
self._sync_config = updated_conf
# ------------------------------------------------------------------ #
# Validation
# ------------------------------------------------------------------ #
def validate_connector_settings(self) -> None:
"""Validate settings via SELECT 1 plus a dry-run of the configured base query."""
if not self._credentials:
raise ConnectorMissingCredentialError("BigQuery credentials not loaded.")
if not self.project_id:
raise ConnectorValidationError("BigQuery project_id is required.")
if not self.content_columns:
raise ConnectorValidationError("At least one content column must be specified.")
if not self.query and not (self.dataset_id and self.table_id):
raise ConnectorValidationError(
"BigQuery requires either a custom query or both dataset_id and table_id."
)
try:
client = self._get_client()
# Cheap connectivity check.
client.query(
"SELECT 1",
job_config=self._build_query_job_config(),
location=self.location or None,
).result()
# Free cost/validity check of the actual base query.
dry_run_job = client.query(
self._wrap_query(self._build_base_query()),
job_config=self._build_query_job_config(dry_run=True),
location=self.location or None,
)
estimated_bytes = getattr(dry_run_job, "total_bytes_processed", None)
if estimated_bytes is not None:
logging.info(
"BigQuery base query dry-run estimate: %s bytes processed.", estimated_bytes
)
schema = self._resolve_schema(client, dry_run_job)
schema_columns = {field.name for field in schema}
required = set(self.content_columns)
optional = set(self.metadata_columns)
if self.id_column:
optional.add(self.id_column)
if self.timestamp_column:
optional.add(self.timestamp_column)
missing = (required | optional) - schema_columns
if missing:
raise ConnectorValidationError(
f"BigQuery configured columns not found in schema: {', '.join(sorted(missing))}"
)
if self.timestamp_column:
self._resolve_cursor_param_type()
except (ConnectorValidationError, ConnectorMissingCredentialError):
raise
except Exception as exc:
raise ConnectorValidationError(f"BigQuery validation failed: {exc}")

View File

@@ -68,6 +68,7 @@ class DocumentSource(str, Enum):
SEAFILE = "seafile"
MYSQL = "mysql"
POSTGRESQL = "postgresql"
BIGQUERY = "bigquery"
DINGTALK_AI_TABLE = "dingtalk_ai_table"
ONEDRIVE = "onedrive"
OUTLOOK = "outlook"

View File

@@ -359,7 +359,7 @@ class RDBMSConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
conditions = []
if start is not None:
conditions.append(
f"ragflow_src.{self.timestamp_column} > {self._format_sql_value(start)}"
f"ragflow_src.{self.timestamp_column} >= {self._format_sql_value(start)}"
)
if end is not None:
conditions.append(
@@ -666,16 +666,17 @@ class RDBMSConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
self,
start_value: Any = None,
end_value: Any = None,
start_id: Any = None,
) -> Generator[list[Document], None, None]:
"""Yield documents whose timestamp column falls in ``(start_value, end_value]``.
"""Yield documents whose timestamp column falls in ``[start_value, end_value]``.
Returns an empty iterator when *end_value* is ``None`` or the range is
empty (``end_value <= start_value``).
empty (``end_value < start_value``).
"""
if end_value is None:
self._close_connection()
return iter(())
if start_value is not None and end_value <= start_value:
if start_value is not None and end_value < start_value:
self._close_connection()
return iter(())
return self._yield_documents(start_value, end_value)

View File

@@ -28,7 +28,7 @@ dependencies = [
"chardet>=5.2.0,<6.0.0",
"cn2an==0.5.22",
"cohere==5.6.2",
"crawl4ai>=0.8.0",
"crawl4ai>=0.8.0,<0.8.6",
"dashscope==1.25.11",
"deepl==1.18.0",
"debugpy>=1.8.13",
@@ -49,6 +49,7 @@ dependencies = [
"flask-session==0.8.0",
"google-api-python-client>=2.190.0,<3.0.0",
"google-auth-oauthlib>=1.2.0,<2.0.0",
"google-cloud-bigquery>=3.25.0,<4.0.0",
"google-cloud-storage>=2.19.0,<3.0.0",
"google-genai>=1.41.0,<2.0.0",
"google-search-results==2.4.2",
@@ -214,6 +215,8 @@ exclude-dependencies = [
# crawl4ai>=0.8.6 depends on unclecode-litellm, which installs the same
# `litellm` package namespace as upstream litellm and can corrupt imports.
"unclecode-litellm",
# Transitive dep of agentrun-sdk; not imported anywhere in RAGFlow code.
"agentrun-mem0ai",
]
override-dependencies = [
# moodlepy<=0.24.1 pins attrs<23.0.0, but trio>=0.26.0 requires attrs>=23.2.0.

View File

@@ -59,6 +59,7 @@ from common.data_source import (
ZendeskConnector,
SeaFileConnector,
RDBMSConnector,
BigQueryConnector,
DingTalkAITableConnector,
RestAPIConnector,
OneDriveConnector,
@@ -208,6 +209,7 @@ class SyncBase:
document_batch_generator = await self._generate(task)
failed_docs = 0
had_parse_errors = False
added_docs = 0
updated_docs = 0
next_update = datetime(1970, 1, 1, tzinfo=timezone.utc)
@@ -257,6 +259,8 @@ class SyncBase:
f"{self.SOURCE_NAME}/{task['connector_id']}",
task["auto_parse"]
)
if err:
had_parse_errors = True
SyncLogsService.increase_docs(
task["id"], max_update,
len(docs), "\n".join(err), len(err)
@@ -295,8 +299,9 @@ class SyncBase:
logging.info(summary)
if (
isinstance(self, _RDBMSBase)
isinstance(self, _CursorPersistingSyncBase)
and failed_docs == 0
and not had_parse_errors
):
self.connector.persist_sync_state()
SyncLogsService.done(task["id"], task["connector_id"])
@@ -2053,7 +2058,17 @@ class DingTalkAITable(SyncBase):
return document_generator
class _RDBMSBase(SyncBase):
class _CursorPersistingSyncBase(SyncBase):
"""Base for connectors that persist a sync cursor only after a fully successful sync.
``_run_sync_task_logic`` calls ``self.connector.persist_sync_state()`` for any
subclass of this base when no document batch failed. Sources whose connector
exposes ``prepare_sync_state``/``persist_sync_state`` (RDBMS, BigQuery) extend
this instead of being matched by an ``isinstance(self, _RDBMSBase)`` check.
"""
class _RDBMSBase(_CursorPersistingSyncBase):
DB_TYPE: str = ""
LOG_NAME: str = ""
DEFAULT_PORT: int = 0
@@ -2089,9 +2104,11 @@ class _RDBMSBase(SyncBase):
else:
poll_start = task["poll_range_start"]
start_cursor_value = self.connector.get_saved_sync_cursor_value()
start_cursor_id = self.connector.get_saved_sync_cursor_id() if hasattr(self.connector, "get_saved_sync_cursor_id") else None
document_generator = self.connector.load_from_cursor_range(
start_cursor_value,
self.connector._pending_sync_cursor_value,
start_cursor_id,
)
_begin_info = f"from {poll_start}"
@@ -2113,6 +2130,73 @@ class PostgreSQL(_RDBMSBase):
DEFAULT_PORT: int = 5432
class BigQuery(_CursorPersistingSyncBase):
SOURCE_NAME: str = FileSource.BIGQUERY
def _get_source_prefix(self):
return "[BigQuery]"
async def _generate(self, task: dict):
raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
try:
batch_size = int(raw_batch_size)
except (TypeError, ValueError):
batch_size = INDEX_BATCH_SIZE
if batch_size <= 0:
batch_size = INDEX_BATCH_SIZE
connector_kwargs = {
"project_id": self.conf.get("project_id", ""),
"dataset_id": self.conf.get("dataset_id") or None,
"table_id": self.conf.get("table_id") or None,
"location": self.conf.get("location") or None,
"query": self.conf.get("query", ""),
"content_columns": self.conf.get("content_columns", ""),
"metadata_columns": self.conf.get("metadata_columns", ""),
"id_column": self.conf.get("id_column") or None,
"timestamp_column": self.conf.get("timestamp_column") or None,
"batch_size": batch_size,
"use_query_cache": self.conf.get("use_query_cache", True),
}
if self.conf.get("page_size") is not None:
connector_kwargs["page_size"] = int(self.conf["page_size"])
if self.conf.get("maximum_bytes_billed") is not None:
connector_kwargs["maximum_bytes_billed"] = int(self.conf["maximum_bytes_billed"])
if self.conf.get("job_timeout_ms") is not None:
connector_kwargs["job_timeout_ms"] = int(self.conf["job_timeout_ms"])
self.connector = BigQueryConnector(**connector_kwargs)
credentials = self.conf.get("credentials")
if not credentials:
raise ValueError("BigQuery connector is missing credentials.")
self.connector.load_credentials(credentials)
self.connector.validate_connector_settings()
self.connector.prepare_sync_state(task["connector_id"], self.conf)
if task["reindex"] == "1" or not task["poll_range_start"]:
document_generator = self.connector.load_from_state()
elif not self.connector.timestamp_column:
document_generator = self.connector.load_from_state()
else:
start_cursor_value = self.connector.get_saved_sync_cursor_value()
start_cursor_id = self.connector.get_saved_sync_cursor_id() if hasattr(self.connector, "get_saved_sync_cursor_id") else None
document_generator = self.connector.load_from_cursor_range(
start_cursor_value,
self.connector._pending_sync_cursor_value,
start_cursor_id,
)
target = (
f"{self.conf.get('dataset_id')}.{self.conf.get('table_id')}"
if not self.conf.get("query")
else "custom query"
)
self.log_connection("BigQuery", f"{self.conf.get('project_id')}:{target}", task)
return document_generator
class REST_API(SyncBase):
SOURCE_NAME: str = FileSource.REST_API
@@ -2173,6 +2257,7 @@ func_factory = {
FileSource.SEAFILE: SeaFile,
FileSource.MYSQL: MySQL,
FileSource.POSTGRESQL: PostgreSQL,
FileSource.BIGQUERY: BigQuery,
FileSource.DINGTALK_AI_TABLE: DingTalkAITable,
FileSource.REST_API: REST_API,
}

View File

@@ -0,0 +1,430 @@
#
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import types
from datetime import date, datetime, time, timezone
from decimal import Decimal
import pytest
from common.data_source import bigquery_connector as bq_mod
from common.data_source.bigquery_connector import BigQueryConnector
from common.data_source.config import DocumentSource
from common.data_source.exceptions import (
ConnectorMissingCredentialError,
ConnectorValidationError,
)
# ---------------------------------------------------------------------------
# Fakes for the google-cloud-bigquery client surface
# ---------------------------------------------------------------------------
class _FakeSchemaField:
def __init__(self, name, field_type):
self.name = name
self.field_type = field_type
class _FakeScalarQueryParameter:
def __init__(self, name, type_, value):
self.name = name
self.type_ = type_
self.value = value
class _FakeQueryJobConfig:
def __init__(self):
self.use_legacy_sql = None
self.use_query_cache = None
self.maximum_bytes_billed = None
self.job_timeout_ms = None
self.query_parameters = None
self.dry_run = False
class _FakeJob:
def __init__(self, rows=None, schema=None, total_bytes=123):
self._rows = rows if rows is not None else []
self.schema = schema or []
self.total_bytes_processed = total_bytes
def result(self, page_size=None):
return iter(self._rows)
class _FakeClient:
def __init__(self, rows=None, result_schema=None, table_schema=None):
self._rows = rows if rows is not None else []
self._result_schema = result_schema or []
self._table_schema = table_schema or []
self.queries = []
def query(self, query, job_config=None, location=None):
self.queries.append((query, job_config, location))
return _FakeJob(rows=self._rows, schema=self._result_schema)
def get_table(self, ref):
return types.SimpleNamespace(schema=self._table_schema)
@pytest.fixture(autouse=True)
def _patch_bigquery(monkeypatch):
fake = types.SimpleNamespace(
QueryJobConfig=_FakeQueryJobConfig,
ScalarQueryParameter=_FakeScalarQueryParameter,
)
monkeypatch.setattr(bq_mod, "bigquery", fake)
monkeypatch.setattr(bq_mod, "service_account", types.SimpleNamespace())
SERVICE_ACCOUNT = {"type": "service_account", "project_id": "p"}
def _make_connector(client=None, **overrides):
kwargs = dict(
project_id="my-proj",
dataset_id="ds",
table_id="tbl",
content_columns="name,description",
metadata_columns="status",
id_column="id",
timestamp_column=None,
batch_size=2,
)
kwargs.update(overrides)
connector = BigQueryConnector(**kwargs)
connector._credentials = {"service_account_info": SERVICE_ACCOUNT}
if client is not None:
connector._client = client
return connector
# ---------------------------------------------------------------------------
# Credentials
# ---------------------------------------------------------------------------
@pytest.mark.p2
def test_load_credentials_accepts_json_string():
connector = BigQueryConnector(project_id="p", content_columns="name")
connector.load_credentials({"service_account_json": json.dumps(SERVICE_ACCOUNT)})
assert connector._credentials["service_account_info"] == SERVICE_ACCOUNT
@pytest.mark.p2
def test_load_credentials_accepts_dict():
connector = BigQueryConnector(project_id="p", content_columns="name")
connector.load_credentials({"service_account_json": SERVICE_ACCOUNT})
assert connector._credentials["service_account_info"] == SERVICE_ACCOUNT
@pytest.mark.p2
def test_missing_credentials_raises():
connector = BigQueryConnector(project_id="p", content_columns="name")
with pytest.raises(ConnectorMissingCredentialError):
connector.load_credentials({})
@pytest.mark.p2
def test_invalid_credentials_json_raises():
connector = BigQueryConnector(project_id="p", content_columns="name")
with pytest.raises(ConnectorMissingCredentialError):
connector.load_credentials({"service_account_json": "{not-json"})
# ---------------------------------------------------------------------------
# Query construction
# ---------------------------------------------------------------------------
@pytest.mark.p2
def test_table_query_quotes_identifiers():
connector = _make_connector()
assert connector._build_base_query() == "SELECT * FROM `my-proj.ds.tbl`"
@pytest.mark.p2
def test_custom_query_strips_trailing_semicolon():
connector = _make_connector(query="SELECT * FROM t WHERE x = 1;")
assert connector._build_base_query() == "SELECT * FROM t WHERE x = 1"
@pytest.mark.p2
def test_missing_table_and_query_raises():
connector = _make_connector(dataset_id="", table_id="", query="")
with pytest.raises(ConnectorValidationError):
connector._build_base_query()
@pytest.mark.p2
def test_time_filtered_query_compound_cursor_with_id_column():
client = _FakeClient(table_schema=[
_FakeSchemaField("updated_at", "TIMESTAMP"),
_FakeSchemaField("id", "STRING")
])
connector = _make_connector(client=client, timestamp_column="updated_at", id_column="id")
start = datetime(2026, 1, 1, tzinfo=timezone.utc)
end = datetime(2026, 2, 1, tzinfo=timezone.utc)
query, params = connector._build_time_filtered_query(
connector._build_base_query(), start, end, start_id="last-id"
)
assert "(ragflow_src.updated_at > @start_cursor OR (ragflow_src.updated_at = @start_cursor AND ragflow_src.id > @start_cursor_id))" in query
assert "ragflow_src.updated_at <= @end_cursor" in query
assert [(p.name, p.type_, p.value) for p in params] == [
("start_cursor", "TIMESTAMP", start),
("start_cursor_id", "STRING", "last-id"),
("end_cursor", "TIMESTAMP", end),
]
@pytest.mark.p2
def test_time_filtered_query_uses_gte_without_id_column():
client = _FakeClient(table_schema=[_FakeSchemaField("updated_at", "TIMESTAMP")])
connector = _make_connector(client=client, timestamp_column="updated_at", id_column=None)
start = datetime(2026, 1, 1, tzinfo=timezone.utc)
end = datetime(2026, 2, 1, tzinfo=timezone.utc)
query, params = connector._build_time_filtered_query(
connector._build_base_query(), start, end
)
assert "ragflow_src.updated_at >= @start_cursor" in query
assert "ragflow_src.updated_at <= @end_cursor" in query
assert [(p.name, p.type_, p.value) for p in params] == [
("start_cursor", "TIMESTAMP", start),
("end_cursor", "TIMESTAMP", end),
]
@pytest.mark.p2
def test_cursor_param_type_resolves_for_date_and_int():
date_client = _FakeClient(table_schema=[_FakeSchemaField("d", "DATE")])
int_client = _FakeClient(table_schema=[_FakeSchemaField("n", "INTEGER")])
date_conn = _make_connector(client=date_client, timestamp_column="d")
int_conn = _make_connector(client=int_client, timestamp_column="n")
assert date_conn._resolve_cursor_param_type() == "DATE"
assert int_conn._resolve_cursor_param_type() == "INT64"
@pytest.mark.p2
def test_unsupported_cursor_type_raises():
client = _FakeClient(table_schema=[_FakeSchemaField("ts", "BOOL")])
connector = _make_connector(client=client, timestamp_column="ts")
with pytest.raises(ConnectorValidationError):
connector._resolve_cursor_param_type()
@pytest.mark.p2
def test_slim_query_uses_id_column():
connector = _make_connector()
slim = connector._build_slim_query(connector._build_base_query())
assert "ragflow_src.id" in slim
# ---------------------------------------------------------------------------
# Value rendering & row conversion
# ---------------------------------------------------------------------------
@pytest.mark.p2
def test_value_serialization_types():
assert BigQueryConnector._render_content_value(Decimal("1.50")) == "1.50"
assert BigQueryConnector._render_content_value(b"binary") is None
assert BigQueryConnector._render_content_value(date(2026, 1, 2)) == "2026-01-02"
assert BigQueryConnector._render_content_value(time(3, 4, 5)) == "03:04:05"
assert (
BigQueryConnector._render_content_value({"a": 1, "b": [1, 2]})
== '{"a": 1, "b": [1, 2]}'
)
assert (
BigQueryConnector._render_content_value("POINT(1 2)") == "POINT(1 2)"
) # GEOGRAPHY WKT passes through
# Metadata base64-encodes bytes instead of skipping.
assert BigQueryConnector._render_metadata_value(b"hi") == "aGk="
@pytest.mark.p2
def test_row_to_document_content_metadata_id_timestamp():
connector = _make_connector(timestamp_column="updated_at")
ts = datetime(2026, 3, 4, 5, 6, 7, tzinfo=timezone.utc)
row = {
"id": 42,
"name": "Acme",
"description": "A company",
"status": "active",
"updated_at": ts,
}
doc = connector._row_to_document(row)
assert doc.id == "bigquery:my-proj:ds.tbl:42"
assert "【name】:\nAcme" in doc.blob.decode("utf-8")
assert "【description】:\nA company" in doc.blob.decode("utf-8")
assert doc.metadata == {"status": "active"}
assert doc.source == DocumentSource.BIGQUERY
assert doc.extension == ".txt"
assert doc.doc_updated_at == ts
assert doc.semantic_identifier == "Acme"
@pytest.mark.p2
def test_document_id_falls_back_to_content_hash_without_id_column():
connector = _make_connector(id_column=None)
row = {"name": "Acme", "description": "A company", "status": "active"}
doc_id = connector._build_document_id_from_row(row)
assert doc_id.startswith("bigquery:my-proj:ds.tbl:")
assert len(doc_id.rsplit(":", 1)[1]) == 32 # md5 hex
@pytest.mark.p2
def test_custom_query_mode_id_prefix():
connector = _make_connector(dataset_id="", table_id="", query="SELECT * FROM t")
row = {"id": 7, "name": "x"}
assert connector._build_document_id_from_row(row) == "bigquery:my-proj:query:7"
# ---------------------------------------------------------------------------
# Batching & cursor round-trip
# ---------------------------------------------------------------------------
@pytest.mark.p2
def test_batches_accumulate_to_batch_size():
rows = [
{"id": i, "name": f"n{i}", "description": "d", "status": "s"}
for i in range(5)
]
client = _FakeClient(rows=rows)
connector = _make_connector(client=client, batch_size=2)
batches = list(connector.load_from_state())
assert [len(b) for b in batches] == [2, 2, 1]
@pytest.mark.p2
def test_cursor_serialize_deserialize_roundtrip():
dt = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
d = date(2026, 1, 1)
t = time(12, 0)
dec = Decimal("1.23")
assert BigQueryConnector.deserialize_cursor_value(
BigQueryConnector.serialize_cursor_value(dt)
) == dt
assert BigQueryConnector.deserialize_cursor_value(
BigQueryConnector.serialize_cursor_value(d)
) == d
assert BigQueryConnector.deserialize_cursor_value(
BigQueryConnector.serialize_cursor_value(t)
) == t
assert BigQueryConnector.deserialize_cursor_value(
BigQueryConnector.serialize_cursor_value(dec)
) == dec
assert BigQueryConnector.serialize_cursor_value(42) == 42
assert BigQueryConnector.deserialize_cursor_value(42) == 42
@pytest.mark.p2
def test_load_from_cursor_range_empty_without_end():
connector = _make_connector(timestamp_column="updated_at")
assert list(connector.load_from_cursor_range(start_value=None, end_value=None)) == []
@pytest.mark.p2
def test_load_from_cursor_range_empty_when_end_not_after_start():
connector = _make_connector(timestamp_column="updated_at")
start = datetime(2026, 2, 1, tzinfo=timezone.utc)
end = datetime(2026, 1, 1, tzinfo=timezone.utc)
assert list(connector.load_from_cursor_range(start, end)) == []
# ---------------------------------------------------------------------------
# Slim docs & validation
# ---------------------------------------------------------------------------
@pytest.mark.p2
def test_slim_docs_use_id_column():
rows = [{"id": 1}, {"id": 2}]
client = _FakeClient(rows=rows)
connector = _make_connector(client=client, batch_size=10)
slim_batches = list(connector.retrieve_all_slim_docs_perm_sync())
ids = [doc.id for batch in slim_batches for doc in batch]
assert ids == ["bigquery:my-proj:ds.tbl:1", "bigquery:my-proj:ds.tbl:2"]
@pytest.mark.p2
def test_validation_detects_missing_content_column():
client = _FakeClient(table_schema=[_FakeSchemaField("other", "STRING")])
connector = _make_connector(client=client, content_columns="name")
with pytest.raises(ConnectorValidationError, match="name"):
connector.validate_connector_settings()
@pytest.mark.p2
def test_validation_detects_missing_metadata_column():
client = _FakeClient(table_schema=[_FakeSchemaField("name", "STRING")])
connector = _make_connector(client=client, content_columns="name", metadata_columns="status")
with pytest.raises(ConnectorValidationError, match="status"):
connector.validate_connector_settings()
@pytest.mark.p2
def test_validation_detects_missing_id_column():
client = _FakeClient(table_schema=[_FakeSchemaField("name", "STRING")])
connector = _make_connector(client=client, content_columns="name", metadata_columns="", id_column="id")
with pytest.raises(ConnectorValidationError, match="id"):
connector.validate_connector_settings()
@pytest.mark.p2
def test_validation_detects_missing_timestamp_column():
client = _FakeClient(table_schema=[_FakeSchemaField("name", "STRING")])
connector = _make_connector(client=client, content_columns="name", metadata_columns="", id_column="", timestamp_column="ts")
with pytest.raises(ConnectorValidationError, match="ts"):
connector.validate_connector_settings()
@pytest.mark.p2
def test_validation_detects_unsupported_cursor_type_early():
client = _FakeClient(table_schema=[_FakeSchemaField("name", "STRING"), _FakeSchemaField("ts", "BOOL")])
connector = _make_connector(client=client, content_columns="name", metadata_columns="", id_column="", timestamp_column="ts")
with pytest.raises(ConnectorValidationError, match="not supported as a cursor"):
connector.validate_connector_settings()
@pytest.mark.p2
def test_validation_missing_credentials_raises():
connector = BigQueryConnector(project_id="p", content_columns="name")
with pytest.raises(ConnectorMissingCredentialError):
connector.validate_connector_settings()
@pytest.mark.p2
def test_validation_dry_run_failure_becomes_validation_error():
class _FailingClient(_FakeClient):
def query(self, query, job_config=None, location=None):
raise RuntimeError("bad query / would scan too much")
connector = _make_connector(client=_FailingClient())
with pytest.raises(ConnectorValidationError):
connector.validate_connector_settings()

View File

@@ -314,7 +314,7 @@ class _FakeRDBMSConnector:
self.load_from_state_called = True
return iter((["full-sync"],))
def load_from_cursor_range(self, start_value=None, end_value=None):
def load_from_cursor_range(self, start_value=None, start_id=None, end_value=None):
self.load_from_cursor_range_called = True
return iter(([ _make_fake_doc("incremental-doc") ],))
@@ -405,6 +405,54 @@ async def test_rdbms_cursor_persists_only_after_success(monkeypatch):
assert connector.persist_sync_state_called is True
@pytest.mark.asyncio
@pytest.mark.p2
async def test_rdbms_cursor_does_not_persist_when_parse_returns_errors(monkeypatch):
monkeypatch.setattr(sync_data_source, "RDBMSConnector", _FakeRDBMSConnector)
_patch_common_dependencies(monkeypatch)
monkeypatch.setattr(
sync_data_source.KnowledgebaseService,
"get_by_id",
lambda *_args, **_kwargs: (True, object()),
)
monkeypatch.setattr(
sync_data_source.SyncLogsService,
"increase_docs",
lambda *_args, **_kwargs: None,
)
monkeypatch.setattr(
sync_data_source.SyncLogsService,
"duplicate_and_parse",
lambda *_args, **_kwargs: (["parse error"], ["parsed-doc-id"]),
)
task = {
**_make_task(),
"reindex": "0",
"poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc),
"skip_connection_log": True,
}
sync = sync_data_source.MySQL(
{
"host": "localhost",
"port": 3306,
"database": "db",
"query": "SELECT * FROM t",
"content_columns": "name",
"timestamp_column": "ts",
"credentials": {"username": "u", "password": "p"},
"sync_deleted_files": False,
}
)
await sync._run_task_logic(task)
connector = _FakeRDBMSConnector.instance
assert connector is not None
assert connector.persist_sync_state_called is False
@pytest.mark.asyncio
@pytest.mark.p2
async def test_rdbms_cursor_does_not_persist_when_batch_is_skipped(monkeypatch):
@@ -456,6 +504,263 @@ async def test_rdbms_cursor_does_not_persist_when_batch_is_skipped(monkeypatch):
assert connector.persist_sync_state_called is False
class _FakeBigQueryConnector:
instance = None
def __init__(
self,
project_id,
dataset_id=None,
table_id=None,
location=None,
query="",
content_columns="",
metadata_columns=None,
id_column=None,
timestamp_column=None,
batch_size=2,
page_size=1000,
maximum_bytes_billed=None,
job_timeout_ms=None,
use_query_cache=True,
):
self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
self.query = query
self.content_columns = content_columns
self.timestamp_column = timestamp_column
self.batch_size = batch_size
self.load_from_state_called = False
self.load_from_cursor_range_called = False
self.retrieve_all_slim_docs_perm_sync_called = False
self.prepare_sync_state_called = False
self.persist_sync_state_called = False
self._pending_sync_cursor_value = None
_FakeBigQueryConnector.instance = self
def load_credentials(self, credentials):
self.credentials = credentials
def validate_connector_settings(self):
return None
def prepare_sync_state(self, connector_id, config):
self.prepare_sync_state_called = True
self.prepare_sync_state_args = (connector_id, config)
def get_saved_sync_cursor_value(self):
return None
def retrieve_all_slim_docs_perm_sync(self, callback=None):
del callback
self.retrieve_all_slim_docs_perm_sync_called = True
yield [types.SimpleNamespace(id="bq-row-1")]
def load_from_state(self):
self.load_from_state_called = True
return iter((["full-sync"],))
def load_from_cursor_range(self, start_value=None, start_id=None, end_value=None):
self.load_from_cursor_range_called = True
return iter(([_make_fake_doc("bq-incremental-doc")],))
def persist_sync_state(self):
self.persist_sync_state_called = True
def _bigquery_conf(**overrides):
conf = {
"project_id": "proj",
"dataset_id": "ds",
"table_id": "tbl",
"content_columns": "name",
"credentials": {"service_account_json": "{}"},
"sync_deleted_files": False,
}
conf.update(overrides)
return conf
@pytest.mark.asyncio
@pytest.mark.p2
async def test_bigquery_generate_full_sync_on_first_run(monkeypatch):
monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector)
task = {
**_make_task(),
"reindex": "0",
"poll_range_start": None,
"skip_connection_log": True,
}
sync = sync_data_source.BigQuery(_bigquery_conf())
document_generator = await sync._generate(task)
connector = _FakeBigQueryConnector.instance
assert connector is not None
assert connector.prepare_sync_state_called is True
assert connector.load_from_state_called is True
assert connector.load_from_cursor_range_called is False
assert list(document_generator) == [["full-sync"]]
@pytest.mark.asyncio
@pytest.mark.p2
async def test_bigquery_generate_incremental_cursor_path(monkeypatch):
monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector)
task = {
**_make_task(),
"reindex": "0",
"poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc),
"skip_connection_log": True,
}
sync = sync_data_source.BigQuery(_bigquery_conf(timestamp_column="updated_at"))
document_generator = await sync._generate(task)
connector = _FakeBigQueryConnector.instance
assert connector is not None
assert connector.load_from_cursor_range_called is True
assert connector.load_from_state_called is False
assert [doc.id for doc in list(document_generator)[0]] == ["bq-incremental-doc"]
@pytest.mark.asyncio
@pytest.mark.p2
async def test_bigquery_cursor_persists_only_after_success(monkeypatch):
monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector)
_patch_common_dependencies(monkeypatch)
monkeypatch.setattr(
sync_data_source.KnowledgebaseService,
"get_by_id",
lambda *_args, **_kwargs: (True, object()),
)
monkeypatch.setattr(
sync_data_source.SyncLogsService,
"increase_docs",
lambda *_args, **_kwargs: None,
)
monkeypatch.setattr(
sync_data_source.SyncLogsService,
"duplicate_and_parse",
lambda *_args, **_kwargs: ([], ["parsed-doc-id"]),
)
task = {
**_make_task(),
"reindex": "0",
"poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc),
"skip_connection_log": True,
}
sync = sync_data_source.BigQuery(_bigquery_conf(timestamp_column="updated_at"))
await sync._run_task_logic(task)
connector = _FakeBigQueryConnector.instance
assert connector is not None
assert connector.persist_sync_state_called is True
@pytest.mark.asyncio
@pytest.mark.p2
async def test_bigquery_cursor_does_not_persist_when_parse_returns_errors(monkeypatch):
monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector)
_patch_common_dependencies(monkeypatch)
monkeypatch.setattr(
sync_data_source.KnowledgebaseService,
"get_by_id",
lambda *_args, **_kwargs: (True, object()),
)
monkeypatch.setattr(
sync_data_source.SyncLogsService,
"increase_docs",
lambda *_args, **_kwargs: None,
)
monkeypatch.setattr(
sync_data_source.SyncLogsService,
"duplicate_and_parse",
lambda *_args, **_kwargs: (["parse error"], ["parsed-doc-id"]),
)
task = {
**_make_task(),
"reindex": "0",
"poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc),
"skip_connection_log": True,
}
sync = sync_data_source.BigQuery(_bigquery_conf(timestamp_column="updated_at"))
await sync._run_task_logic(task)
connector = _FakeBigQueryConnector.instance
assert connector is not None
assert connector.persist_sync_state_called is False
@pytest.mark.asyncio
@pytest.mark.p2
async def test_bigquery_cursor_does_not_persist_when_batch_is_skipped(monkeypatch):
monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector)
_patch_common_dependencies(monkeypatch)
monkeypatch.setattr(
sync_data_source.KnowledgebaseService,
"get_by_id",
lambda *_args, **_kwargs: (True, object()),
)
monkeypatch.setattr(
sync_data_source.SyncLogsService,
"increase_docs",
lambda *_args, **_kwargs: None,
)
def _raise_in_duplicate_and_parse(*_args, **_kwargs):
raise RuntimeError("batch failed")
monkeypatch.setattr(
sync_data_source.SyncLogsService,
"duplicate_and_parse",
_raise_in_duplicate_and_parse,
)
task = {
**_make_task(),
"reindex": "0",
"poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc),
"skip_connection_log": True,
}
sync = sync_data_source.BigQuery(_bigquery_conf(timestamp_column="updated_at"))
await sync._run_task_logic(task)
connector = _FakeBigQueryConnector.instance
assert connector is not None
assert connector.persist_sync_state_called is False
@pytest.mark.asyncio
@pytest.mark.p2
async def test_bigquery_collect_prune_snapshot_when_enabled(monkeypatch):
monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector)
task = {
**_make_task(),
"reindex": "0",
"poll_range_start": None,
"skip_connection_log": True,
}
sync = sync_data_source.BigQuery(_bigquery_conf(sync_deleted_files=True))
await sync._generate(task)
file_list = sync._collect_prune_snapshot(task)
connector = _FakeBigQueryConnector.instance
assert connector.retrieve_all_slim_docs_perm_sync_called is True
assert [doc.id for doc in file_list] == ["bq-row-1"]
class _FakeDropboxConnector:
instance = None

View File

@@ -0,0 +1,4 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 128 128">
<path fill="#4386FA" d="M58.4 17.6C38.7 17.6 22.7 33.6 22.7 53.3c0 19.7 16 35.7 35.7 35.7 8.1 0 15.6-2.7 21.6-7.3l4.7 4.7v4.1l24.6 24.5 8.5-8.5L93.7 81.9h-4.1l-4.7-4.7c4.6-6 7.3-13.5 7.3-21.6 0-19.7-16-35.7-35.7-35.7-.1 0-.1 0-.1 0zm0 11.4c13.4 0 24.3 10.9 24.3 24.3S71.8 77.6 58.4 77.6 34.1 66.7 34.1 53.3 45 29 58.4 29z"/>
<path fill="#FFF" d="M44.6 50.1v8.9c1.6 2.8 3.9 5.2 6.7 6.8V50.1h-6.7zm10.4-7.4v25.8c1.1.2 2.2.3 3.4.3.9 0 1.9-.1 2.8-.2V42.7H55zm10.6 11.6v14c2.8-1.6 5.1-3.9 6.7-6.6v-7.4h-6.7z"/>
</svg>

After

Width:  |  Height:  |  Size: 580 B

View File

@@ -1,5 +1,6 @@
import Image from '@/components/image';
import SvgIcon from '@/components/svg-icon';
import { MarkdownRemarkPlugins } from '@/constants/markdown-remark-plugins';
import { IReference, IReferenceChunk } from '@/interfaces/database/chat';
import { citationMarkerReg } from '@/utils/citation-utils';
import { getExtension } from '@/utils/document-util';
@@ -10,7 +11,6 @@ import Markdown from 'react-markdown';
import SyntaxHighlighter from 'react-syntax-highlighter';
import rehypeKatex from 'rehype-katex';
import rehypeRaw from 'rehype-raw';
import { MarkdownRemarkPlugins } from '@/constants/markdown-remark-plugins';
import { visitParents } from 'unist-util-visit-parents';
import { useTranslation } from 'react-i18next';
@@ -72,7 +72,11 @@ const MarkdownContent = ({
text = t('chat.searching');
}
const nextText = replaceTextByOldReg(text);
return pipe(replaceThinkToSection, replaceRetrievingToSection, preprocessLaTeX)(nextText);
return pipe(
replaceThinkToSection,
replaceRetrievingToSection,
preprocessLaTeX,
)(nextText);
}, [content, t]);
useEffect(() => {

View File

@@ -6,7 +6,14 @@ import { buildOptions } from '@/utils/form';
import { useFormContext, useWatch } from 'react-hook-form';
import { useTranslation } from 'react-i18next';
const algorithmOptions = buildOptions(['PaddleOCR-VL-1.6', 'PaddleOCR-VL-1.5', 'PaddleOCR-VL', 'PP-OCRv6', 'PP-OCRv5', 'PP-StructureV3']);
const algorithmOptions = buildOptions([
'PaddleOCR-VL-1.6',
'PaddleOCR-VL-1.5',
'PaddleOCR-VL',
'PP-OCRv6',
'PP-OCRv5',
'PP-StructureV3',
]);
export function PaddleOCROptionsFormField({
namePrefix = 'parser_config',
@@ -57,7 +64,10 @@ export function PaddleOCROptionsFormField({
<RAGFlowFormItem
name={buildName('paddleocr_access_token')}
label={t('knowledgeConfiguration.paddleocrAccessToken', 'AI Studio Access Token')}
label={t(
'knowledgeConfiguration.paddleocrAccessToken',
'AI Studio Access Token',
)}
tooltip={t(
'knowledgeConfiguration.paddleocrAccessTokenTip',
'Access token for PaddleOCR API (optional)',
@@ -67,14 +77,19 @@ export function PaddleOCROptionsFormField({
{(field) => (
<Input
{...field}
placeholder={t('knowledgeConfiguration.paddleocrAccessTokenPlaceholder')}
placeholder={t(
'knowledgeConfiguration.paddleocrAccessTokenPlaceholder',
)}
/>
)}
</RAGFlowFormItem>
<RAGFlowFormItem
name={buildName('paddleocr_algorithm')}
label={t('knowledgeConfiguration.paddleocrAlgorithm', 'PaddleOCR Algorithm')}
label={t(
'knowledgeConfiguration.paddleocrAlgorithm',
'PaddleOCR Algorithm',
)}
tooltip={t(
'knowledgeConfiguration.paddleocrAlgorithmTip',
'Algorithm to use for PaddleOCR parsing',

View File

@@ -1405,6 +1405,32 @@ Example: Virtual Hosted Style`,
'Column to use as unique document ID. If not specified, a hash of the content will be used.',
postgresqlTimestampColumnTip:
'Datetime/timestamp column for incremental sync. Only rows modified after the last sync will be fetched.',
bigqueryDescription:
'Connect to Google BigQuery to sync rows from a table or a custom GoogleSQL query.',
bigqueryProjectIdTip:
'GCP project that owns the query jobs (e.g. my-gcp-project).',
bigqueryLocationTip:
'Default location for the client and query jobs, such as US or EU. Leave empty to let BigQuery infer it.',
bigqueryServiceAccountJsonTip:
'Service account key JSON with BigQuery access. Paste the full key file contents.',
bigqueryDatasetIdTip:
'Dataset id for table mode. Required together with Table ID when no SQL query is provided.',
bigqueryTableIdTip:
'Table id for table mode. Required together with Dataset ID when no SQL query is provided.',
bigqueryQueryTip:
'Custom GoogleSQL query. Takes precedence over Dataset ID and Table ID. Standard SQL only.',
bigqueryContentColumnsTip:
'Comma-separated column names whose values will be combined as document content for vectorization.',
bigqueryMetadataColumnsTip:
'Comma-separated column names to store as document metadata (not vectorized, but searchable).',
bigqueryIdColumnTip:
'Column to use as unique document ID. If not specified, a hash of the content will be used.',
bigqueryTimestampColumnTip:
'Timestamp, datetime, date, or numeric column for incremental sync. Only rows newer than the last sync will be fetched.',
bigqueryMaximumBytesBilledTip:
'Hard cost guard applied to every query job, in bytes. Defaults to 1 GiB.',
bigqueryJobTimeoutMsTip:
'Optional per-query job timeout in milliseconds.',
rest_apiDescription:
'Connect any REST API endpoint as a data source using a flexible, configuration-driven connector.',
onedriveDescription:

View File

@@ -1,5 +1,6 @@
import Image from '@/components/image';
import SvgIcon from '@/components/svg-icon';
import { MarkdownRemarkPlugins } from '@/constants/markdown-remark-plugins';
import { IReference, IReferenceChunk } from '@/interfaces/database/chat';
import { getExtension } from '@/utils/document-util';
import DOMPurify from 'dompurify';
@@ -8,7 +9,6 @@ import Markdown from 'react-markdown';
import SyntaxHighlighter from 'react-syntax-highlighter';
import rehypeKatex from 'rehype-katex';
import rehypeRaw from 'rehype-raw';
import { MarkdownRemarkPlugins } from '@/constants/markdown-remark-plugins';
import { visitParents } from 'unist-util-visit-parents';
import { useTranslation } from 'react-i18next';
@@ -75,7 +75,11 @@ const MarkdownContent = ({
text = t('chat.searching');
}
const nextText = replaceTextByOldReg(text);
return pipe(replaceThinkToSection, replaceRetrievingToSection, preprocessLaTeX)(nextText);
return pipe(
replaceThinkToSection,
replaceRetrievingToSection,
preprocessLaTeX,
)(nextText);
}, [content, t]);
useEffect(() => {

View File

@@ -41,6 +41,7 @@ export enum DataSourceKey {
SEAFILE = 'seafile',
MYSQL = 'mysql',
POSTGRESQL = 'postgresql',
BIGQUERY = 'bigquery',
REST_API = 'rest_api',
RSS = 'rss',
ONEDRIVE = 'onedrive',
@@ -160,6 +161,9 @@ export const DataSourceFeatureVisibilityMap: Partial<
[DataSourceKey.POSTGRESQL]: {
syncDeletedFiles: true,
},
[DataSourceKey.BIGQUERY]: {
syncDeletedFiles: true,
},
};
const isDataSourceFeatureVisible = (
@@ -333,6 +337,11 @@ export const generateDataSourceInfo = (t: TFunction) => {
description: t(`setting.${DataSourceKey.POSTGRESQL}Description`),
icon: <SvgIcon name={'data-source/postgresql'} width={38} />,
},
[DataSourceKey.BIGQUERY]: {
name: 'BigQuery',
description: t(`setting.${DataSourceKey.BIGQUERY}Description`),
icon: <SvgIcon name={'data-source/bigquery'} width={38} />,
},
[DataSourceKey.ONEDRIVE]: {
name: 'OneDrive',
description: t(`setting.${DataSourceKey.ONEDRIVE}Description`),
@@ -1483,6 +1492,166 @@ export const DataSourceFormFields = {
tooltip: t('setting.postgresqlTimestampColumnTip'),
},
],
[DataSourceKey.BIGQUERY]: [
{
label: 'Project ID',
name: 'config.project_id',
type: FormFieldType.Text,
required: true,
placeholder: 'my-gcp-project',
tooltip: t('setting.bigqueryProjectIdTip'),
},
{
label: 'Location',
name: 'config.location',
type: FormFieldType.Text,
required: false,
placeholder: 'US',
tooltip: t('setting.bigqueryLocationTip'),
},
{
label: 'Service Account JSON',
name: 'config.credentials.service_account_json',
type: FormFieldType.Password,
required: true,
placeholder: '{ "type": "service_account", "project_id": "...", ... }',
tooltip: t('setting.bigqueryServiceAccountJsonTip'),
},
{
label: 'Dataset ID',
name: 'config.dataset_id',
type: FormFieldType.Text,
required: false,
placeholder: 'analytics',
tooltip: t('setting.bigqueryDatasetIdTip'),
customValidate: (val: string, values: any) => {
const hasQuery = !!(values?.config?.query ?? '').trim();
const hasTable = !!(values?.config?.table_id ?? '').trim();
if (!hasQuery && !((val ?? '').trim() && hasTable)) {
return 'Dataset ID is required when not using a custom SQL Query';
}
return true;
},
},
{
label: 'Table ID',
name: 'config.table_id',
type: FormFieldType.Text,
required: false,
placeholder: 'customers',
tooltip: t('setting.bigqueryTableIdTip'),
customValidate: (val: string, values: any) => {
const hasQuery = !!(values?.config?.query ?? '').trim();
const hasDataset = !!(values?.config?.dataset_id ?? '').trim();
if (!hasQuery && !(hasDataset && (val ?? '').trim())) {
return 'Table ID is required when not using a custom SQL Query';
}
return true;
},
},
{
label: 'SQL Query',
name: 'config.query',
type: FormFieldType.Textarea,
required: false,
placeholder: 'Leave empty to use Dataset ID + Table ID',
tooltip: t('setting.bigqueryQueryTip'),
customValidate: (val: string, values: any) => {
const hasQuery = !!(val ?? '').trim();
const hasDataset = !!(values?.config?.dataset_id ?? '').trim();
const hasTable = !!(values?.config?.table_id ?? '').trim();
if (!hasQuery && !(hasDataset && hasTable)) {
return 'Provide a SQL Query, or both Dataset ID and Table ID';
}
return true;
},
},
{
label: 'Content Columns',
name: 'config.content_columns',
type: FormFieldType.Text,
required: true,
placeholder: 'name,description,notes',
tooltip: t('setting.bigqueryContentColumnsTip'),
},
{
label: 'Metadata Columns',
name: 'config.metadata_columns',
type: FormFieldType.Text,
required: false,
placeholder: 'customer_id,status,region',
tooltip: t('setting.bigqueryMetadataColumnsTip'),
},
{
label: 'ID Column',
name: 'config.id_column',
type: FormFieldType.Text,
required: false,
placeholder: 'customer_id',
tooltip: t('setting.bigqueryIdColumnTip'),
},
{
label: 'Timestamp Column',
name: 'config.timestamp_column',
type: FormFieldType.Text,
required: false,
placeholder: 'updated_at',
tooltip: t('setting.bigqueryTimestampColumnTip'),
},
{
label: 'Max Bytes Billed',
name: 'config.maximum_bytes_billed',
type: FormFieldType.Number,
required: false,
placeholder: '1073741824',
tooltip: t('setting.bigqueryMaximumBytesBilledTip'),
validation: {
min: 1,
message: 'Max Bytes Billed must be at least 1',
},
},
{
label: 'Job Timeout (ms)',
name: 'config.job_timeout_ms',
type: FormFieldType.Number,
required: false,
placeholder: '300000',
tooltip: t('setting.bigqueryJobTimeoutMsTip'),
validation: {
min: 1,
message: 'Job Timeout must be at least 1',
},
},
{
label: 'Page Size',
name: 'config.page_size',
type: FormFieldType.Number,
required: false,
placeholder: '1000',
validation: {
min: 1,
message: 'Page Size must be at least 1',
},
},
{
label: 'Batch Size',
name: 'config.batch_size',
type: FormFieldType.Number,
required: false,
placeholder: '100',
validation: {
min: 1,
message: 'Batch Size must be at least 1',
},
},
{
label: 'Use Query Cache',
name: 'config.use_query_cache',
type: FormFieldType.Checkbox,
required: false,
defaultValue: true,
},
],
[DataSourceKey.REST_API]: [
// ── Essential fields ──────────────────────────────────────────────
{
@@ -2150,6 +2319,29 @@ export const DataSourceFormDefaultValues = {
},
},
},
[DataSourceKey.BIGQUERY]: {
name: '',
source: DataSourceKey.BIGQUERY,
config: {
project_id: '',
dataset_id: '',
table_id: '',
location: '',
query: '',
content_columns: '',
metadata_columns: '',
id_column: '',
timestamp_column: '',
batch_size: 100,
page_size: 1000,
maximum_bytes_billed: 1073741824,
job_timeout_ms: 300000,
use_query_cache: true,
credentials: {
service_account_json: '',
},
},
},
[DataSourceKey.ONEDRIVE]: {
name: '',
source: DataSourceKey.ONEDRIVE,

View File

@@ -244,7 +244,8 @@ const SourceDetailPage = () => {
/>
</div>
<div className="max-w-[1200px] flex justify-end gap-2">
{detail?.source === DataSourceKey.REST_API && (
{(detail?.source === DataSourceKey.REST_API ||
detail?.source === DataSourceKey.BIGQUERY) && (
<Button
type="button"
variant="outline"