commit 947c75cd78a343c1a2bc43d84100804f788bb6f5 Author: hubian <908234780@qq.com> Date: Fri Apr 10 16:57:31 2026 +0800 feat: A股板块监控系统 - 东方财富API数据获取、异动检测、邮件通知 diff --git a/README.md b/README.md new file mode 100644 index 0000000..ed96ddb --- /dev/null +++ b/README.md @@ -0,0 +1,76 @@ +# A股板块监控系统 + +自动获取东方财富板块数据,监控异动并发送邮件通知。 + +## 功能特点 + +- 获取行业板块涨跌幅排行 +- 获取概念板块涨跌幅排行 +- 监控主力资金流入/流出 +- 检测板块异动(涨跌幅≥3%、资金流入≥10亿) +- 自动发送HTML格式邮件通知 + +## 数据来源 + +东方财富HTTP API (http://push2.eastmoney.com) + +## 使用方法 + +### 测试API连接 + +```bash +python3 board_monitor.py test +``` + +### 获取板块数据 + +```bash +# 获取行业板块涨跌幅TOP20 +python3 board_monitor.py get industry --sort pct --limit 20 + +# 获取概念板块资金流入TOP20 +python3 board_monitor.py get concept --sort flow --limit 20 +``` + +### 执行监控检查 + +```bash +# 监控并发送通知(发现异动时) +python3 board_monitor.py monitor -v + +# 监控但不发送通知 +python3 board_monitor.py monitor --no-notify +``` + +## 定时任务配置 + +```bash +# 添加到crontab +crontab -e + +# 盘中每小时检查(9:30-15:00) +30-59 9 * * 1-5 /usr/bin/python3 /home/xian/.openclaw/workspace-coder/works/board-monitor/board_monitor.py monitor +0-15 10-14 * * 1-5 /usr/bin/python3 /home/xian/.openclaw/workspace-coder/works/board-monitor/board_monitor.py monitor +0-0 15 * * 1-5 /usr/bin/python3 /home/xian/.openclaw/workspace-coder/works/board-monitor/board_monitor.py monitor +``` + +## 异动检测阈值 + +| 类型 | 阈值 | +|------|------| +| 涨幅异动 | ≥ 3% | +| 跌幅异动 | ≤ -3% | +| 资金流入 | ≥ 10亿 | +| 资金流出 | ≤ -10亿 | + +可在 `board_monitor.py` 中修改 `check_anomaly()` 函数的参数调整阈值。 + +## 通知邮箱 + +默认发送到: zuitoushang@tphai.com + +可在 `send_notification()` 函数中修改。 + +## 版本历史 + +- v1.0.0 (2026-04-10) - 初始版本 \ No newline at end of file diff --git a/board_monitor.py b/board_monitor.py new file mode 100644 index 0000000..08a2a8e --- /dev/null +++ b/board_monitor.py @@ -0,0 +1,418 @@ +#!/usr/bin/env python3 +""" +A股板块监控系统 +获取东方财富板块数据,监控异动,发送通知 +""" + +import urllib.request +import json +import os +import sys +import subprocess +from datetime import datetime +from typing import List, Dict, Optional +from pathlib import Path + +# 清除代理环境变量(解决代理问题) +for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']: + os.environ.pop(proxy_var, None) + +# 配置 +SCRIPT_DIR = Path(__file__).parent +DATA_DIR = SCRIPT_DIR / "data" +DATA_DIR.mkdir(exist_ok=True) + +# 东方财富API配置 +EASTMONEY_BASE_URL = "http://push2.eastmoney.com/api/qt/clist/get" + +# 板块类型 +BOARD_TYPES = { + "industry": "m:90+t:2", # 行业板块 + "concept": "m:90+t:3", # 概念板块 +} + +# 数据字段 +FIELDS = "f12,f14,f2,f3,f62,f66,f84,f104,f125,f126,f127,f128" +# f12: 板块代码 +# f14: 板块名称 +# f2: 最新价 +# f3: 涨跌幅 +# f62: 主力净流入 +# f66: 主力净流入-陆股通 +# f84: 领涨股代码 +# f104: 领涨股名称 + + +def get_board_data(board_type: str, sort_by: str = "f3", limit: int = 50) -> Optional[List[Dict]]: + """ + 获取板块数据 + + 参数: + board_type: 板块类型 (industry/concept) + sort_by: 排序字段 (f3=涨跌幅, f66=主力资金) + limit: 返回数量 + + 返回: + List[Dict]: 板块数据列表 + """ + fs = BOARD_TYPES.get(board_type) + if not fs: + print(f"❌ 未知的板块类型: {board_type}") + return None + + url = f"{EASTMONEY_BASE_URL}?fid={sort_by}&po=1&pz={limit}&pn=1&np=1&fltt=2&invt=2&fs={fs}&fields={FIELDS}" + + try: + req = urllib.request.Request(url, headers={ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', + 'Referer': 'http://quote.eastmoney.com/' + }) + + with urllib.request.urlopen(req, timeout=15) as resp: + data = json.loads(resp.read().decode()) + + if data.get('data') and data['data'].get('diff'): + items = data['data']['diff'] + boards = [] + + for item in items: + board = { + 'code': item.get('f12', ''), + 'name': item.get('f14', ''), + 'price': item.get('f2', 0) / 100 if item.get('f2') else 0, + 'pct_change': item.get('f3', 0) / 100 if item.get('f3') else 0, + 'main_flow': item.get('f62', 0) / 1e8 if item.get('f62') else 0, # 亿元 + 'main_flow_lgt': item.get('f66', 0) / 1e8 if item.get('f66') else 0, # 陆股通流入 + 'leader_code': item.get('f84', ''), + 'leader_name': item.get('f104', ''), + } + boards.append(board) + + return boards + else: + print(f"⚠️ API返回数据为空") + return None + + except urllib.error.URLError as e: + print(f"❌ 网络请求失败: {e}") + return None + except json.JSONDecodeError as e: + print(f"❌ JSON解析失败: {e}") + return None + except Exception as e: + print(f"❌ 获取数据异常: {e}") + return None + + +def check_anomaly(boards: List[Dict], pct_threshold: float = 3.0, flow_threshold: float = 10.0) -> Dict: + """ + 检查板块异动 + + 参数: + boards: 板块数据列表 + pct_threshold: 涨跌幅阈值 (%) + flow_threshold: 资金流入阈值 (亿元) + + 返回: + Dict: 异动信息,包含涨跌异动和资金异动 + """ + anomaly = { + 'pct_up': [], # 涨幅异动 + 'pct_down': [], # 跌幅异动 + 'flow_in': [], # 资金流入异动 + 'flow_out': [], # 资金流出异动 + 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + for board in boards: + # 涨跌幅异动 + if board['pct_change'] >= pct_threshold: + anomaly['pct_up'].append(board) + elif board['pct_change'] <= -pct_threshold: + anomaly['pct_down'].append(board) + + # 资金流向异动 + if board['main_flow'] >= flow_threshold: + anomaly['flow_in'].append(board) + elif board['main_flow'] <= -flow_threshold: + anomaly['flow_out'].append(board) + + return anomaly + + +def format_board_line(board: Dict) -> str: + """格式化单行板块信息""" + pct = board['pct_change'] + flow = board['main_flow'] + leader = board['leader_name'] or board['leader_code'] + + pct_str = f"+{pct:.2f}%" if pct > 0 else f"{pct:.2f}%" + flow_str = f"+{flow:.2f}亿" if flow > 0 else f"{flow:.2f}亿" + leader_str = f"领涨: {leader}" if leader else "" + + return f"{board['name']}: {pct_str}, 主力{flow_str} {leader_str}" + + +def print_board_summary(boards: List[Dict], title: str, limit: int = 10): + """打印板块摘要""" + if not boards: + return + + print(f"\n{title}") + print("=" * 50) + for board in boards[:limit]: + print(format_board_line(board)) + + +def print_anomaly_report(anomaly: Dict): + """打印异动报告""" + print(f"\n📊 板块异动报告 [{anomaly['timestamp']}]") + print("=" * 60) + + has_anomaly = False + + if anomaly['pct_up']: + has_anomaly = True + print(f"\n🔴 涨幅异动 (涨幅 ≥ 3%)") + for board in anomaly['pct_up']: + print(f" {format_board_line(board)}") + + if anomaly['pct_down']: + has_anomaly = True + print(f"\n🟢 跌幅异动 (跌幅 ≥ 3%)") + for board in anomaly['pct_down']: + print(f" {format_board_line(board)}") + + if anomaly['flow_in']: + has_anomaly = True + print(f"\n💰 资金大幅流入 (≥ 10亿)") + for board in anomaly['flow_in']: + print(f" {format_board_line(board)}") + + if anomaly['flow_out']: + has_anomaly = True + print(f"\n💸 资金大幅流出 (≥ 10亿)") + for board in anomaly['flow_out']: + print(f" {format_board_line(board)}") + + if not has_anomaly: + print("\n✅ 今日无明显异动") + + return has_anomaly + + +def generate_html_report(anomaly: Dict, board_type: str) -> str: + """生成HTML格式报告""" + lines = [ + "

