Fix: validate URL scheme and resolved IP before crawling to prevent SSRF (#14090)

### What problem does this PR solve?

The POST /upload_info?url=<url> endpoint accepted a user-supplied URL
and passed it directly to AsyncWebCrawler without any validation. There
were no restrictions on URL scheme, destination hostname, or resolved IP
address. This allowed any authenticated user to instruct the server to
make outbound HTTP requests to internal infrastructure — including RFC
1918 private networks, loopback addresses, and cloud metadata services
such as http://169.254.169.254 — effectively using the server as a proxy
for internal network reconnaissance or credential theft.

This PR adds an SSRF guard (_validate_url_for_crawl) that runs before
any crawl is initiated. It enforces an allowlist of safe schemes
(http/https), resolves the hostname at validation time, and rejects any
URL whose resolved IP falls within a private or reserved network range.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
Xing Hong
2026-04-25 15:30:15 +09:00
committed by GitHub
parent 78188ce9e9
commit fb95136f39
10 changed files with 485 additions and 109 deletions

View File

@@ -15,11 +15,8 @@
#
import base64
import ipaddress
import json
import re
import socket
from urllib.parse import urlparse
import aiosmtplib
from email.mime.text import MIMEText
from email.header import Header
@@ -37,10 +34,10 @@ from webdriver_manager.chrome import ChromeDriverManager
OTP_LENGTH = 4
OTP_TTL_SECONDS = 5 * 60 # valid for 5 minutes
ATTEMPT_LIMIT = 5 # maximum attempts
ATTEMPT_LOCK_SECONDS = 30 * 60 # lock for 30 minutes
RESEND_COOLDOWN_SECONDS = 60 # cooldown for 1 minute
OTP_TTL_SECONDS = 5 * 60 # valid for 5 minutes
ATTEMPT_LIMIT = 5 # maximum attempts
ATTEMPT_LOCK_SECONDS = 30 * 60 # lock for 30 minutes
RESEND_COOLDOWN_SECONDS = 60 # cooldown for 1 minute
CONTENT_TYPE_MAP = {
@@ -188,29 +185,16 @@ def __get_pdf_from_html(path: str, timeout: int, install_driver: bool, print_opt
return base64.b64decode(result["data"])
def is_private_ip(ip: str) -> bool:
try:
ip_obj = ipaddress.ip_address(ip)
return ip_obj.is_private
except ValueError:
return False
def is_valid_url(url: str) -> bool:
if not re.match(r"(https?)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]", url):
return False
parsed_url = urlparse(url)
hostname = parsed_url.hostname
from common.ssrf_guard import assert_url_is_safe
if not hostname:
return False
try:
ip = socket.gethostbyname(hostname)
if is_private_ip(ip):
return False
except socket.gaierror:
assert_url_is_safe(url)
return True
except ValueError:
return False
return True
def safe_json_parse(data: str | dict) -> dict: