264 lines
8.4 KiB
Python
264 lines
8.4 KiB
Python
"""
|
||
A股每日增量数据更新脚本
|
||
功能:
|
||
1. 检查所有股票最新数据日期
|
||
2. 补齐缺失的交易日数据
|
||
3. 支持单次运行和定时运行
|
||
|
||
用法:
|
||
python3 fetch_daily_update.py # 更新所有股票缺失数据
|
||
python3 fetch_daily_update.py --check # 只检查不更新
|
||
python3 fetch_daily_update.py --days 5 # 补最近5天的数据
|
||
"""
|
||
|
||
import tushare as ts
|
||
import pandas as pd
|
||
import sqlite3
|
||
import os
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
import argparse
|
||
|
||
# --- Configuration ---
BASE_DIR = Path(__file__).parent                 # directory containing this script
DATA_DIR = BASE_DIR / 'data'                     # root of all downloaded data
TEMP_DIR = DATA_DIR / 'temp'                     # scratch space for partial downloads
DB_FILE = DATA_DIR / 'progress.db'               # SQLite progress-tracking database
STOCK_LIST_FILE = BASE_DIR / 'A股股票列表.csv'    # master list of A-share codes

# Seconds to wait between API requests; incremental pulls are small,
# so a shorter interval than a full backfill is acceptable.
REQUEST_INTERVAL = 0.3
||
def setup_tushare():
    """Initialise the tushare client and return a pro-API handle.

    The token is read from the TUSHARE_TOKEN environment variable,
    falling back to a config.txt file next to this script.

    Raises:
        ValueError: when no token can be found in either location.
    """
    token = os.environ.get('TUSHARE_TOKEN', '')
    config_file = BASE_DIR / 'config.txt'
    if not token and config_file.exists():
        token = config_file.read_text().strip()
    if not token:
        raise ValueError("缺少 Tushare Token")
    ts.set_token(token)
    return ts.pro_api()
||
def get_latest_dates():
    """Return {ts_code: latest stored trade_date} from the main parquet file.

    Returns an empty dict when no data file exists yet.
    """
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if not main_file.exists():
        return {}

    daily = pd.read_parquet(main_file)
    per_code_max = daily.groupby('ts_code')['trade_date'].max()
    return per_code_max.to_dict()
