# Source file: ai-chat-app/works/mailbox/mailbox_manager.py
# (file-viewer header: 362 lines, 11 KiB, Python)

#!/usr/bin/env python3
"""
邮件收件箱管理工具
管理已读邮件内容和处理状态
"""
import json
import os
import sys
from datetime import datetime
from pathlib import Path
# Path configuration: all mailbox data is stored next to this script.
SCRIPT_DIR = Path(__file__).parent
MAILBOX_DIR = SCRIPT_DIR
INBOX_FILE = MAILBOX_DIR / "inbox.json"  # JSON index of tracked emails
EMAILS_DIR = MAILBOX_DIR / "emails"  # one sub-directory per email id
ARCHIVE_DIR = MAILBOX_DIR / "archive"  # destination for archived email directories
# Status values stored in the inbox index and in each email's meta.json.
STATUS_UNREAD = "unread"
STATUS_READ = "read"
STATUS_PENDING = "pending"  # waiting to be handled
STATUS_PROCESSING = "processing"  # currently being handled
STATUS_DONE = "done"  # finished
STATUS_ARCHIVED = "archived"  # moved to the archive
def load_inbox():
    """Return the inbox index as a dict.

    Reads and parses ``INBOX_FILE`` when it exists; otherwise returns a fresh
    skeleton with an empty ``emails`` mapping and ``last_check`` set to None.
    """
    if INBOX_FILE.exists():
        with INBOX_FILE.open("r", encoding="utf-8") as handle:
            return json.load(handle)
    # No index on disk yet: start from an empty structure.
    return {"description": "邮件收件箱状态追踪", "last_check": None, "emails": {}}
def save_inbox(data):
    """Write the inbox index *data* to ``INBOX_FILE`` as indented UTF-8 JSON.

    Non-ASCII characters are written as-is rather than escaped.
    """
    with INBOX_FILE.open("w", encoding="utf-8") as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
