#!/usr/bin/env python3 """ A股板块监控系统 获取东方财富板块数据,监控异动,发送通知 """ import urllib.request import json import os import sys import subprocess from datetime import datetime from typing import List, Dict, Optional from pathlib import Path # 清除代理环境变量(解决代理问题) for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']: os.environ.pop(proxy_var, None) # 配置 SCRIPT_DIR = Path(__file__).parent DATA_DIR = SCRIPT_DIR / "data" DATA_DIR.mkdir(exist_ok=True) # 东方财富API配置 EASTMONEY_BASE_URL = "http://push2.eastmoney.com/api/qt/clist/get" # 板块类型 BOARD_TYPES = { "industry": "m:90+t:2", # 行业板块 "concept": "m:90+t:3", # 概念板块 } # 数据字段 FIELDS = "f12,f14,f2,f3,f62,f66,f84,f104,f125,f126,f127,f128" # f12: 板块代码 # f14: 板块名称 # f2: 最新价 # f3: 涨跌幅 # f62: 主力净流入 # f66: 主力净流入-陆股通 # f84: 领涨股代码 # f104: 领涨股名称 def get_board_data(board_type: str, sort_by: str = "f3", limit: int = 50) -> Optional[List[Dict]]: """ 获取板块数据 参数: board_type: 板块类型 (industry/concept) sort_by: 排序字段 (f3=涨跌幅, f66=主力资金) limit: 返回数量 返回: List[Dict]: 板块数据列表 """ fs = BOARD_TYPES.get(board_type) if not fs: print(f"❌ 未知的板块类型: {board_type}") return None url = f"{EASTMONEY_BASE_URL}?fid={sort_by}&po=1&pz={limit}&pn=1&np=1&fltt=2&invt=2&fs={fs}&fields={FIELDS}" try: req = urllib.request.Request(url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Referer': 'http://quote.eastmoney.com/' }) with urllib.request.urlopen(req, timeout=15) as resp: data = json.loads(resp.read().decode()) if data.get('data') and data['data'].get('diff'): items = data['data']['diff'] boards = [] for item in items: board = { 'code': item.get('f12', ''), 'name': item.get('f14', ''), 'price': item.get('f2', 0) / 100 if item.get('f2') else 0, 'pct_change': item.get('f3', 0) / 100 if item.get('f3') else 0, 'main_flow': item.get('f62', 0) / 1e8 if item.get('f62') else 0, # 亿元 'main_flow_lgt': item.get('f66', 0) / 1e8 if item.get('f66') else 0, # 陆股通流入 'leader_code': item.get('f84', ''), 'leader_name': item.get('f104', ''), } boards.append(board) return boards else: print(f"⚠️ API返回数据为空") return None except urllib.error.URLError as e: print(f"❌ 网络请求失败: {e}") return None except json.JSONDecodeError as e: print(f"❌ JSON解析失败: {e}") return None except Exception as e: print(f"❌ 获取数据异常: {e}") return None def check_anomaly(boards: List[Dict], pct_threshold: float = 3.0, flow_threshold: float = 10.0) -> Dict: """ 检查板块异动 参数: boards: 板块数据列表 pct_threshold: 涨跌幅阈值 (%) flow_threshold: 资金流入阈值 (亿元) 返回: Dict: 异动信息,包含涨跌异动和资金异动 """ anomaly = { 'pct_up': [], # 涨幅异动 'pct_down': [], # 跌幅异动 'flow_in': [], # 资金流入异动 'flow_out': [], # 资金流出异动 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S") } for board in boards: # 涨跌幅异动 if board['pct_change'] >= pct_threshold: anomaly['pct_up'].append(board) elif board['pct_change'] <= -pct_threshold: anomaly['pct_down'].append(board) # 资金流向异动 if board['main_flow'] >= flow_threshold: anomaly['flow_in'].append(board) elif board['main_flow'] <= -flow_threshold: anomaly['flow_out'].append(board) return anomaly def format_board_line(board: Dict) -> str: """格式化单行板块信息""" pct = board['pct_change'] flow = board['main_flow'] leader = board['leader_name'] or board['leader_code'] pct_str = f"+{pct:.2f}%" if pct > 0 else f"{pct:.2f}%" flow_str = f"+{flow:.2f}亿" if flow > 0 else f"{flow:.2f}亿" leader_str = f"领涨: {leader}" if leader else "" return f"{board['name']}: {pct_str}, 主力{flow_str} {leader_str}" def print_board_summary(boards: List[Dict], title: str, limit: int = 10): """打印板块摘要""" if not boards: return print(f"\n{title}") print("=" * 50) for board in boards[:limit]: print(format_board_line(board)) def print_anomaly_report(anomaly: Dict): """打印异动报告""" print(f"\n📊 板块异动报告 [{anomaly['timestamp']}]") print("=" * 60) has_anomaly = False if anomaly['pct_up']: has_anomaly = True print(f"\n🔴 涨幅异动 (涨幅 ≥ 3%)") for board in anomaly['pct_up']: print(f" {format_board_line(board)}") if anomaly['pct_down']: has_anomaly = True print(f"\n🟢 跌幅异动 (跌幅 ≥ 3%)") for board in anomaly['pct_down']: print(f" {format_board_line(board)}") if anomaly['flow_in']: has_anomaly = True print(f"\n💰 资金大幅流入 (≥ 10亿)") for board in anomaly['flow_in']: print(f" {format_board_line(board)}") if anomaly['flow_out']: has_anomaly = True print(f"\n💸 资金大幅流出 (≥ 10亿)") for board in anomaly['flow_out']: print(f" {format_board_line(board)}") if not has_anomaly: print("\n✅ 今日无明显异动") return has_anomaly def generate_html_report(anomaly: Dict, board_type: str) -> str: """生成HTML格式报告""" lines = [ "

