#!/usr/bin/env python3
"""Look up a crypto token price and render a candlestick chart.

The script takes a symbol (plus an optional duration such as ``12h``, ``90m``
or ``7d`` and an optional ``gradient`` flag) on the command line.  It tries
Hyperliquid first and falls back to CoinGecko, builds OHLC candles for the
requested period, optionally draws a PNG chart with matplotlib, and prints a
single JSON object describing the result on stdout.
"""
import http.client
import json
import math
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone

DEFAULT_HOURS = 24
CANDLE_MINUTES = 15
CACHE_TTL_SEC = 300
HTTP_TIMEOUT_SEC = 15
HTTP_MAX_ATTEMPTS = 3
HTTP_RETRY_CODES = frozenset({429, 502, 503, 504})
HTTP_USER_AGENT = "clawdbot-crypto-price/1.0"

COINGECKO_PRICE_URL = "https://api.coingecko.com/api/v3/simple/price?ids={id}&vs_currencies={currency}"
COINGECKO_OHLC_URL = "https://api.coingecko.com/api/v3/coins/{id}/ohlc?vs_currency={currency}&days=1"
COINGECKO_SEARCH_URL = "https://api.coingecko.com/api/v3/search?query={query}"
COINGECKO_MARKET_CHART_URL = "https://api.coingecko.com/api/v3/coins/{id}/market_chart?vs_currency={currency}&days=1"
COINGECKO_MARKET_CHART_DAYS_URL = "https://api.coingecko.com/api/v3/coins/{id}/market_chart?vs_currency={currency}&days={days}"
HYPERLIQUID_INFO_URL = "https://api.hyperliquid.xyz/info"

TOKEN_ID_MAP = {
    "HYPE": "hyperliquid",
    "HYPERLIQUID": "hyperliquid",
}

USAGE = "Usage: get_price_chart.py SYMBOL [DURATION] [gradient]"

# (upper bound of the requested period in minutes, candle interval in minutes)
_HYPERLIQUID_INTERVAL_STEPS = (
    (180, 1),
    (360, 3),
    (720, 5),
    (1440, 15),
    (4320, 30),
    (10080, 60),
    (20160, 120),
    (40320, 240),
    (80640, 480),
)

# Minutes represented by one unit of each duration suffix.
_DURATION_UNIT_MINUTES = {"m": 1, "h": 60, "d": 24 * 60}


def _json_error(message, details=None):
    """Print an error object as JSON on stdout and return exit code 0."""
    payload = {"error": message}
    if details:
        payload["details"] = details
    print(json.dumps(payload))
    return 0


def _safe_filename_part(value):
    """Return *value* as text with every character outside ``[A-Za-z0-9._-]``
    replaced by ``-`` so it can be embedded in a file name."""
    return re.sub(r"[^A-Za-z0-9._-]", "-", str(value))


def _quote(value):
    """Percent-encode *value* for use as a single URL path or query component."""
    return urllib.parse.quote(str(value), safe="")


def _cache_path(prefix, token_id):
    """Return the cache file path under /tmp for a (prefix, token) pair."""
    return f"/tmp/crypto_price_{prefix}_{_safe_filename_part(token_id)}.json"


def _read_cache(path, max_age_sec):
    """Return the JSON stored at *path*, or None if missing, stale or unreadable."""
    try:
        age = time.time() - os.stat(path).st_mtime
    except OSError:
        return None
    if age > max_age_sec:
        return None
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None


