1 Commits
v0.2.0 ... main

Author SHA1 Message Date
21bbc9cd1e feat: 优化每日数据拉取,改为按日期批量获取 2026-04-14 09:29:57 +08:00
4 changed files with 3323 additions and 11 deletions

224
cron_daily_fetch.py Normal file
View File

@@ -0,0 +1,224 @@
"""
A股每日数据自动拉取脚本 (Cron专用)
每个交易日17:00自动运行拉取当日所有股票数据
优化版本:
- 使用 trade_date 参数一次性获取当天所有股票数据
- 大幅减少 API 请求次数(从 5000+ 次减少到每天 1 次)
用法:
python3 cron_daily_fetch.py # 获取今日数据
python3 cron_daily_fetch.py --days 5 # 获取最近5天数据
python3 cron_daily_fetch.py --date 20260408 # 获取指定日期数据
Cron配置
0 17 * * 1-5 python3 /home/xian/.openclaw/common/stock_system/cron_daily_fetch.py
"""
import tushare as ts
import pandas as pd
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
import argparse
# 配置
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
DB_FILE = DATA_DIR / 'progress.db'
LOG_FILE = BASE_DIR / 'logs' / 'daily_fetch.log'
MAIN_FILE = DATA_DIR / 'stock_daily_data.parquet'
REQUEST_INTERVAL = 0.5 # 每次请求间隔(按日期获取次数少,可以慢一点也没关系)
def log(msg):
    """Print *msg* and append it, timestamped, to the daily log file."""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_msg = f"[{timestamp}] {msg}"
    print(log_msg)
    # parents=True so a missing intermediate directory cannot crash logging;
    # explicit UTF-8 so the Chinese messages survive a non-UTF-8 locale
    # (the cron environment often runs with LANG=C).
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(log_msg + '\n')
def setup_tushare():
    """Initialise the tushare pro API client.

    The token is read from the TUSHARE_TOKEN environment variable,
    falling back to a config.txt file beside this script.

    Raises:
        ValueError: when no token can be found in either place.
    """
    token = os.environ.get('TUSHARE_TOKEN', '')
    if not token:
        candidate = BASE_DIR / 'config.txt'
        token = candidate.read_text().strip() if candidate.exists() else ''
    if not token:
        raise ValueError("缺少 Tushare Token")
    ts.set_token(token)
    return ts.pro_api()
def is_weekday(date_str):
    """Return True when *date_str* (YYYYMMDD) falls on Monday through Friday."""
    parsed = datetime.strptime(date_str, '%Y%m%d')
    # isoweekday(): Monday == 1 ... Sunday == 7
    return parsed.isoweekday() <= 5
def get_trade_dates_in_range(start_date, end_date):
    """List every weekday (YYYYMMDD) from *start_date* through *end_date*, inclusive.

    Weekends are excluded; public holidays are not (same approximation as
    the rest of this script).
    """
    start = datetime.strptime(start_date, '%Y%m%d')
    end = datetime.strptime(end_date, '%Y%m%d')
    span = (end - start).days
    candidates = (start + timedelta(days=offset) for offset in range(span + 1))
    return [day.strftime('%Y%m%d') for day in candidates if day.weekday() < 5]
def get_latest_trade_date():
    """Return the newest trade_date stored in the main parquet file, or None.

    None is returned both when the file does not exist yet and when it
    exists but holds no rows.
    """
    if not MAIN_FILE.exists():
        return None
    frame = pd.read_parquet(MAIN_FILE)
    return frame['trade_date'].max() if len(frame) else None
def fetch_data_by_date(pro, trade_date):
    """Fetch one trading day's quotes for every stock in a single API call.

    Returns a (record_count, DataFrame) pair; (0, None) when the date has
    no data (e.g. a holiday) or the request fails.
    """
    log(f"获取 {trade_date} 的数据...")
    try:
        frame = pro.daily(trade_date=trade_date)
    except Exception as e:
        log(f" {trade_date} 获取失败: {str(e)[:100]}")
        return 0, None
    if frame is None or len(frame) == 0:
        log(f" {trade_date} 无数据(可能是非交易日)")
        return 0, None
    log(f" {trade_date} 获取到 {len(frame)} 条记录")
    return len(frame), frame
