diff --git a/rag/llm/rerank_model.py b/rag/llm/rerank_model.py index a150b40e72..bcf8347e6f 100644 --- a/rag/llm/rerank_model.py +++ b/rag/llm/rerank_model.py @@ -17,8 +17,9 @@ import json import logging from abc import ABC from urllib.parse import urljoin +from typing import Tuple, List +from http import HTTPStatus -import httpx import numpy as np import requests from yarl import URL @@ -28,21 +29,15 @@ from common.token_utils import num_tokens_from_string, truncate, total_token_cou class Base(ABC): def __init__(self, key, model_name, **kwargs): - """ - Abstract base class constructor. - Parameters are not stored; initialization is left to subclasses. - """ pass - def similarity(self, query: str, texts: list): + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: raise NotImplementedError("Please implement encode method!") @staticmethod def _normalize_rank(rank: np.ndarray) -> np.ndarray: - """ - Normalize rank values to the range 0 to 1. - Avoids division by zero if all ranks are identical. - """ + if rank.size == 0: + return rank min_rank = np.min(rank) max_rank = np.max(rank) @@ -58,17 +53,21 @@ class JinaRerank(Base): _FACTORY_NAME = "Jina" def __init__(self, key, model_name="jina-reranker-v2-base-multilingual", base_url="https://api.jina.ai/v1/rerank"): - self.base_url = "https://api.jina.ai/v1/rerank" + self.base_url = base_url or "https://api.jina.ai/v1/rerank" self.headers = {"Content-Type": "application/json", "Authorization": f"Bearer {key}"} self.model_name = model_name - def similarity(self, query: str, texts: list): + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts) if texts else 0, dtype=float), 0 texts = [truncate(t, 8196) for t in texts] data = {"model": self.model_name, "query": query, "documents": texts, "top_n": len(texts)} - res = requests.post(self.base_url, headers=self.headers, json=data, timeout=30).json() + response = requests.post(self.base_url, headers=self.headers, json=data, timeout=30) + response.raise_for_status() + res = response.json() rank = np.zeros(len(texts), dtype=float) try: - for d in res["results"]: + for d in res.get("results", []): rank[d["index"]] = d["relevance_score"] except Exception as _e: log_exception(_e, res) @@ -89,18 +88,20 @@ class XInferenceRerank(Base): if key and key != "x": self.headers["Authorization"] = f"Bearer {key}" - def similarity(self, query: str, texts: list): - if len(texts) == 0: - return np.array([]), 0 + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts) if texts else 0, dtype=float), 0 pairs = [(query, truncate(t, 4096)) for t in texts] token_count = 0 for _, t in pairs: token_count += num_tokens_from_string(t) data = {"model": self.model_name, "query": query, "return_documents": "true", "return_len": "true", "documents": texts} - res = requests.post(self.base_url, headers=self.headers, json=data, timeout=30).json() + response = requests.post(self.base_url, headers=self.headers, json=data, timeout=30) + response.raise_for_status() + res = response.json() rank = np.zeros(len(texts), dtype=float) try: - for d in res["results"]: + for d in res.get("results", []): rank[d["index"]] = d["relevance_score"] except Exception as _e: log_exception(_e, res) @@ -118,8 +119,9 @@ class LocalAIRerank(Base): self.headers = {"Content-Type": "application/json", "Authorization": f"Bearer {key}"} self.model_name = model_name.split("___")[0] - def similarity(self, query: str, texts: list): - # noway to config Ragflow , use fix setting + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 texts = [truncate(t, 500) for t in texts] data = { "model": self.model_name, @@ -130,16 +132,17 @@ class LocalAIRerank(Base): token_count = 0 for t in texts: token_count += num_tokens_from_string(t) - res = requests.post(self.base_url, headers=self.headers, json=data, timeout=30).json() + response = requests.post(self.base_url, headers=self.headers, json=data, timeout=30) + response.raise_for_status() + res = response.json() rank = np.zeros(len(texts), dtype=float) try: - for d in res["results"]: + for d in res.get("results", []): rank[d["index"]] = d["relevance_score"] except Exception as _e: log_exception(_e, res) rank = Base._normalize_rank(rank) - return rank, token_count @@ -164,7 +167,9 @@ class NvidiaRerank(Base): "Authorization": f"Bearer {key}", } - def similarity(self, query: str, texts: list): + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 token_count = num_tokens_from_string(query) + sum([num_tokens_from_string(t) for t in texts]) data = { "model": self.model_name, @@ -173,10 +178,12 @@ class NvidiaRerank(Base): "truncate": "END", "top_n": len(texts), } - res = requests.post(self.base_url, headers=self.headers, json=data, timeout=30).json() + response = requests.post(self.base_url, headers=self.headers, json=data, timeout=30) + response.raise_for_status() + res = response.json() rank = np.zeros(len(texts), dtype=float) try: - for d in res["rankings"]: + for d in res.get("rankings", []): rank[d["index"]] = d["logit"] except Exception as _e: log_exception(_e, res) @@ -189,8 +196,8 @@ class LmStudioRerank(Base): def __init__(self, key, model_name, base_url, **kwargs): pass - def similarity(self, query: str, texts: list): - raise NotImplementedError("The LmStudioRerank has not been implement") + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + raise NotImplementedError("The LmStudioRerank has not been implemented") class OpenAI_APIRerank(Base): @@ -205,8 +212,9 @@ class OpenAI_APIRerank(Base): self.headers = {"Content-Type": "application/json", "Authorization": f"Bearer {key}"} self.model_name = model_name.split("___")[0] - def similarity(self, query: str, texts: list): - # noway to config Ragflow , use fix setting + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 texts = [truncate(t, 500) for t in texts] data = { "model": self.model_name, @@ -217,16 +225,17 @@ class OpenAI_APIRerank(Base): token_count = 0 for t in texts: token_count += num_tokens_from_string(t) - res = requests.post(self.base_url, headers=self.headers, json=data, timeout=30).json() + response = requests.post(self.base_url, headers=self.headers, json=data, timeout=30) + response.raise_for_status() + res = response.json() rank = np.zeros(len(texts), dtype=float) try: - for d in res["results"]: + for d in res.get("results", []): rank[d["index"]] = d["relevance_score"] except Exception as _e: log_exception(_e, res) rank = Base._normalize_rank(rank) - return rank, token_count @@ -236,14 +245,15 @@ class CoHereRerank(Base): def __init__(self, key, model_name, base_url=None): from cohere import Client - # Only pass base_url if it's a non-empty string, otherwise use default Cohere API endpoint - client_kwargs = {"api_key": key} + client_kwargs = {"api_key": key, "timeout": 30.0} if base_url and base_url.strip(): client_kwargs["base_url"] = base_url self.client = Client(**client_kwargs) self.model_name = model_name.split("___")[0] - def similarity(self, query: str, texts: list): + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 token_count = num_tokens_from_string(query) + sum([num_tokens_from_string(t) for t in texts]) res = self.client.rerank( model=self.model_name, @@ -267,8 +277,8 @@ class TogetherAIRerank(Base): def __init__(self, key, model_name, base_url, **kwargs): pass - def similarity(self, query: str, texts: list): - raise NotImplementedError("The api has not been implement") + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + raise NotImplementedError("The api has not been implemented") class SILICONFLOWRerank(Base): @@ -288,7 +298,9 @@ class SILICONFLOWRerank(Base): "authorization": f"Bearer {key}", } - def similarity(self, query: str, texts: list): + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 payload = { "model": self.model_name, "query": query, @@ -298,18 +310,16 @@ class SILICONFLOWRerank(Base): "max_chunks_per_doc": 1024, "overlap_tokens": 80, } - response_raw = requests.post(self.base_url, json=payload, headers=self.headers, timeout=30) - response = response_raw.json() + response = requests.post(self.base_url, json=payload, headers=self.headers, timeout=30) + response.raise_for_status() + res = response.json() rank = np.zeros(len(texts), dtype=float) try: - for d in response["results"]: + for d in res.get("results", []): rank[d["index"]] = d["relevance_score"] except Exception as _e: log_exception(_e, response) - return ( - rank, - total_token_count_from_response(response), - ) + return rank, total_token_count_from_response(res) class BaiduYiyanRerank(Base): @@ -321,10 +331,12 @@ class BaiduYiyanRerank(Base): key = json.loads(key) ak = key.get("yiyan_ak", "") sk = key.get("yiyan_sk", "") - self.client = Reranker(ak=ak, sk=sk) + self.client = Reranker(ak=ak, sk=sk, request_timeout=30) self.model_name = model_name - def similarity(self, query: str, texts: list): + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 res = self.client.do( model=self.model_name, query=query, @@ -333,7 +345,7 @@ class BaiduYiyanRerank(Base): ).body rank = np.zeros(len(texts), dtype=float) try: - for d in res["results"]: + for d in res.get("results", []): rank[d["index"]] = d["relevance_score"] except Exception as _e: log_exception(_e, res) @@ -346,12 +358,12 @@ class VoyageRerank(Base): def __init__(self, key, model_name, base_url=None): import voyageai - self.client = voyageai.Client(api_key=key) + self.client = voyageai.Client(api_key=key, timeout=30.0) self.model_name = model_name - def similarity(self, query: str, texts: list): - if not texts: - return np.array([]), 0 + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts) if texts else 0, dtype=float), 0 rank = np.zeros(len(texts), dtype=float) res = self.client.rerank(query=query, documents=texts, model=self.model_name, top_k=len(texts)) @@ -368,28 +380,31 @@ class QWenRerank(Base): def __init__(self, key, model_name="gte-rerank", **kwargs): import dashscope - self.api_key = key self.model_name = dashscope.TextReRank.Models.gte_rerank if model_name is None else model_name + # Remove invalid global timeout, use official SDK per-request timeout parameter + self.request_timeout = 30.0 - def similarity(self, query: str, texts: list): - from http import HTTPStatus - + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 + import dashscope - # Build call parameters - call_kwargs = { - "api_key": self.api_key, - "model": self.model_name, - "query": query, - "documents": texts, - "top_n": len(texts) - } - # qwen3-rerank does not support return_documents parameter - if not self.model_name.startswith("qwen3-rerank"): - call_kwargs["return_documents"] = False - - resp = dashscope.TextReRank.call(**call_kwargs) + # Pass official request_timeout parameter to both API call branches + if self.model_name.startswith("qwen3-rerank"): + resp = dashscope.TextReRank.call( + api_key=self.api_key, model=self.model_name, + query=query, documents=texts, top_n=len(texts), + request_timeout=self.request_timeout + ) + else: + resp = dashscope.TextReRank.call( + api_key=self.api_key, model=self.model_name, + query=query, documents=texts, + top_n=len(texts), return_documents=False, + request_timeout=self.request_timeout + ) rank = np.zeros(len(texts), dtype=float) if resp.status_code == HTTPStatus.OK: @@ -411,18 +426,21 @@ class HuggingfaceRerank(Base): exc = None scores = [0 for _ in range(len(texts))] batch_size = 8 + # FIX: Robust URL construction to avoid duplicate "/rerank" path suffix + base_url = url.rstrip("/") + if not base_url.startswith(("http://", "https://")): + base_url = f"http://{base_url}" + # Only append "/rerank" when endpoint does not already end with it + endpoint = base_url if base_url.endswith("/rerank") else f"{base_url}/rerank" + for i in range(0, len(texts), batch_size): try: - endpoint = (url or "").rstrip("/") - - if not endpoint.endswith("/rerank"): - endpoint = f"{endpoint}/rerank" res = requests.post( - endpoint, - headers = {"Content-Type": "application/json"}, - json = {"query": query, "texts": texts[i: i + batch_size], "raw_scores": False, "truncate": True}, + endpoint, headers={"Content-Type": "application/json"}, + json={"query": query, "texts": texts[i:i+batch_size], "raw_scores": False, "truncate": True}, timeout=30 ) + res.raise_for_status() for o in res.json(): scores[o["index"] + i] = o["score"] except Exception as e: @@ -436,9 +454,9 @@ class HuggingfaceRerank(Base): self.model_name = model_name.split("___")[0] self.base_url = base_url - def similarity(self, query: str, texts: list) -> tuple[np.ndarray, int]: - if not texts: - return np.array([]), 0 + def similarity(self, query: str, texts: List) -> tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 token_count = 0 for t in texts: token_count += num_tokens_from_string(t) @@ -460,7 +478,10 @@ class GPUStackRerank(Base): "authorization": f"Bearer {key}", } - def similarity(self, query: str, texts: list): + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 + payload = { "model": self.model_name, "query": query, @@ -474,23 +495,17 @@ class GPUStackRerank(Base): response_json = response.json() rank = np.zeros(len(texts), dtype=float) - - token_count = 0 - for t in texts: - token_count += num_tokens_from_string(t) + token_count = sum(num_tokens_from_string(t) for t in texts) try: - for result in response_json["results"]: + for result in response_json.get("results", []): rank[result["index"]] = result["relevance_score"] except Exception as _e: log_exception(_e, response) - return ( - rank, - token_count, - ) + return (rank, token_count) - except httpx.HTTPStatusError as e: - raise ValueError(f"Error calling GPUStackRerank model {self.model_name}: {e.response.status_code} - {e.response.text}") + except requests.exceptions.RequestException as e: + raise ValueError(f"Error calling GPUStackRerank model {self.model_name}: {str(e)}") from e class NovitaRerank(JinaRerank): @@ -515,9 +530,25 @@ class Ai302Rerank(Base): _FACTORY_NAME = "302.AI" def __init__(self, key, model_name, base_url="https://api.302.ai/v1/rerank"): - if not base_url: - base_url = "https://api.302.ai/v1/rerank" - super().__init__(key, model_name, base_url) + self.base_url = base_url or "https://api.302.ai/v1/rerank" + self.headers = {"Content-Type": "application/json", "Authorization": f"Bearer {key}"} + self.model_name = model_name + + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 + texts = [truncate(t, 500) for t in texts] + data = {"model": self.model_name, "query": query, "documents": texts, "top_n": len(texts)} + response = requests.post(self.base_url, headers=self.headers, json=data, timeout=30) + response.raise_for_status() + res = response.json() + rank = np.zeros(len(texts), dtype=float) + try: + for d in res.get("results", []): + rank[d["index"]] = d["relevance_score"] + except Exception as _e: + log_exception(_e, res) + return rank, total_token_count_from_response(res) class JiekouAIRerank(JinaRerank): @@ -540,12 +571,6 @@ class FuturMixRerank(OpenAI_APIRerank): class RAGconRerank(Base): - """ - RAGcon Rerank Provider - routes through LiteLLM proxy - - Assumes LiteLLM proxy supports /rerank endpoint. - Default Base URL: https://connect.ragcon.ai/v1 - """ _FACTORY_NAME = "RAGcon" def __init__(self, key, model_name, base_url=None, **kwargs): @@ -559,8 +584,10 @@ class RAGconRerank(Base): self.model_name = model_name - def similarity(self, query: str, texts: list): - # noway to config Ragflow , use fix setting + def similarity(self, query: str, texts: List) -> Tuple[np.ndarray, int]: + if not query or not texts: + return np.zeros(len(texts), dtype=float), 0 + texts = [truncate(t, 500) for t in texts] data = { "model": self.model_name, @@ -568,17 +595,16 @@ class RAGconRerank(Base): "documents": texts, "top_n": len(texts), } - token_count = 0 - for t in texts: - token_count += num_tokens_from_string(t) - res = requests.post(self._base_url + "/rerank", headers=self.headers, json=data, timeout=30).json() + token_count = sum(num_tokens_from_string(t) for t in texts) + response = requests.post(self._base_url + "/rerank", headers=self.headers, json=data, timeout=30) + response.raise_for_status() + res = response.json() rank = np.zeros(len(texts), dtype=float) try: - for d in res["results"]: + for d in res.get("results", []): rank[d["index"]] = d["relevance_score"] except Exception as _e: log_exception(_e, res) rank = Base._normalize_rank(rank) - return rank, token_count