# stock_system/cron_daily_fetch.py
"""
A股每日数据自动拉取脚本 (Cron专用)
每个交易日17:00自动运行拉取当日所有股票数据
优化版本:
- 使用 trade_date 参数一次性获取当天所有股票数据
- 大幅减少 API 请求次数(从 5000+ 次减少到每天 1 次)
用法:
python3 cron_daily_fetch.py # 获取今日数据
python3 cron_daily_fetch.py --days 5 # 获取最近5天数据
python3 cron_daily_fetch.py --date 20260408 # 获取指定日期数据
Cron配置
0 17 * * 1-5 python3 /home/xian/.openclaw/common/stock_system/cron_daily_fetch.py
"""
import tushare as ts
import pandas as pd
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
import argparse
# Configuration
BASE_DIR = Path(__file__).parent                     # directory containing this script
DATA_DIR = BASE_DIR / 'data'                         # all data artifacts live here
DB_FILE = DATA_DIR / 'progress.db'                   # NOTE(review): not referenced in this file — confirm still needed
LOG_FILE = BASE_DIR / 'logs' / 'daily_fetch.log'     # append-only run log (see log())
MAIN_FILE = DATA_DIR / 'stock_daily_data.parquet'    # merged daily bars for all stocks
REQUEST_INTERVAL = 0.5  # seconds between API requests (per-date fetching needs few requests, so slow pacing is fine)
def log(msg):
    """Print *msg* with a timestamp and append the same line to LOG_FILE.

    Args:
        msg: message text; may contain non-ASCII (Chinese) characters.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_msg = f"[{timestamp}] {msg}"
    print(log_msg)
    LOG_FILE.parent.mkdir(exist_ok=True)
    # Explicit UTF-8: messages contain Chinese text, and under cron the
    # default locale encoding (often C/POSIX) would raise UnicodeEncodeError.
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(log_msg + '\n')
def setup_tushare():
    """Initialize the tushare client and return a pro API handle.

    Token resolution order: the TUSHARE_TOKEN environment variable, then a
    plain-text ``config.txt`` next to this script.

    Raises:
        ValueError: if no token can be found in either location.
    """
    token = os.environ.get('TUSHARE_TOKEN', '')
    if not token:
        config_file = BASE_DIR / 'config.txt'
        if config_file.exists():
            # Explicit UTF-8 so the token file is read identically regardless
            # of the cron environment's locale.
            token = config_file.read_text(encoding='utf-8').strip()
    if not token:
        raise ValueError("缺少 Tushare Token")
    ts.set_token(token)
    return ts.pro_api()
def is_weekday(date_str):
    """Return True if *date_str* (``YYYYMMDD``) falls on Monday-Friday."""
    parsed = datetime.strptime(date_str, '%Y%m%d')
    # isoweekday(): Monday == 1 ... Sunday == 7
    return parsed.isoweekday() <= 5
def get_trade_dates_in_range(start_date, end_date):
    """Return every weekday (Mon-Fri) in [start_date, end_date], inclusive.

    Dates are ``YYYYMMDD`` strings; holidays are NOT excluded here, only
    weekends. An empty list is returned when start_date > end_date.
    """
    first = datetime.strptime(start_date, '%Y%m%d')
    last = datetime.strptime(end_date, '%Y%m%d')
    span_days = (last - first).days
    return [
        day.strftime('%Y%m%d')
        for day in (first + timedelta(days=offset) for offset in range(span_days + 1))
        if day.weekday() < 5  # Monday..Friday
    ]
def get_latest_trade_date():
    """Return the most recent ``trade_date`` stored in MAIN_FILE, or None.

    Returns None when the file does not exist or contains no rows.
    """
    if not MAIN_FILE.exists():
        return None
    # Only the trade_date column is needed for the max — avoids loading the
    # whole (potentially large) parquet dataset into memory.
    df = pd.read_parquet(MAIN_FILE, columns=['trade_date'])
    if df.empty:
        return None
    return df['trade_date'].max()
def fetch_data_by_date(pro, trade_date):
    """Fetch daily bars for every stock on one trade date with a single call.

    This is the optimized path: one ``pro.daily(trade_date=...)`` request
    returns the whole market for that day.

    Returns:
        (record_count, DataFrame) on success, or (0, None) when the date has
        no data (e.g. a non-trading day) or the request fails.
    """
    log(f"获取 {trade_date} 的数据...")
    try:
        frame = pro.daily(trade_date=trade_date)
        if frame is None or len(frame) == 0:
            log(f" {trade_date} 无数据(可能是非交易日)")
            return 0, None
        log(f" {trade_date} 获取到 {len(frame)} 条记录")
        return len(frame), frame
    except Exception as exc:
        # Best-effort: a single bad date is logged and skipped, not fatal.
        log(f" {trade_date} 获取失败: {str(exc)[:100]}")
        return 0, None
def merge_data(new_df):
    """Merge *new_df* into MAIN_FILE, deduplicating on (ts_code, trade_date).

    Args:
        new_df: DataFrame of freshly fetched rows, or None.

    Returns:
        Total row count of the file after merging; 0 when there is nothing
        to merge (previously this path returned None, inconsistent with the
        normal path's integer return).
    """
    if new_df is None or len(new_df) == 0:
        return 0
    if MAIN_FILE.exists():
        existing_df = pd.read_parquet(MAIN_FILE)
        combined = pd.concat([existing_df, new_df], ignore_index=True)
        # keep='last' prefers the freshly fetched row over a stale duplicate
        # (new_df rows come after existing_df in concat order). Deduplicating
        # before sorting avoids sorting rows that are about to be dropped.
        combined = combined.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
    else:
        combined = new_df
    combined.to_parquet(MAIN_FILE, index=False, compression='snappy')
    return len(combined)
def fetch_dates(pro, dates):
    """Fetch daily data for each date in *dates* and merge it into MAIN_FILE.

    Args:
        pro: tushare pro API handle.
        dates: list of ``YYYYMMDD`` strings.

    Returns:
        Number of newly fetched records (0 when *dates* is empty or every
        fetch came back empty/failed).
    """
    if not dates:
        log("没有需要获取的日期")
        return 0
    log(f"需要获取 {len(dates)} 个交易日的数据")
    frames = []
    fetched_total = 0
    last_index = len(dates) - 1
    for idx, trade_date in enumerate(dates):
        count, frame = fetch_data_by_date(pro, trade_date)
        if frame is not None and len(frame) > 0:
            frames.append(frame)
            fetched_total += count
        if idx != last_index:
            # Pace requests; no sleep needed after the final one.
            time.sleep(REQUEST_INTERVAL)
    # Merge everything in one pass instead of rewriting the file per date.
    if frames:
        total_records = merge_data(pd.concat(frames, ignore_index=True))
        log(f"数据合并完成,总记录数: {total_records}")
    return fetched_total
def main():
    """Entry point: parse CLI args, pick the target dates, fetch and merge.

    Modes (mutually exclusive, first match wins):
        --date YYYYMMDD  fetch one specific date
        --days N         fetch the last N calendar days' weekdays
        (default)        fetch today, skipping weekends

    Re-raises any unexpected error after logging it, so cron sees a
    non-zero exit status.
    """
    parser = argparse.ArgumentParser(description='A股每日数据拉取按日期批量获取')
    parser.add_argument('--date', type=str, help='获取指定日期数据 (格式: 20260408)')
    parser.add_argument('--days', type=int, default=0, help='获取最近N天数据')
    args = parser.parse_args()

    log("=" * 60)
    log("A股每日数据拉取开始")  # plain string: the former f-string had no placeholders
    log("=" * 60)
    try:
        pro = setup_tushare()

        # Decide which dates to fetch: explicit --date, then --days window,
        # otherwise just today.
        if args.date:
            dates = [args.date]
            log(f"模式: 获取指定日期 {args.date}")
        elif args.days > 0:
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=args.days - 1)).strftime('%Y%m%d')
            dates = get_trade_dates_in_range(start_date, end_date)
            log(f"模式: 获取最近 {args.days} 天数据 ({start_date} ~ {end_date})")
            log(f"工作日: {dates}")
        else:
            today = datetime.now().strftime('%Y%m%d')
            # Cron is already restricted to Mon-Fri, but guard anyway in case
            # the script is run manually on a weekend.
            if not is_weekday(today):
                log(f"今天({datetime.now().strftime('%A')})不是工作日,跳过")
                return
            dates = [today]
            log(f"模式: 获取今日数据 ({today})")

        new_records = fetch_dates(pro, dates)

        # Report the resulting file size and how many rows this run added.
        if MAIN_FILE.exists():
            size_mb = MAIN_FILE.stat().st_size / 1024 / 1024
            log(f"数据文件大小: {size_mb:.2f} MB")
        log(f"本次新增数据: {new_records}")
        log("=" * 60)
        log("完成")
    except Exception as e:
        # Top-level boundary: log the full traceback, then re-raise so the
        # failure is visible to cron (non-zero exit).
        log(f"错误: {e}")
        import traceback
        log(traceback.format_exc())
        raise
# Script entry point (skipped when imported as a module).
if __name__ == '__main__':
    main()