Feat: support local provider for code exec component & remove some outdated models (#14637)

### What problem does this PR solve?

Feat: support local provider for code exec component & remove some
outdated models

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Magicbook1108
2026-05-07 21:23:13 +08:00
committed by GitHub
parent 057806d7f1
commit c29335cbff
10 changed files with 659 additions and 143 deletions

View File

@@ -23,11 +23,12 @@ with the configured sandbox provider.
import json
import logging
import os
from typing import Dict, Any, Optional
from api.db.services.system_settings_service import SystemSettingsService
from agent.sandbox.providers import ProviderManager
from agent.sandbox.providers.base import ExecutionResult
from agent.sandbox.providers.base import ExecutionResult, SandboxProviderConfigError
logger = logging.getLogger(__name__)
@@ -59,8 +60,8 @@ def _load_provider_from_settings() -> None:
"""
Load sandbox provider from system settings and configure the provider manager.
This function reads the system settings to determine which provider is active
and initializes it with the appropriate configuration.
This function resolves the active provider type, then loads configuration
from system settings with environment overrides for that provider.
"""
global _provider_manager
@@ -68,41 +69,27 @@ def _load_provider_from_settings() -> None:
return
try:
# Get active provider type
provider_type_settings = SystemSettingsService.get_by_name("sandbox.provider_type")
if not provider_type_settings:
raise RuntimeError(
"Sandbox provider type not configured. Please set 'sandbox.provider_type' in system settings."
)
provider_type = provider_type_settings[0].value
# Get provider configuration
provider_config_settings = SystemSettingsService.get_by_name(f"sandbox.{provider_type}")
if not provider_config_settings:
logger.warning(f"No configuration found for provider: {provider_type}")
config = {}
else:
try:
config = json.loads(provider_config_settings[0].value)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse sandbox config for {provider_type}: {e}")
config = {}
provider_type, provider_type_from_env = _resolve_provider_type()
config = _load_provider_config(provider_type)
# Import and instantiate the provider
from agent.sandbox.providers import (
SelfManagedProvider,
AliyunCodeInterpreterProvider,
E2BProvider,
LocalProvider,
)
provider_classes = {
"self_managed": SelfManagedProvider,
"aliyun_codeinterpreter": AliyunCodeInterpreterProvider,
"e2b": E2BProvider,
"local": LocalProvider,
}
if provider_type not in provider_classes:
if provider_type_from_env:
raise SandboxProviderConfigError(f"Unknown sandbox provider type: {provider_type}")
logger.error(f"Unknown provider type: {provider_type}")
return
@@ -111,19 +98,97 @@ def _load_provider_from_settings() -> None:
# Initialize the provider
if not provider.initialize(config):
logger.error(f"Failed to initialize sandbox provider: {provider_type}. Config keys: {list(config.keys())}")
message = f"Failed to initialize sandbox provider: {provider_type}. Config keys: {list(config.keys())}"
if provider_type == "local" or provider_type_from_env:
raise SandboxProviderConfigError(message)
logger.error(message)
return
# Set the active provider
_provider_manager.set_provider(provider_type, provider)
logger.info(f"Sandbox provider '{provider_type}' initialized successfully")
except SandboxProviderConfigError:
raise
except Exception as e:
logger.error(f"Failed to load sandbox provider from settings: {e}")
import traceback
traceback.print_exc()
def _load_provider_config_from_settings(provider_type: str) -> Dict[str, Any]:
provider_config_settings = SystemSettingsService.get_by_name(f"sandbox.{provider_type}")
if not provider_config_settings:
logger.warning(f"No configuration found for provider: {provider_type}")
return {}
try:
return json.loads(provider_config_settings[0].value)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse sandbox config for {provider_type}: {e}")
return {}
def _resolve_provider_type() -> tuple[str, bool]:
provider_type = os.environ.get("SANDBOX_PROVIDER_TYPE", "").strip()
if provider_type:
return provider_type, True
provider_type_settings = SystemSettingsService.get_by_name("sandbox.provider_type")
if not provider_type_settings:
raise RuntimeError(
"Sandbox provider type not configured. Please set 'sandbox.provider_type' in system settings."
)
return provider_type_settings[0].value, False
def _load_provider_config(provider_type: str) -> Dict[str, Any]:
config = _load_provider_config_from_settings(provider_type)
env_config = _load_provider_config_from_env(provider_type)
if env_config:
config.update(env_config)
return config
def _load_provider_config_from_env(provider_type: str) -> Dict[str, Any]:
if provider_type == "local":
return _load_local_provider_config_from_env()
if provider_type == "self_managed":
return _load_self_managed_provider_config_from_env()
return {}
def _load_local_provider_config_from_env() -> Dict[str, Any]:
env_to_config = {
"SANDBOX_LOCAL_PYTHON_BIN": "python_bin",
"SANDBOX_LOCAL_NODE_BIN": "node_bin",
"SANDBOX_LOCAL_WORK_DIR": "work_dir",
"SANDBOX_LOCAL_TIMEOUT": "timeout",
"SANDBOX_LOCAL_MAX_MEMORY_MB": "max_memory_mb",
"SANDBOX_LOCAL_MAX_OUTPUT_BYTES": "max_output_bytes",
"SANDBOX_LOCAL_MAX_ARTIFACTS": "max_artifacts",
"SANDBOX_LOCAL_MAX_ARTIFACT_BYTES": "max_artifact_bytes",
}
config = {}
for env_name, config_name in env_to_config.items():
if env_name in os.environ:
config[config_name] = os.environ[env_name]
return config
def _load_self_managed_provider_config_from_env() -> Dict[str, Any]:
host = os.environ.get("SANDBOX_HOST", "").strip()
port = os.environ.get("SANDBOX_EXECUTOR_MANAGER_PORT", "").strip()
pool_size = os.environ.get("SANDBOX_EXECUTOR_MANAGER_POOL_SIZE", "").strip()
config = {}
if host:
config["endpoint"] = f"http://{host}:{port or '9385'}"
if pool_size:
config["pool_size"] = pool_size
return config
def reload_provider() -> None:
"""
Reload the sandbox provider from system settings.

View File

@@ -24,20 +24,24 @@ This package contains:
- aliyun_codeinterpreter.py: Aliyun Code Interpreter provider implementation
Official Documentation: https://help.aliyun.com/zh/functioncompute/fc/sandbox-sandbox-code-interepreter
- e2b.py: E2B provider implementation
- local.py: Local process provider implementation
"""
from .base import SandboxProvider, SandboxInstance, ExecutionResult
from .base import SandboxProvider, SandboxInstance, ExecutionResult, SandboxProviderConfigError
from .manager import ProviderManager
from .self_managed import SelfManagedProvider
from .aliyun_codeinterpreter import AliyunCodeInterpreterProvider
from .e2b import E2BProvider
from .local import LocalProvider
__all__ = [
"SandboxProvider",
"SandboxInstance",
"ExecutionResult",
"SandboxProviderConfigError",
"ProviderManager",
"SelfManagedProvider",
"AliyunCodeInterpreterProvider",
"E2BProvider",
"LocalProvider",
]

View File

@@ -30,7 +30,6 @@ https://api.aliyun.com/api/AgentRun/2025-09-10/CreateSandbox?lang=PYTHON
import logging
import os
import time
import base64
import json
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone
@@ -39,10 +38,10 @@ from agentrun.sandbox import TemplateType, CodeLanguage, Template, TemplateInput
from agentrun.utils.config import Config
from agentrun.utils.exception import ServerError
from agent.sandbox.result_protocol import build_javascript_wrapper, build_python_wrapper, extract_structured_result
from .base import SandboxProvider, SandboxInstance, ExecutionResult
logger = logging.getLogger(__name__)
RESULT_MARKER_PREFIX = "__RAGFLOW_RESULT__:"
class AliyunCodeInterpreterProvider(SandboxProvider):
@@ -234,9 +233,9 @@ class AliyunCodeInterpreterProvider(SandboxProvider):
# Matches self_managed provider behavior: call main(**arguments)
args_json = json.dumps(arguments or {})
wrapped_code = (
self._build_python_wrapper(code, args_json)
build_python_wrapper(code, args_json)
if normalized_lang == "python"
else self._build_javascript_wrapper(code, args_json)
else build_javascript_wrapper(code, args_json)
)
logger.debug(f"Aliyun Code Interpreter: Wrapped code (first 200 chars): {wrapped_code[:200]}")
@@ -284,7 +283,7 @@ class AliyunCodeInterpreterProvider(SandboxProvider):
stdout = "\n".join(stdout_parts)
stderr = "\n".join(stderr_parts)
stdout, structured_result = self._extract_structured_result(stdout)
stdout, structured_result = extract_structured_result(stdout)
logger.info(f"Aliyun Code Interpreter: stdout length={len(stdout)}, stderr length={len(stderr)}, exit_code={exit_code}")
if stdout:
@@ -364,71 +363,6 @@ class AliyunCodeInterpreterProvider(SandboxProvider):
# If we get any response (even an error), the service is reachable
return "connection" not in str(e).lower()
@staticmethod
def _build_python_wrapper(code: str, args_json: str) -> str:
marker = RESULT_MARKER_PREFIX
return f'''{code}
if __name__ == "__main__":
import base64
import json
result = main(**{args_json})
payload = json.dumps({{"present": True, "value": result, "type": "json"}}, ensure_ascii=False, separators=(",", ":"))
print("{marker}" + base64.b64encode(payload.encode("utf-8")).decode("ascii"))
'''
@staticmethod
def _build_javascript_wrapper(code: str, args_json: str) -> str:
marker = RESULT_MARKER_PREFIX
return f'''{code}
const __ragflowArgs = {args_json};
(async () => {{
try {{
const output = await Promise.resolve(main(__ragflowArgs));
if (typeof output === 'undefined') {{
throw new Error('main() must return a value. Use null for an empty result.');
}}
const payload = JSON.stringify({{ present: true, value: output, type: 'json' }});
if (typeof payload === 'undefined') {{
throw new Error('main() returned a non-JSON-serializable value.');
}}
console.log('{marker}' + Buffer.from(payload, 'utf8').toString('base64'));
}} catch (err) {{
console.error(err instanceof Error ? err.stack || err.message : String(err));
}}
}})();
'''
@staticmethod
def _extract_structured_result(stdout: str) -> tuple[str, Dict[str, Any]]:
if not stdout:
return "", {}
cleaned_lines: list[str] = []
structured_result: Dict[str, Any] = {}
for line in str(stdout).splitlines():
if line.startswith(RESULT_MARKER_PREFIX):
payload_b64 = line[len(RESULT_MARKER_PREFIX) :].strip()
if not payload_b64:
continue
try:
payload = base64.b64decode(payload_b64).decode("utf-8")
structured_result = json.loads(payload)
except Exception as exc:
logger.warning(f"Aliyun Code Interpreter: failed to decode structured result marker: {exc}")
cleaned_lines.append(line)
continue
cleaned_lines.append(line)
cleaned_stdout = "\n".join(cleaned_lines)
if stdout.endswith("\n") and cleaned_stdout and not cleaned_stdout.endswith("\n"):
cleaned_stdout += "\n"
return cleaned_stdout, structured_result
def get_supported_languages(self) -> List[str]:
"""
Get list of supported programming languages.

View File

@@ -26,6 +26,10 @@ from dataclasses import dataclass
from typing import Dict, Any, Optional, List
class SandboxProviderConfigError(Exception):
"""Raised when the selected provider is explicitly configured but unusable."""
@dataclass
class SandboxInstance:
"""Represents a sandbox execution instance"""
@@ -209,4 +213,4 @@ class SandboxProvider(ABC):
>>> return True, None
"""
# Default implementation: no custom validation
return True, None
return True, None

View File

@@ -0,0 +1,296 @@
#
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import json
import mimetypes
import os
import shutil
import signal
import subprocess
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
from agent.sandbox.result_protocol import build_javascript_wrapper, build_python_wrapper, extract_structured_result
from .base import ExecutionResult, SandboxInstance, SandboxProvider, SandboxProviderConfigError
ALLOWED_ARTIFACT_EXTENSIONS = {
".csv",
".html",
".jpeg",
".jpg",
".json",
".pdf",
".png",
".svg",
}
def _env_enabled(name: str) -> bool:
return os.environ.get(name, "").strip().lower() in {"1", "true", "yes", "on"}
class LocalProvider(SandboxProvider):
"""
Execute code as a local child process.
This provider is intentionally gated by SANDBOX_LOCAL_ENABLED because it is
not a sandbox boundary. Use a low-privilege runtime account.
"""
def __init__(self):
self.python_bin = "python3"
self.node_bin = "node"
self.work_dir = Path("/tmp/ragflow-codeexec")
self.timeout = 30
self.max_memory_mb = 512
self.max_output_bytes = 1024 * 1024
self.max_artifacts = 20
self.max_artifact_bytes = 10 * 1024 * 1024
self._initialized = False
self._instances: dict[str, Path] = {}
def initialize(self, config: Dict[str, Any]) -> bool:
if not _env_enabled("SANDBOX_LOCAL_ENABLED"):
raise SandboxProviderConfigError("Local code execution is disabled. Set SANDBOX_LOCAL_ENABLED=true to enable it.")
self.python_bin = str(self._resolve_config_value(config, "python_bin", "SANDBOX_LOCAL_PYTHON_BIN", "python3"))
self.node_bin = str(self._resolve_config_value(config, "node_bin", "SANDBOX_LOCAL_NODE_BIN", "node"))
self.work_dir = Path(self._resolve_config_value(config, "work_dir", "SANDBOX_LOCAL_WORK_DIR", "/tmp/ragflow-codeexec")).resolve()
self.timeout = int(self._resolve_config_value(config, "timeout", "SANDBOX_LOCAL_TIMEOUT", 30))
self.max_memory_mb = int(self._resolve_config_value(config, "max_memory_mb", "SANDBOX_LOCAL_MAX_MEMORY_MB", 512))
self.max_output_bytes = int(self._resolve_config_value(config, "max_output_bytes", "SANDBOX_LOCAL_MAX_OUTPUT_BYTES", 1024 * 1024))
self.max_artifacts = int(self._resolve_config_value(config, "max_artifacts", "SANDBOX_LOCAL_MAX_ARTIFACTS", 20))
self.max_artifact_bytes = int(self._resolve_config_value(config, "max_artifact_bytes", "SANDBOX_LOCAL_MAX_ARTIFACT_BYTES", 10 * 1024 * 1024))
self._validate_limits()
self.work_dir.mkdir(parents=True, exist_ok=True, mode=0o700)
self._initialized = True
return True
def create_instance(self, template: str = "python") -> SandboxInstance:
if not self._initialized:
raise RuntimeError("Provider not initialized. Call initialize() first.")
language = self._normalize_language(template)
instance_id = str(uuid.uuid4())
instance_dir = self.work_dir / instance_id
instance_dir.mkdir(mode=0o700)
(instance_dir / "artifacts").mkdir(mode=0o700)
self._instances[instance_id] = instance_dir
return SandboxInstance(
instance_id=instance_id,
provider="local",
status="running",
metadata={"language": language, "work_dir": str(instance_dir)},
)
def execute_code(
self,
instance_id: str,
code: str,
language: str,
timeout: int = 10,
arguments: Optional[Dict[str, Any]] = None,
) -> ExecutionResult:
if not self._initialized:
raise RuntimeError("Provider not initialized. Call initialize() first.")
normalized_lang = self._normalize_language(language)
instance_dir = self._instances[instance_id]
args_json = json.dumps(arguments or {}, ensure_ascii=False)
command, script_path = self._prepare_script(instance_dir, normalized_lang, code, args_json)
requested_timeout = self.timeout if timeout is None else int(timeout)
if requested_timeout <= 0:
raise RuntimeError(f"Execution timeout must be greater than 0 seconds, got {requested_timeout}.")
exec_timeout = min(requested_timeout, self.timeout)
start_time = time.time()
process = subprocess.Popen(
command,
cwd=instance_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
env=self._build_child_env(instance_dir),
preexec_fn=self._limit_child_process if os.name == "posix" else None,
start_new_session=os.name == "posix",
)
try:
stdout, stderr = process.communicate(timeout=exec_timeout)
except subprocess.TimeoutExpired:
if os.name == "posix":
os.killpg(process.pid, signal.SIGKILL)
else:
process.kill()
process.communicate()
raise TimeoutError(f"Execution timed out after {exec_timeout} seconds")
execution_time = time.time() - start_time
self._validate_output_size(stdout, stderr)
stdout, structured_result = extract_structured_result(stdout)
return ExecutionResult(
stdout=stdout,
stderr=stderr,
exit_code=process.returncode,
execution_time=execution_time,
metadata={
"instance_id": instance_id,
"language": normalized_lang,
"script_path": str(script_path),
"status": "ok" if process.returncode == 0 else "error",
"timeout": exec_timeout,
"artifacts": self._collect_artifacts(instance_dir / "artifacts"),
"result_present": structured_result.get("present", False),
"result_value": structured_result.get("value"),
"result_type": structured_result.get("type"),
},
)
def destroy_instance(self, instance_id: str) -> bool:
if not self._initialized:
raise RuntimeError("Provider not initialized. Call initialize() first.")
instance_dir = self._instances.pop(instance_id)
shutil.rmtree(instance_dir)
return True
def health_check(self) -> bool:
return self._initialized and self.work_dir.exists() and os.access(self.work_dir, os.W_OK)
def get_supported_languages(self) -> List[str]:
return ["python", "javascript", "nodejs"]
@staticmethod
def get_config_schema() -> Dict[str, Dict]:
return {
"python_bin": {"type": "string", "required": False, "default": "python3"},
"node_bin": {"type": "string", "required": False, "default": "node"},
"work_dir": {"type": "string", "required": False, "default": "/tmp/ragflow-codeexec"},
"timeout": {"type": "integer", "required": False, "default": 30},
"max_memory_mb": {"type": "integer", "required": False, "default": 512},
"max_output_bytes": {"type": "integer", "required": False, "default": 1048576},
"max_artifacts": {"type": "integer", "required": False, "default": 20},
"max_artifact_bytes": {"type": "integer", "required": False, "default": 10485760},
}
def _validate_limits(self) -> None:
if self.timeout <= 0:
raise SandboxProviderConfigError("SANDBOX_LOCAL_TIMEOUT must be greater than 0.")
if self.max_memory_mb <= 0:
raise SandboxProviderConfigError("SANDBOX_LOCAL_MAX_MEMORY_MB must be greater than 0.")
if self.max_output_bytes <= 0:
raise SandboxProviderConfigError("SANDBOX_LOCAL_MAX_OUTPUT_BYTES must be greater than 0.")
if self.max_artifacts < 0:
raise SandboxProviderConfigError("SANDBOX_LOCAL_MAX_ARTIFACTS must be greater than or equal to 0.")
if self.max_artifact_bytes <= 0:
raise SandboxProviderConfigError("SANDBOX_LOCAL_MAX_ARTIFACT_BYTES must be greater than 0.")
def _prepare_script(self, instance_dir: Path, language: str, code: str, args_json: str) -> tuple[list[str], Path]:
if language == "python":
script_path = instance_dir / "main.py"
script_path.write_text(build_python_wrapper(code, args_json), encoding="utf-8")
return [self.python_bin, str(script_path)], script_path
if language in {"javascript", "nodejs"}:
script_path = instance_dir / "main.js"
script_path.write_text(build_javascript_wrapper(code, args_json), encoding="utf-8")
return [self.node_bin, str(script_path)], script_path
raise RuntimeError(f"Unsupported language for local provider: {language}")
@staticmethod
def _resolve_config_value(config: Dict[str, Any], key: str, env_name: str, default: Any) -> Any:
value = config.get(key)
if value is not None:
return value
return os.environ.get(env_name, default)
def _build_child_env(self, instance_dir: Path) -> dict[str, str]:
return {
"HOME": str(instance_dir),
"MPLBACKEND": "Agg",
"PATH": os.environ.get("PATH", ""),
"PYTHONUNBUFFERED": "1",
"TMPDIR": str(instance_dir),
}
def _limit_child_process(self) -> None:
import resource
self._set_resource_limit(resource.RLIMIT_CPU, self.timeout + 1)
self._set_resource_limit(resource.RLIMIT_AS, self.max_memory_mb * 1024 * 1024)
self._set_resource_limit(resource.RLIMIT_FSIZE, self.max_artifact_bytes)
self._set_resource_limit(resource.RLIMIT_NOFILE, 64)
@staticmethod
def _set_resource_limit(kind: int, value: int) -> None:
import resource
_, hard = resource.getrlimit(kind)
limit = value if hard == resource.RLIM_INFINITY else min(value, hard)
resource.setrlimit(kind, (limit, limit))
def _validate_output_size(self, stdout: str, stderr: str) -> None:
output_size = len((stdout or "").encode("utf-8")) + len((stderr or "").encode("utf-8"))
if output_size > self.max_output_bytes:
raise RuntimeError(f"Local execution output exceeded {self.max_output_bytes} bytes.")
def _collect_artifacts(self, artifacts_dir: Path) -> list[dict[str, Any]]:
artifacts: list[dict[str, Any]] = []
for path in sorted(artifacts_dir.rglob("*")):
if path.is_symlink():
raise RuntimeError(f"Artifact symlinks are not allowed: {path.name}")
if path.is_dir():
continue
if not path.is_file():
raise RuntimeError(f"Unsupported artifact entry: {path.name}")
if len(artifacts) >= self.max_artifacts:
raise RuntimeError(f"Local execution produced more than {self.max_artifacts} artifacts.")
size = path.stat().st_size
if size > self.max_artifact_bytes:
raise RuntimeError(f"Artifact exceeds {self.max_artifact_bytes} bytes: {path.name}")
ext = path.suffix.lower()
if ext not in ALLOWED_ARTIFACT_EXTENSIONS:
raise RuntimeError(f"Unsupported artifact type: {path.name}")
artifacts.append(
{
"name": path.relative_to(artifacts_dir).as_posix(),
"content_b64": base64.b64encode(path.read_bytes()).decode("ascii"),
"mime_type": mimetypes.guess_type(path.name)[0] or "application/octet-stream",
"size": size,
}
)
return artifacts
@staticmethod
def _normalize_language(language: str) -> str:
lang_lower = (language or "python").lower()
if lang_lower in {"python", "python3"}:
return "python"
if lang_lower in {"javascript", "nodejs"}:
return "nodejs"
return lang_lower

View File

@@ -0,0 +1,85 @@
#
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import json
from typing import Any
RESULT_MARKER_PREFIX = "__RAGFLOW_RESULT__:"
def build_python_wrapper(code: str, args_json: str) -> str:
return f'''{code}
if __name__ == "__main__":
import base64
import json
result = main(**{args_json})
payload = json.dumps({{"present": True, "value": result, "type": "json"}}, ensure_ascii=False, separators=(",", ":"))
print("{RESULT_MARKER_PREFIX}" + base64.b64encode(payload.encode("utf-8")).decode("ascii"))
'''
def build_javascript_wrapper(code: str, args_json: str) -> str:
return f'''{code}
const __ragflowArgs = {args_json};
(async () => {{
const __ragflowMain = typeof main !== 'undefined' ? main : module.exports && module.exports.main;
if (typeof __ragflowMain !== 'function') {{
throw new Error('main() must be defined or exported.');
}}
const output = await Promise.resolve(__ragflowMain(__ragflowArgs));
if (typeof output === 'undefined') {{
throw new Error('main() must return a value. Use null for an empty result.');
}}
const payload = JSON.stringify({{ present: true, value: output, type: 'json' }});
if (typeof payload === 'undefined') {{
throw new Error('main() returned a non-JSON-serializable value.');
}}
console.log('{RESULT_MARKER_PREFIX}' + Buffer.from(payload, 'utf8').toString('base64'));
}})();
'''
def extract_structured_result(stdout: str) -> tuple[str, dict[str, Any]]:
if not stdout:
return "", {}
cleaned_lines: list[str] = []
structured_result: dict[str, Any] = {}
for line in str(stdout).splitlines():
if line.startswith(RESULT_MARKER_PREFIX):
payload_b64 = line[len(RESULT_MARKER_PREFIX) :].strip()
if not payload_b64:
cleaned_lines.append(line)
continue
try:
payload = base64.b64decode(payload_b64, validate=True).decode("utf-8")
structured_result = json.loads(payload)
except Exception:
cleaned_lines.append(line)
continue
cleaned_lines.append(line)
cleaned_stdout = "\n".join(cleaned_lines)
if stdout.endswith("\n") and cleaned_stdout and not cleaned_stdout.endswith("\n"):
cleaned_stdout += "\n"
return cleaned_stdout, structured_result

View File

@@ -357,6 +357,7 @@ class CodeExec(ToolBase, ABC):
# Try using the new sandbox provider system first
try:
from agent.sandbox.client import execute_code as sandbox_execute_code
from agent.sandbox.providers.base import SandboxProviderConfigError
if self.check_if_canceled("CodeExec execution"):
return
@@ -376,8 +377,16 @@ class CodeExec(ToolBase, ABC):
execution_metadata=result.metadata,
)
except (ImportError, RuntimeError) as provider_error:
# Provider system not available or not configured, fall back to HTTP
except SandboxProviderConfigError as provider_error:
self.set_output("_ERROR", str(provider_error))
return self.output()
except ImportError as provider_error:
# Provider modules are unavailable, fall back to legacy HTTP sandbox.
logging.info(f"[CodeExec]: Provider system not available, using HTTP fallback: {provider_error}")
except RuntimeError as provider_error:
if not self._should_fallback_to_http(provider_error):
self.set_output("_ERROR", f"Provider system execution failed: {provider_error}")
return self.output()
logging.info(f"[CodeExec]: Provider system not available, using HTTP fallback: {provider_error}")
# Fallback to direct HTTP request
@@ -487,6 +496,15 @@ class CodeExec(ToolBase, ABC):
return metadata.get("result_value"), False
return self._deserialize_stdout(stdout), True
@staticmethod
def _should_fallback_to_http(provider_error: RuntimeError) -> bool:
message = str(provider_error).lower()
fallback_markers = (
"no sandbox provider configured",
"sandbox provider type not configured",
)
return any(marker in message for marker in fallback_markers)
@classmethod
def _ensure_bucket_lifecycle(cls):
if cls._lifecycle_configured:

