Files
ragflow/agent/component/invoke.py
Magicbook1108 901023a80a Fix: literal eval http request input (#14145)
### What problem does this PR solve?

Fix: literal eval http request input

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

<img width="700" alt="img_v3_0210q_f4b49ff7-e670-4054-ab0e-9443a09215fg"
src="https://github.com/user-attachments/assets/089300be-06f9-4bb6-97af-61bf5f4a5e8c"
/>


<img width="700" alt="img_v3_0210q_398cd52a-2ad9-42be-8d5b-4e6e68a7d22g"
src="https://github.com/user-attachments/assets/239b43cd-a2a5-49d8-9200-991bb26336c8"
/>
2026-04-16 16:52:34 +08:00

227 lines
8.4 KiB
Python

#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging
import os
import re
import time
from abc import ABC
import requests
from agent.component.base import ComponentBase, ComponentParamBase
from common.connection_utils import timeout
from deepdoc.parser import HtmlParser
class InvokeParam(ComponentParamBase):
"""
Define the Crawler component parameters.
"""
def __init__(self):
super().__init__()
self.proxy = None
self.headers = ""
self.method = "get"
self.variables = []
self.url = ""
self.timeout = 60
self.clean_html = False
self.datatype = "json" # New parameter to determine data posting type
def check(self):
self.check_valid_value(self.method.lower(), "Type of content from the crawler", ["get", "post", "put"])
self.check_empty(self.url, "End point URL")
self.check_positive_integer(self.timeout, "Timeout time in second")
self.check_boolean(self.clean_html, "Clean HTML")
self.check_valid_value(self.datatype.lower(), "Data post type", ["json", "formdata"]) # Check for valid datapost value
class Invoke(ComponentBase, ABC):
component_name = "Invoke"
@staticmethod
def _coerce_json_arg_if_possible(key, value):
raw_value = value
if isinstance(value, str):
try:
value = json.loads(value)
logging.debug(
"Invoke JSON arg coercion succeeded. key=%s parsed_type=%s",
key,
type(value).__name__,
)
except json.JSONDecodeError as exc:
logging.info(
"Invoke JSON arg coercion skipped; value is not valid JSON. key=%s raw=%r error=%s",
key,
raw_value,
exc,
)
return raw_value
try:
json.dumps(value, allow_nan=False)
except (TypeError, ValueError) as exc:
logging.warning(
"Invoke JSON arg is not JSON-serializable. key=%s value_type=%s value=%r error=%s",
key,
type(value).__name__,
value,
exc,
)
raise ValueError(f"Invoke JSON argument '{key}' is not JSON-serializable.") from exc
return value
def get_input_form(self) -> dict[str, dict]:
res = {}
for item in self._param.variables or []:
if not isinstance(item, dict):
continue
ref = (item.get("ref") or "").strip()
if not ref or ref in res:
continue
elements = self.get_input_elements_from_text("{" + ref + "}")
element = elements.get(ref, {})
res[ref] = {
"type": "line",
"name": element.get("name") or item.get("key") or ref,
}
return res
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 3)))
def _invoke(self, **kwargs):
if self.check_if_canceled("Invoke processing"):
return
is_json_mode = self._param.datatype.lower() == "json"
args = {}
for para in self._param.variables:
key = para["key"]
if "value" in para and para.get("value") is not None:
value = para["value"]
elif para.get("ref") in kwargs:
value = kwargs[para["ref"]]
else:
value = self._canvas.get_variable_value(para["ref"])
coerced_value = self._coerce_json_arg_if_possible(key, value) if is_json_mode else value
args[key] = coerced_value
if para.get("ref"):
self.set_input_value(para["ref"], coerced_value)
url = self._param.url.strip()
def replace_variable(match):
var_name = match.group(1)
try:
value = self._canvas.get_variable_value(var_name)
return str(value or "")
except Exception:
return ""
variable_pattern = r"\{([a-zA-Z_][a-zA-Z0-9_.@-]*)\}"
# {base_url} or {component_id@variable_name}
url = re.sub(variable_pattern, replace_variable, url)
if url.find("http") != 0:
url = "http://" + url
method = self._param.method.lower()
headers = {}
if self._param.headers:
try:
parsed_headers = json.loads(self._param.headers)
except json.JSONDecodeError as e:
logging.warning(
"Invoke headers are not valid JSON, ignoring headers. raw=%r error=%s",
self._param.headers,
e,
)
parsed_headers = {}
if not isinstance(parsed_headers, dict):
logging.warning(
"Invoke headers JSON is of type %s, expected an object; ignoring headers.",
type(parsed_headers).__name__,
)
parsed_headers = {}
headers = parsed_headers
for key, value in list(headers.items()):
if isinstance(value, str):
headers[key] = re.sub(variable_pattern, replace_variable, value)
proxies = None
if re.sub(r"https?:?/?/?", "", self._param.proxy):
proxies = {"http": self._param.proxy, "https": self._param.proxy}
last_e = ""
for _ in range(self._param.max_retries + 1):
if self.check_if_canceled("Invoke processing"):
return
try:
if method == "get":
response = requests.get(url=url, params=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
if self._param.clean_html:
sections = HtmlParser()(None, response.content)
self.set_output("result", "\n".join(sections))
else:
self.set_output("result", response.text)
if method == "put":
if self._param.datatype.lower() == "json":
response = requests.put(url=url, json=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
else:
response = requests.put(url=url, data=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
if self._param.clean_html:
sections = HtmlParser()(None, response.content)
self.set_output("result", "\n".join(sections))
else:
self.set_output("result", response.text)
if method == "post":
if self._param.datatype.lower() == "json":
response = requests.post(url=url, json=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
else:
response = requests.post(url=url, data=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
if self._param.clean_html:
sections = HtmlParser()(None, response.content)
self.set_output("result", "\n".join(sections))
else:
self.set_output("result", response.text)
return self.output("result")
except Exception as e:
if self.check_if_canceled("Invoke processing"):
return
last_e = e
logging.exception(f"Http request error: {e}")
time.sleep(self._param.delay_after_error)
if last_e:
self.set_output("_ERROR", str(last_e))
return f"Http request error: {last_e}"
assert False, self.output()
def thoughts(self) -> str:
return "Waiting for the server respond..."