Files
robbyczgw-cla_web-search-plus/scripts/search.py

2941 lines
103 KiB
Python

#!/usr/bin/env python3
"""
Web Search Plus — Unified Multi-Provider Search with Intelligent Auto-Routing
Supports: Serper (Google), Tavily (Research), Querit (Multilingual AI Search),
Exa (Neural), Perplexity (Direct Answers)
Smart Routing uses multi-signal analysis:
- Query intent classification (shopping, research, discovery)
- Linguistic pattern detection (how much vs how does)
- Product/brand recognition
- URL detection
- Confidence scoring
Usage:
python3 search.py --query "..." # Auto-route based on query
python3 search.py --provider [serper|tavily|querit|exa] --query "..." [options]
Examples:
python3 search.py -q "iPhone 16 Pro price" # → Serper (shopping intent)
python3 search.py -q "how does quantum entanglement work" # → Tavily (research intent)
python3 search.py -q "startups similar to Notion" # → Exa (discovery intent)
"""
import argparse
from http.client import IncompleteRead
import hashlib
import json
import os
import re
import sys
import time
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
from urllib.parse import quote, urlparse
# =============================================================================
# Result Caching
# =============================================================================
# Directory holding cached results: $WSP_CACHE_DIR when set, otherwise a
# ".cache" folder in the parent of this script's directory.
CACHE_DIR = Path(os.environ.get("WSP_CACHE_DIR", os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), ".cache")))
# Lives inside CACHE_DIR; cache_clear() and cache_stats() skip it by file name.
PROVIDER_HEALTH_FILE = CACHE_DIR / "provider_health.json"
# Default maximum age of a cache entry accepted by cache_get().
DEFAULT_CACHE_TTL = 3600 # 1 hour in seconds
def _build_cache_payload(query: str, provider: str, max_results: int, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Assemble the dict that gets hashed into a cache key.

    The three core fields are inserted first; every entry of ``params`` is
    merged on top afterwards, so a clashing key in ``params`` wins.
    """
    payload: Dict[str, Any] = dict(query=query, provider=provider, max_results=max_results)
    payload.update(params or {})
    return payload
def _get_cache_key(query: str, provider: str, max_results: int, params: Optional[Dict[str, Any]] = None) -> str:
    """Derive a deterministic 32-hex-character key for a search request.

    The payload is serialized as compact JSON with sorted keys, so the key
    does not depend on the insertion order of ``params``.
    """
    serialized = json.dumps(
        _build_cache_payload(query, provider, max_results, params),
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    digest = hashlib.sha256(serialized.encode("utf-8"))
    return digest.hexdigest()[:32]
def _get_cache_path(cache_key: str) -> Path:
    """Return the location inside CACHE_DIR of the entry for ``cache_key``."""
    filename = f"{cache_key}.json"
    return CACHE_DIR / filename
def _ensure_cache_dir() -> None:
    """Make sure CACHE_DIR and any missing parents exist (no-op when present)."""
    CACHE_DIR.mkdir(exist_ok=True, parents=True)
def _discard_cache_file(cache_path: Path) -> None:
    """Best-effort removal of a stale or unreadable cache entry."""
    try:
        cache_path.unlink(missing_ok=True)
    except OSError:
        # Cleanup failure (e.g. read-only cache dir) must not break a lookup.
        pass


def cache_get(query: str, provider: str, max_results: int, ttl: int = DEFAULT_CACHE_TTL, params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
    """
    Retrieve cached search results if they exist and are not expired.

    Args:
        query: The search query
        provider: The search provider
        max_results: Maximum results requested
        ttl: Time-to-live in seconds (default: 1 hour)
        params: Extra request parameters that are part of the cache key

    Returns:
        Cached result dict, or None if the entry is missing, expired or
        unreadable. Expired and unreadable entries are deleted.
    """
    cache_path = _get_cache_path(_get_cache_key(query, provider, max_results, params))
    try:
        with open(cache_path, "r", encoding="utf-8") as f:
            cached = json.load(f)
    except FileNotFoundError:
        return None
    except (ValueError, OSError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        _discard_cache_file(cache_path)
        return None

    # A syntactically valid file can still hold a non-object (e.g. a list) or
    # a non-numeric timestamp; treat both as corruption instead of crashing.
    cached_time = cached.get("_cache_timestamp", 0) if isinstance(cached, dict) else None
    if not isinstance(cached_time, (int, float)):
        _discard_cache_file(cache_path)
        return None

    if time.time() - cached_time > ttl:
        # Cache expired, remove it
        _discard_cache_file(cache_path)
        return None
    return cached
def cache_put(query: str, provider: str, max_results: int, result: Dict[str, Any], params: Optional[Dict[str, Any]] = None) -> None:
    """
    Store search results in cache.

    Writing is best-effort: any failure (directory cannot be created, file
    cannot be written, result is not JSON-serializable) is reported on stderr
    and never propagates to the caller.

    Args:
        query: The search query
        provider: The search provider
        max_results: Maximum results requested
        result: The search result to cache (not modified; a shallow copy is stored)
        params: Extra request parameters that are part of the cache key
    """
    cache_key = _get_cache_key(query, provider, max_results, params)
    cache_path = _get_cache_path(cache_key)
    # Add cache metadata
    cached_result = result.copy()
    cached_result["_cache_timestamp"] = time.time()
    cached_result["_cache_key"] = cache_key
    cached_result["_cache_query"] = query
    cached_result["_cache_provider"] = provider
    cached_result["_cache_max_results"] = max_results
    cached_result["_cache_params"] = params or {}
    try:
        # Serialize before touching the disk so an unserializable result
        # cannot leave a truncated, corrupt entry behind.
        serialized = json.dumps(cached_result, ensure_ascii=False, indent=2)
        _ensure_cache_dir()
        with open(cache_path, "w", encoding="utf-8") as f:
            f.write(serialized)
    except (OSError, TypeError, ValueError) as e:
        # Non-fatal: log to stderr but don't fail
        print(json.dumps({"cache_write_error": str(e)}), file=sys.stderr)
def cache_clear() -> Dict[str, Any]:
    """
    Delete every cached result file, leaving the provider-health file alone.

    Returns:
        Dict with the number of removed entries, the bytes freed (raw and in
        KiB) and a human-readable message.
    """
    if not CACHE_DIR.exists():
        return {"cleared": 0, "message": "Cache directory does not exist"}

    health_name = PROVIDER_HEALTH_FILE.name
    removed = 0
    freed = 0
    candidates = (p for p in CACHE_DIR.glob("*.json") if p.name != health_name)
    for entry in candidates:
        try:
            freed += entry.stat().st_size
            entry.unlink()
        except IOError:
            # Skip files that vanished or cannot be removed.
            continue
        removed += 1

    return dict(
        cleared=removed,
        size_freed_bytes=freed,
        size_freed_kb=round(freed / 1024, 2),
        message=f"Cleared {removed} cached entries",
    )
def cache_stats() -> Dict[str, Any]:
    """
    Get statistics about the cache.

    Every ``*.json`` file in CACHE_DIR except the provider-health file counts
    as an entry. Files that cannot be read or do not contain a JSON object
    still count towards ``total_entries`` (and towards the size when they can
    be stat'ed) but are ignored for the per-provider and oldest/newest data.

    Returns:
        Dict with cache statistics
    """
    if not CACHE_DIR.exists():
        return {
            "total_entries": 0,
            "total_size_bytes": 0,
            "total_size_kb": 0,
            "providers": {},
            "oldest": None,
            "newest": None,
            "cache_dir": str(CACHE_DIR),
            "exists": False
        }
    entries = [p for p in CACHE_DIR.glob("*.json") if p.name != PROVIDER_HEALTH_FILE.name]
    total_size = 0
    oldest_time = None
    newest_time = None
    oldest_query = None
    newest_query = None
    provider_counts: Dict[str, int] = {}
    for cache_file in entries:
        try:
            total_size += cache_file.stat().st_size
            with open(cache_file, "r", encoding="utf-8") as f:
                cached = json.load(f)
        except (ValueError, OSError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            continue
        if not isinstance(cached, dict):
            # Valid JSON but not an entry written by cache_put(); skip it
            # rather than crash on .get().
            continue
        ts = cached.get("_cache_timestamp", 0)
        if not isinstance(ts, (int, float)):
            ts = 0
        query = cached.get("_cache_query", "unknown")
        provider = cached.get("_cache_provider", "unknown")
        if not isinstance(provider, str):
            # Guard against unhashable / unexpected values in hand-edited files.
            provider = "unknown"
        provider_counts[provider] = provider_counts.get(provider, 0) + 1
        if oldest_time is None or ts < oldest_time:
            oldest_time = ts
            oldest_query = query
        if newest_time is None or ts > newest_time:
            newest_time = ts
            newest_query = query
    now = time.time()
    return {
        "total_entries": len(entries),
        "total_size_bytes": total_size,
        "total_size_kb": round(total_size / 1024, 2),
        "providers": provider_counts,
        "oldest": {
            "timestamp": oldest_time,
            "age_seconds": int(now - oldest_time),
            "query": oldest_query
        } if oldest_time else None,
        "newest": {
            "timestamp": newest_time,
            "age_seconds": int(now - newest_time),
            "query": newest_query
        } if newest_time else None,
        "cache_dir": str(CACHE_DIR),
        "exists": True
    }
# =============================================================================
# Auto-load .env from skill directory (if exists)
# =============================================================================
def _load_env_file():
    """Load KEY=VALUE pairs from the skill root's .env file into os.environ.

    Blank lines, ``#`` comments and lines without ``=`` are ignored; an
    ``export `` prefix and surrounding quotes are stripped. Variables that are
    already present in the environment are never overwritten.
    """
    env_path = Path(__file__).parent.parent / ".env"
    if not env_path.exists():
        return
    # utf-8-sig decodes plain UTF-8 and also drops a leading BOM, which would
    # otherwise become part of the first variable name; an explicit encoding
    # also avoids depending on the platform's default code page.
    with open(env_path, encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            # Handle export VAR=value or VAR=value
            if line.startswith("export "):
                line = line[7:]
            key, _, value = line.partition("=")
            key = key.strip()
            value = value.strip().strip('"').strip("'")
            if key and key not in os.environ:
                os.environ[key] = value
_load_env_file()
# =============================================================================
# Configuration
# =============================================================================
# Built-in configuration. load_config() starts from these values and merges
# config.json on top, section by section.
DEFAULT_CONFIG = {
    # Values used when no explicit provider / result count is requested.
    "defaults": {
        "provider": "serper",
        "max_results": 5
    },
    # Settings read by QueryAnalyzer.
    "auto_routing": {
        "enabled": True,
        # Returned by QueryAnalyzer.route() when no provider is available.
        "fallback_provider": "serper",
        # Order used to break ties between providers with the same score.
        "provider_priority": ["tavily", "querit", "exa", "perplexity", "serper", "you", "searxng"],
        # Providers listed here are excluded from routing.
        "disabled_providers": [],
        "confidence_threshold": 0.3, # Below this, note low confidence
    },
    # Per-provider sections. An "api_key" / "apiKey" entry added to a section
    # is picked up by get_api_key().
    "serper": {
        "country": "us",
        "language": "en",
        "type": "search"
    },
    "tavily": {
        "depth": "basic",
        "topic": "general"
    },
    "querit": {
        "base_url": "https://api.querit.ai",
        "base_path": "/v1/search",
        "timeout": 10
    },
    "exa": {
        "type": "neural",
        "depth": "normal",
        "verbosity": "standard"
    },
    "perplexity": {
        "api_url": "https://api.kilo.ai/api/gateway/chat/completions",
        "model": "perplexity/sonar-pro"
    },
    "you": {
        "country": "us",
        "safesearch": "moderate"
    },
    "searxng": {
        "instance_url": None, # Required - user must set their own instance
        "safesearch": 0, # 0=off, 1=moderate, 2=strict
        "engines": None, # Optional list of engines to use
        "language": "en"
    }
}
def load_config() -> Dict[str, Any]:
    """Load configuration from config.json if it exists, with defaults.

    The result is an independent deep copy of DEFAULT_CONFIG, so callers may
    mutate it freely. For every top-level key in config.json whose value is an
    object and that also exists in the defaults, the user's entries are merged
    over the default section; any other key replaces the default outright.
    If config.json is unreadable or malformed, a warning is printed to stderr
    and the defaults are used.

    Returns:
        The effective configuration dict.
    """
    import copy

    # Deep copy: a shallow copy would hand out the very same nested
    # dicts/lists that DEFAULT_CONFIG owns.
    config = copy.deepcopy(DEFAULT_CONFIG)
    config_path = Path(__file__).parent.parent / "config.json"
    if config_path.exists():
        try:
            with open(config_path, encoding="utf-8") as f:
                user_config = json.load(f)
            if not isinstance(user_config, dict):
                raise ValueError("top-level JSON value must be an object")
            for key, value in user_config.items():
                if isinstance(value, dict) and isinstance(config.get(key), dict):
                    config[key] = {**config[key], **value}
                else:
                    config[key] = value
        except (ValueError, OSError) as e:
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            print(json.dumps({
                "warning": f"Could not load config.json: {e}",
                "using": "default configuration"
            }), file=sys.stderr)
    return config
def get_api_key(provider: str, config: Dict[str, Any] = None) -> Optional[str]:
    """Resolve the credential for ``provider``.

    Lookup order: the provider's section of ``config`` (``api_key`` first,
    then ``apiKey``), then the process environment. Values from the skill's
    .env file are loaded into the environment at import time without
    overriding variables that are already set.

    For "searxng" there is no API key; the instance URL from
    get_searxng_instance_url() is returned instead.

    Returns:
        The key (or SearXNG URL), or None when nothing is configured.
    """
    # Special case: SearXNG uses instance_url instead of API key
    if provider == "searxng":
        return get_searxng_instance_url(config)

    # Explicit configuration wins over the environment.
    section = config.get(provider, {}) if config else None
    if isinstance(section, dict):
        configured = section.get("api_key") or section.get("apiKey")
        if configured:
            return configured

    env = os.environ
    if provider == "perplexity":
        # Perplexity accepts either of two variables.
        return env.get("PERPLEXITY_API_KEY") or env.get("KILOCODE_API_KEY")

    env_names = {
        "serper": "SERPER_API_KEY",
        "tavily": "TAVILY_API_KEY",
        "querit": "QUERIT_API_KEY",
        "exa": "EXA_API_KEY",
        "you": "YOU_API_KEY",
    }
    return env.get(env_names.get(provider, ""))
def _validate_searxng_url(url: str) -> str:
    """Validate a SearXNG instance URL to reduce the risk of SSRF.

    Checks performed:
    - the scheme must be http or https and a hostname must be present;
    - well-known cloud metadata hosts are always rejected;
    - unless the environment variable SEARXNG_ALLOW_PRIVATE is exactly "1",
      the hostname is resolved and rejected if any resulting address is
      loopback, private, link-local, reserved, multicast or unspecified.

    Args:
        url: The configured instance URL.

    Returns:
        ``url`` unchanged when it passes validation.

    Raises:
        ValueError: if the URL is malformed, blocked, or cannot be resolved.
    """
    import ipaddress
    import socket
    from urllib.parse import urlparse

    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"SearXNG URL must use http or https scheme, got: {parsed.scheme}")
    if not parsed.hostname:
        raise ValueError("SearXNG URL must include a hostname")
    hostname = parsed.hostname

    # Block cloud metadata endpoints by hostname. A fully-qualified name with
    # a trailing dot ("metadata.google.internal.") names the same host, so it
    # is normalized before the lookup to prevent a trivial bypass.
    BLOCKED_HOSTS = {
        "169.254.169.254", # AWS/GCP/Azure metadata
        "metadata.google.internal",
        "metadata.internal",
    }
    if hostname.rstrip(".") in BLOCKED_HOSTS:
        raise ValueError(f"SearXNG URL blocked: {hostname} is a cloud metadata endpoint")

    # Resolve hostname and check for private/internal IPs
    # Operators who intentionally self-host on private networks can opt out
    allow_private = os.environ.get("SEARXNG_ALLOW_PRIVATE", "").strip() == "1"
    if not allow_private:
        default_port = 443 if parsed.scheme == "https" else 80
        try:
            resolved_ips = socket.getaddrinfo(hostname, parsed.port or default_port, proto=socket.IPPROTO_TCP)
        except socket.gaierror as exc:
            raise ValueError(f"SearXNG URL blocked: cannot resolve hostname {hostname}") from exc
        for _family, _type, _proto, _canonname, sockaddr in resolved_ips:
            # Link-local IPv6 results can carry a zone id ("fe80::1%eth0");
            # drop it so the address always parses.
            ip = ipaddress.ip_address(sockaddr[0].split("%", 1)[0])
            if (
                ip.is_loopback
                or ip.is_private
                or ip.is_link_local
                or ip.is_reserved
                or ip.is_multicast
                or ip.is_unspecified
            ):
                raise ValueError(
                    f"SearXNG URL blocked: {hostname} resolves to private/internal IP {ip}. "
                    f"If this is intentional, set SEARXNG_ALLOW_PRIVATE=1 in your environment."
                )
    return url
def get_searxng_instance_url(config: Dict[str, Any] = None) -> Optional[str]:
    """Return the validated SearXNG instance URL, or None if none is set.

    SearXNG is self-hosted, so there is no API key - only an instance URL.
    Priority: ``config["searxng"]["instance_url"]``, then the
    SEARXNG_INSTANCE_URL environment variable.

    The chosen URL is passed through _validate_searxng_url(), which raises
    ValueError for a bad scheme, a blocked host, or (unless
    SEARXNG_ALLOW_PRIVATE=1 is set) a host resolving to a private/internal
    address.
    """
    configured = None
    if config:
        section = config.get("searxng", {})
        if isinstance(section, dict):
            configured = section.get("instance_url")

    # Fall back to the environment only when the config gave nothing usable.
    chosen = configured or os.environ.get("SEARXNG_INSTANCE_URL")
    return _validate_searxng_url(chosen) if chosen else None
# Backward compatibility alias
def get_env_key(provider: str) -> Optional[str]:
    """Legacy alias: same as get_api_key() called without a config dict."""
    return get_api_key(provider, None)
def validate_api_key(provider: str, config: Dict[str, Any] = None) -> str:
    """Validate and return API key (or instance URL for SearXNG), with helpful error messages.

    Args:
        provider: Provider name, e.g. "serper" or "searxng".
        config: Optional loaded configuration; checked before the environment.

    Returns:
        The API key, or the instance URL when ``provider`` is "searxng".

    Raises:
        ProviderConfigError: if the key/URL is missing or looks invalid. The
            exception message is a JSON document describing how to fix it.
    """
    key = get_api_key(provider, config)
    # Special handling for SearXNG - it needs instance URL, not API key
    if provider == "searxng":
        if not key:
            error_msg = {
                "error": "Missing SearXNG instance URL",
                "env_var": "SEARXNG_INSTANCE_URL",
                "how_to_fix": [
                    "1. Set up your own SearXNG instance: https://docs.searxng.org/admin/installation.html",
                    "2. Add to config.json: \"searxng\": {\"instance_url\": \"https://your-instance.example.com\"}",
                    "3. Or set environment variable: export SEARXNG_INSTANCE_URL=\"https://your-instance.example.com\"",
                    "Note: SearXNG requires a self-hosted instance with JSON format enabled.",
                ],
                "provider": provider
            }
            raise ProviderConfigError(json.dumps(error_msg))
        # Validate URL format
        if not key.startswith(("http://", "https://")):
            raise ProviderConfigError(json.dumps({
                "error": "SearXNG instance URL must start with http:// or https://",
                "provided": key,
                "provider": provider
            }))
        return key
    if not key:
        env_vars = {
            "serper": "SERPER_API_KEY",
            "tavily": "TAVILY_API_KEY",
            "querit": "QUERIT_API_KEY",
            "exa": "EXA_API_KEY",
            "you": "YOU_API_KEY",
            "perplexity": "KILOCODE_API_KEY"
        }
        urls = {
            "serper": "https://serper.dev",
            "tavily": "https://tavily.com",
            "querit": "https://querit.ai",
            "exa": "https://exa.ai",
            "you": "https://api.you.com",
            "perplexity": "https://api.kilo.ai"
        }
        # Fall back to generic values for a provider missing from the tables,
        # so the caller gets a ProviderConfigError rather than a bare KeyError.
        env_var = env_vars.get(provider, f"{provider.upper()}_API_KEY")
        signup_url = urls.get(provider, "the provider's website")
        error_msg = {
            "error": f"Missing API key for {provider}",
            "env_var": env_var,
            "how_to_fix": [
                f"1. Get your API key from {signup_url}",
                f"2. Add to config.json: \"{provider}\": {{\"api_key\": \"your-key\"}}",
                f"3. Or set environment variable: export {env_var}=\"your-key\"",
            ],
            "provider": provider
        }
        raise ProviderConfigError(json.dumps(error_msg))
    # Cheap sanity check only; real validity is decided by the provider.
    if len(key) < 10:
        raise ProviderConfigError(json.dumps({
            "error": f"API key for {provider} appears invalid (too short)",
            "provider": provider
        }))
    return key
# =============================================================================
# Intelligent Auto-Routing Engine
# =============================================================================
class QueryAnalyzer:
"""
Intelligent query analysis for smart provider routing.
Uses multi-signal analysis:
- Intent classification (shopping, research, discovery, local, news)
- Linguistic patterns (question structure, phrase patterns)
- Entity detection (products, brands, URLs, dates)
- Complexity assessment
"""
# Intent signal patterns with weights
# Higher weight = stronger signal for that provider
SHOPPING_SIGNALS = {
# Price patterns (very strong)
r'\bhow much\b': 4.0,
r'\bprice of\b': 4.0,
r'\bcost of\b': 4.0,
r'\bprices?\b': 3.0,
r'\$\d+|\d+\s*dollars?': 3.0,
r'\d+|\d+\s*euros?': 3.0,
r'£\d+|\d+\s*pounds?': 3.0,
# German price patterns (sehr stark)
r'\bpreis(e)?\b': 3.5,
r'\bkosten\b': 3.0,
r'\bwieviel\b': 3.5,
r'\bwie viel\b': 3.5,
r'\bwas kostet\b': 4.0,
# Purchase intent (strong)
r'\bbuy\b': 3.5,
r'\bpurchase\b': 3.5,
r'\border\b(?!\s+by)': 3.0, # "order" but not "order by"
r'\bshopping\b': 3.5,
r'\bshop for\b': 3.5,
r'\bwhere to (buy|get|purchase)\b': 4.0,
# German purchase intent (stark)
r'\bkaufen\b': 3.5,
r'\bbestellen\b': 3.5,
r'\bwo kaufen\b': 4.0,
r'\bhändler\b': 3.0,
r'\bshop\b': 2.5,
# Deal/discount signals
r'\bdeal(s)?\b': 3.0,
r'\bdiscount(s)?\b': 3.0,
r'\bsale\b': 2.5,
r'\bcheap(er|est)?\b': 3.0,
r'\baffordable\b': 2.5,
r'\bbudget\b': 2.5,
r'\bbest price\b': 3.5,
r'\bcompare prices\b': 3.5,
r'\bcoupon\b': 3.0,
# German deal/discount signals
r'\bgünstig(er|ste)?\b': 3.0,
r'\bbillig(er|ste)?\b': 3.0,
r'\bangebot(e)?\b': 3.0,
r'\brabatt\b': 3.0,
r'\baktion\b': 2.5,
r'\bschnäppchen\b': 3.0,
# Product comparison
r'\bvs\.?\b': 2.0,
r'\bversus\b': 2.0,
r'\bor\b.*\bwhich\b': 2.0,
r'\bspecs?\b': 2.5,
r'\bspecifications?\b': 2.5,
r'\breview(s)?\b': 2.0,
r'\brating(s)?\b': 2.0,
r'\bunboxing\b': 2.5,
# German product comparison
r'\btest\b': 2.5,
r'\bbewertung(en)?\b': 2.5,
r'\btechnische daten\b': 3.0,
r'\bspezifikationen\b': 2.5,
}
RESEARCH_SIGNALS = {
# Explanation patterns (very strong)
r'\bhow does\b': 4.0,
r'\bhow do\b': 3.5,
r'\bwhy does\b': 4.0,
r'\bwhy do\b': 3.5,
r'\bwhy is\b': 3.5,
r'\bexplain\b': 4.0,
r'\bexplanation\b': 4.0,
r'\bwhat is\b': 3.0,
r'\bwhat are\b': 3.0,
r'\bdefine\b': 3.5,
r'\bdefinition of\b': 3.5,
r'\bmeaning of\b': 3.0,
# Analysis patterns (strong)
r'\banalyze\b': 3.5,
r'\banalysis\b': 3.5,
r'\bcompare\b(?!\s*prices?)': 3.0, # compare but not "compare prices"
r'\bcomparison\b': 3.0,
r'\bstatus of\b': 3.5,
r'\bstatus\b': 2.5,
r'\bwhat happened with\b': 4.0,
r'\bpros and cons\b': 4.0,
r'\badvantages?\b': 3.0,
r'\bdisadvantages?\b': 3.0,
r'\bbenefits?\b': 2.5,
r'\bdrawbacks?\b': 3.0,
r'\bdifference between\b': 3.5,
# Learning patterns
r'\bunderstand\b': 3.0,
r'\blearn(ing)?\b': 2.5,
r'\btutorial\b': 3.0,
r'\bguide\b': 2.5,
r'\bhow to\b': 2.0, # Lower weight - could be shopping too
r'\bstep by step\b': 3.0,
# Depth signals
r'\bin[- ]depth\b': 3.0,
r'\bdetailed\b': 2.5,
r'\bcomprehensive\b': 3.0,
r'\bthorough\b': 2.5,
r'\bdeep dive\b': 3.5,
r'\boverall\b': 2.0,
r'\bsummary\b': 2.0,
# Academic patterns
r'\bstudy\b': 2.5,
r'\bresearch shows\b': 3.5,
r'\baccording to\b': 2.5,
r'\bevidence\b': 3.0,
r'\bscientific\b': 3.0,
r'\bhistory of\b': 3.0,
r'\bbackground\b': 2.5,
r'\bcontext\b': 2.5,
r'\bimplications?\b': 3.0,
# German explanation patterns (sehr stark)
r'\bwie funktioniert\b': 4.0,
r'\bwarum\b': 3.5,
r'\berklär(en|ung)?\b': 4.0,
r'\bwas ist\b': 3.0,
r'\bwas sind\b': 3.0,
r'\bbedeutung\b': 3.0,
# German analysis patterns
r'\banalyse\b': 3.5,
r'\bvergleich(en)?\b': 3.0,
r'\bvor- und nachteile\b': 4.0,
r'\bvorteile\b': 3.0,
r'\bnachteile\b': 3.0,
r'\bunterschied(e)?\b': 3.5,
# German learning patterns
r'\bverstehen\b': 3.0,
r'\blernen\b': 2.5,
r'\banleitung\b': 3.0,
r'\bübersicht\b': 2.5,
r'\bhintergrund\b': 2.5,
r'\bzusammenfassung\b': 2.5,
}
DISCOVERY_SIGNALS = {
# Similarity patterns (very strong)
r'\bsimilar to\b': 5.0,
r'\blike\s+\w+\.com': 4.5, # "like notion.com"
r'\balternatives? to\b': 5.0,
r'\bcompetitors? (of|to)\b': 4.5,
r'\bcompeting with\b': 4.0,
r'\brivals? (of|to)\b': 4.0,
r'\binstead of\b': 3.0,
r'\breplacement for\b': 3.5,
# Company/startup patterns (strong)
r'\bcompanies (like|that|doing|building)\b': 4.5,
r'\bstartups? (like|that|doing|building)\b': 4.5,
r'\bwho else\b': 4.0,
r'\bother (companies|startups|tools|apps)\b': 3.5,
r'\bfind (companies|startups|tools|examples?)\b': 4.5,
r'\bevents? in\b': 4.0,
r'\bthings to do in\b': 4.5,
# Funding/business patterns
r'\bseries [a-d]\b': 4.0,
r'\byc\b|y combinator': 4.0,
r'\bfund(ed|ing|raise)\b': 3.5,
r'\bventure\b': 3.0,
r'\bvaluation\b': 3.0,
# Category patterns
r'\bresearch papers? (on|about)\b': 4.0,
r'\barxiv\b': 4.5,
r'\bgithub (projects?|repos?)\b': 4.5,
r'\bopen source\b.*\bprojects?\b': 4.0,
r'\btweets? (about|on)\b': 3.5,
r'\bblogs? (about|on|like)\b': 3.0,
# URL detection (very strong signal for Exa similar)
r'https?://[^\s]+': 5.0,
r'\b\w+\.(com|org|io|ai|co|dev)\b': 3.5,
}
LOCAL_NEWS_SIGNALS = {
# Local patterns → Serper
r'\bnear me\b': 4.0,
r'\bnearby\b': 3.5,
r'\blocal\b': 3.0,
r'\bin (my )?(city|area|town|neighborhood)\b': 3.5,
r'\brestaurants?\b': 2.5,
r'\bhotels?\b': 2.5,
r'\bcafes?\b': 2.5,
r'\bstores?\b': 2.0,
r'\bdirections? to\b': 3.5,
r'\bmap of\b': 3.0,
r'\bphone number\b': 3.0,
r'\baddress of\b': 3.0,
r'\bopen(ing)? hours\b': 3.0,
# Weather/time
r'\bweather\b': 4.0,
r'\bforecast\b': 3.5,
r'\btemperature\b': 3.0,
r'\btime in\b': 3.0,
# News/recency patterns → Serper (or Tavily for news depth)
r'\blatest\b': 2.5,
r'\brecent\b': 2.5,
r'\btoday\b': 2.5,
r'\bbreaking\b': 3.5,
r'\bnews\b': 2.5,
r'\bheadlines?\b': 3.0,
r'\b202[4-9]\b': 2.0, # Current year mentions
r'\blast (week|month|year)\b': 2.0,
# German local patterns
r'\bin der nähe\b': 4.0,
r'\bin meiner nähe\b': 4.0,
r'\böffnungszeiten\b': 3.0,
r'\badresse von\b': 3.0,
r'\bweg(beschreibung)? nach\b': 3.5,
# German news/recency patterns
r'\bheute\b': 2.5,
r'\bmorgen\b': 2.0,
r'\baktuell\b': 2.5,
r'\bnachrichten\b': 3.0,
}
# RAG/AI signals → You.com
# You.com excels at providing LLM-ready snippets and combined web+news
RAG_SIGNALS = {
# RAG/context patterns (strong signal for You.com)
r'\brag\b': 4.5,
r'\bcontext for\b': 4.0,
r'\bsummarize\b': 3.5,
r'\bbrief(ly)?\b': 3.0,
r'\bquick overview\b': 3.5,
r'\btl;?dr\b': 4.0,
r'\bkey (points|facts|info)\b': 3.5,
r'\bmain (points|takeaways)\b': 3.5,
# Combined web + news queries
r'\b(web|online)\s+and\s+news\b': 4.0,
r'\ball sources\b': 3.5,
r'\bcomprehensive (search|overview)\b': 3.5,
r'\blatest\s+(news|updates)\b': 3.0,
r'\bcurrent (events|situation|status)\b': 3.5,
# Real-time information needs
r'\bright now\b': 3.0,
r'\bas of today\b': 3.5,
r'\bup.to.date\b': 3.5,
r'\breal.time\b': 4.0,
r'\blive\b': 2.5,
# Information synthesis
r'\bwhat\'?s happening with\b': 3.5,
r'\bwhat\'?s the latest\b': 4.0,
r'\bupdates?\s+on\b': 3.5,
r'\bstatus of\b': 3.0,
r'\bsituation (in|with|around)\b': 3.5,
}
# Direct answer / synthesis signals → Perplexity via Kilo Gateway
DIRECT_ANSWER_SIGNALS = {
r'\bwhat is\b': 3.0,
r'\bwhat are\b': 2.5,
r'\bcurrent status\b': 4.0,
r'\bstatus of\b': 3.5,
r'\bstatus\b': 2.5,
r'\bwhat happened with\b': 4.0,
r"\bwhat'?s happening with\b": 4.0,
r'\bas of (today|now)\b': 4.0,
r'\bthis weekend\b': 3.5,
r'\bevents? in\b': 3.5,
r'\bthings to do in\b': 4.0,
r'\bnear me\b': 3.0,
r'\bcan you (tell me|summarize|explain)\b': 3.5,
# German
r'\bwann\b': 3.0,
r'\bwer\b': 3.0,
r'\bwo\b': 2.5,
r'\bwie viele\b': 3.0,
}
# Privacy/Multi-source signals → SearXNG (self-hosted meta-search)
# SearXNG is ideal for privacy-focused queries and aggregating multiple sources
PRIVACY_SIGNALS = {
# Privacy signals (very strong)
r'\bprivate(ly)?\b': 4.0,
r'\banonymous(ly)?\b': 4.0,
r'\bwithout tracking\b': 4.5,
r'\bno track(ing)?\b': 4.5,
r'\bprivacy\b': 3.5,
r'\bprivacy.?focused\b': 4.5,
r'\bprivacy.?first\b': 4.5,
r'\bduckduckgo alternative\b': 4.5,
r'\bprivate search\b': 5.0,
# German privacy signals
r'\bprivat\b': 4.0,
r'\banonym\b': 4.0,
r'\bohne tracking\b': 4.5,
r'\bdatenschutz\b': 4.0,
# Multi-source aggregation signals
r'\baggregate results?\b': 4.0,
r'\bmultiple sources?\b': 4.0,
r'\bdiverse (results|perspectives|sources)\b': 4.0,
r'\bfrom (all|multiple|different) (engines?|sources?)\b': 4.5,
r'\bmeta.?search\b': 5.0,
r'\ball engines?\b': 4.0,
# German multi-source signals
r'\bverschiedene quellen\b': 4.0,
r'\baus mehreren quellen\b': 4.0,
r'\balle suchmaschinen\b': 4.5,
# Budget/free signals (SearXNG is self-hosted = $0 API cost)
r'\bfree search\b': 3.5,
r'\bno api cost\b': 4.0,
r'\bself.?hosted search\b': 5.0,
r'\bzero cost\b': 3.5,
r'\bbudget\b(?!\s*(laptop|phone|option))\b': 2.5, # "budget" alone, not "budget laptop"
# German budget signals
r'\bkostenlos(e)?\s+suche\b': 3.5,
r'\bkeine api.?kosten\b': 4.0,
}
# Exa Deep Search signals → deep multi-source synthesis
EXA_DEEP_SIGNALS = {
r'\bsynthesi[sz]e\b': 5.0,
r'\bdeep research\b': 5.0,
r'\bcomprehensive (analysis|report|overview|survey)\b': 4.5,
r'\bacross (multiple|many|several) (sources|documents|papers)\b': 4.5,
r'\baggregat(e|ing) (information|data|results)\b': 4.0,
r'\bcross.?referenc': 4.5,
r'\bsec filings?\b': 4.5,
r'\bannual reports?\b': 4.0,
r'\bearnings (call|report|transcript)\b': 4.5,
r'\bfinancial analysis\b': 4.0,
r'\bliterature (review|survey)\b': 5.0,
r'\bacademic literature\b': 4.5,
r'\bstate of the (art|field|industry)\b': 4.0,
r'\bcompile (a |the )?(report|findings|results)\b': 4.5,
r'\bsummariz(e|ing) (research|papers|studies)\b': 4.0,
r'\bmultiple documents?\b': 4.0,
r'\bdossier\b': 4.5,
r'\bdue diligence\b': 4.5,
r'\bstructured (output|data|report)\b': 4.0,
r'\bmarket research\b': 4.0,
r'\bindustry (report|analysis|overview)\b': 4.0,
r'\bresearch (on|about|into)\b': 4.0,
r'\bwhitepaper\b': 4.5,
r'\btechnical report\b': 4.0,
r'\bsurvey of\b': 4.5,
r'\bmeta.?analysis\b': 5.0,
r'\bsystematic review\b': 5.0,
r'\bcase study\b': 3.5,
r'\bbenchmark(s|ing)?\b': 3.5,
# German
r'\btiefenrecherche\b': 5.0,
r'\bumfassende (analyse|übersicht|recherche)\b': 4.5,
r'\baus mehreren quellen zusammenfassen\b': 4.5,
r'\bmarktforschung\b': 4.0,
}
# Exa Deep Reasoning signals → complex cross-reference analysis
EXA_DEEP_REASONING_SIGNALS = {
r'\bdeep.?reasoning\b': 6.0,
r'\bcomplex (analysis|reasoning|research)\b': 4.5,
r'\bcontradictions?\b': 4.5,
r'\breconcil(e|ing)\b': 5.0,
r'\bcritical(ly)? analyz': 4.5,
r'\bweigh(ing)? (the )?evidence\b': 4.5,
r'\bcompeting (claims|theories|perspectives)\b': 4.5,
r'\bcomplex financial\b': 4.5,
r'\bregulatory (analysis|compliance|landscape)\b': 4.5,
r'\blegal analysis\b': 4.5,
r'\bcomprehensive (due diligence|investigation)\b': 5.0,
r'\bpatent (landscape|analysis|search)\b': 4.5,
r'\bmarket intelligence\b': 4.5,
r'\bcompetitive (intelligence|landscape)\b': 4.5,
r'\btrade.?offs?\b': 4.0,
r'\bpros and cons of\b': 4.0,
r'\bshould I (use|choose|pick)\b': 3.5,
r'\bwhich is better\b': 4.0,
# German
r'\bkomplexe analyse\b': 4.5,
r'\bwidersprüche\b': 4.5,
r'\bquellen abwägen\b': 4.5,
r'\brechtliche analyse\b': 4.5,
r'\bvergleich(e|en)?\b': 3.5,
}
# Brand/product patterns for shopping detection
BRAND_PATTERNS = [
# Tech brands
r'\b(apple|iphone|ipad|macbook|airpods?)\b',
r'\b(samsung|galaxy)\b',
r'\b(google|pixel)\b',
r'\b(microsoft|surface|xbox)\b',
r'\b(sony|playstation)\b',
r'\b(nvidia|geforce|rtx)\b',
r'\b(amd|ryzen|radeon)\b',
r'\b(intel|core i[3579])\b',
r'\b(dell|hp|lenovo|asus|acer)\b',
r'\b(lg|tcl|hisense)\b',
# Product categories
r'\b(laptop|phone|tablet|tv|monitor|headphones?|earbuds?)\b',
r'\b(camera|lens|drone)\b',
r'\b(watch|smartwatch|fitbit|garmin)\b',
r'\b(router|modem|wifi)\b',
r'\b(keyboard|mouse|gaming)\b',
]
def __init__(self, config: Dict[str, Any]):
    """Keep the full config and cache its auto-routing section.

    When ``config`` has no "auto_routing" key, the section from
    DEFAULT_CONFIG is used instead.
    """
    routing_defaults = DEFAULT_CONFIG["auto_routing"]
    self.config = config
    self.auto_config = config.get("auto_routing", routing_defaults)
def _calculate_signal_score(
    self,
    query: str,
    signals: Dict[str, float]
) -> Tuple[float, List[Dict[str, Any]]]:
    """
    Calculate score for a signal category.

    Each pattern in ``signals`` that matches the lower-cased query
    contributes its weight exactly once, however often it matches.

    Args:
        query: The raw search query.
        signals: Mapping of regex pattern -> weight.

    Returns:
        (total_score, matches) where every match is a dict with the keys
        "pattern", "matched" (the text of the first match) and "weight".
    """
    query_lower = query.lower()
    matches = []
    total_score = 0.0
    for pattern, weight in signals.items():
        hit = re.search(pattern, query_lower, re.IGNORECASE)
        if hit is None:
            continue
        matches.append({
            "pattern": pattern,
            # group(0) is the whole match. findall() would return only the
            # capture group, e.g. "buy" for "where to buy" or "" when an
            # optional group did not participate.
            "matched": hit.group(0),
            "weight": weight
        })
        total_score += weight
    return total_score, matches
def _detect_product_brand_combo(self, query: str) -> float:
    """
    Score how strongly brand/product wording points to shopping intent.

    Returns:
        3.0 when a BRAND_PATTERNS entry and a product indicator both match,
        1.5 when only a brand/product-category pattern matches, else 0.0.
    """
    lowered = query.lower()

    def any_hit(patterns) -> bool:
        return any(re.search(p, lowered, re.IGNORECASE) for p in patterns)

    # Without a brand there is no bonus, whatever else the query contains.
    if not any_hit(self.BRAND_PATTERNS):
        return 0.0

    indicator_patterns = (
        r'\b(buy|price|specs?|review|vs|compare)\b',
        r'\b(pro|max|plus|mini|ultra|lite)\b', # Product tier names
        r'\b\d+\s*(gb|tb|inch|mm|hz)\b', # Specifications
    )
    return 3.0 if any_hit(indicator_patterns) else 1.5
def _detect_url(self, query: str) -> Optional[str]:
    """Detect URLs in query - strong signal for Exa similar search.

    An explicit http(s) URL is preferred; otherwise the first bare domain
    ending in one of a small set of TLDs is returned.

    Returns:
        The detected URL or domain (subdomains included, trailing sentence
        punctuation removed), or None when the query contains neither.
    """
    match = re.search(r'https?://[^\s]+', query)
    if match:
        # "see https://example.com." - the full stop, comma or closing quote
        # belongs to the sentence, not to the URL.
        return match.group().rstrip('.,;:!?"\'')
    # Also check for domain-like patterns. Labels may contain hyphens and
    # repeat, so "docs.my-site.com" is returned whole instead of "site.com".
    domain_pattern = r'\b((?:[\w-]+\.)+(?:com|org|io|ai|co|dev|net|app))\b'
    match = re.search(domain_pattern, query, re.IGNORECASE)
    if match:
        return match.group()
    return None
def _assess_query_complexity(self, query: str) -> Dict[str, Any]:
    """
    Estimate how complex a query is; analyze() credits complexity to Tavily.

    Scoring: +1.5 above 10 words, a further +1.0 above 20 words, +1.0 when
    more than one question word occurs, and +0.5 per clause marker.

    Returns:
        Dict with "word_count", "question_words", "clause_markers",
        "complexity_score" and "is_complex" (score strictly above 2.0).
    """
    def count(pattern: str) -> int:
        return sum(1 for _ in re.finditer(pattern, query, re.IGNORECASE))

    word_count = len(query.split())
    question_words = count(r'\b(what|why|how|when|where|which|who|whose|whom)\b')
    # Rough proxy for multiple clauses.
    clause_markers = count(r'\b(and|but|or|because|since|while|although|if|when)\b')

    complexity_score = 0.0
    for threshold, bonus in ((10, 1.5), (20, 1.0)):
        if word_count > threshold:
            complexity_score += bonus
    if question_words > 1:
        complexity_score += 1.0
    if clause_markers > 0:
        complexity_score += 0.5 * clause_markers

    return dict(
        word_count=word_count,
        question_words=question_words,
        clause_markers=clause_markers,
        complexity_score=complexity_score,
        is_complex=complexity_score > 2.0,
    )
def _detect_recency_intent(self, query: str) -> Tuple[bool, float]:
    """
    Measure how strongly the query asks for recent/timely information.

    Each pattern group below adds its weight once if it matches anywhere in
    the query (case-insensitive).

    Returns:
        (is_recency_focused, score); the flag is True when score exceeds 2.0.
    """
    weighted_patterns = {
        r'\b(latest|newest|recent|current)\b': 2.5,
        r'\b(today|yesterday|this week|this month)\b': 3.0,
        r'\b(202[4-9]|2030)\b': 2.0,
        r'\b(breaking|live|just|now)\b': 3.0,
        r'\blast (hour|day|week|month)\b': 2.5,
    }
    score = sum(
        (
            weight
            for pattern, weight in weighted_patterns.items()
            if re.search(pattern, query, re.IGNORECASE)
        ),
        0.0,  # float start value so "no match" yields 0.0, not int 0
    )
    return score > 2.0, score
def analyze(self, query: str) -> Dict[str, Any]:
"""
Perform comprehensive query analysis.
Returns detailed analysis with scores for each provider.
"""
# Calculate scores for each intent category
shopping_score, shopping_matches = self._calculate_signal_score(
query, self.SHOPPING_SIGNALS
)
research_score, research_matches = self._calculate_signal_score(
query, self.RESEARCH_SIGNALS
)
discovery_score, discovery_matches = self._calculate_signal_score(
query, self.DISCOVERY_SIGNALS
)
local_news_score, local_news_matches = self._calculate_signal_score(
query, self.LOCAL_NEWS_SIGNALS
)
rag_score, rag_matches = self._calculate_signal_score(
query, self.RAG_SIGNALS
)
privacy_score, privacy_matches = self._calculate_signal_score(
query, self.PRIVACY_SIGNALS
)
direct_answer_score, direct_answer_matches = self._calculate_signal_score(
query, self.DIRECT_ANSWER_SIGNALS
)
exa_deep_score, exa_deep_matches = self._calculate_signal_score(
query, self.EXA_DEEP_SIGNALS
)
exa_deep_reasoning_score, exa_deep_reasoning_matches = self._calculate_signal_score(
query, self.EXA_DEEP_REASONING_SIGNALS
)
# Apply product/brand bonus to shopping
brand_bonus = self._detect_product_brand_combo(query)
if brand_bonus > 0:
shopping_score += brand_bonus
shopping_matches.append({
"pattern": "product_brand_combo",
"matched": "brand + product detected",
"weight": brand_bonus
})
# Detect URL → strong Exa signal
detected_url = self._detect_url(query)
if detected_url:
discovery_score += 5.0
discovery_matches.append({
"pattern": "url_detected",
"matched": detected_url,
"weight": 5.0
})
# Assess complexity → favors Tavily
complexity = self._assess_query_complexity(query)
if complexity["is_complex"]:
research_score += complexity["complexity_score"]
research_matches.append({
"pattern": "query_complexity",
"matched": f"complex query ({complexity['word_count']} words)",
"weight": complexity["complexity_score"]
})
# Check recency intent
is_recency, recency_score = self._detect_recency_intent(query)
# Map intents to providers with final scores
provider_scores = {
"serper": shopping_score + local_news_score + (recency_score * 0.35),
"tavily": research_score + (complexity["complexity_score"] if not complexity["is_complex"] else 0) + (0.2 * recency_score),
"querit": (research_score * 0.65) + (rag_score * 0.35) + (recency_score * 0.45),
"exa": discovery_score + (1.0 if re.search(r"\b(similar|alternatives?|examples?)\b", query, re.IGNORECASE) else 0.0) + (exa_deep_score * 0.5) + (exa_deep_reasoning_score * 0.5),
"perplexity": direct_answer_score + (local_news_score * 0.4) + (recency_score * 0.55),
"you": rag_score + (recency_score * 0.25), # You.com good for real-time + RAG
"searxng": privacy_score, # SearXNG for privacy/multi-source queries
}
# Build match details per provider
provider_matches = {
"serper": shopping_matches + local_news_matches,
"tavily": research_matches,
"querit": research_matches,
"exa": discovery_matches + exa_deep_matches + exa_deep_reasoning_matches,
"perplexity": direct_answer_matches,
"you": rag_matches,
"searxng": privacy_matches,
}
return {
"query": query,
"provider_scores": provider_scores,
"provider_matches": provider_matches,
"detected_url": detected_url,
"complexity": complexity,
"recency_focused": is_recency,
"recency_score": recency_score,
"exa_deep_score": exa_deep_score,
"exa_deep_reasoning_score": exa_deep_reasoning_score,
}
def route(self, query: str) -> Dict[str, Any]:
    """
    Choose the best usable provider for a query and score the decision.

    Scores produced by ``self.analyze`` are limited to providers that are
    not listed under ``disabled_providers`` and for which ``get_api_key``
    returns a truthy value. Ties on the top score are resolved through the
    configured ``provider_priority`` order.

    Returns a dict containing the chosen ``provider``, the rounded
    ``confidence`` with its level and reason, the per-provider ``scores``,
    the strongest matched signals for the winner and a short summary of the
    analysis. When no provider is usable, the configured
    ``fallback_provider`` is returned with zero confidence.
    """
    analysis = self.analyze(query)
    all_scores = analysis["provider_scores"]

    # Keep only providers that are enabled and have credentials configured.
    blocked = set(self.auto_config.get("disabled_providers", []))
    candidates = {}
    for name, score in all_scores.items():
        if name in blocked:
            continue
        if get_api_key(name, self.config):
            candidates[name] = score

    if not candidates:
        # Nothing usable: hand back the configured fallback with no confidence.
        return {
            "provider": self.auto_config.get("fallback_provider", "serper"),
            "confidence": 0.0,
            "confidence_level": "low",
            "reason": "no_available_providers",
            "scores": all_scores,
            "top_signals": [],
            "analysis": analysis,
        }

    best_score = max(candidates.values())

    # Several providers may share the top score; the priority list decides.
    preferred_order = self.auto_config.get(
        "provider_priority",
        ["tavily", "querit", "exa", "perplexity", "serper", "you", "searxng"],
    )
    tied = [name for name, score in candidates.items() if score == best_score]
    winner = tied[0]
    if len(tied) > 1:
        winner = next((name for name in preferred_order if name in tied), tied[0])

    if best_score == 0:
        confidence = 0.0
        reason = "no_signals_matched"
    else:
        # Confidence blends absolute strength (60%) with the lead over the
        # runner-up (40%).
        ranked = sorted(candidates.values(), reverse=True)
        runner_up = ranked[1] if len(ranked) > 1 else 0
        margin = (best_score - runner_up) / best_score if best_score > 0 else 0
        # Scores around 15 are treated as the practical maximum.
        strength = min(best_score / 15.0, 1.0)
        confidence = round((strength * 0.6 + margin * 0.4), 3)
        if confidence >= 0.7:
            reason = "high_confidence_match"
        elif confidence >= 0.4:
            reason = "moderate_confidence_match"
        else:
            reason = "low_confidence_match"

    if confidence >= 0.7:
        level = "high"
    elif confidence >= 0.4:
        level = "medium"
    else:
        level = "low"

    # Strongest five signals recorded for the winning provider.
    winner_matches = analysis["provider_matches"].get(winner, [])
    strongest = sorted(winner_matches, key=lambda m: m["weight"], reverse=True)[:5]

    # Exa only: escalate the search depth when the deep signals are strong.
    exa_depth = "normal"
    if winner == "exa":
        if analysis.get("exa_deep_reasoning_score", 0) >= 4.0:
            exa_depth = "deep-reasoning"
        elif analysis.get("exa_deep_score", 0) >= 4.0:
            exa_depth = "deep"

    min_confidence = self.auto_config.get("confidence_threshold", 0.3)
    return {
        "provider": winner,
        "confidence": confidence,
        "confidence_level": level,
        "reason": reason,
        "exa_depth": exa_depth,
        "scores": {name: round(score, 2) for name, score in candidates.items()},
        "winning_score": round(best_score, 2),
        "top_signals": [
            {"matched": signal["matched"], "weight": signal["weight"]}
            for signal in strongest
        ],
        "below_threshold": confidence < min_confidence,
        "analysis_summary": {
            "query_length": len(query.split()),
            "is_complex": analysis["complexity"]["is_complex"],
            "has_url": analysis["detected_url"] is not None,
            "recency_focused": analysis["recency_focused"],
        },
    }
def auto_route_provider(query: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convenience wrapper: build a QueryAnalyzer from ``config`` and return
    its routing decision (provider plus confidence details) for ``query``.
    """
    return QueryAnalyzer(config).route(query)
def explain_routing(query: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Produce a debugging report describing how ``query`` would be routed.

    Runs both the raw analysis and the routing step, then combines the
    decision, per-provider scores, matched signals, query statistics and
    the list of providers that are configured and not disabled.
    """
    analyzer = QueryAnalyzer(config)
    analysis = analyzer.analyze(query)
    decision = analyzer.route(query)

    matches_by_provider = analysis["provider_matches"]
    complexity = analysis["complexity"]

    routing_decision = {
        "provider": decision["provider"],
        "confidence": decision["confidence"],
        "confidence_level": decision["confidence_level"],
        "reason": decision["reason"],
        "exa_depth": decision.get("exa_depth", "normal"),
    }

    intent_breakdown = {
        "shopping_signals": len(matches_by_provider["serper"]),
        "research_signals": len(matches_by_provider["tavily"]),
        "querit_signals": len(matches_by_provider["querit"]),
        "discovery_signals": len(matches_by_provider["exa"]),
        "rag_signals": len(matches_by_provider["you"]),
        "exa_deep_score": round(analysis.get("exa_deep_score", 0), 2),
        "exa_deep_reasoning_score": round(analysis.get("exa_deep_reasoning_score", 0), 2),
    }

    query_analysis = {
        "word_count": complexity["word_count"],
        "is_complex": complexity["is_complex"],
        "complexity_score": round(complexity["complexity_score"], 2),
        "has_url": analysis["detected_url"],
        "recency_focused": analysis["recency_focused"],
    }

    # Only providers that actually matched something are listed.
    all_matches = {}
    for provider, matches in matches_by_provider.items():
        if not matches:
            continue
        all_matches[provider] = [
            {"matched": match["matched"], "weight": match["weight"]}
            for match in matches
        ]

    # Providers with credentials that have not been disabled for auto-routing.
    available_providers = []
    for name in ["serper", "tavily", "querit", "exa", "perplexity", "you", "searxng"]:
        if get_api_key(name, config) and name not in config.get("auto_routing", {}).get("disabled_providers", []):
            available_providers.append(name)

    return {
        "query": query,
        "routing_decision": routing_decision,
        "scores": decision["scores"],
        "top_signals": decision["top_signals"],
        "intent_breakdown": intent_breakdown,
        "query_analysis": query_analysis,
        "all_matches": all_matches,
        "available_providers": available_providers,
    }
class ProviderConfigError(Exception):
    """Signals that a provider cannot be used because its API key or
    configuration is missing or invalid."""
class ProviderRequestError(Exception):
    """Provider request failure carrying retry/cooldown hints.

    Attributes:
        status_code: HTTP status of the failed response, or None when the
            failure did not come with one.
        transient: True when the failure is considered temporary.
    """

    def __init__(self, message: str, status_code: Optional[int] = None, transient: bool = False):
        Exception.__init__(self, message)
        self.transient = transient
        self.status_code = status_code
# HTTP status codes for which request errors are flagged as transient
# (ProviderRequestError.transient is set from membership in this set).
TRANSIENT_HTTP_CODES = {429, 503}
# Escalating cooldown durations in seconds; mark_provider_failure indexes this
# list by failure count and stays on the last entry once it is reached.
COOLDOWN_STEPS_SECONDS = [60, 300, 1500, 3600] # 1m -> 5m -> 25m -> 1h cap
# Retry delays in seconds — presumably consumed by retry logic elsewhere in
# the file (not visible in this section; TODO confirm).
RETRY_BACKOFF_SECONDS = [1, 3, 9]
def _ensure_parent(path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
def _load_provider_health() -> Dict[str, Any]:
    """Load the persisted provider health state.

    Returns:
        The dict stored in PROVIDER_HEALTH_FILE, or an empty dict when the
        file is missing, unreadable, not valid UTF-8/JSON, or does not
        contain a JSON object.
    """
    try:
        with open(PROVIDER_HEALTH_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file (no separate exists()
        # check needed, which also avoids a check-then-open race).
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
        # so a corrupted file degrades to "no health data" instead of crashing.
        return {}
    return data if isinstance(data, dict) else {}
def _save_provider_health(state: Dict[str, Any]) -> None:
    """Write the provider health mapping to PROVIDER_HEALTH_FILE as indented
    UTF-8 JSON, creating the containing directory first if necessary."""
    PROVIDER_HEALTH_FILE.parent.mkdir(parents=True, exist_ok=True)
    with PROVIDER_HEALTH_FILE.open("w", encoding="utf-8") as handle:
        json.dump(state, handle, ensure_ascii=False, indent=2)
def provider_in_cooldown(provider: str) -> Tuple[bool, int]:
    """Report whether ``provider`` is currently cooling down.

    Returns:
        ``(True, seconds_left)`` while the stored ``cooldown_until``
        timestamp lies in the future, otherwise ``(False, 0)``.
    """
    entry = _load_provider_health().get(provider, {})
    # A missing or falsy cooldown_until counts as "no cooldown".
    until = int(entry.get("cooldown_until", 0) or 0)
    seconds_left = until - int(time.time())
    if seconds_left > 0:
        return (True, seconds_left)
    return (False, 0)
def mark_provider_failure(provider: str, error_message: str) -> Dict[str, Any]:
    """Record a failure for ``provider`` and start or extend its cooldown.

    The stored failure count is incremented and the cooldown length is taken
    from COOLDOWN_STEPS_SECONDS at that position, staying on the final entry
    once the list is exhausted. The updated state is persisted.

    Returns:
        The new health entry stored for the provider.
    """
    health = _load_provider_health()
    timestamp = int(time.time())
    previous = health.get(provider, {})
    failures = int(previous.get("failure_count", 0)) + 1
    # Clamp to the last step so repeated failures stay on the longest cooldown.
    step_index = min(failures - 1, len(COOLDOWN_STEPS_SECONDS) - 1)
    cooldown = COOLDOWN_STEPS_SECONDS[step_index]
    entry = {
        "failure_count": failures,
        "cooldown_until": timestamp + cooldown,
        "cooldown_seconds": cooldown,
        "last_error": error_message,
        "last_failure_at": timestamp,
    }
    health[provider] = entry
    _save_provider_health(health)
    return entry
def reset_provider_health(provider: str) -> None:
    """Forget any recorded failures for ``provider``.

    The health file is rewritten only when the provider actually had an
    entry; otherwise nothing is touched.
    """
    health = _load_provider_health()
    if provider not in health:
        return
    del health[provider]
    _save_provider_health(health)
def _title_from_url(url: str) -> str:
"""Derive a readable title from a URL when none is provided."""
try:
parsed = urlparse(url)
domain = parsed.netloc.replace("www.", "")
# Use last meaningful path segment as context
segments = [s for s in parsed.path.strip("/").split("/") if s]
if segments:
last = segments[-1].replace("-", " ").replace("_", " ")
# Strip file extensions
last = re.sub(r'\.\w{2,4}$', '', last)
if last:
return f"{domain}{last[:80]}"
return domain
except Exception:
return url[:60]
def normalize_result_url(url: str) -> str:
    """Reduce a URL to a comparison key made of host and path.

    Surrounding whitespace is ignored, the host is lower-cased and loses a
    leading ``www.``, trailing slashes are removed from the path, and the
    scheme, query string and fragment are dropped. A falsy ``url`` yields
    an empty string.
    """
    if not url:
        return ""
    parts = urlparse(url.strip())
    host = (parts.netloc or "").lower()
    host = host[4:] if host.startswith("www.") else host
    return host + parts.path.rstrip("/")
def deduplicate_results_across_providers(results_by_provider: List[Tuple[str, Dict[str, Any]]], max_results: int) -> Tuple[List[Dict[str, Any]], int]:
    """Merge results from several providers, dropping repeated URLs.

    Providers are processed in the given order. A result whose normalized
    URL was already seen is skipped and counted; results without a usable
    URL are always kept. Every kept result is copied and tagged with the
    originating provider unless it already carries a ``provider`` key.
    Collection stops as soon as ``max_results`` items have been gathered.

    Returns:
        ``(merged_results, number_of_duplicates_skipped)``.
    """
    merged: List[Dict[str, Any]] = []
    seen_urls = set()
    dropped = 0
    for source_name, payload in results_by_provider:
        for entry in payload.get("results", []):
            key = normalize_result_url(entry.get("url", ""))
            if key:
                if key in seen_urls:
                    dropped += 1
                    continue
                seen_urls.add(key)
            # Copy before tagging so the provider's own payload stays untouched.
            tagged = dict(entry)
            tagged.setdefault("provider", source_name)
            merged.append(tagged)
            if len(merged) >= max_results:
                return merged, dropped
    return merged, dropped
# =============================================================================
# HTTP Client
# =============================================================================
def make_request(url: str, headers: dict, body: dict, timeout: int = 30) -> dict:
    """Make HTTP POST request and return JSON response.

    Args:
        url: Endpoint to POST to.
        headers: Request headers. A default User-Agent is added when none is
            given; the caller's dict is not modified.
        body: JSON-serializable request payload.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded JSON response.

    Raises:
        ProviderRequestError: On HTTP errors, network failures, interrupted
            reads and timeouts. ``transient`` is set for status codes in
            TRANSIENT_HTTP_CODES, timeouts and interrupted reads.
    """
    # Work on a copy so the caller's headers dict is never mutated.
    # User-Agent is required by some APIs like Exa/Cloudflare.
    request_headers = dict(headers)
    request_headers.setdefault("User-Agent", "ClawdBot-WebSearchPlus/2.1")
    data = json.dumps(body).encode("utf-8")
    req = Request(url, data=data, headers=request_headers, method="POST")
    try:
        with urlopen(req, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    except HTTPError as e:
        # errors="replace": a non-UTF-8 error page must not hide the HTTP error.
        error_body = e.read().decode("utf-8", errors="replace") if e.fp else str(e)
        error_detail = error_body[:500]
        try:
            error_json = json.loads(error_body)
        except json.JSONDecodeError:
            pass
        else:
            # Error bodies are not always JSON objects (could be a bare
            # string or list); only objects have "error"/"message" fields.
            if isinstance(error_json, dict):
                error_detail = error_json.get("error") or error_json.get("message") or error_body
        error_messages = {
            401: "Invalid or expired API key. Please check your credentials.",
            403: "Access forbidden. Your API key may not have permission for this operation.",
            429: "Rate limit exceeded. Please wait a moment and try again.",
            500: "Server error. The search provider is experiencing issues.",
            503: "Service unavailable. The search provider may be down."
        }
        friendly_msg = error_messages.get(e.code, f"API error: {error_detail}")
        raise ProviderRequestError(
            f"{friendly_msg} (HTTP {e.code})",
            status_code=e.code,
            transient=e.code in TRANSIENT_HTTP_CODES,
        ) from e
    except URLError as e:
        reason = str(getattr(e, "reason", e))
        is_timeout = "timed out" in reason.lower()
        raise ProviderRequestError(
            f"Network error: {reason}. Check your internet connection.",
            transient=is_timeout,
        ) from e
    except IncompleteRead as e:
        partial_len = len(getattr(e, "partial", b"") or b"")
        raise ProviderRequestError(
            f"Connection interrupted while reading response ({partial_len} bytes received). Please retry.",
            transient=True,
        ) from e
    except TimeoutError as e:
        raise ProviderRequestError(
            f"Request timed out after {timeout}s. Try again or reduce max_results.",
            transient=True,
        ) from e
# =============================================================================
# Serper (Google Search API)
# =============================================================================
def search_serper(
    query: str,
    api_key: str,
    max_results: int = 5,
    country: str = "us",
    language: str = "en",
    search_type: str = "search",
    time_range: Optional[str] = None,
    include_images: bool = False,
) -> dict:
    """Run a query through Serper (Google Search API).

    Args:
        query: Search query.
        api_key: Serper API key, sent as ``X-API-KEY``.
        max_results: Number of results requested and returned.
        country: Value for Serper's ``gl`` parameter.
        language: Value for Serper's ``hl`` parameter.
        search_type: Path segment appended to the Serper base URL.
        time_range: One of hour/day/week/month/year; other values (and
            "none") are ignored.
        include_images: Also fetch up to five image URLs (best effort).

    Returns:
        Normalized result dict with ``results``, ``images``, ``answer``,
        ``knowledge_graph`` and ``related_searches``.
    """
    request_body = {
        "q": query,
        "gl": country,
        "hl": language,
        "num": max_results,
        "autocorrect": True,
    }
    if time_range and time_range != "none":
        tbs_value = {
            "hour": "qdr:h",
            "day": "qdr:d",
            "week": "qdr:w",
            "month": "qdr:m",
            "year": "qdr:y",
        }.get(time_range)
        if tbs_value is not None:
            request_body["tbs"] = tbs_value

    auth_headers = {
        "X-API-KEY": api_key,
        "Content-Type": "application/json",
    }
    data = make_request(f"https://google.serper.dev/{search_type}", auth_headers, request_body)

    # Serper returns no relevance score; derive one from the rank.
    results = [
        {
            "title": hit.get("title", ""),
            "url": hit.get("link", ""),
            "snippet": hit.get("snippet", ""),
            "score": round(1.0 - rank * 0.1, 2),
            "date": hit.get("date"),
        }
        for rank, hit in enumerate(data.get("organic", [])[:max_results])
    ]

    # Answer preference: answer box answer, answer box snippet, knowledge
    # graph description, then the first organic snippet.
    answer_box = data.get("answerBox", {})
    answer = (
        answer_box.get("answer")
        or answer_box.get("snippet")
        or data.get("knowledgeGraph", {}).get("description")
        or (results[0]["snippet"] if results else "")
    )

    images = []
    if include_images:
        try:
            image_data = make_request(
                "https://google.serper.dev/images",
                auth_headers,
                {"q": query, "gl": country, "hl": language, "num": 5},
            )
            images = [
                entry["imageUrl"]
                for entry in image_data.get("images", [])[:5]
                if entry.get("imageUrl")
            ]
        except Exception:
            # Images are optional extras; a failure must not sink the search.
            pass

    return {
        "provider": "serper",
        "query": query,
        "results": results,
        "images": images,
        "answer": answer,
        "knowledge_graph": data.get("knowledgeGraph"),
        "related_searches": [related.get("query") for related in data.get("relatedSearches", [])]
    }
# =============================================================================
# Tavily (Research Search)
# =============================================================================
def search_tavily(
    query: str,
    api_key: str,
    max_results: int = 5,
    depth: str = "basic",
    topic: str = "general",
    include_domains: Optional[List[str]] = None,
    exclude_domains: Optional[List[str]] = None,
    include_images: bool = False,
    include_raw_content: bool = False,
) -> dict:
    """Run a query through Tavily (AI Research Search).

    Args:
        query: Search query.
        api_key: Tavily API key (sent in the request body).
        max_results: Number of results requested and returned.
        depth: Sent as ``search_depth``.
        topic: Sent as ``topic``.
        include_domains: Optional domain allow-list.
        exclude_domains: Optional domain block-list.
        include_images: Ask Tavily to return images.
        include_raw_content: Ask for raw page content and attach it to each
            result that has it.

    Returns:
        Normalized result dict with ``results``, ``images`` and ``answer``.
    """
    payload = {
        "api_key": api_key,
        "query": query,
        "max_results": max_results,
        "search_depth": depth,
        "topic": topic,
        "include_images": include_images,
        "include_answer": True,
        "include_raw_content": include_raw_content,
    }
    # Domain filters are only sent when they are non-empty.
    for field, domains in (("include_domains", include_domains), ("exclude_domains", exclude_domains)):
        if domains:
            payload[field] = domains

    response = make_request(
        "https://api.tavily.com/search",
        {"Content-Type": "application/json"},
        payload,
    )

    results = []
    for hit in response.get("results", [])[:max_results]:
        entry = {
            "title": hit.get("title", ""),
            "url": hit.get("url", ""),
            "snippet": hit.get("content", ""),
            "score": round(hit.get("score", 0.0), 3),
        }
        raw_content = hit.get("raw_content")
        if include_raw_content and raw_content:
            entry["raw_content"] = raw_content
        results.append(entry)

    return {
        "provider": "tavily",
        "query": query,
        "results": results,
        "images": response.get("images", []),
        "answer": response.get("answer", ""),
    }
# =============================================================================
# Querit (Multi-lingual search API for AI, with rich metadata and real-time information)
# =============================================================================
def _map_querit_time_range(time_range: Optional[str]) -> Optional[str]:
"""Map generic time ranges to Querit's compact date filter format."""
if not time_range:
return None
return {
"day": "d1",
"week": "w1",
"month": "m1",
"year": "y1",
}.get(time_range, time_range)
def search_querit(
    query: str,
    api_key: str,
    max_results: int = 5,
    language: str = "en",
    country: str = "us",
    time_range: Optional[str] = None,
    include_domains: Optional[List[str]] = None,
    exclude_domains: Optional[List[str]] = None,
    base_url: str = "https://api.querit.ai",
    base_path: str = "/v1/search",
    timeout: int = 30,
) -> dict:
    """Run a query through Querit.

    The request follows the Querit Python SDK payload shape: ``query``,
    ``count`` and an optional ``filters`` object holding ``languages``,
    ``geo``, ``sites`` and ``timeRange``.

    Returns:
        Normalized result dict; ``answer`` is the first result's snippet and
        ``metadata`` carries the search id and the mapped time range.

    Raises:
        ProviderRequestError: When the response carries an ``error_msg`` or
            an ``error_code`` other than None, 0 or 200 (in addition to
            anything raised by the HTTP layer).
    """
    filters: Dict[str, Any] = {}
    if language:
        filters["languages"] = {"include": [language.lower()]}
    if country:
        filters["geo"] = {"countries": {"include": [country.upper()]}}
    if include_domains or exclude_domains:
        site_filter: Dict[str, List[str]] = {}
        if include_domains:
            site_filter["include"] = include_domains
        if exclude_domains:
            site_filter["exclude"] = exclude_domains
        filters["sites"] = site_filter
    querit_time_range = _map_querit_time_range(time_range)
    if querit_time_range:
        filters["timeRange"] = {"date": querit_time_range}

    payload: Dict[str, Any] = {"query": query, "count": max_results}
    if filters:
        payload["filters"] = filters

    data = make_request(
        base_url.rstrip("/") + base_path,
        {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        payload,
        timeout=timeout,
    )

    # Querit can report failures inside an otherwise successful HTTP response.
    error_code = data.get("error_code")
    error_msg = data.get("error_msg")
    if error_msg or (error_code not in (None, 0, 200)):
        raise ProviderRequestError(error_msg or f"Querit request failed with error_code={error_code}")

    hits = ((data.get("results") or {}).get("result")) or []
    results = []
    for rank, hit in enumerate(hits[:max_results]):
        entry = {
            "title": hit.get("title") or _title_from_url(hit.get("url", "")),
            "url": hit.get("url", ""),
            "snippet": hit.get("snippet") or hit.get("page_age") or "",
            # No score in the response; derive one from the rank.
            "score": round(1.0 - rank * 0.05, 3),
        }
        if hit.get("page_time") is not None:
            entry["page_time"] = hit["page_time"]
        if hit.get("page_age"):
            entry["date"] = hit["page_age"]
        if hit.get("language") is not None:
            entry["language"] = hit["language"]
        results.append(entry)

    return {
        "provider": "querit",
        "query": query,
        "results": results,
        "images": [],
        "answer": results[0]["snippet"] if results else "",
        "metadata": {
            "search_id": data.get("search_id"),
            "time_range": querit_time_range,
        }
    }
# =============================================================================
# Exa (Neural/Semantic/Deep Search)
# =============================================================================
def search_exa(
    query: str,
    api_key: str,
    max_results: int = 5,
    search_type: str = "neural",
    exa_depth: str = "normal",
    category: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    similar_url: Optional[str] = None,
    include_domains: Optional[List[str]] = None,
    exclude_domains: Optional[List[str]] = None,
    text_verbosity: str = "standard",
) -> dict:
    """Search using Exa (Neural/Semantic/Deep Search).

    exa_depth controls synthesis level:
    - "normal": standard search (neural/fast/auto/keyword/instant)
    - "deep": multi-source synthesis with grounding (4-12s, $12/1k)
    - "deep-reasoning": cross-reference reasoning with grounding (12-50s, $15/1k)

    Args:
        query: Search query (not sent when ``similar_url`` is given).
        api_key: Exa API key, sent as ``x-api-key``.
        max_results: Number of results requested and returned.
        search_type: Exa search type used for non-deep searches.
        exa_depth: "normal", "deep" or "deep-reasoning".
        category: Optional Exa category filter.
        start_date: Optional ``startPublishedDate`` filter.
        end_date: Optional ``endPublishedDate`` filter.
        similar_url: When set, the findSimilar endpoint is used instead.
        include_domains: Optional domain allow-list.
        exclude_domains: Optional domain block-list.
        text_verbosity: Verbosity for extracted text in non-deep requests.

    Returns:
        Normalized result dict. Deep results additionally carry
        ``exa_depth``, ``grounding`` and ``metadata``; the first result is
        the synthesis (when one was returned) followed by source documents.
    """
    is_deep = exa_depth in ("deep", "deep-reasoning")
    if similar_url:
        # findSimilar does not support deep search types
        endpoint = "https://api.exa.ai/findSimilar"
        body: Dict[str, Any] = {
            "url": similar_url,
            "numResults": max_results,
            "contents": {
                "text": {"maxCharacters": 2000, "verbosity": text_verbosity},
                "highlights": {"numSentences": 3, "highlightsPerUrl": 2},
            },
        }
    elif is_deep:
        endpoint = "https://api.exa.ai/search"
        body = {
            "query": query,
            "numResults": max_results,
            "type": exa_depth,
            "contents": {
                "text": {"maxCharacters": 5000, "verbosity": "full"},
            },
        }
    else:
        endpoint = "https://api.exa.ai/search"
        body = {
            "query": query,
            "numResults": max_results,
            "type": search_type,
            "contents": {
                "text": {"maxCharacters": 2000, "verbosity": text_verbosity},
                "highlights": {"numSentences": 3, "highlightsPerUrl": 2},
            },
        }
    if category:
        body["category"] = category
    if start_date:
        body["startPublishedDate"] = start_date
    if end_date:
        body["endPublishedDate"] = end_date
    if include_domains:
        body["includeDomains"] = include_domains
    if exclude_domains:
        body["excludeDomains"] = exclude_domains
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
    }
    # Deep searches take considerably longer than standard ones.
    timeout = 55 if is_deep else 30
    data = make_request(endpoint, headers, body, timeout=timeout)
    results = []

    # Deep search: primary content in output field with grounding citations
    if is_deep:
        # "output"/"grounding" may be absent or null (e.g. when the
        # findSimilar endpoint answered); treat both as empty.
        deep_output = data.get("output") or {}
        if not isinstance(deep_output, dict):
            deep_output = {}
        content = deep_output.get("content")
        if isinstance(content, str):
            synthesized_text = content
        elif isinstance(content, dict):
            synthesized_text = json.dumps(content, ensure_ascii=False)
        else:
            synthesized_text = ""

        grounding_citations: List[Dict[str, Any]] = []
        for field_citation in deep_output.get("grounding") or []:
            for cite in field_citation.get("citations") or []:
                grounding_citations.append({
                    "url": cite.get("url", ""),
                    "title": cite.get("title", ""),
                    "confidence": field_citation.get("confidence", ""),
                    "field": field_citation.get("field", ""),
                })

        # Primary synthesized result
        if synthesized_text:
            results.append({
                "title": f"Exa {exa_depth.replace('-', ' ').title()} Synthesis",
                "url": "",
                "snippet": synthesized_text,
                "full_synthesis": synthesized_text,
                "score": 1.0,
                "grounding": grounding_citations[:10],
                "type": "synthesis",
            })

        # Supporting source documents
        raw_sources = data.get("results") or []
        sources = []
        for item in raw_sources[:max_results]:
            text_content = item.get("text", "") or ""
            highlights = item.get("highlights") or []
            snippet = text_content[:800] if text_content else (highlights[0] if highlights else "")
            sources.append({
                "title": item.get("title", ""),
                "url": item.get("url", ""),
                "snippet": snippet,
                "score": round(item.get("score", 0.0), 3),
                "published_date": item.get("publishedDate"),
                "author": item.get("author"),
                "type": "source",
            })
        results.extend(sources)

        # Without a synthesis the list holds only sources, so the fallback
        # answer is the FIRST source (indexing results[1] skipped the best
        # source and produced an empty answer when only one was returned).
        answer = synthesized_text or (sources[0]["snippet"] if sources else "")
        return {
            "provider": "exa",
            "query": query,
            "exa_depth": exa_depth,
            "results": results,
            "images": [],
            "answer": answer,
            "grounding": grounding_citations,
            "metadata": {
                "synthesis_length": len(synthesized_text),
                "source_count": len(raw_sources),
            },
        }

    # Standard search result parsing
    for item in data.get("results", [])[:max_results]:
        text_content = item.get("text", "") or ""
        highlights = item.get("highlights", [])
        if text_content:
            snippet = text_content[:800]
        elif highlights:
            snippet = " ... ".join(highlights[:2])
        else:
            snippet = ""
        results.append({
            "title": item.get("title", ""),
            "url": item.get("url", ""),
            "snippet": snippet,
            "score": round(item.get("score", 0.0), 3),
            "published_date": item.get("publishedDate"),
            "author": item.get("author"),
        })
    answer = results[0]["snippet"] if results else ""
    return {
        "provider": "exa",
        "query": query if not similar_url else f"Similar to: {similar_url}",
        "results": results,
        "images": [],
        "answer": answer,
    }
# =============================================================================
# Perplexity via Kilo Gateway (Synthesized Direct Answers)
# =============================================================================
def search_perplexity(
    query: str,
    api_key: str,
    max_results: int = 5,
    model: str = "perplexity/sonar-pro",
    api_url: str = "https://api.kilo.ai/api/gateway/chat/completions",
    freshness: Optional[str] = None,
) -> dict:
    """Search/answer using Perplexity Sonar Pro via Kilo Gateway.

    Args:
        query: Search query
        api_key: Kilo Gateway API key
        max_results: Maximum results to return (the synthesized answer
            counts as one of them)
        model: Perplexity model to use
        api_url: Kilo Gateway endpoint
        freshness: Filter by recency — 'day', 'week', 'month', 'year' (maps to
            Perplexity's search_recency_filter parameter)

    Returns:
        Normalized result dict: the answer as the first result followed by
        one result per citation, plus ``answer`` and ``metadata``.
    """
    # Map generic freshness values to Perplexity's search_recency_filter
    recency_map = {"day": "day", "pd": "day", "week": "week", "pw": "week", "month": "month", "pm": "month", "year": "year", "py": "year"}
    recency_filter = recency_map.get(freshness or "", None)
    body = {
        "model": model,
        "messages": [
            {"role": "system", "content": "Answer with concise factual summary and include source URLs."},
            {"role": "user", "content": query},
        ],
        "temperature": 0.2,
    }
    if recency_filter:
        body["search_recency_filter"] = recency_filter
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    data = make_request(api_url, headers, body)
    choices = data.get("choices", [])
    message = choices[0].get("message", {}) if choices else {}
    answer = (message.get("content") or "").strip()

    # Prefer the structured citations array from Perplexity API response
    api_citations = data.get("citations", [])
    # Fallback: extract URLs from answer text if API doesn't provide citations
    if not api_citations:
        api_citations = []
        seen = set()
        for u in re.findall(r"https?://[^\s)\]}>\"']+", answer):
            # URLs quoted in prose usually drag sentence punctuation along
            # ("see https://example.com."); strip it so the link resolves.
            u = u.rstrip(".,;:!?")
            if u and u not in seen:
                seen.add(u)
                api_citations.append(u)

    results = []
    # Primary result: the synthesized answer itself
    if answer:
        # Clean citation markers [1][2] for the snippet
        clean_answer = re.sub(r'\[\d+\]', '', answer).strip()
        results.append({
            "title": f"Perplexity Answer: {query[:80]}",
            "url": "https://www.perplexity.ai",
            "snippet": clean_answer[:500],
            "score": 1.0,
        })

    # Source results from citations. One slot is reserved for the answer;
    # clamp at zero so max_results <= 0 cannot turn into a negative slice
    # (which would return all citations but the last).
    citation_limit = max(0, max_results - 1)
    for i, citation in enumerate(api_citations[:citation_limit]):
        # citations can be plain URL strings or dicts with url/title
        if isinstance(citation, str):
            url = citation
            title = _title_from_url(url)
        else:
            url = citation.get("url", "")
            title = citation.get("title") or _title_from_url(url)
        results.append({
            "title": title,
            "url": url,
            "snippet": f"Source cited in Perplexity answer [citation {i+1}]",
            "score": round(0.9 - i * 0.1, 3),
        })

    return {
        "provider": "perplexity",
        "query": query,
        "results": results,
        "images": [],
        "answer": answer,
        "metadata": {
            "model": model,
            "usage": data.get("usage", {}),
        }
    }
# =============================================================================
# You.com (LLM-Ready Web & News Search)
# =============================================================================
def search_you(
    query: str,
    api_key: str,
    max_results: int = 5,
    country: str = "US",
    language: str = "en",
    freshness: Optional[str] = None,
    safesearch: str = "moderate",
    include_news: bool = True,
    livecrawl: Optional[str] = None,
) -> dict:
    """Search using You.com (LLM-Ready Web & News Search).

    You.com excels at:
    - RAG applications with pre-extracted snippets
    - Combined web + news results in one call
    - Real-time information with automatic news classification
    - Clean, structured JSON optimized for AI consumption

    Args:
        query: Search query
        api_key: You.com API key
        max_results: Maximum results to return (default 5, max 100)
        country: ISO 3166-2 country code (e.g., US, GB, DE)
        language: BCP 47 language code (e.g., en, de, fr)
        freshness: Filter by recency: day, week, month, year, or YYYY-MM-DDtoYYYY-MM-DD
        safesearch: Content filter: off, moderate (default), strict
        include_news: Include news results when relevant (default True)
        livecrawl: Fetch full page content: "web", "news", or "all"

    Returns:
        Normalized result dict with ``results`` (web), ``news``, ``answer``
        (top snippets joined, capped at 1000 characters) and ``metadata``.

    Raises:
        ProviderRequestError: On HTTP errors, network failures, interrupted
            reads and timeouts.
    """
    endpoint = "https://ydc-index.io/v1/search"
    # Build query parameters
    params = {
        "query": query,
        "count": max_results,
        "safesearch": safesearch,
    }
    if country:
        params["country"] = country.upper()
    if language:
        params["language"] = language.upper()
    if freshness:
        params["freshness"] = freshness
    if livecrawl:
        params["livecrawl"] = livecrawl
        params["livecrawl_formats"] = "markdown"
    # Build URL with query params (URL-encode values)
    query_string = "&".join(f"{k}={quote(str(v))}" for k, v in params.items())
    url = f"{endpoint}?{query_string}"
    headers = {
        "X-API-KEY": api_key,
        "Accept": "application/json",
        "User-Agent": "ClawdBot-WebSearchPlus/2.4",
    }
    # Make GET request (You.com uses GET, not POST)
    req = Request(url, headers=headers, method="GET")
    try:
        with urlopen(req, timeout=30) as response:
            data = json.loads(response.read().decode("utf-8"))
    except HTTPError as e:
        # errors="replace": a non-UTF-8 error page must not hide the HTTP error.
        error_body = e.read().decode("utf-8", errors="replace") if e.fp else str(e)
        error_detail = error_body[:500]
        try:
            error_json = json.loads(error_body)
        except json.JSONDecodeError:
            pass
        else:
            # Only JSON objects carry "error"/"message" fields.
            if isinstance(error_json, dict):
                error_detail = error_json.get("error") or error_json.get("message") or error_body
        error_messages = {
            401: "Invalid or expired API key. Get one at https://api.you.com",
            403: "Access forbidden. Check your API key permissions.",
            429: "Rate limit exceeded. Please wait and try again.",
            500: "You.com server error. Try again later.",
            503: "You.com service unavailable."
        }
        friendly_msg = error_messages.get(e.code, f"API error: {error_detail}")
        raise ProviderRequestError(
            f"{friendly_msg} (HTTP {e.code})",
            status_code=e.code,
            transient=e.code in TRANSIENT_HTTP_CODES,
        ) from e
    except URLError as e:
        reason = str(getattr(e, "reason", e))
        is_timeout = "timed out" in reason.lower()
        raise ProviderRequestError(
            f"Network error: {reason}. Check your internet connection.",
            transient=is_timeout,
        ) from e
    except IncompleteRead as e:
        # Same handling as the shared POST helper: a truncated response is a
        # transient provider failure, not an unexpected crash.
        partial_len = len(getattr(e, "partial", b"") or b"")
        raise ProviderRequestError(
            f"Connection interrupted while reading response ({partial_len} bytes received). Please retry.",
            transient=True,
        ) from e
    except TimeoutError as e:
        raise ProviderRequestError("You.com request timed out after 30s.", transient=True) from e

    # Parse results
    results_data = data.get("results", {})
    web_results = results_data.get("web", [])
    news_results = results_data.get("news", []) if include_news else []
    metadata = data.get("metadata", {})

    # Normalize web results
    results = []
    for i, item in enumerate(web_results[:max_results]):
        snippets = item.get("snippets", [])
        snippet = snippets[0] if snippets else item.get("description", "")
        result = {
            "title": item.get("title", ""),
            "url": item.get("url", ""),
            "snippet": snippet,
            "score": round(1.0 - i * 0.05, 3),  # Assign descending score
            "date": item.get("page_age"),
            "source": "web",
        }
        # Include additional snippets if available (great for RAG)
        if len(snippets) > 1:
            result["additional_snippets"] = snippets[1:3]
        # Include thumbnail and favicon for UI display
        if item.get("thumbnail_url"):
            result["thumbnail"] = item["thumbnail_url"]
        if item.get("favicon_url"):
            result["favicon"] = item["favicon_url"]
        # Include live-crawled content if available
        if item.get("contents"):
            result["raw_content"] = item["contents"].get("markdown") or item["contents"].get("html", "")
        results.append(result)

    # Add news results (if any)
    news = [
        {
            "title": item.get("title", ""),
            "url": item.get("url", ""),
            "snippet": item.get("description", ""),
            "date": item.get("page_age"),
            "thumbnail": item.get("thumbnail_url"),
            "source": "news",
        }
        for item in news_results[:5]
    ]

    # Build answer from best snippets: combine top snippets for LLM context
    top_snippets = [r["snippet"] for r in results[:3] if r.get("snippet")]
    answer = " ".join(top_snippets)[:1000]

    return {
        "provider": "you",
        "query": query,
        "results": results,
        "news": news,
        "images": [],
        "answer": answer,
        "metadata": {
            "search_uuid": metadata.get("search_uuid"),
            "latency": metadata.get("latency"),
        }
    }
# =============================================================================
# SearXNG (Privacy-First Meta-Search)
# =============================================================================
def search_searxng(
    query: str,
    instance_url: str,
    max_results: int = 5,
    categories: Optional[List[str]] = None,
    engines: Optional[List[str]] = None,
    language: str = "en",
    time_range: Optional[str] = None,
    safesearch: int = 0,
) -> dict:
    """Search using SearXNG (self-hosted privacy-first meta-search).

    SearXNG excels at:
    - Privacy-preserving search (no tracking, no profiling)
    - Multi-source aggregation (70+ upstream engines)
    - $0 API cost (self-hosted)
    - Diverse perspectives from multiple search engines

    Args:
        query: Search query
        instance_url: URL of your SearXNG instance (required)
        max_results: Maximum results to return (default 5)
        categories: Search categories (general, images, news, videos, etc.)
        engines: Specific engines to use (google, bing, duckduckgo, etc.)
        language: Language code (e.g., en, de, fr)
        time_range: Filter by recency: day, week, month, year
        safesearch: Content filter: 0=off, 1=moderate, 2=strict

    Returns:
        Normalized result dict with ``results``, ``answer``,
        ``suggestions``, ``corrections`` and ``metadata``.

    Raises:
        ProviderRequestError: On HTTP errors, unreachable instance,
            interrupted reads and timeouts.

    Note:
        Requires a self-hosted SearXNG instance with JSON format enabled.
        See: https://docs.searxng.org/admin/installation.html
    """
    # Build URL with query parameters
    params = {
        "q": query,
        "format": "json",
        "language": language,
        "safesearch": str(safesearch),
    }
    if categories:
        params["categories"] = ",".join(categories)
    if engines:
        params["engines"] = ",".join(engines)
    if time_range:
        params["time_range"] = time_range
    # Build URL — instance_url comes from operator-controlled config/env only
    # (validated by _validate_searxng_url), not from agent/LLM input
    base_url = instance_url.rstrip("/")
    query_string = "&".join(f"{k}={quote(str(v))}" for k, v in params.items())
    url = f"{base_url}/search?{query_string}"
    headers = {
        "User-Agent": "ClawdBot-WebSearchPlus/2.5",
        "Accept": "application/json",
    }
    # Make GET request
    req = Request(url, headers=headers, method="GET")
    try:
        with urlopen(req, timeout=30) as response:
            data = json.loads(response.read().decode("utf-8"))
    except HTTPError as e:
        # errors="replace": a non-UTF-8 error page must not hide the HTTP error.
        error_body = e.read().decode("utf-8", errors="replace") if e.fp else str(e)
        error_detail = error_body[:500]
        try:
            error_json = json.loads(error_body)
        except json.JSONDecodeError:
            pass
        else:
            # Only JSON objects carry "error"/"message" fields.
            if isinstance(error_json, dict):
                error_detail = error_json.get("error") or error_json.get("message") or error_body
        error_messages = {
            403: "JSON API disabled on this SearXNG instance. Enable 'json' in search.formats in settings.yml",
            404: "SearXNG instance not found. Check your instance URL.",
            500: "SearXNG server error. Check instance health.",
            503: "SearXNG service unavailable."
        }
        friendly_msg = error_messages.get(e.code, f"SearXNG error: {error_detail}")
        raise ProviderRequestError(
            f"{friendly_msg} (HTTP {e.code})",
            status_code=e.code,
            transient=e.code in TRANSIENT_HTTP_CODES,
        ) from e
    except URLError as e:
        reason = str(getattr(e, "reason", e))
        is_timeout = "timed out" in reason.lower()
        raise ProviderRequestError(
            f"Cannot reach SearXNG instance at {instance_url}. Error: {reason}",
            transient=is_timeout,
        ) from e
    except IncompleteRead as e:
        # Same handling as the shared POST helper: a truncated response is a
        # transient provider failure, not an unexpected crash.
        partial_len = len(getattr(e, "partial", b"") or b"")
        raise ProviderRequestError(
            f"Connection interrupted while reading response ({partial_len} bytes received). Please retry.",
            transient=True,
        ) from e
    except TimeoutError as e:
        raise ProviderRequestError("SearXNG request timed out after 30s. Check instance health.", transient=True) from e

    # Parse results
    raw_results = data.get("results", [])
    # Normalize results to unified format
    results = []
    engines_used = set()
    for i, item in enumerate(raw_results[:max_results]):
        engine = item.get("engine", "unknown")
        engines_used.add(engine)
        results.append({
            "title": item.get("title", ""),
            "url": item.get("url", ""),
            "snippet": item.get("content", ""),
            # Use the instance's score when present, else derive one from rank.
            "score": round(item.get("score", 1.0 - i * 0.05), 3),
            "engine": engine,
            "category": item.get("category", "general"),
            "date": item.get("publishedDate"),
        })

    # Build answer from answers, infoboxes, or first result
    answer = ""
    if data.get("answers"):
        first_answer = data["answers"][0]
        answer = first_answer if isinstance(first_answer, str) else str(first_answer)
    elif data.get("infoboxes"):
        infobox = data["infoboxes"][0]
        answer = infobox.get("content", "") or infobox.get("infobox", "")
    elif results:
        answer = results[0]["snippet"]

    return {
        "provider": "searxng",
        "query": query,
        "results": results,
        "images": [],
        "answer": answer,
        "suggestions": data.get("suggestions", []),
        "corrections": data.get("corrections", []),
        "metadata": {
            "number_of_results": data.get("number_of_results"),
            "engines_used": list(engines_used),
            "instance_url": instance_url,
        }
    }
# =============================================================================
# CLI
# =============================================================================
def _build_arg_parser(config: Dict[str, Any]) -> argparse.ArgumentParser:
    """Build the CLI argument parser.

    Per-provider sections of *config* (``serper``, ``tavily``, ``querit``,
    ``exa``, ``you``, ``searxng``, ``defaults``) seed the argument defaults so
    that command-line flags only need to be given to override configuration.
    """
    parser = argparse.ArgumentParser(
        description="Web Search Plus — Intelligent multi-provider search with smart auto-routing",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Intelligent Auto-Routing:
The query is analyzed using multi-signal detection to find the optimal provider:
Shopping Intent → Serper (Google)
"how much", "price of", "buy", product+brand combos, deals, specs
Research Intent → Tavily
"how does", "explain", "what is", analysis, pros/cons, tutorials
Multilingual + Real-Time AI Search → Querit
multilingual search, metadata-rich results, current information for AI workflows
Discovery Intent → Exa (Neural)
"similar to", "companies like", "alternatives", URLs, startups, papers
Direct Answer Intent → Perplexity (via Kilo Gateway)
"what is", "current status", local events, synthesized up-to-date answers
Examples:
python3 search.py -q "iPhone 16 Pro Max price" # → Serper (shopping)
python3 search.py -q "how does HTTPS encryption work" # → Tavily (research)
python3 search.py -q "startups similar to Notion" # → Exa (discovery)
python3 search.py --explain-routing -q "your query" # Debug routing
Full docs: See README.md and SKILL.md
""",
    )

    # Common arguments
    parser.add_argument(
        "--provider", "-p",
        choices=["serper", "tavily", "querit", "exa", "perplexity", "you", "searxng", "auto"],
        help="Search provider (auto=intelligent routing)"
    )
    parser.add_argument(
        "--query", "-q",
        help="Search query"
    )
    # Report the effective default in --help (it may come from the config file).
    default_max_results = config.get("defaults", {}).get("max_results", 5)
    parser.add_argument(
        "--max-results", "-n",
        type=int,
        default=default_max_results,
        help=f"Maximum results (default: {default_max_results})"
    )
    parser.add_argument(
        "--images",
        action="store_true",
        help="Include images (Serper/Tavily)"
    )

    # Auto-routing options
    parser.add_argument(
        "--auto", "-a",
        action="store_true",
        help="Use intelligent auto-routing (default when no provider specified)"
    )
    parser.add_argument(
        "--explain-routing",
        action="store_true",
        help="Show detailed routing analysis (debug mode)"
    )

    # Serper-specific
    serper_config = config.get("serper", {})
    parser.add_argument("--country", default=serper_config.get("country", "us"))
    parser.add_argument("--language", default=serper_config.get("language", "en"))
    parser.add_argument(
        "--type",
        dest="search_type",
        default=serper_config.get("type", "search"),
        choices=["search", "news", "images", "videos", "places", "shopping"]
    )
    parser.add_argument(
        "--time-range",
        choices=["hour", "day", "week", "month", "year"]
    )

    # Tavily-specific
    tavily_config = config.get("tavily", {})
    parser.add_argument(
        "--depth",
        default=tavily_config.get("depth", "basic"),
        choices=["basic", "advanced"]
    )
    parser.add_argument(
        "--topic",
        default=tavily_config.get("topic", "general"),
        choices=["general", "news"]
    )
    parser.add_argument("--raw-content", action="store_true")

    # Querit-specific
    querit_config = config.get("querit", {})
    parser.add_argument(
        "--querit-base-url",
        default=querit_config.get("base_url", "https://api.querit.ai"),
        help="Querit API base URL"
    )
    parser.add_argument(
        "--querit-base-path",
        default=querit_config.get("base_path", "/v1/search"),
        help="Querit API path"
    )

    # Exa-specific
    exa_config = config.get("exa", {})
    parser.add_argument(
        "--exa-type",
        default=exa_config.get("type", "neural"),
        choices=["neural", "fast", "auto", "keyword", "instant"],
        help="Exa search type (for standard search, ignored when --exa-depth is set)"
    )
    parser.add_argument(
        "--exa-depth",
        default=exa_config.get("depth", "normal"),
        choices=["normal", "deep", "deep-reasoning"],
        help="Exa search depth: deep (synthesized, 4-12s), deep-reasoning (cross-reference, 12-50s)"
    )
    parser.add_argument(
        "--exa-verbosity",
        default=exa_config.get("verbosity", "standard"),
        choices=["compact", "standard", "full"],
        help="Exa text verbosity for content extraction"
    )
    parser.add_argument(
        "--category",
        choices=[
            "company", "research paper", "news", "pdf", "github",
            "tweet", "personal site", "linkedin profile"
        ]
    )
    parser.add_argument("--start-date")
    parser.add_argument("--end-date")
    parser.add_argument("--similar-url")

    # You.com-specific
    you_config = config.get("you", {})
    parser.add_argument(
        "--you-safesearch",
        default=you_config.get("safesearch", "moderate"),
        choices=["off", "moderate", "strict"],
        help="You.com SafeSearch filter"
    )
    parser.add_argument(
        "--freshness",
        choices=["day", "week", "month", "year"],
        help="Filter results by recency (You.com/Serper)"
    )
    parser.add_argument(
        "--livecrawl",
        choices=["web", "news", "all"],
        help="You.com: fetch full page content"
    )
    parser.add_argument(
        "--no-news",
        action="store_true",
        help="You.com: exclude news results (included by default)"
    )

    # SearXNG-specific
    searxng_config = config.get("searxng", {})
    parser.add_argument(
        "--searxng-url",
        default=searxng_config.get("instance_url"),
        help="SearXNG instance URL (e.g., https://searx.example.com)"
    )
    parser.add_argument(
        "--searxng-safesearch",
        type=int,
        default=searxng_config.get("safesearch", 0),
        choices=[0, 1, 2],
        help="SearXNG SafeSearch: 0=off, 1=moderate, 2=strict"
    )
    parser.add_argument(
        "--engines",
        nargs="+",
        default=searxng_config.get("engines"),
        help="SearXNG: specific engines to use (e.g., google bing duckduckgo)"
    )
    parser.add_argument(
        "--categories",
        nargs="+",
        help="SearXNG: search categories (general, images, news, videos, etc.)"
    )

    # Domain filters
    parser.add_argument("--include-domains", nargs="+")
    parser.add_argument("--exclude-domains", nargs="+")

    # Output
    parser.add_argument("--compact", action="store_true")

    # Caching options
    parser.add_argument(
        "--cache-ttl",
        type=int,
        default=DEFAULT_CACHE_TTL,
        help=f"Cache TTL in seconds (default: {DEFAULT_CACHE_TTL} = 1 hour)"
    )
    parser.add_argument(
        "--no-cache",
        action="store_true",
        help="Bypass cache (always fetch fresh results)"
    )
    parser.add_argument(
        "--clear-cache",
        action="store_true",
        help="Clear all cached results and exit"
    )
    parser.add_argument(
        "--cache-stats",
        action="store_true",
        help="Show cache statistics and exit"
    )
    return parser


def _build_cache_context(args: argparse.Namespace) -> Dict[str, Any]:
    """Collect every CLI option that can change a provider's response.

    The returned dict is passed as ``params`` to ``cache_get``/``cache_put`` so
    that two invocations share a cache entry only when they would issue an
    equivalent request. List-valued options are sorted so that argument order
    does not fragment the cache.
    """
    return {
        "locale": f"{args.country}:{args.language}",
        "freshness": args.freshness,
        "time_range": args.time_range,
        "include_domains": sorted(args.include_domains) if args.include_domains else None,
        "exclude_domains": sorted(args.exclude_domains) if args.exclude_domains else None,
        "topic": args.topic,
        "search_engines": sorted(args.engines) if args.engines else None,
        "include_news": not args.no_news,
        "search_type": args.search_type,
        "exa_type": args.exa_type,
        "exa_depth": args.exa_depth,
        "exa_verbosity": args.exa_verbosity,
        "category": args.category,
        "similar_url": args.similar_url,
        # The options below are forwarded to providers as well; leaving them
        # out of the key would serve e.g. a cached "basic" result for a later
        # "--depth advanced" request.
        "depth": args.depth,
        "include_images": args.images,
        "raw_content": args.raw_content,
        "livecrawl": args.livecrawl,
        "you_safesearch": args.you_safesearch,
        "searxng_safesearch": args.searxng_safesearch,
        "categories": sorted(args.categories) if args.categories else None,
        "start_date": args.start_date,
        "end_date": args.end_date,
    }


def main():
    """CLI entry point: parse arguments, pick a provider, search, print JSON.

    Flow:
      1. Load config and parse arguments.
      2. Handle ``--clear-cache`` / ``--cache-stats`` / ``--explain-routing``
         and return early.
      3. Choose the primary provider (auto-routing, Exa for ``--similar-url``,
         or the explicit ``--provider``).
      4. Build the fallback chain, dropping providers that are disabled,
         unconfigured, or currently in cooldown.
      5. Serve from cache when possible; otherwise query providers in order
         (with retry) and merge/deduplicate results when a fallback was needed.
      6. Print the result as JSON on stdout, or an error object on stderr and
         exit with status 1 when every provider failed.
    """
    config = load_config()
    parser = _build_arg_parser(config)
    args = parser.parse_args()
    indent = None if args.compact else 2

    # Handle cache management commands first (before query validation)
    if args.clear_cache:
        print(json.dumps(cache_clear(), indent=indent, ensure_ascii=False))
        return
    if args.cache_stats:
        print(json.dumps(cache_stats(), indent=indent, ensure_ascii=False))
        return

    if not args.query and not args.similar_url:
        parser.error("--query is required (unless using --similar-url with Exa)")

    # Handle --explain-routing
    if args.explain_routing:
        if not args.query:
            parser.error("--query is required for --explain-routing")
        explanation = explain_routing(args.query, config)
        print(json.dumps(explanation, indent=indent, ensure_ascii=False))
        return

    # Determine provider
    no_explicit_provider = args.provider in (None, "auto")
    if no_explicit_provider and args.query and (args.provider == "auto" or not args.similar_url):
        routing = auto_route_provider(args.query, config)
        provider = routing["provider"]
        routing_info = {
            "auto_routed": True,
            "provider": provider,
            "confidence": routing["confidence"],
            "confidence_level": routing["confidence_level"],
            "reason": routing["reason"],
            "top_signals": routing["top_signals"],
            "scores": routing["scores"],
        }
        # Carry over the router's depth suggestion so the Exa branch below can
        # honor it. NOTE(review): assumes auto_route_provider may include an
        # "exa_depth" key; when it does not, this is a no-op.
        suggested_depth = routing.get("exa_depth")
        if suggested_depth in ("deep", "deep-reasoning"):
            routing_info["exa_depth"] = suggested_depth
    elif no_explicit_provider:
        # --similar-url is only understood by Exa, so it selects Exa whenever
        # no provider was named (or "auto" was given without a query).
        provider = "exa"
        routing_info = {
            "auto_routed": True,
            "provider": "exa",
            "confidence": 1.0,
            "confidence_level": "high",
            "reason": "similar_url_specified",
        }
    else:
        provider = args.provider
        routing_info = {"auto_routed": False, "provider": provider}

    # Build provider fallback list
    auto_config = config.get("auto_routing", {})
    provider_priority = auto_config.get(
        "provider_priority",
        ["tavily", "querit", "exa", "perplexity", "serper", "you", "searxng"],
    )
    disabled_providers = auto_config.get("disabled_providers", [])

    # Start with the selected provider, then try others in priority order.
    # Only include providers that have a configured API key (except the primary,
    # which gets a clear error if unconfigured and no fallback succeeds).
    providers_to_try = [provider]
    for p in provider_priority:
        if p not in providers_to_try and p not in disabled_providers and get_api_key(p, config):
            providers_to_try.append(p)

    # Skip providers currently in cooldown
    eligible_providers = []
    cooldown_skips = []
    for p in providers_to_try:
        in_cd, remaining = provider_in_cooldown(p)
        if in_cd:
            cooldown_skips.append({"provider": p, "cooldown_remaining_seconds": remaining})
        else:
            eligible_providers.append(p)
    if not eligible_providers:
        # Everything is cooling down: still attempt the primary provider.
        eligible_providers = providers_to_try[:1]

    querit_config = config.get("querit", {})

    # Helper function to execute search for a provider
    def execute_search(prov: str) -> Dict[str, Any]:
        key = validate_api_key(prov, config)
        if prov == "serper":
            return search_serper(
                query=args.query,
                api_key=key,
                max_results=args.max_results,
                country=args.country,
                language=args.language,
                search_type=args.search_type,
                # --freshness is documented as applying to Serper too.
                time_range=args.time_range or args.freshness,
                include_images=args.images,
            )
        elif prov == "tavily":
            return search_tavily(
                query=args.query,
                api_key=key,
                max_results=args.max_results,
                depth=args.depth,
                topic=args.topic,
                include_domains=args.include_domains,
                exclude_domains=args.exclude_domains,
                include_images=args.images,
                include_raw_content=args.raw_content,
            )
        elif prov == "querit":
            return search_querit(
                query=args.query,
                api_key=key,
                max_results=args.max_results,
                language=args.language,
                country=args.country,
                time_range=args.time_range or args.freshness,
                include_domains=args.include_domains,
                exclude_domains=args.exclude_domains,
                base_url=args.querit_base_url,
                base_path=args.querit_base_path,
                timeout=int(querit_config.get("timeout", 30)),
            )
        elif prov == "exa":
            # CLI --exa-depth overrides; fallback to auto-routing suggestion
            exa_depth = args.exa_depth
            if exa_depth == "normal" and routing_info.get("exa_depth") in ("deep", "deep-reasoning"):
                exa_depth = routing_info["exa_depth"]
            return search_exa(
                query=args.query or "",
                api_key=key,
                max_results=args.max_results,
                search_type=args.exa_type,
                exa_depth=exa_depth,
                category=args.category,
                start_date=args.start_date,
                end_date=args.end_date,
                similar_url=args.similar_url,
                include_domains=args.include_domains,
                exclude_domains=args.exclude_domains,
                text_verbosity=args.exa_verbosity,
            )
        elif prov == "perplexity":
            perplexity_config = config.get("perplexity", {})
            return search_perplexity(
                query=args.query,
                api_key=key,
                max_results=args.max_results,
                model=perplexity_config.get("model", "perplexity/sonar-pro"),
                api_url=perplexity_config.get("api_url", "https://api.kilo.ai/api/gateway/chat/completions"),
                freshness=args.freshness,
            )
        elif prov == "you":
            return search_you(
                query=args.query,
                api_key=key,
                max_results=args.max_results,
                country=args.country,
                language=args.language,
                freshness=args.freshness,
                safesearch=args.you_safesearch,
                include_news=not args.no_news,
                livecrawl=args.livecrawl,
            )
        elif prov == "searxng":
            # For SearXNG, 'key' is actually the instance URL
            instance_url = args.searxng_url or key
            if instance_url:
                instance_url = _validate_searxng_url(instance_url)
            return search_searxng(
                query=args.query,
                instance_url=instance_url,
                max_results=args.max_results,
                categories=args.categories,
                engines=args.engines,
                language=args.language,
                time_range=args.time_range,
                safesearch=args.searxng_safesearch,
            )
        else:
            raise ValueError(f"Unknown provider: {prov}")

    def execute_with_retry(prov: str) -> Dict[str, Any]:
        """Run ``execute_search`` up to three times, retrying transient errors only."""
        max_attempts = 3
        last_error = None
        for attempt in range(max_attempts):
            try:
                return execute_search(prov)
            except ProviderRequestError as e:
                last_error = e
                # Auth failures and non-transient errors will not heal on retry.
                retryable = e.transient and e.status_code not in {401, 403}
                if not retryable or attempt == max_attempts - 1:
                    break
                time.sleep(RETRY_BACKOFF_SECONDS[attempt])
            except Exception as e:
                last_error = e
                break
        raise last_error if last_error else Exception("Unknown provider execution error")

    cache_context = _build_cache_context(args)

    # Check cache first (unless --no-cache is set). Query-less requests
    # (--similar-url only) are never cached.
    use_cache = not args.no_cache and bool(args.query)
    result = None
    cache_hit = False
    if use_cache:
        cached_result = cache_get(
            query=args.query,
            provider=provider,
            max_results=args.max_results,
            ttl=args.cache_ttl,
            params=cache_context,
        )
        if cached_result:
            cache_hit = True
            # Strip cache bookkeeping fields before returning the payload.
            result = {k: v for k, v in cached_result.items() if not k.startswith("_cache_")}
            result["cached"] = True
            result["cache_age_seconds"] = int(time.time() - cached_result.get("_cache_timestamp", 0))

    errors = []
    successful_provider = provider if cache_hit else None
    successful_results: List[Tuple[str, Dict[str, Any]]] = []

    if not cache_hit:
        for idx, current_provider in enumerate(eligible_providers):
            try:
                provider_result = execute_with_retry(current_provider)
                reset_provider_health(current_provider)
                successful_results.append((current_provider, provider_result))
                successful_provider = current_provider
                # If we have enough results, stop.
                if len(provider_result.get("results", [])) >= args.max_results:
                    break
                # Only continue collecting from lower-priority providers when fallback was needed.
                if not errors:
                    break
            except Exception as e:
                error_msg = str(e)
                cooldown_info = mark_provider_failure(current_provider, error_msg)
                errors.append({
                    "provider": current_provider,
                    "error": error_msg,
                    "cooldown_seconds": cooldown_info.get("cooldown_seconds"),
                })
                remaining = eligible_providers[idx + 1:]
                if remaining:
                    # Progress notice on stderr so stdout stays a single JSON document.
                    print(json.dumps({
                        "fallback": True,
                        "failed_provider": current_provider,
                        "error": error_msg,
                        "trying_next": remaining[0],
                    }), file=sys.stderr)

    if successful_results:
        if len(successful_results) == 1:
            result = successful_results[0][1]
        else:
            # Several providers answered: merge them into the first response.
            primary = successful_results[0][1].copy()
            deduped_results, dedup_count = deduplicate_results_across_providers(successful_results, args.max_results)
            primary["results"] = deduped_results
            primary["deduplicated"] = dedup_count > 0
            primary.setdefault("metadata", {})
            primary["metadata"]["dedup_count"] = dedup_count
            primary["metadata"]["providers_merged"] = [p for p, _ in successful_results]
            result = primary

    if result is None:
        error_result = {
            "error": "All providers failed",
            "provider": provider,
            "query": args.query,
            "routing": routing_info,
            "provider_errors": errors,
            "cooldown_skips": cooldown_skips,
        }
        print(json.dumps(error_result, indent=indent, ensure_ascii=False), file=sys.stderr)
        sys.exit(1)

    if successful_provider != provider:
        routing_info["fallback_used"] = True
        routing_info["original_provider"] = provider
        routing_info["provider"] = successful_provider
        routing_info["fallback_errors"] = errors
    if cooldown_skips:
        routing_info["cooldown_skips"] = cooldown_skips
    result["routing"] = routing_info

    if use_cache and not cache_hit:
        cache_put(
            query=args.query,
            provider=successful_provider or provider,
            max_results=args.max_results,
            result=result,
            params=cache_context,
        )

    result["cached"] = bool(cache_hit)
    if "deduplicated" not in result:
        result["deduplicated"] = False
    result.setdefault("metadata", {})
    result["metadata"].setdefault("dedup_count", 0)
    print(json.dumps(result, indent=indent, ensure_ascii=False))
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()