4 Commits
v0.1.1 ... main

Author SHA1 Message Date
21bbc9cd1e feat: optimize daily data fetch by batching requests per trade date 2026-04-14 09:29:57 +08:00
23860c3f8c feat: V2 optimized version - per-stock files + batch merges + SQLite progress DB
Optimizations:
- Each stock is written to its own small file, avoiding a full rewrite of the 203 MB main file on every update
- Files are merged into the main file once per 50 stocks, cutting IO round trips
- SQLite progress database for more reliable checkpoint/resume
- Request interval reduced from 5 s to 0.3 s

New files:
- fetch_history_v2.py: V2 optimized main script
- run_v2.sh: launcher script
- .gitignore: ignore config.txt and data/*.db
2026-04-09 12:11:04 +08:00
e7a351522a fix: request one stock at a time, with a 9-second interval
- Remove the batch-request logic
- Request a single stock per call
- Sleep 9 seconds after each request before the next one
- Print an estimated total runtime
2026-04-08 19:18:58 +08:00
a2c9b347ca fix: complete the market-code mapping logic and add Beijing Stock Exchange support
Market code rules:
- leading 6 → SH (Shanghai)
- leading 0 or 3 → SZ (Shenzhen)
- leading 4 or 8 → BJ (Beijing)
2026-04-08 19:14:23 +08:00
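As a worked illustration of these rules, a minimal sketch (the helper name is hypothetical; the repository's own implementation is get_stock_codes_with_suffix, shown in the diffs below):

def exchange_suffix(code: str) -> str:
    """Map a 6-digit A-share code to its tushare exchange suffix."""
    code = code.zfill(6)
    if code[0] == '6':
        return f"{code}.SH"  # Shanghai
    if code[0] in ('0', '3'):
        return f"{code}.SZ"  # Shenzhen
    if code[0] in ('4', '8'):
        return f"{code}.BJ"  # Beijing
    return f"{code}.SZ"      # unknown prefix: default to Shenzhen, as the script does

assert exchange_suffix('600000') == '600000.SH'
assert exchange_suffix('000001') == '000001.SZ'
assert exchange_suffix('830799') == '830799.BJ'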
7 changed files with 3783 additions and 23 deletions

.gitignore vendored · 5 changes

@@ -4,9 +4,14 @@ __pycache__/
*.pyo
.env
# Config (contains token)
config.txt
# Data files (large)
data/*.parquet
data/*.csv
data/*.db
data/temp/
!A股股票列表.csv
# Logs

cron_daily_fetch.py · new file · 224 lines

@@ -0,0 +1,224 @@
"""
A股每日数据自动拉取脚本 (Cron专用)
每个交易日17:00自动运行拉取当日所有股票数据
优化版本:
- 使用 trade_date 参数一次性获取当天所有股票数据
- 大幅减少 API 请求次数(从 5000+ 次减少到每天 1 次)
用法:
python3 cron_daily_fetch.py # 获取今日数据
python3 cron_daily_fetch.py --days 5 # 获取最近5天数据
python3 cron_daily_fetch.py --date 20260408 # 获取指定日期数据
Cron配置
0 17 * * 1-5 python3 /home/xian/.openclaw/common/stock_system/cron_daily_fetch.py
"""
import tushare as ts
import pandas as pd
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
import argparse
# Configuration
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
DB_FILE = DATA_DIR / 'progress.db'
LOG_FILE = BASE_DIR / 'logs' / 'daily_fetch.log'
MAIN_FILE = DATA_DIR / 'stock_daily_data.parquet'
REQUEST_INTERVAL = 0.5 # seconds between requests (by-date fetching makes few calls, so a slow pace is fine)
def log(msg):
"""Append a message to the log file and echo it to stdout."""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_msg = f"[{timestamp}] {msg}"
print(log_msg)
LOG_FILE.parent.mkdir(exist_ok=True)
with open(LOG_FILE, 'a') as f:
f.write(log_msg + '\n')
def setup_tushare():
"""Initialize the tushare client."""
token = os.environ.get('TUSHARE_TOKEN', '')
if not token:
config_file = BASE_DIR / 'config.txt'
if config_file.exists():
token = config_file.read_text().strip()
if not token:
raise ValueError("Tushare token missing")
ts.set_token(token)
return ts.pro_api()
def is_weekday(date_str):
"""Return True if the date falls on a weekday (Mon-Fri)."""
d = datetime.strptime(date_str, '%Y%m%d')
return d.weekday() < 5
def get_trade_dates_in_range(start_date, end_date):
"""Return all weekdays within [start_date, end_date]."""
start = datetime.strptime(start_date, '%Y%m%d')
end = datetime.strptime(end_date, '%Y%m%d')
trade_dates = []
current = start
while current <= end:
if current.weekday() < 5: # Monday through Friday
trade_dates.append(current.strftime('%Y%m%d'))
current += timedelta(days=1)
return trade_dates
def get_latest_trade_date():
"""Return the latest trade date present in the main file, or None."""
if not MAIN_FILE.exists():
return None
df = pd.read_parquet(MAIN_FILE)
if len(df) == 0:
return None
return df['trade_date'].max()
def fetch_data_by_date(pro, trade_date):
"""
按日期获取当日所有股票数据
这是优化版本:一次请求获取当天所有股票
"""
log(f"获取 {trade_date} 的数据...")
try:
# 一次性获取当天所有股票数据
df = pro.daily(trade_date=trade_date)
if df is None or len(df) == 0:
log(f" {trade_date} 无数据(可能是非交易日)")
return 0, None
log(f" {trade_date} 获取到 {len(df)} 条记录")
return len(df), df
except Exception as e:
log(f" {trade_date} 获取失败: {str(e)[:100]}")
return 0, None
def merge_data(new_df):
"""合并新数据到主文件"""
if new_df is None or len(new_df) == 0:
return
if MAIN_FILE.exists():
existing_df = pd.read_parquet(MAIN_FILE)
# 合并并去重
combined = pd.concat([existing_df, new_df], ignore_index=True)
combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
combined = combined.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
else:
combined = new_df
combined.to_parquet(MAIN_FILE, index=False, compression='snappy')
return len(combined)
def fetch_dates(pro, dates):
"""获取多个日期的数据"""
if not dates:
log("没有需要获取的日期")
return 0
total_new = 0
all_new_data = []
log(f"需要获取 {len(dates)} 个交易日的数据")
for i, trade_date in enumerate(dates):
count, df = fetch_data_by_date(pro, trade_date)
if df is not None and len(df) > 0:
all_new_data.append(df)
total_new += count
# 间隔
if i < len(dates) - 1:
time.sleep(REQUEST_INTERVAL)
# 一次性合并所有数据
if all_new_data:
combined_new = pd.concat(all_new_data, ignore_index=True)
total_records = merge_data(combined_new)
log(f"数据合并完成,总记录数: {total_records}")
return total_new
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='A股每日数据拉取按日期批量获取')
parser.add_argument('--date', type=str, help='获取指定日期数据 (格式: 20260408)')
parser.add_argument('--days', type=int, default=0, help='获取最近N天数据')
args = parser.parse_args()
log("=" * 60)
log(f"A股每日数据拉取开始")
log("=" * 60)
try:
# 初始化
pro = setup_tushare()
# 确定要获取的日期
if args.date:
# 指定日期
dates = [args.date]
log(f"模式: 获取指定日期 {args.date}")
elif args.days > 0:
# 最近N天
end_date = datetime.now().strftime('%Y%m%d')
start_date = (datetime.now() - timedelta(days=args.days-1)).strftime('%Y%m%d')
dates = get_trade_dates_in_range(start_date, end_date)
log(f"模式: 获取最近 {args.days} 天数据 ({start_date} ~ {end_date})")
log(f"工作日: {dates}")
else:
# 默认:获取今日数据
today = datetime.now().strftime('%Y%m%d')
# 如果不是工作日,跳过
if not is_weekday(today):
log(f"今天({datetime.now().strftime('%A')})不是工作日,跳过")
return
dates = [today]
log(f"模式: 获取今日数据 ({today})")
# 获取数据
new_records = fetch_dates(pro, dates)
# 显示文件信息
if MAIN_FILE.exists():
size_mb = MAIN_FILE.stat().st_size / 1024 / 1024
log(f"数据文件大小: {size_mb:.2f} MB")
log(f"本次新增数据: {new_records}")
log("=" * 60)
log("完成")
except Exception as e:
log(f"错误: {e}")
import traceback
log(traceback.format_exc())
raise
if __name__ == '__main__':
main()
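The merge_data step above is what makes this cron job safe to re-run: re-fetching a date can never duplicate rows. A minimal sketch of the same dedup invariant on hypothetical toy rows (this sketch dedups before sorting, since pandas sort_values is not guaranteed stable for equal keys):

import pandas as pd

# Toy rows: one stale row on file, one corrected re-fetch of the same day.
existing = pd.DataFrame({'ts_code': ['000001.SZ'], 'trade_date': ['20260408'], 'close': [10.0]})
refetched = pd.DataFrame({'ts_code': ['000001.SZ'], 'trade_date': ['20260408'], 'close': [10.1]})

combined = pd.concat([existing, refetched], ignore_index=True)
# keep='last' means the re-fetched row wins over the stale one.
combined = combined.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
assert len(combined) == 1 and combined.iloc[0]['close'] == 10.1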

data/completed_stocks.txt · new file · 2749 lines

(Diff suppressed: file too large.)

fetch_daily_update.py · new file · 264 lines

@@ -0,0 +1,264 @@
"""
A股每日增量数据更新脚本
功能:
1. 检查所有股票最新数据日期
2. 补齐缺失的交易日数据
3. 支持单次运行和定时运行
用法:
python3 fetch_daily_update.py # 更新所有股票缺失数据
python3 fetch_daily_update.py --check # 只检查不更新
python3 fetch_daily_update.py --days 5 # 补最近5天的数据
"""
import tushare as ts
import pandas as pd
import sqlite3
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
import argparse
# Configuration
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
TEMP_DIR = DATA_DIR / 'temp'
DB_FILE = DATA_DIR / 'progress.db'
STOCK_LIST_FILE = BASE_DIR / 'A股股票列表.csv'
REQUEST_INTERVAL = 0.3 # shorter interval for incremental requests
def setup_tushare():
"""Initialize the tushare client."""
token = os.environ.get('TUSHARE_TOKEN', '')
if not token:
config_file = BASE_DIR / 'config.txt'
if config_file.exists():
token = config_file.read_text().strip()
if not token:
raise ValueError("Tushare token missing")
ts.set_token(token)
return ts.pro_api()
def get_latest_dates():
"""Return the latest stored data date for each stock."""
main_file = DATA_DIR / 'stock_daily_data.parquet'
if not main_file.exists():
return {}
df = pd.read_parquet(main_file)
latest = df.groupby('ts_code')['trade_date'].max()
return dict(latest)
def get_all_stock_codes():
"""获取所有股票代码"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute("SELECT ts_code FROM progress WHERE status = 'completed'")
codes = [row[0] for row in cursor.fetchall()]
conn.close()
return codes
def is_trading_day(date_str):
"""Rough trading-day check: weekdays only (holidays ignored)."""
d = datetime.strptime(date_str, '%Y%m%d')
return d.weekday() < 5 # Monday through Friday
def get_trade_dates(start_date, end_date):
"""Return candidate trading days (weekdays) within the range."""
start = datetime.strptime(start_date, '%Y%m%d')
end = datetime.strptime(end_date, '%Y%m%d')
trade_dates = []
current = start
while current <= end:
if current.weekday() < 5: # Monday through Friday
trade_dates.append(current.strftime('%Y%m%d'))
current += timedelta(days=1)
return trade_dates
def get_missing_dates(pro, latest_dates, codes, days=30):
"""Work out which trading days each stock is missing."""
today = datetime.now().strftime('%Y%m%d')
# Simple trading-day heuristic (weekends excluded)
start = (datetime.now() - timedelta(days=days)).strftime('%Y%m%d')
trade_dates = get_trade_dates(start, today)
# Collect the missing trading days per stock
missing_info = {}
for ts_code in codes:
latest = latest_dates.get(ts_code, '20100101')
missing = [d for d in trade_dates if d > latest and is_trading_day(d)]
if missing:
missing_info[ts_code] = {
'latest': latest,
'missing': missing,
'missing_count': len(missing)
}
return missing_info, trade_dates
def fetch_missing_data(pro, missing_info, batch_size=100):
"""Fetch the missing data stock by stock."""
total_stocks = len(missing_info)
print(f"\n{total_stocks} stocks need updating")
# Work out the overall date range to fetch
all_missing_dates = set()
for info in missing_info.values():
all_missing_dates.update(info['missing'])
if not all_missing_dates:
print("No missing data")
return 0
start_date = min(all_missing_dates)
end_date = max(all_missing_dates)
print(f"Date range: {start_date} ~ {end_date}")
# Fetch the data (tushare supports date-range queries)
codes = list(missing_info.keys())
total_records = 0
batch_count = 0
main_file = DATA_DIR / 'stock_daily_data.parquet'
existing_df = pd.read_parquet(main_file) if main_file.exists() else None
print("\nFetching incremental data...")
print("-" * 50)
for i, ts_code in enumerate(codes):
try:
latest = missing_info[ts_code]['latest']
# Start from the stock's latest known date; rows at or before it are filtered out below
start_fetch = latest
df = pro.daily(ts_code=ts_code, start_date=start_fetch, end_date=end_date)
if df is not None and len(df) > 0:
# Drop rows we already have
df = df[df['trade_date'] > latest]
if len(df) > 0:
# Merge into the main file
if existing_df is not None:
combined = pd.concat([existing_df, df], ignore_index=True)
combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
combined.to_parquet(main_file, index=False, compression='snappy')
existing_df = combined
else:
df.to_parquet(main_file, index=False, compression='snappy')
existing_df = df
total_records += len(df)
print(f"[{i+1}/{total_stocks}] {ts_code} ✓ +{len(df)}")
else:
print(f"[{i+1}/{total_stocks}] {ts_code} - no new data")
else:
print(f"[{i+1}/{total_stocks}] {ts_code} - no data")
batch_count += 1
# Throttle once per batch of requests
if batch_count >= batch_size:
batch_count = 0
time.sleep(REQUEST_INTERVAL)
except Exception as e:
print(f"[{i+1}/{total_stocks}] {ts_code} ✗ {str(e)[:50]}")
time.sleep(1) # back off a little after an error
print("-" * 50)
print(f"Incremental update complete, {total_records} new records")
return total_records
def check_missing_status(pro, days=30):
"""Report which stocks are missing data."""
print("=" * 60)
print("A-share data gap check")
print("=" * 60)
# Latest date on record per stock
latest_dates = get_latest_dates()
codes = get_all_stock_codes()
print(f"Stocks with data: {len(codes)}")
# Compute the gaps
missing_info, trade_dates = get_missing_dates(pro, latest_dates, codes, days)
# Summarize
total_missing = len(missing_info)
if total_missing == 0:
print("\n✓ All stocks are up to date!")
return {}
# Histogram of missing-day counts
missing_stats = {}
for ts_code, info in missing_info.items():
count = info['missing_count']
if count not in missing_stats:
missing_stats[count] = 0
missing_stats[count] += 1
print(f"\nStocks needing update: {total_missing}")
print(f"Recent trading days: {trade_dates[-5:] if len(trade_dates) >= 5 else trade_dates}")
print("\nDistribution of missing days:")
for days_count in sorted(missing_stats.keys()):
print(f" missing {days_count} day(s): {missing_stats[days_count]} stock(s)")
# Show the stocks with the most missing days
print("\nTop 10 stocks by missing days:")
sorted_missing = sorted(missing_info.items(), key=lambda x: x[1]['missing_count'], reverse=True)[:10]
for ts_code, info in sorted_missing:
print(f" {ts_code}: latest {info['latest']}, missing {info['missing_count']} day(s)")
return missing_info
def main():
parser = argparse.ArgumentParser(description='Daily incremental A-share data update')
parser.add_argument('--check', action='store_true', help='check only, do not update')
parser.add_argument('--days', type=int, default=30, help='how many recent days to check')
args = parser.parse_args()
print(f"Run time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Initialize
pro = setup_tushare()
if args.check:
# Check only
check_missing_status(pro, args.days)
else:
# Check, then update
missing_info = check_missing_status(pro, args.days)
if missing_info:
fetch_missing_data(pro, missing_info)
# Re-check to confirm
print("\nRe-checking after update...")
check_missing_status(pro, args.days)
# Report file size
main_file = DATA_DIR / 'stock_daily_data.parquet'
if main_file.exists():
size_mb = main_file.stat().st_size / 1024 / 1024
print(f"\nData file size: {size_mb:.2f} MB")
if __name__ == '__main__':
main()
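The gap detection above reduces to comparing each stock's newest stored date against a weekday calendar. A compact sketch of that comparison on hypothetical inputs (like the script, it ignores exchange holidays):

from datetime import datetime, timedelta

def weekdays_between(start, end, fmt='%Y%m%d'):
    """All weekdays in [start, end] as yyyymmdd strings."""
    cur, stop = datetime.strptime(start, fmt), datetime.strptime(end, fmt)
    out = []
    while cur <= stop:
        if cur.weekday() < 5:
            out.append(cur.strftime(fmt))
        cur += timedelta(days=1)
    return out

# Hypothetical state: this stock's newest stored row is 20260408.
latest = {'000001.SZ': '20260408'}
calendar = weekdays_between('20260406', '20260414')
missing = [d for d in calendar if d > latest['000001.SZ']]
print(missing)  # ['20260409', '20260410', '20260413', '20260414']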


@@ -24,11 +24,8 @@ LOGS_DIR.mkdir(exist_ok=True)
 START_DATE = '20100101'
 END_DATE = datetime.now().strftime('%Y%m%d')
-# Number of stocks per batch request (tushare limit)
-BATCH_SIZE = 50
-# Request interval (seconds) - avoid hammering the API
-REQUEST_INTERVAL = 0.3
+# Request interval (seconds) - tushare credit limits
+REQUEST_INTERVAL = 5
 def setup_tushare(token=None):
@@ -67,44 +64,136 @@ def load_stock_list():
 def get_stock_codes_with_suffix(df):
-    """Convert stock codes to tushare format (append exchange suffix)"""
+    """Convert stock codes to tushare format (append exchange suffix)
+    Market code rules:
+    - leading 6 → SH (Shanghai)
+    - leading 0 or 3 → SZ (Shenzhen)
+    - leading 4 or 8 → BJ (Beijing)
+    """
     codes = []
     for code in df['code']:
         code = str(code).zfill(6)  # zero-pad to 6 digits
-        if code.startswith('6'):
+        first_digit = code[0]
+        if first_digit == '6':
             ts_code = f"{code}.SH"
-        else:
+        elif first_digit in ('0', '3'):
             ts_code = f"{code}.SZ"
+        elif first_digit in ('4', '8'):
+            ts_code = f"{code}.BJ"
+        else:
+            # Unknown market, default to Shenzhen
+            ts_code = f"{code}.SZ"
         codes.append(ts_code)
     return codes
 def fetch_daily_data(pro, codes, start_date, end_date):
-    """Fetch daily bars in batches"""
-    all_data = []
+    """Fetch daily bars one stock at a time, with checkpoint/resume support"""
     total = len(codes)
-    for i in range(0, total, BATCH_SIZE):
-        batch_codes = codes[i:i + BATCH_SIZE]
-        ts_codes = ','.join(batch_codes)
+    # Load the list of already-completed stocks
+    completed_file = DATA_DIR / 'completed_stocks.txt'
+    completed_stocks = set()
+    if completed_file.exists():
+        lines = completed_file.read_text().strip().split('\n')
+        completed_stocks = set(line.strip() for line in lines if line.strip())
+        print(f"Already completed: {len(completed_stocks)} stocks")
+    # Report existing data
+    existing_data_file = DATA_DIR / 'stock_daily_data.parquet'
+    if existing_data_file.exists():
+        existing_df = pd.read_parquet(existing_data_file)
+        print(f"Existing data: {len(existing_df)} records")
+    print(f"\nTotal {total} stocks, pending: {total - len(completed_stocks)}")
+    print(f"Estimated time: {(total - len(completed_stocks)) * REQUEST_INTERVAL / 60:.1f} minutes")
+    print("-" * 50)
+    for i, ts_code in enumerate(codes):
+        # Skip stocks that are already done
+        if ts_code in completed_stocks:
+            print(f"[{i+1}/{total}] {ts_code} already done, skipping")
+            continue
         try:
-            print(f"Fetching stocks {i+1}-{min(i+BATCH_SIZE, total)}...")
-            df = pro.daily(ts_code=ts_codes, start_date=start_date, end_date=end_date)
+            print(f"[{i+1}/{total}] fetching {ts_code}...", end=' ', flush=True)
+            df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
             if df is not None and len(df) > 0:
-                all_data.append(df)
-                print(f"  got {len(df)} records")
+                print(f"ok, {len(df)} records")
+                # Persist this stock's data right away
+                save_single_stock(df, ts_code, completed_file)
             else:
-                print(f"  no data")
+                print("no data")
+                # Mark as completed even when empty
+                mark_completed(ts_code, completed_file)
         except Exception as e:
-            print(f"  error: {e}")
+            print(f"error: {e}")
+            # Leave unmarked on error so it is retried next run
-        # Avoid requesting too fast
-        time.sleep(REQUEST_INTERVAL)
+        # Rest after every request
+        if i < total - 1:
+            time.sleep(REQUEST_INTERVAL)
-    return all_data
+    # Merge everything at the end
+    return merge_all_data()
+def save_single_stock(df, ts_code, completed_file):
+    """Save one stock's data and mark it completed"""
+    # Load the existing data
+    output_file = DATA_DIR / 'stock_daily_data.parquet'
+    if output_file.exists():
+        existing_df = pd.read_parquet(output_file)
+        # Drop any stale rows for this stock
+        existing_df = existing_df[existing_df['ts_code'] != ts_code]
+        # Append the fresh data
+        combined_df = pd.concat([existing_df, df], ignore_index=True)
+    else:
+        combined_df = df
+    # Sort
+    combined_df = combined_df.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
+    # Save
+    combined_df.to_parquet(output_file, index=False)
+    # Mark completed
+    mark_completed(ts_code, completed_file)
+def merge_all_data():
+    """Return the merged dataset (for callers expecting a list)"""
+    output_file = DATA_DIR / 'stock_daily_data.parquet'
+    if output_file.exists():
+        return [pd.read_parquet(output_file)]
+    return []
+def save_progress(all_data, ts_code, completed_file):
+    """Save progress in one shot (kept for compatibility)"""
+    # Merge and save the data
+    combined_df = pd.concat(all_data, ignore_index=True)
+    combined_df = combined_df.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
+    # Save as parquet
+    output_file = DATA_DIR / 'stock_daily_data.parquet'
+    combined_df.to_parquet(output_file, index=False)
+    # Mark completed
+    mark_completed(ts_code, completed_file)
+def mark_completed(ts_code, completed_file):
+    """Append a stock to the completed list"""
+    with open(completed_file, 'a') as f:
+        f.write(ts_code + '\n')
 def save_to_parquet(df, filename):
@@ -183,4 +272,4 @@ def main():
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    main()
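The resume logic this diff introduces rests on an append-only checkpoint file: marking a stock done costs one appended line, and re-reading the file after a crash rebuilds the done-set. A minimal sketch of the pattern, with hypothetical file and item names:

from pathlib import Path

CHECKPOINT = Path('completed.txt')  # hypothetical path

def load_done():
    """Read the append-only checkpoint; duplicate lines collapse in the set."""
    if not CHECKPOINT.exists():
        return set()
    return {ln.strip() for ln in CHECKPOINT.read_text().splitlines() if ln.strip()}

def mark_done(code):
    """Append one finished item; cost is constant regardless of history size."""
    with open(CHECKPOINT, 'a') as f:
        f.write(code + '\n')

done = load_done()
for code in ['000001.SZ', '600000.SH']:  # stand-in work list
    if code in done:
        continue
    # ... fetch and persist data for `code` here ...
    mark_done(code)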

fetch_history_v2.py · new file · 393 lines

@@ -0,0 +1,393 @@
"""
A股历史数据获取系统 V2 - 性能优化版
优化点:
1. 分文件存储 - 每只股票单独存小文件,避免每次读写整个大文件
2. 批量合并 - 每100只股票合并一次减少IO次数
3. SQLite进度记录 - 更可靠的断点续传
"""
import tushare as ts
import pandas as pd
import os
import time
import sqlite3
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import threading
# Configuration
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
TEMP_DIR = DATA_DIR / 'temp' # temporary shard directory
LOGS_DIR = BASE_DIR / 'logs'
STOCK_LIST_FILE = BASE_DIR / 'A股股票列表.csv'
DB_FILE = DATA_DIR / 'progress.db' # SQLite progress database
# Create directories
DATA_DIR.mkdir(exist_ok=True)
TEMP_DIR.mkdir(exist_ok=True)
LOGS_DIR.mkdir(exist_ok=True)
# Date range
START_DATE = '20100101'
END_DATE = datetime.now().strftime('%Y%m%d')
# Request interval (seconds) - tushare credit limits
REQUEST_INTERVAL = 2 # shorter interval, offset by the lightweight storage
# Batch merge threshold
MERGE_BATCH_SIZE = 50 # merge once per 50 stocks
def setup_tushare(token=None):
"""Initialize the tushare client."""
if not token:
token = os.environ.get('TUSHARE_TOKEN', '')
if not token:
config_file = BASE_DIR / 'config.txt'
if config_file.exists():
token = config_file.read_text().strip()
if not token:
raise ValueError("Tushare token missing")
ts.set_token(token)
return ts.pro_api()
def init_progress_db():
"""Initialize the SQLite progress database."""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# Progress table
cursor.execute('''
CREATE TABLE IF NOT EXISTS progress (
ts_code TEXT PRIMARY KEY,
status TEXT DEFAULT 'pending',
record_count INTEGER DEFAULT 0,
updated_at TEXT,
error_msg TEXT
)
''')
# Merge-log table
cursor.execute('''
CREATE TABLE IF NOT EXISTS merge_log (
batch_id INTEGER PRIMARY KEY AUTOINCREMENT,
stock_count INTEGER,
merged_at TEXT,
file_size INTEGER
)
''')
conn.commit()
conn.close()
print(f"进度数据库: {DB_FILE}")
def load_stock_list():
"""加载股票列表"""
df = pd.read_csv(STOCK_LIST_FILE)
df.columns = df.columns.str.strip()
print(f"加载股票列表: {len(df)} 只股票")
return df
def get_stock_codes_with_suffix(df):
"""将股票代码转换为tushare格式"""
codes = []
for code in df['code']:
code = str(code).zfill(6)
first_digit = code[0]
if first_digit == '6':
ts_code = f"{code}.SH"
elif first_digit in ('0', '3'):
ts_code = f"{code}.SZ"
elif first_digit in ('4', '8'):
ts_code = f"{code}.BJ"
else:
ts_code = f"{code}.SZ"
codes.append(ts_code)
return codes
def init_stock_progress(codes):
"""Seed a progress row for every stock."""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# Bulk insert (skip codes that already have a row)
for ts_code in codes:
cursor.execute('''
INSERT OR IGNORE INTO progress (ts_code, status, updated_at)
VALUES (?, 'pending', ?)
''', (ts_code, datetime.now().isoformat()))
conn.commit()
# Tally statuses
cursor.execute('SELECT status, COUNT(*) FROM progress GROUP BY status')
stats = cursor.fetchall()
conn.close()
print("\n当前进度状态:")
for status, count in stats:
print(f" {status}: {count}")
return stats
def get_pending_stocks():
"""Return stocks still pending (or previously errored)."""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute('''
SELECT ts_code FROM progress
WHERE status = 'pending' OR status = 'error'
ORDER BY ts_code
''')
pending = [row[0] for row in cursor.fetchall()]
conn.close()
return pending
def save_stock_temp(df, ts_code):
"""Save one stock to a temp shard file (fast)."""
temp_file = TEMP_DIR / f"{ts_code.replace('.', '_')}.parquet"
df.to_parquet(temp_file, index=False, compression='snappy')
# Update progress
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute('''
UPDATE progress
SET status = 'completed', record_count = ?, updated_at = ?
WHERE ts_code = ?
''', (len(df), datetime.now().isoformat(), ts_code))
conn.commit()
conn.close()
return temp_file.stat().st_size
def merge_batch_to_main():
"""Merge the temp shard files into the main file."""
temp_files = list(TEMP_DIR.glob('*.parquet'))
if not temp_files:
return 0
print(f"\nMerging {len(temp_files)} temp files...")
# Read every temp file
batch_data = []
for tf in temp_files:
try:
df = pd.read_parquet(tf)
batch_data.append(df)
except Exception as e:
print(f" 警告: 读取 {tf.name} 失败: {e}")
if not batch_data:
return 0
# Concatenate
new_data = pd.concat(batch_data, ignore_index=True)
# Load the main file and merge
main_file = DATA_DIR / 'stock_daily_data.parquet'
if main_file.exists():
existing = pd.read_parquet(main_file)
# Codes already present in the main file
existing_codes = set(existing['ts_code'].unique())
new_codes = set(new_data['ts_code'].unique())
# Only append data for stocks not seen before
truly_new = new_data[~new_data['ts_code'].isin(existing_codes)]
if len(truly_new) > 0:
combined = pd.concat([existing, truly_new], ignore_index=True)
else:
combined = existing
else:
combined = new_data
# Sort and save
combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
combined.to_parquet(main_file, index=False, compression='snappy')
# Record the merge in the log table
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO merge_log (stock_count, merged_at, file_size)
VALUES (?, ?, ?)
''', (len(temp_files), datetime.now().isoformat(), main_file.stat().st_size))
conn.commit()
conn.close()
# Delete the temp files
for tf in temp_files:
tf.unlink()
print(f" 合并完成: {len(new_data)} 条新记录")
print(f" 主文件大小: {main_file.stat().st_size / 1024 / 1024:.2f} MB")
return len(temp_files)
def fetch_stock_data(pro, ts_code):
"""Fetch one stock's full history."""
try:
df = pro.daily(ts_code=ts_code, start_date=START_DATE, end_date=END_DATE)
if df is not None and len(df) > 0:
# Save to a temp shard
file_size = save_stock_temp(df, ts_code)
return True, len(df), file_size
else:
# No data; mark it accordingly
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute('''
UPDATE progress
SET status = 'no_data', updated_at = ?
WHERE ts_code = ?
''', (datetime.now().isoformat(), ts_code))
conn.commit()
conn.close()
return True, 0, 0
except Exception as e:
# Record the error
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
cursor.execute('''
UPDATE progress
SET status = 'error', error_msg = ?, updated_at = ?
WHERE ts_code = ?
''', (str(e)[:200], datetime.now().isoformat(), ts_code))
conn.commit()
conn.close()
return False, 0, 0
def fetch_all_stocks(pro, codes):
"""Fetch data for all stocks."""
total = len(codes)
batch_count = 0
print("\nStarting fetch...")
print(f"Total: {total} stocks")
print(f"Merging once every {MERGE_BATCH_SIZE} stocks")
print("-" * 50)
for i, ts_code in enumerate(codes):
success, records, size = fetch_stock_data(pro, ts_code)
status = "✓" if success else "✗"
print(f"[{i+1}/{total}] {ts_code} {status} {records} rows, {size/1024:.1f} KB")
# Merge once a full batch has accumulated
batch_count += 1
if batch_count >= MERGE_BATCH_SIZE:
merge_batch_to_main()
batch_count = 0
# Throttle between requests
if i < total - 1:
time.sleep(REQUEST_INTERVAL)
# Merge whatever is left over
if batch_count > 0:
merge_batch_to_main()
print("\n" + "=" * 50)
print("数据获取完成!")
def show_final_stats():
"""Print final statistics."""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# Status tally
cursor.execute('SELECT status, COUNT(*) FROM progress GROUP BY status')
stats = cursor.fetchall()
# Record-count total
cursor.execute('SELECT SUM(record_count) FROM progress WHERE status = "completed"')
total_records = cursor.fetchone()[0] or 0
# Merge history
cursor.execute('SELECT COUNT(*), SUM(stock_count) FROM merge_log')
merge_stats = cursor.fetchone()
conn.close()
print("\n最终统计:")
print("-" * 30)
for status, count in stats:
print(f" {status}: {count}")
print(f"\n 总记录数: {total_records}")
print(f" 合并批次: {merge_stats[0]}")
# 文件大小
main_file = DATA_DIR / 'stock_daily_data.parquet'
if main_file.exists():
print(f" 主文件大小: {main_file.stat().st_size / 1024 / 1024:.2f} MB")
def main():
"""Entry point."""
print("=" * 60)
print("A-share historical data fetch system V2 - performance-optimized")
print("=" * 60)
print(f"Date range: {START_DATE} ~ {END_DATE}")
print(f"Data directory: {DATA_DIR}")
print("=" * 60)
# Initialize
print("\nInitializing Tushare...")
pro = setup_tushare()
print("\nInitializing progress database...")
init_progress_db()
# Load the stock list
print("\nLoading stock list...")
stock_df = load_stock_list()
codes = get_stock_codes_with_suffix(stock_df)
# Seed progress rows
init_stock_progress(codes)
# Collect pending stocks
pending = get_pending_stocks()
if not pending:
print("\nAll stocks are done!")
show_final_stats()
return
print(f"\nPending: {len(pending)} stocks")
print(f"Estimated time: {len(pending) * REQUEST_INTERVAL / 60:.1f} minutes")
# Start fetching
fetch_all_stocks(pro, pending)
# Show statistics
show_final_stats()
if __name__ == '__main__':
main()
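Because progress lives in SQLite, resume state can be inspected with plain SQL while the fetcher runs. A small sketch, assuming the data/progress.db schema created by init_progress_db() above:

import sqlite3

conn = sqlite3.connect('data/progress.db')
cur = conn.cursor()

# Status breakdown, the same query the script uses for its own summary.
for status, n in cur.execute('SELECT status, COUNT(*) FROM progress GROUP BY status'):
    print(f'{status}: {n}')

# Errored stocks and their messages: candidates for the next retry pass.
for ts_code, msg in cur.execute("SELECT ts_code, error_msg FROM progress WHERE status = 'error'"):
    print(ts_code, msg)
conn.close()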

run_v2.sh · new file · 36 lines

@@ -0,0 +1,36 @@
#!/bin/bash
# A-share historical data fetch - V2 optimized
cd "$(dirname "$0")"
# Check configuration
if [ ! -f "config.txt" ]; then
echo "Please configure your Tushare token first:"
echo " echo 'your_token' > config.txt"
exit 1
fi
# Check the stock list
if [ ! -f "A股股票列表.csv" ]; then
echo "Missing stock list file: A股股票列表.csv"
exit 1
fi
# Run
echo "Starting A-share historical data fetch V2..."
python3 fetch_history_v2.py
# Generate a CSV summary when the run finishes
if [ -f "data/stock_daily_data.parquet" ]; then
echo "Generating CSV summary..."
python3 -c "
import pandas as pd
from pathlib import Path
from datetime import datetime
df = pd.read_parquet('data/stock_daily_data.parquet')
timestamp = datetime.now().strftime('%Y%m%d')
df.to_csv(f'data/A股日线数据_{timestamp}.csv', index=False)
print(f'CSV file: data/A股日线数据_{timestamp}.csv ({len(df)} records)')
"
fi
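Once the run finishes, the consolidated parquet file is the natural entry point for analysis. A quick sketch, assuming the standard tushare daily columns (ts_code, trade_date, close, ...); the ticker is an arbitrary example:

import pandas as pd

df = pd.read_parquet('data/stock_daily_data.parquet')
px = df[df['ts_code'] == '600000.SH'].sort_values('trade_date')
print(px[['trade_date', 'close']].tail())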