From c29335cbff6aabca0148172c67c730740ed6f766 Mon Sep 17 00:00:00 2001 From: Magicbook1108 Date: Thu, 7 May 2026 21:23:13 +0800 Subject: [PATCH] Feat: support local provider for code exec component & remove some outdated models (#14637) ### What problem does this PR solve? Feat: support local provider for code exec component & remove some outdated models ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- agent/sandbox/client.py | 113 +++++-- agent/sandbox/providers/__init__.py | 6 +- .../providers/aliyun_codeinterpreter.py | 74 +---- agent/sandbox/providers/base.py | 6 +- agent/sandbox/providers/local.py | 296 ++++++++++++++++++ agent/sandbox/result_protocol.py | 85 +++++ agent/tools/code_exec.py | 22 +- conf/llm_factories.json | 28 -- docker/.env | 74 ++++- .../agent/sandbox/test_local_provider.py | 98 ++++++ 10 files changed, 659 insertions(+), 143 deletions(-) create mode 100644 agent/sandbox/providers/local.py create mode 100644 agent/sandbox/result_protocol.py create mode 100644 test/unit_test/agent/sandbox/test_local_provider.py diff --git a/agent/sandbox/client.py b/agent/sandbox/client.py index 4d49ae734c..9ca51cc8e3 100644 --- a/agent/sandbox/client.py +++ b/agent/sandbox/client.py @@ -23,11 +23,12 @@ with the configured sandbox provider. import json import logging +import os from typing import Dict, Any, Optional from api.db.services.system_settings_service import SystemSettingsService from agent.sandbox.providers import ProviderManager -from agent.sandbox.providers.base import ExecutionResult +from agent.sandbox.providers.base import ExecutionResult, SandboxProviderConfigError logger = logging.getLogger(__name__) @@ -59,8 +60,8 @@ def _load_provider_from_settings() -> None: """ Load sandbox provider from system settings and configure the provider manager. - This function reads the system settings to determine which provider is active - and initializes it with the appropriate configuration. + This function resolves the active provider type, then loads configuration + from system settings with environment overrides for that provider. """ global _provider_manager @@ -68,41 +69,27 @@ def _load_provider_from_settings() -> None: return try: - # Get active provider type - provider_type_settings = SystemSettingsService.get_by_name("sandbox.provider_type") - if not provider_type_settings: - raise RuntimeError( - "Sandbox provider type not configured. Please set 'sandbox.provider_type' in system settings." - ) - provider_type = provider_type_settings[0].value - - # Get provider configuration - provider_config_settings = SystemSettingsService.get_by_name(f"sandbox.{provider_type}") - - if not provider_config_settings: - logger.warning(f"No configuration found for provider: {provider_type}") - config = {} - else: - try: - config = json.loads(provider_config_settings[0].value) - except json.JSONDecodeError as e: - logger.error(f"Failed to parse sandbox config for {provider_type}: {e}") - config = {} + provider_type, provider_type_from_env = _resolve_provider_type() + config = _load_provider_config(provider_type) # Import and instantiate the provider from agent.sandbox.providers import ( SelfManagedProvider, AliyunCodeInterpreterProvider, E2BProvider, + LocalProvider, ) provider_classes = { "self_managed": SelfManagedProvider, "aliyun_codeinterpreter": AliyunCodeInterpreterProvider, "e2b": E2BProvider, + "local": LocalProvider, } if provider_type not in provider_classes: + if provider_type_from_env: + raise SandboxProviderConfigError(f"Unknown sandbox provider type: {provider_type}") logger.error(f"Unknown provider type: {provider_type}") return @@ -111,19 +98,97 @@ def _load_provider_from_settings() -> None: # Initialize the provider if not provider.initialize(config): - logger.error(f"Failed to initialize sandbox provider: {provider_type}. Config keys: {list(config.keys())}") + message = f"Failed to initialize sandbox provider: {provider_type}. Config keys: {list(config.keys())}" + if provider_type == "local" or provider_type_from_env: + raise SandboxProviderConfigError(message) + logger.error(message) return # Set the active provider _provider_manager.set_provider(provider_type, provider) logger.info(f"Sandbox provider '{provider_type}' initialized successfully") + except SandboxProviderConfigError: + raise except Exception as e: logger.error(f"Failed to load sandbox provider from settings: {e}") import traceback traceback.print_exc() +def _load_provider_config_from_settings(provider_type: str) -> Dict[str, Any]: + provider_config_settings = SystemSettingsService.get_by_name(f"sandbox.{provider_type}") + if not provider_config_settings: + logger.warning(f"No configuration found for provider: {provider_type}") + return {} + + try: + return json.loads(provider_config_settings[0].value) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse sandbox config for {provider_type}: {e}") + return {} + + +def _resolve_provider_type() -> tuple[str, bool]: + provider_type = os.environ.get("SANDBOX_PROVIDER_TYPE", "").strip() + if provider_type: + return provider_type, True + + provider_type_settings = SystemSettingsService.get_by_name("sandbox.provider_type") + if not provider_type_settings: + raise RuntimeError( + "Sandbox provider type not configured. Please set 'sandbox.provider_type' in system settings." + ) + return provider_type_settings[0].value, False + + +def _load_provider_config(provider_type: str) -> Dict[str, Any]: + config = _load_provider_config_from_settings(provider_type) + env_config = _load_provider_config_from_env(provider_type) + if env_config: + config.update(env_config) + return config + + +def _load_provider_config_from_env(provider_type: str) -> Dict[str, Any]: + if provider_type == "local": + return _load_local_provider_config_from_env() + if provider_type == "self_managed": + return _load_self_managed_provider_config_from_env() + return {} + + +def _load_local_provider_config_from_env() -> Dict[str, Any]: + env_to_config = { + "SANDBOX_LOCAL_PYTHON_BIN": "python_bin", + "SANDBOX_LOCAL_NODE_BIN": "node_bin", + "SANDBOX_LOCAL_WORK_DIR": "work_dir", + "SANDBOX_LOCAL_TIMEOUT": "timeout", + "SANDBOX_LOCAL_MAX_MEMORY_MB": "max_memory_mb", + "SANDBOX_LOCAL_MAX_OUTPUT_BYTES": "max_output_bytes", + "SANDBOX_LOCAL_MAX_ARTIFACTS": "max_artifacts", + "SANDBOX_LOCAL_MAX_ARTIFACT_BYTES": "max_artifact_bytes", + } + config = {} + for env_name, config_name in env_to_config.items(): + if env_name in os.environ: + config[config_name] = os.environ[env_name] + return config + + +def _load_self_managed_provider_config_from_env() -> Dict[str, Any]: + host = os.environ.get("SANDBOX_HOST", "").strip() + port = os.environ.get("SANDBOX_EXECUTOR_MANAGER_PORT", "").strip() + pool_size = os.environ.get("SANDBOX_EXECUTOR_MANAGER_POOL_SIZE", "").strip() + + config = {} + if host: + config["endpoint"] = f"http://{host}:{port or '9385'}" + if pool_size: + config["pool_size"] = pool_size + return config + + def reload_provider() -> None: """ Reload the sandbox provider from system settings. diff --git a/agent/sandbox/providers/__init__.py b/agent/sandbox/providers/__init__.py index 7be1463b9c..e7cfc2ddc9 100644 --- a/agent/sandbox/providers/__init__.py +++ b/agent/sandbox/providers/__init__.py @@ -24,20 +24,24 @@ This package contains: - aliyun_codeinterpreter.py: Aliyun Code Interpreter provider implementation Official Documentation: https://help.aliyun.com/zh/functioncompute/fc/sandbox-sandbox-code-interepreter - e2b.py: E2B provider implementation +- local.py: Local process provider implementation """ -from .base import SandboxProvider, SandboxInstance, ExecutionResult +from .base import SandboxProvider, SandboxInstance, ExecutionResult, SandboxProviderConfigError from .manager import ProviderManager from .self_managed import SelfManagedProvider from .aliyun_codeinterpreter import AliyunCodeInterpreterProvider from .e2b import E2BProvider +from .local import LocalProvider __all__ = [ "SandboxProvider", "SandboxInstance", "ExecutionResult", + "SandboxProviderConfigError", "ProviderManager", "SelfManagedProvider", "AliyunCodeInterpreterProvider", "E2BProvider", + "LocalProvider", ] diff --git a/agent/sandbox/providers/aliyun_codeinterpreter.py b/agent/sandbox/providers/aliyun_codeinterpreter.py index 8ee99ed1ec..bbec2a2682 100644 --- a/agent/sandbox/providers/aliyun_codeinterpreter.py +++ b/agent/sandbox/providers/aliyun_codeinterpreter.py @@ -30,7 +30,6 @@ https://api.aliyun.com/api/AgentRun/2025-09-10/CreateSandbox?lang=PYTHON import logging import os import time -import base64 import json from typing import Dict, Any, List, Optional from datetime import datetime, timezone @@ -39,10 +38,10 @@ from agentrun.sandbox import TemplateType, CodeLanguage, Template, TemplateInput from agentrun.utils.config import Config from agentrun.utils.exception import ServerError +from agent.sandbox.result_protocol import build_javascript_wrapper, build_python_wrapper, extract_structured_result from .base import SandboxProvider, SandboxInstance, ExecutionResult logger = logging.getLogger(__name__) -RESULT_MARKER_PREFIX = "__RAGFLOW_RESULT__:" class AliyunCodeInterpreterProvider(SandboxProvider): @@ -234,9 +233,9 @@ class AliyunCodeInterpreterProvider(SandboxProvider): # Matches self_managed provider behavior: call main(**arguments) args_json = json.dumps(arguments or {}) wrapped_code = ( - self._build_python_wrapper(code, args_json) + build_python_wrapper(code, args_json) if normalized_lang == "python" - else self._build_javascript_wrapper(code, args_json) + else build_javascript_wrapper(code, args_json) ) logger.debug(f"Aliyun Code Interpreter: Wrapped code (first 200 chars): {wrapped_code[:200]}") @@ -284,7 +283,7 @@ class AliyunCodeInterpreterProvider(SandboxProvider): stdout = "\n".join(stdout_parts) stderr = "\n".join(stderr_parts) - stdout, structured_result = self._extract_structured_result(stdout) + stdout, structured_result = extract_structured_result(stdout) logger.info(f"Aliyun Code Interpreter: stdout length={len(stdout)}, stderr length={len(stderr)}, exit_code={exit_code}") if stdout: @@ -364,71 +363,6 @@ class AliyunCodeInterpreterProvider(SandboxProvider): # If we get any response (even an error), the service is reachable return "connection" not in str(e).lower() - @staticmethod - def _build_python_wrapper(code: str, args_json: str) -> str: - marker = RESULT_MARKER_PREFIX - return f'''{code} - -if __name__ == "__main__": - import base64 - import json - - result = main(**{args_json}) - payload = json.dumps({{"present": True, "value": result, "type": "json"}}, ensure_ascii=False, separators=(",", ":")) - print("{marker}" + base64.b64encode(payload.encode("utf-8")).decode("ascii")) -''' - - @staticmethod - def _build_javascript_wrapper(code: str, args_json: str) -> str: - marker = RESULT_MARKER_PREFIX - return f'''{code} - -const __ragflowArgs = {args_json}; - -(async () => {{ - try {{ - const output = await Promise.resolve(main(__ragflowArgs)); - if (typeof output === 'undefined') {{ - throw new Error('main() must return a value. Use null for an empty result.'); - }} - const payload = JSON.stringify({{ present: true, value: output, type: 'json' }}); - if (typeof payload === 'undefined') {{ - throw new Error('main() returned a non-JSON-serializable value.'); - }} - console.log('{marker}' + Buffer.from(payload, 'utf8').toString('base64')); - }} catch (err) {{ - console.error(err instanceof Error ? err.stack || err.message : String(err)); - }} -}})(); -''' - - @staticmethod - def _extract_structured_result(stdout: str) -> tuple[str, Dict[str, Any]]: - if not stdout: - return "", {} - - cleaned_lines: list[str] = [] - structured_result: Dict[str, Any] = {} - - for line in str(stdout).splitlines(): - if line.startswith(RESULT_MARKER_PREFIX): - payload_b64 = line[len(RESULT_MARKER_PREFIX) :].strip() - if not payload_b64: - continue - try: - payload = base64.b64decode(payload_b64).decode("utf-8") - structured_result = json.loads(payload) - except Exception as exc: - logger.warning(f"Aliyun Code Interpreter: failed to decode structured result marker: {exc}") - cleaned_lines.append(line) - continue - cleaned_lines.append(line) - - cleaned_stdout = "\n".join(cleaned_lines) - if stdout.endswith("\n") and cleaned_stdout and not cleaned_stdout.endswith("\n"): - cleaned_stdout += "\n" - return cleaned_stdout, structured_result - def get_supported_languages(self) -> List[str]: """ Get list of supported programming languages. diff --git a/agent/sandbox/providers/base.py b/agent/sandbox/providers/base.py index c21b583e02..8f9c04aaa4 100644 --- a/agent/sandbox/providers/base.py +++ b/agent/sandbox/providers/base.py @@ -26,6 +26,10 @@ from dataclasses import dataclass from typing import Dict, Any, Optional, List +class SandboxProviderConfigError(Exception): + """Raised when the selected provider is explicitly configured but unusable.""" + + @dataclass class SandboxInstance: """Represents a sandbox execution instance""" @@ -209,4 +213,4 @@ class SandboxProvider(ABC): >>> return True, None """ # Default implementation: no custom validation - return True, None \ No newline at end of file + return True, None diff --git a/agent/sandbox/providers/local.py b/agent/sandbox/providers/local.py new file mode 100644 index 0000000000..b8057fa5b4 --- /dev/null +++ b/agent/sandbox/providers/local.py @@ -0,0 +1,296 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import base64 +import json +import mimetypes +import os +import shutil +import signal +import subprocess +import time +import uuid +from pathlib import Path +from typing import Any, Dict, List, Optional + +from agent.sandbox.result_protocol import build_javascript_wrapper, build_python_wrapper, extract_structured_result +from .base import ExecutionResult, SandboxInstance, SandboxProvider, SandboxProviderConfigError + + +ALLOWED_ARTIFACT_EXTENSIONS = { + ".csv", + ".html", + ".jpeg", + ".jpg", + ".json", + ".pdf", + ".png", + ".svg", +} + + +def _env_enabled(name: str) -> bool: + return os.environ.get(name, "").strip().lower() in {"1", "true", "yes", "on"} + + +class LocalProvider(SandboxProvider): + """ + Execute code as a local child process. + + This provider is intentionally gated by SANDBOX_LOCAL_ENABLED because it is + not a sandbox boundary. Use a low-privilege runtime account. + """ + + def __init__(self): + self.python_bin = "python3" + self.node_bin = "node" + self.work_dir = Path("/tmp/ragflow-codeexec") + self.timeout = 30 + self.max_memory_mb = 512 + self.max_output_bytes = 1024 * 1024 + self.max_artifacts = 20 + self.max_artifact_bytes = 10 * 1024 * 1024 + self._initialized = False + self._instances: dict[str, Path] = {} + + def initialize(self, config: Dict[str, Any]) -> bool: + if not _env_enabled("SANDBOX_LOCAL_ENABLED"): + raise SandboxProviderConfigError("Local code execution is disabled. Set SANDBOX_LOCAL_ENABLED=true to enable it.") + + self.python_bin = str(self._resolve_config_value(config, "python_bin", "SANDBOX_LOCAL_PYTHON_BIN", "python3")) + self.node_bin = str(self._resolve_config_value(config, "node_bin", "SANDBOX_LOCAL_NODE_BIN", "node")) + self.work_dir = Path(self._resolve_config_value(config, "work_dir", "SANDBOX_LOCAL_WORK_DIR", "/tmp/ragflow-codeexec")).resolve() + self.timeout = int(self._resolve_config_value(config, "timeout", "SANDBOX_LOCAL_TIMEOUT", 30)) + self.max_memory_mb = int(self._resolve_config_value(config, "max_memory_mb", "SANDBOX_LOCAL_MAX_MEMORY_MB", 512)) + self.max_output_bytes = int(self._resolve_config_value(config, "max_output_bytes", "SANDBOX_LOCAL_MAX_OUTPUT_BYTES", 1024 * 1024)) + self.max_artifacts = int(self._resolve_config_value(config, "max_artifacts", "SANDBOX_LOCAL_MAX_ARTIFACTS", 20)) + self.max_artifact_bytes = int(self._resolve_config_value(config, "max_artifact_bytes", "SANDBOX_LOCAL_MAX_ARTIFACT_BYTES", 10 * 1024 * 1024)) + + self._validate_limits() + self.work_dir.mkdir(parents=True, exist_ok=True, mode=0o700) + self._initialized = True + return True + + def create_instance(self, template: str = "python") -> SandboxInstance: + if not self._initialized: + raise RuntimeError("Provider not initialized. Call initialize() first.") + + language = self._normalize_language(template) + instance_id = str(uuid.uuid4()) + instance_dir = self.work_dir / instance_id + instance_dir.mkdir(mode=0o700) + (instance_dir / "artifacts").mkdir(mode=0o700) + self._instances[instance_id] = instance_dir + + return SandboxInstance( + instance_id=instance_id, + provider="local", + status="running", + metadata={"language": language, "work_dir": str(instance_dir)}, + ) + + def execute_code( + self, + instance_id: str, + code: str, + language: str, + timeout: int = 10, + arguments: Optional[Dict[str, Any]] = None, + ) -> ExecutionResult: + if not self._initialized: + raise RuntimeError("Provider not initialized. Call initialize() first.") + + normalized_lang = self._normalize_language(language) + instance_dir = self._instances[instance_id] + args_json = json.dumps(arguments or {}, ensure_ascii=False) + command, script_path = self._prepare_script(instance_dir, normalized_lang, code, args_json) + requested_timeout = self.timeout if timeout is None else int(timeout) + if requested_timeout <= 0: + raise RuntimeError(f"Execution timeout must be greater than 0 seconds, got {requested_timeout}.") + exec_timeout = min(requested_timeout, self.timeout) + + start_time = time.time() + process = subprocess.Popen( + command, + cwd=instance_dir, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + encoding="utf-8", + errors="replace", + env=self._build_child_env(instance_dir), + preexec_fn=self._limit_child_process if os.name == "posix" else None, + start_new_session=os.name == "posix", + ) + + try: + stdout, stderr = process.communicate(timeout=exec_timeout) + except subprocess.TimeoutExpired: + if os.name == "posix": + os.killpg(process.pid, signal.SIGKILL) + else: + process.kill() + process.communicate() + raise TimeoutError(f"Execution timed out after {exec_timeout} seconds") + + execution_time = time.time() - start_time + self._validate_output_size(stdout, stderr) + stdout, structured_result = extract_structured_result(stdout) + + return ExecutionResult( + stdout=stdout, + stderr=stderr, + exit_code=process.returncode, + execution_time=execution_time, + metadata={ + "instance_id": instance_id, + "language": normalized_lang, + "script_path": str(script_path), + "status": "ok" if process.returncode == 0 else "error", + "timeout": exec_timeout, + "artifacts": self._collect_artifacts(instance_dir / "artifacts"), + "result_present": structured_result.get("present", False), + "result_value": structured_result.get("value"), + "result_type": structured_result.get("type"), + }, + ) + + def destroy_instance(self, instance_id: str) -> bool: + if not self._initialized: + raise RuntimeError("Provider not initialized. Call initialize() first.") + + instance_dir = self._instances.pop(instance_id) + shutil.rmtree(instance_dir) + return True + + def health_check(self) -> bool: + return self._initialized and self.work_dir.exists() and os.access(self.work_dir, os.W_OK) + + def get_supported_languages(self) -> List[str]: + return ["python", "javascript", "nodejs"] + + @staticmethod + def get_config_schema() -> Dict[str, Dict]: + return { + "python_bin": {"type": "string", "required": False, "default": "python3"}, + "node_bin": {"type": "string", "required": False, "default": "node"}, + "work_dir": {"type": "string", "required": False, "default": "/tmp/ragflow-codeexec"}, + "timeout": {"type": "integer", "required": False, "default": 30}, + "max_memory_mb": {"type": "integer", "required": False, "default": 512}, + "max_output_bytes": {"type": "integer", "required": False, "default": 1048576}, + "max_artifacts": {"type": "integer", "required": False, "default": 20}, + "max_artifact_bytes": {"type": "integer", "required": False, "default": 10485760}, + } + + def _validate_limits(self) -> None: + if self.timeout <= 0: + raise SandboxProviderConfigError("SANDBOX_LOCAL_TIMEOUT must be greater than 0.") + if self.max_memory_mb <= 0: + raise SandboxProviderConfigError("SANDBOX_LOCAL_MAX_MEMORY_MB must be greater than 0.") + if self.max_output_bytes <= 0: + raise SandboxProviderConfigError("SANDBOX_LOCAL_MAX_OUTPUT_BYTES must be greater than 0.") + if self.max_artifacts < 0: + raise SandboxProviderConfigError("SANDBOX_LOCAL_MAX_ARTIFACTS must be greater than or equal to 0.") + if self.max_artifact_bytes <= 0: + raise SandboxProviderConfigError("SANDBOX_LOCAL_MAX_ARTIFACT_BYTES must be greater than 0.") + + def _prepare_script(self, instance_dir: Path, language: str, code: str, args_json: str) -> tuple[list[str], Path]: + if language == "python": + script_path = instance_dir / "main.py" + script_path.write_text(build_python_wrapper(code, args_json), encoding="utf-8") + return [self.python_bin, str(script_path)], script_path + if language in {"javascript", "nodejs"}: + script_path = instance_dir / "main.js" + script_path.write_text(build_javascript_wrapper(code, args_json), encoding="utf-8") + return [self.node_bin, str(script_path)], script_path + raise RuntimeError(f"Unsupported language for local provider: {language}") + + @staticmethod + def _resolve_config_value(config: Dict[str, Any], key: str, env_name: str, default: Any) -> Any: + value = config.get(key) + if value is not None: + return value + return os.environ.get(env_name, default) + + def _build_child_env(self, instance_dir: Path) -> dict[str, str]: + return { + "HOME": str(instance_dir), + "MPLBACKEND": "Agg", + "PATH": os.environ.get("PATH", ""), + "PYTHONUNBUFFERED": "1", + "TMPDIR": str(instance_dir), + } + + def _limit_child_process(self) -> None: + import resource + + self._set_resource_limit(resource.RLIMIT_CPU, self.timeout + 1) + self._set_resource_limit(resource.RLIMIT_AS, self.max_memory_mb * 1024 * 1024) + self._set_resource_limit(resource.RLIMIT_FSIZE, self.max_artifact_bytes) + self._set_resource_limit(resource.RLIMIT_NOFILE, 64) + + @staticmethod + def _set_resource_limit(kind: int, value: int) -> None: + import resource + + _, hard = resource.getrlimit(kind) + limit = value if hard == resource.RLIM_INFINITY else min(value, hard) + resource.setrlimit(kind, (limit, limit)) + + def _validate_output_size(self, stdout: str, stderr: str) -> None: + output_size = len((stdout or "").encode("utf-8")) + len((stderr or "").encode("utf-8")) + if output_size > self.max_output_bytes: + raise RuntimeError(f"Local execution output exceeded {self.max_output_bytes} bytes.") + + def _collect_artifacts(self, artifacts_dir: Path) -> list[dict[str, Any]]: + artifacts: list[dict[str, Any]] = [] + for path in sorted(artifacts_dir.rglob("*")): + if path.is_symlink(): + raise RuntimeError(f"Artifact symlinks are not allowed: {path.name}") + if path.is_dir(): + continue + if not path.is_file(): + raise RuntimeError(f"Unsupported artifact entry: {path.name}") + + if len(artifacts) >= self.max_artifacts: + raise RuntimeError(f"Local execution produced more than {self.max_artifacts} artifacts.") + + size = path.stat().st_size + if size > self.max_artifact_bytes: + raise RuntimeError(f"Artifact exceeds {self.max_artifact_bytes} bytes: {path.name}") + + ext = path.suffix.lower() + if ext not in ALLOWED_ARTIFACT_EXTENSIONS: + raise RuntimeError(f"Unsupported artifact type: {path.name}") + + artifacts.append( + { + "name": path.relative_to(artifacts_dir).as_posix(), + "content_b64": base64.b64encode(path.read_bytes()).decode("ascii"), + "mime_type": mimetypes.guess_type(path.name)[0] or "application/octet-stream", + "size": size, + } + ) + return artifacts + + @staticmethod + def _normalize_language(language: str) -> str: + lang_lower = (language or "python").lower() + if lang_lower in {"python", "python3"}: + return "python" + if lang_lower in {"javascript", "nodejs"}: + return "nodejs" + return lang_lower diff --git a/agent/sandbox/result_protocol.py b/agent/sandbox/result_protocol.py new file mode 100644 index 0000000000..f71e5f4996 --- /dev/null +++ b/agent/sandbox/result_protocol.py @@ -0,0 +1,85 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import base64 +import json +from typing import Any + + +RESULT_MARKER_PREFIX = "__RAGFLOW_RESULT__:" + + +def build_python_wrapper(code: str, args_json: str) -> str: + return f'''{code} + +if __name__ == "__main__": + import base64 + import json + + result = main(**{args_json}) + payload = json.dumps({{"present": True, "value": result, "type": "json"}}, ensure_ascii=False, separators=(",", ":")) + print("{RESULT_MARKER_PREFIX}" + base64.b64encode(payload.encode("utf-8")).decode("ascii")) +''' + + +def build_javascript_wrapper(code: str, args_json: str) -> str: + return f'''{code} + +const __ragflowArgs = {args_json}; + +(async () => {{ + const __ragflowMain = typeof main !== 'undefined' ? main : module.exports && module.exports.main; + if (typeof __ragflowMain !== 'function') {{ + throw new Error('main() must be defined or exported.'); + }} + const output = await Promise.resolve(__ragflowMain(__ragflowArgs)); + if (typeof output === 'undefined') {{ + throw new Error('main() must return a value. Use null for an empty result.'); + }} + const payload = JSON.stringify({{ present: true, value: output, type: 'json' }}); + if (typeof payload === 'undefined') {{ + throw new Error('main() returned a non-JSON-serializable value.'); + }} + console.log('{RESULT_MARKER_PREFIX}' + Buffer.from(payload, 'utf8').toString('base64')); +}})(); +''' + + +def extract_structured_result(stdout: str) -> tuple[str, dict[str, Any]]: + if not stdout: + return "", {} + + cleaned_lines: list[str] = [] + structured_result: dict[str, Any] = {} + + for line in str(stdout).splitlines(): + if line.startswith(RESULT_MARKER_PREFIX): + payload_b64 = line[len(RESULT_MARKER_PREFIX) :].strip() + if not payload_b64: + cleaned_lines.append(line) + continue + try: + payload = base64.b64decode(payload_b64, validate=True).decode("utf-8") + structured_result = json.loads(payload) + except Exception: + cleaned_lines.append(line) + continue + cleaned_lines.append(line) + + cleaned_stdout = "\n".join(cleaned_lines) + if stdout.endswith("\n") and cleaned_stdout and not cleaned_stdout.endswith("\n"): + cleaned_stdout += "\n" + return cleaned_stdout, structured_result diff --git a/agent/tools/code_exec.py b/agent/tools/code_exec.py index 229967a572..ece67d97fc 100644 --- a/agent/tools/code_exec.py +++ b/agent/tools/code_exec.py @@ -357,6 +357,7 @@ class CodeExec(ToolBase, ABC): # Try using the new sandbox provider system first try: from agent.sandbox.client import execute_code as sandbox_execute_code + from agent.sandbox.providers.base import SandboxProviderConfigError if self.check_if_canceled("CodeExec execution"): return @@ -376,8 +377,16 @@ class CodeExec(ToolBase, ABC): execution_metadata=result.metadata, ) - except (ImportError, RuntimeError) as provider_error: - # Provider system not available or not configured, fall back to HTTP + except SandboxProviderConfigError as provider_error: + self.set_output("_ERROR", str(provider_error)) + return self.output() + except ImportError as provider_error: + # Provider modules are unavailable, fall back to legacy HTTP sandbox. + logging.info(f"[CodeExec]: Provider system not available, using HTTP fallback: {provider_error}") + except RuntimeError as provider_error: + if not self._should_fallback_to_http(provider_error): + self.set_output("_ERROR", f"Provider system execution failed: {provider_error}") + return self.output() logging.info(f"[CodeExec]: Provider system not available, using HTTP fallback: {provider_error}") # Fallback to direct HTTP request @@ -487,6 +496,15 @@ class CodeExec(ToolBase, ABC): return metadata.get("result_value"), False return self._deserialize_stdout(stdout), True + @staticmethod + def _should_fallback_to_http(provider_error: RuntimeError) -> bool: + message = str(provider_error).lower() + fallback_markers = ( + "no sandbox provider configured", + "sandbox provider type not configured", + ) + return any(marker in message for marker in fallback_markers) + @classmethod def _ensure_bucket_lifecycle(cls): if cls._lifecycle_configured: diff --git a/conf/llm_factories.json b/conf/llm_factories.json index ee74bb8a50..2fc12803d7 100644 --- a/conf/llm_factories.json +++ b/conf/llm_factories.json @@ -421,13 +421,6 @@ "model_type": "chat", "is_tools": false }, - { - "llm_name": "deepseek-r1-distill-qwen-7b", - "tags": "LLM,CHAT,32K", - "max_tokens": 32768, - "model_type": "chat", - "is_tools": false - }, { "llm_name": "deepseek-r1-distill-qwen-14b", "tags": "LLM,CHAT,32K", @@ -2948,20 +2941,6 @@ "model_type": "chat", "is_tools": true }, - { - "llm_name": "Pro/deepseek-ai/DeepSeek-R1-Distill-Qwen-7B", - "tags": "LLM,CHAT,32k", - "max_tokens": 32000, - "model_type": "chat", - "is_tools": true - }, - { - "llm_name": "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B", - "tags": "LLM,CHAT,32k", - "max_tokens": 32000, - "model_type": "chat", - "is_tools": true - }, { "llm_name": "deepseek-ai/DeepSeek-V2.5", "tags": "LLM,CHAT,32k", @@ -4246,13 +4225,6 @@ "model_type": "chat", "is_tools": false }, - { - "llm_name": "DeepSeek-R1-Distill-Qwen-7B", - "tags": "LLM,CHAT", - "max_tokens": 65792, - "model_type": "chat", - "is_tools": false - }, { "llm_name": "DeepSeek-R1-Distill-Qwen-1.5B", "tags": "LLM,CHAT", diff --git a/docker/.env b/docker/.env index df2512f4c5..f2343dab41 100644 --- a/docker/.env +++ b/docker/.env @@ -239,32 +239,72 @@ EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-16} # - Disable registration: 0 REGISTER_ENABLED=1 -# Important: To enable sandbox, you need to uncomment following two lines: +# ----------------------------------------------------------------------------- +# Sandbox +# ----------------------------------------------------------------------------- +# Sandbox settings are grouped by provider type. +# 1. Set `SANDBOX_ENABLED=1` to enable sandbox support. +# 2. Set `SANDBOX_PROVIDER_TYPE` to choose the active provider. +# 3. Only edit the section that matches the selected provider type. +# 4. If you do not use `self_managed`, remove `,sandbox` from `COMPOSE_PROFILES`. +# +# Naming convention for future providers: +# - `SANDBOX__*` +# Examples: +# - `SANDBOX_SELF_MANAGED_*` +# - `SANDBOX_LOCAL_*` +# - `SANDBOX_E2B_*` +# - `SANDBOX_ALIYUN_CODEINTERPRETER_*` + +# Enable sandbox support. # SANDBOX_ENABLED=1 # COMPOSE_PROFILES=${COMPOSE_PROFILES},sandbox +# SANDBOX_PROVIDER_TYPE=${SANDBOX_PROVIDER_TYPE:-self_managed} -# Sandbox settings -# Double check if you add `sandbox-executor-manager` to your `/etc/hosts` +# Shared sandbox settings +# `SANDBOX_HOST` is kept as the common endpoint name for legacy HTTP fallback +# and for the self-managed provider. +# Double check that `sandbox-executor-manager` resolves correctly in your +# Docker network or `/etc/hosts`. +# SANDBOX_HOST=${SANDBOX_HOST:-sandbox-executor-manager} +# The MinIO bucket name for storing sandbox-generated artifacts. +# SANDBOX_ARTIFACT_BUCKET=sandbox-artifacts +# Number of days before sandbox artifacts are automatically deleted. +# SANDBOX_ARTIFACT_EXPIRE_DAYS=7 + +# Provider: self_managed +# Use this provider when sandbox executors run as Docker services managed by +# RAGFlow. This is the default provider used by the `sandbox` compose profile. # Pull the required base images before running: # docker pull infiniflow/sandbox-base-nodejs:latest # docker pull infiniflow/sandbox-base-python:latest -# Our default sandbox environments include: -# - Node.js base image: includes axios -# - Python base image: includes requests, numpy, and pandas -# Specify custom executor images below if you're using non-default environments. -# SANDBOX_HOST=${SANDBOX_HOST:-sandbox-executor-manager} -# SANDBOX_EXECUTOR_MANAGER_IMAGE=infiniflow/sandbox-executor-manager:latest -# SANDBOX_EXECUTOR_MANAGER_POOL_SIZE=3 -# SANDBOX_BASE_PYTHON_IMAGE=infiniflow/sandbox-base-python:latest -# SANDBOX_BASE_NODEJS_IMAGE=infiniflow/sandbox-base-nodejs:latest -# SANDBOX_EXECUTOR_MANAGER_PORT=9385 +# Default runtime images include: +# - Node.js base image: axios +# - Python base image: requests, numpy, pandas +# SANDBOX_EXECUTOR_MANAGER_IMAGE=${SANDBOX_EXECUTOR_MANAGER_IMAGE:-infiniflow/sandbox-executor-manager:latest} +# SANDBOX_EXECUTOR_MANAGER_POOL_SIZE=${SANDBOX_EXECUTOR_MANAGER_POOL_SIZE:-3} +# SANDBOX_BASE_PYTHON_IMAGE=${SANDBOX_BASE_PYTHON_IMAGE:-infiniflow/sandbox-base-python:latest} +# SANDBOX_BASE_NODEJS_IMAGE=${SANDBOX_BASE_NODEJS_IMAGE:-infiniflow/sandbox-base-nodejs:latest} +# SANDBOX_EXECUTOR_MANAGER_PORT=${SANDBOX_EXECUTOR_MANAGER_PORT:-9385} # SANDBOX_ENABLE_SECCOMP=false # SANDBOX_MAX_MEMORY=256m # b, k, m, g # SANDBOX_TIMEOUT=10s # s, m, 1m30s -# The MinIO bucket name for storing sandbox-generated artifacts (charts, files, etc.). -SANDBOX_ARTIFACT_BUCKET=sandbox-artifacts -# Number of days before sandbox artifacts are automatically deleted from storage. -SANDBOX_ARTIFACT_EXPIRE_DAYS=7 + +# Provider: local +# Use this provider only in trusted development environments. It executes code +# on the local machine instead of inside Docker-managed sandbox containers. +# When `SANDBOX_PROVIDER_TYPE=local`, you usually do not need the `sandbox` +# compose profile. +# Uncomment and adjust only if you use the local provider. +# SANDBOX_LOCAL_ENABLED=true +# SANDBOX_LOCAL_PYTHON_BIN=python3 +# SANDBOX_LOCAL_NODE_BIN=node +# SANDBOX_LOCAL_WORK_DIR=/tmp/ragflow-codeexec +# SANDBOX_LOCAL_TIMEOUT=30 +# SANDBOX_LOCAL_MAX_MEMORY_MB=1024 +# SANDBOX_LOCAL_MAX_OUTPUT_BYTES=1048576 +# SANDBOX_LOCAL_MAX_ARTIFACTS=20 +# SANDBOX_LOCAL_MAX_ARTIFACT_BYTES=10485760 # Enable DocLing USE_DOCLING=false diff --git a/test/unit_test/agent/sandbox/test_local_provider.py b/test/unit_test/agent/sandbox/test_local_provider.py new file mode 100644 index 0000000000..e3bcd14865 --- /dev/null +++ b/test/unit_test/agent/sandbox/test_local_provider.py @@ -0,0 +1,98 @@ +import base64 +import sys + +import pytest + +from agent.sandbox.providers.base import SandboxProviderConfigError +from agent.sandbox.providers.local import LocalProvider + + +def _make_provider(monkeypatch, tmp_path, **overrides): + monkeypatch.setenv("SANDBOX_LOCAL_ENABLED", "true") + config = { + "python_bin": sys.executable, + "work_dir": str(tmp_path), + "timeout": 5, + "max_memory_mb": 512, + "max_output_bytes": 1024 * 1024, + "max_artifacts": 20, + "max_artifact_bytes": 1024 * 1024, + } + config.update(overrides) + provider = LocalProvider() + provider.initialize(config) + return provider + + +def test_local_provider_requires_explicit_env_enable(monkeypatch, tmp_path): + monkeypatch.delenv("SANDBOX_LOCAL_ENABLED", raising=False) + provider = LocalProvider() + + with pytest.raises(SandboxProviderConfigError): + provider.initialize({"work_dir": str(tmp_path)}) + + +def test_local_provider_executes_python_main(monkeypatch, tmp_path): + provider = _make_provider(monkeypatch, tmp_path) + instance = provider.create_instance("python") + + try: + result = provider.execute_code( + instance.instance_id, + 'def main(name: str) -> dict:\n return {"message": "hello " + name}\n', + "python", + timeout=5, + arguments={"name": "ragflow"}, + ) + finally: + provider.destroy_instance(instance.instance_id) + + assert result.exit_code == 0 + assert result.stdout == "" + assert result.metadata["result_present"] is True + assert result.metadata["result_value"] == {"message": "hello ragflow"} + + +def test_local_provider_collects_artifacts(monkeypatch, tmp_path): + provider = _make_provider(monkeypatch, tmp_path) + instance = provider.create_instance("python") + + try: + result = provider.execute_code( + instance.instance_id, + ( + "from pathlib import Path\n" + "def main() -> dict:\n" + " Path('artifacts/chart.png').write_bytes(b'PNGDATA')\n" + " return {'ok': True}\n" + ), + "python", + timeout=5, + ) + finally: + provider.destroy_instance(instance.instance_id) + + assert result.metadata["artifacts"] == [ + { + "name": "chart.png", + "content_b64": base64.b64encode(b"PNGDATA").decode("ascii"), + "mime_type": "image/png", + "size": 7, + } + ] + + +def test_local_provider_times_out(monkeypatch, tmp_path): + provider = _make_provider(monkeypatch, tmp_path, timeout=1) + instance = provider.create_instance("python") + + try: + with pytest.raises(TimeoutError): + provider.execute_code( + instance.instance_id, + "import time\n\ndef main() -> dict:\n time.sleep(5)\n return {'ok': True}\n", + "python", + timeout=1, + ) + finally: + provider.destroy_instance(instance.instance_id)