📊 A股板块异动报告

", + f"

检测时间: {anomaly['timestamp']}

", + f"

板块类型: {board_type}

", + ] + + if anomaly['pct_up']: + lines.append("

🔴 涨幅异动 (涨幅 ≥ 3%)

") + lines.append("") + + if anomaly['pct_down']: + lines.append("

🟢 跌幅异动 (跌幅 ≥ 3%)

") + lines.append("") + + if anomaly['flow_in']: + lines.append("

💰 资金大幅流入 (≥ 10亿)

") + lines.append("") + + if anomaly['flow_out']: + lines.append("

💸 资金大幅流出 (≥ 10亿)

") + lines.append("") + + if not (anomaly['pct_up'] or anomaly['pct_down'] or anomaly['flow_in'] or anomaly['flow_out']): + lines.append("

✅ 今日无明显异动

") + + return "\n".join(lines) + + +def send_notification(subject: str, html_body: str, to_email: str = "zuitoushang@tphai.com"): + """发送邮件通知""" + # 使用邮件发送技能 + email_script = Path(__file__).parent.parent.parent / "skills/email/scripts/send_email.py" + + cmd = [ + "python3", str(email_script), + "send", + "--to", to_email, + "--subject", subject, + "--body", html_body, + "--html" + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + if result.returncode == 0: + print(f"✅ 邮件发送成功: {to_email}") + return True + else: + print(f"❌ 邮件发送失败: {result.stderr}") + return False + except Exception as e: + print(f"❌ 邮件发送异常: {e}") + return False + + +def monitor(board_types: List[str] = ["industry", "concept"], + notify: bool = True, + verbose: bool = False) -> Dict: + """ + 执行板块监控 + + 参数: + board_types: 要监控的板块类型列表 + notify: 是否发送通知(仅在发现异动时) + verbose: 显示详细日志 + + 返回: + Dict: 监控结果汇总 + """ + import subprocess + + results = { + 'boards': {}, + 'anomalies': {}, + 'has_anomaly': False + } + + for board_type in board_types: + if verbose: + print(f"\n📡 获取 {board_type} 板块数据...") + + # 获取板块数据(按涨跌幅排序) + boards = get_board_data(board_type, sort_by="f3", limit=100) + + if boards: + results['boards'][board_type] = boards + + # 检查异动 + anomaly = check_anomaly(boards) + results['anomalies'][board_type] = anomaly + + if anomaly['pct_up'] or anomaly['pct_down'] or anomaly['flow_in'] or anomaly['flow_out']: + results['has_anomaly'] = True + + if verbose: + # 打印TOP10涨跌 + sorted_by_pct = sorted(boards, key=lambda x: x['pct_change'], reverse=True) + print_board_summary(sorted_by_pct[:10], f"涨幅TOP10 ({board_type})") + print_board_summary(sorted_by_pct[-10:], f"跌幅TOP10 ({board_type})") + + # 打印异动报告 + print_anomaly_report(anomaly) + else: + print(f"❌ 获取 {board_type} 数据失败") + + # 发送通知(仅在发现异动时) + if notify and results['has_anomaly']: + subject = "【板块异动警报】检测到板块异动" + + # 合并所有异动 + combined_anomaly = { + 'pct_up': [], + 'pct_down': [], + 'flow_in': [], + 'flow_out': [], + 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + for anomaly in results['anomalies'].values(): + combined_anomaly['pct_up'].extend(anomaly['pct_up']) + combined_anomaly['pct_down'].extend(anomaly['pct_down']) + combined_anomaly['flow_in'].extend(anomaly['flow_in']) + combined_anomaly['flow_out'].extend(anomaly['flow_out']) + + # 去重(按板块名称) + for key in ['pct_up', 'pct_down', 'flow_in', 'flow_out']: + seen = set() + unique = [] + for board in combined_anomaly[key]: + if board['name'] not in seen: + seen.add(board['name']) + unique.append(board) + combined_anomaly[key] = unique + + html_body = generate_html_report(combined_anomaly, "行业+概念板块") + send_notification(subject, html_body) + + return results + + +def main(): + """命令行入口""" + import argparse + + parser = argparse.ArgumentParser(description="A股板块监控系统") + subparsers = parser.add_subparsers(dest="command", help="可用命令") + + # 获取数据命令 + get_parser = subparsers.add_parser("get", help="获取板块数据") + get_parser.add_argument("type", choices=["industry", "concept"], help="板块类型") + get_parser.add_argument("--sort", choices=["pct", "flow"], default="pct", help="排序方式") + get_parser.add_argument("--limit", type=int, default=20, help="返回数量") + + # 监控命令 + monitor_parser = subparsers.add_parser("monitor", help="执行监控检查") + monitor_parser.add_argument("--types", nargs="+", default=["industry", "concept"], help="板块类型") + monitor_parser.add_argument("--no-notify", action="store_true", help="不发送通知") + monitor_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志") + + # 测试命令 + subparsers.add_parser("test", help="测试API连接") + + args = parser.parse_args() + + if args.command == "get": + sort_by = "f3" if args.sort == "pct" else "f66" + boards = get_board_data(args.type, sort_by=sort_by, limit=args.limit) + + if boards: + print(f"\n📊 {args.type} 板块数据 ({len(boards)} 条)") + print("=" * 50) + for board in boards: + print(format_board_line(board)) + else: + print("❌ 获取数据失败") + + elif args.command == "monitor": + monitor( + board_types=args.types, + notify=not args.no_notify, + verbose=args.verbose + ) + + elif args.command == "test": + print("\n🧪 测试东方财富API连接...") + for board_type in ["industry", "concept"]: + print(f"\n测试 {board_type} 板块...") + boards = get_board_data(board_type, limit=5) + if boards: + print(f"✅ 成功获取 {len(boards)} 条数据") + for board in boards[:3]: + print(f" - {board['name']}: {board['pct_change']:+.2f}%") + else: + print(f"❌ {board_type} 测试失败") + + else: + parser.print_help() + + +if __name__ == "__main__": + main() \ No newline at end of file