def _write_cache(path, payload):
    """Write *payload* as JSON to *path*; caching is best-effort, so I/O errors
    are ignored."""
    try:
        with open(path, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
    except OSError:
        return


def _cached(cache_path, loader):
    """Return the fresh cache entry at *cache_path*, else call *loader()*,
    store its result and return it."""
    cached = _read_cache(cache_path, CACHE_TTL_SEC)
    if cached is not None:
        return cached
    data = loader()
    _write_cache(cache_path, data)
    return data


def _request_json(req):
    """Send *req* and return the decoded JSON body.

    HTTP statuses in HTTP_RETRY_CODES and network-level errors are retried
    with a growing sleep, up to HTTP_MAX_ATTEMPTS attempts in total.

    Raises:
        RuntimeError: on any transport failure, non-retryable HTTP status or
            a body that is not valid JSON.
    """
    last_error = None
    for attempt in range(HTTP_MAX_ATTEMPTS):
        can_retry = attempt < HTTP_MAX_ATTEMPTS - 1
        try:
            with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_SEC) as resp:
                raw = resp.read().decode("utf-8")
        except urllib.error.HTTPError as exc:
            last_error = exc
            if exc.code in HTTP_RETRY_CODES and can_retry:
                time.sleep(2 * (attempt + 1))
                continue
            raise RuntimeError(str(exc)) from exc
        except ValueError as exc:
            # Malformed URL (http.client.InvalidURL) or undecodable body:
            # retrying cannot help.
            raise RuntimeError(str(exc)) from exc
        except (OSError, http.client.HTTPException) as exc:
            # URLError, socket timeouts while reading, dropped connections.
            last_error = exc
            if can_retry:
                time.sleep(2 * (attempt + 1))
                continue
            raise RuntimeError(str(exc)) from exc
        try:
            return json.loads(raw)
        except json.JSONDecodeError as exc:
            raise RuntimeError("invalid JSON") from exc
    raise RuntimeError(str(last_error))


def _fetch_json(url):
    """GET *url* and return the parsed JSON body (RuntimeError on failure)."""
    req = urllib.request.Request(
        url,
        headers={"User-Agent": HTTP_USER_AGENT},
    )
    return _request_json(req)


