#!/usr/bin/env python3 """ A股板块盘后分析系统 获取东方财富板块数据,生成分析报告,发送邮件通知 """ import urllib.request import json import os import sys import subprocess from datetime import datetime from typing import List, Dict, Optional from pathlib import Path # 清除代理环境变量(解决代理问题) for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']: os.environ.pop(proxy_var, None) # 配置 SCRIPT_DIR = Path(__file__).parent DATA_DIR = SCRIPT_DIR / "data" DATA_DIR.mkdir(exist_ok=True) # 东方财富API配置 EASTMONEY_BASE_URL = "http://push2.eastmoney.com/api/qt/clist/get" # 板块类型 BOARD_TYPES = { "industry": "m:90+t:2", # 行业板块 "concept": "m:90+t:3", # 概念板块 } # 数据字段 FIELDS = "f12,f14,f2,f3,f62,f66,f84,f104,f125,f126,f127,f128" def get_board_data(board_type: str, sort_by: str = "f3", limit: int = 100) -> Optional[List[Dict]]: """ 获取板块数据 参数: board_type: 板块类型 (industry/concept) sort_by: 排序字段 (f3=涨跌幅, f62=主力资金) limit: 返回数量 返回: List[Dict]: 板块数据列表 """ fs = BOARD_TYPES.get(board_type) if not fs: print(f"❌ 未知的板块类型: {board_type}") return None url = f"{EASTMONEY_BASE_URL}?fid={sort_by}&po=1&pz={limit}&pn=1&np=1&fltt=2&invt=2&fs={fs}&fields={FIELDS}" try: req = urllib.request.Request(url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Referer': 'http://quote.eastmoney.com/' }) with urllib.request.urlopen(req, timeout=15) as resp: data = json.loads(resp.read().decode()) if data.get('data') and data['data'].get('diff'): items = data['data']['diff'] boards = [] for item in items: board = { 'code': item.get('f12', ''), 'name': item.get('f14', ''), 'price': item.get('f2', 0) / 100 if item.get('f2') else 0, 'pct_change': item.get('f3', 0) / 100 if item.get('f3') else 0, 'main_flow': item.get('f62', 0) / 1e8 if item.get('f62') else 0, # 亿元 'leader_code': item.get('f84', ''), 'leader_name': item.get('f104', ''), } boards.append(board) return boards else: print(f"⚠️ API返回数据为空") return None except urllib.error.URLError as e: print(f"❌ 网络请求失败: {e}") return None except Exception as e: print(f"❌ 获取数据异常: {e}") return None def analyze_market(all_industry: List, all_concept: List) -> Dict: """ 专业市场分析 返回各种分析指标 """ analysis = {} # 1. 市场整体统计 if all_industry: up_count = len([b for b in all_industry if b['pct_change'] > 0]) down_count = len([b for b in all_industry if b['pct_change'] < 0]) flat_count = len([b for b in all_industry if b['pct_change'] == 0]) avg_pct = sum(b['pct_change'] for b in all_industry) / len(all_industry) max_pct = max(b['pct_change'] for b in all_industry) min_pct = min(b['pct_change'] for b in all_industry) total_flow = sum(b['main_flow'] for b in all_industry) analysis['market_stats'] = { 'up_count': up_count, 'down_count': down_count, 'flat_count': flat_count, 'up_ratio': up_count / len(all_industry) * 100, 'avg_pct': avg_pct, 'max_pct': max_pct, 'min_pct': min_pct, 'total_flow': total_flow, } # 2. 资金集中度分析 if all_industry: sorted_by_flow = sorted(all_industry, key=lambda x: x['main_flow'], reverse=True) top5_flow = sum(b['main_flow'] for b in sorted_by_flow[:5]) analysis['fund_concentration'] = { 'top5_flow': top5_flow, 'top5_ratio': abs(top5_flow) / abs(analysis['market_stats']['total_flow']) * 100 if analysis['market_stats']['total_flow'] != 0 else 0, } # 3. 板块强弱分析 if all_industry: strong_boards = [b for b in all_industry if b['pct_change'] > 1 and b['main_flow'] > 5] weak_boards = [b for b in all_industry if b['pct_change'] < -1 and b['main_flow'] < -5] analysis['strength'] = { 'strong_count': len(strong_boards), 'weak_count': len(weak_boards), 'strong_boards': strong_boards[:5], 'weak_boards': weak_boards[:5], } # 4. 概念板块热度分析 if all_concept: hot_concepts = sorted([b for b in all_concept if b['main_flow'] > 0], key=lambda x: x['main_flow'], reverse=True)[:5] cold_concepts = sorted([b for b in all_concept if b['main_flow'] < 0], key=lambda x: x['main_flow'])[:5] analysis['concept_heat'] = { 'hot': hot_concepts, 'cold': cold_concepts, } # 5. 市场情绪判断 if analysis.get('market_stats'): stats = analysis['market_stats'] # 综合判断 sentiment_score = 0 # 涨跌比例贡献 if stats['up_ratio'] > 70: sentiment_score += 2 elif stats['up_ratio'] > 50: sentiment_score += 1 elif stats['up_ratio'] < 30: sentiment_score -= 2 elif stats['up_ratio'] < 50: sentiment_score -= 1 # 平均涨跌幅贡献 if stats['avg_pct'] > 0.5: sentiment_score += 1 elif stats['avg_pct'] < -0.5: sentiment_score -= 1 # 资金流向贡献 if stats['total_flow'] > 50: sentiment_score += 2 elif stats['total_flow'] > 0: sentiment_score += 1 elif stats['total_flow'] < -50: sentiment_score -= 2 elif stats['total_flow'] < 0: sentiment_score -= 1 # 情绪等级 if sentiment_score >= 4: sentiment = '强势上涨' sentiment_desc = '市场情绪高涨,多数板块上涨,资金大幅流入,建议关注强势板块机会。' elif sentiment_score >= 2: sentiment = '偏强' sentiment_desc = '市场整体偏强,资金流向积极,可适度参与热门板块。' elif sentiment_score >= 0: sentiment = '平稳' sentiment_desc = '市场情绪平稳,涨跌均衡,建议观望或轻仓布局。' elif sentiment_score >= -2: sentiment = '偏弱' sentiment_desc = '市场整体偏弱,资金流出明显,建议谨慎操作,关注防御性板块。' else: sentiment = '弱势下跌' sentiment_desc = '市场情绪低迷,多数板块下跌,资金大幅流出,建议规避风险,等待企稳信号。' analysis['sentiment'] = { 'score': sentiment_score, 'level': sentiment, 'description': sentiment_desc, } return analysis def generate_daily_report(boards_data: Dict, to_email: str = "wlq@tphai.com") -> bool: """ 生成盘后分析报告并发送邮件 参数: boards_data: 板块数据字典 {'industry': [], 'concept': []} to_email: 收件人邮箱 返回: bool: 是否发送成功 """ all_industry = boards_data.get('industry', []) all_concept = boards_data.get('concept', []) if not all_industry and not all_concept: print("❌ 无数据,无法生成报告") return False # 分析总结 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 排序数据 industry_by_pct = sorted(all_industry, key=lambda x: x['pct_change'], reverse=True) industry_by_flow = sorted(all_industry, key=lambda x: x['main_flow'], reverse=True) concept_by_pct = sorted(all_concept, key=lambda x: x['pct_change'], reverse=True) concept_by_flow = sorted(all_concept, key=lambda x: x['main_flow'], reverse=True) # 执行专业分析 analysis = analyze_market(all_industry, all_concept) # ========== 生成HTML正文 ========== html_lines = [ "

