docs: 更新PROJECTS.md添加技术论坛项目记录
This commit is contained in:
362
works/mailbox/mailbox_manager.py
Normal file
362
works/mailbox/mailbox_manager.py
Normal file
@@ -0,0 +1,362 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
邮件收件箱管理工具
|
||||
管理已读邮件内容和处理状态
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# 路径配置
|
||||
SCRIPT_DIR = Path(__file__).parent
|
||||
MAILBOX_DIR = SCRIPT_DIR
|
||||
INBOX_FILE = MAILBOX_DIR / "inbox.json"
|
||||
EMAILS_DIR = MAILBOX_DIR / "emails"
|
||||
ARCHIVE_DIR = MAILBOX_DIR / "archive"
|
||||
|
||||
# 状态定义
|
||||
STATUS_UNREAD = "unread"
|
||||
STATUS_READ = "read"
|
||||
STATUS_PENDING = "pending" # 待处理
|
||||
STATUS_PROCESSING = "processing" # 处理中
|
||||
STATUS_DONE = "done" # 已完成
|
||||
STATUS_ARCHIVED = "archived" # 已归档
|
||||
|
||||
|
||||
def load_inbox():
|
||||
"""加载收件箱状态"""
|
||||
if not INBOX_FILE.exists():
|
||||
return {"description": "邮件收件箱状态追踪", "last_check": None, "emails": {}}
|
||||
with open(INBOX_FILE, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def save_inbox(data):
|
||||
"""保存收件箱状态"""
|
||||
with open(INBOX_FILE, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
def add_email(email_id, subject, sender, date, has_attachment=False, body="", status=STATUS_UNREAD):
|
||||
"""添加或更新邮件记录"""
|
||||
inbox = load_inbox()
|
||||
|
||||
email_dir = EMAILS_DIR / str(email_id)
|
||||
email_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 保存邮件内容
|
||||
meta = {
|
||||
"id": email_id,
|
||||
"subject": subject,
|
||||
"sender": sender,
|
||||
"date": date,
|
||||
"has_attachment": has_attachment,
|
||||
"status": status,
|
||||
"created_at": datetime.now().isoformat(),
|
||||
"updated_at": datetime.now().isoformat(),
|
||||
"notes": ""
|
||||
}
|
||||
|
||||
with open(email_dir / "meta.json", "w", encoding="utf-8") as f:
|
||||
json.dump(meta, f, ensure_ascii=False, indent=2)
|
||||
|
||||
if body:
|
||||
with open(email_dir / "body.txt", "w", encoding="utf-8") as f:
|
||||
f.write(body)
|
||||
|
||||
# 更新收件箱索引
|
||||
inbox["emails"][str(email_id)] = {
|
||||
"subject": subject,
|
||||
"sender": sender,
|
||||
"date": date,
|
||||
"status": status,
|
||||
"has_attachment": has_attachment,
|
||||
"path": str(email_dir)
|
||||
}
|
||||
|
||||
save_inbox(inbox)
|
||||
print(f"✅ 邮件 [{email_id}] 已添加到收件箱")
|
||||
|
||||
|
||||
def update_status(email_id, status, notes=None):
|
||||
"""更新邮件状态"""
|
||||
inbox = load_inbox()
|
||||
email_key = str(email_id)
|
||||
|
||||
if email_key not in inbox["emails"]:
|
||||
print(f"❌ 邮件 [{email_id}] 不存在")
|
||||
return False
|
||||
|
||||
# 更新索引
|
||||
inbox["emails"][email_key]["status"] = status
|
||||
|
||||
# 更新邮件详情
|
||||
email_dir = Path(inbox["emails"][email_key]["path"])
|
||||
meta_file = email_dir / "meta.json"
|
||||
|
||||
if meta_file.exists():
|
||||
with open(meta_file, "r", encoding="utf-8") as f:
|
||||
meta = json.load(f)
|
||||
|
||||
meta["status"] = status
|
||||
meta["updated_at"] = datetime.now().isoformat()
|
||||
|
||||
if notes:
|
||||
meta["notes"] = notes
|
||||
|
||||
with open(meta_file, "w", encoding="utf-8") as f:
|
||||
json.dump(meta, f, ensure_ascii=False, indent=2)
|
||||
|
||||
save_inbox(inbox)
|
||||
|
||||
status_emoji = {
|
||||
STATUS_UNREAD: "📬",
|
||||
STATUS_READ: "📖",
|
||||
STATUS_PENDING: "⏳",
|
||||
STATUS_PROCESSING: "🔄",
|
||||
STATUS_DONE: "✅",
|
||||
STATUS_ARCHIVED: "📦"
|
||||
}
|
||||
|
||||
print(f"{status_emoji.get(status, '📧')} 邮件 [{email_id}] 状态更新为: {status}")
|
||||
return True
|
||||
|
||||
|
||||
def list_emails(status_filter=None):
|
||||
"""列出邮件"""
|
||||
inbox = load_inbox()
|
||||
|
||||
if not inbox["emails"]:
|
||||
print("📭 收件箱为空")
|
||||
return
|
||||
|
||||
status_emoji = {
|
||||
STATUS_UNREAD: "📬",
|
||||
STATUS_READ: "📖",
|
||||
STATUS_PENDING: "⏳",
|
||||
STATUS_PROCESSING: "🔄",
|
||||
STATUS_DONE: "✅",
|
||||
STATUS_ARCHIVED: "📦"
|
||||
}
|
||||
|
||||
print("📬 邮件收件箱")
|
||||
print("=" * 60)
|
||||
|
||||
count = 0
|
||||
for email_id, info in sorted(inbox["emails"].items(), key=lambda x: x[0], reverse=True):
|
||||
if status_filter and info["status"] != status_filter:
|
||||
continue
|
||||
|
||||
emoji = status_emoji.get(info["status"], "📧")
|
||||
attach = "📎" if info.get("has_attachment") else ""
|
||||
|
||||
print(f"\n[{email_id}] {emoji} {attach}")
|
||||
print(f" 主题: {info['subject']}")
|
||||
print(f" 发件人: {info['sender']}")
|
||||
print(f" 时间: {info['date']}")
|
||||
print(f" 状态: {info['status']}")
|
||||
count += 1
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print(f"共 {count} 封邮件")
|
||||
|
||||
|
||||
def show_email(email_id):
|
||||
"""显示邮件详情"""
|
||||
inbox = load_inbox()
|
||||
email_key = str(email_id)
|
||||
|
||||
if email_key not in inbox["emails"]:
|
||||
print(f"❌ 邮件 [{email_id}] 不存在")
|
||||
return
|
||||
|
||||
email_dir = Path(inbox["emails"][email_key]["path"])
|
||||
meta_file = email_dir / "meta.json"
|
||||
body_file = email_dir / "body.txt"
|
||||
|
||||
print(f"📧 邮件详情 [{email_id}]")
|
||||
print("=" * 60)
|
||||
|
||||
if meta_file.exists():
|
||||
with open(meta_file, "r", encoding="utf-8") as f:
|
||||
meta = json.load(f)
|
||||
|
||||
print(f"主题: {meta.get('subject', 'N/A')}")
|
||||
print(f"发件人: {meta.get('sender', 'N/A')}")
|
||||
print(f"时间: {meta.get('date', 'N/A')}")
|
||||
print(f"状态: {meta.get('status', 'N/A')}")
|
||||
print(f"更新时间: {meta.get('updated_at', 'N/A')}")
|
||||
|
||||
if meta.get('notes'):
|
||||
print(f"\n备注: {meta['notes']}")
|
||||
|
||||
if body_file.exists():
|
||||
print("\n正文:")
|
||||
print("-" * 40)
|
||||
with open(body_file, "r", encoding="utf-8") as f:
|
||||
print(f.read())
|
||||
|
||||
# 列出附件
|
||||
attach_dir = email_dir / "attachments"
|
||||
if attach_dir.exists() and list(attach_dir.iterdir()):
|
||||
print("\n附件:")
|
||||
print("-" * 40)
|
||||
for f in attach_dir.iterdir():
|
||||
print(f" 📎 {f.name}")
|
||||
|
||||
|
||||
def archive_email(email_id):
|
||||
"""归档邮件"""
|
||||
inbox = load_inbox()
|
||||
email_key = str(email_id)
|
||||
|
||||
if email_key not in inbox["emails"]:
|
||||
print(f"❌ 邮件 [{email_id}] 不存在")
|
||||
return False
|
||||
|
||||
email_dir = Path(inbox["emails"][email_key]["path"])
|
||||
|
||||
# 移动到归档目录
|
||||
archive_path = ARCHIVE_DIR / str(email_id)
|
||||
if email_dir.exists():
|
||||
import shutil
|
||||
shutil.move(str(email_dir), str(archive_path))
|
||||
|
||||
# 更新索引
|
||||
inbox["emails"][email_key]["status"] = STATUS_ARCHIVED
|
||||
inbox["emails"][email_key]["path"] = str(archive_path)
|
||||
save_inbox(inbox)
|
||||
|
||||
print(f"📦 邮件 [{email_id}] 已归档")
|
||||
return True
|
||||
|
||||
|
||||
def sync_from_imap():
|
||||
"""从 IMAP 同步未读邮件"""
|
||||
try:
|
||||
# 导入 receive_email 模块
|
||||
email_scripts_dir = Path(__file__).parent.parent.parent / "skills" / "email" / "scripts"
|
||||
sys.path.insert(0, str(email_scripts_dir))
|
||||
from receive_email import list_unread, read_email
|
||||
|
||||
print("🔄 正在同步邮件...")
|
||||
|
||||
emails = list_unread(limit=20, verbose=False)
|
||||
|
||||
if not emails:
|
||||
print("📭 没有新邮件")
|
||||
return
|
||||
|
||||
inbox = load_inbox()
|
||||
new_count = 0
|
||||
|
||||
for e in emails:
|
||||
email_id = str(e['id'])
|
||||
|
||||
# 跳过已存在的邮件
|
||||
if email_id in inbox["emails"]:
|
||||
continue
|
||||
|
||||
# 读取邮件详情
|
||||
detail = read_email(email_id=email_id, save_attachments=True, verbose=False)
|
||||
|
||||
if detail:
|
||||
add_email(
|
||||
email_id=email_id,
|
||||
subject=detail.get('subject', ''),
|
||||
sender=detail.get('from', ''),
|
||||
date=detail.get('date', ''),
|
||||
has_attachment=len(detail.get('attachments', [])) > 0,
|
||||
body=detail.get('body', ''),
|
||||
status=STATUS_UNREAD
|
||||
)
|
||||
new_count += 1
|
||||
|
||||
# 更新最后检查时间
|
||||
inbox["last_check"] = datetime.now().isoformat()
|
||||
save_inbox(inbox)
|
||||
|
||||
print(f"\n✅ 同步完成,新增 {new_count} 封邮件")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 同步失败: {e}")
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("邮件收件箱管理工具")
|
||||
print()
|
||||
print("用法:")
|
||||
print(" python3 mailbox_manager.py sync # 从IMAP同步未读邮件")
|
||||
print(" python3 mailbox_manager.py list [status] # 列出邮件(可按状态筛选)")
|
||||
print(" python3 mailbox_manager.py show <id> # 显示邮件详情")
|
||||
print(" python3 mailbox_manager.py read <id> # 标记为已读")
|
||||
print(" python3 mailbox_manager.py pending <id> # 标记为待处理")
|
||||
print(" python3 mailbox_manager.py processing <id> # 标记为处理中")
|
||||
print(" python3 mailbox_manager.py done <id> [notes] # 标记为已完成")
|
||||
print(" python3 mailbox_manager.py archive <id> # 归档邮件")
|
||||
print()
|
||||
print("状态说明:")
|
||||
print(" unread - 未读")
|
||||
print(" read - 已读")
|
||||
print(" pending - 待处理")
|
||||
print(" processing - 处理中")
|
||||
print(" done - 已完成")
|
||||
print(" archived - 已归档")
|
||||
return
|
||||
|
||||
command = sys.argv[1]
|
||||
|
||||
if command == "sync":
|
||||
sync_from_imap()
|
||||
|
||||
elif command == "list":
|
||||
status = sys.argv[2] if len(sys.argv) > 2 else None
|
||||
list_emails(status)
|
||||
|
||||
elif command == "show":
|
||||
if len(sys.argv) < 3:
|
||||
print("❌ 请指定邮件ID")
|
||||
return
|
||||
show_email(sys.argv[2])
|
||||
|
||||
elif command == "read":
|
||||
if len(sys.argv) < 3:
|
||||
print("❌ 请指定邮件ID")
|
||||
return
|
||||
update_status(sys.argv[2], STATUS_READ)
|
||||
|
||||
elif command == "pending":
|
||||
if len(sys.argv) < 3:
|
||||
print("❌ 请指定邮件ID")
|
||||
return
|
||||
update_status(sys.argv[2], STATUS_PENDING)
|
||||
|
||||
elif command == "processing":
|
||||
if len(sys.argv) < 3:
|
||||
print("❌ 请指定邮件ID")
|
||||
return
|
||||
update_status(sys.argv[2], STATUS_PROCESSING)
|
||||
|
||||
elif command == "done":
|
||||
if len(sys.argv) < 3:
|
||||
print("❌ 请指定邮件ID")
|
||||
return
|
||||
notes = " ".join(sys.argv[3:]) if len(sys.argv) > 3 else None
|
||||
update_status(sys.argv[2], STATUS_DONE, notes)
|
||||
|
||||
elif command == "archive":
|
||||
if len(sys.argv) < 3:
|
||||
print("❌ 请指定邮件ID")
|
||||
return
|
||||
archive_email(sys.argv[2])
|
||||
|
||||
else:
|
||||
print(f"❌ 未知命令: {command}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user