Initial commit with translated description

This commit is contained in:
2026-03-29 14:34:25 +08:00
commit 7d03078316
17 changed files with 7879 additions and 0 deletions

2532
scripts/analyze_stock.py Normal file

File diff suppressed because it is too large Load Diff

365
scripts/dividends.py Normal file
View File

@@ -0,0 +1,365 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "yfinance>=0.2.40",
# "pandas>=2.0.0",
# ]
# ///
"""
Dividend Analysis Module.
Analyzes dividend metrics for income investors:
- Dividend Yield
- Payout Ratio
- Dividend Growth Rate (5Y CAGR)
- Dividend Safety Score
- Ex-Dividend Date
Usage:
uv run dividends.py AAPL
uv run dividends.py JNJ PG KO --output json
"""
import argparse
import json
import sys
from dataclasses import dataclass, asdict
from datetime import datetime
import pandas as pd
import yfinance as yf
@dataclass
class DividendAnalysis:
ticker: str
company_name: str
# Basic metrics
dividend_yield: float | None # Annual yield %
annual_dividend: float | None # Annual dividend per share
current_price: float | None
# Payout analysis
payout_ratio: float | None # Dividend / EPS
payout_status: str # "safe", "moderate", "high", "unsustainable"
# Growth
dividend_growth_5y: float | None # 5-year CAGR %
consecutive_years: int | None # Years of consecutive increases
dividend_history: list[dict] | None # Last 5 years
# Timing
ex_dividend_date: str | None
payment_frequency: str | None # "quarterly", "monthly", "annual"
# Safety score (0-100)
safety_score: int
safety_factors: list[str]
# Verdict
income_rating: str # "excellent", "good", "moderate", "poor", "no_dividend"
summary: str
def analyze_dividends(ticker: str, verbose: bool = False) -> DividendAnalysis | None:
"""Analyze dividend metrics for a stock."""
try:
stock = yf.Ticker(ticker)
info = stock.info
company_name = info.get("longName") or info.get("shortName") or ticker
current_price = info.get("regularMarketPrice") or info.get("currentPrice")
# Basic dividend info
dividend_yield = info.get("dividendYield")
if dividend_yield:
dividend_yield = dividend_yield * 100 # Convert to percentage
annual_dividend = info.get("dividendRate")
# No dividend
if not annual_dividend or annual_dividend == 0:
return DividendAnalysis(
ticker=ticker,
company_name=company_name,
dividend_yield=None,
annual_dividend=None,
current_price=current_price,
payout_ratio=None,
payout_status="no_dividend",
dividend_growth_5y=None,
consecutive_years=None,
dividend_history=None,
ex_dividend_date=None,
payment_frequency=None,
safety_score=0,
safety_factors=["No dividend paid"],
income_rating="no_dividend",
summary=f"{ticker} does not pay a dividend.",
)
# Payout ratio
trailing_eps = info.get("trailingEps")
payout_ratio = None
payout_status = "unknown"
if trailing_eps and trailing_eps > 0 and annual_dividend:
payout_ratio = (annual_dividend / trailing_eps) * 100
if payout_ratio < 40:
payout_status = "safe"
elif payout_ratio < 60:
payout_status = "moderate"
elif payout_ratio < 80:
payout_status = "high"
else:
payout_status = "unsustainable"
# Dividend history (for growth calculation)
dividends = stock.dividends
dividend_history = None
dividend_growth_5y = None
consecutive_years = None
if dividends is not None and len(dividends) > 0:
# Group by year
dividends_df = dividends.reset_index()
dividends_df["Year"] = pd.to_datetime(dividends_df["Date"]).dt.year
yearly = dividends_df.groupby("Year")["Dividends"].sum().sort_index(ascending=False)
# Last 5 years history
dividend_history = []
for year in yearly.head(5).index:
dividend_history.append({
"year": int(year),
"total": round(float(yearly[year]), 4),
})
# Calculate 5-year CAGR
if len(yearly) >= 5:
current_div = yearly.iloc[0]
div_5y_ago = yearly.iloc[4]
if div_5y_ago > 0 and current_div > 0:
dividend_growth_5y = ((current_div / div_5y_ago) ** (1/5) - 1) * 100
# Count consecutive years of increases
consecutive_years = 0
prev_div = None
for div in yearly.values:
if prev_div is not None:
if div >= prev_div:
consecutive_years += 1
else:
break
prev_div = div
# Ex-dividend date
ex_dividend_date = info.get("exDividendDate")
if ex_dividend_date:
ex_dividend_date = datetime.fromtimestamp(ex_dividend_date).strftime("%Y-%m-%d")
# Payment frequency
payment_frequency = None
if dividends is not None and len(dividends) >= 4:
# Count dividends in last year
one_year_ago = pd.Timestamp.now() - pd.DateOffset(years=1)
recent_divs = dividends[dividends.index > one_year_ago]
count = len(recent_divs)
if count >= 10:
payment_frequency = "monthly"
elif count >= 3:
payment_frequency = "quarterly"
elif count >= 1:
payment_frequency = "annual"
# Safety score calculation (0-100)
safety_score = 50 # Base score
safety_factors = []
# Payout ratio factor (+/- 20)
if payout_ratio:
if payout_ratio < 40:
safety_score += 20
safety_factors.append(f"Low payout ratio ({payout_ratio:.0f}%)")
elif payout_ratio < 60:
safety_score += 10
safety_factors.append(f"Moderate payout ratio ({payout_ratio:.0f}%)")
elif payout_ratio < 80:
safety_score -= 10
safety_factors.append(f"High payout ratio ({payout_ratio:.0f}%)")
else:
safety_score -= 20
safety_factors.append(f"Unsustainable payout ratio ({payout_ratio:.0f}%)")
# Growth factor (+/- 15)
if dividend_growth_5y:
if dividend_growth_5y > 10:
safety_score += 15
safety_factors.append(f"Strong dividend growth ({dividend_growth_5y:.1f}% CAGR)")
elif dividend_growth_5y > 5:
safety_score += 10
safety_factors.append(f"Good dividend growth ({dividend_growth_5y:.1f}% CAGR)")
elif dividend_growth_5y > 0:
safety_score += 5
safety_factors.append(f"Positive dividend growth ({dividend_growth_5y:.1f}% CAGR)")
else:
safety_score -= 15
safety_factors.append(f"Dividend declining ({dividend_growth_5y:.1f}% CAGR)")
# Consecutive years factor (+/- 15)
if consecutive_years:
if consecutive_years >= 25:
safety_score += 15
safety_factors.append(f"Dividend Aristocrat ({consecutive_years}+ years)")
elif consecutive_years >= 10:
safety_score += 10
safety_factors.append(f"Long dividend history ({consecutive_years} years)")
elif consecutive_years >= 5:
safety_score += 5
safety_factors.append(f"Consistent dividend ({consecutive_years} years)")
# Yield factor (high yield can be risky)
if dividend_yield:
if dividend_yield > 8:
safety_score -= 10
safety_factors.append(f"Very high yield ({dividend_yield:.1f}%) - verify sustainability")
elif dividend_yield < 1:
safety_factors.append(f"Low yield ({dividend_yield:.2f}%)")
# Clamp score
safety_score = max(0, min(100, safety_score))
# Income rating
if safety_score >= 80:
income_rating = "excellent"
elif safety_score >= 60:
income_rating = "good"
elif safety_score >= 40:
income_rating = "moderate"
else:
income_rating = "poor"
# Summary
summary_parts = []
if dividend_yield:
summary_parts.append(f"{dividend_yield:.2f}% yield")
if payout_ratio:
summary_parts.append(f"{payout_ratio:.0f}% payout")
if dividend_growth_5y:
summary_parts.append(f"{dividend_growth_5y:+.1f}% 5Y growth")
if consecutive_years and consecutive_years >= 5:
summary_parts.append(f"{consecutive_years}Y streak")
summary = f"{ticker}: {', '.join(summary_parts)}. Rating: {income_rating.upper()}"
return DividendAnalysis(
ticker=ticker,
company_name=company_name,
dividend_yield=round(dividend_yield, 2) if dividend_yield else None,
annual_dividend=round(annual_dividend, 4) if annual_dividend else None,
current_price=current_price,
payout_ratio=round(payout_ratio, 1) if payout_ratio else None,
payout_status=payout_status,
dividend_growth_5y=round(dividend_growth_5y, 2) if dividend_growth_5y else None,
consecutive_years=consecutive_years,
dividend_history=dividend_history,
ex_dividend_date=ex_dividend_date,
payment_frequency=payment_frequency,
safety_score=safety_score,
safety_factors=safety_factors,
income_rating=income_rating,
summary=summary,
)
except Exception as e:
if verbose:
print(f"Error analyzing {ticker}: {e}", file=sys.stderr)
return None
def format_text(analysis: DividendAnalysis) -> str:
"""Format dividend analysis as text."""
lines = [
"=" * 60,
f"DIVIDEND ANALYSIS: {analysis.ticker} ({analysis.company_name})",
"=" * 60,
"",
]
if analysis.income_rating == "no_dividend":
lines.append("This stock does not pay a dividend.")
lines.append("=" * 60)
return "\n".join(lines)
# Yield & Price
lines.append(f"Current Price: ${analysis.current_price:.2f}")
lines.append(f"Annual Dividend: ${analysis.annual_dividend:.2f}")
lines.append(f"Dividend Yield: {analysis.dividend_yield:.2f}%")
lines.append(f"Payment Freq: {analysis.payment_frequency or 'Unknown'}")
if analysis.ex_dividend_date:
lines.append(f"Ex-Dividend: {analysis.ex_dividend_date}")
lines.append("")
# Payout & Safety
lines.append(f"Payout Ratio: {analysis.payout_ratio:.1f}% ({analysis.payout_status})")
lines.append(f"5Y Div Growth: {analysis.dividend_growth_5y:+.1f}%" if analysis.dividend_growth_5y else "5Y Div Growth: N/A")
if analysis.consecutive_years:
lines.append(f"Consecutive Yrs: {analysis.consecutive_years}")
lines.append("")
lines.append(f"SAFETY SCORE: {analysis.safety_score}/100")
lines.append(f"INCOME RATING: {analysis.income_rating.upper()}")
lines.append("")
lines.append("Safety Factors:")
for factor in analysis.safety_factors:
lines.append(f"{factor}")
# History
if analysis.dividend_history:
lines.append("")
lines.append("Dividend History:")
for h in analysis.dividend_history[:5]:
lines.append(f" {h['year']}: ${h['total']:.2f}")
lines.append("")
lines.append("=" * 60)
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Dividend Analysis")
parser.add_argument("tickers", nargs="+", help="Stock ticker(s)")
parser.add_argument("--output", choices=["text", "json"], default="text")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
results = []
for ticker in args.tickers:
analysis = analyze_dividends(ticker.upper(), verbose=args.verbose)
if analysis:
results.append(analysis)
else:
print(f"Error: Could not analyze {ticker}", file=sys.stderr)
if args.output == "json":
if len(results) == 1:
print(json.dumps(asdict(results[0]), indent=2))
else:
print(json.dumps([asdict(r) for r in results], indent=2))
else:
for i, analysis in enumerate(results):
if i > 0:
print("\n")
print(format_text(analysis))
if __name__ == "__main__":
main()

