362 lines
11 KiB
Python
362 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
邮件收件箱管理工具
|
|
管理已读邮件内容和处理状态
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Path configuration: all mailbox data lives in the directory of this script.
SCRIPT_DIR = Path(__file__).parent
MAILBOX_DIR = SCRIPT_DIR
INBOX_FILE = MAILBOX_DIR.joinpath("inbox.json")  # JSON index of tracked emails
EMAILS_DIR = MAILBOX_DIR.joinpath("emails")  # one sub-directory per email id
ARCHIVE_DIR = MAILBOX_DIR.joinpath("archive")  # destination for archived emails
|
|
|
|
# Status values stored in the inbox index and in each email's meta.json.
STATUS_UNREAD = "unread"
STATUS_READ = "read"
# Waiting to be handled.
STATUS_PENDING = "pending"
# Currently being handled.
STATUS_PROCESSING = "processing"
# Handling finished.
STATUS_DONE = "done"
# Moved to the archive.
STATUS_ARCHIVED = "archived"
|
|
|
|
|
|
def load_inbox():
    """Load the inbox index from INBOX_FILE.

    Returns:
        The inbox dict. When INBOX_FILE does not exist a fresh default
        structure is returned. When the file exists but lacks one of the
        expected top-level keys ("description", "last_check", "emails"),
        the missing key is filled in with its default so callers can index
        ``inbox["emails"]`` without a KeyError.

    Raises:
        json.JSONDecodeError: if the file is not valid JSON. This is not
            swallowed on purpose: silently resetting the index would lose
            data on the next save.
    """
    defaults = {"description": "邮件收件箱状态追踪", "last_check": None, "emails": {}}
    if not INBOX_FILE.exists():
        return defaults

    with open(INBOX_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Backfill keys that a hand-edited or older index file may be missing.
    for key, value in defaults.items():
        data.setdefault(key, value)
    return data
|
|
|
|
|
|
def save_inbox(data):
    """Persist the inbox index to INBOX_FILE as indented UTF-8 JSON.

    The document is serialized in memory first and written to a temporary
    file next to INBOX_FILE, which is then moved into place with
    os.replace. A serialization error or an interrupted write therefore
    cannot leave a truncated inbox.json behind.

    Args:
        data: JSON-serializable inbox dict.

    Raises:
        TypeError, ValueError: if data cannot be serialized to JSON.
        OSError: if the file cannot be written or replaced.
    """
    # Serialize before touching the disk so a failure here changes nothing.
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    tmp_file = INBOX_FILE.with_name(INBOX_FILE.name + ".tmp")
    try:
        with open(tmp_file, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_file, INBOX_FILE)
    except OSError:
        # Do not leave a stray temporary file behind on failure.
        if tmp_file.exists():
            tmp_file.unlink()
        raise
|
|
|
|
|
|
def add_email(email_id, subject, sender, date, has_attachment=False, body="", status=STATUS_UNREAD):
    """Add an email record, or update it if it already exists.

    Writes ``meta.json`` (and ``body.txt`` when a body is given) into
    ``EMAILS_DIR/<email_id>`` and registers the email in the inbox index.
    When a readable ``meta.json`` already exists for this id, its
    ``created_at`` and ``notes`` values are kept instead of being reset.

    Args:
        email_id: Identifier of the email; converted with str() for the
            directory name and the index key.
        subject: Subject line, stored verbatim.
        sender: Sender, stored verbatim.
        date: Date value, stored verbatim.
        has_attachment: Whether the email has attachments.
        body: Body text; written to body.txt only when non-empty.
        status: Initial status, defaults to STATUS_UNREAD.
    """
    inbox = load_inbox()

    email_key = str(email_id)
    email_dir = EMAILS_DIR / email_key
    email_dir.mkdir(parents=True, exist_ok=True)
    meta_file = email_dir / "meta.json"

    # One timestamp for both fields so a new record has created_at == updated_at.
    now = datetime.now().isoformat()
    created_at = now
    notes = ""

    # On update, carry over the fields that belong to the record's history.
    if meta_file.exists():
        try:
            with open(meta_file, "r", encoding="utf-8") as f:
                previous = json.load(f)
        except (OSError, ValueError):
            # Unreadable old metadata: treat the record as new.
            previous = {}
        if isinstance(previous, dict):
            created_at = previous.get("created_at") or now
            notes = previous.get("notes") or ""

    # Per-email metadata file.
    meta = {
        "id": email_id,
        "subject": subject,
        "sender": sender,
        "date": date,
        "has_attachment": has_attachment,
        "status": status,
        "created_at": created_at,
        "updated_at": now,
        "notes": notes,
    }

    with open(meta_file, "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)

    if body:
        with open(email_dir / "body.txt", "w", encoding="utf-8") as f:
            f.write(body)

    # Inbox index entry.
    inbox["emails"][email_key] = {
        "subject": subject,
        "sender": sender,
        "date": date,
        "status": status,
        "has_attachment": has_attachment,
        "path": str(email_dir),
    }

    save_inbox(inbox)
    print(f"✅ 邮件 [{email_id}] 已添加到收件箱")
|
|
|
|
|
|
def update_status(email_id, status, notes=None):
    """Set a new status on an email in both the index and its meta.json.

    Args:
        email_id: Identifier of the email; looked up as str(email_id).
        status: New status value to store.
        notes: Optional notes; stored in meta.json only when truthy.

    Returns:
        True when the email exists and was updated, False when the id is
        not present in the inbox index.
    """
    inbox = load_inbox()
    key = str(email_id)
    entries = inbox["emails"]

    if key not in entries:
        print(f"❌ 邮件 [{email_id}] 不存在")
        return False

    # Index entry first (kept in memory until save_inbox below).
    entry = entries[key]
    entry["status"] = status

    # Then the per-email metadata file, if there is one.
    meta_path = Path(entry["path"]) / "meta.json"
    if meta_path.exists():
        with open(meta_path, "r", encoding="utf-8") as src:
            details = json.load(src)

        details["status"] = status
        details["updated_at"] = datetime.now().isoformat()
        if notes:
            details["notes"] = notes

        with open(meta_path, "w", encoding="utf-8") as dst:
            json.dump(details, dst, ensure_ascii=False, indent=2)

    save_inbox(inbox)

    icons = {
        STATUS_UNREAD: "📬",
        STATUS_READ: "📖",
        STATUS_PENDING: "⏳",
        STATUS_PROCESSING: "🔄",
        STATUS_DONE: "✅",
        STATUS_ARCHIVED: "📦",
    }
    icon = icons.get(status, "📧")
    print(f"{icon} 邮件 [{email_id}] 状态更新为: {status}")
    return True
|
|
|
|
|
|
def _email_sort_key(item):
    """Return a sort key that orders numeric email ids by numeric value."""
    email_id = item[0]
    try:
        # Numeric ids sort by value so that "100" ranks above "99".
        return (1, int(email_id), email_id)
    except ValueError:
        # Non-numeric ids keep plain string ordering, grouped after numeric ones
        # once the list is reversed.
        return (0, 0, email_id)


def list_emails(status_filter=None):
    """Print the emails in the inbox index, highest id first.

    Ids that parse as integers are ordered numerically rather than as
    strings, so id "100" is listed before id "99".

    Args:
        status_filter: When truthy, only emails whose status equals this
            value are printed.
    """
    inbox = load_inbox()

    if not inbox["emails"]:
        print("📭 收件箱为空")
        return

    status_emoji = {
        STATUS_UNREAD: "📬",
        STATUS_READ: "📖",
        STATUS_PENDING: "⏳",
        STATUS_PROCESSING: "🔄",
        STATUS_DONE: "✅",
        STATUS_ARCHIVED: "📦",
    }

    print("📬 邮件收件箱")
    print("=" * 60)

    count = 0
    for email_id, info in sorted(inbox["emails"].items(), key=_email_sort_key, reverse=True):
        if status_filter and info["status"] != status_filter:
            continue

        emoji = status_emoji.get(info["status"], "📧")
        attach = "📎" if info.get("has_attachment") else ""

        print(f"\n[{email_id}] {emoji} {attach}")
        print(f" 主题: {info['subject']}")
        print(f" 发件人: {info['sender']}")
        print(f" 时间: {info['date']}")
        print(f" 状态: {info['status']}")
        count += 1

    print("\n" + "=" * 60)
    print(f"共 {count} 封邮件")
|
|
|
|
|
|
def show_email(email_id):
    """Print the stored details of one email.

    Prints the fields from meta.json, the notes if any, the contents of
    body.txt, and the names of the entries in the ``attachments``
    sub-directory. Each part is skipped when its file or directory is
    absent.

    Args:
        email_id: Identifier of the email; looked up as str(email_id).
    """
    inbox = load_inbox()
    key = str(email_id)

    if key not in inbox["emails"]:
        print(f"❌ 邮件 [{email_id}] 不存在")
        return

    base_dir = Path(inbox["emails"][key]["path"])
    meta_path = base_dir / "meta.json"
    body_path = base_dir / "body.txt"

    print(f"📧 邮件详情 [{email_id}]")
    print("=" * 60)

    if meta_path.exists():
        with open(meta_path, "r", encoding="utf-8") as handle:
            meta = json.load(handle)

        # (label, meta key) pairs, printed in this order.
        fields = (
            ("主题", "subject"),
            ("发件人", "sender"),
            ("时间", "date"),
            ("状态", "status"),
            ("更新时间", "updated_at"),
        )
        for label, field in fields:
            print(f"{label}: {meta.get(field, 'N/A')}")

        if meta.get("notes"):
            print(f"\n备注: {meta['notes']}")

    if body_path.exists():
        print("\n正文:")
        print("-" * 40)
        print(body_path.read_text(encoding="utf-8"))

    # Attachment listing: only shown when the directory has entries.
    attachments_dir = base_dir / "attachments"
    entries = list(attachments_dir.iterdir()) if attachments_dir.exists() else []
    if entries:
        print("\n附件:")
        print("-" * 40)
        for entry in entries:
            print(f" 📎 {entry.name}")
|
|
|
|
|
|
def archive_email(email_id):
    """Move an email's directory into ARCHIVE_DIR and mark it archived.

    The status is updated in both the inbox index and the email's
    meta.json, so the detail view and the list view agree after archiving.
    An email whose directory is already at the archive location is not
    moved again. If a different directory already occupies the archive
    location, nothing is moved and the call fails, because shutil.move
    would otherwise nest the source inside the existing directory.

    Args:
        email_id: Identifier of the email; looked up as str(email_id).

    Returns:
        True when the email was archived, False when the id is unknown or
        the archive location is already occupied.
    """
    import shutil  # local import: shutil is not imported at module level

    inbox = load_inbox()
    email_key = str(email_id)

    if email_key not in inbox["emails"]:
        print(f"❌ 邮件 [{email_id}] 不存在")
        return False

    entry = inbox["emails"][email_key]
    email_dir = Path(entry["path"])
    archive_path = ARCHIVE_DIR / email_key

    # Move the directory unless it is missing or already at its destination.
    if email_dir.exists() and email_dir != archive_path:
        if archive_path.exists():
            print(f"❌ 归档目录已存在: {archive_path}")
            return False
        ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
        shutil.move(str(email_dir), str(archive_path))

    # Keep meta.json in step with the index.
    meta_file = archive_path / "meta.json"
    if meta_file.exists():
        with open(meta_file, "r", encoding="utf-8") as f:
            meta = json.load(f)
        meta["status"] = STATUS_ARCHIVED
        meta["updated_at"] = datetime.now().isoformat()
        with open(meta_file, "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

    # Update the index entry.
    entry["status"] = STATUS_ARCHIVED
    entry["path"] = str(archive_path)
    save_inbox(inbox)

    print(f"📦 邮件 [{email_id}] 已归档")
    return True
|
|
|
|
|
|
def sync_from_imap():
    """Fetch unread emails through the receive_email module and record new ones.

    Imports ``list_unread`` and ``read_email`` from the project's
    ``skills/email/scripts`` directory, reads up to 20 unread emails, and
    calls add_email for each id not yet in the inbox index. Finally the
    index's ``last_check`` timestamp is updated.

    The index is reloaded before ``last_check`` is written. add_email
    loads and saves its own copy of the index, so saving a copy loaded
    before the loop would erase every entry added during this sync.

    Any exception is reported on stdout rather than raised.

    NOTE(review): the shapes of the receive_email results are inferred
    from how they are used here (``e['id']``, ``detail.get(...)``) —
    confirm against that module.
    """
    try:
        # Make the receive_email module importable.
        email_scripts_dir = Path(__file__).parent.parent.parent / "skills" / "email" / "scripts"
        sys.path.insert(0, str(email_scripts_dir))
        from receive_email import list_unread, read_email

        print("🔄 正在同步邮件...")

        emails = list_unread(limit=20, verbose=False)

        if not emails:
            print("📭 没有新邮件")
            return

        # Snapshot of ids already tracked; used only for the skip check.
        known_ids = set(load_inbox()["emails"])
        new_count = 0

        for item in emails:
            email_id = str(item['id'])

            # Skip emails that are already tracked.
            if email_id in known_ids:
                continue

            detail = read_email(email_id=email_id, save_attachments=True, verbose=False)

            if detail:
                add_email(
                    email_id=email_id,
                    subject=detail.get('subject', ''),
                    sender=detail.get('from', ''),
                    date=detail.get('date', ''),
                    has_attachment=len(detail.get('attachments', [])) > 0,
                    body=detail.get('body', ''),
                    status=STATUS_UNREAD,
                )
                known_ids.add(email_id)
                new_count += 1

        # Reload so the entries written by add_email are not overwritten.
        inbox = load_inbox()
        inbox["last_check"] = datetime.now().isoformat()
        save_inbox(inbox)

        print(f"\n✅ 同步完成,新增 {new_count} 封邮件")

    except Exception as e:
        print(f"❌ 同步失败: {e}")
|
|
|
|
|
|
def _print_usage():
    """Print the command-line help text."""
    usage_lines = (
        "邮件收件箱管理工具",
        "",
        "用法:",
        " python3 mailbox_manager.py sync # 从IMAP同步未读邮件",
        " python3 mailbox_manager.py list [status] # 列出邮件(可按状态筛选)",
        " python3 mailbox_manager.py show <id> # 显示邮件详情",
        " python3 mailbox_manager.py read <id> # 标记为已读",
        " python3 mailbox_manager.py pending <id> # 标记为待处理",
        " python3 mailbox_manager.py processing <id> # 标记为处理中",
        " python3 mailbox_manager.py done <id> [notes] # 标记为已完成",
        " python3 mailbox_manager.py archive <id> # 归档邮件",
        "",
        "状态说明:",
        " unread - 未读",
        " read - 已读",
        " pending - 待处理",
        " processing - 处理中",
        " done - 已完成",
        " archived - 已归档",
    )
    print("\n".join(usage_lines))


def main():
    """Command-line entry point: dispatch on the first argument.

    With no arguments the help text is printed. Commands other than
    ``sync`` and ``list`` need an email id as the next argument; ``done``
    joins any further arguments into the notes text.
    """
    args = sys.argv[1:]
    if not args:
        _print_usage()
        return

    command = args[0]
    params = args[1:]

    # Commands that take no email id.
    if command == "sync":
        sync_from_imap()
        return
    if command == "list":
        list_emails(params[0] if params else None)
        return

    # Everything else operates on a single email id.
    id_commands = ("show", "read", "pending", "processing", "done", "archive")
    if command not in id_commands:
        print(f"❌ 未知命令: {command}")
        return
    if not params:
        print("❌ 请指定邮件ID")
        return

    target = params[0]
    if command == "show":
        show_email(target)
    elif command == "archive":
        archive_email(target)
    elif command == "done":
        extra = params[1:]
        update_status(target, STATUS_DONE, " ".join(extra) if extra else None)
    else:
        new_status = {
            "read": STATUS_READ,
            "pending": STATUS_PENDING,
            "processing": STATUS_PROCESSING,
        }[command]
        update_status(target, new_status)
|
|
|
|
|
|
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()