def merge_data(new_df):
    """Merge freshly fetched rows into the main parquet file.

    Deduplicates on (ts_code, trade_date), preferring the newly fetched
    row, then stores the result sorted by code and date.

    Returns:
        int: total record count in the file after merging (0 when there
        was nothing to merge — the original returned None here, which
        made the return type inconsistent).
    """
    if new_df is None or len(new_df) == 0:
        return 0
    if MAIN_FILE.exists():
        existing_df = pd.read_parquet(MAIN_FILE)
        combined = pd.concat([existing_df, new_df], ignore_index=True)
        # Deduplicate BEFORE sorting: keep='last' relies on concat order
        # (new rows appended last).  pandas' default sort kind is quicksort,
        # which is NOT stable, so sorting first could shuffle duplicate
        # (ts_code, trade_date) rows and keep the stale record instead.
        combined = combined.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
    else:
        combined = new_df
    combined.to_parquet(MAIN_FILE, index=False, compression='snappy')
    return len(combined)
def fetch_dates(pro, dates):
    """Fetch every date in *dates*, persist the results, return new row count."""
    if not dates:
        log("没有需要获取的日期")
        return 0
    log(f"需要获取 {len(dates)} 个交易日的数据")
    frames = []
    fetched = 0
    last_index = len(dates) - 1
    for idx, trade_date in enumerate(dates):
        count, frame = fetch_data_by_date(pro, trade_date)
        if frame is not None and len(frame) > 0:
            frames.append(frame)
            fetched += count
        # pause between requests (not after the final one)
        if idx < last_index:
            time.sleep(REQUEST_INTERVAL)
    # merge everything in one pass instead of once per date
    if frames:
        total_records = merge_data(pd.concat(frames, ignore_index=True))
        log(f"数据合并完成,总记录数: {total_records}")
    return fetched
def main():
    """Entry point: parse CLI options, choose target dates, fetch and report."""
    parser = argparse.ArgumentParser(description='A股每日数据拉取按日期批量获取')
    parser.add_argument('--date', type=str, help='获取指定日期数据 (格式: 20260408)')
    parser.add_argument('--days', type=int, default=0, help='获取最近N天数据')
    args = parser.parse_args()

    log("=" * 60)
    log(f"A股每日数据拉取开始")
    log("=" * 60)
    try:
        pro = setup_tushare()

        # Resolve which dates to fetch: explicit date > last-N-days > today.
        if args.date:
            dates = [args.date]
            log(f"模式: 获取指定日期 {args.date}")
        elif args.days > 0:
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=args.days-1)).strftime('%Y%m%d')
            dates = get_trade_dates_in_range(start_date, end_date)
            log(f"模式: 获取最近 {args.days} 天数据 ({start_date} ~ {end_date})")
            log(f"工作日: {dates}")
        else:
            today = datetime.now().strftime('%Y%m%d')
            if not is_weekday(today):
                # weekend run (cron misconfig or manual start): nothing to do
                log(f"今天({datetime.now().strftime('%A')})不是工作日,跳过")
                return
            dates = [today]
            log(f"模式: 获取今日数据 ({today})")

        new_records = fetch_dates(pro, dates)

        if MAIN_FILE.exists():
            size_mb = MAIN_FILE.stat().st_size / 1024 / 1024
            log(f"数据文件大小: {size_mb:.2f} MB")
        log(f"本次新增数据: {new_records}")
        log("=" * 60)
        log("完成")
    except Exception as e:
        log(f"错误: {e}")
        import traceback
        log(traceback.format_exc())
        raise


if __name__ == '__main__':
    main()

2749
data/completed_stocks.txt Normal file

File diff suppressed because it is too large Load Diff

264
fetch_daily_update.py Normal file
View File

@@ -0,0 +1,264 @@
"""
A股每日增量数据更新脚本
功能:
1. 检查所有股票最新数据日期
2. 补齐缺失的交易日数据
3. 支持单次运行和定时运行
用法:
python3 fetch_daily_update.py # 更新所有股票缺失数据
python3 fetch_daily_update.py --check # 只检查不更新
python3 fetch_daily_update.py --days 5 # 补最近5天的数据
"""
import tushare as ts
import pandas as pd
import sqlite3
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
import argparse
# 配置
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
TEMP_DIR = DATA_DIR / 'temp'
DB_FILE = DATA_DIR / 'progress.db'
STOCK_LIST_FILE = BASE_DIR / 'A股股票列表.csv'
REQUEST_INTERVAL = 0.3 # 增量数据请求间隔更短
def setup_tushare():
    """Initialise the tushare pro API client.

    Token resolution order: TUSHARE_TOKEN environment variable, then a
    config.txt file next to this script.

    Raises:
        ValueError: when neither source yields a token.
    """
    token = os.environ.get('TUSHARE_TOKEN', '')
    if not token:
        candidate = BASE_DIR / 'config.txt'
        token = candidate.read_text().strip() if candidate.exists() else ''
    if not token:
        raise ValueError("缺少 Tushare Token")
    ts.set_token(token)
    return ts.pro_api()
def get_latest_dates():
    """Map each ts_code to its newest stored trade_date ({} when no data file)."""
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if not main_file.exists():
        return {}
    frame = pd.read_parquet(main_file)
    return frame.groupby('ts_code')['trade_date'].max().to_dict()
def get_all_stock_codes():
    """Return ts_codes of stocks marked 'completed' in the progress database.

    The connection is closed in a ``finally`` block so that an error while
    querying (missing table, locked DB) cannot leak the SQLite handle —
    the original closed it only on the success path.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT ts_code FROM progress WHERE status = 'completed'")
        return [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()
def is_trading_day(date_str):
    """Rough trading-day test: True for Monday-Friday (holidays are ignored)."""
    # isoweekday(): Monday == 1 ... Sunday == 7
    return datetime.strptime(date_str, '%Y%m%d').isoweekday() <= 5
def get_trade_dates(start_date, end_date):
    """All weekdays (YYYYMMDD) from *start_date* through *end_date*, inclusive."""
    start = datetime.strptime(start_date, '%Y%m%d')
    end = datetime.strptime(end_date, '%Y%m%d')
    days_span = (end - start).days
    candidates = (start + timedelta(days=k) for k in range(days_span + 1))
    return [d.strftime('%Y%m%d') for d in candidates if d.weekday() < 5]
def get_missing_dates(pro, latest_dates, codes, days=30):
    """Find, per stock, the recent weekdays newer than its latest stored date.

    Args:
        pro: tushare client — accepted for interface compatibility, unused here.
        latest_dates: ts_code -> newest stored trade_date.
        codes: ts_codes to inspect.
        days: how many calendar days back to look.

    Returns:
        (missing_info, trade_dates) where missing_info maps ts_code to a
        dict with its latest date, the missing dates list and its length.
    """
    today = datetime.now().strftime('%Y%m%d')
    start = (datetime.now() - timedelta(days=days)).strftime('%Y%m%d')
    trade_dates = get_trade_dates(start, today)

    missing_info = {}
    for ts_code in codes:
        # stocks never seen before default to a far-past date, so every
        # recent trading day counts as missing
        latest = latest_dates.get(ts_code, '20100101')
        gaps = [d for d in trade_dates if d > latest and is_trading_day(d)]
        if gaps:
            missing_info[ts_code] = {
                'latest': latest,
                'missing': gaps,
                'missing_count': len(gaps),
            }
    return missing_info, trade_dates
def fetch_missing_data(pro, missing_info, batch_size=100):
    """Fetch the missing rows described by *missing_info* and merge them
    into the main parquet file, one stock at a time.

    Args:
        pro: tushare pro API client.
        missing_info: ts_code -> {'latest': ..., 'missing': [...],
            'missing_count': int}, as produced by get_missing_dates().
        batch_size: number of requests between rate-limit sleeps.

    Returns:
        int: number of newly added records.
    """
    total_stocks = len(missing_info)
    print(f"\n需要更新 {total_stocks} 只股票")
    # Union of all missing dates across stocks gives one shared fetch window.
    all_missing_dates = set()
    for info in missing_info.values():
        all_missing_dates.update(info['missing'])
    if not all_missing_dates:
        print("没有缺失数据")
        return 0
    start_date = min(all_missing_dates)
    end_date = max(all_missing_dates)
    print(f"数据范围: {start_date} ~ {end_date}")
    # One request per stock over that window (tushare supports ranged queries).
    codes = list(missing_info.keys())
    total_records = 0
    batch_count = 0
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    # Keep the growing frame in memory so we re-read the parquet file only once.
    existing_df = pd.read_parquet(main_file) if main_file.exists() else None
    print(f"\n开始获取增量数据...")
    print("-" * 50)
    for i, ts_code in enumerate(codes):
        try:
            latest = missing_info[ts_code]['latest']
            # NOTE(review): the fetch starts AT the stock's latest date, not
            # the day after — the overlap is removed by the `> latest` filter
            # below, at the cost of refetching one known day.
            start_fetch = latest
            df = pro.daily(ts_code=ts_code, start_date=start_fetch, end_date=end_date)
            if df is not None and len(df) > 0:
                # Drop rows we already hold.
                df = df[df['trade_date'] > latest]
                if len(df) > 0:
                    # NOTE(review): the parquet file is rewritten in full after
                    # EVERY stock with new rows — durable, but O(n^2) I/O over a
                    # long run; batching the writes would be much cheaper.
                    if existing_df is not None:
                        combined = pd.concat([existing_df, df], ignore_index=True)
                        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
                        combined.to_parquet(main_file, index=False, compression='snappy')
                        existing_df = combined
                    else:
                        df.to_parquet(main_file, index=False, compression='snappy')
                        existing_df = df
                    total_records += len(df)
                    print(f"[{i+1}/{total_stocks}] {ts_code} ✓ +{len(df)}")
                else:
                    print(f"[{i+1}/{total_stocks}] {ts_code} - 无新数据")
            else:
                print(f"[{i+1}/{total_stocks}] {ts_code} - 无数据")
            batch_count += 1
            # NOTE(review): REQUEST_INTERVAL is slept only once per batch_size
            # (default 100) requests, i.e. there is effectively NO per-request
            # throttle — confirm this stays within the tushare rate limit.
            if batch_count >= batch_size:
                batch_count = 0
                time.sleep(REQUEST_INTERVAL)
        except Exception as e:
            print(f"[{i+1}/{total_stocks}] {ts_code}{str(e)[:50]}")
            time.sleep(1)  # back off a little longer after an error
    print("-" * 50)
    print(f"增量更新完成,新增 {total_records} 条记录")
    return total_records
def check_missing_status(pro, days=30):
    """Print a report of which stocks lack recent data; return the missing map."""
    print("=" * 60)
    print("A股数据缺失状态检查")
    print("=" * 60)

    latest_dates = get_latest_dates()
    codes = get_all_stock_codes()
    print(f"已获取数据股票数: {len(codes)}")

    missing_info, trade_dates = get_missing_dates(pro, latest_dates, codes, days)
    if not missing_info:
        print("\n✓ 所有股票数据都是最新的!")
        return {}

    # histogram: number-of-missing-days -> how many stocks
    missing_stats = {}
    for info in missing_info.values():
        n_missing = info['missing_count']
        missing_stats[n_missing] = missing_stats.get(n_missing, 0) + 1

    print(f"\n需要更新的股票: {len(missing_info)}")
    print(f"最近交易日: {trade_dates[-5:] if len(trade_dates) >= 5 else trade_dates}")
    print("\n缺失天数分布:")
    for days_count in sorted(missing_stats):
        print(f"{days_count}天: {missing_stats[days_count]}")

    # top-10 stocks with the largest backlog
    print("\n缺失最多的10只股票:")
    worst = sorted(missing_info.items(), key=lambda item: item[1]['missing_count'], reverse=True)[:10]
    for ts_code, info in worst:
        print(f" {ts_code}: 最新{info['latest']}, 缺{info['missing_count']}")
    return missing_info
def main():
    """CLI entry point for the incremental updater."""
    parser = argparse.ArgumentParser(description='A股每日增量数据更新')
    parser.add_argument('--check', action='store_true', help='只检查不更新')
    parser.add_argument('--days', type=int, default=30, help='检查最近多少天')
    args = parser.parse_args()

    print(f"运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    pro = setup_tushare()

    if args.check:
        # report only, no fetching
        check_missing_status(pro, args.days)
    else:
        missing_info = check_missing_status(pro, args.days)
        if missing_info:
            fetch_missing_data(pro, missing_info)
            # re-run the report to confirm the gaps were filled
            print("\n更新后再次检查...")
            check_missing_status(pro, args.days)

    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if main_file.exists():
        size_mb = main_file.stat().st_size / 1024 / 1024
        print(f"\n数据文件大小: {size_mb:.2f} MB")


if __name__ == '__main__':
    main()

View File

@@ -25,7 +25,7 @@ START_DATE = '20100101'
END_DATE = datetime.now().strftime('%Y%m%d')
# 请求间隔(秒)- tushare积分限制
REQUEST_INTERVAL = 9
REQUEST_INTERVAL = 5
def setup_tushare(token=None):
@@ -91,34 +91,109 @@ def get_stock_codes_with_suffix(df):
def fetch_daily_data(pro, codes, start_date, end_date):
"""逐个获取日线数据(每次一支股票)"""
all_data = []
"""逐个获取日线数据(每次一支股票),支持断点续传"""
total = len(codes)
print(f"\n{total} 只股票需要获取")
print(f"预计耗时: {total * REQUEST_INTERVAL / 60:.1f} 分钟")
# 加载已完成的股票列表
completed_file = DATA_DIR / 'completed_stocks.txt'
completed_stocks = set()
if completed_file.exists():
lines = completed_file.read_text().strip().split('\n')
completed_stocks = set(line.strip() for line in lines if line.strip())
print(f"已完成: {len(completed_stocks)} 只股票")
# 统计已有数据
existing_data_file = DATA_DIR / 'stock_daily_data.parquet'
if existing_data_file.exists():
existing_df = pd.read_parquet(existing_data_file)
print(f"已有数据: {len(existing_df)} 条记录")
print(f"\n{total} 只股票,待处理: {total - len(completed_stocks)}")
print(f"预计耗时: {(total - len(completed_stocks)) * REQUEST_INTERVAL / 60:.1f} 分钟")
print("-" * 50)
for i, ts_code in enumerate(codes):
# 跳过已完成的
if ts_code in completed_stocks:
print(f"[{i+1}/{total}] {ts_code} 已完成,跳过")
continue
try:
print(f"[{i+1}/{total}] 获取 {ts_code}...", end=' ')
print(f"[{i+1}/{total}] 获取 {ts_code}...", end=' ', flush=True)
df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
if df is not None and len(df) > 0:
all_data.append(df)
print(f"成功,{len(df)} 条记录")
# 实时保存单只股票数据
save_single_stock(df, ts_code, completed_file)
else:
print("无数据")
# 无数据也标记为完成
mark_completed(ts_code, completed_file)
except Exception as e:
print(f"错误: {e}")
# 出错不标记完成,下次重试
# 每次请求后休息9秒
if i < total - 1: # 最后一个不需要等待
# 每次请求后休息
if i < total - 1:
time.sleep(REQUEST_INTERVAL)
return all_data
# 最后合并所有数据
return merge_all_data()
def save_single_stock(df, ts_code, completed_file):
    """Persist one stock's freshly fetched rows and record it as done.

    Any rows already stored for *ts_code* are replaced by *df* so a retry
    never produces duplicates.
    """
    output_file = DATA_DIR / 'stock_daily_data.parquet'
    if output_file.exists():
        stored = pd.read_parquet(output_file)
        # drop the stock's stale rows, then append the fresh ones
        stored = stored[stored['ts_code'] != ts_code]
        merged = pd.concat([stored, df], ignore_index=True)
    else:
        merged = df
    merged = merged.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
    merged.to_parquet(output_file, index=False)
    mark_completed(ts_code, completed_file)
def merge_all_data():
    """Load the consolidated parquet file, wrapped in a one-element list
    for compatibility with callers that expect a list of DataFrames;
    [] when no data has been saved yet."""
    output_file = DATA_DIR / 'stock_daily_data.parquet'
    return [pd.read_parquet(output_file)] if output_file.exists() else []
def save_progress(all_data, ts_code, completed_file):
    """Legacy progress saver kept for compatibility: merge the collected
    frames, write them to the main parquet file, and mark *ts_code* done."""
    merged = pd.concat(all_data, ignore_index=True)
    merged = merged.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
    output_file = DATA_DIR / 'stock_daily_data.parquet'
    merged.to_parquet(output_file, index=False)
    mark_completed(ts_code, completed_file)
def mark_completed(ts_code, completed_file):
    """Append *ts_code* on its own line to the completed-stocks list file."""
    with open(completed_file, 'a') as handle:
        handle.write(f"{ts_code}\n")
def save_to_parquet(df, filename):
@@ -197,4 +272,4 @@ def main():
if __name__ == '__main__':
main()
main()