Files
skill-email/scripts/receive_email.py

526 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
邮件接收脚本
支持 IMAP 协议接收邮件、查看未读邮件、下载附件
"""
import imaplib
import os
import sys
import json
import re
from email.header import decode_header
from email.utils import parseaddr
import email as em
from typing import List, Optional, Dict, Tuple
from datetime import datetime
# 配置文件路径
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(SCRIPT_DIR, "smtp_config.json")
ATTACHMENTS_DIR = os.path.join(SCRIPT_DIR, "attachments")
def load_config() -> dict:
"""加载配置"""
if not os.path.exists(CONFIG_FILE):
return {"accounts": []}
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
def get_account(name: Optional[str] = None) -> Optional[dict]:
"""获取账号配置"""
config = load_config()
if not config["accounts"]:
return None
if name is None:
return config["accounts"][0]
for acc in config["accounts"]:
if acc["name"] == name:
return acc
return None
def get_imap_server(email_addr: str) -> Tuple[str, int, bool]:
"""
根据邮箱地址推断 IMAP 服务器
返回: (服务器地址, 端口, 是否使用SSL)
"""
domain = email_addr.split("@")[1] if "@" in email_addr else ""
# 常见邮箱 IMAP 配置
imap_configs = {
"gmail.com": ("imap.gmail.com", 993, True),
"outlook.com": ("outlook.office365.com", 993, True),
"hotmail.com": ("outlook.office365.com", 993, True),
"qq.com": ("imap.qq.com", 993, True),
"163.com": ("imap.163.com", 993, True),
"tphai.com": ("mail.tphai.com", 143, False), # 企业邮箱
}
# 尽找匹配的配置
for key, config in imap_configs.items():
if key in domain:
return config
# 默认使用企业邮箱配置
return ("mail.tphai.com", 143, False)
def decode_str(s: str) -> str:
"""解码邮件头字符串"""
if s is None:
return ""
decoded_parts = decode_header(s)
result = []
for part, charset in decoded_parts:
if isinstance(part, bytes):
charset = charset or "utf-8"
try:
result.append(part.decode(charset))
except:
result.append(part.decode("utf-8", errors="ignore"))
else:
result.append(part)
return "".join(result)
def get_email_content(msg) -> str:
"""提取邮件正文"""
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
# 跳过附件
if "attachment" in content_disposition:
continue
# 提取文本内容
if content_type == "text/plain":
charset = part.get_content_charset() or "utf-8"
try:
payload = part.get_payload(decode=True)
if payload:
body += payload.decode(charset, errors="ignore")
except:
pass
elif content_type == "text/html" and not body:
charset = part.get_content_charset() or "utf-8"
try:
payload = part.get_payload(decode=True)
if payload:
# 简单去除HTML标签获取纯文本
html_content = payload.decode(charset, errors="ignore")
text = re.sub(r'<[^>]+>', '', html_content)
body += text.strip()
except:
pass
else:
content_type = msg.get_content_type()
if content_type in ["text/plain", "text/html"]:
charset = msg.get_content_charset() or "utf-8"
try:
payload = msg.get_payload(decode=True)
if payload:
body = payload.decode(charset, errors="ignore")
if content_type == "text/html":
body = re.sub(r'<[^>]+>', '', body).strip()
except:
pass
return body
def get_attachments(msg) -> List[Dict]:
"""提取附件信息"""
attachments = []
for part in msg.walk():
content_disposition = str(part.get("Content-Disposition", ""))
if "attachment" in content_disposition:
filename = part.get_filename()
if filename:
filename = decode_str(filename)
content_type = part.get_content_type()
size = len(part.get_payload(decode=True) or b"")
attachments.append({
"filename": filename,
"content_type": content_type,
"size": size,
"part": part
})
return attachments
def save_attachment(part, filename: str, email_id: str) -> str:
"""保存附件到本地"""
# 确保附件目录存在
os.makedirs(ATTACHMENTS_DIR, exist_ok=True)
# 为每封邮件创建子目录
email_dir = os.path.join(ATTACHMENTS_DIR, email_id)
os.makedirs(email_dir, exist_ok=True)
# 保存附件
filepath = os.path.join(email_dir, filename)
payload = part.get_payload(decode=True)
if payload:
with open(filepath, "wb") as f:
f.write(payload)
return filepath
def read_text_attachment(filepath: str) -> Optional[str]:
"""读取文本附件内容"""
# 检查是否是文本文件
text_extensions = ['.txt', '.md', '.json', '.csv', '.log', '.py', '.js', '.html', '.css', '.xml', '.yaml', '.yml', '.ini', '.cfg', '.conf', '.sh', '.bat']
ext = os.path.splitext(filepath)[1].lower()
if ext not in text_extensions:
return None
try:
with open(filepath, 'r', encoding='utf-8') as f:
return f.read()
except UnicodeDecodeError:
try:
with open(filepath, 'r', encoding='gbk') as f:
return f.read()
except:
return None
def connect_imap(account: dict) -> Optional[imaplib.IMAP4]:
"""连接 IMAP 服务器"""
# 获取 IMAP 配置
imap_server, imap_port, use_ssl = get_imap_server(account["email"])
try:
if use_ssl:
imap = imaplib.IMAP4_SSL(imap_server, imap_port)
else:
imap = imaplib.IMAP4(imap_server, imap_port)
# 登录
imap.login(account["email"], account["password"])
return imap
except Exception as e:
print(f"❌ IMAP 连接失败: {e}")
return None
def list_unread(account_name: Optional[str] = None, limit: int = 10, verbose: bool = False) -> List[Dict]:
"""
列出未读邮件
返回:
List[Dict]: 邮件列表,每项包含 id, subject, from, date, has_attachment
"""
account = get_account(account_name)
if account is None:
print("❌ 错误:未找到邮箱配置")
return []
imap = connect_imap(account)
if imap is None:
return []
try:
# 选择收件箱
imap.select("INBOX")
# 搜索未读邮件
status, data = imap.search(None, "UNSEEN")
if status != "OK":
print("❌ 搜索未读邮件失败")
return []
email_ids = data[0].split()
total_unread = len(email_ids)
if verbose:
print(f"📊 共有 {total_unread} 封未读邮件")
# 限制返回数量
email_ids = email_ids[:limit]
emails = []
for email_id in email_ids:
email_id_str = email_id.decode()
# 获取邮件头
status, msg_data = imap.fetch(email_id, "(BODY.PEEK[HEADER.FIELDS (SUBJECT FROM DATE)])")
if status != "OK":
continue
# 解析邮件头
raw_header = msg_data[0][1]
header_msg = em.message_from_bytes(raw_header)
subject = decode_str(header_msg.get("Subject", ""))
from_addr = decode_str(header_msg.get("From", ""))
date_str = header_msg.get("Date", "")
# 检查是否有附件
status, msg_data = imap.fetch(email_id, "(BODYSTRUCTURE)")
has_attachment = False
if status == "OK" and msg_data:
# 简单检查 BODYSTRUCTURE 中是否有 attachment
structure = str(msg_data[0])
if "attachment" in structure.lower() or "filename" in structure.lower():
has_attachment = True
emails.append({
"id": email_id_str,
"subject": subject,
"from": from_addr,
"date": date_str,
"has_attachment": has_attachment
})
imap.close()
imap.logout()
return emails
except Exception as e:
print(f"❌ 获取未读邮件失败: {e}")
return []
def read_email(email_id: str, account_name: Optional[str] = None, save_attachments: bool = True, verbose: bool = False) -> Optional[Dict]:
"""
读取指定邮件内容
参数:
email_id: 邮件ID
account_name: 账号名称
save_attachments: 是否保存附件
verbose: 显示详细日志
返回:
Dict: 邮件详情,包含 subject, from, to, date, body, attachments
"""
account = get_account(account_name)
if account is None:
print("❌ 错误:未找到邮箱配置")
return None
imap = connect_imap(account)
if imap is None:
return None
try:
imap.select("INBOX")
# 获取完整邮件
status, msg_data = imap.fetch(email_id, "(RFC822)")
if status != "OK":
print(f"❌ 获取邮件 {email_id} 失败")
return None
# 解析邮件
raw_email = msg_data[0][1]
msg = em.message_from_bytes(raw_email)
# 提取邮件信息
subject = decode_str(msg.get("Subject", ""))
from_addr = decode_str(msg.get("From", ""))
to_addr = decode_str(msg.get("To", ""))
date_str = msg.get("Date", "")
# 提取正文
body = get_email_content(msg)
# 提取附件
attachments_info = get_attachments(msg)
attachments = []
for att in attachments_info:
if save_attachments:
filepath = save_attachment(att["part"], att["filename"], email_id)
if verbose:
print(f"📎 已保存附件: {filepath}")
# 如果是文本文件,读取内容
text_content = read_text_attachment(filepath)
attachments.append({
"filename": att["filename"],
"filepath": filepath,
"size": att["size"],
"is_text": text_content is not None,
"content": text_content
})
else:
attachments.append({
"filename": att["filename"],
"size": att["size"],
"is_text": False,
"content": None
})
# 标记为已读
imap.store(email_id, "+FLAGS", "\\Seen")
imap.close()
imap.logout()
return {
"id": email_id,
"subject": subject,
"from": from_addr,
"to": to_addr,
"date": date_str,
"body": body,
"attachments": attachments
}
except Exception as e:
print(f"❌ 读取邮件失败: {e}")
return None
def print_email_summary(emails: List[Dict]):
"""打印邮件摘要"""
if not emails:
print("📭 没有未读邮件")
return
print(f"\n📬 未读邮件 ({len(emails)} 封)")
print("=" * 60)
for i, e in enumerate(emails, 1):
attachment_mark = "📎" if e.get("has_attachment") else ""
print(f"\n{i}. [{e['id']}] {attachment_mark}")
print(f" 主题: {e['subject']}")
print(f" 发件人: {e['from']}")
print(f" 时间: {e['date']}")
def print_email_detail(email_data: Dict):
"""打印邮件详情"""
print(f"\n📧 邮件详情")
print("=" * 60)
print(f"ID: {email_data['id']}")
print(f"主题: {email_data['subject']}")
print(f"发件人: {email_data['from']}")
print(f"收件人: {email_data['to']}")
print(f"时间: {email_data['date']}")
print("\n正文:")
print("-" * 40)
print(email_data['body'][:500] if len(email_data['body']) > 500 else email_data['body'])
if len(email_data['body']) > 500:
print(f"\n... (正文共 {len(email_data['body'])} 字符,已截断)")
if email_data['attachments']:
print("\n附件:")
print("-" * 40)
for att in email_data['attachments']:
text_mark = "📝" if att['is_text'] else "📎"
print(f" {text_mark} {att['filename']} ({att['size']} bytes)")
if att['is_text'] and att['content']:
preview = att['content'][:200]
print(f" 内容预览: {preview}...")
def main():
"""命令行入口"""
import argparse
parser = argparse.ArgumentParser(description="邮件接收工具")
subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 列出未读邮件命令
list_parser = subparsers.add_parser("unread", help="列出未读邮件")
list_parser.add_argument("--account", "-a", help="使用指定配置")
list_parser.add_argument("--limit", "-l", type=int, default=10, help="最大返回数量")
list_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志")
# 读取邮件命令
read_parser = subparsers.add_parser("read", help="读取指定邮件")
read_parser.add_argument("id", help="邮件ID")
read_parser.add_argument("--account", "-a", help="使用指定配置")
read_parser.add_argument("--no-save", action="store_true", help="不保存附件")
read_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志")
# 查看附件命令
att_parser = subparsers.add_parser("attachment", help="查看附件内容")
att_parser.add_argument("id", help="邮件ID")
att_parser.add_argument("--filename", "-f", help="指定附件文件名")
att_parser.add_argument("--account", "-a", help="使用指定配置")
args = parser.parse_args()
if args.command == "unread":
emails = list_unread(
account_name=args.account,
limit=args.limit,
verbose=args.verbose
)
print_email_summary(emails)
elif args.command == "read":
email_data = read_email(
email_id=args.id,
account_name=args.account,
save_attachments=not args.no_save,
verbose=args.verbose
)
if email_data:
print_email_detail(email_data)
elif args.command == "attachment":
email_data = read_email(
email_id=args.id,
account_name=args.account,
save_attachments=True,
verbose=False
)
if email_data and email_data['attachments']:
# 找到指定附件或第一个文本附件
target_att = None
for att in email_data['attachments']:
if args.filename and att['filename'] == args.filename:
target_att = att
break
elif not args.filename and att['is_text']:
target_att = att
break
if target_att:
print(f"\n📄 附件: {target_att['filename']}")
print("=" * 60)
print(target_att['content'] or "(非文本附件)")
else:
print(f"❌ 未找到附件: {args.filename or '文本附件'}")
else:
print("❌ 邮件无附件或读取失败")
else:
parser.print_help()
if __name__ == "__main__":
main()