From 4d7c15be2bfdf2824301dbbc5fb886ca838e1482 Mon Sep 17 00:00:00 2001
From: zlei9
Date: Sun, 29 Mar 2026 13:03:48 +0800
Subject: [PATCH] Initial commit with translated description

---
 SKILL.md                   | 414 +++++++++++++++++++++++++
 _meta.json                 |   6 +
 scripts/cf_bypass.py       | 282 +++++++++++++++++++++++++
 scripts/login_session.py   | 268 ++++++++++++++++++++++++
 scripts/proxy_rotate.py    | 279 +++++++++++++++++++++++++
 scripts/session_manager.py | 279 +++++++++++++++++++++++++
 scripts/smart_login.py     | 231 +++++++++++++++
 scripts/solve_captcha.py   | 356 +++++++++++++++++++++++++++++++
 scripts/stealth_session.py | 256 +++++++++++++++++++++++
 scripts/task_runner.py     | 177 ++++++++++++++++
 10 files changed, 2548 insertions(+)
 create mode 100644 SKILL.md
 create mode 100644 _meta.json
 create mode 100644 scripts/cf_bypass.py
 create mode 100644 scripts/login_session.py
 create mode 100644 scripts/proxy_rotate.py
 create mode 100644 scripts/session_manager.py
 create mode 100644 scripts/smart_login.py
 create mode 100644 scripts/solve_captcha.py
 create mode 100644 scripts/stealth_session.py
 create mode 100644 scripts/task_runner.py

diff --git a/SKILL.md b/SKILL.md
new file mode 100644
index 0000000..d4bca70
--- /dev/null
+++ b/SKILL.md
@@ -0,0 +1,414 @@
+---
+name: stealth-browser
+description: "Ultimate stealth browser automation with anti-detection, Cloudflare bypass, and CAPTCHA solving."
+---
+
+# Stealth Browser Automation
+
+Silent, undetectable web automation combining multiple anti-detection layers.
+
+## Quick Login Workflow (IMPORTANT)
+
+When the user asks to log in to any website:
+
+1. **Open in headed mode** (visible browser for manual login):
+```bash
+python scripts/stealth_session.py -u "https://target.com/login" -s sitename --headed
+```
+
+2. **User logs in manually** in the visible browser
+
+3. **Save the session** once login is confirmed:
+```bash
+python scripts/stealth_session.py -u "https://target.com" -s sitename --headed --save
+```
+
+4. **Future use** - load the saved session (headless):
+```bash
+python scripts/stealth_session.py -u "https://target.com" -s sitename --load
+```
+
+Sessions are stored in: `~/.clawdbot/browser-sessions/<name>.json`
+
+## Execution Strategy (IMPORTANT)
+
+### 1. Silent first, visible second
+- Try headless mode silently first
+- If it fails or a CAPTCHA is required, switch to headed (visible) mode
+- Avoid interrupting the user's own work
+
+### 2. Checkpoint and resume
+Long tasks use `task_runner.py` to manage state:
+```python
+from task_runner import TaskRunner
+task = TaskRunner('my_task')
+task.set_total(100)
+for i in items:
+    if task.is_completed(i):
+        continue  # skip items that are already done
+    # process the item...
+    task.mark_completed(i)
+task.finish()
+```
+
+### 3. Timeout handling
+- Default per-page timeout: 30 seconds
+- Long tasks save progress every 50 items
+- Failed operations are retried up to 3 times automatically (see the sketch below)
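+
+A minimal sketch of that retry policy, assuming a driver-agnostic `fetch_page` callable (a hypothetical name, not one of the bundled scripts):
+
+```python
+import time
+
+def with_retries(fetch_page, url, attempts=3, timeout=30):
+    """Call fetch_page(url, timeout=...) with up to `attempts` tries."""
+    last_error = None
+    for attempt in range(1, attempts + 1):
+        try:
+            return fetch_page(url, timeout=timeout)  # driver-specific call
+        except Exception as e:
+            last_error = e
+            time.sleep(2 ** attempt)  # brief exponential backoff between tries
+    raise last_error
+```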
+### 4. Log every attempt
+All login attempts are recorded in: `~/.clawdbot/browser-sessions/attempts.json`
+
+## Architecture
+
+```
+┌─────────────────────────────────────────────────────┐
+│                   Stealth Browser                   │
+├─────────────────────────────────────────────────────┤
+│  Layer 1: Anti-Detection Engine                     │
+│    - puppeteer-extra-plugin-stealth                 │
+│    - Browser fingerprint spoofing                   │
+│    - WebGL/Canvas/Audio fingerprint masking         │
+├─────────────────────────────────────────────────────┤
+│  Layer 2: Challenge Bypass                          │
+│    - Cloudflare Turnstile/JS Challenge              │
+│    - hCaptcha / reCAPTCHA integration               │
+│    - 2Captcha / Anti-Captcha API                    │
+├─────────────────────────────────────────────────────┤
+│  Layer 3: Session Persistence                       │
+│    - Cookie storage (JSON/SQLite)                   │
+│    - localStorage sync                              │
+│    - Multi-profile management                       │
+├─────────────────────────────────────────────────────┤
+│  Layer 4: Proxy & Identity                          │
+│    - Rotating residential proxies                   │
+│    - User-Agent rotation                            │
+│    - Timezone/Locale spoofing                       │
+└─────────────────────────────────────────────────────┘
+```
+
+## Setup
+
+### Install Core Dependencies
+
+```bash
+npm install -g puppeteer-extra puppeteer-extra-plugin-stealth
+npm install -g playwright
+pip install undetected-chromedriver DrissionPage
+```
+
+### Optional: CAPTCHA Solvers
+
+Store API keys in `~/.clawdbot/secrets/captcha.json`:
+```json
+{
+  "2captcha": "YOUR_2CAPTCHA_KEY",
+  "anticaptcha": "YOUR_ANTICAPTCHA_KEY",
+  "capsolver": "YOUR_CAPSOLVER_KEY"
+}
+```
+
+### Optional: Proxy Configuration
+
+Store in `~/.clawdbot/secrets/proxies.json`:
+```json
+{
+  "rotating": "http://user:pass@proxy.provider.com:port",
+  "residential": ["socks5://ip1:port", "socks5://ip2:port"],
+  "datacenter": "http://dc-proxy:port"
+}
+```
+
+## Quick Start
+
+### 1. Stealth Session (Python - Recommended)
+
+```python
+# scripts/stealth_session.py - use for maximum compatibility
+import undetected_chromedriver as uc
+from DrissionPage import ChromiumPage
+
+# Option A: undetected-chromedriver (Selenium-based)
+driver = uc.Chrome(headless=True, use_subprocess=True)
+driver.get("https://nowsecure.nl")  # Test anti-detection
+
+# Option B: DrissionPage (faster, native Python)
+page = ChromiumPage()
+page.get("https://cloudflare-protected-site.com")
+```
+
+### 2. 
Stealth Session (Node.js) + +```javascript +// scripts/stealth.mjs +import puppeteer from 'puppeteer-extra'; +import StealthPlugin from 'puppeteer-extra-plugin-stealth'; + +puppeteer.use(StealthPlugin()); + +const browser = await puppeteer.launch({ + headless: 'new', + args: [ + '--disable-blink-features=AutomationControlled', + '--disable-dev-shm-usage', + '--no-sandbox' + ] +}); + +const page = await browser.newPage(); +await page.goto('https://bot.sannysoft.com'); // Verify stealth +``` + +## Core Operations + +### Open Stealth Page + +```bash +# Using agent-browser with stealth profile +agent-browser --profile ~/.stealth-profile open https://target.com + +# Or via script +python scripts/stealth_open.py --url "https://target.com" --headless +``` + +### Bypass Cloudflare + +```python +# Automatic CF bypass with DrissionPage +from DrissionPage import ChromiumPage + +page = ChromiumPage() +page.get("https://cloudflare-site.com") +# DrissionPage waits for CF challenge automatically + +# Manual wait if needed +page.wait.ele_displayed("main-content", timeout=30) +``` + +For stubborn Cloudflare sites, use FlareSolverr: + +```bash +# Start FlareSolverr container +docker run -d --name flaresolverr -p 8191:8191 ghcr.io/flaresolverr/flaresolverr + +# Request clearance +curl -X POST http://localhost:8191/v1 \ + -H "Content-Type: application/json" \ + -d '{"cmd":"request.get","url":"https://cf-protected.com","maxTimeout":60000}' +``` + +### Solve CAPTCHAs + +```python +# scripts/solve_captcha.py +import requests +import json +import time + +def solve_recaptcha(site_key, page_url, api_key): + """Solve reCAPTCHA v2/v3 via 2Captcha""" + # Submit task + resp = requests.post("http://2captcha.com/in.php", data={ + "key": api_key, + "method": "userrecaptcha", + "googlekey": site_key, + "pageurl": page_url, + "json": 1 + }).json() + + task_id = resp["request"] + + # Poll for result + for _ in range(60): + time.sleep(3) + result = requests.get(f"http://2captcha.com/res.php?key={api_key}&action=get&id={task_id}&json=1").json() + if result["status"] == 1: + return result["request"] # Token + return None + +def solve_hcaptcha(site_key, page_url, api_key): + """Solve hCaptcha via Anti-Captcha""" + resp = requests.post("https://api.anti-captcha.com/createTask", json={ + "clientKey": api_key, + "task": { + "type": "HCaptchaTaskProxyless", + "websiteURL": page_url, + "websiteKey": site_key + } + }).json() + + task_id = resp["taskId"] + + for _ in range(60): + time.sleep(3) + result = requests.post("https://api.anti-captcha.com/getTaskResult", json={ + "clientKey": api_key, + "taskId": task_id + }).json() + if result["status"] == "ready": + return result["solution"]["gRecaptchaResponse"] + return None +``` + +### Persistent Sessions + +```python +# scripts/session_manager.py +import json +import os +from pathlib import Path + +SESSIONS_DIR = Path.home() / ".clawdbot" / "browser-sessions" +SESSIONS_DIR.mkdir(parents=True, exist_ok=True) + +def save_cookies(driver, session_name): + """Save cookies to JSON""" + cookies = driver.get_cookies() + path = SESSIONS_DIR / f"{session_name}_cookies.json" + path.write_text(json.dumps(cookies, indent=2)) + return path + +def load_cookies(driver, session_name): + """Load cookies from saved session""" + path = SESSIONS_DIR / f"{session_name}_cookies.json" + if path.exists(): + cookies = json.loads(path.read_text()) + for cookie in cookies: + driver.add_cookie(cookie) + return True + return False + +def save_local_storage(page, session_name): + """Save localStorage""" + ls = 
page.evaluate("() => JSON.stringify(localStorage)") + path = SESSIONS_DIR / f"{session_name}_localStorage.json" + path.write_text(ls) + return path + +def load_local_storage(page, session_name): + """Restore localStorage""" + path = SESSIONS_DIR / f"{session_name}_localStorage.json" + if path.exists(): + data = path.read_text() + page.evaluate(f"(data) => {{ Object.entries(JSON.parse(data)).forEach(([k,v]) => localStorage.setItem(k,v)) }}", data) + return True + return False +``` + +### Silent Automation Workflow + +```python +# Complete silent automation example +from DrissionPage import ChromiumPage, ChromiumOptions + +# Configure for stealth +options = ChromiumOptions() +options.headless() +options.set_argument('--disable-blink-features=AutomationControlled') +options.set_argument('--disable-dev-shm-usage') +options.set_user_agent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36') + +page = ChromiumPage(options) + +# Navigate with CF bypass +page.get("https://target-site.com") + +# Wait for any challenges +page.wait.doc_loaded() + +# Interact silently +page.ele("@id=username").input("user@email.com") +page.ele("@id=password").input("password123") +page.ele("@type=submit").click() + +# Save session for reuse +page.cookies.save("~/.clawdbot/browser-sessions/target-site.json") +``` + +## Proxy Rotation + +```python +# scripts/proxy_rotate.py +import random +import json +from pathlib import Path + +def get_proxy(): + """Get random proxy from pool""" + config = json.loads((Path.home() / ".clawdbot/secrets/proxies.json").read_text()) + proxies = config.get("residential", []) + return random.choice(proxies) if proxies else config.get("rotating") + +# Use with DrissionPage +options = ChromiumOptions() +options.set_proxy(get_proxy()) +page = ChromiumPage(options) +``` + +## User Input Required + +To complete this skill, provide: + +1. **CAPTCHA API Keys** (optional but recommended): + - 2Captcha key: https://2captcha.com + - Anti-Captcha key: https://anti-captcha.com + - CapSolver key: https://capsolver.com + +2. **Proxy Configuration** (optional): + - Residential proxy provider credentials + - Or list of SOCKS5/HTTP proxies + +3. **Target Sites** (for pre-configured sessions): + - Which sites need login persistence? + - What credentials should be stored? 
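+
+All three items are optional. A minimal sketch of how the scripts locate these files (`load_secret` is an illustrative helper, not part of the bundled scripts; `solve_captcha.py` and `proxy_rotate.py` each implement their own equivalent):
+
+```python
+import json
+from pathlib import Path
+
+SECRETS_DIR = Path.home() / ".clawdbot" / "secrets"
+
+def load_secret(name: str) -> dict:
+    """Return the parsed secrets file, or {} when it has not been created."""
+    path = SECRETS_DIR / f"{name}.json"
+    return json.loads(path.read_text()) if path.exists() else {}
+
+captcha_keys = load_secret("captcha")  # e.g. {"2captcha": "..."} or {}
+proxy_config = load_secret("proxies")  # e.g. {"rotating": "..."} or {}
+```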
+ +## Files Structure + +``` +stealth-browser/ +├── SKILL.md +├── scripts/ +│ ├── stealth_session.py # Main stealth browser wrapper +│ ├── solve_captcha.py # CAPTCHA solving utilities +│ ├── session_manager.py # Cookie/localStorage persistence +│ ├── proxy_rotate.py # Proxy rotation +│ └── cf_bypass.py # Cloudflare-specific bypass +└── references/ + ├── fingerprints.md # Browser fingerprint details + └── detection-tests.md # Sites to test anti-detection +``` + +## Testing Anti-Detection + +```bash +# Run these to verify stealth is working: +python scripts/stealth_open.py --url "https://bot.sannysoft.com" +python scripts/stealth_open.py --url "https://nowsecure.nl" +python scripts/stealth_open.py --url "https://arh.antoinevastel.com/bots/areyouheadless" +python scripts/stealth_open.py --url "https://pixelscan.net" +``` + +## Integration with agent-browser + +For simple tasks, use agent-browser with a persistent profile: + +```bash +# Create stealth profile once +agent-browser --profile ~/.stealth-profile --headed open https://login-site.com +# Login manually, then close + +# Reuse authenticated session (headless) +agent-browser --profile ~/.stealth-profile snapshot +agent-browser --profile ~/.stealth-profile click @e5 +``` + +For Cloudflare or CAPTCHA-heavy sites, use Python scripts instead. + +## Best Practices + +1. **Always use headless: 'new'** not `headless: true` (less detectable) +2. **Rotate User-Agents** matching browser version +3. **Add random delays** between actions (100-500ms) +4. **Use residential proxies** for sensitive targets +5. **Save sessions** after successful login +6. **Test on bot.sannysoft.com** before production use diff --git a/_meta.json b/_meta.json new file mode 100644 index 0000000..37c6c81 --- /dev/null +++ b/_meta.json @@ -0,0 +1,6 @@ +{ + "ownerId": "kn7a2y4prpvw3x9mrrjr4ty62s80a28z", + "slug": "stealth-browser", + "version": "1.0.0", + "publishedAt": 1769953990064 +} \ No newline at end of file diff --git a/scripts/cf_bypass.py b/scripts/cf_bypass.py new file mode 100644 index 0000000..2825385 --- /dev/null +++ b/scripts/cf_bypass.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +""" +Cloudflare Bypass Utilities +Methods: DrissionPage (native), FlareSolverr (Docker), cloudscraper +""" + +import json +import time +import requests +from pathlib import Path + + +def bypass_cloudflare_drission(url: str, headless: bool = True, timeout: int = 30): + """ + Bypass Cloudflare using DrissionPage (most reliable for JS challenges) + + Returns: + dict: {cookies: dict, user_agent: str, content: str, url: str} + """ + from DrissionPage import ChromiumPage, ChromiumOptions + + options = ChromiumOptions() + if headless: + options.headless() + + options.set_argument('--disable-blink-features=AutomationControlled') + options.set_user_agent( + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + 'AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/120.0.0.0 Safari/537.36' + ) + + page = ChromiumPage(options) + + try: + page.get(url) + + # Wait for CF challenge to complete (look for challenge elements to disappear) + start = time.time() + while time.time() - start < timeout: + # Check if still on challenge page + if "challenge" in page.url.lower() or "cdn-cgi" in page.url.lower(): + time.sleep(1) + continue + + # Check for common CF challenge indicators + html = page.html.lower() + if "checking your browser" in html or "please wait" in html: + time.sleep(1) + continue + + # Challenge passed + break + + return { + "cookies": page.cookies.as_dict(), + "user_agent": page.user_agent, + 
"content": page.html, + "url": page.url, + "success": True + } + + finally: + page.quit() + + +def bypass_cloudflare_flaresolverr(url: str, flaresolverr_url: str = "http://localhost:8191/v1", timeout: int = 60): + """ + Bypass Cloudflare using FlareSolverr (Docker container) + + Start FlareSolverr first: + docker run -d --name flaresolverr -p 8191:8191 ghcr.io/flaresolverr/flaresolverr + + Returns: + dict: {cookies: list, user_agent: str, content: str, url: str} + """ + payload = { + "cmd": "request.get", + "url": url, + "maxTimeout": timeout * 1000 + } + + try: + resp = requests.post(flaresolverr_url, json=payload, timeout=timeout + 10) + data = resp.json() + + if data.get("status") == "ok": + solution = data.get("solution", {}) + return { + "cookies": solution.get("cookies", []), + "user_agent": solution.get("userAgent"), + "content": solution.get("response"), + "url": solution.get("url"), + "success": True + } + else: + return { + "success": False, + "error": data.get("message", "Unknown error") + } + + except requests.exceptions.ConnectionError: + return { + "success": False, + "error": "FlareSolverr not running. Start with: docker run -d --name flaresolverr -p 8191:8191 ghcr.io/flaresolverr/flaresolverr" + } + + +def bypass_cloudflare_cloudscraper(url: str, **kwargs): + """ + Bypass Cloudflare using cloudscraper (Python library) + Works for simpler challenges, may fail on advanced protection + + pip install cloudscraper + + Returns: + dict: {cookies: dict, content: str, url: str} + """ + import cloudscraper + + scraper = cloudscraper.create_scraper( + browser={ + 'browser': 'chrome', + 'platform': 'windows', + 'mobile': False + } + ) + + try: + resp = scraper.get(url, **kwargs) + return { + "cookies": dict(resp.cookies), + "content": resp.text, + "url": resp.url, + "status_code": resp.status_code, + "success": resp.status_code == 200 + } + except Exception as e: + return { + "success": False, + "error": str(e) + } + + +def get_cf_clearance(url: str, method: str = "auto"): + """ + Get Cloudflare clearance cookies + + Args: + url: Target URL + method: 'drission', 'flaresolverr', 'cloudscraper', or 'auto' + + Returns: + dict with cookies and user_agent for use in subsequent requests + """ + methods = { + "drission": bypass_cloudflare_drission, + "flaresolverr": bypass_cloudflare_flaresolverr, + "cloudscraper": bypass_cloudflare_cloudscraper + } + + if method != "auto": + return methods[method](url) + + # Auto: try methods in order of reliability + for name, func in [("drission", bypass_cloudflare_drission), + ("cloudscraper", bypass_cloudflare_cloudscraper), + ("flaresolverr", bypass_cloudflare_flaresolverr)]: + try: + result = func(url) + if result.get("success"): + result["method"] = name + return result + except Exception as e: + continue + + return {"success": False, "error": "All methods failed"} + + +def apply_cf_cookies_to_session(session: requests.Session, cf_result: dict): + """ + Apply Cloudflare bypass cookies to a requests Session + + Args: + session: requests.Session object + cf_result: Result from bypass functions + """ + if not cf_result.get("success"): + raise ValueError("Cannot apply failed CF result") + + cookies = cf_result.get("cookies", {}) + user_agent = cf_result.get("user_agent") + + # Handle both dict and list cookie formats + if isinstance(cookies, list): + for cookie in cookies: + session.cookies.set(cookie["name"], cookie["value"], domain=cookie.get("domain")) + else: + for name, value in cookies.items(): + session.cookies.set(name, value) + + if user_agent: + 
session.headers["User-Agent"] = user_agent + + +def test_cf_protection(url: str) -> dict: + """ + Test if a URL has Cloudflare protection + + Returns: + dict: {protected: bool, type: str, headers: dict} + """ + try: + resp = requests.get(url, headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + }, timeout=10, allow_redirects=True) + + cf_headers = {k: v for k, v in resp.headers.items() if k.lower().startswith("cf-")} + + is_protected = False + protection_type = None + + if resp.status_code == 403: + is_protected = True + protection_type = "blocked" + elif resp.status_code == 503: + is_protected = True + protection_type = "challenge" + elif "cf-ray" in resp.headers: + if "challenge" in resp.text.lower() or "__cf" in resp.text: + is_protected = True + protection_type = "js_challenge" + else: + protection_type = "cdn_only" + + return { + "protected": is_protected, + "type": protection_type, + "status_code": resp.status_code, + "cf_headers": cf_headers + } + + except Exception as e: + return { + "protected": None, + "error": str(e) + } + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description='Cloudflare Bypass') + parser.add_argument('url', help='Target URL') + parser.add_argument('--method', '-m', choices=['auto', 'drission', 'flaresolverr', 'cloudscraper'], + default='auto', help='Bypass method') + parser.add_argument('--test', '-t', action='store_true', help='Test if URL has CF protection') + parser.add_argument('--save-cookies', '-s', help='Save cookies to file') + + args = parser.parse_args() + + if args.test: + result = test_cf_protection(args.url) + print(json.dumps(result, indent=2)) + else: + print(f"Bypassing Cloudflare for: {args.url}") + result = get_cf_clearance(args.url, args.method) + + if result.get("success"): + print(f"✓ Success using method: {result.get('method', args.method)}") + print(f" Cookies: {len(result.get('cookies', {}))} items") + print(f" User-Agent: {result.get('user_agent', 'N/A')[:50]}...") + + if args.save_cookies: + Path(args.save_cookies).write_text(json.dumps({ + "cookies": result.get("cookies"), + "user_agent": result.get("user_agent") + }, indent=2)) + print(f" Saved to: {args.save_cookies}") + else: + print(f"✗ Failed: {result.get('error')}") + exit(1) diff --git a/scripts/login_session.py b/scripts/login_session.py new file mode 100644 index 0000000..b3ccb45 --- /dev/null +++ b/scripts/login_session.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python3 +""" +Interactive Login Session Manager +Opens browser for manual login, then saves session for future headless use +""" + +import argparse +import json +import sys +import time +from pathlib import Path + +SESSIONS_DIR = Path.home() / ".clawdbot" / "browser-sessions" + + +def login_and_save(url: str, session_name: str, wait_for_url: str = None, timeout: int = 300): + """ + Open browser for manual login, wait for success, then save session + + Args: + url: Login page URL + session_name: Name for saved session + wait_for_url: URL pattern to wait for (indicates successful login) + timeout: Max seconds to wait for login + """ + from DrissionPage import ChromiumPage, ChromiumOptions + + SESSIONS_DIR.mkdir(parents=True, exist_ok=True) + + # Visible browser for manual login + options = ChromiumOptions() + options.set_argument('--disable-blink-features=AutomationControlled') + options.set_argument('--start-maximized') + options.set_user_agent( + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + 'AppleWebKit/537.36 (KHTML, like Gecko) ' + 
'Chrome/120.0.0.0 Safari/537.36'
+    )
+
+    page = ChromiumPage(options)
+
+    try:
+        print(f"Opening login page: {url}")
+        print("Please log in manually in the browser...")
+        print(f"The session will be saved automatically once login succeeds (timeout: {timeout}s)")
+        print("-" * 50)
+
+        page.get(url)
+
+        start_time = time.time()
+        initial_url = page.url
+        logged_in = False
+
+        while time.time() - start_time < timeout:
+            current_url = page.url
+
+            # Check if URL changed (likely logged in)
+            if wait_for_url:
+                if wait_for_url in current_url:
+                    logged_in = True
+                    break
+            else:
+                # Heuristic: URL changed away from the login page
+                if current_url != initial_url and 'login' not in current_url.lower():
+                    # Wait a bit longer to ensure cookies are set
+                    time.sleep(2)
+                    logged_in = True
+                    break
+
+            time.sleep(1)
+
+            # Show progress
+            elapsed = int(time.time() - start_time)
+            if elapsed % 10 == 0:
+                print(f"Waiting for login... ({elapsed}s)")
+
+        if not logged_in:
+            # Ask the user to confirm
+            print("\nNo redirect detected. Did the login succeed?")
+            confirm = input("Enter y to save the session, n to cancel: ").strip().lower()
+            if confirm != 'y':
+                print("Cancelled")
+                return None
+
+        # Save session
+        session_data = {
+            "name": session_name,
+            "url": page.url,
+            "title": page.title,
+            "cookies": page.cookies.as_dict(),
+            "localStorage": {},
+            "timestamp": time.time()
+        }
+
+        try:
+            ls = page.run_js("return JSON.stringify(localStorage);")
+            session_data["localStorage"] = json.loads(ls) if ls else {}
+        except:
+            pass
+
+        session_path = SESSIONS_DIR / f"{session_name}.json"
+        session_path.write_text(json.dumps(session_data, indent=2, ensure_ascii=False))
+
+        print("-" * 50)
+        print(f"✓ Session saved: {session_path}")
+        print(f"  Current page: {page.title}")
+        print(f"  Cookies: {len(session_data['cookies'])}")
+        print(f"  localStorage: {len(session_data['localStorage'])} items")
+
+        return session_path
+
+    finally:
+        page.quit()
+
+
+def use_saved_session(url: str, session_name: str, headless: bool = True, action: str = None):
+    """
+    Use a previously saved session
+
+    Args:
+        url: URL to navigate to
+        session_name: Name of saved session
+        headless: Run in headless mode
+        action: Optional action to perform (screenshot, html, etc)
+    """
+    from DrissionPage import ChromiumPage, ChromiumOptions
+
+    session_path = SESSIONS_DIR / f"{session_name}.json"
+    if not session_path.exists():
+        print(f"Session not found: {session_name}")
+        print(f"Run first: python login_session.py login -u <url> -s {session_name}")
+        return None
+
+    session_data = json.loads(session_path.read_text())
+
+    options = ChromiumOptions()
+    if headless:
+        options.headless()
+    options.set_argument('--disable-blink-features=AutomationControlled')
+    options.set_user_agent(
+        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
+        'AppleWebKit/537.36 (KHTML, like Gecko) '
+        'Chrome/120.0.0.0 Safari/537.36'
+    )
+
+    page = ChromiumPage(options)
+
+    try:
+        # Navigate first to set domain
+        page.get(url)
+
+        # Apply cookies (DrissionPage sets cookies via page.set.cookies)
+        for name, value in session_data.get("cookies", {}).items():
+            try:
+                page.set.cookies({name: value})
+            except:
+                pass
+
+        # Apply localStorage
+        for k, v in session_data.get("localStorage", {}).items():
+            try:
+                v_escaped = json.dumps(v) if not isinstance(v, str) else f'"{v}"'
+                page.run_js(f"localStorage.setItem('{k}', {v_escaped});")
+            except:
+                pass
+
+        # Refresh to apply
+        page.refresh()
+        page.wait.doc_loaded()
+
+        print(f"✓ Session loaded: {session_name}")
+        print(f"  Current page: {page.title}")
+        print(f"  URL: {page.url}")
+
+        if action == "screenshot":
+            path = f"{session_name}_screenshot.png"
+            page.get_screenshot(path)
+            print(f"  Screenshot: {path}")
+        elif action == "html":
+            print(page.html[:2000])
+
+        return page
+
+    except Exception as e:
+        print(f"Error: {e}")
+        page.quit()
+        return None
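+
+
+# Illustrative programmatic reuse (not executed on import). Assumes a session
+# named "example" was saved earlier via the `login` subcommand below:
+#
+#     page = use_saved_session("https://target.com/dashboard", "example")
+#     if page:
+#         print(page.title)
+#         page.quit()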
+
+
+def list_sessions():
+    """List all saved sessions"""
+    if not SESSIONS_DIR.exists():
+        print("No saved sessions")
+        return
+
+    sessions = list(SESSIONS_DIR.glob("*.json"))
+    if not sessions:
+        print("No saved sessions")
+        return
+
+    print(f"Saved sessions ({len(sessions)}):")
+    print("-" * 60)
+    for path in sessions:
+        try:
+            data = json.loads(path.read_text())
+            name = path.stem
+            url = data.get("url", "N/A")
+            cookies = len(data.get("cookies", {}))
+            print(f"  {name:<20} | {cookies:>3} cookies | {url[:40]}")
+        except:
+            print(f"  {path.stem:<20} | (unreadable)")
+
+
+def delete_session(session_name: str):
+    """Delete a saved session"""
+    session_path = SESSIONS_DIR / f"{session_name}.json"
+    if session_path.exists():
+        session_path.unlink()
+        print(f"Deleted: {session_name}")
+    else:
+        print(f"Session not found: {session_name}")
+
+
+def main():
+    parser = argparse.ArgumentParser(description='Login session manager')
+    subparsers = parser.add_subparsers(dest='command')
+
+    # Login command
+    login_parser = subparsers.add_parser('login', help='Open a browser to log in and save the session')
+    login_parser.add_argument('-u', '--url', required=True, help='Login page URL')
+    login_parser.add_argument('-s', '--session', required=True, help='Session name')
+    login_parser.add_argument('--wait-url', help='Target URL to wait for (optional)')
+    login_parser.add_argument('--timeout', type=int, default=300, help='Timeout in seconds')
+
+    # Use command
+    use_parser = subparsers.add_parser('use', help='Use a saved session')
+    use_parser.add_argument('-u', '--url', required=True, help='Target URL')
+    use_parser.add_argument('-s', '--session', required=True, help='Session name')
+    use_parser.add_argument('--headed', action='store_true', help='Show the browser window')
+    use_parser.add_argument('--screenshot', action='store_true', help='Take a screenshot')
+
+    # List command
+    list_parser = subparsers.add_parser('list', help='List all sessions')
+
+    # Delete command
+    del_parser = subparsers.add_parser('delete', help='Delete a session')
+    del_parser.add_argument('session', help='Session name')
+
+    args = parser.parse_args()
+
+    if args.command == 'login':
+        login_and_save(args.url, args.session, args.wait_url, args.timeout)
+    elif args.command == 'use':
+        action = 'screenshot' if args.screenshot else None
+        page = use_saved_session(args.url, args.session, not args.headed, action)
+        if page and args.headed:
+            input("Press Enter to close the browser...")
+            page.quit()
+    elif args.command == 'list':
+        list_sessions()
+    elif args.command == 'delete':
+        delete_session(args.session)
+    else:
+        parser.print_help()


+if __name__ == "__main__":
+    main()
diff --git a/scripts/proxy_rotate.py b/scripts/proxy_rotate.py
new file mode 100644
index 0000000..09f8aa4
--- /dev/null
+++ b/scripts/proxy_rotate.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+"""
+Proxy Rotation Manager
+Supports residential, datacenter, and SOCKS proxies
+"""
+
+import json
+import random
+import time
+import requests
+from pathlib import Path
+from typing import Optional, List, Dict
+from dataclasses import dataclass
+from collections import defaultdict
+
+SECRETS_DIR = Path.home() / ".clawdbot" / "secrets"
+
+
+@dataclass
+class ProxyInfo:
+    url: str
+    type: str  # residential, datacenter, socks5
+    country: Optional[str] = None
+    last_used: float = 0
+    fail_count: int = 0
+    success_count: int = 0
+
+
+class ProxyPool:
+    """Manage and rotate through proxy pool"""
+
+    def __init__(self, config_path: Optional[Path] = None):
+        self.config_path = config_path or (SECRETS_DIR / "proxies.json")
+        self.proxies: List[ProxyInfo] = []
+        self.stats: Dict[str, Dict] = defaultdict(lambda: {"success": 0, "fail": 0})
+        self._load_config()
+
+    def _load_config(self):
+        """Load
proxies from config file""" + if not self.config_path.exists(): + return + + config = json.loads(self.config_path.read_text()) + + # Load residential proxies + for proxy in config.get("residential", []): + if isinstance(proxy, str): + self.proxies.append(ProxyInfo(url=proxy, type="residential")) + else: + self.proxies.append(ProxyInfo( + url=proxy.get("url"), + type="residential", + country=proxy.get("country") + )) + + # Load datacenter proxies + for proxy in config.get("datacenter", []): + if isinstance(proxy, str): + self.proxies.append(ProxyInfo(url=proxy, type="datacenter")) + else: + self.proxies.append(ProxyInfo( + url=proxy.get("url"), + type="datacenter", + country=proxy.get("country") + )) + + # Load rotating proxy (single endpoint) + rotating = config.get("rotating") + if rotating: + self.proxies.append(ProxyInfo(url=rotating, type="rotating")) + + def get_proxy(self, + proxy_type: Optional[str] = None, + country: Optional[str] = None, + exclude_failed: bool = True) -> Optional[str]: + """ + Get a proxy from the pool + + Args: + proxy_type: Filter by type (residential, datacenter, rotating) + country: Filter by country code + exclude_failed: Skip proxies with high fail rate + + Returns: + Proxy URL or None + """ + candidates = self.proxies.copy() + + if proxy_type: + candidates = [p for p in candidates if p.type == proxy_type] + + if country: + candidates = [p for p in candidates if p.country == country] + + if exclude_failed: + # Exclude proxies with >50% fail rate and at least 3 attempts + candidates = [p for p in candidates + if p.success_count + p.fail_count < 3 or + p.fail_count / (p.success_count + p.fail_count) < 0.5] + + if not candidates: + return None + + # Prefer least recently used + candidates.sort(key=lambda p: p.last_used) + chosen = candidates[0] + chosen.last_used = time.time() + + return chosen.url + + def mark_success(self, proxy_url: str): + """Mark proxy as successful""" + for p in self.proxies: + if p.url == proxy_url: + p.success_count += 1 + break + self.stats[proxy_url]["success"] += 1 + + def mark_failed(self, proxy_url: str): + """Mark proxy as failed""" + for p in self.proxies: + if p.url == proxy_url: + p.fail_count += 1 + break + self.stats[proxy_url]["fail"] += 1 + + def get_stats(self) -> Dict: + """Get proxy usage statistics""" + return { + "total": len(self.proxies), + "by_type": { + "residential": len([p for p in self.proxies if p.type == "residential"]), + "datacenter": len([p for p in self.proxies if p.type == "datacenter"]), + "rotating": len([p for p in self.proxies if p.type == "rotating"]) + }, + "usage": dict(self.stats) + } + + +def test_proxy(proxy_url: str, test_url: str = "https://httpbin.org/ip", timeout: int = 10) -> Dict: + """ + Test if a proxy is working + + Returns: + dict: {success: bool, ip: str, latency_ms: int, error: str} + """ + proxies = { + "http": proxy_url, + "https": proxy_url + } + + start = time.time() + try: + resp = requests.get(test_url, proxies=proxies, timeout=timeout) + latency = int((time.time() - start) * 1000) + + if resp.status_code == 200: + data = resp.json() + return { + "success": True, + "ip": data.get("origin"), + "latency_ms": latency + } + else: + return { + "success": False, + "error": f"HTTP {resp.status_code}", + "latency_ms": latency + } + + except Exception as e: + return { + "success": False, + "error": str(e) + } + + +def get_my_ip() -> str: + """Get current public IP without proxy""" + try: + return requests.get("https://httpbin.org/ip", timeout=5).json()["origin"] + except: + return 
"unknown" + + +def create_proxy_config_template(): + """Create template proxies.json""" + template = { + "rotating": "http://user:pass@rotating-proxy.provider.com:port", + "residential": [ + "socks5://user:pass@residential1.provider.com:port", + "socks5://user:pass@residential2.provider.com:port" + ], + "datacenter": [ + "http://user:pass@dc1.provider.com:port", + "http://user:pass@dc2.provider.com:port" + ], + "_comment": "Replace with your actual proxy credentials. Types: http, https, socks5" + } + + SECRETS_DIR.mkdir(parents=True, exist_ok=True) + config_path = SECRETS_DIR / "proxies.json" + + if not config_path.exists(): + config_path.write_text(json.dumps(template, indent=2)) + print(f"Created template: {config_path}") + return config_path + else: + print(f"Config already exists: {config_path}") + return None + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description='Proxy Manager') + subparsers = parser.add_subparsers(dest='command') + + # Get proxy + get_parser = subparsers.add_parser('get', help='Get a proxy') + get_parser.add_argument('--type', '-t', choices=['residential', 'datacenter', 'rotating'], + help='Proxy type') + get_parser.add_argument('--country', '-c', help='Country code') + + # Test proxy + test_parser = subparsers.add_parser('test', help='Test a proxy') + test_parser.add_argument('proxy', help='Proxy URL') + + # Test all + test_all_parser = subparsers.add_parser('test-all', help='Test all proxies') + + # Stats + stats_parser = subparsers.add_parser('stats', help='Show statistics') + + # Init config + init_parser = subparsers.add_parser('init', help='Create config template') + + # My IP + myip_parser = subparsers.add_parser('myip', help='Show current IP') + + args = parser.parse_args() + + if args.command == 'get': + pool = ProxyPool() + proxy = pool.get_proxy(proxy_type=args.type, country=args.country) + if proxy: + print(proxy) + else: + print("No proxy available") + exit(1) + + elif args.command == 'test': + result = test_proxy(args.proxy) + print(json.dumps(result, indent=2)) + if not result["success"]: + exit(1) + + elif args.command == 'test-all': + pool = ProxyPool() + print(f"Testing {len(pool.proxies)} proxies...") + for p in pool.proxies: + result = test_proxy(p.url) + status = "✓" if result.get("success") else "✗" + ip = result.get("ip", result.get("error", "N/A")) + latency = result.get("latency_ms", "N/A") + print(f"{status} [{p.type}] {p.url[:40]}... 
-> {ip} ({latency}ms)") + + elif args.command == 'stats': + pool = ProxyPool() + print(json.dumps(pool.get_stats(), indent=2)) + + elif args.command == 'init': + create_proxy_config_template() + + elif args.command == 'myip': + print(get_my_ip()) + + else: + parser.print_help() diff --git a/scripts/session_manager.py b/scripts/session_manager.py new file mode 100644 index 0000000..1170275 --- /dev/null +++ b/scripts/session_manager.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3 +""" +Browser Session Manager +Handles cookie persistence, localStorage sync, and multi-profile management +""" + +import json +import time +import sqlite3 +from pathlib import Path +from datetime import datetime +from typing import Optional, Dict, List, Any + +SESSIONS_DIR = Path.home() / ".clawdbot" / "browser-sessions" +PROFILES_DIR = Path.home() / ".clawdbot" / "browser-profiles" + + +def init_dirs(): + """Initialize storage directories""" + SESSIONS_DIR.mkdir(parents=True, exist_ok=True) + PROFILES_DIR.mkdir(parents=True, exist_ok=True) + + +class SessionManager: + """Manage browser sessions with cookie and localStorage persistence""" + + def __init__(self, session_name: str): + init_dirs() + self.session_name = session_name + self.session_file = SESSIONS_DIR / f"{session_name}.json" + self.data = self._load() + + def _load(self) -> dict: + """Load session data from file""" + if self.session_file.exists(): + return json.loads(self.session_file.read_text()) + return { + "name": self.session_name, + "created": datetime.now().isoformat(), + "updated": None, + "cookies": {}, + "localStorage": {}, + "metadata": {} + } + + def save(self): + """Save session data to file""" + self.data["updated"] = datetime.now().isoformat() + self.session_file.write_text(json.dumps(self.data, indent=2)) + + def set_cookies(self, cookies: Dict[str, Any], domain: str = None): + """Store cookies, optionally grouped by domain""" + if domain: + if "cookies_by_domain" not in self.data: + self.data["cookies_by_domain"] = {} + self.data["cookies_by_domain"][domain] = cookies + else: + self.data["cookies"] = cookies + self.save() + + def get_cookies(self, domain: str = None) -> Dict[str, Any]: + """Get cookies, optionally for specific domain""" + if domain and "cookies_by_domain" in self.data: + return self.data["cookies_by_domain"].get(domain, {}) + return self.data.get("cookies", {}) + + def set_local_storage(self, ls_data: dict, origin: str = None): + """Store localStorage data""" + if origin: + if "localStorage_by_origin" not in self.data: + self.data["localStorage_by_origin"] = {} + self.data["localStorage_by_origin"][origin] = ls_data + else: + self.data["localStorage"] = ls_data + self.save() + + def get_local_storage(self, origin: str = None) -> dict: + """Get localStorage data""" + if origin and "localStorage_by_origin" in self.data: + return self.data["localStorage_by_origin"].get(origin, {}) + return self.data.get("localStorage", {}) + + def set_metadata(self, key: str, value: Any): + """Store arbitrary metadata""" + self.data["metadata"][key] = value + self.save() + + def get_metadata(self, key: str, default: Any = None) -> Any: + """Get metadata value""" + return self.data["metadata"].get(key, default) + + def export_for_browser(self, browser_type: str = "drission") -> dict: + """Export session in format suitable for browser injection""" + return { + "cookies": self.data.get("cookies", {}), + "localStorage": self.data.get("localStorage", {}), + "format": browser_type + } + + def import_from_browser(self, page, browser_type: str = 
"drission"): + """Import cookies and localStorage from active browser page""" + if browser_type == "drission": + self.data["cookies"] = page.cookies.as_dict() + try: + ls = page.run_js("return JSON.stringify(localStorage);") + self.data["localStorage"] = json.loads(ls) if ls else {} + except: + pass + self.data["metadata"]["url"] = page.url + self.data["metadata"]["title"] = page.title + else: # selenium/undetected + # Convert cookie list to dict + cookies = {} + for c in page.get_cookies(): + cookies[c["name"]] = { + "value": c["value"], + "domain": c.get("domain"), + "path": c.get("path"), + "secure": c.get("secure"), + "httpOnly": c.get("httpOnly") + } + self.data["cookies"] = cookies + try: + ls = page.execute_script("return JSON.stringify(localStorage);") + self.data["localStorage"] = json.loads(ls) if ls else {} + except: + pass + self.data["metadata"]["url"] = page.current_url + self.data["metadata"]["title"] = page.title + + self.save() + + def apply_to_browser(self, page, browser_type: str = "drission"): + """Apply saved session to browser page""" + if browser_type == "drission": + # Set cookies + for name, cookie_data in self.data.get("cookies", {}).items(): + if isinstance(cookie_data, str): + page.cookies.set({name: cookie_data}) + else: + page.cookies.set({name: cookie_data.get("value", "")}) + + # Set localStorage + ls = self.data.get("localStorage", {}) + if ls: + for k, v in ls.items(): + v_escaped = json.dumps(v) if not isinstance(v, str) else f'"{v}"' + page.run_js(f"localStorage.setItem('{k}', {v_escaped});") + else: # selenium + for name, cookie_data in self.data.get("cookies", {}).items(): + try: + if isinstance(cookie_data, str): + page.add_cookie({"name": name, "value": cookie_data}) + else: + page.add_cookie({ + "name": name, + "value": cookie_data.get("value", ""), + "domain": cookie_data.get("domain"), + "path": cookie_data.get("path", "/"), + "secure": cookie_data.get("secure", False) + }) + except: + pass + + ls = self.data.get("localStorage", {}) + if ls: + for k, v in ls.items(): + v_escaped = json.dumps(v) if not isinstance(v, str) else f'"{v}"' + page.execute_script(f"localStorage.setItem('{k}', {v_escaped});") + + +def list_sessions() -> List[dict]: + """List all saved sessions""" + init_dirs() + sessions = [] + for f in SESSIONS_DIR.glob("*.json"): + try: + data = json.loads(f.read_text()) + sessions.append({ + "name": f.stem, + "created": data.get("created"), + "updated": data.get("updated"), + "url": data.get("metadata", {}).get("url"), + "cookies_count": len(data.get("cookies", {})) + }) + except: + pass + return sessions + + +def delete_session(session_name: str) -> bool: + """Delete a saved session""" + session_file = SESSIONS_DIR / f"{session_name}.json" + if session_file.exists(): + session_file.unlink() + return True + return False + + +def create_profile(profile_name: str) -> Path: + """Create a new browser profile directory""" + init_dirs() + profile_path = PROFILES_DIR / profile_name + profile_path.mkdir(exist_ok=True) + return profile_path + + +def get_profile_path(profile_name: str) -> Optional[Path]: + """Get path to existing profile or None""" + profile_path = PROFILES_DIR / profile_name + return profile_path if profile_path.exists() else None + + +def list_profiles() -> List[str]: + """List all browser profiles""" + init_dirs() + return [d.name for d in PROFILES_DIR.iterdir() if d.is_dir()] + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description='Session Manager') + subparsers = 
parser.add_subparsers(dest='command')
+
+    # List sessions
+    list_parser = subparsers.add_parser('list', help='List sessions')
+
+    # Show session
+    show_parser = subparsers.add_parser('show', help='Show session details')
+    show_parser.add_argument('name', help='Session name')
+
+    # Delete session
+    del_parser = subparsers.add_parser('delete', help='Delete session')
+    del_parser.add_argument('name', help='Session name')
+
+    # List profiles
+    profiles_parser = subparsers.add_parser('profiles', help='List profiles')
+
+    # Create profile
+    create_parser = subparsers.add_parser('create-profile', help='Create profile')
+    create_parser.add_argument('name', help='Profile name')
+
+    args = parser.parse_args()
+
+    if args.command == 'list':
+        sessions = list_sessions()
+        if sessions:
+            print(f"{'Name':<20} {'Updated':<25} {'URL':<40} {'Cookies'}")
+            print("-" * 100)
+            for s in sessions:
+                print(f"{s['name']:<20} {(s.get('updated') or 'N/A')[:25]:<25} {(s.get('url') or 'N/A')[:40]:<40} {s['cookies_count']}")
+        else:
+            print("No sessions found")
+
+    elif args.command == 'show':
+        sm = SessionManager(args.name)
+        print(json.dumps(sm.data, indent=2))
+
+    elif args.command == 'delete':
+        if delete_session(args.name):
+            print(f"Deleted: {args.name}")
+        else:
+            print(f"Session not found: {args.name}")
+
+    elif args.command == 'profiles':
+        profiles = list_profiles()
+        if profiles:
+            for p in profiles:
+                print(p)
+        else:
+            print("No profiles found")
+
+    elif args.command == 'create-profile':
+        path = create_profile(args.name)
+        print(f"Created: {path}")
+
+    else:
+        parser.print_help()
diff --git a/scripts/smart_login.py b/scripts/smart_login.py
new file mode 100644
index 0000000..a62676a
--- /dev/null
+++ b/scripts/smart_login.py
@@ -0,0 +1,231 @@
+# -*- coding: utf-8 -*-
+"""
+Smart login script - try silently first, fall back to a visible window on failure
+Supports checkpoint resume and error recovery
+"""
+from DrissionPage import ChromiumPage, ChromiumOptions
+import time
+import json
+import sys
+from pathlib import Path
+from datetime import datetime
+
+SESSIONS_DIR = Path.home() / '.clawdbot' / 'browser-sessions'
+ATTEMPTS_LOG = Path.home() / '.clawdbot' / 'browser-sessions' / 'attempts.json'
+
+# Configure output encoding
+if sys.platform == 'win32':
+    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
+    sys.stderr.reconfigure(encoding='utf-8', errors='replace')
+
+def log_attempt(url, success, method, notes=''):
+    """Record every site that was attempted"""
+    SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
+    attempts = []
+    if ATTEMPTS_LOG.exists():
+        attempts = json.loads(ATTEMPTS_LOG.read_text())
+
+    attempts.append({
+        'url': url,
+        'success': success,
+        'method': method,  # headless / headed
+        'notes': notes,
+        'timestamp': time.time()
+    })
+    ATTEMPTS_LOG.write_text(json.dumps(attempts, indent=2, ensure_ascii=False))
+
+def get_browser(headless=True):
+    """Create a browser instance"""
+    options = ChromiumOptions()
+    if headless:
+        options.headless()
+    options.set_argument('--disable-blink-features=AutomationControlled')
+    options.set_argument('--no-sandbox')
+    options.set_argument('--disable-dev-shm-usage')
+    options.set_user_agent(
+        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
+        'AppleWebKit/537.36 (KHTML, like Gecko) '
+        'Chrome/120.0.0.0 Safari/537.36'
+    )
+    return ChromiumPage(options)
+
+def save_session(page, name):
+    """Save the session to disk"""
+    SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
+    session_data = {
+        'name': name,
+        'url': page.url,
+        'title': page.title,
+        'cookies': dict(page.cookies()),  # flatten via dict()
+        'timestamp': time.time()
+    }
+    path = SESSIONS_DIR / f'{name}.json'
+    path.write_text(json.dumps(session_data, indent=2, ensure_ascii=False))
+    print(f'Session saved: {path}')
+    return path
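+
+# Note: dict(page.cookies()) above is assumed to flatten the cookie collection
+# to {name: value}, which drops domain/path/expiry metadata. That is enough for
+# the simple re-injection in load_session() below; if your DrissionPage version
+# returns a list of cookie dicts instead, use page.cookies().as_dict() as the
+# other scripts in this skill do.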
+
+def load_session(page, name):
+    """Load a saved session"""
+    path = SESSIONS_DIR / f'{name}.json'
+    if not path.exists():
+        return False
+    data = json.loads(path.read_text())
+    for k, v in data.get('cookies', {}).items():
+        try:
+            page.set.cookies({k: v})
+        except:
+            pass
+    return True
+
+def smart_login(url, session_name, account=None, password=None):
+    """
+    Smart login - try silently first, open a visible window only on failure
+    """
+    print(f'=== Logging in to {url} ===')
+
+    # Step 1: silently try to load an existing session
+    print('1. Trying to silently load an existing session...')
+    page = get_browser(headless=True)
+    try:
+        page.get(url)
+        time.sleep(2)
+
+        if load_session(page, session_name):
+            page.refresh()
+            time.sleep(3)
+            # Check for success (simple heuristic: no longer on a login page)
+            if 'login' not in page.url.lower() and 'passport' not in page.url.lower():
+                print('Existing session is valid, login successful!')
+                log_attempt(url, True, 'headless', 'session_reuse')
+                return page
+
+        print('No valid session')
+    except Exception as e:
+        print(f'Silent attempt failed: {e}')
+    finally:
+        page.quit()
+
+    # Step 2: if credentials were provided, try a silent automated login
+    if account and password:
+        print('2. Trying silent automatic login...')
+        page = get_browser(headless=True)
+        try:
+            page.get(url)
+            time.sleep(3)
+
+            # Locate the input fields
+            inputs = page.eles('tag:input')
+            text_input = None
+            pwd_input = None
+            for inp in inputs:
+                t = inp.attr('type') or ''
+                if t == 'text' or t == 'tel' or t == 'email':
+                    if not text_input:
+                        text_input = inp
+                if t == 'password':
+                    pwd_input = inp
+
+            if text_input and pwd_input:
+                text_input.clear()
+                text_input.input(account)
+                time.sleep(0.3)
+                pwd_input.clear()
+                pwd_input.input(password)
+                time.sleep(0.3)
+
+                # Find and click the login button
+                btn = page.ele('tag:button') or page.ele('.btn') or page.ele('[type=submit]')
+                if btn:
+                    try:
+                        page.run_js('arguments[0].click()', btn)
+                    except:
+                        btn.click()
+
+                time.sleep(5)
+
+                # Check for success
+                if 'login' not in page.url.lower() and 'passport' not in page.url.lower():
+                    print('Silent login successful!')
+                    save_session(page, session_name)
+                    log_attempt(url, True, 'headless', 'auto_login')
+                    return page
+                else:
+                    print('A CAPTCHA is probably required, switching to headed mode')
+        except Exception as e:
+            print(f'Silent login failed: {e}')
+        finally:
+            page.quit()
+
+    # Step 3: open a visible window and let the user log in manually
+    print('3. Opening a browser window, please log in manually...')
+    page = get_browser(headless=False)
+    try:
+        page.get(url)
+
+        # Pre-fill credentials if provided
+        if account and password:
+            time.sleep(3)
+            inputs = page.eles('tag:input')
+            for inp in inputs:
+                t = inp.attr('type') or ''
+                if t in ('text', 'tel', 'email'):
+                    inp.clear()
+                    inp.input(account)
+                elif t == 'password':
+                    inp.clear()
+                    inp.input(password)
+
+            # Click the login button (the Chinese literals match on-page button text)
+            time.sleep(0.5)
+            btns = page.eles('tag:button')
+            for btn in btns:
+                txt = btn.text.lower() if btn.text else ''
+                if '登录' in txt or 'login' in txt or '登入' in txt:
+                    try:
+                        page.run_js('arguments[0].click()', btn)
+                        print('Login button clicked automatically')
+                    except:
+                        pass
+                    break
+
+        # Wait for login to complete
+        print('Waiting for login... (the session is saved automatically once a redirect is detected)')
+        original_url = page.url
+        for i in range(120):  # wait up to 2 minutes
+            time.sleep(1)
+            current = page.url
+            if 'login' not in current.lower() and 'passport' not in current.lower():
+                if current != original_url:
+                    print('Login detected!')
+                    time.sleep(2)  # give cookies time to be written
+                    break
+            if i % 15 == 0 and i > 0:
+                print(f'Still waiting... {i}s')
+
+        save_session(page, session_name)
+        log_attempt(url, True, 'headed', 'manual_login')
+        print('Login complete, session saved')
+        return page
+
+    except Exception as e:
+        print(f'Error: {e}')
+        log_attempt(url, False, 'headed', str(e))
+        page.quit()
+        return None
+
+if __name__ == '__main__':
+    import sys
+    if len(sys.argv) < 3:
+        print('Usage: python smart_login.py <url> <session_name> [account] [password]')
+        sys.exit(1)
+
+    url = sys.argv[1]
+    name = sys.argv[2]
+    account = sys.argv[3] if len(sys.argv) > 3 else None
+    password = sys.argv[4] if len(sys.argv) > 4 else None
+
+    page = smart_login(url, name, account, password)
+    if page:
+        print(f'Current page: {page.title}')
+        print(f'URL: {page.url}')
+        page.quit()
diff --git a/scripts/solve_captcha.py b/scripts/solve_captcha.py
new file mode 100644
index 0000000..4056bd0
--- /dev/null
+++ b/scripts/solve_captcha.py
@@ -0,0 +1,356 @@
+#!/usr/bin/env python3
+"""
+CAPTCHA Solving Utilities
+Supports 2Captcha, Anti-Captcha, and CapSolver
+"""
+
+import json
+import time
+import requests
+from pathlib import Path
+
+SECRETS_DIR = Path.home() / ".clawdbot" / "secrets"
+
+
+def load_api_keys():
+    """Load CAPTCHA API keys from secrets"""
+    key_file = SECRETS_DIR / "captcha.json"
+    if key_file.exists():
+        return json.loads(key_file.read_text())
+    return {}
+
+
+def solve_recaptcha_v2(site_key: str, page_url: str, invisible: bool = False, provider: str = None) -> str:
+    """
+    Solve reCAPTCHA v2
+
+    Args:
+        site_key: The site key (data-sitekey attribute)
+        page_url: The page URL where CAPTCHA is displayed
+        invisible: Whether it's invisible reCAPTCHA
+        provider: Force specific provider (2captcha, anticaptcha, capsolver)
+
+    Returns:
+        CAPTCHA token or None if failed
+    """
+    keys = load_api_keys()
+
+    # Try providers in order
+    providers = [provider] if provider else ['capsolver', '2captcha', 'anticaptcha']
+
+    for p in providers:
+        if p == '2captcha' and keys.get('2captcha'):
+            return _solve_2captcha_recaptcha(keys['2captcha'], site_key, page_url, invisible)
+        elif p == 'anticaptcha' and keys.get('anticaptcha'):
+            return _solve_anticaptcha_recaptcha(keys['anticaptcha'], site_key, page_url, invisible)
+        elif p == 'capsolver' and keys.get('capsolver'):
+            return _solve_capsolver_recaptcha(keys['capsolver'], site_key, page_url, invisible)
+
+    raise ValueError("No CAPTCHA API keys configured. 
Add keys to ~/.clawdbot/secrets/captcha.json") + + +def solve_recaptcha_v3(site_key: str, page_url: str, action: str = "verify", min_score: float = 0.7, provider: str = None) -> str: + """ + Solve reCAPTCHA v3 + + Args: + site_key: The site key + page_url: The page URL + action: The action value (usually found in grecaptcha.execute call) + min_score: Minimum required score (0.1-0.9) + provider: Force specific provider + + Returns: + CAPTCHA token or None if failed + """ + keys = load_api_keys() + + if keys.get('2captcha'): + api_key = keys['2captcha'] + resp = requests.post("http://2captcha.com/in.php", data={ + "key": api_key, + "method": "userrecaptcha", + "googlekey": site_key, + "pageurl": page_url, + "version": "v3", + "action": action, + "min_score": min_score, + "json": 1 + }).json() + + if resp.get("status") != 1: + raise ValueError(f"2Captcha error: {resp.get('request')}") + + task_id = resp["request"] + return _poll_2captcha(api_key, task_id) + + raise ValueError("No reCAPTCHA v3 provider available") + + +def solve_hcaptcha(site_key: str, page_url: str, provider: str = None) -> str: + """ + Solve hCaptcha + + Args: + site_key: The site key (data-sitekey attribute) + page_url: The page URL + provider: Force specific provider + + Returns: + CAPTCHA token or None if failed + """ + keys = load_api_keys() + + if keys.get('anticaptcha'): + api_key = keys['anticaptcha'] + resp = requests.post("https://api.anti-captcha.com/createTask", json={ + "clientKey": api_key, + "task": { + "type": "HCaptchaTaskProxyless", + "websiteURL": page_url, + "websiteKey": site_key + } + }).json() + + if resp.get("errorId"): + raise ValueError(f"Anti-Captcha error: {resp.get('errorDescription')}") + + task_id = resp["taskId"] + return _poll_anticaptcha(api_key, task_id) + + if keys.get('2captcha'): + api_key = keys['2captcha'] + resp = requests.post("http://2captcha.com/in.php", data={ + "key": api_key, + "method": "hcaptcha", + "sitekey": site_key, + "pageurl": page_url, + "json": 1 + }).json() + + if resp.get("status") != 1: + raise ValueError(f"2Captcha error: {resp.get('request')}") + + task_id = resp["request"] + return _poll_2captcha(api_key, task_id) + + raise ValueError("No hCaptcha provider available") + + +def solve_turnstile(site_key: str, page_url: str, provider: str = None) -> str: + """ + Solve Cloudflare Turnstile + + Args: + site_key: The Turnstile site key + page_url: The page URL + provider: Force specific provider + + Returns: + CAPTCHA token or None if failed + """ + keys = load_api_keys() + + if keys.get('capsolver'): + api_key = keys['capsolver'] + resp = requests.post("https://api.capsolver.com/createTask", json={ + "clientKey": api_key, + "task": { + "type": "AntiTurnstileTaskProxyLess", + "websiteURL": page_url, + "websiteKey": site_key + } + }).json() + + if resp.get("errorId"): + raise ValueError(f"CapSolver error: {resp.get('errorDescription')}") + + task_id = resp["taskId"] + return _poll_capsolver(api_key, task_id) + + if keys.get('2captcha'): + api_key = keys['2captcha'] + resp = requests.post("http://2captcha.com/in.php", data={ + "key": api_key, + "method": "turnstile", + "sitekey": site_key, + "pageurl": page_url, + "json": 1 + }).json() + + if resp.get("status") != 1: + raise ValueError(f"2Captcha error: {resp.get('request')}") + + task_id = resp["request"] + return _poll_2captcha(api_key, task_id) + + raise ValueError("No Turnstile provider available") + + +def inject_captcha_token(page, token: str, captcha_type: str = "recaptcha"): + """ + Inject solved CAPTCHA token 
into page + + Args: + page: DrissionPage or Selenium driver + token: The solved CAPTCHA token + captcha_type: recaptcha, hcaptcha, or turnstile + """ + if captcha_type == "recaptcha": + js = f""" + document.getElementById('g-recaptcha-response').innerHTML = '{token}'; + if (typeof ___grecaptcha_cfg !== 'undefined') {{ + Object.entries(___grecaptcha_cfg.clients).forEach(([k,v]) => {{ + if (v.callback) v.callback('{token}'); + }}); + }} + """ + elif captcha_type == "hcaptcha": + js = f""" + document.querySelector('[name="h-captcha-response"]').value = '{token}'; + document.querySelector('[name="g-recaptcha-response"]').value = '{token}'; + """ + elif captcha_type == "turnstile": + js = f""" + document.querySelector('[name="cf-turnstile-response"]').value = '{token}'; + """ + else: + raise ValueError(f"Unknown captcha type: {captcha_type}") + + # Execute based on driver type + if hasattr(page, 'run_js'): + page.run_js(js) + else: + page.execute_script(js) + + +# Private helper functions + +def _solve_2captcha_recaptcha(api_key, site_key, page_url, invisible=False): + resp = requests.post("http://2captcha.com/in.php", data={ + "key": api_key, + "method": "userrecaptcha", + "googlekey": site_key, + "pageurl": page_url, + "invisible": 1 if invisible else 0, + "json": 1 + }).json() + + if resp.get("status") != 1: + raise ValueError(f"2Captcha error: {resp.get('request')}") + + return _poll_2captcha(api_key, resp["request"]) + + +def _solve_anticaptcha_recaptcha(api_key, site_key, page_url, invisible=False): + task_type = "RecaptchaV2TaskProxyless" + if invisible: + task_type = "RecaptchaV2EnterpriseTaskProxyless" + + resp = requests.post("https://api.anti-captcha.com/createTask", json={ + "clientKey": api_key, + "task": { + "type": task_type, + "websiteURL": page_url, + "websiteKey": site_key, + "isInvisible": invisible + } + }).json() + + if resp.get("errorId"): + raise ValueError(f"Anti-Captcha error: {resp.get('errorDescription')}") + + return _poll_anticaptcha(api_key, resp["taskId"]) + + +def _solve_capsolver_recaptcha(api_key, site_key, page_url, invisible=False): + resp = requests.post("https://api.capsolver.com/createTask", json={ + "clientKey": api_key, + "task": { + "type": "ReCaptchaV2TaskProxyLess", + "websiteURL": page_url, + "websiteKey": site_key, + "isInvisible": invisible + } + }).json() + + if resp.get("errorId"): + raise ValueError(f"CapSolver error: {resp.get('errorDescription')}") + + return _poll_capsolver(api_key, resp["taskId"]) + + +def _poll_2captcha(api_key, task_id, max_attempts=60): + for _ in range(max_attempts): + time.sleep(3) + result = requests.get( + f"http://2captcha.com/res.php?key={api_key}&action=get&id={task_id}&json=1" + ).json() + + if result.get("status") == 1: + return result["request"] + elif result.get("request") != "CAPCHA_NOT_READY": + raise ValueError(f"2Captcha error: {result.get('request')}") + + raise TimeoutError("CAPTCHA solving timed out") + + +def _poll_anticaptcha(api_key, task_id, max_attempts=60): + for _ in range(max_attempts): + time.sleep(3) + result = requests.post("https://api.anti-captcha.com/getTaskResult", json={ + "clientKey": api_key, + "taskId": task_id + }).json() + + if result.get("status") == "ready": + return result["solution"]["gRecaptchaResponse"] + elif result.get("errorId"): + raise ValueError(f"Anti-Captcha error: {result.get('errorDescription')}") + + raise TimeoutError("CAPTCHA solving timed out") + + +def _poll_capsolver(api_key, task_id, max_attempts=60): + for _ in range(max_attempts): + time.sleep(3) + result = 
requests.post("https://api.capsolver.com/getTaskResult", json={ + "clientKey": api_key, + "taskId": task_id + }).json() + + if result.get("status") == "ready": + return result["solution"].get("gRecaptchaResponse") or result["solution"].get("token") + elif result.get("errorId"): + raise ValueError(f"CapSolver error: {result.get('errorDescription')}") + + raise TimeoutError("CAPTCHA solving timed out") + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description='Solve CAPTCHAs') + parser.add_argument('--type', '-t', choices=['recaptcha2', 'recaptcha3', 'hcaptcha', 'turnstile'], + required=True, help='CAPTCHA type') + parser.add_argument('--sitekey', '-k', required=True, help='Site key') + parser.add_argument('--url', '-u', required=True, help='Page URL') + parser.add_argument('--action', '-a', default='verify', help='Action (reCAPTCHA v3)') + parser.add_argument('--provider', '-p', help='Force specific provider') + + args = parser.parse_args() + + try: + if args.type == 'recaptcha2': + token = solve_recaptcha_v2(args.sitekey, args.url, provider=args.provider) + elif args.type == 'recaptcha3': + token = solve_recaptcha_v3(args.sitekey, args.url, args.action, provider=args.provider) + elif args.type == 'hcaptcha': + token = solve_hcaptcha(args.sitekey, args.url, provider=args.provider) + elif args.type == 'turnstile': + token = solve_turnstile(args.sitekey, args.url, provider=args.provider) + + print(f"Token: {token}") + except Exception as e: + print(f"Error: {e}") + exit(1) diff --git a/scripts/stealth_session.py b/scripts/stealth_session.py new file mode 100644 index 0000000..c39f50f --- /dev/null +++ b/scripts/stealth_session.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 +""" +Stealth Browser Session Manager +Supports undetected-chromedriver and DrissionPage backends +""" + +import argparse +import json +import sys +import time +from pathlib import Path + +SESSIONS_DIR = Path.home() / ".clawdbot" / "browser-sessions" +SECRETS_DIR = Path.home() / ".clawdbot" / "secrets" + + +def get_drissionpage(headless=True, proxy=None, user_agent=None): + """Initialize DrissionPage with stealth options""" + from DrissionPage import ChromiumPage, ChromiumOptions + + options = ChromiumOptions() + + if headless: + options.headless() + + # Anti-detection flags + options.set_argument('--disable-blink-features=AutomationControlled') + options.set_argument('--disable-dev-shm-usage') + options.set_argument('--no-sandbox') + options.set_argument('--disable-infobars') + options.set_argument('--disable-extensions') + options.set_argument('--disable-gpu') + options.set_argument('--lang=en-US') + + if user_agent: + options.set_user_agent(user_agent) + else: + options.set_user_agent( + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + 'AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/120.0.0.0 Safari/537.36' + ) + + if proxy: + options.set_proxy(proxy) + + return ChromiumPage(options) + + +def get_undetected_chrome(headless=True, proxy=None, user_agent=None): + """Initialize undetected-chromedriver""" + import undetected_chromedriver as uc + + options = uc.ChromeOptions() + + if headless: + options.add_argument('--headless=new') + + options.add_argument('--disable-blink-features=AutomationControlled') + options.add_argument('--no-sandbox') + options.add_argument('--disable-dev-shm-usage') + + if user_agent: + options.add_argument(f'--user-agent={user_agent}') + + if proxy: + options.add_argument(f'--proxy-server={proxy}') + + driver = uc.Chrome(options=options, use_subprocess=True) 
+    return driver
+
+
+def save_session(driver_or_page, session_name, backend='drission'):
+    """Save cookies and localStorage for session persistence"""
+    SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
+    session_path = SESSIONS_DIR / f"{session_name}.json"
+
+    session_data = {
+        "cookies": [],
+        "localStorage": {},
+        "backend": backend,
+        "timestamp": time.time()
+    }
+
+    if backend == 'drission':
+        session_data["cookies"] = driver_or_page.cookies.as_dict()
+        try:
+            ls = driver_or_page.run_js("return JSON.stringify(localStorage);")
+            session_data["localStorage"] = json.loads(ls) if ls else {}
+        except Exception:
+            pass  # localStorage may be unavailable (e.g. on about:blank)
+    else:  # selenium/undetected
+        session_data["cookies"] = driver_or_page.get_cookies()
+        try:
+            ls = driver_or_page.execute_script("return JSON.stringify(localStorage);")
+            session_data["localStorage"] = json.loads(ls) if ls else {}
+        except Exception:
+            pass  # localStorage may be unavailable (e.g. on about:blank)
+
+    session_path.write_text(json.dumps(session_data, indent=2))
+    print(f"Session saved: {session_path}")
+    return session_path
+
+
+def load_session(driver_or_page, session_name, backend='drission'):
+    """Load cookies and localStorage from a saved session"""
+    session_path = SESSIONS_DIR / f"{session_name}.json"
+
+    if not session_path.exists():
+        print(f"No session found: {session_name}")
+        return False
+
+    session_data = json.loads(session_path.read_text())
+
+    if backend == 'drission':
+        for name, value in session_data.get("cookies", {}).items():
+            driver_or_page.cookies.set({name: value})
+
+        ls_data = session_data.get("localStorage", {})
+        if ls_data:
+            for k, v in ls_data.items():
+                # json.dumps escapes quotes so arbitrary values survive the JS round-trip
+                driver_or_page.run_js(f"localStorage.setItem({json.dumps(k)}, {json.dumps(v)});")
+    else:  # selenium/undetected
+        for cookie in session_data.get("cookies", []):
+            try:
+                driver_or_page.add_cookie(cookie)
+            except Exception:
+                pass  # cookies for other domains cannot be set on the current page
+
+        ls_data = session_data.get("localStorage", {})
+        if ls_data:
+            for k, v in ls_data.items():
+                driver_or_page.execute_script(f"localStorage.setItem({json.dumps(k)}, {json.dumps(v)});")
+
+    print(f"Session loaded: {session_name}")
+    return True
+
+
+def get_proxy():
+    """Pick a proxy from the configuration (random residential entry first, then the rotating URL)"""
+    proxy_file = SECRETS_DIR / "proxies.json"
+    if proxy_file.exists():
+        import random
+        config = json.loads(proxy_file.read_text())
+        proxies = config.get("residential", [])
+        if proxies:
+            return random.choice(proxies)
+        return config.get("rotating")
+    return None
+
+
+def main():
+    parser = argparse.ArgumentParser(description='Stealth Browser Session')
+    parser.add_argument('--url', '-u', help='URL to open')
+    parser.add_argument('--session', '-s', help='Session name for persistence')
+    parser.add_argument('--backend', '-b', choices=['drission', 'undetected'],
+                        default='drission', help='Browser backend')
+    parser.add_argument('--headless', action='store_true', default=True,
+                        help='Run headless (default; use --headed to show the browser)')
+    parser.add_argument('--headed', action='store_true',
+                        help='Show browser window')
+    parser.add_argument('--proxy', '-p', help='Proxy URL')
+    parser.add_argument('--rotate-proxy', action='store_true',
+                        help='Use rotating proxy from config')
+    parser.add_argument('--save', action='store_true',
+                        help='Save session after operation')
+    parser.add_argument('--load', action='store_true',
+                        help='Load existing session')
+    parser.add_argument('--screenshot', help='Save a screenshot to this path')
+    parser.add_argument('--wait', type=int, default=5,
+                        help='Seconds to wait after page load')
+    parser.add_argument('--test-stealth', action='store_true',
+                        help='Test anti-detection on bot.sannysoft.com')
+
+    args = parser.parse_args()
+
+    headless = not args.headed
+    proxy = args.proxy
+
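+    # Proxy precedence: --rotate-proxy draws a random residential entry from
+    # ~/.clawdbot/secrets/proxies.json (falling back to the "rotating" URL);
+    # when the config yields nothing, any explicit --proxy value is kept.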
+    if args.rotate_proxy:
+        proxy = get_proxy() or proxy
+        if proxy:
+            print(f"Using proxy: {proxy[:30]}...")
+
+    # Initialize browser
+    if args.backend == 'drission':
+        browser = get_drissionpage(headless=headless, proxy=proxy)
+    else:
+        browser = get_undetected_chrome(headless=headless, proxy=proxy)
+
+    try:
+        # Load session if requested
+        if args.load and args.session:
+            # Navigate to the domain first so cookies can be set for it
+            if args.url:
+                browser.get(args.url)
+            load_session(browser, args.session, args.backend)
+            # Refresh to apply cookies
+            browser.refresh()
+
+        # Navigate
+        url = args.url
+        if args.test_stealth:
+            url = "https://bot.sannysoft.com"
+
+        if url:
+            print(f"Opening: {url}")
+            browser.get(url)
+            if args.backend == 'drission':
+                browser.wait.doc_loaded()
+
+            time.sleep(args.wait)
+
+        # Screenshot
+        if args.screenshot:
+            if args.backend == 'drission':
+                browser.get_screenshot(args.screenshot)
+            else:
+                browser.save_screenshot(args.screenshot)
+            print(f"Screenshot saved: {args.screenshot}")
+
+        # Save session if requested
+        if args.save and args.session:
+            save_session(browser, args.session, args.backend)
+
+        # Print current URL and title
+        print(f"Title: {browser.title}")
+        if args.backend == 'drission':
+            print(f"URL: {browser.url}")
+        else:
+            print(f"URL: {browser.current_url}")
+
+        return 0
+
+    except Exception as e:
+        print(f"Error: {e}", file=sys.stderr)
+        return 1
+
+    finally:
+        browser.quit()
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/scripts/task_runner.py b/scripts/task_runner.py
new file mode 100644
index 0000000..e3031c1
--- /dev/null
+++ b/scripts/task_runner.py
@@ -0,0 +1,177 @@
+# -*- coding: utf-8 -*-
+"""
+Resilient task runner - supports checkpoint resume, timeout retries, and progress persistence
+"""
+import json
+import time
+import subprocess
+import sys
+from pathlib import Path
+from datetime import datetime
+
+TASK_DIR = Path.home() / '.clawdbot' / 'tasks'
+TASK_DIR.mkdir(parents=True, exist_ok=True)
+
+class TaskRunner:
+    def __init__(self, task_name):
+        self.task_name = task_name
+        self.state_file = TASK_DIR / f'{task_name}_state.json'
+        self.log_file = TASK_DIR / f'{task_name}.log'
+        self.state = self._load_state()
+
+    def _load_state(self):
+        if self.state_file.exists():
+            return json.loads(self.state_file.read_text(encoding='utf-8'))
+        return {
+            'task_name': self.task_name,
+            'status': 'pending',
+            'progress': 0,
+            'total': 0,
+            'completed_items': [],
+            'failed_items': [],
+            'last_update': None,
+            'result': None
+        }
+
+    def save_state(self):
+        self.state['last_update'] = datetime.now().isoformat()
+        self.state_file.write_text(json.dumps(self.state, indent=2, ensure_ascii=False), encoding='utf-8')
+
+    def log(self, msg):
+        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+        line = f'[{timestamp}] {msg}\n'
+        with open(self.log_file, 'a', encoding='utf-8') as f:
+            f.write(line)
+        print(msg)
+
+    def set_total(self, total):
+        self.state['total'] = total
+        self.save_state()
+
+    def mark_completed(self, item_id, result=None):
+        self.state['completed_items'].append(item_id)
+        self.state['progress'] = len(self.state['completed_items'])
+        if result:
+            if 'results' not in self.state:
+                self.state['results'] = {}
+            self.state['results'][str(item_id)] = result
+        self.save_state()
+
+    def mark_failed(self, item_id, error):
+        self.state['failed_items'].append({'id': item_id, 'error': str(error)})
+        self.save_state()
+
+    def is_completed(self, item_id):
+        return item_id in self.state['completed_items']
+
+    def get_progress(self):
+        return self.state['progress'], self.state['total']
+
+    def finish(self, result=None):
+        self.state['status'] = 'completed'
+        self.state['result'] = result
+        self.save_state()
+        self.log(f'Task completed: {self.task_name}')
+
+    def fail(self, error):
+        self.state['status'] = 'failed'
+        self.state['error'] = str(error)
+        self.save_state()
+        self.log(f'Task failed: {error}')
+
+
+def run_with_timeout(cmd, timeout=60, task_name=None):
+    """Run a shell command with a per-attempt timeout and up to 3 retries"""
+    max_retries = 3
+    for attempt in range(max_retries):
+        try:
+            result = subprocess.run(
+                cmd,
+                shell=True,
+                capture_output=True,
+                text=True,
+                timeout=timeout,
+                encoding='utf-8',
+                errors='replace'
+            )
+            return {
+                'success': result.returncode == 0,
+                'stdout': result.stdout,
+                'stderr': result.stderr,
+                'returncode': result.returncode
+            }
+        except subprocess.TimeoutExpired:
+            if attempt < max_retries - 1:
+                print(f'Timed out, retrying ({attempt + 2}/{max_retries})...')
+                time.sleep(2)
+            else:
+                return {'success': False, 'error': 'timeout', 'stdout': '', 'stderr': ''}
+        except Exception as e:
+            return {'success': False, 'error': str(e), 'stdout': '', 'stderr': ''}
+
+
+def list_tasks():
+    """List the status of all recorded tasks"""
+    tasks = []
+    for f in TASK_DIR.glob('*_state.json'):
+        try:
+            state = json.loads(f.read_text(encoding='utf-8'))
+            tasks.append({
+                'name': state.get('task_name'),
+                'status': state.get('status'),
+                'progress': f"{state.get('progress', 0)}/{state.get('total', 0)}",
+                'last_update': state.get('last_update')
+            })
+        except Exception:
+            pass  # skip unreadable state files
+    return tasks
+
+
+def get_task_state(task_name):
+    """Get the saved state of a task"""
+    state_file = TASK_DIR / f'{task_name}_state.json'
+    if state_file.exists():
+        return json.loads(state_file.read_text(encoding='utf-8'))
+    return None
+
+
+def clear_task(task_name):
+    """Clear a task's state and log files"""
+    state_file = TASK_DIR / f'{task_name}_state.json'
+    log_file = TASK_DIR / f'{task_name}.log'
+    if state_file.exists():
+        state_file.unlink()
+    if log_file.exists():
+        log_file.unlink()
+    print(f'Task cleared: {task_name}')
+
+
+if __name__ == '__main__':
+    if len(sys.argv) < 2:
+        print('Usage:')
+        print('  python task_runner.py list                - list all tasks')
+        print('  python task_runner.py status <task_name>  - show task state')
+        print('  python task_runner.py clear <task_name>   - clear a task')
+        sys.exit(0)
+
+    cmd = sys.argv[1]
+
+    if cmd == 'list':
+        tasks = list_tasks()
+        if tasks:
+            print(f'{"Task":<20} {"Status":<12} {"Progress":<15} {"Last update"}')
+            print('-' * 70)
+            for t in tasks:
+                print(f"{t['name']:<20} {t['status']:<12} {t['progress']:<15} {t['last_update'] or 'N/A'}")
+        else:
+            print('No task records')
+
+    elif cmd == 'status' and len(sys.argv) > 2:
+        state = get_task_state(sys.argv[2])
+        if state:
+            print(json.dumps(state, indent=2, ensure_ascii=False))
+        else:
+            print('Task not found')
+
+    elif cmd == 'clear' and len(sys.argv) > 2:
+        clear_task(sys.argv[2])
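+
+    else:
+        # Added guard: surface unknown commands or a missing <task_name>
+        # argument instead of exiting silently
+        print('Unknown command or missing task name; run with no arguments for usage')
+        sys.exit(1)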