582
scripts/hot_scanner.py Normal file
View File

@@ -0,0 +1,582 @@
#!/usr/bin/env python3
"""
🔥 HOT SCANNER v2 - Find viral stocks & crypto trends
Now with Twitter/X, Reddit, and improved Yahoo Finance
"""
import json
import urllib.request
import urllib.error
import xml.etree.ElementTree as ET
import gzip
import io
import subprocess
import os
from datetime import datetime, timezone
from pathlib import Path
import re
import ssl
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
# Load .env file if exists
ENV_FILE = Path(__file__).parent.parent / ".env"
if ENV_FILE.exists():
with open(ENV_FILE) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
os.environ[key] = value
# Cache directory
CACHE_DIR = Path(__file__).parent.parent / "cache"
CACHE_DIR.mkdir(exist_ok=True)
# SSL context
SSL_CONTEXT = ssl.create_default_context()
class HotScanner:
def __init__(self, include_social=True):
self.include_social = include_social
self.results = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"crypto": [],
"stocks": [],
"news": [],
"movers": [],
"social": []
}
self.mentions = defaultdict(lambda: {"count": 0, "sources": [], "sentiment_hints": []})
self.headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate",
}
def _fetch(self, url, timeout=15):
"""Fetch URL with gzip support."""
req = urllib.request.Request(url, headers=self.headers)
with urllib.request.urlopen(req, timeout=timeout, context=SSL_CONTEXT) as resp:
data = resp.read()
# Handle gzip
if resp.info().get('Content-Encoding') == 'gzip' or data[:2] == b'\x1f\x8b':
data = gzip.decompress(data)
return data.decode('utf-8', errors='replace')
def _fetch_json(self, url, timeout=15):
"""Fetch and parse JSON."""
return json.loads(self._fetch(url, timeout))
def scan_all(self):
"""Run all scans in parallel."""
print("🔍 Scanning for hot trends...\n")
tasks = [
("CoinGecko Trending", self.scan_coingecko_trending),
("CoinGecko Movers", self.scan_coingecko_gainers_losers),
("Google News Finance", self.scan_google_news_finance),
("Google News Crypto", self.scan_google_news_crypto),
("Yahoo Movers", self.scan_yahoo_movers),
]
if self.include_social:
tasks.extend([
("Reddit WSB", self.scan_reddit_wsb),
("Reddit Crypto", self.scan_reddit_crypto),
("Twitter/X", self.scan_twitter),
])
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(task[1]): task[0] for task in tasks}
for future in as_completed(futures):
name = futures[future]
try:
future.result()
except Exception as e:
print(f"{name}: {str(e)[:50]}")
return self.results
def scan_coingecko_trending(self):
"""Get trending crypto from CoinGecko."""
print(" 📊 CoinGecko Trending...")
try:
url = "https://api.coingecko.com/api/v3/search/trending"
data = self._fetch_json(url)
for item in data.get("coins", [])[:10]:
coin = item.get("item", {})
price_data = coin.get("data", {})
price_change = price_data.get("price_change_percentage_24h", {}).get("usd", 0)
entry = {
"symbol": coin.get("symbol", "").upper(),
"name": coin.get("name", ""),
"rank": coin.get("market_cap_rank"),
"price_change_24h": round(price_change, 2) if price_change else None,
"source": "coingecko_trending"
}
self.results["crypto"].append(entry)
sym = entry["symbol"]
self.mentions[sym]["count"] += 2 # Trending gets extra weight
self.mentions[sym]["sources"].append("CoinGecko Trending")
if price_change:
direction = "🚀 bullish" if price_change > 0 else "📉 bearish"
self.mentions[sym]["sentiment_hints"].append(f"{direction} ({price_change:+.1f}%)")
print(f"{len(data.get('coins', []))} trending coins")
except Exception as e:
print(f" ❌ CoinGecko trending: {e}")
def scan_coingecko_gainers_losers(self):
"""Get top gainers/losers."""
print(" 📈 CoinGecko Movers...")
try:
url = "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&order=market_cap_desc&per_page=100&page=1&price_change_percentage=24h"
data = self._fetch_json(url)
sorted_data = sorted(data, key=lambda x: abs(x.get("price_change_percentage_24h") or 0), reverse=True)
count = 0
for coin in sorted_data[:20]:
change = coin.get("price_change_percentage_24h", 0)
if abs(change or 0) > 3:
entry = {
"symbol": coin.get("symbol", "").upper(),
"name": coin.get("name", ""),
"price": coin.get("current_price"),
"change_24h": round(change, 2) if change else None,
"volume": coin.get("total_volume"),
"source": "coingecko_movers"
}
self.results["movers"].append(entry)
count += 1
sym = entry["symbol"]
self.mentions[sym]["count"] += 1
self.mentions[sym]["sources"].append("CoinGecko Movers")
direction = "🚀 pumping" if change > 0 else "📉 dumping"
self.mentions[sym]["sentiment_hints"].append(f"{direction} ({change:+.1f}%)")
print(f"{count} significant movers")
except Exception as e:
print(f" ❌ CoinGecko movers: {e}")
def scan_google_news_finance(self):
"""Get finance news from Google News RSS."""
print(" 📰 Google News Finance...")
try:
# Business news topic
url = "https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB?hl=en-US&gl=US&ceid=US:en"
text = self._fetch(url)
root = ET.fromstring(text)
items = root.findall(".//item")
for item in items[:15]:
title_elem = item.find("title")
title = title_elem.text if title_elem is not None else ""
tickers = self._extract_tickers(title)
news_entry = {
"title": title,
"tickers_mentioned": tickers,
"source": "google_news_finance"
}
self.results["news"].append(news_entry)
for ticker in tickers:
self.mentions[ticker]["count"] += 1
self.mentions[ticker]["sources"].append("Google News")
self.mentions[ticker]["sentiment_hints"].append(f"📰 {title[:40]}...")
print(f"{len(items)} news items")
except Exception as e:
print(f" ❌ Google News Finance: {e}")
def scan_google_news_crypto(self):
"""Search for crypto news."""
print(" 📰 Google News Crypto...")
try:
url = "https://news.google.com/rss/search?q=bitcoin+OR+ethereum+OR+crypto+crash+OR+crypto+pump&hl=en-US&gl=US&ceid=US:en"
text = self._fetch(url)
root = ET.fromstring(text)
items = root.findall(".//item")
crypto_keywords = {
"bitcoin": "BTC", "btc": "BTC", "ethereum": "ETH", "eth": "ETH",
"solana": "SOL", "xrp": "XRP", "ripple": "XRP", "dogecoin": "DOGE",
"cardano": "ADA", "polkadot": "DOT", "avalanche": "AVAX",
}
for item in items[:12]:
title_elem = item.find("title")
title = title_elem.text if title_elem is not None else ""
tickers = self._extract_tickers(title)
for word, ticker in crypto_keywords.items():
if word in title.lower():
tickers.append(ticker)
tickers = list(set(tickers))
if tickers:
news_entry = {
"title": title,
"tickers_mentioned": tickers,
"source": "google_news_crypto"
}
self.results["news"].append(news_entry)
for ticker in tickers:
self.mentions[ticker]["count"] += 1
self.mentions[ticker]["sources"].append("Google News Crypto")
print(f" ✅ Processed crypto news")
except Exception as e:
print(f" ❌ Google News Crypto: {e}")
def scan_yahoo_movers(self):
"""Scrape Yahoo Finance movers with gzip support."""
print(" 📈 Yahoo Finance Movers...")
categories = [
("gainers", "https://finance.yahoo.com/gainers/"),
("losers", "https://finance.yahoo.com/losers/"),
("most_active", "https://finance.yahoo.com/most-active/")
]
for category, url in categories:
try:
text = self._fetch(url, timeout=12)
# Multiple patterns for ticker extraction
tickers = []
# Pattern 1: data-symbol attribute
tickers.extend(re.findall(r'data-symbol="([A-Z]{1,5})"', text))
# Pattern 2: ticker in URL
tickers.extend(re.findall(r'/quote/([A-Z]{1,5})[/"\?]', text))
# Pattern 3: fin-streamer
tickers.extend(re.findall(r'fin-streamer[^>]*symbol="([A-Z]{1,5})"', text))
unique_tickers = list(dict.fromkeys(tickers))[:15]
for ticker in unique_tickers:
# Skip common false positives
if ticker in ['USA', 'CEO', 'IPO', 'ETF', 'SEC', 'FDA', 'NYSE', 'API']:
continue
self.results["stocks"].append({
"symbol": ticker,
"category": category,
"source": f"yahoo_{category}"
})
self.mentions[ticker]["count"] += 1
self.mentions[ticker]["sources"].append(f"Yahoo {category.replace('_', ' ').title()}")
if unique_tickers:
print(f" ✅ Yahoo {category}: {len(unique_tickers)} tickers")
except Exception as e:
print(f" ⚠️ Yahoo {category}: {str(e)[:30]}")
def scan_reddit_wsb(self):
"""Scrape r/wallstreetbets for hot stocks."""
print(" 🦍 Reddit r/wallstreetbets...")
try:
# Use old.reddit.com (more scrape-friendly)
url = "https://old.reddit.com/r/wallstreetbets/hot/.json"
headers = {**self.headers, "Accept": "application/json"}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=15, context=SSL_CONTEXT) as resp:
data = resp.read()
if data[:2] == b'\x1f\x8b':
data = gzip.decompress(data)
posts = json.loads(data.decode('utf-8'))
tickers_found = []
for post in posts.get("data", {}).get("children", [])[:25]:
title = post.get("data", {}).get("title", "")
score = post.get("data", {}).get("score", 0)
# Extract tickers
tickers = self._extract_tickers(title)
for ticker in tickers:
if ticker not in ['USA', 'CEO', 'IPO', 'DD', 'WSB', 'YOLO', 'FD']:
weight = 2 if score > 1000 else 1
self.mentions[ticker]["count"] += weight
self.mentions[ticker]["sources"].append("Reddit WSB")
self.mentions[ticker]["sentiment_hints"].append(f"🦍 WSB: {title[:35]}...")
tickers_found.append(ticker)
self.results["social"].append({
"platform": "reddit_wsb",
"title": title[:100],
"score": score,
"tickers": tickers
})
print(f" ✅ WSB: {len(set(tickers_found))} tickers mentioned")
except Exception as e:
print(f" ❌ Reddit WSB: {str(e)[:40]}")
def scan_reddit_crypto(self):
"""Scrape r/cryptocurrency for hot coins."""
print(" 💎 Reddit r/cryptocurrency...")
try:
url = "https://old.reddit.com/r/cryptocurrency/hot/.json"
headers = {**self.headers, "Accept": "application/json"}
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=15, context=SSL_CONTEXT) as resp:
data = resp.read()
if data[:2] == b'\x1f\x8b':
data = gzip.decompress(data)
posts = json.loads(data.decode('utf-8'))
crypto_keywords = {
"bitcoin": "BTC", "btc": "BTC", "ethereum": "ETH", "eth": "ETH",
"solana": "SOL", "sol": "SOL", "xrp": "XRP", "cardano": "ADA",
"dogecoin": "DOGE", "doge": "DOGE", "shiba": "SHIB", "pepe": "PEPE",
"avalanche": "AVAX", "polkadot": "DOT", "chainlink": "LINK",
}
tickers_found = []
for post in posts.get("data", {}).get("children", [])[:20]:
title = post.get("data", {}).get("title", "").lower()
score = post.get("data", {}).get("score", 0)
for word, ticker in crypto_keywords.items():
if word in title:
weight = 2 if score > 500 else 1
self.mentions[ticker]["count"] += weight
self.mentions[ticker]["sources"].append("Reddit Crypto")
tickers_found.append(ticker)
print(f" ✅ r/crypto: {len(set(tickers_found))} coins mentioned")
except Exception as e:
print(f" ❌ Reddit Crypto: {str(e)[:40]}")
def scan_twitter(self):
"""Use bird CLI to get trending finance/crypto tweets."""
print(" 🐦 Twitter/X...")
try:
# Find bird binary
bird_paths = [
"/home/clawdbot/.nvm/versions/node/v24.12.0/bin/bird",
"/usr/local/bin/bird",
"bird"
]
bird_bin = None
for p in bird_paths:
if Path(p).exists() or p == "bird":
bird_bin = p
break
if not bird_bin:
print(" ⚠️ Twitter: bird not found")
return
# Search for finance tweets
searches = [
("stocks", "stock OR $SPY OR $QQQ OR earnings"),
("crypto", "bitcoin OR ethereum OR crypto OR $BTC"),
]
for category, query in searches:
try:
env = os.environ.copy()
result = subprocess.run(
[bird_bin, "search", query, "-n", "15", "--json"],
capture_output=True, text=True, timeout=30, env=env
)
if result.returncode == 0 and result.stdout.strip():
tweets = json.loads(result.stdout)
for tweet in tweets[:10]:
text = tweet.get("text", "")
tickers = self._extract_tickers(text)
# Add crypto keywords
crypto_map = {"bitcoin": "BTC", "ethereum": "ETH", "solana": "SOL"}
for word, ticker in crypto_map.items():
if word in text.lower():
tickers.append(ticker)
for ticker in set(tickers):
self.mentions[ticker]["count"] += 1
self.mentions[ticker]["sources"].append("Twitter/X")
self.mentions[ticker]["sentiment_hints"].append(f"🐦 {text[:35]}...")
self.results["social"].append({
"platform": "twitter",
"text": text[:100],
"tickers": list(set(tickers))
})
print(f" ✅ Twitter {category}: processed")
except subprocess.TimeoutExpired:
print(f" ⚠️ Twitter {category}: timeout")
except json.JSONDecodeError:
print(f" ⚠️ Twitter {category}: no auth?")
except FileNotFoundError:
print(" ⚠️ Twitter: bird CLI not found")
except Exception as e:
print(f" ❌ Twitter: {str(e)[:40]}")
def _extract_tickers(self, text):
"""Extract stock/crypto tickers from text."""
patterns = [
r'\$([A-Z]{1,5})\b', # $AAPL
r'\(([A-Z]{2,5})\)', # (AAPL)
r'(?:^|\s)([A-Z]{2,4})(?:\s|$|[,.])', # Standalone caps
]
tickers = []
for pattern in patterns:
matches = re.findall(pattern, text)
tickers.extend(matches)
# Company mappings
companies = {
"Apple": "AAPL", "Microsoft": "MSFT", "Google": "GOOGL", "Alphabet": "GOOGL",
"Amazon": "AMZN", "Tesla": "TSLA", "Nvidia": "NVDA", "Meta": "META",
"Netflix": "NFLX", "GameStop": "GME", "AMD": "AMD", "Intel": "INTC",
"Palantir": "PLTR", "Coinbase": "COIN", "MicroStrategy": "MSTR",
}
for company, ticker in companies.items():
if company.lower() in text.lower():
tickers.append(ticker)
# Filter out common words
skip = {'USA', 'CEO', 'IPO', 'ETF', 'SEC', 'FDA', 'NYSE', 'API', 'USD', 'EU',
'UK', 'US', 'AI', 'IT', 'AT', 'TO', 'IN', 'ON', 'IS', 'IF', 'OR', 'AN',
'DD', 'WSB', 'YOLO', 'FD', 'OP', 'PM', 'AM'}
return list(set(t for t in tickers if t not in skip and len(t) >= 2))
def get_hot_summary(self):
"""Generate summary."""
sorted_mentions = sorted(
self.mentions.items(),
key=lambda x: x[1]["count"],
reverse=True
)
summary = {
"scan_time": self.results["timestamp"],
"top_trending": [],
"crypto_highlights": [],
"stock_highlights": [],
"social_buzz": [],
"breaking_news": []
}
for symbol, data in sorted_mentions[:20]:
summary["top_trending"].append({
"symbol": symbol,
"mentions": data["count"],
"sources": list(set(data["sources"])),
"signals": data["sentiment_hints"][:3]
})
# Crypto
seen = set()
for coin in self.results["crypto"] + self.results["movers"]:
if coin["symbol"] not in seen:
summary["crypto_highlights"].append(coin)
seen.add(coin["symbol"])
# Stocks
seen = set()
for stock in self.results["stocks"]:
if stock["symbol"] not in seen:
summary["stock_highlights"].append(stock)
seen.add(stock["symbol"])
# Social
for item in self.results["social"][:15]:
summary["social_buzz"].append(item)
# News
for news in self.results["news"][:10]:
if news.get("tickers_mentioned"):
summary["breaking_news"].append({
"title": news["title"],
"tickers": news["tickers_mentioned"]
})
return summary
def main():
import argparse
parser = argparse.ArgumentParser(description="🔥 Hot Scanner - Find trending stocks & crypto")
parser.add_argument("--no-social", action="store_true", help="Skip social media scans")
parser.add_argument("--json", action="store_true", help="Output only JSON")
args = parser.parse_args()
scanner = HotScanner(include_social=not args.no_social)
if not args.json:
print("=" * 60)
print("🔥 HOT SCANNER v2 - What's Trending Right Now?")
print(f"📅 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} UTC")
print("=" * 60)
print()
scanner.scan_all()
summary = scanner.get_hot_summary()
# Save
output_file = CACHE_DIR / "hot_scan_latest.json"
with open(output_file, "w") as f:
json.dump(summary, f, indent=2, default=str)
if args.json:
print(json.dumps(summary, indent=2, default=str))
return
print()
print("=" * 60)
print("🔥 RESULTS")
print("=" * 60)
print("\n📊 TOP TRENDING (by buzz):\n")
for i, item in enumerate(summary["top_trending"][:12], 1):
sources = ", ".join(item["sources"][:2])
signal = item["signals"][0][:30] if item["signals"] else ""
print(f" {i:2}. {item['symbol']:8} ({item['mentions']:2} pts) [{sources}] {signal}")
print("\n🪙 CRYPTO:\n")
for coin in summary["crypto_highlights"][:8]:
change = coin.get("change_24h") or coin.get("price_change_24h")
change_str = f"{change:+.1f}%" if change else "🔥"
emoji = "🚀" if (change or 0) > 0 else "📉" if (change or 0) < 0 else "🔥"
print(f" {emoji} {coin.get('symbol', '?'):8} {coin.get('name', '')[:16]:16} {change_str:>8}")
print("\n📈 STOCKS:\n")
cat_emoji = {"gainers": "🟢", "losers": "🔴", "most_active": "📊"}
for stock in summary["stock_highlights"][:10]:
emoji = cat_emoji.get(stock.get("category"), "")
print(f" {emoji} {stock['symbol']:6} ({stock.get('category', 'N/A').replace('_', ' ')})")
if summary["social_buzz"]:
print("\n🐦 SOCIAL BUZZ:\n")
for item in summary["social_buzz"][:5]:
platform = item.get("platform", "?")
text = item.get("title") or item.get("text", "")
text = text[:55] + "..." if len(text) > 55 else text
print(f" [{platform}] {text}")
print("\n📰 NEWS:\n")
for news in summary["breaking_news"][:5]:
tickers = ", ".join(news["tickers"][:3])
title = news["title"][:55] + "..." if len(news["title"]) > 55 else news["title"]
print(f" [{tickers}] {title}")
print(f"\n💾 Saved: {output_file}\n")
if __name__ == "__main__":
main()

