From 1d7e45115b7b810125dfd2b4428ba34c3d5b1b40 Mon Sep 17 00:00:00 2001 From: web-dev0521 Date: Thu, 4 Jun 2026 23:24:36 -0600 Subject: [PATCH] feat(connectors): add Salesforce CRM data source connector (#15462) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Closes #15461. RAGFlow had no way to ingest Salesforce CRM data, so support / sales teams couldn't ground responses on live Accounts, Contacts, Opportunities, Cases, or Knowledge articles. This adds a first-class Salesforce data source connector that authenticates against a Connected App via OAuth 2.0 client-credentials, queries selected SObjects via SOQL, and turns each record into an indexable document with incremental sync. **Highlights** - `common/data_source/salesforce_connector.py`: new `SalesforceConnector` (`CheckpointedConnectorWithPermSync` + `SlimConnectorWithPermSync`). - OAuth 2.0 client-credentials flow; canonical `instance_url` from the token response so multi-pod orgs route correctly. - Per-object `SystemModstamp` cursor stored in `SalesforceCheckpoint.cursors` — a failure mid-object doesn't rewind sibling objects, and re-syncs only fetch changed rows. - Deterministic record-to-text formatter (sorted keys) so SOQL field reordering on the server doesn't mark every row "changed" on each poll. - `_get_json` raises on non-2xx so 429 / 5xx never silently advance the checkpoint past missing data. - `Knowledge__kav` is in the default object set but is skipped silently when the org doesn't have Salesforce Knowledge enabled (404 on describe). - Slim-doc IDs are scoped as `/` so prune deletes can't collide across object types. - `common/constants.py`, `common/data_source/config.py`, `common/data_source/__init__.py`: register `salesforce` in `FileSource` / `DocumentSource` and export `SalesforceConnector`. - `rag/svr/sync_data_source.py`: new `Salesforce(SyncBase)` class routed through `load_from_checkpoint` (poll_source would re-walk every object each run) and added to `func_factory`. - Frontend: - `web/src/pages/user-setting/data-source/constant/index.tsx`: new `DataSourceKey.SALESFORCE`, form fields (instance URL, client ID/secret, objects, api_version, batch size), `syncDeletedFiles` capability, default form values, and tile entry with the new icon. - `web/src/locales/{en,zh}.ts`: description + per-field tooltips. - `web/src/assets/svg/data-source/salesforce.svg`: 48x48 brand-style icon to match the other Microsoft / cloud tiles. **Verification** - `npm run build` (vite + esbuild) passes (1m 26s). ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- common/constants.py | 1 + common/data_source/__init__.py | 2 + common/data_source/config.py | 1 + common/data_source/salesforce_connector.py | 573 ++++++++++++++++++ rag/svr/sync_data_source.py | 63 ++ web/src/assets/svg/data-source/salesforce.svg | 4 + web/src/locales/en.ts | 12 + web/src/locales/zh.ts | 12 + .../data-source/constant/index.tsx | 82 +++ 9 files changed, 750 insertions(+) create mode 100644 common/data_source/salesforce_connector.py create mode 100644 web/src/assets/svg/data-source/salesforce.svg diff --git a/common/constants.py b/common/constants.py index d0f32c4039..00cd2be0e6 100644 --- a/common/constants.py +++ b/common/constants.py @@ -156,6 +156,7 @@ class FileSource(StrEnum): DINGTALK_AI_TABLE = "dingtalk_ai_table" ONEDRIVE = "onedrive" OUTLOOK = "outlook" + SALESFORCE = "salesforce" AZURE_BLOB = "azure_blob" diff --git a/common/data_source/__init__.py b/common/data_source/__init__.py index c6ae1caf01..c0ebc8c809 100644 --- a/common/data_source/__init__.py +++ b/common/data_source/__init__.py @@ -36,6 +36,7 @@ from .jira.connector import JiraConnector from .sharepoint_connector import SharePointConnector from .onedrive_connector import OneDriveConnector from .outlook_connector import OutlookConnector +from .salesforce_connector import SalesforceConnector from .azure_blob_connector import AzureBlobConnector from .teams_connector import TeamsConnector from .moodle_connector import MoodleConnector @@ -72,6 +73,7 @@ __all__ = [ "SharePointConnector", "OneDriveConnector", "OutlookConnector", + "SalesforceConnector", "AzureBlobConnector", "TeamsConnector", "MoodleConnector", diff --git a/common/data_source/config.py b/common/data_source/config.py index a790df1a67..f73e3d2b98 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -71,6 +71,7 @@ class DocumentSource(str, Enum): DINGTALK_AI_TABLE = "dingtalk_ai_table" ONEDRIVE = "onedrive" OUTLOOK = "outlook" + SALESFORCE = "salesforce" AZURE_BLOB = "azure_blob" diff --git a/common/data_source/salesforce_connector.py b/common/data_source/salesforce_connector.py new file mode 100644 index 0000000000..10479ccf34 --- /dev/null +++ b/common/data_source/salesforce_connector.py @@ -0,0 +1,573 @@ +"""Salesforce data source connector. + +Talks to a Salesforce org over the REST + SOQL APIs, turns each selected +object's records into a Document, and uses ``SystemModstamp`` as the +incremental cursor so re-syncs only fetch what changed. + +Auth is OAuth 2.0 client-credentials (Salesforce "Connected App"); the +caller supplies the org's ``instance_url`` so we never have to guess the +pod hostname. A small allow-list of objects ships out of the box +(Account, Contact, Opportunity, Case, Knowledge__kav) so the connector +boots without per-org configuration, while the ``objects`` field lets +operators add or replace entries. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Any, Generator +from urllib.parse import urljoin + +import requests + +from common.data_source.config import INDEX_BATCH_SIZE +from common.data_source.exceptions import ( + ConnectorMissingCredentialError, + ConnectorValidationError, + InsufficientPermissionsError, + UnexpectedValidationError, +) +from common.data_source.interfaces import ( + CheckpointedConnectorWithPermSync, + SecondsSinceUnixEpoch, + SlimConnectorWithPermSync, +) +from common.data_source.models import ConnectorCheckpoint, SlimDocument + +logger = logging.getLogger(__name__) + +_DEFAULT_API_VERSION = "v59.0" + +# CRM objects we index by default. Operators can override via the +# ``objects`` config list; Knowledge__kav is included because the issue +# (#15461) calls out Knowledge articles explicitly, but it silently +# downgrades to a skip when the org doesn't have Salesforce Knowledge +# enabled (the SObject describe returns 404). +_DEFAULT_OBJECTS = ["Account", "Contact", "Opportunity", "Case", "Knowledge__kav"] + +# Optional default object: Knowledge articles only exist when the org has +# Salesforce Knowledge enabled. It is the one object we skip silently when +# absent — every other configured object is required, so its absence/failure +# is treated as an error rather than quietly dropped. +_OPTIONAL_OBJECTS = frozenset({"Knowledge__kav"}) + + +class SalesforceObjectUnavailable(UnexpectedValidationError): + """An SObject is genuinely absent or not queryable for this org/user. + + Raised for HTTP 404 (describe of a non-existent object) and HTTP 400 + ``INVALID_TYPE`` (SOQL against a non-existent object). It subclasses + ``UnexpectedValidationError`` so existing broad handlers still catch it, + while letting prune distinguish "object genuinely missing" (safe to + skip — it has no records to orphan) from a transient failure (5xx/429, + permission, partial page) that must abort rather than delete live docs. + """ + + +def _is_object_unavailable(resp: requests.Response) -> bool: + """True when *resp* indicates the SObject simply does not exist for this + org/user, as opposed to a transient/permission error. + + Salesforce returns 404 for describe of an unknown object and 400 with + ``errorCode == "INVALID_TYPE"`` for SOQL against one. 403 (no access) and + 5xx/429 (transient) are deliberately NOT treated as "unavailable" — those + must propagate so callers don't act on an incomplete picture. + """ + if resp.status_code == 404: + return True + if resp.status_code == 400: + try: + payload = resp.json() + except ValueError: + return "INVALID_TYPE" in (resp.text or "") + entries = payload if isinstance(payload, list) else [payload] + for entry in entries: + if isinstance(entry, dict) and entry.get("errorCode") == "INVALID_TYPE": + return True + return False + + +class SalesforceCheckpoint(ConnectorCheckpoint): + """Per-object SystemModstamp cursor. + + Stored as ISO-8601 strings (Salesforce's native format) keyed by + SObject name so each object advances independently — a sync that + fails halfway through ``Case`` does not rewind ``Account``. + """ + + cursors: dict[str, str] | None = None + + +class SalesforceConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync): + """Salesforce CRM connector. + + Requires a Connected App with: + - ``Client Credentials Flow`` enabled + - OAuth scopes: ``api``, ``refresh_token`` (refresh_token not used + but Salesforce requires the scope set to issue an access token) + + The execution user must have read access to every object listed in + ``objects`` — missing permissions surface as ``403`` during + validation rather than silent empty pages. + """ + + def __init__( + self, + batch_size: int = INDEX_BATCH_SIZE, + objects: list[str] | None = None, + api_version: str = _DEFAULT_API_VERSION, + ) -> None: + self.batch_size = batch_size + self.api_version = api_version + self.objects = [obj.strip() for obj in (objects or _DEFAULT_OBJECTS) if obj and obj.strip()] + self._instance_url: str | None = None + self._access_token: str | None = None + + # ------------------------------------------------------------------ + # Auth + # ------------------------------------------------------------------ + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + instance_url = (credentials.get("instance_url") or "").rstrip("/") + client_id = credentials.get("client_id") + client_secret = credentials.get("client_secret") + + if not all([instance_url, client_id, client_secret]): + raise ConnectorMissingCredentialError( + "Salesforce credentials are incomplete (instance_url, client_id, client_secret required)" + ) + + token_url = urljoin(instance_url + "/", "services/oauth2/token") + try: + resp = requests.post( + token_url, + data={ + "grant_type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, + }, + timeout=60, + ) + except requests.RequestException as exc: + raise ConnectorMissingCredentialError( + f"Salesforce token request failed: {exc}" + ) + + if not resp.ok: + # Salesforce returns {"error": "...", "error_description": "..."} + try: + body = resp.json() + detail = body.get("error_description") or body.get("error") or resp.text + except ValueError: + detail = resp.text[:200] + raise ConnectorMissingCredentialError( + f"Failed to acquire Salesforce access token (HTTP {resp.status_code}): {detail}" + ) + + data = resp.json() + token = data.get("access_token") + # Salesforce echoes back the *canonical* instance for the org — + # prefer it so multi-pod orgs (NA1 → NA45 migrations) hit the + # correct host even when the configured URL went stale. + canonical = (data.get("instance_url") or "").rstrip("/") + if not token: + raise ConnectorMissingCredentialError( + "Salesforce token response did not contain access_token" + ) + + self._access_token = token + self._instance_url = canonical or instance_url + return None + + # ------------------------------------------------------------------ + # Validation + # ------------------------------------------------------------------ + + def validate_connector_settings(self) -> None: + if not self._access_token or not self._instance_url: + raise ConnectorMissingCredentialError("Salesforce") + + # Cheap reachability + permission probe: the /sobjects endpoint + # lists every object the user can describe; a 401/403 here means + # the connected app or the user lacks API access altogether. + resp = self._get(f"{self._base()}/sobjects") + if resp.status_code == 401: + raise ConnectorMissingCredentialError( + "Salesforce access token is invalid or expired." + ) + if resp.status_code == 403: + raise InsufficientPermissionsError( + "The Salesforce execution user lacks API access; enable the 'API Enabled' profile permission." + ) + if not resp.ok: + raise UnexpectedValidationError( + f"Salesforce validation failed (HTTP {resp.status_code}): {resp.text[:200]}" + ) + + try: + payload = resp.json() + except ValueError as exc: + raise ConnectorValidationError( + f"Salesforce /sobjects response is not JSON: {exc}" + ) + if "sobjects" not in payload: + raise ConnectorValidationError( + "Unexpected response format from Salesforce /sobjects." + ) + + # Fail fast on typos / inaccessible objects instead of silently + # missing their data during sync. The global describe lists every + # object the user can see plus its queryable flag, so we can vet the + # configured objects without an extra call per object. + queryable = { + so["name"]: bool(so.get("queryable", False)) + for so in payload.get("sobjects", []) + if isinstance(so, dict) and so.get("name") + } + unknown: list[str] = [] + not_queryable: list[str] = [] + for obj in self.objects: + if obj not in queryable: + # Knowledge__kav is an optional default — absent unless the + # org has Salesforce Knowledge. Don't fail validation for it. + if obj in _OPTIONAL_OBJECTS: + logger.warning( + "Salesforce: optional object %s not present in this org; it will be skipped.", + obj, + ) + continue + unknown.append(obj) + elif not queryable[obj]: + not_queryable.append(obj) + + if unknown or not_queryable: + problems = [] + if unknown: + problems.append(f"unknown object(s): {', '.join(sorted(unknown))}") + if not_queryable: + problems.append(f"non-queryable object(s): {', '.join(sorted(not_queryable))}") + raise ConnectorValidationError( + "Salesforce 'objects' configuration is invalid — " + + "; ".join(problems) + + ". Check for typos and that the execution user has read access to each object." + ) + + # ------------------------------------------------------------------ + # Checkpoint helpers + # ------------------------------------------------------------------ + + def build_dummy_checkpoint(self) -> SalesforceCheckpoint: + return SalesforceCheckpoint(has_more=True, cursors={}) + + def validate_checkpoint_json(self, checkpoint_json: str) -> SalesforceCheckpoint: + try: + return SalesforceCheckpoint.model_validate_json(checkpoint_json) + except Exception: + return self.build_dummy_checkpoint() + + # ------------------------------------------------------------------ + # Core data loading + # ------------------------------------------------------------------ + + def poll_source( + self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch + ) -> Any: + return self._iter_documents(since_epoch=start, until_epoch=end if end else None) + + def load_from_checkpoint( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ConnectorCheckpoint, + ) -> Any: + if not isinstance(checkpoint, SalesforceCheckpoint): + checkpoint = self.build_dummy_checkpoint() + since = start if start else None + until = end if end else None + return self._iter_documents(checkpoint=checkpoint, since_epoch=since, until_epoch=until) + + def load_from_checkpoint_with_perm_sync( + self, + start: SecondsSinceUnixEpoch, + end: SecondsSinceUnixEpoch, + checkpoint: ConnectorCheckpoint, + ) -> Any: + return self.load_from_checkpoint(start, end, checkpoint) + + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> Generator[list[SlimDocument], None, None]: + """Yield batches of slim documents for prune / permission sync. + + Salesforce records use 15/18-character object IDs; we surface + the SObject-prefixed form (``Account/0015g00000...``) so the + prune collector can disambiguate IDs that collide across object + types and so deletes are scoped to the connector instance. + """ + if not self._access_token: + raise ConnectorMissingCredentialError("Salesforce") + + batch: list[SlimDocument] = [] + for obj in self.objects: + try: + for record in self._query_records(obj, fields=["Id"], since_epoch=None): + rec_id = record.get("Id") + if not rec_id: + continue + doc_id = f"{obj}/{rec_id}" + if callback: + callback(doc_id, obj) + batch.append(SlimDocument(id=doc_id)) + if len(batch) >= self.batch_size: + yield batch + batch = [] + except SalesforceObjectUnavailable: + # Object genuinely absent (e.g. Knowledge__kav without + # Salesforce Knowledge). It has no records, so omitting it + # cannot orphan documents — safe to skip. + logger.warning("Salesforce prune skipping %s (object unavailable)", obj) + continue + # Any OTHER failure (transient 5xx/429, permission, a partial + # page mid-enumeration) propagates: prune must NOT run on an + # incomplete snapshot, or the collector would treat the missing + # IDs as stale and delete documents that still exist. + if batch: + yield batch + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _base(self) -> str: + return f"{self._instance_url}/services/data/{self.api_version}" + + def _get(self, url: str) -> requests.Response: + return requests.get( + url, + headers={ + "Authorization": f"Bearer {self._access_token}", + "Accept": "application/json", + }, + timeout=60, + ) + + def _get_json(self, url: str, *, context: str) -> dict: + """GET *url* and decode JSON. Raise on non-2xx so a 429 / 5xx + never silently advances the checkpoint past missing records.""" + resp = self._get(url) + if not resp.ok: + body_snippet = resp.text[:200] if resp.text else "" + logger.error( + "Salesforce request failed (%s): HTTP %s url=%s body=%s", + context, + resp.status_code, + url, + body_snippet, + ) + if _is_object_unavailable(resp): + raise SalesforceObjectUnavailable( + f"Salesforce object unavailable ({context}): HTTP {resp.status_code} {body_snippet}" + ) + raise UnexpectedValidationError( + f"Salesforce request failed ({context}): HTTP {resp.status_code} {body_snippet}" + ) + try: + return resp.json() + except ValueError as exc: + raise UnexpectedValidationError( + f"Salesforce response is not JSON ({context}): {exc}" + ) + + def _describe_fields(self, obj: str) -> list[str]: + """Return field API names for *obj*. Filters out compound types + (address, location) that SOQL can only project via sub-fields.""" + data = self._get_json( + f"{self._base()}/sobjects/{obj}/describe", + context=f"describe {obj}", + ) + fields = [] + for field in data.get("fields", []): + if field.get("type") in {"address", "location"}: + continue + name = field.get("name") + if name: + fields.append(name) + if "Id" not in fields: + fields.insert(0, "Id") + return fields + + def _query_records( + self, + obj: str, + fields: list[str], + since_epoch: float | None, + until_epoch: float | None = None, + ) -> Generator[dict, None, None]: + """Yield raw record dicts for *obj*, page by page, oldest first. + + Orders by ``SystemModstamp`` so the per-object cursor advances + monotonically even when paging is interrupted: the next run + resumes from the latest persisted timestamp and Salesforce + returns records strictly newer than that. + """ + field_list = ",".join(fields) + filters = [] + if since_epoch: + since_iso = datetime.fromtimestamp(since_epoch, tz=timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + filters.append(f"SystemModstamp > {since_iso}") + if until_epoch: + until_iso = datetime.fromtimestamp(until_epoch, tz=timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + filters.append(f"SystemModstamp <= {until_iso}") + where = f" WHERE {' AND '.join(filters)}" if filters else "" + soql = f"SELECT {field_list} FROM {obj}{where} ORDER BY SystemModstamp ASC" + + url: str | None = f"{self._base()}/query?q={requests.utils.quote(soql)}" + while url: + page = self._get_json(url, context=f"query {obj}") + for record in page.get("records", []): + yield record + next_url = page.get("nextRecordsUrl") + url = f"{self._instance_url}{next_url}" if next_url else None + + @staticmethod + def _record_to_text(obj: str, record: dict) -> str: + """Flatten a SOQL record into a deterministic plain-text body. + + Keeping the formatter deterministic (sorted field order, stable + ``key: value`` lines) matters for content hashing — without it, + Salesforce field-reordering on the server would tag every + record as "changed" on every poll and re-embed the entire + org each sync. + """ + lines = [f"Salesforce {obj}"] + for key in sorted(record.keys()): + if key in ("attributes",): + continue + value = record[key] + if value is None or value == "": + continue + if isinstance(value, (dict, list)): + # Nested relationships (e.g. Account on Contact) — keep + # the type name + id rather than recursing arbitrarily. + lines.append(f"{key}: {value}") + else: + lines.append(f"{key}: {value}") + return "\n".join(lines) + + def _iter_documents( + self, + checkpoint: SalesforceCheckpoint | None = None, + since_epoch: float | None = None, + until_epoch: float | None = None, + ): + from common.data_source.models import Document + + cursors: dict[str, str] = {} + if checkpoint and checkpoint.cursors: + cursors = dict(checkpoint.cursors) + + batch: list[Document] = [] + + for obj in self.objects: + try: + fields = self._describe_fields(obj) + except SalesforceObjectUnavailable: + # Object genuinely absent (e.g. Knowledge__kav without + # Salesforce Knowledge): skip it. Transient describe + # failures are NOT swallowed — they raise below so the run + # doesn't silently miss an object's data. + logger.warning( + "Salesforce skipping %s (object not present in this org)", + obj, + ) + continue + + # Per-object cursor takes precedence over the caller's + # window. The cursor was persisted from the *last successful + # record* so it cannot rewind past records we already + # ingested even if the caller passes an older since_epoch. + obj_since = since_epoch + cursor_iso = cursors.get(obj) + if cursor_iso: + try: + cur_dt = datetime.fromisoformat(cursor_iso.replace("Z", "+00:00")) + cur_ts = cur_dt.timestamp() + obj_since = max(obj_since or 0, cur_ts) + except ValueError: + pass + + latest_iso: str | None = cursor_iso + try: + for record in self._query_records(obj, fields, obj_since, until_epoch): + rec_id = record.get("Id") + if not rec_id: + continue + + modified_str: str = record.get("SystemModstamp", "") + modified_dt: datetime | None = None + if modified_str: + try: + modified_dt = datetime.fromisoformat( + modified_str.replace("Z", "+00:00") + ) + except ValueError: + modified_dt = None + + doc_updated_at = modified_dt or datetime.now(timezone.utc) + + # Display name: prefer ``Name``; fall back to + # ``Subject`` (Case) or ``Title`` (Knowledge); last + # resort is ``/`` so the doc list is + # never blank-titled. + name = ( + record.get("Name") + or record.get("Subject") + or record.get("Title") + or f"{obj}/{rec_id}" + ) + + body = self._record_to_text(obj, record) + blob = body.encode("utf-8") + + doc = Document( + id=f"{obj}/{rec_id}", + source="salesforce", + semantic_identifier=str(name), + extension=".txt", + blob=blob, + doc_updated_at=doc_updated_at, + size_bytes=len(blob), + metadata={ + "object": obj, + "record_id": rec_id, + "web_url": f"{self._instance_url}/{rec_id}", + }, + ) + batch.append(doc) + if modified_str: + latest_iso = modified_str + if len(batch) >= self.batch_size: + yield batch + batch = [] + except UnexpectedValidationError as exc: + # Do not continue: advancing to the next object would let + # the task finish as DONE and move the global poll window + # past the failed object's missing records permanently. + logger.warning("Salesforce %s query failed: %s", obj, exc) + raise + + if latest_iso: + cursors[obj] = latest_iso + + if batch: + yield batch + + if checkpoint is not None: + checkpoint.cursors = cursors + checkpoint.has_more = False diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 7278736710..d251f7c27c 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -64,6 +64,7 @@ from common.data_source import ( OneDriveConnector, OutlookConnector, AzureBlobConnector, + SalesforceConnector, TeamsConnector, SlackConnector, SharePointConnector, @@ -1129,6 +1130,67 @@ class Outlook(SyncBase): return wrapper() +class Salesforce(SyncBase): + SOURCE_NAME: str = FileSource.SALESFORCE + + async def _generate(self, task: dict): + raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE) + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + + raw_objects = self.conf.get("objects") + if isinstance(raw_objects, str): + objects = [o.strip() for o in raw_objects.split(",") if o.strip()] + elif isinstance(raw_objects, list): + objects = [str(o).strip() for o in raw_objects if str(o).strip()] + else: + objects = None + + self.connector = SalesforceConnector( + batch_size=batch_size, + objects=objects, + api_version=self.conf.get("api_version") or "v59.0", + ) + self.connector.load_credentials(self.conf["credentials"]) + # Fail fast on invalid/inaccessible objects (typos, missing object + # permissions) before iterating, so a bad `objects` config surfaces + # as a clear error instead of silently skipping data at sync time. + # This guards configs that reach runtime without going through the + # UI (direct API callers, scripts, previously-persisted configs). + self.connector.validate_connector_settings() + + # Always route through load_from_checkpoint so the per-object + # SystemModstamp cursor owns incrementality; poll_source would + # re-query every object from the caller's window each run and + # ignore the persisted per-object cursors entirely. + if task["reindex"] == "1" or not task["poll_range_start"]: + start_ts = 0.0 + else: + start_ts = task["poll_range_start"].timestamp() + end_ts = datetime.now(timezone.utc).timestamp() + checkpoint = self.connector.build_dummy_checkpoint() + document_batch_generator = self.connector.load_from_checkpoint( + start_ts, end_ts, checkpoint + ) + + instance_url = (self.conf.get("credentials") or {}).get("instance_url", "") + self.log_connection( + "Salesforce", + f"{instance_url} objects({','.join(self.connector.objects)})", + task, + ) + + def wrapper(): + for document_batch in document_batch_generator: + yield document_batch + + return wrapper() + + class AzureBlob(SyncBase): SOURCE_NAME: str = FileSource.AZURE_BLOB @@ -2044,6 +2106,7 @@ func_factory = { FileSource.ONEDRIVE: OneDrive, FileSource.OUTLOOK: Outlook, FileSource.AZURE_BLOB: AzureBlob, + FileSource.SALESFORCE: Salesforce, FileSource.SLACK: Slack, FileSource.TEAMS: Teams, FileSource.MOODLE: Moodle, diff --git a/web/src/assets/svg/data-source/salesforce.svg b/web/src/assets/svg/data-source/salesforce.svg new file mode 100644 index 0000000000..bc7097dafb --- /dev/null +++ b/web/src/assets/svg/data-source/salesforce.svg @@ -0,0 +1,4 @@ + + + + diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index 75d1aaeb0d..0246dc54fb 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -1407,6 +1407,18 @@ Example: Virtual Hosted Style`, 'Mail folder to sync (e.g. inbox, sentitems, archive). Defaults to inbox.', outlookUserIdsTip: 'Comma-separated UPNs or object IDs of mailboxes to sync. Leave blank to sync every mailbox in the tenant (requires User.Read.All).', + salesforceDescription: + 'Connect a Salesforce org and index CRM records (Accounts, Contacts, Opportunities, Cases, Knowledge articles) via SOQL with incremental sync.', + salesforceInstanceUrlTip: + 'Salesforce org URL, e.g. https://your-domain.my.salesforce.com (no trailing slash).', + salesforceClientIdTip: + 'Consumer Key of a Connected App with Client Credentials Flow enabled and the api scope.', + salesforceClientSecretTip: + 'Consumer Secret of the Connected App used for client-credentials authentication.', + salesforceObjectsTip: + 'Comma-separated SObject API names to index. Defaults to Account, Contact, Opportunity, Case, Knowledge__kav.', + salesforceApiVersionTip: + 'Salesforce REST API version (e.g. v59.0). Use the version your org supports.', azure_blobDescription: 'Index blobs from an Azure Blob Storage container into a knowledge base. Supports account-key, connection-string, and SAS-token auth. Unchanged blobs are skipped via ETag fingerprinting.', azureBlobAuthModeTip: diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts index 21deb62450..feee40a5f4 100644 --- a/web/src/locales/zh.ts +++ b/web/src/locales/zh.ts @@ -1119,6 +1119,18 @@ NER:使用 spaCy NER 和基于规则的关键词提取来抽取实体和关系 '要同步的邮件文件夹(例如 inbox、sentitems、archive),默认为 inbox。', outlookUserIdsTip: '要同步的邮箱 UPN 或对象 ID 列表(逗号分隔)。留空则同步租户内的所有邮箱(需要 User.Read.All 权限)。', + salesforceDescription: + '连接 Salesforce 组织,通过 SOQL 增量同步并索引 CRM 记录(客户、联系人、商机、个案、知识文章)。', + salesforceInstanceUrlTip: + 'Salesforce 组织地址,例如 https://your-domain.my.salesforce.com(不要包含末尾斜杠)。', + salesforceClientIdTip: + '已启用 Client Credentials Flow 且包含 api 权限的 Connected App 的 Consumer Key。', + salesforceClientSecretTip: + '用于客户端凭证身份验证的 Connected App 的 Consumer Secret。', + salesforceObjectsTip: + '要索引的 SObject API 名称列表(逗号分隔)。默认值:Account, Contact, Opportunity, Case, Knowledge__kav。', + salesforceApiVersionTip: + 'Salesforce REST API 版本(例如 v59.0),请使用您组织所支持的版本。', azure_blobDescription: '将 Azure Blob 存储容器中的文件索引到知识库。支持账户密钥、连接字符串和 SAS 令牌三种认证方式,通过 ETag 指纹跳过未变更的文件。', azureBlobAuthModeTip: diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 82ef99ab0e..7e0fb9b4ad 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -45,6 +45,7 @@ export enum DataSourceKey { RSS = 'rss', ONEDRIVE = 'onedrive', OUTLOOK = 'outlook', + SALESFORCE = 'salesforce', AZURE_BLOB = 'azure_blob', TEAMS = 'teams', SLACK = 'slack', @@ -138,6 +139,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.OUTLOOK]: { syncDeletedFiles: true, }, + [DataSourceKey.SALESFORCE]: { + syncDeletedFiles: true, + }, [DataSourceKey.AZURE_BLOB]: { syncDeletedFiles: true, }, @@ -339,6 +343,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.OUTLOOK}Description`), icon: , }, + [DataSourceKey.SALESFORCE]: { + name: 'Salesforce', + description: t(`setting.${DataSourceKey.SALESFORCE}Description`), + icon: , + }, [DataSourceKey.AZURE_BLOB]: { name: 'Azure Blob Storage', description: t(`setting.${DataSourceKey.AZURE_BLOB}Description`), @@ -524,6 +533,65 @@ export const DataSourceFormFields = { }, }, ], + [DataSourceKey.SALESFORCE]: [ + { + label: 'Instance URL', + name: 'config.credentials.instance_url', + type: FormFieldType.Text, + required: true, + placeholder: 'https://your-domain.my.salesforce.com', + tooltip: t('setting.salesforceInstanceUrlTip'), + validation: { + pattern: /^https:\/\/[a-zA-Z0-9.-]+\.salesforce\.com$/, + message: + 'Must be a valid Salesforce domain (https://...salesforce.com)', + }, + }, + { + label: 'Client ID', + name: 'config.credentials.client_id', + type: FormFieldType.Text, + required: true, + tooltip: t('setting.salesforceClientIdTip'), + }, + { + label: 'Client Secret', + name: 'config.credentials.client_secret', + type: FormFieldType.Password, + required: true, + tooltip: t('setting.salesforceClientSecretTip'), + }, + { + label: 'Objects', + name: 'config.objects', + type: FormFieldType.Text, + required: false, + placeholder: 'Account,Contact,Opportunity,Case,Knowledge__kav', + tooltip: t('setting.salesforceObjectsTip'), + }, + { + label: 'API Version', + name: 'config.api_version', + type: FormFieldType.Text, + required: false, + placeholder: 'v59.0', + tooltip: t('setting.salesforceApiVersionTip'), + validation: { + pattern: /^v\d+\.\d+$/, + message: 'API version must match format like v59.0', + }, + }, + { + label: 'Batch Size', + name: 'config.batch_size', + type: FormFieldType.Number, + required: false, + validation: { + min: 1, + message: 'Batch Size must be at least 1', + }, + }, + ], [DataSourceKey.AZURE_BLOB]: [ { label: 'Auth Mode', @@ -2109,6 +2177,20 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.SALESFORCE]: { + name: '', + source: DataSourceKey.SALESFORCE, + config: { + objects: '', + api_version: 'v59.0', + batch_size: 2, + credentials: { + instance_url: '', + client_id: '', + client_secret: '', + }, + }, + }, [DataSourceKey.AZURE_BLOB]: { name: '', source: DataSourceKey.AZURE_BLOB,