def _post_json(url, payload):
    """POST *payload* as JSON to *url* and return the parsed JSON body
    (RuntimeError on failure)."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "User-Agent": HTTP_USER_AGENT},
    )
    return _request_json(req)


def _get_price(token_id, currency):
    """Return the (cached) CoinGecko simple-price payload for a token."""
    url = COINGECKO_PRICE_URL.format(id=_quote(token_id), currency=_quote(currency))
    return _cached(_cache_path(f"price_{currency}", token_id), lambda: _fetch_json(url))


def _get_ohlc(token_id, currency):
    """Return the (cached) CoinGecko one-day OHLC payload for a token."""
    url = COINGECKO_OHLC_URL.format(id=_quote(token_id), currency=_quote(currency))
    return _cached(_cache_path(f"ohlc_{currency}", token_id), lambda: _fetch_json(url))


def _get_market_chart(token_id, currency, days):
    """Return the (cached) CoinGecko market-chart payload covering *days* days."""
    # For days == 1 this yields exactly the URL of COINGECKO_MARKET_CHART_URL.
    url = COINGECKO_MARKET_CHART_DAYS_URL.format(
        id=_quote(token_id), currency=_quote(currency), days=_quote(days)
    )
    return _cached(_cache_path(f"market_{currency}_{days}", token_id), lambda: _fetch_json(url))


def _get_hyperliquid_meta():
    """Return the (cached) Hyperliquid ``metaAndAssetCtxs`` response."""
    return _cached(
        _cache_path("hyperliquid_meta", "meta"),
        lambda: _post_json(HYPERLIQUID_INFO_URL, {"type": "metaAndAssetCtxs"}),
    )


def _pick_hyperliquid_interval_minutes(total_minutes):
    """Return the candle interval (minutes) used for a period of *total_minutes*."""
    for upper_bound, interval in _HYPERLIQUID_INTERVAL_STEPS:
        if total_minutes <= upper_bound:
            return interval
    return 1440


def _interval_minutes_to_str(minutes):
    """Format an interval in minutes as ``"<n>m"``, ``"<n>h"`` or ``"<n>d"``."""
    if minutes < 60:
        return f"{int(minutes)}m"
    hours = int(minutes / 60)
    if hours < 24:
        return f"{hours}h"
    return f"{int(hours / 24)}d"


def _get_hyperliquid_candles(symbol, total_minutes, interval_minutes):
    """Fetch Hyperliquid candles for the last *total_minutes* minutes.

    Returns a list of ``(ts_ms, open, high, low, close, volume)`` tuples.
    Rows that lack a field or hold non-numeric values are skipped.

    Raises:
        RuntimeError: if the HTTP request fails.
    """
    now_ms = int(time.time() * 1000)
    payload = {
        "type": "candleSnapshot",
        "req": {
            "coin": symbol.upper(),
            "interval": _interval_minutes_to_str(interval_minutes),
            "startTime": now_ms - int(total_minutes * 60 * 1000),
            "endTime": now_ms,
        },
    }
    data = _post_json(HYPERLIQUID_INFO_URL, payload)
    if not isinstance(data, list):
        # e.g. an error object or null instead of a candle array
        return []
    candles = []
    for row in data:
        try:
            candles.append((
                int(row["t"]),
                float(row["o"]),
                float(row["h"]),
                float(row["l"]),
                float(row["c"]),
                float(row.get("v") or 0),
            ))
        except (AttributeError, KeyError, TypeError, ValueError):
            continue
    return candles


def _find_fractals(ohlc_rows, window=10, max_fractals=3):
    """Find true swing highs and lows.

    Swing high: a candle whose high is strictly above the high of every other
    candle within *window* candles on both sides.
    Swing low: a candle whose low is strictly below the low of every other
    candle within *window* candles on both sides.

    At most *max_fractals* of the most extreme highs and of the most extreme
    lows are kept.  Returns a list of ``(index, type, price)`` sorted by index,
    where type is ``'down'`` for a swing high and ``'up'`` for a swing low
    (the direction of the marker arrow drawn on the chart).
    """
    if len(ohlc_rows) < window * 2 + 1:
        return []

    swing_highs = []
    swing_lows = []
    for i in range(window, len(ohlc_rows) - window):
        high = ohlc_rows[i][2]
        low = ohlc_rows[i][3]
        neighbours = [ohlc_rows[j] for j in range(i - window, i + window + 1) if j != i]
        if all(row[2] < high for row in neighbours):
            swing_highs.append((i, high))
        if all(row[3] > low for row in neighbours):
            swing_lows.append((i, low))

    # Keep the most extreme swings: highest highs and lowest lows first.
    swing_highs.sort(key=lambda item: -item[1])
    swing_lows.sort(key=lambda item: item[1])

    result = [(idx, 'down', price) for idx, price in swing_highs[:max_fractals]]
    result.extend((idx, 'up', price) for idx, price in swing_lows[:max_fractals])
    return sorted(result, key=lambda item: item[0])


def _search_token_id(symbol):
    """Resolve *symbol* to a CoinGecko id through the search endpoint.

    Among coins whose symbol matches case-insensitively, the one with the best
    (lowest) integer ``market_cap_rank`` wins.  Returns None when nothing
    matches.

    Raises:
        RuntimeError: if the HTTP request fails.
    """
    url = COINGECKO_SEARCH_URL.format(query=_quote(symbol))
    data = _cached(_cache_path("search", symbol.upper()), lambda: _fetch_json(url))
    coins = data.get("coins", []) if isinstance(data, dict) else []
    wanted = symbol.upper()
    matches = [
        coin for coin in coins
        if isinstance(coin, dict) and str(coin.get("symbol") or "").upper() == wanted
    ]
    if not matches:
        return None

    def _rank_key(coin):
        rank = coin.get("market_cap_rank")
        return rank if isinstance(rank, int) else 10**9

    return min(matches, key=_rank_key).get("id")


def _format_price(value):
    """Format a price: 2 decimals from 1 upward, 6 below, ``"n/a"`` for None."""
    if value is None:
        return "n/a"
    if value >= 1:
        return f"{value:.2f}"
    return f"{value:.6f}"


def _build_candles_from_prices(price_points, hours, candle_minutes):
    """Aggregate ``[timestamp_ms, price]`` points into OHLC candles.

    Only points within *hours* hours of the newest point are used.  Each point
    goes into the bucket of *candle_minutes* minutes that contains it.  Returns
    a list of ``(bucket_start_ms, open, high, low, close)`` tuples in time
    order.  The input list is not modified; rows that are not 2-item sequences
    are ignored.
    """
    points = sorted(
        (row for row in price_points if isinstance(row, (list, tuple)) and len(row) == 2),
        key=lambda row: row[0],
    )
    if not points:
        return []

    start_ts = points[-1][0] - (hours * 3600 * 1000)
    bucket_ms = candle_minutes * 60 * 1000

    candles = []
    current = None  # [bucket_start, open, high, low, close] of the open bucket
    for ts, price in points:
        if ts < start_ts:
            continue
        bucket_start = (int(ts) // bucket_ms) * bucket_ms
        if current is None or current[0] != bucket_start:
            if current is not None:
                candles.append(tuple(current))
            current = [bucket_start, price, price, price, price]
        else:
            current[2] = max(current[2], price)
            current[3] = min(current[3], price)
            current[4] = price
    if current is not None:
        candles.append(tuple(current))
    return candles


def _candles_from_ohlc_payload(payload):
    """Convert a CoinGecko OHLC payload into ``(ts, o, h, l, c)`` tuples.

    Anything that is not a list of rows with at least five items is ignored,
    so an error object in place of the array yields an empty list.
    """
    if not isinstance(payload, list):
        return []
    return [
        tuple(row[:5])
        for row in payload
        if isinstance(row, (list, tuple)) and len(row) >= 5
    ]


def _parse_duration(args):
    """Return ``(total_minutes, label)`` from the first duration-like argument.

    A duration is a number with an optional ``m``/``h``/``d`` suffix (hours when
    the suffix is missing).  The result is never below one minute.  When no
    argument matches, the default of DEFAULT_HOURS hours is returned.
    """
    for arg in args:
        match = re.match(r"^(\d+(?:\.\d+)?)([mhd])?$", arg.strip().lower())
        if not match:
            continue
        value = float(match.group(1))
        unit = match.group(2) or "h"
        total_minutes = max(1.0, value * _DURATION_UNIT_MINUTES[unit])
        shown = int(value) if value.is_integer() else value
        return total_minutes, f"{shown}{unit}"
    return float(DEFAULT_HOURS * 60), f"{DEFAULT_HOURS}h"


def _pick_candle_minutes(total_minutes):
    """Return the candle size (minutes) used for CoinGecko-derived candles."""
    if total_minutes <= 360:
        return 5
    if total_minutes <= 1440:
        return 15
    if total_minutes <= 4320:
        return 30
    return 60


def _timestamp_to_datetime(ts_value):
    """Convert a Unix timestamp to an aware UTC datetime.

    Values of 1e12 or more are treated as milliseconds, smaller ones as seconds.
    """
    ts = float(ts_value)
    if ts >= 1e12:
        ts = ts / 1000.0
    return datetime.fromtimestamp(ts, tz=timezone.utc)


def _gradient_colors(bottom_hex, top_hex, steps):
    """Return *steps* ``#rrggbb`` colours blending linearly from *bottom_hex*
    to *top_hex* (both given as ``#rrggbb``)."""
    bottom = [int(bottom_hex[i:i + 2], 16) for i in (1, 3, 5)]
    top = [int(top_hex[i:i + 2], 16) for i in (1, 3, 5)]
    shades = []
    for step in range(steps):
        t = step / (steps - 1) if steps > 1 else 0
        r, g, b = (int(lo + (hi - lo) * t) for lo, hi in zip(bottom, top))
        shades.append(f"#{r:02x}{g:02x}{b:02x}")
    return shades


def _build_chart(symbol, ohlc_rows, currency, label, use_gradient=False):
    """Render a candlestick chart to a PNG under /tmp and return its path.

    Args:
        symbol: text shown in the title and embedded (sanitised) in the file name.
        ohlc_rows: sequence of ``(ts, open, high, low, close[, volume])`` rows;
            a volume panel is drawn when the first row has six or more items.
        currency: label of the price axis (upper-cased).
        label: period text shown in the title.
        use_gradient: draw gradient-filled bodies instead of the default
            grey bodies with highlighted swing candles.

    Returns:
        The chart path, or None when there are no rows, matplotlib cannot be
        set up, or the image cannot be written.
    """
    if not ohlc_rows:
        return None

    try:
        import matplotlib
        matplotlib.use("Agg")
        import matplotlib.pyplot as plt
        import matplotlib.dates as mdates
        from matplotlib.lines import Line2D
        from matplotlib.patches import Rectangle
        import matplotlib.font_manager as fm

        # Load custom font from <parent of script dir>/fonts when present.
        # abspath keeps this correct when __file__ is a bare relative name.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        font_path = os.path.join(os.path.dirname(script_dir), 'fonts', 'Tomorrow.ttf')
        if os.path.exists(font_path):
            fm.fontManager.addfont(font_path)
            plt.rcParams['font.family'] = fm.FontProperties(fname=font_path).get_name()
    except Exception:
        # Charting is optional: any failure to set matplotlib up means "no chart".
        return None

    n_rows = len(ohlc_rows)
    has_volume = len(ohlc_rows[0]) >= 6

    if has_volume:
        fig, (ax, ax_vol) = plt.subplots(
            2, 1, figsize=(8, 9), facecolor="#121212",
            gridspec_kw={'height_ratios': [3, 1], 'hspace': 0.05},
        )
        ax_vol.set_facecolor("#121212")
    else:
        fig, ax = plt.subplots(figsize=(8, 8), facecolor="#121212")
        ax_vol = None
    ax.set_facecolor("#121212")

    times = [_timestamp_to_datetime(row[0]) for row in ohlc_rows]
    x_vals = mdates.date2num(times)

    # Candle width follows the smallest gap between neighbouring candles.
    gaps = [later - earlier for earlier, later in zip(x_vals, x_vals[1:]) if later > earlier]
    if gaps:
        delta = min(gaps)
        width = delta * 0.7
    else:
        delta = 0.02
        width = 0.02

    highs = [row[2] for row in ohlc_rows]
    lows = [row[3] for row in ohlc_rows]
    volumes = []
    colors = []

    fractals = _find_fractals(ohlc_rows)

    # First candle holding the absolute high / low of the period.
    abs_high_idx = max(range(n_rows), key=lambda i: highs[i])
    abs_low_idx = min(range(n_rows), key=lambda i: lows[i])

    # Default mode colours each swing candle and the two candles after it.
    bullish_reversal_indices = set()
    bearish_reversal_indices = set()
    if not use_gradient:
        for frac_idx, frac_type, _frac_price in fractals:
            target = bullish_reversal_indices if frac_type == 'up' else bearish_reversal_indices
            target.update(range(frac_idx, min(frac_idx + 3, n_rows)))
        # Always include the absolute high/low candles in the colouring.
        bullish_reversal_indices.update(range(abs_low_idx, min(abs_low_idx + 3, n_rows)))
        bearish_reversal_indices.update(range(abs_high_idx, min(abs_high_idx + 3, n_rows)))

    border_color = "#000000"
    n_segments = 10
    if use_gradient:
        wick_color = "#888888"
        # (top colour, per-segment colours from bottom to top)
        bull_top = "#84dc58"  # bright green
        bear_top = "#6c7ce4"  # blue
        bull_shades = _gradient_colors("#336d16", bull_top, n_segments)  # from dark green
        bear_shades = _gradient_colors("#544996", bear_top, n_segments)  # from purple
    else:
        wick_color = "#808080"
        up_normal = "#B0B0B0"
        down_normal = "#606060"
        up_reversal = "#00FFFF"
        down_reversal = "#FF00FF"

    for idx, row in enumerate(ohlc_rows):
        _ts, open_price, high_price, low_price, close_price = row[:5]
        if has_volume:
            volumes.append(row[5] if len(row) > 5 else 0)

        is_bullish = close_price >= open_price
        x = x_vals[idx]
        lower = min(open_price, close_price)
        # Keep a hairline body for candles whose open equals their close.
        height = max(abs(close_price - open_price), 1e-9)

        ax.add_line(Line2D([x, x], [low_price, high_price],
                           color=wick_color, linewidth=1.0, zorder=3))

        if use_gradient:
            colors.append(bull_top if is_bullish else bear_top)
            shades = bull_shades if is_bullish else bear_shades
            segment_height = height / n_segments
            for seg, seg_color in enumerate(shades):
                ax.add_patch(Rectangle((x - width / 2, lower + seg * segment_height),
                                       width, segment_height,
                                       facecolor=seg_color, edgecolor='none', zorder=4))
            ax.add_patch(Rectangle((x - width / 2, lower), width, height,
                                   facecolor='none', edgecolor=border_color,
                                   linewidth=0.5, zorder=5))
        else:
            # Bullish-reversal colouring wins when a candle is in both sets.
            if idx in bullish_reversal_indices:
                color = up_reversal
            elif idx in bearish_reversal_indices:
                color = down_reversal
            else:
                color = up_normal if is_bullish else down_normal
            colors.append(color)
            ax.add_patch(Rectangle((x - width / 2, lower), width, height,
                                   facecolor=color, edgecolor=border_color,
                                   linewidth=0.5, zorder=4))

    abs_high_price = highs[abs_high_idx]
    abs_low_price = lows[abs_low_idx]
    offset = (abs_high_price - abs_low_price) * 0.02

    # Swing markers with their price labels.
    frac_up_color = "#84dc58" if use_gradient else "#00FFFF"
    frac_down_color = "#6c7ce4" if use_gradient else "#FF00FF"
    for frac_idx, frac_type, frac_price in fractals:
        x = x_vals[frac_idx]
        if frac_type == 'down':  # swing high: arrow down above the high
            ax.plot(x, frac_price + offset * 0.5, marker='v', color=frac_down_color,
                    markersize=6, zorder=5)
            ax.annotate(_format_price(frac_price), xy=(x, frac_price + offset * 1.5),
                        fontsize=8, color='white', ha='center', va='bottom', zorder=6)
        else:  # swing low: arrow up below the low
            ax.plot(x, frac_price - offset * 0.5, marker='^', color=frac_up_color,
                    markersize=6, zorder=5)
            ax.annotate(_format_price(frac_price), xy=(x, frac_price - offset * 1.5),
                        fontsize=8, color='white', ha='center', va='top', zorder=6)

    # Always mark absolute high/low so at least one swing high/low is visible.
    xh = x_vals[abs_high_idx]
    xl = x_vals[abs_low_idx]
    ax.plot(xh, abs_high_price + offset * 0.5, marker='v', color="#FFD54F",  # gold
            markersize=7, zorder=6)
    ax.annotate(_format_price(abs_high_price), xy=(xh, abs_high_price + offset * 1.7),
                fontsize=8, color='white', ha='center', va='bottom', zorder=7)
    ax.plot(xl, abs_low_price - offset * 0.5, marker='^', color="#90CAF9",  # light blue
            markersize=7, zorder=6)
    ax.annotate(_format_price(abs_low_price), xy=(xl, abs_low_price - offset * 1.7),
                fontsize=8, color='white', ha='center', va='top', zorder=7)

    # Volume panel: bars reuse the colour of the matching candle.
    if has_volume and ax_vol is not None and volumes:
        for x, vol, color in zip(x_vals, volumes, colors):
            ax_vol.add_patch(Rectangle((x - width / 2, 0), width, vol,
                                       facecolor=color, edgecolor=color,
                                       alpha=0.7, zorder=3))
        ax_vol.set_xlim(min(x_vals) - delta, max(x_vals) + delta)
        vol_top = max(volumes) * 1.1
        ax_vol.set_ylim(0, vol_top if vol_top > 0 else 1)
        ax_vol.set_ylabel("Volume", color="#8b949e", fontsize=9)
        ax_vol.tick_params(axis="x", colors="#8b949e")
        ax_vol.tick_params(axis="y", colors="#8b949e")
        for spine in ax_vol.spines.values():
            spine.set_color("#2a2f38")
        ax_vol.grid(True, linestyle="-", linewidth=0.6, color="#1f2630", alpha=0.8, zorder=1)
        locator = mdates.AutoDateLocator(minticks=4, maxticks=8)
        ax_vol.xaxis.set_major_locator(locator)
        ax_vol.xaxis.set_major_formatter(mdates.ConciseDateFormatter(locator))
        ax.set_xticklabels([])  # Hide x labels on price chart

    ax.set_title(f"{symbol} last {label}", loc="center", fontsize=14, color="white",
                 fontweight="bold", pad=12)
    if not has_volume:
        ax.set_xlabel("Time (UTC)", color="#8b949e")
    ax.set_ylabel(currency.upper(), color="#8b949e")
    ax.tick_params(axis="x", colors="#8b949e")
    ax.tick_params(axis="y", colors="#8b949e")
    for spine in ax.spines.values():
        spine.set_color("#2a2f38")
    ax.grid(True, linestyle="-", linewidth=0.6, color="#1f2630", alpha=0.8, zorder=1)

    if len(x_vals) > 1:
        ax.set_xlim(min(x_vals) - delta, max(x_vals) + delta)
    # Vertical padding leaves room for the swing markers and labels.
    if abs_high_price > abs_low_price:
        pad = (abs_high_price - abs_low_price) * 0.08
    else:
        pad = abs_high_price * 0.01
    ax.set_ylim(abs_low_price - pad, abs_high_price + pad)

    if not has_volume:
        locator = mdates.AutoDateLocator(minticks=4, maxticks=8)
        ax.xaxis.set_major_locator(locator)
        ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(locator))
        ax.tick_params(axis="x", labelrotation=0)

    # Highlight the last close: dashed line plus a boxed label at the left edge.
    last_close = ohlc_rows[-1][4]
    ax.axhline(y=last_close, color='white', linestyle='--', linewidth=0.8, alpha=0.6)
    ax.annotate(_format_price(last_close), xy=(ax.get_xlim()[0], last_close),
                fontsize=9, color='white', fontweight='bold',
                ha='right', va='center',
                bbox=dict(boxstyle='round,pad=0.3', facecolor='#0f141c',
                          edgecolor='white', linewidth=1))

    # The symbol comes from the command line; sanitise it so inputs such as
    # "BTC/USDC" cannot point outside /tmp or at a missing directory.
    chart_path = f"/tmp/crypto_chart_{_safe_filename_part(symbol)}_{int(time.time())}.png"
    try:
        fig.tight_layout()
        fig.savefig(chart_path, dpi=150)
    except OSError:
        return None
    finally:
        plt.close(fig)
    return chart_path


