mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
Fix: variable in doc generator (#14180)
### What problem does this PR solve?

Fix: variable in doc generator

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@@ -146,35 +146,32 @@ class DocGenerator(Message, ABC):
|
||||
os.remove(file_path)
|
||||
|
||||
def _resolve_content(self, kwargs: dict) -> str:
|
||||
content = self._param.content or ""
|
||||
content = self._param.content or kwargs.get("content", "") or ""
|
||||
logging.info("Starting document generation, content length: %s chars", len(content))
|
||||
|
||||
if content and self._canvas.is_reff(content.strip()):
|
||||
matches = re.findall(self.variable_ref_patt, content, flags=re.DOTALL)
|
||||
for match in matches:
|
||||
if content:
|
||||
def _replace_variable(match_obj: re.Match[str]) -> str:
|
||||
match = match_obj.group(1)
|
||||
try:
|
||||
var_value = self._canvas.get_variable_value(match)
|
||||
if var_value is None:
|
||||
continue
|
||||
return ""
|
||||
if isinstance(var_value, partial):
|
||||
resolved_content = ""
|
||||
for chunk in var_value():
|
||||
resolved_content += chunk
|
||||
content = content.replace("{" + match + "}", resolved_content)
|
||||
else:
|
||||
content = content.replace("{" + match + "}", str(var_value))
|
||||
return resolved_content
|
||||
return self._stringify_message_value(var_value, fallback_to_str=True)
|
||||
except Exception as e:
|
||||
logging.warning("Error resolving variable %s: %s", match, str(e))
|
||||
content = content.replace("{" + match + "}", f"[ERROR: {str(e)}]")
|
||||
return f"[ERROR: {str(e)}]"
|
||||
|
||||
if content:
|
||||
try:
|
||||
content, _ = self.get_kwargs(content, kwargs)
|
||||
except Exception as e:
|
||||
logging.warning("Error processing content with get_kwargs: %s", str(e))
|
||||
|
||||
if not content:
|
||||
content = kwargs.get("content", "")
|
||||
content = re.sub(
|
||||
self.variable_ref_patt,
|
||||
_replace_variable,
|
||||
content,
|
||||
flags=re.DOTALL,
|
||||
)
|
||||
|
||||
return content
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ import os
|
||||
import re
|
||||
import time
|
||||
from abc import ABC
|
||||
from functools import partial
|
||||
|
||||
import requests
|
||||
|
||||
@@ -29,7 +30,7 @@ from deepdoc.parser import HtmlParser
|
||||
|
||||
class InvokeParam(ComponentParamBase):
|
||||
"""
|
||||
Define the Crawler component parameters.
|
||||
Define the Invoke component parameters.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
@@ -41,7 +42,7 @@ class InvokeParam(ComponentParamBase):
|
||||
self.url = ""
|
||||
self.timeout = 60
|
||||
self.clean_html = False
|
||||
self.datatype = "json" # New parameter to determine data posting type
|
||||
self.datatype = "json"
|
||||
|
||||
def check(self):
|
||||
self.check_valid_value(self.method.lower(), "Type of content from the crawler", ["get", "post", "put"])
|
||||
@@ -53,6 +54,7 @@ class InvokeParam(ComponentParamBase):
|
||||
|
||||
class Invoke(ComponentBase, ABC):
|
||||
component_name = "Invoke"
|
||||
header_variable_ref_patt = r"\{([a-zA-Z_][a-zA-Z0-9_.@-]*)\}"
|
||||
|
||||
@staticmethod
|
||||
def _coerce_json_arg_if_possible(key, value):
|
||||
@@ -105,122 +107,138 @@ class Invoke(ComponentBase, ABC):
|
||||
}
|
||||
return res
|
||||
|
||||
def _resolve_variable_value(self, variable_name: str, kwargs: dict | None = None):
|
||||
kwargs = kwargs or {}
|
||||
value = kwargs.get(variable_name, self._canvas.get_variable_value(variable_name))
|
||||
if isinstance(value, partial):
|
||||
value = "".join(value())
|
||||
self.set_input_value(variable_name, value)
|
||||
return "" if value is None else value
|
||||
|
||||
def _render_template(self, content: str, pattern: str, kwargs: dict | None = None, *, flags: int = 0) -> str:
|
||||
content = content or ""
|
||||
if not content:
|
||||
return content
|
||||
|
||||
def replace_variable(match_obj):
|
||||
return str(self._resolve_variable_value(match_obj.group(1), kwargs))
|
||||
|
||||
return re.sub(pattern, replace_variable, content, flags=flags)
|
||||
|
||||
def _resolve_template_text(self, content: str, kwargs: dict | None = None) -> str:
|
||||
return self._render_template(content, self.variable_ref_patt, kwargs, flags=re.DOTALL)
|
||||
|
||||
def _resolve_header_text(self, content: str, kwargs: dict | None = None) -> str:
|
||||
# Headers support plain {token} placeholders, so they cannot reuse the canvas variable regex.
|
||||
return self._render_template(content, self.header_variable_ref_patt, kwargs)
|
||||
|
||||
def _resolve_arg_value(self, para: dict, kwargs: dict) -> object:
    """Return the value for one configured request argument.

    A non-``None`` ``para["value"]`` wins: strings are rendered as templates,
    other types are returned as-is. Otherwise the variable named by
    ``para["ref"]`` is resolved.
    """
    literal = para.get("value")
    if literal is None:
        return self._resolve_variable_value(para["ref"], kwargs)
    if isinstance(literal, str):
        return self._resolve_template_text(literal, kwargs)
    return literal
|
||||
|
||||
def _is_json_mode(self) -> bool:
    """Tell whether the configured ``datatype`` is ``"json"`` (case-insensitive)."""
    datatype = self._param.datatype
    return "json" == datatype.lower()
|
||||
|
||||
def _build_request_args(self, kwargs: dict) -> dict:
    """Assemble the request arguments from the configured variables.

    Each entry of ``self._param.variables`` contributes one ``key`` whose value
    is resolved via ``_resolve_arg_value``. In JSON mode the value is passed
    through ``_coerce_json_arg_if_possible`` so stringified JSON can carry
    complex payloads. Entries with a truthy ``ref`` also record the final
    value as a component input.
    """
    request_args = {}
    for spec in self._param.variables:
        name = spec["key"]
        resolved = self._resolve_arg_value(spec, kwargs)
        if self._is_json_mode():
            resolved = self._coerce_json_arg_if_possible(name, resolved)
        request_args[name] = resolved

        ref = spec.get("ref")
        if ref:
            self.set_input_value(ref, resolved)
    return request_args
|
||||
|
||||
def _build_url(self, kwargs: dict) -> str:
    """Render the configured URL template and ensure it carries an HTTP scheme.

    The stripped ``self._param.url`` is rendered through
    ``_resolve_template_text``; ``"http://"`` is prepended when the result does
    not already start with ``http://`` or ``https://``.
    """
    raw_url = self._param.url.strip()
    resolved = self._resolve_template_text(raw_url, kwargs)
    has_scheme = resolved.startswith("http://") or resolved.startswith("https://")
    return resolved if has_scheme else "http://" + resolved
|
||||
|
||||
def _build_headers(self, kwargs: dict) -> dict:
    """Parse the configured JSON headers and render string values as templates.

    Returns an empty dict when no headers are configured. Non-string header
    values are passed through unchanged.

    Raises:
        ValueError: If the parsed headers are not a JSON object. Any error
            raised by ``json.loads`` itself is not caught here.
    """
    raw_headers = self._param.headers
    if not raw_headers:
        return {}

    parsed = json.loads(raw_headers)
    if not isinstance(parsed, dict):
        raise ValueError("Invoke headers must be a JSON object.")

    rendered = {}
    for name, value in parsed.items():
        rendered[name] = self._resolve_header_text(value, kwargs) if isinstance(value, str) else value
    return rendered
|
||||
|
||||
def _build_proxies(self) -> dict | None:
|
||||
if not re.sub(r"https?:?/?/?", "", self._param.proxy):
|
||||
return None
|
||||
return {"http": self._param.proxy, "https": self._param.proxy}
|
||||
|
||||
def _send_request(self, url: str, args: dict, headers: dict, proxies: dict | None):
|
||||
method = self._param.method.lower()
|
||||
request = getattr(requests, method)
|
||||
request_kwargs = {
|
||||
"url": url,
|
||||
"headers": headers,
|
||||
"proxies": proxies,
|
||||
"timeout": self._param.timeout,
|
||||
}
|
||||
|
||||
# GET sends query params; POST/PUT send either JSON or form data based on datatype.
|
||||
if method == "get":
|
||||
request_kwargs["params"] = args
|
||||
return request(**request_kwargs)
|
||||
|
||||
body_key = "json" if self._is_json_mode() else "data"
|
||||
request_kwargs[body_key] = args
|
||||
return request(**request_kwargs)
|
||||
|
||||
def _format_response(self, response) -> str:
    """Turn the HTTP response into the component's text output.

    With ``clean_html`` enabled, the response content is run through
    ``HtmlParser`` and the resulting sections are joined with newlines;
    otherwise the raw response text is returned.
    """
    if self._param.clean_html:
        # HtmlParser keeps the output text-focused when the endpoint returns HTML.
        sections = HtmlParser()(None, response.content)
        return "\n".join(sections)
    return response.text
|
||||
|
||||
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 3)))
|
||||
def _invoke(self, **kwargs):
|
||||
if self.check_if_canceled("Invoke processing"):
|
||||
return
|
||||
|
||||
is_json_mode = self._param.datatype.lower() == "json"
|
||||
args = {}
|
||||
for para in self._param.variables:
|
||||
key = para["key"]
|
||||
if "value" in para and para.get("value") is not None:
|
||||
value = para["value"]
|
||||
elif para.get("ref") in kwargs:
|
||||
value = kwargs[para["ref"]]
|
||||
else:
|
||||
value = self._canvas.get_variable_value(para["ref"])
|
||||
args = self._build_request_args(kwargs)
|
||||
url = self._build_url(kwargs)
|
||||
headers = self._build_headers(kwargs)
|
||||
proxies = self._build_proxies()
|
||||
|
||||
coerced_value = self._coerce_json_arg_if_possible(key, value) if is_json_mode else value
|
||||
args[key] = coerced_value
|
||||
|
||||
if para.get("ref"):
|
||||
self.set_input_value(para["ref"], coerced_value)
|
||||
|
||||
url = self._param.url.strip()
|
||||
|
||||
def replace_variable(match):
|
||||
var_name = match.group(1)
|
||||
try:
|
||||
value = self._canvas.get_variable_value(var_name)
|
||||
return str(value or "")
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
variable_pattern = r"\{([a-zA-Z_][a-zA-Z0-9_.@-]*)\}"
|
||||
|
||||
# {base_url} or {component_id@variable_name}
|
||||
url = re.sub(variable_pattern, replace_variable, url)
|
||||
|
||||
if url.find("http") != 0:
|
||||
url = "http://" + url
|
||||
|
||||
method = self._param.method.lower()
|
||||
headers = {}
|
||||
if self._param.headers:
|
||||
try:
|
||||
parsed_headers = json.loads(self._param.headers)
|
||||
except json.JSONDecodeError as e:
|
||||
logging.warning(
|
||||
"Invoke headers are not valid JSON, ignoring headers. raw=%r error=%s",
|
||||
self._param.headers,
|
||||
e,
|
||||
)
|
||||
parsed_headers = {}
|
||||
if not isinstance(parsed_headers, dict):
|
||||
logging.warning(
|
||||
"Invoke headers JSON is of type %s, expected an object; ignoring headers.",
|
||||
type(parsed_headers).__name__,
|
||||
)
|
||||
parsed_headers = {}
|
||||
headers = parsed_headers
|
||||
for key, value in list(headers.items()):
|
||||
if isinstance(value, str):
|
||||
headers[key] = re.sub(variable_pattern, replace_variable, value)
|
||||
proxies = None
|
||||
if re.sub(r"https?:?/?/?", "", self._param.proxy):
|
||||
proxies = {"http": self._param.proxy, "https": self._param.proxy}
|
||||
|
||||
last_e = ""
|
||||
last_error = None
|
||||
for _ in range(self._param.max_retries + 1):
|
||||
if self.check_if_canceled("Invoke processing"):
|
||||
return
|
||||
|
||||
try:
|
||||
if method == "get":
|
||||
response = requests.get(url=url, params=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
|
||||
if self._param.clean_html:
|
||||
sections = HtmlParser()(None, response.content)
|
||||
self.set_output("result", "\n".join(sections))
|
||||
else:
|
||||
self.set_output("result", response.text)
|
||||
|
||||
if method == "put":
|
||||
if self._param.datatype.lower() == "json":
|
||||
response = requests.put(url=url, json=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
|
||||
else:
|
||||
response = requests.put(url=url, data=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
|
||||
if self._param.clean_html:
|
||||
sections = HtmlParser()(None, response.content)
|
||||
self.set_output("result", "\n".join(sections))
|
||||
else:
|
||||
self.set_output("result", response.text)
|
||||
|
||||
if method == "post":
|
||||
if self._param.datatype.lower() == "json":
|
||||
response = requests.post(url=url, json=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
|
||||
else:
|
||||
response = requests.post(url=url, data=args, headers=headers, proxies=proxies, timeout=self._param.timeout)
|
||||
if self._param.clean_html:
|
||||
sections = HtmlParser()(None, response.content)
|
||||
self.set_output("result", "\n".join(sections))
|
||||
else:
|
||||
self.set_output("result", response.text)
|
||||
|
||||
return self.output("result")
|
||||
response = self._send_request(url, args, headers, proxies)
|
||||
result = self._format_response(response)
|
||||
self.set_output("result", result)
|
||||
return result
|
||||
except Exception as e:
|
||||
if self.check_if_canceled("Invoke processing"):
|
||||
return
|
||||
|
||||
last_e = e
|
||||
last_error = e
|
||||
logging.exception(f"Http request error: {e}")
|
||||
time.sleep(self._param.delay_after_error)
|
||||
|
||||
if last_e:
|
||||
self.set_output("_ERROR", str(last_e))
|
||||
return f"Http request error: {last_e}"
|
||||
|
||||
assert False, self.output()
|
||||
if last_error:
|
||||
self.set_output("_ERROR", str(last_error))
|
||||
return f"Http request error: {last_error}"
|
||||
|
||||
def thoughts(self) -> str:
    """Return the fixed status text for this component."""
    status_message = "Waiting for the server respond..."
    return status_message
|
||||
|
||||
Reference in New Issue
Block a user