初始化大模型API中转系统

功能特点:
- 多提供商支持: 配置多个上游大模型提供商
- 优先级调度: 按优先级自动选择可用提供商
- OpenAI API兼容: 完全兼容OpenAI API格式
- 故障切换: 自动切换到备用提供商
- 流式支持: 支持流式和非流式响应
- 模型别名: 支持模型别名映射
- 健康检查: 自动健康检查和熔断

上游配置:
1. [高优先] Local Qwen: http://192.168.2.5:1234/v1 (qwen3.5-4b)
2. [低优先] SiliconFlow: https://api.siliconflow.cn/v1 (DeepSeek-V3.2)

支持的模型:
- auto: 自动选择可用模型
- qwen3.5-4b, qwen3.5, qwen
- deepseek-v3, deepseek-v3.2, deepseek

端口: 19007
This commit is contained in:
2026-04-08 15:39:19 +08:00
commit 7c77cb9101
5 changed files with 720 additions and 0 deletions

11
.gitignore vendored Normal file
View File

@@ -0,0 +1,11 @@
# 日志
logs/
*.log
# Python
__pycache__/
*.py[cod]
# 环境
venv/
.env

202
README.md Normal file
View File

@@ -0,0 +1,202 @@
# 大模型API中转系统
> 兼容OpenAI API格式的多提供商代理系统支持优先级自动切换
## 功能特点
### 🔄 多提供商支持
- 支持配置多个上游大模型提供商
- 按优先级自动选择可用提供商
- 故障自动切换到备用提供商
### 📡 OpenAI API 兼容
- 完全兼容 OpenAI API 格式
- 支持 Chat Completions API
- 支持 Embeddings API
- 支持流式和非流式响应
### 🎯 智能路由
- `auto` 模型自动选择可用提供商
- 支持模型别名映射
- 请求参数自动适配
### 🛡️ 高可用
- 自动健康检查
- 错误计数与熔断
- 自动重试机制
## 快速开始
### 安装依赖
```bash
pip install -r requirements.txt
```
### 启动服务
```bash
python app.py
```
### 访问地址
```
http://localhost:19007
```
## API 使用
### Chat Completions
```bash
curl http://localhost:19007/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer any-key" \
-d '{
"model": "auto",
"messages": [{"role": "user", "content": "Hello!"}],
"stream": false
}'
```
### 列出模型
```bash
curl http://localhost:19007/v1/models
```
### 流式响应
```bash
curl http://localhost:19007/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "qwen3.5-4b",
"messages": [{"role": "user", "content": "Hello!"}],
"stream": true
}'
```
## 配置说明
编辑 `config/settings.py`:
```python
UPSTREAM_PROVIDERS = [
{
"name": "provider-name",
"priority": 1, # 优先级,数字越小越高
"base_url": "https://api.example.com/v1",
"api_key": "sk-xxx",
"models": ["model-1", "model-2"],
"default_model": "model-1",
"timeout": 120,
"enabled": True,
},
]
```
### 模型别名
```python
MODEL_ALIASES = {
"auto": "auto", # 自动选择
"gpt-4": "actual-model", # 别名映射
}
```
## 端点
| 端点 | 方法 | 说明 |
|------|------|------|
| `/` | GET | 服务信息 |
| `/v1/chat/completions` | POST | 聊天完成 |
| `/v1/embeddings` | POST | 文本嵌入 |
| `/v1/models` | GET | 模型列表 |
| `/health` | GET | 健康检查 |
| `/status` | GET | 详细状态 |
## 使用示例
### Python (OpenAI SDK)
```python
from openai import OpenAI
client = OpenAI(
base_url="http://localhost:19007/v1",
api_key="any-key"
)
response = client.chat.completions.create(
model="auto",
messages=[
{"role": "user", "content": "你好!"}
]
)
print(response.choices[0].message.content)
```
### 流式响应
```python
stream = client.chat.completions.create(
model="qwen3.5-4b",
messages=[{"role": "user", "content": "讲个笑话"}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
```
## 优先级机制
当使用 `model="auto"` 时:
1. 按配置的优先级顺序选择提供商
2. 跳过不可用的提供商
3. 请求失败自动切换到下一个提供商
4. 连续失败3次的提供商暂时标记为不可用
## 监控
### 健康检查
```bash
curl http://localhost:19007/health
```
### 详细状态
```bash
curl http://localhost:19007/status
```
## 项目结构
```
llm-proxy/
├── app.py # 主程序
├── requirements.txt # 依赖
├── config/
│ └── settings.py # 配置
├── logs/ # 日志目录
└── README.md
```
## 版本历史
### v0.1.0 (2026-04-08)
- 初始版本
- 多提供商支持
- OpenAI API 兼容
- 优先级自动切换
- 流式响应支持
## License
MIT

428
app.py Normal file
View File

@@ -0,0 +1,428 @@
"""
大模型API中转系统
兼容OpenAI API格式支持多上游提供商优先级调度
"""
from flask import Flask, request, jsonify, Response, stream_with_context
from flask_cors import CORS
import requests
import json
import time
import logging
from datetime import datetime
from pathlib import Path
import sys
# 添加配置路径
sys.path.insert(0, str(Path(__file__).parent))
from config.settings import (
UPSTREAM_PROVIDERS, MODEL_ALIASES, SERVER_CONFIG,
LOG_CONFIG, RETRY_CONFIG
)
app = Flask(__name__)
CORS(app)
# 配置日志
log_dir = Path(__file__).parent / LOG_CONFIG['log_dir']
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_dir / 'proxy.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 提供商状态缓存
provider_status = {}
for provider in UPSTREAM_PROVIDERS:
provider_status[provider['name']] = {
'available': True,
'last_check': None,
'error_count': 0,
'last_error': None,
}
def get_provider_for_model(model_name):
"""根据模型名获取提供商"""
# 解析别名
resolved_model = MODEL_ALIASES.get(model_name, model_name)
# auto模式按优先级选择可用提供商
if resolved_model == 'auto':
return get_available_provider()
# 查找支持该模型的提供商
sorted_providers = sorted(UPSTREAM_PROVIDERS, key=lambda x: x['priority'])
for provider in sorted_providers:
if not provider['enabled']:
continue
if not provider_status[provider['name']]['available']:
continue
if resolved_model in provider['models']:
return provider, resolved_model
# 如果没找到精确匹配,尝试模糊匹配
for provider in sorted_providers:
if not provider['enabled']:
continue
if not provider_status[provider['name']]['available']:
continue
for m in provider['models']:
if resolved_model.lower() in m.lower() or m.lower() in resolved_model.lower():
return provider, m
return None, None
def get_available_provider():
"""获取可用的提供商(按优先级)"""
sorted_providers = sorted(UPSTREAM_PROVIDERS, key=lambda x: x['priority'])
for provider in sorted_providers:
if provider['enabled'] and provider_status[provider['name']]['available']:
return provider, provider['default_model']
# 如果都不可用,返回第一个尝试(让错误信息传递)
if sorted_providers:
return sorted_providers[0], sorted_providers[0]['default_model']
return None, None
def mark_provider_error(provider_name, error):
"""标记提供商错误"""
if provider_name in provider_status:
provider_status[provider_name]['error_count'] += 1
provider_status[provider_name]['last_error'] = str(error)
provider_status[provider_name]['last_check'] = datetime.now()
# 连续错误超过阈值,暂时标记不可用
if provider_status[provider_name]['error_count'] >= 3:
provider_status[provider_name]['available'] = False
logger.warning(f"Provider {provider_name} marked as unavailable due to errors")
def mark_provider_success(provider_name):
"""标记提供商成功"""
if provider_name in provider_status:
provider_status[provider_name]['error_count'] = 0
provider_status[provider_name]['available'] = True
provider_status[provider_name]['last_check'] = datetime.now()
def proxy_request(provider, model, request_data, stream=False):
"""转发请求到上游提供商"""
url = f"{provider['base_url'].rstrip('/')}/chat/completions"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {provider['api_key']}"
}
# 构建请求数据
data = request_data.copy()
data['model'] = model
try:
if stream:
# 流式请求
response = requests.post(
url,
headers=headers,
json=data,
stream=True,
timeout=provider.get('timeout', 120)
)
return response
else:
# 非流式请求
response = requests.post(
url,
headers=headers,
json=data,
timeout=provider.get('timeout', 120)
)
return response
except requests.exceptions.Timeout:
mark_provider_error(provider['name'], "Timeout")
raise Exception(f"Provider {provider['name']} timeout")
except requests.exceptions.ConnectionError:
mark_provider_error(provider['name'], "Connection error")
raise Exception(f"Provider {provider['name']} connection error")
except Exception as e:
mark_provider_error(provider['name'], str(e))
raise
def stream_response(response):
"""流式响应生成器"""
try:
for line in response.iter_lines():
if line:
yield line + b'\n'
except Exception as e:
logger.error(f"Stream error: {e}")
yield b'data: {"error": "' + str(e).encode() + b'"}\n\n'
# ============ API 路由 ============
@app.route('/')
def index():
"""首页"""
return jsonify({
"name": "LLM Proxy",
"version": "1.0.0",
"description": "OpenAI-compatible LLM API Proxy",
"endpoints": {
"chat": "/v1/chat/completions",
"models": "/v1/models",
"health": "/health",
"status": "/status"
}
})
@app.route('/v1/models', methods=['GET'])
def list_models():
"""列出可用模型"""
models_list = []
added_models = set()
for provider in UPSTREAM_PROVIDERS:
if not provider['enabled']:
continue
for model in provider['models']:
if model not in added_models:
models_list.append({
"id": model,
"object": "model",
"created": int(time.time()),
"owned_by": provider['name'],
})
added_models.add(model)
# 添加auto模型
if "auto" not in added_models:
models_list.insert(0, {
"id": "auto",
"object": "model",
"created": int(time.time()),
"owned_by": "proxy",
"description": "Auto-select available model by priority"
})
return jsonify({
"object": "list",
"data": models_list
})
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
"""聊天完成API"""
try:
data = request.get_json()
if not data:
return jsonify({"error": "Invalid request body"}), 400
model = data.get('model', 'auto')
stream = data.get('stream', False)
# 获取提供商
provider, resolved_model = get_provider_for_model(model)
if not provider:
return jsonify({
"error": {
"message": f"No available provider for model: {model}",
"type": "invalid_request_error"
}
}), 400
logger.info(f"Request: model={model} -> provider={provider['name']}, resolved_model={resolved_model}, stream={stream}")
# 重试逻辑
last_error = None
tried_providers = []
for attempt in range(RETRY_CONFIG['max_retries']):
try:
response = proxy_request(provider, resolved_model, data, stream)
if response.status_code == 200:
mark_provider_success(provider['name'])
if stream:
# 流式响应
return Response(
stream_with_context(stream_response(response)),
content_type='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}
)
else:
# 非流式响应
return jsonify(response.json())
elif response.status_code == 429:
# 速率限制,尝试下一个提供商
mark_provider_error(provider['name'], "Rate limit")
tried_providers.append(provider['name'])
# 尝试下一个提供商
next_provider, next_model = get_available_provider()
if next_provider and next_provider['name'] not in tried_providers:
provider = next_provider
resolved_model = next_model
continue
return jsonify(response.json()), response.status_code
else:
last_error = response.json() if response.headers.get('content-type', '').startswith('application/json') else {"error": response.text}
return jsonify(last_error), response.status_code
except Exception as e:
last_error = str(e)
logger.error(f"Attempt {attempt + 1} failed: {e}")
tried_providers.append(provider['name'])
# 尝试下一个提供商
next_provider, next_model = get_available_provider()
if next_provider and next_provider['name'] not in tried_providers:
provider = next_provider
resolved_model = next_model
time.sleep(RETRY_CONFIG['retry_delay'])
continue
# 所有重试都失败
return jsonify({
"error": {
"message": f"All providers failed. Last error: {last_error}",
"type": "api_error"
}
}), 503
except Exception as e:
logger.error(f"Unexpected error: {e}")
return jsonify({
"error": {
"message": str(e),
"type": "internal_error"
}
}), 500
@app.route('/v1/embeddings', methods=['POST'])
def embeddings():
"""嵌入API简单转发"""
try:
data = request.get_json()
model = data.get('model', 'text-embedding-ada-002')
# 使用第一个可用提供商
provider = UPSTREAM_PROVIDERS[0]
url = f"{provider['base_url'].rstrip('/')}/embeddings"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {provider['api_key']}"
}
response = requests.post(url, headers=headers, json=data, timeout=60)
return jsonify(response.json()), response.status_code
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
"""健康检查"""
available_count = sum(1 for s in provider_status.values() if s['available'])
total_count = len(provider_status)
return jsonify({
"status": "healthy" if available_count > 0 else "degraded",
"providers": {
"available": available_count,
"total": total_count,
},
"timestamp": datetime.now().isoformat()
})
@app.route('/status', methods=['GET'])
def status():
"""详细状态"""
providers_detail = []
for provider in UPSTREAM_PROVIDERS:
status_info = provider_status.get(provider['name'], {})
providers_detail.append({
"name": provider['name'],
"priority": provider['priority'],
"enabled": provider['enabled'],
"available": status_info.get('available', True),
"error_count": status_info.get('error_count', 0),
"last_error": status_info.get('last_error'),
"models": provider['models'],
})
return jsonify({
"version": "1.0.0",
"uptime": time.time(),
"providers": providers_detail,
"model_aliases": MODEL_ALIASES,
})
# 兼容 OpenAI SDK 的其他端点
@app.route('/v1/engines', methods=['GET'])
def list_engines():
"""兼容旧版 engines 端点"""
return list_models()
@app.route('/v1/engines/<model>/completions', methods=['POST'])
def engine_completions(model):
"""兼容旧版 completions 端点"""
data = request.get_json()
data['model'] = model
return chat_completions()
if __name__ == '__main__':
print("=" * 60)
print("大模型API中转系统")
print("=" * 60)
print(f"访问地址: http://localhost:{SERVER_CONFIG['port']}")
print(f"API端点: http://localhost:{SERVER_CONFIG['port']}/v1/chat/completions")
print("=" * 60)
print("上游提供商:")
for p in sorted(UPSTREAM_PROVIDERS, key=lambda x: x['priority']):
print(f" [{p['priority']}] {p['name']}: {p['base_url']}")
print(f" 模型: {', '.join(p['models'])}")
print("=" * 60)
print("支持的模型别名:")
for alias, target in MODEL_ALIASES.items():
print(f" {alias} -> {target}")
print("=" * 60)
app.run(
host=SERVER_CONFIG['host'],
port=SERVER_CONFIG['port'],
debug=SERVER_CONFIG['debug']
)

