Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 21bbc9cd1e |
224
cron_daily_fetch.py
Normal file
224
cron_daily_fetch.py
Normal file
@@ -0,0 +1,224 @@
|
||||
"""
|
||||
A股每日数据自动拉取脚本 (Cron专用)
|
||||
每个交易日17:00自动运行,拉取当日所有股票数据
|
||||
|
||||
优化版本:
|
||||
- 使用 trade_date 参数一次性获取当天所有股票数据
|
||||
- 大幅减少 API 请求次数(从 5000+ 次减少到每天 1 次)
|
||||
|
||||
用法:
|
||||
python3 cron_daily_fetch.py # 获取今日数据
|
||||
python3 cron_daily_fetch.py --days 5 # 获取最近5天数据
|
||||
python3 cron_daily_fetch.py --date 20260408 # 获取指定日期数据
|
||||
|
||||
Cron配置:
|
||||
0 17 * * 1-5 python3 /home/xian/.openclaw/common/stock_system/cron_daily_fetch.py
|
||||
"""
|
||||
|
||||
import tushare as ts
|
||||
import pandas as pd
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
import argparse
|
||||
|
||||
# Configuration: all paths are resolved relative to this script's directory.
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
DB_FILE = DATA_DIR / 'progress.db'  # progress-tracking SQLite db (shared with sibling scripts)
LOG_FILE = BASE_DIR / 'logs' / 'daily_fetch.log'
MAIN_FILE = DATA_DIR / 'stock_daily_data.parquet'  # consolidated daily-quote store

