""" A股每日增量数据更新脚本 功能: 1. 检查所有股票最新数据日期 2. 补齐缺失的交易日数据 3. 支持单次运行和定时运行 用法: python3 fetch_daily_update.py # 更新所有股票缺失数据 python3 fetch_daily_update.py --check # 只检查不更新 python3 fetch_daily_update.py --days 5 # 补最近5天的数据 """ import tushare as ts import pandas as pd import sqlite3 import os import time from datetime import datetime, timedelta from pathlib import Path import argparse # 配置 BASE_DIR = Path(__file__).parent DATA_DIR = BASE_DIR / 'data' TEMP_DIR = DATA_DIR / 'temp' DB_FILE = DATA_DIR / 'progress.db' STOCK_LIST_FILE = BASE_DIR / 'A股股票列表.csv' REQUEST_INTERVAL = 0.3 # 增量数据请求间隔更短 def setup_tushare(): """初始化tushare""" token = os.environ.get('TUSHARE_TOKEN', '') if not token: config_file = BASE_DIR / 'config.txt' if config_file.exists(): token = config_file.read_text().strip() if not token: raise ValueError("缺少 Tushare Token") ts.set_token(token) return ts.pro_api() def get_latest_dates(): """获取每只股票的最新数据日期""" main_file = DATA_DIR / 'stock_daily_data.parquet' if not main_file.exists(): return {} df = pd.read_parquet(main_file) latest = df.groupby('ts_code')['trade_date'].max() return dict(latest) def get_all_stock_codes(): """获取所有股票代码""" conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() cursor.execute("SELECT ts_code FROM progress WHERE status = 'completed'") codes = [row[0] for row in cursor.fetchall()] conn.close() return codes def is_trading_day(date_str): """判断是否为交易日(简单版:排除周末)""" d = datetime.strptime(date_str, '%Y%m%d') return d.weekday() < 5 # 周一到周五 def get_trade_dates(start_date, end_date): """获取日期范围内的可能交易日""" start = datetime.strptime(start_date, '%Y%m%d') end = datetime.strptime(end_date, '%Y%m%d') trade_dates = [] current = start while current <= end: if current.weekday() < 5: # 周一到周五 trade_dates.append(current.strftime('%Y%m%d')) current += timedelta(days=1) return trade_dates def get_missing_dates(pro, latest_dates, codes, days=30): """找出需要更新的日期范围""" today = datetime.now().strftime('%Y%m%d') # 使用简单交易日判断(排除周末) start = (datetime.now() - timedelta(days=days)).strftime('%Y%m%d') trade_dates = get_trade_dates(start, today) # 找出每个股票缺失的交易日 missing_info = {} for ts_code in codes: latest = latest_dates.get(ts_code, '20100101') missing = [d for d in trade_dates if d > latest and is_trading_day(d)] if missing: missing_info[ts_code] = { 'latest': latest, 'missing': missing, 'missing_count': len(missing) } return missing_info, trade_dates def fetch_missing_data(pro, missing_info, batch_size=100): """批量获取缺失数据""" total_stocks = len(missing_info) print(f"\n需要更新 {total_stocks} 只股票") # 计算需要获取的日期范围 all_missing_dates = set() for info in missing_info.values(): all_missing_dates.update(info['missing']) if not all_missing_dates: print("没有缺失数据") return 0 start_date = min(all_missing_dates) end_date = max(all_missing_dates) print(f"数据范围: {start_date} ~ {end_date}") # 批量获取数据(tushare支持批量查询) codes = list(missing_info.keys()) total_records = 0 batch_count = 0 main_file = DATA_DIR / 'stock_daily_data.parquet' existing_df = pd.read_parquet(main_file) if main_file.exists() else None print(f"\n开始获取增量数据...") print("-" * 50) for i, ts_code in enumerate(codes): try: latest = missing_info[ts_code]['latest'] # 从最新日期的下一天开始获取 start_fetch = latest df = pro.daily(ts_code=ts_code, start_date=start_fetch, end_date=end_date) if df is not None and len(df) > 0: # 过滤掉已存在的数据 df = df[df['trade_date'] > latest] if len(df) > 0: # 合并到主文件 if existing_df is not None: combined = pd.concat([existing_df, df], ignore_index=True) combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True) combined.to_parquet(main_file, index=False, compression='snappy') existing_df = combined else: df.to_parquet(main_file, index=False, compression='snappy') existing_df = df total_records += len(df) print(f"[{i+1}/{total_stocks}] {ts_code} ✓ +{len(df)}条") else: print(f"[{i+1}/{total_stocks}] {ts_code} - 无新数据") else: print(f"[{i+1}/{total_stocks}] {ts_code} - 无数据") batch_count += 1 # 每批次保存一次,减少请求间隔 if batch_count >= batch_size: batch_count = 0 time.sleep(REQUEST_INTERVAL) except Exception as e: print(f"[{i+1}/{total_stocks}] {ts_code} ✗ {str(e)[:50]}") time.sleep(1) # 错误后多等一会 print("-" * 50) print(f"增量更新完成,新增 {total_records} 条记录") return total_records def check_missing_status(pro, days=30): """检查缺失状态""" print("=" * 60) print("A股数据缺失状态检查") print("=" * 60) # 获取现有数据最新日期 latest_dates = get_latest_dates() codes = get_all_stock_codes() print(f"已获取数据股票数: {len(codes)}") # 获取缺失信息 missing_info, trade_dates = get_missing_dates(pro, latest_dates, codes, days) # 统计 total_missing = len(missing_info) if total_missing == 0: print("\n✓ 所有股票数据都是最新的!") return {} # 按缺失天数分组统计 missing_stats = {} for ts_code, info in missing_info.items(): count = info['missing_count'] if count not in missing_stats: missing_stats[count] = 0 missing_stats[count] += 1 print(f"\n需要更新的股票: {total_missing} 只") print(f"最近交易日: {trade_dates[-5:] if len(trade_dates) >= 5 else trade_dates}") print("\n缺失天数分布:") for days_count in sorted(missing_stats.keys()): print(f" 缺{days_count}天: {missing_stats[days_count]} 只") # 显示缺失最多的股票 print("\n缺失最多的10只股票:") sorted_missing = sorted(missing_info.items(), key=lambda x: x[1]['missing_count'], reverse=True)[:10] for ts_code, info in sorted_missing: print(f" {ts_code}: 最新{info['latest']}, 缺{info['missing_count']}天") return missing_info def main(): parser = argparse.ArgumentParser(description='A股每日增量数据更新') parser.add_argument('--check', action='store_true', help='只检查不更新') parser.add_argument('--days', type=int, default=30, help='检查最近多少天') args = parser.parse_args() print(f"运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # 初始化 pro = setup_tushare() if args.check: # 只检查 check_missing_status(pro, args.days) else: # 检查并更新 missing_info = check_missing_status(pro, args.days) if missing_info: fetch_missing_data(pro, missing_info) # 再次检查确认 print("\n更新后再次检查...") check_missing_status(pro, args.days) # 显示文件大小 main_file = DATA_DIR / 'stock_daily_data.parquet' if main_file.exists(): size_mb = main_file.stat().st_size / 1024 / 1024 print(f"\n数据文件大小: {size_mb:.2f} MB") if __name__ == '__main__': main()