76
config/settings.py Normal file
View File

@@ -0,0 +1,76 @@
"""
大模型API中转系统配置
"""
# 上游模型配置(按优先级排序,从高到低)
UPSTREAM_PROVIDERS = [
{
"name": "local-qwen",
"priority": 1, # 数字越小优先级越高
"base_url": "http://192.168.2.5:1234/v1",
"api_key": "sk-lm-fuP5tGU8:Hi7YU87jHyDP6Ay8Tl2j",
"models": ["qwen3.5-4b", "qwen3.5", "qwen"],
"default_model": "qwen3.5-4b",
"timeout": 120,
"enabled": True,
},
{
"name": "siliconflow-deepseek",
"priority": 2,
"base_url": "https://api.siliconflow.cn/v1",
"api_key": "sk-fhpoexpptvjghpnphtaxbkhjwulzovoqfffbckcfscjmwhcg",
"models": ["Pro/deepseek-ai/DeepSeek-V3.2", "deepseek-v3", "deepseek"],
"default_model": "Pro/deepseek-ai/DeepSeek-V3.2",
"timeout": 120,
"enabled": True,
},
]
# 模型别名映射
MODEL_ALIASES = {
# auto 自动选择可用模型
"auto": "auto",
# Qwen别名
"qwen": "qwen3.5-4b",
"qwen3.5": "qwen3.5-4b",
"qwen3.5-4b": "qwen3.5-4b",
# DeepSeek别名
"deepseek": "Pro/deepseek-ai/DeepSeek-V3.2",
"deepseek-v3": "Pro/deepseek-ai/DeepSeek-V3.2",
"deepseek-v3.2": "Pro/deepseek-ai/DeepSeek-V3.2",
}
# 服务配置
SERVER_CONFIG = {
"host": "0.0.0.0",
"port": 19007,
"debug": True,
}
# 日志配置
LOG_CONFIG = {
"log_dir": "logs",
"log_requests": True,
"log_errors": True,
}
# 重试配置
RETRY_CONFIG = {
"max_retries": 3,
"retry_delay": 1, # 秒
"retry_on_errors": [
"connection_error",
"timeout",
"rate_limit",
"server_error",
],
}
# 健康检查配置
HEALTH_CHECK = {
"enabled": True,
"interval": 60, # 秒
"timeout": 10,
}

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
flask>=2.3.0
flask-cors>=4.0.0
requests>=2.28.0