#!/usr/bin/env python3 """ A股板块盘后分析系统 获取东方财富板块数据,生成分析报告,发送邮件通知 """ import urllib.request import json import os import sys import subprocess from datetime import datetime from typing import List, Dict, Optional from pathlib import Path # 清除代理环境变量(解决代理问题) for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']: os.environ.pop(proxy_var, None) # 配置 SCRIPT_DIR = Path(__file__).parent DATA_DIR = SCRIPT_DIR / "data" DATA_DIR.mkdir(exist_ok=True) # 东方财富API配置 EASTMONEY_BASE_URL = "http://push2.eastmoney.com/api/qt/clist/get" # 板块类型 BOARD_TYPES = { "industry": "m:90+t:2", # 行业板块 "concept": "m:90+t:3", # 概念板块 } # 数据字段 FIELDS = "f12,f14,f2,f3,f62,f66,f84,f104,f125,f126,f127,f128" def get_board_data(board_type: str, sort_by: str = "f3", limit: int = 100) -> Optional[List[Dict]]: """ 获取板块数据 参数: board_type: 板块类型 (industry/concept) sort_by: 排序字段 (f3=涨跌幅, f62=主力资金) limit: 返回数量 返回: List[Dict]: 板块数据列表 """ fs = BOARD_TYPES.get(board_type) if not fs: print(f"❌ 未知的板块类型: {board_type}") return None url = f"{EASTMONEY_BASE_URL}?fid={sort_by}&po=1&pz={limit}&pn=1&np=1&fltt=2&invt=2&fs={fs}&fields={FIELDS}" try: req = urllib.request.Request(url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Referer': 'http://quote.eastmoney.com/' }) with urllib.request.urlopen(req, timeout=15) as resp: data = json.loads(resp.read().decode()) if data.get('data') and data['data'].get('diff'): items = data['data']['diff'] boards = [] for item in items: board = { 'code': item.get('f12', ''), 'name': item.get('f14', ''), 'price': item.get('f2', 0) / 100 if item.get('f2') else 0, 'pct_change': item.get('f3', 0) / 100 if item.get('f3') else 0, 'main_flow': item.get('f62', 0) / 1e8 if item.get('f62') else 0, # 亿元 'leader_code': item.get('f84', ''), 'leader_name': item.get('f104', ''), } boards.append(board) return boards else: print(f"⚠️ API返回数据为空") return None except urllib.error.URLError as e: print(f"❌ 网络请求失败: {e}") return None except Exception as e: print(f"❌ 获取数据异常: {e}") return None def generate_daily_report(boards_data: Dict, to_email: str = "wlq@tphai.com") -> bool: """ 生成盘后分析报告并发送邮件 参数: boards_data: 板块数据字典 {'industry': [], 'concept': []} to_email: 收件人邮箱 返回: bool: 是否发送成功 """ all_industry = boards_data.get('industry', []) all_concept = boards_data.get('concept', []) if not all_industry and not all_concept: print("❌ 无数据,无法生成报告") return False # 分析总结 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 计算市场趋势 avg_pct = 0 if all_industry: avg_pct = sum(b['pct_change'] for b in all_industry) / len(all_industry) market_trend = '平稳' if avg_pct > 0.5: market_trend = '上涨' elif avg_pct < -0.5: market_trend = '下跌' # 排序数据 industry_by_pct = sorted(all_industry, key=lambda x: x['pct_change'], reverse=True) industry_by_flow = sorted(all_industry, key=lambda x: x['main_flow'], reverse=True) concept_by_pct = sorted(all_concept, key=lambda x: x['pct_change'], reverse=True) concept_by_flow = sorted(all_concept, key=lambda x: x['main_flow'], reverse=True) # 生成HTML正文(分析总结) html_lines = [ "

📊 A股板块盘后分析报告

", f"

报告时间: {timestamp}

", f"

市场整体: {market_trend} (行业平均涨跌 {avg_pct:+.2f}%)

", "", "
", "", "

🔥 今日热门概念板块 TOP5

