commit 84d54a6ee93f6697c468cef5d6bb2d31a96216d1 Author: zlei9 Date: Sun Mar 29 10:22:50 2026 +0800 Initial commit with translated description diff --git a/README.md b/README.md new file mode 100644 index 0000000..770bc08 --- /dev/null +++ b/README.md @@ -0,0 +1,227 @@ +# ๐ŸŒ Web Pilot โ€” OpenClaw Skill + +[![Ko-fi](https://img.shields.io/badge/Ko--fi-Support%20this%20project-FF5E5B?logo=ko-fi&logoColor=white)](https://ko-fi.com/liranudi) + +A web search, page reading, and browser automation skill for [OpenClaw](https://github.com/openclaw/openclaw). No API keys required. + +## โ™ฟ Accessibility + +This skill enables AI agents to **read, navigate, and interact with the web on behalf of users** โ€” making it a powerful accessibility tool for people with visual impairments, motor disabilities, or cognitive challenges. + +- **Screen reading on steroids** โ€” extracts clean, structured text from any webpage, stripping away visual clutter, ads, and navigation noise +- **Voice-driven browsing** โ€” when paired with an AI assistant, users can browse the web entirely through natural language ("scroll down", "click Sign In", "read me the Overview section") +- **Targeted content extraction** โ€” grab specific sections, search for text, or screenshot regions without needing to visually scan a page +- **Form interaction** โ€” fill inputs and submit forms via commands, removing the need for precise mouse/keyboard control +- **Cookie banner removal** โ€” automatically dismisses consent popups that are notoriously difficult for screen readers + +## Features + +- **Web Search** โ€” Multi-engine (DuckDuckGo, Brave, Google) with pagination +- **Page Reader** โ€” Extract clean text from any URL with JS rendering +- **Persistent Browser** โ€” Visible or headless browser with 20+ actions +- **Cookie Auto-Dismiss** โ€” Automatically clears cookie consent banners +- **File Download** โ€” Download files with auto-detection, PDF text extraction +- **Output Formats** โ€” JSON, markdown, or 
plain text +- **Zero API Keys** โ€” Everything runs locally +- **Partial Screenshots** โ€” Capture viewport, full page, single elements, or ranges between two elements + +## Requirements + +- Python 3.8+ +- `pip install requests beautifulsoup4 playwright Pillow` +- `playwright install chromium` +- Optional: `pip install pdfplumber` for PDF text extraction + +## Installation + +### As an OpenClaw Skill + +```bash +cp -r web-pilot/ $(dirname $(which openclaw))/../lib/node_modules/openclaw/skills/web-pilot +``` + +### Standalone + +```bash +git clone https://github.com/LiranUdi/web-pilot.git +cd web-pilot +pip install requests beautifulsoup4 playwright Pillow +playwright install chromium +``` + +## Usage + +### 1. Search the Web + +```bash +python3 scripts/google_search.py "search term" --pages 3 --engine brave +``` + +| Flag | Description | Default | +|------|-------------|---------| +| `--pages N` | Result pages (~10 results each) | 1 | +| `--engine` | `duckduckgo`, `brave`, or `google` | duckduckgo | + +**Engine notes:** +- **duckduckgo** โ€” Most reliable, no CAPTCHA +- **brave** โ€” More results per page, broader sources +- **google** โ€” Often blocked by CAPTCHA; last resort + +### 2. Read a Page + +```bash +python3 scripts/read_page.py "https://example.com" --max-chars 10000 --format markdown +``` + +| Flag | Description | Default | +|------|-------------|---------| +| `--max-chars N` | Max characters to extract | 50000 | +| `--visible` | Show browser window | off | +| `--format` | `json`, `markdown`, or `text` | json | +| `--no-dismiss` | Skip cookie consent auto-dismiss | off | + +### 3. Persistent Browser Session + +The browser session is a long-running process that stays open between commands, enabling stateful multi-step browsing. 
+ +```bash +# Open a page (flags: --headless, --proxy , --user-agent ) +python3 scripts/browser_session.py open "https://example.com" +python3 scripts/browser_session.py open "https://example.com" --headless --user-agent "MyBot/1.0" + +# Check current state +python3 scripts/browser_session.py status + +# Navigate (returns response status, final URL, load time) +python3 scripts/browser_session.py navigate "https://other-site.com" + +# Extract content in different formats +python3 scripts/browser_session.py extract --format markdown + +# Scroll +python3 scripts/browser_session.py scroll down +python3 scripts/browser_session.py scroll up +python3 scripts/browser_session.py scroll "#section-id" # scroll to element + +# Wait +python3 scripts/browser_session.py wait 2 # wait 2 seconds +python3 scripts/browser_session.py wait ".loading-done" # wait for element + +# Fill forms +python3 scripts/browser_session.py fill "input[name=q]" "search term" +python3 scripts/browser_session.py fill "input[name=q]" "search term" --submit + +# Navigation history +python3 scripts/browser_session.py back +python3 scripts/browser_session.py forward +python3 scripts/browser_session.py reload + +# Execute JavaScript +python3 scripts/browser_session.py eval "document.title" + +# Extract all links +python3 scripts/browser_session.py links + +# Screenshots +python3 scripts/browser_session.py screenshot /tmp/page.png # viewport +python3 scripts/browser_session.py screenshot /tmp/full.png --full # full page +python3 scripts/browser_session.py screenshot /tmp/el.png --element "h1" # single element +python3 scripts/browser_session.py screenshot /tmp/range.png --from "#Overview" --to "#end" # range + +# Export page as PDF (headless only) +python3 scripts/browser_session.py pdf /tmp/page.pdf + +# Click elements +python3 scripts/browser_session.py click "Sign In" +python3 scripts/browser_session.py click "#submit-btn" + +# Search for text in the page +python3 scripts/browser_session.py search 
"pricing" + +# Tab management +python3 scripts/browser_session.py tab new "https://docs.example.com" +python3 scripts/browser_session.py tab list +python3 scripts/browser_session.py tab switch 0 +python3 scripts/browser_session.py tab close 1 + +# Dismiss cookie banners +python3 scripts/browser_session.py dismiss-cookies + +# Close +python3 scripts/browser_session.py close +``` + +### 4. Download Files + +```bash +python3 scripts/download_file.py "https://example.com/report.pdf" --output ~/docs +``` + +| Flag | Description | Default | +|------|-------------|---------| +| `--output DIR` | Save directory | /tmp/downloads | +| `--filename` | Override filename | auto-detected | + +For PDFs, returns `extracted_text` if `pdfplumber` or `PyPDF2` is installed. + +## Architecture + +- **Search** โ€” HTTP requests to DuckDuckGo/Brave/Google HTML endpoints +- **Page reading** โ€” Playwright + Chromium with read-only DOM TreeWalker +- **Browser sessions** โ€” Unix socket server with 4-byte length-prefix framing; forked child keeps browser alive, commands return immediately +- **Screenshots** โ€” Range mode uses full-page capture + PIL crop for pixel-perfect section captures +- **Cookie dismiss** โ€” Tries common selectors and button text patterns (Accept All, Got It, etc.) 
+- **Downloads** โ€” Streams to disk with auto filename detection from headers/URL + +## Browser Session Reference + +| Action | Description | +|--------|-------------| +| `open ` | Launch browser (flags: `--headless`, `--proxy`, `--user-agent`) | +| `navigate ` | Go to URL (returns status code, final URL, load time) | +| `extract` | Extract page content (`--format json\|markdown\|text`) | +| `screenshot ` | Capture (`--full`, `--element `, `--from --to `) | +| `click ` | Click by CSS selector, text, or button/link role | +| `scroll ` | Scroll down/up or to a CSS selector | +| `wait ` | Wait seconds or for element to appear | +| `fill ` | Fill input field (optional `--submit`) | +| `back` / `forward` / `reload` | Navigation history | +| `eval ` | Execute JavaScript, return result | +| `links` | Extract all links (href + text) | +| `search ` | Find text in page content | +| `pdf ` | Export as PDF (headless only) | +| `status` | Current URL, title, tab count | +| `tab new\|list\|switch\|close` | Multi-tab management | +| `dismiss-cookies` | Clear cookie consent banners | +| `close` | Shut down browser | + +--- + +## For AI Agents (OpenClaw / LLM Integration) + +### Workflow Pattern + +1. **Search** โ†’ get URLs +2. **Read** or **Open** โ†’ extract content +3. **Scroll/Click/Navigate/Tab** โ†’ interact as needed +4. **Search** โ†’ find specific info in page +5. **Screenshot** โ†’ capture visual state (viewport, element, or range) +6. **Download** โ†’ grab linked files +7. 
**Close** → clean up + +### Important Notes + +- All output defaults to **JSON to stdout**; use `--format` for alternatives +- `browser_session.py` is **stateful** — one session at a time, persists between commands +- `read_page.py` is **stateless** — opens/closes browser each call +- Cookie consent is **auto-dismissed** on open/navigate +- Always **close** browser sessions when done +- `Pillow` is required for range screenshots (`--from`/`--to`) + +## Support + +If this project is useful to you, consider [buying me a coffee](https://ko-fi.com/liranudi) ☕ + +## License + +MIT diff --git a/SKILL.md b/SKILL.md new file mode 100644 index 0000000..4222caa --- /dev/null +++ b/SKILL.md @@ -0,0 +1,63 @@ +--- +name: web-pilot +description: "Search the web and read page content. No API keys required." +--- + +# Web Pilot + +Four scripts, zero API keys. All output is JSON by default. + +**Dependencies:** `requests`, `beautifulsoup4`, `playwright` (with Chromium). +**Optional:** `pdfplumber` or `PyPDF2` for PDF text extraction; `Pillow` for range screenshots. + +Install: `pip install requests beautifulsoup4 playwright && playwright install chromium` + +## 1. Search the Web + +```bash +python3 scripts/google_search.py "query" --pages N --engine ENGINE +``` + +- `--engine` — `duckduckgo` (default), `brave`, or `google` +- Returns `[{title, url, snippet}, ...]` + +## 2. Read a Page (one-shot) + +```bash +python3 scripts/read_page.py "https://url" [--max-chars N] [--visible] [--format json|markdown|text] [--no-dismiss] +``` + +- `--format` — `json` (default), `markdown`, or `text` +- Auto-dismisses cookie consent banners (skip with `--no-dismiss`) + +## 3.
Persistent Browser Session + +```bash +python3 scripts/browser_session.py open "https://url" # Open + extract +python3 scripts/browser_session.py navigate "https://other" # Go to new URL +python3 scripts/browser_session.py extract [--format FMT] # Re-read page +python3 scripts/browser_session.py screenshot [path] [--full] # Save screenshot +python3 scripts/browser_session.py click "Submit" # Click by text/selector +python3 scripts/browser_session.py search "keyword" # Search text in page +python3 scripts/browser_session.py tab new "https://url" # Open new tab +python3 scripts/browser_session.py tab list # List all tabs +python3 scripts/browser_session.py tab switch 1 # Switch to tab index +python3 scripts/browser_session.py tab close [index] # Close tab +python3 scripts/browser_session.py dismiss-cookies # Manually dismiss cookies +python3 scripts/browser_session.py close # Close browser +``` + +- Cookie consent auto-dismissed on open/navigate +- Multiple tabs supported โ€” open, switch, close independently +- Search returns matching lines with line numbers +- Extract supports json/markdown/text output + +## 4. Download Files + +```bash +python3 scripts/download_file.py "https://example.com/doc.pdf" [--output DIR] [--filename NAME] +``` + +- Auto-detects filename from URL/headers +- PDFs: extracts text if pdfplumber/PyPDF2 installed +- Returns `{status, path, filename, size_bytes, content_type, extracted_text}` diff --git a/_meta.json b/_meta.json new file mode 100644 index 0000000..0e4a7cd --- /dev/null +++ b/_meta.json @@ -0,0 +1,6 @@ +{ + "ownerId": "kn72vgg7f9v52jr01p0yamfz1n81b8n5", + "slug": "web-pilot", + "version": "1.0.0", + "publishedAt": 1771349856982 +} \ No newline at end of file diff --git a/scripts/browser_session.py b/scripts/browser_session.py new file mode 100644 index 0000000..daffc97 --- /dev/null +++ b/scripts/browser_session.py @@ -0,0 +1,775 @@ +#!/usr/bin/env python3 +"""Persistent browser session that stays open until told to close. 
+ +Usage: + python3 browser_session.py open Open URL in visible browser, extract content + python3 browser_session.py navigate Go to new URL, extract content + python3 browser_session.py extract [--format FMT] Re-extract content from current page + python3 browser_session.py screenshot [path] [--full] Save screenshot + python3 browser_session.py click Click an element + python3 browser_session.py search Search for text in page content + python3 browser_session.py tab new Open URL in new tab + python3 browser_session.py tab list List all open tabs + python3 browser_session.py tab switch Switch to tab by index + python3 browser_session.py tab close [index] Close tab (current if no index) + python3 browser_session.py close Close browser + +Formats for extract: json (default), markdown, text +""" + +import json +import os +import re +import signal +import socket +import struct +import sys +import time + +SOCKET_PATH = "/tmp/web-pilot-browser.sock" +PID_FILE = "/tmp/web-pilot-browser.pid" + +EXTRACT_JS = """() => { + const SKIP = new Set(['SCRIPT','STYLE','NOSCRIPT','IFRAME','SVG','NAV','FOOTER','HEADER','ASIDE']); + const title = document.title || ''; + const mainEl = document.querySelector('article') + || document.querySelector('main') + || document.querySelector('[role="main"]') + || document.querySelector('#content, .content, .post-content, .entry-content') + || document.body; + + const lines = []; + const walker = document.createTreeWalker(mainEl, NodeFilter.SHOW_ELEMENT, { + acceptNode(node) { + if (SKIP.has(node.tagName)) return NodeFilter.FILTER_REJECT; + const tag = node.tagName.toLowerCase(); + if (['h1','h2','h3','h4','h5','h6','p','li','td','th','pre','blockquote'].includes(tag)) + return NodeFilter.FILTER_ACCEPT; + return NodeFilter.FILTER_SKIP; + } + }); + let node; + while (node = walker.nextNode()) { + const text = node.innerText?.trim(); + if (!text) continue; + const tag = node.tagName.toLowerCase(); + if (tag.startsWith('h')) lines.push('\\n' + 
'#'.repeat(parseInt(tag[1])) + ' ' + text + '\\n'); + else if (tag === 'li') lines.push('- ' + text); + else if (tag === 'blockquote') lines.push('> ' + text); + else lines.push(text); + } + let content = lines.join('\\n').trim(); + if (content.length < 200) content = mainEl.innerText || ''; + return { title, content }; +}""" + +# Common cookie consent selectors and text patterns +COOKIE_DISMISS_JS = """() => { + const selectors = [ + 'button[id*="accept" i]', 'button[id*="consent" i]', 'button[id*="agree" i]', + 'button[class*="accept" i]', 'button[class*="consent" i]', 'button[class*="agree" i]', + 'a[id*="accept" i]', 'a[class*="accept" i]', + '[data-testid*="accept" i]', '[data-testid*="consent" i]', + '.cookie-banner button', '.cookie-notice button', '.cookie-popup button', + '#cookie-banner button', '#cookie-notice button', '#cookie-popup button', + '.cc-btn.cc-dismiss', '.cc-accept', '#onetrust-accept-btn-handler', + '.js-cookie-consent-agree', '[aria-label*="accept" i][aria-label*="cookie" i]', + '[aria-label*="Accept all" i]', '[aria-label*="Accept cookies" i]', + ]; + + // Try selectors first + for (const sel of selectors) { + try { + const el = document.querySelector(sel); + if (el && el.offsetParent !== null) { el.click(); return { dismissed: true, method: 'selector', selector: sel }; } + } catch(e) {} + } + + // Try matching button text + const patterns = [ + /^accept all$/i, /accept all cookies/i, /accept cookies/i, /accept & close/i, + /^agree$/i, /agree and continue/i, /agree & continue/i, + /consent and continue/i, /consent & continue/i, + /got it/i, /i understand/i, /i agree/i, + /allow all/i, /allow cookies/i, /allow all cookies/i, + /^ok$/i, /^okay$/i, /^continue$/i, /^dismiss$/i, + /accept and close/i, /accept and continue/i, + /nur notwendige/i, /alle akzeptieren/i, /akzeptieren/i, + /tout accepter/i, /accepter/i, /accepter et continuer/i, + ]; + for (const btn of document.querySelectorAll('button, a[role="button"], [role="button"]')) { + 
const text = btn.innerText?.trim(); + if (!text || text.length > 50) continue; + for (const pat of patterns) { + if (pat.test(text) && btn.offsetParent !== null) { + btn.click(); + return { dismissed: true, method: 'text', matched: text }; + } + } + } + + return { dismissed: false }; +}""" + + +def format_output(result: dict, fmt: str) -> str: + """Format extraction result based on requested format.""" + if fmt == "text": + # Strip markdown-ish formatting + content = result.get("content", "") + content = re.sub(r'^#+\s+', '', content, flags=re.MULTILINE) + content = re.sub(r'^- ', ' ', content, flags=re.MULTILINE) + content = re.sub(r'^> ', '', content, flags=re.MULTILINE) + return content.strip() + elif fmt == "markdown": + return f"# {result.get('title', '')}\n\n{result.get('content', '')}" + else: # json + return json.dumps(result, indent=2, ensure_ascii=False) + + +def dismiss_cookies(page): + """Try to dismiss cookie consent in main frame and all iframes.""" + result = page.evaluate(COOKIE_DISMISS_JS) + if result.get("dismissed"): + page.wait_for_timeout(500) + return result + # Check iframes (many EU sites put consent in an iframe) + for frame in page.frames: + if frame == page.main_frame: + continue + try: + result = frame.evaluate(COOKIE_DISMISS_JS) + if result.get("dismissed"): + page.wait_for_timeout(500) + return result + except Exception: + pass + return {"dismissed": False} + + +def run_server(url: str, headless: bool = False, proxy: str = None, user_agent: str = None): + from playwright.sync_api import sync_playwright + + if os.path.exists(SOCKET_PATH): + os.remove(SOCKET_PATH) + + pw = sync_playwright().start() + launch_opts = {"headless": headless} + if proxy: + launch_opts["proxy"] = {"server": proxy} + browser = pw.chromium.launch(**launch_opts) + ua = user_agent or "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + ctx = browser.new_context( + user_agent=ua, + locale="en-US", + 
viewport={"width": 1280, "height": 900}, + ) + + # Track pages (tabs) + pages = [ctx.new_page()] + active_idx = 0 + + def active_page(): + return pages[active_idx] + + active_page().goto(url, timeout=30000, wait_until="domcontentloaded") + active_page().wait_for_timeout(1500) + + # Auto-dismiss cookie consent on first load (main frame + iframes) + dismiss_cookies(active_page()) + + result = active_page().evaluate(EXTRACT_JS) + with open("/tmp/web-pilot-initial.json", "w") as f: + json.dump(result, f, ensure_ascii=False) + + with open(PID_FILE, "w") as f: + f.write(str(os.getpid())) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(SOCKET_PATH) + sock.listen(1) + sock.settimeout(1.0) + + running = True + while running: + try: + conn, _ = sock.accept() + raw = _recv_msg(conn) + cmd = json.loads(raw.decode()) + action = cmd.get("action") + + if action == "close": + _send_msg(conn, json.dumps({"status": "closing"}).encode()) + conn.close() + running = False + + elif action == "navigate": + t0 = time.time() + response = None + try: + response = active_page().goto(cmd["url"], timeout=30000, wait_until="domcontentloaded") + except Exception as nav_err: + # Playwright throws on HTTP error codes (4xx/5xx) โ€” still extract what we can + pass + active_page().wait_for_timeout(1500) + load_time = round(time.time() - t0, 3) + dismiss_cookies(active_page()) + result = active_page().evaluate(EXTRACT_JS) + result["response_status"] = response.status if response else None + result["final_url"] = active_page().url + result["load_time_s"] = load_time + mc = cmd.get("max_chars") + if mc and len(result["content"]) > mc: + result["content"] = result["content"][:mc] + "\n\n[...truncated]" + _send_msg(conn, json.dumps(result, ensure_ascii=False).encode()) + conn.close() + + elif action == "extract": + result = active_page().evaluate(EXTRACT_JS) + mc = cmd.get("max_chars") + if mc and len(result["content"]) > mc: + result["content"] = result["content"][:mc] + 
"\n\n[...truncated]" + fmt = cmd.get("format", "json") + output = format_output(result, fmt) if fmt != "json" else json.dumps(result, ensure_ascii=False) + _send_msg(conn, output.encode()) + conn.close() + + elif action == "screenshot": + path = cmd.get("path", "/tmp/screenshot.png") + full_page = cmd.get("full_page", False) + element_sel = cmd.get("element") + from_sel = cmd.get("from_sel") + to_sel = cmd.get("to_sel") + + if element_sel: + # Screenshot a single element + el = active_page().query_selector(element_sel) + if el: + el.screenshot(path=path) + _send_msg(conn, json.dumps({ + "status": "saved", "path": path, "mode": "element", + "selector": element_sel, + "url": active_page().url, "title": active_page().title(), + "tab": active_idx, + }).encode()) + else: + _send_msg(conn, json.dumps({ + "error": f"Element not found: {element_sel}" + }).encode()) + conn.close() + elif from_sel and to_sel: + # Screenshot a range between two elements using full-page screenshot + crop + bounds = active_page().evaluate("""([fromSel, toSel]) => { + const elFrom = document.querySelector(fromSel); + const elTo = document.querySelector(toSel); + if (!elFrom || !elTo) return null; + const r1 = elFrom.getBoundingClientRect(); + const r2 = elTo.getBoundingClientRect(); + return { + y: r1.top + window.scrollY, + y2: r2.bottom + window.scrollY, + pageWidth: document.documentElement.scrollWidth + }; + }""", [from_sel, to_sel]) + if bounds: + import tempfile + # Take full-page screenshot to a temp file + tmp = tempfile.mktemp(suffix=".png") + active_page().screenshot(path=tmp, full_page=True) + # Crop using PIL + try: + from PIL import Image + im = Image.open(tmp) + # Playwright full_page screenshots use device pixel ratio + scale = im.width / bounds["pageWidth"] if bounds["pageWidth"] else 1 + top = int(bounds["y"] * scale) + bottom = int(bounds["y2"] * scale) + cropped = im.crop((0, top, im.width, bottom)) + cropped.save(path) + os.remove(tmp) + _send_msg(conn, json.dumps({ + 
"status": "saved", "path": path, "mode": "range", + "from": from_sel, "to": to_sel, + "url": active_page().url, "title": active_page().title(), + "tab": active_idx, + }).encode()) + except Exception as e: + try: os.remove(tmp) + except: pass + _send_msg(conn, json.dumps({"error": f"Crop failed: {str(e)}"}).encode()) + else: + _send_msg(conn, json.dumps({"error": f"One or both selectors not found: {from_sel}, {to_sel}"}).encode()) + conn.close() + else: + active_page().screenshot(path=path, full_page=full_page) + _send_msg(conn, json.dumps({ + "status": "saved", "path": path, "mode": "full_page" if full_page else "viewport", + "url": active_page().url, "title": active_page().title(), + "tab": active_idx, + }).encode()) + conn.close() + + elif action == "click": + target = cmd.get("target", "") + clicked = False + try: + el = active_page().query_selector(target) + if el: + el.click() + clicked = True + except Exception: + pass + if not clicked: + try: + active_page().get_by_text(target, exact=False).first.click() + clicked = True + except Exception: + pass + if not clicked: + try: + active_page().get_by_role("button", name=target).or_( + active_page().get_by_role("link", name=target) + ).first.click() + clicked = True + except Exception: + pass + active_page().wait_for_timeout(1000) + result = {"status": "clicked" if clicked else "not_found", "target": target, "url": active_page().url} + _send_msg(conn, json.dumps(result, ensure_ascii=False).encode()) + conn.close() + + elif action == "dismiss_cookies": + result = dismiss_cookies(active_page()) + _send_msg(conn, json.dumps(result, ensure_ascii=False).encode()) + conn.close() + + elif action == "search": + query = cmd.get("query", "").lower() + result = active_page().evaluate(EXTRACT_JS) + content = result.get("content", "") + lines = content.split("\n") + matches = [] + for i, line in enumerate(lines): + if query in line.lower(): + matches.append({"line": i + 1, "text": line.strip()}) + _send_msg(conn, json.dumps({ + 
"query": query, + "matches": len(matches), + "results": matches[:50], # cap at 50 + "url": active_page().url, + }, ensure_ascii=False).encode()) + conn.close() + + elif action == "tab_new": + new_page = ctx.new_page() + pages.append(new_page) + active_idx = len(pages) - 1 + new_page.goto(cmd["url"], timeout=30000, wait_until="domcontentloaded") + new_page.wait_for_timeout(1500) + dismiss_cookies(new_page) + result = new_page.evaluate(EXTRACT_JS) + result["tab"] = active_idx + result["total_tabs"] = len(pages) + _send_msg(conn, json.dumps(result, ensure_ascii=False).encode()) + conn.close() + + elif action == "tab_list": + tab_info = [] + for i, p in enumerate(pages): + try: + tab_info.append({ + "index": i, + "title": p.title(), + "url": p.url, + "active": i == active_idx, + }) + except Exception: + tab_info.append({"index": i, "title": "(closed)", "url": "", "active": i == active_idx}) + _send_msg(conn, json.dumps({"tabs": tab_info, "active": active_idx}, ensure_ascii=False).encode()) + conn.close() + + elif action == "tab_switch": + idx = cmd.get("index", 0) + if 0 <= idx < len(pages): + active_idx = idx + pages[active_idx].bring_to_front() + _send_msg(conn, json.dumps({ + "status": "switched", "tab": active_idx, + "title": pages[active_idx].title(), + "url": pages[active_idx].url, + }, ensure_ascii=False).encode()) + else: + _send_msg(conn, json.dumps({"error": f"Invalid tab index {idx}. Have {len(pages)} tabs."}).encode()) + conn.close() + + elif action == "tab_close": + idx = cmd.get("index", active_idx) + if len(pages) <= 1: + _send_msg(conn, json.dumps({"error": "Cannot close the last tab. 
Use 'close' to close the browser."}).encode()) + elif 0 <= idx < len(pages): + pages[idx].close() + pages.pop(idx) + if active_idx >= len(pages): + active_idx = len(pages) - 1 + elif active_idx > idx: + active_idx -= 1 + pages[active_idx].bring_to_front() + _send_msg(conn, json.dumps({ + "status": "tab_closed", "closed_index": idx, + "active": active_idx, "total_tabs": len(pages), + }, ensure_ascii=False).encode()) + else: + _send_msg(conn, json.dumps({"error": f"Invalid tab index {idx}"}).encode()) + conn.close() + + elif action == "scroll": + direction = cmd.get("direction", "down") + if direction == "down": + active_page().evaluate("window.scrollBy(0, window.innerHeight)") + elif direction == "up": + active_page().evaluate("window.scrollBy(0, -window.innerHeight)") + else: + # Treat as CSS selector + active_page().evaluate(f"document.querySelector({json.dumps(direction)})?.scrollIntoView({{behavior:'smooth',block:'center'}})") + active_page().wait_for_timeout(300) + _send_msg(conn, json.dumps({"status": "scrolled", "direction": direction, "url": active_page().url}).encode()) + conn.close() + + elif action == "wait": + target = cmd.get("target", "1") + try: + seconds = float(target) + active_page().wait_for_timeout(int(seconds * 1000)) + _send_msg(conn, json.dumps({"status": "waited", "seconds": seconds}).encode()) + except ValueError: + # CSS selector + try: + active_page().wait_for_selector(target, timeout=30000) + _send_msg(conn, json.dumps({"status": "found", "selector": target}).encode()) + except Exception as e: + _send_msg(conn, json.dumps({"status": "timeout", "selector": target, "error": str(e)}).encode()) + conn.close() + + elif action == "fill": + selector = cmd.get("selector", "") + value = cmd.get("value", "") + submit = cmd.get("submit", False) + try: + active_page().fill(selector, value) + if submit: + active_page().press(selector, "Enter") + active_page().wait_for_timeout(1000) + _send_msg(conn, json.dumps({"status": "filled", "selector": 
selector, "submitted": submit, "url": active_page().url}).encode()) + except Exception as e: + _send_msg(conn, json.dumps({"error": str(e)}).encode()) + conn.close() + + elif action in ("back", "forward", "reload"): + if action == "back": + active_page().go_back(timeout=30000, wait_until="domcontentloaded") + elif action == "forward": + active_page().go_forward(timeout=30000, wait_until="domcontentloaded") + else: + active_page().reload(timeout=30000, wait_until="domcontentloaded") + active_page().wait_for_timeout(500) + _send_msg(conn, json.dumps({"status": action, "url": active_page().url, "title": active_page().title()}).encode()) + conn.close() + + elif action == "eval": + js_code = cmd.get("code", "") + try: + result = active_page().evaluate(js_code) + _send_msg(conn, json.dumps({"status": "ok", "result": result}, ensure_ascii=False, default=str).encode()) + except Exception as e: + _send_msg(conn, json.dumps({"status": "error", "error": str(e)}).encode()) + conn.close() + + elif action == "links": + links_js = """() => { + return Array.from(document.querySelectorAll('a[href]')).map(a => ({ + href: a.href, text: (a.innerText || '').trim().substring(0, 200) + })).filter(l => l.href && !l.href.startsWith('javascript:')) + }""" + result = active_page().evaluate(links_js) + _send_msg(conn, json.dumps({"links": result, "count": len(result), "url": active_page().url}, ensure_ascii=False).encode()) + conn.close() + + elif action == "pdf": + path = cmd.get("path", "/tmp/page.pdf") + try: + active_page().pdf(path=path) + _send_msg(conn, json.dumps({"status": "saved", "path": path}).encode()) + except Exception as e: + _send_msg(conn, json.dumps({"error": str(e)}).encode()) + conn.close() + + elif action == "status": + _send_msg(conn, json.dumps({ + "url": active_page().url, + "title": active_page().title(), + "active_tab": active_idx, + "total_tabs": len(pages), + }).encode()) + conn.close() + + else: + _send_msg(conn, json.dumps({"error": f"unknown action: 
{action}"}).encode()) + conn.close() + + except socket.timeout: + continue + except Exception as e: + try: + _send_msg(conn, json.dumps({"error": str(e)}).encode()) + conn.close() + except Exception: + pass + + sock.close() + for f in [SOCKET_PATH, PID_FILE]: + if os.path.exists(f): + os.remove(f) + browser.close() + pw.stop() + + +def _recv_exact(sock, n): + """Read exactly n bytes from socket.""" + buf = b"" + while len(buf) < n: + chunk = sock.recv(n - len(buf)) + if not chunk: + raise ConnectionError("Socket closed while reading") + buf += chunk + return buf + + +def _send_msg(sock, data: bytes): + """Send a length-prefixed message.""" + sock.sendall(struct.pack('>I', len(data)) + data) + + +def _recv_msg(sock) -> bytes: + """Receive a length-prefixed message.""" + header = _recv_exact(sock, 4) + length = struct.unpack('>I', header)[0] + return _recv_exact(sock, length) + + +def send_command(cmd: dict) -> str: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.settimeout(60) + sock.connect(SOCKET_PATH) + _send_msg(sock, json.dumps(cmd).encode()) + result = _recv_msg(sock) + sock.close() + return result.decode("utf-8", errors="replace") + + +def main(): + if len(sys.argv) < 2: + print("Usage: browser_session.py [args]") + sys.exit(1) + + action = sys.argv[1] + + if action == "open": + headless = "--headless" in sys.argv + # Parse --proxy and --user-agent + proxy = None + user_agent = None + i = 2 + while i < len(sys.argv): + if sys.argv[i] == "--proxy" and i + 1 < len(sys.argv): + proxy = sys.argv[i + 1]; i += 2 + elif sys.argv[i] == "--user-agent" and i + 1 < len(sys.argv): + user_agent = sys.argv[i + 1]; i += 2 + else: + i += 1 + args = [a for a in sys.argv[2:] if not a.startswith("--") and a != proxy and a != user_agent] + if not args: + print("Usage: browser_session.py open [--headless] [--proxy ] [--user-agent ]") + sys.exit(1) + url = args[0] + + # Stale PID/socket cleanup + if os.path.exists(SOCKET_PATH): + stale = True + if 
os.path.exists(PID_FILE): + try: + old_pid = int(open(PID_FILE).read().strip()) + os.kill(old_pid, 0) # check if alive + stale = False + except (OSError, ValueError): + pass + if not stale: + print(json.dumps({"error": "Browser session already open. Use 'navigate', 'extract', or 'close'."})) + sys.exit(1) + # Clean up stale files + try: os.remove(SOCKET_PATH) + except OSError: pass + try: os.remove(PID_FILE) + except OSError: pass + + pid = os.fork() + if pid == 0: + os.setsid() + sys.stdout = open(os.devnull, "w") + sys.stderr = open(os.devnull, "w") + run_server(url, headless=headless, proxy=proxy, user_agent=user_agent) + sys.exit(0) + else: + for _ in range(30): + if os.path.exists("/tmp/web-pilot-initial.json"): + time.sleep(0.2) + with open("/tmp/web-pilot-initial.json") as f: + result = json.load(f) + os.remove("/tmp/web-pilot-initial.json") + result["status"] = "browser open" + result["note"] = "Commands: navigate, extract, screenshot, click, search, tab, close" + print(json.dumps(result, indent=2, ensure_ascii=False)) + sys.exit(0) + time.sleep(0.5) + print(json.dumps({"error": "Timeout waiting for browser to start"})) + sys.exit(1) + + elif action == "navigate": + if len(sys.argv) < 3: + print("Usage: browser_session.py navigate ") + sys.exit(1) + print(send_command({"action": "navigate", "url": sys.argv[2], "max_chars": 50000})) + + elif action == "extract": + fmt = "json" + if "--format" in sys.argv: + idx = sys.argv.index("--format") + if idx + 1 < len(sys.argv): + fmt = sys.argv[idx + 1] + print(send_command({"action": "extract", "max_chars": 50000, "format": fmt})) + + elif action == "screenshot": + path = "/tmp/screenshot.png" + full_page = "--full" in sys.argv + element_sel = None + from_sel = None + to_sel = None + # Parse flags + args = sys.argv[2:] + i = 0 + positional = [] + while i < len(args): + if args[i] == "--element" and i + 1 < len(args): + element_sel = args[i + 1]; i += 2 + elif args[i] == "--from" and i + 1 < len(args): + from_sel = 
args[i + 1]; i += 2 + elif args[i] == "--to" and i + 1 < len(args): + to_sel = args[i + 1]; i += 2 + elif args[i] == "--full": + i += 1 + elif not args[i].startswith("--"): + positional.append(args[i]); i += 1 + else: + i += 1 + if positional: + path = positional[0] + cmd = {"action": "screenshot", "path": path, "full_page": full_page} + if element_sel: + cmd["element"] = element_sel + if from_sel: + cmd["from_sel"] = from_sel + if to_sel: + cmd["to_sel"] = to_sel + print(send_command(cmd)) + + elif action == "click": + if len(sys.argv) < 3: + print("Usage: browser_session.py click ") + sys.exit(1) + target = " ".join(a for a in sys.argv[2:] if not a.startswith("--")) + print(send_command({"action": "click", "target": target})) + + elif action == "search": + if len(sys.argv) < 3: + print("Usage: browser_session.py search ") + sys.exit(1) + query = " ".join(sys.argv[2:]) + print(send_command({"action": "search", "query": query})) + + elif action == "tab": + if len(sys.argv) < 3: + print("Usage: browser_session.py tab [args]") + sys.exit(1) + sub = sys.argv[2] + if sub == "new": + if len(sys.argv) < 4: + print("Usage: browser_session.py tab new ") + sys.exit(1) + print(send_command({"action": "tab_new", "url": sys.argv[3]})) + elif sub == "list": + print(send_command({"action": "tab_list"})) + elif sub == "switch": + if len(sys.argv) < 4: + print("Usage: browser_session.py tab switch ") + sys.exit(1) + print(send_command({"action": "tab_switch", "index": int(sys.argv[3])})) + elif sub == "close": + idx = int(sys.argv[3]) if len(sys.argv) > 3 else -1 + cmd = {"action": "tab_close"} + if idx >= 0: + cmd["index"] = idx + print(send_command(cmd)) + else: + print(f"Unknown tab command: {sub}") + sys.exit(1) + + elif action == "dismiss-cookies": + print(send_command({"action": "dismiss_cookies"})) + + elif action == "scroll": + if len(sys.argv) < 3: + print("Usage: browser_session.py scroll down|up|") + sys.exit(1) + print(send_command({"action": "scroll", "direction": 
sys.argv[2]})) + + elif action == "wait": + if len(sys.argv) < 3: + print("Usage: browser_session.py wait ") + sys.exit(1) + print(send_command({"action": "wait", "target": sys.argv[2]})) + + elif action == "fill": + if len(sys.argv) < 4: + print("Usage: browser_session.py fill [--submit]") + sys.exit(1) + submit = "--submit" in sys.argv + print(send_command({"action": "fill", "selector": sys.argv[2], "value": sys.argv[3], "submit": submit})) + + elif action in ("back", "forward", "reload"): + print(send_command({"action": action})) + + elif action == "eval": + if len(sys.argv) < 3: + print("Usage: browser_session.py eval \"\"") + sys.exit(1) + print(send_command({"action": "eval", "code": " ".join(sys.argv[2:])})) + + elif action == "links": + print(send_command({"action": "links"})) + + elif action == "pdf": + path = sys.argv[2] if len(sys.argv) > 2 else "/tmp/page.pdf" + print(send_command({"action": "pdf", "path": path})) + + elif action == "status": + print(send_command({"action": "status"})) + + elif action == "close": + print(send_command({"action": "close"})) + + else: + print(f"Unknown action: {action}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/scripts/download_file.py b/scripts/download_file.py new file mode 100644 index 0000000..aa690d9 --- /dev/null +++ b/scripts/download_file.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +"""Download files from URLs. Handles PDFs, images, documents, and any binary content. + +Usage: + python3 download_file.py [--output DIR] [--filename NAME] + +Flags: + --output DIR Directory to save to (default: /tmp/downloads) + --filename NAME Override filename (auto-detected from URL/headers if omitted) + +Outputs JSON {status, path, filename, size_bytes, content_type}. +Detects file type from Content-Type header and URL. For PDFs, also extracts +text if possible (requires pdfplumber or falls back to basic extraction). 
+""" + +import argparse +import json +import os +import re +import sys +import urllib.parse + +import requests + + +def json_error(message: str) -> str: + """Return standardized JSON error format.""" + return json.dumps({"error": message}, indent=2, ensure_ascii=False) + +HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", +} + +# File types we handle specially +TEXT_EXTRACTABLE = { + "application/pdf": "pdf", +} + + +def guess_filename(url: str, resp: requests.Response) -> str: + """Determine filename from Content-Disposition, URL, or Content-Type.""" + # Check Content-Disposition header + cd = resp.headers.get("Content-Disposition", "") + if "filename=" in cd: + match = re.search(r'filename[*]?=["\']?([^"\';]+)', cd) + if match: + return match.group(1).strip() + + # Extract from URL path + parsed = urllib.parse.urlparse(url) + path_name = os.path.basename(parsed.path) + if path_name and "." in path_name: + return urllib.parse.unquote(path_name) + + # Fall back to content type + ct = resp.headers.get("Content-Type", "") + ext_map = { + "application/pdf": "download.pdf", + "image/png": "download.png", + "image/jpeg": "download.jpg", + "image/gif": "download.gif", + "image/webp": "download.webp", + "application/zip": "download.zip", + "text/html": "download.html", + "text/plain": "download.txt", + "application/json": "download.json", + } + for mime, name in ext_map.items(): + if mime in ct: + return name + + return "download.bin" + + +def extract_pdf_text(filepath: str) -> str: + """Try to extract text from a PDF. 
Returns empty string on failure.""" + # Try pdfplumber first + try: + import pdfplumber + text_parts = [] + with pdfplumber.open(filepath) as pdf: + for page in pdf.pages: + t = page.extract_text() + if t: + text_parts.append(t) + return "\n\n".join(text_parts) + except ImportError: + pass + + # Try PyPDF2 + try: + from PyPDF2 import PdfReader + reader = PdfReader(filepath) + text_parts = [] + for page in reader.pages: + t = page.extract_text() + if t: + text_parts.append(t) + return "\n\n".join(text_parts) + except ImportError: + pass + + return "" + + +def download(url: str, output_dir: str = "/tmp/downloads", filename: str = None, + proxy: str = None, user_agent: str = None) -> dict: + os.makedirs(output_dir, exist_ok=True) + + headers = HEADERS.copy() + if user_agent: + headers["User-Agent"] = user_agent + + proxies = {} + if proxy: + proxies = {"http": proxy, "https": proxy} + + try: + resp = requests.get(url, headers=headers, timeout=30, stream=True, + allow_redirects=True, proxies=proxies) + except requests.exceptions.SSLError: + # Retry without SSL verification if certs are broken + resp = requests.get(url, headers=headers, timeout=30, stream=True, + allow_redirects=True, proxies=proxies, verify=False) + resp.raise_for_status() + + if not filename: + filename = guess_filename(url, resp) + + filepath = os.path.join(output_dir, filename) + + # Avoid overwriting โ€” add suffix if exists + base, ext = os.path.splitext(filepath) + counter = 1 + while os.path.exists(filepath): + filepath = f"{base}_{counter}{ext}" + counter += 1 + + # Stream to disk + total = 0 + with open(filepath, "wb") as f: + for chunk in resp.iter_content(chunk_size=8192): + f.write(chunk) + total += len(chunk) + + content_type = resp.headers.get("Content-Type", "unknown") + result = { + "status": "downloaded", + "path": filepath, + "filename": os.path.basename(filepath), + "size_bytes": total, + "content_type": content_type, + "url": url, + } + + # Add redirect URL if redirected + if 
resp.url != url: + result["redirect_url"] = resp.url + + # Extract text from PDFs + if "pdf" in content_type.lower() or filepath.lower().endswith(".pdf"): + text = extract_pdf_text(filepath) + if text: + result["extracted_text"] = text + result["extracted_chars"] = len(text) + else: + result["extracted_text"] = "" + result["note"] = "PDF text extraction failed. Install pdfplumber or PyPDF2 for text extraction." + + return result + + +def main(): + parser = argparse.ArgumentParser(description="Download files from URLs") + parser.add_argument("url", help="URL to download") + parser.add_argument("--output", default="/tmp/downloads", help="Output directory (default: /tmp/downloads)") + parser.add_argument("--filename", default=None, help="Override filename") + parser.add_argument("--proxy", help="Proxy URL (e.g., http://proxy:8080)") + parser.add_argument("--user-agent", help="Override User-Agent string") + args = parser.parse_args() + + try: + result = download(args.url, args.output, args.filename, args.proxy, args.user_agent) + print(json.dumps(result, indent=2, ensure_ascii=False)) + except Exception as e: + print(json_error(f"Download failed: {str(e)}")) + + +if __name__ == "__main__": + main() diff --git a/scripts/google_search.py b/scripts/google_search.py new file mode 100644 index 0000000..131dece --- /dev/null +++ b/scripts/google_search.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 +"""Web search via multiple engines. No API key required. + +Usage: + python3 google_search.py "search term" [--pages N] [--engine ENGINE] + +Flags: + --pages N Number of result pages (default: 1, ~10 results each) + --engine ENGINE Search engine: duckduckgo (default), brave, google + Note: google often blocks with CAPTCHA + +Outputs JSON array of {title, url, snippet} per result. 
+""" + +import argparse +import json +import time +import urllib.parse + +import requests +from bs4 import BeautifulSoup + + +def json_error(message: str) -> str: + """Return standardized JSON error format.""" + return json.dumps({"error": message}, indent=2, ensure_ascii=False) + +HEADERS = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Accept-Language": "en-US,en;q=0.9", +} + + +def search_duckduckgo(query: str, pages: int = 1) -> list[dict]: + """DuckDuckGo HTML endpoint โ€” most reliable, no CAPTCHA.""" + results = [] + form_data = {"q": query} + + for page in range(pages): + resp = requests.post("https://html.duckduckgo.com/html/", data=form_data, headers=HEADERS, timeout=15) + resp.raise_for_status() + soup = BeautifulSoup(resp.text, "html.parser") + + for res in soup.select(".result"): + title_el = res.select_one(".result__title a, a.result__a") + snippet_el = res.select_one(".result__snippet") + if not title_el: + continue + href = title_el.get("href", "") + if "uddg=" in href: + href = urllib.parse.unquote( + urllib.parse.parse_qs(urllib.parse.urlparse(href).query).get("uddg", [href])[0] + ) + if href.startswith("http"): + results.append({ + "title": title_el.get_text(strip=True), + "url": href, + "snippet": snippet_el.get_text(strip=True) if snippet_el else "", + }) + + if page < pages - 1: + next_form = None + for btn in soup.find_all("input", {"value": "Next"}): + if btn.parent and btn.parent.name == "form": + next_form = btn.parent + break + if not next_form: + break + form_data = {} + for inp in next_form.find_all("input"): + name = inp.get("name") + if name: + form_data[name] = inp.get("value", "") + time.sleep(1) + + return results + + +def search_brave(query: str, pages: int = 1) -> list[dict]: + """Brave Search HTML โ€” good alternative, sometimes more results.""" + results = [] + + for page in range(pages): + offset = page * 10 + params = {"q": query, "offset": 
str(offset)} + resp = requests.get("https://search.brave.com/search", params=params, headers=HEADERS, timeout=15) + resp.raise_for_status() + soup = BeautifulSoup(resp.text, "html.parser") + + for item in soup.select('div[data-type="web"]'): + # Title: dedicated title span, or first link text + title_el = item.select_one(".title.search-snippet-title, .search-snippet-title") + link_el = item.select_one("a[href^='http']") + # Description/snippet + snippet_el = item.select_one(".generic-snippet .content, .generic-snippet, .snippet-description") + + if not link_el: + continue + href = link_el.get("href", "") + title = title_el.get_text(strip=True) if title_el else link_el.get_text(strip=True) + if href.startswith("http") and title: + results.append({ + "title": title, + "url": href, + "snippet": snippet_el.get_text(strip=True) if snippet_el else "", + }) + + if page < pages - 1: + time.sleep(1) + + return results + + +def search_google(query: str, pages: int = 1) -> list[dict]: + """Google HTML โ€” often blocked by CAPTCHA. Use as fallback.""" + results = [] + + for page in range(pages): + start = page * 10 + params = {"q": query, "start": str(start), "hl": "en"} + resp = requests.get("https://www.google.com/search", params=params, headers=HEADERS, timeout=15) + resp.raise_for_status() + + if "sorry" in resp.url or "unusual traffic" in resp.text.lower(): + if not results: + raise RuntimeError("Google blocked the request (CAPTCHA). 
Try --engine duckduckgo or brave.") + break + + soup = BeautifulSoup(resp.text, "html.parser") + for h3 in soup.find_all("h3"): + parent_a = h3.find_parent("a") + if parent_a and parent_a.get("href", "").startswith("http"): + # Find snippet near the h3 + container = h3.find_parent("div", class_="g") or h3.parent + snippet_el = container.select_one("div[data-sncf], div.VwiC3b, span.st") if container else None + results.append({ + "title": h3.get_text(strip=True), + "url": parent_a["href"], + "snippet": snippet_el.get_text(strip=True) if snippet_el else "", + }) + + if page < pages - 1: + time.sleep(1.5) + + return results + + +ENGINES = { + "duckduckgo": search_duckduckgo, + "ddg": search_duckduckgo, + "brave": search_brave, + "google": search_google, +} + + +def main(): + parser = argparse.ArgumentParser(description="Web search (multi-engine, no API key)") + parser.add_argument("query", help="Search query") + parser.add_argument("--pages", type=int, default=1, help="Number of result pages (default: 1)") + parser.add_argument("--engine", choices=["duckduckgo", "ddg", "brave", "google"], + default="duckduckgo", help="Search engine (default: duckduckgo)") + args = parser.parse_args() + + try: + search_fn = ENGINES[args.engine] + results = search_fn(args.query, args.pages) + + # Deduplicate + seen = set() + deduped = [] + for r in results: + if r["url"] not in seen: + seen.add(r["url"]) + deduped.append(r) + + print(json.dumps(deduped, indent=2, ensure_ascii=False)) + except Exception as e: + print(json_error(f"Search failed: {str(e)}")) + + +if __name__ == "__main__": + main() diff --git a/scripts/read_page.py b/scripts/read_page.py new file mode 100644 index 0000000..f8d11bb --- /dev/null +++ b/scripts/read_page.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +"""Extract readable content from a web page using Playwright + Chromium. 

