Files
ragflow/api/utils/web_utils.py
monsterDavid 7da4f200e5 fix(agent): enable MCP file preview via doc_id (#15399)
## Summary
MCP-wrapped agents could only force-download files looked up by
`doc_id`. This adds an explicit preview path and inline response headers
for previewable file types.

- **New** `GET /api/v1/agents/attachments/{attachment_id}/preview` —
inline preview for PDFs, images, and other safe types (pass `ext` and/or
`mime_type`)
- **Improved** `GET /api/v1/documents/{doc_id}/preview` — sets inline
disposition using the document filename
- **Improved** attachment download routing — resolves `mime_type` /
`ext` query params (no default `markdown`), supports
`disposition=inline`
- **DocGenerator output** — includes URL-encoded `preview_url` for MCP
clients
- **Legacy `/document/download/...` aliases** — still use download
semantics; MCP clients should call `/preview` explicitly

Fixes #15398

## Test plan
- [x] `pytest test/unit_test/api/utils/test_file_response_headers.py`
(6/6)

---------

Co-authored-by: MkDev11 <mkdev11@users.noreply.github.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Ling Qin <qinling0210@163.com>
2026-07-03 19:56:01 +08:00

207 lines
6.4 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import json
import re
import aiosmtplib
from email.mime.text import MIMEText
from email.header import Header
from common import settings
from quart import render_template_string
from api.utils.email_templates import EMAIL_TEMPLATES
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.expected_conditions import staleness_of
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
OTP_LENGTH = 4
OTP_TTL_SECONDS = 5 * 60 # valid for 5 minutes
ATTEMPT_LIMIT = 5 # maximum attempts
ATTEMPT_LOCK_SECONDS = 30 * 60 # lock for 30 minutes
RESEND_COOLDOWN_SECONDS = 60 # cooldown for 1 minute
from api.utils.file_response import ( # noqa: F401
CONTENT_TYPE_MAP,
FORCE_ATTACHMENT_CONTENT_TYPES,
FORCE_ATTACHMENT_EXTENSIONS,
agent_attachment_preview_path,
apply_download_file_response_headers,
apply_preview_file_response_headers,
resolve_attachment_content_type,
sanitize_content_disposition_filename,
should_force_attachment,
)
def apply_safe_file_response_headers(response, content_type: str | None, ext: str | None = None):
if content_type:
response.headers.set("Content-Type", content_type)
force_attachment = should_force_attachment(ext, content_type)
if force_attachment:
response.headers.set("X-Content-Type-Options", "nosniff")
response.headers.set("Content-Disposition", "attachment")
return response
def html2pdf(
source: str,
timeout: int = 2,
install_driver: bool = True,
print_options: dict = {},
):
result = __get_pdf_from_html(source, timeout, install_driver, print_options)
return result
def __send_devtools(driver, cmd, params={}):
resource = "/session/%s/chromium/send_command_and_get_result" % driver.session_id
url = driver.command_executor._url + resource
body = json.dumps({"cmd": cmd, "params": params})
response = driver.command_executor._request("POST", url, body)
if not response:
raise Exception(response.get("value"))
return response.get("value")
def __get_pdf_from_html(path: str, timeout: int, install_driver: bool, print_options: dict):
webdriver_options = Options()
webdriver_prefs = {}
webdriver_options.add_argument("--headless")
webdriver_options.add_argument("--disable-gpu")
webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.experimental_options["prefs"] = webdriver_prefs
webdriver_prefs["profile.default_content_settings"] = {"images": 2}
if install_driver:
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=webdriver_options)
else:
driver = webdriver.Chrome(options=webdriver_options)
driver.get(path)
try:
WebDriverWait(driver, timeout).until(staleness_of(driver.find_element(by=By.TAG_NAME, value="html")))
except TimeoutException:
pass
try:
calculated_print_options = {
"landscape": False,
"displayHeaderFooter": False,
"printBackground": True,
"preferCSSPageSize": True,
}
calculated_print_options.update(print_options)
result = __send_devtools(driver, "Page.printToPDF", calculated_print_options)
return base64.b64decode(result["data"])
finally:
driver.quit()
def is_valid_url(url: str) -> bool:
if not re.match(r"(https?)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]", url):
return False
from common.ssrf_guard import assert_url_is_safe
try:
assert_url_is_safe(url)
return True
except ValueError:
return False
def safe_json_parse(data: str | dict) -> dict:
if isinstance(data, dict):
return data
try:
return json.loads(data) if data else {}
except (json.JSONDecodeError, TypeError):
return {}
def get_float(req: dict, key: str, default: float | int = 10.0) -> float:
try:
parsed = float(req.get(key, default))
return parsed if parsed > 0 else default
except (TypeError, ValueError):
return default
async def send_email_html(to_email: str, subject: str, template_key: str, **context):
body = await render_template_string(EMAIL_TEMPLATES.get(template_key), **context)
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = Header(subject, "utf-8")
msg["From"] = f"{settings.MAIL_DEFAULT_SENDER[0]} <{settings.MAIL_DEFAULT_SENDER[1]}>"
msg["To"] = to_email
smtp = aiosmtplib.SMTP(
hostname=settings.MAIL_SERVER,
port=settings.MAIL_PORT,
use_tls=True,
timeout=10,
)
await smtp.connect()
await smtp.login(settings.MAIL_USERNAME, settings.MAIL_PASSWORD)
await smtp.send_message(msg)
await smtp.quit()
async def send_invite_email(to_email, invite_url, tenant_id, inviter):
# Reuse the generic HTML sender with 'invite' template
await send_email_html(
to_email=to_email,
subject="RAGFlow Invitation",
template_key="invite",
email=to_email,
invite_url=invite_url,
tenant_id=tenant_id,
inviter=inviter,
)
def otp_keys(email: str):
email = (email or "").strip().lower()
return (
f"otp:{email}",
f"otp_attempts:{email}",
f"otp_last_sent:{email}",
f"otp_lock:{email}",
)
def hash_code(code: str, salt: bytes) -> str:
import hashlib
import hmac
return hmac.new(salt, (code or "").encode("utf-8"), hashlib.sha256).hexdigest()
def captcha_key(email: str) -> str:
return f"captcha:{email}"