📊 A股板块盘后分析报告

", f"

报告时间: {timestamp}

", "", "
", ] # ===== 一、市场情绪分析 ===== if analysis.get('sentiment'): sent = analysis['sentiment'] stats = analysis.get('market_stats', {}) html_lines.append("") html_lines.append("

一、市场情绪分析

") html_lines.append(f"

市场评级: {sent['level']}

") html_lines.append(f"

{sent['description']}

") if stats: html_lines.append("") html_lines.append("".format(stats['up_count'], stats['up_ratio'])) html_lines.append("".format(stats['down_count'], 100 - stats['up_ratio'])) html_lines.append("".format(stats['avg_pct'], stats['max_pct'])) html_lines.append("".format(stats['total_flow'], stats['min_pct'])) html_lines.append("
上涨板块{}占比 {:.1f}%
下跌板块{}占比 {:.1f}%
平均涨跌{:.2f}%最大涨幅 {:.2f}%
资金净流{:.2f}亿最大跌幅 {:.2f}%
") # ===== 二、资金流向分析 ===== if analysis.get('fund_concentration'): fund = analysis['fund_concentration'] html_lines.append("") html_lines.append("

二、资金流向分析

") html_lines.append(f"

TOP5板块资金合计: {fund['top5_flow']:.2f}亿

