# NOTE(review): the following header was web file-viewer chrome accidentally
# captured with the source (repository path: stock_system/fetch_daily_update.py).
# It is kept only as a comment so the file remains valid Python.
"""
A股每日增量数据更新脚本
功能:
1. 检查所有股票最新数据日期
2. 补齐缺失的交易日数据
3. 支持单次运行和定时运行
用法:
python3 fetch_daily_update.py # 更新所有股票缺失数据
python3 fetch_daily_update.py --check # 只检查不更新
python3 fetch_daily_update.py --days 5 # 补最近5天的数据
"""
import tushare as ts
import pandas as pd
import sqlite3
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
import argparse
# 配置
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
TEMP_DIR = DATA_DIR / 'temp'
DB_FILE = DATA_DIR / 'progress.db'
STOCK_LIST_FILE = BASE_DIR / 'A股股票列表.csv'
REQUEST_INTERVAL = 0.3 # 增量数据请求间隔更短
def setup_tushare():
    """Initialize the tushare client and return a pro API handle.

    The token is resolved from the TUSHARE_TOKEN environment variable,
    falling back to a plain-text config.txt next to this script.

    Raises:
        ValueError: if no token is found in either location.
    """
    token = os.environ.get('TUSHARE_TOKEN', '')
    if not token:
        cfg = BASE_DIR / 'config.txt'
        token = cfg.read_text().strip() if cfg.exists() else ''
    if not token:
        raise ValueError("缺少 Tushare Token")
    ts.set_token(token)
    return ts.pro_api()
def get_latest_dates():
    """Return a mapping {ts_code: latest trade_date string on disk}.

    Reads the main parquet file; returns an empty dict when the file
    does not exist yet (first run).
    """
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if not main_file.exists():
        return {}
    daily = pd.read_parquet(main_file)
    # Newest stored date per stock code.
    return daily.groupby('ts_code')['trade_date'].max().to_dict()