# Seconds between API requests.  Per-date fetching needs only one request per
# trading day, so a generous interval is affordable.
REQUEST_INTERVAL = 0.5
|
||||
|
||||
|
||||
def log(msg):
    """Echo *msg* to stdout and append it to the daily-fetch log file.

    Each line is prefixed with a ``YYYY-MM-DD HH:MM:SS`` timestamp.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_msg = f"[{timestamp}] {msg}"
    print(log_msg)

    # parents=True so a missing intermediate directory cannot crash the cron
    # job; explicit UTF-8 keeps the Chinese log text readable regardless of
    # the system locale the cron daemon runs under.
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(log_msg + '\n')
|
||||
|
||||
|
||||
def setup_tushare():
    """Configure the tushare SDK and return a pro API client.

    Token lookup order: the TUSHARE_TOKEN environment variable first, then
    the local ``config.txt`` file.  Raises ValueError when neither is set.
    """
    token = os.environ.get('TUSHARE_TOKEN', '')
    config_file = BASE_DIR / 'config.txt'
    if not token and config_file.exists():
        token = config_file.read_text().strip()
    if not token:
        raise ValueError("缺少 Tushare Token")
    ts.set_token(token)
    return ts.pro_api()
|
||||
|
||||
|
||||
def is_weekday(date_str):
    """Return True when *date_str* (YYYYMMDD) falls on Monday through Friday."""
    parsed = datetime.strptime(date_str, '%Y%m%d')
    return parsed.weekday() not in (5, 6)
|
||||
|
||||
|
||||
def get_trade_dates_in_range(start_date, end_date):
    """Return every weekday (YYYYMMDD strings) from start_date through end_date, inclusive."""
    first = datetime.strptime(start_date, '%Y%m%d')
    last = datetime.strptime(end_date, '%Y%m%d')
    # An empty range (end before start) naturally yields an empty list.
    span = (last - first).days + 1
    return [
        day.strftime('%Y%m%d')
        for day in (first + timedelta(days=offset) for offset in range(span))
        if day.weekday() < 5
    ]
|
||||
|
||||
|
||||
def get_latest_trade_date():
    """Return the newest trade_date stored in the main parquet file, or None.

    None is returned both when the file does not exist and when it is empty.
    """
    if not MAIN_FILE.exists():
        return None
    stored = pd.read_parquet(MAIN_FILE)
    return stored['trade_date'].max() if len(stored) else None
|
||||
|
||||
|
||||
def fetch_data_by_date(pro, trade_date):
    """Fetch one trading day's quotes for every stock in a single API call.

    Returns a ``(row_count, DataFrame)`` pair; ``(0, None)`` when the day has
    no data (non-trading day) or the request fails.
    """
    log(f"获取 {trade_date} 的数据...")

    try:
        day_df = pro.daily(trade_date=trade_date)

        if day_df is None or len(day_df) == 0:
            log(f"  {trade_date} 无数据(可能是非交易日)")
            return 0, None

        log(f"  {trade_date} 获取到 {len(day_df)} 条记录")
        return len(day_df), day_df

    except Exception as e:
        # Best-effort: log a truncated error and let the caller skip this day.
        log(f"  {trade_date} 获取失败: {str(e)[:100]}")
        return 0, None
|
||||
|
||||
|
||||
def merge_data(new_df):
    """Merge freshly fetched rows into the main parquet file.

    Rows are deduplicated on ``(ts_code, trade_date)``; when a key already
    exists in the store, the newly fetched row wins.

    Args:
        new_df: DataFrame of newly fetched daily rows (may be None/empty).

    Returns:
        Total number of rows in the merged file, or 0 when *new_df* is empty.
    """
    if new_df is None or len(new_df) == 0:
        return 0

    if MAIN_FILE.exists():
        existing_df = pd.read_parquet(MAIN_FILE)
        combined = pd.concat([existing_df, new_df], ignore_index=True)
        # Deduplicate BEFORE sorting: in concat order the new rows come last,
        # so keep='last' deterministically prefers fresh data.  (Sorting first
        # used pandas' default unstable quicksort, which made the surviving
        # duplicate arbitrary.)
        combined = combined.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
    else:
        combined = new_df

    combined.to_parquet(MAIN_FILE, index=False, compression='snappy')
    return len(combined)
|
||||
|
||||
|
||||
def fetch_dates(pro, dates):
    """Download quotes for each date in *dates* and merge them into storage.

    Returns the number of newly fetched rows (before deduplication).
    """
    if not dates:
        log("没有需要获取的日期")
        return 0

    fetched = 0
    frames = []

    log(f"需要获取 {len(dates)} 个交易日的数据")

    last_index = len(dates) - 1
    for idx, trade_date in enumerate(dates):
        count, day_df = fetch_data_by_date(pro, trade_date)

        if day_df is not None and len(day_df) > 0:
            frames.append(day_df)
            fetched += count

        # Throttle between API requests; no wait after the final one.
        if idx < last_index:
            time.sleep(REQUEST_INTERVAL)

    # Merge everything in one pass to rewrite the parquet file only once.
    if frames:
        total_records = merge_data(pd.concat(frames, ignore_index=True))
        log(f"数据合并完成,总记录数: {total_records}")

    return fetched
|
||||
|
||||
|
||||
def main():
    """Entry point: decide which dates to fetch, fetch them, log a summary."""
    parser = argparse.ArgumentParser(description='A股每日数据拉取(按日期批量获取)')
    parser.add_argument('--date', type=str, help='获取指定日期数据 (格式: 20260408)')
    parser.add_argument('--days', type=int, default=0, help='获取最近N天数据')
    args = parser.parse_args()

    banner = "=" * 60
    log(banner)
    log("A股每日数据拉取开始")
    log(banner)

    try:
        pro = setup_tushare()

        # Resolve the list of dates to fetch from the CLI arguments.
        if args.date:
            # Explicit single date.
            dates = [args.date]
            log(f"模式: 获取指定日期 {args.date}")
        elif args.days > 0:
            # Trailing window of N calendar days, reduced to weekdays.
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=args.days - 1)).strftime('%Y%m%d')
            dates = get_trade_dates_in_range(start_date, end_date)
            log(f"模式: 获取最近 {args.days} 天数据 ({start_date} ~ {end_date})")
            log(f"工作日: {dates}")
        else:
            # Default: today's data, skipped on weekends.
            today = datetime.now().strftime('%Y%m%d')
            if not is_weekday(today):
                log(f"今天({datetime.now().strftime('%A')})不是工作日,跳过")
                return
            dates = [today]
            log(f"模式: 获取今日数据 ({today})")

        new_records = fetch_dates(pro, dates)

        # Summarize the on-disk result.
        if MAIN_FILE.exists():
            size_mb = MAIN_FILE.stat().st_size / 1024 / 1024
            log(f"数据文件大小: {size_mb:.2f} MB")

        log(f"本次新增数据: {new_records} 条")
        log(banner)
        log("完成")

    except Exception as e:
        # Log the failure (with traceback) before re-raising so cron reports
        # a non-zero exit status.
        log(f"错误: {e}")
        import traceback
        log(traceback.format_exc())
        raise


if __name__ == '__main__':
    main()
|
||||
2749
data/completed_stocks.txt
Normal file
2749
data/completed_stocks.txt
Normal file
File diff suppressed because it is too large
Load Diff
264
fetch_daily_update.py
Normal file
264
fetch_daily_update.py
Normal file
@@ -0,0 +1,264 @@
|
||||
"""
|
||||
A股每日增量数据更新脚本
|
||||
功能:
|
||||
1. 检查所有股票最新数据日期
|
||||
2. 补齐缺失的交易日数据
|
||||
3. 支持单次运行和定时运行
|
||||
|
||||
用法:
|
||||
python3 fetch_daily_update.py # 更新所有股票缺失数据
|
||||
python3 fetch_daily_update.py --check # 只检查不更新
|
||||
python3 fetch_daily_update.py --days 5 # 补最近5天的数据
|
||||
"""
|
||||
|
||||
import tushare as ts
|
||||
import pandas as pd
|
||||
import sqlite3
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
import argparse
|
||||
|
||||
# Configuration: all paths are resolved relative to this script's directory.
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
TEMP_DIR = DATA_DIR / 'temp'
DB_FILE = DATA_DIR / 'progress.db'  # progress-tracking SQLite db (shared with sibling scripts)
STOCK_LIST_FILE = BASE_DIR / 'A股股票列表.csv'

# Seconds between API requests; incremental updates make few requests per
# stock, so a shorter interval is used here than in the full fetch.
REQUEST_INTERVAL = 0.3
|
||||
|
||||
|
||||
def setup_tushare():
    """Configure the tushare SDK and return a pro API client.

    Token lookup order: the TUSHARE_TOKEN environment variable first, then
    the local ``config.txt`` file.  Raises ValueError when neither is set.
    """
    token = os.environ.get('TUSHARE_TOKEN', '')
    config_file = BASE_DIR / 'config.txt'
    if not token and config_file.exists():
        token = config_file.read_text().strip()
    if not token:
        raise ValueError("缺少 Tushare Token")
    ts.set_token(token)
    return ts.pro_api()
|
||||
|
||||
|
||||
def get_latest_dates():
    """Map each ts_code to the newest trade_date present in the parquet store.

    Returns an empty dict when the store does not exist yet.
    """
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if not main_file.exists():
        return {}

    stored = pd.read_parquet(main_file)
    return stored.groupby('ts_code')['trade_date'].max().to_dict()
|
||||
|
||||
|
||||
def get_all_stock_codes():
    """Return the ts_codes of every stock marked 'completed' in the progress db.

    Returns:
        list[str] of ts_code values.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        # try/finally ensures the connection is closed even when the query
        # raises (the original leaked it on error).
        cursor = conn.execute("SELECT ts_code FROM progress WHERE status = 'completed'")
        return [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()
|
||||
|
||||
|
||||
def is_trading_day(date_str):
    """Naive trading-day test for *date_str* (YYYYMMDD): weekdays only.

    Public holidays are NOT excluded — only Saturday and Sunday are rejected.
    """
    return datetime.strptime(date_str, '%Y%m%d').weekday() not in (5, 6)
|
||||
|
||||
|
||||
def get_trade_dates(start_date, end_date):
    """Return candidate trading days (weekdays, YYYYMMDD) between the bounds, inclusive."""
    first = datetime.strptime(start_date, '%Y%m%d')
    last = datetime.strptime(end_date, '%Y%m%d')
    # An empty range (end before start) naturally yields an empty list.
    span = (last - first).days + 1
    return [
        stamp.strftime('%Y%m%d')
        for stamp in (first + timedelta(days=i) for i in range(span))
        if stamp.weekday() < 5
    ]
|
||||
|
||||
|
||||
def get_missing_dates(pro, latest_dates, codes, days=30):
    """For each code, list candidate trading days newer than its stored data.

    *pro* is accepted for signature compatibility but is not used here: the
    gap check is calendar-based (weekends excluded), not API-based.

    Returns:
        (missing_info, trade_dates) where missing_info maps ts_code to a dict
        with keys 'latest', 'missing' and 'missing_count'.
    """
    today = datetime.now().strftime('%Y%m%d')

    # Window of the last *days* calendar days, reduced to weekdays.
    window_start = (datetime.now() - timedelta(days=days)).strftime('%Y%m%d')
    trade_dates = get_trade_dates(window_start, today)

    missing_info = {}
    for ts_code in codes:
        # Stocks never seen before default to a very old date so the whole
        # window counts as missing.
        latest = latest_dates.get(ts_code, '20100101')
        gaps = [d for d in trade_dates if d > latest and is_trading_day(d)]
        if gaps:
            missing_info[ts_code] = {
                'latest': latest,
                'missing': gaps,
                'missing_count': len(gaps),
            }

    return missing_info, trade_dates
|
||||
|
||||
|
||||
def fetch_missing_data(pro, missing_info, batch_size=100):
    """Backfill missing daily rows for every stock in *missing_info*.

    Args:
        pro: tushare pro API client.
        missing_info: mapping ts_code -> {'latest', 'missing', 'missing_count'}
            as produced by get_missing_dates().
        batch_size: iterations between resets of the internal batch counter.
            NOTE(review): the counter is reset but nothing else happens per
            batch — the per-batch save the inline comment promises is not
            actually implemented.

    Returns:
        Number of newly appended rows.
    """
    total_stocks = len(missing_info)
    print(f"\n需要更新 {total_stocks} 只股票")

    # Union of every stock's missing dates -> overall fetch window.
    all_missing_dates = set()
    for info in missing_info.values():
        all_missing_dates.update(info['missing'])

    if not all_missing_dates:
        print("没有缺失数据")
        return 0

    start_date = min(all_missing_dates)
    end_date = max(all_missing_dates)
    print(f"数据范围: {start_date} ~ {end_date}")

    codes = list(missing_info.keys())
    total_records = 0
    batch_count = 0

    # The whole store is loaded once and kept in memory; each stock's new rows
    # are appended and the full file rewritten — O(store size) I/O per stock.
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    existing_df = pd.read_parquet(main_file) if main_file.exists() else None

    print(f"\n开始获取增量数据...")
    print("-" * 50)

    for i, ts_code in enumerate(codes):
        try:
            latest = missing_info[ts_code]['latest']
            # Fetch from the stored latest date; rows up to and including
            # `latest` are filtered out again below.
            start_fetch = latest

            df = pro.daily(ts_code=ts_code, start_date=start_fetch, end_date=end_date)

            if df is not None and len(df) > 0:
                # Keep only rows strictly newer than what is already stored.
                df = df[df['trade_date'] > latest]

                if len(df) > 0:
                    # Merge into the main store and persist immediately, so a
                    # crash mid-run loses at most the current stock.
                    if existing_df is not None:
                        combined = pd.concat([existing_df, df], ignore_index=True)
                        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
                        combined.to_parquet(main_file, index=False, compression='snappy')
                        existing_df = combined
                    else:
                        df.to_parquet(main_file, index=False, compression='snappy')
                        existing_df = df

                    total_records += len(df)
                    print(f"[{i+1}/{total_stocks}] {ts_code} ✓ +{len(df)}条")
                else:
                    print(f"[{i+1}/{total_stocks}] {ts_code} - 无新数据")
            else:
                print(f"[{i+1}/{total_stocks}] {ts_code} - 无数据")

            batch_count += 1

            # Batch bookkeeping (see NOTE in the docstring: effectively a no-op).
            if batch_count >= batch_size:
                batch_count = 0

            # Throttle between API requests.
            # NOTE(review): placement inferred from context (source indentation
            # was lost in transit); confirm the sleep is intended to run every
            # iteration rather than once per batch.
            time.sleep(REQUEST_INTERVAL)

        except Exception as e:
            print(f"[{i+1}/{total_stocks}] {ts_code} ✗ {str(e)[:50]}")
            time.sleep(1)  # back off a little longer after an error

    print("-" * 50)
    print(f"增量更新完成,新增 {total_records} 条记录")

    return total_records
|
||||
|
||||
|
||||
def check_missing_status(pro, days=30):
    """Print a report of which stocks are missing recent data.

    Returns the missing_info mapping from get_missing_dates() — an empty
    dict when every stock is already up to date.
    """
    divider = "=" * 60
    print(divider)
    print("A股数据缺失状态检查")
    print(divider)

    # Current state of the store vs the set of tracked stocks.
    latest_dates = get_latest_dates()
    codes = get_all_stock_codes()

    print(f"已获取数据股票数: {len(codes)}")

    missing_info, trade_dates = get_missing_dates(pro, latest_dates, codes, days)

    total_missing = len(missing_info)
    if not missing_info:
        print("\n✓ 所有股票数据都是最新的!")
        return {}

    # Histogram: number of stocks per missing-day count.
    missing_stats = {}
    for info in missing_info.values():
        missing_stats[info['missing_count']] = missing_stats.get(info['missing_count'], 0) + 1

    print(f"\n需要更新的股票: {total_missing} 只")
    print(f"最近交易日: {trade_dates[-5:] if len(trade_dates) >= 5 else trade_dates}")

    print("\n缺失天数分布:")
    for days_count in sorted(missing_stats):
        print(f"  缺{days_count}天: {missing_stats[days_count]} 只")

    # Top-10 worst offenders by gap size.
    print("\n缺失最多的10只股票:")
    worst = sorted(missing_info.items(), key=lambda item: item[1]['missing_count'], reverse=True)
    for ts_code, info in worst[:10]:
        print(f"  {ts_code}: 最新{info['latest']}, 缺{info['missing_count']}天")

    return missing_info
|
||||
|
||||
|
||||
def main():
    """CLI entry point: report data gaps and, unless --check, backfill them."""
    parser = argparse.ArgumentParser(description='A股每日增量数据更新')
    parser.add_argument('--check', action='store_true', help='只检查不更新')
    parser.add_argument('--days', type=int, default=30, help='检查最近多少天')
    args = parser.parse_args()

    print(f"运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    pro = setup_tushare()

    if args.check:
        # Report-only mode.
        check_missing_status(pro, args.days)
    else:
        # Report, backfill, then re-check to confirm the gaps are closed.
        missing_info = check_missing_status(pro, args.days)
        if missing_info:
            fetch_missing_data(pro, missing_info)
            print("\n更新后再次检查...")
            check_missing_status(pro, args.days)

    # Summarize the on-disk result.
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if main_file.exists():
        size_mb = main_file.stat().st_size / 1024 / 1024
        print(f"\n数据文件大小: {size_mb:.2f} MB")


if __name__ == '__main__':
    main()
|
||||
@@ -25,7 +25,7 @@ START_DATE = '20100101'
|
||||
END_DATE = datetime.now().strftime('%Y%m%d')
|
||||
|
||||
# 请求间隔(秒)- tushare积分限制
|
||||
REQUEST_INTERVAL = 9
|
||||
REQUEST_INTERVAL = 5
|
||||
|
||||
|
||||
def setup_tushare(token=None):
|
||||
@@ -91,34 +91,109 @@ def get_stock_codes_with_suffix(df):
|
||||
|
||||
|
||||
def fetch_daily_data(pro, codes, start_date, end_date):
|
||||
"""逐个获取日线数据(每次一支股票)"""
|
||||
all_data = []
|
||||
"""逐个获取日线数据(每次一支股票),支持断点续传"""
|
||||
total = len(codes)
|
||||
|
||||
print(f"\n共 {total} 只股票需要获取")
|
||||
print(f"预计耗时: {total * REQUEST_INTERVAL / 60:.1f} 分钟")
|
||||
# 加载已完成的股票列表
|
||||
completed_file = DATA_DIR / 'completed_stocks.txt'
|
||||
completed_stocks = set()
|
||||
if completed_file.exists():
|
||||
lines = completed_file.read_text().strip().split('\n')
|
||||
completed_stocks = set(line.strip() for line in lines if line.strip())
|
||||
print(f"已完成: {len(completed_stocks)} 只股票")
|
||||
|
||||
# 统计已有数据
|
||||
existing_data_file = DATA_DIR / 'stock_daily_data.parquet'
|
||||
if existing_data_file.exists():
|
||||
existing_df = pd.read_parquet(existing_data_file)
|
||||
print(f"已有数据: {len(existing_df)} 条记录")
|
||||
|
||||
print(f"\n共 {total} 只股票,待处理: {total - len(completed_stocks)} 只")
|
||||
print(f"预计耗时: {(total - len(completed_stocks)) * REQUEST_INTERVAL / 60:.1f} 分钟")
|
||||
print("-" * 50)
|
||||
|
||||
for i, ts_code in enumerate(codes):
|
||||
# 跳过已完成的
|
||||
if ts_code in completed_stocks:
|
||||
print(f"[{i+1}/{total}] {ts_code} 已完成,跳过")
|
||||
continue
|
||||
|
||||
try:
|
||||
print(f"[{i+1}/{total}] 获取 {ts_code}...", end=' ')
|
||||
print(f"[{i+1}/{total}] 获取 {ts_code}...", end=' ', flush=True)
|
||||
|
||||
df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
|
||||
|
||||
if df is not None and len(df) > 0:
|
||||
all_data.append(df)
|
||||
print(f"成功,{len(df)} 条记录")
|
||||
# 实时保存单只股票数据
|
||||
save_single_stock(df, ts_code, completed_file)
|
||||
else:
|
||||
print("无数据")
|
||||
# 无数据也标记为完成
|
||||
mark_completed(ts_code, completed_file)
|
||||
|
||||
except Exception as e:
|
||||
print(f"错误: {e}")
|
||||
# 出错不标记完成,下次重试
|
||||
|
||||
# 每次请求后休息9秒
|
||||
if i < total - 1: # 最后一个不需要等待
|
||||
# 每次请求后休息
|
||||
if i < total - 1:
|
||||
time.sleep(REQUEST_INTERVAL)
|
||||
|
||||
return all_data
|
||||
# 最后合并所有数据
|
||||
return merge_all_data()
|
||||
|
||||
|
||||
def save_single_stock(df, ts_code, completed_file):
    """Persist one stock's rows into the main parquet file and mark it done.

    Any previously stored rows for *ts_code* are replaced by *df*.
    """
    output_file = DATA_DIR / 'stock_daily_data.parquet'

    if output_file.exists():
        stored = pd.read_parquet(output_file)
        # Drop stale rows for this stock before appending the fresh ones.
        stored = stored[stored['ts_code'] != ts_code]
        combined_df = pd.concat([stored, df], ignore_index=True)
    else:
        combined_df = df

    combined_df = combined_df.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
    combined_df.to_parquet(output_file, index=False)

    # Record the checkpoint so a restart skips this stock.
    mark_completed(ts_code, completed_file)
|
||||
|
||||
|
||||
def merge_all_data():
    """Return the consolidated dataset as a one-element list, or [] when absent.

    The list-of-DataFrames shape matches what callers of the old fetch loop
    expect as a return value.
    """
    output_file = DATA_DIR / 'stock_daily_data.parquet'
    return [pd.read_parquet(output_file)] if output_file.exists() else []
|
||||
|
||||
|
||||
def save_progress(all_data, ts_code, completed_file):
    """Legacy progress saver: merge every collected frame and mark *ts_code* done.

    Kept for backwards compatibility with older callers.
    """
    merged = pd.concat(all_data, ignore_index=True)
    merged = merged.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)

    # Overwrite the consolidated parquet store.
    output_file = DATA_DIR / 'stock_daily_data.parquet'
    merged.to_parquet(output_file, index=False)

    mark_completed(ts_code, completed_file)
|
||||
|
||||
|
||||
def mark_completed(ts_code, completed_file):
    """Append *ts_code* to the completed-stocks checkpoint file.

    Explicit UTF-8 keeps the checkpoint encoding stable regardless of the
    system locale (the project mixes in Chinese filenames and cron runs
    with a minimal environment).
    """
    with open(completed_file, 'a', encoding='utf-8') as f:
        f.write(ts_code + '\n')
|
||||
|
||||
|
||||
def save_to_parquet(df, filename):
|
||||
@@ -197,4 +272,4 @@ def main():
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user