", "", "", ] for board in concept_by_pct[:5]: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿" html_lines.append(f"") html_lines.append("
板块涨跌幅主力资金
{board['name']}{pct_str}{flow_str}
") html_lines.append("") html_lines.append("

📈 行业板块涨幅 TOP5

") html_lines.append("") html_lines.append("") for board in industry_by_pct[:5]: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿" html_lines.append(f"") html_lines.append("
板块涨跌幅主力资金领涨股
{board['name']}{pct_str}{flow_str}{board['leader_name'] or '-'}
") html_lines.append("") html_lines.append("

📉 行业板块跌幅 TOP5

") html_lines.append("") html_lines.append("") for board in industry_by_pct[-5:]: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿" html_lines.append(f"") html_lines.append("
板块涨跌幅主力资金
{board['name']}{pct_str}{flow_str}
") html_lines.append("") html_lines.append("

💰 主力资金大幅流入 TOP10

") html_lines.append("") html_lines.append("") inflow_boards = [b for b in industry_by_flow if b['main_flow'] > 10][:10] for board in inflow_boards: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" html_lines.append(f"") html_lines.append("
板块资金流入(亿)涨跌幅
{board['name']}+{board['main_flow']:.2f}{pct_str}
") html_lines.append("") html_lines.append("

💸 主力资金大幅流出 TOP10

") html_lines.append("") html_lines.append("") outflow_boards = [b for b in industry_by_flow if b['main_flow'] < -10][:10] for board in outflow_boards: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" html_lines.append(f"") html_lines.append("
板块资金流出(亿)涨跌幅
{board['name']}{board['main_flow']:.2f}{pct_str}
") html_lines.append("") html_lines.append("
") html_lines.append("

📊 详细数据请查看附件 CSV 文件

") html_body = "\n".join(html_lines) # 生成附件文件(CSV格式) attachment_file = DATA_DIR / f"board_detail_{datetime.now().strftime('%Y%m%d')}.csv" csv_lines = [ "# A股板块详细数据", f"# 生成时间: {timestamp}", "", "=== 行业板块涨跌幅排行 ===", "板块名称,涨跌幅(%),主力资金(亿),领涨股", ] for board in industry_by_pct: csv_lines.append(f"{board['name']},{board['pct_change']:.2f},{board['main_flow']:.2f},{board['leader_name'] or ''}") csv_lines.append("") csv_lines.append("=== 行业板块资金流向排行 ===") csv_lines.append("板块名称,主力资金(亿),涨跌幅(%),领涨股") for board in industry_by_flow: csv_lines.append(f"{board['name']},{board['main_flow']:.2f},{board['pct_change']:.2f},{board['leader_name'] or ''}") csv_lines.append("") csv_lines.append("=== 概念板块涨跌幅排行 ===") csv_lines.append("板块名称,涨跌幅(%),主力资金(亿),领涨股") for board in concept_by_pct: csv_lines.append(f"{board['name']},{board['pct_change']:.2f},{board['main_flow']:.2f},{board['leader_name'] or ''}") csv_lines.append("") csv_lines.append("=== 概念板块资金流向排行 ===") csv_lines.append("板块名称,主力资金(亿),涨跌幅(%),领涨股") for board in concept_by_flow: csv_lines.append(f"{board['name']},{board['main_flow']:.2f},{board['pct_change']:.2f},{board['leader_name'] or ''}") attachment_file.write_text("\n".join(csv_lines), encoding='utf-8') # 发送邮件 email_script = SCRIPT_DIR.parent.parent / "skills/email/scripts/send_email.py" subject = f"【A股板块盘后分析】{datetime.now().strftime('%Y-%m-%d')}" cmd = [ "python3", str(email_script), "send", "--to", to_email, "--subject", subject, "--body", html_body, "--html", "--attach", str(attachment_file) ] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode == 0: print(f"✅ 报告发送成功: {to_email}") print(f" 附件: {attachment_file}") return True else: print(f"❌ 发送失败: {result.stderr}") return False except Exception as e: print(f"❌ 发送异常: {e}") return False def run_daily_report(to_email: str = "wlq@tphai.com", verbose: bool = False) -> bool: """ 执行盘后报告生成和发送 参数: to_email: 收件人邮箱 verbose: 是否显示详细日志 返回: bool: 是否成功 """ if verbose: print(f"\n📊 A股板块盘后分析") print("=" * 50) print(f"收件人: {to_email}") print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # 获取所有板块数据 boards_data = {} for board_type in ["industry", "concept"]: if verbose: print(f"\n📡 获取 {board_type} 板块数据...") boards = get_board_data(board_type, limit=100) if boards: boards_data[board_type] = boards if verbose: print(f"✅ 成功获取 {len(boards)} 条数据") else: print(f"❌ 获取 {board_type} 数据失败") boards_data[board_type] = [] # 生成并发送报告 if boards_data.get('industry') or boards_data.get('concept'): return generate_daily_report(boards_data, to_email) else: print("❌ 所有数据获取失败,无法生成报告") return False def main(): """命令行入口""" import argparse parser = argparse.ArgumentParser(description="A股板块盘后分析系统") subparsers = parser.add_subparsers(dest="command", help="可用命令") # 测试命令 subparsers.add_parser("test", help="测试API连接") # 获取数据命令 get_parser = subparsers.add_parser("get", help="获取板块数据") get_parser.add_argument("type", choices=["industry", "concept"], help="板块类型") get_parser.add_argument("--limit", type=int, default=20, help="返回数量") # 发送报告命令 report_parser = subparsers.add_parser("report", help="生成并发送盘后报告") report_parser.add_argument("--to", default="wlq@tphai.com", help="收件人邮箱") report_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志") args = parser.parse_args() if args.command == "test": print("\n🧪 测试东方财富API连接...") for board_type in ["industry", "concept"]: print(f"\n测试 {board_type} 板块...") boards = get_board_data(board_type, limit=5) if boards: print(f"✅ 成功获取 {len(boards)} 条数据") for board in boards[:3]: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" print(f" - {board['name']}: {pct_str}") else: print(f"❌ {board_type} 测试失败") elif args.command == "get": boards = get_board_data(args.type, limit=args.limit) if boards: print(f"\n📊 {args.type} 板块数据 ({len(boards)} 条)") print("=" * 50) for board in boards: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿" print(f"{board['name']}: {pct_str}, 主力{flow_str}") else: print("❌ 获取数据失败") elif args.command == "report": run_daily_report(to_email=args.to, verbose=args.verbose) else: parser.print_help() if __name__ == "__main__": main()