548
scripts/portfolio.py Normal file
View File

@@ -0,0 +1,548 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = ["yfinance>=0.2.40"]
# ///
"""
Portfolio management for stock-analysis skill.
Usage:
uv run portfolio.py create "Portfolio Name"
uv run portfolio.py list
uv run portfolio.py show [--portfolio NAME]
uv run portfolio.py delete "Portfolio Name"
uv run portfolio.py rename "Old Name" "New Name"
uv run portfolio.py add TICKER --quantity 100 --cost 150.00 [--portfolio NAME]
uv run portfolio.py update TICKER --quantity 150 [--portfolio NAME]
uv run portfolio.py remove TICKER [--portfolio NAME]
"""
import argparse
import json
import os
import sys
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Literal
import yfinance as yf
# Top 20 supported cryptocurrencies
SUPPORTED_CRYPTOS = {
"BTC-USD", "ETH-USD", "BNB-USD", "SOL-USD", "XRP-USD",
"ADA-USD", "DOGE-USD", "AVAX-USD", "DOT-USD", "MATIC-USD",
"LINK-USD", "ATOM-USD", "UNI-USD", "LTC-USD", "BCH-USD",
"XLM-USD", "ALGO-USD", "VET-USD", "FIL-USD", "NEAR-USD",
}
def get_storage_path() -> Path:
"""Get the portfolio storage path."""
# Use ~/.clawdbot/skills/stock-analysis/portfolios.json
state_dir = os.environ.get("CLAWDBOT_STATE_DIR", os.path.expanduser("~/.clawdbot"))
portfolio_dir = Path(state_dir) / "skills" / "stock-analysis"
portfolio_dir.mkdir(parents=True, exist_ok=True)
return portfolio_dir / "portfolios.json"
def detect_asset_type(ticker: str) -> Literal["stock", "crypto"]:
"""Detect asset type from ticker format."""
ticker_upper = ticker.upper()
if ticker_upper.endswith("-USD"):
base = ticker_upper[:-4]
if base.isalpha() and f"{base}-USD" in SUPPORTED_CRYPTOS:
return "crypto"
# Allow any *-USD ticker as crypto (flexible)
if base.isalpha():
return "crypto"
return "stock"
@dataclass
class Asset:
ticker: str
type: Literal["stock", "crypto"]
quantity: float
cost_basis: float
added_at: str
@dataclass
class Portfolio:
name: str
created_at: str
updated_at: str
assets: list[Asset]
class PortfolioStore:
"""Manages portfolio storage with atomic writes."""
def __init__(self, path: Path | None = None):
self.path = path or get_storage_path()
self._data: dict | None = None
def _load(self) -> dict:
"""Load portfolios from disk."""
if self._data is not None:
return self._data
if not self.path.exists():
self._data = {"version": 1, "portfolios": {}}
return self._data
try:
with open(self.path, "r", encoding="utf-8") as f:
self._data = json.load(f)
return self._data
except (json.JSONDecodeError, IOError):
self._data = {"version": 1, "portfolios": {}}
return self._data
def _save(self) -> None:
"""Save portfolios to disk with atomic write."""
if self._data is None:
return
# Ensure directory exists
self.path.parent.mkdir(parents=True, exist_ok=True)
# Atomic write: write to temp file, then rename
tmp_path = self.path.with_suffix(".tmp")
try:
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(self._data, f, indent=2)
tmp_path.replace(self.path)
except Exception:
if tmp_path.exists():
tmp_path.unlink()
raise
def _get_portfolio_key(self, name: str) -> str:
"""Convert portfolio name to storage key."""
return name.lower().replace(" ", "-")
def list_portfolios(self) -> list[str]:
"""List all portfolio names."""
data = self._load()
return [p["name"] for p in data["portfolios"].values()]
def get_portfolio(self, name: str) -> Portfolio | None:
"""Get a portfolio by name."""
data = self._load()
key = self._get_portfolio_key(name)
if key not in data["portfolios"]:
# Try case-insensitive match
for k, v in data["portfolios"].items():
if v["name"].lower() == name.lower():
key = k
break
else:
return None
p = data["portfolios"][key]
assets = [
Asset(
ticker=a["ticker"],
type=a["type"],
quantity=a["quantity"],
cost_basis=a["cost_basis"],
added_at=a["added_at"],
)
for a in p.get("assets", [])
]
return Portfolio(
name=p["name"],
created_at=p["created_at"],
updated_at=p["updated_at"],
assets=assets,
)
def create_portfolio(self, name: str) -> Portfolio:
"""Create a new portfolio."""
data = self._load()
key = self._get_portfolio_key(name)
if key in data["portfolios"]:
raise ValueError(f"Portfolio '{name}' already exists")
now = datetime.now().isoformat()
portfolio = {
"name": name,
"created_at": now,
"updated_at": now,
"assets": [],
}
data["portfolios"][key] = portfolio
self._save()
return Portfolio(name=name, created_at=now, updated_at=now, assets=[])
def delete_portfolio(self, name: str) -> bool:
"""Delete a portfolio."""
data = self._load()
key = self._get_portfolio_key(name)
# Try case-insensitive match
if key not in data["portfolios"]:
for k, v in data["portfolios"].items():
if v["name"].lower() == name.lower():
key = k
break
else:
return False
del data["portfolios"][key]
self._save()
return True
def rename_portfolio(self, old_name: str, new_name: str) -> bool:
"""Rename a portfolio."""
data = self._load()
old_key = self._get_portfolio_key(old_name)
new_key = self._get_portfolio_key(new_name)
# Find old portfolio
if old_key not in data["portfolios"]:
for k, v in data["portfolios"].items():
if v["name"].lower() == old_name.lower():
old_key = k
break
else:
return False
if new_key in data["portfolios"] and new_key != old_key:
raise ValueError(f"Portfolio '{new_name}' already exists")
portfolio = data["portfolios"].pop(old_key)
portfolio["name"] = new_name
portfolio["updated_at"] = datetime.now().isoformat()
data["portfolios"][new_key] = portfolio
self._save()
return True
def add_asset(
self,
portfolio_name: str,
ticker: str,
quantity: float,
cost_basis: float,
) -> Asset:
"""Add an asset to a portfolio."""
data = self._load()
key = self._get_portfolio_key(portfolio_name)
# Find portfolio
if key not in data["portfolios"]:
for k, v in data["portfolios"].items():
if v["name"].lower() == portfolio_name.lower():
key = k
break
else:
raise ValueError(f"Portfolio '{portfolio_name}' not found")
portfolio = data["portfolios"][key]
ticker = ticker.upper()
# Check if asset already exists
for asset in portfolio["assets"]:
if asset["ticker"] == ticker:
raise ValueError(f"Asset '{ticker}' already in portfolio. Use 'update' to modify.")
# Validate ticker
asset_type = detect_asset_type(ticker)
try:
stock = yf.Ticker(ticker)
info = stock.info
if "regularMarketPrice" not in info:
raise ValueError(f"Invalid ticker: {ticker}")
except Exception as e:
raise ValueError(f"Could not validate ticker '{ticker}': {e}")
now = datetime.now().isoformat()
asset = {
"ticker": ticker,
"type": asset_type,
"quantity": quantity,
"cost_basis": cost_basis,
"added_at": now,
}
portfolio["assets"].append(asset)
portfolio["updated_at"] = now
self._save()
return Asset(**asset)
def update_asset(
self,
portfolio_name: str,
ticker: str,
quantity: float | None = None,
cost_basis: float | None = None,
) -> Asset | None:
"""Update an asset in a portfolio."""
data = self._load()
key = self._get_portfolio_key(portfolio_name)
# Find portfolio
if key not in data["portfolios"]:
for k, v in data["portfolios"].items():
if v["name"].lower() == portfolio_name.lower():
key = k
break
else:
return None
portfolio = data["portfolios"][key]
ticker = ticker.upper()
for asset in portfolio["assets"]:
if asset["ticker"] == ticker:
if quantity is not None:
asset["quantity"] = quantity
if cost_basis is not None:
asset["cost_basis"] = cost_basis
portfolio["updated_at"] = datetime.now().isoformat()
self._save()
return Asset(**asset)
return None
def remove_asset(self, portfolio_name: str, ticker: str) -> bool:
"""Remove an asset from a portfolio."""
data = self._load()
key = self._get_portfolio_key(portfolio_name)
# Find portfolio
if key not in data["portfolios"]:
for k, v in data["portfolios"].items():
if v["name"].lower() == portfolio_name.lower():
key = k
break
else:
return False
portfolio = data["portfolios"][key]
ticker = ticker.upper()
original_len = len(portfolio["assets"])
portfolio["assets"] = [a for a in portfolio["assets"] if a["ticker"] != ticker]
if len(portfolio["assets"]) < original_len:
portfolio["updated_at"] = datetime.now().isoformat()
self._save()
return True
return False
def get_default_portfolio_name(self) -> str | None:
"""Get the default (first) portfolio name, or None if empty."""
portfolios = self.list_portfolios()
return portfolios[0] if portfolios else None
def format_currency(value: float) -> str:
"""Format a value as currency."""
if abs(value) >= 1_000_000:
return f"${value/1_000_000:.2f}M"
elif abs(value) >= 1_000:
return f"${value/1_000:.2f}K"
else:
return f"${value:.2f}"
def show_portfolio(portfolio: Portfolio, verbose: bool = False) -> None:
"""Display portfolio details with current prices."""
print(f"\n{'='*60}")
print(f"PORTFOLIO: {portfolio.name}")
print(f"Created: {portfolio.created_at[:10]} | Updated: {portfolio.updated_at[:10]}")
print(f"{'='*60}\n")
if not portfolio.assets:
print(" No assets in portfolio. Use 'add' to add assets.\n")
return
total_cost = 0.0
total_value = 0.0
print(f"{'Ticker':<12} {'Type':<8} {'Qty':>10} {'Cost':>12} {'Current':>12} {'Value':>14} {'P&L':>12}")
print("-" * 82)
for asset in portfolio.assets:
try:
stock = yf.Ticker(asset.ticker)
current_price = stock.info.get("regularMarketPrice", 0) or 0
except Exception:
current_price = 0
cost_total = asset.quantity * asset.cost_basis
current_value = asset.quantity * current_price
pnl = current_value - cost_total
pnl_pct = (pnl / cost_total * 100) if cost_total > 0 else 0
total_cost += cost_total
total_value += current_value
pnl_str = f"{'+' if pnl >= 0 else ''}{format_currency(pnl)} ({pnl_pct:+.1f}%)"
print(f"{asset.ticker:<12} {asset.type:<8} {asset.quantity:>10.4f} "
f"{format_currency(asset.cost_basis):>12} {format_currency(current_price):>12} "
f"{format_currency(current_value):>14} {pnl_str:>12}")
print("-" * 82)
total_pnl = total_value - total_cost
total_pnl_pct = (total_pnl / total_cost * 100) if total_cost > 0 else 0
print(f"{'TOTAL':<12} {'':<8} {'':<10} {format_currency(total_cost):>12} {'':<12} "
f"{format_currency(total_value):>14} {'+' if total_pnl >= 0 else ''}{format_currency(total_pnl)} ({total_pnl_pct:+.1f}%)")
print()
def main():
parser = argparse.ArgumentParser(description="Portfolio management for stock-analysis")
subparsers = parser.add_subparsers(dest="command", help="Commands")
# create
create_parser = subparsers.add_parser("create", help="Create a new portfolio")
create_parser.add_argument("name", help="Portfolio name")
# list
subparsers.add_parser("list", help="List all portfolios")
# show
show_parser = subparsers.add_parser("show", help="Show portfolio details")
show_parser.add_argument("--portfolio", "-p", help="Portfolio name (default: first portfolio)")
# delete
delete_parser = subparsers.add_parser("delete", help="Delete a portfolio")
delete_parser.add_argument("name", help="Portfolio name")
# rename
rename_parser = subparsers.add_parser("rename", help="Rename a portfolio")
rename_parser.add_argument("old_name", help="Current portfolio name")
rename_parser.add_argument("new_name", help="New portfolio name")
# add
add_parser = subparsers.add_parser("add", help="Add an asset to portfolio")
add_parser.add_argument("ticker", help="Stock/crypto ticker (e.g., AAPL, BTC-USD)")
add_parser.add_argument("--quantity", "-q", type=float, required=True, help="Quantity")
add_parser.add_argument("--cost", "-c", type=float, required=True, help="Cost basis per unit")
add_parser.add_argument("--portfolio", "-p", help="Portfolio name (default: first portfolio)")
# update
update_parser = subparsers.add_parser("update", help="Update an asset in portfolio")
update_parser.add_argument("ticker", help="Stock/crypto ticker")
update_parser.add_argument("--quantity", "-q", type=float, help="New quantity")
update_parser.add_argument("--cost", "-c", type=float, help="New cost basis per unit")
update_parser.add_argument("--portfolio", "-p", help="Portfolio name (default: first portfolio)")
# remove
remove_parser = subparsers.add_parser("remove", help="Remove an asset from portfolio")
remove_parser.add_argument("ticker", help="Stock/crypto ticker")
remove_parser.add_argument("--portfolio", "-p", help="Portfolio name (default: first portfolio)")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
store = PortfolioStore()
try:
if args.command == "create":
portfolio = store.create_portfolio(args.name)
print(f"Created portfolio: {portfolio.name}")
elif args.command == "list":
portfolios = store.list_portfolios()
if not portfolios:
print("No portfolios found. Use 'create' to create one.")
else:
print("\nPortfolios:")
for name in portfolios:
p = store.get_portfolio(name)
asset_count = len(p.assets) if p else 0
print(f" - {name} ({asset_count} assets)")
print()
elif args.command == "show":
portfolio_name = args.portfolio or store.get_default_portfolio_name()
if not portfolio_name:
print("No portfolios found. Use 'create' to create one.")
sys.exit(1)
portfolio = store.get_portfolio(portfolio_name)
if not portfolio:
print(f"Portfolio '{portfolio_name}' not found.")
sys.exit(1)
show_portfolio(portfolio)
elif args.command == "delete":
if store.delete_portfolio(args.name):
print(f"Deleted portfolio: {args.name}")
else:
print(f"Portfolio '{args.name}' not found.")
sys.exit(1)
elif args.command == "rename":
if store.rename_portfolio(args.old_name, args.new_name):
print(f"Renamed portfolio: {args.old_name} -> {args.new_name}")
else:
print(f"Portfolio '{args.old_name}' not found.")
sys.exit(1)
elif args.command == "add":
portfolio_name = args.portfolio or store.get_default_portfolio_name()
if not portfolio_name:
print("No portfolios found. Use 'create' to create one first.")
sys.exit(1)
asset = store.add_asset(portfolio_name, args.ticker, args.quantity, args.cost)
print(f"Added {asset.ticker} ({asset.type}) to {portfolio_name}: "
f"{asset.quantity} units @ {format_currency(asset.cost_basis)}")
elif args.command == "update":
portfolio_name = args.portfolio or store.get_default_portfolio_name()
if not portfolio_name:
print("No portfolios found.")
sys.exit(1)
if args.quantity is None and args.cost is None:
print("Must specify --quantity and/or --cost to update.")
sys.exit(1)
asset = store.update_asset(portfolio_name, args.ticker, args.quantity, args.cost)
if asset:
print(f"Updated {asset.ticker} in {portfolio_name}: "
f"{asset.quantity} units @ {format_currency(asset.cost_basis)}")
else:
print(f"Asset '{args.ticker}' not found in portfolio '{portfolio_name}'.")
sys.exit(1)
elif args.command == "remove":
portfolio_name = args.portfolio or store.get_default_portfolio_name()
if not portfolio_name:
print("No portfolios found.")
sys.exit(1)
if store.remove_asset(portfolio_name, args.ticker):
print(f"Removed {args.ticker.upper()} from {portfolio_name}")
else:
print(f"Asset '{args.ticker}' not found in portfolio '{portfolio_name}'.")
sys.exit(1)
except ValueError as e:
print(f"Error: {e}")
sys.exit(1)
except Exception as e:
print(f"Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

342
scripts/rumor_scanner.py Normal file
View File

@@ -0,0 +1,342 @@
#!/usr/bin/env python3
"""
🔮 RUMOR & BUZZ SCANNER
Scans for early signals, rumors, and whispers before they become mainstream news.
Sources:
- Twitter/X: "hearing", "rumor", "sources say", unusual buzz
- Google News: M&A, insider, upgrade/downgrade
- Unusual keywords detection
Usage: python3 rumor_scanner.py
"""
import json
import os
import subprocess
import sys
import re
from datetime import datetime, timezone
from pathlib import Path
from urllib.request import urlopen, Request
from urllib.parse import quote_plus
import gzip
CACHE_DIR = Path(__file__).parent.parent / "cache"
CACHE_DIR.mkdir(exist_ok=True)
# Bird CLI path
BIRD_CLI = "/home/clawdbot/.nvm/versions/node/v24.12.0/bin/bird"
BIRD_ENV = Path(__file__).parent.parent / ".env"
def load_env():
"""Load environment variables from .env file."""
if BIRD_ENV.exists():
for line in BIRD_ENV.read_text().splitlines():
if '=' in line and not line.startswith('#'):
key, value = line.split('=', 1)
os.environ[key.strip()] = value.strip().strip('"').strip("'")
def fetch_url(url, timeout=15):
"""Fetch URL with headers."""
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-US,en;q=0.9',
}
req = Request(url, headers=headers)
try:
with urlopen(req, timeout=timeout) as resp:
data = resp.read()
if resp.info().get('Content-Encoding') == 'gzip':
data = gzip.decompress(data)
return data.decode('utf-8', errors='ignore')
except Exception as e:
return None
def search_twitter_rumors():
"""Search Twitter for rumors and early signals."""
results = []
# Rumor-focused search queries
queries = [
'"hearing that" stock OR $',
'"sources say" stock OR company',
'"rumor" merger OR acquisition',
'insider buying stock',
'"upgrade" OR "downgrade" stock tomorrow',
'$AAPL OR $TSLA OR $NVDA rumor',
'"breaking" stock market',
'M&A rumor',
]
load_env()
for query in queries[:4]: # Limit to avoid rate limits
try:
cmd = [BIRD_CLI, 'search', query, '-n', '10', '--json']
env = os.environ.copy()
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30, env=env)
if result.returncode == 0 and result.stdout:
try:
tweets = json.loads(result.stdout)
for tweet in tweets:
text = tweet.get('text', '')
# Filter for actual rumors/signals
if any(kw in text.lower() for kw in ['hearing', 'rumor', 'source', 'insider', 'upgrade', 'downgrade', 'breaking', 'M&A', 'merger', 'acquisition']):
results.append({
'source': 'twitter',
'type': 'rumor',
'text': text[:300],
'author': tweet.get('author', {}).get('username', 'unknown'),
'likes': tweet.get('likes', 0),
'retweets': tweet.get('retweets', 0),
'query': query
})
except json.JSONDecodeError:
pass
except Exception as e:
pass
# Dedupe by text similarity
seen = set()
unique = []
for r in results:
key = r['text'][:100]
if key not in seen:
seen.add(key)
unique.append(r)
return unique
def search_twitter_buzz():
"""Search Twitter for general stock buzz - what are people talking about?"""
results = []
queries = [
'$SPY OR $QQQ',
'stock to buy',
'calls OR puts expiring',
'earnings play',
'short squeeze',
]
load_env()
for query in queries[:3]:
try:
cmd = [BIRD_CLI, 'search', query, '-n', '15', '--json']
env = os.environ.copy()
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30, env=env)
if result.returncode == 0 and result.stdout:
try:
tweets = json.loads(result.stdout)
for tweet in tweets:
text = tweet.get('text', '')
# Extract stock symbols
symbols = re.findall(r'\$([A-Z]{1,5})\b', text)
if symbols:
results.append({
'source': 'twitter',
'type': 'buzz',
'text': text[:300],
'symbols': symbols,
'author': tweet.get('author', {}).get('username', 'unknown'),
'engagement': tweet.get('likes', 0) + tweet.get('retweets', 0) * 2
})
except json.JSONDecodeError:
pass
except Exception as e:
pass
# Sort by engagement
results.sort(key=lambda x: x.get('engagement', 0), reverse=True)
return results[:20]
def search_news_rumors():
"""Search Google News for M&A, insider, upgrade news."""
results = []
queries = [
'merger acquisition rumor',
'insider buying stock',
'analyst upgrade stock',
'takeover bid company',
'SEC investigation company',
]
for query in queries:
url = f"https://news.google.com/rss/search?q={quote_plus(query)}&hl=en-US&gl=US&ceid=US:en"
content = fetch_url(url)
if content:
import xml.etree.ElementTree as ET
try:
root = ET.fromstring(content)
for item in root.findall('.//item')[:5]:
title = item.find('title')
link = item.find('link')
pub_date = item.find('pubDate')
if title is not None:
title_text = title.text or ''
# Extract company names or symbols
results.append({
'source': 'google_news',
'type': 'news_rumor',
'title': title_text,
'link': link.text if link is not None else '',
'date': pub_date.text if pub_date is not None else '',
'query': query
})
except ET.ParseError:
pass
return results
def extract_symbols_from_text(text):
"""Extract stock symbols from text."""
# $SYMBOL pattern
dollar_symbols = re.findall(r'\$([A-Z]{1,5})\b', text)
# Common company name to symbol mapping
company_map = {
'apple': 'AAPL', 'tesla': 'TSLA', 'nvidia': 'NVDA', 'microsoft': 'MSFT',
'google': 'GOOGL', 'amazon': 'AMZN', 'meta': 'META', 'netflix': 'NFLX',
'coinbase': 'COIN', 'robinhood': 'HOOD', 'disney': 'DIS', 'intel': 'INTC',
'amd': 'AMD', 'palantir': 'PLTR', 'gamestop': 'GME', 'amc': 'AMC',
}
text_lower = text.lower()
company_symbols = [sym for name, sym in company_map.items() if name in text_lower]
return list(set(dollar_symbols + company_symbols))
def calculate_rumor_score(item):
"""Score a rumor by potential impact."""
score = 0
text = (item.get('text', '') + item.get('title', '')).lower()
# High impact keywords
if any(kw in text for kw in ['merger', 'acquisition', 'takeover', 'buyout']):
score += 5
if any(kw in text for kw in ['insider', 'ceo buying', 'director buying']):
score += 4
if any(kw in text for kw in ['upgrade', 'price target raised']):
score += 3
if any(kw in text for kw in ['downgrade', 'sec investigation', 'fraud']):
score += 3
if any(kw in text for kw in ['hearing', 'sources say', 'rumor']):
score += 2
if any(kw in text for kw in ['breaking', 'just in', 'alert']):
score += 2
# Engagement boost
if item.get('engagement', 0) > 100:
score += 2
if item.get('likes', 0) > 50:
score += 1
return score
def main():
print("=" * 60)
print("🔮 RUMOR & BUZZ SCANNER")
print(f"📅 {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC")
print("=" * 60)
print()
print("🔍 Scanning for early signals...")
print()
all_rumors = []
all_buzz = []
# Twitter Rumors
print(" 🐦 Twitter rumors...")
rumors = search_twitter_rumors()
print(f"{len(rumors)} potential rumors")
all_rumors.extend(rumors)
# Twitter Buzz
print(" 🐦 Twitter buzz...")
buzz = search_twitter_buzz()
print(f"{len(buzz)} buzz items")
all_buzz.extend(buzz)
# News Rumors
print(" 📰 News rumors...")
news = search_news_rumors()
print(f"{len(news)} news items")
all_rumors.extend(news)
# Score and sort rumors
for item in all_rumors:
item['score'] = calculate_rumor_score(item)
item['symbols'] = extract_symbols_from_text(item.get('text', '') + item.get('title', ''))
all_rumors.sort(key=lambda x: x['score'], reverse=True)
# Count symbol mentions in buzz
symbol_counts = {}
for item in all_buzz:
for sym in item.get('symbols', []):
symbol_counts[sym] = symbol_counts.get(sym, 0) + 1
# Output
print()
print("=" * 60)
print("🔮 RESULTS")
print("=" * 60)
print()
# Top Rumors
print("🚨 TOP RUMORS (by potential impact):")
print()
for item in all_rumors[:10]:
if item['score'] > 0:
source = item['source']
symbols = ', '.join(item.get('symbols', [])) or 'N/A'
text = item.get('text', item.get('title', ''))[:80]
print(f" [{item['score']}] [{source}] {symbols}")
print(f" {text}...")
print()
# Buzz Leaderboard
print("📊 BUZZ LEADERBOARD (most discussed):")
print()
sorted_symbols = sorted(symbol_counts.items(), key=lambda x: x[1], reverse=True)
for symbol, count in sorted_symbols[:15]:
bar = "" * min(count, 20)
print(f" ${symbol:5} {bar} ({count})")
print()
# Recent Buzz Snippets
print("💬 WHAT PEOPLE ARE SAYING:")
print()
for item in all_buzz[:8]:
author = item.get('author', 'anon')
text = item.get('text', '')[:120]
engagement = item.get('engagement', 0)
print(f" @{author} ({engagement}♥): {text}...")
print()
# Save results
output = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'rumors': all_rumors[:20],
'buzz': all_buzz[:30],
'symbol_counts': symbol_counts,
}
output_file = CACHE_DIR / 'rumor_scan_latest.json'
output_file.write_text(json.dumps(output, indent=2, default=str))
print(f"💾 Saved: {output_file}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,381 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pytest>=8.0.0",
# "yfinance>=0.2.40",
# "pandas>=2.0.0",
# ]
# ///
"""
Tests for Stock Analysis Skill v6.0
Run with: uv run pytest test_stock_analysis.py -v
"""
import json
import pytest
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime, timezone
import pandas as pd
# Import modules to test
from analyze_stock import (
detect_asset_type,
calculate_rsi,
fetch_stock_data,
analyze_earnings_surprise,
analyze_fundamentals,
analyze_momentum,
synthesize_signal,
EarningsSurprise,
Fundamentals,
MomentumAnalysis,
MarketContext,
StockData,
)
from dividends import analyze_dividends
from watchlist import (
add_to_watchlist,
remove_from_watchlist,
list_watchlist,
WatchlistItem,
)
from portfolio import PortfolioStore
class TestAssetTypeDetection:
"""Test asset type detection."""
def test_stock_detection(self):
assert detect_asset_type("AAPL") == "stock"
assert detect_asset_type("MSFT") == "stock"
assert detect_asset_type("googl") == "stock"
def test_crypto_detection(self):
assert detect_asset_type("BTC-USD") == "crypto"
assert detect_asset_type("ETH-USD") == "crypto"
assert detect_asset_type("sol-usd") == "crypto"
def test_edge_cases(self):
# Ticker ending in USD but not crypto format
assert detect_asset_type("MUSD") == "stock"
# Numbers in ticker
assert detect_asset_type("BRK.B") == "stock"
class TestRSICalculation:
"""Test RSI calculation."""
def test_rsi_overbought(self):
"""Test RSI > 70 (overbought)."""
# Create rising prices
prices = pd.Series([100 + i * 2 for i in range(20)])
rsi = calculate_rsi(prices, period=14)
assert rsi is not None
assert rsi > 70
def test_rsi_oversold(self):
"""Test RSI < 30 (oversold)."""
# Create falling prices
prices = pd.Series([100 - i * 2 for i in range(20)])
rsi = calculate_rsi(prices, period=14)
assert rsi is not None
assert rsi < 30
def test_rsi_insufficient_data(self):
"""Test RSI with insufficient data."""
prices = pd.Series([100, 101, 102]) # Too few points
rsi = calculate_rsi(prices, period=14)
assert rsi is None
class TestEarningsSurprise:
"""Test earnings surprise analysis."""
def test_earnings_beat(self):
"""Test positive earnings surprise."""
# Mock StockData with earnings beat
mock_earnings = pd.DataFrame({
"Reported EPS": [1.50],
"EPS Estimate": [1.20],
}, index=[pd.Timestamp("2024-01-15")])
mock_data = Mock(spec=StockData)
mock_data.earnings_history = mock_earnings
result = analyze_earnings_surprise(mock_data)
assert result is not None
assert result.score > 0
assert result.surprise_pct > 0
assert "Beat" in result.explanation
def test_earnings_miss(self):
"""Test negative earnings surprise."""
mock_earnings = pd.DataFrame({
"Reported EPS": [0.80],
"EPS Estimate": [1.00],
}, index=[pd.Timestamp("2024-01-15")])
mock_data = Mock(spec=StockData)
mock_data.earnings_history = mock_earnings
result = analyze_earnings_surprise(mock_data)
assert result is not None
assert result.score < 0
assert result.surprise_pct < 0
assert "Missed" in result.explanation
class TestFundamentals:
"""Test fundamentals analysis."""
def test_strong_fundamentals(self):
"""Test stock with strong fundamentals."""
mock_data = Mock(spec=StockData)
mock_data.info = {
"trailingPE": 15,
"operatingMargins": 0.25,
"revenueGrowth": 0.30,
"debtToEquity": 30,
}
result = analyze_fundamentals(mock_data)
assert result is not None
assert result.score > 0
assert "pe_ratio" in result.key_metrics
def test_weak_fundamentals(self):
"""Test stock with weak fundamentals."""
mock_data = Mock(spec=StockData)
mock_data.info = {
"trailingPE": 50,
"operatingMargins": 0.02,
"revenueGrowth": -0.10,
"debtToEquity": 300,
}
result = analyze_fundamentals(mock_data)
assert result is not None
assert result.score < 0
class TestMomentum:
"""Test momentum analysis."""
def test_overbought_momentum(self):
"""Test overbought conditions."""
# Create mock price history with rising prices near 52w high
dates = pd.date_range(end=datetime.now(), periods=100)
prices = pd.DataFrame({
"Close": [100 + i * 0.5 for i in range(100)],
"Volume": [1000000] * 100,
}, index=dates)
mock_data = Mock(spec=StockData)
mock_data.price_history = prices
mock_data.info = {
"fiftyTwoWeekHigh": 150,
"fiftyTwoWeekLow": 80,
"regularMarketPrice": 148,
}
result = analyze_momentum(mock_data)
assert result is not None
assert result.rsi_status == "overbought"
assert result.near_52w_high == True
assert result.score < 0 # Overbought = negative score
class TestSignalSynthesis:
"""Test signal synthesis."""
def test_buy_signal(self):
"""Test BUY recommendation synthesis."""
earnings = EarningsSurprise(score=0.8, explanation="Beat by 20%", actual_eps=1.2, expected_eps=1.0, surprise_pct=20)
fundamentals = Fundamentals(score=0.6, key_metrics={"pe_ratio": 15}, explanation="Strong margins")
signal = synthesize_signal(
ticker="TEST",
company_name="Test Corp",
earnings=earnings,
fundamentals=fundamentals,
analysts=None,
historical=None,
market_context=None,
sector=None,
earnings_timing=None,
momentum=None,
sentiment=None,
)
assert signal.recommendation == "BUY"
assert signal.confidence > 0.5
def test_sell_signal(self):
"""Test SELL recommendation synthesis."""
earnings = EarningsSurprise(score=-0.8, explanation="Missed by 20%", actual_eps=0.8, expected_eps=1.0, surprise_pct=-20)
fundamentals = Fundamentals(score=-0.6, key_metrics={"pe_ratio": 50}, explanation="Weak margins")
signal = synthesize_signal(
ticker="TEST",
company_name="Test Corp",
earnings=earnings,
fundamentals=fundamentals,
analysts=None,
historical=None,
market_context=None,
sector=None,
earnings_timing=None,
momentum=None,
sentiment=None,
)
assert signal.recommendation == "SELL"
def test_risk_off_penalty(self):
"""Test risk-off mode reduces BUY confidence."""
earnings = EarningsSurprise(score=0.8, explanation="Beat", actual_eps=1.2, expected_eps=1.0, surprise_pct=20)
fundamentals = Fundamentals(score=0.6, key_metrics={}, explanation="Strong")
market = MarketContext(
vix_level=25,
vix_status="elevated",
spy_trend_10d=2.0,
qqq_trend_10d=1.5,
market_regime="choppy",
score=-0.2,
explanation="Risk-off",
gld_change_5d=3.0,
tlt_change_5d=2.0,
uup_change_5d=1.5,
risk_off_detected=True,
)
signal = synthesize_signal(
ticker="TEST",
company_name="Test Corp",
earnings=earnings,
fundamentals=fundamentals,
analysts=None,
historical=None,
market_context=market,
sector=None,
earnings_timing=None,
momentum=None,
sentiment=None,
)
# Should still be BUY but with reduced confidence
assert signal.recommendation in ["BUY", "HOLD"]
assert any("RISK-OFF" in c for c in signal.caveats)
class TestWatchlist:
"""Test watchlist functionality."""
@patch('watchlist.get_current_price')
@patch('watchlist.save_watchlist')
@patch('watchlist.load_watchlist')
def test_add_to_watchlist(self, mock_load, mock_save, mock_price):
"""Test adding ticker to watchlist."""
mock_load.return_value = []
mock_price.return_value = 150.0
mock_save.return_value = None
result = add_to_watchlist("AAPL", target_price=200.0)
assert result["success"] == True
assert result["action"] == "added"
assert result["ticker"] == "AAPL"
assert result["target_price"] == 200.0
@patch('watchlist.save_watchlist')
@patch('watchlist.load_watchlist')
def test_remove_from_watchlist(self, mock_load, mock_save):
"""Test removing ticker from watchlist."""
mock_load.return_value = [
WatchlistItem(ticker="AAPL", added_at="2024-01-01T00:00:00+00:00")
]
mock_save.return_value = None
result = remove_from_watchlist("AAPL")
assert result["success"] == True
assert result["removed"] == "AAPL"
class TestDividendAnalysis:
"""Test dividend analysis."""
@patch('yfinance.Ticker')
def test_dividend_stock(self, mock_ticker):
"""Test analysis of dividend-paying stock."""
mock_stock = Mock()
mock_stock.info = {
"longName": "Johnson & Johnson",
"regularMarketPrice": 160.0,
"dividendYield": 0.03,
"dividendRate": 4.80,
"trailingEps": 6.00,
}
mock_stock.dividends = pd.Series(
[1.2, 1.2, 1.2, 1.2] * 5, # 5 years of quarterly dividends
index=pd.date_range(start="2019-01-01", periods=20, freq="Q")
)
mock_ticker.return_value = mock_stock
result = analyze_dividends("JNJ")
assert result is not None
assert result.dividend_yield == 3.0
assert result.payout_ratio == 80.0
assert result.income_rating != "no_dividend"
@patch('yfinance.Ticker')
def test_no_dividend_stock(self, mock_ticker):
"""Test analysis of non-dividend stock."""
mock_stock = Mock()
mock_stock.info = {
"longName": "Amazon",
"regularMarketPrice": 180.0,
"dividendYield": None,
"dividendRate": None,
}
mock_ticker.return_value = mock_stock
result = analyze_dividends("AMZN")
assert result is not None
assert result.income_rating == "no_dividend"
class TestIntegration:
"""Integration tests (require network)."""
@pytest.mark.integration
def test_real_stock_analysis(self):
"""Test real stock analysis (AAPL)."""
data = fetch_stock_data("AAPL", verbose=False)
assert data is not None
assert data.ticker == "AAPL"
assert data.info is not None
assert "regularMarketPrice" in data.info
@pytest.mark.integration
def test_real_crypto_analysis(self):
"""Test real crypto analysis (BTC-USD)."""
data = fetch_stock_data("BTC-USD", verbose=False)
assert data is not None
assert data.asset_type == "crypto"
# Run tests
if __name__ == "__main__":
pytest.main([__file__, "-v", "--ignore-glob=*integration*"])

336
scripts/watchlist.py Normal file
View File

@@ -0,0 +1,336 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "yfinance>=0.2.40",
# ]
# ///
"""
Stock Watchlist with Price Alerts.
Usage:
uv run watchlist.py add AAPL # Add to watchlist
uv run watchlist.py add AAPL --target 200 # With price target
uv run watchlist.py add AAPL --stop 150 # With stop loss
uv run watchlist.py add AAPL --alert-on signal # Alert on signal change
uv run watchlist.py remove AAPL # Remove from watchlist
uv run watchlist.py list # Show watchlist
uv run watchlist.py check # Check for triggered alerts
uv run watchlist.py check --notify # Check and format for notification
"""
import argparse
import json
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
import yfinance as yf
# Storage
WATCHLIST_DIR = Path.home() / ".clawdbot" / "skills" / "stock-analysis"
WATCHLIST_FILE = WATCHLIST_DIR / "watchlist.json"
@dataclass
class WatchlistItem:
ticker: str
added_at: str
price_at_add: float | None = None
target_price: float | None = None # Alert when price >= target
stop_price: float | None = None # Alert when price <= stop
alert_on_signal: bool = False # Alert when recommendation changes
last_signal: str | None = None # BUY/HOLD/SELL
last_check: str | None = None
notes: str | None = None
@dataclass
class Alert:
ticker: str
alert_type: Literal["target_hit", "stop_hit", "signal_change"]
message: str
current_price: float
trigger_value: float | str
timestamp: str
def ensure_dirs():
"""Create storage directories."""
WATCHLIST_DIR.mkdir(parents=True, exist_ok=True)
def load_watchlist() -> list[WatchlistItem]:
"""Load watchlist from file."""
if WATCHLIST_FILE.exists():
data = json.loads(WATCHLIST_FILE.read_text())
return [WatchlistItem(**item) for item in data]
return []
def save_watchlist(items: list[WatchlistItem]):
"""Save watchlist to file."""
ensure_dirs()
data = [asdict(item) for item in items]
WATCHLIST_FILE.write_text(json.dumps(data, indent=2))
def get_current_price(ticker: str) -> float | None:
"""Get current price for a ticker."""
try:
stock = yf.Ticker(ticker)
price = stock.info.get("regularMarketPrice") or stock.info.get("currentPrice")
return float(price) if price else None
except Exception:
return None
def add_to_watchlist(
ticker: str,
target_price: float | None = None,
stop_price: float | None = None,
alert_on_signal: bool = False,
notes: str | None = None,
) -> dict:
"""Add ticker to watchlist."""
ticker = ticker.upper()
# Validate ticker
current_price = get_current_price(ticker)
if current_price is None:
return {"success": False, "error": f"Invalid ticker: {ticker}"}
# Load existing watchlist
watchlist = load_watchlist()
# Check if already exists
for item in watchlist:
if item.ticker == ticker:
# Update existing
item.target_price = target_price or item.target_price
item.stop_price = stop_price or item.stop_price
item.alert_on_signal = alert_on_signal or item.alert_on_signal
item.notes = notes or item.notes
save_watchlist(watchlist)
return {
"success": True,
"action": "updated",
"ticker": ticker,
"current_price": current_price,
"target_price": item.target_price,
"stop_price": item.stop_price,
"alert_on_signal": item.alert_on_signal,
}
# Add new
item = WatchlistItem(
ticker=ticker,
added_at=datetime.now(timezone.utc).isoformat(),
price_at_add=current_price,
target_price=target_price,
stop_price=stop_price,
alert_on_signal=alert_on_signal,
notes=notes,
)
watchlist.append(item)
save_watchlist(watchlist)
return {
"success": True,
"action": "added",
"ticker": ticker,
"current_price": current_price,
"target_price": target_price,
"stop_price": stop_price,
"alert_on_signal": alert_on_signal,
}
def remove_from_watchlist(ticker: str) -> dict:
"""Remove ticker from watchlist."""
ticker = ticker.upper()
watchlist = load_watchlist()
original_len = len(watchlist)
watchlist = [item for item in watchlist if item.ticker != ticker]
if len(watchlist) == original_len:
return {"success": False, "error": f"{ticker} not in watchlist"}
save_watchlist(watchlist)
return {"success": True, "removed": ticker}
def list_watchlist() -> dict:
"""List all watchlist items with current prices."""
watchlist = load_watchlist()
if not watchlist:
return {"success": True, "items": [], "count": 0}
items = []
for item in watchlist:
current_price = get_current_price(item.ticker)
# Calculate change since added
change_pct = None
if current_price and item.price_at_add:
change_pct = ((current_price - item.price_at_add) / item.price_at_add) * 100
# Distance to target/stop
to_target = None
to_stop = None
if current_price:
if item.target_price:
to_target = ((item.target_price - current_price) / current_price) * 100
if item.stop_price:
to_stop = ((item.stop_price - current_price) / current_price) * 100
items.append({
"ticker": item.ticker,
"current_price": current_price,
"price_at_add": item.price_at_add,
"change_pct": round(change_pct, 2) if change_pct else None,
"target_price": item.target_price,
"to_target_pct": round(to_target, 2) if to_target else None,
"stop_price": item.stop_price,
"to_stop_pct": round(to_stop, 2) if to_stop else None,
"alert_on_signal": item.alert_on_signal,
"last_signal": item.last_signal,
"added_at": item.added_at[:10],
"notes": item.notes,
})
return {"success": True, "items": items, "count": len(items)}
def check_alerts(notify_format: bool = False) -> dict:
"""Check watchlist for triggered alerts."""
watchlist = load_watchlist()
alerts: list[Alert] = []
now = datetime.now(timezone.utc).isoformat()
for item in watchlist:
current_price = get_current_price(item.ticker)
if current_price is None:
continue
# Check target price
if item.target_price and current_price >= item.target_price:
alerts.append(Alert(
ticker=item.ticker,
alert_type="target_hit",
message=f"🎯 {item.ticker} hit target! ${current_price:.2f} >= ${item.target_price:.2f}",
current_price=current_price,
trigger_value=item.target_price,
timestamp=now,
))
# Check stop price
if item.stop_price and current_price <= item.stop_price:
alerts.append(Alert(
ticker=item.ticker,
alert_type="stop_hit",
message=f"🛑 {item.ticker} hit stop! ${current_price:.2f} <= ${item.stop_price:.2f}",
current_price=current_price,
trigger_value=item.stop_price,
timestamp=now,
))
# Check signal change (requires running analyze_stock)
if item.alert_on_signal:
try:
import subprocess
result = subprocess.run(
["uv", "run", str(Path(__file__).parent / "analyze_stock.py"), item.ticker, "--output", "json"],
capture_output=True,
text=True,
timeout=60,
)
if result.returncode == 0:
analysis = json.loads(result.stdout)
new_signal = analysis.get("recommendation")
if item.last_signal and new_signal and new_signal != item.last_signal:
alerts.append(Alert(
ticker=item.ticker,
alert_type="signal_change",
message=f"📊 {item.ticker} signal changed: {item.last_signal}{new_signal}",
current_price=current_price,
trigger_value=f"{item.last_signal}{new_signal}",
timestamp=now,
))
# Update last signal
item.last_signal = new_signal
except Exception:
pass
item.last_check = now
# Save updated watchlist (with last_signal updates)
save_watchlist(watchlist)
# Format output
if notify_format and alerts:
# Format for Telegram notification
lines = ["📢 **Stock Alerts**\n"]
for alert in alerts:
lines.append(alert.message)
return {"success": True, "alerts": [asdict(a) for a in alerts], "notification": "\n".join(lines)}
return {"success": True, "alerts": [asdict(a) for a in alerts], "count": len(alerts)}
def main():
parser = argparse.ArgumentParser(description="Stock Watchlist with Alerts")
subparsers = parser.add_subparsers(dest="command", required=True)
# Add
add_parser = subparsers.add_parser("add", help="Add ticker to watchlist")
add_parser.add_argument("ticker", help="Stock ticker")
add_parser.add_argument("--target", type=float, help="Target price for alert")
add_parser.add_argument("--stop", type=float, help="Stop loss price for alert")
add_parser.add_argument("--alert-on", choices=["signal"], help="Alert on signal change")
add_parser.add_argument("--notes", help="Notes")
# Remove
remove_parser = subparsers.add_parser("remove", help="Remove ticker from watchlist")
remove_parser.add_argument("ticker", help="Stock ticker")
# List
subparsers.add_parser("list", help="List watchlist")
# Check
check_parser = subparsers.add_parser("check", help="Check for triggered alerts")
check_parser.add_argument("--notify", action="store_true", help="Format for notification")
args = parser.parse_args()
if args.command == "add":
result = add_to_watchlist(
args.ticker,
target_price=args.target,
stop_price=args.stop,
alert_on_signal=(args.alert_on == "signal"),
notes=args.notes,
)
print(json.dumps(result, indent=2))
elif args.command == "remove":
result = remove_from_watchlist(args.ticker)
print(json.dumps(result, indent=2))
elif args.command == "list":
result = list_watchlist()
print(json.dumps(result, indent=2))
elif args.command == "check":
result = check_alerts(notify_format=args.notify)
print(json.dumps(result, indent=2))
if __name__ == "__main__":
main()