View File

@@ -421,13 +421,6 @@
"model_type": "chat",
"is_tools": false
},
{
"llm_name": "deepseek-r1-distill-qwen-7b",
"tags": "LLM,CHAT,32K",
"max_tokens": 32768,
"model_type": "chat",
"is_tools": false
},
{
"llm_name": "deepseek-r1-distill-qwen-14b",
"tags": "LLM,CHAT,32K",
@@ -2948,20 +2941,6 @@
"model_type": "chat",
"is_tools": true
},
{
"llm_name": "Pro/deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
"tags": "LLM,CHAT,32k",
"max_tokens": 32000,
"model_type": "chat",
"is_tools": true
},
{
"llm_name": "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
"tags": "LLM,CHAT,32k",
"max_tokens": 32000,
"model_type": "chat",
"is_tools": true
},
{
"llm_name": "deepseek-ai/DeepSeek-V2.5",
"tags": "LLM,CHAT,32k",
@@ -4246,13 +4225,6 @@
"model_type": "chat",
"is_tools": false
},
{
"llm_name": "DeepSeek-R1-Distill-Qwen-7B",
"tags": "LLM,CHAT",
"max_tokens": 65792,
"model_type": "chat",
"is_tools": false
},
{
"llm_name": "DeepSeek-R1-Distill-Qwen-1.5B",
"tags": "LLM,CHAT",

View File

@@ -239,32 +239,72 @@ EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-16}
# - Disable registration: 0
REGISTER_ENABLED=1
# Important: To enable sandbox, you need to uncomment following two lines:
# -----------------------------------------------------------------------------
# Sandbox
# -----------------------------------------------------------------------------
# Sandbox settings are grouped by provider type.
# 1. Set `SANDBOX_ENABLED=1` to enable sandbox support.
# 2. Set `SANDBOX_PROVIDER_TYPE` to choose the active provider.
# 3. Only edit the section that matches the selected provider type.
# 4. If you do not use `self_managed`, remove `,sandbox` from `COMPOSE_PROFILES`.
#
# Naming convention for future providers:
# - `SANDBOX_<PROVIDER>_*`
# Examples:
# - `SANDBOX_SELF_MANAGED_*`
# - `SANDBOX_LOCAL_*`
# - `SANDBOX_E2B_*`
# - `SANDBOX_ALIYUN_CODEINTERPRETER_*`
# Enable sandbox support.
# SANDBOX_ENABLED=1
# COMPOSE_PROFILES=${COMPOSE_PROFILES},sandbox
# SANDBOX_PROVIDER_TYPE=${SANDBOX_PROVIDER_TYPE:-self_managed}
# Sandbox settings
# Double check if you add `sandbox-executor-manager` to your `/etc/hosts`
# Shared sandbox settings
# `SANDBOX_HOST` is kept as the common endpoint name for legacy HTTP fallback
# and for the self-managed provider.
# Double check that `sandbox-executor-manager` resolves correctly in your
# Docker network or `/etc/hosts`.
# SANDBOX_HOST=${SANDBOX_HOST:-sandbox-executor-manager}
# The MinIO bucket name for storing sandbox-generated artifacts.
# SANDBOX_ARTIFACT_BUCKET=sandbox-artifacts
# Number of days before sandbox artifacts are automatically deleted.
# SANDBOX_ARTIFACT_EXPIRE_DAYS=7
# Provider: self_managed
# Use this provider when sandbox executors run as Docker services managed by
# RAGFlow. This is the default provider used by the `sandbox` compose profile.
# Pull the required base images before running:
# docker pull infiniflow/sandbox-base-nodejs:latest
# docker pull infiniflow/sandbox-base-python:latest
# Our default sandbox environments include:
# - Node.js base image: includes axios
# - Python base image: includes requests, numpy, and pandas
# Specify custom executor images below if you're using non-default environments.
# SANDBOX_HOST=${SANDBOX_HOST:-sandbox-executor-manager}
# SANDBOX_EXECUTOR_MANAGER_IMAGE=infiniflow/sandbox-executor-manager:latest
# SANDBOX_EXECUTOR_MANAGER_POOL_SIZE=3
# SANDBOX_BASE_PYTHON_IMAGE=infiniflow/sandbox-base-python:latest
# SANDBOX_BASE_NODEJS_IMAGE=infiniflow/sandbox-base-nodejs:latest
# SANDBOX_EXECUTOR_MANAGER_PORT=9385
# Default runtime images include:
# - Node.js base image: axios
# - Python base image: requests, numpy, pandas
# SANDBOX_EXECUTOR_MANAGER_IMAGE=${SANDBOX_EXECUTOR_MANAGER_IMAGE:-infiniflow/sandbox-executor-manager:latest}
# SANDBOX_EXECUTOR_MANAGER_POOL_SIZE=${SANDBOX_EXECUTOR_MANAGER_POOL_SIZE:-3}
# SANDBOX_BASE_PYTHON_IMAGE=${SANDBOX_BASE_PYTHON_IMAGE:-infiniflow/sandbox-base-python:latest}
# SANDBOX_BASE_NODEJS_IMAGE=${SANDBOX_BASE_NODEJS_IMAGE:-infiniflow/sandbox-base-nodejs:latest}
# SANDBOX_EXECUTOR_MANAGER_PORT=${SANDBOX_EXECUTOR_MANAGER_PORT:-9385}
# SANDBOX_ENABLE_SECCOMP=false
# SANDBOX_MAX_MEMORY=256m # b, k, m, g
# SANDBOX_TIMEOUT=10s # s, m, 1m30s
# The MinIO bucket name for storing sandbox-generated artifacts (charts, files, etc.).
SANDBOX_ARTIFACT_BUCKET=sandbox-artifacts
# Number of days before sandbox artifacts are automatically deleted from storage.
SANDBOX_ARTIFACT_EXPIRE_DAYS=7
# Provider: local
# Use this provider only in trusted development environments. It executes code
# on the local machine instead of inside Docker-managed sandbox containers.
# When `SANDBOX_PROVIDER_TYPE=local`, you usually do not need the `sandbox`
# compose profile.
# Uncomment and adjust only if you use the local provider.
# SANDBOX_LOCAL_ENABLED=true
# SANDBOX_LOCAL_PYTHON_BIN=python3
# SANDBOX_LOCAL_NODE_BIN=node
# SANDBOX_LOCAL_WORK_DIR=/tmp/ragflow-codeexec
# SANDBOX_LOCAL_TIMEOUT=30
# SANDBOX_LOCAL_MAX_MEMORY_MB=1024
# SANDBOX_LOCAL_MAX_OUTPUT_BYTES=1048576
# SANDBOX_LOCAL_MAX_ARTIFACTS=20
# SANDBOX_LOCAL_MAX_ARTIFACT_BYTES=10485760
# Enable DocLing
USE_DOCLING=false

