mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
Feat(browser control):Add new agent component 'browser' to control browser by AI (#14888)
### What problem does this PR solve? This PR adds a new `Browser` operator to Agent workflows, enabling prompt-driven browser automation in RAGFlow.Technically based ‘Browser-Use’ It includes: - Backend browser component execution with tenant LLM integration - Upload source support (file IDs, URLs, variables, CSV/JSON array) - Downloaded file persistence to RAGFlow storage - Frontend node/operator integration, form config, icon, and i18n updates - Unit tests for upload/download and ID parsing logic - Dependency and Docker updates for browser-use runtime support ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
716
agent/component/browser.py
Normal file
716
agent/component/browser.py
Normal file
@@ -0,0 +1,716 @@
|
||||
#
|
||||
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import tempfile
|
||||
from abc import ABC
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.parse import unquote, urlparse
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
from agent.component.base import ComponentBase
|
||||
from agent.component.llm import LLMParam
|
||||
from api.db import FileType
|
||||
from api.db.joint_services.tenant_model_service import get_model_config_by_type_and_name
|
||||
from api.db.services import duplicate_name
|
||||
from api.db.services.file_service import FileService
|
||||
from api.db.services.tenant_llm_service import TenantLLMService
|
||||
from api.utils.file_utils import filename_type
|
||||
from common import settings
|
||||
from common.connection_utils import timeout
|
||||
from common.misc_utils import get_uuid
|
||||
from rag.llm import FACTORY_DEFAULT_BASE_URL
|
||||
|
||||
|
||||
class BrowserParam(LLMParam):
|
||||
"""
|
||||
Parameters for Browser node.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.prompts = "{sys.query}"
|
||||
self.max_steps = 30
|
||||
self.headless = True
|
||||
self.enable_default_extensions = False
|
||||
self.chromium_sandbox = False
|
||||
# Reuse browser profile across runs of the same agent node by default.
|
||||
self.persist_session = True
|
||||
self.upload_sources = []
|
||||
self.outputs = {
|
||||
"content": {"type": "string", "value": ""},
|
||||
"downloaded_files": {"type": "Array<Object>", "value": []},
|
||||
}
|
||||
|
||||
def check(self):
|
||||
self.check_empty(self.llm_id, "[Browser] LLM")
|
||||
self.check_positive_integer(self.max_steps, "[Browser] Max steps")
|
||||
self.check_boolean(self.headless, "[Browser] Headless")
|
||||
self.check_boolean(self.enable_default_extensions, "[Browser] Enable default extensions")
|
||||
self.check_boolean(self.chromium_sandbox, "[Browser] Chromium sandbox")
|
||||
self.check_boolean(self.persist_session, "[Browser] Persist session")
|
||||
self.check_empty(self.prompts, "[Browser] Prompts")
|
||||
return True
|
||||
|
||||
def get_input_form(self) -> dict[str, dict]:
|
||||
return {
|
||||
"prompts": {"type": "text", "name": "Prompts"},
|
||||
"upload_sources": {"type": "line", "name": "Upload sources"},
|
||||
}
|
||||
|
||||
|
||||
class Browser(ComponentBase, ABC):
|
||||
component_name = "Browser"
|
||||
|
||||
def get_input_elements(self) -> dict[str, dict]:
|
||||
text_parts = [
|
||||
str(self._param.prompts or ""),
|
||||
json.dumps(self._param.upload_sources, ensure_ascii=False),
|
||||
]
|
||||
return self.get_input_elements_from_text("\n".join(text_parts))
|
||||
|
||||
def _resolve_param_value(self, value: Any) -> Any:
|
||||
if isinstance(value, str):
|
||||
direct_ref = value.strip()
|
||||
if direct_ref.startswith("{") and direct_ref.endswith("}") and self._canvas.is_reff(direct_ref):
|
||||
return self._canvas.get_variable_value(direct_ref)
|
||||
return value
|
||||
return value
|
||||
|
||||
def _extract_ids(self, value: Any) -> list[str]:
|
||||
ids: list[str] = []
|
||||
value = self._resolve_param_value(value)
|
||||
|
||||
def collect(item: Any):
|
||||
if item is None:
|
||||
return
|
||||
if isinstance(item, str):
|
||||
token = item.strip()
|
||||
if not token:
|
||||
return
|
||||
if token.startswith("{") and token.endswith("}") and self._canvas.is_reff(token):
|
||||
collect(self._canvas.get_variable_value(token))
|
||||
return
|
||||
if token.startswith("[") and token.endswith("]"):
|
||||
try:
|
||||
parsed = json.loads(token)
|
||||
collect(parsed)
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
if self._is_http_url(token):
|
||||
ids.append(token)
|
||||
return
|
||||
if "," in token:
|
||||
for part in token.split(","):
|
||||
collect(part)
|
||||
return
|
||||
ids.append(token)
|
||||
return
|
||||
if isinstance(item, dict):
|
||||
for k in ("file_id", "id", "url", "value"):
|
||||
if k in item:
|
||||
collect(item[k])
|
||||
return
|
||||
for v in item.values():
|
||||
collect(v)
|
||||
return
|
||||
if isinstance(item, (list, tuple, set)):
|
||||
for v in item:
|
||||
collect(v)
|
||||
return
|
||||
token = str(item).strip()
|
||||
if token:
|
||||
ids.append(token)
|
||||
|
||||
collect(value)
|
||||
deduped: list[str] = []
|
||||
visited = set()
|
||||
for item in ids:
|
||||
if item in visited:
|
||||
continue
|
||||
visited.add(item)
|
||||
deduped.append(item)
|
||||
return deduped
|
||||
|
||||
@staticmethod
|
||||
def _is_http_url(value: str) -> bool:
|
||||
token = str(value or "").strip()
|
||||
if not token:
|
||||
return False
|
||||
parsed = urlparse(token)
|
||||
return parsed.scheme in {"http", "https"} and bool(parsed.netloc)
|
||||
|
||||
@staticmethod
|
||||
def _extract_url_filename(url: str, headers: Any) -> str:
|
||||
content_disposition = str(getattr(headers, "get", lambda *_args, **_kwargs: "")("Content-Disposition", "") or "")
|
||||
if content_disposition:
|
||||
# Prefer RFC 5987 encoded filename*=UTF-8''... when present.
|
||||
m = re.search(r"filename\*\s*=\s*(?:UTF-8''|utf-8'')?([^;]+)", content_disposition)
|
||||
if m:
|
||||
name = unquote(m.group(1).strip().strip('"'))
|
||||
if name:
|
||||
return os.path.basename(name)
|
||||
m = re.search(r'filename\s*=\s*"([^"]+)"', content_disposition)
|
||||
if m:
|
||||
name = m.group(1).strip()
|
||||
if name:
|
||||
return os.path.basename(name)
|
||||
m = re.search(r"filename\s*=\s*([^;]+)", content_disposition)
|
||||
if m:
|
||||
name = m.group(1).strip().strip('"')
|
||||
if name:
|
||||
return os.path.basename(name)
|
||||
|
||||
parsed = urlparse(url)
|
||||
raw_name = os.path.basename(parsed.path or "")
|
||||
name = unquote(raw_name).strip()
|
||||
if name:
|
||||
return name
|
||||
return f"url_file_{get_uuid()[:8]}.bin"
|
||||
|
||||
@staticmethod
|
||||
def _resolve_upload_url_max_bytes() -> int:
|
||||
raw = str(os.getenv("RAGFLOW_BROWSER_UPLOAD_URL_MAX_BYTES", "") or "").strip()
|
||||
default_max_bytes = 100 * 1024 * 1024
|
||||
if not raw:
|
||||
return default_max_bytes
|
||||
try:
|
||||
parsed = int(raw)
|
||||
return parsed if parsed > 0 else default_max_bytes
|
||||
except (TypeError, ValueError):
|
||||
return default_max_bytes
|
||||
|
||||
@staticmethod
|
||||
def _restore_env_var(key: str, value: str | None):
|
||||
if value is None:
|
||||
os.environ.pop(key, None)
|
||||
return
|
||||
os.environ[key] = value
|
||||
|
||||
def _prepare_upload_url_file(self, url: str, upload_dir: str) -> dict[str, Any] | None:
|
||||
max_bytes = self._resolve_upload_url_max_bytes()
|
||||
local_path = ""
|
||||
local_name = ""
|
||||
total_size = 0
|
||||
try:
|
||||
req = Request(url, headers={"User-Agent": "RAGFlow-Browser-Node/1.0"})
|
||||
with urlopen(req, timeout=30) as response:
|
||||
local_name = self._extract_url_filename(url, response.headers)
|
||||
|
||||
local_path = os.path.join(upload_dir, local_name)
|
||||
index = 1
|
||||
while os.path.exists(local_path):
|
||||
stem, ext = os.path.splitext(local_name)
|
||||
local_path = os.path.join(upload_dir, f"{stem}_{index}{ext}")
|
||||
index += 1
|
||||
|
||||
with open(local_path, "wb") as f:
|
||||
while True:
|
||||
chunk = response.read(1024 * 1024)
|
||||
if not chunk:
|
||||
break
|
||||
total_size += len(chunk)
|
||||
if total_size > max_bytes:
|
||||
raise ValueError(f"upload url file exceeds max size limit: {max_bytes}")
|
||||
f.write(chunk)
|
||||
except (HTTPError, URLError, OSError, TimeoutError, ValueError) as e:
|
||||
if local_path and os.path.exists(local_path):
|
||||
try:
|
||||
os.remove(local_path)
|
||||
except OSError:
|
||||
pass
|
||||
logging.warning("Browser failed to fetch upload url. url=%s, error=%s", url, e)
|
||||
return None
|
||||
|
||||
if total_size <= 0:
|
||||
if local_path and os.path.exists(local_path):
|
||||
try:
|
||||
os.remove(local_path)
|
||||
except OSError:
|
||||
pass
|
||||
logging.warning("Browser upload url returned empty content: %s", url)
|
||||
return None
|
||||
|
||||
return {
|
||||
"file_id": "",
|
||||
"name": local_name,
|
||||
"size": total_size,
|
||||
"local_path": local_path,
|
||||
"source_url": url,
|
||||
}
|
||||
|
||||
def _resolve_text(self, raw_text: Any) -> str:
|
||||
text = str(self._resolve_param_value(raw_text) or "")
|
||||
vars_map = self.get_input_elements_from_text(text)
|
||||
kv = {}
|
||||
for key, meta in vars_map.items():
|
||||
val = meta.get("value", "")
|
||||
if isinstance(val, str):
|
||||
kv[key] = val
|
||||
else:
|
||||
kv[key] = json.dumps(val, ensure_ascii=False)
|
||||
return self.string_format(text, kv)
|
||||
|
||||
@staticmethod
|
||||
def _as_model_config_dict(cfg_obj: Any) -> dict[str, Any]:
|
||||
if cfg_obj is None:
|
||||
return {}
|
||||
if isinstance(cfg_obj, dict):
|
||||
return cfg_obj
|
||||
if hasattr(cfg_obj, "to_dict") and callable(cfg_obj.to_dict):
|
||||
try:
|
||||
result = cfg_obj.to_dict()
|
||||
return result if isinstance(result, dict) else {}
|
||||
except (AttributeError, TypeError, ValueError):
|
||||
return {}
|
||||
result = {}
|
||||
for key in ("model", "model_name", "llm_name", "llm_factory", "api_key", "base_url", "api_base", "temperature"):
|
||||
val = getattr(cfg_obj, key, None)
|
||||
if val not in (None, ""):
|
||||
result[key] = val
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _error_chain(exc: Exception) -> str:
|
||||
parts = []
|
||||
cur = exc
|
||||
depth = 0
|
||||
while cur is not None and depth < 6:
|
||||
parts.append(f"{type(cur).__name__}: {cur}")
|
||||
cur = cur.__cause__ or cur.__context__
|
||||
depth += 1
|
||||
return " <- ".join(parts)
|
||||
|
||||
@staticmethod
|
||||
def _resolve_browser_executable() -> str:
|
||||
explicit_candidates = [
|
||||
os.getenv("BROWSER_USE_EXECUTABLE_PATH", "").strip(),
|
||||
os.getenv("BROWSER_USE_BROWSER_BINARY_PATH", "").strip(),
|
||||
os.getenv("BROWSER_USE_CHROME_BINARY_PATH", "").strip(),
|
||||
]
|
||||
for explicit in explicit_candidates:
|
||||
if explicit and os.path.isfile(explicit) and os.access(explicit, os.X_OK):
|
||||
return explicit
|
||||
candidates = [
|
||||
"/opt/chrome/chrome",
|
||||
"/usr/local/bin/chrome",
|
||||
"/usr/local/bin/google-chrome",
|
||||
"/usr/bin/google-chrome",
|
||||
"/usr/bin/google-chrome-stable",
|
||||
"/usr/bin/chromium",
|
||||
"/usr/bin/chromium-browser",
|
||||
]
|
||||
for path in candidates:
|
||||
if os.path.isfile(path) and os.access(path, os.X_OK):
|
||||
return path
|
||||
for cmd in ("chrome", "google-chrome", "google-chrome-stable", "chromium", "chromium-browser"):
|
||||
path = shutil.which(cmd)
|
||||
if path and os.path.isfile(path) and os.access(path, os.X_OK):
|
||||
return path
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def _normalize_model_name(model: Any) -> str:
|
||||
name = str(model or "").strip()
|
||||
if not name:
|
||||
return ""
|
||||
if name.startswith("bu-") or name.startswith("browser-use/"):
|
||||
return name
|
||||
if "@" in name:
|
||||
# RAGFlow model aliases may include provider suffix, e.g. qwen3.5-flash@Tongyi-Qianwen.
|
||||
# browser-use OpenAI-compatible adapters need the pure model name.
|
||||
name = name.split("@", 1)[0].strip()
|
||||
return name
|
||||
|
||||
@staticmethod
|
||||
def _safe_path_segment(value: Any) -> str:
|
||||
token = str(value or "").strip()
|
||||
if not token:
|
||||
return "unknown"
|
||||
token = re.sub(r"[^A-Za-z0-9._-]+", "_", token)
|
||||
return token.strip("._-") or "unknown"
|
||||
|
||||
def _resolve_persistent_profile_dir(self) -> str:
|
||||
root = os.path.join(tempfile.gettempdir(), "ragflow_browser_use_profiles")
|
||||
tenant = self._safe_path_segment(self._canvas.get_tenant_id())
|
||||
raw_canvas_id = getattr(self._canvas, "_id", "")
|
||||
if not raw_canvas_id:
|
||||
graph_text = json.dumps(
|
||||
self._canvas.dsl.get("graph", {}),
|
||||
sort_keys=True,
|
||||
ensure_ascii=False,
|
||||
)
|
||||
raw_canvas_id = (
|
||||
f"dsl_{hashlib.sha1(graph_text.encode('utf-8')).hexdigest()[:12]}"
|
||||
)
|
||||
canvas_id = self._safe_path_segment(raw_canvas_id)
|
||||
node_id = self._safe_path_segment(self._id)
|
||||
return os.path.join(root, tenant, canvas_id, node_id)
|
||||
|
||||
def _should_persist_session(self) -> bool:
|
||||
return bool(self._param.persist_session)
|
||||
|
||||
def _infer_provider_name(self, cfg: dict[str, Any]) -> str:
|
||||
provider = str(cfg.get("llm_factory") or "").strip()
|
||||
if provider:
|
||||
return provider
|
||||
llm_id = str(self._param.llm_id or "")
|
||||
if "@" in llm_id:
|
||||
return llm_id.split("@", 1)[1].strip()
|
||||
return ""
|
||||
|
||||
def _resolve_openai_compatible_base_url(self, cfg: dict[str, Any]) -> str:
|
||||
explicit = str(cfg.get("base_url") or cfg.get("api_base") or "").strip()
|
||||
if explicit:
|
||||
return explicit
|
||||
|
||||
provider = self._infer_provider_name(cfg)
|
||||
fallback = str(FACTORY_DEFAULT_BASE_URL.get(provider, "")).strip()
|
||||
return fallback if fallback else ""
|
||||
|
||||
def _build_browser_llm(self):
|
||||
from browser_use.llm import ChatBrowserUse, ChatOpenAI
|
||||
|
||||
chat_model_config = get_model_config_by_type_and_name(
|
||||
self._canvas.get_tenant_id(),
|
||||
TenantLLMService.llm_id2llm_type(self._param.llm_id),
|
||||
self._param.llm_id,
|
||||
)
|
||||
cfg = self._as_model_config_dict(chat_model_config)
|
||||
model_name = self._normalize_model_name(cfg.get("model_name") or cfg.get("model") or self._param.llm_id)
|
||||
if not model_name:
|
||||
raise ValueError(f"Invalid model config for Browser llm_id={self._param.llm_id}")
|
||||
base_url = self._resolve_openai_compatible_base_url(cfg)
|
||||
|
||||
# ChatBrowserUse only supports bu-* models. For tenant models, use OpenAI-compatible adapter.
|
||||
if model_name.startswith("bu-") or model_name.startswith("browser-use/"):
|
||||
llm_kwargs = {
|
||||
"model": model_name,
|
||||
"api_key": cfg.get("api_key"),
|
||||
"base_url": base_url,
|
||||
"temperature": self._param.temperature,
|
||||
"max_retries": self._param.max_retries,
|
||||
}
|
||||
llm_kwargs = {k: v for k, v in llm_kwargs.items() if v not in (None, "")}
|
||||
return ChatBrowserUse(**llm_kwargs)
|
||||
|
||||
llm_kwargs = {
|
||||
"model": model_name,
|
||||
"api_key": cfg.get("api_key"),
|
||||
"base_url": base_url,
|
||||
"temperature": self._param.temperature,
|
||||
"max_retries": self._param.max_retries,
|
||||
}
|
||||
llm_kwargs = {k: v for k, v in llm_kwargs.items() if v not in (None, "")}
|
||||
return ChatOpenAI(**llm_kwargs)
|
||||
|
||||
async def _run_browser_use_async(
|
||||
self,
|
||||
task_text: str,
|
||||
download_dir: str,
|
||||
available_file_paths: list[str] | None = None,
|
||||
profile_dir: str | None = None,
|
||||
):
|
||||
from browser_use import Agent as BrowserUseAgent, Browser as BrowserUseBrowser
|
||||
|
||||
llm = self._build_browser_llm()
|
||||
# NOTE:
|
||||
# _invoke() uses asyncio.run(), which creates a fresh event loop per task run.
|
||||
# Reusing a Browser object created by a previous loop can deadlock/timestamp out
|
||||
# in browser-use watchdog handlers on subsequent runs.
|
||||
# We keep persistent user_data_dir for session continuity, but we do not keep
|
||||
# browser instances alive across runs.
|
||||
available_file_paths = available_file_paths or []
|
||||
agent_kwargs: dict[str, Any] = {
|
||||
"task": task_text,
|
||||
"llm": llm,
|
||||
"available_file_paths": available_file_paths,
|
||||
}
|
||||
browser_obj = None
|
||||
previous_disable_extensions = os.environ.get("BROWSER_USE_DISABLE_EXTENSIONS")
|
||||
previous_browser_binary_path = os.environ.get("BROWSER_USE_BROWSER_BINARY_PATH")
|
||||
|
||||
try:
|
||||
enable_default_extensions = bool(self._param.enable_default_extensions)
|
||||
if not enable_default_extensions:
|
||||
os.environ["BROWSER_USE_DISABLE_EXTENSIONS"] = "1"
|
||||
else:
|
||||
os.environ.pop("BROWSER_USE_DISABLE_EXTENSIONS", None)
|
||||
|
||||
executable_path = self._resolve_browser_executable()
|
||||
browser_kwargs = {
|
||||
"headless": self._param.headless,
|
||||
"downloads_path": download_dir,
|
||||
# Docker often runs as root without user namespaces; disable sandbox by default.
|
||||
"chromium_sandbox": bool(self._param.chromium_sandbox),
|
||||
# Disable runtime extension download by default for intranet/offline environments.
|
||||
# Enable only when explicitly required and extensions are pre-cached.
|
||||
"enable_default_extensions": enable_default_extensions,
|
||||
}
|
||||
if executable_path:
|
||||
browser_kwargs["executable_path"] = executable_path
|
||||
# Keep browser-use watchdog fallback in sync with our resolved path.
|
||||
os.environ["BROWSER_USE_BROWSER_BINARY_PATH"] = executable_path
|
||||
else:
|
||||
logging.warning(
|
||||
"Browser no local browser executable found. "
|
||||
"Set BROWSER_USE_EXECUTABLE_PATH or preinstall chromium in image to avoid runtime playwright install."
|
||||
)
|
||||
if profile_dir:
|
||||
browser_kwargs["user_data_dir"] = profile_dir
|
||||
# browser-use expects profile_directory to be a profile name
|
||||
# such as "Default" / "Profile 1", not an absolute path.
|
||||
browser_kwargs["profile_directory"] = "Default"
|
||||
|
||||
browser_obj = BrowserUseBrowser(**browser_kwargs)
|
||||
agent_kwargs["browser"] = browser_obj
|
||||
except (OSError, RuntimeError, TypeError, ValueError) as e:
|
||||
logging.warning("Browser browser context customization skipped: %s", e)
|
||||
|
||||
agent = BrowserUseAgent(**agent_kwargs)
|
||||
|
||||
history = None
|
||||
run_fn = getattr(agent, "run", None)
|
||||
if run_fn is None:
|
||||
raise RuntimeError("browser-use Agent does not provide run().")
|
||||
|
||||
run_kwargs = {"max_steps": self._param.max_steps}
|
||||
try:
|
||||
if inspect.iscoroutinefunction(run_fn):
|
||||
history = await run_fn(**run_kwargs)
|
||||
else:
|
||||
history = await asyncio.to_thread(run_fn, **run_kwargs)
|
||||
except Exception as e:
|
||||
logging.error("Browser agent.run failed. error_chain=%s", self._error_chain(e))
|
||||
logging.exception("Browser agent.run traceback")
|
||||
raise
|
||||
finally:
|
||||
if browser_obj:
|
||||
close_fn = getattr(browser_obj, "close", None)
|
||||
if close_fn:
|
||||
try:
|
||||
if inspect.iscoroutinefunction(close_fn):
|
||||
await close_fn()
|
||||
else:
|
||||
await asyncio.to_thread(close_fn)
|
||||
except Exception as close_err:
|
||||
logging.warning("Browser failed to close browser object cleanly: %s", close_err)
|
||||
self._restore_env_var("BROWSER_USE_DISABLE_EXTENSIONS", previous_disable_extensions)
|
||||
self._restore_env_var("BROWSER_USE_BROWSER_BINARY_PATH", previous_browser_binary_path)
|
||||
|
||||
return history
|
||||
|
||||
def _prepare_upload_files(self, upload_dir: str) -> list[dict[str, Any]]:
|
||||
upload_refs = self._extract_ids(self._param.upload_sources)
|
||||
prepared = []
|
||||
for file_ref in upload_refs:
|
||||
if self._is_http_url(file_ref):
|
||||
prepared_url_file = self._prepare_upload_url_file(file_ref, upload_dir)
|
||||
if prepared_url_file:
|
||||
prepared.append(prepared_url_file)
|
||||
continue
|
||||
|
||||
file_id = file_ref
|
||||
exists, file = FileService.get_by_id(file_id)
|
||||
if not exists:
|
||||
logging.warning("Browser upload file_id not found: %s", file_id)
|
||||
continue
|
||||
try:
|
||||
blob = settings.STORAGE_IMPL.get(file.parent_id, file.location)
|
||||
if not blob:
|
||||
logging.warning("Browser upload blob not found: %s", file_id)
|
||||
continue
|
||||
local_name = os.path.basename(file.location) if file.location else (file.name or f"{file_id}.bin")
|
||||
local_path = os.path.join(upload_dir, local_name)
|
||||
index = 1
|
||||
while os.path.exists(local_path):
|
||||
stem, ext = os.path.splitext(local_name)
|
||||
local_path = os.path.join(upload_dir, f"{stem}_{index}{ext}")
|
||||
index += 1
|
||||
with open(local_path, "wb") as f:
|
||||
f.write(blob)
|
||||
except OSError as e:
|
||||
logging.warning("Browser failed to prepare upload file. file_id=%s, error=%s", file_id, e)
|
||||
continue
|
||||
except Exception as e:
|
||||
logging.warning("Browser failed to fetch upload blob. file_id=%s, error=%s", file_id, e)
|
||||
continue
|
||||
prepared.append(
|
||||
{
|
||||
"file_id": file.id,
|
||||
"name": file.name,
|
||||
"size": file.size,
|
||||
"local_path": local_path,
|
||||
}
|
||||
)
|
||||
return prepared
|
||||
|
||||
def _save_downloads(self, download_dir: str, parent_id: str) -> list[dict[str, Any]]:
|
||||
downloaded_files: list[dict[str, Any]] = []
|
||||
exists, folder = FileService.get_by_id(parent_id)
|
||||
if not exists or folder.type != FileType.FOLDER.value:
|
||||
raise ValueError(f"RAGFlow target folder does not exist or is not a folder: {parent_id}")
|
||||
tenant_id = self._canvas.get_tenant_id()
|
||||
storage_put = settings.STORAGE_IMPL.put
|
||||
storage_rm = getattr(settings.STORAGE_IMPL, "rm", None)
|
||||
insert_file = FileService.insert
|
||||
|
||||
for path in Path(download_dir).rglob("*"):
|
||||
if not path.is_file():
|
||||
continue
|
||||
try:
|
||||
if path.stat().st_size <= 0:
|
||||
continue
|
||||
blob = path.read_bytes()
|
||||
except OSError as e:
|
||||
logging.warning("Browser failed to read downloaded file. path=%s, error=%s", path, e)
|
||||
continue
|
||||
if not blob:
|
||||
continue
|
||||
display_name = ""
|
||||
blob_stored = False
|
||||
try:
|
||||
display_name = duplicate_name(FileService.query, name=path.name, parent_id=parent_id)
|
||||
storage_put(parent_id, display_name, blob)
|
||||
blob_stored = True
|
||||
file_data = {
|
||||
"id": get_uuid(),
|
||||
"parent_id": parent_id,
|
||||
"tenant_id": tenant_id,
|
||||
"created_by": tenant_id,
|
||||
"type": filename_type(display_name),
|
||||
"name": display_name,
|
||||
"location": display_name,
|
||||
"size": len(blob),
|
||||
}
|
||||
inserted = insert_file(file_data)
|
||||
downloaded_files.append(
|
||||
{
|
||||
"file_id": inserted.id,
|
||||
"name": inserted.name,
|
||||
"size": inserted.size,
|
||||
"parent_id": inserted.parent_id,
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
if blob_stored and callable(storage_rm):
|
||||
try:
|
||||
storage_rm(parent_id, display_name)
|
||||
except Exception as rollback_err:
|
||||
logging.warning(
|
||||
"Browser rollback stored download failed. path=%s, parent_id=%s, display_name=%s, error=%s",
|
||||
path,
|
||||
parent_id,
|
||||
display_name,
|
||||
rollback_err,
|
||||
)
|
||||
logging.error(
|
||||
"Browser failed to save download. path=%s, tenant_id=%s, parent_id=%s, display_name=%s, error=%s",
|
||||
path,
|
||||
tenant_id,
|
||||
parent_id,
|
||||
display_name,
|
||||
e,
|
||||
)
|
||||
continue
|
||||
return downloaded_files
|
||||
|
||||
@staticmethod
|
||||
def _extract_history_text(history: Any) -> str:
|
||||
if history is None:
|
||||
return ""
|
||||
|
||||
def pick_final_result(value: Any) -> str:
|
||||
if value is None:
|
||||
return ""
|
||||
if isinstance(value, str):
|
||||
return value.strip()
|
||||
if isinstance(value, (int, float, bool)):
|
||||
return str(value)
|
||||
return ""
|
||||
|
||||
# Only trust browser-use's explicit final_result API/property.
|
||||
final_result_fn = getattr(history, "final_result", None)
|
||||
if callable(final_result_fn):
|
||||
try:
|
||||
final_result_value = final_result_fn()
|
||||
return pick_final_result(final_result_value)
|
||||
except Exception:
|
||||
return ""
|
||||
return pick_final_result(final_result_fn)
|
||||
|
||||
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 20 * 60)))
|
||||
def _invoke(self, **kwargs):
|
||||
profile_dir = None
|
||||
persist_session = self._should_persist_session()
|
||||
try:
|
||||
user_prompt = self._resolve_text(kwargs.get("prompts", self._param.prompts))
|
||||
with tempfile.TemporaryDirectory(prefix="browser_use_upload_") as upload_dir, tempfile.TemporaryDirectory(
|
||||
prefix="browser_use_download_"
|
||||
) as download_dir:
|
||||
uploaded_files = self._prepare_upload_files(upload_dir)
|
||||
|
||||
upload_lines = [
|
||||
f"- file_id={item['file_id']}, name={item['name']}, local_path={item['local_path']}"
|
||||
for item in uploaded_files
|
||||
]
|
||||
task_text = user_prompt
|
||||
if upload_lines:
|
||||
task_text += (
|
||||
"\n\nYou can upload files from these local paths when operating web pages:\n"
|
||||
+ "\n".join(upload_lines)
|
||||
)
|
||||
|
||||
upload_local_paths = [item.get("local_path", "") for item in uploaded_files if item.get("local_path")]
|
||||
if persist_session:
|
||||
profile_dir = self._resolve_persistent_profile_dir()
|
||||
os.makedirs(profile_dir, exist_ok=True)
|
||||
else:
|
||||
try:
|
||||
profile_dir = tempfile.mkdtemp(prefix="browser_use_profile_")
|
||||
except OSError:
|
||||
profile_dir = None
|
||||
history = asyncio.run(
|
||||
self._run_browser_use_async(
|
||||
task_text, download_dir, upload_local_paths, profile_dir
|
||||
)
|
||||
)
|
||||
target_dir_id = FileService.get_root_folder(self._canvas.get_tenant_id())["id"]
|
||||
downloaded_files = self._save_downloads(download_dir, target_dir_id)
|
||||
|
||||
self.set_output("content", self._extract_history_text(history))
|
||||
self.set_output("downloaded_files", downloaded_files)
|
||||
return self.output()
|
||||
except Exception as e:
|
||||
logging.exception("Browser invoke failed")
|
||||
self.set_output("_ERROR", str(e))
|
||||
return self.output()
|
||||
finally:
|
||||
if profile_dir and not persist_session:
|
||||
shutil.rmtree(profile_dir, ignore_errors=True)
|
||||
|
||||
def thoughts(self) -> str:
|
||||
return "Planning and executing browser actions..."
|
||||
Reference in New Issue
Block a user