2025-11-03 19:59:18 +08:00
|
|
|
#
|
|
|
|
|
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
|
|
|
|
|
#
|
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
|
#
|
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
#
|
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
|
# limitations under the License.
|
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
# from beartype import BeartypeConf
|
|
|
|
|
# from beartype.claw import beartype_all # <-- you didn't sign up for this
|
|
|
|
|
# beartype_all(conf=BeartypeConf(violation_type=UserWarning)) # <-- emit warnings from all code
|
|
|
|
|
|
|
|
|
|
|
2026-01-12 12:48:23 +08:00
|
|
|
import time
|
2026-04-09 16:40:14 +08:00
|
|
|
# Capture a high-resolution timestamp right after `import time`, before the
# remaining module imports execute.
# NOTE(review): presumably used later to report startup/import duration — confirm against usage.
start_ts = time.perf_counter()
|
2026-01-12 12:48:23 +08:00
|
|
|
|
2025-12-09 19:23:14 +08:00
|
|
|
import asyncio
|
2025-11-10 19:15:02 +08:00
|
|
|
import copy
|
2025-11-17 09:38:04 +08:00
|
|
|
import faulthandler
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
import signal
|
2025-11-03 19:59:18 +08:00
|
|
|
import sys
|
|
|
|
|
import threading
|
|
|
|
|
import traceback
|
2025-11-17 09:38:04 +08:00
|
|
|
from datetime import datetime, timezone
|
2025-11-10 19:15:02 +08:00
|
|
|
from typing import Any
|
2025-11-03 19:59:18 +08:00
|
|
|
|
2025-12-12 10:23:40 +08:00
|
|
|
from flask import json
|
|
|
|
|
|
2026-01-15 14:02:15 +08:00
|
|
|
from api.utils.common import hash128
|
2025-11-10 19:15:02 +08:00
|
|
|
from api.db.services.connector_service import ConnectorService, SyncLogsService
|
2026-04-09 16:40:14 +08:00
|
|
|
from api.db.services.document_service import DocumentService
|
2025-11-03 19:59:18 +08:00
|
|
|
from api.db.services.knowledgebase_service import KnowledgebaseService
|
2025-11-17 09:38:04 +08:00
|
|
|
from common import settings
|
2026-05-19 10:07:11 +08:00
|
|
|
from common.constants import ConnectorTaskType, FileSource, TaskStatus
|
2025-11-05 11:07:54 +08:00
|
|
|
from common.config_utils import show_configs
|
Feature/generic api connector (#13545)
# feat: Add Generic REST API Connector
## What problem does this PR solve?
RAGFlow supports many specific data source connectors (MySQL, Slack,
Google Drive, etc.), but there was no way to connect an arbitrary REST
API as a data source. Users with custom or third-party APIs had to write
a new connector class for each one.
This PR adds a **generic, configuration-driven REST API connector** that
lets users connect any REST API as a data source entirely through the UI
— no code changes needed per API.
---
## Features
### Core Connector (`common/data_source/rest_api_connector.py`)
- Implements `LoadConnector` and `PollConnector` interfaces for full and
incremental sync
- **Configurable authentication:** None, API Key (custom header), Bearer
Token, Basic Auth
- **Pluggable pagination:** Page-based, Offset-based, Cursor-based, or
None
- Smart page-size inference from user's query parameters to avoid
duplicate/conflicting params
- Configurable request delay between pages to prevent API rate limiting
- Auto-detection of the items array in JSON responses (`items`,
`results`, `data`, `records`, or first list found)
- **Advanced field mapping** with dot-notation (`country.name`), array
wildcards (`newsType[*].name`), type hints, and default values
- Optional content template rendering (`"Title: {title}\nBody: {body}"`)
- HTML stripping for content fields
- Stable document IDs via `hash128` from a configurable ID field or
auto-generated from item content
- Pydantic configuration schema with automatic coercion of UI string
inputs to dicts/lists
### Backend Registration (`rag/svr/sync_data_source.py`,
`common/constants.py`, `common/data_source/config.py`)
- `REST_API` sync class wired into RAGFlow's `func_factory`
- Full sync (`load_from_state`) and incremental polling (`poll_source`)
support
- Credentials and config passed from task to connector following
existing patterns (MySQL, SeaFile, etc.)
### Test Connection Endpoint (`api/apps/connector_app.py`)
- `POST /v1/connector/<id>/test` validates config schema,
authentication, and API connectivity without triggering a sync
- Clear error messages for auth failures vs. config issues
### Frontend UI (`web/src/pages/user-setting/data-source/constant/`)
- **Postman-style configuration:** Base URL, Query Parameters (key=value
per line), Auth, Content Fields, Metadata Fields, Pagination Type
- Auth-type-aware form: fields for API key header/value, Bearer token,
or Basic username/password appear only when relevant
- **Advanced Settings** toggle for: Custom Headers, Max Pages, Request
Delay, Poll Timestamp Field, Request Body (POST)
- Connector icon (SVG) and i18n strings (English)
- **"Test Connection"** button to validate before syncing
---
## Controls & Safety
- Configurable max pages safety cap (default: 1000, adjustable in UI)
- Configurable request delay between pages (default: 0.5s, adjustable in
UI)
- Auth errors (401/403) fail immediately without retries; transient
errors retry with exponential backoff
- Diagnostic logging: auth setup confirmation, request details on
failure, content field extraction status
---
## Type of change
- [x] New Feature (non-breaking change which adds functionality)
## Visual Screenshots of Features
<img width="482" height="510" alt="Screenshot 2026-03-11 at 5 19 52 PM"
src="https://github.com/user-attachments/assets/dcb7ab4a-1622-44f3-bb02-d6f0527314c4"
/>
(Connector can be configured within the external data sources tab)
Configuration Parameters:
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 46 PM"
src="https://github.com/user-attachments/assets/5e154e71-4ab5-4872-bfb2-04f02b73c18a"
/>
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 54 PM"
src="https://github.com/user-attachments/assets/00cb14b7-0bcf-4b94-9d71-34e93369ecb2"
/>
Connection can be tested before attaching to dataset:
<img width="981" height="681" alt="Screenshot 2026-03-11 at 5 21 40 PM"
src="https://github.com/user-attachments/assets/aaa6eeeb-89a7-4349-bc34-2423bf8be9ee"
/>
Ingestion tested with API connector (works perfectly fine):
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 22 30 PM"
src="https://github.com/user-attachments/assets/afcd0d58-cadd-4152-badc-d2f14d96fbec"
/>
Search & Retrieval works as well with metadata flow:
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 23 05 PM"
src="https://github.com/user-attachments/assets/d41ee935-dcf7-4456-b317-22a76ca032c0"
/>
---------
Co-authored-by: Ahmad Intisar <ahmadintisar@Ahmads-MacBook-M4-Pro.local>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-13 17:35:01 +05:00
|
|
|
from common.data_source.config import INDEX_BATCH_SIZE
|
2025-12-29 17:05:20 +08:00
|
|
|
from common.data_source import (
|
2025-12-30 17:09:13 +08:00
|
|
|
BlobStorageConnector,
|
2026-03-27 22:58:44 +08:00
|
|
|
RSSConnector,
|
2025-12-30 17:09:13 +08:00
|
|
|
NotionConnector,
|
|
|
|
|
DiscordConnector,
|
|
|
|
|
GoogleDriveConnector,
|
|
|
|
|
MoodleConnector,
|
|
|
|
|
JiraConnector,
|
|
|
|
|
DropboxConnector,
|
|
|
|
|
AirtableConnector,
|
2025-12-29 17:05:20 +08:00
|
|
|
AsanaConnector,
|
2025-12-31 14:40:49 +08:00
|
|
|
ImapConnector,
|
|
|
|
|
ZendeskConnector,
|
2026-02-03 06:42:05 +01:00
|
|
|
SeaFileConnector,
|
2026-02-03 23:14:32 -03:00
|
|
|
RDBMSConnector,
|
Feature big query connector (#15871)
### What problem does this PR solve?
This PR adds Google BigQuery as a first-class data source connector in
RAGFlow.
It enables users to ingest and sync BigQuery data using the same
row-to-document model used by relational database connectors: selected
content columns become document text, metadata columns become document
metadata, an optional ID column provides stable document IDs, and an
optional timestamp column enables cursor-based incremental sync.
The connector supports service-account JSON credentials, table mode,
custom query mode, GoogleSQL queries, cursor-based incremental sync,
deleted-row pruning support, configurable query limits such as
`maximum_bytes_billed`, dry-run validation, batch loading, stable
document IDs, and BigQuery-aware value serialization.
2026-06-29 17:08:40 +03:00
|
|
|
BigQueryConnector,
|
2026-03-06 21:13:23 +08:00
|
|
|
DingTalkAITableConnector,
|
Feature/generic api connector (#13545)
# feat: Add Generic REST API Connector
## What problem does this PR solve?
RAGFlow supports many specific data source connectors (MySQL, Slack,
Google Drive, etc.), but there was no way to connect an arbitrary REST
API as a data source. Users with custom or third-party APIs had to write
a new connector class for each one.
This PR adds a **generic, configuration-driven REST API connector** that
lets users connect any REST API as a data source entirely through the UI
— no code changes needed per API.
---
## Features
### Core Connector (`common/data_source/rest_api_connector.py`)
- Implements `LoadConnector` and `PollConnector` interfaces for full and
incremental sync
- **Configurable authentication:** None, API Key (custom header), Bearer
Token, Basic Auth
- **Pluggable pagination:** Page-based, Offset-based, Cursor-based, or
None
- Smart page-size inference from user's query parameters to avoid
duplicate/conflicting params
- Configurable request delay between pages to prevent API rate limiting
- Auto-detection of the items array in JSON responses (`items`,
`results`, `data`, `records`, or first list found)
- **Advanced field mapping** with dot-notation (`country.name`), array
wildcards (`newsType[*].name`), type hints, and default values
- Optional content template rendering (`"Title: {title}\nBody: {body}"`)
- HTML stripping for content fields
- Stable document IDs via `hash128` from a configurable ID field or
auto-generated from item content
- Pydantic configuration schema with automatic coercion of UI string
inputs to dicts/lists
### Backend Registration (`rag/svr/sync_data_source.py`,
`common/constants.py`, `common/data_source/config.py`)
- `REST_API` sync class wired into RAGFlow's `func_factory`
- Full sync (`load_from_state`) and incremental polling (`poll_source`)
support
- Credentials and config passed from task to connector following
existing patterns (MySQL, SeaFile, etc.)
### Test Connection Endpoint (`api/apps/connector_app.py`)
- `POST /v1/connector/<id>/test` validates config schema,
authentication, and API connectivity without triggering a sync
- Clear error messages for auth failures vs. config issues
### Frontend UI (`web/src/pages/user-setting/data-source/constant/`)
- **Postman-style configuration:** Base URL, Query Parameters (key=value
per line), Auth, Content Fields, Metadata Fields, Pagination Type
- Auth-type-aware form: fields for API key header/value, Bearer token,
or Basic username/password appear only when relevant
- **Advanced Settings** toggle for: Custom Headers, Max Pages, Request
Delay, Poll Timestamp Field, Request Body (POST)
- Connector icon (SVG) and i18n strings (English)
- **"Test Connection"** button to validate before syncing
---
## Controls & Safety
- Configurable max pages safety cap (default: 1000, adjustable in UI)
- Configurable request delay between pages (default: 0.5s, adjustable in
UI)
- Auth errors (401/403) fail immediately without retries; transient
errors retry with exponential backoff
- Diagnostic logging: auth setup confirmation, request details on
failure, content field extraction status
---
## Type of change
- [x] New Feature (non-breaking change which adds functionality)
## Visual Screenshots of Features
<img width="482" height="510" alt="Screenshot 2026-03-11 at 5 19 52 PM"
src="https://github.com/user-attachments/assets/dcb7ab4a-1622-44f3-bb02-d6f0527314c4"
/>
(Connector can be configured within the external data sources tab)
Configuration Parameters:
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 46 PM"
src="https://github.com/user-attachments/assets/5e154e71-4ab5-4872-bfb2-04f02b73c18a"
/>
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 54 PM"
src="https://github.com/user-attachments/assets/00cb14b7-0bcf-4b94-9d71-34e93369ecb2"
/>
Connection can be tested before attaching to dataset:
<img width="981" height="681" alt="Screenshot 2026-03-11 at 5 21 40 PM"
src="https://github.com/user-attachments/assets/aaa6eeeb-89a7-4349-bc34-2423bf8be9ee"
/>
Ingestion tested with API connector (works perfectly fine):
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 22 30 PM"
src="https://github.com/user-attachments/assets/afcd0d58-cadd-4152-badc-d2f14d96fbec"
/>
Search & Retrieval works as well with metadata flow:
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 23 05 PM"
src="https://github.com/user-attachments/assets/d41ee935-dcf7-4456-b317-22a76ca032c0"
/>
---------
Co-authored-by: Ahmad Intisar <ahmadintisar@Ahmads-MacBook-M4-Pro.local>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-13 17:35:01 +05:00
|
|
|
RestAPIConnector,
|
feat(connector): implement OneDrive data source connector (issue #15330) (#15331)
### What problem does this PR solve?
Closes #15330.
RAGFlow had no connector for OneDrive / OneDrive for Business. Users who
store working documents in OneDrive could not index them into a
knowledge base without manually downloading and re-uploading files.
This PR adds a net-new OneDrive data source that:
- Authenticates against Microsoft Graph with the same MSAL
client-credentials flow already used by the SharePoint and Teams
connectors (no new auth primitives).
- Enumerates every drive visible to the service principal and pages
through `/drives/{id}/root/delta`, persisting `@odata.deltaLink` values
per drive so subsequent syncs only fetch changed items.
- Optionally narrows ingestion to a sub-folder (`folder_path`) without
needing a separate code path.
- Surfaces typed errors on the validation probe (`GET /drives?$top=1`):
401 → `ConnectorMissingCredentialError`, 403 →
`InsufficientPermissionsError` (with a `Files.Read.All` hint), 5xx →
`UnexpectedValidationError`.
- Filters folders, soft-deleted items, and unsupported extensions (`.pdf
.docx .doc .xlsx .xls .pptx .ppt .txt .md .csv`).
#### Files
| File | Change |
|------|--------|
| `common/data_source/onedrive_connector.py` | **New** —
`OneDriveConnector` + `OneDriveCheckpoint`. |
| `common/data_source/config.py` | `DocumentSource.ONEDRIVE =
"onedrive"`. |
| `common/constants.py` | `FileSource.ONEDRIVE = "onedrive"`. |
| `common/data_source/__init__.py` | Export `OneDriveConnector`. |
| `rag/svr/sync_data_source.py` | `OneDrive(SyncBase)` with `batch_size`
normalisation; registered in `func_factory`. |
| `web/src/pages/user-setting/data-source/constant/index.tsx` |
`DataSourceKey.ONEDRIVE`, visibility map (`syncDeletedFiles: true`),
info entry, form fields (tenant_id, client_id, client_secret,
folder_path, batch_size), default values. |
| `web/src/locales/en.ts`, `web/src/locales/zh.ts` |
`onedriveDescription` + 4 tooltip keys (EN + ZH). |
| `test/unit_test/data_source/test_onedrive_connector_unit.py` | **New**
— 13 unit tests (`p1`/`p2`) covering auth, validation, checkpoint
helpers, and document filtering. |
#### Required Azure AD permission
`Files.Read.All` (Application, admin-granted).
#### Out of scope
- Interactive end-user OAuth (delegated permissions) — the connector
uses app-only credentials, consistent with the SharePoint / Teams
precedent.
- Binary download of file contents — the sync layer emits `Document`s
carrying `webUrl` + metadata; bytes are hydrated downstream by the parse
pipeline.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-29 05:26:06 -06:00
|
|
|
OneDriveConnector,
|
feat(connector): implement Outlook data source connector (issue #15332) (#15333)
### What problem does this PR solve?
Closes #15332.
RAGFlow can index Gmail and generic IMAP mailboxes but had no native
connector for Outlook / Microsoft 365 mail. Organisations on Microsoft
365 had no way to bring mailbox content into a knowledge base through
Microsoft Graph.
This PR adds a net-new Outlook data source that:
- Authenticates against Microsoft Graph with the same MSAL
client-credentials flow already used by the SharePoint and Teams
connectors (no new auth primitives).
- Pages over `/users/{id}/mailFolders/{folder}/messages/delta` per
mailbox and persists `@odata.deltaLink` values in
`OutlookCheckpoint.delta_links`, so incremental syncs only fetch changed
messages.
- Supports two scoping modes:
- **Tenant-wide** (default): enumerates every user in the tenant via
`/users` and syncs each mailbox. Requires `User.Read.All`.
- **Targeted**: when `user_ids` is provided (comma-separated UPNs or
object IDs), only those mailboxes are synced. `User.Read.All` is not
needed in this mode.
- Lets the caller pick the mail folder (`inbox`, `sentitems`, `archive`,
...). Defaults to `inbox`.
- Maps each message to a `Document` shaped after the Gmail connector:
one `TextSection` carrying `From/To/Cc/Subject` headers + body, with
HTML bodies stripped to text inline (no extra dependency).
- Surfaces typed errors on the validation probe:
401 → `ConnectorMissingCredentialError`, 403 →
`InsufficientPermissionsError` (with `Mail.Read` / `User.Read.All`
hint), 404 on a configured mailbox → `ConnectorValidationError`, 5xx →
`UnexpectedValidationError`.
- Skips messages flagged `@removed` by the delta semantics and messages
whose `receivedDateTime` is older than `poll_range_start`.
#### Files
| File | Change |
|------|--------|
| `common/data_source/outlook_connector.py` | **New** —
`OutlookConnector` (`CheckpointedConnectorWithPermSync` +
`SlimConnectorWithPermSync`) + `OutlookCheckpoint` + tiny `_strip_html`
helper. |
| `common/data_source/config.py` | `DocumentSource.OUTLOOK = "outlook"`.
|
| `common/constants.py` | `FileSource.OUTLOOK = "outlook"`. |
| `common/data_source/__init__.py` | Export `OutlookConnector`. |
| `rag/svr/sync_data_source.py` | `Outlook(SyncBase)` with `batch_size`
normalisation, CSV/list parsing of `user_ids`; registered in
`func_factory`. |
| `web/src/pages/user-setting/data-source/constant/index.tsx` |
`DataSourceKey.OUTLOOK`, visibility map (`syncDeletedFiles: true`), info
entry, form fields (tenant_id, client_id, client_secret, folder,
user_ids, batch_size), default values. |
| `web/src/locales/en.ts`, `web/src/locales/zh.ts` |
`outlookDescription` + 5 tooltip keys (EN + ZH). |
| `test/unit_test/data_source/test_outlook_connector_unit.py` | **New**
— 19 unit tests (`p1`/`p2`/`p3`) covering auth, validation (tenant-wide
vs specific user vs error paths), checkpoint helpers, user enumeration
pagination, message filtering, HTML body stripping. |
#### Required Azure AD permissions
- `Mail.Read` (Application, admin-granted) — always.
- `User.Read.All` (Application, admin-granted) — only when `user_ids` is
left blank so the connector can enumerate mailboxes.
#### Out of scope
- **Attachment indexing.** The current connector emits message body +
headers; binary attachments are flagged via `metadata.has_attachments`
but not pulled. Adding attachment hydration is straightforward but
scoped out per the issue's "decide whether attachments are indexed in
the first version" note.
- **Delegated (per-user) OAuth.** The connector uses app-only
credentials, consistent with the SharePoint / Teams precedent in this
codebase.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-29 07:52:29 -06:00
|
|
|
OutlookConnector,
|
feat(connectors): add Azure Blob Storage data source connector (#15466)
### What problem does this PR solve?
Closes #15465.
RAGFlow supports S3, Google Cloud Storage, R2, and OCI as data sources
but not Azure Blob Storage, leaving Azure users without a way to index
container objects into a knowledge base. This adds a first-class Azure
Blob Storage data-source connector — distinct from RAGFlow's existing
Azure storage *backends* (`rag/utils/azure_sas_conn.py`,
`rag/utils/azure_spn_conn.py`) which store RAGFlow's own files.
**Highlights**
- `common/data_source/azure_blob_connector.py`: new `AzureBlobConnector`
(`CheckpointedConnectorWithPermSync` + `SlimConnectorWithPermSync`).
- Uses the existing `azure-storage-blob` dependency (already in
`pyproject.toml`).
- Three auth modes, tried in order of precedence:
1. **Account key** — `account_name` + `account_key` + `container_name`.
2. **Connection string** — `connection_string` + `container_name`.
3. **SAS token** — `container_url` + `sas_token` (same shape as
`RAGFlowAzureSasBlob`).
- ETag fingerprint stored per blob in `AzureBlobCheckpoint.etags` —
unchanged blobs (same ETag as last run) are skipped without a download.
Only new/modified blobs are fetched.
- Optional `prefix` scopes indexing to a virtual folder.
- `validate_connector_settings()` probes `get_container_properties()`
and maps `AuthenticationFailed / 403 / ContainerNotFound` to typed
connector exceptions.
- Slim-doc IDs are blob names so prune reconciles correctly.
- `common/constants.py`, `common/data_source/config.py`,
`common/data_source/__init__.py`: register `azure_blob` in `FileSource`
/ `DocumentSource` and export `AzureBlobConnector`.
- `rag/svr/sync_data_source.py`: new `AzureBlob(SyncBase)` class routed
through `load_from_checkpoint` (ETag fingerprint owns change-detection)
and added to `func_factory`.
- Frontend:
- `web/src/pages/user-setting/data-source/constant/index.tsx`: new
`DataSourceKey.AZURE_BLOB`, auth-mode selector (account key / connection
string / SAS token), all credential fields, prefix + batch-size,
`syncDeletedFiles` capability, default form values, tile entry with
icon.
- `web/src/locales/{en,zh}.ts`: description + per-field tooltips for all
9 new keys.
- `web/src/assets/svg/data-source/azure-blob.svg`: Azure-branded
stacked-cylinders icon.
**Verification**
- `npm run build` (vite + esbuild) passes (37 s).
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-06-04 07:06:01 -06:00
|
|
|
AzureBlobConnector,
|
feat(connectors): add Salesforce CRM data source connector (#15462)
### What problem does this PR solve?
Closes #15461.
RAGFlow had no way to ingest Salesforce CRM data, so support / sales
teams couldn't ground responses on live Accounts, Contacts,
Opportunities, Cases, or Knowledge articles. This adds a first-class
Salesforce data source connector that authenticates against a Connected
App via OAuth 2.0 client-credentials, queries selected SObjects via
SOQL, and turns each record into an indexable document with incremental
sync.
**Highlights**
- `common/data_source/salesforce_connector.py`: new
`SalesforceConnector` (`CheckpointedConnectorWithPermSync` +
`SlimConnectorWithPermSync`).
- OAuth 2.0 client-credentials flow; canonical `instance_url` from the
token response so multi-pod orgs route correctly.
- Per-object `SystemModstamp` cursor stored in
`SalesforceCheckpoint.cursors` — a failure mid-object doesn't rewind
sibling objects, and re-syncs only fetch changed rows.
- Deterministic record-to-text formatter (sorted keys) so SOQL field
reordering on the server doesn't mark every row "changed" on each poll.
- `_get_json` raises on non-2xx so 429 / 5xx never silently advance the
checkpoint past missing data.
- `Knowledge__kav` is in the default object set but is skipped silently
when the org doesn't have Salesforce Knowledge enabled (404 on
describe).
- Slim-doc IDs are scoped as `<Object>/<Id>` so prune deletes can't
collide across object types.
- `common/constants.py`, `common/data_source/config.py`,
`common/data_source/__init__.py`: register `salesforce` in `FileSource`
/ `DocumentSource` and export `SalesforceConnector`.
- `rag/svr/sync_data_source.py`: new `Salesforce(SyncBase)` class routed
through `load_from_checkpoint` (poll_source would re-walk every object
each run) and added to `func_factory`.
- Frontend:
- `web/src/pages/user-setting/data-source/constant/index.tsx`: new
`DataSourceKey.SALESFORCE`, form fields (instance URL, client ID/secret,
objects, api_version, batch size), `syncDeletedFiles` capability,
default form values, and tile entry with the new icon.
- `web/src/locales/{en,zh}.ts`: description + per-field tooltips.
- `web/src/assets/svg/data-source/salesforce.svg`: 48x48 brand-style
icon to match the other Microsoft / cloud tiles.
**Verification**
- `npm run build` (vite + esbuild) passes (1m 26s).
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-06-04 23:24:36 -06:00
|
|
|
SalesforceConnector,
|
feat: implement Microsoft Teams data source connector (#15193)
### What problem does this PR solve?
Closes #15191.
RAGFlow shipped a Microsoft Teams connector stub
(`common/data_source/teams_connector.py`) whose document-loading methods
all returned `[]`, `Teams._generate()` was a `pass`, and Teams was
commented out of the data-source settings UI. As a result there was no
way to index Teams channel conversations into a knowledge base.
This PR implements the connector end to end on top of Microsoft Graph
(Office365-REST-Python-Client). It shares the MSAL client-credentials
auth shape with the SharePoint connector.
**Backend**
- `common/data_source/teams_connector.py`
- `load_credentials()` now builds the Graph client using an MSAL
client-credentials **token callback** — the form `GraphClient` actually
expects. (The previous stub passed a raw access-token string to
`GraphClient(...)`, which is not how that client is driven.) Token
acquisition is lazy, so credential loading performs no network call.
- `validate_connector_settings()` lists teams via Graph.
- `load_from_checkpoint()` is now a generator that pages teams →
channels → messages, flattens each top-level post together with its
replies into one blob-based `Document` (`extension` `.txt`/`.html`,
`blob`, `size_bytes`, `doc_updated_at`). Incremental syncs are bounded
by message `lastModifiedDateTime` (falling back to `createdDateTime`).
Per-message errors surface as `ConnectorFailure` instead of aborting the
run.
- `retrieve_all_slim_docs_perm_sync()` yields id-only `SlimDocument`
batches and the checkpoint helpers return proper `TeamsCheckpoint`s.
- ACL → `ExternalAccess` mapping is intentionally left best-effort
(`load_from_checkpoint_with_perm_sync` delegates to the standard load)
because the sync pipeline does not currently persist `ExternalAccess`.
- `rag/svr/sync_data_source.py`
- Implemented `Teams._generate()` using the existing
`CheckpointOutputWrapper` pattern (same shape as Confluence/Jira/Google
Drive), supporting full reindex and incremental polling from
`poll_range_start`.
- `TeamsConnector` is already exported from
`common/data_source/__init__.py`.
**Frontend (`web/`)**
- Enabled the `TEAMS` data-source enum and added its form fields
(`tenant_id`, `client_id`, `client_secret`), default values, display
metadata, and a Teams icon.
- Added `teamsDescription` / `teamsTenantIdTip` to `en.ts` and `zh.ts`.
**Tests**
- `test/unit_test/data_source/test_teams_connector_unit.py`: mock-based
unit tests covering credential loading (incomplete creds raise, happy
path sets the Graph client, fetch-without-creds raises), post/reply
flattening (incl. the HTML vs text extension), incremental
`lastModifiedDateTime` filtering, and slim-doc listing. All 6 pass;
`ruff check` is clean.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-28 03:10:38 -06:00
|
|
|
TeamsConnector,
|
feat: implement Slack data source connector (#15188)
### What problem does this PR solve?
Closes #15187.
RAGFlow shipped a Slack connector
(`common/data_source/slack_connector.py`) but it was never usable:
`Slack._generate()` in the sync worker was a `pass` stub, the
connector's document-generating code was incompatible with the current
data model,
and Slack was commented out of the data-source settings UI. As a result,
teams had no way to index Slack channels/threads into a knowledge base.
This PR completes the connector end to end.
**Backend**
- `common/data_source/slack_connector.py`
- Rewrote `thread_to_doc` to produce a blob-based `Document`
(`extension`/`blob`/`size_bytes`). The previous implementation built the
doc with a `sections=[...]` argument and omitted the now-required
`blob`/`extension`/`size_bytes` fields, so it raised a validation error
against the current `Document` model. Thread messages are now cleaned
and flattened into a single UTF-8 text blob.
- Added `load_from_state()` / `poll_source(start, end)` generators. The
connector's checkpoint interface is a no-op stub, so both full and
incremental syncs run through a single channel-iterating generator built
on the existing module helpers (`get_channels`, `filter_channels`,
`get_channel_messages`, `_process_message`), with per-channel thread
de-duplication.
- `rag/svr/sync_data_source.py`
- Implemented `Slack._generate()`. Credentials are loaded via
`StaticCredentialsProvider` (the connector requires `slack_bot_token`
and does not support `load_credentials`). Supports full reindex and
incremental polling from `poll_range_start`, plus the optional channel
filter. Modeled on the Confluence/Dropbox wrappers.
- `SlackConnector` was already exported from
`common/data_source/__init__.py`.
**Frontend (`web/`)**
- Enabled the `SLACK` data-source enum and added its form fields (Slack
bot token + optional channel filter), default values, display metadata,
and a Slack icon.
- Added `slackDescription` / `slackBotTokenTip` / `slackChannelsTip`
strings to `en.ts` and `zh.ts`.
**Tests**
- `test/unit_test/data_source/test_slack_connector_unit.py`: unit tests
covering credential loading (`load_credentials` raises,
`set_credentials_provider` initializes clients, missing credentials
raises) and document generation (standalone message + flattened thread,
blob/extension/size_bytes/metadata, and the incremental poll time
window). All 5 pass; `ruff check` is clean.
Required Slack scopes: `channels:read`, `channels:history`,
`users:read`.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-28 01:46:07 -06:00
|
|
|
SlackConnector,
|
feat: implement SharePoint data source connector (#15190)
### What problem does this PR solve?
Closes #15189.
RAGFlow shipped a SharePoint connector stub
(`common/data_source/sharepoint_connector.py`) whose document-loading
methods all returned `[]`, `SharePoint._generate()` was a `pass`, and
SharePoint was commented out of the data-source settings UI. As a result
there was no way to index files stored in SharePoint document libraries.
This PR implements the connector end to end on top of Microsoft Graph
(Office365-REST-Python-Client).
**Backend**
- `common/data_source/sharepoint_connector.py`
- `load_credentials()` now builds the Graph client using an MSAL
client-credentials **token callback** — the form `GraphClient` actually
expects. (The previous stub passed a raw access-token string to
`GraphClient(...)`, which is not how that client is driven.) Token
acquisition is lazy, so credential loading does no network call.
- `validate_connector_settings()` resolves the configured site via
Graph.
- `load_from_checkpoint()` is now a generator that enumerates every
document library under the site, walks folders depth-first, downloads
each file, and yields blob-based `Document` objects (`extension` /
`blob` / `size_bytes` / `doc_updated_at`). Incremental syncs are bounded
by file `lastModifiedDateTime`. Per-file errors are surfaced as
`ConnectorFailure` rather than aborting the run.
- `retrieve_all_slim_docs_perm_sync()` yields id-only `SlimDocument`
batches (no downloads) and the checkpoint helpers return proper
checkpoints.
- ACL → `ExternalAccess` mapping is intentionally left best-effort
(`load_from_checkpoint_with_perm_sync` delegates to the standard load)
because the sync pipeline does not currently persist `ExternalAccess`;
this can be extended once that plumbing exists.
- `rag/svr/sync_data_source.py`
- Implemented `SharePoint._generate()` using the existing
`CheckpointOutputWrapper` pattern (same shape as Confluence/Jira/Google
Drive), supporting full reindex and incremental polling from
`poll_range_start`.
- `SharePointConnector` is already exported from
`common/data_source/__init__.py`.
**Frontend (`web/`)**
- Enabled the `SHAREPOINT` data-source enum and added its form fields
(`site_url`, `tenant_id`, `client_id`, `client_secret`), default values,
display metadata, and a SharePoint icon.
- Added `sharepointDescription` / `sharepointSiteUrlTip` to `en.ts` and
`zh.ts`.
**Tests**
- `test/unit_test/data_source/test_sharepoint_connector_unit.py`:
mock-based unit tests covering credential loading (incomplete creds
raise, happy path sets the Graph client, fetch-without-creds raises),
drive traversal + file download, incremental `lastModifiedDateTime`
filtering, and slim-doc listing. All 6 pass; `ruff check` is clean.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-27 23:26:08 -06:00
|
|
|
SharePointConnector,
|
2025-12-29 17:05:20 +08:00
|
|
|
)
|
feat(seafile): add library and directory sync scope support (#13153)
### What problem does this PR solve?
The SeaFile connector currently synchronises the entire account — every
library
visible to the authenticated user. This is impractical for users who
only need
a subset of their data indexed, especially on large SeaFile instances
with many
shared libraries.
This PR introduces granular sync scope support, allowing users to choose
between
syncing their entire account, a single library, or a specific directory
within a
library. It also adds support for SeaFile library-scoped API tokens
(`/api/v2.1/via-repo-token/` endpoints), enabling tighter access control
without
exposing account-level credentials.
### Type of change
- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
### Test
```
from seafile_connector import SeaFileConnector
import logging
import os
logging.basicConfig(level=logging.DEBUG)
URL = os.environ.get("SEAFILE_URL", "https://seafile.example.com")
TOKEN = os.environ.get("SEAFILE_TOKEN", "")
REPO_ID = os.environ.get("SEAFILE_REPO_ID", "")
SYNC_PATH = os.environ.get("SEAFILE_SYNC_PATH", "/Documents")
REPO_TOKEN = os.environ.get("SEAFILE_REPO_TOKEN", "")
def _test_scope(scope, repo_id=None, sync_path=None):
print(f"\n{'='*50}")
print(f"Testing scope: {scope}")
print(f"{'='*50}")
creds = {"seafile_token": TOKEN} if TOKEN else {}
if REPO_TOKEN and scope in ("library", "directory"):
creds["repo_token"] = REPO_TOKEN
connector = SeaFileConnector(
seafile_url=URL,
batch_size=5,
sync_scope=scope,
include_shared = False,
repo_id=repo_id,
sync_path=sync_path,
)
connector.load_credentials(creds)
connector.validate_connector_settings()
count = 0
for batch in connector.load_from_state():
for doc in batch:
count += 1
print(f" [{count}] {doc.semantic_identifier} "
f"({doc.size_bytes} bytes, {doc.extension})")
print(f"\n-> {scope} scope: {count} document(s) found.\n")
# 1. Account scope
if TOKEN:
_test_scope("account")
else:
print("\nSkipping account scope (set SEAFILE_TOKEN)")
# 2. Library scope
if REPO_ID and (TOKEN or REPO_TOKEN):
_test_scope("library", repo_id=REPO_ID)
else:
print("\nSkipping library scope (set SEAFILE_REPO_ID + token)")
# 3. Directory scope
if REPO_ID and SYNC_PATH and (TOKEN or REPO_TOKEN):
_test_scope("directory", repo_id=REPO_ID, sync_path=SYNC_PATH)
else:
print("\nSkipping directory scope (set SEAFILE_REPO_ID + SEAFILE_SYNC_PATH + token)")
```
2026-02-28 03:24:28 +01:00
|
|
|
from common.data_source.models import ConnectorFailure, SeafileSyncScope
|
2025-12-31 19:00:00 +08:00
|
|
|
from common.data_source.webdav_connector import WebDAVConnector
|
2025-11-04 17:29:11 +08:00
|
|
|
from common.data_source.confluence_connector import ConfluenceConnector
|
2025-11-28 13:09:40 +08:00
|
|
|
from common.data_source.gmail_connector import GmailConnector
|
2025-12-12 10:23:40 +08:00
|
|
|
from common.data_source.box_connector import BoxConnector
|
2025-12-30 15:09:52 +08:00
|
|
|
from common.data_source.github.connector import GithubConnector
|
2025-12-29 17:05:20 +08:00
|
|
|
from common.data_source.gitlab_connector import GitlabConnector
|
2025-12-31 17:18:30 +08:00
|
|
|
from common.data_source.bitbucket.connector import BitbucketConnector
|
2025-11-10 19:15:02 +08:00
|
|
|
from common.data_source.interfaces import CheckpointOutputWrapper
|
Feature/generic api connector (#13545)
# feat: Add Generic REST API Connector
## What problem does this PR solve?
RAGFlow supports many specific data source connectors (MySQL, Slack,
Google Drive, etc.), but there was no way to connect an arbitrary REST
API as a data source. Users with custom or third-party APIs had to write
a new connector class for each one.
This PR adds a **generic, configuration-driven REST API connector** that
lets users connect any REST API as a data source entirely through the UI
— no code changes needed per API.
---
## Features
### Core Connector (`common/data_source/rest_api_connector.py`)
- Implements `LoadConnector` and `PollConnector` interfaces for full and
incremental sync
- **Configurable authentication:** None, API Key (custom header), Bearer
Token, Basic Auth
- **Pluggable pagination:** Page-based, Offset-based, Cursor-based, or
None
- Smart page-size inference from user's query parameters to avoid
duplicate/conflicting params
- Configurable request delay between pages to prevent API rate limiting
- Auto-detection of the items array in JSON responses (`items`,
`results`, `data`, `records`, or first list found)
- **Advanced field mapping** with dot-notation (`country.name`), array
wildcards (`newsType[*].name`), type hints, and default values
- Optional content template rendering (`"Title: {title}\nBody: {body}"`)
- HTML stripping for content fields
- Stable document IDs via `hash128` from a configurable ID field or
auto-generated from item content
- Pydantic configuration schema with automatic coercion of UI string
inputs to dicts/lists
### Backend Registration (`rag/svr/sync_data_source.py`,
`common/constants.py`, `common/data_source/config.py`)
- `REST_API` sync class wired into RAGFlow's `func_factory`
- Full sync (`load_from_state`) and incremental polling (`poll_source`)
support
- Credentials and config passed from task to connector following
existing patterns (MySQL, SeaFile, etc.)
### Test Connection Endpoint (`api/apps/connector_app.py`)
- `POST /v1/connector/<id>/test` validates config schema,
authentication, and API connectivity without triggering a sync
- Clear error messages for auth failures vs. config issues
### Frontend UI (`web/src/pages/user-setting/data-source/constant/`)
- **Postman-style configuration:** Base URL, Query Parameters (key=value
per line), Auth, Content Fields, Metadata Fields, Pagination Type
- Auth-type-aware form: fields for API key header/value, Bearer token,
or Basic username/password appear only when relevant
- **Advanced Settings** toggle for: Custom Headers, Max Pages, Request
Delay, Poll Timestamp Field, Request Body (POST)
- Connector icon (SVG) and i18n strings (English)
- **"Test Connection"** button to validate before syncing
---
## Controls & Safety
- Configurable max pages safety cap (default: 1000, adjustable in UI)
- Configurable request delay between pages (default: 0.5s, adjustable in
UI)
- Auth errors (401/403) fail immediately without retries; transient
errors retry with exponential backoff
- Diagnostic logging: auth setup confirmation, request details on
failure, content field extraction status
---
## Type of change
- [x] New Feature (non-breaking change which adds functionality)
## Visual Screenshots of Features
<img width="482" height="510" alt="Screenshot 2026-03-11 at 5 19 52 PM"
src="https://github.com/user-attachments/assets/dcb7ab4a-1622-44f3-bb02-d6f0527314c4"
/>
(Connector can be configured within the external data sources tab)
Configuration Parameters:
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 46 PM"
src="https://github.com/user-attachments/assets/5e154e71-4ab5-4872-bfb2-04f02b73c18a"
/>
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 54 PM"
src="https://github.com/user-attachments/assets/00cb14b7-0bcf-4b94-9d71-34e93369ecb2"
/>
Connection can be tested before attaching to dataset:
<img width="981" height="681" alt="Screenshot 2026-03-11 at 5 21 40 PM"
src="https://github.com/user-attachments/assets/aaa6eeeb-89a7-4349-bc34-2423bf8be9ee"
/>
Ingestion tested with API connector (works perfectly fine):
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 22 30 PM"
src="https://github.com/user-attachments/assets/afcd0d58-cadd-4152-badc-d2f14d96fbec"
/>
Search & Retrieval works as well with metadata flow:
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 23 05 PM"
src="https://github.com/user-attachments/assets/d41ee935-dcf7-4456-b317-22a76ca032c0"
/>
---------
Co-authored-by: Ahmad Intisar <ahmadintisar@Ahmads-MacBook-M4-Pro.local>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-13 17:35:01 +05:00
|
|
|
from common.data_source.exceptions import ConnectorValidationError
|
2025-11-17 09:38:04 +08:00
|
|
|
from common.log_utils import init_root_logger
|
2025-11-05 11:07:54 +08:00
|
|
|
from common.signal_utils import start_tracemalloc_and_snapshot, stop_tracemalloc
|
2025-11-17 09:38:04 +08:00
|
|
|
from common.versions import get_ragflow_version
|
2025-12-12 10:23:40 +08:00
|
|
|
from box_sdk_gen import BoxOAuth, OAuthConfig, AccessToken
|
2025-11-17 09:38:04 +08:00
|
|
|
# Upper bound on the number of sync tasks allowed to run at the same time.
# Read from the MAX_CONCURRENT_TASKS environment variable; defaults to 5.
MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "5"))
|
2025-12-09 19:23:14 +08:00
|
|
|
# Module-wide semaphore sized by MAX_CONCURRENT_TASKS; SyncBase.__call__
# holds it (`async with task_limiter`) while a task's logic is running.
task_limiter = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
|
2025-11-03 19:59:18 +08:00
|
|
|
|
|
|
|
|
|
feat(connector): implement Outlook data source connector (issue #15332) (#15333)
### What problem does this PR solve?
Closes #15332.
RAGFlow can index Gmail and generic IMAP mailboxes but had no native
connector for Outlook / Microsoft 365 mail. Organisations on Microsoft
365 had no way to bring mailbox content into a knowledge base through
Microsoft Graph.
This PR adds a net-new Outlook data source that:
- Authenticates against Microsoft Graph with the same MSAL
client-credentials flow already used by the SharePoint and Teams
connectors (no new auth primitives).
- Pages over `/users/{id}/mailFolders/{folder}/messages/delta` per
mailbox and persists `@odata.deltaLink` values in
`OutlookCheckpoint.delta_links`, so incremental syncs only fetch changed
messages.
- Supports two scoping modes:
- **Tenant-wide** (default): enumerates every user in the tenant via
`/users` and syncs each mailbox. Requires `User.Read.All`.
- **Targeted**: when `user_ids` is provided (comma-separated UPNs or
object IDs), only those mailboxes are synced. `User.Read.All` is not
needed in this mode.
- Lets the caller pick the mail folder (`inbox`, `sentitems`, `archive`,
...). Defaults to `inbox`.
- Maps each message to a `Document` shaped after the Gmail connector:
one `TextSection` carrying `From/To/Cc/Subject` headers + body, with
HTML bodies stripped to text inline (no extra dependency).
- Surfaces typed errors on the validation probe:
401 → `ConnectorMissingCredentialError`, 403 →
`InsufficientPermissionsError` (with `Mail.Read` / `User.Read.All`
hint), 404 on a configured mailbox → `ConnectorValidationError`, 5xx →
`UnexpectedValidationError`.
- Skips messages flagged `@removed` by the delta semantics and messages
whose `receivedDateTime` is older than `poll_range_start`.
#### Files
| File | Change |
|------|--------|
| `common/data_source/outlook_connector.py` | **New** —
`OutlookConnector` (`CheckpointedConnectorWithPermSync` +
`SlimConnectorWithPermSync`) + `OutlookCheckpoint` + tiny `_strip_html`
helper. |
| `common/data_source/config.py` | `DocumentSource.OUTLOOK = "outlook"`.
|
| `common/constants.py` | `FileSource.OUTLOOK = "outlook"`. |
| `common/data_source/__init__.py` | Export `OutlookConnector`. |
| `rag/svr/sync_data_source.py` | `Outlook(SyncBase)` with `batch_size`
normalisation, CSV/list parsing of `user_ids`; registered in
`func_factory`. |
| `web/src/pages/user-setting/data-source/constant/index.tsx` |
`DataSourceKey.OUTLOOK`, visibility map (`syncDeletedFiles: true`), info
entry, form fields (tenant_id, client_id, client_secret, folder,
user_ids, batch_size), default values. |
| `web/src/locales/en.ts`, `web/src/locales/zh.ts` |
`outlookDescription` + 5 tooltip keys (EN + ZH). |
| `test/unit_test/data_source/test_outlook_connector_unit.py` | **New**
— 19 unit tests (`p1`/`p2`/`p3`) covering auth, validation (tenant-wide
vs specific user vs error paths), checkpoint helpers, user enumeration
pagination, message filtering, HTML body stripping. |
#### Required Azure AD permissions
- `Mail.Read` (Application, admin-granted) — always.
- `User.Read.All` (Application, admin-granted) — only when `user_ids` is
left blank so the connector can enumerate mailboxes.
#### Out of scope
- **Attachment indexing.** The current connector emits message body +
headers; binary attachments are flagged via `metadata.has_attachments`
but not pulled. Adding attachment hydration is straightforward but
scoped out per the issue's "decide whether attachments are indexed in
the first version" note.
- **Delegated (per-user) OAuth.** The connector uses app-only
credentials, consistent with the SharePoint / Teams precedent in this
codebase.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-29 07:52:29 -06:00
|
|
|
def _redact_mailbox(value: str) -> str:
|
|
|
|
|
"""Return a privacy-preserving representation of a UPN / email / object id.
|
|
|
|
|
|
|
|
|
|
Sync logs surface connector configuration verbatim, so leaking the
|
|
|
|
|
full mailbox list of a tenant is enough to inventory their org from
|
|
|
|
|
a single log file. Preserve the first two characters of the local
|
|
|
|
|
part as a debugging hint and mask the rest.
|
|
|
|
|
"""
|
|
|
|
|
if not value:
|
|
|
|
|
return "<empty>"
|
|
|
|
|
if "@" in value:
|
|
|
|
|
local, _, _domain = value.partition("@")
|
|
|
|
|
local_mask = local if len(local) <= 2 else local[:2] + "***"
|
|
|
|
|
return f"{local_mask}@***"
|
|
|
|
|
return f"{value[:4]}***" if len(value) > 4 else "***"
|
|
|
|
|
|
|
|
|
|
|
2025-11-03 19:59:18 +08:00
|
|
|
class SyncBase:
|
2026-04-29 07:34:36 +05:30
|
|
|
"""
|
|
|
|
|
Base class for all data source synchronization connectors.
|
|
|
|
|
|
|
|
|
|
Defines the standard interface for connecting to external APIs, polling for
|
|
|
|
|
new or updated documents, and managing synchronization state intervals.
|
|
|
|
|
"""
|
2025-11-06 16:48:04 +08:00
|
|
|
SOURCE_NAME: str = None
|
|
|
|
|
|
2025-11-03 19:59:18 +08:00
|
|
|
def __init__(self, conf: dict) -> None:
|
|
|
|
|
self.conf = conf
|
|
|
|
|
|
2026-04-09 16:40:14 +08:00
|
|
|
@staticmethod
|
|
|
|
|
def _format_window_boundary(value: datetime | None) -> str:
|
|
|
|
|
if value is None:
|
|
|
|
|
return "beginning"
|
|
|
|
|
return value.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def window_info(cls, task: dict) -> str:
|
|
|
|
|
window_start = None
|
|
|
|
|
if task.get("reindex") != "1" and task.get("poll_range_start"):
|
|
|
|
|
window_start = task["poll_range_start"]
|
|
|
|
|
window_end = datetime.now(timezone.utc)
|
|
|
|
|
return (
|
|
|
|
|
f"sync window: {cls._format_window_boundary(window_start)}"
|
|
|
|
|
f" -> {cls._format_window_boundary(window_end)}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def log_connection(
|
|
|
|
|
cls,
|
|
|
|
|
name: str,
|
|
|
|
|
details: str,
|
|
|
|
|
task: dict,
|
|
|
|
|
extra: str = "",
|
|
|
|
|
):
|
|
|
|
|
if task.get("skip_connection_log"):
|
|
|
|
|
return
|
|
|
|
|
if extra:
|
|
|
|
|
logging.info("Connect to %s: %s, %s, %s", name, details, cls.window_info(task), extra)
|
|
|
|
|
return
|
|
|
|
|
logging.info("Connect to %s: %s, %s", name, details, cls.window_info(task))
|
|
|
|
|
|
2025-11-03 19:59:18 +08:00
|
|
|
async def __call__(self, task: dict):
|
2026-04-29 07:34:36 +05:30
|
|
|
"""
|
|
|
|
|
Entry point for executing a synchronization task worker.
|
|
|
|
|
|
|
|
|
|
Manages task execution boundaries including status logging, asynchronous
|
|
|
|
|
timeouts, and top-level exception handling, while delegating the core
|
|
|
|
|
ingestion logic to `_run_task_logic`.
|
|
|
|
|
"""
|
2025-11-05 14:51:00 +08:00
|
|
|
SyncLogsService.start(task["id"], task["connector_id"])
|
2025-12-09 19:23:14 +08:00
|
|
|
|
|
|
|
|
async with task_limiter:
|
|
|
|
|
try:
|
|
|
|
|
await asyncio.wait_for(self._run_task_logic(task), timeout=task["timeout_secs"])
|
|
|
|
|
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
msg = f"Task timeout after {task['timeout_secs']} seconds"
|
|
|
|
|
SyncLogsService.update_by_id(task["id"], {"status": TaskStatus.FAIL, "error_msg": msg})
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
except Exception as ex:
|
|
|
|
|
msg = "\n".join([
|
|
|
|
|
"".join(traceback.format_exception_only(None, ex)).strip(),
|
|
|
|
|
"".join(traceback.format_exception(None, ex, ex.__traceback__)).strip(),
|
|
|
|
|
])
|
|
|
|
|
SyncLogsService.update_by_id(task["id"], {
|
|
|
|
|
"status": TaskStatus.FAIL,
|
|
|
|
|
"full_exception_trace": msg,
|
|
|
|
|
"error_msg": str(ex)
|
|
|
|
|
})
|
|
|
|
|
return
|
2025-11-03 19:59:18 +08:00
|
|
|
|
2026-05-19 10:07:11 +08:00
|
|
|
task_type = task.get("task_type", ConnectorTaskType.SYNC)
|
|
|
|
|
if task_type == ConnectorTaskType.SYNC:
|
|
|
|
|
SyncLogsService.schedule(
|
|
|
|
|
task["connector_id"],
|
|
|
|
|
task["kb_id"],
|
|
|
|
|
task.get("poll_range_start"),
|
|
|
|
|
task_type=ConnectorTaskType.SYNC,
|
|
|
|
|
)
|
|
|
|
|
elif task_type == ConnectorTaskType.PRUNE and self.conf.get("sync_deleted_files"):
|
|
|
|
|
SyncLogsService.schedule(
|
|
|
|
|
task["connector_id"],
|
|
|
|
|
task["kb_id"],
|
|
|
|
|
task_type=ConnectorTaskType.PRUNE,
|
|
|
|
|
)
|
2025-11-03 19:59:18 +08:00
|
|
|
|
2025-12-09 19:23:14 +08:00
|
|
|
async def _run_task_logic(self, task: dict):
|
2026-05-19 10:07:11 +08:00
|
|
|
task_type = task.get("task_type", ConnectorTaskType.SYNC)
|
|
|
|
|
if task_type == ConnectorTaskType.PRUNE:
|
|
|
|
|
await self._run_prune_task_logic(task)
|
|
|
|
|
return
|
|
|
|
|
await self._run_sync_task_logic(task)
|
|
|
|
|
|
|
|
|
|
async def _run_sync_task_logic(self, task: dict):
|
2026-04-29 07:34:36 +05:30
|
|
|
"""
|
|
|
|
|
Executes the core synchronization pipeline for a data source task.
|
|
|
|
|
"""
|
2026-05-19 10:07:11 +08:00
|
|
|
document_batch_generator = await self._generate(task)
|
2025-12-09 19:23:14 +08:00
|
|
|
|
|
|
|
|
failed_docs = 0
|
Feature big query connector (#15871)
### What problem does this PR solve?
This PR adds Google BigQuery as a first-class data source connector in
RAGFlow.
It enables users to ingest and sync BigQuery data using the same
row-to-document model used by relational database connectors: selected
content columns become document text, metadata columns become document
metadata, an optional ID column provides stable document IDs, and an
optional timestamp column enables cursor-based incremental sync.
The connector supports service-account JSON credentials, table mode,
custom query mode, GoogleSQL queries, cursor-based incremental sync,
deleted-row pruning support, configurable query limits such as
`maximum_bytes_billed`, dry-run validation, batch loading, stable
document IDs, and BigQuery-aware value serialization.
2026-06-29 17:08:40 +03:00
|
|
|
had_parse_errors = False
|
2026-04-09 16:40:14 +08:00
|
|
|
added_docs = 0
|
|
|
|
|
updated_docs = 0
|
2025-12-09 19:23:14 +08:00
|
|
|
next_update = datetime(1970, 1, 1, tzinfo=timezone.utc)
|
2026-04-09 16:40:14 +08:00
|
|
|
source_type = f"{self.SOURCE_NAME}/{task['connector_id']}"
|
|
|
|
|
existing_doc_ids = {
|
|
|
|
|
doc["id"]
|
|
|
|
|
for doc in DocumentService.list_doc_headers_by_kb_and_source_type(
|
|
|
|
|
task["kb_id"],
|
|
|
|
|
source_type,
|
|
|
|
|
)
|
|
|
|
|
}
|
2025-12-09 19:23:14 +08:00
|
|
|
|
|
|
|
|
if task["poll_range_start"]:
|
|
|
|
|
next_update = task["poll_range_start"]
|
|
|
|
|
|
2026-01-04 11:19:48 +08:00
|
|
|
for document_batch in document_batch_generator:
|
2025-12-09 19:23:14 +08:00
|
|
|
if not document_batch:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
max_update = max(doc.doc_updated_at for doc in document_batch)
|
|
|
|
|
next_update = max(next_update, max_update)
|
|
|
|
|
|
|
|
|
|
docs = []
|
|
|
|
|
for doc in document_batch:
|
2026-06-15 16:54:25 +08:00
|
|
|
legacy_doc_id = hash128(f"{task['connector_id']}:{doc.id}")
|
|
|
|
|
new_doc_id = hash128(f"{task['kb_id']}:{task['connector_id']}:{doc.id}")
|
2025-12-09 19:23:14 +08:00
|
|
|
d = {
|
2026-06-15 16:54:25 +08:00
|
|
|
"id": legacy_doc_id if legacy_doc_id in existing_doc_ids else new_doc_id,
|
2025-12-09 19:23:14 +08:00
|
|
|
"connector_id": task["connector_id"],
|
|
|
|
|
"source": self.SOURCE_NAME,
|
|
|
|
|
"semantic_identifier": doc.semantic_identifier,
|
|
|
|
|
"extension": doc.extension,
|
|
|
|
|
"size_bytes": doc.size_bytes,
|
|
|
|
|
"doc_updated_at": doc.doc_updated_at,
|
|
|
|
|
"blob": doc.blob,
|
|
|
|
|
}
|
|
|
|
|
if doc.metadata:
|
|
|
|
|
d["metadata"] = doc.metadata
|
feat(connectors): ETag-based bypass for incremental S3 ingestion (#14628) (#14677)
### What problem does this PR solve?
S3-family connector syncs currently re-download every in-window object
just so we can compute `xxhash128(blob)` and compare against
`Document.content_hash`. Anything that bumps `LastModified` without
changing bytes (`aws s3 cp` touches, bucket re-encryption, etc.) pays
full bandwidth and re-parses files that didn't actually change. #14628
covers the broader incremental-ingestion redesign; this PR is the first
slice.
The fix is a pre-listing short-circuit. `BlobStorageConnector` (S3 / R2
/ GCS / OCI / S3-compat) now implements a new `FingerprintConnector`
interface: `list_keys()` paginates `list_objects_v2` and yields
`KeyRecord(key, fingerprint)` where `fingerprint = xxhash128(ETag)`. The
orchestrator joins those against the connector's existing `{doc_id:
content_hash}` map and only calls `get_value(key)` when the fingerprint
differs. Unchanged keys are skipped entirely — no `GetObject`, no
re-parse.
No DDL. xxhash128(ETag) is 32 hex chars and reuses the existing
`Document.content_hash` column per @yingfeng's suggestion; the connector
decides at listing time whether to populate it. Local uploads and
connectors that don't opt in fall through to the existing post-download
`xxhash128(blob)` path with no behavior change.
This is PR-1 of a 4-PR series — full design lives on #14628. Subsequent
PRs extend tier 1 to local FS / WebDAV / Dropbox / Seafile / RDBMS
(PR-2), wire up tier 2 cursor connectors with `SyncLogs.next_checkpoint`
(PR-3), and unify deletion via `KeyRecord(deleted=True)` reconciliation
(PR-4). Holding those back keeps this PR additive and reviewable on its
own.
#### Files touched
- `common/data_source/models.py` — new `KeyRecord`; optional
`fingerprint` on `Document`
- `common/data_source/interfaces.py` — `IncrementalCapability` enum,
`FingerprintConnector` ABC
- `common/data_source/blob_connector.py` — `BlobStorageConnector`
implements `FingerprintConnector`; per-object download factored into
`_build_document_from_obj()` so `_yield_blob_objects`, `list_keys`,
`get_value` all share it
- `rag/svr/sync_data_source.py` —
`_BlobLikeBase._fingerprint_filtered_generator` does the bypass loop;
`_run_task_logic` plumbs `doc.fingerprint` into the upload dict
- `api/db/services/document_service.py` —
`list_id_content_hash_map_by_kb_and_source_type()` helper
- `api/db/services/connector_service.py` + `file_service.py` —
fingerprint flows through `duplicate_and_parse → upload_document` and
lands in `content_hash`
- `test/unit_test/common/test_blob_connector_fingerprint.py` — 14 tests
covering ETag normalization (single-part, multipart, quoted, empty),
`list_keys()` not calling `GetObject`, `get_value()` materializing with
fingerprint, deterministic/stable fingerprints, and the bypass loop
asserting `GetObject` is *not* called on a match
#### Worth flagging for review
Old `_BlobLikeBase._generate` called `poll_source(start, now)` with a
`LastModified` window when `poll_range_start` was set. New code uses
`_fingerprint_filtered_generator` (full bucket listing + fingerprint
compare) outside of explicit `reindex=1`. Strictly better for
unchanged-bucket cases since it skips `GetObject`, but it does mean
every sync now does a full `list_objects_v2` paginate. Should still be
cheap for most buckets — flagging in case anyone has a very large bucket
where the time-window filter was meaningful.
On migration: existing rows have `content_hash = xxhash128(blob)` from
the old code. The first sync after this lands sees ETag-derived
fingerprints that don't match, re-fetches every object once, and writes
the new fingerprint. From the second sync onward the bypass works as
expected. "Slow day one, fast every day after." A `fingerprint_backfill:
trust` opt-out is sketched in the design doc but not in this PR.
#### Test plan
- [x] `uv run ruff check` — clean on all 8 touched files
- [x] `uv run pytest
test/unit_test/common/test_blob_connector_fingerprint.py -v` — 14 passed
- [x] Broader unit-test suite — no regressions in anything I touched
- [ ] Manual smoke against a real S3 bucket — configure a connector, run
sync twice, expect the second sync to log `bypassed=N, fetched=0` and no
`GetObject` calls in CloudTrail / bucket access logs
- [ ] Manual smoke with `reindex=1` — confirm the full re-download path
still works
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
---------
Co-authored-by: Yingfeng <yingfeng.zhang@gmail.com>
2026-05-09 05:03:56 -07:00
|
|
|
if getattr(doc, "fingerprint", None):
|
|
|
|
|
d["fingerprint"] = doc.fingerprint
|
2025-12-09 19:23:14 +08:00
|
|
|
docs.append(d)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
|
|
|
|
|
err, dids = SyncLogsService.duplicate_and_parse(
|
|
|
|
|
kb, docs, task["tenant_id"],
|
|
|
|
|
f"{self.SOURCE_NAME}/{task['connector_id']}",
|
|
|
|
|
task["auto_parse"]
|
|
|
|
|
)
|
Feature big query connector (#15871)
### What problem does this PR solve?
This PR adds Google BigQuery as a first-class data source connector in
RAGFlow.
It enables users to ingest and sync BigQuery data using the same
row-to-document model used by relational database connectors: selected
content columns become document text, metadata columns become document
metadata, an optional ID column provides stable document IDs, and an
optional timestamp column enables cursor-based incremental sync.
The connector supports service-account JSON credentials, table mode,
custom query mode, GoogleSQL queries, cursor-based incremental sync,
deleted-row pruning support, configurable query limits such as
`maximum_bytes_billed`, dry-run validation, batch loading, stable
document IDs, and BigQuery-aware value serialization.
2026-06-29 17:08:40 +03:00
|
|
|
if err:
|
|
|
|
|
had_parse_errors = True
|
2025-12-09 19:23:14 +08:00
|
|
|
SyncLogsService.increase_docs(
|
2026-03-18 09:31:05 -06:00
|
|
|
task["id"], max_update,
|
2025-12-09 19:23:14 +08:00
|
|
|
len(docs), "\n".join(err), len(err)
|
|
|
|
|
)
|
2026-04-09 16:40:14 +08:00
|
|
|
changed_doc_ids = set(dids)
|
|
|
|
|
updated_in_batch = len(changed_doc_ids & existing_doc_ids)
|
|
|
|
|
added_in_batch = len(changed_doc_ids) - updated_in_batch
|
|
|
|
|
added_docs += added_in_batch
|
|
|
|
|
updated_docs += updated_in_batch
|
|
|
|
|
existing_doc_ids.update(changed_doc_ids)
|
2025-12-09 19:23:14 +08:00
|
|
|
|
|
|
|
|
except Exception as batch_ex:
|
|
|
|
|
msg = str(batch_ex)
|
|
|
|
|
code = getattr(batch_ex, "args", [None])[0]
|
|
|
|
|
|
|
|
|
|
if code == 1267 or "collation" in msg.lower():
|
|
|
|
|
logging.warning(f"Skipping {len(docs)} document(s) due to collation conflict")
|
|
|
|
|
else:
|
|
|
|
|
logging.error(f"Error processing batch: {msg}")
|
|
|
|
|
|
|
|
|
|
failed_docs += len(docs)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
prefix = self._get_source_prefix()
|
2026-04-09 16:40:14 +08:00
|
|
|
prefix = f"{prefix} " if prefix else ""
|
|
|
|
|
next_update_info = self._format_window_boundary(next_update)
|
|
|
|
|
|
2026-05-19 10:07:11 +08:00
|
|
|
total_changed_docs = added_docs + updated_docs
|
2026-04-09 16:40:14 +08:00
|
|
|
summary = (
|
|
|
|
|
f"{prefix}sync summary till {next_update_info}: "
|
|
|
|
|
f"total={total_changed_docs}, added={added_docs}, "
|
2026-05-19 10:07:11 +08:00
|
|
|
f"updated={updated_docs}"
|
2026-04-09 16:40:14 +08:00
|
|
|
)
|
2025-12-09 19:23:14 +08:00
|
|
|
if failed_docs > 0:
|
2026-04-09 16:40:14 +08:00
|
|
|
summary = f"{summary}, skipped={failed_docs}"
|
|
|
|
|
logging.info(summary)
|
2025-12-09 19:23:14 +08:00
|
|
|
|
2026-05-07 13:31:05 +08:00
|
|
|
if (
|
Feature big query connector (#15871)
### What problem does this PR solve?
This PR adds Google BigQuery as a first-class data source connector in
RAGFlow.
It enables users to ingest and sync BigQuery data using the same
row-to-document model used by relational database connectors: selected
content columns become document text, metadata columns become document
metadata, an optional ID column provides stable document IDs, and an
optional timestamp column enables cursor-based incremental sync.
The connector supports service-account JSON credentials, table mode,
custom query mode, GoogleSQL queries, cursor-based incremental sync,
deleted-row pruning support, configurable query limits such as
`maximum_bytes_billed`, dry-run validation, batch loading, stable
document IDs, and BigQuery-aware value serialization.
2026-06-29 17:08:40 +03:00
|
|
|
isinstance(self, _CursorPersistingSyncBase)
|
2026-05-07 13:31:05 +08:00
|
|
|
and failed_docs == 0
|
Feature big query connector (#15871)
### What problem does this PR solve?
This PR adds Google BigQuery as a first-class data source connector in
RAGFlow.
It enables users to ingest and sync BigQuery data using the same
row-to-document model used by relational database connectors: selected
content columns become document text, metadata columns become document
metadata, an optional ID column provides stable document IDs, and an
optional timestamp column enables cursor-based incremental sync.
The connector supports service-account JSON credentials, table mode,
custom query mode, GoogleSQL queries, cursor-based incremental sync,
deleted-row pruning support, configurable query limits such as
`maximum_bytes_billed`, dry-run validation, batch loading, stable
document IDs, and BigQuery-aware value serialization.
2026-06-29 17:08:40 +03:00
|
|
|
and not had_parse_errors
|
2026-05-07 13:31:05 +08:00
|
|
|
):
|
|
|
|
|
self.connector.persist_sync_state()
|
2025-12-09 19:23:14 +08:00
|
|
|
SyncLogsService.done(task["id"], task["connector_id"])
|
|
|
|
|
task["poll_range_start"] = next_update
|
|
|
|
|
|
2026-05-19 10:07:11 +08:00
|
|
|
async def _run_prune_task_logic(self, task: dict):
|
|
|
|
|
if not self.conf.get("sync_deleted_files"):
|
|
|
|
|
SyncLogsService.done(task["id"], task["connector_id"])
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
await self._initialize_for_prune(task)
|
|
|
|
|
|
|
|
|
|
file_list = self._collect_prune_snapshot(task)
|
|
|
|
|
if file_list is None:
|
|
|
|
|
logging.warning(
|
|
|
|
|
"%s prune snapshot retrieval failed (connector_id=%s, kb_id=%s)",
|
|
|
|
|
self.SOURCE_NAME,
|
|
|
|
|
task["connector_id"],
|
|
|
|
|
task["kb_id"],
|
|
|
|
|
)
|
|
|
|
|
SyncLogsService.done(task["id"], task["connector_id"])
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
removed_docs, cleanup_errors = ConnectorService.cleanup_stale_documents_for_task(
|
|
|
|
|
task["id"],
|
|
|
|
|
task["connector_id"],
|
|
|
|
|
task["kb_id"],
|
|
|
|
|
task["tenant_id"],
|
|
|
|
|
file_list,
|
|
|
|
|
)
|
|
|
|
|
logging.info(
|
|
|
|
|
"%s prune summary: deleted=%s, errors=%s",
|
|
|
|
|
self.SOURCE_NAME,
|
|
|
|
|
removed_docs,
|
|
|
|
|
len(cleanup_errors),
|
|
|
|
|
)
|
|
|
|
|
SyncLogsService.done(task["id"], task["connector_id"])
|
|
|
|
|
|
2025-11-05 15:43:15 +08:00
|
|
|
async def _generate(self, task: dict):
|
2025-11-03 19:59:18 +08:00
|
|
|
raise NotImplementedError
|
2025-12-09 19:23:14 +08:00
|
|
|
|
2025-12-01 17:46:44 +08:00
|
|
|
def _get_source_prefix(self):
|
|
|
|
|
return ""
|
2025-11-03 19:59:18 +08:00
|
|
|
|
2026-05-19 10:07:11 +08:00
|
|
|
async def _initialize_for_prune(self, task: dict):
|
|
|
|
|
await self._generate(task)
|
|
|
|
|
|
|
|
|
|
def _get_prune_snapshot_kwargs(self, task: dict) -> dict[str, Any]:
|
|
|
|
|
return {}
|
|
|
|
|
|
|
|
|
|
def _collect_prune_snapshot(self, task: dict):
|
|
|
|
|
if not getattr(self, "connector", None):
|
|
|
|
|
return None
|
|
|
|
|
if not hasattr(self.connector, "retrieve_all_slim_docs_perm_sync"):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
file_list = []
|
|
|
|
|
snapshot_kwargs = self._get_prune_snapshot_kwargs(task)
|
|
|
|
|
try:
|
|
|
|
|
for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(**snapshot_kwargs):
|
|
|
|
|
file_list.extend(slim_batch)
|
|
|
|
|
except TypeError:
|
|
|
|
|
for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync():
|
|
|
|
|
file_list.extend(slim_batch)
|
|
|
|
|
except Exception:
|
|
|
|
|
logging.exception(
|
|
|
|
|
"%s prune snapshot failed (connector_id=%s, kb_id=%s)",
|
|
|
|
|
self.SOURCE_NAME,
|
|
|
|
|
task["connector_id"],
|
|
|
|
|
task["kb_id"],
|
|
|
|
|
)
|
|
|
|
|
return None
|
|
|
|
|
return file_list
|
|
|
|
|
|
2025-12-29 12:01:18 +08:00
|
|
|
|
2025-12-22 09:36:16 +08:00
|
|
|
class _BlobLikeBase(SyncBase):
    """Shared sync logic for blob-storage sources built on BlobStorageConnector.

    Subclasses provide ``SOURCE_NAME`` and may override
    ``DEFAULT_BUCKET_TYPE``, which is used when the connector config has
    no ``bucket_type`` entry.
    """

    DEFAULT_BUCKET_TYPE: str = "s3"

    def _fingerprint_filtered_generator(self, task: dict):
        """Yield batches of documents whose listing fingerprint changed.

        The persisted ``{doc_id: content_hash}`` map for this connector and
        knowledge base is loaded once. Each record from
        ``self.connector.list_keys()`` is then compared against it, looking
        the stored hash up under both the legacy id
        (``connector_id:key``) and the newer id (``kb_id:connector_id:key``).
        A record whose non-empty fingerprint equals the stored hash is
        bypassed without calling ``get_value``; everything else is fetched.

        Records flagged ``deleted`` are ignored here. A ``get_value``
        failure is logged with its traceback, counted, and the key is
        skipped. After the listing is exhausted a summary line is logged,
        at WARNING level when at least one fetch failed and INFO otherwise.

        Args:
            task: The sync task; ``connector_id`` and ``kb_id`` are read.

        Yields:
            Lists of fetched documents, each holding up to
            ``self.connector.batch_size`` items.
        """
        source_type = f"{self.SOURCE_NAME}/{task['connector_id']}"
        known_hashes = DocumentService.list_id_content_hash_map_by_kb_and_source_type(
            task["kb_id"],
            source_type,
        )

        skipped = 0
        fetched = 0
        failed = 0
        pending = []
        for record in self.connector.list_keys():
            if record.deleted:
                continue

            # Documents may have been stored under either id scheme.
            legacy_doc_id = hash128(f"{task['connector_id']}:{record.key}")
            new_doc_id = hash128(f"{task['kb_id']}:{task['connector_id']}:{record.key}")
            stored = known_hashes.get(legacy_doc_id, "") or known_hashes.get(new_doc_id, "")

            if record.fingerprint and stored:
                if record.fingerprint == stored:
                    skipped += 1
                    continue

            try:
                document = self.connector.get_value(record.key)
            except Exception as ex:
                failed += 1
                logging.exception(
                    "Failed to fetch %s from %s: %s",
                    record.key,
                    self.SOURCE_NAME,
                    ex,
                )
                continue

            fetched += 1
            pending.append(document)
            if len(pending) < self.connector.batch_size:
                continue
            yield pending
            pending = []

        if pending:
            yield pending

        summary_fmt = (
            "[%s] fingerprint sync: %d bypassed, %d fetched, %d failed "
            "(connector_id=%s, kb_id=%s)"
        )
        summary_args = (
            self.SOURCE_NAME,
            skipped,
            fetched,
            failed,
            task["connector_id"],
            task["kb_id"],
        )
        # Escalate to WARNING on any failure so a partially unreachable
        # bucket stands out without reading the per-key tracebacks.
        emit = logging.warning if failed else logging.info
        emit(summary_fmt, *summary_args)

    async def _generate(self, task: dict):
        """Create the blob connector and return its document batch generator.

        Unless the task requests a full reindex (``task["reindex"] == "1"``)
        the fingerprint-bypass generator is used; a full reindex uses the
        connector's ``load_from_state()`` instead.

        Args:
            task: The sync task; ``reindex`` selects the path.

        Returns:
            A generator of document batches.
        """
        bucket_type = self.conf.get("bucket_type", self.DEFAULT_BUCKET_TYPE)

        self.connector = BlobStorageConnector(
            bucket_type=bucket_type,
            bucket_name=self.conf["bucket_name"],
            prefix=self.conf.get("prefix", ""),
        )
        self.connector.set_allow_images(self.conf.get("allow_images", False))
        self.connector.load_credentials(self.conf["credentials"])

        use_fingerprint_path = task["reindex"] != "1"
        document_batch_generator = (
            self._fingerprint_filtered_generator(task)
            if use_fingerprint_path
            else self.connector.load_from_state()
        )

        if use_fingerprint_path:
            _begin_info = "fingerprint-bypass"
        else:
            _begin_info = "full reindex"

        connect_msg = "Connect to {}: {}(prefix/{}) {}".format(
            bucket_type,
            self.conf["bucket_name"],
            self.conf.get("prefix", ""),
            _begin_info,
        )
        logging.info(connect_msg)
        return document_batch_generator
|
2025-11-03 19:59:18 +08:00
|
|
|
|
2025-12-29 12:01:18 +08:00
|
|
|
|
2025-12-22 09:36:16 +08:00
|
|
|
class S3(_BlobLikeBase):
    """Blob-storage sync source registered as ``FileSource.S3``."""

    SOURCE_NAME: str = FileSource.S3
    # Bucket type used when the connector config has no "bucket_type".
    DEFAULT_BUCKET_TYPE: str = "s3"
|
|
|
|
|
|
2025-12-29 12:01:18 +08:00
|
|
|
|
2025-12-22 09:36:16 +08:00
|
|
|
class R2(_BlobLikeBase):
    """Blob-storage sync source registered as ``FileSource.R2``."""

    SOURCE_NAME: str = FileSource.R2
    # Bucket type used when the connector config has no "bucket_type".
    DEFAULT_BUCKET_TYPE: str = "r2"
|
|
|
|
|
|
2025-12-29 12:01:18 +08:00
|
|
|
|
2025-12-22 09:36:16 +08:00
|
|
|
class OCI_STORAGE(_BlobLikeBase):
    """Blob-storage sync source registered as ``FileSource.OCI_STORAGE``."""

    SOURCE_NAME: str = FileSource.OCI_STORAGE
    # Bucket type used when the connector config has no "bucket_type".
    DEFAULT_BUCKET_TYPE: str = "oci_storage"
|
|
|
|
|
|
2025-12-29 12:01:18 +08:00
|
|
|
|
2025-12-22 09:36:16 +08:00
|
|
|
class GOOGLE_CLOUD_STORAGE(_BlobLikeBase):
    """Blob-storage sync source registered as ``FileSource.GOOGLE_CLOUD_STORAGE``."""

    SOURCE_NAME: str = FileSource.GOOGLE_CLOUD_STORAGE
    # Bucket type used when the connector config has no "bucket_type".
    DEFAULT_BUCKET_TYPE: str = "google_cloud_storage"
|
2025-11-03 19:59:18 +08:00
|
|
|
|
2025-12-29 12:01:18 +08:00
|
|
|
|
2026-03-27 22:58:44 +08:00
|
|
|
class RSS(SyncBase):
    """Sync source that ingests documents from an RSS feed."""

    SOURCE_NAME: str = FileSource.RSS

    async def _generate(self, task: dict):
        """Create the RSS connector and return its document generator.

        A full load (``load_from_state``) is used when the task asks for a
        reindex or has no ``poll_range_start``; otherwise the feed is polled
        from ``poll_range_start`` up to the current UTC time.

        Args:
            task: The sync task; ``reindex`` and ``poll_range_start`` are read.

        Returns:
            The generator produced by the connector.
        """
        feed_connector = RSSConnector(
            feed_url=self.conf["feed_url"],
            batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE),
        )
        self.connector = feed_connector
        feed_connector.load_credentials(self.conf.get("credentials", {}))
        feed_connector.validate_connector_settings()

        incremental = task["reindex"] != "1" and task["poll_range_start"]
        if not incremental:
            return feed_connector.load_from_state()

        # Fix the upper bound before reading the lower one.
        window_end = datetime.now(timezone.utc).timestamp()
        window_start = task["poll_range_start"].timestamp()
        return feed_connector.poll_source(window_start, window_end)
|
2026-03-27 22:58:44 +08:00
|
|
|
|
|
|
|
|
|
2025-11-04 17:29:11 +08:00
|
|
|
class Confluence(SyncBase):
    """Sync source that ingests pages from a Confluence wiki."""

    SOURCE_NAME: str = FileSource.CONFLUENCE

    async def _generate(self, task: dict):
        """Create the Confluence connector and return a batch generator.

        The ``index_mode`` config value selects what is indexed:
        ``"everything"`` (also the fallback for unknown values), ``"space"``
        (requires ``space``) or ``"page"`` (requires ``page_id`` and honours
        ``index_recursively``). Documents are loaded checkpoint by
        checkpoint between the start time (``0.0`` on reindex or when there
        is no ``poll_range_start``) and the current UTC time, and regrouped
        into batches of the configured size.

        Args:
            task: The sync task; ``tenant_id``, ``reindex`` and
                ``poll_range_start`` are read.

        Returns:
            A generator yielding lists of documents.

        Raises:
            ValueError: If the selected mode lacks its space key or page id.
        """
        from common.data_source.config import DocumentSource
        from common.data_source.interfaces import StaticCredentialsProvider

        allowed_modes = {"everything", "space", "page"}
        index_mode = (self.conf.get("index_mode") or "everything").lower()
        if index_mode not in allowed_modes:
            index_mode = "everything"

        space, page_id, index_recursively = "", "", False
        if index_mode == "space":
            space = (self.conf.get("space") or "").strip()
            if not space:
                raise ValueError("Space Key is required when indexing a specific Confluence space.")
        elif index_mode == "page":
            page_id = (self.conf.get("page_id") or "").strip()
            if not page_id:
                raise ValueError("Page ID is required when indexing a specific Confluence page.")
            index_recursively = bool(self.conf.get("index_recursively", False))

        self.connector = ConfluenceConnector(
            wiki_base=self.conf["wiki_base"],
            is_cloud=self.conf.get("is_cloud", True),
            space=space,
            page_id=page_id,
            index_recursively=index_recursively,
        )

        self.connector.set_credentials_provider(
            StaticCredentialsProvider(
                tenant_id=task["tenant_id"],
                connector_name=DocumentSource.CONFLUENCE,
                credential_json=self.conf["credentials"],
            )
        )

        # A reindex, or a task without a previous poll, starts from epoch 0.
        full_range = task["reindex"] == "1" or not task["poll_range_start"]
        start_time = 0.0 if full_range else task["poll_range_start"].timestamp()
        end_time = datetime.now(timezone.utc).timestamp()

        # Fall back to INDEX_BATCH_SIZE for missing, unparsable or
        # non-positive configured sizes.
        raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        def document_batches():
            checkpoint = self.connector.build_dummy_checkpoint()
            buffered = []
            rounds = 0
            max_rounds = 100_000

            while checkpoint.has_more:
                output_wrapper = CheckpointOutputWrapper()
                stream = output_wrapper(self.connector.load_from_checkpoint(start_time, end_time, checkpoint))
                for document, failure, next_checkpoint in stream:
                    if failure is not None:
                        failure_text = getattr(failure, "failure_message", failure)
                        logging.warning("Confluence connector failure: %s", failure_text)
                        continue
                    if document is not None:
                        buffered.append(document)
                        if len(buffered) >= batch_size:
                            yield buffered
                            buffered = []
                    if next_checkpoint is not None:
                        checkpoint = next_checkpoint

                # Guard against a checkpoint that never reports completion.
                rounds += 1
                if rounds > max_rounds:
                    raise RuntimeError("Too many iterations while loading Confluence documents.")

            if buffered:
                yield buffered

        def relay():
            for batch in document_batches():
                yield batch

        self.log_connection("Confluence", self.conf["wiki_base"], task)
        return relay()
|
2025-11-04 17:29:11 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class Notion(SyncBase):
    """Sync source that ingests pages from Notion."""

    SOURCE_NAME: str = FileSource.NOTION

    async def _generate(self, task: dict):
        """Create the Notion connector and return its document generator.

        A full load (``load_from_state``) is used when the task asks for a
        reindex or has no ``poll_range_start``; otherwise the source is
        polled from ``poll_range_start`` up to the current UTC time.

        Args:
            task: The sync task; ``reindex`` and ``poll_range_start`` are read.

        Returns:
            The generator produced by the connector.
        """
        self.connector = NotionConnector(root_page_id=self.conf["root_page_id"])
        self.connector.load_credentials(self.conf["credentials"])

        full_sync = task["reindex"] == "1" or not task["poll_range_start"]
        if full_sync:
            document_generator = self.connector.load_from_state()
        else:
            document_generator = self.connector.poll_source(
                task["poll_range_start"].timestamp(),
                datetime.now(timezone.utc).timestamp(),
            )

        # The connection is reported through log_connection; the previously
        # computed ``_begin_info`` string was never used and has been dropped.
        self.log_connection("Notion", f"root({self.conf['root_page_id']})", task)
        return document_generator
|
2025-11-03 19:59:18 +08:00
|
|
|
|
|
|
|
|
|
2025-11-04 17:29:11 +08:00
|
|
|
class Discord(SyncBase):
|
2025-11-06 16:48:04 +08:00
|
|
|
SOURCE_NAME: str = FileSource.DISCORD
|
2025-11-03 19:59:18 +08:00
|
|
|
|
fix(sync): tolerate list inputs for Discord server_ids / channels (#15790) (#15809)
## Summary
Fixes #15790.
Every Discord sync launched from the current Web UI crashes immediately
with:
```
'list' object has no attribute 'split'
```
The error is raised in
[rag/svr/sync_data_source.py:650-651](rag/svr/sync_data_source.py#L650-L651):
```python
server_ids=server_ids.split(",") if server_ids else [],
channel_names=channel_names.split(",") if channel_names else [],
```
### Root cause
Three independent bugs stack here, all in the Discord branch of
`sync_data_source.py`:
1. **Type mismatch (the user's exact error).** The current form at
[web/src/pages/user-setting/data-source/constant/index.tsx:833-843](web/src/pages/user-setting/data-source/constant/index.tsx#L833-L843)
uses `FormFieldType.Tag` for both **Server IDs** and **Channels**:
```tsx
{ label: 'Server IDs', name: 'config.server_ids', type:
FormFieldType.Tag, required: false },
{ label: 'Channels', name: 'config.channels', type: FormFieldType.Tag,
required: false },
```
Tag inputs serialise to **lists**, not comma-separated strings. The
backend `.split(",")` then explodes on the very first sync.
2. **Field-name mismatch.** The form writes `config.channels`. The
backend reads `self.conf.get("channel_names", None)`. Even if
`.split(",")` were fixed, channels would silently be empty for every
UI-created source.
3. **Int conversion missing.**
[common/data_source/discord_connector.py:82](common/data_source/discord_connector.py#L82)
types `server_ids` as `list[int]` (Discord guild IDs are integers); the
previous `.split(",")` produced strings, so the `channel.guild.id not in
server_ids` filter at
[discord_connector.py:92](common/data_source/discord_connector.py#L92)
silently never matched.
So even the configurations that didn't crash were also broken — there is
no path through the current code that actually filtered by server id
from a UI-created source.
### Fix
A 39-line patch in one function:
- New `Discord._coerce_str_list` static method: accepts `None` / `""` /
`list` / `tuple` / `set` / scalar / comma-separated str, returns a clean
`list[str]` with whitespace trimmed and empty entries dropped.
Smoke-tested against the 10 input shapes that can hit it (see Test
plan).
- `_generate` reads `config.channels` first (the form's actual key) and
falls back to `config.channel_names`, so SDK callers and legacy configs
that already shipped with the old key keep working.
- `server_ids` is coerced to `list[int]`. Non-integer entries are logged
and dropped instead of crashing the sync, so a single malformed tag from
the form doesn't tank the rest of the run.
### What this PR does NOT change
- **Web form key (`config.channels`)** — kept as-is. Renaming it to
`channel_names` would force a UI migration and break in-flight configs;
the backend fallback solves the same problem more safely.
- **`common/data_source/discord_connector.py`** — its signature was
already correct.
- **Other connectors (Slack, Gmail, Confluence, etc.)** — they don't
crash today and were not in the issue's scope.
## Test plan
`Discord._coerce_str_list` has been exercised against all ten realistic
input shapes — list, tuple, set, comma-separated string, str with extra
whitespace, empty entries, integers from a Tag input, None, empty list,
single trailing comma. All pass.
2026-06-10 22:27:42 -07:00
|
|
|
@staticmethod
def _coerce_str_list(raw):
    """Normalise a loosely-typed config field into a clean ``list[str]``.

    The value may arrive as a list (Tag input from the new web form), a
    comma-separated string (legacy/SDK callers), a tuple / set / frozenset,
    a bare scalar, or ``None``.

    Fixes #15790 — the previous ``.split(',')`` call assumed a string and
    raised ``'list' object has no attribute 'split'`` for any config saved
    through the current UI.

    Args:
        raw: The config value exactly as stored.

    Returns:
        Every entry converted with ``str()`` and whitespace-stripped, with
        ``None`` and empty entries dropped. Any falsy ``raw`` yields ``[]``.
    """
    if not raw:
        return []
    if isinstance(raw, str):
        items = raw.split(",")
    elif isinstance(raw, (list, tuple, set, frozenset)):
        # frozenset is handled like set; otherwise it would hit the scalar
        # branch below and be stringified as "frozenset({...})".
        items = raw
    else:
        items = [raw]
    # Drop None before str() so it never survives as the literal string
    # "None" (str(None) == "None") — only stringify real values.
    stripped = (str(item).strip() for item in items if item is not None)
    return [text for text in stripped if text]
|
|
|
|
|
|
2025-11-05 15:43:15 +08:00
|
|
|
async def _generate(self, task: dict):
    """Build the Discord connector and return its document generator.

    Reads ``server_ids`` and ``channels`` (falling back to the legacy
    ``channel_names`` key) from ``self.conf``, normalises both with
    ``_coerce_str_list``, drops server ids that ``int()`` rejects, then
    chooses a full load or an incremental poll based on ``task``.

    Args:
        task: Sync task. ``reindex`` and ``poll_range_start`` are read here;
            the whole dict is forwarded to ``log_connection``.

    Returns:
        The result of ``connector.load_from_state()`` for a full sync, or
        of ``connector.poll_source(start, now)`` for an incremental one.

    Raises:
        KeyError: If ``self.conf`` has no ``credentials`` entry or ``task``
            lacks ``reindex`` / ``poll_range_start``.
    """
    # Web form stores channels under "channels"; older configs / SDK use
    # "channel_names" — accept either so existing sources keep working.
    # The fallback is decided on the *normalised* value so that a present
    # but effectively empty "channels" (e.g. [""] or "  ") does not hide a
    # populated legacy key.
    channel_names = self._coerce_str_list(self.conf.get("channels"))
    if not channel_names:
        channel_names = self._coerce_str_list(self.conf.get("channel_names"))

    # Validate up-front so a malformed entry warns + drops here rather than
    # failing later — but keep the ids as strings.
    # NOTE(review): per the original comment, DiscordConnector.__init__
    # accepts list[str] and casts to int itself — confirm against
    # common/data_source/discord_connector.py.
    server_ids: list[str] = []
    for sid in self._coerce_str_list(self.conf.get("server_ids")):
        try:
            int(sid)
        except ValueError:
            logging.warning("Discord connector: ignoring non-integer server_id %r", sid)
            continue
        server_ids.append(sid)

    self.connector = DiscordConnector(
        server_ids=server_ids,
        channel_names=channel_names,
        start_date=datetime(1970, 1, 1, tzinfo=timezone.utc).strftime("%Y-%m-%d"),
        batch_size=self.conf.get("batch_size", 1024),
    )
    self.connector.load_credentials(self.conf["credentials"])

    # Full load on explicit reindex or when there is no previous poll point.
    if task["reindex"] == "1" or not task["poll_range_start"]:
        document_generator = self.connector.load_from_state()
    else:
        document_generator = self.connector.poll_source(
            task["poll_range_start"].timestamp(),
            datetime.now(timezone.utc).timestamp(),
        )

    self.log_connection("Discord", f"servers({server_ids}), channel({channel_names})", task)
    return document_generator
|
2025-11-03 19:59:18 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class Gmail(SyncBase):
    """Sync source for Gmail, built on ``GmailConnector``.

    Config expectations (``self.conf``):
        credentials: Gmail / Workspace OAuth JSON (with primary admin email)
        batch_size: optional, defaults to ``INDEX_BATCH_SIZE``
    """

    SOURCE_NAME: str = FileSource.GMAIL

    async def _generate(self, task: dict):
        """Create the Gmail connector and return its document generator.

        Args:
            task: Sync task. ``reindex``, ``poll_range_start`` and (when
                credentials are refreshed) ``connector_id`` are read here.

        Returns:
            The result of ``connector.load_from_state()`` for a full sync,
            or of ``connector.poll_source(start, now)`` otherwise.

        Raises:
            ValueError: If the config carries no credentials.
        """
        # Gmail sync reuses the generic LoadConnector/PollConnector interface
        # implemented by common.data_source.gmail_connector.GmailConnector.
        batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        self.connector = GmailConnector(batch_size=batch_size)

        credentials = self.conf.get("credentials")
        if not credentials:
            raise ValueError("Gmail connector is missing credentials.")

        new_credentials = self.connector.load_credentials(credentials)
        if new_credentials:
            # Persist rotated / refreshed credentials back to connector config.
            # Best-effort: a failure is logged and the sync continues.
            try:
                updated_conf = copy.deepcopy(self.conf)
                updated_conf["credentials"] = new_credentials
                ConnectorService.update_by_id(task["connector_id"], {"config": updated_conf})
                self.conf = updated_conf
                logging.info(
                    "Persisted refreshed Gmail credentials for connector %s",
                    task["connector_id"],
                )
            except Exception:
                logging.exception(
                    "Failed to persist refreshed Gmail credentials for connector %s",
                    task["connector_id"],
                )

        # Decide between full reindex and incremental polling by time range.
        # A missing or falsy poll_range_start always means "full load", so no
        # separate None check is needed in the incremental branch.
        poll_start = task.get("poll_range_start")
        if task["reindex"] == "1" or not poll_start:
            document_generator = self.connector.load_from_state()
        else:
            document_generator = self.connector.poll_source(
                poll_start.timestamp(),
                datetime.now(timezone.utc).timestamp(),
            )

        # The connector raises RuntimeError when the admin email is not
        # available; the log line then falls back to a placeholder.
        try:
            admin_email = self.connector.primary_admin_email
        except RuntimeError:
            admin_email = "unknown"
        self.log_connection("Gmail", f"as {admin_email}", task)
        return document_generator
|
2025-11-03 19:59:18 +08:00
|
|
|
|
|
|
|
|
|
2025-11-25 09:40:03 +08:00
|
|
|
class Dropbox(SyncBase):
    """Sync source for Dropbox, built on ``DropboxConnector``."""

    SOURCE_NAME: str = FileSource.DROPBOX

    async def _generate(self, task: dict):
        """Create the Dropbox connector and return its document generator.

        Args:
            task: Sync task; ``reindex`` and ``poll_range_start`` are read.

        Returns:
            The result of ``connector.load_from_state()`` for a full sync,
            or of ``connector.poll_source(start, now)`` otherwise.

        Raises:
            KeyError: If ``self.conf`` has no ``credentials`` entry or
                ``task`` lacks ``poll_range_start`` / ``reindex``.
        """
        self.connector = DropboxConnector(batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE))
        self.connector.load_credentials(self.conf["credentials"])

        poll_start = task["poll_range_start"]
        if task["reindex"] == "1" or not poll_start:
            document_generator = self.connector.load_from_state()
        else:
            # Upper bound of the poll window is "now", taken before polling.
            end_time = datetime.now(timezone.utc).timestamp()
            document_generator = self.connector.poll_source(poll_start.timestamp(), end_time)

        self.log_connection("Dropbox", "workspace", task)
        return document_generator
|
2025-11-25 09:40:03 +08:00
|
|
|
|
|
|
|
|
|
2025-11-10 19:15:02 +08:00
|
|
|
class GoogleDrive(SyncBase):
    """
    Data synchronization connector for Google Drive.

    Handles both full re-indexing and incremental polling, including the capability
    to synchronize deleted files by retrieving a lightweight snapshot of current files.
    """

    SOURCE_NAME: str = FileSource.GOOGLE_DRIVE

    async def _generate(self, task: dict):
        """Generates document batches from Google Drive, handling both full and incremental syncs.

        Args:
            task: Sync task. ``reindex``, ``poll_range_start`` and (when
                credentials are refreshed) ``connector_id`` are read here.

        Returns:
            A generator yielding lists of documents, each list holding at
            most the normalised sync batch size.

        Raises:
            ValueError: If the config carries no credentials.
        """
        connector_kwargs = {
            "include_shared_drives": self.conf.get("include_shared_drives", False),
            "include_my_drives": self.conf.get("include_my_drives", False),
            "include_files_shared_with_me": self.conf.get("include_files_shared_with_me", False),
            "shared_drive_urls": self.conf.get("shared_drive_urls"),
            "my_drive_emails": self.conf.get("my_drive_emails"),
            "shared_folder_urls": self.conf.get("shared_folder_urls"),
            "specific_user_emails": self.conf.get("specific_user_emails"),
            "batch_size": self.conf.get("batch_size", INDEX_BATCH_SIZE),
        }
        self.connector = GoogleDriveConnector(**connector_kwargs)
        self.connector.set_allow_images(self.conf.get("allow_images", False))

        credentials = self.conf.get("credentials")
        if not credentials:
            raise ValueError("Google Drive connector is missing credentials.")

        new_credentials = self.connector.load_credentials(credentials)
        if new_credentials:
            self._persist_rotated_credentials(task["connector_id"], new_credentials)

        # Capture end_time BEFORE the snapshot to prevent the ingestion race condition
        end_time = datetime.now(timezone.utc).timestamp()

        # Full sync starts from the epoch; incremental from the last poll point.
        if task["reindex"] == "1" or not task["poll_range_start"]:
            start_time = 0.0
        else:
            start_time = task["poll_range_start"].timestamp()

        # Normalise the batch size: fall back to the default for values that
        # are missing, non-numeric, or not strictly positive.
        raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        def document_batches():
            """Yields paginated batches of parsed Google Drive documents using checkpoints."""
            checkpoint = self.connector.build_dummy_checkpoint()
            pending_docs = []
            iterations = 0
            iteration_limit = 100_000

            while checkpoint.has_more:
                wrapper = CheckpointOutputWrapper()
                doc_generator = wrapper(self.connector.load_from_checkpoint(start_time, end_time, checkpoint))
                for document, failure, next_checkpoint in doc_generator:
                    if failure is not None:
                        # Per-item failures are logged and skipped, not fatal.
                        logging.warning(
                            "Google Drive connector failure: %s",
                            getattr(failure, "failure_message", failure),
                        )
                        continue
                    if document is not None:
                        pending_docs.append(document)
                        if len(pending_docs) >= batch_size:
                            yield pending_docs
                            pending_docs = []
                    if next_checkpoint is not None:
                        checkpoint = next_checkpoint

                # Guard against a checkpoint that never reports completion.
                iterations += 1
                if iterations > iteration_limit:
                    raise RuntimeError("Too many iterations while loading Google Drive documents.")

            # Flush the final, possibly short, batch.
            if pending_docs:
                yield pending_docs

        # The connector raises RuntimeError when the admin email is not
        # available; the log line then falls back to a placeholder.
        try:
            admin_email = self.connector.primary_admin_email
        except RuntimeError:
            admin_email = "unknown"
        self.log_connection("Google Drive", f"as {admin_email}", task)

        return document_batches()

    def _persist_rotated_credentials(self, connector_id: str, credentials: dict[str, Any]) -> None:
        """Saves refreshed OAuth credentials back to the database configuration.

        Best-effort: any exception is logged and swallowed so that a failed
        write does not abort the sync. ``self.conf`` is only replaced after
        the update call returns.

        Args:
            connector_id: Id of the connector row to update.
            credentials: The refreshed credentials to store.
        """
        try:
            updated_conf = copy.deepcopy(self.conf)
            updated_conf["credentials"] = credentials
            ConnectorService.update_by_id(connector_id, {"config": updated_conf})
            self.conf = updated_conf
            logging.info("Persisted refreshed Google Drive credentials for connector %s", connector_id)
        except Exception:
            logging.exception("Failed to persist refreshed Google Drive credentials for connector %s", connector_id)
|
2026-04-29 07:34:36 +05:30
|
|
|
|
2025-11-03 19:59:18 +08:00
|
|
|
class Jira(SyncBase):
    """Sync source for Jira, built on ``JiraConnector``."""

    SOURCE_NAME: str = FileSource.JIRA

    def _get_source_prefix(self):
        """Return the tag used to prefix Jira log output."""
        return "[Jira]"

    async def _generate(self, task: dict):
        """Create the Jira connector and return a generator of document batches.

        Args:
            task: Sync task; ``reindex``, ``poll_range_start`` and ``id``
                (for the iteration-limit log line) are read here.

        Returns:
            A generator yielding lists of documents, each list holding at
            most the normalised sync batch size.

        Raises:
            KeyError: If ``self.conf`` has no ``base_url`` entry.
            ValueError: If the config carries no credentials.
        """
        connector_kwargs = {
            "jira_base_url": self.conf["base_url"],
            "project_key": self.conf.get("project_key"),
            "jql_query": self.conf.get("jql_query"),
            "batch_size": self.conf.get("batch_size", INDEX_BATCH_SIZE),
            "include_comments": self.conf.get("include_comments", True),
            "include_attachments": self.conf.get("include_attachments", False),
            "labels_to_skip": self._normalize_list(self.conf.get("labels_to_skip")),
            "comment_email_blacklist": self._normalize_list(self.conf.get("comment_email_blacklist")),
            "scoped_token": self.conf.get("scoped_token", False),
            "attachment_size_limit": self.conf.get("attachment_size_limit"),
            "timezone_offset": self.conf.get("timezone_offset"),
            "time_buffer_seconds": self.conf.get("time_buffer_seconds"),
        }

        self.connector = JiraConnector(**connector_kwargs)

        credentials = self.conf.get("credentials")
        if not credentials:
            raise ValueError("Jira connector is missing credentials.")

        self.connector.load_credentials(credentials)
        self.connector.validate_connector_settings()

        # Full sync starts from the epoch; incremental from the last poll point.
        if task["reindex"] == "1" or not task["poll_range_start"]:
            start_time = 0.0
        else:
            start_time = task["poll_range_start"].timestamp()

        end_time = datetime.now(timezone.utc).timestamp()

        # Normalise the batch size: fall back to the default for values that
        # are missing, non-numeric, or not strictly positive.
        raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        def document_batches():
            """Yield batches of Jira documents, advancing through checkpoints."""
            checkpoint = self.connector.build_dummy_checkpoint()
            pending_docs = []
            iterations = 0
            iteration_limit = 100_000

            while checkpoint.has_more:
                wrapper = CheckpointOutputWrapper()
                generator = wrapper(
                    self.connector.load_from_checkpoint(
                        start_time,
                        end_time,
                        checkpoint,
                    )
                )
                for document, failure, next_checkpoint in generator:
                    if failure is not None:
                        # Per-item failures are logged and skipped, not fatal.
                        logging.warning(
                            "[Jira] Jira connector failure: %s",
                            getattr(failure, "failure_message", failure),
                        )
                        continue
                    if document is not None:
                        pending_docs.append(document)
                        if len(pending_docs) >= batch_size:
                            yield pending_docs
                            pending_docs = []
                    if next_checkpoint is not None:
                        checkpoint = next_checkpoint

                # Guard against a checkpoint that never reports completion.
                iterations += 1
                if iterations > iteration_limit:
                    logging.error(
                        "[Jira] Task %s exceeded iteration limit (%s).",
                        task.get("id"),
                        iteration_limit,
                    )
                    raise RuntimeError("Too many iterations while loading Jira documents.")

            # Flush the final, possibly short, batch.
            if pending_docs:
                yield pending_docs

        self.log_connection(
            "Jira",
            connector_kwargs["jira_base_url"],
            task,
            (
                f"sync_batch_size={batch_size}, "
                f"overlap_buffer_s={getattr(self.connector, 'time_buffer_seconds', connector_kwargs.get('time_buffer_seconds'))}"
            ),
        )
        return document_batches()

    @staticmethod
    def _normalize_list(values: Any) -> list[str] | None:
        """Turn a config value into a list of non-empty, stripped strings.

        Args:
            values: ``None``, a comma-separated string, an iterable of
                entries, or a single non-iterable scalar.

        Returns:
            ``None`` when ``values`` is ``None``; otherwise the entries
            converted with ``str()`` and stripped, with ``None`` and blank
            entries removed.
        """
        if values is None:
            return None
        if isinstance(values, str):
            values = values.split(",")
        else:
            # A bare scalar (e.g. a number or bool from the stored config)
            # is treated as a single entry instead of raising TypeError.
            try:
                iter(values)
            except TypeError:
                values = [values]
        return [str(value).strip() for value in values if value is not None and str(value).strip()]
|
2025-11-03 19:59:18 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class SharePoint(SyncBase):
    """Sync source for SharePoint, built on ``SharePointConnector``."""

    SOURCE_NAME: str = FileSource.SHAREPOINT

    async def _generate(self, task: dict):
        """Create the SharePoint connector and return a generator of document batches.

        Args:
            task: Sync task; ``reindex`` and ``poll_range_start`` are read.

        Returns:
            A generator yielding lists of documents, each list holding at
            most the normalised sync batch size.
        """
        self.connector = SharePointConnector(
            batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE),
        )

        # A missing or None "credentials" entry is passed on as an empty dict.
        credentials = self.conf.get("credentials") or {}
        self.connector.load_credentials(credentials)
        self.connector.validate_connector_settings()

        # Full sync starts from the epoch; incremental from the last poll point.
        if task["reindex"] == "1" or not task["poll_range_start"]:
            start_time = 0.0
        else:
            start_time = task["poll_range_start"].timestamp()

        end_time = datetime.now(timezone.utc).timestamp()

        # Normalise the batch size: fall back to the default for values that
        # are missing, non-numeric, or not strictly positive.
        raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        def document_batches():
            """Yield batches of SharePoint documents, advancing through checkpoints."""
            checkpoint = self.connector.build_dummy_checkpoint()
            pending_docs = []
            iterations = 0
            iteration_limit = 100_000

            while checkpoint.has_more:
                wrapper = CheckpointOutputWrapper()
                doc_generator = wrapper(
                    self.connector.load_from_checkpoint(start_time, end_time, checkpoint)
                )
                for document, failure, next_checkpoint in doc_generator:
                    if failure is not None:
                        # Per-item failures are logged and skipped, not fatal.
                        logging.warning(
                            "SharePoint connector failure: %s",
                            getattr(failure, "failure_message", failure),
                        )
                        continue
                    if document is not None:
                        pending_docs.append(document)
                        if len(pending_docs) >= batch_size:
                            yield pending_docs
                            pending_docs = []
                    if next_checkpoint is not None:
                        checkpoint = next_checkpoint

                # Guard against a checkpoint that never reports completion.
                iterations += 1
                if iterations > iteration_limit:
                    raise RuntimeError("Too many iterations while loading SharePoint documents.")

            # Flush the final, possibly short, batch.
            if pending_docs:
                yield pending_docs

        # Reuse the None-safe local: self.conf.get("credentials", {}) would
        # still return None when the key exists with a None value.
        self.log_connection("SharePoint", credentials.get("site_url", ""), task)
        return document_batches()
|
2025-11-03 19:59:18 +08:00
|
|
|
|
|
|
|
|
|
feat(connector): implement OneDrive data source connector (issue #15330) (#15331)
### What problem does this PR solve?
Closes #15330.
RAGFlow had no connector for OneDrive / OneDrive for Business. Users who
store working documents in OneDrive could not index them into a
knowledge base without manually downloading and re-uploading files.
This PR adds a net-new OneDrive data source that:
- Authenticates against Microsoft Graph with the same MSAL
client-credentials flow already used by the SharePoint and Teams
connectors (no new auth primitives).
- Enumerates every drive visible to the service principal and pages
through `/drives/{id}/root/delta`, persisting `@odata.deltaLink` values
per drive so subsequent syncs only fetch changed items.
- Optionally narrows ingestion to a sub-folder (`folder_path`) without
needing a separate code path.
- Surfaces typed errors on the validation probe (`GET /drives?$top=1`):
401 → `ConnectorMissingCredentialError`, 403 →
`InsufficientPermissionsError` (with a `Files.Read.All` hint), 5xx →
`UnexpectedValidationError`.
- Filters folders, soft-deleted items, and unsupported extensions (`.pdf
.docx .doc .xlsx .xls .pptx .ppt .txt .md .csv`).
#### Files
| File | Change |
|------|--------|
| `common/data_source/onedrive_connector.py` | **New** —
`OneDriveConnector` + `OneDriveCheckpoint`. |
| `common/data_source/config.py` | `DocumentSource.ONEDRIVE =
"onedrive"`. |
| `common/constants.py` | `FileSource.ONEDRIVE = "onedrive"`. |
| `common/data_source/__init__.py` | Export `OneDriveConnector`. |
| `rag/svr/sync_data_source.py` | `OneDrive(SyncBase)` with `batch_size`
normalisation; registered in `func_factory`. |
| `web/src/pages/user-setting/data-source/constant/index.tsx` |
`DataSourceKey.ONEDRIVE`, visibility map (`syncDeletedFiles: true`),
info entry, form fields (tenant_id, client_id, client_secret,
folder_path, batch_size), default values. |
| `web/src/locales/en.ts`, `web/src/locales/zh.ts` |
`onedriveDescription` + 4 tooltip keys (EN + ZH). |
| `test/unit_test/data_source/test_onedrive_connector_unit.py` | **New**
— 13 unit tests (`p1`/`p2`) covering auth, validation, checkpoint
helpers, and document filtering. |
#### Required Azure AD permission
`Files.Read.All` (Application, admin-granted).
#### Out of scope
- Interactive end-user OAuth (delegated permissions) — the connector
uses app-only credentials, consistent with the SharePoint / Teams
precedent.
- Binary download of file contents — the sync layer emits `Document`s
carrying `webUrl` + metadata; bytes are hydrated downstream by the parse
pipeline.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-29 05:26:06 -06:00
|
|
|
class OneDrive(SyncBase):
    """Sync task that pulls document batches from the OneDrive connector."""

    SOURCE_NAME: str = FileSource.ONEDRIVE

    async def _generate(self, task: dict):
        """Configure the OneDrive connector and return its document batches.

        Args:
            task: Sync task record. ``reindex`` and ``poll_range_start`` are
                read here; the whole dict is forwarded to ``log_connection``.

        Returns:
            A generator that re-yields every batch produced by the
            connector's ``load_from_checkpoint``.
        """
        # Coerce the configured batch size to a positive int, falling back to
        # the project default for missing, non-numeric or non-positive values.
        configured_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            page_size = int(configured_size)
        except (TypeError, ValueError):
            page_size = INDEX_BATCH_SIZE
        if page_size <= 0:
            page_size = INDEX_BATCH_SIZE

        # A blank folder_path is handed to the connector as None.
        connector = OneDriveConnector(
            batch_size=page_size,
            folder_path=self.conf.get("folder_path") or None,
        )
        self.connector = connector
        connector.load_credentials(self.conf["credentials"])

        # Both full and incremental runs go through load_from_checkpoint so
        # the connector owns the delta-link bookkeeping. Incremental runs pass
        # the previous poll range start as the lastModifiedDateTime floor;
        # poll_source ignored the checkpoint and would have re-walked every
        # drive's root on each run.
        if task["reindex"] != "1" and task["poll_range_start"]:
            window_start = task["poll_range_start"].timestamp()
        else:
            # Full sync: no lower bound on modification time.
            window_start = 0.0
        window_end = datetime.now(timezone.utc).timestamp()

        batches = connector.load_from_checkpoint(
            window_start,
            window_end,
            connector.build_dummy_checkpoint(),
        )

        self.log_connection(
            "OneDrive",
            self.conf.get("folder_path", "/") or "/",
            task,
        )

        # Hand back a plain generator over the connector's batches.
        return (batch for batch in batches)
|
|
|
|
|
|
|
|
|
|
|
feat(connector): implement Outlook data source connector (issue #15332) (#15333)
### What problem does this PR solve?
Closes #15332.
RAGFlow can index Gmail and generic IMAP mailboxes but had no native
connector for Outlook / Microsoft 365 mail. Organisations on Microsoft
365 had no way to bring mailbox content into a knowledge base through
Microsoft Graph.
This PR adds a net-new Outlook data source that:
- Authenticates against Microsoft Graph with the same MSAL
client-credentials flow already used by the SharePoint and Teams
connectors (no new auth primitives).
- Pages over `/users/{id}/mailFolders/{folder}/messages/delta` per
mailbox and persists `@odata.deltaLink` values in
`OutlookCheckpoint.delta_links`, so incremental syncs only fetch changed
messages.
- Supports two scoping modes:
- **Tenant-wide** (default): enumerates every user in the tenant via
`/users` and syncs each mailbox. Requires `User.Read.All`.
- **Targeted**: when `user_ids` is provided (comma-separated UPNs or
object IDs), only those mailboxes are synced. `User.Read.All` is not
needed in this mode.
- Lets the caller pick the mail folder (`inbox`, `sentitems`, `archive`,
...). Defaults to `inbox`.
- Maps each message to a `Document` shaped after the Gmail connector:
one `TextSection` carrying `From/To/Cc/Subject` headers + body, with
HTML bodies stripped to text inline (no extra dependency).
- Surfaces typed errors on the validation probe:
401 → `ConnectorMissingCredentialError`, 403 →
`InsufficientPermissionsError` (with `Mail.Read` / `User.Read.All`
hint), 404 on a configured mailbox → `ConnectorValidationError`, 5xx →
`UnexpectedValidationError`.
- Skips messages flagged `@removed` by the delta semantics and messages
whose `receivedDateTime` is older than `poll_range_start`.
#### Files
| File | Change |
|------|--------|
| `common/data_source/outlook_connector.py` | **New** —
`OutlookConnector` (`CheckpointedConnectorWithPermSync` +
`SlimConnectorWithPermSync`) + `OutlookCheckpoint` + tiny `_strip_html`
helper. |
| `common/data_source/config.py` | `DocumentSource.OUTLOOK = "outlook"`.
|
| `common/constants.py` | `FileSource.OUTLOOK = "outlook"`. |
| `common/data_source/__init__.py` | Export `OutlookConnector`. |
| `rag/svr/sync_data_source.py` | `Outlook(SyncBase)` with `batch_size`
normalisation, CSV/list parsing of `user_ids`; registered in
`func_factory`. |
| `web/src/pages/user-setting/data-source/constant/index.tsx` |
`DataSourceKey.OUTLOOK`, visibility map (`syncDeletedFiles: true`), info
entry, form fields (tenant_id, client_id, client_secret, folder,
user_ids, batch_size), default values. |
| `web/src/locales/en.ts`, `web/src/locales/zh.ts` |
`outlookDescription` + 5 tooltip keys (EN + ZH). |
| `test/unit_test/data_source/test_outlook_connector_unit.py` | **New**
— 19 unit tests (`p1`/`p2`/`p3`) covering auth, validation (tenant-wide
vs specific user vs error paths), checkpoint helpers, user enumeration
pagination, message filtering, HTML body stripping. |
#### Required Azure AD permissions
- `Mail.Read` (Application, admin-granted) — always.
- `User.Read.All` (Application, admin-granted) — only when `user_ids` is
left blank so the connector can enumerate mailboxes.
#### Out of scope
- **Attachment indexing.** The current connector emits message body +
headers; binary attachments are flagged via `metadata.has_attachments`
but not pulled. Adding attachment hydration is straightforward but
scoped out per the issue's "decide whether attachments are indexed in
the first version" note.
- **Delegated (per-user) OAuth.** The connector uses app-only
credentials, consistent with the SharePoint / Teams precedent in this
codebase.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-29 07:52:29 -06:00
|
|
|
class Outlook(SyncBase):
    """Sync task that pulls message batches from the Outlook connector."""

    SOURCE_NAME: str = FileSource.OUTLOOK

    async def _generate(self, task: dict):
        """Configure the Outlook connector and return its document batches.

        Args:
            task: Sync task record. ``reindex`` and ``poll_range_start`` are
                read here; the whole dict is forwarded to ``log_connection``.

        Returns:
            A generator that re-yields every batch produced by the
            connector's ``load_from_checkpoint``.
        """
        # Coerce the configured batch size to a positive int, falling back to
        # the project default for missing, non-numeric or non-positive values.
        raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        # user_ids may arrive as a comma-separated string or as a list; any
        # other shape (including a missing key) means "no explicit mailboxes".
        raw_user_ids = self.conf.get("user_ids")
        if isinstance(raw_user_ids, str):
            user_ids = [u.strip() for u in raw_user_ids.split(",") if u.strip()]
        elif isinstance(raw_user_ids, list):
            user_ids = [str(u).strip() for u in raw_user_ids if str(u).strip()]
        else:
            user_ids = []

        # Resolve the folder once so the value that is logged is always the
        # value the connector actually syncs (a blank setting means "inbox").
        folder = self.conf.get("folder") or "inbox"

        self.connector = OutlookConnector(
            batch_size=batch_size,
            folder=folder,
            user_ids=user_ids or None,
        )
        self.connector.load_credentials(self.conf["credentials"])

        # Always route through load_from_checkpoint so the connector owns the
        # delta-link bookkeeping; incremental runs pass the previous poll
        # range start as the receivedDateTime floor while the same delta
        # walk drives both modes. poll_source disregarded the checkpoint
        # entirely, which would have re-walked every mailbox each run.
        if task["reindex"] == "1" or not task["poll_range_start"]:
            range_start = 0.0
        else:
            range_start = task["poll_range_start"].timestamp()
        range_end = datetime.now(timezone.utc).timestamp()
        checkpoint = self.connector.build_dummy_checkpoint()
        document_batch_generator = self.connector.load_from_checkpoint(
            range_start, range_end, checkpoint
        )

        # Redact mailbox identifiers: full UPN / email lists in connector
        # logs leak PII. Surface the folder, the count, and a small masked
        # preview so operators can still spot a misconfigured run.
        if user_ids:
            preview = ",".join(_redact_mailbox(u) for u in user_ids[:3])
            if len(user_ids) > 3:
                preview = f"{preview},+{len(user_ids) - 3} more"
            details = "{}@{} users (preview: {})".format(
                folder,
                len(user_ids),
                preview,
            )
        else:
            details = "{}@<all-users>".format(folder)
        self.log_connection("Outlook", details, task)

        def wrapper():
            for document_batch in document_batch_generator:
                yield document_batch

        return wrapper()
|
|
|
|
|
|
|
|
|
|
|
feat(connectors): add Salesforce CRM data source connector (#15462)
### What problem does this PR solve?
Closes #15461.
RAGFlow had no way to ingest Salesforce CRM data, so support / sales
teams couldn't ground responses on live Accounts, Contacts,
Opportunities, Cases, or Knowledge articles. This adds a first-class
Salesforce data source connector that authenticates against a Connected
App via OAuth 2.0 client-credentials, queries selected SObjects via
SOQL, and turns each record into an indexable document with incremental
sync.
**Highlights**
- `common/data_source/salesforce_connector.py`: new
`SalesforceConnector` (`CheckpointedConnectorWithPermSync` +
`SlimConnectorWithPermSync`).
- OAuth 2.0 client-credentials flow; canonical `instance_url` from the
token response so multi-pod orgs route correctly.
- Per-object `SystemModstamp` cursor stored in
`SalesforceCheckpoint.cursors` — a failure mid-object doesn't rewind
sibling objects, and re-syncs only fetch changed rows.
- Deterministic record-to-text formatter (sorted keys) so SOQL field
reordering on the server doesn't mark every row "changed" on each poll.
- `_get_json` raises on non-2xx so 429 / 5xx never silently advance the
checkpoint past missing data.
- `Knowledge__kav` is in the default object set but is skipped silently
when the org doesn't have Salesforce Knowledge enabled (404 on
describe).
- Slim-doc IDs are scoped as `<Object>/<Id>` so prune deletes can't
collide across object types.
- `common/constants.py`, `common/data_source/config.py`,
`common/data_source/__init__.py`: register `salesforce` in `FileSource`
/ `DocumentSource` and export `SalesforceConnector`.
- `rag/svr/sync_data_source.py`: new `Salesforce(SyncBase)` class routed
through `load_from_checkpoint` (poll_source would re-walk every object
each run) and added to `func_factory`.
- Frontend:
- `web/src/pages/user-setting/data-source/constant/index.tsx`: new
`DataSourceKey.SALESFORCE`, form fields (instance URL, client ID/secret,
objects, api_version, batch size), `syncDeletedFiles` capability,
default form values, and tile entry with the new icon.
- `web/src/locales/{en,zh}.ts`: description + per-field tooltips.
- `web/src/assets/svg/data-source/salesforce.svg`: 48x48 brand-style
icon to match the other Microsoft / cloud tiles.
**Verification**
- `npm run build` (vite + esbuild) passes (1m 26s).
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-06-04 23:24:36 -06:00
|
|
|
class Salesforce(SyncBase):
    """Sync task that pulls record batches from the Salesforce connector."""

    SOURCE_NAME: str = FileSource.SALESFORCE

    async def _generate(self, task: dict):
        """Configure the Salesforce connector and return its document batches.

        Args:
            task: Sync task record. ``reindex`` and ``poll_range_start`` are
                read here; the whole dict is forwarded to ``log_connection``.

        Returns:
            A generator that re-yields every batch produced by the
            connector's ``load_from_checkpoint``.

        Raises:
            Whatever ``validate_connector_settings`` raises for an invalid
            configuration; validation runs before any data is fetched.
        """
        # Coerce the configured batch size to a positive int, falling back to
        # the project default for missing, non-numeric or non-positive values.
        raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        # `objects` may arrive as a comma-separated string or as a list.
        raw_objects = self.conf.get("objects")
        if isinstance(raw_objects, str):
            objects = [o.strip() for o in raw_objects.split(",") if o.strip()]
        elif isinstance(raw_objects, list):
            objects = [str(o).strip() for o in raw_objects if str(o).strip()]
        else:
            objects = None

        self.connector = SalesforceConnector(
            batch_size=batch_size,
            # A blank field or empty list is treated like "not configured"
            # (None), matching how the sibling sync classes normalise empty
            # selections, instead of handing the connector an empty list.
            objects=objects or None,
            api_version=self.conf.get("api_version") or "v59.0",
        )
        self.connector.load_credentials(self.conf["credentials"])
        # Fail fast on invalid/inaccessible objects (typos, missing object
        # permissions) before iterating, so a bad `objects` config surfaces
        # as a clear error instead of silently skipping data at sync time.
        # This guards configs that reach runtime without going through the
        # UI (direct API callers, scripts, previously-persisted configs).
        self.connector.validate_connector_settings()

        # Always route through load_from_checkpoint so the per-object
        # SystemModstamp cursor owns incrementality; poll_source would
        # re-query every object from the caller's window each run and
        # ignore the persisted per-object cursors entirely.
        if task["reindex"] == "1" or not task["poll_range_start"]:
            range_start = 0.0
        else:
            range_start = task["poll_range_start"].timestamp()
        range_end = datetime.now(timezone.utc).timestamp()
        checkpoint = self.connector.build_dummy_checkpoint()
        document_batch_generator = self.connector.load_from_checkpoint(
            range_start, range_end, checkpoint
        )

        # Log the objects the connector resolved, not the raw configuration.
        instance_url = (self.conf.get("credentials") or {}).get("instance_url", "")
        self.log_connection(
            "Salesforce",
            f"{instance_url} objects({','.join(self.connector.objects)})",
            task,
        )

        def wrapper():
            for document_batch in document_batch_generator:
                yield document_batch

        return wrapper()
|
|
|
|
|
|
|
|
|
|
|
feat(connectors): add Azure Blob Storage data source connector (#15466)
### What problem does this PR solve?
Closes #15465.
RAGFlow supports S3, Google Cloud Storage, R2, and OCI as data sources
but not Azure Blob Storage, leaving Azure users without a way to index
container objects into a knowledge base. This adds a first-class Azure
Blob Storage data-source connector — distinct from RAGFlow's existing
Azure storage *backends* (`rag/utils/azure_sas_conn.py`,
`rag/utils/azure_spn_conn.py`) which store RAGFlow's own files.
**Highlights**
- `common/data_source/azure_blob_connector.py`: new `AzureBlobConnector`
(`CheckpointedConnectorWithPermSync` + `SlimConnectorWithPermSync`).
- Uses the existing `azure-storage-blob` dependency (already in
`pyproject.toml`).
- Three auth modes, tried in order of precedence:
1. **Account key** — `account_name` + `account_key` + `container_name`.
2. **Connection string** — `connection_string` + `container_name`.
3. **SAS token** — `container_url` + `sas_token` (same shape as
`RAGFlowAzureSasBlob`).
- ETag fingerprint stored per blob in `AzureBlobCheckpoint.etags` —
unchanged blobs (same ETag as last run) are skipped without a download.
Only new/modified blobs are fetched.
- Optional `prefix` scopes indexing to a virtual folder.
- `validate_connector_settings()` probes `get_container_properties()`
and maps `AuthenticationFailed / 403 / ContainerNotFound` to typed
connector exceptions.
- Slim-doc IDs are blob names so prune reconciles correctly.
- `common/constants.py`, `common/data_source/config.py`,
`common/data_source/__init__.py`: register `azure_blob` in `FileSource`
/ `DocumentSource` and export `AzureBlobConnector`.
- `rag/svr/sync_data_source.py`: new `AzureBlob(SyncBase)` class routed
through `load_from_checkpoint` (ETag fingerprint owns change-detection)
and added to `func_factory`.
- Frontend:
- `web/src/pages/user-setting/data-source/constant/index.tsx`: new
`DataSourceKey.AZURE_BLOB`, auth-mode selector (account key / connection
string / SAS token), all credential fields, prefix + batch-size,
`syncDeletedFiles` capability, default form values, tile entry with
icon.
- `web/src/locales/{en,zh}.ts`: description + per-field tooltips for all
9 new keys.
- `web/src/assets/svg/data-source/azure-blob.svg`: Azure-branded
stacked-cylinders icon.
**Verification**
- `npm run build` (vite + esbuild) passes (37 s).
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-06-04 07:06:01 -06:00
|
|
|
class AzureBlob(SyncBase):
    """Sync task that pulls blob batches from the Azure Blob connector."""

    SOURCE_NAME: str = FileSource.AZURE_BLOB

    async def _generate(self, task: dict):
        """Configure the Azure Blob connector and return its document batches.

        Args:
            task: Sync task record. ``reindex`` and ``poll_range_start`` are
                read here; the whole dict is forwarded to ``log_connection``.

        Returns:
            A generator that re-yields every batch produced by the
            connector's ``load_from_checkpoint``.
        """
        # Coerce the configured batch size to a positive int, falling back to
        # the project default for missing, non-numeric or non-positive values.
        raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        self.connector = AzureBlobConnector(
            batch_size=batch_size,
            prefix=self.conf.get("prefix") or None,
            allow_images=bool(self.conf.get("allow_images", False)),
            auth_mode=self.conf.get("auth_mode"),
        )
        credentials = self.conf.get("credentials") or {}
        self.connector.load_credentials(credentials)

        # Route through load_from_checkpoint so incremental runs are scoped
        # by the poll time window; per-blob ETags ride along as document
        # fingerprints (content_hash) so unchanged blobs aren't re-embedded.
        if task["reindex"] == "1" or not task["poll_range_start"]:
            range_start = 0.0
        else:
            range_start = task["poll_range_start"].timestamp()
        range_end = datetime.now(timezone.utc).timestamp()
        checkpoint = self.connector.build_dummy_checkpoint()
        document_batch_generator = self.connector.load_from_checkpoint(
            range_start, range_end, checkpoint
        )

        # Derive a human-readable container name for the log line.
        # `or ""` tolerates a container_url key that is present but None.
        # Everything after "?" is dropped first so that a SAS signature
        # appended to the URL can never end up in the logs.
        container_url = credentials.get("container_url") or ""
        container_from_url = (
            container_url.split("?", 1)[0].rstrip("/").rsplit("/", 1)[-1]
        )
        container_hint = (
            credentials.get("container_name")
            or container_from_url
            or "<container>"
        )
        self.log_connection(
            "Azure Blob",
            f"{container_hint}/{self.conf.get('prefix', '') or ''}",
            task,
        )

        def wrapper():
            for document_batch in document_batch_generator:
                yield document_batch

        return wrapper()
|
|
|
|
|
|
|
|
|
|
|
2025-11-03 19:59:18 +08:00
|
|
|
class Slack(SyncBase):
    """Sync task that pulls documents from the Slack connector."""

    SOURCE_NAME: str = FileSource.SLACK

    async def _generate(self, task: dict):
        """Configure the Slack connector and return its document generator.

        Args:
            task: Sync task record. ``tenant_id``, ``reindex`` and
                ``poll_range_start`` are read here; the whole dict is
                forwarded to ``log_connection``.

        Returns:
            The generator from ``load_from_state`` (full sync) or
            ``poll_source`` (incremental sync).

        Raises:
            ValueError: If the credentials carry no ``slack_bot_token``.
        """
        from common.data_source.config import DocumentSource
        from common.data_source.interfaces import StaticCredentialsProvider

        # The channel filter may be a comma-separated string or a list;
        # anything else means "no filter".
        configured_channels = self.conf.get("channels")
        if isinstance(configured_channels, str):
            channels = [
                name.strip()
                for name in configured_channels.split(",")
                if name.strip()
            ]
        elif isinstance(configured_channels, list):
            channels = [
                str(name).strip()
                for name in configured_channels
                if str(name).strip()
            ]
        else:
            channels = None

        # Coerce the configured batch size to a positive int, falling back to
        # the project default for missing, non-numeric or non-positive values.
        configured_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            page_size = int(configured_size)
        except (TypeError, ValueError):
            page_size = INDEX_BATCH_SIZE
        if page_size <= 0:
            page_size = INDEX_BATCH_SIZE

        self.connector = SlackConnector(
            channels=channels or None,
            channel_regex_enabled=bool(self.conf.get("channel_regex_enabled", False)),
            batch_size=page_size,
        )

        # Credentials are supplied through a provider object rather than
        # load_credentials; refuse to continue without a bot token.
        slack_credentials = self.conf.get("credentials") or {}
        if not slack_credentials.get("slack_bot_token"):
            raise ValueError("Slack connector is missing the bot token credential.")

        provider = StaticCredentialsProvider(
            tenant_id=task["tenant_id"],
            connector_name=DocumentSource.SLACK,
            credential_json=slack_credentials,
        )
        self.connector.set_credentials_provider(provider)
        self.connector.validate_connector_settings()

        since = task["poll_range_start"]
        if task["reindex"] != "1" and since:
            # Incremental sync: only the window [since, now].
            until_ts = datetime.now(timezone.utc).timestamp()
            documents = self.connector.poll_source(since.timestamp(), until_ts)
            _begin_info = f"from {since}"
        else:
            # Full sync: explicit reindex, or no previous poll recorded.
            documents = self.connector.load_from_state()
            _begin_info = "totally"

        channel_label = ', '.join(channels) if channels else 'all'
        self.log_connection("Slack", f"channels({channel_label})", task)
        return documents
|
2025-11-03 19:59:18 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class Teams(SyncBase):
    """Sync task that pulls document batches from the Teams connector."""

    SOURCE_NAME: str = FileSource.TEAMS

    async def _generate(self, task: dict):
        """Configure the Teams connector and return a generator of batches.

        Args:
            task: Sync task record. ``reindex`` and ``poll_range_start`` are
                read here; the whole dict is forwarded to ``log_connection``.

        Returns:
            A generator yielding lists of documents, each list holding at
            most the configured sync batch size.

        Raises:
            RuntimeError: Raised lazily by the returned generator when the
                checkpoint loop exceeds its iteration limit.
        """

        def _normalise_batch_size(raw) -> int:
            # Coerce to a positive int; anything else falls back to the default.
            try:
                size = int(raw)
            except (TypeError, ValueError):
                return INDEX_BATCH_SIZE
            return size if size > 0 else INDEX_BATCH_SIZE

        # The connector used to receive the raw configured value; validate it
        # the same way as the sync batch size so a string, zero or negative
        # setting cannot reach the connector.
        self.connector = TeamsConnector(
            batch_size=_normalise_batch_size(
                self.conf.get("batch_size", INDEX_BATCH_SIZE)
            ),
        )

        credentials = self.conf.get("credentials") or {}
        self.connector.load_credentials(credentials)
        self.connector.validate_connector_settings()

        # Full sync (explicit reindex or no previous poll) starts at epoch 0.
        if task["reindex"] == "1" or not task["poll_range_start"]:
            start_time = 0.0
        else:
            start_time = task["poll_range_start"].timestamp()

        end_time = datetime.now(timezone.utc).timestamp()

        # Size of the lists yielded to the caller; `sync_batch_size` takes
        # precedence over `batch_size` when both are configured.
        batch_size = _normalise_batch_size(
            self.conf.get("sync_batch_size")
            or self.conf.get("batch_size")
            or INDEX_BATCH_SIZE
        )

        def document_batches():
            checkpoint = self.connector.build_dummy_checkpoint()
            pending_docs = []
            iterations = 0
            # Safety valve against a checkpoint that never reports completion.
            iteration_limit = 100_000

            while checkpoint.has_more:
                wrapper = CheckpointOutputWrapper()
                doc_generator = wrapper(
                    self.connector.load_from_checkpoint(start_time, end_time, checkpoint)
                )
                for document, failure, next_checkpoint in doc_generator:
                    if failure is not None:
                        # Per-item failures are logged and skipped, not fatal.
                        logging.warning(
                            "Teams connector failure: %s",
                            getattr(failure, "failure_message", failure),
                        )
                        continue
                    if document is not None:
                        pending_docs.append(document)
                        if len(pending_docs) >= batch_size:
                            yield pending_docs
                            pending_docs = []
                    if next_checkpoint is not None:
                        checkpoint = next_checkpoint

                iterations += 1
                if iterations > iteration_limit:
                    raise RuntimeError("Too many iterations while loading Teams documents.")

            # Flush the final, partially filled batch.
            if pending_docs:
                yield pending_docs

        self.log_connection("Microsoft Teams", "workspace", task)
        return document_batches()
|
2025-11-03 19:59:18 +08:00
|
|
|
|
2025-11-05 15:43:15 +08:00
|
|
|
|
2025-11-26 07:14:42 +01:00
|
|
|
class WebDAV(SyncBase):
    """Sync task that pulls document batches from the WebDAV connector."""

    SOURCE_NAME: str = FileSource.WEBDAV

    async def _generate(self, task: dict):
        """Configure the WebDAV connector and return its document batches.

        Args:
            task: Sync task record. ``reindex`` and ``poll_range_start`` are
                read here; the whole dict is forwarded to ``log_connection``.

        Returns:
            A generator that re-yields every batch from ``load_from_state``
            (full sync) or ``poll_source`` (incremental sync).
        """
        # Coerce the configured batch size to a positive int, falling back to
        # the project default for missing, non-numeric or non-positive values.
        configured_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            page_size = int(configured_size)
        except (TypeError, ValueError):
            page_size = INDEX_BATCH_SIZE
        if page_size <= 0:
            page_size = INDEX_BATCH_SIZE

        connector = WebDAVConnector(
            base_url=self.conf["base_url"],
            remote_path=self.conf.get("remote_path", "/"),
            batch_size=page_size,
        )
        self.connector = connector
        connector.set_allow_images(self.conf.get("allow_images", False))
        connector.load_credentials(self.conf["credentials"])

        if task["reindex"] != "1" and task["poll_range_start"]:
            # Incremental sync: only the window [poll_range_start, now].
            now_ts = datetime.now(timezone.utc).timestamp()
            batches = connector.poll_source(
                task["poll_range_start"].timestamp(),
                now_ts,
            )
            _begin_info = "from {}".format(task["poll_range_start"])
        else:
            # Full sync: explicit reindex, or no previous poll recorded.
            batches = connector.load_from_state()
            _begin_info = "totally"

        self.log_connection("WebDAV", f"{self.conf['base_url']}(path: {self.conf.get('remote_path', '/')})", task)

        # Hand back a plain generator over the connector's batches.
        return (batch for batch in batches)
|
2025-12-29 12:01:18 +08:00
|
|
|
|
|
|
|
|
|
2025-11-21 12:58:49 +01:00
|
|
|
class Moodle(SyncBase):
    """Sync source backed by ``MoodleConnector``."""

    SOURCE_NAME: str = FileSource.MOODLE

    async def _generate(self, task: dict):
        """Build the Moodle connector and return its document generator.

        Args:
            task: Sync task record. ``task["reindex"] == "1"`` forces a full
                load; ``task.get("poll_range_start")``, when set, is the
                lower bound of an incremental poll (its ``.timestamp()`` is
                passed to the connector).

        Returns:
            The generator from ``load_from_state()`` for a full load, or from
            ``poll_source(start, end)`` for an incremental poll.
        """
        # The configured batch size may be a string, zero or negative; fall
        # back to the default rather than handing a bad value to the connector.
        try:
            batch_size = int(self.conf.get("batch_size", INDEX_BATCH_SIZE))
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        self.connector = MoodleConnector(
            moodle_url=self.conf["moodle_url"],
            batch_size=batch_size,
        )
        self.connector.load_credentials(self.conf["credentials"])

        # Determine the time range for synchronization based on reindex or poll_range_start
        poll_start = task.get("poll_range_start")
        if task["reindex"] == "1" or poll_start is None:
            document_generator = self.connector.load_from_state()
        else:
            # Freeze the poll end time once, before polling starts, so that a
            # slim snapshot and the poll cover the same point in time.
            # Without this, a module created between the snapshot and the
            # poll could be polled as new and at the same time be missing
            # from the slim list, which would mark it as stale and delete it.
            # NOTE(review): the slim snapshot is not taken in this method -
            # confirm the ordering against the caller.
            end_ts = datetime.now(timezone.utc).timestamp()
            document_generator = self.connector.poll_source(
                poll_start.timestamp(),
                end_ts,
            )

        self.log_connection("Moodle", self.conf["moodle_url"], task)
        return document_generator
|
2025-11-21 12:58:49 +01:00
|
|
|
|
|
|
|
|
|
2025-12-12 10:23:40 +08:00
|
|
|
class BOX(SyncBase):
    """Sync source backed by ``BoxConnector``."""

    SOURCE_NAME: str = FileSource.BOX

    async def _generate(self, task: dict):
        """Build the Box connector, load OAuth tokens and return a document generator.

        Args:
            task: Sync task record. ``task["reindex"] == "1"`` or a ``None``
                ``task["poll_range_start"]`` selects a full load; otherwise
                the source is polled from ``poll_range_start`` to now (UTC).

        Returns:
            The generator from ``load_from_state()`` or ``poll_source(...)``.

        Raises:
            KeyError: if the credentials or the decoded token payload lack a
                required key.
        """
        # Resolve the folder once so the connector and the log line agree;
        # the log previously indexed conf['folder_id'] directly and raised
        # KeyError whenever the default folder was in use.
        folder_id = self.conf.get("folder_id", "0")
        self.connector = BoxConnector(
            folder_id=folder_id,
        )

        # box_tokens is stored as a JSON string holding the OAuth client
        # settings together with the current access/refresh token pair.
        credential = json.loads(self.conf['credentials']['box_tokens'])

        auth = BoxOAuth(
            OAuthConfig(
                client_id=credential['client_id'],
                client_secret=credential['client_secret'],
            )
        )

        token = AccessToken(
            access_token=credential['access_token'],
            refresh_token=credential['refresh_token'],
        )
        auth.token_storage.store(token)

        self.connector.load_credentials(auth)

        poll_start = task["poll_range_start"]
        if task["reindex"] == "1" or poll_start is None:
            document_generator = self.connector.load_from_state()
        else:
            document_generator = self.connector.poll_source(
                poll_start.timestamp(),
                datetime.now(timezone.utc).timestamp(),
            )

        self.log_connection("Box", f"folder_id({folder_id})", task)
        return document_generator
|
2025-12-25 17:50:41 +08:00
|
|
|
|
2025-12-29 12:01:18 +08:00
|
|
|
|
2025-12-25 17:50:41 +08:00
|
|
|
class Airtable(SyncBase):
    """Sync source backed by ``AirtableConnector``."""

    SOURCE_NAME: str = FileSource.AIRTABLE

    async def _generate(self, task: dict):
        """
        Set up the Airtable connector and pick a full load or an incremental poll.

        A full load (``load_from_state``) is used when ``task.get("reindex")``
        is ``"1"`` or no ``poll_range_start`` is recorded; otherwise the source
        is polled from ``poll_range_start`` up to the current UTC time.

        Raises:
            ValueError: if the configured credentials lack
                ``airtable_access_token``.
        """
        base_id = self.conf.get("base_id")
        table_ref = self.conf.get("table_name_or_id")
        self.connector = AirtableConnector(base_id=base_id, table_name_or_id=table_ref)

        creds = self.conf.get("credentials", {})
        if "airtable_access_token" not in creds:
            raise ValueError("Missing airtable_access_token in credentials")
        # Forward only the access token, not the whole credentials mapping.
        access_token = creds["airtable_access_token"]
        self.connector.load_credentials({"airtable_access_token": access_token})

        poll_start = task.get("poll_range_start")
        incremental = task.get("reindex") != "1" and poll_start is not None
        if incremental:
            range_start = poll_start.timestamp()
            range_end = datetime.now(timezone.utc).timestamp()
            document_generator = self.connector.poll_source(range_start, range_end)
            _begin_info = f"from {poll_start}"
        else:
            document_generator = self.connector.load_from_state()
            _begin_info = "totally"

        target = f"base_id({base_id}), table({table_ref})"
        self.log_connection("Airtable", target, task)

        return document_generator
|
2025-12-25 17:50:41 +08:00
|
|
|
|
2025-12-29 13:28:37 +08:00
|
|
|
class Asana(SyncBase):
    """Sync source backed by ``AsanaConnector``."""

    SOURCE_NAME: str = FileSource.ASANA

    async def _generate(self, task: dict):
        """
        Set up the Asana connector and pick a full load or an incremental poll.

        A full load is used when ``task.get("reindex")`` is ``"1"`` or
        ``poll_range_start`` is missing/falsy; otherwise the source is polled
        from ``poll_range_start`` up to the current UTC time.

        Raises:
            ValueError: if the configured credentials lack
                ``asana_api_token_secret``.
        """
        workspace_id = self.conf.get("asana_workspace_id")
        project_ids = self.conf.get("asana_project_ids")
        team_id = self.conf.get("asana_team_id")
        # The connector takes these three settings positionally, in this order.
        self.connector = AsanaConnector(workspace_id, project_ids, team_id)

        creds = self.conf.get("credentials", {})
        if "asana_api_token_secret" not in creds:
            raise ValueError("Missing asana_api_token_secret in credentials")
        api_token = creds["asana_api_token_secret"]
        self.connector.load_credentials({"asana_api_token_secret": api_token})

        poll_start = task.get("poll_range_start")
        incremental = task.get("reindex") != "1" and bool(poll_start)
        if incremental:
            end_time = datetime.now(timezone.utc).timestamp()
            document_generator = self.connector.poll_source(poll_start.timestamp(), end_time)
            _begin_info = f"from {poll_start}"
        else:
            document_generator = self.connector.load_from_state()
            _begin_info = "totally"

        target = f"workspace_id({workspace_id}), project_ids({project_ids}), team_id({team_id})"
        self.log_connection("Asana", target, task)

        return document_generator
|
2025-12-29 13:28:37 +08:00
|
|
|
|
2025-12-30 15:09:52 +08:00
|
|
|
class Github(SyncBase):
    """Sync source backed by ``GithubConnector`` driven through ``ConnectorRunner``."""

    SOURCE_NAME: str = FileSource.GITHUB

    async def _generate(self, task: dict):
        """
        Set up the GitHub connector and return a generator of document batches.

        The time window starts at the Unix epoch when ``task.get("reindex")``
        is ``"1"`` or no ``poll_range_start`` is recorded, and at
        ``poll_range_start`` otherwise; it always ends at the current UTC time.

        Raises:
            ValueError: if the configured credentials lack
                ``github_access_token``.
        """
        from common.data_source.connector_runner import ConnectorRunner

        repo_owner = self.conf.get("repository_owner")
        repo_names = self.conf.get("repository_name")
        self.connector = GithubConnector(
            repo_owner=repo_owner,
            repositories=repo_names,
            include_prs=self.conf.get("include_pull_requests", True),
            include_issues=self.conf.get("include_issues", True),
        )

        creds = self.conf.get("credentials", {})
        if "github_access_token" not in creds:
            raise ValueError("Missing github_access_token in credentials")
        access_token = creds["github_access_token"]
        self.connector.load_credentials({"github_access_token": access_token})

        full_sync = task.get("reindex") == "1" or not task.get("poll_range_start")
        start_time = datetime.fromtimestamp(0, tz=timezone.utc) if full_sync else task.get("poll_range_start")
        end_time = datetime.now(timezone.utc)

        runner = ConnectorRunner(
            connector=self.connector,
            batch_size=self.conf.get("batch_size", INDEX_BATCH_SIZE),
            include_permissions=False,
            time_range=(start_time, end_time)
        )

        def document_batches():
            # Keep running the connector until the checkpoint reports that
            # nothing more is left to fetch.
            checkpoint = self.connector.build_dummy_checkpoint()
            while checkpoint.has_more:
                for doc_batch, failure, next_checkpoint in runner.run(checkpoint):
                    if failure is None:
                        if doc_batch is not None:
                            yield doc_batch
                        if next_checkpoint is not None:
                            checkpoint = next_checkpoint
                    else:
                        # Failures are logged and skipped; the run goes on.
                        logging.warning(
                            "Github connector failure: %s",
                            getattr(failure, "failure_message", failure),
                        )

        def wrapper():
            for batch in document_batches():
                yield batch

        target = f"org_name({repo_owner}), repo_names({repo_names})"
        self.log_connection("Github", target, task)

        return wrapper()
|
2025-12-31 14:40:49 +08:00
|
|
|
|
2025-12-30 17:09:13 +08:00
|
|
|
class IMAP(SyncBase):
    """Sync source backed by ``ImapConnector``."""

    SOURCE_NAME: str = FileSource.IMAP

    async def _generate(self, task):
        """
        Set up the IMAP connector and return a generator of document batches.

        The load window ends at the current UTC time. It starts
        ``poll_range`` days (default 30) earlier on a reindex or when no
        ``poll_range_start`` is recorded, and at ``poll_range_start``
        otherwise. The method also works out an "initial sync start" value,
        tries to persist it into the connector config when it was missing,
        invalid or reset by a reindex, and stores it with the end time in
        ``self._prune_snapshot_kwargs``.
        """
        from common.data_source.config import DocumentSource
        from common.data_source.interfaces import StaticCredentialsProvider

        self.connector = ImapConnector(
            host=self.conf.get("imap_host"),
            port=self.conf.get("imap_port"),
            mailboxes=self.conf.get("imap_mailbox"),
        )
        credentials_provider = StaticCredentialsProvider(
            tenant_id=task["tenant_id"],
            connector_name=DocumentSource.IMAP,
            credential_json=self.conf["credentials"],
        )
        self.connector.set_credentials_provider(credentials_provider)

        end_time = datetime.now(timezone.utc).timestamp()
        try:
            poll_range_days = float(self.conf.get("poll_range", 30))
        except (TypeError, ValueError):
            poll_range_days = 30
        default_initial_sync_start = end_time - poll_range_days * 24 * 60 * 60

        if task["reindex"] != "1" and task["poll_range_start"]:
            start_time = task["poll_range_start"].timestamp()
            _begin_info = f"from {task['poll_range_start']}"
        else:
            start_time = default_initial_sync_start
            _begin_info = "totally"

        if task["reindex"] == "1":
            # A reindex resets the stored initial start to the default window.
            initial_sync_start = default_initial_sync_start
            should_persist_initial_start = True
        else:
            initial_sync_start = self.conf.get("imap_initial_sync_start")
            should_persist_initial_start = initial_sync_start is None
            try:
                initial_sync_start = float(initial_sync_start)
            except (TypeError, ValueError):
                # Missing or unparsable stored value: use 0 when a previous
                # poll exists, otherwise the default window, and persist it.
                if task["poll_range_start"]:
                    initial_sync_start = 0
                else:
                    initial_sync_start = default_initial_sync_start
                should_persist_initial_start = True

        if should_persist_initial_start:
            updated_conf = copy.deepcopy(self.conf)
            updated_conf["imap_initial_sync_start"] = initial_sync_start
            try:
                ConnectorService.update_by_id(task["connector_id"], {"config": updated_conf})
                self.conf = updated_conf
            except Exception:
                # Best effort: a failed write is logged and the sync goes on.
                logging.exception(
                    "Failed to persist IMAP initial sync start for connector %s",
                    task["connector_id"],
                )

        self._prune_snapshot_kwargs = {"start": initial_sync_start, "end": end_time}

        raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        def document_batches():
            # Group single documents into lists of at most batch_size items.
            checkpoint = self.connector.build_dummy_checkpoint()
            buffered = []
            max_rounds = 100_000
            rounds = 0
            while checkpoint.has_more:
                output_wrapper = CheckpointOutputWrapper()
                outputs = output_wrapper(self.connector.load_from_checkpoint(start_time, end_time, checkpoint))
                for document, failure, next_checkpoint in outputs:
                    if failure is not None:
                        logging.warning("IMAP connector failure: %s", getattr(failure, "failure_message", failure))
                        continue
                    if document is not None:
                        buffered.append(document)
                        if len(buffered) >= batch_size:
                            yield buffered
                            # Rebind instead of clearing: the consumer keeps
                            # the list that was just yielded.
                            buffered = []
                    if next_checkpoint is not None:
                        checkpoint = next_checkpoint

                # Safety valve against a checkpoint that never finishes.
                rounds += 1
                if rounds > max_rounds:
                    raise RuntimeError("Too many iterations while loading IMAP documents.")

            if buffered:
                yield buffered

        def wrapper():
            for batch in document_batches():
                yield batch

        self.log_connection(
            "IMAP",
            f"host({self.conf['imap_host']}) port({self.conf['imap_port']}) user({self.conf['credentials']['imap_username']}) folder({self.conf['imap_mailbox']})",
            task,
        )
        return wrapper()

    def _get_prune_snapshot_kwargs(self, task: dict) -> dict[str, Any]:
        """Return the start/end mapping stored by ``_generate``, or ``{}`` if it has not run."""
        return getattr(self, "_prune_snapshot_kwargs", {})
|
2025-12-31 14:40:49 +08:00
|
|
|
|
|
|
|
|
class Zendesk(SyncBase):
    """Sync source backed by ``ZendeskConnector``."""

    SOURCE_NAME: str = FileSource.ZENDESK

    async def _generate(self, task: dict):
        """
        Set up the Zendesk connector and return a generator of document batches.

        The load window starts at 0 on a reindex or when no
        ``poll_range_start`` is recorded, and at ``poll_range_start``
        otherwise; it always ends at the current UTC time.
        """
        content_type = self.conf.get("zendesk_content_type")
        self.connector = ZendeskConnector(content_type=content_type)
        self.connector.load_credentials(self.conf["credentials"])

        end_time = datetime.now(timezone.utc).timestamp()
        if task["reindex"] != "1" and task.get("poll_range_start"):
            start_time = task["poll_range_start"].timestamp()
            _begin_info = f"from {task['poll_range_start']}"
        else:
            start_time = 0
            _begin_info = "totally"

        raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE

        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        def document_batches():
            # Group single documents into lists of at most batch_size items.
            checkpoint = self.connector.build_dummy_checkpoint()
            buffered = []
            max_rounds = 100_000
            rounds = 0

            while checkpoint.has_more:
                output_wrapper = CheckpointOutputWrapper()
                outputs = output_wrapper(self.connector.load_from_checkpoint(start_time, end_time, checkpoint))

                for document, failure, next_checkpoint in outputs:
                    if failure is not None:
                        logging.warning(
                            "Zendesk connector failure: %s",
                            getattr(failure, "failure_message", failure),
                        )
                        continue

                    if document is not None:
                        buffered.append(document)
                        if len(buffered) >= batch_size:
                            yield buffered
                            # Rebind instead of clearing: the consumer keeps
                            # the list that was just yielded.
                            buffered = []

                    if next_checkpoint is not None:
                        checkpoint = next_checkpoint

                # Safety valve against a checkpoint that never finishes.
                rounds += 1
                if rounds > max_rounds:
                    raise RuntimeError("Too many iterations while loading Zendesk documents.")

            if buffered:
                yield buffered

        def wrapper():
            for batch in document_batches():
                yield batch

        self.log_connection("Zendesk", f"subdomain({self.conf['credentials'].get('zendesk_subdomain')})", task)
        return wrapper()
|
2025-12-29 12:01:18 +08:00
|
|
|
|
2025-12-29 17:05:20 +08:00
|
|
|
|
|
|
|
|
class Gitlab(SyncBase):
    """Sync source backed by ``GitlabConnector``."""

    SOURCE_NAME: str = FileSource.GITLAB

    async def _generate(self, task: dict):
        """
        Build the GitLab connector and return its document generator.

        Args:
            task: Sync task record. ``task["reindex"] == "1"`` or a falsy
                ``task["poll_range_start"]`` selects a full load; otherwise
                the source is polled from ``poll_range_start`` to now (UTC).

        Returns:
            The generator from ``load_from_state()`` or ``poll_source(...)``.
        """
        self.connector = GitlabConnector(
            project_owner=self.conf.get("project_owner"),
            project_name=self.conf.get("project_name"),
            include_mrs=self.conf.get("include_mrs", False),
            include_issues=self.conf.get("include_issues", False),
            include_code_files=self.conf.get("include_code_files", False),
        )

        self.connector.load_credentials(
            {
                "gitlab_access_token": self.conf.get("credentials", {}).get("gitlab_access_token"),
                "gitlab_url": self.conf.get("gitlab_url"),
            }
        )

        if task["reindex"] == "1" or not task["poll_range_start"]:
            document_generator = self.connector.load_from_state()
        else:
            # poll_range_start is truthy on this path, so no second None
            # check is needed before calling .timestamp().
            poll_start = task["poll_range_start"]
            document_generator = self.connector.poll_source(
                poll_start.timestamp(),
                datetime.now(timezone.utc).timestamp(),
            )

        self.log_connection("Gitlab", f"({self.conf['project_name']})", task)
        return document_generator
|
2025-12-29 17:05:20 +08:00
|
|
|
|
2025-12-31 17:18:30 +08:00
|
|
|
|
|
|
|
|
class Bitbucket(SyncBase):
    """Sync source backed by ``BitbucketConnector``."""

    SOURCE_NAME: str = FileSource.BITBUCKET

    async def _generate(self, task: dict):
        """Build the Bitbucket connector and return a generator of document batches.

        Args:
            task: Sync task record. ``task["reindex"] == "1"`` or a falsy
                ``task["poll_range_start"]`` starts the window at the Unix
                epoch; otherwise it starts at ``poll_range_start``. The
                window always ends at the current UTC time.

        Returns:
            A generator yielding one-element lists, one per item produced by
            the connector.
        """
        self.connector = BitbucketConnector(
            workspace=self.conf.get("workspace"),
            repositories=self.conf.get("repository_slugs"),
            projects=self.conf.get("projects"),
        )

        self.connector.load_credentials(
            {
                "bitbucket_email": self.conf["credentials"].get("bitbucket_account_email"),
                "bitbucket_api_token": self.conf["credentials"].get("bitbucket_api_token"),
            }
        )

        if task["reindex"] == "1" or not task["poll_range_start"]:
            start_time = datetime.fromtimestamp(0, tz=timezone.utc)
        else:
            start_time = task.get("poll_range_start")

        end_time = datetime.now(timezone.utc)

        def document_batches():
            checkpoint = self.connector.build_dummy_checkpoint()

            while checkpoint.has_more:
                gen = self.connector.load_from_checkpoint(
                    start=start_time.timestamp(),
                    end=end_time.timestamp(),
                    checkpoint=checkpoint,
                )

                while True:
                    # Only next() sits inside the try: the generator hands
                    # back the next checkpoint as its return value, which
                    # arrives on StopIteration.
                    try:
                        item = next(gen)
                    except StopIteration as e:
                        checkpoint = e.value
                        break

                    if isinstance(item, ConnectorFailure):
                        # No exception is being handled here, so use
                        # logging.error rather than logging.exception, which
                        # would append an empty "NoneType: None" traceback.
                        logging.error(
                            "Bitbucket connector failure: %s",
                            item.failure_message,
                        )
                        # Keep draining the generator. Breaking out here
                        # would leave the checkpoint unchanged and restart
                        # the same load from the beginning.
                        continue

                    yield [item]

        def wrapper():
            for batch in document_batches():
                yield batch

        self.log_connection("Bitbucket", f"workspace({self.conf.get('workspace')})", task)
        return wrapper()
|
2025-12-31 17:18:30 +08:00
|
|
|
|
feat(seafile): add library and directory sync scope support (#13153)
### What problem does this PR solve?
The SeaFile connector currently synchronises the entire account — every
library
visible to the authenticated user. This is impractical for users who
only need
a subset of their data indexed, especially on large SeaFile instances
with many
shared libraries.
This PR introduces granular sync scope support, allowing users to choose
between
syncing their entire account, a single library, or a specific directory
within a
library. It also adds support for SeaFile library-scoped API tokens
(`/api/v2.1/via-repo-token/` endpoints), enabling tighter access control
without
exposing account-level credentials.
### Type of change
- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
### Test
```
from seafile_connector import SeaFileConnector
import logging
import os
logging.basicConfig(level=logging.DEBUG)
URL = os.environ.get("SEAFILE_URL", "https://seafile.example.com")
TOKEN = os.environ.get("SEAFILE_TOKEN", "")
REPO_ID = os.environ.get("SEAFILE_REPO_ID", "")
SYNC_PATH = os.environ.get("SEAFILE_SYNC_PATH", "/Documents")
REPO_TOKEN = os.environ.get("SEAFILE_REPO_TOKEN", "")
def _test_scope(scope, repo_id=None, sync_path=None):
print(f"\n{'='*50}")
print(f"Testing scope: {scope}")
print(f"{'='*50}")
creds = {"seafile_token": TOKEN} if TOKEN else {}
if REPO_TOKEN and scope in ("library", "directory"):
creds["repo_token"] = REPO_TOKEN
connector = SeaFileConnector(
seafile_url=URL,
batch_size=5,
sync_scope=scope,
include_shared = False,
repo_id=repo_id,
sync_path=sync_path,
)
connector.load_credentials(creds)
connector.validate_connector_settings()
count = 0
for batch in connector.load_from_state():
for doc in batch:
count += 1
print(f" [{count}] {doc.semantic_identifier} "
f"({doc.size_bytes} bytes, {doc.extension})")
print(f"\n-> {scope} scope: {count} document(s) found.\n")
# 1. Account scope
if TOKEN:
_test_scope("account")
else:
print("\nSkipping account scope (set SEAFILE_TOKEN)")
# 2. Library scope
if REPO_ID and (TOKEN or REPO_TOKEN):
_test_scope("library", repo_id=REPO_ID)
else:
print("\nSkipping library scope (set SEAFILE_REPO_ID + token)")
# 3. Directory scope
if REPO_ID and SYNC_PATH and (TOKEN or REPO_TOKEN):
_test_scope("directory", repo_id=REPO_ID, sync_path=SYNC_PATH)
else:
print("\nSkipping directory scope (set SEAFILE_REPO_ID + SEAFILE_SYNC_PATH + token)")
```
2026-02-28 03:24:28 +01:00
|
|
|
|
2026-02-03 06:42:05 +01:00
|
|
|
class SeaFile(SyncBase):
|
|
|
|
|
SOURCE_NAME: str = FileSource.SEAFILE
|
|
|
|
|
|
|
|
|
|
async def _generate(self, task: dict):
|
feat(seafile): add library and directory sync scope support (#13153)
### What problem does this PR solve?
The SeaFile connector currently synchronises the entire account — every
library
visible to the authenticated user. This is impractical for users who
only need
a subset of their data indexed, especially on large SeaFile instances
with many
shared libraries.
This PR introduces granular sync scope support, allowing users to choose
between
syncing their entire account, a single library, or a specific directory
within a
library. It also adds support for SeaFile library-scoped API tokens
(`/api/v2.1/via-repo-token/` endpoints), enabling tighter access control
without
exposing account-level credentials.
### Type of change
- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
### Test
```
from seafile_connector import SeaFileConnector
import logging
import os
logging.basicConfig(level=logging.DEBUG)
URL = os.environ.get("SEAFILE_URL", "https://seafile.example.com")
TOKEN = os.environ.get("SEAFILE_TOKEN", "")
REPO_ID = os.environ.get("SEAFILE_REPO_ID", "")
SYNC_PATH = os.environ.get("SEAFILE_SYNC_PATH", "/Documents")
REPO_TOKEN = os.environ.get("SEAFILE_REPO_TOKEN", "")
def _test_scope(scope, repo_id=None, sync_path=None):
print(f"\n{'='*50}")
print(f"Testing scope: {scope}")
print(f"{'='*50}")
creds = {"seafile_token": TOKEN} if TOKEN else {}
if REPO_TOKEN and scope in ("library", "directory"):
creds["repo_token"] = REPO_TOKEN
connector = SeaFileConnector(
seafile_url=URL,
batch_size=5,
sync_scope=scope,
include_shared = False,
repo_id=repo_id,
sync_path=sync_path,
)
connector.load_credentials(creds)
connector.validate_connector_settings()
count = 0
for batch in connector.load_from_state():
for doc in batch:
count += 1
print(f" [{count}] {doc.semantic_identifier} "
f"({doc.size_bytes} bytes, {doc.extension})")
print(f"\n-> {scope} scope: {count} document(s) found.\n")
# 1. Account scope
if TOKEN:
_test_scope("account")
else:
print("\nSkipping account scope (set SEAFILE_TOKEN)")
# 2. Library scope
if REPO_ID and (TOKEN or REPO_TOKEN):
_test_scope("library", repo_id=REPO_ID)
else:
print("\nSkipping library scope (set SEAFILE_REPO_ID + token)")
# 3. Directory scope
if REPO_ID and SYNC_PATH and (TOKEN or REPO_TOKEN):
_test_scope("directory", repo_id=REPO_ID, sync_path=SYNC_PATH)
else:
print("\nSkipping directory scope (set SEAFILE_REPO_ID + SEAFILE_SYNC_PATH + token)")
```
2026-02-28 03:24:28 +01:00
|
|
|
conf = self.conf
|
2026-04-30 06:05:12 +02:00
|
|
|
raw_batch_size = conf.get("batch_size", INDEX_BATCH_SIZE)
|
|
|
|
|
try:
|
|
|
|
|
batch_size = int(raw_batch_size)
|
|
|
|
|
except (TypeError, ValueError):
|
|
|
|
|
batch_size = INDEX_BATCH_SIZE
|
|
|
|
|
if batch_size <= 0:
|
|
|
|
|
batch_size = INDEX_BATCH_SIZE
|
|
|
|
|
|
2026-02-03 06:42:05 +01:00
|
|
|
self.connector = SeaFileConnector(
|
feat(seafile): add library and directory sync scope support (#13153)
### What problem does this PR solve?
The SeaFile connector currently synchronises the entire account — every
library
visible to the authenticated user. This is impractical for users who
only need
a subset of their data indexed, especially on large SeaFile instances
with many
shared libraries.
This PR introduces granular sync scope support, allowing users to choose
between
syncing their entire account, a single library, or a specific directory
within a
library. It also adds support for SeaFile library-scoped API tokens
(`/api/v2.1/via-repo-token/` endpoints), enabling tighter access control
without
exposing account-level credentials.
### Type of change
- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
### Test
```
from seafile_connector import SeaFileConnector
import logging
import os
logging.basicConfig(level=logging.DEBUG)
URL = os.environ.get("SEAFILE_URL", "https://seafile.example.com")
TOKEN = os.environ.get("SEAFILE_TOKEN", "")
REPO_ID = os.environ.get("SEAFILE_REPO_ID", "")
SYNC_PATH = os.environ.get("SEAFILE_SYNC_PATH", "/Documents")
REPO_TOKEN = os.environ.get("SEAFILE_REPO_TOKEN", "")
def _test_scope(scope, repo_id=None, sync_path=None):
print(f"\n{'='*50}")
print(f"Testing scope: {scope}")
print(f"{'='*50}")
creds = {"seafile_token": TOKEN} if TOKEN else {}
if REPO_TOKEN and scope in ("library", "directory"):
creds["repo_token"] = REPO_TOKEN
connector = SeaFileConnector(
seafile_url=URL,
batch_size=5,
sync_scope=scope,
include_shared = False,
repo_id=repo_id,
sync_path=sync_path,
)
connector.load_credentials(creds)
connector.validate_connector_settings()
count = 0
for batch in connector.load_from_state():
for doc in batch:
count += 1
print(f" [{count}] {doc.semantic_identifier} "
f"({doc.size_bytes} bytes, {doc.extension})")
print(f"\n-> {scope} scope: {count} document(s) found.\n")
# 1. Account scope
if TOKEN:
_test_scope("account")
else:
print("\nSkipping account scope (set SEAFILE_TOKEN)")
# 2. Library scope
if REPO_ID and (TOKEN or REPO_TOKEN):
_test_scope("library", repo_id=REPO_ID)
else:
print("\nSkipping library scope (set SEAFILE_REPO_ID + token)")
# 3. Directory scope
if REPO_ID and SYNC_PATH and (TOKEN or REPO_TOKEN):
_test_scope("directory", repo_id=REPO_ID, sync_path=SYNC_PATH)
else:
print("\nSkipping directory scope (set SEAFILE_REPO_ID + SEAFILE_SYNC_PATH + token)")
```
2026-02-28 03:24:28 +01:00
|
|
|
seafile_url=conf["seafile_url"],
|
2026-04-30 06:05:12 +02:00
|
|
|
batch_size=batch_size,
|
feat(seafile): add library and directory sync scope support (#13153)
### What problem does this PR solve?
The SeaFile connector currently synchronises the entire account — every
library
visible to the authenticated user. This is impractical for users who
only need
a subset of their data indexed, especially on large SeaFile instances
with many
shared libraries.
This PR introduces granular sync scope support, allowing users to choose
between
syncing their entire account, a single library, or a specific directory
within a
library. It also adds support for SeaFile library-scoped API tokens
(`/api/v2.1/via-repo-token/` endpoints), enabling tighter access control
without
exposing account-level credentials.
### Type of change
- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
### Test
```
from seafile_connector import SeaFileConnector
import logging
import os
logging.basicConfig(level=logging.DEBUG)
URL = os.environ.get("SEAFILE_URL", "https://seafile.example.com")
TOKEN = os.environ.get("SEAFILE_TOKEN", "")
REPO_ID = os.environ.get("SEAFILE_REPO_ID", "")
SYNC_PATH = os.environ.get("SEAFILE_SYNC_PATH", "/Documents")
REPO_TOKEN = os.environ.get("SEAFILE_REPO_TOKEN", "")
def _test_scope(scope, repo_id=None, sync_path=None):
print(f"\n{'='*50}")
print(f"Testing scope: {scope}")
print(f"{'='*50}")
creds = {"seafile_token": TOKEN} if TOKEN else {}
if REPO_TOKEN and scope in ("library", "directory"):
creds["repo_token"] = REPO_TOKEN
connector = SeaFileConnector(
seafile_url=URL,
batch_size=5,
sync_scope=scope,
include_shared = False,
repo_id=repo_id,
sync_path=sync_path,
)
connector.load_credentials(creds)
connector.validate_connector_settings()
count = 0
for batch in connector.load_from_state():
for doc in batch:
count += 1
print(f" [{count}] {doc.semantic_identifier} "
f"({doc.size_bytes} bytes, {doc.extension})")
print(f"\n-> {scope} scope: {count} document(s) found.\n")
# 1. Account scope
if TOKEN:
_test_scope("account")
else:
print("\nSkipping account scope (set SEAFILE_TOKEN)")
# 2. Library scope
if REPO_ID and (TOKEN or REPO_TOKEN):
_test_scope("library", repo_id=REPO_ID)
else:
print("\nSkipping library scope (set SEAFILE_REPO_ID + token)")
# 3. Directory scope
if REPO_ID and SYNC_PATH and (TOKEN or REPO_TOKEN):
_test_scope("directory", repo_id=REPO_ID, sync_path=SYNC_PATH)
else:
print("\nSkipping directory scope (set SEAFILE_REPO_ID + SEAFILE_SYNC_PATH + token)")
```
2026-02-28 03:24:28 +01:00
|
|
|
include_shared=conf.get("include_shared", True),
|
|
|
|
|
sync_scope=conf.get("sync_scope", SeafileSyncScope.ACCOUNT),
|
|
|
|
|
repo_id=conf.get("repo_id") or None,
|
|
|
|
|
sync_path=conf.get("sync_path") or None,
|
2026-02-03 06:42:05 +01:00
|
|
|
)
|
feat(seafile): add library and directory sync scope support (#13153)
### What problem does this PR solve?
The SeaFile connector currently synchronises the entire account — every
library
visible to the authenticated user. This is impractical for users who
only need
a subset of their data indexed, especially on large SeaFile instances
with many
shared libraries.
This PR introduces granular sync scope support, allowing users to choose
between
syncing their entire account, a single library, or a specific directory
within a
library. It also adds support for SeaFile library-scoped API tokens
(`/api/v2.1/via-repo-token/` endpoints), enabling tighter access control
without
exposing account-level credentials.
### Type of change
- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
### Test
```
from seafile_connector import SeaFileConnector
import logging
import os
logging.basicConfig(level=logging.DEBUG)
URL = os.environ.get("SEAFILE_URL", "https://seafile.example.com")
TOKEN = os.environ.get("SEAFILE_TOKEN", "")
REPO_ID = os.environ.get("SEAFILE_REPO_ID", "")
SYNC_PATH = os.environ.get("SEAFILE_SYNC_PATH", "/Documents")
REPO_TOKEN = os.environ.get("SEAFILE_REPO_TOKEN", "")
def _test_scope(scope, repo_id=None, sync_path=None):
print(f"\n{'='*50}")
print(f"Testing scope: {scope}")
print(f"{'='*50}")
creds = {"seafile_token": TOKEN} if TOKEN else {}
if REPO_TOKEN and scope in ("library", "directory"):
creds["repo_token"] = REPO_TOKEN
connector = SeaFileConnector(
seafile_url=URL,
batch_size=5,
sync_scope=scope,
include_shared = False,
repo_id=repo_id,
sync_path=sync_path,
)
connector.load_credentials(creds)
connector.validate_connector_settings()
count = 0
for batch in connector.load_from_state():
for doc in batch:
count += 1
print(f" [{count}] {doc.semantic_identifier} "
f"({doc.size_bytes} bytes, {doc.extension})")
print(f"\n-> {scope} scope: {count} document(s) found.\n")
# 1. Account scope
if TOKEN:
_test_scope("account")
else:
print("\nSkipping account scope (set SEAFILE_TOKEN)")
# 2. Library scope
if REPO_ID and (TOKEN or REPO_TOKEN):
_test_scope("library", repo_id=REPO_ID)
else:
print("\nSkipping library scope (set SEAFILE_REPO_ID + token)")
# 3. Directory scope
if REPO_ID and SYNC_PATH and (TOKEN or REPO_TOKEN):
_test_scope("directory", repo_id=REPO_ID, sync_path=SYNC_PATH)
else:
print("\nSkipping directory scope (set SEAFILE_REPO_ID + SEAFILE_SYNC_PATH + token)")
```
2026-02-28 03:24:28 +01:00
|
|
|
self.connector.load_credentials(conf["credentials"])
|
2026-02-03 06:42:05 +01:00
|
|
|
|
|
|
|
|
poll_start = task.get("poll_range_start")
|
|
|
|
|
if task["reindex"] == "1" or poll_start is None:
|
|
|
|
|
document_generator = self.connector.load_from_state()
|
2026-04-09 17:44:13 +08:00
|
|
|
_begin_info = "totally"
|
2026-02-03 06:42:05 +01:00
|
|
|
else:
|
2026-04-30 06:05:12 +02:00
|
|
|
end_ts = datetime.now(timezone.utc).timestamp()
|
2026-02-03 06:42:05 +01:00
|
|
|
document_generator = self.connector.poll_source(
|
|
|
|
|
poll_start.timestamp(),
|
2026-04-30 06:05:12 +02:00
|
|
|
end_ts,
|
2026-02-03 06:42:05 +01:00
|
|
|
)
|
2026-04-09 17:44:13 +08:00
|
|
|
_begin_info = f"from {poll_start}"
|
2026-02-03 06:42:05 +01:00
|
|
|
|
feat(seafile): add library and directory sync scope support (#13153)
### What problem does this PR solve?
The SeaFile connector currently synchronises the entire account — every
library
visible to the authenticated user. This is impractical for users who
only need
a subset of their data indexed, especially on large SeaFile instances
with many
shared libraries.
This PR introduces granular sync scope support, allowing users to choose
between
syncing their entire account, a single library, or a specific directory
within a
library. It also adds support for SeaFile library-scoped API tokens
(`/api/v2.1/via-repo-token/` endpoints), enabling tighter access control
without
exposing account-level credentials.
### Type of change
- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
### Test
```
from seafile_connector import SeaFileConnector
import logging
import os
logging.basicConfig(level=logging.DEBUG)
URL = os.environ.get("SEAFILE_URL", "https://seafile.example.com")
TOKEN = os.environ.get("SEAFILE_TOKEN", "")
REPO_ID = os.environ.get("SEAFILE_REPO_ID", "")
SYNC_PATH = os.environ.get("SEAFILE_SYNC_PATH", "/Documents")
REPO_TOKEN = os.environ.get("SEAFILE_REPO_TOKEN", "")
def _test_scope(scope, repo_id=None, sync_path=None):
print(f"\n{'='*50}")
print(f"Testing scope: {scope}")
print(f"{'='*50}")
creds = {"seafile_token": TOKEN} if TOKEN else {}
if REPO_TOKEN and scope in ("library", "directory"):
creds["repo_token"] = REPO_TOKEN
connector = SeaFileConnector(
seafile_url=URL,
batch_size=5,
sync_scope=scope,
include_shared = False,
repo_id=repo_id,
sync_path=sync_path,
)
connector.load_credentials(creds)
connector.validate_connector_settings()
count = 0
for batch in connector.load_from_state():
for doc in batch:
count += 1
print(f" [{count}] {doc.semantic_identifier} "
f"({doc.size_bytes} bytes, {doc.extension})")
print(f"\n-> {scope} scope: {count} document(s) found.\n")
# 1. Account scope
if TOKEN:
_test_scope("account")
else:
print("\nSkipping account scope (set SEAFILE_TOKEN)")
# 2. Library scope
if REPO_ID and (TOKEN or REPO_TOKEN):
_test_scope("library", repo_id=REPO_ID)
else:
print("\nSkipping library scope (set SEAFILE_REPO_ID + token)")
# 3. Directory scope
if REPO_ID and SYNC_PATH and (TOKEN or REPO_TOKEN):
_test_scope("directory", repo_id=REPO_ID, sync_path=SYNC_PATH)
else:
print("\nSkipping directory scope (set SEAFILE_REPO_ID + SEAFILE_SYNC_PATH + token)")
```
2026-02-28 03:24:28 +01:00
|
|
|
scope = conf.get("sync_scope", "account")
|
|
|
|
|
extra = ""
|
|
|
|
|
if scope in ("library", "directory"):
|
|
|
|
|
extra = f" repo_id={conf.get('repo_id')}"
|
|
|
|
|
if scope == "directory":
|
|
|
|
|
extra += f" path={conf.get('sync_path')}"
|
|
|
|
|
|
2026-04-09 16:40:14 +08:00
|
|
|
self.log_connection("SeaFile", f"{conf['seafile_url']} (scope={scope}{extra})", task)
|
2026-05-19 10:07:11 +08:00
|
|
|
return document_generator
|
2026-02-03 06:42:05 +01:00
|
|
|
|
2026-02-03 23:14:32 -03:00
|
|
|
|
2026-03-06 21:13:23 +08:00
|
|
|
class DingTalkAITable(SyncBase):
    """Sync task for the DingTalk AI Table (Notable) data source."""

    SOURCE_NAME: str = FileSource.DINGTALK_AI_TABLE

    async def _generate(self, task: dict):
        """
        Sync records from DingTalk AI Table (Notable).

        Builds a ``DingTalkAITableConnector`` from ``self.conf``, hands it the
        configured access token and returns the connector's document
        generator: ``load_from_state()`` when ``task["reindex"] == "1"`` or no
        ``poll_range_start`` is present, otherwise ``poll_source()`` from
        ``poll_range_start`` up to the current UTC time.

        Args:
            task: Sync task description; ``reindex`` and ``poll_range_start``
                are read from it, and it is forwarded to ``log_connection``.

        Returns:
            The document generator produced by the connector.

        Raises:
            ValueError: If the configured credentials carry no ``access_token``.
        """
        # Fall back to the default when batch_size is missing, not numeric,
        # or not strictly positive.
        raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        self.connector = DingTalkAITableConnector(
            table_id=self.conf.get("table_id"),
            operator_id=self.conf.get("operator_id"),
            batch_size=batch_size,
        )

        # ``credentials`` may be stored as an explicit None; ``or {}`` makes
        # that case raise the ValueError below instead of a TypeError from the
        # membership test.
        credentials = self.conf.get("credentials") or {}
        if "access_token" not in credentials:
            raise ValueError("Missing access_token in credentials")

        # Only the access token is forwarded to the connector.
        self.connector.load_credentials(
            {"access_token": credentials["access_token"]}
        )

        poll_start = task.get("poll_range_start")

        if task.get("reindex") == "1" or poll_start is None:
            # Explicit reindex, or no previous poll window: load everything.
            document_generator = self.connector.load_from_state()
        else:
            end_ts = datetime.now(timezone.utc).timestamp()
            document_generator = self.connector.poll_source(
                poll_start.timestamp(),
                end_ts,
            )

        self.log_connection(
            "DingTalk AI Table",
            f"table_id({self.conf.get('table_id')}), operator_id({self.conf.get('operator_id')})",
            task,
        )

        return document_generator
|
2026-03-06 21:13:23 +08:00
|
|
|
|
|
|
|
|
|
Feature big query connector (#15871)
### What problem does this PR solve?
This PR adds Google BigQuery as a first-class data source connector in
RAGFlow.
It enables users to ingest and sync BigQuery data using the same
row-to-document model used by relational database connectors: selected
content columns become document text, metadata columns become document
metadata, an optional ID column provides stable document IDs, and an
optional timestamp column enables cursor-based incremental sync.
The connector supports service-account JSON credentials, table mode,
custom query mode, GoogleSQL queries, cursor-based incremental sync,
deleted-row pruning support, configurable query limits such as
`maximum_bytes_billed`, dry-run validation, batch loading, stable
document IDs, and BigQuery-aware value serialization.
2026-06-29 17:08:40 +03:00
|
|
|
class _CursorPersistingSyncBase(SyncBase):
    """Marker base for sources whose sync cursor is saved only after a clean sync.

    When no document batch failed, ``_run_sync_task_logic`` calls
    ``self.connector.persist_sync_state()`` for every subclass of this base.
    Sources whose connector provides ``prepare_sync_state`` /
    ``persist_sync_state`` (RDBMS, BigQuery) derive from this class rather than
    being detected through an ``isinstance(self, _RDBMSBase)`` check.
    """
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _RDBMSBase(_CursorPersistingSyncBase):
    """Shared sync logic for relational-database sources using ``RDBMSConnector``.

    Subclasses supply ``DB_TYPE`` (passed to the connector as ``db_type``),
    ``LOG_NAME`` (used in the connection log line) and ``DEFAULT_PORT`` (used
    when the configuration has no ``port``).
    """

    DB_TYPE: str = ""
    LOG_NAME: str = ""
    DEFAULT_PORT: int = 0

    async def _generate(self, task: dict):
        """Build the connector and return the document generator for this task.

        A full ``load_from_state()`` is used when ``task["reindex"] == "1"``,
        when ``task["poll_range_start"]`` is falsy, or when the connector has
        no ``timestamp_column``. Otherwise documents are loaded from the saved
        sync cursor up to the connector's pending cursor value.

        Args:
            task: Sync task description; ``connector_id``, ``reindex`` and
                ``poll_range_start`` are read from it.

        Returns:
            The document generator produced by the connector.

        Raises:
            ValueError: If the configuration carries no credentials.
        """
        # Normalise batch_size the same way the other connectors in this file
        # do: fall back to the default when it is missing, not numeric, or not
        # strictly positive.
        raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        self.connector = RDBMSConnector(
            db_type=self.DB_TYPE,
            host=self.conf.get("host", "localhost"),
            port=int(self.conf.get("port", self.DEFAULT_PORT)),
            database=self.conf.get("database", ""),
            query=self.conf.get("query", ""),
            content_columns=self.conf.get("content_columns", ""),
            metadata_columns=self.conf.get("metadata_columns", ""),
            # Empty strings are passed as None.
            id_column=self.conf.get("id_column") or None,
            timestamp_column=self.conf.get("timestamp_column") or None,
            batch_size=batch_size,
        )

        credentials = self.conf.get("credentials")
        if not credentials:
            raise ValueError(f"{self.DB_TYPE} connector is missing credentials.")

        self.connector.load_credentials(credentials)
        self.connector.validate_connector_settings()
        self.connector.prepare_sync_state(task["connector_id"], self.conf)

        if task["reindex"] == "1" or not task["poll_range_start"]:
            # Explicit reindex, or no previous poll window: load everything.
            document_generator = self.connector.load_from_state()
        elif not self.connector.timestamp_column:
            # No timestamp column configured: fall back to a full load.
            document_generator = self.connector.load_from_state()
        else:
            start_cursor_value = self.connector.get_saved_sync_cursor_value()
            # The cursor-id getter is optional on the connector.
            start_cursor_id = self.connector.get_saved_sync_cursor_id() if hasattr(self.connector, "get_saved_sync_cursor_id") else None
            document_generator = self.connector.load_from_cursor_range(
                start_cursor_value,
                self.connector._pending_sync_cursor_value,
                start_cursor_id,
            )

        self.log_connection(self.LOG_NAME, f"{self.conf.get('host')}:{self.conf.get('database')}", task)
        return document_generator
|
2026-02-03 23:14:32 -03:00
|
|
|
|
|
|
|
|
|
2026-05-07 13:31:05 +08:00
|
|
|
class MySQL(_RDBMSBase):
    """MySQL data source; defines only the settings consumed by ``_RDBMSBase``."""

    # Database flavour, default port and log label used by the shared base class.
    DB_TYPE: str = "mysql"
    DEFAULT_PORT: int = 3306
    LOG_NAME: str = "MySQL"

    SOURCE_NAME: str = FileSource.MYSQL
|
2026-02-03 23:14:32 -03:00
|
|
|
|
|
|
|
|
|
2026-05-07 13:31:05 +08:00
|
|
|
class PostgreSQL(_RDBMSBase):
    """PostgreSQL data source; defines only the settings consumed by ``_RDBMSBase``."""

    # Database flavour, default port and log label used by the shared base class.
    DB_TYPE: str = "postgresql"
    DEFAULT_PORT: int = 5432
    LOG_NAME: str = "PostgreSQL"

    SOURCE_NAME: str = FileSource.POSTGRESQL
|
2026-02-03 23:14:32 -03:00
|
|
|
|
|
|
|
|
|
Feature big query connector (#15871)
### What problem does this PR solve?
This PR adds Google BigQuery as a first-class data source connector in
RAGFlow.
It enables users to ingest and sync BigQuery data using the same
row-to-document model used by relational database connectors: selected
content columns become document text, metadata columns become document
metadata, an optional ID column provides stable document IDs, and an
optional timestamp column enables cursor-based incremental sync.
The connector supports service-account JSON credentials, table mode,
custom query mode, GoogleSQL queries, cursor-based incremental sync,
deleted-row pruning support, configurable query limits such as
`maximum_bytes_billed`, dry-run validation, batch loading, stable
document IDs, and BigQuery-aware value serialization.
2026-06-29 17:08:40 +03:00
|
|
|
class BigQuery(_CursorPersistingSyncBase):
    """Sync task for the Google BigQuery data source."""

    SOURCE_NAME: str = FileSource.BIGQUERY

    def _get_source_prefix(self):
        """Return the fixed prefix string for this source."""
        return "[BigQuery]"

    async def _generate(self, task: dict):
        """Create the BigQuery connector and return its document generator.

        Uses ``load_from_state()`` when ``task["reindex"] == "1"``, when
        ``task["poll_range_start"]`` is falsy, or when the connector has no
        ``timestamp_column``; otherwise loads from the saved sync cursor up to
        the connector's pending cursor value.

        Raises:
            ValueError: If the configuration carries no credentials.
        """
        conf = self.conf

        # Coerce batch_size to a positive int, defaulting on bad input.
        configured_batch = conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            batch_size = int(configured_batch)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if not batch_size > 0:
            batch_size = INDEX_BATCH_SIZE

        # Integer settings that are forwarded only when present in the config.
        optional_ints = {
            key: int(conf[key])
            for key in ("page_size", "maximum_bytes_billed", "job_timeout_ms")
            if conf.get(key) is not None
        }

        self.connector = BigQueryConnector(
            project_id=conf.get("project_id", ""),
            dataset_id=conf.get("dataset_id") or None,
            table_id=conf.get("table_id") or None,
            location=conf.get("location") or None,
            query=conf.get("query", ""),
            content_columns=conf.get("content_columns", ""),
            metadata_columns=conf.get("metadata_columns", ""),
            id_column=conf.get("id_column") or None,
            timestamp_column=conf.get("timestamp_column") or None,
            batch_size=batch_size,
            use_query_cache=conf.get("use_query_cache", True),
            **optional_ints,
        )

        credentials = conf.get("credentials")
        if not credentials:
            raise ValueError("BigQuery connector is missing credentials.")

        self.connector.load_credentials(credentials)
        self.connector.validate_connector_settings()
        self.connector.prepare_sync_state(task["connector_id"], self.conf)

        # Any of these three conditions leads to the same full load.
        if (
            task["reindex"] == "1"
            or not task["poll_range_start"]
            or not self.connector.timestamp_column
        ):
            document_generator = self.connector.load_from_state()
        else:
            cursor_value = self.connector.get_saved_sync_cursor_value()
            # The cursor-id getter is optional on the connector.
            cursor_id_getter = getattr(self.connector, "get_saved_sync_cursor_id", None)
            cursor_id = None if cursor_id_getter is None else cursor_id_getter()
            document_generator = self.connector.load_from_cursor_range(
                cursor_value,
                self.connector._pending_sync_cursor_value,
                cursor_id,
            )

        # Describe what is being read: a custom query or dataset.table.
        if self.conf.get("query"):
            target = "custom query"
        else:
            target = f"{self.conf.get('dataset_id')}.{self.conf.get('table_id')}"
        self.log_connection("BigQuery", f"{self.conf.get('project_id')}:{target}", task)
        return document_generator
|
|
|
|
|
|
|
|
|
|
|
Feature/generic api connector (#13545)
# feat: Add Generic REST API Connector
## What problem does this PR solve?
RAGFlow supports many specific data source connectors (MySQL, Slack,
Google Drive, etc.), but there was no way to connect an arbitrary REST
API as a data source. Users with custom or third-party APIs had to write
a new connector class for each one.
This PR adds a **generic, configuration-driven REST API connector** that
lets users connect any REST API as a data source entirely through the UI
— no code changes needed per API.
---
## Features
### Core Connector (`common/data_source/rest_api_connector.py`)
- Implements `LoadConnector` and `PollConnector` interfaces for full and
incremental sync
- **Configurable authentication:** None, API Key (custom header), Bearer
Token, Basic Auth
- **Pluggable pagination:** Page-based, Offset-based, Cursor-based, or
None
- Smart page-size inference from user's query parameters to avoid
duplicate/conflicting params
- Configurable request delay between pages to prevent API rate limiting
- Auto-detection of the items array in JSON responses (`items`,
`results`, `data`, `records`, or first list found)
- **Advanced field mapping** with dot-notation (`country.name`), array
wildcards (`newsType[*].name`), type hints, and default values
- Optional content template rendering (`"Title: {title}\nBody: {body}"`)
- HTML stripping for content fields
- Stable document IDs via `hash128` from a configurable ID field or
auto-generated from item content
- Pydantic configuration schema with automatic coercion of UI string
inputs to dicts/lists
### Backend Registration (`rag/svr/sync_data_source.py`,
`common/constants.py`, `common/data_source/config.py`)
- `REST_API` sync class wired into RAGFlow's `func_factory`
- Full sync (`load_from_state`) and incremental polling (`poll_source`)
support
- Credentials and config passed from task to connector following
existing patterns (MySQL, SeaFile, etc.)
### Test Connection Endpoint (`api/apps/connector_app.py`)
- `POST /v1/connector/<id>/test` validates config schema,
authentication, and API connectivity without triggering a sync
- Clear error messages for auth failures vs. config issues
### Frontend UI (`web/src/pages/user-setting/data-source/constant/`)
- **Postman-style configuration:** Base URL, Query Parameters (key=value
per line), Auth, Content Fields, Metadata Fields, Pagination Type
- Auth-type-aware form: fields for API key header/value, Bearer token,
or Basic username/password appear only when relevant
- **Advanced Settings** toggle for: Custom Headers, Max Pages, Request
Delay, Poll Timestamp Field, Request Body (POST)
- Connector icon (SVG) and i18n strings (English)
- **"Test Connection"** button to validate before syncing
---
## Controls & Safety
- Configurable max pages safety cap (default: 1000, adjustable in UI)
- Configurable request delay between pages (default: 0.5s, adjustable in
UI)
- Auth errors (401/403) fail immediately without retries; transient
errors retry with exponential backoff
- Diagnostic logging: auth setup confirmation, request details on
failure, content field extraction status
---
## Type of change
- [x] New Feature (non-breaking change which adds functionality)
## Visual Screenshots of Features
<img width="482" height="510" alt="Screenshot 2026-03-11 at 5 19 52 PM"
src="https://github.com/user-attachments/assets/dcb7ab4a-1622-44f3-bb02-d6f0527314c4"
/>
(Connector can be configured within the external data sources tab)
Configuration Parameters:
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 46 PM"
src="https://github.com/user-attachments/assets/5e154e71-4ab5-4872-bfb2-04f02b73c18a"
/>
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 54 PM"
src="https://github.com/user-attachments/assets/00cb14b7-0bcf-4b94-9d71-34e93369ecb2"
/>
Connection can be tested before attaching to dataset:
<img width="981" height="681" alt="Screenshot 2026-03-11 at 5 21 40 PM"
src="https://github.com/user-attachments/assets/aaa6eeeb-89a7-4349-bc34-2423bf8be9ee"
/>
Ingestion tested with API connector (works perfectly fine):
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 22 30 PM"
src="https://github.com/user-attachments/assets/afcd0d58-cadd-4152-badc-d2f14d96fbec"
/>
Search & Retrieval works as well with metadata flow:
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 23 05 PM"
src="https://github.com/user-attachments/assets/d41ee935-dcf7-4456-b317-22a76ca032c0"
/>
---------
Co-authored-by: Ahmad Intisar <ahmadintisar@Ahmads-MacBook-M4-Pro.local>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-13 17:35:01 +05:00
|
|
|
class REST_API(SyncBase):
    """Sync task for the generic REST API data source."""

    SOURCE_NAME: str = FileSource.REST_API

    async def _generate(self, task: dict):
        """Create the REST API connector and return its document generator.

        Uses ``load_from_state()`` when ``task["reindex"] == "1"`` or no
        ``poll_range_start`` is present, otherwise ``poll_source()`` from
        ``poll_range_start`` up to the current UTC time.

        Raises:
            ValueError: If the stored configuration fails connector validation.
        """
        # Convert the connector's validation error into a ValueError.
        try:
            parsed_config = RestAPIConnector.parse_storage_config(self.conf)
        except ConnectorValidationError as err:
            raise ValueError(str(err)) from err

        self.connector = RestAPIConnector.from_parsed_config(parsed_config)
        self.connector.load_credentials(self.conf.get("credentials") or {})

        range_start = task.get("poll_range_start")
        incremental = not (task.get("reindex") == "1" or range_start is None)

        if incremental:
            documents = self.connector.poll_source(
                range_start.timestamp(),
                datetime.now(timezone.utc).timestamp(),
            )
            sync_origin = f"from {range_start}"
        else:
            documents = self.connector.load_from_state()
            sync_origin = "totally"

        http_method = self.conf.get("method", "GET")
        endpoint = self.conf.get("url")
        logging.info("Connect to REST API: %s %s %s", http_method, endpoint, sync_origin)
        return documents
|
|
|
|
|
|
|
|
|
|
|
2025-11-03 19:59:18 +08:00
|
|
|
# Registry mapping each FileSource value to its sync class. dispatch_tasks()
# looks up task["source"] here and instantiates the class with task["config"].
func_factory = {
|
2026-03-27 22:58:44 +08:00
|
|
|
FileSource.RSS: RSS,
|
2025-11-03 19:59:18 +08:00
|
|
|
FileSource.S3: S3,
|
2025-12-22 09:36:16 +08:00
|
|
|
FileSource.R2: R2,
|
|
|
|
|
FileSource.OCI_STORAGE: OCI_STORAGE,
|
|
|
|
|
FileSource.GOOGLE_CLOUD_STORAGE: GOOGLE_CLOUD_STORAGE,
|
2025-11-03 19:59:18 +08:00
|
|
|
FileSource.NOTION: Notion,
|
|
|
|
|
FileSource.DISCORD: Discord,
|
2025-11-04 14:15:31 +08:00
|
|
|
FileSource.CONFLUENCE: Confluence,
|
2025-11-03 19:59:18 +08:00
|
|
|
FileSource.GMAIL: Gmail,
|
2025-11-10 19:15:02 +08:00
|
|
|
FileSource.GOOGLE_DRIVE: GoogleDrive,
|
2025-11-03 19:59:18 +08:00
|
|
|
FileSource.JIRA: Jira,
|
|
|
|
|
FileSource.SHAREPOINT: SharePoint,
|
feat(connector): implement OneDrive data source connector (issue #15330) (#15331)
### What problem does this PR solve?
Closes #15330.
RAGFlow had no connector for OneDrive / OneDrive for Business. Users who
store working documents in OneDrive could not index them into a
knowledge base without manually downloading and re-uploading files.
This PR adds a net-new OneDrive data source that:
- Authenticates against Microsoft Graph with the same MSAL
client-credentials flow already used by the SharePoint and Teams
connectors (no new auth primitives).
- Enumerates every drive visible to the service principal and pages
through `/drives/{id}/root/delta`, persisting `@odata.deltaLink` values
per drive so subsequent syncs only fetch changed items.
- Optionally narrows ingestion to a sub-folder (`folder_path`) without
needing a separate code path.
- Surfaces typed errors on the validation probe (`GET /drives?$top=1`):
401 → `ConnectorMissingCredentialError`, 403 →
`InsufficientPermissionsError` (with a `Files.Read.All` hint), 5xx →
`UnexpectedValidationError`.
- Filters folders, soft-deleted items, and unsupported extensions (`.pdf
.docx .doc .xlsx .xls .pptx .ppt .txt .md .csv`).
#### Files
| File | Change |
|------|--------|
| `common/data_source/onedrive_connector.py` | **New** —
`OneDriveConnector` + `OneDriveCheckpoint`. |
| `common/data_source/config.py` | `DocumentSource.ONEDRIVE =
"onedrive"`. |
| `common/constants.py` | `FileSource.ONEDRIVE = "onedrive"`. |
| `common/data_source/__init__.py` | Export `OneDriveConnector`. |
| `rag/svr/sync_data_source.py` | `OneDrive(SyncBase)` with `batch_size`
normalisation; registered in `func_factory`. |
| `web/src/pages/user-setting/data-source/constant/index.tsx` |
`DataSourceKey.ONEDRIVE`, visibility map (`syncDeletedFiles: true`),
info entry, form fields (tenant_id, client_id, client_secret,
folder_path, batch_size), default values. |
| `web/src/locales/en.ts`, `web/src/locales/zh.ts` |
`onedriveDescription` + 4 tooltip keys (EN + ZH). |
| `test/unit_test/data_source/test_onedrive_connector_unit.py` | **New**
— 13 unit tests (`p1`/`p2`) covering auth, validation, checkpoint
helpers, and document filtering. |
#### Required Azure AD permission
`Files.Read.All` (Application, admin-granted).
#### Out of scope
- Interactive end-user OAuth (delegated permissions) — the connector
uses app-only credentials, consistent with the SharePoint / Teams
precedent.
- Binary download of file contents — the sync layer emits `Document`s
carrying `webUrl` + metadata; bytes are hydrated downstream by the parse
pipeline.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-29 05:26:06 -06:00
|
|
|
FileSource.ONEDRIVE: OneDrive,
|
feat(connector): implement Outlook data source connector (issue #15332) (#15333)
### What problem does this PR solve?
Closes #15332.
RAGFlow can index Gmail and generic IMAP mailboxes but had no native
connector for Outlook / Microsoft 365 mail. Organisations on Microsoft
365 had no way to bring mailbox content into a knowledge base through
Microsoft Graph.
This PR adds a net-new Outlook data source that:
- Authenticates against Microsoft Graph with the same MSAL
client-credentials flow already used by the SharePoint and Teams
connectors (no new auth primitives).
- Pages over `/users/{id}/mailFolders/{folder}/messages/delta` per
mailbox and persists `@odata.deltaLink` values in
`OutlookCheckpoint.delta_links`, so incremental syncs only fetch changed
messages.
- Supports two scoping modes:
- **Tenant-wide** (default): enumerates every user in the tenant via
`/users` and syncs each mailbox. Requires `User.Read.All`.
- **Targeted**: when `user_ids` is provided (comma-separated UPNs or
object IDs), only those mailboxes are synced. `User.Read.All` is not
needed in this mode.
- Lets the caller pick the mail folder (`inbox`, `sentitems`, `archive`,
...). Defaults to `inbox`.
- Maps each message to a `Document` shaped after the Gmail connector:
one `TextSection` carrying `From/To/Cc/Subject` headers + body, with
HTML bodies stripped to text inline (no extra dependency).
- Surfaces typed errors on the validation probe:
401 → `ConnectorMissingCredentialError`, 403 →
`InsufficientPermissionsError` (with `Mail.Read` / `User.Read.All`
hint), 404 on a configured mailbox → `ConnectorValidationError`, 5xx →
`UnexpectedValidationError`.
- Skips messages flagged `@removed` by the delta semantics and messages
whose `receivedDateTime` is older than `poll_range_start`.
#### Files
| File | Change |
|------|--------|
| `common/data_source/outlook_connector.py` | **New** —
`OutlookConnector` (`CheckpointedConnectorWithPermSync` +
`SlimConnectorWithPermSync`) + `OutlookCheckpoint` + tiny `_strip_html`
helper. |
| `common/data_source/config.py` | `DocumentSource.OUTLOOK = "outlook"`.
|
| `common/constants.py` | `FileSource.OUTLOOK = "outlook"`. |
| `common/data_source/__init__.py` | Export `OutlookConnector`. |
| `rag/svr/sync_data_source.py` | `Outlook(SyncBase)` with `batch_size`
normalisation, CSV/list parsing of `user_ids`; registered in
`func_factory`. |
| `web/src/pages/user-setting/data-source/constant/index.tsx` |
`DataSourceKey.OUTLOOK`, visibility map (`syncDeletedFiles: true`), info
entry, form fields (tenant_id, client_id, client_secret, folder,
user_ids, batch_size), default values. |
| `web/src/locales/en.ts`, `web/src/locales/zh.ts` |
`outlookDescription` + 5 tooltip keys (EN + ZH). |
| `test/unit_test/data_source/test_outlook_connector_unit.py` | **New**
— 19 unit tests (`p1`/`p2`/`p3`) covering auth, validation (tenant-wide
vs specific user vs error paths), checkpoint helpers, user enumeration
pagination, message filtering, HTML body stripping. |
#### Required Azure AD permissions
- `Mail.Read` (Application, admin-granted) — always.
- `User.Read.All` (Application, admin-granted) — only when `user_ids` is
left blank so the connector can enumerate mailboxes.
#### Out of scope
- **Attachment indexing.** The current connector emits message body +
headers; binary attachments are flagged via `metadata.has_attachments`
but not pulled. Adding attachment hydration is straightforward but
scoped out per the issue's "decide whether attachments are indexed in
the first version" note.
- **Delegated (per-user) OAuth.** The connector uses app-only
credentials, consistent with the SharePoint / Teams precedent in this
codebase.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-05-29 07:52:29 -06:00
|
|
|
FileSource.OUTLOOK: Outlook,
|
feat(connectors): add Azure Blob Storage data source connector (#15466)
### What problem does this PR solve?
Closes #15465.
RAGFlow supports S3, Google Cloud Storage, R2, and OCI as data sources
but not Azure Blob Storage, leaving Azure users without a way to index
container objects into a knowledge base. This adds a first-class Azure
Blob Storage data-source connector — distinct from RAGFlow's existing
Azure storage *backends* (`rag/utils/azure_sas_conn.py`,
`rag/utils/azure_spn_conn.py`) which store RAGFlow's own files.
**Highlights**
- `common/data_source/azure_blob_connector.py`: new `AzureBlobConnector`
(`CheckpointedConnectorWithPermSync` + `SlimConnectorWithPermSync`).
- Uses the existing `azure-storage-blob` dependency (already in
`pyproject.toml`).
- Three auth modes, tried in order of precedence:
1. **Account key** — `account_name` + `account_key` + `container_name`.
2. **Connection string** — `connection_string` + `container_name`.
3. **SAS token** — `container_url` + `sas_token` (same shape as
`RAGFlowAzureSasBlob`).
- ETag fingerprint stored per blob in `AzureBlobCheckpoint.etags` —
unchanged blobs (same ETag as last run) are skipped without a download.
Only new/modified blobs are fetched.
- Optional `prefix` scopes indexing to a virtual folder.
- `validate_connector_settings()` probes `get_container_properties()`
and maps `AuthenticationFailed / 403 / ContainerNotFound` to typed
connector exceptions.
- Slim-doc IDs are blob names so prune reconciles correctly.
- `common/constants.py`, `common/data_source/config.py`,
`common/data_source/__init__.py`: register `azure_blob` in `FileSource`
/ `DocumentSource` and export `AzureBlobConnector`.
- `rag/svr/sync_data_source.py`: new `AzureBlob(SyncBase)` class routed
through `load_from_checkpoint` (ETag fingerprint owns change-detection)
and added to `func_factory`.
- Frontend:
- `web/src/pages/user-setting/data-source/constant/index.tsx`: new
`DataSourceKey.AZURE_BLOB`, auth-mode selector (account key / connection
string / SAS token), all credential fields, prefix + batch-size,
`syncDeletedFiles` capability, default form values, tile entry with
icon.
- `web/src/locales/{en,zh}.ts`: description + per-field tooltips for all
9 new keys.
- `web/src/assets/svg/data-source/azure-blob.svg`: Azure-branded
stacked-cylinders icon.
**Verification**
- `npm run build` (vite + esbuild) passes (37 s).
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-06-04 07:06:01 -06:00
|
|
|
FileSource.AZURE_BLOB: AzureBlob,
|
feat(connectors): add Salesforce CRM data source connector (#15462)
### What problem does this PR solve?
Closes #15461.
RAGFlow had no way to ingest Salesforce CRM data, so support / sales
teams couldn't ground responses on live Accounts, Contacts,
Opportunities, Cases, or Knowledge articles. This adds a first-class
Salesforce data source connector that authenticates against a Connected
App via OAuth 2.0 client-credentials, queries selected SObjects via
SOQL, and turns each record into an indexable document with incremental
sync.
**Highlights**
- `common/data_source/salesforce_connector.py`: new
`SalesforceConnector` (`CheckpointedConnectorWithPermSync` +
`SlimConnectorWithPermSync`).
- OAuth 2.0 client-credentials flow; canonical `instance_url` from the
token response so multi-pod orgs route correctly.
- Per-object `SystemModstamp` cursor stored in
`SalesforceCheckpoint.cursors` — a failure mid-object doesn't rewind
sibling objects, and re-syncs only fetch changed rows.
- Deterministic record-to-text formatter (sorted keys) so SOQL field
reordering on the server doesn't mark every row "changed" on each poll.
- `_get_json` raises on non-2xx so 429 / 5xx never silently advance the
checkpoint past missing data.
- `Knowledge__kav` is in the default object set but is skipped silently
when the org doesn't have Salesforce Knowledge enabled (404 on
describe).
- Slim-doc IDs are scoped as `<Object>/<Id>` so prune deletes can't
collide across object types.
- `common/constants.py`, `common/data_source/config.py`,
`common/data_source/__init__.py`: register `salesforce` in `FileSource`
/ `DocumentSource` and export `SalesforceConnector`.
- `rag/svr/sync_data_source.py`: new `Salesforce(SyncBase)` class routed
through `load_from_checkpoint` (poll_source would re-walk every object
each run) and added to `func_factory`.
- Frontend:
- `web/src/pages/user-setting/data-source/constant/index.tsx`: new
`DataSourceKey.SALESFORCE`, form fields (instance URL, client ID/secret,
objects, api_version, batch size), `syncDeletedFiles` capability,
default form values, and tile entry with the new icon.
- `web/src/locales/{en,zh}.ts`: description + per-field tooltips.
- `web/src/assets/svg/data-source/salesforce.svg`: 48x48 brand-style
icon to match the other Microsoft / cloud tiles.
**Verification**
- `npm run build` (vite + esbuild) passes (1m 26s).
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
2026-06-04 23:24:36 -06:00
|
|
|
FileSource.SALESFORCE: Salesforce,
|
2025-11-03 19:59:18 +08:00
|
|
|
FileSource.SLACK: Slack,
|
2025-11-17 09:38:04 +08:00
|
|
|
FileSource.TEAMS: Teams,
|
2025-11-25 09:40:03 +08:00
|
|
|
FileSource.MOODLE: Moodle,
|
|
|
|
|
FileSource.DROPBOX: Dropbox,
|
2025-11-26 07:14:42 +01:00
|
|
|
FileSource.WEBDAV: WebDAV,
|
2025-12-25 17:50:41 +08:00
|
|
|
FileSource.BOX: BOX,
|
|
|
|
|
FileSource.AIRTABLE: Airtable,
|
2025-12-29 17:05:20 +08:00
|
|
|
FileSource.ASANA: Asana,
|
2025-12-30 17:09:13 +08:00
|
|
|
FileSource.IMAP: IMAP,
|
2025-12-31 14:40:49 +08:00
|
|
|
FileSource.ZENDESK: Zendesk,
|
2025-12-30 15:09:52 +08:00
|
|
|
FileSource.GITHUB: Github,
|
|
|
|
|
FileSource.GITLAB: Gitlab,
|
2025-12-31 17:18:30 +08:00
|
|
|
FileSource.BITBUCKET: Bitbucket,
|
2026-02-03 23:14:32 -03:00
|
|
|
FileSource.SEAFILE: SeaFile,
|
|
|
|
|
FileSource.MYSQL: MySQL,
|
|
|
|
|
FileSource.POSTGRESQL: PostgreSQL,
|
Feature big query connector (#15871)
### What problem does this PR solve?
This PR adds Google BigQuery as a first-class data source connector in
RAGFlow.
It enables users to ingest and sync BigQuery data using the same
row-to-document model used by relational database connectors: selected
content columns become document text, metadata columns become document
metadata, an optional ID column provides stable document IDs, and an
optional timestamp column enables cursor-based incremental sync.
The connector supports service-account JSON credentials, table mode,
custom query mode, GoogleSQL queries, cursor-based incremental sync,
deleted-row pruning support, configurable query limits such as
`maximum_bytes_billed`, dry-run validation, batch loading, stable
document IDs, and BigQuery-aware value serialization.
2026-06-29 17:08:40 +03:00
|
|
|
FileSource.BIGQUERY: BigQuery,
|
2026-03-06 21:13:23 +08:00
|
|
|
FileSource.DINGTALK_AI_TABLE: DingTalkAITable,
|
Feature/generic api connector (#13545)
# feat: Add Generic REST API Connector
## What problem does this PR solve?
RAGFlow supports many specific data source connectors (MySQL, Slack,
Google Drive, etc.), but there was no way to connect an arbitrary REST
API as a data source. Users with custom or third-party APIs had to write
a new connector class for each one.
This PR adds a **generic, configuration-driven REST API connector** that
lets users connect any REST API as a data source entirely through the UI
— no code changes needed per API.
---
## Features
### Core Connector (`common/data_source/rest_api_connector.py`)
- Implements `LoadConnector` and `PollConnector` interfaces for full and
incremental sync
- **Configurable authentication:** None, API Key (custom header), Bearer
Token, Basic Auth
- **Pluggable pagination:** Page-based, Offset-based, Cursor-based, or
None
- Smart page-size inference from user's query parameters to avoid
duplicate/conflicting params
- Configurable request delay between pages to prevent API rate limiting
- Auto-detection of the items array in JSON responses (`items`,
`results`, `data`, `records`, or first list found)
- **Advanced field mapping** with dot-notation (`country.name`), array
wildcards (`newsType[*].name`), type hints, and default values
- Optional content template rendering (`"Title: {title}\nBody: {body}"`)
- HTML stripping for content fields
- Stable document IDs via `hash128` from a configurable ID field or
auto-generated from item content
- Pydantic configuration schema with automatic coercion of UI string
inputs to dicts/lists
### Backend Registration (`rag/svr/sync_data_source.py`,
`common/constants.py`, `common/data_source/config.py`)
- `REST_API` sync class wired into RAGFlow's `func_factory`
- Full sync (`load_from_state`) and incremental polling (`poll_source`)
support
- Credentials and config passed from task to connector following
existing patterns (MySQL, SeaFile, etc.)
### Test Connection Endpoint (`api/apps/connector_app.py`)
- `POST /v1/connector/<id>/test` validates config schema,
authentication, and API connectivity without triggering a sync
- Clear error messages for auth failures vs. config issues
### Frontend UI (`web/src/pages/user-setting/data-source/constant/`)
- **Postman-style configuration:** Base URL, Query Parameters (key=value
per line), Auth, Content Fields, Metadata Fields, Pagination Type
- Auth-type-aware form: fields for API key header/value, Bearer token,
or Basic username/password appear only when relevant
- **Advanced Settings** toggle for: Custom Headers, Max Pages, Request
Delay, Poll Timestamp Field, Request Body (POST)
- Connector icon (SVG) and i18n strings (English)
- **"Test Connection"** button to validate before syncing
---
## Controls & Safety
- Configurable max pages safety cap (default: 1000, adjustable in UI)
- Configurable request delay between pages (default: 0.5s, adjustable in
UI)
- Auth errors (401/403) fail immediately without retries; transient
errors retry with exponential backoff
- Diagnostic logging: auth setup confirmation, request details on
failure, content field extraction status
---
## Type of change
- [x] New Feature (non-breaking change which adds functionality)
## Visual Screenshots of Features
<img width="482" height="510" alt="Screenshot 2026-03-11 at 5 19 52 PM"
src="https://github.com/user-attachments/assets/dcb7ab4a-1622-44f3-bb02-d6f0527314c4"
/>
(Connector can be configured within the external data sources tab)
Configuration Parameters:
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 46 PM"
src="https://github.com/user-attachments/assets/5e154e71-4ab5-4872-bfb2-04f02b73c18a"
/>
<img width="661" height="682" alt="Screenshot 2026-03-11 at 5 20 54 PM"
src="https://github.com/user-attachments/assets/00cb14b7-0bcf-4b94-9d71-34e93369ecb2"
/>
Connection can be tested before attaching to dataset:
<img width="981" height="681" alt="Screenshot 2026-03-11 at 5 21 40 PM"
src="https://github.com/user-attachments/assets/aaa6eeeb-89a7-4349-bc34-2423bf8be9ee"
/>
Ingestion tested with API connector (works perfectly fine):
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 22 30 PM"
src="https://github.com/user-attachments/assets/afcd0d58-cadd-4152-badc-d2f14d96fbec"
/>
Search & Retrieval works as well with metadata flow:
<img width="1062" height="705" alt="Screenshot 2026-03-11 at 5 23 05 PM"
src="https://github.com/user-attachments/assets/d41ee935-dcf7-4456-b317-22a76ca032c0"
/>
---------
Co-authored-by: Ahmad Intisar <ahmadintisar@Ahmads-MacBook-M4-Pro.local>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-13 17:35:01 +05:00
|
|
|
FileSource.REST_API: REST_API,
|
2025-11-03 19:59:18 +08:00
|
|
|
}
|
|
|
|
|
|
2025-11-17 09:38:04 +08:00
|
|
|
|
2025-11-03 19:59:18 +08:00
|
|
|
async def dispatch_tasks():
    """Polls the database for pending synchronization tasks and dispatches them concurrently.

    Retries every 3 seconds until the due sync and prune task lists can be
    read, then starts one asyncio task per entry using the class registered
    in ``func_factory`` for the entry's ``source``, and awaits them all.
    Sleeps for one second after a round that completes without error.

    Raises:
        Exception: Whatever was raised while building or running the tasks.
            Before re-raising, every task already started in this round is
            cancelled and awaited so none is left running unattended.
    """
    # The first successful read doubles as the "DB is ready" probe; its result
    # is used directly rather than discarded and queried a second time.
    while True:
        try:
            due_sync_tasks = SyncLogsService.list_due_sync_tasks()
            due_prune_tasks = SyncLogsService.list_due_prune_tasks()
            break
        except Exception as e:
            logging.warning(f"DB is not ready yet: {e}")
            await asyncio.sleep(3)

    tasks = []
    try:
        # Task construction lives inside the try block so that an unknown
        # source or a failing constructor does not orphan the asyncio tasks
        # created for earlier entries.
        for task in [*due_sync_tasks, *due_prune_tasks]:
            # Convert the polling window bounds to UTC when they are set.
            if task["poll_range_start"]:
                task["poll_range_start"] = task["poll_range_start"].astimezone(timezone.utc)
            if task["poll_range_end"]:
                task["poll_range_end"] = task["poll_range_end"].astimezone(timezone.utc)
            func = func_factory[task["source"]](task["config"])
            tasks.append(asyncio.create_task(func(task)))

        await asyncio.gather(*tasks, return_exceptions=False)
    except Exception as e:
        logging.error(f"Error in dispatch_tasks: {e}")
        for t in tasks:
            t.cancel()
        # Drain the cancelled tasks so their cancellation is fully processed.
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    await asyncio.sleep(1)
|
2025-11-03 19:59:18 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# Shutdown flag: set by signal_handler() and checked by the dispatch loop in main().
stop_event = threading.Event()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def signal_handler(sig, frame):
    """Handle an interrupt/termination signal by flagging shutdown and exiting.

    Logs the event, sets ``stop_event``, waits one second, and then exits the
    process with status 0. ``sig`` and ``frame`` are the standard signal
    handler arguments and are not used.
    """
    logging.info("Received interrupt signal, shutting down...")
    stop_event.set()
    pause_seconds = 1
    time.sleep(pause_seconds)
    # Equivalent to sys.exit(0), which raises SystemExit.
    raise SystemExit(0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Consumer number is taken from the first command-line argument, defaulting to "0".
CONSUMER_NO = sys.argv[1] if len(sys.argv) >= 2 else "0"
# Passed to init_root_logger() when the module is run as a script.
CONSUMER_NAME = f"data_sync_{CONSUMER_NO}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
    """Entry point for the RAGFlow data synchronization worker process.

    Logs the startup banner and version, calls ``show_configs()`` and
    ``settings.init_settings()``, installs the signal handlers, and then
    keeps awaiting ``dispatch_tasks()`` until ``stop_event`` is set.
    """
    banner = r"""
_____ _ _____
| __ \ | | / ____|
| | | | __ _| |_ __ _ | (___ _ _ _ __ ___
| | | |/ _` | __/ _` | \___ \| | | | '_ \ / __|
| |__| | (_| | || (_| | ____) | |_| | | | | (__
|_____/ \__,_|\__\__,_| |_____/ \__, |_| |_|\___|
__/ |
|___/
"""
    logging.info(banner)
    logging.info(f"RAGFlow data sync version: {get_ragflow_version()}")
    show_configs()
    settings.init_settings()

    # SIGUSR1/SIGUSR2 are only registered on non-Windows platforms.
    if sys.platform != "win32":
        signal.signal(signal.SIGUSR1, start_tracemalloc_and_snapshot)
        signal.signal(signal.SIGUSR2, stop_tracemalloc)
    for shutdown_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(shutdown_signal, signal_handler)

    logging.info(f"RAGFlow data sync is ready after {time.perf_counter() - start_ts}s initialization.")

    while True:
        if stop_event.is_set():
            break
        await dispatch_tasks()
    logging.error("BUG!!! You should not reach here!!!")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry: enable faulthandler, set up logging under this consumer's
# name, then run the async worker loop until it exits.
if __name__ == "__main__":
    faulthandler.enable()
    init_root_logger(CONSUMER_NAME)
    worker = main()
    asyncio.run(worker)
|