From 3c0cb213c9627a2475dbba7f6d679a58aa0f8c29 Mon Sep 17 00:00:00 2001 From: hubian <908234780@qq.com> Date: Fri, 10 Apr 2026 16:48:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=82=AE=E4=BB=B6=E6=94=B6=E5=8F=91?= =?UTF-8?q?=E6=8A=80=E8=83=BD=20-=20SMTP=E5=8F=91=E9=80=81=20+=20IMAP?= =?UTF-8?q?=E6=8E=A5=E6=94=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- SKILL.md | 285 +++++++++++++++++++++ scripts/.gitignore | 12 + scripts/receive_email.py | 526 +++++++++++++++++++++++++++++++++++++++ scripts/send_email.py | 293 ++++++++++++++++++++++ 4 files changed, 1116 insertions(+) create mode 100644 SKILL.md create mode 100644 scripts/.gitignore create mode 100644 scripts/receive_email.py create mode 100644 scripts/send_email.py diff --git a/SKILL.md b/SKILL.md new file mode 100644 index 0000000..f894920 --- /dev/null +++ b/SKILL.md @@ -0,0 +1,285 @@ +--- +name: email +description: Send and receive emails via SMTP/IMAP with support for attachments, CC/BCC, HTML content, and multiple accounts. Use when user wants to send/receive emails, check unread messages, or view attachments. +--- + +# Email Sender / 邮件收发工具 + +通过 SMTP 发送邮件,通过 IMAP 接收邮件,支持附件、抄送/密送、HTML 格式和多账号管理。 + +## 快速开始 + +### 1. 配置邮箱账号 + +```bash +cd ~/.openclaw/workspace-coder/skills/email-sender/scripts +python3 send_email.py config my-email \ + --server mail.tphai.com \ + --port 587 \ + --email guwen@tphai.com \ + --password "your-password" \ + --no-tls +``` + +参数说明: +- `my-email`: 配置名称(可自定义) +- `--server`: SMTP 服务器地址 +- `--port`: SMTP 端口 +- `--email`: 邮箱地址 +- `--password`: 邮箱密码 +- `--no-tls`: 不使用 TLS 加密(默认使用 TLS) + +配置后会自动用于发送和接收邮件。 + +### 2. 查看配置 + +```bash +python3 send_email.py list +``` + +--- + +## 发送邮件 + +### 简单邮件 + +```bash +python3 send_email.py send \ + --to wlq@tphai.com \ + --subject "测试邮件" \ + --body "这是一封测试邮件" +``` + +### 带附件的邮件 + +```bash +python3 send_email.py send \ + --to wlq@tphai.com \ + --subject "带附件的邮件" \ + --body "请查收附件" \ + --attach /path/to/file1.pdf \ + --attach /path/to/file2.jpg +``` + +### HTML 邮件 + +```bash +python3 send_email.py send \ + --to wlq@tphai.com \ + --subject "HTML 邮件" \ + --body "

标题

内容

" \ + --html +``` + +### 抄送和密送 + +```bash +python3 send_email.py send \ + --to wlq@tphai.com \ + --cc "cc1@example.com,cc2@example.com" \ + --bcc "bcc@example.com" \ + --subject "抄送测试" \ + --body "这是一封抄送测试邮件" +``` + +--- + +## 接收邮件 + +### 查看未读邮件 + +```bash +python3 receive_email.py unread +``` + +输出示例: +``` +📬 未读邮件 (3 封) +============================================================ + +1. [42] 📎 + 主题: 项目报告 + 发件人: boss@company.com + 时间: Fri, 10 Apr 2026 10:30:00 +0800 + +2. [41] + 主题: 会议通知 + 发件人: hr@company.com + 时间: Fri, 10 Apr 2026 09:15:00 +0800 +``` + +- `📎` 表示有附件 +- `[数字]` 是邮件ID,用于后续读取 + +### 读取邮件详情 + +```bash +python3 receive_email.py read 42 +``` + +输出示例: +``` +📧 邮件详情 +============================================================ +ID: 42 +主题: 项目报告 +发件人: boss@company.com +收件人: me@company.com +时间: Fri, 10 Apr 2026 10:30:00 +0800 + +正文: +---------------------------------------- +请查看附件中的项目报告... + +附件: +---------------------------------------- + 📝 report.txt (1234 bytes) + 内容预览: 第一行内容... + 📄 data.csv (5678 bytes) +``` + +- `📝` 表示文本附件(已自动读取内容) +- `📄` 表示非文本附件 + +### 查看附件内容 + +```bash +# 查看指定邮件的文本附件 +python3 receive_email.py attachment 42 + +# 查看指定附件 +python3 receive_email.py attachment 42 --filename report.txt +``` + +附件会自动保存到 `scripts/attachments/<邮件ID>/` 目录。 + +--- + +## Python API 使用 + +### 发送邮件 + +```python +from send_email import send_email, setup_account + +# 配置 SMTP +setup_account( + name="work", + smtp_server="mail.tphai.com", + smtp_port=587, + email="guwen@tphai.com", + password="your-password", + use_tls=False +) + +# 发送简单邮件 +send_email( + to="wlq@tphai.com", + subject="测试", + body="内容" +) + +# 发送带附件的邮件 +send_email( + to="wlq@tphai.com", + subject="附件测试", + body="请查收附件", + attachments=["/path/to/file.pdf"], + verbose=True +) + +# 发送 HTML 邮件 +send_email( + to="wlq@tphai.com", + subject="HTML 测试", + body="

