#!/usr/bin/env python3
"""
Mailbox inbox management tool.

Keeps a JSON index (inbox.json) of emails plus one directory per email
holding its metadata and body, and tracks each email's processing status.
"""

import json
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path

# Path configuration
SCRIPT_DIR = Path(__file__).parent
MAILBOX_DIR = SCRIPT_DIR
INBOX_FILE = MAILBOX_DIR / "inbox.json"
EMAILS_DIR = MAILBOX_DIR / "emails"
ARCHIVE_DIR = MAILBOX_DIR / "archive"

# Status definitions
STATUS_UNREAD = "unread"
STATUS_READ = "read"
STATUS_PENDING = "pending"  # waiting to be handled
STATUS_PROCESSING = "processing"  # currently being handled
STATUS_DONE = "done"  # finished
STATUS_ARCHIVED = "archived"  # moved to the archive directory

# Emoji shown next to each status; unknown statuses use _DEFAULT_EMOJI.
_STATUS_EMOJI = {
    STATUS_UNREAD: "📬",
    STATUS_READ: "📖",
    STATUS_PENDING: "⏳",
    STATUS_PROCESSING: "🔄",
    STATUS_DONE: "✅",
    STATUS_ARCHIVED: "📦",
}
_DEFAULT_EMOJI = "📧"

# CLI commands that only set a status on an existing email.
_STATUS_COMMANDS = {
    "read": STATUS_READ,
    "pending": STATUS_PENDING,
    "processing": STATUS_PROCESSING,
    "done": STATUS_DONE,
}


def load_inbox() -> dict:
    """Load the inbox index.

    Returns:
        The parsed content of INBOX_FILE, or a fresh empty index when the
        file does not exist. The returned dict always has an "emails" key.
    """
    if not INBOX_FILE.exists():
        return {"description": "邮件收件箱状态追踪", "last_check": None, "emails": {}}
    with open(INBOX_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)
    # Tolerate an index file that was written without the "emails" mapping.
    data.setdefault("emails", {})
    return data