Usage:
    python3 read_page.py <url> [--max-chars N] [--visible] [--format FMT] [--no-dismiss]

Flags:
    --max-chars N    Max characters to output (default: 50000)
    --visible        Show browser window (non-headless)
    --format FMT     Output format: json (default), markdown, text
    --no-dismiss     Skip cookie consent auto-dismiss

Outputs content in the requested format.
"""

import argparse
import json
import re

from playwright.sync_api import sync_playwright

# Runs inside the page via page.evaluate(); walks the main content element
# and returns {title, content} with lightweight markdown markers.
EXTRACT_JS = """() => {
  const SKIP = new Set(['SCRIPT','STYLE','NOSCRIPT','IFRAME','SVG','NAV','FOOTER','HEADER','ASIDE']);
  const title = document.title || '';
  const mainEl = document.querySelector('article')
    || document.querySelector('main')
    || document.querySelector('[role="main"]')
    || document.querySelector('#content, .content, .post-content, .entry-content')
    || document.body;

  const lines = [];
  const walker = document.createTreeWalker(mainEl, NodeFilter.SHOW_ELEMENT, {
    acceptNode(node) {
      if (SKIP.has(node.tagName)) return NodeFilter.FILTER_REJECT;
      const tag = node.tagName.toLowerCase();
      if (['h1','h2','h3','h4','h5','h6','p','li','td','th','pre','blockquote'].includes(tag))
        return NodeFilter.FILTER_ACCEPT;
      return NodeFilter.FILTER_SKIP;
    }
  });
  let node;
  while (node = walker.nextNode()) {
    const text = node.innerText?.trim();
    if (!text) continue;
    const tag = node.tagName.toLowerCase();
    if (tag.startsWith('h')) lines.push('\\n' + '#'.repeat(parseInt(tag[1])) + ' ' + text + '\\n');
    else if (tag === 'li') lines.push('- ' + text);
    else if (tag === 'blockquote') lines.push('> ' + text);
    else lines.push(text);
  }
  let content = lines.join('\\n').trim();
  if (content.length < 200) content = mainEl.innerText || '';
  return { title, content };
}""";

# Runs inside the page via page.evaluate() / frame.evaluate(); clicks the
# first visible consent button matched by selector or button-text pattern
# and returns {dismissed: bool}.
COOKIE_DISMISS_JS = """() => {
  const selectors = [
    'button[id*="accept" i]', 'button[id*="consent" i]', 'button[id*="agree" i]',
    'button[class*="accept" i]', 'button[class*="consent" i]', 'button[class*="agree" i]',
    'a[id*="accept" i]', 'a[class*="accept" i]',
    '[data-testid*="accept" i]', '[data-testid*="consent" i]',
    '.cookie-banner button', '.cookie-notice button', '.cookie-popup button',
    '#cookie-banner button', '#cookie-notice button', '#cookie-popup button',
    '.cc-btn.cc-dismiss', '.cc-accept', '#onetrust-accept-btn-handler',
    '.js-cookie-consent-agree', '[aria-label*="accept" i][aria-label*="cookie" i]',
    '[aria-label*="Accept all" i]', '[aria-label*="Accept cookies" i]',
  ];
  for (const sel of selectors) {
    try {
      const el = document.querySelector(sel);
      if (el && el.offsetParent !== null) { el.click(); return { dismissed: true }; }
    } catch(e) {}
  }
  const patterns = [
    /^accept all$/i, /accept all cookies/i, /accept cookies/i, /accept & close/i,
    /^agree$/i, /agree and continue/i, /agree & continue/i,
    /consent and continue/i, /consent & continue/i,
    /got it/i, /i understand/i, /i agree/i,
    /allow all/i, /allow cookies/i, /allow all cookies/i,
    /^ok$/i, /^okay$/i, /^continue$/i, /^dismiss$/i,
    /accept and close/i, /accept and continue/i,
    /nur notwendige/i, /alle akzeptieren/i, /akzeptieren/i,
    /tout accepter/i, /accepter/i, /accepter et continuer/i,
  ];
  for (const btn of document.querySelectorAll('button, a[role="button"], [role="button"]')) {
    const text = btn.innerText?.trim();
    if (!text || text.length > 50) continue;
    for (const pat of patterns) {
      if (pat.test(text) && btn.offsetParent !== null) { btn.click(); return { dismissed: true }; }
    }
  }
  return { dismissed: false };
}"""


def format_output(result: dict, fmt: str) -> str:
    """Render the extraction result as plain text, markdown, or JSON (default)."""
    if fmt == "text":
        # Strip the markdown markers EXTRACT_JS inserted (# headings,
        # "- " bullets, "> " quotes).
        content = result.get("content", "")
        content = re.sub(r'^#+\s+', '', content, flags=re.MULTILINE)
        content = re.sub(r'^- ', ' ', content, flags=re.MULTILINE)
        content = re.sub(r'^> ', '', content, flags=re.MULTILINE)
        return content.strip()
    elif fmt == "markdown":
        return f"# {result.get('title', '')}\n\n{result.get('content', '')}"
    else:
        return json.dumps(result, indent=2, ensure_ascii=False)


def main():
    """CLI entry point: load the URL in Chromium, optionally auto-dismiss
    cookie banners, extract readable content, and print it."""
    parser = argparse.ArgumentParser(description="Web page reader (Playwright + Chromium)")
    parser.add_argument("url", help="URL to read")
    parser.add_argument("--max-chars", type=int, default=50000, help="Max characters (default: 50000)")
    parser.add_argument("--visible", action="store_true", help="Run in visible (non-headless) mode")
    parser.add_argument("--format", choices=["json", "markdown", "text"], default="json", help="Output format")
    parser.add_argument("--no-dismiss", action="store_true", help="Skip cookie consent auto-dismiss")
    args = parser.parse_args()

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=not args.visible)
        ctx = browser.new_context(
            user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            locale="en-US",
            viewport={"width": 1280, "height": 900},
        )
        page = ctx.new_page()
        page.goto(args.url, timeout=30000, wait_until="domcontentloaded")
        # Give late-loading JS a moment to render before extraction.
        page.wait_for_timeout(1500)

        if not args.no_dismiss:
            # Try main frame first, then iframes (EU sites often use iframe consent)
            dismissed = page.evaluate(COOKIE_DISMISS_JS)
            if not dismissed.get("dismissed"):
                for frame in page.frames:
                    if frame == page.main_frame:
                        continue
                    try:
                        r = frame.evaluate(COOKIE_DISMISS_JS)
                        if r.get("dismissed"):
                            break
                    except Exception:
                        pass
            page.wait_for_timeout(500)

        result = page.evaluate(EXTRACT_JS)
        if len(result["content"]) > args.max_chars:
            result["content"] = result["content"][:args.max_chars] + "\n\n[...truncated]"

        print(format_output(result, args.format))
        browser.close()


if __name__ == "__main__":
    main()