feat: 新增历史数据存储和长时间跨度分析功能

- SQLite数据库存储每日板块数据
- 连续上涨/下跌板块分析
- 板块轮动分析
- 近5日资金流向趋势
- 查看历史数据命令 (history)
This commit is contained in:
2026-04-10 17:52:14 +08:00
parent bd1ec2ea01
commit 1fb58d23da
4 changed files with 440 additions and 12 deletions

View File

@@ -1,11 +1,13 @@
# A股板块盘后分析系统
自动获取东方财富板块数据,生成盘后分析报告并发送邮件。
自动获取东方财富板块数据,生成盘后分析报告并发送邮件。支持历史数据存储和长时间跨度分析。
## 功能特点
- 每个交易日17:00自动执行
- 获取行业板块和概念板块完整数据
- **历史数据存储**SQLite数据库
- **长时间跨度分析**(连续涨跌、板块轮动、资金趋势)
- 生成分析总结邮件正文
- 详细数据CSV文件作为附件
- 发送到指定邮箱
@@ -14,11 +16,34 @@
### 正文(分析总结)
- 市场整体趋势判断
- 热门概念板块 TOP5
- 行业涨幅/跌幅 TOP5
- 主力资金大幅流入 TOP10
- 主力资金大幅流出 TOP10
**一、市场情绪分析**
- 市场评级(强势上涨/偏强/平稳/偏弱/弱势下跌)
- 涨跌板块统计、平均涨跌幅、资金净流
**二、资金流向分析**
- TOP5板块资金合计、资金集中度
**三、板块强弱分析**
- 强势板块数量、弱势板块数量及示例
**四、概念板块热度**
- 热门概念TOP5、冷门概念TOP5
**五、行业板块排行**
- 涨幅TOP5、跌幅TOP5
**六、主力资金排行**
- 大幅流入TOP10、大幅流出TOP10
**七、投资建议**
- 根据市场情绪给出策略建议
**八、历史趋势分析** ⭐新增
- 近期市场趋势(连续上涨/下跌/震荡)
- 近5日资金流向详情
- 连续上涨板块近3日累计涨幅
- 连续下跌板块近3日累计跌幅
- 板块轮动分析新进入涨幅TOP10
### 附件(详细数据)
@@ -52,6 +77,16 @@ python3 board_monitor.py report -v
python3 board_monitor.py report --to other@example.com
```
### 查看历史数据
```bash
# 查看近5日市场统计
python3 board_monitor.py history --days 5
# 查看指定板块历史
python3 board_monitor.py history --board "电力设备" --days 10
```
## 定时任务配置
每个交易日周一至周五17:00自动执行
@@ -60,11 +95,16 @@ python3 board_monitor.py report --to other@example.com
0 17 * * 1-5 python3 board_monitor.py report
```
## 数据来源
## 数据存储
东方财富HTTP API (http://push2.eastmoney.com)
历史数据保存在 `data/board_history.db` SQLite数据库中包含
- 每日板块涨跌幅
- 主力资金流向
- 领涨股信息
## 版本历史
- v1.1.0 (2026-04-10) - 改为盘后报告模式,正文分析+附件详细数据
- v1.3.0 (2026-04-10) - 新增历史数据存储和长时间跨度分析
- v1.2.0 (2026-04-10) - 增加专业分析内容
- v1.1.0 (2026-04-10) - 改为盘后报告模式
- v1.0.0 (2026-04-10) - 初始版本

View File

@@ -2,6 +2,7 @@
"""
A股板块盘后分析系统
获取东方财富板块数据,生成分析报告,发送邮件通知
支持历史数据存储和长时间跨度分析
"""
import urllib.request
@@ -9,9 +10,11 @@ import json
import os
import sys
import subprocess
from datetime import datetime
from typing import List, Dict, Optional
import sqlite3
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
from pathlib import Path
from collections import defaultdict
# 清除代理环境变量(解决代理问题)
for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
@@ -22,6 +25,9 @@ SCRIPT_DIR = Path(__file__).parent
DATA_DIR = SCRIPT_DIR / "data"
DATA_DIR.mkdir(exist_ok=True)
# 数据库文件
DB_FILE = DATA_DIR / "board_history.db"
# 东方财富API配置
EASTMONEY_BASE_URL = "http://push2.eastmoney.com/api/qt/clist/get"
@@ -35,6 +41,134 @@ BOARD_TYPES = {
FIELDS = "f12,f14,f2,f3,f62,f66,f84,f104,f125,f126,f127,f128"
# ==================== 数据库操作 ====================
def init_db():
"""初始化数据库"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# 创建板块数据表
cursor.execute('''
CREATE TABLE IF NOT EXISTS board_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
board_type TEXT NOT NULL,
board_code TEXT NOT NULL,
board_name TEXT NOT NULL,
pct_change REAL,
main_flow REAL,
leader_name TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(date, board_type, board_code)
)
''')
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON board_data(date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_board ON board_data(board_type, board_code)')
conn.commit()
conn.close()
def save_to_db(board_type: str, boards: List[Dict], date: str = None):
"""保存板块数据到数据库"""
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
for board in boards:
cursor.execute('''
INSERT OR REPLACE INTO board_data
(date, board_type, board_code, board_name, pct_change, main_flow, leader_name)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
date,
board_type,
board['code'],
board['name'],
board['pct_change'],
board['main_flow'],
board['leader_name']
))
conn.commit()
conn.close()
def get_history_data(board_type: str, days: int = 5) -> Dict[str, List[Dict]]:
"""
获取历史板块数据
返回:
Dict[date, List[Dict]]: 按日期分组的数据
"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
start_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
cursor.execute('''
SELECT date, board_code, board_name, pct_change, main_flow, leader_name
FROM board_data
WHERE board_type = ? AND date >= ?
ORDER BY date DESC
''', (board_type, start_date))
rows = cursor.fetchall()
conn.close()
# 按日期分组
result = defaultdict(list)
for row in rows:
result[row[0]].append({
'code': row[1],
'name': row[2],
'pct_change': row[3],
'main_flow': row[4],
'leader_name': row[5]
})
return dict(result)
def get_board_history(board_type: str, board_name: str, days: int = 20) -> List[Dict]:
"""获取单个板块的历史数据"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
start_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
cursor.execute('''
SELECT date, pct_change, main_flow
FROM board_data
WHERE board_type = ? AND board_name = ? AND date >= ?
ORDER BY date ASC
''', (board_type, board_name, start_date))
rows = cursor.fetchall()
conn.close()
return [{'date': r[0], 'pct_change': r[1], 'main_flow': r[2]} for r in rows]
def get_available_dates() -> List[str]:
"""获取有数据的日期列表"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute('SELECT DISTINCT date FROM board_data ORDER BY date DESC LIMIT 30')
dates = [r[0] for r in cursor.fetchall()]
conn.close()
return dates
# ==================== 数据获取 ====================
def get_board_data(board_type: str, sort_by: str = "f3", limit: int = 100) -> Optional[List[Dict]]:
"""
获取板块数据
@@ -213,6 +347,142 @@ def analyze_market(all_industry: List, all_concept: List) -> Dict:
return analysis
def analyze_history(all_industry: List, all_concept: List, history_days: int = 5) -> Dict:
"""
历史数据分析
参数:
all_industry: 今日行业板块数据
all_concept: 今日概念板块数据
history_days: 分析天数
返回:
Dict: 历史分析结果
"""
result = {
'available_dates': [],
'trend_analysis': {},
'continuous_up': [],
'continuous_down': [],
'strong_rotation': {},
'fund_trend': {},
}
# 获取有数据的日期
available_dates = get_available_dates()
result['available_dates'] = available_dates[:history_days]
if len(available_dates) < 2:
return result
# 获取历史数据
industry_history = get_history_data('industry', history_days + 2)
concept_history = get_history_data('concept', history_days + 2)
# 1. 连续上涨/下跌板块分析
if len(available_dates) >= 3:
recent_dates = available_dates[:3] # 最近3天
for board in all_industry:
history = get_board_history('industry', board['name'], days=5)
if len(history) >= 3:
# 检查连续上涨
if all(h['pct_change'] > 0 for h in history[-3:]):
total_pct = sum(h['pct_change'] for h in history[-3:])
result['continuous_up'].append({
'name': board['name'],
'days': len([h for h in history if h['pct_change'] > 0]),
'total_pct': total_pct,
})
# 检查连续下跌
elif all(h['pct_change'] < 0 for h in history[-3:]):
total_pct = sum(h['pct_change'] for h in history[-3:])
result['continuous_down'].append({
'name': board['name'],
'days': len([h for h in history if h['pct_change'] < 0]),
'total_pct': total_pct,
})
# 排序
result['continuous_up'] = sorted(result['continuous_up'], key=lambda x: x['total_pct'], reverse=True)[:10]
result['continuous_down'] = sorted(result['continuous_down'], key=lambda x: x['total_pct'])[:10]
# 2. 板块轮动分析
if len(available_dates) >= 2:
today_date = available_dates[0]
yesterday_date = available_dates[1]
today_industry = industry_history.get(today_date, [])
yesterday_industry = industry_history.get(yesterday_date, [])
if today_industry and yesterday_industry:
# 今日涨幅TOP10
today_top = sorted(today_industry, key=lambda x: x['pct_change'], reverse=True)[:10]
today_top_names = {b['name'] for b in today_top}
# 昨日涨幅TOP10
yesterday_top = sorted(yesterday_industry, key=lambda x: x['pct_change'], reverse=True)[:10]
yesterday_top_names = {b['name'] for b in yesterday_top}
# 新进入TOP10的板块轮动进入
new_in = today_top_names - yesterday_top_names
# 跌出TOP10的板块轮动离开
out_of = yesterday_top_names - today_top_names
result['strong_rotation'] = {
'new_in': [b for b in today_top if b['name'] in new_in],
'out_of': list(out_of),
}
# 3. 资金流向趋势分析
if len(available_dates) >= 5:
dates_5 = available_dates[:5]
fund_trend = {}
for date in dates_5:
if date in industry_history:
total_flow = sum(b['main_flow'] for b in industry_history[date])
fund_trend[date] = total_flow
result['fund_trend'] = fund_trend
# 计算资金趋势
flows = list(fund_trend.values())
if len(flows) >= 3:
if all(f > 0 for f in flows[-3:]):
result['trend_analysis']['fund'] = '连续流入'
elif all(f < 0 for f in flows[-3:]):
result['trend_analysis']['fund'] = '连续流出'
else:
result['trend_analysis']['fund'] = '波动'
# 4. 市场趋势判断
if len(available_dates) >= 5:
dates_5 = available_dates[:5]
avg_pcts = {}
for date in dates_5:
if date in industry_history:
avg_pct = sum(b['pct_change'] for b in industry_history[date]) / len(industry_history[date])
avg_pcts[date] = avg_pct
result['avg_pcts'] = avg_pcts
# 判断趋势
pcts = list(avg_pcts.values())
if len(pcts) >= 3:
if all(p > 0 for p in pcts[-3:]):
result['trend_analysis']['market'] = '连续上涨'
elif all(p < 0 for p in pcts[-3:]):
result['trend_analysis']['market'] = '连续下跌'
else:
result['trend_analysis']['market'] = '震荡'
return result
def generate_daily_report(boards_data: Dict, to_email: str = "wlq@tphai.com") -> bool:
"""
生成盘后分析报告并发送邮件
@@ -243,6 +513,9 @@ def generate_daily_report(boards_data: Dict, to_email: str = "wlq@tphai.com") ->
# 执行专业分析
analysis = analyze_market(all_industry, all_concept)
# 执行历史分析
history_analysis = analyze_history(all_industry, all_concept, history_days=5)
# ========== 生成HTML正文 ==========
html_lines = [
"<h2>📊 A股板块盘后分析报告</h2>",
@@ -404,6 +677,62 @@ def generate_daily_report(boards_data: Dict, to_email: str = "wlq@tphai.com") ->
html_lines.append("<li>等待市场企稳后再介入</li>")
html_lines.append("</ul>")
# ===== 八、历史趋势分析 =====
html_lines.append("")
html_lines.append("<hr>")
html_lines.append("<h3>八、历史趋势分析</h3>")
if history_analysis.get('available_dates'):
dates = history_analysis['available_dates']
html_lines.append(f"<p>数据覆盖: 近 {len(dates)} 个交易日 ({', '.join(dates[:3])} 等)</p>")
# 市场趋势
if history_analysis.get('trend_analysis', {}).get('market'):
trend = history_analysis['trend_analysis']['market']
html_lines.append(f"<p><strong>近期趋势: {trend}</strong></p>")
# 资金趋势
if history_analysis.get('trend_analysis', {}).get('fund'):
fund_trend = history_analysis['trend_analysis']['fund']
html_lines.append(f"<p>资金流向: {fund_trend}</p>")
# 资金流向详情
if history_analysis.get('fund_trend'):
html_lines.append("<p>近5日资金流向:</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#f0f0f0'><th>日期</th><th>净流入(亿)</th></tr>")
for date, flow in history_analysis['fund_trend'].items():
flow_str = f"+{flow:.2f}" if flow > 0 else f"{flow:.2f}"
color = "green" if flow > 0 else "red"
html_lines.append(f"<tr><td>{date}</td><td style='color:{color}'>{flow_str}</td></tr>")
html_lines.append("</table>")
# 连续上涨板块
if history_analysis.get('continuous_up'):
html_lines.append("<p>🔥 连续上涨板块 (近3日):</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#fff0f0'><th>板块</th><th>连涨天数</th><th>累计涨幅</th></tr>")
for b in history_analysis['continuous_up'][:5]:
html_lines.append(f"<tr><td>{b['name']}</td><td>{b['days']}天</td><td>+{b['total_pct']:.2f}%</td></tr>")
html_lines.append("</table>")
# 连续下跌板块
if history_analysis.get('continuous_down'):
html_lines.append("<p>❄️ 连续下跌板块 (近3日):</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#f0f0ff'><th>板块</th><th>连跌天数</th><th>累计跌幅</th></tr>")
for b in history_analysis['continuous_down'][:5]:
html_lines.append(f"<tr><td>{b['name']}</td><td>{b['days']}天</td><td>{b['total_pct']:.2f}%</td></tr>")
html_lines.append("</table>")
# 板块轮动
if history_analysis.get('strong_rotation', {}).get('new_in'):
html_lines.append("<p>🔄 板块轮动 (新进入涨幅TOP10):</p>")
html_lines.append("<ul>")
for b in history_analysis['strong_rotation']['new_in'][:5]:
html_lines.append(f"<li>{b['name']}: +{b['pct_change']:.2f}%</li>")
html_lines.append("</ul>")
html_lines.append("")
html_lines.append("<hr>")
html_lines.append("<p><em>📊 详细数据请查看附件 CSV 文件</em></p>")
@@ -487,6 +816,9 @@ def run_daily_report(to_email: str = "wlq@tphai.com", verbose: bool = False) ->
返回:
bool: 是否成功
"""
# 初始化数据库
init_db()
if verbose:
print(f"\n📊 A股板块盘后分析")
print("=" * 50)
@@ -506,6 +838,11 @@ def run_daily_report(to_email: str = "wlq@tphai.com", verbose: bool = False) ->
boards_data[board_type] = boards
if verbose:
print(f"✅ 成功获取 {len(boards)} 条数据")
# 保存到数据库
save_to_db(board_type, boards)
if verbose:
print(f" 已保存到数据库")
else:
print(f"❌ 获取 {board_type} 数据失败")
boards_data[board_type] = []
@@ -538,6 +875,11 @@ def main():
report_parser.add_argument("--to", default="wlq@tphai.com", help="收件人邮箱")
report_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志")
# 查看历史命令
history_parser = subparsers.add_parser("history", help="查看历史数据统计")
history_parser.add_argument("--days", type=int, default=5, help="查看天数")
history_parser.add_argument("--board", type=str, help="查看指定板块历史")
args = parser.parse_args()
if args.command == "test":
@@ -568,6 +910,52 @@ def main():
elif args.command == "report":
run_daily_report(to_email=args.to, verbose=args.verbose)
elif args.command == "history":
init_db()
dates = get_available_dates()
if not dates:
print("❌ 暂无历史数据,请先执行 report 命令收集数据")
return
print(f"\n📅 可用数据日期: {len(dates)}")
print(f" 最新: {dates[0] if dates else ''}")
print(f" 范围: {dates[-1] if dates else ''} ~ {dates[0] if dates else ''}")
if args.board:
# 查看指定板块历史
print(f"\n📊 板块 '{args.board}'{args.days} 日数据:")
history = get_board_history('industry', args.board, days=args.days)
if history:
print("-" * 50)
for h in history:
pct_str = f"+{h['pct_change']:.2f}%" if h['pct_change'] > 0 else f"{h['pct_change']:.2f}%"
flow_str = f"+{h['main_flow']:.2f}亿" if h['main_flow'] > 0 else f"{h['main_flow']:.2f}亿"
print(f" {h['date']}: {pct_str}, 资金{flow_str}")
else:
print(f"❌ 未找到板块 '{args.board}' 的历史数据")
else:
# 显示整体统计
print(f"\n📊 近 {args.days} 日市场统计:")
# 资金流向
fund_trend = {}
for date in dates[:args.days]:
industry_data = get_history_data('industry', args.days).get(date, [])
if industry_data:
total_flow = sum(b['main_flow'] for b in industry_data)
avg_pct = sum(b['pct_change'] for b in industry_data) / len(industry_data)
fund_trend[date] = {'flow': total_flow, 'avg_pct': avg_pct}
print("-" * 50)
print(f"{'日期':<12} {'资金净流(亿)':<15} {'平均涨跌':<10}")
print("-" * 50)
for date, data in sorted(fund_trend.items(), reverse=True):
flow_str = f"+{data['flow']:.2f}" if data['flow'] > 0 else f"{data['flow']:.2f}"
pct_str = f"+{data['avg_pct']:.2f}%" if data['avg_pct'] > 0 else f"{data['avg_pct']:.2f}%"
print(f"{date:<12} {flow_str:<15} {pct_str:<10}")
else:
parser.print_help()

View File

@@ -1,5 +1,5 @@
# A股板块详细数据
# 生成时间: 2026-04-10 17:42:51
# 生成时间: 2026-04-10 17:51:40
=== 行业板块涨跌幅排行 ===
板块名称,涨跌幅(%),主力资金(亿),领涨股
1 # A股板块详细数据
2 # 生成时间: 2026-04-10 17:42:51 # 生成时间: 2026-04-10 17:51:40
3 === 行业板块涨跌幅排行 ===
4 板块名称,涨跌幅(%),主力资金(亿),领涨股
5 蓄电池及其他电池,0.06,6.17,9

BIN
data/board_history.db Normal file

Binary file not shown.