- 后端新增 /api/parse-prompt API 获取 prompt 模板 - parse_with_llm 函数支持 custom_prompt 参数 - 智能添加弹框添加可折叠的 prompt 编辑区域 - 显示字段配置信息,方便用户理解解析逻辑 - 可重置为默认 prompt 或刷新字段配置 - 所有 smart-add API 支持接收自定义 prompt
1762 lines
60 KiB
Python
1762 lines
60 KiB
Python
"""
|
||
ParamHub - 参数百科
|
||
AI大模型与硬件参数速查平台
|
||
v1.7.1 - 子类别管理界面重构,支持可视化增删改
|
||
"""
|
||
|
||
from flask import Flask, render_template, jsonify, request
|
||
from flask_cors import CORS
|
||
import json
|
||
import requests
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
import uuid
|
||
import time
|
||
import os
|
||
import base64
|
||
from werkzeug.utils import secure_filename
|
||
|
||
app = Flask(__name__, static_folder='static', static_url_path='/static')
|
||
CORS(app)
|
||
|
||
# 数据目录
|
||
DATA_DIR = Path(__file__).parent / 'data'
|
||
DATA_DIR.mkdir(exist_ok=True)
|
||
|
||
# 图片上传目录
|
||
IMAGES_DIR = Path(__file__).parent / 'static' / 'uploads'
|
||
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 允许的图片格式
|
||
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
|
||
|
||
def allowed_file(filename):
|
||
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
||
|
||
# 数据文件
|
||
MODELS_FILE = DATA_DIR / 'models.json'
|
||
GPUS_FILE = DATA_DIR / 'gpus.json'
|
||
CPUS_FILE = DATA_DIR / 'cpus.json'
|
||
CATEGORIES_FILE = DATA_DIR / 'categories.json'
|
||
KNOWLEDGE_FILE = DATA_DIR / 'knowledge.json'
|
||
CONFIG_FILE = DATA_DIR / 'config.json'
|
||
|
||
# 大模型配置
|
||
LLM_CONFIG = {
|
||
'base_url': 'http://192.168.2.17:19007/v1',
|
||
'api_key': 'xxxx',
|
||
'model': 'auto',
|
||
'vision_model': 'gpt-4-vision-preview', # 视觉模型(解析图片)
|
||
}
|
||
|
||
# 默认网站配置(包含LLM配置)
|
||
DEFAULT_CONFIG = {
|
||
'site_name': 'ParamHub',
|
||
'site_subtitle': '参数百科',
|
||
'footer_text': 'ParamHub - AI大模型与硬件参数速查平台',
|
||
'icp_number': '',
|
||
'copyright_year': '2024',
|
||
'contact_email': '',
|
||
'github_url': '',
|
||
# LLM配置
|
||
'llm_base_url': 'http://192.168.2.17:19007/v1',
|
||
'llm_api_key': '',
|
||
'llm_model': 'auto',
|
||
'llm_vision_model': 'gpt-4-vision-preview',
|
||
}
|
||
|
||
def get_llm_config():
|
||
"""获取LLM配置(从config.json动态读取)"""
|
||
config = load_config()
|
||
return {
|
||
'base_url': config.get('llm_base_url', DEFAULT_CONFIG['llm_base_url']),
|
||
'api_key': config.get('llm_api_key', DEFAULT_CONFIG['llm_api_key']),
|
||
'model': config.get('llm_model', DEFAULT_CONFIG['llm_model']),
|
||
'vision_model': config.get('llm_vision_model', DEFAULT_CONFIG['llm_vision_model']),
|
||
}
|
||
|
||
def load_config():
|
||
"""加载网站配置"""
|
||
if CONFIG_FILE.exists():
|
||
loaded = json.loads(CONFIG_FILE.read_text(encoding='utf-8'))
|
||
# 合并默认配置(确保新字段存在)
|
||
result = DEFAULT_CONFIG.copy()
|
||
result.update(loaded)
|
||
return result
|
||
return DEFAULT_CONFIG.copy()
|
||
|
||
def save_config(config):
|
||
"""保存网站配置"""
|
||
CONFIG_FILE.write_text(json.dumps(config, ensure_ascii=False, indent=2), encoding='utf-8')
|
||
|
||
def load_data(file_path):
|
||
"""加载JSON数据"""
|
||
if file_path.exists():
|
||
return json.loads(file_path.read_text(encoding='utf-8'))
|
||
return []
|
||
|
||
def save_data(file_path, data):
|
||
"""保存JSON数据"""
|
||
file_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
|
||
|
||
# ============ 大模型智能解析 ============
|
||
|
||
def get_parse_prompt_template(category_type, category_id=None, subcategory_id=None):
|
||
"""
|
||
获取解析 prompt 模板(供前端显示和编辑)
|
||
"""
|
||
# 从类别配置中获取字段定义
|
||
categories = load_data(CATEGORIES_FILE)
|
||
|
||
# 确定类别ID
|
||
if category_id:
|
||
cat = next((c for c in categories if c['id'] == category_id), None)
|
||
else:
|
||
type_to_cat_id = {'model': 'ai-models', 'gpu': 'gpus', 'cpu': 'cpus', 'dynamic': None}
|
||
cat_id = type_to_cat_id.get(category_type)
|
||
cat = next((c for c in categories if c['id'] == cat_id), None)
|
||
|
||
# 构建字段模板
|
||
fields = {}
|
||
|
||
if cat and 'fields' in cat:
|
||
for field in cat['fields']:
|
||
field_desc = field['label']
|
||
if field.get('input_style') == 'long':
|
||
field_desc += '(长文本)'
|
||
else:
|
||
field_desc += '(文本)'
|
||
if field.get('description'):
|
||
field_desc += f" - {field['description']}"
|
||
fields[field['key']] = field_desc
|
||
|
||
if subcategory_id:
|
||
subcat = next((s for s in cat.get('subcategories', []) if s['id'] == subcategory_id), None)
|
||
if subcat and 'extra_fields' in subcat:
|
||
for field in subcat['extra_fields']:
|
||
field_desc = field['label']
|
||
if field.get('input_style') == 'long':
|
||
field_desc += '(长文本)'
|
||
else:
|
||
field_desc += '(文本)'
|
||
if field.get('description'):
|
||
field_desc += f" - {field['description']}"
|
||
fields[field['key']] = field_desc
|
||
else:
|
||
fields = {
|
||
'name': '名称',
|
||
'brand': '品牌',
|
||
'price': '价格(数字)',
|
||
'year': '年份(数字)',
|
||
'specs': '规格参数(JSON对象)',
|
||
'description': '简介描述',
|
||
}
|
||
|
||
fields_json = json.dumps(fields, ensure_ascii=False, indent=2)
|
||
|
||
# 图片解析 prompt
|
||
image_prompt = """请分析图片中的产品参数信息,提取结构化数据。
|
||
|
||
需要提取的字段:
|
||
""" + fields_json + """
|
||
|
||
重要要求:
|
||
1. 图片中可能包含1个或多个产品,请识别所有产品
|
||
2. 如果是多张图片,请综合分析所有图片内容
|
||
3. 数字字段只返回数字,不带单位
|
||
4. 如果某字段没有提及,返回null
|
||
5. 返回格式:如果识别到多个产品,返回数组 [对象列表]; 如果只有一个产品,返回单个对象
|
||
6. 只返回JSON数据,不要其他内容"""
|
||
|
||
return {
|
||
'fields': fields,
|
||
'fields_json': fields_json,
|
||
'image_prompt': image_prompt,
|
||
'category_name': cat.get('name', '') if cat else ''
|
||
}
|
||
|
||
|
||
def parse_with_llm(text, category_type, images=None, category_id=None, subcategory_id=None, custom_prompt=None):
|
||
"""
|
||
使用大模型解析文本/图片,提取结构化数据
|
||
支持多张图片输入,可能解析出多个产品
|
||
根据类别配置的参数字段进行解析
|
||
支持自定义 prompt(优先使用自定义)
|
||
"""
|
||
|
||
# 从类别配置中获取字段定义
|
||
categories = load_data(CATEGORIES_FILE)
|
||
|
||
# 确定类别ID
|
||
if category_id:
|
||
cat = next((c for c in categories if c['id'] == category_id), None)
|
||
else:
|
||
# 根据类型映射到内置类别ID
|
||
type_to_cat_id = {'model': 'ai-models', 'gpu': 'gpus', 'cpu': 'cpus'}
|
||
cat_id = type_to_cat_id.get(category_type)
|
||
cat = next((c for c in categories if c['id'] == cat_id), None)
|
||
|
||
# 构建字段模板
|
||
fields = {}
|
||
|
||
if cat and 'fields' in cat:
|
||
# 使用类别配置的字段
|
||
for field in cat['fields']:
|
||
field_desc = field['label']
|
||
# 所有字段都是文本类型
|
||
if field.get('input_style') == 'long':
|
||
field_desc += '(长文本)'
|
||
else:
|
||
field_desc += '(文本)'
|
||
if field.get('description'):
|
||
field_desc += f" - {field['description']}"
|
||
fields[field['key']] = field_desc
|
||
|
||
# 如果有子类别,添加子类别的额外字段
|
||
if subcategory_id:
|
||
subcat = next((s for s in cat.get('subcategories', []) if s['id'] == subcategory_id), None)
|
||
if subcat and 'extra_fields' in subcat:
|
||
for field in subcat['extra_fields']:
|
||
field_desc = field['label']
|
||
if field.get('input_style') == 'long':
|
||
field_desc += '(长文本)'
|
||
else:
|
||
field_desc += '(文本)'
|
||
if field.get('description'):
|
||
field_desc += f" - {field['description']}"
|
||
fields[field['key']] = field_desc
|
||
else:
|
||
# 兜底:使用默认字段模板
|
||
fields = {
|
||
'name': '名称',
|
||
'brand': '品牌',
|
||
'price': '价格(数字)',
|
||
'year': '年份(数字)',
|
||
'specs': '规格参数(JSON对象)',
|
||
'description': '简介描述',
|
||
}
|
||
|
||
fields_json = json.dumps(fields, ensure_ascii=False, indent=2)
|
||
|
||
# 构建消息内容
|
||
content_parts = []
|
||
|
||
# 如果有图片,添加图片内容
|
||
if images and len(images) > 0:
|
||
# 优先使用自定义 prompt,否则使用默认
|
||
if custom_prompt and custom_prompt.strip():
|
||
prompt_text = custom_prompt
|
||
else:
|
||
prompt_text = """请分析图片中的产品参数信息,提取结构化数据。
|
||
|
||
需要提取的字段:
|
||
""" + fields_json + """
|
||
|
||
重要要求:
|
||
1. 图片中可能包含1个或多个产品,请识别所有产品
|
||
2. 如果是多张图片,请综合分析所有图片内容
|
||
3. 数字字段只返回数字,不带单位
|
||
4. 如果某字段没有提及,返回null
|
||
5. 返回格式:如果识别到多个产品,返回数组 [对象列表]; 如果只有一个产品,返回单个对象
|
||
6. 只返回JSON数据,不要其他内容"""
|
||
|
||
content_parts.append({
|
||
"type": "text",
|
||
"text": prompt_text
|
||
})
|
||
|
||
# 添加每张图片(支持URL或base64)
|
||
for img in images:
|
||
if isinstance(img, str):
|
||
if img.startswith('http'):
|
||
# URL图片
|
||
content_parts.append({
|
||
"type": "image_url",
|
||
"image_url": {"url": img}
|
||
})
|
||
elif img.startswith('data:'):
|
||
# base64图片
|
||
content_parts.append({
|
||
"type": "image_url",
|
||
"image_url": {"url": img}
|
||
})
|
||
else:
|
||
# 本地路径,读取并转为base64
|
||
try:
|
||
img_path = IMAGES_DIR / img.replace('/static/uploads/', '')
|
||
if img_path.exists():
|
||
with open(img_path, 'rb') as f:
|
||
img_data = base64.b64encode(f.read()).decode()
|
||
ext = img_path.suffix.lower()
|
||
mime_type = f'image/{ext if ext != "jpg" else "jpeg"}'
|
||
content_parts.append({
|
||
"type": "image_url",
|
||
"image_url": {"url": f"data:{mime_type};base64,{img_data}"}
|
||
})
|
||
except Exception as e:
|
||
print(f"读取图片失败: {e}")
|
||
else:
|
||
# 纯文本解析
|
||
prompt_text = """请解析以下文本,提取结构化数据。
|
||
|
||
文本内容:
|
||
""" + str(text) + """
|
||
|
||
需要提取的字段:
|
||
""" + fields_json + """
|
||
|
||
要求:
|
||
1. 根据文本内容智能提取各个字段的值
|
||
2. 数字字段只返回数字,不带单位
|
||
3. 如果某字段在文本中没有提及,返回null
|
||
4. 返回JSON格式,不要包含任何其他内容
|
||
|
||
请直接返回JSON数据:"""
|
||
|
||
content_parts.append({
|
||
"type": "text",
|
||
"text": prompt_text
|
||
})
|
||
|
||
try:
|
||
# 动态获取LLM配置
|
||
llm_config = get_llm_config()
|
||
|
||
# 使用视觉模型解析(如果有图片)
|
||
model = llm_config.get('vision_model', 'gpt-4-vision-preview') if images else llm_config['model']
|
||
|
||
response = requests.post(
|
||
f"{llm_config['base_url']}/chat/completions",
|
||
headers={
|
||
"Content-Type": "application/json",
|
||
"Authorization": f"Bearer {llm_config['api_key']}"
|
||
},
|
||
json={
|
||
"model": model,
|
||
"messages": [
|
||
{"role": "system", "content": "你是一个产品参数提取助手,负责从文本和图片中提取结构化的产品参数数据。只返回JSON,不要其他内容。如果图片中包含多个产品,返回数组。"},
|
||
{"role": "user", "content": content_parts}
|
||
],
|
||
"max_tokens": 2000,
|
||
"temperature": 0.1
|
||
},
|
||
timeout=60
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
content = data['choices'][0]['message']['content'].strip()
|
||
|
||
# 清理可能的markdown包裹
|
||
if content.startswith('```'):
|
||
content = content.split('\n', 1)[1] if '\n' in content else content[3:]
|
||
content = content.rsplit('```', 1)[0] if '```' in content else content
|
||
|
||
# 解析JSON
|
||
parsed = json.loads(content)
|
||
|
||
# 处理结果(可能是数组或单个对象)
|
||
results = []
|
||
if isinstance(parsed, list):
|
||
results = parsed
|
||
else:
|
||
results = [parsed]
|
||
|
||
# 清理每个结果的null值
|
||
cleaned_results = []
|
||
for item in results:
|
||
cleaned = {}
|
||
for k, v in item.items():
|
||
if v is not None and v != '' and v != 'null':
|
||
# 尝试转换数字
|
||
if isinstance(v, str):
|
||
try:
|
||
if '.' in v:
|
||
cleaned[k] = float(v)
|
||
else:
|
||
cleaned[k] = int(v)
|
||
except:
|
||
cleaned[k] = v
|
||
else:
|
||
cleaned[k] = v
|
||
cleaned_results.append(cleaned)
|
||
|
||
return cleaned_results
|
||
except Exception as e:
|
||
print(f"LLM解析失败: {e}")
|
||
|
||
# 降级处理:返回基本结构
|
||
return [{'name': text[:50] if text else '未命名产品', 'description': text}]
|
||
|
||
# ============ 页面路由 ============
|
||
|
||
@app.route('/')
|
||
def index():
|
||
"""首页"""
|
||
return render_template('index.html')
|
||
|
||
@app.route('/models')
|
||
def models_page():
|
||
"""模型数据库页面"""
|
||
return render_template('models.html')
|
||
|
||
@app.route('/gpus')
|
||
def gpus_page():
|
||
"""GPU数据库页面"""
|
||
return render_template('gpus.html')
|
||
|
||
@app.route('/cpus')
|
||
def cpus_page():
|
||
"""CPU数据库页面"""
|
||
return render_template('cpus.html')
|
||
|
||
@app.route('/tools')
|
||
def tools_page():
|
||
"""工具页面"""
|
||
return render_template('tools.html')
|
||
|
||
@app.route('/compare')
|
||
def compare_page():
|
||
"""对比页面"""
|
||
return render_template('compare.html')
|
||
|
||
@app.route('/knowledge')
|
||
def knowledge_page():
|
||
"""知识库页面"""
|
||
return render_template('knowledge.html')
|
||
|
||
@app.route('/admin')
|
||
def admin_page():
|
||
"""后台管理页面"""
|
||
return render_template('admin.html')
|
||
|
||
@app.route('/category/<category_id>')
|
||
def category_page(category_id):
|
||
"""动态分类页面"""
|
||
categories = load_data(CATEGORIES_FILE)
|
||
category = next((c for c in categories if c['id'] == category_id), None)
|
||
|
||
if not category:
|
||
return "分类不存在", 404
|
||
|
||
return render_template('category.html', category=category)
|
||
|
||
# ============ API路由 ============
|
||
|
||
@app.route('/api/models')
|
||
def api_models():
|
||
"""获取模型列表"""
|
||
models = load_data(MODELS_FILE)
|
||
|
||
# 过滤隐藏项(前台默认不显示visible=false)
|
||
hide_hidden = request.args.get('all', '0') == '0'
|
||
if hide_hidden:
|
||
models = [m for m in models if m.get('visible', True)]
|
||
|
||
# 搜索过滤
|
||
keyword = request.args.get('q', '').strip().lower()
|
||
if keyword:
|
||
models = [m for m in models if keyword in m.get('name', '').lower() or
|
||
keyword in m.get('organization', '').lower()]
|
||
|
||
# 排序
|
||
sort_by = request.args.get('sort', 'default')
|
||
reverse = request.args.get('order', 'desc') == 'desc'
|
||
|
||
# 安全排序:处理可能的None/缺失值
|
||
def safe_sort_key(x, key):
|
||
val = x.get(key)
|
||
if val is None:
|
||
if key in ['parameters', 'context_length', 'mmlu', 'views']:
|
||
return 0
|
||
elif key in ['publish_date', 'created_at', 'updated_at']:
|
||
return ''
|
||
return ''
|
||
return val
|
||
|
||
# 默认排序:置顶优先 → 发布日期最新
|
||
if sort_by == 'default':
|
||
# 先按置顶排序,再按发布日期排序
|
||
def default_sort_key(x):
|
||
is_pinned = x.get('is_pinned', False)
|
||
publish_date = x.get('publish_date', '')
|
||
created_at = x.get('created_at', '')
|
||
# 置顶的排在前面 (True > False)
|
||
# 发布日期按时间戳排序(越新的排在前面)
|
||
return (not is_pinned, -(parse_date_to_timestamp(publish_date) or parse_date_to_timestamp(created_at) or 0))
|
||
models = sorted(models, key=default_sort_key)
|
||
elif sort_by in ['name', 'parameters', 'context_length', 'mmlu', 'created_at', 'publish_date', 'views', 'updated_at']:
|
||
models = sorted(models, key=lambda x: safe_sort_key(x, sort_by), reverse=reverse)
|
||
|
||
return jsonify(models)
|
||
|
||
def parse_date_to_timestamp(date_str):
|
||
"""将日期字符串转换为时间戳"""
|
||
if not date_str:
|
||
return None
|
||
try:
|
||
# 支持多种格式
|
||
for fmt in ['%Y-%m-%d', '%Y-%m-%d %H:%M:%S', '%Y/%m/%d']:
|
||
try:
|
||
return datetime.strptime(date_str, fmt).timestamp()
|
||
except:
|
||
pass
|
||
return None
|
||
except:
|
||
return None
|
||
|
||
@app.route('/api/models/<model_id>')
|
||
def api_model_detail(model_id):
|
||
"""获取单个模型详情"""
|
||
models = load_data(MODELS_FILE)
|
||
model = next((m for m in models if m['id'] == model_id), None)
|
||
|
||
if not model:
|
||
return jsonify({'error': 'Model not found'}), 404
|
||
|
||
return jsonify(model)
|
||
|
||
@app.route('/api/models', methods=['POST'])
|
||
def api_create_model():
|
||
"""创建新模型"""
|
||
data = request.get_json()
|
||
models = load_data(MODELS_FILE)
|
||
|
||
data['id'] = uuid.uuid4().hex[:12]
|
||
data['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
data['visible'] = data.get('visible', True) # 默认显示
|
||
data['publish_date'] = data.get('publish_date', '') # 发布日期
|
||
data['views'] = data.get('views', 0) # 热度/阅读数
|
||
data['is_pinned'] = data.get('is_pinned', False) # 置顶
|
||
|
||
models.append(data)
|
||
save_data(MODELS_FILE, models)
|
||
|
||
return jsonify(data)
|
||
|
||
@app.route('/api/models/<model_id>', methods=['PUT'])
|
||
def api_update_model(model_id):
|
||
"""更新模型"""
|
||
data = request.get_json()
|
||
models = load_data(MODELS_FILE)
|
||
|
||
model = next((m for m in models if m['id'] == model_id), None)
|
||
if not model:
|
||
return jsonify({'error': 'Model not found'}), 404
|
||
|
||
model.update(data)
|
||
model['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
save_data(MODELS_FILE, models)
|
||
|
||
return jsonify(model)
|
||
|
||
@app.route('/api/models/<model_id>', methods=['DELETE'])
|
||
def api_delete_model(model_id):
|
||
"""删除模型"""
|
||
models = load_data(MODELS_FILE)
|
||
models = [m for m in models if m['id'] != model_id]
|
||
save_data(MODELS_FILE, models)
|
||
|
||
return jsonify({'success': True})
|
||
|
||
@app.route('/api/models/<model_id>/visible', methods=['POST'])
|
||
def api_toggle_model_visible(model_id):
|
||
"""切换模型显示状态"""
|
||
models = load_data(MODELS_FILE)
|
||
model = next((m for m in models if m['id'] == model_id), None)
|
||
if not model:
|
||
return jsonify({'error': 'Model not found'}), 404
|
||
|
||
model['visible'] = not model.get('visible', True)
|
||
save_data(MODELS_FILE, models)
|
||
|
||
return jsonify({'success': True, 'visible': model['visible']})
|
||
|
||
# ============ 获取解析Prompt模板API ============
|
||
|
||
@app.route('/api/parse-prompt', methods=['POST'])
|
||
def api_get_parse_prompt():
|
||
"""
|
||
获取智能解析的 prompt 模板(供前端显示和编辑)
|
||
"""
|
||
data = request.get_json()
|
||
category_type = data.get('category_type', 'dynamic')
|
||
category_id = data.get('category_id')
|
||
subcategory_id = data.get('subcategory_id')
|
||
|
||
template = get_parse_prompt_template(category_type, category_id, subcategory_id)
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'template': template
|
||
})
|
||
|
||
# ============ 图片解析API(预览) ============
|
||
|
||
@app.route('/api/parse-images', methods=['POST'])
|
||
def api_parse_images():
|
||
"""
|
||
解析图片中的产品参数(预览模式,不保存)
|
||
支持多张图片,可能返回多个产品
|
||
根据类别配置的参数字段进行解析
|
||
支持自定义 prompt(可选)
|
||
"""
|
||
data = request.get_json()
|
||
text = data.get('text', '')
|
||
images = data.get('images', [])
|
||
category_type = data.get('category_type', 'dynamic')
|
||
subcategory_id = data.get('subcategory_id', '')
|
||
custom_prompt = data.get('custom_prompt', '') # 自定义 prompt
|
||
|
||
if not text and not images:
|
||
return jsonify({'error': '文本或图片不能都为空'}), 400
|
||
|
||
if not images:
|
||
return jsonify({'error': '请上传至少一张图片'}), 400
|
||
|
||
# 确定类别ID
|
||
type_to_cat_id = {'model': 'ai-models', 'gpu': 'gpus', 'cpu': 'cpus', 'dynamic': None}
|
||
category_id = type_to_cat_id.get(category_type)
|
||
|
||
# 调用大模型解析(根据类别字段配置,支持自定义 prompt)
|
||
parsed_list = parse_with_llm(text, category_type, images, category_id=category_id, subcategory_id=subcategory_id, custom_prompt=custom_prompt)
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'count': len(parsed_list),
|
||
'products': parsed_list,
|
||
'raw_text': text,
|
||
'images': images
|
||
})
|
||
|
||
# ============ 智能添加API ============
|
||
|
||
# ============ 智能补充参数API ============
|
||
|
||
@app.route('/api/models/<model_id>/smart-update', methods=['POST'])
|
||
def api_smart_update_model(model_id):
|
||
"""智能补充模型参数(只填充缺失字段)"""
|
||
data = request.get_json()
|
||
text = data.get('text', '')
|
||
images = data.get('images', [])
|
||
|
||
if not text and not images:
|
||
return jsonify({'error': '文本或图片不能都为空'}), 400
|
||
|
||
# 获取现有数据
|
||
models = load_data(MODELS_FILE)
|
||
model = next((m for m in models if m['id'] == model_id), None)
|
||
if not model:
|
||
return jsonify({'error': 'Model not found'}), 404
|
||
|
||
# 解析新参数
|
||
parsed_list = parse_with_llm(text, 'model', images)
|
||
if not parsed_list:
|
||
return jsonify({'error': '解析失败'}), 500
|
||
|
||
parsed = parsed_list[0] # 补充只取第一个
|
||
|
||
# 只填充缺失或为空的字段
|
||
updated_fields = []
|
||
for key, value in parsed.items():
|
||
if value is not None and value != '' and value != 0:
|
||
existing = model.get(key)
|
||
if existing is None or existing == '' or existing == 0:
|
||
model[key] = value
|
||
updated_fields.append(key)
|
||
|
||
model['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
# 追加解析来源记录
|
||
parse_source = {
|
||
'type': 'smart_update',
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'images': images,
|
||
'text': text[:500] if text else '',
|
||
'updated_fields': updated_fields
|
||
}
|
||
if 'parse_sources' not in model:
|
||
model['parse_sources'] = []
|
||
model['parse_sources'].append(parse_source)
|
||
|
||
save_data(MODELS_FILE, models)
|
||
|
||
return jsonify({'success': True, 'updated_fields': updated_fields, 'model': model})
|
||
|
||
@app.route('/api/gpus/<gpu_id>/smart-update', methods=['POST'])
|
||
def api_smart_update_gpu(gpu_id):
|
||
"""智能补充GPU参数(只填充缺失字段)"""
|
||
data = request.get_json()
|
||
text = data.get('text', '')
|
||
images = data.get('images', [])
|
||
|
||
if not text and not images:
|
||
return jsonify({'error': '文本或图片不能都为空'}), 400
|
||
|
||
gpus = load_data(GPUS_FILE)
|
||
gpu = next((g for g in gpus if g['id'] == gpu_id), None)
|
||
if not gpu:
|
||
return jsonify({'error': 'GPU not found'}), 404
|
||
|
||
parsed_list = parse_with_llm(text, 'gpu', images)
|
||
if not parsed_list:
|
||
return jsonify({'error': '解析失败'}), 500
|
||
|
||
parsed = parsed_list[0]
|
||
|
||
updated_fields = []
|
||
for key, value in parsed.items():
|
||
if value is not None and value != '' and value != 0:
|
||
existing = gpu.get(key)
|
||
if existing is None or existing == '' or existing == 0:
|
||
gpu[key] = value
|
||
updated_fields.append(key)
|
||
|
||
gpu['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
parse_source = {
|
||
'type': 'smart_update',
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'images': images,
|
||
'text': text[:500] if text else '',
|
||
'updated_fields': updated_fields
|
||
}
|
||
if 'parse_sources' not in gpu:
|
||
gpu['parse_sources'] = []
|
||
gpu['parse_sources'].append(parse_source)
|
||
|
||
save_data(GPUS_FILE, gpus)
|
||
|
||
return jsonify({'success': True, 'updated_fields': updated_fields, 'gpu': gpu})
|
||
|
||
@app.route('/api/cpus/<cpu_id>/smart-update', methods=['POST'])
|
||
def api_smart_update_cpu(cpu_id):
|
||
"""智能补充CPU参数(只填充缺失字段)"""
|
||
data = request.get_json()
|
||
text = data.get('text', '')
|
||
images = data.get('images', [])
|
||
|
||
if not text and not images:
|
||
return jsonify({'error': '文本或图片不能都为空'}), 400
|
||
|
||
cpus = load_data(CPUS_FILE)
|
||
cpu = next((c for c in cpus if c['id'] == cpu_id), None)
|
||
if not cpu:
|
||
return jsonify({'error': 'CPU not found'}), 404
|
||
|
||
parsed_list = parse_with_llm(text, 'cpu', images)
|
||
if not parsed_list:
|
||
return jsonify({'error': '解析失败'}), 500
|
||
|
||
parsed = parsed_list[0]
|
||
|
||
updated_fields = []
|
||
for key, value in parsed.items():
|
||
if value is not None and value != '' and value != 0:
|
||
existing = cpu.get(key)
|
||
if existing is None or existing == '' or existing == 0:
|
||
cpu[key] = value
|
||
updated_fields.append(key)
|
||
|
||
cpu['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
parse_source = {
|
||
'type': 'smart_update',
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'images': images,
|
||
'text': text[:500] if text else '',
|
||
'updated_fields': updated_fields
|
||
}
|
||
if 'parse_sources' not in cpu:
|
||
cpu['parse_sources'] = []
|
||
cpu['parse_sources'].append(parse_source)
|
||
|
||
save_data(CPUS_FILE, cpus)
|
||
|
||
return jsonify({'success': True, 'updated_fields': updated_fields, 'cpu': cpu})
|
||
|
||
@app.route('/api/items/<category_id>/<item_id>/smart-update', methods=['POST'])
|
||
def api_smart_update_item(category_id, item_id):
|
||
"""智能补充动态分类数据参数(只填充缺失字段)"""
|
||
data = request.get_json()
|
||
text = data.get('text', '')
|
||
images = data.get('images', [])
|
||
|
||
if not text and not images:
|
||
return jsonify({'error': '文本或图片不能都为空'}), 400
|
||
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
item = next((i for i in items if i['id'] == item_id), None)
|
||
if not item:
|
||
return jsonify({'error': 'Item not found'}), 404
|
||
|
||
parsed_list = parse_with_llm(text, 'dynamic', images)
|
||
if not parsed_list:
|
||
return jsonify({'error': '解析失败'}), 500
|
||
|
||
parsed = parsed_list[0]
|
||
|
||
updated_fields = []
|
||
for key, value in parsed.items():
|
||
if value is not None and value != '' and value != 0:
|
||
existing = item.get(key)
|
||
if existing is None or existing == '' or existing == 0:
|
||
item[key] = value
|
||
updated_fields.append(key)
|
||
|
||
item['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
parse_source = {
|
||
'type': 'smart_update',
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'images': images,
|
||
'text': text[:500] if text else '',
|
||
'updated_fields': updated_fields
|
||
}
|
||
if 'parse_sources' not in item:
|
||
item['parse_sources'] = []
|
||
item['parse_sources'].append(parse_source)
|
||
|
||
save_data(items_file, items)
|
||
|
||
return jsonify({'success': True, 'updated_fields': updated_fields, 'item': item})
|
||
|
||
@app.route('/api/models/smart-add', methods=['POST'])
|
||
def api_smart_add_model():
|
||
"""智能添加模型(支持文本和多图解析,可能添加多个产品)"""
|
||
data = request.get_json()
|
||
text = data.get('text', '')
|
||
images = data.get('images', [])
|
||
subcategory_id = data.get('subcategory_id', '') # 子类别ID
|
||
custom_prompt = data.get('custom_prompt', '') # 自定义 prompt
|
||
|
||
if not text and not images:
|
||
return jsonify({'error': '文本或图片不能都为空'}), 400
|
||
|
||
# 大模型解析(根据类别字段配置,支持自定义 prompt)
|
||
parsed_list = parse_with_llm(text, 'model', images, category_id='ai-models', subcategory_id=subcategory_id, custom_prompt=custom_prompt)
|
||
|
||
# 处理多个产品
|
||
results = []
|
||
models = load_data(MODELS_FILE)
|
||
|
||
# 构建解析来源记录
|
||
parse_source = {
|
||
'type': 'smart_add',
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'images': images,
|
||
'text': text[:500] if text else '' # 截取前500字符
|
||
}
|
||
|
||
for parsed in parsed_list:
|
||
# 补充必要字段
|
||
parsed['id'] = uuid.uuid4().hex[:12]
|
||
parsed['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
parsed['visible'] = True
|
||
parsed['subcategory_id'] = subcategory_id # 保存子类别
|
||
parsed['publish_date'] = parsed.get('publish_date', '')
|
||
parsed['views'] = 0
|
||
parsed['is_pinned'] = False
|
||
parsed['product_images'] = [] # 产品展示图(不同于参数截图)
|
||
parsed['parse_sources'] = [parse_source] # 解析来源历史
|
||
|
||
models.append(parsed)
|
||
results.append(parsed)
|
||
|
||
save_data(MODELS_FILE, models)
|
||
|
||
return jsonify({'success': True, 'count': len(results), 'products': results})
|
||
|
||
@app.route('/api/gpus/smart-add', methods=['POST'])
|
||
def api_smart_add_gpu():
|
||
"""智能添加GPU(支持文本和多图解析)"""
|
||
data = request.get_json()
|
||
text = data.get('text', '')
|
||
images = data.get('images', [])
|
||
subcategory_id = data.get('subcategory_id', '')
|
||
custom_prompt = data.get('custom_prompt', '') # 自定义 prompt
|
||
|
||
if not text and not images:
|
||
return jsonify({'error': '文本或图片不能都为空'}), 400
|
||
|
||
parsed_list = parse_with_llm(text, 'gpu', images, category_id='gpus', subcategory_id=subcategory_id, custom_prompt=custom_prompt)
|
||
|
||
results = []
|
||
gpus = load_data(GPUS_FILE)
|
||
|
||
parse_source = {
|
||
'type': 'smart_add',
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'images': images,
|
||
'text': text[:500] if text else ''
|
||
}
|
||
|
||
for parsed in parsed_list:
|
||
parsed['id'] = uuid.uuid4().hex[:12]
|
||
parsed['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
parsed['visible'] = True
|
||
parsed['subcategory_id'] = subcategory_id
|
||
parsed['publish_date'] = parsed.get('publish_date', '')
|
||
parsed['views'] = 0
|
||
parsed['is_pinned'] = False
|
||
parsed['product_images'] = []
|
||
parsed['parse_sources'] = [parse_source]
|
||
|
||
gpus.append(parsed)
|
||
results.append(parsed)
|
||
|
||
save_data(GPUS_FILE, gpus)
|
||
|
||
return jsonify({'success': True, 'count': len(results), 'products': results})
|
||
|
||
@app.route('/api/cpus/smart-add', methods=['POST'])
|
||
def api_smart_add_cpu():
|
||
"""智能添加CPU(支持文本和多图解析)"""
|
||
data = request.get_json()
|
||
text = data.get('text', '')
|
||
images = data.get('images', [])
|
||
subcategory_id = data.get('subcategory_id', '')
|
||
custom_prompt = data.get('custom_prompt', '') # 自定义 prompt
|
||
|
||
if not text and not images:
|
||
return jsonify({'error': '文本或图片不能都为空'}), 400
|
||
|
||
parsed_list = parse_with_llm(text, 'cpu', images, category_id='cpus', subcategory_id=subcategory_id, custom_prompt=custom_prompt)
|
||
|
||
results = []
|
||
cpus = load_data(CPUS_FILE)
|
||
|
||
parse_source = {
|
||
'type': 'smart_add',
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'images': images,
|
||
'text': text[:500] if text else ''
|
||
}
|
||
|
||
for parsed in parsed_list:
|
||
parsed['id'] = uuid.uuid4().hex[:12]
|
||
parsed['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
parsed['visible'] = True
|
||
parsed['subcategory_id'] = subcategory_id
|
||
parsed['publish_date'] = parsed.get('publish_date', '')
|
||
parsed['views'] = 0
|
||
parsed['is_pinned'] = False
|
||
parsed['product_images'] = []
|
||
parsed['parse_sources'] = [parse_source]
|
||
|
||
cpus.append(parsed)
|
||
results.append(parsed)
|
||
|
||
save_data(CPUS_FILE, cpus)
|
||
|
||
return jsonify({'success': True, 'count': len(results), 'products': results})
|
||
|
||
@app.route('/api/items/<category_id>/smart-add', methods=['POST'])
|
||
def api_smart_add_item(category_id):
|
||
"""智能添加动态分类数据(支持文本和多图解析)"""
|
||
data = request.get_json()
|
||
text = data.get('text', '')
|
||
images = data.get('images', [])
|
||
subcategory_id = data.get('subcategory_id', '')
|
||
custom_prompt = data.get('custom_prompt', '') # 自定义 prompt
|
||
|
||
if not text and not images:
|
||
return jsonify({'error': '文本或图片不能都为空'}), 400
|
||
|
||
# 使用类别配置的字段解析,支持自定义 prompt
|
||
parsed_list = parse_with_llm(text, 'dynamic', images, category_id=category_id, subcategory_id=subcategory_id, custom_prompt=custom_prompt)
|
||
|
||
results = []
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
|
||
parse_source = {
|
||
'type': 'smart_add',
|
||
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
'images': images,
|
||
'text': text[:500] if text else ''
|
||
}
|
||
|
||
for parsed in parsed_list:
|
||
parsed['id'] = uuid.uuid4().hex[:12]
|
||
parsed['category_id'] = category_id
|
||
parsed['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
parsed['visible'] = True
|
||
parsed['subcategory_id'] = subcategory_id
|
||
parsed['publish_date'] = parsed.get('publish_date', '')
|
||
parsed['views'] = 0
|
||
parsed['is_pinned'] = False
|
||
parsed['product_images'] = []
|
||
parsed['parse_sources'] = [parse_source]
|
||
|
||
items.append(parsed)
|
||
results.append(parsed)
|
||
|
||
save_data(items_file, items)
|
||
|
||
return jsonify({'success': True, 'count': len(results), 'products': results})
|
||
|
||
# ============ GPU API ============
|
||
|
||
@app.route('/api/gpus')
|
||
def api_gpus():
|
||
"""获取GPU列表"""
|
||
gpus = load_data(GPUS_FILE)
|
||
|
||
# 过滤隐藏项
|
||
hide_hidden = request.args.get('all', '0') == '0'
|
||
if hide_hidden:
|
||
gpus = [g for g in gpus if g.get('visible', True)]
|
||
|
||
keyword = request.args.get('q', '').strip().lower()
|
||
if keyword:
|
||
gpus = [g for g in gpus if keyword in g.get('name', '').lower() or
|
||
keyword in g.get('manufacturer', '').lower()]
|
||
|
||
# 排序
|
||
sort_by = request.args.get('sort', 'default')
|
||
reverse = request.args.get('order', 'desc') == 'desc'
|
||
|
||
def safe_sort_key(x, key):
|
||
val = x.get(key)
|
||
if val is None:
|
||
if key in ['memory_gb', 'cuda_cores', 'tensor_cores', 'views']:
|
||
return 0
|
||
elif key in ['publish_date', 'created_at', 'updated_at']:
|
||
return ''
|
||
return ''
|
||
return val
|
||
|
||
if sort_by == 'default':
|
||
def default_sort_key(x):
|
||
is_pinned = x.get('is_pinned', False)
|
||
publish_date = x.get('publish_date', '')
|
||
created_at = x.get('created_at', '')
|
||
return (not is_pinned, -(parse_date_to_timestamp(publish_date) or parse_date_to_timestamp(created_at) or 0))
|
||
gpus = sorted(gpus, key=default_sort_key)
|
||
elif sort_by in ['name', 'memory_gb', 'price_usd', 'created_at', 'publish_date', 'views', 'updated_at', 'release_year']:
|
||
gpus = sorted(gpus, key=lambda x: safe_sort_key(x, sort_by), reverse=reverse)
|
||
|
||
return jsonify(gpus)
|
||
|
||
@app.route('/api/gpus/<gpu_id>')
|
||
def api_gpu_detail(gpu_id):
|
||
"""获取单个GPU详情"""
|
||
gpus = load_data(GPUS_FILE)
|
||
gpu = next((g for g in gpus if g['id'] == gpu_id), None)
|
||
|
||
if not gpu:
|
||
return jsonify({'error': 'GPU not found'}), 404
|
||
|
||
return jsonify(gpu)
|
||
|
||
@app.route('/api/gpus', methods=['POST'])
|
||
def api_create_gpu():
|
||
"""创建新GPU"""
|
||
data = request.get_json()
|
||
gpus = load_data(GPUS_FILE)
|
||
|
||
data['id'] = uuid.uuid4().hex[:12]
|
||
data['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
data['visible'] = data.get('visible', True)
|
||
data['publish_date'] = data.get('publish_date', '')
|
||
data['views'] = data.get('views', 0)
|
||
data['is_pinned'] = data.get('is_pinned', False)
|
||
|
||
gpus.append(data)
|
||
save_data(GPUS_FILE, gpus)
|
||
|
||
return jsonify(data)
|
||
|
||
@app.route('/api/gpus/<gpu_id>', methods=['PUT'])
|
||
def api_update_gpu(gpu_id):
|
||
"""更新GPU"""
|
||
data = request.get_json()
|
||
gpus = load_data(GPUS_FILE)
|
||
|
||
gpu = next((g for g in gpus if g['id'] == gpu_id), None)
|
||
if not gpu:
|
||
return jsonify({'error': 'GPU not found'}), 404
|
||
|
||
gpu.update(data)
|
||
gpu['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
save_data(GPUS_FILE, gpus)
|
||
|
||
return jsonify(gpu)
|
||
|
||
@app.route('/api/gpus/<gpu_id>', methods=['DELETE'])
|
||
def api_delete_gpu(gpu_id):
|
||
"""删除GPU"""
|
||
gpus = load_data(GPUS_FILE)
|
||
gpus = [g for g in gpus if g['id'] != gpu_id]
|
||
save_data(GPUS_FILE, gpus)
|
||
|
||
return jsonify({'success': True})
|
||
|
||
@app.route('/api/gpus/<gpu_id>/visible', methods=['POST'])
|
||
def api_toggle_gpu_visible(gpu_id):
|
||
"""切换GPU显示状态"""
|
||
gpus = load_data(GPUS_FILE)
|
||
gpu = next((g for g in gpus if g['id'] == gpu_id), None)
|
||
if not gpu:
|
||
return jsonify({'error': 'GPU not found'}), 404
|
||
|
||
gpu['visible'] = not gpu.get('visible', True)
|
||
save_data(GPUS_FILE, gpus)
|
||
|
||
return jsonify({'success': True, 'visible': gpu['visible']})
|
||
|
||
# ============ CPU API ============
|
||
|
||
@app.route('/api/cpus')
|
||
def api_cpus():
|
||
"""获取CPU列表"""
|
||
cpus = load_data(CPUS_FILE)
|
||
|
||
# 过滤隐藏项
|
||
hide_hidden = request.args.get('all', '0') == '0'
|
||
if hide_hidden:
|
||
cpus = [c for c in cpus if c.get('visible', True)]
|
||
|
||
keyword = request.args.get('q', '').strip().lower()
|
||
if keyword:
|
||
cpus = [c for c in cpus if keyword in c.get('name', '').lower() or
|
||
keyword in c.get('manufacturer', '').lower()]
|
||
|
||
# 排序
|
||
sort_by = request.args.get('sort', 'default')
|
||
reverse = request.args.get('order', 'desc') == 'desc'
|
||
|
||
def safe_sort_key(x, key):
|
||
val = x.get(key)
|
||
if val is None:
|
||
if key in ['cores', 'threads', 'views']:
|
||
return 0
|
||
elif key in ['publish_date', 'created_at', 'updated_at']:
|
||
return ''
|
||
return ''
|
||
return val
|
||
|
||
if sort_by == 'default':
|
||
def default_sort_key(x):
|
||
is_pinned = x.get('is_pinned', False)
|
||
publish_date = x.get('publish_date', '')
|
||
created_at = x.get('created_at', '')
|
||
return (not is_pinned, -(parse_date_to_timestamp(publish_date) or parse_date_to_timestamp(created_at) or 0))
|
||
cpus = sorted(cpus, key=default_sort_key)
|
||
elif sort_by in ['name', 'cores', 'threads', 'price_usd', 'created_at', 'publish_date', 'views', 'updated_at']:
|
||
cpus = sorted(cpus, key=lambda x: safe_sort_key(x, sort_by), reverse=reverse)
|
||
|
||
return jsonify(cpus)
|
||
|
||
@app.route('/api/cpus/<cpu_id>')
|
||
def api_cpu_detail(cpu_id):
|
||
"""获取单个CPU详情"""
|
||
cpus = load_data(CPUS_FILE)
|
||
cpu = next((c for c in cpus if c['id'] == cpu_id), None)
|
||
|
||
if not cpu:
|
||
return jsonify({'error': 'CPU not found'}), 404
|
||
|
||
return jsonify(cpu)
|
||
|
||
@app.route('/api/cpus', methods=['POST'])
|
||
def api_create_cpu():
|
||
"""创建新CPU"""
|
||
data = request.get_json()
|
||
cpus = load_data(CPUS_FILE)
|
||
|
||
data['id'] = uuid.uuid4().hex[:12]
|
||
data['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
data['visible'] = data.get('visible', True)
|
||
data['publish_date'] = data.get('publish_date', '')
|
||
data['views'] = data.get('views', 0)
|
||
data['is_pinned'] = data.get('is_pinned', False)
|
||
|
||
cpus.append(data)
|
||
save_data(CPUS_FILE, cpus)
|
||
|
||
return jsonify(data)
|
||
|
||
@app.route('/api/cpus/<cpu_id>', methods=['PUT'])
|
||
def api_update_cpu(cpu_id):
|
||
"""更新CPU"""
|
||
data = request.get_json()
|
||
cpus = load_data(CPUS_FILE)
|
||
|
||
cpu = next((c for c in cpus if c['id'] == cpu_id), None)
|
||
if not cpu:
|
||
return jsonify({'error': 'CPU not found'}), 404
|
||
|
||
cpu.update(data)
|
||
cpu['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
save_data(CPUS_FILE, cpus)
|
||
|
||
return jsonify(cpu)
|
||
|
||
@app.route('/api/cpus/<cpu_id>', methods=['DELETE'])
|
||
def api_delete_cpu(cpu_id):
|
||
"""删除CPU"""
|
||
cpus = load_data(CPUS_FILE)
|
||
cpus = [c for c in cpus if c['id'] != cpu_id]
|
||
save_data(CPUS_FILE, cpus)
|
||
|
||
return jsonify({'success': True})
|
||
|
||
@app.route('/api/cpus/<cpu_id>/visible', methods=['POST'])
|
||
def api_toggle_cpu_visible(cpu_id):
|
||
"""切换CPU显示状态"""
|
||
cpus = load_data(CPUS_FILE)
|
||
cpu = next((c for c in cpus if c['id'] == cpu_id), None)
|
||
if not cpu:
|
||
return jsonify({'error': 'CPU not found'}), 404
|
||
|
||
cpu['visible'] = not cpu.get('visible', True)
|
||
save_data(CPUS_FILE, cpus)
|
||
|
||
return jsonify({'success': True, 'visible': cpu['visible']})
|
||
|
||
# ============ 搜索和其他API ============
|
||
|
||
@app.route('/api/search')
|
||
def api_search():
|
||
"""全局搜索"""
|
||
keyword = request.args.get('q', '').strip().lower()
|
||
|
||
if not keyword:
|
||
return jsonify({'models': [], 'gpus': [], 'cpus': []})
|
||
|
||
models = load_data(MODELS_FILE)
|
||
gpus = load_data(GPUS_FILE)
|
||
cpus = load_data(CPUS_FILE)
|
||
|
||
result = {
|
||
'models': [m for m in models if m.get('visible', True) and (keyword in m.get('name', '').lower() or keyword in m.get('organization', '').lower())],
|
||
'gpus': [g for g in gpus if g.get('visible', True) and (keyword in g.get('name', '').lower() or keyword in g.get('manufacturer', '').lower())],
|
||
'cpus': [c for c in cpus if c.get('visible', True) and (keyword in c.get('name', '').lower() or keyword in c.get('manufacturer', '').lower())]
|
||
}
|
||
|
||
return jsonify(result)
|
||
|
||
@app.route('/api/calculate/vram')
|
||
def api_calculate_vram():
|
||
"""显存计算"""
|
||
params = request.args.get('params', '7', type=float)
|
||
precision = request.args.get('precision', 'fp16', type=str)
|
||
|
||
bytes_per_param = {'fp32': 4, 'fp16': 2, 'int8': 1, 'int4': 0.5}
|
||
multiplier = bytes_per_param.get(precision, 2)
|
||
vram_gb = params * multiplier * 1e9 / (1024**3)
|
||
total_vram = vram_gb * 1.3
|
||
|
||
gpus = load_data(GPUS_FILE)
|
||
suitable_gpus = [g for g in gpus if g.get('visible', True) and g.get('memory_gb', 0) >= total_vram]
|
||
|
||
return jsonify({
|
||
'model_vram': round(vram_gb, 2),
|
||
'total_vram': round(total_vram, 2),
|
||
'suitable_gpus': suitable_gpus
|
||
})
|
||
|
||
@app.route('/api/stats')
|
||
def api_stats():
|
||
"""统计数据"""
|
||
models = load_data(MODELS_FILE)
|
||
gpus = load_data(GPUS_FILE)
|
||
cpus = load_data(CPUS_FILE)
|
||
categories = load_data(CATEGORIES_FILE)
|
||
knowledge = load_data(KNOWLEDGE_FILE)
|
||
|
||
return jsonify({
|
||
'models_count': len([m for m in models if m.get('visible', True)]),
|
||
'gpus_count': len([g for g in gpus if g.get('visible', True)]),
|
||
'cpus_count': len([c for c in cpus if c.get('visible', True)]),
|
||
'categories_count': len([c for c in categories if c.get('visible', True)]),
|
||
'knowledge_count': len(knowledge),
|
||
'latest_models': sorted([m for m in models if m.get('visible', True)], key=lambda x: x.get('created_at', ''), reverse=True)[:5]
|
||
})
|
||
|
||
# ============ 分类管理API ============
|
||
|
||
@app.route('/api/categories')
|
||
def api_categories():
|
||
"""获取分类列表"""
|
||
categories = load_data(CATEGORIES_FILE)
|
||
|
||
# 过滤隐藏项
|
||
hide_hidden = request.args.get('all', '0') == '0'
|
||
if hide_hidden:
|
||
categories = [c for c in categories if c.get('visible', True)]
|
||
|
||
return jsonify(sorted(categories, key=lambda x: x.get('order', 0)))
|
||
|
||
@app.route('/api/categories/<category_id>')
|
||
def api_category_detail(category_id):
|
||
"""获取单个分类详情"""
|
||
categories = load_data(CATEGORIES_FILE)
|
||
category = next((c for c in categories if c['id'] == category_id), None)
|
||
|
||
if not category:
|
||
return jsonify({'error': 'Category not found'}), 404
|
||
|
||
return jsonify(category)
|
||
|
||
@app.route('/api/categories', methods=['POST'])
|
||
def api_create_category():
|
||
"""创建新分类"""
|
||
data = request.get_json()
|
||
categories = load_data(CATEGORIES_FILE)
|
||
|
||
data['id'] = uuid.uuid4().hex[:12]
|
||
data['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
data['visible'] = data.get('visible', True)
|
||
|
||
categories.append(data)
|
||
save_data(CATEGORIES_FILE, categories)
|
||
|
||
return jsonify(data)
|
||
|
||
@app.route('/api/categories/<category_id>', methods=['PUT'])
|
||
def api_update_category(category_id):
|
||
"""更新分类"""
|
||
data = request.get_json()
|
||
categories = load_data(CATEGORIES_FILE)
|
||
|
||
category = next((c for c in categories if c['id'] == category_id), None)
|
||
if not category:
|
||
return jsonify({'error': 'Category not found'}), 404
|
||
|
||
category.update(data)
|
||
category['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
save_data(CATEGORIES_FILE, categories)
|
||
|
||
return jsonify(category)
|
||
|
||
@app.route('/api/categories/<category_id>', methods=['DELETE'])
|
||
def api_delete_category(category_id):
|
||
"""删除分类"""
|
||
categories = load_data(CATEGORIES_FILE)
|
||
categories = [c for c in categories if c['id'] != category_id]
|
||
save_data(CATEGORIES_FILE, categories)
|
||
|
||
return jsonify({'success': True})
|
||
|
||
@app.route('/api/categories/<category_id>/visible', methods=['POST'])
|
||
def api_toggle_category_visible(category_id):
|
||
"""切换分类显示状态"""
|
||
categories = load_data(CATEGORIES_FILE)
|
||
category = next((c for c in categories if c['id'] == category_id), None)
|
||
if not category:
|
||
return jsonify({'error': 'Category not found'}), 404
|
||
|
||
category['visible'] = not category.get('visible', True)
|
||
save_data(CATEGORIES_FILE, categories)
|
||
|
||
return jsonify({'success': True, 'visible': category['visible']})
|
||
|
||
# ============ 知识库管理API ============
|
||
|
||
@app.route('/api/knowledge')
|
||
def api_knowledge():
|
||
"""获取知识列表"""
|
||
knowledge = load_data(KNOWLEDGE_FILE)
|
||
|
||
# 过滤隐藏项(前台默认隐藏)
|
||
hide_hidden = request.args.get('all', '0') == '0'
|
||
if hide_hidden:
|
||
knowledge = [k for k in knowledge if k.get('visible', True)]
|
||
|
||
keyword = request.args.get('q', '').strip().lower()
|
||
if keyword:
|
||
knowledge = [k for k in knowledge if keyword in k.get('title', '').lower() or keyword in k.get('content', '').lower()]
|
||
|
||
category = request.args.get('category', '')
|
||
if category:
|
||
knowledge = [k for k in knowledge if k.get('category') == category]
|
||
|
||
return jsonify(sorted(knowledge, key=lambda x: x.get('order', 0)))
|
||
|
||
@app.route('/api/knowledge/<knowledge_id>')
|
||
def api_knowledge_detail(knowledge_id):
|
||
"""获取单个知识详情"""
|
||
knowledge = load_data(KNOWLEDGE_FILE)
|
||
item = next((k for k in knowledge if k['id'] == knowledge_id), None)
|
||
|
||
if not item:
|
||
return jsonify({'error': 'Knowledge not found'}), 404
|
||
|
||
return jsonify(item)
|
||
|
||
@app.route('/api/knowledge', methods=['POST'])
|
||
def api_create_knowledge():
|
||
"""创建新知识"""
|
||
data = request.get_json()
|
||
knowledge = load_data(KNOWLEDGE_FILE)
|
||
|
||
data['id'] = uuid.uuid4().hex[:12]
|
||
data['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
data['visible'] = data.get('visible', True) # 默认显示
|
||
if 'order' not in data:
|
||
data['order'] = len(knowledge)
|
||
|
||
knowledge.append(data)
|
||
save_data(KNOWLEDGE_FILE, knowledge)
|
||
|
||
return jsonify(data)
|
||
|
||
@app.route('/api/knowledge/<knowledge_id>', methods=['PUT'])
|
||
def api_update_knowledge(knowledge_id):
|
||
"""更新知识"""
|
||
data = request.get_json()
|
||
knowledge = load_data(KNOWLEDGE_FILE)
|
||
|
||
item = next((k for k in knowledge if k['id'] == knowledge_id), None)
|
||
if not item:
|
||
return jsonify({'error': 'Knowledge not found'}), 404
|
||
|
||
item.update(data)
|
||
item['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
save_data(KNOWLEDGE_FILE, knowledge)
|
||
|
||
return jsonify(item)
|
||
|
||
@app.route('/api/knowledge/<knowledge_id>', methods=['DELETE'])
|
||
def api_delete_knowledge(knowledge_id):
|
||
"""删除知识"""
|
||
knowledge = load_data(KNOWLEDGE_FILE)
|
||
knowledge = [k for k in knowledge if k['id'] != knowledge_id]
|
||
save_data(KNOWLEDGE_FILE, knowledge)
|
||
|
||
return jsonify({'success': True})
|
||
|
||
@app.route('/api/knowledge/<knowledge_id>/visible', methods=['POST'])
|
||
def api_toggle_knowledge_visible(knowledge_id):
|
||
"""切换知识显示状态"""
|
||
knowledge = load_data(KNOWLEDGE_FILE)
|
||
item = next((k for k in knowledge if k['id'] == knowledge_id), None)
|
||
if not item:
|
||
return jsonify({'error': 'Knowledge not found'}), 404
|
||
|
||
item['visible'] = not item.get('visible', True)
|
||
save_data(KNOWLEDGE_FILE, knowledge)
|
||
|
||
return jsonify({'success': True, 'visible': item['visible']})
|
||
|
||
# ============ 动态分类数据API ============
|
||
|
||
@app.route('/api/items/<category_id>')
|
||
def api_items(category_id):
|
||
"""获取分类下的数据列表"""
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
|
||
# 过滤隐藏项
|
||
hide_hidden = request.args.get('all', '0') == '0'
|
||
if hide_hidden:
|
||
items = [i for i in items if i.get('visible', True)]
|
||
|
||
# 排序
|
||
sort_by = request.args.get('sort', 'default')
|
||
reverse = request.args.get('order', 'desc') == 'desc'
|
||
|
||
def safe_sort_key(x, key):
|
||
val = x.get(key)
|
||
if val is None:
|
||
if key in ['price', 'views', 'year']:
|
||
return 0
|
||
elif key in ['publish_date', 'created_at', 'updated_at']:
|
||
return ''
|
||
return ''
|
||
return val
|
||
|
||
if sort_by == 'default':
|
||
def default_sort_key(x):
|
||
is_pinned = x.get('is_pinned', False)
|
||
publish_date = x.get('publish_date', '')
|
||
created_at = x.get('created_at', '')
|
||
return (not is_pinned, -(parse_date_to_timestamp(publish_date) or parse_date_to_timestamp(created_at) or 0))
|
||
items = sorted(items, key=default_sort_key)
|
||
elif sort_by in ['name', 'price', 'year', 'created_at', 'publish_date', 'views', 'updated_at']:
|
||
items = sorted(items, key=lambda x: safe_sort_key(x, sort_by), reverse=reverse)
|
||
|
||
return jsonify(items)
|
||
|
||
@app.route('/api/items/<category_id>/<item_id>')
|
||
def api_item_detail(category_id, item_id):
|
||
"""获取单个数据详情"""
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
item = next((i for i in items if i['id'] == item_id), None)
|
||
|
||
if not item:
|
||
return jsonify({'error': 'Item not found'}), 404
|
||
|
||
return jsonify(item)
|
||
|
||
@app.route('/api/items/<category_id>', methods=['POST'])
|
||
def api_create_item(category_id):
|
||
"""创建新数据"""
|
||
data = request.get_json()
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
|
||
data['id'] = uuid.uuid4().hex[:12]
|
||
data['category_id'] = category_id
|
||
data['created_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
data['visible'] = data.get('visible', True)
|
||
data['publish_date'] = data.get('publish_date', '')
|
||
data['views'] = data.get('views', 0)
|
||
data['is_pinned'] = data.get('is_pinned', False)
|
||
|
||
items.append(data)
|
||
save_data(items_file, items)
|
||
|
||
return jsonify(data)
|
||
|
||
@app.route('/api/items/<category_id>/<item_id>', methods=['PUT'])
|
||
def api_update_item(category_id, item_id):
|
||
"""更新数据"""
|
||
data = request.get_json()
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
|
||
item = next((i for i in items if i['id'] == item_id), None)
|
||
if not item:
|
||
return jsonify({'error': 'Item not found'}), 404
|
||
|
||
item.update(data)
|
||
item['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
save_data(items_file, items)
|
||
|
||
return jsonify(item)
|
||
|
||
@app.route('/api/items/<category_id>/<item_id>', methods=['DELETE'])
|
||
def api_delete_item(category_id, item_id):
|
||
"""删除数据"""
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
items = [i for i in items if i['id'] != item_id]
|
||
save_data(items_file, items)
|
||
|
||
return jsonify({'success': True})
|
||
|
||
@app.route('/api/items/<category_id>/<item_id>/visible', methods=['POST'])
|
||
def api_toggle_item_visible(category_id, item_id):
|
||
"""切换动态数据显示状态"""
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
item = next((i for i in items if i['id'] == item_id), None)
|
||
if not item:
|
||
return jsonify({'error': 'Item not found'}), 404
|
||
|
||
item['visible'] = not item.get('visible', True)
|
||
save_data(items_file, items)
|
||
|
||
return jsonify({'success': True, 'visible': item['visible']})
|
||
|
||
# ============ 置顶和热度API ============
|
||
|
||
@app.route('/api/models/<model_id>/pin', methods=['POST'])
|
||
def api_toggle_model_pin(model_id):
|
||
"""切换模型置顶状态"""
|
||
models = load_data(MODELS_FILE)
|
||
model = next((m for m in models if m['id'] == model_id), None)
|
||
if not model:
|
||
return jsonify({'error': 'Model not found'}), 404
|
||
|
||
model['is_pinned'] = not model.get('is_pinned', False)
|
||
save_data(MODELS_FILE, models)
|
||
|
||
return jsonify({'success': True, 'is_pinned': model['is_pinned']})
|
||
|
||
@app.route('/api/models/<model_id>/view', methods=['POST'])
|
||
def api_model_view(model_id):
|
||
"""增加模型阅读数"""
|
||
models = load_data(MODELS_FILE)
|
||
model = next((m for m in models if m['id'] == model_id), None)
|
||
if not model:
|
||
return jsonify({'error': 'Model not found'}), 404
|
||
|
||
model['views'] = model.get('views', 0) + 1
|
||
save_data(MODELS_FILE, models)
|
||
|
||
return jsonify({'success': True, 'views': model['views']})
|
||
|
||
@app.route('/api/gpus/<gpu_id>/pin', methods=['POST'])
|
||
def api_toggle_gpu_pin(gpu_id):
|
||
"""切换GPU置顶状态"""
|
||
gpus = load_data(GPUS_FILE)
|
||
gpu = next((g for g in gpus if g['id'] == gpu_id), None)
|
||
if not gpu:
|
||
return jsonify({'error': 'GPU not found'}), 404
|
||
|
||
gpu['is_pinned'] = not gpu.get('is_pinned', False)
|
||
save_data(GPUS_FILE, gpus)
|
||
|
||
return jsonify({'success': True, 'is_pinned': gpu['is_pinned']})
|
||
|
||
@app.route('/api/gpus/<gpu_id>/view', methods=['POST'])
|
||
def api_gpu_view(gpu_id):
|
||
"""增加GPU阅读数"""
|
||
gpus = load_data(GPUS_FILE)
|
||
gpu = next((g for g in gpus if g['id'] == gpu_id), None)
|
||
if not gpu:
|
||
return jsonify({'error': 'GPU not found'}), 404
|
||
|
||
gpu['views'] = gpu.get('views', 0) + 1
|
||
save_data(GPUS_FILE, gpus)
|
||
|
||
return jsonify({'success': True, 'views': gpu['views']})
|
||
|
||
@app.route('/api/cpus/<cpu_id>/pin', methods=['POST'])
|
||
def api_toggle_cpu_pin(cpu_id):
|
||
"""切换CPU置顶状态"""
|
||
cpus = load_data(CPUS_FILE)
|
||
cpu = next((c for c in cpus if c['id'] == cpu_id), None)
|
||
if not cpu:
|
||
return jsonify({'error': 'CPU not found'}), 404
|
||
|
||
cpu['is_pinned'] = not cpu.get('is_pinned', False)
|
||
save_data(CPUS_FILE, cpus)
|
||
|
||
return jsonify({'success': True, 'is_pinned': cpu['is_pinned']})
|
||
|
||
@app.route('/api/cpus/<cpu_id>/view', methods=['POST'])
|
||
def api_cpu_view(cpu_id):
|
||
"""增加CPU阅读数"""
|
||
cpus = load_data(CPUS_FILE)
|
||
cpu = next((c for c in cpus if c['id'] == cpu_id), None)
|
||
if not cpu:
|
||
return jsonify({'error': 'CPU not found'}), 404
|
||
|
||
cpu['views'] = cpu.get('views', 0) + 1
|
||
save_data(CPUS_FILE, cpus)
|
||
|
||
return jsonify({'success': True, 'views': cpu['views']})
|
||
|
||
@app.route('/api/items/<category_id>/<item_id>/pin', methods=['POST'])
|
||
def api_toggle_item_pin(category_id, item_id):
|
||
"""切换动态数据置顶状态"""
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
item = next((i for i in items if i['id'] == item_id), None)
|
||
if not item:
|
||
return jsonify({'error': 'Item not found'}), 404
|
||
|
||
item['is_pinned'] = not item.get('is_pinned', False)
|
||
save_data(items_file, items)
|
||
|
||
return jsonify({'success': True, 'is_pinned': item['is_pinned']})
|
||
|
||
@app.route('/api/items/<category_id>/<item_id>/view', methods=['POST'])
|
||
def api_item_view(category_id, item_id):
|
||
"""增加动态数据阅读数"""
|
||
items_file = DATA_DIR / f'items_{category_id}.json'
|
||
items = load_data(items_file)
|
||
item = next((i for i in items if i['id'] == item_id), None)
|
||
if not item:
|
||
return jsonify({'error': 'Item not found'}), 404
|
||
|
||
item['views'] = item.get('views', 0) + 1
|
||
save_data(items_file, items)
|
||
|
||
return jsonify({'success': True, 'views': item['views']})
|
||
|
||
# ============ 网站配置API ============
|
||
|
||
@app.route('/api/config')
|
||
def api_get_config():
|
||
"""获取网站配置"""
|
||
return jsonify(load_config())
|
||
|
||
@app.route('/api/config', methods=['PUT'])
|
||
def api_update_config():
|
||
"""更新网站配置"""
|
||
data = request.get_json()
|
||
config = load_config()
|
||
config.update(data)
|
||
config['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
save_config(config)
|
||
return jsonify(config)
|
||
|
||
# ============ 图片上传API ============
|
||
|
||
@app.route('/api/upload/image', methods=['POST'])
|
||
def api_upload_image():
|
||
"""上传图片"""
|
||
if 'file' not in request.files:
|
||
return jsonify({'error': '没有文件'}), 400
|
||
|
||
file = request.files['file']
|
||
if file.filename == '':
|
||
return jsonify({'error': '没有选择文件'}), 400
|
||
|
||
if not allowed_file(file.filename):
|
||
return jsonify({'error': '不允许的文件格式'}), 400
|
||
|
||
# 生成唯一文件名
|
||
ext = file.filename.rsplit('.', 1)[1].lower()
|
||
filename = f"{uuid.uuid4().hex[:12]}_{int(time.time())}.{ext}"
|
||
|
||
# 保存文件
|
||
filepath = IMAGES_DIR / filename
|
||
file.save(filepath)
|
||
|
||
# 返回图片URL
|
||
return jsonify({
|
||
'success': True,
|
||
'filename': filename,
|
||
'url': f'/static/uploads/{filename}'
|
||
})
|
||
|
||
@app.route('/api/upload/image/base64', methods=['POST'])
|
||
def api_upload_image_base64():
|
||
"""上传Base64图片"""
|
||
data = request.get_json()
|
||
image_data = data.get('image', '')
|
||
|
||
if not image_data:
|
||
return jsonify({'error': '没有图片数据'}), 400
|
||
|
||
# 解析Base64数据
|
||
try:
|
||
# 移除data:image/xxx;base64,前缀
|
||
if 'base64,' in image_data:
|
||
image_data = image_data.split('base64,')[1]
|
||
|
||
# 生成文件名
|
||
ext = data.get('ext', 'png')
|
||
filename = f"{uuid.uuid4().hex[:12]}_{int(time.time())}.{ext}"
|
||
|
||
# 保存文件
|
||
filepath = IMAGES_DIR / filename
|
||
with open(filepath, 'wb') as f:
|
||
f.write(base64.b64decode(image_data))
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'filename': filename,
|
||
'url': f'/static/uploads/{filename}'
|
||
})
|
||
except Exception as e:
|
||
return jsonify({'error': str(e)}), 400
|
||
|
||
@app.route('/api/upload/image/delete/<filename>', methods=['DELETE'])
|
||
def api_delete_image(filename):
|
||
"""删除图片"""
|
||
filepath = IMAGES_DIR / filename
|
||
if filepath.exists():
|
||
filepath.unlink()
|
||
return jsonify({'success': True})
|
||
return jsonify({'error': '文件不存在'}), 404
|
||
|
||
if __name__ == '__main__':
|
||
print("=" * 50)
|
||
print("ParamHub - 参数百科 v1.7.1")
|
||
print("=" * 50)
|
||
print(f"访问地址: http://localhost:19010")
|
||
print(f"后台管理: http://localhost:19010/admin")
|
||
print("=" * 50)
|
||
|
||
app.run(host='0.0.0.0', port=19010, debug=True) |