View File

@@ -0,0 +1,98 @@
import base64
import sys
import pytest
from agent.sandbox.providers.base import SandboxProviderConfigError
from agent.sandbox.providers.local import LocalProvider
def _make_provider(monkeypatch, tmp_path, **overrides):
monkeypatch.setenv("SANDBOX_LOCAL_ENABLED", "true")
config = {
"python_bin": sys.executable,
"work_dir": str(tmp_path),
"timeout": 5,
"max_memory_mb": 512,
"max_output_bytes": 1024 * 1024,
"max_artifacts": 20,
"max_artifact_bytes": 1024 * 1024,
}
config.update(overrides)
provider = LocalProvider()
provider.initialize(config)
return provider
def test_local_provider_requires_explicit_env_enable(monkeypatch, tmp_path):
monkeypatch.delenv("SANDBOX_LOCAL_ENABLED", raising=False)
provider = LocalProvider()
with pytest.raises(SandboxProviderConfigError):
provider.initialize({"work_dir": str(tmp_path)})
def test_local_provider_executes_python_main(monkeypatch, tmp_path):
provider = _make_provider(monkeypatch, tmp_path)
instance = provider.create_instance("python")
try:
result = provider.execute_code(
instance.instance_id,
'def main(name: str) -> dict:\n return {"message": "hello " + name}\n',
"python",
timeout=5,
arguments={"name": "ragflow"},
)
finally:
provider.destroy_instance(instance.instance_id)
assert result.exit_code == 0
assert result.stdout == ""
assert result.metadata["result_present"] is True
assert result.metadata["result_value"] == {"message": "hello ragflow"}
def test_local_provider_collects_artifacts(monkeypatch, tmp_path):
provider = _make_provider(monkeypatch, tmp_path)
instance = provider.create_instance("python")
try:
result = provider.execute_code(
instance.instance_id,
(
"from pathlib import Path\n"
"def main() -> dict:\n"
" Path('artifacts/chart.png').write_bytes(b'PNGDATA')\n"
" return {'ok': True}\n"
),
"python",
timeout=5,
)
finally:
provider.destroy_instance(instance.instance_id)
assert result.metadata["artifacts"] == [
{
"name": "chart.png",
"content_b64": base64.b64encode(b"PNGDATA").decode("ascii"),
"mime_type": "image/png",
"size": 7,
}
]
def test_local_provider_times_out(monkeypatch, tmp_path):
provider = _make_provider(monkeypatch, tmp_path, timeout=1)
instance = provider.create_instance("python")
try:
with pytest.raises(TimeoutError):
provider.execute_code(
instance.instance_id,
"import time\n\ndef main() -> dict:\n time.sleep(5)\n return {'ok': True}\n",
"python",
timeout=1,
)
finally:
provider.destroy_instance(instance.instance_id)