def get_all_stock_codes():
    """Return ts_codes of every stock marked 'completed' in the progress DB.

    Fix: the original leaked the sqlite connection if the query raised;
    the connection is now closed in a finally block on every path.

    Returns:
        list[str]: ts_code values from the progress table.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT ts_code FROM progress WHERE status = 'completed'")
        return [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()
def is_trading_day(date_str):
    """Crude trading-day test for a 'YYYYMMDD' string.

    True for Monday through Friday; public holidays are NOT excluded
    (weekend-only approximation, as noted in the module docstring).
    """
    parsed = datetime.strptime(date_str, '%Y%m%d')
    return parsed.weekday() <= 4
def get_trade_dates(start_date, end_date):
    """List candidate trade dates (weekdays) in [start_date, end_date].

    Both bounds are inclusive 'YYYYMMDD' strings. Holidays are not
    filtered out — this is the same weekend-only approximation used
    elsewhere in this script. Returns [] when start is after end.
    """
    begin = datetime.strptime(start_date, '%Y%m%d')
    finish = datetime.strptime(end_date, '%Y%m%d')
    span_days = (finish - begin).days
    return [
        day.strftime('%Y%m%d')
        for day in (begin + timedelta(days=offset) for offset in range(span_days + 1))
        if day.weekday() < 5
    ]
def get_missing_dates(pro, latest_dates, codes, days=30):
    """Determine which candidate trade dates each stock is missing.

    Fix: the original re-checked ``is_trading_day(d)`` for every date
    even though ``trade_dates`` already contains only weekdays — the
    redundant filter is removed (same output, less work).

    Args:
        pro: tushare pro client. Unused here; kept for interface
            compatibility with existing callers.
        latest_dates: {ts_code: latest 'YYYYMMDD' already stored}.
        codes: iterable of ts_codes to examine.
        days: how many calendar days back from today to scan.

    Returns:
        tuple: (missing_info, trade_dates) where missing_info maps
        ts_code -> {'latest', 'missing', 'missing_count'} for stocks
        with gaps, and trade_dates is the weekday list that was scanned.
    """
    today = datetime.now().strftime('%Y%m%d')
    start = (datetime.now() - timedelta(days=days)).strftime('%Y%m%d')
    # Weekend-only approximation of the trading calendar.
    trade_dates = get_trade_dates(start, today)
    missing_info = {}
    for ts_code in codes:
        # Stocks never seen before default to a far-past date so the
        # whole scanned window counts as missing.
        latest = latest_dates.get(ts_code, '20100101')
        missing = [d for d in trade_dates if d > latest]
        if missing:
            missing_info[ts_code] = {
                'latest': latest,
                'missing': missing,
                'missing_count': len(missing),
            }
    return missing_info, trade_dates
def fetch_missing_data(pro, missing_info, batch_size=100):
    """Fetch missing daily bars per stock and merge them into the parquet file.

    Fixes:
    - The original concatenated, re-sorted and rewrote the ENTIRE
      dataset to disk after every single stock — accidental O(n^2) I/O.
      New rows are now buffered and flushed once per ``batch_size``
      stocks and once at the end.
    - ``REQUEST_INTERVAL`` is a per-request throttle, but the original
      slept only once every ``batch_size`` requests; we now sleep after
      each request to respect the API rate limit.
    - The error log line lacked a separator between the code and the
      error text.

    Args:
        pro: tushare pro client.
        missing_info: mapping produced by get_missing_dates.
        batch_size: number of stocks fetched between disk flushes.

    Returns:
        int: total number of new records written.
    """
    total_stocks = len(missing_info)
    print(f"\n需要更新 {total_stocks} 只股票")
    # Overall date window covering every stock's gaps.
    all_missing_dates = set()
    for info in missing_info.values():
        all_missing_dates.update(info['missing'])
    if not all_missing_dates:
        print("没有缺失数据")
        return 0
    start_date = min(all_missing_dates)
    end_date = max(all_missing_dates)
    print(f"数据范围: {start_date} ~ {end_date}")
    codes = list(missing_info.keys())
    total_records = 0
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    existing_df = pd.read_parquet(main_file) if main_file.exists() else None
    pending = []  # frames fetched but not yet flushed to disk

    def _flush():
        """Merge pending frames into the main parquet file in one write."""
        nonlocal existing_df, pending
        if not pending:
            return
        frames = ([existing_df] if existing_df is not None else []) + pending
        combined = pd.concat(frames, ignore_index=True)
        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
        combined.to_parquet(main_file, index=False, compression='snappy')
        existing_df = combined
        pending = []

    print(f"\n开始获取增量数据...")
    print("-" * 50)
    for i, ts_code in enumerate(codes):
        try:
            latest = missing_info[ts_code]['latest']
            # Fetch from the last stored date onward; rows <= latest are
            # filtered out below.
            df = pro.daily(ts_code=ts_code, start_date=latest, end_date=end_date)
            if df is not None and len(df) > 0:
                df = df[df['trade_date'] > latest]
                if len(df) > 0:
                    pending.append(df)
                    total_records += len(df)
                    print(f"[{i+1}/{total_stocks}] {ts_code} ✓ +{len(df)}")
                else:
                    print(f"[{i+1}/{total_stocks}] {ts_code} - 无新数据")
            else:
                print(f"[{i+1}/{total_stocks}] {ts_code} - 无数据")
            if (i + 1) % batch_size == 0:
                _flush()
            time.sleep(REQUEST_INTERVAL)
        except Exception as e:
            print(f"[{i+1}/{total_stocks}] {ts_code} {str(e)[:50]}")
            time.sleep(1)  # back off after an error before retrying next code
    _flush()  # persist whatever remains from the final partial batch
    print("-" * 50)
    print(f"增量更新完成,新增 {total_records} 条记录")
    return total_records
def check_missing_status(pro, days=30):
    """Print a staleness report and return the missing-info mapping.

    Checks every completed stock against the recent weekday calendar
    and reports how many dates each one is missing. Returns {} when
    everything is up to date.
    """
    print("=" * 60)
    print("A股数据缺失状态检查")
    print("=" * 60)
    latest_dates = get_latest_dates()
    codes = get_all_stock_codes()
    print(f"已获取数据股票数: {len(codes)}")
    missing_info, trade_dates = get_missing_dates(pro, latest_dates, codes, days)
    total_missing = len(missing_info)
    if total_missing == 0:
        print("\n✓ 所有股票数据都是最新的!")
        return {}
    # Histogram: missing-day count -> number of stocks with that count.
    missing_stats = {}
    for info in missing_info.values():
        count = info['missing_count']
        missing_stats[count] = missing_stats.get(count, 0) + 1
    print(f"\n需要更新的股票: {total_missing}")
    recent = trade_dates[-5:] if len(trade_dates) >= 5 else trade_dates
    print(f"最近交易日: {recent}")
    print("\n缺失天数分布:")
    for days_count in sorted(missing_stats):
        print(f"{days_count}天: {missing_stats[days_count]}")
    # Show the ten stocks with the largest gaps.
    print("\n缺失最多的10只股票:")
    worst = sorted(missing_info.items(), key=lambda item: item[1]['missing_count'], reverse=True)[:10]
    for ts_code, info in worst:
        print(f" {ts_code}: 最新{info['latest']}, 缺{info['missing_count']}")
    return missing_info
def main():
    """CLI entry point: check for gaps, then optionally fetch and re-verify."""
    parser = argparse.ArgumentParser(description='A股每日增量数据更新')
    parser.add_argument('--check', action='store_true', help='只检查不更新')
    parser.add_argument('--days', type=int, default=30, help='检查最近多少天')
    args = parser.parse_args()
    print(f"运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    pro = setup_tushare()
    if args.check:
        # Report-only mode: no fetching.
        check_missing_status(pro, args.days)
    else:
        # Report, fetch whatever is missing, then verify the result.
        missing_info = check_missing_status(pro, args.days)
        if missing_info:
            fetch_missing_data(pro, missing_info)
            print("\n更新后再次检查...")
            check_missing_status(pro, args.days)
    # Final status: size of the on-disk dataset, if present.
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if main_file.exists():
        size_mb = main_file.stat().st_size / 1024 / 1024
        print(f"\n数据文件大小: {size_mb:.2f} MB")


if __name__ == '__main__':
    main()