||
def get_all_stock_codes():
    """Return ts_codes of all stocks marked 'completed' in the progress DB.

    Returns:
        list[str]: completed ts_codes (may be empty).

    Raises:
        sqlite3.Error: if the database or the progress table is missing.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT ts_code FROM progress WHERE status = 'completed'")
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Close even when the query raises, so the DB file isn't left locked
        # (the original leaked the connection on any sqlite error).
        conn.close()
||
def is_trading_day(date_str):
    """Naive trading-day test: True for Monday-Friday, False on weekends.

    Holidays are NOT excluded. `date_str` must be formatted 'YYYYMMDD'.
    """
    parsed = datetime.strptime(date_str, '%Y%m%d')
    # isoweekday(): Monday == 1 ... Sunday == 7
    return parsed.isoweekday() <= 5
||
def get_trade_dates(start_date, end_date):
    """List candidate trading days (weekdays) in an inclusive date range.

    Args:
        start_date: first day, 'YYYYMMDD'.
        end_date: last day, 'YYYYMMDD'.

    Returns:
        list[str]: every Monday-Friday in the range, formatted 'YYYYMMDD'.
        Empty when end_date precedes start_date.
    """
    first = datetime.strptime(start_date, '%Y%m%d')
    last = datetime.strptime(end_date, '%Y%m%d')
    span_days = (last - first).days
    return [
        day.strftime('%Y%m%d')
        for day in (first + timedelta(days=offset) for offset in range(span_days + 1))
        if day.weekday() < 5  # Monday..Friday
    ]
|
||
def get_missing_dates(pro, latest_dates, codes, days=30):
    """Determine which recent trading days each stock is missing.

    Args:
        pro: tushare pro API handle (currently unused; kept for interface
             compatibility, e.g. a future exchange-calendar lookup).
        latest_dates: {ts_code: latest 'YYYYMMDD' already stored}.
        codes: iterable of ts_codes to examine.
        days: how many calendar days back to scan.

    Returns:
        tuple: (missing_info, trade_dates) where missing_info maps ts_code
        to {'latest', 'missing', 'missing_count'} and trade_dates is the
        weekday-only calendar used for the scan.
    """
    # Snapshot "now" once: the original called datetime.now() twice, so a
    # run straddling midnight could use inconsistent window endpoints.
    now = datetime.now()
    today = now.strftime('%Y%m%d')
    start = (now - timedelta(days=days)).strftime('%Y%m%d')

    # Simple calendar: weekdays only (holidays are not excluded).
    trade_dates = get_trade_dates(start, today)

    missing_info = {}
    for ts_code in codes:
        # Stocks never seen before get a far-past sentinel so every
        # candidate date counts as missing.
        latest = latest_dates.get(ts_code, '20100101')
        # trade_dates already contains only weekdays, so the original's
        # extra per-date is_trading_day() re-check was redundant.
        missing = [d for d in trade_dates if d > latest]
        if missing:
            missing_info[ts_code] = {
                'latest': latest,
                'missing': missing,
                'missing_count': len(missing),
            }

    return missing_info, trade_dates
||
def fetch_missing_data(pro, missing_info, batch_size=100):
    """Fetch and persist missing daily bars for every stock in missing_info.

    New rows are accumulated in memory and merged into the parquet file
    once per `batch_size` stocks plus once at the end. The original
    re-sorted and rewrote the ENTIRE file after every single stock
    (accidental O(n^2) disk I/O), and its batch counter reset without
    actually batching anything.

    Args:
        pro: tushare pro API handle.
        missing_info: {ts_code: {'latest': ..., 'missing': [...], 'missing_count': ...}}.
        batch_size: number of stocks fetched between parquet flushes.

    Returns:
        int: number of newly added records.
    """
    total_stocks = len(missing_info)
    print(f"\n需要更新 {total_stocks} 只股票")

    # Union of all missing dates determines the fetch window.
    all_missing_dates = set()
    for info in missing_info.values():
        all_missing_dates.update(info['missing'])

    if not all_missing_dates:
        print("没有缺失数据")
        return 0

    start_date = min(all_missing_dates)
    end_date = max(all_missing_dates)
    print(f"数据范围: {start_date} ~ {end_date}")

    codes = list(missing_info.keys())
    total_records = 0
    batch_count = 0

    main_file = DATA_DIR / 'stock_daily_data.parquet'
    existing_df = pd.read_parquet(main_file) if main_file.exists() else None

    print(f"\n开始获取增量数据...")
    print("-" * 50)

    pending = []  # frames fetched since the last flush

    def _flush():
        """Merge pending frames into the main parquet file in one write."""
        nonlocal existing_df, pending
        if not pending:
            return
        frames = ([existing_df] if existing_df is not None else []) + pending
        combined = pd.concat(frames, ignore_index=True)
        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
        combined.to_parquet(main_file, index=False, compression='snappy')
        existing_df = combined
        pending = []

    for i, ts_code in enumerate(codes):
        try:
            latest = missing_info[ts_code]['latest']
            # Fetch from the stored latest date; rows for that date are
            # filtered out below, so no duplicates are introduced.
            df = pro.daily(ts_code=ts_code, start_date=latest, end_date=end_date)

            if df is not None and len(df) > 0:
                # Drop rows we already have.
                df = df[df['trade_date'] > latest]

                if len(df) > 0:
                    pending.append(df)
                    total_records += len(df)
                    print(f"[{i+1}/{total_stocks}] {ts_code} ✓ +{len(df)}条")
                else:
                    print(f"[{i+1}/{total_stocks}] {ts_code} - 无新数据")
            else:
                print(f"[{i+1}/{total_stocks}] {ts_code} - 无数据")

            batch_count += 1

            # Periodic save: a crash loses at most one batch of fetches.
            if batch_count >= batch_size:
                _flush()
                batch_count = 0

            time.sleep(REQUEST_INTERVAL)

        except Exception as e:
            print(f"[{i+1}/{total_stocks}] {ts_code} ✗ {str(e)[:50]}")
            time.sleep(1)  # back off a little after an error

    _flush()  # persist the final partial batch

    print("-" * 50)
    print(f"增量更新完成,新增 {total_records} 条记录")

    return total_records
||
def check_missing_status(pro, days=30):
    """Print a report of which stocks are missing recent daily data.

    Args:
        pro: tushare pro API handle (forwarded to get_missing_dates).
        days: how many calendar days back to scan.

    Returns:
        dict: the missing-info mapping; empty when everything is current.
    """
    print("=" * 60)
    print("A股数据缺失状态检查")
    print("=" * 60)

    # Current state: newest stored date per stock, plus the full code list.
    latest_dates = get_latest_dates()
    codes = get_all_stock_codes()

    print(f"已获取数据股票数: {len(codes)}")

    missing_info, trade_dates = get_missing_dates(pro, latest_dates, codes, days)

    if not missing_info:
        print("\n✓ 所有股票数据都是最新的!")
        return {}

    # Histogram: number of stocks missing exactly N trading days.
    missing_stats = {}
    for info in missing_info.values():
        n_missing = info['missing_count']
        missing_stats[n_missing] = missing_stats.get(n_missing, 0) + 1

    print(f"\n需要更新的股票: {len(missing_info)} 只")
    print(f"最近交易日: {trade_dates[-5:] if len(trade_dates) >= 5 else trade_dates}")

    print("\n缺失天数分布:")
    for days_count in sorted(missing_stats):
        print(f"  缺{days_count}天: {missing_stats[days_count]} 只")

    # Worst offenders first.
    print("\n缺失最多的10只股票:")
    worst = sorted(missing_info.items(), key=lambda kv: kv[1]['missing_count'], reverse=True)[:10]
    for ts_code, info in worst:
        print(f"  {ts_code}: 最新{info['latest']}, 缺{info['missing_count']}天")

    return missing_info
||
def main():
    """CLI entry point: parse arguments, report gaps, optionally fill them."""
    parser = argparse.ArgumentParser(description='A股每日增量数据更新')
    parser.add_argument('--check', action='store_true', help='只检查不更新')
    parser.add_argument('--days', type=int, default=30, help='检查最近多少天')
    args = parser.parse_args()

    print(f"运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    pro = setup_tushare()

    # Always run the status report; only fetch when not in check-only mode.
    missing_info = check_missing_status(pro, args.days)
    if not args.check and missing_info:
        fetch_missing_data(pro, missing_info)
        # Verify the update actually closed the gaps.
        print("\n更新后再次检查...")
        check_missing_status(pro, args.days)

    # Report the on-disk size of the main data file, if present.
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if main_file.exists():
        size_mb = main_file.stat().st_size / 1024 / 1024
        print(f"\n数据文件大小: {size_mb:.2f} MB")
||
# Run the updater only when executed as a script, not on import.
if __name__ == '__main__':
    main()