Files
board-monitor/board_monitor.py
hubian 8d77c4a852 feat: 历史数据获取脚本 + 离线模式支持
- 新增 fetch_history.py 获取板块历史K线数据
- 支持从数据库读取数据(离线模式)
- 历史数据分析功能已可用
2026-04-10 18:05:59 +08:00

974 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
A股板块盘后分析系统
获取东方财富板块数据,生成分析报告,发送邮件通知
支持历史数据存储和长时间跨度分析
"""
import urllib.request
import json
import os
import sys
import subprocess
import sqlite3
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
from pathlib import Path
from collections import defaultdict
# 清除代理环境变量(解决代理问题)
for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
os.environ.pop(proxy_var, None)
# 配置
SCRIPT_DIR = Path(__file__).parent
DATA_DIR = SCRIPT_DIR / "data"
DATA_DIR.mkdir(exist_ok=True)
# 数据库文件
DB_FILE = DATA_DIR / "board_history.db"
# 东方财富API配置
EASTMONEY_BASE_URL = "http://push2.eastmoney.com/api/qt/clist/get"
# 板块类型
BOARD_TYPES = {
"industry": "m:90+t:2", # 行业板块
"concept": "m:90+t:3", # 概念板块
}
# 数据字段
FIELDS = "f12,f14,f2,f3,f62,f66,f84,f104,f125,f126,f127,f128"
# ==================== 数据库操作 ====================
def init_db():
"""初始化数据库"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# 创建板块数据表
cursor.execute('''
CREATE TABLE IF NOT EXISTS board_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
board_type TEXT NOT NULL,
board_code TEXT NOT NULL,
board_name TEXT NOT NULL,
pct_change REAL,
main_flow REAL,
leader_name TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(date, board_type, board_code)
)
''')
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON board_data(date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_board ON board_data(board_type, board_code)')
conn.commit()
conn.close()
def save_to_db(board_type: str, boards: List[Dict], date: str = None):
"""保存板块数据到数据库"""
if date is None:
date = datetime.now().strftime("%Y-%m-%d")
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
for board in boards:
cursor.execute('''
INSERT OR REPLACE INTO board_data
(date, board_type, board_code, board_name, pct_change, main_flow, leader_name)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
date,
board_type,
board['code'],
board['name'],
board['pct_change'],
board['main_flow'],
board['leader_name']
))
conn.commit()
conn.close()
def get_history_data(board_type: str, days: int = 5) -> Dict[str, List[Dict]]:
"""
获取历史板块数据
返回:
Dict[date, List[Dict]]: 按日期分组的数据
"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
start_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
cursor.execute('''
SELECT date, board_code, board_name, pct_change, main_flow, leader_name
FROM board_data
WHERE board_type = ? AND date >= ?
ORDER BY date DESC
''', (board_type, start_date))
rows = cursor.fetchall()
conn.close()
# 按日期分组
result = defaultdict(list)
for row in rows:
result[row[0]].append({
'code': row[1],
'name': row[2],
'pct_change': row[3],
'main_flow': row[4],
'leader_name': row[5]
})
return dict(result)
def get_board_history(board_type: str, board_name: str, days: int = 20) -> List[Dict]:
"""获取单个板块的历史数据"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
start_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
cursor.execute('''
SELECT date, pct_change, main_flow
FROM board_data
WHERE board_type = ? AND board_name = ? AND date >= ?
ORDER BY date ASC
''', (board_type, board_name, start_date))
rows = cursor.fetchall()
conn.close()
return [{'date': r[0], 'pct_change': r[1], 'main_flow': r[2]} for r in rows]
def get_available_dates() -> List[str]:
"""获取有数据的日期列表"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute('SELECT DISTINCT date FROM board_data ORDER BY date DESC LIMIT 30')
dates = [r[0] for r in cursor.fetchall()]
conn.close()
return dates
# ==================== 数据获取 ====================
def get_board_data(board_type: str, sort_by: str = "f3", limit: int = 100) -> Optional[List[Dict]]:
"""
获取板块数据
参数:
board_type: 板块类型 (industry/concept)
sort_by: 排序字段 (f3=涨跌幅, f62=主力资金)
limit: 返回数量
返回:
List[Dict]: 板块数据列表
"""
fs = BOARD_TYPES.get(board_type)
if not fs:
print(f"❌ 未知的板块类型: {board_type}")
return None
url = f"{EASTMONEY_BASE_URL}?fid={sort_by}&po=1&pz={limit}&pn=1&np=1&fltt=2&invt=2&fs={fs}&fields={FIELDS}"
try:
req = urllib.request.Request(url, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': 'http://quote.eastmoney.com/'
})
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode())
if data.get('data') and data['data'].get('diff'):
items = data['data']['diff']
boards = []
for item in items:
board = {
'code': item.get('f12', ''),
'name': item.get('f14', ''),
'price': item.get('f2', 0) / 100 if item.get('f2') else 0,
'pct_change': item.get('f3', 0) / 100 if item.get('f3') else 0,
'main_flow': item.get('f62', 0) / 1e8 if item.get('f62') else 0, # 亿元
'leader_code': item.get('f84', ''),
'leader_name': item.get('f104', ''),
}
boards.append(board)
return boards
else:
print(f"⚠️ API返回数据为空")
return None
except urllib.error.URLError as e:
print(f"❌ 网络请求失败: {e}")
return None
except Exception as e:
print(f"❌ 获取数据异常: {e}")
return None
def analyze_market(all_industry: List, all_concept: List) -> Dict:
"""
专业市场分析
返回各种分析指标
"""
analysis = {}
# 1. 市场整体统计
if all_industry:
up_count = len([b for b in all_industry if b['pct_change'] > 0])
down_count = len([b for b in all_industry if b['pct_change'] < 0])
flat_count = len([b for b in all_industry if b['pct_change'] == 0])
avg_pct = sum(b['pct_change'] for b in all_industry) / len(all_industry)
max_pct = max(b['pct_change'] for b in all_industry)
min_pct = min(b['pct_change'] for b in all_industry)
total_flow = sum(b['main_flow'] for b in all_industry)
analysis['market_stats'] = {
'up_count': up_count,
'down_count': down_count,
'flat_count': flat_count,
'up_ratio': up_count / len(all_industry) * 100,
'avg_pct': avg_pct,
'max_pct': max_pct,
'min_pct': min_pct,
'total_flow': total_flow,
}
# 2. 资金集中度分析
if all_industry:
sorted_by_flow = sorted(all_industry, key=lambda x: x['main_flow'], reverse=True)
top5_flow = sum(b['main_flow'] for b in sorted_by_flow[:5])
analysis['fund_concentration'] = {
'top5_flow': top5_flow,
'top5_ratio': abs(top5_flow) / abs(analysis['market_stats']['total_flow']) * 100 if analysis['market_stats']['total_flow'] != 0 else 0,
}
# 3. 板块强弱分析
if all_industry:
strong_boards = [b for b in all_industry if b['pct_change'] > 1 and b['main_flow'] > 5]
weak_boards = [b for b in all_industry if b['pct_change'] < -1 and b['main_flow'] < -5]
analysis['strength'] = {
'strong_count': len(strong_boards),
'weak_count': len(weak_boards),
'strong_boards': strong_boards[:5],
'weak_boards': weak_boards[:5],
}
# 4. 概念板块热度分析
if all_concept:
hot_concepts = sorted([b for b in all_concept if b['main_flow'] > 0],
key=lambda x: x['main_flow'], reverse=True)[:5]
cold_concepts = sorted([b for b in all_concept if b['main_flow'] < 0],
key=lambda x: x['main_flow'])[:5]
analysis['concept_heat'] = {
'hot': hot_concepts,
'cold': cold_concepts,
}
# 5. 市场情绪判断
if analysis.get('market_stats'):
stats = analysis['market_stats']
# 综合判断
sentiment_score = 0
# 涨跌比例贡献
if stats['up_ratio'] > 70:
sentiment_score += 2
elif stats['up_ratio'] > 50:
sentiment_score += 1
elif stats['up_ratio'] < 30:
sentiment_score -= 2
elif stats['up_ratio'] < 50:
sentiment_score -= 1
# 平均涨跌幅贡献
if stats['avg_pct'] > 0.5:
sentiment_score += 1
elif stats['avg_pct'] < -0.5:
sentiment_score -= 1
# 资金流向贡献
if stats['total_flow'] > 50:
sentiment_score += 2
elif stats['total_flow'] > 0:
sentiment_score += 1
elif stats['total_flow'] < -50:
sentiment_score -= 2
elif stats['total_flow'] < 0:
sentiment_score -= 1
# 情绪等级
if sentiment_score >= 4:
sentiment = '强势上涨'
sentiment_desc = '市场情绪高涨,多数板块上涨,资金大幅流入,建议关注强势板块机会。'
elif sentiment_score >= 2:
sentiment = '偏强'
sentiment_desc = '市场整体偏强,资金流向积极,可适度参与热门板块。'
elif sentiment_score >= 0:
sentiment = '平稳'
sentiment_desc = '市场情绪平稳,涨跌均衡,建议观望或轻仓布局。'
elif sentiment_score >= -2:
sentiment = '偏弱'
sentiment_desc = '市场整体偏弱,资金流出明显,建议谨慎操作,关注防御性板块。'
else:
sentiment = '弱势下跌'
sentiment_desc = '市场情绪低迷,多数板块下跌,资金大幅流出,建议规避风险,等待企稳信号。'
analysis['sentiment'] = {
'score': sentiment_score,
'level': sentiment,
'description': sentiment_desc,
}
return analysis
def analyze_history(all_industry: List, all_concept: List, history_days: int = 5) -> Dict:
"""
历史数据分析
参数:
all_industry: 今日行业板块数据
all_concept: 今日概念板块数据
history_days: 分析天数
返回:
Dict: 历史分析结果
"""
result = {
'available_dates': [],
'trend_analysis': {},
'continuous_up': [],
'continuous_down': [],
'strong_rotation': {},
'fund_trend': {},
}
# 获取有数据的日期
available_dates = get_available_dates()
result['available_dates'] = available_dates[:history_days]
if len(available_dates) < 2:
return result
# 获取历史数据
industry_history = get_history_data('industry', history_days + 2)
concept_history = get_history_data('concept', history_days + 2)
# 1. 连续上涨/下跌板块分析
if len(available_dates) >= 3:
recent_dates = available_dates[:3] # 最近3天
for board in all_industry:
history = get_board_history('industry', board['name'], days=5)
if len(history) >= 3:
# 检查连续上涨
if all(h['pct_change'] > 0 for h in history[-3:]):
total_pct = sum(h['pct_change'] for h in history[-3:])
result['continuous_up'].append({
'name': board['name'],
'days': len([h for h in history if h['pct_change'] > 0]),
'total_pct': total_pct,
})
# 检查连续下跌
elif all(h['pct_change'] < 0 for h in history[-3:]):
total_pct = sum(h['pct_change'] for h in history[-3:])
result['continuous_down'].append({
'name': board['name'],
'days': len([h for h in history if h['pct_change'] < 0]),
'total_pct': total_pct,
})
# 排序
result['continuous_up'] = sorted(result['continuous_up'], key=lambda x: x['total_pct'], reverse=True)[:10]
result['continuous_down'] = sorted(result['continuous_down'], key=lambda x: x['total_pct'])[:10]
# 2. 板块轮动分析
if len(available_dates) >= 2:
today_date = available_dates[0]
yesterday_date = available_dates[1]
today_industry = industry_history.get(today_date, [])
yesterday_industry = industry_history.get(yesterday_date, [])
if today_industry and yesterday_industry:
# 今日涨幅TOP10
today_top = sorted(today_industry, key=lambda x: x['pct_change'], reverse=True)[:10]
today_top_names = {b['name'] for b in today_top}
# 昨日涨幅TOP10
yesterday_top = sorted(yesterday_industry, key=lambda x: x['pct_change'], reverse=True)[:10]
yesterday_top_names = {b['name'] for b in yesterday_top}
# 新进入TOP10的板块轮动进入
new_in = today_top_names - yesterday_top_names
# 跌出TOP10的板块轮动离开
out_of = yesterday_top_names - today_top_names
result['strong_rotation'] = {
'new_in': [b for b in today_top if b['name'] in new_in],
'out_of': list(out_of),
}
# 3. 资金流向趋势分析
if len(available_dates) >= 5:
dates_5 = available_dates[:5]
fund_trend = {}
for date in dates_5:
if date in industry_history:
total_flow = sum(b['main_flow'] for b in industry_history[date])
fund_trend[date] = total_flow
result['fund_trend'] = fund_trend
# 计算资金趋势
flows = list(fund_trend.values())
if len(flows) >= 3:
if all(f > 0 for f in flows[-3:]):
result['trend_analysis']['fund'] = '连续流入'
elif all(f < 0 for f in flows[-3:]):
result['trend_analysis']['fund'] = '连续流出'
else:
result['trend_analysis']['fund'] = '波动'
# 4. 市场趋势判断
if len(available_dates) >= 5:
dates_5 = available_dates[:5]
avg_pcts = {}
for date in dates_5:
if date in industry_history:
avg_pct = sum(b['pct_change'] for b in industry_history[date]) / len(industry_history[date])
avg_pcts[date] = avg_pct
result['avg_pcts'] = avg_pcts
# 判断趋势
pcts = list(avg_pcts.values())
if len(pcts) >= 3:
if all(p > 0 for p in pcts[-3:]):
result['trend_analysis']['market'] = '连续上涨'
elif all(p < 0 for p in pcts[-3:]):
result['trend_analysis']['market'] = '连续下跌'
else:
result['trend_analysis']['market'] = '震荡'
return result
def generate_daily_report(boards_data: Dict, to_email: str = "wlq@tphai.com") -> bool:
"""
生成盘后分析报告并发送邮件
参数:
boards_data: 板块数据字典 {'industry': [], 'concept': []}
to_email: 收件人邮箱
返回:
bool: 是否发送成功
"""
all_industry = boards_data.get('industry', [])
all_concept = boards_data.get('concept', [])
if not all_industry and not all_concept:
print("❌ 无数据,无法生成报告")
return False
# 分析总结
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 排序数据
industry_by_pct = sorted(all_industry, key=lambda x: x['pct_change'], reverse=True)
industry_by_flow = sorted(all_industry, key=lambda x: x['main_flow'], reverse=True)
concept_by_pct = sorted(all_concept, key=lambda x: x['pct_change'], reverse=True)
concept_by_flow = sorted(all_concept, key=lambda x: x['main_flow'], reverse=True)
# 执行专业分析
analysis = analyze_market(all_industry, all_concept)
# 执行历史分析
history_analysis = analyze_history(all_industry, all_concept, history_days=5)
# ========== 生成HTML正文 ==========
html_lines = [
"<h2>📊 A股板块盘后分析报告</h2>",
f"<p>报告时间: {timestamp}</p>",
"",
"<hr>",
]
# ===== 一、市场情绪分析 =====
if analysis.get('sentiment'):
sent = analysis['sentiment']
stats = analysis.get('market_stats', {})
html_lines.append("")
html_lines.append("<h3>一、市场情绪分析</h3>")
html_lines.append(f"<p><strong>市场评级: {sent['level']}</strong></p>")
html_lines.append(f"<p>{sent['description']}</p>")
if stats:
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr><td>上涨板块</td><td>{}</td><td>占比 {:.1f}%</td></tr>".format(stats['up_count'], stats['up_ratio']))
html_lines.append("<tr><td>下跌板块</td><td>{}</td><td>占比 {:.1f}%</td></tr>".format(stats['down_count'], 100 - stats['up_ratio']))
html_lines.append("<tr><td>平均涨跌</td><td>{:.2f}%</td><td>最大涨幅 {:.2f}%</td></tr>".format(stats['avg_pct'], stats['max_pct']))
html_lines.append("<tr><td>资金净流</td><td>{:.2f}亿</td><td>最大跌幅 {:.2f}%</td></tr>".format(stats['total_flow'], stats['min_pct']))
html_lines.append("</table>")
# ===== 二、资金流向分析 =====
if analysis.get('fund_concentration'):
fund = analysis['fund_concentration']
html_lines.append("")
html_lines.append("<h3>二、资金流向分析</h3>")
html_lines.append(f"<p>TOP5板块资金合计: <strong>{fund['top5_flow']:.2f}亿</strong></p>")
html_lines.append(f"<p>资金集中度: <strong>{fund['top5_ratio']:.1f}%</strong>(流入资金集中在少数板块)</p>")
# ===== 三、板块强弱分析 =====
if analysis.get('strength'):
strength = analysis['strength']
html_lines.append("")
html_lines.append("<h3>三、板块强弱分析</h3>")
html_lines.append(f"<p>强势板块(涨幅>1%且资金流入>5亿: <strong>{strength['strong_count']} 个</strong></p>")
html_lines.append(f"<p>弱势板块(跌幅>1%且资金流出>5亿: <strong>{strength['weak_count']} 个</strong></p>")
if strength['strong_boards']:
html_lines.append("<p>强势板块示例:</p>")
html_lines.append("<ul>")
for b in strength['strong_boards'][:3]:
html_lines.append("<li>{name}: +{pct:.2f}%, 资金+{flow:.2f}亿</li>".format(
name=b['name'], pct=b['pct_change'], flow=b['main_flow']))
html_lines.append("</ul>")
if strength['weak_boards']:
html_lines.append("<p>弱势板块示例:</p>")
html_lines.append("<ul>")
for b in strength['weak_boards'][:3]:
html_lines.append("<li>{name}: {pct:.2f}%, 资金{flow:.2f}亿</li>".format(
name=b['name'], pct=b['pct_change'], flow=b['main_flow']))
html_lines.append("</ul>")
# ===== 四、热门概念分析 =====
if analysis.get('concept_heat'):
heat = analysis['concept_heat']
html_lines.append("")
html_lines.append("<h3>四、概念板块热度</h3>")
if heat['hot']:
html_lines.append("<p>🔥 热门概念资金流入TOP:</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#fff0f0'><th>概念</th><th>资金(亿)</th><th>涨跌</th></tr>")
for b in heat['hot'][:5]:
pct_str = f"+{b['pct_change']:.2f}%" if b['pct_change'] > 0 else f"{b['pct_change']:.2f}%"
html_lines.append(f"<tr><td>{b['name']}</td><td>+{b['main_flow']:.2f}</td><td>{pct_str}</td></tr>")
html_lines.append("</table>")
if heat['cold']:
html_lines.append("<p>❄️ 冷门概念资金流出TOP:</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#f0f0ff'><th>概念</th><th>资金(亿)</th><th>涨跌</th></tr>")
for b in heat['cold'][:5]:
pct_str = f"+{b['pct_change']:.2f}%" if b['pct_change'] > 0 else f"{b['pct_change']:.2f}%"
html_lines.append(f"<tr><td>{b['name']}</td><td>{b['main_flow']:.2f}</td><td>{pct_str}</td></tr>")
html_lines.append("</table>")
# ===== 五、行业板块排行 =====
html_lines.append("")
html_lines.append("<hr>")
html_lines.append("")
html_lines.append("<h3>五、行业板块涨跌排行</h3>")
html_lines.append("<p>📈 涨幅 TOP5:</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#f0f0f0'><th>板块</th><th>涨跌幅</th><th>主力资金</th><th>领涨股</th></tr>")
for board in industry_by_pct[:5]:
pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%"
flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿"
html_lines.append(f"<tr><td>{board['name']}</td><td>{pct_str}</td><td>{flow_str}</td><td>{board['leader_name'] or '-'}</td></tr>")
html_lines.append("</table>")
html_lines.append("<p>📉 跌幅 TOP5:</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#f0f0f0'><th>板块</th><th>涨跌幅</th><th>主力资金</th></tr>")
for board in industry_by_pct[-5:]:
pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%"
flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿"
html_lines.append(f"<tr><td>{board['name']}</td><td>{pct_str}</td><td>{flow_str}</td></tr>")
html_lines.append("</table>")
# ===== 六、主力资金排行 =====
html_lines.append("")
html_lines.append("<h3>六、主力资金流向排行</h3>")
html_lines.append("<p>💰 大幅流入 TOP10 (≥10亿):</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#f0f0f0'><th>板块</th><th>资金(亿)</th><th>涨跌</th></tr>")
inflow_boards = [b for b in industry_by_flow if b['main_flow'] > 10][:10]
for board in inflow_boards:
pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%"
html_lines.append(f"<tr><td>{board['name']}</td><td>+{board['main_flow']:.2f}</td><td>{pct_str}</td></tr>")
html_lines.append("</table>")
html_lines.append("<p>💸 大幅流出 TOP10 (≤-10亿):</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#f0f0f0'><th>板块</th><th>资金(亿)</th><th>涨跌</th></tr>")
outflow_boards = [b for b in industry_by_flow if b['main_flow'] < -10][:10]
for board in outflow_boards:
pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%"
html_lines.append(f"<tr><td>{board['name']}</td><td>{board['main_flow']:.2f}</td><td>{pct_str}</td></tr>")
html_lines.append("</table>")
# ===== 七、投资建议 =====
html_lines.append("")
html_lines.append("<hr>")
html_lines.append("<h3>七、投资建议</h3>")
if analysis.get('sentiment'):
sent = analysis['sentiment']
if sent['score'] >= 2:
html_lines.append("<p><strong>策略建议: 积极参与</strong></p>")
html_lines.append("<ul>")
html_lines.append("<li>关注资金大幅流入的热门板块,如新能源、科技类</li>")
html_lines.append("<li>可适当追涨强势板块龙头股</li>")
html_lines.append("<li>注意板块轮动节奏,避免追高</li>")
html_lines.append("</ul>")
elif sent['score'] >= 0:
html_lines.append("<p><strong>策略建议: 观望为主</strong></p>")
html_lines.append("<ul>")
html_lines.append("<li>等待明确的市场方向信号</li>")
html_lines.append("<li>可轻仓布局有资金流入的潜力板块</li>")
html_lines.append("<li>规避资金流出明显的板块</li>")
html_lines.append("</ul>")
else:
html_lines.append("<p><strong>策略建议: 谨慎防守</strong></p>")
html_lines.append("<ul>")
html_lines.append("<li>控制仓位,规避风险板块</li>")
html_lines.append("<li>关注防御性板块如银行、医药等</li>")
html_lines.append("<li>等待市场企稳后再介入</li>")
html_lines.append("</ul>")
# ===== 八、历史趋势分析 =====
html_lines.append("")
html_lines.append("<hr>")
html_lines.append("<h3>八、历史趋势分析</h3>")
if history_analysis.get('available_dates'):
dates = history_analysis['available_dates']
html_lines.append(f"<p>数据覆盖: 近 {len(dates)} 个交易日 ({', '.join(dates[:3])} 等)</p>")
# 市场趋势
if history_analysis.get('trend_analysis', {}).get('market'):
trend = history_analysis['trend_analysis']['market']
html_lines.append(f"<p><strong>近期趋势: {trend}</strong></p>")
# 资金趋势
if history_analysis.get('trend_analysis', {}).get('fund'):
fund_trend = history_analysis['trend_analysis']['fund']
html_lines.append(f"<p>资金流向: {fund_trend}</p>")
# 资金流向详情
if history_analysis.get('fund_trend'):
html_lines.append("<p>近5日资金流向:</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#f0f0f0'><th>日期</th><th>净流入(亿)</th></tr>")
for date, flow in history_analysis['fund_trend'].items():
flow_str = f"+{flow:.2f}" if flow > 0 else f"{flow:.2f}"
color = "green" if flow > 0 else "red"
html_lines.append(f"<tr><td>{date}</td><td style='color:{color}'>{flow_str}</td></tr>")
html_lines.append("</table>")
# 连续上涨板块
if history_analysis.get('continuous_up'):
html_lines.append("<p>🔥 连续上涨板块 (近3日):</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#fff0f0'><th>板块</th><th>连涨天数</th><th>累计涨幅</th></tr>")
for b in history_analysis['continuous_up'][:5]:
html_lines.append(f"<tr><td>{b['name']}</td><td>{b['days']}天</td><td>+{b['total_pct']:.2f}%</td></tr>")
html_lines.append("</table>")
# 连续下跌板块
if history_analysis.get('continuous_down'):
html_lines.append("<p>❄️ 连续下跌板块 (近3日):</p>")
html_lines.append("<table border='1' cellpadding='6' cellspacing='0' style='border-collapse: collapse;'>")
html_lines.append("<tr style='background:#f0f0ff'><th>板块</th><th>连跌天数</th><th>累计跌幅</th></tr>")
for b in history_analysis['continuous_down'][:5]:
html_lines.append(f"<tr><td>{b['name']}</td><td>{b['days']}天</td><td>{b['total_pct']:.2f}%</td></tr>")
html_lines.append("</table>")
# 板块轮动
if history_analysis.get('strong_rotation', {}).get('new_in'):
html_lines.append("<p>🔄 板块轮动 (新进入涨幅TOP10):</p>")
html_lines.append("<ul>")
for b in history_analysis['strong_rotation']['new_in'][:5]:
html_lines.append(f"<li>{b['name']}: +{b['pct_change']:.2f}%</li>")
html_lines.append("</ul>")
html_lines.append("")
html_lines.append("<hr>")
html_lines.append("<p><em>📊 详细数据请查看附件 CSV 文件</em></p>")
html_body = "\n".join(html_lines)
# 生成附件文件CSV格式
attachment_file = DATA_DIR / f"board_detail_{datetime.now().strftime('%Y%m%d')}.csv"
csv_lines = [
"# A股板块详细数据",
f"# 生成时间: {timestamp}",
"",
"=== 行业板块涨跌幅排行 ===",
"板块名称,涨跌幅(%),主力资金(亿),领涨股",
]
for board in industry_by_pct:
csv_lines.append(f"{board['name']},{board['pct_change']:.2f},{board['main_flow']:.2f},{board['leader_name'] or ''}")
csv_lines.append("")
csv_lines.append("=== 行业板块资金流向排行 ===")
csv_lines.append("板块名称,主力资金(亿),涨跌幅(%),领涨股")
for board in industry_by_flow:
csv_lines.append(f"{board['name']},{board['main_flow']:.2f},{board['pct_change']:.2f},{board['leader_name'] or ''}")
csv_lines.append("")
csv_lines.append("=== 概念板块涨跌幅排行 ===")
csv_lines.append("板块名称,涨跌幅(%),主力资金(亿),领涨股")
for board in concept_by_pct:
csv_lines.append(f"{board['name']},{board['pct_change']:.2f},{board['main_flow']:.2f},{board['leader_name'] or ''}")
csv_lines.append("")
csv_lines.append("=== 概念板块资金流向排行 ===")
csv_lines.append("板块名称,主力资金(亿),涨跌幅(%),领涨股")
for board in concept_by_flow:
csv_lines.append(f"{board['name']},{board['main_flow']:.2f},{board['pct_change']:.2f},{board['leader_name'] or ''}")
attachment_file.write_text("\n".join(csv_lines), encoding='utf-8')
# 发送邮件
email_script = SCRIPT_DIR.parent.parent / "skills/email/scripts/send_email.py"
subject = f"【A股板块盘后分析】{datetime.now().strftime('%Y-%m-%d')}"
cmd = [
"python3", str(email_script),
"send",
"--to", to_email,
"--subject", subject,
"--body", html_body,
"--html",
"--attach", str(attachment_file)
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
print(f"✅ 报告发送成功: {to_email}")
print(f" 附件: {attachment_file}")
return True
else:
print(f"❌ 发送失败: {result.stderr}")
return False
except Exception as e:
print(f"❌ 发送异常: {e}")
return False
def run_daily_report(to_email: str = "wlq@tphai.com", verbose: bool = False) -> bool:
"""
执行盘后报告生成和发送
参数:
to_email: 收件人邮箱
verbose: 是否显示详细日志
返回:
bool: 是否成功
"""
# 初始化数据库
init_db()
if verbose:
print(f"\n📊 A股板块盘后分析")
print("=" * 50)
print(f"收件人: {to_email}")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 获取所有板块数据
boards_data = {}
for board_type in ["industry", "concept"]:
if verbose:
print(f"\n📡 获取 {board_type} 板块数据...")
boards = get_board_data(board_type, limit=100)
if boards:
boards_data[board_type] = boards
if verbose:
print(f"✅ 成功获取 {len(boards)} 条数据")
# 保存到数据库
save_to_db(board_type, boards)
if verbose:
print(f" 已保存到数据库")
else:
# 如果实时获取失败,尝试从数据库读取最新数据
if verbose:
print(f"⚠️ 实时获取失败,从数据库读取最新数据...")
available_dates = get_available_dates()
if available_dates:
latest_date = available_dates[0]
history_data = get_history_data(board_type, 1)
if latest_date in history_data:
boards_data[board_type] = history_data[latest_date]
if verbose:
print(f"✅ 从数据库读取 {latest_date}{len(history_data[latest_date])} 条数据")
# 生成并发送报告
if boards_data.get('industry') or boards_data.get('concept'):
return generate_daily_report(boards_data, to_email)
else:
print("❌ 所有数据获取失败,无法生成报告")
return False
def main():
"""命令行入口"""
import argparse
parser = argparse.ArgumentParser(description="A股板块盘后分析系统")
subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 测试命令
subparsers.add_parser("test", help="测试API连接")
# 获取数据命令
get_parser = subparsers.add_parser("get", help="获取板块数据")
get_parser.add_argument("type", choices=["industry", "concept"], help="板块类型")
get_parser.add_argument("--limit", type=int, default=20, help="返回数量")
# 发送报告命令
report_parser = subparsers.add_parser("report", help="生成并发送盘后报告")
report_parser.add_argument("--to", default="wlq@tphai.com", help="收件人邮箱")
report_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志")
# 查看历史命令
history_parser = subparsers.add_parser("history", help="查看历史数据统计")
history_parser.add_argument("--days", type=int, default=5, help="查看天数")
history_parser.add_argument("--board", type=str, help="查看指定板块历史")
args = parser.parse_args()
if args.command == "test":
print("\n🧪 测试东方财富API连接...")
for board_type in ["industry", "concept"]:
print(f"\n测试 {board_type} 板块...")
boards = get_board_data(board_type, limit=5)
if boards:
print(f"✅ 成功获取 {len(boards)} 条数据")
for board in boards[:3]:
pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%"
print(f" - {board['name']}: {pct_str}")
else:
print(f"{board_type} 测试失败")
elif args.command == "get":
boards = get_board_data(args.type, limit=args.limit)
if boards:
print(f"\n📊 {args.type} 板块数据 ({len(boards)} 条)")
print("=" * 50)
for board in boards:
pct_str = f"+{board['pct_change']:.2f}%" if board['pct_change'] > 0 else f"{board['pct_change']:.2f}%"
flow_str = f"+{board['main_flow']:.2f}亿" if board['main_flow'] > 0 else f"{board['main_flow']:.2f}亿"
print(f"{board['name']}: {pct_str}, 主力{flow_str}")
else:
print("❌ 获取数据失败")
elif args.command == "report":
run_daily_report(to_email=args.to, verbose=args.verbose)
elif args.command == "history":
init_db()
dates = get_available_dates()
if not dates:
print("❌ 暂无历史数据,请先执行 report 命令收集数据")
return
print(f"\n📅 可用数据日期: {len(dates)}")
print(f" 最新: {dates[0] if dates else ''}")
print(f" 范围: {dates[-1] if dates else ''} ~ {dates[0] if dates else ''}")
if args.board:
# 查看指定板块历史
print(f"\n📊 板块 '{args.board}'{args.days} 日数据:")
history = get_board_history('industry', args.board, days=args.days)
if history:
print("-" * 50)
for h in history:
pct_str = f"+{h['pct_change']:.2f}%" if h['pct_change'] > 0 else f"{h['pct_change']:.2f}%"
flow_str = f"+{h['main_flow']:.2f}亿" if h['main_flow'] > 0 else f"{h['main_flow']:.2f}亿"
print(f" {h['date']}: {pct_str}, 资金{flow_str}")
else:
print(f"❌ 未找到板块 '{args.board}' 的历史数据")
else:
# 显示整体统计
print(f"\n📊 近 {args.days} 日市场统计:")
# 资金流向
fund_trend = {}
for date in dates[:args.days]:
industry_data = get_history_data('industry', args.days).get(date, [])
if industry_data:
total_flow = sum(b['main_flow'] for b in industry_data)
avg_pct = sum(b['pct_change'] for b in industry_data) / len(industry_data)
fund_trend[date] = {'flow': total_flow, 'avg_pct': avg_pct}
print("-" * 50)
print(f"{'日期':<12} {'资金净流(亿)':<15} {'平均涨跌':<10}")
print("-" * 50)
for date, data in sorted(fund_trend.items(), reverse=True):
flow_str = f"+{data['flow']:.2f}" if data['flow'] > 0 else f"{data['flow']:.2f}"
pct_str = f"+{data['avg_pct']:.2f}%" if data['avg_pct'] > 0 else f"{data['avg_pct']:.2f}%"
print(f"{date:<12} {flow_str:<15} {pct_str:<10}")
else:
parser.print_help()
if __name__ == "__main__":
main()