224 lines
6.6 KiB
Python
224 lines
6.6 KiB
Python
"""
|
||
A股每日数据自动拉取脚本 (Cron专用)
|
||
每个交易日17:00自动运行,拉取当日所有股票数据
|
||
|
||
优化版本:
|
||
- 使用 trade_date 参数一次性获取当天所有股票数据
|
||
- 大幅减少 API 请求次数(从 5000+ 次减少到每天 1 次)
|
||
|
||
用法:
|
||
python3 cron_daily_fetch.py # 获取今日数据
|
||
python3 cron_daily_fetch.py --days 5 # 获取最近5天数据
|
||
python3 cron_daily_fetch.py --date 20260408 # 获取指定日期数据
|
||
|
||
Cron配置:
|
||
0 17 * * 1-5 python3 /home/xian/.openclaw/common/stock_system/cron_daily_fetch.py
|
||
"""
|
||
|
||
import tushare as ts
|
||
import pandas as pd
|
||
import os
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
import argparse
|
||
|
||
# 配置
|
||
BASE_DIR = Path(__file__).parent
|
||
DATA_DIR = BASE_DIR / 'data'
|
||
DB_FILE = DATA_DIR / 'progress.db'
|
||
LOG_FILE = BASE_DIR / 'logs' / 'daily_fetch.log'
|
||
MAIN_FILE = DATA_DIR / 'stock_daily_data.parquet'
|
||
|
||
REQUEST_INTERVAL = 0.5 # 每次请求间隔(按日期获取次数少,可以慢一点也没关系)
|
||
|
||
|
||
def log(msg):
    """Append *msg* to the log file and echo it to stdout.

    Each line is prefixed with a local timestamp. The log directory is
    created on demand so a fresh deployment works without setup.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_msg = f"[{timestamp}] {msg}"
    print(log_msg)

    # parents=True: create the whole logs/ path if missing.
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8: messages contain Chinese text, and cron jobs often run
    # with a minimal locale whose default encoding cannot represent them.
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(log_msg + '\n')
|
||
|
||
|
||
def setup_tushare():
    """Initialise the tushare client and return a ``pro_api`` handle.

    The token comes from the TUSHARE_TOKEN environment variable, with a
    fallback to a ``config.txt`` file next to this script. Raises
    ValueError when no token can be found.
    """
    token = os.environ.get('TUSHARE_TOKEN', '')

    if not token:
        fallback = BASE_DIR / 'config.txt'
        token = fallback.read_text().strip() if fallback.exists() else ''

    if not token:
        raise ValueError("缺少 Tushare Token")

    ts.set_token(token)
    return ts.pro_api()
|
||
|
||
|
||
def is_weekday(date_str):
    """Return True when *date_str* (YYYYMMDD) falls on Monday through Friday."""
    parsed = datetime.strptime(date_str, '%Y%m%d')
    # isoweekday(): Monday == 1 ... Sunday == 7
    return parsed.isoweekday() <= 5
|
||
|
||
|
||
def get_trade_dates_in_range(start_date, end_date):
    """Return every weekday between *start_date* and *end_date*, inclusive.

    Dates are YYYYMMDD strings. Only weekends are excluded — exchange
    holidays are not filtered here. Returns [] when the range is empty.
    """
    first = datetime.strptime(start_date, '%Y%m%d')
    last = datetime.strptime(end_date, '%Y%m%d')
    span_days = (last - first).days

    return [
        day.strftime('%Y%m%d')
        for day in (first + timedelta(days=offset) for offset in range(span_days + 1))
        if day.weekday() < 5  # Monday..Friday
    ]
|
||
|
||
|
||
def get_latest_trade_date():
    """Return the most recent ``trade_date`` stored in the main parquet file.

    Returns None when the file does not exist or contains no rows.
    """
    if not MAIN_FILE.exists():
        return None

    # Read only the trade_date column — the file holds the full daily
    # history and loading every column just to take a max is wasteful.
    df = pd.read_parquet(MAIN_FILE, columns=['trade_date'])
    if df.empty:
        return None

    return df['trade_date'].max()
|
||
|
||
|
||
def fetch_data_by_date(pro, trade_date):
    """Fetch daily bars for every stock on *trade_date* with one API call.

    Returns ``(row_count, dataframe)``; ``(0, None)`` when the date has no
    data (e.g. a non-trading day) or the request fails.
    """
    log(f"获取 {trade_date} 的数据...")

    try:
        # Single request for the whole market on this date — this is the
        # optimisation over fetching per-stock histories.
        frame = pro.daily(trade_date=trade_date)

        if frame is None or len(frame) == 0:
            log(f"  {trade_date} 无数据(可能是非交易日)")
            return 0, None

        rows = len(frame)
        log(f"  {trade_date} 获取到 {rows} 条记录")
        return rows, frame

    except Exception as e:
        # Best-effort: log a truncated error and let the caller continue
        # with the remaining dates.
        log(f"  {trade_date} 获取失败: {str(e)[:100]}")
        return 0, None
|
||
|
||
|
||
def merge_data(new_df):
    """Merge *new_df* into the main parquet file, de-duplicating rows.

    Rows are keyed on (ts_code, trade_date); when the same key exists in
    both the stored file and the new data, the new row wins.

    Returns the total row count after the merge, or None when *new_df*
    is empty (nothing written).
    """
    if new_df is None or len(new_df) == 0:
        return None

    if MAIN_FILE.exists():
        existing_df = pd.read_parquet(MAIN_FILE)
        combined = pd.concat([existing_df, new_df], ignore_index=True)
        # De-duplicate BEFORE sorting. keep='last' relies on concat order
        # (new rows come last); sort_values' default quicksort is not
        # stable, so sorting first could let a stale row survive instead
        # of the freshly fetched one.
        combined = combined.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
    else:
        combined = new_df

    combined.to_parquet(MAIN_FILE, index=False, compression='snappy')
    return len(combined)
|
||
|
||
|
||
def fetch_dates(pro, dates):
    """Fetch daily data for every date in *dates* and merge it into the store.

    Returns the number of newly fetched records (0 when *dates* is empty
    or every fetch comes back empty).
    """
    if not dates:
        log("没有需要获取的日期")
        return 0

    log(f"需要获取 {len(dates)} 个交易日的数据")

    frames = []
    fetched = 0
    last_index = len(dates) - 1

    for i, trade_date in enumerate(dates):
        count, frame = fetch_data_by_date(pro, trade_date)

        if frame is not None and len(frame) > 0:
            frames.append(frame)
            fetched += count

        # Throttle between requests (skip the pause after the final one).
        if i < last_index:
            time.sleep(REQUEST_INTERVAL)

    # Merge everything in a single pass rather than once per date.
    if frames:
        merged_total = merge_data(pd.concat(frames, ignore_index=True))
        log(f"数据合并完成,总记录数: {merged_total}")

    return fetched
|
||
|
||
|
||
def main():
    """Entry point: parse CLI arguments, pick target dates, fetch and merge.

    Three modes: an explicit --date, a trailing --days window, or (default)
    today only — skipped when today is not a weekday.
    """
    parser = argparse.ArgumentParser(description='A股每日数据拉取(按日期批量获取)')
    parser.add_argument('--date', type=str, help='获取指定日期数据 (格式: 20260408)')
    parser.add_argument('--days', type=int, default=0, help='获取最近N天数据')
    args = parser.parse_args()

    banner = "=" * 60
    log(banner)
    log(f"A股每日数据拉取开始")
    log(banner)

    try:
        pro = setup_tushare()

        # Decide which trading dates to pull.
        if args.date:
            # Explicit single date.
            dates = [args.date]
            log(f"模式: 获取指定日期 {args.date}")
        elif args.days > 0:
            # Trailing window of N calendar days, filtered to weekdays.
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=args.days - 1)).strftime('%Y%m%d')
            dates = get_trade_dates_in_range(start_date, end_date)
            log(f"模式: 获取最近 {args.days} 天数据 ({start_date} ~ {end_date})")
            log(f"工作日: {dates}")
        else:
            # Default: today only; bail out quietly on weekends.
            today = datetime.now().strftime('%Y%m%d')
            if not is_weekday(today):
                log(f"今天({datetime.now().strftime('%A')})不是工作日,跳过")
                return
            dates = [today]
            log(f"模式: 获取今日数据 ({today})")

        new_records = fetch_dates(pro, dates)

        # Report the resulting file size, when present.
        if MAIN_FILE.exists():
            size_mb = MAIN_FILE.stat().st_size / 1024 / 1024
            log(f"数据文件大小: {size_mb:.2f} MB")

        log(f"本次新增数据: {new_records} 条")
        log(banner)
        log("完成")

    except Exception as e:
        # Log the failure for the cron log, then re-raise so the exit
        # status reflects the error.
        log(f"错误: {e}")
        import traceback
        log(traceback.format_exc())
        raise
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main() |