def _normalize_hl_symbol(symbol):
    """Reduce a pair-like symbol to its upper-case base asset.

    ``"btc-usd"``, ``"BTC/USDC"`` and ``"BTCUSDT"`` all become ``"BTC"``.
    """
    sym = str(symbol or "").upper()
    # Strip common separators (e.g., BTC-USD, BTC/USDC)
    for sep in ("-", "/", "_"):
        if sep in sym:
            sym = sym.split(sep)[0]
            break
    # Strip common stablecoin suffixes (e.g., BTCUSDC); a bare "USD" is kept.
    for suf in ("USDC", "USDH", "USDE", "USD", "USDT"):
        if sym.endswith(suf) and len(sym) > len(suf):
            sym = sym[: -len(suf)]
            break
    return sym


def _hyperliquid_lookup(symbol):
    """Return ``(universe_entry, asset_ctx)`` for *symbol* on Hyperliquid.

    Returns ``(None, None)`` when the request fails, the response does not have
    the expected ``[meta, ctxs]`` shape, or the symbol is not listed.
    """
    try:
        meta, ctxs = _get_hyperliquid_meta()
    except (RuntimeError, TypeError, ValueError):
        # TypeError/ValueError: payload (possibly from a stale cache file) is
        # not a 2-item sequence.
        return None, None
    if not isinstance(meta, dict) or not isinstance(ctxs, list):
        return None, None

    universe = meta.get("universe", [])
    if not isinstance(universe, list):
        return None, None
    mapping = {}
    for idx, entry in enumerate(universe):
        name = str(entry.get("name", "")).upper() if isinstance(entry, dict) else ""
        if name:
            mapping[name] = idx

    idx = mapping.get(_normalize_hl_symbol(symbol))
    if idx is None or idx >= len(ctxs):
        return None, None
    return universe[idx], ctxs[idx]