") html_lines.append(f"

资金集中度: {fund['top5_ratio']:.1f}%(流入资金集中在少数板块)

") # ===== 三、板块强弱分析 ===== if analysis.get('strength'): strength = analysis['strength'] html_lines.append("") html_lines.append("

三、板块强弱分析

") html_lines.append(f"

强势板块(涨幅>1%且资金流入>5亿): {strength['strong_count']} 个

") html_lines.append(f"

弱势板块(跌幅>1%且资金流出>5亿): {strength['weak_count']} 个

") if strength['strong_boards']: html_lines.append("

强势板块示例:

") html_lines.append("") if strength['weak_boards']: html_lines.append("

弱势板块示例:

") html_lines.append("") # ===== 四、热门概念分析 ===== if analysis.get('concept_heat'): heat = analysis['concept_heat'] html_lines.append("") html_lines.append("

四、概念板块热度

") if heat['hot']: html_lines.append("

🔥 热门概念(资金流入TOP):

") html_lines.append("") html_lines.append("") for b in heat['hot'][:5]: pct_str = f"+{b['pct_change']:.2f}%" if b['pct_change'] > 0 else f"{b['pct_change']:.2f}%" html_lines.append(f"") html_lines.append("
概念资金(亿)涨跌
{b['name']}+{b['main_flow']:.2f}{pct_str}
") if heat['cold']: html_lines.append("

❄️ 冷门概念(资金流出TOP):

") html_lines.append("") html_lines.append("") for b in heat['cold'][:5]: pct_str = f"+{b['pct_change']:.2f}%" if b['pct_change'] > 0 else f"{b['pct_change']:.2f}%" html_lines.append(f"") html_lines.append("
概念资金(亿)涨跌
{b['name']}{b['main_flow']:.2f}{pct_str}
") # ===== 五、行业板块排行 ===== html_lines.append("") html_lines.append("
") html_lines.append("") html_lines.append("

五、行业板块涨跌排行

") html_lines.append("

📈 涨幅 TOP5:

") html_lines.append("") html_lines.append("") for board in industry_by_pct[:5]: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿" html_lines.append(f"") html_lines.append("
板块涨跌幅主力资金领涨股
{board['name']}{pct_str}{flow_str}{board['leader_name'] or '-'}
") html_lines.append("

📉 跌幅 TOP5:

") html_lines.append("") html_lines.append("") for board in industry_by_pct[-5:]: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿" html_lines.append(f"") html_lines.append("
板块涨跌幅主力资金
{board['name']}{pct_str}{flow_str}
") # ===== 六、主力资金排行 ===== html_lines.append("") html_lines.append("

六、主力资金流向排行

") html_lines.append("

💰 大幅流入 TOP10 (≥10亿):

") html_lines.append("") html_lines.append("") inflow_boards = [b for b in industry_by_flow if b['main_flow'] > 10][:10] for board in inflow_boards: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" html_lines.append(f"") html_lines.append("
板块资金(亿)涨跌
{board['name']}+{board['main_flow']:.2f}{pct_str}
") html_lines.append("

💸 大幅流出 TOP10 (≤-10亿):

") html_lines.append("") html_lines.append("") outflow_boards = [b for b in industry_by_flow if b['main_flow'] < -10][:10] for board in outflow_boards: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" html_lines.append(f"") html_lines.append("
板块资金(亿)涨跌
{board['name']}{board['main_flow']:.2f}{pct_str}
") # ===== 七、投资建议 ===== html_lines.append("") html_lines.append("
") html_lines.append("

七、投资建议

") if analysis.get('sentiment'): sent = analysis['sentiment'] if sent['score'] >= 2: html_lines.append("

策略建议: 积极参与

") html_lines.append("") elif sent['score'] >= 0: html_lines.append("

策略建议: 观望为主

") html_lines.append("") else: html_lines.append("

策略建议: 谨慎防守

") html_lines.append("") html_lines.append("") html_lines.append("
") html_lines.append("

📊 详细数据请查看附件 CSV 文件

") html_body = "\n".join(html_lines) # 生成附件文件(CSV格式) attachment_file = DATA_DIR / f"board_detail_{datetime.now().strftime('%Y%m%d')}.csv" csv_lines = [ "# A股板块详细数据", f"# 生成时间: {timestamp}", "", "=== 行业板块涨跌幅排行 ===", "板块名称,涨跌幅(%),主力资金(亿),领涨股", ] for board in industry_by_pct: csv_lines.append(f"{board['name']},{board['pct_change']:.2f},{board['main_flow']:.2f},{board['leader_name'] or ''}") csv_lines.append("") csv_lines.append("=== 行业板块资金流向排行 ===") csv_lines.append("板块名称,主力资金(亿),涨跌幅(%),领涨股") for board in industry_by_flow: csv_lines.append(f"{board['name']},{board['main_flow']:.2f},{board['pct_change']:.2f},{board['leader_name'] or ''}") csv_lines.append("") csv_lines.append("=== 概念板块涨跌幅排行 ===") csv_lines.append("板块名称,涨跌幅(%),主力资金(亿),领涨股") for board in concept_by_pct: csv_lines.append(f"{board['name']},{board['pct_change']:.2f},{board['main_flow']:.2f},{board['leader_name'] or ''}") csv_lines.append("") csv_lines.append("=== 概念板块资金流向排行 ===") csv_lines.append("板块名称,主力资金(亿),涨跌幅(%),领涨股") for board in concept_by_flow: csv_lines.append(f"{board['name']},{board['main_flow']:.2f},{board['pct_change']:.2f},{board['leader_name'] or ''}") attachment_file.write_text("\n".join(csv_lines), encoding='utf-8') # 发送邮件 email_script = SCRIPT_DIR.parent.parent / "skills/email/scripts/send_email.py" subject = f"【A股板块盘后分析】{datetime.now().strftime('%Y-%m-%d')}" cmd = [ "python3", str(email_script), "send", "--to", to_email, "--subject", subject, "--body", html_body, "--html", "--attach", str(attachment_file) ] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode == 0: print(f"✅ 报告发送成功: {to_email}") print(f" 附件: {attachment_file}") return True else: print(f"❌ 发送失败: {result.stderr}") return False except Exception as e: print(f"❌ 发送异常: {e}") return False def run_daily_report(to_email: str = "wlq@tphai.com", verbose: bool = False) -> bool: """ 执行盘后报告生成和发送 参数: to_email: 收件人邮箱 verbose: 是否显示详细日志 返回: bool: 是否成功 """ if verbose: print(f"\n📊 A股板块盘后分析") print("=" * 50) print(f"收件人: {to_email}") print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # 获取所有板块数据 boards_data = {} for board_type in ["industry", "concept"]: if verbose: print(f"\n📡 获取 {board_type} 板块数据...") boards = get_board_data(board_type, limit=100) if boards: boards_data[board_type] = boards if verbose: print(f"✅ 成功获取 {len(boards)} 条数据") else: print(f"❌ 获取 {board_type} 数据失败") boards_data[board_type] = [] # 生成并发送报告 if boards_data.get('industry') or boards_data.get('concept'): return generate_daily_report(boards_data, to_email) else: print("❌ 所有数据获取失败,无法生成报告") return False def main(): """命令行入口""" import argparse parser = argparse.ArgumentParser(description="A股板块盘后分析系统") subparsers = parser.add_subparsers(dest="command", help="可用命令") # 测试命令 subparsers.add_parser("test", help="测试API连接") # 获取数据命令 get_parser = subparsers.add_parser("get", help="获取板块数据") get_parser.add_argument("type", choices=["industry", "concept"], help="板块类型") get_parser.add_argument("--limit", type=int, default=20, help="返回数量") # 发送报告命令 report_parser = subparsers.add_parser("report", help="生成并发送盘后报告") report_parser.add_argument("--to", default="wlq@tphai.com", help="收件人邮箱") report_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志") args = parser.parse_args() if args.command == "test": print("\n🧪 测试东方财富API连接...") for board_type in ["industry", "concept"]: print(f"\n测试 {board_type} 板块...") boards = get_board_data(board_type, limit=5) if boards: print(f"✅ 成功获取 {len(boards)} 条数据") for board in boards[:3]: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" print(f" - {board['name']}: {pct_str}") else: print(f"❌ {board_type} 测试失败") elif args.command == "get": boards = get_board_data(args.type, limit=args.limit) if boards: print(f"\n📊 {args.type} 板块数据 ({len(boards)} 条)") print("=" * 50) for board in boards: pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%" flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿" print(f"{board['name']}: {pct_str}, 主力{flow_str}") else: print("❌ 获取数据失败") elif args.command == "report": run_daily_report(to_email=args.to, verbose=args.verbose) else: parser.print_help() if __name__ == "__main__": main()