def add_email(email_id, subject, sender, date, has_attachment=False, body="", status=STATUS_UNREAD):
    """Add an email to the mailbox, or refresh the record of an existing one.

    Writes ``meta.json`` (and ``body.txt`` when *body* is non-empty) under
    ``EMAILS_DIR/<email_id>`` and stores a summary entry in the inbox index.
    When a readable ``meta.json`` already exists for this id, its
    ``created_at`` and ``notes`` values are carried over instead of being
    reset, so updating a record does not lose its history.

    Args:
        email_id: Identifier of the email; stringified for paths and index keys.
        subject: Subject line.
        sender: Sender string.
        date: Date string as reported by the mail source.
        has_attachment: Whether the email has attachments.
        body: Plain-text body; nothing is written when it is empty.
        status: Initial/updated status value (one of the ``STATUS_*`` constants).
    """
    inbox = load_inbox()
    email_dir = EMAILS_DIR / str(email_id)
    email_dir.mkdir(parents=True, exist_ok=True)
    meta_file = email_dir / "meta.json"

    # Use a single timestamp so created_at == updated_at on a fresh record.
    now = datetime.now().isoformat()

    # Carry over fields from an existing record; an unreadable or malformed
    # meta.json is treated as if no previous record existed.
    previous = {}
    if meta_file.exists():
        try:
            with open(meta_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            loaded = None
        if isinstance(loaded, dict):
            previous = loaded

    # Persist the per-email metadata.
    meta = {
        "id": email_id,
        "subject": subject,
        "sender": sender,
        "date": date,
        "has_attachment": has_attachment,
        "status": status,
        "created_at": previous.get("created_at") or now,
        "updated_at": now,
        "notes": previous.get("notes", ""),
    }
    with open(meta_file, "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)

    if body:
        with open(email_dir / "body.txt", "w", encoding="utf-8") as f:
            f.write(body)

    # Update the inbox index (tolerating an index file without "emails").
    inbox.setdefault("emails", {})[str(email_id)] = {
        "subject": subject,
        "sender": sender,
        "date": date,
        "status": status,
        "has_attachment": has_attachment,
        "path": str(email_dir),
    }
    save_inbox(inbox)
    print(f"✅ 邮件 [{email_id}] 已添加到收件箱")
def update_status(email_id, status, notes=None):
    """Set the status of an email in both the inbox index and its meta.json.

    Args:
        email_id: Identifier of the email; stringified for the index lookup.
        status: New status value.
        notes: Optional notes; stored in meta.json only when truthy.

    Returns:
        True when the email was found and updated, False when the id is not
        present in the inbox index.
    """
    index = load_inbox()
    key = str(email_id)
    if key not in index["emails"]:
        print(f"❌ 邮件 [{email_id}] 不存在")
        return False

    # Index entry first, then the detailed metadata file (if it is there).
    entry = index["emails"][key]
    entry["status"] = status

    meta_path = Path(entry["path"]) / "meta.json"
    if meta_path.exists():
        with meta_path.open("r", encoding="utf-8") as src:
            details = json.load(src)
        details["status"] = status
        details["updated_at"] = datetime.now().isoformat()
        if notes:
            details["notes"] = notes
        with meta_path.open("w", encoding="utf-8") as dst:
            json.dump(details, dst, ensure_ascii=False, indent=2)

    save_inbox(index)

    # Icon shown in the confirmation line; pending and done map to an empty
    # string in this table, unknown statuses fall back to the envelope icon.
    icons = {
        STATUS_UNREAD: "📬",
        STATUS_READ: "📖",
        STATUS_PENDING: "",
        STATUS_PROCESSING: "🔄",
        STATUS_DONE: "",
        STATUS_ARCHIVED: "📦",
    }
    print(f"{icons.get(status, '📧')} 邮件 [{email_id}] 状态更新为: {status}")
    return True
def list_emails(status_filter=None):
    """Print a summary of the emails in the inbox index, highest id first.

    Ids that parse as integers are ordered numerically (so 10 is listed
    before 9); any other ids follow, in descending string order. The previous
    plain string sort misordered numeric ids of different lengths.

    Args:
        status_filter: When truthy, only emails whose status equals this
            value are listed.
    """
    inbox = load_inbox()
    if not inbox["emails"]:
        print("📭 收件箱为空")
        return

    status_emoji = {
        STATUS_UNREAD: "📬",
        STATUS_READ: "📖",
        STATUS_PENDING: "",
        STATUS_PROCESSING: "🔄",
        STATUS_DONE: "",
        STATUS_ARCHIVED: "📦",
    }

    def sort_key(item):
        """Order numeric ids by value and keep non-numeric ids comparable."""
        email_id = item[0]
        try:
            return (1, int(email_id), email_id)
        except ValueError:
            return (0, 0, email_id)

    print("📬 邮件收件箱")
    print("=" * 60)
    count = 0
    for email_id, info in sorted(inbox["emails"].items(), key=sort_key, reverse=True):
        if status_filter and info["status"] != status_filter:
            continue
        emoji = status_emoji.get(info["status"], "📧")
        attach = "📎" if info.get("has_attachment") else ""
        print(f"\n[{email_id}] {emoji} {attach}")
        print(f" 主题: {info['subject']}")
        print(f" 发件人: {info['sender']}")
        print(f" 时间: {info['date']}")
        print(f" 状态: {info['status']}")
        count += 1
    print("\n" + "=" * 60)
    print(f"{count} 封邮件")
def show_email(email_id):
    """Print the stored details of one email.

    Shows the metadata from meta.json (including notes when present), the
    body from body.txt, and the names of any files in the ``attachments``
    sub-directory. Prints an error line and returns when the id is not in
    the inbox index.
    """
    index = load_inbox()
    key = str(email_id)
    if key not in index["emails"]:
        print(f"❌ 邮件 [{email_id}] 不存在")
        return

    base_dir = Path(index["emails"][key]["path"])
    meta_path = base_dir / "meta.json"
    body_path = base_dir / "body.txt"

    print(f"📧 邮件详情 [{email_id}]")
    print("=" * 60)

    if meta_path.exists():
        with meta_path.open("r", encoding="utf-8") as handle:
            meta = json.load(handle)
        print(f"主题: {meta.get('subject', 'N/A')}")
        print(f"发件人: {meta.get('sender', 'N/A')}")
        print(f"时间: {meta.get('date', 'N/A')}")
        print(f"状态: {meta.get('status', 'N/A')}")
        print(f"更新时间: {meta.get('updated_at', 'N/A')}")
        if meta.get('notes'):
            print(f"\n备注: {meta['notes']}")

    if body_path.exists():
        print("\n正文:")
        print("-" * 40)
        print(body_path.read_text(encoding="utf-8"))

    # Attachment listing: only shown when the directory exists and is non-empty.
    attach_dir = base_dir / "attachments"
    attachments = list(attach_dir.iterdir()) if attach_dir.exists() else []
    if attachments:
        print("\n附件:")
        print("-" * 40)
        for item in attachments:
            print(f" 📎 {item.name}")
def archive_email(email_id):
    """Move an email's directory into the archive and mark it as archived.

    The email directory recorded in the inbox index is moved to
    ``ARCHIVE_DIR/<email_id>``, the index entry is updated with the new path
    and the archived status, and the status stored in the moved ``meta.json``
    is updated as well so that the index and the per-email metadata agree.

    Args:
        email_id: Identifier of the email; stringified for the index lookup.

    Returns:
        True when the email was found and archived, False when the id is not
        present in the inbox index.
    """
    import shutil

    inbox = load_inbox()
    email_key = str(email_id)
    if email_key not in inbox["emails"]:
        print(f"❌ 邮件 [{email_id}] 不存在")
        return False

    email_dir = Path(inbox["emails"][email_key]["path"])
    archive_path = ARCHIVE_DIR / str(email_id)

    # Move the email directory into the archive, creating the archive root
    # explicitly rather than relying on shutil.move's copy fallback.
    if email_dir.exists():
        ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
        shutil.move(str(email_dir), str(archive_path))

    # Update and persist the index before touching meta.json, so the index
    # already points at the new location if the metadata update fails.
    inbox["emails"][email_key]["status"] = STATUS_ARCHIVED
    inbox["emails"][email_key]["path"] = str(archive_path)
    save_inbox(inbox)

    # Keep the per-email metadata in step with the index; otherwise the
    # details view would keep reporting the pre-archive status.
    meta_file = archive_path / "meta.json"
    if meta_file.exists():
        with open(meta_file, "r", encoding="utf-8") as f:
            meta = json.load(f)
        meta["status"] = STATUS_ARCHIVED
        meta["updated_at"] = datetime.now().isoformat()
        with open(meta_file, "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

    print(f"📦 邮件 [{email_id}] 已归档")
    return True
def sync_from_imap():
    """Fetch unread emails via the ``receive_email`` helper and add new ones.

    Imports ``list_unread`` and ``read_email`` from the email skill's scripts
    directory, lists up to 20 unread emails, and calls ``add_email`` for each
    one whose id is not yet in the inbox index. Afterwards ``last_check`` is
    stamped on a freshly re-loaded index: ``add_email`` persists the index
    itself, so saving the snapshot taken before the loop would discard every
    email that was just added.

    Any exception is reported on stdout instead of propagating, since this is
    the top-level boundary of the ``sync`` command.
    """
    try:
        # Make the receive_email module importable (once per process).
        email_scripts_dir = Path(__file__).parent.parent.parent / "skills" / "email" / "scripts"
        if str(email_scripts_dir) not in sys.path:
            sys.path.insert(0, str(email_scripts_dir))
        from receive_email import list_unread, read_email

        print("🔄 正在同步邮件...")
        emails = list_unread(limit=20, verbose=False)
        if not emails:
            print("📭 没有新邮件")
            return

        # Only the set of known ids is needed here; the index itself is
        # re-read after the loop.
        known_ids = set(load_inbox()["emails"])
        new_count = 0
        for summary in emails:
            email_id = str(summary['id'])
            # Skip emails that are already tracked.
            if email_id in known_ids:
                continue
            # Fetch the full message (attachments are saved by the helper).
            detail = read_email(email_id=email_id, save_attachments=True, verbose=False)
            if detail:
                add_email(
                    email_id=email_id,
                    subject=detail.get('subject', ''),
                    sender=detail.get('from', ''),
                    date=detail.get('date', ''),
                    has_attachment=bool(detail.get('attachments')),
                    body=detail.get('body', ''),
                    status=STATUS_UNREAD,
                )
                known_ids.add(email_id)
                new_count += 1

        # Re-load so the entries written by add_email are preserved, then
        # record the time of this check.
        inbox = load_inbox()
        inbox["last_check"] = datetime.now().isoformat()
        save_inbox(inbox)
        print(f"\n✅ 同步完成,新增 {new_count} 封邮件")
    except Exception as e:
        print(f"❌ 同步失败: {e}")
def main():
    """Command-line entry point: dispatch ``sys.argv`` to the mailbox actions.

    With no command, prints the usage text. ``sync`` and ``list`` take no
    required argument; every other known command requires an email id as the
    next argument. ``done`` additionally joins any remaining arguments into a
    notes string.
    """
    argv = sys.argv

    if len(argv) < 2:
        usage_lines = (
            "邮件收件箱管理工具",
            "",
            "用法:",
            " python3 mailbox_manager.py sync # 从IMAP同步未读邮件",
            " python3 mailbox_manager.py list [status] # 列出邮件(可按状态筛选)",
            " python3 mailbox_manager.py show <id> # 显示邮件详情",
            " python3 mailbox_manager.py read <id> # 标记为已读",
            " python3 mailbox_manager.py pending <id> # 标记为待处理",
            " python3 mailbox_manager.py processing <id> # 标记为处理中",
            " python3 mailbox_manager.py done <id> [notes] # 标记为已完成",
            " python3 mailbox_manager.py archive <id> # 归档邮件",
            "",
            "状态说明:",
            " unread - 未读",
            " read - 已读",
            " pending - 待处理",
            " processing - 处理中",
            " done - 已完成",
            " archived - 已归档",
        )
        for line in usage_lines:
            print(line)
        return

    command = argv[1]

    # Commands that do not need an email id.
    if command == "sync":
        sync_from_imap()
        return
    if command == "list":
        list_emails(argv[2] if len(argv) > 2 else None)
        return

    # Commands that simply set a status on the given email.
    status_by_command = {
        "read": STATUS_READ,
        "pending": STATUS_PENDING,
        "processing": STATUS_PROCESSING,
        "done": STATUS_DONE,
    }
    if command not in ("show", "archive") and command not in status_by_command:
        print(f"❌ 未知命令: {command}")
        return

    # Everything from here on requires an email id.
    if len(argv) < 3:
        print("❌ 请指定邮件ID")
        return
    email_id = argv[2]

    if command == "show":
        show_email(email_id)
    elif command == "archive":
        archive_email(email_id)
    else:
        # Only "done" accepts free-form notes after the id.
        notes = None
        if command == "done" and len(argv) > 3:
            notes = " ".join(argv[3:])
        update_status(email_id, status_by_command[command], notes)


if __name__ == "__main__":
    main()