Hello

World

", + html=True +) +``` + +### 接收邮件 + +```python +from receive_email import list_unread, read_email + +# 列出未读邮件 +emails = list_unread(limit=10, verbose=True) +for e in emails: + print(f"[{e['id']}] {e['subject']} - {e['from']}") + +# 读取邮件详情 +email_data = read_email(email_id="42") +print(f"主题: {email_data['subject']}") +print(f"正文: {email_data['body']}") + +# 查看附件 +for att in email_data['attachments']: + print(f"附件: {att['filename']}") + if att['is_text']: + print(f"内容: {att['content']}") +``` + +--- + +## 支持的文本附件类型 + +自动识别并读取以下类型的附件: + +| 类型 | 扩展名 | +|------|--------| +| 纯文本 | .txt, .md, .log | +| 数据文件 | .json, .csv, .xml, .yaml, .yml | +| 配置文件 | .ini, .cfg, .conf | +| 代码文件 | .py, .js, .html, .css, .sh, .bat | + +--- + +## 配置文件 + +配置保存在 `scripts/smtp_config.json` 中: + +```json +{ + "accounts": [ + { + "name": "my-email", + "smtp_server": "mail.tphai.com", + "smtp_port": 587, + "email": "guwen@tphai.com", + "password": "your-password", + "use_tls": false + } + ] +} +``` + +--- + +## 常用邮箱服务器设置 + +| 服务商 | SMTP | IMAP | TLS | +|--------|------|------|-----| +| Gmail | smtp.gmail.com:587 | imap.gmail.com:993 | 是 | +| Outlook | smtp.office365.com:587 | outlook.office365.com:993 | 是 | +| QQ邮箱 | smtp.qq.com:587 | imap.qq.com:993 | 是 | +| 163邮箱 | smtp.163.com:465 | imap.163.com:993 | 是 | +| 企业邮箱 | mail.tphai.com:587 | mail.tphai.com:143 | 否 | + +--- + +## 故障排除 + +**认证失败:** +- 检查邮箱密码是否正确 +- Gmail 需要使用「应用专用密码」而非登录密码 +- QQ邮箱需要使用授权码而非登录密码 + +**连接失败:** +- 检查服务器地址和端口 +- 检查防火墙设置 +- 尝试切换 TLS/SSL 模式 + +**IMAP 连接失败:** +- 确认邮箱支持 IMAP 协议 +- 检查 IMAP 端口是否正确(通常是 993 或 143) +- 某些邮箱需要在设置中启用 IMAP \ No newline at end of file diff --git a/scripts/.gitignore b/scripts/.gitignore new file mode 100644 index 0000000..90d6f25 --- /dev/null +++ b/scripts/.gitignore @@ -0,0 +1,12 @@ +# attachments 目录 - 保存邮件附件 +attachments/ + +# Python 缓存 +__pycache__/ +*.py[cod] + +# 日志文件 +*.log + +# 配置文件(包含密码) +smtp_config.json \ No newline at end of file diff --git a/scripts/receive_email.py b/scripts/receive_email.py new file mode 100644 index 0000000..b4a7fb9 --- /dev/null +++ b/scripts/receive_email.py @@ -0,0 +1,526 @@ +#!/usr/bin/env python3 +""" +邮件接收脚本 +支持 IMAP 协议接收邮件、查看未读邮件、下载附件 +""" + +import imaplib +import os +import sys +import json +import re +from email.header import decode_header +from email.utils import parseaddr +import email as em +from typing import List, Optional, Dict, Tuple +from datetime import datetime + +# 配置文件路径 +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +CONFIG_FILE = os.path.join(SCRIPT_DIR, "smtp_config.json") +ATTACHMENTS_DIR = os.path.join(SCRIPT_DIR, "attachments") + + +def load_config() -> dict: + """加载配置""" + if not os.path.exists(CONFIG_FILE): + return {"accounts": []} + + with open(CONFIG_FILE, 'r', encoding='utf-8') as f: + return json.load(f) + + +def get_account(name: Optional[str] = None) -> Optional[dict]: + """获取账号配置""" + config = load_config() + + if not config["accounts"]: + return None + + if name is None: + return config["accounts"][0] + + for acc in config["accounts"]: + if acc["name"] == name: + return acc + + return None + + +def get_imap_server(email_addr: str) -> Tuple[str, int, bool]: + """ + 根据邮箱地址推断 IMAP 服务器 + 返回: (服务器地址, 端口, 是否使用SSL) + """ + domain = email_addr.split("@")[1] if "@" in email_addr else "" + + # 常见邮箱 IMAP 配置 + imap_configs = { + "gmail.com": ("imap.gmail.com", 993, True), + "outlook.com": ("outlook.office365.com", 993, True), + "hotmail.com": ("outlook.office365.com", 993, True), + "qq.com": ("imap.qq.com", 993, True), + "163.com": ("imap.163.com", 993, True), + "tphai.com": ("mail.tphai.com", 143, False), # 企业邮箱 + } + + # 尽找匹配的配置 + for key, config in imap_configs.items(): + if key in domain: + return config + + # 默认使用企业邮箱配置 + return ("mail.tphai.com", 143, False) + + +def decode_str(s: str) -> str: + """解码邮件头字符串""" + if s is None: + return "" + + decoded_parts = decode_header(s) + result = [] + for part, charset in decoded_parts: + if isinstance(part, bytes): + charset = charset or "utf-8" + try: + result.append(part.decode(charset)) + except: + result.append(part.decode("utf-8", errors="ignore")) + else: + result.append(part) + + return "".join(result) + + +def get_email_content(msg) -> str: + """提取邮件正文""" + body = "" + + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition", "")) + + # 跳过附件 + if "attachment" in content_disposition: + continue + + # 提取文本内容 + if content_type == "text/plain": + charset = part.get_content_charset() or "utf-8" + try: + payload = part.get_payload(decode=True) + if payload: + body += payload.decode(charset, errors="ignore") + except: + pass + + elif content_type == "text/html" and not body: + charset = part.get_content_charset() or "utf-8" + try: + payload = part.get_payload(decode=True) + if payload: + # 简单去除HTML标签,获取纯文本 + html_content = payload.decode(charset, errors="ignore") + text = re.sub(r'<[^>]+>', '', html_content) + body += text.strip() + except: + pass + else: + content_type = msg.get_content_type() + if content_type in ["text/plain", "text/html"]: + charset = msg.get_content_charset() or "utf-8" + try: + payload = msg.get_payload(decode=True) + if payload: + body = payload.decode(charset, errors="ignore") + if content_type == "text/html": + body = re.sub(r'<[^>]+>', '', body).strip() + except: + pass + + return body + + +def get_attachments(msg) -> List[Dict]: + """提取附件信息""" + attachments = [] + + for part in msg.walk(): + content_disposition = str(part.get("Content-Disposition", "")) + + if "attachment" in content_disposition: + filename = part.get_filename() + if filename: + filename = decode_str(filename) + content_type = part.get_content_type() + size = len(part.get_payload(decode=True) or b"") + + attachments.append({ + "filename": filename, + "content_type": content_type, + "size": size, + "part": part + }) + + return attachments + + +def save_attachment(part, filename: str, email_id: str) -> str: + """保存附件到本地""" + # 确保附件目录存在 + os.makedirs(ATTACHMENTS_DIR, exist_ok=True) + + # 为每封邮件创建子目录 + email_dir = os.path.join(ATTACHMENTS_DIR, email_id) + os.makedirs(email_dir, exist_ok=True) + + # 保存附件 + filepath = os.path.join(email_dir, filename) + payload = part.get_payload(decode=True) + + if payload: + with open(filepath, "wb") as f: + f.write(payload) + + return filepath + + +def read_text_attachment(filepath: str) -> Optional[str]: + """读取文本附件内容""" + # 检查是否是文本文件 + text_extensions = ['.txt', '.md', '.json', '.csv', '.log', '.py', '.js', '.html', '.css', '.xml', '.yaml', '.yml', '.ini', '.cfg', '.conf', '.sh', '.bat'] + + ext = os.path.splitext(filepath)[1].lower() + if ext not in text_extensions: + return None + + try: + with open(filepath, 'r', encoding='utf-8') as f: + return f.read() + except UnicodeDecodeError: + try: + with open(filepath, 'r', encoding='gbk') as f: + return f.read() + except: + return None + + +def connect_imap(account: dict) -> Optional[imaplib.IMAP4]: + """连接 IMAP 服务器""" + # 获取 IMAP 配置 + imap_server, imap_port, use_ssl = get_imap_server(account["email"]) + + try: + if use_ssl: + imap = imaplib.IMAP4_SSL(imap_server, imap_port) + else: + imap = imaplib.IMAP4(imap_server, imap_port) + + # 登录 + imap.login(account["email"], account["password"]) + + return imap + + except Exception as e: + print(f"❌ IMAP 连接失败: {e}") + return None + + +def list_unread(account_name: Optional[str] = None, limit: int = 10, verbose: bool = False) -> List[Dict]: + """ + 列出未读邮件 + + 返回: + List[Dict]: 邮件列表,每项包含 id, subject, from, date, has_attachment + """ + account = get_account(account_name) + if account is None: + print("❌ 错误:未找到邮箱配置") + return [] + + imap = connect_imap(account) + if imap is None: + return [] + + try: + # 选择收件箱 + imap.select("INBOX") + + # 搜索未读邮件 + status, data = imap.search(None, "UNSEEN") + + if status != "OK": + print("❌ 搜索未读邮件失败") + return [] + + email_ids = data[0].split() + total_unread = len(email_ids) + + if verbose: + print(f"📊 共有 {total_unread} 封未读邮件") + + # 限制返回数量 + email_ids = email_ids[:limit] + + emails = [] + for email_id in email_ids: + email_id_str = email_id.decode() + + # 获取邮件头 + status, msg_data = imap.fetch(email_id, "(BODY.PEEK[HEADER.FIELDS (SUBJECT FROM DATE)])") + + if status != "OK": + continue + + # 解析邮件头 + raw_header = msg_data[0][1] + header_msg = em.message_from_bytes(raw_header) + + subject = decode_str(header_msg.get("Subject", "")) + from_addr = decode_str(header_msg.get("From", "")) + date_str = header_msg.get("Date", "") + + # 检查是否有附件 + status, msg_data = imap.fetch(email_id, "(BODYSTRUCTURE)") + has_attachment = False + if status == "OK" and msg_data: + # 简单检查 BODYSTRUCTURE 中是否有 attachment + structure = str(msg_data[0]) + if "attachment" in structure.lower() or "filename" in structure.lower(): + has_attachment = True + + emails.append({ + "id": email_id_str, + "subject": subject, + "from": from_addr, + "date": date_str, + "has_attachment": has_attachment + }) + + imap.close() + imap.logout() + + return emails + + except Exception as e: + print(f"❌ 获取未读邮件失败: {e}") + return [] + + +def read_email(email_id: str, account_name: Optional[str] = None, save_attachments: bool = True, verbose: bool = False) -> Optional[Dict]: + """ + 读取指定邮件内容 + + 参数: + email_id: 邮件ID + account_name: 账号名称 + save_attachments: 是否保存附件 + verbose: 显示详细日志 + + 返回: + Dict: 邮件详情,包含 subject, from, to, date, body, attachments + """ + account = get_account(account_name) + if account is None: + print("❌ 错误:未找到邮箱配置") + return None + + imap = connect_imap(account) + if imap is None: + return None + + try: + imap.select("INBOX") + + # 获取完整邮件 + status, msg_data = imap.fetch(email_id, "(RFC822)") + + if status != "OK": + print(f"❌ 获取邮件 {email_id} 失败") + return None + + # 解析邮件 + raw_email = msg_data[0][1] + msg = em.message_from_bytes(raw_email) + + # 提取邮件信息 + subject = decode_str(msg.get("Subject", "")) + from_addr = decode_str(msg.get("From", "")) + to_addr = decode_str(msg.get("To", "")) + date_str = msg.get("Date", "") + + # 提取正文 + body = get_email_content(msg) + + # 提取附件 + attachments_info = get_attachments(msg) + attachments = [] + + for att in attachments_info: + if save_attachments: + filepath = save_attachment(att["part"], att["filename"], email_id) + if verbose: + print(f"📎 已保存附件: {filepath}") + + # 如果是文本文件,读取内容 + text_content = read_text_attachment(filepath) + attachments.append({ + "filename": att["filename"], + "filepath": filepath, + "size": att["size"], + "is_text": text_content is not None, + "content": text_content + }) + else: + attachments.append({ + "filename": att["filename"], + "size": att["size"], + "is_text": False, + "content": None + }) + + # 标记为已读 + imap.store(email_id, "+FLAGS", "\\Seen") + + imap.close() + imap.logout() + + return { + "id": email_id, + "subject": subject, + "from": from_addr, + "to": to_addr, + "date": date_str, + "body": body, + "attachments": attachments + } + + except Exception as e: + print(f"❌ 读取邮件失败: {e}") + return None + + +def print_email_summary(emails: List[Dict]): + """打印邮件摘要""" + if not emails: + print("📭 没有未读邮件") + return + + print(f"\n📬 未读邮件 ({len(emails)} 封)") + print("=" * 60) + + for i, e in enumerate(emails, 1): + attachment_mark = "📎" if e.get("has_attachment") else "" + print(f"\n{i}. [{e['id']}] {attachment_mark}") + print(f" 主题: {e['subject']}") + print(f" 发件人: {e['from']}") + print(f" 时间: {e['date']}") + + +def print_email_detail(email_data: Dict): + """打印邮件详情""" + print(f"\n📧 邮件详情") + print("=" * 60) + print(f"ID: {email_data['id']}") + print(f"主题: {email_data['subject']}") + print(f"发件人: {email_data['from']}") + print(f"收件人: {email_data['to']}") + print(f"时间: {email_data['date']}") + print("\n正文:") + print("-" * 40) + print(email_data['body'][:500] if len(email_data['body']) > 500 else email_data['body']) + if len(email_data['body']) > 500: + print(f"\n... (正文共 {len(email_data['body'])} 字符,已截断)") + + if email_data['attachments']: + print("\n附件:") + print("-" * 40) + for att in email_data['attachments']: + text_mark = "📝" if att['is_text'] else "📎" + print(f" {text_mark} {att['filename']} ({att['size']} bytes)") + if att['is_text'] and att['content']: + preview = att['content'][:200] + print(f" 内容预览: {preview}...") + + +def main(): + """命令行入口""" + import argparse + + parser = argparse.ArgumentParser(description="邮件接收工具") + subparsers = parser.add_subparsers(dest="command", help="可用命令") + + # 列出未读邮件命令 + list_parser = subparsers.add_parser("unread", help="列出未读邮件") + list_parser.add_argument("--account", "-a", help="使用指定配置") + list_parser.add_argument("--limit", "-l", type=int, default=10, help="最大返回数量") + list_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志") + + # 读取邮件命令 + read_parser = subparsers.add_parser("read", help="读取指定邮件") + read_parser.add_argument("id", help="邮件ID") + read_parser.add_argument("--account", "-a", help="使用指定配置") + read_parser.add_argument("--no-save", action="store_true", help="不保存附件") + read_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志") + + # 查看附件命令 + att_parser = subparsers.add_parser("attachment", help="查看附件内容") + att_parser.add_argument("id", help="邮件ID") + att_parser.add_argument("--filename", "-f", help="指定附件文件名") + att_parser.add_argument("--account", "-a", help="使用指定配置") + + args = parser.parse_args() + + if args.command == "unread": + emails = list_unread( + account_name=args.account, + limit=args.limit, + verbose=args.verbose + ) + print_email_summary(emails) + + elif args.command == "read": + email_data = read_email( + email_id=args.id, + account_name=args.account, + save_attachments=not args.no_save, + verbose=args.verbose + ) + if email_data: + print_email_detail(email_data) + + elif args.command == "attachment": + email_data = read_email( + email_id=args.id, + account_name=args.account, + save_attachments=True, + verbose=False + ) + if email_data and email_data['attachments']: + # 找到指定附件或第一个文本附件 + target_att = None + for att in email_data['attachments']: + if args.filename and att['filename'] == args.filename: + target_att = att + break + elif not args.filename and att['is_text']: + target_att = att + break + + if target_att: + print(f"\n📄 附件: {target_att['filename']}") + print("=" * 60) + print(target_att['content'] or "(非文本附件)") + else: + print(f"❌ 未找到附件: {args.filename or '文本附件'}") + else: + print("❌ 邮件无附件或读取失败") + + else: + parser.print_help() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/send_email.py b/scripts/send_email.py new file mode 100644 index 0000000..fa36aa4 --- /dev/null +++ b/scripts/send_email.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python3 +""" +邮件发送脚本 +支持纯文字/HTML 邮件、抄送/密送、附件和批量发送 +""" + +import smtplib +import os +import sys +import json +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.mime.application import MIMEApplication +from email.utils import formatdate, make_msgid +from typing import List, Optional + +# 配置文件路径 +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +CONFIG_FILE = os.path.join(SCRIPT_DIR, "smtp_config.json") + + +def load_config() -> dict: + """加载 SMTP 配置""" + if not os.path.exists(CONFIG_FILE): + return {"accounts": []} + + with open(CONFIG_FILE, 'r', encoding='utf-8') as f: + return json.load(f) + + +def save_config(config: dict): + """保存 SMTP 配置""" + with open(CONFIG_FILE, 'w', encoding='utf-8') as f: + json.dump(config, f, indent=2, ensure_ascii=False) + + +def setup_account( + name: str, + smtp_server: str, + smtp_port: int, + email: str, + password: str, + use_tls: bool = True +): + """配置 SMTP 账号""" + config = load_config() + + # 检查是否已存在同名配置 + for i, acc in enumerate(config["accounts"]): + if acc["name"] == name: + print(f"配置 '{name}' 已存在,正在更新...") + config["accounts"][i] = { + "name": name, + "smtp_server": smtp_server, + "smtp_port": smtp_port, + "email": email, + "password": password, # 明文存储,仅用于本地 + "use_tls": use_tls + } + save_config(config) + return + + # 添加新配置 + config["accounts"].append({ + "name": name, + "smtp_server": smtp_server, + "smtp_port": smtp_port, + "email": email, + "password": password, + "use_tls": use_tls + }) + save_config(config) + print(f"✅ 配置 '{name}' 已保存") + + +def list_accounts(): + """列出所有 SMTP 配置""" + config = load_config() + + if not config["accounts"]: + print("暂无 SMTP 配置") + return + + print("\n=== SMTP 配置列表 ===") + for i, acc in enumerate(config["accounts"], 1): + print(f"\n{i}. {acc['name']}") + print(f" 服务器: {acc['smtp_server']}:{acc['smtp_port']}") + print(f" 邮箱: {acc['email']}") + print(f" TLS: {'是' if acc['use_tls'] else '否'}") + + +def get_account(name: Optional[str] = None) -> Optional[dict]: + """获取 SMTP 配置""" + config = load_config() + + if not config["accounts"]: + return None + + if name is None: + return config["accounts"][0] + + for acc in config["accounts"]: + if acc["name"] == name: + return acc + + return None + + +def send_email( + to: str, + subject: str, + body: str, + account_name: Optional[str] = None, + html: bool = False, + cc: Optional[str] = None, + bcc: Optional[str] = None, + from_name: Optional[str] = None, + attachments: Optional[List[str]] = None, + verbose: bool = False +) -> bool: + """ + 发送邮件 + + 参数: + to: 收件人邮箱地址 + subject: 邮件主题 + body: 邮件内容 + account_name: SMTP 配置名称(可选,默认使用第一个) + html: 是否为 HTML 邮件(默认 False) + cc: 抄送邮箱地址,多个用逗号分隔(可选) + bcc: 密送邮箱地址,多个用逗号分隔(可选) + from_name: 发件人显示名称(可选) + attachments: 附件文件路径列表(可选) + verbose: 是否显示详细日志(默认 False) + + 返回: + bool: 发送成功返回 True,失败返回 False + """ + try: + # 获取 SMTP 配置 + account = get_account(account_name) + if account is None: + print("❌ 错误:未找到 SMTP 配置,请先运行配置") + return False + + # 创建邮件 + if attachments: + msg = MIMEMultipart() + msg.attach(MIMEText(body, "html" if html else "plain", "utf-8")) + else: + msg = MIMEText(body, "html" if html else "plain", "utf-8") + + # 设置邮件头 + msg["Subject"] = subject + msg["From"] = f"{from_name} <{account['email']}>" if from_name else account["email"] + msg["To"] = to + msg["Date"] = formatdate(localtime=True) + msg["Message-Id"] = make_msgid(domain=account["email"].split("@")[1]) + + if cc: + msg["Cc"] = cc + + # 添加附件 + if attachments: + for file_path in attachments: + if not os.path.exists(file_path): + print(f"⚠️ 附件不存在,已跳过: {file_path}") + continue + + with open(file_path, "rb") as f: + part = MIMEApplication(f.read()) + + filename = os.path.basename(file_path) + part.add_header( + "Content-Disposition", + "attachment", + filename=filename + ) + msg.attach(part) + if verbose: + print(f"📎 已添加附件: {filename}") + + # 连接 SMTP 服务器 + if verbose: + print(f"📡 正在连接 {account['smtp_server']}:{account['smtp_port']}...") + + if account["use_tls"]: + server = smtplib.SMTP(account["smtp_server"], account["smtp_port"]) + server.starttls() + else: + server = smtplib.SMTP(account["smtp_server"], account["smtp_port"]) + + # 登录 + if verbose: + print(f"🔑 正在登录 {account['email']}...") + server.login(account["email"], account["password"]) + + # 准备收件人列表 + recipients = [to] + if cc: + recipients.extend([addr.strip() for addr in cc.split(",")]) + if bcc: + recipients.extend([addr.strip() for addr in bcc.split(",")]) + + # 发送邮件 + if verbose: + print(f"📤 正在发送邮件...") + server.sendmail(account["email"], recipients, msg.as_string()) + server.quit() + + print(f"✅ 邮件发送成功") + print(f" 收件人: {to}") + if cc: + print(f" 抄送: {cc}") + if attachments: + valid_attachments = [a for a in attachments if os.path.exists(a)] + print(f" 附件: {len(valid_attachments)} 个") + + return True + + except Exception as e: + print(f"❌ 邮件发送失败: {e}") + return False + + +def main(): + """命令行入口""" + import argparse + + parser = argparse.ArgumentParser(description="邮件发送工具") + subparsers = parser.add_subparsers(dest="command", help="可用命令") + + # 配置命令 + config_parser = subparsers.add_parser("config", help="配置 SMTP") + config_parser.add_argument("name", help="配置名称") + config_parser.add_argument("--server", required=True, help="SMTP 服务器地址") + config_parser.add_argument("--port", type=int, required=True, help="SMTP 端口") + config_parser.add_argument("--email", required=True, help="邮箱地址") + config_parser.add_argument("--password", required=True, help="邮箱密码") + config_parser.add_argument("--no-tls", action="store_true", help="不使用 TLS(默认使用)") + + # 列出配置命令 + subparsers.add_parser("list", help="列出所有配置") + + # 发送命令 + send_parser = subparsers.add_parser("send", help="发送邮件") + send_parser.add_argument("--to", "-t", required=True, help="收件人邮箱") + send_parser.add_argument("--subject", "-s", required=True, help="邮件主题") + send_parser.add_argument("--body", "-b", required=True, help="邮件内容") + send_parser.add_argument("--account", "-a", help="使用指定配置") + send_parser.add_argument("--html", action="store_true", help="HTML 格式") + send_parser.add_argument("--cc", help="抄送邮箱(多个用逗号分隔)") + send_parser.add_argument("--bcc", help="密送邮箱(多个用逗号分隔)") + send_parser.add_argument("--from-name", help="发件人显示名称") + send_parser.add_argument("--attach", "-f", action="append", help="附件文件路径(可多次使用)") + send_parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志") + + args = parser.parse_args() + + if args.command == "config": + setup_account( + name=args.name, + smtp_server=args.server, + smtp_port=args.port, + email=args.email, + password=args.password, + use_tls=not args.no_tls + ) + + elif args.command == "list": + list_accounts() + + elif args.command == "send": + success = send_email( + to=args.to, + subject=args.subject, + body=args.body, + account_name=args.account, + html=args.html, + cc=args.cc, + bcc=args.bcc, + from_name=args.from_name, + attachments=args.attach, + verbose=args.verbose + ) + sys.exit(0 if success else 1) + + else: + parser.print_help() + + +if __name__ == "__main__": + main() \ No newline at end of file