diff --git a/README.md b/README.md index 07603f3..4130fad 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,13 @@ # A股板块盘后分析系统 -自动获取东方财富板块数据,生成盘后分析报告并发送邮件。 +自动获取东方财富板块数据,生成盘后分析报告并发送邮件。支持历史数据存储和长时间跨度分析。 ## 功能特点 - 每个交易日17:00自动执行 - 获取行业板块和概念板块完整数据 +- **历史数据存储**(SQLite数据库) +- **长时间跨度分析**(连续涨跌、板块轮动、资金趋势) - 生成分析总结邮件正文 - 详细数据CSV文件作为附件 - 发送到指定邮箱 @@ -14,11 +16,34 @@ ### 正文(分析总结) -- 市场整体趋势判断 -- 热门概念板块 TOP5 -- 行业涨幅/跌幅 TOP5 -- 主力资金大幅流入 TOP10 -- 主力资金大幅流出 TOP10 +**一、市场情绪分析** +- 市场评级(强势上涨/偏强/平稳/偏弱/弱势下跌) +- 涨跌板块统计、平均涨跌幅、资金净流 + +**二、资金流向分析** +- TOP5板块资金合计、资金集中度 + +**三、板块强弱分析** +- 强势板块数量、弱势板块数量及示例 + +**四、概念板块热度** +- 热门概念TOP5、冷门概念TOP5 + +**五、行业板块排行** +- 涨幅TOP5、跌幅TOP5 + +**六、主力资金排行** +- 大幅流入TOP10、大幅流出TOP10 + +**七、投资建议** +- 根据市场情绪给出策略建议 + +**八、历史趋势分析** ⭐新增 +- 近期市场趋势(连续上涨/下跌/震荡) +- 近5日资金流向详情 +- 连续上涨板块(近3日累计涨幅) +- 连续下跌板块(近3日累计跌幅) +- 板块轮动分析(新进入涨幅TOP10) ### 附件(详细数据) @@ -52,6 +77,16 @@ python3 board_monitor.py report -v python3 board_monitor.py report --to other@example.com ``` +### 查看历史数据 + +```bash +# 查看近5日市场统计 +python3 board_monitor.py history --days 5 + +# 查看指定板块历史 +python3 board_monitor.py history --board "电力设备" --days 10 +``` + ## 定时任务配置 每个交易日(周一至周五)17:00自动执行: @@ -60,11 +95,16 @@ python3 board_monitor.py report --to other@example.com 0 17 * * 1-5 python3 board_monitor.py report ``` -## 数据来源 +## 数据存储 -东方财富HTTP API (http://push2.eastmoney.com) +历史数据保存在 `data/board_history.db` SQLite数据库中,包含: +- 每日板块涨跌幅 +- 主力资金流向 +- 领涨股信息 ## 版本历史 -- v1.1.0 (2026-04-10) - 改为盘后报告模式,正文分析+附件详细数据 +- v1.3.0 (2026-04-10) - 新增历史数据存储和长时间跨度分析 +- v1.2.0 (2026-04-10) - 增加专业分析内容 +- v1.1.0 (2026-04-10) - 改为盘后报告模式 - v1.0.0 (2026-04-10) - 初始版本 \ No newline at end of file diff --git a/board_monitor.py b/board_monitor.py index 72c9f64..15b9b47 100644 --- a/board_monitor.py +++ b/board_monitor.py @@ -2,6 +2,7 @@ """ A股板块盘后分析系统 获取东方财富板块数据,生成分析报告,发送邮件通知 +支持历史数据存储和长时间跨度分析 """ import urllib.request @@ -9,9 +10,11 @@ import json import os import sys import subprocess -from datetime import datetime -from typing import List, Dict, Optional +import sqlite3 +from datetime import datetime, timedelta +from typing import List, Dict, Optional, Tuple from pathlib import Path +from collections import defaultdict # 清除代理环境变量(解决代理问题) for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']: @@ -22,6 +25,9 @@ SCRIPT_DIR = Path(__file__).parent DATA_DIR = SCRIPT_DIR / "data" DATA_DIR.mkdir(exist_ok=True) +# 数据库文件 +DB_FILE = DATA_DIR / "board_history.db" + # 东方财富API配置 EASTMONEY_BASE_URL = "http://push2.eastmoney.com/api/qt/clist/get" @@ -35,6 +41,134 @@ BOARD_TYPES = { FIELDS = "f12,f14,f2,f3,f62,f66,f84,f104,f125,f126,f127,f128" +# ==================== 数据库操作 ==================== + +def init_db(): + """初始化数据库""" + conn = sqlite3.connect(DB_FILE) + cursor = conn.cursor() + + # 创建板块数据表 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS board_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + date TEXT NOT NULL, + board_type TEXT NOT NULL, + board_code TEXT NOT NULL, + board_name TEXT NOT NULL, + pct_change REAL, + main_flow REAL, + leader_name TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + UNIQUE(date, board_type, board_code) + ) + ''') + + # 创建索引 + cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON board_data(date)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_board ON board_data(board_type, board_code)') + + conn.commit() + conn.close() + + +def save_to_db(board_type: str, boards: List[Dict], date: str = None): + """保存板块数据到数据库""" + if date is None: + date = datetime.now().strftime("%Y-%m-%d") + + conn = sqlite3.connect(DB_FILE) + cursor = conn.cursor() + + for board in boards: + cursor.execute(''' + INSERT OR REPLACE INTO board_data + (date, board_type, board_code, board_name, pct_change, main_flow, leader_name) + VALUES (?, ?, ?, ?, ?, ?, ?) + ''', ( + date, + board_type, + board['code'], + board['name'], + board['pct_change'], + board['main_flow'], + board['leader_name'] + )) + + conn.commit() + conn.close() + + +def get_history_data(board_type: str, days: int = 5) -> Dict[str, List[Dict]]: + """ + 获取历史板块数据 + + 返回: + Dict[date, List[Dict]]: 按日期分组的数据 + """ + conn = sqlite3.connect(DB_FILE) + cursor = conn.cursor() + + start_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + + cursor.execute(''' + SELECT date, board_code, board_name, pct_change, main_flow, leader_name + FROM board_data + WHERE board_type = ? AND date >= ? + ORDER BY date DESC + ''', (board_type, start_date)) + + rows = cursor.fetchall() + conn.close() + + # 按日期分组 + result = defaultdict(list) + for row in rows: + result[row[0]].append({ + 'code': row[1], + 'name': row[2], + 'pct_change': row[3], + 'main_flow': row[4], + 'leader_name': row[5] + }) + + return dict(result) + + +def get_board_history(board_type: str, board_name: str, days: int = 20) -> List[Dict]: + """获取单个板块的历史数据""" + conn = sqlite3.connect(DB_FILE) + cursor = conn.cursor() + + start_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + + cursor.execute(''' + SELECT date, pct_change, main_flow + FROM board_data + WHERE board_type = ? AND board_name = ? AND date >= ? + ORDER BY date ASC + ''', (board_type, board_name, start_date)) + + rows = cursor.fetchall() + conn.close() + + return [{'date': r[0], 'pct_change': r[1], 'main_flow': r[2]} for r in rows] + + +def get_available_dates() -> List[str]: + """获取有数据的日期列表""" + conn = sqlite3.connect(DB_FILE) + cursor = conn.cursor() + + cursor.execute('SELECT DISTINCT date FROM board_data ORDER BY date DESC LIMIT 30') + dates = [r[0] for r in cursor.fetchall()] + + conn.close() + return dates + + +# ==================== 数据获取 ==================== + def get_board_data(board_type: str, sort_by: str = "f3", limit: int = 100) -> Optional[List[Dict]]: """ 获取板块数据 @@ -213,6 +347,142 @@ def analyze_market(all_industry: List, all_concept: List) -> Dict: return analysis +def analyze_history(all_industry: List, all_concept: List, history_days: int = 5) -> Dict: + """ + 历史数据分析 + + 参数: + all_industry: 今日行业板块数据 + all_concept: 今日概念板块数据 + history_days: 分析天数 + + 返回: + Dict: 历史分析结果 + """ + result = { + 'available_dates': [], + 'trend_analysis': {}, + 'continuous_up': [], + 'continuous_down': [], + 'strong_rotation': {}, + 'fund_trend': {}, + } + + # 获取有数据的日期 + available_dates = get_available_dates() + result['available_dates'] = available_dates[:history_days] + + if len(available_dates) < 2: + return result + + # 获取历史数据 + industry_history = get_history_data('industry', history_days + 2) + concept_history = get_history_data('concept', history_days + 2) + + # 1. 连续上涨/下跌板块分析 + if len(available_dates) >= 3: + recent_dates = available_dates[:3] # 最近3天 + + for board in all_industry: + history = get_board_history('industry', board['name'], days=5) + + if len(history) >= 3: + # 检查连续上涨 + if all(h['pct_change'] > 0 for h in history[-3:]): + total_pct = sum(h['pct_change'] for h in history[-3:]) + result['continuous_up'].append({ + 'name': board['name'], + 'days': len([h for h in history if h['pct_change'] > 0]), + 'total_pct': total_pct, + }) + + # 检查连续下跌 + elif all(h['pct_change'] < 0 for h in history[-3:]): + total_pct = sum(h['pct_change'] for h in history[-3:]) + result['continuous_down'].append({ + 'name': board['name'], + 'days': len([h for h in history if h['pct_change'] < 0]), + 'total_pct': total_pct, + }) + + # 排序 + result['continuous_up'] = sorted(result['continuous_up'], key=lambda x: x['total_pct'], reverse=True)[:10] + result['continuous_down'] = sorted(result['continuous_down'], key=lambda x: x['total_pct'])[:10] + + # 2. 板块轮动分析 + if len(available_dates) >= 2: + today_date = available_dates[0] + yesterday_date = available_dates[1] + + today_industry = industry_history.get(today_date, []) + yesterday_industry = industry_history.get(yesterday_date, []) + + if today_industry and yesterday_industry: + # 今日涨幅TOP10 + today_top = sorted(today_industry, key=lambda x: x['pct_change'], reverse=True)[:10] + today_top_names = {b['name'] for b in today_top} + + # 昨日涨幅TOP10 + yesterday_top = sorted(yesterday_industry, key=lambda x: x['pct_change'], reverse=True)[:10] + yesterday_top_names = {b['name'] for b in yesterday_top} + + # 新进入TOP10的板块(轮动进入) + new_in = today_top_names - yesterday_top_names + # 跌出TOP10的板块(轮动离开) + out_of = yesterday_top_names - today_top_names + + result['strong_rotation'] = { + 'new_in': [b for b in today_top if b['name'] in new_in], + 'out_of': list(out_of), + } + + # 3. 资金流向趋势分析 + if len(available_dates) >= 5: + dates_5 = available_dates[:5] + fund_trend = {} + + for date in dates_5: + if date in industry_history: + total_flow = sum(b['main_flow'] for b in industry_history[date]) + fund_trend[date] = total_flow + + result['fund_trend'] = fund_trend + + # 计算资金趋势 + flows = list(fund_trend.values()) + if len(flows) >= 3: + if all(f > 0 for f in flows[-3:]): + result['trend_analysis']['fund'] = '连续流入' + elif all(f < 0 for f in flows[-3:]): + result['trend_analysis']['fund'] = '连续流出' + else: + result['trend_analysis']['fund'] = '波动' + + # 4. 市场趋势判断 + if len(available_dates) >= 5: + dates_5 = available_dates[:5] + avg_pcts = {} + + for date in dates_5: + if date in industry_history: + avg_pct = sum(b['pct_change'] for b in industry_history[date]) / len(industry_history[date]) + avg_pcts[date] = avg_pct + + result['avg_pcts'] = avg_pcts + + # 判断趋势 + pcts = list(avg_pcts.values()) + if len(pcts) >= 3: + if all(p > 0 for p in pcts[-3:]): + result['trend_analysis']['market'] = '连续上涨' + elif all(p < 0 for p in pcts[-3:]): + result['trend_analysis']['market'] = '连续下跌' + else: + result['trend_analysis']['market'] = '震荡' + + return result + + def generate_daily_report(boards_data: Dict, to_email: str = "wlq@tphai.com") -> bool: """ 生成盘后分析报告并发送邮件 @@ -243,6 +513,9 @@ def generate_daily_report(boards_data: Dict, to_email: str = "wlq@tphai.com") -> # 执行专业分析 analysis = analyze_market(all_industry, all_concept) + # 执行历史分析 + history_analysis = analyze_history(all_industry, all_concept, history_days=5) + # ========== 生成HTML正文 ========== html_lines = [ "
数据覆盖: 近 {len(dates)} 个交易日 ({', '.join(dates[:3])} 等)
") + + # 市场趋势 + if history_analysis.get('trend_analysis', {}).get('market'): + trend = history_analysis['trend_analysis']['market'] + html_lines.append(f"近期趋势: {trend}
") + + # 资金趋势 + if history_analysis.get('trend_analysis', {}).get('fund'): + fund_trend = history_analysis['trend_analysis']['fund'] + html_lines.append(f"资金流向: {fund_trend}
") + + # 资金流向详情 + if history_analysis.get('fund_trend'): + html_lines.append("近5日资金流向:
") + html_lines.append("| 日期 | 净流入(亿) |
|---|---|
| {date} | {flow_str} |
🔥 连续上涨板块 (近3日):
") + html_lines.append("| 板块 | 连涨天数 | 累计涨幅 |
|---|---|---|
| {b['name']} | {b['days']}天 | +{b['total_pct']:.2f}% |
❄️ 连续下跌板块 (近3日):
") + html_lines.append("| 板块 | 连跌天数 | 累计跌幅 |
|---|---|---|
| {b['name']} | {b['days']}天 | {b['total_pct']:.2f}% |
🔄 板块轮动 (新进入涨幅TOP10):
") + html_lines.append("📊 详细数据请查看附件 CSV 文件
") @@ -487,6 +816,9 @@ def run_daily_report(to_email: str = "wlq@tphai.com", verbose: bool = False) -> 返回: bool: 是否成功 """ + # 初始化数据库 + init_db() + if verbose: print(f"\n📊 A股板块盘后分析") print("=" * 50) @@ -506,6 +838,11 @@ def run_daily_report(to_email: str = "wlq@tphai.com", verbose: bool = False) -> boards_data[board_type] = boards if verbose: print(f"✅ 成功获取 {len(boards)} 条数据") + + # 保存到数据库 + save_to_db(board_type, boards) + if verbose: + print(f" 已保存到数据库") else: print(f"❌ 获取 {board_type} 数据失败") boards_data[board_type] = [] @@ -538,6 +875,11 @@ def main(): report_parser.add_argument("--to", default="wlq@tphai.com", help="收件人邮箱") report_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志") + # 查看历史命令 + history_parser = subparsers.add_parser("history", help="查看历史数据统计") + history_parser.add_argument("--days", type=int, default=5, help="查看天数") + history_parser.add_argument("--board", type=str, help="查看指定板块历史") + args = parser.parse_args() if args.command == "test": @@ -568,6 +910,52 @@ def main(): elif args.command == "report": run_daily_report(to_email=args.to, verbose=args.verbose) + elif args.command == "history": + init_db() + dates = get_available_dates() + + if not dates: + print("❌ 暂无历史数据,请先执行 report 命令收集数据") + return + + print(f"\n📅 可用数据日期: {len(dates)} 天") + print(f" 最新: {dates[0] if dates else '无'}") + print(f" 范围: {dates[-1] if dates else '无'} ~ {dates[0] if dates else '无'}") + + if args.board: + # 查看指定板块历史 + print(f"\n📊 板块 '{args.board}' 近 {args.days} 日数据:") + history = get_board_history('industry', args.board, days=args.days) + + if history: + print("-" * 50) + for h in history: + pct_str = f"+{h['pct_change']:.2f}%" if h['pct_change'] > 0 else f"{h['pct_change']:.2f}%" + flow_str = f"+{h['main_flow']:.2f}亿" if h['main_flow'] > 0 else f"{h['main_flow']:.2f}亿" + print(f" {h['date']}: {pct_str}, 资金{flow_str}") + else: + print(f"❌ 未找到板块 '{args.board}' 的历史数据") + else: + # 显示整体统计 + print(f"\n📊 近 {args.days} 日市场统计:") + + # 资金流向 + fund_trend = {} + for date in dates[:args.days]: + industry_data = get_history_data('industry', args.days).get(date, []) + if industry_data: + total_flow = sum(b['main_flow'] for b in industry_data) + avg_pct = sum(b['pct_change'] for b in industry_data) / len(industry_data) + fund_trend[date] = {'flow': total_flow, 'avg_pct': avg_pct} + + print("-" * 50) + print(f"{'日期':<12} {'资金净流(亿)':<15} {'平均涨跌':<10}") + print("-" * 50) + for date, data in sorted(fund_trend.items(), reverse=True): + flow_str = f"+{data['flow']:.2f}" if data['flow'] > 0 else f"{data['flow']:.2f}" + pct_str = f"+{data['avg_pct']:.2f}%" if data['avg_pct'] > 0 else f"{data['avg_pct']:.2f}%" + print(f"{date:<12} {flow_str:<15} {pct_str:<10}") + else: parser.print_help() diff --git a/data/board_detail_20260410.csv b/data/board_detail_20260410.csv index b4adeb4..47914b2 100644 --- a/data/board_detail_20260410.csv +++ b/data/board_detail_20260410.csv @@ -1,5 +1,5 @@ # A股板块详细数据 -# 生成时间: 2026-04-10 17:42:51 +# 生成时间: 2026-04-10 17:51:40 === 行业板块涨跌幅排行 === 板块名称,涨跌幅(%),主力资金(亿),领涨股 diff --git a/data/board_history.db b/data/board_history.db new file mode 100644 index 0000000..5f37c8b Binary files /dev/null and b/data/board_history.db differ