def _coingecko_price(token_id, currencies):
    """Return ``(price, currency)`` for the first currency CoinGecko quotes.

    When none of *currencies* has a quote, price is None and currency is the
    last one tried.

    Raises:
        RuntimeError: if an HTTP request fails.
    """
    price = None
    currency = None
    for currency in currencies:
        payload = _get_price(token_id, currency)
        entry = payload.get(token_id, {}) if isinstance(payload, dict) else {}
        price = entry.get(currency) if isinstance(entry, dict) else None
        if price is not None:
            break
    return price, currency


def main():
    """Command-line entry point; prints one JSON object and returns 0."""
    if len(sys.argv) < 2:
        return _json_error("missing symbol", USAGE)

    raw_symbol = sys.argv[1].strip()
    if not raw_symbol:
        return _json_error("missing symbol", USAGE)

    options = sys.argv[2:]
    symbol_upper = raw_symbol.upper()
    token_id = TOKEN_ID_MAP.get(symbol_upper)
    if token_id is None:
        token_id = raw_symbol.lower()

    total_minutes, label = _parse_duration(options)
    use_gradient = any(
        arg.lower() in ('gradient', 'grad', '-g', '--gradient') for arg in options
    )
    hours = total_minutes / 60.0

    source = "coingecko"
    currency = "usdt"
    price_usdt = None

    # Hyperliquid first.
    hl_symbol = _normalize_hl_symbol(symbol_upper)
    _hl_meta, hl_ctx = _hyperliquid_lookup(hl_symbol)
    if hl_ctx:
        source = "hyperliquid"
        currency = "usd"
        try:
            price_usdt = float(hl_ctx.get("markPx") or hl_ctx.get("midPx"))
        except (AttributeError, TypeError, ValueError):
            price_usdt = None

    # CoinGecko fallback: current currency first, then USD.
    if price_usdt is None:
        try:
            price_usdt, currency = _coingecko_price(
                token_id, list(dict.fromkeys((currency, "usd")))
            )
        except RuntimeError as exc:
            return _json_error("price lookup failed", str(exc))

    # Last resort: the symbol was used verbatim as an id; search for the real one.
    if price_usdt is None and token_id == raw_symbol.lower():
        try:
            searched_id = _search_token_id(symbol_upper)
        except RuntimeError as exc:
            return _json_error("token search failed", str(exc))
        if searched_id:
            token_id = searched_id
            try:
                price_usdt, currency = _coingecko_price(token_id, ("usdt", "usd"))
            except RuntimeError as exc:
                return _json_error("price lookup failed", str(exc))

    if price_usdt is None:
        return _json_error("token not found", f"CoinGecko id: {token_id}")

    candles = []
    candle_minutes = _pick_candle_minutes(total_minutes)
    if source == "hyperliquid":
        candle_minutes = _pick_hyperliquid_interval_minutes(total_minutes)
        try:
            candles = _get_hyperliquid_candles(hl_symbol, total_minutes, candle_minutes)
        except RuntimeError:
            candles = []

    if not candles:
        days = min(365, max(1, int(math.ceil(total_minutes / 1440.0))))
        try:
            chart_payload = _get_market_chart(token_id, currency, days)
        except RuntimeError:
            chart_payload = None
        if isinstance(chart_payload, dict):
            price_points = chart_payload.get("prices") or []
            candles = _build_candles_from_prices(price_points, hours, candle_minutes)

    if not candles:
        candle_minutes = 30
        try:
            ohlc_payload = _get_ohlc(token_id, currency)
        except RuntimeError:
            ohlc_payload = []
        candles = _candles_from_ohlc_payload(ohlc_payload)

    candles.sort(key=lambda item: item[0])
    if candles:
        target = max(2, int((hours * 60) / candle_minutes))
        target = int(target * 0.8)  # 20% fewer candles for breathing room
        last_points = candles[-target:]
    else:
        last_points = []

    change_period = None
    change_period_percent = None
    if len(last_points) >= 2:
        price_period_ago = last_points[0][4]
        if price_period_ago:
            change_period = price_usdt - price_period_ago
            change_period_percent = (change_period / price_period_ago) * 100

    chart_path = _build_chart(symbol_upper, last_points, currency, label, use_gradient)

    if change_period_percent is None:
        change_text = f"{label} n/a"
    else:
        if change_period_percent >= 0:
            emoji = "⬆️"
            sign = "+"
        else:
            emoji = "🔻"
            sign = ""
        change_text = f"{emoji} {sign}{change_period_percent:.2f}% over {label}"

    text = f"{symbol_upper}: ${_format_price(price_usdt)} {currency.upper()} {change_text}"
    text = text.replace("*", "")

    result = {
        "symbol": symbol_upper,
        "token_id": token_id,
        "source": source,
        "currency": currency.upper(),
        "hours": hours,
        "duration_label": label,
        "candle_minutes": candle_minutes,
        "price": price_usdt,
        "price_usdt": price_usdt,
        "change_12h": change_period,
        "change_12h_percent": change_period_percent,
        "change_period": change_period,
        "change_period_percent": change_period_percent,
        "chart_path": chart_path,
        "text": text,
        "text_plain": text,
    }
    print(json.dumps(result, ensure_ascii=True))
    return 0


if __name__ == "__main__":
    sys.exit(main())