def save_inbox(data) -> None:
    """Write the inbox index *data* to INBOX_FILE as UTF-8 JSON."""
    with open(INBOX_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def _update_meta(meta_file, status, notes=None) -> None:
    """Set status/updated_at (and notes, when given) in an email's meta.json.

    Does nothing when *meta_file* does not exist.
    """
    if not meta_file.exists():
        return
    with open(meta_file, "r", encoding="utf-8") as f:
        meta = json.load(f)
    meta["status"] = status
    meta["updated_at"] = datetime.now().isoformat()
    if notes:
        meta["notes"] = notes
    with open(meta_file, "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)


def _email_sort_key(item):
    """Sort key for (email_id, info) pairs.

    Numeric IDs compare by value so that "10" ranks above "9"; non-numeric
    IDs are grouped separately and compared as strings.
    """
    email_id = item[0]
    if email_id.isdecimal():
        return (1, int(email_id), email_id)
    return (0, 0, email_id)


def add_email(email_id, subject, sender, date, has_attachment=False, body="",
              status=STATUS_UNREAD) -> None:
    """Add an email record, or overwrite the record with the same ID.

    Writes EMAILS_DIR/<email_id>/meta.json (and body.txt when *body* is
    non-empty), then stores a summary entry in the inbox index.
    """
    inbox = load_inbox()

    email_dir = EMAILS_DIR / str(email_id)
    email_dir.mkdir(parents=True, exist_ok=True)

    # Save the email content. One timestamp is shared so that created_at and
    # updated_at are identical for a freshly written record.
    now = datetime.now().isoformat()
    meta = {
        "id": email_id,
        "subject": subject,
        "sender": sender,
        "date": date,
        "has_attachment": has_attachment,
        "status": status,
        "created_at": now,
        "updated_at": now,
        "notes": "",
    }
    with open(email_dir / "meta.json", "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)

    if body:
        with open(email_dir / "body.txt", "w", encoding="utf-8") as f:
            f.write(body)

    # Update the inbox index
    inbox["emails"][str(email_id)] = {
        "subject": subject,
        "sender": sender,
        "date": date,
        "status": status,
        "has_attachment": has_attachment,
        "path": str(email_dir),
    }
    save_inbox(inbox)
    print(f"✅ 邮件 [{email_id}] 已添加到收件箱")


def update_status(email_id, status, notes=None) -> bool:
    """Update an email's status in both the index and its meta.json.

    Args:
        email_id: ID of the email; converted to str for the index lookup.
        status: New status value.
        notes: Optional note stored in meta.json; ignored when empty.

    Returns:
        True on success, False when the email is not in the index.
    """
    inbox = load_inbox()
    email_key = str(email_id)

    if email_key not in inbox["emails"]:
        print(f"❌ 邮件 [{email_id}] 不存在")
        return False

    # Update the index entry
    entry = inbox["emails"][email_key]
    entry["status"] = status

    # Update the per-email details
    _update_meta(Path(entry["path"]) / "meta.json", status, notes)

    save_inbox(inbox)

    emoji = _STATUS_EMOJI.get(status, _DEFAULT_EMOJI)
    print(f"{emoji} 邮件 [{email_id}] 状态更新为: {status}")
    return True


def list_emails(status_filter=None) -> None:
    """Print the emails in the index, highest ID first.

    Args:
        status_filter: When truthy, only emails with exactly this status
            are printed.
    """
    inbox = load_inbox()

    if not inbox["emails"]:
        print("📭 收件箱为空")
        return

    print("📬 邮件收件箱")
    print("=" * 60)

    count = 0
    ordered = sorted(inbox["emails"].items(), key=_email_sort_key, reverse=True)
    for email_id, info in ordered:
        if status_filter and info["status"] != status_filter:
            continue

        emoji = _STATUS_EMOJI.get(info["status"], _DEFAULT_EMOJI)
        attach = "📎" if info.get("has_attachment") else ""

        print(f"\n[{email_id}] {emoji} {attach}")
        print(f" 主题: {info['subject']}")
        print(f" 发件人: {info['sender']}")
        print(f" 时间: {info['date']}")
        print(f" 状态: {info['status']}")
        count += 1

    print("\n" + "=" * 60)
    print(f"共 {count} 封邮件")


def show_email(email_id) -> None:
    """Print the metadata, body and attachment names of one email."""
    inbox = load_inbox()
    email_key = str(email_id)

    if email_key not in inbox["emails"]:
        print(f"❌ 邮件 [{email_id}] 不存在")
        return

    email_dir = Path(inbox["emails"][email_key]["path"])
    meta_file = email_dir / "meta.json"
    body_file = email_dir / "body.txt"

    print(f"📧 邮件详情 [{email_id}]")
    print("=" * 60)

    if meta_file.exists():
        with open(meta_file, "r", encoding="utf-8") as f:
            meta = json.load(f)
        print(f"主题: {meta.get('subject', 'N/A')}")
        print(f"发件人: {meta.get('sender', 'N/A')}")
        print(f"时间: {meta.get('date', 'N/A')}")
        print(f"状态: {meta.get('status', 'N/A')}")
        print(f"更新时间: {meta.get('updated_at', 'N/A')}")
        if meta.get('notes'):
            print(f"\n备注: {meta['notes']}")

    if body_file.exists():
        print("\n正文:")
        print("-" * 40)
        with open(body_file, "r", encoding="utf-8") as f:
            print(f.read())

    # List attachments. The directory is read once and sorted so the output
    # order does not depend on the filesystem.
    attach_dir = email_dir / "attachments"
    attachments = sorted(attach_dir.iterdir()) if attach_dir.is_dir() else []
    if attachments:
        print("\n附件:")
        print("-" * 40)
        for attachment in attachments:
            print(f" 📎 {attachment.name}")


def archive_email(email_id) -> bool:
    """Move an email's directory under ARCHIVE_DIR and mark it archived.

    Returns:
        True on success, False when the email is not in the index.
    """
    inbox = load_inbox()
    email_key = str(email_id)

    if email_key not in inbox["emails"]:
        print(f"❌ 邮件 [{email_id}] 不存在")
        return False

    email_dir = Path(inbox["emails"][email_key]["path"])

    # Move to the archive directory. Skip the move when the email already
    # lives at its archive location (archiving the same email twice).
    archive_path = ARCHIVE_DIR / str(email_id)
    if email_dir.exists() and email_dir != archive_path:
        ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
        shutil.move(str(email_dir), str(archive_path))

    # Keep meta.json in step with the index; show_email reads the status
    # from meta.json and would otherwise report the pre-archive status.
    _update_meta(archive_path / "meta.json", STATUS_ARCHIVED)

    # Update the index
    inbox["emails"][email_key]["status"] = STATUS_ARCHIVED
    inbox["emails"][email_key]["path"] = str(archive_path)
    save_inbox(inbox)

    print(f"📦 邮件 [{email_id}] 已归档")
    return True


def sync_from_imap() -> None:
    """Fetch unread emails via the receive_email module and add new ones.

    Any failure (including the module not being importable) is reported on
    stdout instead of being raised.
    """
    try:
        # Import the receive_email module from the email skill's scripts
        email_scripts_dir = Path(__file__).parent.parent.parent / "skills" / "email" / "scripts"
        scripts_path = str(email_scripts_dir)
        if scripts_path not in sys.path:
            sys.path.insert(0, scripts_path)
        from receive_email import list_unread, read_email

        print("🔄 正在同步邮件...")
        emails = list_unread(limit=20, verbose=False)

        if not emails:
            print("📭 没有新邮件")
            return

        known_ids = set(load_inbox()["emails"])
        new_count = 0

        for e in emails:
            email_id = str(e['id'])

            # Skip emails that are already recorded
            if email_id in known_ids:
                continue

            # Read the full email
            detail = read_email(email_id=email_id, save_attachments=True, verbose=False)
            if detail:
                add_email(
                    email_id=email_id,
                    subject=detail.get('subject', ''),
                    sender=detail.get('from', ''),
                    date=detail.get('date', ''),
                    has_attachment=bool(detail.get('attachments')),
                    body=detail.get('body', ''),
                    status=STATUS_UNREAD
                )
                known_ids.add(email_id)
                new_count += 1

        # Update the last-check time. The index must be reloaded first:
        # add_email saves its own copy, so writing back a snapshot taken
        # before the loop would drop every email added above.
        inbox = load_inbox()
        inbox["last_check"] = datetime.now().isoformat()
        save_inbox(inbox)

        print(f"\n✅ 同步完成,新增 {new_count} 封邮件")

    except Exception as e:
        # Top-level CLI boundary: report the failure rather than crash.
        print(f"❌ 同步失败: {e}")


def _print_usage() -> None:
    """Print the command-line help text."""
    print("邮件收件箱管理工具")
    print()
    print("用法:")
    print(" python3 mailbox_manager.py sync # 从IMAP同步未读邮件")
    print(" python3 mailbox_manager.py list [status] # 列出邮件(可按状态筛选)")
    print(" python3 mailbox_manager.py show ID # 显示邮件详情")
    print(" python3 mailbox_manager.py read ID # 标记为已读")
    print(" python3 mailbox_manager.py pending ID # 标记为待处理")
    print(" python3 mailbox_manager.py processing ID # 标记为处理中")
    print(" python3 mailbox_manager.py done ID [notes] # 标记为已完成")
    print(" python3 mailbox_manager.py archive ID # 归档邮件")
    print()
    print("状态说明:")
    print(" unread - 未读")
    print(" read - 已读")
    print(" pending - 待处理")
    print(" processing - 处理中")
    print(" done - 已完成")
    print(" archived - 已归档")


def main() -> None:
    """Command-line entry point; dispatches on sys.argv[1]."""
    if len(sys.argv) < 2:
        _print_usage()
        return

    command = sys.argv[1]
    args = sys.argv[2:]

    if command == "sync":
        sync_from_imap()
        return

    if command == "list":
        list_emails(args[0] if args else None)
        return

    if command not in _STATUS_COMMANDS and command not in ("show", "archive"):
        print(f"❌ 未知命令: {command}")
        return

    # Every remaining command operates on one email and needs its ID.
    if not args:
        print("❌ 请指定邮件ID")
        return
    email_id = args[0]

    if command == "show":
        show_email(email_id)
    elif command == "archive":
        archive_email(email_id)
    elif command == "done":
        # Everything after the ID is joined into a single note.
        notes = " ".join(args[1:]) if len(args) > 1 else None
        update_status(email_id, STATUS_DONE, notes)
    else:
        update_status(email_id, _STATUS_COMMANDS[command])


if __name__ == "__main__":
    main()