Compare commits
5 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 21bbc9cd1e | |
| | 23860c3f8c | |
| | e7a351522a | |
| | a2c9b347ca | |
| | 2857d8febb | |
.gitignore (vendored, 5 lines changed)
@@ -4,9 +4,14 @@ __pycache__/
*.pyo
.env

# Config (contains token)
config.txt

# Data files (large)
data/*.parquet
data/*.csv
data/*.db
data/temp/
!A股股票列表.csv

# Logs
README.md (23 lines changed)
@@ -15,15 +15,32 @@ pip install -r requirements.txt

## Tushare Token

**You must set a token before running!**

Register at tushare.pro and obtain a token.

How to set it:
1. Register an account: https://tushare.pro/register
2. After obtaining a token, set it in the code or via an environment variable:
How to set it (choose one of three):

### Option 1: Environment variable (recommended)
```bash
export TUSHARE_TOKEN=your_token_here
python fetch_history.py
```

### Option 2: Config file
```bash
echo 'your_token_here' > config.txt
python fetch_history.py
```

### Option 3: Set directly in code
```python
# Edit fetch_history.py and change this in main()
pro = setup_tushare(token='your_token_here')
```

**Registration**: https://tushare.pro/register

## Running

```bash
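The three options are checked in a fixed order by `setup_tushare` (explicit argument, then the `TUSHARE_TOKEN` environment variable, then `config.txt`), as the fetch_history.py diff further down implements. A minimal standalone sketch of that lookup (`resolve_token` is a hypothetical name; the repo's function is `setup_tushare`):

```python
import os
from pathlib import Path

def resolve_token(token=None):
    """Resolve a tushare token: argument > TUSHARE_TOKEN env var > config.txt."""
    if not token:
        token = os.environ.get('TUSHARE_TOKEN', '')
    if not token:
        # Fall back to a config.txt next to this script
        config_file = Path(__file__).parent / 'config.txt'
        if config_file.exists():
            token = config_file.read_text().strip()
    if not token:
        raise ValueError("Missing Tushare Token")
    return token
```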
cron_daily_fetch.py (new file, 224 lines)
@@ -0,0 +1,224 @@
"""
A-share daily data auto-fetch script (for cron)

Runs automatically at 17:00 on each trading day and pulls that day's data for all stocks.

Optimized version:
- Uses the trade_date parameter to fetch all stocks for a day in one call
- Cuts API requests dramatically (from 5000+ calls down to 1 per day)

Usage:
    python3 cron_daily_fetch.py                  # fetch today's data
    python3 cron_daily_fetch.py --days 5         # fetch the last 5 days
    python3 cron_daily_fetch.py --date 20260408  # fetch a specific date

Cron entry:
    0 17 * * 1-5 python3 /home/xian/.openclaw/common/stock_system/cron_daily_fetch.py
"""

import tushare as ts
import pandas as pd
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
import argparse

# Configuration
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
DB_FILE = DATA_DIR / 'progress.db'
LOG_FILE = BASE_DIR / 'logs' / 'daily_fetch.log'
MAIN_FILE = DATA_DIR / 'stock_daily_data.parquet'

REQUEST_INTERVAL = 0.5  # delay between requests (per-date fetching needs few calls, so this can be generous)


def log(msg):
    """Write a log line to stdout and the log file."""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_msg = f"[{timestamp}] {msg}"
    print(log_msg)

    LOG_FILE.parent.mkdir(exist_ok=True)
    with open(LOG_FILE, 'a') as f:
        f.write(log_msg + '\n')


def setup_tushare():
    """Initialize tushare."""
    token = os.environ.get('TUSHARE_TOKEN', '')
    if not token:
        config_file = BASE_DIR / 'config.txt'
        if config_file.exists():
            token = config_file.read_text().strip()
    if not token:
        raise ValueError("Missing Tushare Token")
    ts.set_token(token)
    return ts.pro_api()


def is_weekday(date_str):
    """Return True for Monday through Friday."""
    d = datetime.strptime(date_str, '%Y%m%d')
    return d.weekday() < 5


def get_trade_dates_in_range(start_date, end_date):
    """Return all weekdays within the date range."""
    start = datetime.strptime(start_date, '%Y%m%d')
    end = datetime.strptime(end_date, '%Y%m%d')

    trade_dates = []
    current = start
    while current <= end:
        if current.weekday() < 5:  # Monday to Friday
            trade_dates.append(current.strftime('%Y%m%d'))
        current += timedelta(days=1)

    return trade_dates


def get_latest_trade_date():
    """Return the most recent trade date present in the data."""
    if not MAIN_FILE.exists():
        return None

    df = pd.read_parquet(MAIN_FILE)
    if len(df) == 0:
        return None

    return df['trade_date'].max()


def fetch_data_by_date(pro, trade_date):
    """
    Fetch all stocks' data for one date.

    This is the optimized path: a single request returns every stock for the day.
    """
    log(f"Fetching data for {trade_date}...")

    try:
        # One call fetches all stocks for the day
        df = pro.daily(trade_date=trade_date)

        if df is None or len(df) == 0:
            log(f"  {trade_date} no data (possibly not a trading day)")
            return 0, None

        log(f"  {trade_date} returned {len(df)} records")
        return len(df), df

    except Exception as e:
        log(f"  {trade_date} fetch failed: {str(e)[:100]}")
        return 0, None


def merge_data(new_df):
    """Merge new data into the main file."""
    if new_df is None or len(new_df) == 0:
        return

    if MAIN_FILE.exists():
        existing_df = pd.read_parquet(MAIN_FILE)
        # Concatenate and de-duplicate
        combined = pd.concat([existing_df, new_df], ignore_index=True)
        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
        combined = combined.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
    else:
        combined = new_df

    combined.to_parquet(MAIN_FILE, index=False, compression='snappy')
    return len(combined)


def fetch_dates(pro, dates):
    """Fetch data for multiple dates."""
    if not dates:
        log("No dates to fetch")
        return 0

    total_new = 0
    all_new_data = []

    log(f"Need to fetch {len(dates)} trading day(s)")

    for i, trade_date in enumerate(dates):
        count, df = fetch_data_by_date(pro, trade_date)

        if df is not None and len(df) > 0:
            all_new_data.append(df)
            total_new += count

        # Pause between requests
        if i < len(dates) - 1:
            time.sleep(REQUEST_INTERVAL)

    # Merge everything in one pass
    if all_new_data:
        combined_new = pd.concat(all_new_data, ignore_index=True)
        total_records = merge_data(combined_new)
        log(f"Merge complete, total records: {total_records}")

    return total_new


def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description='A-share daily data fetch (batched by date)')
    parser.add_argument('--date', type=str, help='fetch a specific date (format: 20260408)')
    parser.add_argument('--days', type=int, default=0, help='fetch the last N days')
    args = parser.parse_args()

    log("=" * 60)
    log("A-share daily data fetch started")
    log("=" * 60)

    try:
        # Initialize
        pro = setup_tushare()

        # Decide which dates to fetch
        if args.date:
            # Specific date
            dates = [args.date]
            log(f"Mode: fetch specific date {args.date}")
        elif args.days > 0:
            # Last N days
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=args.days-1)).strftime('%Y%m%d')
            dates = get_trade_dates_in_range(start_date, end_date)
            log(f"Mode: fetch last {args.days} days ({start_date} ~ {end_date})")
            log(f"Weekdays: {dates}")
        else:
            # Default: fetch today's data
            today = datetime.now().strftime('%Y%m%d')

            # Skip if today is not a weekday
            if not is_weekday(today):
                log(f"Today ({datetime.now().strftime('%A')}) is not a weekday, skipping")
                return

            dates = [today]
            log(f"Mode: fetch today's data ({today})")

        # Fetch
        new_records = fetch_dates(pro, dates)

        # Report file info
        if MAIN_FILE.exists():
            size_mb = MAIN_FILE.stat().st_size / 1024 / 1024
            log(f"Data file size: {size_mb:.2f} MB")

        log(f"New records this run: {new_records}")
        log("=" * 60)
        log("Done")

    except Exception as e:
        log(f"Error: {e}")
        import traceback
        log(traceback.format_exc())
        raise


if __name__ == '__main__':
    main()
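A quick way to verify a cron run is to inspect the merged parquet directly; a spot-check sketch, assuming only the column names used above (`ts_code`, `trade_date`):

```python
import pandas as pd

# Spot check after a cron run: confirm the latest trade date and the
# per-day record counts in the file written by merge_data().
df = pd.read_parquet('data/stock_daily_data.parquet')
print("latest trade_date:", df['trade_date'].max())
print(df.groupby('trade_date').size().tail())  # records per recent day
```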
data/completed_stocks.txt (new file, 2749 lines)
File diff suppressed because it is too large
fetch_daily_update.py (new file, 264 lines)
@@ -0,0 +1,264 @@
"""
A-share daily incremental update script

Features:
1. Check the latest data date for every stock
2. Backfill missing trading-day data
3. Supports one-off runs and scheduled runs

Usage:
    python3 fetch_daily_update.py           # update missing data for all stocks
    python3 fetch_daily_update.py --check   # check only, no update
    python3 fetch_daily_update.py --days 5  # backfill the last 5 days
"""

import tushare as ts
import pandas as pd
import sqlite3
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
import argparse

# Configuration
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
TEMP_DIR = DATA_DIR / 'temp'
DB_FILE = DATA_DIR / 'progress.db'
STOCK_LIST_FILE = BASE_DIR / 'A股股票列表.csv'

REQUEST_INTERVAL = 0.3  # shorter interval for incremental requests


def setup_tushare():
    """Initialize tushare."""
    token = os.environ.get('TUSHARE_TOKEN', '')
    if not token:
        config_file = BASE_DIR / 'config.txt'
        if config_file.exists():
            token = config_file.read_text().strip()
    if not token:
        raise ValueError("Missing Tushare Token")
    ts.set_token(token)
    return ts.pro_api()


def get_latest_dates():
    """Return the latest data date for each stock."""
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if not main_file.exists():
        return {}

    df = pd.read_parquet(main_file)
    latest = df.groupby('ts_code')['trade_date'].max()
    return dict(latest)


def get_all_stock_codes():
    """Return all stock codes."""
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute("SELECT ts_code FROM progress WHERE status = 'completed'")
    codes = [row[0] for row in cursor.fetchall()]
    conn.close()
    return codes


def is_trading_day(date_str):
    """Trading-day check (simplified: excludes weekends only)."""
    d = datetime.strptime(date_str, '%Y%m%d')
    return d.weekday() < 5  # Monday to Friday


def get_trade_dates(start_date, end_date):
    """Return the candidate trading days within the date range."""
    start = datetime.strptime(start_date, '%Y%m%d')
    end = datetime.strptime(end_date, '%Y%m%d')

    trade_dates = []
    current = start
    while current <= end:
        if current.weekday() < 5:  # Monday to Friday
            trade_dates.append(current.strftime('%Y%m%d'))
        current += timedelta(days=1)

    return trade_dates


def get_missing_dates(pro, latest_dates, codes, days=30):
    """Work out the date ranges that need updating."""
    today = datetime.now().strftime('%Y%m%d')

    # Simplified trading-day check (weekends excluded)
    start = (datetime.now() - timedelta(days=days)).strftime('%Y%m%d')
    trade_dates = get_trade_dates(start, today)

    # Find the trading days each stock is missing
    missing_info = {}
    for ts_code in codes:
        latest = latest_dates.get(ts_code, '20100101')
        missing = [d for d in trade_dates if d > latest and is_trading_day(d)]
        if missing:
            missing_info[ts_code] = {
                'latest': latest,
                'missing': missing,
                'missing_count': len(missing)
            }

    return missing_info, trade_dates


def fetch_missing_data(pro, missing_info, batch_size=100):
    """Fetch missing data in batches."""
    total_stocks = len(missing_info)
    print(f"\n{total_stocks} stocks need updating")

    # Work out the date range to fetch
    all_missing_dates = set()
    for info in missing_info.values():
        all_missing_dates.update(info['missing'])

    if not all_missing_dates:
        print("No missing data")
        return 0

    start_date = min(all_missing_dates)
    end_date = max(all_missing_dates)
    print(f"Date range: {start_date} ~ {end_date}")

    # Fetch in bulk (tushare supports batched queries)
    codes = list(missing_info.keys())
    total_records = 0
    batch_count = 0

    main_file = DATA_DIR / 'stock_daily_data.parquet'
    existing_df = pd.read_parquet(main_file) if main_file.exists() else None

    print(f"\nFetching incremental data...")
    print("-" * 50)

    for i, ts_code in enumerate(codes):
        try:
            latest = missing_info[ts_code]['latest']
            # Fetch starting from the day after the latest date
            start_fetch = latest

            df = pro.daily(ts_code=ts_code, start_date=start_fetch, end_date=end_date)

            if df is not None and len(df) > 0:
                # Filter out rows that already exist
                df = df[df['trade_date'] > latest]

                if len(df) > 0:
                    # Merge into the main file
                    if existing_df is not None:
                        combined = pd.concat([existing_df, df], ignore_index=True)
                        combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
                        combined.to_parquet(main_file, index=False, compression='snappy')
                        existing_df = combined
                    else:
                        df.to_parquet(main_file, index=False, compression='snappy')
                        existing_df = df

                    total_records += len(df)
                    print(f"[{i+1}/{total_stocks}] {ts_code} ✓ +{len(df)} records")
                else:
                    print(f"[{i+1}/{total_stocks}] {ts_code} - no new data")
            else:
                print(f"[{i+1}/{total_stocks}] {ts_code} - no data")

            batch_count += 1

            # Save once per batch to keep the request interval short
            if batch_count >= batch_size:
                batch_count = 0

            time.sleep(REQUEST_INTERVAL)

        except Exception as e:
            print(f"[{i+1}/{total_stocks}] {ts_code} ✗ {str(e)[:50]}")
            time.sleep(1)  # wait a bit longer after an error

    print("-" * 50)
    print(f"Incremental update complete, {total_records} new records")

    return total_records


def check_missing_status(pro, days=30):
    """Check what data is missing."""
    print("=" * 60)
    print("A-share data gap check")
    print("=" * 60)

    # Latest date in the existing data
    latest_dates = get_latest_dates()
    codes = get_all_stock_codes()

    print(f"Stocks with data: {len(codes)}")

    # Gather gap info
    missing_info, trade_dates = get_missing_dates(pro, latest_dates, codes, days)

    # Summary
    total_missing = len(missing_info)
    if total_missing == 0:
        print("\n✓ All stocks are up to date!")
        return {}

    # Group by number of missing days
    missing_stats = {}
    for ts_code, info in missing_info.items():
        count = info['missing_count']
        if count not in missing_stats:
            missing_stats[count] = 0
        missing_stats[count] += 1

    print(f"\nStocks needing updates: {total_missing}")
    print(f"Recent trading days: {trade_dates[-5:] if len(trade_dates) >= 5 else trade_dates}")

    print("\nDistribution of missing days:")
    for days_count in sorted(missing_stats.keys()):
        print(f"  missing {days_count} day(s): {missing_stats[days_count]} stocks")

    # Show the stocks missing the most
    print("\nTop 10 stocks with the most missing days:")
    sorted_missing = sorted(missing_info.items(), key=lambda x: x[1]['missing_count'], reverse=True)[:10]
    for ts_code, info in sorted_missing:
        print(f"  {ts_code}: latest {info['latest']}, missing {info['missing_count']} day(s)")

    return missing_info


def main():
    parser = argparse.ArgumentParser(description='A-share daily incremental update')
    parser.add_argument('--check', action='store_true', help='check only, no update')
    parser.add_argument('--days', type=int, default=30, help='how many recent days to check')
    args = parser.parse_args()

    print(f"Run time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Initialize
    pro = setup_tushare()

    if args.check:
        # Check only
        check_missing_status(pro, args.days)
    else:
        # Check, then update
        missing_info = check_missing_status(pro, args.days)
        if missing_info:
            fetch_missing_data(pro, missing_info)
            # Re-check to confirm
            print("\nRe-checking after the update...")
            check_missing_status(pro, args.days)

    # Report file size
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if main_file.exists():
        size_mb = main_file.stat().st_size / 1024 / 1024
        print(f"\nData file size: {size_mb:.2f} MB")


if __name__ == '__main__':
    main()
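One caveat worth noting: unlike `merge_data()` in cron_daily_fetch.py, the per-stock merge above filters on `trade_date > latest` but never calls `drop_duplicates`, so overlapping rows could accumulate if runs are interrupted and repeated. A one-off cleanup sketch under that assumption:

```python
import pandas as pd

# Hypothetical cleanup pass: collapse duplicate (ts_code, trade_date) rows
# that repeated incremental runs may have left behind.
path = 'data/stock_daily_data.parquet'
df = pd.read_parquet(path)
before = len(df)
df = (df.sort_values(['ts_code', 'trade_date'])
        .drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
        .reset_index(drop=True))
print(f"removed {before - len(df)} duplicate rows")
df.to_parquet(path, index=False, compression='snappy')
```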
fetch_history.py (158 lines changed)
@@ -24,17 +24,33 @@ LOGS_DIR.mkdir(exist_ok=True)
START_DATE = '20100101'
END_DATE = datetime.now().strftime('%Y%m%d')

# Number of stocks per batch (tushare limit)
BATCH_SIZE = 50

# Request interval (seconds) - avoid hammering the API
REQUEST_INTERVAL = 0.3
# Request interval (seconds) - tushare points/rate limit
REQUEST_INTERVAL = 5


def setup_tushare(token=None):
    """Initialize tushare."""
    if token:
        ts.set_token(token)
    import os

    # Priority: argument > environment variable > config file
    if not token:
        token = os.environ.get('TUSHARE_TOKEN', '')

    if not token:
        # Try the config file
        config_file = BASE_DIR / 'config.txt'
        if config_file.exists():
            token = config_file.read_text().strip()

    if not token:
        print("Error: Tushare Token is not set!")
        print("Set it in one of these ways:")
        print("  1. Environment variable: export TUSHARE_TOKEN=your_token")
        print("  2. Config file: echo 'your_token' > config.txt")
        print("  3. Register at: https://tushare.pro/register")
        raise ValueError("Missing Tushare Token")

    ts.set_token(token)
    return ts.pro_api()


@@ -48,44 +64,136 @@ def load_stock_list():


def get_stock_codes_with_suffix(df):
    """Convert stock codes to tushare format (add exchange suffix)."""
    """Convert stock codes to tushare format (add exchange suffix)

    Market code rules:
    - leading 6 → SH (Shanghai)
    - leading 0 or 3 → SZ (Shenzhen)
    - leading 4 or 8 → BJ (Beijing)
    """
    codes = []
    for code in df['code']:
        code = str(code).zfill(6)  # zero-pad to 6 digits
        if code.startswith('6'):
        first_digit = code[0]

        if first_digit == '6':
            ts_code = f"{code}.SH"
        else:
        elif first_digit in ('0', '3'):
            ts_code = f"{code}.SZ"
        elif first_digit in ('4', '8'):
            ts_code = f"{code}.BJ"
        else:
            # Unknown market, default to Shenzhen
            ts_code = f"{code}.SZ"

        codes.append(ts_code)
    return codes


def fetch_daily_data(pro, codes, start_date, end_date):
    """Fetch daily data in batches."""
    all_data = []
    """Fetch daily data one stock at a time, with resume support."""
    total = len(codes)

    for i in range(0, total, BATCH_SIZE):
        batch_codes = codes[i:i + BATCH_SIZE]
        ts_codes = ','.join(batch_codes)
    # Load the list of completed stocks
    completed_file = DATA_DIR / 'completed_stocks.txt'
    completed_stocks = set()
    if completed_file.exists():
        lines = completed_file.read_text().strip().split('\n')
        completed_stocks = set(line.strip() for line in lines if line.strip())
        print(f"Completed: {len(completed_stocks)} stocks")

    # Report existing data
    existing_data_file = DATA_DIR / 'stock_daily_data.parquet'
    if existing_data_file.exists():
        existing_df = pd.read_parquet(existing_data_file)
        print(f"Existing data: {len(existing_df)} records")

    print(f"\n{total} stocks total, {total - len(completed_stocks)} pending")
    print(f"Estimated time: {(total - len(completed_stocks)) * REQUEST_INTERVAL / 60:.1f} minutes")
    print("-" * 50)

    for i, ts_code in enumerate(codes):
        # Skip already-completed stocks
        if ts_code in completed_stocks:
            print(f"[{i+1}/{total}] {ts_code} already done, skipping")
            continue

        try:
            print(f"Fetching stocks {i+1}-{min(i+BATCH_SIZE, total)}...")
            df = pro.daily(ts_code=ts_codes, start_date=start_date, end_date=end_date)
            print(f"[{i+1}/{total}] fetching {ts_code}...", end=' ', flush=True)

            df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)

            if df is not None and len(df) > 0:
                all_data.append(df)
                print(f"  got {len(df)} records")
                print(f"ok, {len(df)} records")
                # Save this stock's data immediately
                save_single_stock(df, ts_code, completed_file)
            else:
                print(f"  no data")
                print("no data")
                # Mark as completed even when there is no data
                mark_completed(ts_code, completed_file)

        except Exception as e:
            print(f"  error: {e}")
            print(f"error: {e}")
            # Do not mark as completed on error; retry next run

        # Avoid requesting too fast
        time.sleep(REQUEST_INTERVAL)
        # Rest after each request
        if i < total - 1:
            time.sleep(REQUEST_INTERVAL)

    return all_data
    # Merge everything at the end
    return merge_all_data()


def save_single_stock(df, ts_code, completed_file):
    """Save one stock's data and mark it completed."""
    # Read the existing data
    output_file = DATA_DIR / 'stock_daily_data.parquet'

    if output_file.exists():
        existing_df = pd.read_parquet(output_file)
        # Drop this stock's old rows, if any
        existing_df = existing_df[existing_df['ts_code'] != ts_code]
        # Merge in the new data
        combined_df = pd.concat([existing_df, df], ignore_index=True)
    else:
        combined_df = df

    # Sort
    combined_df = combined_df.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)

    # Save
    combined_df.to_parquet(output_file, index=False)

    # Mark completed
    mark_completed(ts_code, completed_file)


def merge_all_data():
    """Merge all data at the end (for the return value)."""
    output_file = DATA_DIR / 'stock_daily_data.parquet'
    if output_file.exists():
        return [pd.read_parquet(output_file)]
    return []


def save_progress(all_data, ts_code, completed_file):
    """Save progress as we go (kept for compatibility)."""
    # Merge and save the data
    combined_df = pd.concat(all_data, ignore_index=True)
    combined_df = combined_df.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)

    # Save parquet
    output_file = DATA_DIR / 'stock_daily_data.parquet'
    combined_df.to_parquet(output_file, index=False)

    # Mark completed
    mark_completed(ts_code, completed_file)


def mark_completed(ts_code, completed_file):
    """Mark a stock as completed."""
    with open(completed_file, 'a') as f:
        f.write(ts_code + '\n')


def save_to_parquet(df, filename):
@@ -164,4 +272,4 @@ def main():


if __name__ == '__main__':
    main()
    main()
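The resume mechanism in this diff keys everything off `data/completed_stocks.txt`: one ts_code per line, appended by `mark_completed()` and loaded into a set on startup. A small sketch for estimating remaining work before a rerun (assumes only the file layout above):

```python
from pathlib import Path

# Count finished stocks; the set difference against the full code list
# (from get_stock_codes_with_suffix) gives what a rerun will still fetch.
completed_file = Path('data/completed_stocks.txt')
done = set(completed_file.read_text().split()) if completed_file.exists() else set()
print(f"{len(done)} stocks already completed")
```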
fetch_history_v2.py (new file, 393 lines)
@@ -0,0 +1,393 @@
"""
A-share historical data fetcher V2 - performance-optimized

Optimizations:
1. Split storage - each stock is written to its own small file, avoiding a full rewrite of the big file on every save
2. Batch merging - merge once every MERGE_BATCH_SIZE stocks to cut IO
3. SQLite progress tracking - more reliable resume
"""

import tushare as ts
import pandas as pd
import os
import time
import sqlite3
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor  # not used in this version
import threading  # not used in this version

# Configuration
BASE_DIR = Path(__file__).parent
DATA_DIR = BASE_DIR / 'data'
TEMP_DIR = DATA_DIR / 'temp'  # temporary shard directory
LOGS_DIR = BASE_DIR / 'logs'
STOCK_LIST_FILE = BASE_DIR / 'A股股票列表.csv'
DB_FILE = DATA_DIR / 'progress.db'  # SQLite progress database

# Create directories
DATA_DIR.mkdir(exist_ok=True)
TEMP_DIR.mkdir(exist_ok=True)
LOGS_DIR.mkdir(exist_ok=True)

# Time range
START_DATE = '20100101'
END_DATE = datetime.now().strftime('%Y%m%d')

# Request interval (seconds) - tushare points/rate limit
REQUEST_INTERVAL = 2  # shorter interval, compensated by lightweight storage

# Batch merge threshold
MERGE_BATCH_SIZE = 50  # merge once every 50 stocks


def setup_tushare(token=None):
    """Initialize tushare."""
    if not token:
        token = os.environ.get('TUSHARE_TOKEN', '')

    if not token:
        config_file = BASE_DIR / 'config.txt'
        if config_file.exists():
            token = config_file.read_text().strip()

    if not token:
        raise ValueError("Missing Tushare Token")

    ts.set_token(token)
    return ts.pro_api()


def init_progress_db():
    """Initialize the SQLite progress database."""
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()

    # Progress table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS progress (
            ts_code TEXT PRIMARY KEY,
            status TEXT DEFAULT 'pending',
            record_count INTEGER DEFAULT 0,
            updated_at TEXT,
            error_msg TEXT
        )
    ''')

    # Merge-log table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS merge_log (
            batch_id INTEGER PRIMARY KEY AUTOINCREMENT,
            stock_count INTEGER,
            merged_at TEXT,
            file_size INTEGER
        )
    ''')

    conn.commit()
    conn.close()
    print(f"Progress database: {DB_FILE}")


def load_stock_list():
    """Load the stock list."""
    df = pd.read_csv(STOCK_LIST_FILE)
    df.columns = df.columns.str.strip()
    print(f"Loaded stock list: {len(df)} stocks")
    return df


def get_stock_codes_with_suffix(df):
    """Convert stock codes to tushare format."""
    codes = []
    for code in df['code']:
        code = str(code).zfill(6)
        first_digit = code[0]

        if first_digit == '6':
            ts_code = f"{code}.SH"
        elif first_digit in ('0', '3'):
            ts_code = f"{code}.SZ"
        elif first_digit in ('4', '8'):
            ts_code = f"{code}.BJ"
        else:
            ts_code = f"{code}.SZ"

        codes.append(ts_code)
    return codes


def init_stock_progress(codes):
    """Initialize progress state for every stock."""
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()

    # Bulk insert (skip rows that already exist)
    for ts_code in codes:
        cursor.execute('''
            INSERT OR IGNORE INTO progress (ts_code, status, updated_at)
            VALUES (?, 'pending', ?)
        ''', (ts_code, datetime.now().isoformat()))

    conn.commit()

    # Status counts
    cursor.execute('SELECT status, COUNT(*) FROM progress GROUP BY status')
    stats = cursor.fetchall()
    conn.close()

    print("\nCurrent progress:")
    for status, count in stats:
        print(f"  {status}: {count} stocks")

    return stats


def get_pending_stocks():
    """Return the list of stocks still to process."""
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()

    cursor.execute('''
        SELECT ts_code FROM progress
        WHERE status = 'pending' OR status = 'error'
        ORDER BY ts_code
    ''')

    pending = [row[0] for row in cursor.fetchall()]
    conn.close()

    return pending


def save_stock_temp(df, ts_code):
    """Save one stock to a temporary file (very fast)."""
    temp_file = TEMP_DIR / f"{ts_code.replace('.', '_')}.parquet"
    df.to_parquet(temp_file, index=False, compression='snappy')

    # Update progress
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute('''
        UPDATE progress
        SET status = 'completed', record_count = ?, updated_at = ?
        WHERE ts_code = ?
    ''', (len(df), datetime.now().isoformat(), ts_code))
    conn.commit()
    conn.close()

    return temp_file.stat().st_size


def merge_batch_to_main():
    """Merge the temporary files into the main file."""
    temp_files = list(TEMP_DIR.glob('*.parquet'))

    if not temp_files:
        return 0

    print(f"\nMerging {len(temp_files)} temporary files...")

    # Read all temporary files
    batch_data = []
    for tf in temp_files:
        try:
            df = pd.read_parquet(tf)
            batch_data.append(df)
        except Exception as e:
            print(f"  warning: failed to read {tf.name}: {e}")

    if not batch_data:
        return 0

    # Concatenate
    new_data = pd.concat(batch_data, ignore_index=True)

    # Read the main file and merge
    main_file = DATA_DIR / 'stock_daily_data.parquet'

    if main_file.exists():
        existing = pd.read_parquet(main_file)
        # Stock codes already merged
        existing_codes = set(existing['ts_code'].unique())
        new_codes = set(new_data['ts_code'].unique())

        # Only merge data for new stocks
        truly_new = new_data[~new_data['ts_code'].isin(existing_codes)]

        if len(truly_new) > 0:
            combined = pd.concat([existing, truly_new], ignore_index=True)
        else:
            combined = existing
    else:
        combined = new_data

    # Sort and save
    combined = combined.sort_values(['ts_code', 'trade_date']).reset_index(drop=True)
    combined.to_parquet(main_file, index=False, compression='snappy')

    # Record the merge
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO merge_log (stock_count, merged_at, file_size)
        VALUES (?, ?, ?)
    ''', (len(temp_files), datetime.now().isoformat(), main_file.stat().st_size))
    conn.commit()
    conn.close()

    # Delete the temporary files
    for tf in temp_files:
        tf.unlink()

    print(f"  merge complete: {len(new_data)} new records")
    print(f"  main file size: {main_file.stat().st_size / 1024 / 1024:.2f} MB")

    return len(temp_files)


def fetch_stock_data(pro, ts_code):
    """Fetch one stock's data."""
    try:
        df = pro.daily(ts_code=ts_code, start_date=START_DATE, end_date=END_DATE)

        if df is not None and len(df) > 0:
            # Save to a temporary file
            file_size = save_stock_temp(df, ts_code)
            return True, len(df), file_size
        else:
            # No data, mark as done
            conn = sqlite3.connect(DB_FILE)
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE progress
                SET status = 'no_data', updated_at = ?
                WHERE ts_code = ?
            ''', (datetime.now().isoformat(), ts_code))
            conn.commit()
            conn.close()
            return True, 0, 0

    except Exception as e:
        # Record the error
        conn = sqlite3.connect(DB_FILE)
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE progress
            SET status = 'error', error_msg = ?, updated_at = ?
            WHERE ts_code = ?
        ''', (str(e)[:200], datetime.now().isoformat(), ts_code))
        conn.commit()
        conn.close()
        return False, 0, 0


def fetch_all_stocks(pro, codes):
    """Fetch data for every stock."""
    total = len(codes)
    batch_count = 0

    print(f"\nStarting fetch...")
    print(f"{total} stocks total")
    print(f"Merging every {MERGE_BATCH_SIZE} stocks")
    print("-" * 50)

    for i, ts_code in enumerate(codes):
        success, records, size = fetch_stock_data(pro, ts_code)

        status = "✓" if success else "✗"
        print(f"[{i+1}/{total}] {ts_code} {status} {records} records {size/1024:.1f}KB")

        # Batch-merge check
        batch_count += 1
        if batch_count >= MERGE_BATCH_SIZE:
            merge_batch_to_main()
            batch_count = 0

        # Request interval
        if i < total - 1:
            time.sleep(REQUEST_INTERVAL)

    # Merge the remainder
    if batch_count > 0:
        merge_batch_to_main()

    print("\n" + "=" * 50)
    print("Fetch complete!")


def show_final_stats():
    """Print the final statistics."""
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()

    # Status counts
    cursor.execute('SELECT status, COUNT(*) FROM progress GROUP BY status')
    stats = cursor.fetchall()

    # Record counts
    cursor.execute('SELECT SUM(record_count) FROM progress WHERE status = "completed"')
    total_records = cursor.fetchone()[0] or 0

    # Merge history
    cursor.execute('SELECT COUNT(*), SUM(stock_count) FROM merge_log')
    merge_stats = cursor.fetchone()

    conn.close()

    print("\nFinal statistics:")
    print("-" * 30)
    for status, count in stats:
        print(f"  {status}: {count} stocks")
    print(f"\n  total records: {total_records}")
    print(f"  merge batches: {merge_stats[0]}")

    # File size
    main_file = DATA_DIR / 'stock_daily_data.parquet'
    if main_file.exists():
        print(f"  main file size: {main_file.stat().st_size / 1024 / 1024:.2f} MB")


def main():
    """Entry point."""
    print("=" * 60)
    print("A-share historical data fetcher V2 - performance-optimized")
    print("=" * 60)
    print(f"Date range: {START_DATE} ~ {END_DATE}")
    print(f"Data directory: {DATA_DIR}")
    print("=" * 60)

    # Initialize
    print("\nInitializing Tushare...")
    pro = setup_tushare()

    print("\nInitializing progress database...")
    init_progress_db()

    # Load the stock list
    print("\nLoading stock list...")
    stock_df = load_stock_list()
    codes = get_stock_codes_with_suffix(stock_df)

    # Initialize progress
    init_stock_progress(codes)

    # Fetch the pending stocks
    pending = get_pending_stocks()

    if not pending:
        print("\nAll stocks are done!")
        show_final_stats()
        return

    print(f"\nPending: {len(pending)} stocks")
    print(f"Estimated time: {len(pending) * REQUEST_INTERVAL / 60:.1f} minutes")

    # Start fetching
    fetch_all_stocks(pro, pending)

    # Show statistics
    show_final_stats()


if __name__ == '__main__':
    main()
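Because V2 keeps progress in SQLite rather than a text file, a second shell can watch the run without touching the fetch process; a minimal monitoring sketch against the `progress` table defined above:

```python
import sqlite3

# Read-only status summary while fetch_history_v2.py is running.
conn = sqlite3.connect('data/progress.db')
for status, count in conn.execute(
        'SELECT status, COUNT(*) FROM progress GROUP BY status'):
    print(f"{status}: {count}")
conn.close()
```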
run_v2.sh (new file, 36 lines)
@@ -0,0 +1,36 @@
#!/bin/bash
# A-share historical data fetch - V2 optimized version

cd "$(dirname "$0")"

# Check configuration
if [ ! -f "config.txt" ]; then
    echo "Please configure your Tushare Token first:"
    echo "  echo 'your_token' > config.txt"
    exit 1
fi

# Check the stock list
if [ ! -f "A股股票列表.csv" ]; then
    echo "Missing stock list file: A股股票列表.csv"
    exit 1
fi

# Run
echo "Starting A-share historical data fetch V2..."
python3 fetch_history_v2.py

# Generate a summary file when done
if [ -f "data/stock_daily_data.parquet" ]; then
    echo "Generating CSV summary..."
    python3 -c "
import pandas as pd
from pathlib import Path
from datetime import datetime

df = pd.read_parquet('data/stock_daily_data.parquet')
timestamp = datetime.now().strftime('%Y%m%d')
df.to_csv(f'data/A股日线数据_{timestamp}.csv', index=False)
print(f'CSV file: data/A股日线数据_{timestamp}.csv ({len(df)} records)')
"
fi