📊 A股板块异动报告

", f"

检测时间: {anomaly['timestamp']}

", f"

板块类型: {board_type}

", ] if anomaly['pct_up']: lines.append("

🔴 涨幅异动 (涨幅 ≥ 3%)

") lines.append("") if anomaly['pct_down']: lines.append("

🟢 跌幅异动 (跌幅 ≥ 3%)

") lines.append("") if anomaly['flow_in']: lines.append("

💰 资金大幅流入 (≥ 10亿)

") lines.append("") if anomaly['flow_out']: lines.append("

💸 资金大幅流出 (≥ 10亿)

") lines.append("") if not (anomaly['pct_up'] or anomaly['pct_down'] or anomaly['flow_in'] or anomaly['flow_out']): lines.append("

✅ 今日无明显异动

") return "\n".join(lines) def send_notification(subject: str, html_body: str, to_email: str = "zuitoushang@tphai.com"): """发送邮件通知""" # 使用邮件发送技能 email_script = Path(__file__).parent.parent.parent / "skills/email/scripts/send_email.py" cmd = [ "python3", str(email_script), "send", "--to", to_email, "--subject", subject, "--body", html_body, "--html" ] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode == 0: print(f"✅ 邮件发送成功: {to_email}") return True else: print(f"❌ 邮件发送失败: {result.stderr}") return False except Exception as e: print(f"❌ 邮件发送异常: {e}") return False def monitor(board_types: List[str] = ["industry", "concept"], notify: bool = True, verbose: bool = False) -> Dict: """ 执行板块监控 参数: board_types: 要监控的板块类型列表 notify: 是否发送通知(仅在发现异动时) verbose: 显示详细日志 返回: Dict: 监控结果汇总 """ import subprocess results = { 'boards': {}, 'anomalies': {}, 'has_anomaly': False } for board_type in board_types: if verbose: print(f"\n📡 获取 {board_type} 板块数据...") # 获取板块数据(按涨跌幅排序) boards = get_board_data(board_type, sort_by="f3", limit=100) if boards: results['boards'][board_type] = boards # 检查异动 anomaly = check_anomaly(boards) results['anomalies'][board_type] = anomaly if anomaly['pct_up'] or anomaly['pct_down'] or anomaly['flow_in'] or anomaly['flow_out']: results['has_anomaly'] = True if verbose: # 打印TOP10涨跌 sorted_by_pct = sorted(boards, key=lambda x: x['pct_change'], reverse=True) print_board_summary(sorted_by_pct[:10], f"涨幅TOP10 ({board_type})") print_board_summary(sorted_by_pct[-10:], f"跌幅TOP10 ({board_type})") # 打印异动报告 print_anomaly_report(anomaly) else: print(f"❌ 获取 {board_type} 数据失败") # 发送通知(仅在发现异动时) if notify and results['has_anomaly']: subject = "【板块异动警报】检测到板块异动" # 合并所有异动 combined_anomaly = { 'pct_up': [], 'pct_down': [], 'flow_in': [], 'flow_out': [], 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S") } for anomaly in results['anomalies'].values(): combined_anomaly['pct_up'].extend(anomaly['pct_up']) combined_anomaly['pct_down'].extend(anomaly['pct_down']) combined_anomaly['flow_in'].extend(anomaly['flow_in']) combined_anomaly['flow_out'].extend(anomaly['flow_out']) # 去重(按板块名称) for key in ['pct_up', 'pct_down', 'flow_in', 'flow_out']: seen = set() unique = [] for board in combined_anomaly[key]: if board['name'] not in seen: seen.add(board['name']) unique.append(board) combined_anomaly[key] = unique html_body = generate_html_report(combined_anomaly, "行业+概念板块") send_notification(subject, html_body) return results def main(): """命令行入口""" import argparse parser = argparse.ArgumentParser(description="A股板块监控系统") subparsers = parser.add_subparsers(dest="command", help="可用命令") # 获取数据命令 get_parser = subparsers.add_parser("get", help="获取板块数据") get_parser.add_argument("type", choices=["industry", "concept"], help="板块类型") get_parser.add_argument("--sort", choices=["pct", "flow"], default="pct", help="排序方式") get_parser.add_argument("--limit", type=int, default=20, help="返回数量") # 监控命令 monitor_parser = subparsers.add_parser("monitor", help="执行监控检查") monitor_parser.add_argument("--types", nargs="+", default=["industry", "concept"], help="板块类型") monitor_parser.add_argument("--no-notify", action="store_true", help="不发送通知") monitor_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志") # 测试命令 subparsers.add_parser("test", help="测试API连接") args = parser.parse_args() if args.command == "get": sort_by = "f3" if args.sort == "pct" else "f66" boards = get_board_data(args.type, sort_by=sort_by, limit=args.limit) if boards: print(f"\n📊 {args.type} 板块数据 ({len(boards)} 条)") print("=" * 50) for board in boards: print(format_board_line(board)) else: print("❌ 获取数据失败") elif args.command == "monitor": monitor( board_types=args.types, notify=not args.no_notify, verbose=args.verbose ) elif args.command == "test": print("\n🧪 测试东方财富API连接...") for board_type in ["industry", "concept"]: print(f"\n测试 {board_type} 板块...") boards = get_board_data(board_type, limit=5) if boards: print(f"✅ 成功获取 {len(boards)} 条数据") for board in boards[:3]: print(f" - {board['name']}: {board['pct_change']:+.2f}%") else: print(f"❌ {board_type} 测试失败") else: parser.print_help() if __name__ == "__main__": main()