mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
feat(connectors): add Salesforce CRM data source connector (#15462)
### What problem does this PR solve?

Closes #15461.

RAGFlow had no way to ingest Salesforce CRM data, so support / sales teams couldn't ground responses on live Accounts, Contacts, Opportunities, Cases, or Knowledge articles. This adds a first-class Salesforce data source connector that authenticates against a Connected App via OAuth 2.0 client-credentials, queries selected SObjects via SOQL, and turns each record into an indexable document with incremental sync.

**Highlights**

- `common/data_source/salesforce_connector.py`: new `SalesforceConnector` (`CheckpointedConnectorWithPermSync` + `SlimConnectorWithPermSync`).
  - OAuth 2.0 client-credentials flow; canonical `instance_url` from the token response so multi-pod orgs route correctly.
  - Per-object `SystemModstamp` cursor stored in `SalesforceCheckpoint.cursors` — a failure mid-object doesn't rewind sibling objects, and re-syncs only fetch changed rows.
  - Deterministic record-to-text formatter (sorted keys) so SOQL field reordering on the server doesn't mark every row "changed" on each poll.
  - `_get_json` raises on non-2xx so 429 / 5xx never silently advance the checkpoint past missing data.
  - `Knowledge__kav` is in the default object set but is skipped silently when the org doesn't have Salesforce Knowledge enabled (404 on describe).
  - Slim-doc IDs are scoped as `<Object>/<Id>` so prune deletes can't collide across object types.
- `common/constants.py`, `common/data_source/config.py`, `common/data_source/__init__.py`: register `salesforce` in `FileSource` / `DocumentSource` and export `SalesforceConnector`.
- `rag/svr/sync_data_source.py`: new `Salesforce(SyncBase)` class routed through `load_from_checkpoint` (`poll_source` would re-walk every object each run) and added to `func_factory`.
- Frontend:
  - `web/src/pages/user-setting/data-source/constant/index.tsx`: new `DataSourceKey.SALESFORCE`, form fields (instance URL, client ID/secret, objects, api_version, batch size), `syncDeletedFiles` capability, default form values, and tile entry with the new icon.
  - `web/src/locales/{en,zh}.ts`: description + per-field tooltips.
  - `web/src/assets/svg/data-source/salesforce.svg`: 48x48 brand-style icon to match the other Microsoft / cloud tiles.

**Verification**

- `npm run build` (vite + esbuild) passes (1m 26s).

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
@@ -156,6 +156,7 @@ class FileSource(StrEnum):
|
||||
DINGTALK_AI_TABLE = "dingtalk_ai_table"
|
||||
ONEDRIVE = "onedrive"
|
||||
OUTLOOK = "outlook"
|
||||
SALESFORCE = "salesforce"
|
||||
AZURE_BLOB = "azure_blob"
|
||||
|
||||
|
||||
|
||||
@@ -36,6 +36,7 @@ from .jira.connector import JiraConnector
|
||||
from .sharepoint_connector import SharePointConnector
|
||||
from .onedrive_connector import OneDriveConnector
|
||||
from .outlook_connector import OutlookConnector
|
||||
from .salesforce_connector import SalesforceConnector
|
||||
from .azure_blob_connector import AzureBlobConnector
|
||||
from .teams_connector import TeamsConnector
|
||||
from .moodle_connector import MoodleConnector
|
||||
@@ -72,6 +73,7 @@ __all__ = [
|
||||
"SharePointConnector",
|
||||
"OneDriveConnector",
|
||||
"OutlookConnector",
|
||||
"SalesforceConnector",
|
||||
"AzureBlobConnector",
|
||||
"TeamsConnector",
|
||||
"MoodleConnector",
|
||||
|
||||
@@ -71,6 +71,7 @@ class DocumentSource(str, Enum):
|
||||
DINGTALK_AI_TABLE = "dingtalk_ai_table"
|
||||
ONEDRIVE = "onedrive"
|
||||
OUTLOOK = "outlook"
|
||||
SALESFORCE = "salesforce"
|
||||
AZURE_BLOB = "azure_blob"
|
||||
|
||||
|
||||
|
||||
573
common/data_source/salesforce_connector.py
Normal file
573
common/data_source/salesforce_connector.py
Normal file
@@ -0,0 +1,573 @@
|
||||
"""Salesforce data source connector.
|
||||
|
||||
Talks to a Salesforce org over the REST + SOQL APIs, turns each selected
|
||||
object's records into a Document, and uses ``SystemModstamp`` as the
|
||||
incremental cursor so re-syncs only fetch what changed.
|
||||
|
||||
Auth is OAuth 2.0 client-credentials (Salesforce "Connected App"); the
|
||||
caller supplies the org's ``instance_url`` so we never have to guess the
|
||||
pod hostname. A small allow-list of objects ships out of the box
|
||||
(Account, Contact, Opportunity, Case, Knowledge__kav) so the connector
|
||||
boots without per-org configuration, while the ``objects`` field lets
|
||||
operators add or replace entries.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Generator
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import requests
|
||||
|
||||
from common.data_source.config import INDEX_BATCH_SIZE
|
||||
from common.data_source.exceptions import (
|
||||
ConnectorMissingCredentialError,
|
||||
ConnectorValidationError,
|
||||
InsufficientPermissionsError,
|
||||
UnexpectedValidationError,
|
||||
)
|
||||
from common.data_source.interfaces import (
|
||||
CheckpointedConnectorWithPermSync,
|
||||
SecondsSinceUnixEpoch,
|
||||
SlimConnectorWithPermSync,
|
||||
)
|
||||
from common.data_source.models import ConnectorCheckpoint, SlimDocument
|
||||
|
||||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# REST API version used when the caller does not configure one.
_DEFAULT_API_VERSION = "v59.0"

# CRM objects indexed by default. Operators can override them via the
# ``objects`` config list. Knowledge__kav is included because the issue
# (#15461) calls out Knowledge articles explicitly.
_DEFAULT_OBJECTS = ["Account", "Contact", "Opportunity", "Case", "Knowledge__kav"]

# Objects that may legitimately be missing from an org. Knowledge articles
# only exist when Salesforce Knowledge is enabled (the SObject describe
# returns 404 otherwise), so this is the one object skipped silently when
# absent. Every other configured object is required: its absence or failure
# is treated as an error rather than quietly dropped.
_OPTIONAL_OBJECTS = frozenset({"Knowledge__kav"})
|
||||
|
||||
|
||||
class SalesforceObjectUnavailable(UnexpectedValidationError):
    """An SObject is genuinely absent or not queryable for this org/user.

    Raised for HTTP 404 (describe of a non-existent object) and HTTP 400
    ``INVALID_TYPE`` (SOQL against a non-existent object). It subclasses
    ``UnexpectedValidationError`` so existing broad handlers still catch it,
    while letting prune distinguish "object genuinely missing" (safe to
    skip — it has no records to orphan) from a transient failure (5xx/429,
    permission, partial page) that must abort rather than delete live docs.
    """
|
||||
|
||||
|
||||
def _is_object_unavailable(resp: requests.Response) -> bool:
    """Tell whether *resp* means the SObject does not exist for this org/user.

    Salesforce returns 404 for describe of an unknown object and 400 with
    ``errorCode == "INVALID_TYPE"`` for SOQL against one. 403 (no access) and
    5xx/429 (transient) are deliberately NOT treated as "unavailable" — those
    must propagate so callers don't act on an incomplete picture.

    Returns:
        True for a 404, or for a 400 whose body carries ``INVALID_TYPE``;
        False for every other response.
    """
    if resp.status_code == 404:
        return True
    if resp.status_code != 400:
        return False

    try:
        payload = resp.json()
    except ValueError:
        # Body is not JSON: fall back to a substring probe of the raw text.
        return "INVALID_TYPE" in (resp.text or "")

    # The error body may be a single object or a list of error entries.
    entries = payload if isinstance(payload, list) else [payload]
    return any(
        isinstance(entry, dict) and entry.get("errorCode") == "INVALID_TYPE"
        for entry in entries
    )
|
||||
|
||||
|
||||
class SalesforceCheckpoint(ConnectorCheckpoint):
    """Per-object SystemModstamp cursor.

    Stored as ISO-8601 strings (Salesforce's native format) keyed by
    SObject name so each object advances independently — a sync that
    fails halfway through ``Case`` does not rewind ``Account``.
    """

    # SObject name -> last seen SystemModstamp string; None until first used.
    cursors: dict[str, str] | None = None
|
||||
|
||||
|
||||
class SalesforceConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync):
    """Salesforce CRM connector.

    Authenticates with the OAuth 2.0 client-credentials grant, queries the
    configured SObjects with SOQL and emits one plain-text ``Document`` per
    record.

    Requires a Connected App with:
    - ``Client Credentials Flow`` enabled
    - OAuth scopes: ``api``, ``refresh_token`` (refresh_token not used
      but Salesforce requires the scope set to issue an access token)

    The execution user must have read access to every object listed in
    ``objects`` — missing permissions surface as ``403`` during
    validation rather than silent empty pages.
    """

    def __init__(
        self,
        batch_size: int = INDEX_BATCH_SIZE,
        objects: list[str] | None = None,
        api_version: str = _DEFAULT_API_VERSION,
    ) -> None:
        """Store the sync configuration; no network access happens here.

        Args:
            batch_size: Maximum number of documents per yielded batch.
            objects: SObject API names to index. Blank entries are dropped
                and duplicates collapsed (first occurrence wins). When
                nothing usable remains the default object set is used.
            api_version: REST API version path segment, e.g. ``v59.0``.
        """
        self.batch_size = batch_size
        self.api_version = api_version
        cleaned = [obj.strip() for obj in (objects or ()) if obj and obj.strip()]
        # dict.fromkeys de-duplicates while keeping the configured order, so
        # an object listed twice is not synced twice. An all-blank list falls
        # back to the defaults instead of silently syncing nothing.
        self.objects = list(dict.fromkeys(cleaned)) or list(_DEFAULT_OBJECTS)
        self._instance_url: str | None = None
        self._access_token: str | None = None

    # ------------------------------------------------------------------
    # Auth
    # ------------------------------------------------------------------

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Exchange the client credentials for an access token.

        Args:
            credentials: Mapping with ``instance_url``, ``client_id`` and
                ``client_secret``.

        Returns:
            Always ``None``; the token and instance URL are kept on the
            connector.

        Raises:
            ConnectorMissingCredentialError: A credential is missing, the
                token request fails, or the response carries no usable
                ``access_token``.
        """
        instance_url = (credentials.get("instance_url") or "").strip().rstrip("/")
        client_id = credentials.get("client_id")
        client_secret = credentials.get("client_secret")

        if not all([instance_url, client_id, client_secret]):
            raise ConnectorMissingCredentialError(
                "Salesforce credentials are incomplete (instance_url, client_id, client_secret required)"
            )

        token_url = urljoin(instance_url + "/", "services/oauth2/token")
        try:
            resp = requests.post(
                token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": client_id,
                    "client_secret": client_secret,
                },
                timeout=60,
            )
        except requests.RequestException as exc:
            raise ConnectorMissingCredentialError(
                f"Salesforce token request failed: {exc}"
            ) from exc

        if not resp.ok:
            # Salesforce returns {"error": "...", "error_description": "..."};
            # tolerate non-JSON or non-object bodies (e.g. a proxy error page).
            try:
                body = resp.json()
            except ValueError:
                body = None
            detail = ""
            if isinstance(body, dict):
                detail = body.get("error_description") or body.get("error") or ""
            detail = detail or (resp.text or "")[:200]
            raise ConnectorMissingCredentialError(
                f"Failed to acquire Salesforce access token (HTTP {resp.status_code}): {detail}"
            )

        try:
            data = resp.json()
        except ValueError as exc:
            raise ConnectorMissingCredentialError(
                "Salesforce token response is not valid JSON"
            ) from exc
        if not isinstance(data, dict):
            data = {}

        token = data.get("access_token")
        if not token:
            raise ConnectorMissingCredentialError(
                "Salesforce token response did not contain access_token"
            )

        # Salesforce echoes back the *canonical* instance for the org —
        # prefer it so multi-pod orgs (NA1 → NA45 migrations) hit the
        # correct host even when the configured URL went stale.
        canonical = (data.get("instance_url") or "").rstrip("/")

        self._access_token = token
        self._instance_url = canonical or instance_url
        return None

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    def validate_connector_settings(self) -> None:
        """Check API access and that every configured object is queryable.

        Raises:
            ConnectorMissingCredentialError: No token loaded, or HTTP 401.
            InsufficientPermissionsError: HTTP 403 from ``/sobjects``.
            UnexpectedValidationError: Any other non-2xx response.
            ConnectorValidationError: Malformed response, or a configured
                object is unknown / not queryable (optional objects that are
                merely absent are only logged).
        """
        if not self._access_token or not self._instance_url:
            raise ConnectorMissingCredentialError("Salesforce")

        # Cheap reachability + permission probe: the /sobjects endpoint
        # lists every object the user can describe; a 401/403 here means
        # the connected app or the user lacks API access altogether.
        resp = self._get(f"{self._base()}/sobjects")
        if resp.status_code == 401:
            raise ConnectorMissingCredentialError(
                "Salesforce access token is invalid or expired."
            )
        if resp.status_code == 403:
            raise InsufficientPermissionsError(
                "The Salesforce execution user lacks API access; enable the 'API Enabled' profile permission."
            )
        if not resp.ok:
            raise UnexpectedValidationError(
                f"Salesforce validation failed (HTTP {resp.status_code}): {resp.text[:200]}"
            )

        try:
            payload = resp.json()
        except ValueError as exc:
            raise ConnectorValidationError(
                f"Salesforce /sobjects response is not JSON: {exc}"
            ) from exc
        if not isinstance(payload, dict) or "sobjects" not in payload:
            raise ConnectorValidationError(
                "Unexpected response format from Salesforce /sobjects."
            )

        # Fail fast on typos / inaccessible objects instead of silently
        # missing their data during sync. The global describe lists every
        # object the user can see plus its queryable flag, so we can vet the
        # configured objects without an extra call per object.
        queryable = {
            so["name"]: bool(so.get("queryable", False))
            for so in payload.get("sobjects") or []
            if isinstance(so, dict) and so.get("name")
        }
        unknown: list[str] = []
        not_queryable: list[str] = []
        for obj in self.objects:
            if obj not in queryable:
                # Knowledge__kav is an optional default — absent unless the
                # org has Salesforce Knowledge. Don't fail validation for it.
                if obj in _OPTIONAL_OBJECTS:
                    logger.warning(
                        "Salesforce: optional object %s not present in this org; it will be skipped.",
                        obj,
                    )
                    continue
                unknown.append(obj)
            elif not queryable[obj]:
                not_queryable.append(obj)

        if unknown or not_queryable:
            problems = []
            if unknown:
                problems.append(f"unknown object(s): {', '.join(sorted(unknown))}")
            if not_queryable:
                problems.append(f"non-queryable object(s): {', '.join(sorted(not_queryable))}")
            raise ConnectorValidationError(
                "Salesforce 'objects' configuration is invalid — "
                + "; ".join(problems)
                + ". Check for typos and that the execution user has read access to each object."
            )

    # ------------------------------------------------------------------
    # Checkpoint helpers
    # ------------------------------------------------------------------

    def build_dummy_checkpoint(self) -> SalesforceCheckpoint:
        """Return a fresh checkpoint with no cursors and ``has_more=True``."""
        return SalesforceCheckpoint(has_more=True, cursors={})

    def validate_checkpoint_json(self, checkpoint_json: str) -> SalesforceCheckpoint:
        """Decode a persisted checkpoint, falling back to a fresh one.

        A corrupt checkpoint is logged and replaced rather than raised, so
        the sync restarts from the caller's window.
        """
        try:
            return SalesforceCheckpoint.model_validate_json(checkpoint_json)
        except Exception:
            logger.warning(
                "Salesforce: stored checkpoint could not be parsed; starting from a fresh checkpoint.",
                exc_info=True,
            )
            return self.build_dummy_checkpoint()

    # ------------------------------------------------------------------
    # Core data loading
    # ------------------------------------------------------------------

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> Any:
        """Yield document batches modified within ``(start, end]``, without a checkpoint."""
        return self._iter_documents(since_epoch=start, until_epoch=end if end else None)

    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ConnectorCheckpoint,
    ) -> Any:
        """Yield document batches, resuming from *checkpoint*'s per-object cursors.

        A checkpoint of the wrong type is replaced by a fresh one. Falsy
        ``start`` / ``end`` values mean "no bound".
        """
        if not isinstance(checkpoint, SalesforceCheckpoint):
            checkpoint = self.build_dummy_checkpoint()
        since = start if start else None
        until = end if end else None
        return self._iter_documents(checkpoint=checkpoint, since_epoch=since, until_epoch=until)

    def load_from_checkpoint_with_perm_sync(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ConnectorCheckpoint,
    ) -> Any:
        """Delegate to :meth:`load_from_checkpoint`; no extra permission data is added."""
        return self.load_from_checkpoint(start, end, checkpoint)

    def retrieve_all_slim_docs_perm_sync(
        self,
        callback: Any = None,
    ) -> Generator[list[SlimDocument], None, None]:
        """Yield batches of slim documents for prune / permission sync.

        Salesforce records use 15/18-character object IDs; we surface
        the SObject-prefixed form (``Account/0015g00000...``) so the
        prune collector can disambiguate IDs that collide across object
        types and so deletes are scoped to the connector instance.

        Raises:
            ConnectorMissingCredentialError: No access token is loaded.
            UnexpectedValidationError: Any query failure other than
                "object unavailable" — the enumeration is aborted.
        """
        if not self._access_token:
            raise ConnectorMissingCredentialError("Salesforce")

        batch: list[SlimDocument] = []
        for obj in self.objects:
            try:
                for record in self._query_records(obj, fields=["Id"], since_epoch=None):
                    rec_id = record.get("Id")
                    if not rec_id:
                        continue
                    doc_id = f"{obj}/{rec_id}"
                    if callback:
                        callback(doc_id, obj)
                    batch.append(SlimDocument(id=doc_id))
                    if len(batch) >= self.batch_size:
                        yield batch
                        batch = []
            except SalesforceObjectUnavailable:
                # Object genuinely absent (e.g. Knowledge__kav without
                # Salesforce Knowledge). It has no records, so omitting it
                # cannot orphan documents — safe to skip.
                logger.warning("Salesforce prune skipping %s (object unavailable)", obj)
                continue
            # Any OTHER failure (transient 5xx/429, permission, a partial
            # page mid-enumeration) propagates: prune must NOT run on an
            # incomplete snapshot, or the collector would treat the missing
            # IDs as stale and delete documents that still exist.
        if batch:
            yield batch

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _base(self) -> str:
        """Return the versioned REST root, ``<instance>/services/data/<version>``."""
        return f"{self._instance_url}/services/data/{self.api_version}"

    def _get(self, url: str) -> requests.Response:
        """Issue an authenticated JSON GET with a 60-second timeout."""
        return requests.get(
            url,
            headers={
                "Authorization": f"Bearer {self._access_token}",
                "Accept": "application/json",
            },
            timeout=60,
        )

    def _get_json(self, url: str, *, context: str) -> dict:
        """GET *url* and decode JSON. Raise on non-2xx so a 429 / 5xx
        never silently advances the checkpoint past missing records.

        Raises:
            SalesforceObjectUnavailable: 404, or 400 ``INVALID_TYPE``.
            UnexpectedValidationError: Any other non-2xx status, or a body
                that is not JSON.
        """
        resp = self._get(url)
        if not resp.ok:
            body_snippet = resp.text[:200] if resp.text else ""
            if _is_object_unavailable(resp):
                # Expected for optional objects; callers decide how loudly to
                # report it, so don't log it as an error here.
                raise SalesforceObjectUnavailable(
                    f"Salesforce object unavailable ({context}): HTTP {resp.status_code} {body_snippet}"
                )
            logger.error(
                "Salesforce request failed (%s): HTTP %s url=%s body=%s",
                context,
                resp.status_code,
                url,
                body_snippet,
            )
            raise UnexpectedValidationError(
                f"Salesforce request failed ({context}): HTTP {resp.status_code} {body_snippet}"
            )
        try:
            return resp.json()
        except ValueError as exc:
            raise UnexpectedValidationError(
                f"Salesforce response is not JSON ({context}): {exc}"
            ) from exc

    def _describe_fields(self, obj: str) -> list[str]:
        """Return field API names for *obj*. Filters out compound types
        (address, location) that SOQL can only project via sub-fields.

        ``Id`` is always present in the result.
        """
        data = self._get_json(
            f"{self._base()}/sobjects/{obj}/describe",
            context=f"describe {obj}",
        )
        fields = [
            field["name"]
            for field in data.get("fields", [])
            if field.get("type") not in {"address", "location"} and field.get("name")
        ]
        if "Id" not in fields:
            fields.insert(0, "Id")
        return fields

    @staticmethod
    def _soql_datetime(epoch: float) -> str:
        """Format a Unix timestamp as a second-resolution UTC SOQL literal."""
        return datetime.fromtimestamp(epoch, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime | None:
        """Parse an ISO-8601 timestamp into an aware datetime, or ``None``.

        Accepts a trailing ``Z`` and a colon-less UTC offset (``+0000``),
        neither of which every supported ``datetime.fromisoformat`` handles.
        A timestamp without any offset is assumed to be UTC.
        """
        if not isinstance(value, str):
            return None
        text = value.strip()
        if not text:
            return None
        if text[-1] in "Zz":
            text = text[:-1] + "+00:00"
        elif len(text) > 5 and text[-5] in "+-" and text[-4:].isdigit():
            # "+HHMM" -> "+HH:MM"
            text = f"{text[:-2]}:{text[-2:]}"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def _query_records(
        self,
        obj: str,
        fields: list[str],
        since_epoch: float | None,
        until_epoch: float | None = None,
    ) -> Generator[dict, None, None]:
        """Yield raw record dicts for *obj*, page by page, oldest first.

        Orders by ``SystemModstamp`` so the per-object cursor advances
        monotonically even when paging is interrupted. The window is
        ``SystemModstamp > since`` and ``SystemModstamp <= until``; a falsy
        bound (``None`` or ``0``) disables that side of the filter. Bounds
        are truncated to whole seconds, so records sharing the cursor's
        second may be returned again on the next run.
        """
        filters = []
        if since_epoch:
            filters.append(f"SystemModstamp > {self._soql_datetime(since_epoch)}")
        if until_epoch:
            filters.append(f"SystemModstamp <= {self._soql_datetime(until_epoch)}")
        where = f" WHERE {' AND '.join(filters)}" if filters else ""
        soql = f"SELECT {','.join(fields)} FROM {obj}{where} ORDER BY SystemModstamp ASC"

        url: str | None = f"{self._base()}/query?q={requests.utils.quote(soql)}"
        while url:
            page = self._get_json(url, context=f"query {obj}")
            yield from page.get("records", [])
            # nextRecordsUrl is a host-relative path; absent on the last page.
            next_url = page.get("nextRecordsUrl")
            url = f"{self._instance_url}{next_url}" if next_url else None

    @staticmethod
    def _record_to_text(obj: str, record: dict) -> str:
        """Flatten a SOQL record into a deterministic plain-text body.

        Keeping the formatter deterministic (sorted field order, stable
        ``key: value`` lines) matters for content hashing — without it,
        Salesforce field-reordering on the server would tag every
        record as "changed" on every poll and re-embed the entire
        org each sync.

        The ``attributes`` entry and empty (``None`` / ``""``) values are
        omitted. Nested dict/list values are rendered as key-sorted JSON so
        their text does not depend on the order the server sent the keys.
        """
        import json  # local import: only needed for nested values

        lines = [f"Salesforce {obj}"]
        for key in sorted(record):
            if key == "attributes":
                continue
            value = record[key]
            if value is None or value == "":
                continue
            if isinstance(value, (dict, list)):
                value = json.dumps(value, sort_keys=True, ensure_ascii=False, default=str)
            lines.append(f"{key}: {value}")
        return "\n".join(lines)

    def _iter_documents(
        self,
        checkpoint: SalesforceCheckpoint | None = None,
        since_epoch: float | None = None,
        until_epoch: float | None = None,
    ):
        """Yield ``Document`` batches for every configured object.

        Objects that are unavailable at describe time are skipped with a
        warning; any query failure is logged and re-raised. After each object
        completes, its pending documents are flushed and only then is its
        cursor written to *checkpoint*, so a failure in a later object does
        not discard the cursors of objects that already finished.
        ``checkpoint.has_more`` is cleared once all objects are done.
        """
        from common.data_source.models import Document

        cursors: dict[str, str] = {}
        if checkpoint and checkpoint.cursors:
            cursors = dict(checkpoint.cursors)

        batch: list[Document] = []

        for obj in self.objects:
            try:
                fields = self._describe_fields(obj)
            except SalesforceObjectUnavailable:
                # Object genuinely absent (e.g. Knowledge__kav without
                # Salesforce Knowledge): skip it. Transient describe
                # failures are NOT swallowed — they propagate so the run
                # doesn't silently miss an object's data.
                logger.warning(
                    "Salesforce skipping %s (object not present in this org)",
                    obj,
                )
                continue

            # Per-object cursor takes precedence over the caller's
            # window. The cursor was persisted from the *last successful
            # record* so it cannot rewind past records we already
            # ingested even if the caller passes an older since_epoch.
            obj_since = since_epoch
            cursor_iso = cursors.get(obj)
            if cursor_iso:
                cursor_dt = self._parse_timestamp(cursor_iso)
                if cursor_dt is None:
                    logger.warning(
                        "Salesforce: ignoring unparseable cursor %r for %s; using the caller's window.",
                        cursor_iso,
                        obj,
                    )
                else:
                    obj_since = max(obj_since or 0, cursor_dt.timestamp())

            latest_iso: str | None = cursor_iso
            try:
                for record in self._query_records(obj, fields, obj_since, until_epoch):
                    rec_id = record.get("Id")
                    if not rec_id:
                        continue

                    modified_str = record.get("SystemModstamp") or ""
                    # Fall back to "now" when the record has no usable stamp.
                    doc_updated_at = self._parse_timestamp(modified_str) or datetime.now(timezone.utc)

                    # Display name: prefer ``Name``; fall back to
                    # ``Subject`` (Case) or ``Title`` (Knowledge); last
                    # resort is ``<Object>/<Id>`` so the doc list is
                    # never blank-titled.
                    name = (
                        record.get("Name")
                        or record.get("Subject")
                        or record.get("Title")
                        or f"{obj}/{rec_id}"
                    )

                    blob = self._record_to_text(obj, record).encode("utf-8")

                    batch.append(
                        Document(
                            id=f"{obj}/{rec_id}",
                            source="salesforce",
                            semantic_identifier=str(name),
                            extension=".txt",
                            blob=blob,
                            doc_updated_at=doc_updated_at,
                            size_bytes=len(blob),
                            metadata={
                                "object": obj,
                                "record_id": rec_id,
                                "web_url": f"{self._instance_url}/{rec_id}",
                            },
                        )
                    )
                    # Rows arrive ordered by SystemModstamp ASC, so the last
                    # one seen is the newest.
                    if modified_str:
                        latest_iso = modified_str
                    if len(batch) >= self.batch_size:
                        yield batch
                        batch = []
            except UnexpectedValidationError as exc:
                # Do not continue: advancing to the next object would let
                # the task finish as DONE and move the global poll window
                # past the failed object's missing records permanently.
                logger.warning("Salesforce %s query failed: %s", obj, exc)
                raise

            # Hand over this object's remaining documents BEFORE recording
            # its cursor, so the cursor never runs ahead of what was yielded.
            if batch:
                yield batch
                batch = []
            if latest_iso:
                cursors[obj] = latest_iso
            if checkpoint is not None:
                checkpoint.cursors = dict(cursors)

        if checkpoint is not None:
            checkpoint.cursors = cursors
            checkpoint.has_more = False
|
||||
@@ -64,6 +64,7 @@ from common.data_source import (
|
||||
OneDriveConnector,
|
||||
OutlookConnector,
|
||||
AzureBlobConnector,
|
||||
SalesforceConnector,
|
||||
TeamsConnector,
|
||||
SlackConnector,
|
||||
SharePointConnector,
|
||||
@@ -1129,6 +1130,67 @@ class Outlook(SyncBase):
|
||||
return wrapper()
|
||||
|
||||
|
||||
class Salesforce(SyncBase):
    """Sync task that builds a ``SalesforceConnector`` from the stored config
    and returns a generator of document batches."""

    SOURCE_NAME: str = FileSource.SALESFORCE

    async def _generate(self, task: dict):
        """Build, authenticate and validate the connector, then start the sync.

        Reads ``batch_size``, ``objects``, ``api_version`` and ``credentials``
        from ``self.conf``. ``task["reindex"]`` and ``task["poll_range_start"]``
        select the start of the window; the end is "now" (UTC).

        Returns:
            A generator yielding the connector's document batches.
        """
        # Coerce batch_size defensively: anything non-numeric or <= 0 falls
        # back to the project default.
        raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
        try:
            batch_size = int(raw_batch_size)
        except (TypeError, ValueError):
            batch_size = INDEX_BATCH_SIZE
        if batch_size <= 0:
            batch_size = INDEX_BATCH_SIZE

        # ``objects`` may be a comma-separated string or a list. Anything
        # else (including a missing key) means "use the connector defaults".
        raw_objects = self.conf.get("objects")
        if isinstance(raw_objects, str):
            objects = [o.strip() for o in raw_objects.split(",") if o.strip()]
        elif isinstance(raw_objects, list):
            # Skip None entries explicitly: str(None) would otherwise become
            # the bogus object name "None".
            objects = [
                str(o).strip()
                for o in raw_objects
                if o is not None and str(o).strip()
            ]
        else:
            objects = None

        self.connector = SalesforceConnector(
            batch_size=batch_size,
            objects=objects,
            api_version=self.conf.get("api_version") or "v59.0",
        )
        self.connector.load_credentials(self.conf["credentials"])
        # Fail fast on invalid/inaccessible objects (typos, missing object
        # permissions) before iterating, so a bad `objects` config surfaces
        # as a clear error instead of silently skipping data at sync time.
        # This guards configs that reach runtime without going through the
        # UI (direct API callers, scripts, previously-persisted configs).
        self.connector.validate_connector_settings()

        # Always route through load_from_checkpoint so the per-object
        # SystemModstamp cursor owns incrementality; poll_source would
        # re-query every object from the caller's window each run and
        # ignore the persisted per-object cursors entirely.
        # NOTE(review): the checkpoint below is freshly built on every run,
        # so within this method no cursor from a previous run is loaded —
        # the effective lower bound is poll_range_start. Confirm whether
        # checkpoints are persisted/restored elsewhere.
        if task["reindex"] == "1" or not task["poll_range_start"]:
            start_ts = 0.0
        else:
            start_ts = task["poll_range_start"].timestamp()
        end_ts = datetime.now(timezone.utc).timestamp()
        checkpoint = self.connector.build_dummy_checkpoint()
        document_batch_generator = self.connector.load_from_checkpoint(
            start_ts, end_ts, checkpoint
        )

        instance_url = (self.conf.get("credentials") or {}).get("instance_url", "")
        self.log_connection(
            "Salesforce",
            f"{instance_url} objects({','.join(self.connector.objects)})",
            task,
        )

        def wrapper():
            # Thin pass-through generator over the connector's batches.
            for document_batch in document_batch_generator:
                yield document_batch

        return wrapper()
|
||||
|
||||
|
||||
class AzureBlob(SyncBase):
|
||||
SOURCE_NAME: str = FileSource.AZURE_BLOB
|
||||
|
||||
@@ -2044,6 +2106,7 @@ func_factory = {
|
||||
FileSource.ONEDRIVE: OneDrive,
|
||||
FileSource.OUTLOOK: Outlook,
|
||||
FileSource.AZURE_BLOB: AzureBlob,
|
||||
FileSource.SALESFORCE: Salesforce,
|
||||
FileSource.SLACK: Slack,
|
||||
FileSource.TEAMS: Teams,
|
||||
FileSource.MOODLE: Moodle,
|
||||
|
||||
4
web/src/assets/svg/data-source/salesforce.svg
Normal file
4
web/src/assets/svg/data-source/salesforce.svg
Normal file
@@ -0,0 +1,4 @@
|
||||
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 48 48" width="48" height="48">
|
||||
<path d="M20.5 13.5a7 7 0 0 1 11.7 1.6 6 6 0 0 1 2.4-.5 6.5 6.5 0 0 1 5.7 9.6 6 6 0 0 1-1.7 10.7 6 6 0 0 1-8 3.6 6.5 6.5 0 0 1-11 .7 6 6 0 0 1-9-5.4 6 6 0 0 1 .6-2.6A6.5 6.5 0 0 1 14.6 19a7 7 0 0 1 5.9-5.5z" fill="#00A1E0"/>
|
||||
<path d="M19 22.5h2.5v8H19zM22.5 22.5h2.4l3 5.3v-5.3h2.4v8h-2.4l-3-5.3v5.3h-2.4zM31 22.5h5v2h-3v1.2h2.8v1.7H33V29h3v1.5h-5z" fill="#FFFFFF"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 462 B |
@@ -1407,6 +1407,18 @@ Example: Virtual Hosted Style`,
|
||||
'Mail folder to sync (e.g. inbox, sentitems, archive). Defaults to inbox.',
|
||||
outlookUserIdsTip:
|
||||
'Comma-separated UPNs or object IDs of mailboxes to sync. Leave blank to sync every mailbox in the tenant (requires User.Read.All).',
|
||||
salesforceDescription:
|
||||
'Connect a Salesforce org and index CRM records (Accounts, Contacts, Opportunities, Cases, Knowledge articles) via SOQL with incremental sync.',
|
||||
salesforceInstanceUrlTip:
|
||||
'Salesforce org URL, e.g. https://your-domain.my.salesforce.com (no trailing slash).',
|
||||
salesforceClientIdTip:
|
||||
'Consumer Key of a Connected App with Client Credentials Flow enabled and the api scope.',
|
||||
salesforceClientSecretTip:
|
||||
'Consumer Secret of the Connected App used for client-credentials authentication.',
|
||||
salesforceObjectsTip:
|
||||
'Comma-separated SObject API names to index. Defaults to Account, Contact, Opportunity, Case, Knowledge__kav.',
|
||||
salesforceApiVersionTip:
|
||||
'Salesforce REST API version (e.g. v59.0). Use the version your org supports.',
|
||||
azure_blobDescription:
|
||||
'Index blobs from an Azure Blob Storage container into a knowledge base. Supports account-key, connection-string, and SAS-token auth. Unchanged blobs are skipped via ETag fingerprinting.',
|
||||
azureBlobAuthModeTip:
|
||||
|
||||
@@ -1119,6 +1119,18 @@ NER:使用 spaCy NER 和基于规则的关键词提取来抽取实体和关系
|
||||
'要同步的邮件文件夹(例如 inbox、sentitems、archive),默认为 inbox。',
|
||||
outlookUserIdsTip:
|
||||
'要同步的邮箱 UPN 或对象 ID 列表(逗号分隔)。留空则同步租户内的所有邮箱(需要 User.Read.All 权限)。',
|
||||
salesforceDescription:
|
||||
'连接 Salesforce 组织,通过 SOQL 增量同步并索引 CRM 记录(客户、联系人、商机、个案、知识文章)。',
|
||||
salesforceInstanceUrlTip:
|
||||
'Salesforce 组织地址,例如 https://your-domain.my.salesforce.com(不要包含末尾斜杠)。',
|
||||
salesforceClientIdTip:
|
||||
'已启用 Client Credentials Flow 且包含 api 权限的 Connected App 的 Consumer Key。',
|
||||
salesforceClientSecretTip:
|
||||
'用于客户端凭证身份验证的 Connected App 的 Consumer Secret。',
|
||||
salesforceObjectsTip:
|
||||
'要索引的 SObject API 名称列表(逗号分隔)。默认值:Account, Contact, Opportunity, Case, Knowledge__kav。',
|
||||
salesforceApiVersionTip:
|
||||
'Salesforce REST API 版本(例如 v59.0),请使用您组织所支持的版本。',
|
||||
azure_blobDescription:
|
||||
'将 Azure Blob 存储容器中的文件索引到知识库。支持账户密钥、连接字符串和 SAS 令牌三种认证方式,通过 ETag 指纹跳过未变更的文件。',
|
||||
azureBlobAuthModeTip:
|
||||
|
||||
@@ -45,6 +45,7 @@ export enum DataSourceKey {
|
||||
RSS = 'rss',
|
||||
ONEDRIVE = 'onedrive',
|
||||
OUTLOOK = 'outlook',
|
||||
SALESFORCE = 'salesforce',
|
||||
AZURE_BLOB = 'azure_blob',
|
||||
TEAMS = 'teams',
|
||||
SLACK = 'slack',
|
||||
@@ -138,6 +139,9 @@ export const DataSourceFeatureVisibilityMap: Partial<
|
||||
[DataSourceKey.OUTLOOK]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.SALESFORCE]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.AZURE_BLOB]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
@@ -339,6 +343,11 @@ export const generateDataSourceInfo = (t: TFunction) => {
|
||||
description: t(`setting.${DataSourceKey.OUTLOOK}Description`),
|
||||
icon: <Mail className="text-text-primary" size={22} />,
|
||||
},
|
||||
[DataSourceKey.SALESFORCE]: {
|
||||
name: 'Salesforce',
|
||||
description: t(`setting.${DataSourceKey.SALESFORCE}Description`),
|
||||
icon: <SvgIcon name={'data-source/salesforce'} width={38} />,
|
||||
},
|
||||
[DataSourceKey.AZURE_BLOB]: {
|
||||
name: 'Azure Blob Storage',
|
||||
description: t(`setting.${DataSourceKey.AZURE_BLOB}Description`),
|
||||
@@ -524,6 +533,65 @@ export const DataSourceFormFields = {
|
||||
},
|
||||
},
|
||||
],
|
||||
[DataSourceKey.SALESFORCE]: [
|
||||
{
|
||||
label: 'Instance URL',
|
||||
name: 'config.credentials.instance_url',
|
||||
type: FormFieldType.Text,
|
||||
required: true,
|
||||
placeholder: 'https://your-domain.my.salesforce.com',
|
||||
tooltip: t('setting.salesforceInstanceUrlTip'),
|
||||
validation: {
|
||||
pattern: /^https:\/\/[a-zA-Z0-9.-]+\.salesforce\.com$/,
|
||||
message:
|
||||
'Must be a valid Salesforce domain (https://...salesforce.com)',
|
||||
},
|
||||
},
|
||||
{
|
||||
label: 'Client ID',
|
||||
name: 'config.credentials.client_id',
|
||||
type: FormFieldType.Text,
|
||||
required: true,
|
||||
tooltip: t('setting.salesforceClientIdTip'),
|
||||
},
|
||||
{
|
||||
label: 'Client Secret',
|
||||
name: 'config.credentials.client_secret',
|
||||
type: FormFieldType.Password,
|
||||
required: true,
|
||||
tooltip: t('setting.salesforceClientSecretTip'),
|
||||
},
|
||||
{
|
||||
label: 'Objects',
|
||||
name: 'config.objects',
|
||||
type: FormFieldType.Text,
|
||||
required: false,
|
||||
placeholder: 'Account,Contact,Opportunity,Case,Knowledge__kav',
|
||||
tooltip: t('setting.salesforceObjectsTip'),
|
||||
},
|
||||
{
|
||||
label: 'API Version',
|
||||
name: 'config.api_version',
|
||||
type: FormFieldType.Text,
|
||||
required: false,
|
||||
placeholder: 'v59.0',
|
||||
tooltip: t('setting.salesforceApiVersionTip'),
|
||||
validation: {
|
||||
pattern: /^v\d+\.\d+$/,
|
||||
message: 'API version must match format like v59.0',
|
||||
},
|
||||
},
|
||||
{
|
||||
label: 'Batch Size',
|
||||
name: 'config.batch_size',
|
||||
type: FormFieldType.Number,
|
||||
required: false,
|
||||
validation: {
|
||||
min: 1,
|
||||
message: 'Batch Size must be at least 1',
|
||||
},
|
||||
},
|
||||
],
|
||||
[DataSourceKey.AZURE_BLOB]: [
|
||||
{
|
||||
label: 'Auth Mode',
|
||||
@@ -2109,6 +2177,20 @@ export const DataSourceFormDefaultValues = {
|
||||
},
|
||||
},
|
||||
},
|
||||
[DataSourceKey.SALESFORCE]: {
|
||||
name: '',
|
||||
source: DataSourceKey.SALESFORCE,
|
||||
config: {
|
||||
objects: '',
|
||||
api_version: 'v59.0',
|
||||
batch_size: 2,
|
||||
credentials: {
|
||||
instance_url: '',
|
||||
client_id: '',
|
||||
client_secret: '',
|
||||
},
|
||||
},
|
||||
},
|
||||
[DataSourceKey.AZURE_BLOB]: {
|
||||
name: '',
|
||||
source: DataSourceKey.AZURE_BLOB,
|
||||
|
||||
Reference in New Issue
Block a user