diff --git a/api/apps/restful_apis/connector_api.py b/api/apps/restful_apis/connector_api.py index 5e799cd814..b7b09bae0f 100644 --- a/api/apps/restful_apis/connector_api.py +++ b/api/apps/restful_apis/connector_api.py @@ -184,49 +184,98 @@ async def test_connector(connector_id): """Validate connector configuration without persisting changes or triggering sync. For the REST API connector, this uses `RestAPIConnector.validate_config` - against the existing saved configuration. + against the existing saved configuration. For BigQuery, it runs `SELECT 1` + plus a free dry-run of the configured base query under `maximum_bytes_billed` + so bad credentials, wrong location, or runaway scans surface before scheduled + syncs run (and incur cost). """ if not ConnectorService.accessible(connector_id, current_user.id): return _connector_auth_error(connector_id, current_user.id) - from common.data_source.rest_api_connector import RestAPIConnector from common.data_source.exceptions import ConnectorMissingCredentialError, ConnectorValidationError ok, conn = ConnectorService.get_by_id(connector_id) if not ok: return get_data_error_result(message="Can't find this Connector!") - if conn.source != DocumentSource.REST_API: - return get_json_result( - code=RetCode.ARGUMENT_ERROR, - message="Test endpoint currently supports only REST API connectors.", - data=False, - ) - config = conn.config or {} credentials = config.get("credentials") or {} - try: - await asyncio.to_thread( - RestAPIConnector.validate_config, - config=config, - credentials=credentials, - ) - except (ConnectorValidationError, ConnectorMissingCredentialError) as exc: - return get_json_result( - code=RetCode.DATA_ERROR, - message=str(exc), - data=False, - ) - except Exception as exc: - logging.exception("REST API connector validation failed: %s", exc) - return get_json_result( - code=RetCode.SERVER_ERROR, - message="REST API connector validation failed, please check logs.", - data=False, - ) + if conn.source == DocumentSource.REST_API: + from common.data_source.rest_api_connector import RestAPIConnector - return get_json_result(data=True) + try: + await asyncio.to_thread( + RestAPIConnector.validate_config, + config=config, + credentials=credentials, + ) + except (ConnectorValidationError, ConnectorMissingCredentialError) as exc: + return get_json_result( + code=RetCode.DATA_ERROR, + message=str(exc), + data=False, + ) + except Exception as exc: + logging.exception("REST API connector validation failed: %s", exc) + return get_json_result( + code=RetCode.SERVER_ERROR, + message="REST API connector validation failed, please check logs.", + data=False, + ) + + return get_json_result(data=True) + + if conn.source == DocumentSource.BIGQUERY: + from common.data_source.bigquery_connector import BigQueryConnector + + def _validate_bigquery(): + connector_kwargs = { + "project_id": config.get("project_id", ""), + "dataset_id": config.get("dataset_id") or None, + "table_id": config.get("table_id") or None, + "location": config.get("location") or None, + "query": config.get("query", ""), + "content_columns": config.get("content_columns", ""), + "metadata_columns": config.get("metadata_columns", ""), + "id_column": config.get("id_column") or None, + "timestamp_column": config.get("timestamp_column") or None, + "use_query_cache": config.get("use_query_cache", True), + } + if config.get("page_size") is not None: + connector_kwargs["page_size"] = int(config["page_size"]) + if config.get("maximum_bytes_billed") is not None: + connector_kwargs["maximum_bytes_billed"] = int(config["maximum_bytes_billed"]) + if config.get("job_timeout_ms") is not None: + connector_kwargs["job_timeout_ms"] = int(config["job_timeout_ms"]) + + connector = BigQueryConnector(**connector_kwargs) + connector.load_credentials(credentials) + connector.validate_connector_settings() + + try: + await asyncio.to_thread(_validate_bigquery) + except (ConnectorValidationError, ConnectorMissingCredentialError) as exc: + return get_json_result( + code=RetCode.DATA_ERROR, + message=str(exc), + data=False, + ) + except Exception as exc: + logging.exception("BigQuery connector validation failed: %s", exc) + return get_json_result( + code=RetCode.SERVER_ERROR, + message="BigQuery connector validation failed, please check logs.", + data=False, + ) + + return get_json_result(data=True) + + return get_json_result( + code=RetCode.ARGUMENT_ERROR, + message="Test endpoint currently supports only REST API and BigQuery connectors.", + data=False, + ) WEB_FLOW_TTL_SECS = 15 * 60 diff --git a/api/apps/services/provider_api_service.py b/api/apps/services/provider_api_service.py index 4016f7a862..60f9b11f03 100644 --- a/api/apps/services/provider_api_service.py +++ b/api/apps/services/provider_api_service.py @@ -337,9 +337,14 @@ async def create_provider_instance(tenant_id: str, provider_id_or_name: str, ins api_key_str = "" if api_key: api_key_str = api_key if isinstance(api_key, str) else json.dumps(api_key) - success, msg = await verify_api_key(provider_name, api_key, base_url, region, model_info) - if not success: - return False, msg + + # Only verify when there are models to probe. Generic providers such as + # "OpenAI-API-Compatible" may start empty and receive custom models later. + factory_entry = next((f for f in FACTORY_LLM_INFOS if f["name"] == provider_name), None) + if (factory_entry and factory_entry.get("llm")) or model_info: + success, msg = await verify_api_key(provider_name, api_key, base_url, region, model_info) + if not success: + return False, msg extra_fields = {} if base_url: diff --git a/common/constants.py b/common/constants.py index 402e402d5f..4bef10c495 100644 --- a/common/constants.py +++ b/common/constants.py @@ -154,6 +154,7 @@ class FileSource(StrEnum): SEAFILE = "seafile" MYSQL = "mysql" POSTGRESQL = "postgresql" + BIGQUERY = "bigquery" DINGTALK_AI_TABLE = "dingtalk_ai_table" ONEDRIVE = "onedrive" OUTLOOK = "outlook" diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index c0ebc8c809..39bfc0a11b 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -47,6 +47,7 @@ from .imap_connector import ImapConnector from .zendesk_connector import ZendeskConnector from .seafile_connector import SeaFileConnector from .rdbms_connector import RDBMSConnector +from .bigquery_connector import BigQueryConnector from .webdav_connector import WebDAVConnector from .rest_api_connector import RestAPIConnector from .config import BlobType, DocumentSource @@ -94,6 +95,7 @@ __all__ = [ "ZendeskConnector", "SeaFileConnector", "RDBMSConnector", + "BigQueryConnector", "WebDAVConnector", "DingTalkAITableConnector", "RestAPIConnector", diff --git a/common/data_source/bigquery_connector.py b/common/data_source/bigquery_connector.py new file mode 100644 index 0000000000..1aeec24a0f --- /dev/null +++ b/common/data_source/bigquery_connector.py @@ -0,0 +1,677 @@ +"""Google BigQuery data source connector for importing query/table rows into documents. + +This connector shares the user-facing row model of ``RDBMSConnector`` (MySQL/PostgreSQL): +selected content columns become document text, selected metadata columns become metadata, +an optional ID column produces stable document IDs, and an optional timestamp column enables +cursor-based incremental sync plus deleted-row pruning. + +Unlike the RDBMS connector, BigQuery is a Google Cloud query-job service. This implementation +therefore uses the official ``google-cloud-bigquery`` client, service-account authentication, +explicit billing controls (``maximum_bytes_billed``, ``job_timeout_ms``), dry-run validation, +parameterized cursors with a resolved column type, and BigQuery-aware value serialization. +""" + +import base64 +import copy +import hashlib +import json +import logging +from datetime import date, datetime, time, timezone +from decimal import Decimal +from typing import Any, Dict, Generator, List, Optional, Tuple + +from common.data_source.config import DocumentSource, INDEX_BATCH_SIZE +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, +) +from common.data_source.interfaces import ( + LoadConnector, + PollConnector, + SecondsSinceUnixEpoch, + SlimConnectorWithPermSync, +) +from common.data_source.models import Document, SlimDocument + +try: + from google.cloud import bigquery + from google.oauth2 import service_account +except ImportError: # pragma: no cover - import guarded at runtime + bigquery = None + service_account = None + + +# Marker keys used to round-trip non-JSON-native cursor values through the +# connector config (which is persisted as JSON). +_CURSOR_TYPE_KEY = "__ragflow_bq_cursor_type__" + +# Default cost guard: 1 GiB. Users can raise this explicitly. +DEFAULT_MAXIMUM_BYTES_BILLED = 1024 * 1024 * 1024 + +# Maps a BigQuery field type to the ScalarQueryParameter type used for cursors. +_CURSOR_PARAM_TYPE_MAP = { + "TIMESTAMP": "TIMESTAMP", + "DATETIME": "DATETIME", + "DATE": "DATE", + "TIME": "TIME", + "INTEGER": "INT64", + "INT64": "INT64", + "NUMERIC": "NUMERIC", + "BIGNUMERIC": "BIGNUMERIC", + "FLOAT": "FLOAT64", + "FLOAT64": "FLOAT64", +} + + +class BigQueryConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): + """Import rows from a BigQuery table or custom query into documents. + + The flow mirrors ``RDBMSConnector``: + 1. Authenticate with a service account and build a BigQuery client. + 2. Read rows from a single configured table or a single custom SQL query. + 3. Build document content from the selected content columns. + 4. Copy the selected metadata columns into document metadata. + 5. Use the configured ID column as the stable document ID, or hash the content. + 6. For incremental sync, treat the timestamp column as an ordered cursor and filter by it. + 7. For deleted-file sync, read a slim snapshot of current row IDs. + + "Empty query means all tables" is intentionally NOT supported here: scanning every table + is dangerous and expensive in BigQuery. Either a custom ``query`` or both ``dataset_id`` + and ``table_id`` must be provided. + """ + + def __init__( + self, + project_id: str, + dataset_id: Optional[str] = None, + table_id: Optional[str] = None, + location: Optional[str] = None, + query: str = "", + content_columns: str = "", + metadata_columns: Optional[str] = None, + id_column: Optional[str] = None, + timestamp_column: Optional[str] = None, + batch_size: int = INDEX_BATCH_SIZE, + page_size: int = 1000, + maximum_bytes_billed: Optional[int] = DEFAULT_MAXIMUM_BYTES_BILLED, + job_timeout_ms: Optional[int] = None, + use_query_cache: bool = True, + ) -> None: + """Initialize the BigQuery connector. + + Args: + project_id: GCP project that owns the query jobs. + dataset_id: Dataset id (table mode). + table_id: Table id (table mode). + location: Default location for the client and query jobs (e.g. "US", "EU"). + query: Custom GoogleSQL query (custom query mode). Takes precedence over table mode. + content_columns: Comma-separated column names used for document content. + metadata_columns: Comma-separated column names used as metadata (optional). + id_column: Column used as the stable document ID (optional; hash fallback otherwise). + timestamp_column: Column used for incremental cursor sync (optional). + batch_size: Number of documents per yielded batch. + page_size: BigQuery result page size (a fetch hint, not the batch size). + maximum_bytes_billed: Hard cost guard passed to every query job. + job_timeout_ms: Optional per-job timeout in milliseconds. + use_query_cache: Whether to allow BigQuery's query result cache. + """ + self.project_id = (project_id or "").strip() + self.dataset_id = (dataset_id or "").strip() + self.table_id = (table_id or "").strip() + self.location = (location or "").strip() + self.query = (query or "").strip() + self.content_columns = [c.strip() for c in (content_columns or "").split(",") if c.strip()] + self.metadata_columns = [c.strip() for c in (metadata_columns or "").split(",") if c.strip()] + self.id_column = id_column.strip() if id_column else None + self.timestamp_column = timestamp_column.strip() if timestamp_column else None + self.batch_size = batch_size + self.page_size = page_size + self.maximum_bytes_billed = maximum_bytes_billed + self.job_timeout_ms = job_timeout_ms + self.use_query_cache = use_query_cache + + self._client = None + self._credentials: Dict[str, Any] = {} + self._cursor_param_type: Optional[str] = None + self._sync_connector_id: str | None = None + self._sync_config: Dict[str, Any] | None = None + self._pending_sync_cursor_value: Any = None + self._pending_sync_cursor_id: Any = None + + # ------------------------------------------------------------------ # + # Credentials & client + # ------------------------------------------------------------------ # + def load_credentials(self, credentials: Dict[str, Any]) -> Dict[str, Any] | None: + """Load BigQuery service-account credentials. + + Accepts ``service_account_json`` as either a dict or a JSON string. + """ + logging.debug("Loading credentials for BigQuery project: %s", self.project_id) + + raw = (credentials or {}).get("service_account_json") + if not raw: + raise ConnectorMissingCredentialError("BigQuery: missing service_account_json") + + if isinstance(raw, str): + try: + service_account_info = json.loads(raw) + except json.JSONDecodeError as exc: + raise ConnectorMissingCredentialError( + f"BigQuery: service_account_json is not valid JSON: {exc}" + ) + elif isinstance(raw, dict): + service_account_info = raw + else: + raise ConnectorMissingCredentialError( + "BigQuery: service_account_json must be a JSON string or object" + ) + + self._credentials = {"service_account_info": service_account_info} + return None + + def _get_client(self): + """Create and cache a BigQuery client from the loaded service account.""" + if self._client is not None: + return self._client + + if bigquery is None or service_account is None: + raise ConnectorValidationError( + "BigQuery client not installed. Please install google-cloud-bigquery." + ) + + service_account_info = self._credentials.get("service_account_info") + if not service_account_info: + raise ConnectorMissingCredentialError("BigQuery credentials not loaded.") + + try: + creds = service_account.Credentials.from_service_account_info(service_account_info) + except Exception as exc: + raise ConnectorValidationError(f"Failed to build BigQuery credentials: {exc}") + + try: + self._client = bigquery.Client( + project=self.project_id or None, + credentials=creds, + location=self.location or None, + ) + except Exception as exc: + raise ConnectorValidationError(f"Failed to create BigQuery client: {exc}") + + return self._client + + # ------------------------------------------------------------------ # + # Query construction + # ------------------------------------------------------------------ # + def _build_base_query(self) -> str: + """Return the single base query (custom query takes precedence over table mode).""" + if self.query: + return self.query.rstrip(";") + if self.dataset_id and self.table_id: + return f"SELECT * FROM `{self.project_id}.{self.dataset_id}.{self.table_id}`" + raise ConnectorValidationError( + "BigQuery requires either a custom query or both dataset_id and table_id." + ) + + @staticmethod + def _wrap_query(base_query: str, select_clause: str = "*") -> str: + return f"SELECT {select_clause} FROM ({base_query}) AS ragflow_src" + + def _build_query_job_config( + self, + query_parameters: Optional[List[Any]] = None, + dry_run: bool = False, + ): + config = bigquery.QueryJobConfig() + config.use_legacy_sql = False + config.use_query_cache = self.use_query_cache + if self.maximum_bytes_billed: + config.maximum_bytes_billed = int(self.maximum_bytes_billed) + if self.job_timeout_ms: + config.job_timeout_ms = int(self.job_timeout_ms) + if query_parameters: + config.query_parameters = query_parameters + if dry_run: + config.dry_run = True + config.use_query_cache = False + return config + + def _resolve_schema(self, client: Any, dry_run_job: Any = None) -> List[Any]: + if not self.query and self.dataset_id and self.table_id: + table = client.get_table(f"{self.project_id}.{self.dataset_id}.{self.table_id}") + return table.schema + else: + if dry_run_job is None: + dry_run_job = client.query( + self._wrap_query(self._build_base_query()), + job_config=self._build_query_job_config(dry_run=True), + location=self.location or None, + ) + return dry_run_job.schema or [] + + def _get_cursor_column_field_type(self) -> str: + """Resolve the BigQuery field type of the timestamp column.""" + client = self._get_client() + schema = self._resolve_schema(client) + + for field in schema: + if field.name == self.timestamp_column: + return field.field_type + raise ConnectorValidationError( + f"BigQuery timestamp column '{self.timestamp_column}' was not found in the schema." + ) + + def _resolve_cursor_param_type(self) -> str: + if self._cursor_param_type is not None: + return self._cursor_param_type + field_type = (self._get_cursor_column_field_type() or "").upper() + param_type = _CURSOR_PARAM_TYPE_MAP.get(field_type) + if param_type is None: + raise ConnectorValidationError( + f"BigQuery timestamp column type '{field_type}' is not supported as a cursor." + ) + self._cursor_param_type = param_type + return param_type + + def _make_cursor_param(self, name: str, value: Any, param_type: str): + return bigquery.ScalarQueryParameter(name, param_type, value) + + def _build_time_filtered_query( + self, + base_query: str, + start: Any = None, + end: Any = None, + start_id: Any = None, + ) -> Tuple[str, List[Any]]: + wrapped = self._wrap_query(base_query) + if not self.timestamp_column or (start is None and end is None): + return wrapped, [] + + param_type = self._resolve_cursor_param_type() + conditions: List[str] = [] + params: List[Any] = [] + if start is not None: + if self.id_column and start_id is not None: + conditions.append( + f"(ragflow_src.{self.timestamp_column} > @start_cursor OR " + f"(ragflow_src.{self.timestamp_column} = @start_cursor AND ragflow_src.{self.id_column} > @start_cursor_id))" + ) + params.append(self._make_cursor_param("start_cursor", start, param_type)) + params.append(self._make_cursor_param("start_cursor_id", start_id, "STRING")) + else: + conditions.append(f"ragflow_src.{self.timestamp_column} >= @start_cursor") + params.append(self._make_cursor_param("start_cursor", start, param_type)) + if end is not None: + conditions.append(f"ragflow_src.{self.timestamp_column} <= @end_cursor") + params.append(self._make_cursor_param("end_cursor", end, param_type)) + + if conditions: + wrapped = f"{wrapped} WHERE {' AND '.join(conditions)}" + return wrapped, params + + def _build_max_timestamp_query(self, base_query: str) -> str: + if self.id_column: + # We need both the max timestamp and the max id for that timestamp + return ( + f"SELECT ragflow_src.{self.timestamp_column}, MAX(ragflow_src.{self.id_column}) " + f"FROM ({base_query}) AS ragflow_src " + f"WHERE ragflow_src.{self.timestamp_column} = (" + f" SELECT MAX({self.timestamp_column}) FROM ({base_query})" + f") " + f"GROUP BY ragflow_src.{self.timestamp_column}" + ) + return ( + f"SELECT MAX(ragflow_src.{self.timestamp_column}), NULL " + f"FROM ({base_query}) AS ragflow_src" + ) + + def _build_slim_query(self, base_query: str) -> str: + columns = [self.id_column] if self.id_column else self.content_columns + select_clause = ", ".join(f"ragflow_src.{column}" for column in columns) + return self._wrap_query(base_query, select_clause) + + # ------------------------------------------------------------------ # + # Cursor (de)serialization + # ------------------------------------------------------------------ # + @staticmethod + def serialize_cursor_value(value: Any) -> Any: + # Connector config is JSON, so datetime/date must be wrapped. Other + # scalar cursors (int/float/str) round-trip natively. + if isinstance(value, datetime): + return {_CURSOR_TYPE_KEY: "datetime", "value": value.isoformat()} + if isinstance(value, date): + return {_CURSOR_TYPE_KEY: "date", "value": value.isoformat()} + if isinstance(value, time): + return {_CURSOR_TYPE_KEY: "time", "value": value.isoformat()} + if isinstance(value, Decimal): + return {_CURSOR_TYPE_KEY: "decimal", "value": str(value)} + return value + + @staticmethod + def deserialize_cursor_value(value: Any) -> Any: + if isinstance(value, dict) and _CURSOR_TYPE_KEY in value: + kind = value.get(_CURSOR_TYPE_KEY) + if kind == "datetime": + return datetime.fromisoformat(value["value"]) + if kind == "date": + return date.fromisoformat(value["value"]) + if kind == "time": + return time.fromisoformat(value["value"]) + if kind == "decimal": + return Decimal(value["value"]) + return value + + # ------------------------------------------------------------------ # + # Value rendering + # ------------------------------------------------------------------ # + @staticmethod + def _render_content_value(value: Any) -> Optional[str]: + """Render a value for document content. Returns None to skip the value.""" + if isinstance(value, (bytes, bytearray)): + # Binary content is not meaningful as document text; skip it. + return None + if isinstance(value, (datetime, date, time)): + return value.isoformat() + if isinstance(value, (dict, list)): + return json.dumps(value, ensure_ascii=False, default=str) + return str(value) + + @staticmethod + def _render_metadata_value(value: Any) -> str: + if isinstance(value, (bytes, bytearray)): + return base64.b64encode(bytes(value)).decode("ascii") + if isinstance(value, (datetime, date, time)): + return value.isoformat() + if isinstance(value, (dict, list)): + return json.dumps(value, ensure_ascii=False, default=str) + return str(value) + + def _build_content(self, row_dict: Dict[str, Any]) -> str: + content_parts = [] + for col in self.content_columns: + if col not in row_dict or row_dict[col] is None: + continue + rendered = self._render_content_value(row_dict[col]) + if rendered is None: + continue + content_parts.append(f"【{col}】:\n{rendered}") + return "\n\n".join(content_parts) + + def _id_prefix(self) -> str: + if not self.query and self.dataset_id and self.table_id: + return f"bigquery:{self.project_id}:{self.dataset_id}.{self.table_id}" + return f"bigquery:{self.project_id}:query" + + def _build_document_id_from_row(self, row_dict: Dict[str, Any]) -> str: + prefix = self._id_prefix() + if self.id_column and self.id_column in row_dict and row_dict[self.id_column] is not None: + return f"{prefix}:{row_dict[self.id_column]}" + content = self._build_content(row_dict) + content_hash = hashlib.md5(content.encode()).hexdigest() + return f"{prefix}:{content_hash}" + + def _row_to_document(self, row_dict: Dict[str, Any]) -> Document: + content = self._build_content(row_dict) + metadata = {} + for col in self.metadata_columns: + if col not in row_dict or row_dict[col] is None: + continue + metadata[col] = self._render_metadata_value(row_dict[col]) + + doc_updated_at = datetime.now(timezone.utc) + if self.timestamp_column and row_dict.get(self.timestamp_column) is not None: + ts_value = row_dict[self.timestamp_column] + if isinstance(ts_value, datetime): + if ts_value.tzinfo is None: + doc_updated_at = ts_value.replace(tzinfo=timezone.utc) + else: + doc_updated_at = ts_value.astimezone(timezone.utc) + elif isinstance(ts_value, date): + doc_updated_at = datetime( + ts_value.year, ts_value.month, ts_value.day, tzinfo=timezone.utc + ) + + first_content_col = self.content_columns[0] if self.content_columns else "record" + semantic_id = ( + str(row_dict.get(first_content_col, "bigquery_record")) + .replace("\n", " ") + .replace("\r", " ") + .strip()[:100] + ) + blob = content.encode("utf-8") + + return Document( + id=self._build_document_id_from_row(row_dict), + blob=blob, + source=DocumentSource.BIGQUERY, + semantic_identifier=semantic_id, + extension=".txt", + doc_updated_at=doc_updated_at, + size_bytes=len(blob), + metadata=metadata if metadata else None, + ) + + # ------------------------------------------------------------------ # + # Query execution + # ------------------------------------------------------------------ # + def _yield_rows_from_query( + self, + query: str, + query_parameters: Optional[List[Any]] = None, + ) -> Generator[list[Document], None, None]: + client = self._get_client() + logging.info("Executing BigQuery query: %s...", query[:200]) + job = client.query( + query, + job_config=self._build_query_job_config(query_parameters=query_parameters), + location=self.location or None, + ) + result = job.result(page_size=self.page_size) + + batch: list[Document] = [] + for row in result: + try: + doc = self._row_to_document(dict(row)) + batch.append(doc) + if len(batch) >= self.batch_size: + yield batch + batch = [] + except Exception as exc: + logging.warning("Error converting BigQuery row to document: %s", exc) + continue + + if batch: + yield batch + + def _yield_slim_documents_from_query( + self, + query: str, + ) -> Generator[list[SlimDocument], None, None]: + client = self._get_client() + logging.debug("Executing BigQuery slim query: %s...", query[:200]) + job = client.query( + query, + job_config=self._build_query_job_config(), + location=self.location or None, + ) + result = job.result(page_size=self.page_size) + + batch: list[SlimDocument] = [] + for row in result: + batch.append(SlimDocument(id=self._build_document_id_from_row(dict(row)))) + if len(batch) >= self.batch_size: + yield batch + batch = [] + + if batch: + yield batch + + def _yield_documents( + self, + start: Any = None, + end: Any = None, + start_id: Any = None, + ) -> Generator[list[Document], None, None]: + base_query = self._build_base_query() + query, params = self._build_time_filtered_query(base_query, start, end, start_id) + yield from self._yield_rows_from_query(query, params) + + def get_max_cursor_value(self) -> Tuple[Any, Any]: + if not self.timestamp_column: + return None, None + + client = self._get_client() + query = self._build_max_timestamp_query(self._build_base_query()) + logging.debug("Executing BigQuery max timestamp query: %s...", query[:200]) + job = client.query( + query, + job_config=self._build_query_job_config(), + location=self.location or None, + ) + row = next(iter(job.result()), None) + if row is None or row[0] is None: + return None, None + return row[0], row[1] + + # ------------------------------------------------------------------ # + # LoadConnector / PollConnector + # ------------------------------------------------------------------ # + def load_from_state(self) -> Generator[list[Document], None, None]: + """Load all rows from the configured table/query (full sync).""" + logging.debug("Loading all records from BigQuery project: %s", self.project_id) + return self._yield_documents() + + def poll_source( + self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch + ) -> Generator[list[Document], None, None]: + """Poll for new/updated rows. Provided for interface completeness. + + Orchestration drives full/incremental sync via ``load_from_state`` / + ``load_from_cursor_range``; this falls back to a full sync without a + timestamp column. + """ + if not self.timestamp_column: + logging.warning( + "No timestamp column configured for incremental sync. Falling back to full sync." + ) + return self.load_from_state() + start_dt = datetime.fromtimestamp(start, tz=timezone.utc) if start else None + end_dt = datetime.fromtimestamp(end, tz=timezone.utc) if end else None + return self._yield_documents(start_dt, end_dt) + + def load_from_cursor_range( + self, + start_value: Any = None, + end_value: Any = None, + start_id: Any = None, + ) -> Generator[list[Document], None, None]: + if end_value is None: + return iter(()) + if start_value is not None and end_value < start_value: + return iter(()) + return self._yield_documents(start_value, end_value, start_id) + + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> Generator[list[SlimDocument], None, None]: + del callback + yield from self._yield_slim_documents_from_query( + self._build_slim_query(self._build_base_query()) + ) + + # ------------------------------------------------------------------ # + # Sync-state persistence (success-only cursor) + # ------------------------------------------------------------------ # + def prepare_sync_state(self, connector_id: str, config: Dict[str, Any]) -> None: + self._sync_connector_id = connector_id + self._sync_config = copy.deepcopy(config) + if not self.timestamp_column: + self._pending_sync_cursor_value = None + self._pending_sync_cursor_id = None + return + self._pending_sync_cursor_value, self._pending_sync_cursor_id = self.get_max_cursor_value() + + def get_saved_sync_cursor_value(self) -> Any: + if self._sync_config is None: + return None + return self.deserialize_cursor_value(self._sync_config.get("sync_cursor_value")) + + def get_saved_sync_cursor_id(self) -> Any: + if self._sync_config is None: + return None + return self._sync_config.get("sync_cursor_id") + + def persist_sync_state(self) -> None: + if not self.timestamp_column or self._sync_connector_id is None or self._sync_config is None: + return + + from api.db.services.connector_service import ConnectorService + + updated_conf = copy.deepcopy(self._sync_config) + updated_conf["sync_cursor_value"] = self.serialize_cursor_value( + self._pending_sync_cursor_value + ) + updated_conf["sync_cursor_id"] = self._pending_sync_cursor_id + ConnectorService.update_by_id(self._sync_connector_id, {"config": updated_conf}) + self._sync_config = updated_conf + + # ------------------------------------------------------------------ # + # Validation + # ------------------------------------------------------------------ # + def validate_connector_settings(self) -> None: + """Validate settings via SELECT 1 plus a dry-run of the configured base query.""" + if not self._credentials: + raise ConnectorMissingCredentialError("BigQuery credentials not loaded.") + if not self.project_id: + raise ConnectorValidationError("BigQuery project_id is required.") + if not self.content_columns: + raise ConnectorValidationError("At least one content column must be specified.") + if not self.query and not (self.dataset_id and self.table_id): + raise ConnectorValidationError( + "BigQuery requires either a custom query or both dataset_id and table_id." + ) + + try: + client = self._get_client() + + # Cheap connectivity check. + client.query( + "SELECT 1", + job_config=self._build_query_job_config(), + location=self.location or None, + ).result() + + # Free cost/validity check of the actual base query. + dry_run_job = client.query( + self._wrap_query(self._build_base_query()), + job_config=self._build_query_job_config(dry_run=True), + location=self.location or None, + ) + estimated_bytes = getattr(dry_run_job, "total_bytes_processed", None) + if estimated_bytes is not None: + logging.info( + "BigQuery base query dry-run estimate: %s bytes processed.", estimated_bytes + ) + + schema = self._resolve_schema(client, dry_run_job) + schema_columns = {field.name for field in schema} + + required = set(self.content_columns) + optional = set(self.metadata_columns) + if self.id_column: + optional.add(self.id_column) + if self.timestamp_column: + optional.add(self.timestamp_column) + + missing = (required | optional) - schema_columns + if missing: + raise ConnectorValidationError( + f"BigQuery configured columns not found in schema: {', '.join(sorted(missing))}" + ) + + if self.timestamp_column: + self._resolve_cursor_param_type() + except (ConnectorValidationError, ConnectorMissingCredentialError): + raise + except Exception as exc: + raise ConnectorValidationError(f"BigQuery validation failed: {exc}") diff --git a/common/data_source/config.py b/common/data_source/config.py index f73e3d2b98..6e476d0fda 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -68,6 +68,7 @@ class DocumentSource(str, Enum): SEAFILE = "seafile" MYSQL = "mysql" POSTGRESQL = "postgresql" + BIGQUERY = "bigquery" DINGTALK_AI_TABLE = "dingtalk_ai_table" ONEDRIVE = "onedrive" OUTLOOK = "outlook" diff --git a/common/data_source/rdbms_connector.py b/common/data_source/rdbms_connector.py index 1e269c0678..4d38d303f1 100644 --- a/common/data_source/rdbms_connector.py +++ b/common/data_source/rdbms_connector.py @@ -359,7 +359,7 @@ class RDBMSConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): conditions = [] if start is not None: conditions.append( - f"ragflow_src.{self.timestamp_column} > {self._format_sql_value(start)}" + f"ragflow_src.{self.timestamp_column} >= {self._format_sql_value(start)}" ) if end is not None: conditions.append( @@ -666,16 +666,17 @@ class RDBMSConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): self, start_value: Any = None, end_value: Any = None, + start_id: Any = None, ) -> Generator[list[Document], None, None]: - """Yield documents whose timestamp column falls in ``(start_value, end_value]``. + """Yield documents whose timestamp column falls in ``[start_value, end_value]``. Returns an empty iterator when *end_value* is ``None`` or the range is - empty (``end_value <= start_value``). + empty (``end_value < start_value``). """ if end_value is None: self._close_connection() return iter(()) - if start_value is not None and end_value <= start_value: + if start_value is not None and end_value < start_value: self._close_connection() return iter(()) return self._yield_documents(start_value, end_value) diff --git a/pyproject.toml b/pyproject.toml index 9b5f947e47..c4584c749f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ dependencies = [ "chardet>=5.2.0,<6.0.0", "cn2an==0.5.22", "cohere==5.6.2", - "crawl4ai>=0.8.0", + "crawl4ai>=0.8.0,<0.8.6", "dashscope==1.25.11", "deepl==1.18.0", "debugpy>=1.8.13", @@ -49,6 +49,7 @@ dependencies = [ "flask-session==0.8.0", "google-api-python-client>=2.190.0,<3.0.0", "google-auth-oauthlib>=1.2.0,<2.0.0", + "google-cloud-bigquery>=3.25.0,<4.0.0", "google-cloud-storage>=2.19.0,<3.0.0", "google-genai>=1.41.0,<2.0.0", "google-search-results==2.4.2", @@ -214,6 +215,8 @@ exclude-dependencies = [ # crawl4ai>=0.8.6 depends on unclecode-litellm, which installs the same # `litellm` package namespace as upstream litellm and can corrupt imports. "unclecode-litellm", + # Transitive dep of agentrun-sdk; not imported anywhere in RAGFlow code. + "agentrun-mem0ai", ] override-dependencies = [ # moodlepy<=0.24.1 pins attrs<23.0.0, but trio>=0.26.0 requires attrs>=23.2.0. diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index fcd682d772..62dad5823e 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -59,6 +59,7 @@ from common.data_source import ( ZendeskConnector, SeaFileConnector, RDBMSConnector, + BigQueryConnector, DingTalkAITableConnector, RestAPIConnector, OneDriveConnector, @@ -208,6 +209,7 @@ class SyncBase: document_batch_generator = await self._generate(task) failed_docs = 0 + had_parse_errors = False added_docs = 0 updated_docs = 0 next_update = datetime(1970, 1, 1, tzinfo=timezone.utc) @@ -257,6 +259,8 @@ class SyncBase: f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"] ) + if err: + had_parse_errors = True SyncLogsService.increase_docs( task["id"], max_update, len(docs), "\n".join(err), len(err) @@ -295,8 +299,9 @@ class SyncBase: logging.info(summary) if ( - isinstance(self, _RDBMSBase) + isinstance(self, _CursorPersistingSyncBase) and failed_docs == 0 + and not had_parse_errors ): self.connector.persist_sync_state() SyncLogsService.done(task["id"], task["connector_id"]) @@ -2053,7 +2058,17 @@ class DingTalkAITable(SyncBase): return document_generator -class _RDBMSBase(SyncBase): +class _CursorPersistingSyncBase(SyncBase): + """Base for connectors that persist a sync cursor only after a fully successful sync. + + ``_run_sync_task_logic`` calls ``self.connector.persist_sync_state()`` for any + subclass of this base when no document batch failed. Sources whose connector + exposes ``prepare_sync_state``/``persist_sync_state`` (RDBMS, BigQuery) extend + this instead of being matched by an ``isinstance(self, _RDBMSBase)`` check. + """ + + +class _RDBMSBase(_CursorPersistingSyncBase): DB_TYPE: str = "" LOG_NAME: str = "" DEFAULT_PORT: int = 0 @@ -2089,9 +2104,11 @@ class _RDBMSBase(SyncBase): else: poll_start = task["poll_range_start"] start_cursor_value = self.connector.get_saved_sync_cursor_value() + start_cursor_id = self.connector.get_saved_sync_cursor_id() if hasattr(self.connector, "get_saved_sync_cursor_id") else None document_generator = self.connector.load_from_cursor_range( start_cursor_value, self.connector._pending_sync_cursor_value, + start_cursor_id, ) _begin_info = f"from {poll_start}" @@ -2113,6 +2130,73 @@ class PostgreSQL(_RDBMSBase): DEFAULT_PORT: int = 5432 +class BigQuery(_CursorPersistingSyncBase): + SOURCE_NAME: str = FileSource.BIGQUERY + + def _get_source_prefix(self): + return "[BigQuery]" + + async def _generate(self, task: dict): + raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE) + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + + connector_kwargs = { + "project_id": self.conf.get("project_id", ""), + "dataset_id": self.conf.get("dataset_id") or None, + "table_id": self.conf.get("table_id") or None, + "location": self.conf.get("location") or None, + "query": self.conf.get("query", ""), + "content_columns": self.conf.get("content_columns", ""), + "metadata_columns": self.conf.get("metadata_columns", ""), + "id_column": self.conf.get("id_column") or None, + "timestamp_column": self.conf.get("timestamp_column") or None, + "batch_size": batch_size, + "use_query_cache": self.conf.get("use_query_cache", True), + } + if self.conf.get("page_size") is not None: + connector_kwargs["page_size"] = int(self.conf["page_size"]) + if self.conf.get("maximum_bytes_billed") is not None: + connector_kwargs["maximum_bytes_billed"] = int(self.conf["maximum_bytes_billed"]) + if self.conf.get("job_timeout_ms") is not None: + connector_kwargs["job_timeout_ms"] = int(self.conf["job_timeout_ms"]) + + self.connector = BigQueryConnector(**connector_kwargs) + + credentials = self.conf.get("credentials") + if not credentials: + raise ValueError("BigQuery connector is missing credentials.") + + self.connector.load_credentials(credentials) + self.connector.validate_connector_settings() + self.connector.prepare_sync_state(task["connector_id"], self.conf) + + if task["reindex"] == "1" or not task["poll_range_start"]: + document_generator = self.connector.load_from_state() + elif not self.connector.timestamp_column: + document_generator = self.connector.load_from_state() + else: + start_cursor_value = self.connector.get_saved_sync_cursor_value() + start_cursor_id = self.connector.get_saved_sync_cursor_id() if hasattr(self.connector, "get_saved_sync_cursor_id") else None + document_generator = self.connector.load_from_cursor_range( + start_cursor_value, + self.connector._pending_sync_cursor_value, + start_cursor_id, + ) + + target = ( + f"{self.conf.get('dataset_id')}.{self.conf.get('table_id')}" + if not self.conf.get("query") + else "custom query" + ) + self.log_connection("BigQuery", f"{self.conf.get('project_id')}:{target}", task) + return document_generator + + class REST_API(SyncBase): SOURCE_NAME: str = FileSource.REST_API @@ -2173,6 +2257,7 @@ func_factory = { FileSource.SEAFILE: SeaFile, FileSource.MYSQL: MySQL, FileSource.POSTGRESQL: PostgreSQL, + FileSource.BIGQUERY: BigQuery, FileSource.DINGTALK_AI_TABLE: DingTalkAITable, FileSource.REST_API: REST_API, } diff --git a/test/unit_test/data_source/test_bigquery_connector.py b/test/unit_test/data_source/test_bigquery_connector.py new file mode 100644 index 0000000000..baa856bebe --- /dev/null +++ b/test/unit_test/data_source/test_bigquery_connector.py @@ -0,0 +1,430 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import types +from datetime import date, datetime, time, timezone +from decimal import Decimal + +import pytest + +from common.data_source import bigquery_connector as bq_mod +from common.data_source.bigquery_connector import BigQueryConnector +from common.data_source.config import DocumentSource +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, +) + + +# --------------------------------------------------------------------------- +# Fakes for the google-cloud-bigquery client surface +# --------------------------------------------------------------------------- + + +class _FakeSchemaField: + def __init__(self, name, field_type): + self.name = name + self.field_type = field_type + + +class _FakeScalarQueryParameter: + def __init__(self, name, type_, value): + self.name = name + self.type_ = type_ + self.value = value + + +class _FakeQueryJobConfig: + def __init__(self): + self.use_legacy_sql = None + self.use_query_cache = None + self.maximum_bytes_billed = None + self.job_timeout_ms = None + self.query_parameters = None + self.dry_run = False + + +class _FakeJob: + def __init__(self, rows=None, schema=None, total_bytes=123): + self._rows = rows if rows is not None else [] + self.schema = schema or [] + self.total_bytes_processed = total_bytes + + def result(self, page_size=None): + return iter(self._rows) + + +class _FakeClient: + def __init__(self, rows=None, result_schema=None, table_schema=None): + self._rows = rows if rows is not None else [] + self._result_schema = result_schema or [] + self._table_schema = table_schema or [] + self.queries = [] + + def query(self, query, job_config=None, location=None): + self.queries.append((query, job_config, location)) + return _FakeJob(rows=self._rows, schema=self._result_schema) + + def get_table(self, ref): + return types.SimpleNamespace(schema=self._table_schema) + + +@pytest.fixture(autouse=True) +def _patch_bigquery(monkeypatch): + fake = types.SimpleNamespace( + QueryJobConfig=_FakeQueryJobConfig, + ScalarQueryParameter=_FakeScalarQueryParameter, + ) + monkeypatch.setattr(bq_mod, "bigquery", fake) + monkeypatch.setattr(bq_mod, "service_account", types.SimpleNamespace()) + + +SERVICE_ACCOUNT = {"type": "service_account", "project_id": "p"} + + +def _make_connector(client=None, **overrides): + kwargs = dict( + project_id="my-proj", + dataset_id="ds", + table_id="tbl", + content_columns="name,description", + metadata_columns="status", + id_column="id", + timestamp_column=None, + batch_size=2, + ) + kwargs.update(overrides) + connector = BigQueryConnector(**kwargs) + connector._credentials = {"service_account_info": SERVICE_ACCOUNT} + if client is not None: + connector._client = client + return connector + + +# --------------------------------------------------------------------------- +# Credentials +# --------------------------------------------------------------------------- + + +@pytest.mark.p2 +def test_load_credentials_accepts_json_string(): + connector = BigQueryConnector(project_id="p", content_columns="name") + connector.load_credentials({"service_account_json": json.dumps(SERVICE_ACCOUNT)}) + assert connector._credentials["service_account_info"] == SERVICE_ACCOUNT + + +@pytest.mark.p2 +def test_load_credentials_accepts_dict(): + connector = BigQueryConnector(project_id="p", content_columns="name") + connector.load_credentials({"service_account_json": SERVICE_ACCOUNT}) + assert connector._credentials["service_account_info"] == SERVICE_ACCOUNT + + +@pytest.mark.p2 +def test_missing_credentials_raises(): + connector = BigQueryConnector(project_id="p", content_columns="name") + with pytest.raises(ConnectorMissingCredentialError): + connector.load_credentials({}) + + +@pytest.mark.p2 +def test_invalid_credentials_json_raises(): + connector = BigQueryConnector(project_id="p", content_columns="name") + with pytest.raises(ConnectorMissingCredentialError): + connector.load_credentials({"service_account_json": "{not-json"}) + + +# --------------------------------------------------------------------------- +# Query construction +# --------------------------------------------------------------------------- + + +@pytest.mark.p2 +def test_table_query_quotes_identifiers(): + connector = _make_connector() + assert connector._build_base_query() == "SELECT * FROM `my-proj.ds.tbl`" + + +@pytest.mark.p2 +def test_custom_query_strips_trailing_semicolon(): + connector = _make_connector(query="SELECT * FROM t WHERE x = 1;") + assert connector._build_base_query() == "SELECT * FROM t WHERE x = 1" + + +@pytest.mark.p2 +def test_missing_table_and_query_raises(): + connector = _make_connector(dataset_id="", table_id="", query="") + with pytest.raises(ConnectorValidationError): + connector._build_base_query() + + +@pytest.mark.p2 +def test_time_filtered_query_compound_cursor_with_id_column(): + client = _FakeClient(table_schema=[ + _FakeSchemaField("updated_at", "TIMESTAMP"), + _FakeSchemaField("id", "STRING") + ]) + connector = _make_connector(client=client, timestamp_column="updated_at", id_column="id") + start = datetime(2026, 1, 1, tzinfo=timezone.utc) + end = datetime(2026, 2, 1, tzinfo=timezone.utc) + + query, params = connector._build_time_filtered_query( + connector._build_base_query(), start, end, start_id="last-id" + ) + + assert "(ragflow_src.updated_at > @start_cursor OR (ragflow_src.updated_at = @start_cursor AND ragflow_src.id > @start_cursor_id))" in query + assert "ragflow_src.updated_at <= @end_cursor" in query + assert [(p.name, p.type_, p.value) for p in params] == [ + ("start_cursor", "TIMESTAMP", start), + ("start_cursor_id", "STRING", "last-id"), + ("end_cursor", "TIMESTAMP", end), + ] + +@pytest.mark.p2 +def test_time_filtered_query_uses_gte_without_id_column(): + client = _FakeClient(table_schema=[_FakeSchemaField("updated_at", "TIMESTAMP")]) + connector = _make_connector(client=client, timestamp_column="updated_at", id_column=None) + start = datetime(2026, 1, 1, tzinfo=timezone.utc) + end = datetime(2026, 2, 1, tzinfo=timezone.utc) + + query, params = connector._build_time_filtered_query( + connector._build_base_query(), start, end + ) + + assert "ragflow_src.updated_at >= @start_cursor" in query + assert "ragflow_src.updated_at <= @end_cursor" in query + assert [(p.name, p.type_, p.value) for p in params] == [ + ("start_cursor", "TIMESTAMP", start), + ("end_cursor", "TIMESTAMP", end), + ] + + +@pytest.mark.p2 +def test_cursor_param_type_resolves_for_date_and_int(): + date_client = _FakeClient(table_schema=[_FakeSchemaField("d", "DATE")]) + int_client = _FakeClient(table_schema=[_FakeSchemaField("n", "INTEGER")]) + + date_conn = _make_connector(client=date_client, timestamp_column="d") + int_conn = _make_connector(client=int_client, timestamp_column="n") + + assert date_conn._resolve_cursor_param_type() == "DATE" + assert int_conn._resolve_cursor_param_type() == "INT64" + + +@pytest.mark.p2 +def test_unsupported_cursor_type_raises(): + client = _FakeClient(table_schema=[_FakeSchemaField("ts", "BOOL")]) + connector = _make_connector(client=client, timestamp_column="ts") + with pytest.raises(ConnectorValidationError): + connector._resolve_cursor_param_type() + + +@pytest.mark.p2 +def test_slim_query_uses_id_column(): + connector = _make_connector() + slim = connector._build_slim_query(connector._build_base_query()) + assert "ragflow_src.id" in slim + + +# --------------------------------------------------------------------------- +# Value rendering & row conversion +# --------------------------------------------------------------------------- + + +@pytest.mark.p2 +def test_value_serialization_types(): + assert BigQueryConnector._render_content_value(Decimal("1.50")) == "1.50" + assert BigQueryConnector._render_content_value(b"binary") is None + assert BigQueryConnector._render_content_value(date(2026, 1, 2)) == "2026-01-02" + assert BigQueryConnector._render_content_value(time(3, 4, 5)) == "03:04:05" + assert ( + BigQueryConnector._render_content_value({"a": 1, "b": [1, 2]}) + == '{"a": 1, "b": [1, 2]}' + ) + assert ( + BigQueryConnector._render_content_value("POINT(1 2)") == "POINT(1 2)" + ) # GEOGRAPHY WKT passes through + + # Metadata base64-encodes bytes instead of skipping. + assert BigQueryConnector._render_metadata_value(b"hi") == "aGk=" + + +@pytest.mark.p2 +def test_row_to_document_content_metadata_id_timestamp(): + connector = _make_connector(timestamp_column="updated_at") + ts = datetime(2026, 3, 4, 5, 6, 7, tzinfo=timezone.utc) + row = { + "id": 42, + "name": "Acme", + "description": "A company", + "status": "active", + "updated_at": ts, + } + + doc = connector._row_to_document(row) + + assert doc.id == "bigquery:my-proj:ds.tbl:42" + assert "【name】:\nAcme" in doc.blob.decode("utf-8") + assert "【description】:\nA company" in doc.blob.decode("utf-8") + assert doc.metadata == {"status": "active"} + assert doc.source == DocumentSource.BIGQUERY + assert doc.extension == ".txt" + assert doc.doc_updated_at == ts + assert doc.semantic_identifier == "Acme" + + +@pytest.mark.p2 +def test_document_id_falls_back_to_content_hash_without_id_column(): + connector = _make_connector(id_column=None) + row = {"name": "Acme", "description": "A company", "status": "active"} + doc_id = connector._build_document_id_from_row(row) + assert doc_id.startswith("bigquery:my-proj:ds.tbl:") + assert len(doc_id.rsplit(":", 1)[1]) == 32 # md5 hex + + +@pytest.mark.p2 +def test_custom_query_mode_id_prefix(): + connector = _make_connector(dataset_id="", table_id="", query="SELECT * FROM t") + row = {"id": 7, "name": "x"} + assert connector._build_document_id_from_row(row) == "bigquery:my-proj:query:7" + + +# --------------------------------------------------------------------------- +# Batching & cursor round-trip +# --------------------------------------------------------------------------- + + +@pytest.mark.p2 +def test_batches_accumulate_to_batch_size(): + rows = [ + {"id": i, "name": f"n{i}", "description": "d", "status": "s"} + for i in range(5) + ] + client = _FakeClient(rows=rows) + connector = _make_connector(client=client, batch_size=2) + + batches = list(connector.load_from_state()) + + assert [len(b) for b in batches] == [2, 2, 1] + + +@pytest.mark.p2 +def test_cursor_serialize_deserialize_roundtrip(): + dt = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc) + d = date(2026, 1, 1) + t = time(12, 0) + dec = Decimal("1.23") + + assert BigQueryConnector.deserialize_cursor_value( + BigQueryConnector.serialize_cursor_value(dt) + ) == dt + assert BigQueryConnector.deserialize_cursor_value( + BigQueryConnector.serialize_cursor_value(d) + ) == d + assert BigQueryConnector.deserialize_cursor_value( + BigQueryConnector.serialize_cursor_value(t) + ) == t + assert BigQueryConnector.deserialize_cursor_value( + BigQueryConnector.serialize_cursor_value(dec) + ) == dec + assert BigQueryConnector.serialize_cursor_value(42) == 42 + assert BigQueryConnector.deserialize_cursor_value(42) == 42 + + +@pytest.mark.p2 +def test_load_from_cursor_range_empty_without_end(): + connector = _make_connector(timestamp_column="updated_at") + assert list(connector.load_from_cursor_range(start_value=None, end_value=None)) == [] + + +@pytest.mark.p2 +def test_load_from_cursor_range_empty_when_end_not_after_start(): + connector = _make_connector(timestamp_column="updated_at") + start = datetime(2026, 2, 1, tzinfo=timezone.utc) + end = datetime(2026, 1, 1, tzinfo=timezone.utc) + assert list(connector.load_from_cursor_range(start, end)) == [] + + +# --------------------------------------------------------------------------- +# Slim docs & validation +# --------------------------------------------------------------------------- + + +@pytest.mark.p2 +def test_slim_docs_use_id_column(): + rows = [{"id": 1}, {"id": 2}] + client = _FakeClient(rows=rows) + connector = _make_connector(client=client, batch_size=10) + + slim_batches = list(connector.retrieve_all_slim_docs_perm_sync()) + ids = [doc.id for batch in slim_batches for doc in batch] + + assert ids == ["bigquery:my-proj:ds.tbl:1", "bigquery:my-proj:ds.tbl:2"] + + +@pytest.mark.p2 +def test_validation_detects_missing_content_column(): + client = _FakeClient(table_schema=[_FakeSchemaField("other", "STRING")]) + connector = _make_connector(client=client, content_columns="name") + with pytest.raises(ConnectorValidationError, match="name"): + connector.validate_connector_settings() + +@pytest.mark.p2 +def test_validation_detects_missing_metadata_column(): + client = _FakeClient(table_schema=[_FakeSchemaField("name", "STRING")]) + connector = _make_connector(client=client, content_columns="name", metadata_columns="status") + with pytest.raises(ConnectorValidationError, match="status"): + connector.validate_connector_settings() + +@pytest.mark.p2 +def test_validation_detects_missing_id_column(): + client = _FakeClient(table_schema=[_FakeSchemaField("name", "STRING")]) + connector = _make_connector(client=client, content_columns="name", metadata_columns="", id_column="id") + with pytest.raises(ConnectorValidationError, match="id"): + connector.validate_connector_settings() + +@pytest.mark.p2 +def test_validation_detects_missing_timestamp_column(): + client = _FakeClient(table_schema=[_FakeSchemaField("name", "STRING")]) + connector = _make_connector(client=client, content_columns="name", metadata_columns="", id_column="", timestamp_column="ts") + with pytest.raises(ConnectorValidationError, match="ts"): + connector.validate_connector_settings() + +@pytest.mark.p2 +def test_validation_detects_unsupported_cursor_type_early(): + client = _FakeClient(table_schema=[_FakeSchemaField("name", "STRING"), _FakeSchemaField("ts", "BOOL")]) + connector = _make_connector(client=client, content_columns="name", metadata_columns="", id_column="", timestamp_column="ts") + with pytest.raises(ConnectorValidationError, match="not supported as a cursor"): + connector.validate_connector_settings() + + +@pytest.mark.p2 +def test_validation_missing_credentials_raises(): + connector = BigQueryConnector(project_id="p", content_columns="name") + with pytest.raises(ConnectorMissingCredentialError): + connector.validate_connector_settings() + + +@pytest.mark.p2 +def test_validation_dry_run_failure_becomes_validation_error(): + class _FailingClient(_FakeClient): + def query(self, query, job_config=None, location=None): + raise RuntimeError("bad query / would scan too much") + + connector = _make_connector(client=_FailingClient()) + with pytest.raises(ConnectorValidationError): + connector.validate_connector_settings() diff --git a/test/unit_test/rag/test_sync_data_source.py b/test/unit_test/rag/test_sync_data_source.py index fd4add0392..57fbfa043b 100644 --- a/test/unit_test/rag/test_sync_data_source.py +++ b/test/unit_test/rag/test_sync_data_source.py @@ -314,7 +314,7 @@ class _FakeRDBMSConnector: self.load_from_state_called = True return iter((["full-sync"],)) - def load_from_cursor_range(self, start_value=None, end_value=None): + def load_from_cursor_range(self, start_value=None, start_id=None, end_value=None): self.load_from_cursor_range_called = True return iter(([ _make_fake_doc("incremental-doc") ],)) @@ -405,6 +405,54 @@ async def test_rdbms_cursor_persists_only_after_success(monkeypatch): assert connector.persist_sync_state_called is True +@pytest.mark.asyncio +@pytest.mark.p2 +async def test_rdbms_cursor_does_not_persist_when_parse_returns_errors(monkeypatch): + monkeypatch.setattr(sync_data_source, "RDBMSConnector", _FakeRDBMSConnector) + _patch_common_dependencies(monkeypatch) + monkeypatch.setattr( + sync_data_source.KnowledgebaseService, + "get_by_id", + lambda *_args, **_kwargs: (True, object()), + ) + monkeypatch.setattr( + sync_data_source.SyncLogsService, + "increase_docs", + lambda *_args, **_kwargs: None, + ) + + monkeypatch.setattr( + sync_data_source.SyncLogsService, + "duplicate_and_parse", + lambda *_args, **_kwargs: (["parse error"], ["parsed-doc-id"]), + ) + + task = { + **_make_task(), + "reindex": "0", + "poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc), + "skip_connection_log": True, + } + sync = sync_data_source.MySQL( + { + "host": "localhost", + "port": 3306, + "database": "db", + "query": "SELECT * FROM t", + "content_columns": "name", + "timestamp_column": "ts", + "credentials": {"username": "u", "password": "p"}, + "sync_deleted_files": False, + } + ) + + await sync._run_task_logic(task) + + connector = _FakeRDBMSConnector.instance + assert connector is not None + assert connector.persist_sync_state_called is False + + @pytest.mark.asyncio @pytest.mark.p2 async def test_rdbms_cursor_does_not_persist_when_batch_is_skipped(monkeypatch): @@ -456,6 +504,263 @@ async def test_rdbms_cursor_does_not_persist_when_batch_is_skipped(monkeypatch): assert connector.persist_sync_state_called is False +class _FakeBigQueryConnector: + instance = None + + def __init__( + self, + project_id, + dataset_id=None, + table_id=None, + location=None, + query="", + content_columns="", + metadata_columns=None, + id_column=None, + timestamp_column=None, + batch_size=2, + page_size=1000, + maximum_bytes_billed=None, + job_timeout_ms=None, + use_query_cache=True, + ): + self.project_id = project_id + self.dataset_id = dataset_id + self.table_id = table_id + self.query = query + self.content_columns = content_columns + self.timestamp_column = timestamp_column + self.batch_size = batch_size + self.load_from_state_called = False + self.load_from_cursor_range_called = False + self.retrieve_all_slim_docs_perm_sync_called = False + self.prepare_sync_state_called = False + self.persist_sync_state_called = False + self._pending_sync_cursor_value = None + _FakeBigQueryConnector.instance = self + + def load_credentials(self, credentials): + self.credentials = credentials + + def validate_connector_settings(self): + return None + + def prepare_sync_state(self, connector_id, config): + self.prepare_sync_state_called = True + self.prepare_sync_state_args = (connector_id, config) + + def get_saved_sync_cursor_value(self): + return None + + def retrieve_all_slim_docs_perm_sync(self, callback=None): + del callback + self.retrieve_all_slim_docs_perm_sync_called = True + yield [types.SimpleNamespace(id="bq-row-1")] + + def load_from_state(self): + self.load_from_state_called = True + return iter((["full-sync"],)) + + def load_from_cursor_range(self, start_value=None, start_id=None, end_value=None): + self.load_from_cursor_range_called = True + return iter(([_make_fake_doc("bq-incremental-doc")],)) + + def persist_sync_state(self): + self.persist_sync_state_called = True + + +def _bigquery_conf(**overrides): + conf = { + "project_id": "proj", + "dataset_id": "ds", + "table_id": "tbl", + "content_columns": "name", + "credentials": {"service_account_json": "{}"}, + "sync_deleted_files": False, + } + conf.update(overrides) + return conf + + +@pytest.mark.asyncio +@pytest.mark.p2 +async def test_bigquery_generate_full_sync_on_first_run(monkeypatch): + monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector) + + task = { + **_make_task(), + "reindex": "0", + "poll_range_start": None, + "skip_connection_log": True, + } + sync = sync_data_source.BigQuery(_bigquery_conf()) + + document_generator = await sync._generate(task) + connector = _FakeBigQueryConnector.instance + + assert connector is not None + assert connector.prepare_sync_state_called is True + assert connector.load_from_state_called is True + assert connector.load_from_cursor_range_called is False + assert list(document_generator) == [["full-sync"]] + + +@pytest.mark.asyncio +@pytest.mark.p2 +async def test_bigquery_generate_incremental_cursor_path(monkeypatch): + monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector) + + task = { + **_make_task(), + "reindex": "0", + "poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc), + "skip_connection_log": True, + } + sync = sync_data_source.BigQuery(_bigquery_conf(timestamp_column="updated_at")) + + document_generator = await sync._generate(task) + connector = _FakeBigQueryConnector.instance + + assert connector is not None + assert connector.load_from_cursor_range_called is True + assert connector.load_from_state_called is False + assert [doc.id for doc in list(document_generator)[0]] == ["bq-incremental-doc"] + + +@pytest.mark.asyncio +@pytest.mark.p2 +async def test_bigquery_cursor_persists_only_after_success(monkeypatch): + monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector) + _patch_common_dependencies(monkeypatch) + monkeypatch.setattr( + sync_data_source.KnowledgebaseService, + "get_by_id", + lambda *_args, **_kwargs: (True, object()), + ) + monkeypatch.setattr( + sync_data_source.SyncLogsService, + "increase_docs", + lambda *_args, **_kwargs: None, + ) + monkeypatch.setattr( + sync_data_source.SyncLogsService, + "duplicate_and_parse", + lambda *_args, **_kwargs: ([], ["parsed-doc-id"]), + ) + + task = { + **_make_task(), + "reindex": "0", + "poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc), + "skip_connection_log": True, + } + sync = sync_data_source.BigQuery(_bigquery_conf(timestamp_column="updated_at")) + + await sync._run_task_logic(task) + + connector = _FakeBigQueryConnector.instance + assert connector is not None + assert connector.persist_sync_state_called is True + + +@pytest.mark.asyncio +@pytest.mark.p2 +async def test_bigquery_cursor_does_not_persist_when_parse_returns_errors(monkeypatch): + monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector) + _patch_common_dependencies(monkeypatch) + monkeypatch.setattr( + sync_data_source.KnowledgebaseService, + "get_by_id", + lambda *_args, **_kwargs: (True, object()), + ) + monkeypatch.setattr( + sync_data_source.SyncLogsService, + "increase_docs", + lambda *_args, **_kwargs: None, + ) + + monkeypatch.setattr( + sync_data_source.SyncLogsService, + "duplicate_and_parse", + lambda *_args, **_kwargs: (["parse error"], ["parsed-doc-id"]), + ) + + task = { + **_make_task(), + "reindex": "0", + "poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc), + "skip_connection_log": True, + } + sync = sync_data_source.BigQuery(_bigquery_conf(timestamp_column="updated_at")) + + await sync._run_task_logic(task) + + connector = _FakeBigQueryConnector.instance + assert connector is not None + assert connector.persist_sync_state_called is False + + +@pytest.mark.asyncio +@pytest.mark.p2 +async def test_bigquery_cursor_does_not_persist_when_batch_is_skipped(monkeypatch): + monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector) + _patch_common_dependencies(monkeypatch) + monkeypatch.setattr( + sync_data_source.KnowledgebaseService, + "get_by_id", + lambda *_args, **_kwargs: (True, object()), + ) + monkeypatch.setattr( + sync_data_source.SyncLogsService, + "increase_docs", + lambda *_args, **_kwargs: None, + ) + + def _raise_in_duplicate_and_parse(*_args, **_kwargs): + raise RuntimeError("batch failed") + + monkeypatch.setattr( + sync_data_source.SyncLogsService, + "duplicate_and_parse", + _raise_in_duplicate_and_parse, + ) + + task = { + **_make_task(), + "reindex": "0", + "poll_range_start": datetime(2026, 1, 1, tzinfo=timezone.utc), + "skip_connection_log": True, + } + sync = sync_data_source.BigQuery(_bigquery_conf(timestamp_column="updated_at")) + + await sync._run_task_logic(task) + + connector = _FakeBigQueryConnector.instance + assert connector is not None + assert connector.persist_sync_state_called is False + + +@pytest.mark.asyncio +@pytest.mark.p2 +async def test_bigquery_collect_prune_snapshot_when_enabled(monkeypatch): + monkeypatch.setattr(sync_data_source, "BigQueryConnector", _FakeBigQueryConnector) + + task = { + **_make_task(), + "reindex": "0", + "poll_range_start": None, + "skip_connection_log": True, + } + sync = sync_data_source.BigQuery(_bigquery_conf(sync_deleted_files=True)) + + await sync._generate(task) + file_list = sync._collect_prune_snapshot(task) + connector = _FakeBigQueryConnector.instance + + assert connector.retrieve_all_slim_docs_perm_sync_called is True + assert [doc.id for doc in file_list] == ["bq-row-1"] + + class _FakeDropboxConnector: instance = None diff --git a/web/src/assets/svg/data-source/bigquery.svg b/web/src/assets/svg/data-source/bigquery.svg new file mode 100644 index 0000000000..843ab80283 --- /dev/null +++ b/web/src/assets/svg/data-source/bigquery.svg @@ -0,0 +1,4 @@ + + + + diff --git a/web/src/components/markdown-content/index.tsx b/web/src/components/markdown-content/index.tsx index 1d5affe156..7eb6398e37 100644 --- a/web/src/components/markdown-content/index.tsx +++ b/web/src/components/markdown-content/index.tsx @@ -1,5 +1,6 @@ import Image from '@/components/image'; import SvgIcon from '@/components/svg-icon'; +import { MarkdownRemarkPlugins } from '@/constants/markdown-remark-plugins'; import { IReference, IReferenceChunk } from '@/interfaces/database/chat'; import { citationMarkerReg } from '@/utils/citation-utils'; import { getExtension } from '@/utils/document-util'; @@ -10,7 +11,6 @@ import Markdown from 'react-markdown'; import SyntaxHighlighter from 'react-syntax-highlighter'; import rehypeKatex from 'rehype-katex'; import rehypeRaw from 'rehype-raw'; -import { MarkdownRemarkPlugins } from '@/constants/markdown-remark-plugins'; import { visitParents } from 'unist-util-visit-parents'; import { useTranslation } from 'react-i18next'; @@ -72,7 +72,11 @@ const MarkdownContent = ({ text = t('chat.searching'); } const nextText = replaceTextByOldReg(text); - return pipe(replaceThinkToSection, replaceRetrievingToSection, preprocessLaTeX)(nextText); + return pipe( + replaceThinkToSection, + replaceRetrievingToSection, + preprocessLaTeX, + )(nextText); }, [content, t]); useEffect(() => { diff --git a/web/src/components/paddleocr-options-form-field.tsx b/web/src/components/paddleocr-options-form-field.tsx index acadc8244d..87ffba5baf 100644 --- a/web/src/components/paddleocr-options-form-field.tsx +++ b/web/src/components/paddleocr-options-form-field.tsx @@ -6,7 +6,14 @@ import { buildOptions } from '@/utils/form'; import { useFormContext, useWatch } from 'react-hook-form'; import { useTranslation } from 'react-i18next'; -const algorithmOptions = buildOptions(['PaddleOCR-VL-1.6', 'PaddleOCR-VL-1.5', 'PaddleOCR-VL', 'PP-OCRv6', 'PP-OCRv5', 'PP-StructureV3']); +const algorithmOptions = buildOptions([ + 'PaddleOCR-VL-1.6', + 'PaddleOCR-VL-1.5', + 'PaddleOCR-VL', + 'PP-OCRv6', + 'PP-OCRv5', + 'PP-StructureV3', +]); export function PaddleOCROptionsFormField({ namePrefix = 'parser_config', @@ -57,7 +64,10 @@ export function PaddleOCROptionsFormField({ ( )} { diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 12d93500b3..c542ded061 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -41,6 +41,7 @@ export enum DataSourceKey { SEAFILE = 'seafile', MYSQL = 'mysql', POSTGRESQL = 'postgresql', + BIGQUERY = 'bigquery', REST_API = 'rest_api', RSS = 'rss', ONEDRIVE = 'onedrive', @@ -160,6 +161,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.POSTGRESQL]: { syncDeletedFiles: true, }, + [DataSourceKey.BIGQUERY]: { + syncDeletedFiles: true, + }, }; const isDataSourceFeatureVisible = ( @@ -333,6 +337,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.POSTGRESQL}Description`), icon: , }, + [DataSourceKey.BIGQUERY]: { + name: 'BigQuery', + description: t(`setting.${DataSourceKey.BIGQUERY}Description`), + icon: , + }, [DataSourceKey.ONEDRIVE]: { name: 'OneDrive', description: t(`setting.${DataSourceKey.ONEDRIVE}Description`), @@ -1483,6 +1492,166 @@ export const DataSourceFormFields = { tooltip: t('setting.postgresqlTimestampColumnTip'), }, ], + [DataSourceKey.BIGQUERY]: [ + { + label: 'Project ID', + name: 'config.project_id', + type: FormFieldType.Text, + required: true, + placeholder: 'my-gcp-project', + tooltip: t('setting.bigqueryProjectIdTip'), + }, + { + label: 'Location', + name: 'config.location', + type: FormFieldType.Text, + required: false, + placeholder: 'US', + tooltip: t('setting.bigqueryLocationTip'), + }, + { + label: 'Service Account JSON', + name: 'config.credentials.service_account_json', + type: FormFieldType.Password, + required: true, + placeholder: '{ "type": "service_account", "project_id": "...", ... }', + tooltip: t('setting.bigqueryServiceAccountJsonTip'), + }, + { + label: 'Dataset ID', + name: 'config.dataset_id', + type: FormFieldType.Text, + required: false, + placeholder: 'analytics', + tooltip: t('setting.bigqueryDatasetIdTip'), + customValidate: (val: string, values: any) => { + const hasQuery = !!(values?.config?.query ?? '').trim(); + const hasTable = !!(values?.config?.table_id ?? '').trim(); + if (!hasQuery && !((val ?? '').trim() && hasTable)) { + return 'Dataset ID is required when not using a custom SQL Query'; + } + return true; + }, + }, + { + label: 'Table ID', + name: 'config.table_id', + type: FormFieldType.Text, + required: false, + placeholder: 'customers', + tooltip: t('setting.bigqueryTableIdTip'), + customValidate: (val: string, values: any) => { + const hasQuery = !!(values?.config?.query ?? '').trim(); + const hasDataset = !!(values?.config?.dataset_id ?? '').trim(); + if (!hasQuery && !(hasDataset && (val ?? '').trim())) { + return 'Table ID is required when not using a custom SQL Query'; + } + return true; + }, + }, + { + label: 'SQL Query', + name: 'config.query', + type: FormFieldType.Textarea, + required: false, + placeholder: 'Leave empty to use Dataset ID + Table ID', + tooltip: t('setting.bigqueryQueryTip'), + customValidate: (val: string, values: any) => { + const hasQuery = !!(val ?? '').trim(); + const hasDataset = !!(values?.config?.dataset_id ?? '').trim(); + const hasTable = !!(values?.config?.table_id ?? '').trim(); + if (!hasQuery && !(hasDataset && hasTable)) { + return 'Provide a SQL Query, or both Dataset ID and Table ID'; + } + return true; + }, + }, + { + label: 'Content Columns', + name: 'config.content_columns', + type: FormFieldType.Text, + required: true, + placeholder: 'name,description,notes', + tooltip: t('setting.bigqueryContentColumnsTip'), + }, + { + label: 'Metadata Columns', + name: 'config.metadata_columns', + type: FormFieldType.Text, + required: false, + placeholder: 'customer_id,status,region', + tooltip: t('setting.bigqueryMetadataColumnsTip'), + }, + { + label: 'ID Column', + name: 'config.id_column', + type: FormFieldType.Text, + required: false, + placeholder: 'customer_id', + tooltip: t('setting.bigqueryIdColumnTip'), + }, + { + label: 'Timestamp Column', + name: 'config.timestamp_column', + type: FormFieldType.Text, + required: false, + placeholder: 'updated_at', + tooltip: t('setting.bigqueryTimestampColumnTip'), + }, + { + label: 'Max Bytes Billed', + name: 'config.maximum_bytes_billed', + type: FormFieldType.Number, + required: false, + placeholder: '1073741824', + tooltip: t('setting.bigqueryMaximumBytesBilledTip'), + validation: { + min: 1, + message: 'Max Bytes Billed must be at least 1', + }, + }, + { + label: 'Job Timeout (ms)', + name: 'config.job_timeout_ms', + type: FormFieldType.Number, + required: false, + placeholder: '300000', + tooltip: t('setting.bigqueryJobTimeoutMsTip'), + validation: { + min: 1, + message: 'Job Timeout must be at least 1', + }, + }, + { + label: 'Page Size', + name: 'config.page_size', + type: FormFieldType.Number, + required: false, + placeholder: '1000', + validation: { + min: 1, + message: 'Page Size must be at least 1', + }, + }, + { + label: 'Batch Size', + name: 'config.batch_size', + type: FormFieldType.Number, + required: false, + placeholder: '100', + validation: { + min: 1, + message: 'Batch Size must be at least 1', + }, + }, + { + label: 'Use Query Cache', + name: 'config.use_query_cache', + type: FormFieldType.Checkbox, + required: false, + defaultValue: true, + }, + ], [DataSourceKey.REST_API]: [ // ── Essential fields ────────────────────────────────────────────── { @@ -2150,6 +2319,29 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.BIGQUERY]: { + name: '', + source: DataSourceKey.BIGQUERY, + config: { + project_id: '', + dataset_id: '', + table_id: '', + location: '', + query: '', + content_columns: '', + metadata_columns: '', + id_column: '', + timestamp_column: '', + batch_size: 100, + page_size: 1000, + maximum_bytes_billed: 1073741824, + job_timeout_ms: 300000, + use_query_cache: true, + credentials: { + service_account_json: '', + }, + }, + }, [DataSourceKey.ONEDRIVE]: { name: '', source: DataSourceKey.ONEDRIVE, diff --git a/web/src/pages/user-setting/data-source/data-source-detail-page/index.tsx b/web/src/pages/user-setting/data-source/data-source-detail-page/index.tsx index c3c1b80814..ae1f05c873 100644 --- a/web/src/pages/user-setting/data-source/data-source-detail-page/index.tsx +++ b/web/src/pages/user-setting/data-source/data-source-detail-page/index.tsx @@ -244,7 +244,8 @@ const SourceDetailPage = () => { />
- {detail?.source === DataSourceKey.REST_API && ( + {(detail?.source === DataSourceKey.REST_API || + detail?.source === DataSourceKey.BIGQUERY) && (