Files
disk-scanner/disk_scanner.py
hubian 4ce1c93ad3 feat: 磁盘大文件扫描工具 v1.0.0
功能:
- 智能跳过零碎目录 (node_modules, .git, venv等)
- 文件数量阈值判断
- 大小阈值过滤
- 按大小排序展示
- 树形结构清晰展示
- 支持Windows/Linux/macOS
2026-04-12 16:22:01 +08:00

337 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
磁盘大文件扫描工具
智能扫描目录,找出大文件和大目录,自动跳过零碎文件目录
"""
import os
import sys
import argparse
from pathlib import Path
from collections import defaultdict
from typing import Optional, Set, Tuple
# Directory names that are summarised on a single line instead of being
# expanded, because they typically hold very large numbers of small files.
# NOTE(review): should_skip() compares every pattern against one directory
# *name*, so the entries containing "/" ('Library/Caches', 'go/pkg') cannot
# match as written. They are kept here so the set contents stay unchanged.
SKIP_PATTERNS = {
    # Dependency / package-manager trees and build output
    'node_modules', '.npm', '.yarn', '.pnpm-store',
    'venv', 'env', '.venv', '.env', 'site-packages',
    '__pycache__', '.mypy_cache', '.pytest_cache', '.tox',
    'target', 'build', 'dist', '.gradle', '.m2', '.mvn',
    'vendor', 'Pods', 'Carthage', 'DerivedData',
    # Version-control metadata
    '.git', '.svn', '.hg', '.bzr',
    # IDE / editor state
    '.idea', '.vscode', '.vs',
    # Suffix pattern: any name ending in ".egg-info"
    '*.egg-info',
    # Caches and temporary files
    '.cache', 'cache', 'Cache',
    'tmp', 'temp', 'Temp', '.tmp',
    'Library/Caches',
    # Recycle bins / trash folders
    '$RECYCLE.BIN', '.Trash', '.Trashes',
    # Other tool-managed directories
    '.cargo', '.rustup', 'go/pkg',
    '.nuget', 'packages',
}

# Default display settings.
# Default for the -n/--top option (entries shown per level).
DEFAULT_TOP_N = 20
# A directory holding more files than this is labelled "fragmented".
DEFAULT_FILE_THRESHOLD = 100
# 10 MB: directories that are not larger than this are not expanded.
DEFAULT_SIZE_THRESHOLD = 10 << 20
def format_size(size: int) -> str:
    """Render a byte count as a short human-readable string.

    The value is divided by 1024 until it drops below 1024 and is then
    printed with the matching unit (B, KB, MB, GB, TB, and finally PB).
    Values of at least 1 use one decimal place; smaller values use two.
    """
    value = size
    for suffix in ('B', 'KB', 'MB', 'GB', 'TB'):
        if value >= 1024:
            # Too large for this unit: scale down and try the next one.
            value /= 1024
            continue
        if value >= 1:
            return f"{value:.1f}{suffix}"
        return f"{value:.2f}{suffix}"
    # Anything still >= 1024 TB is reported in petabytes.
    return f"{value:.1f}PB"
def should_skip(name: str, patterns: Optional[Set[str]] = None) -> bool:
    """Decide whether a directory name matches one of the skip patterns.

    Matching is case-insensitive. A pattern either equals the whole name,
    or, when it starts with "*.", matches any name ending with the rest of
    the pattern (for example "*.egg-info" matches "pkg.egg-info").

    Args:
        name: A single directory name (not a path).
        patterns: Patterns to test against. Defaults to the module-level
            SKIP_PATTERNS; pass an empty set to skip nothing.

    Returns:
        True if the name matches at least one pattern.
    """
    if patterns is None:
        patterns = SKIP_PATTERNS
    lowered = name.lower()
    for pattern in patterns:
        candidate = pattern.lower()
        if candidate == lowered:
            return True
        # "*.suffix" patterns: compare against the part after the asterisk.
        if candidate.startswith('*.') and lowered.endswith(candidate[1:]):
            return True
    return False
def get_dir_info(path: Path, skip_dirs: Optional[Set[str]] = None) -> Tuple[int, int, list]:
    """Summarise the direct children of a directory (non-recursive).

    Args:
        path: Directory to inspect; passed straight to os.scandir.
        skip_dirs: Exact directory names to leave out of the returned
            sub-directory list. Defaults to excluding nothing.

    Returns:
        A tuple (size, file_count, subdirs):
        size is the combined size in bytes of the regular files directly
        inside path, file_count is how many such files there are, and
        subdirs lists the paths of the immediate sub-directories whose
        names are not in skip_dirs. Symbolic links are neither followed
        nor counted. Entries that cannot be read are ignored; if the
        directory itself cannot be listed the result is (0, 0, []).
    """
    excluded = skip_dirs if skip_dirs is not None else set()
    total_size = 0
    file_count = 0
    subdirs = []
    try:
        # The context manager closes the directory handle promptly instead
        # of leaving that to garbage collection.
        with os.scandir(path) as children:
            for child in children:
                try:
                    if child.is_file(follow_symlinks=False):
                        total_size += child.stat(follow_symlinks=False).st_size
                        file_count += 1
                    elif child.is_dir(follow_symlinks=False):
                        if child.name not in excluded:
                            subdirs.append(child.path)
                except OSError:
                    # PermissionError is a subclass of OSError; skip the
                    # unreadable entry and keep going (best effort).
                    continue
    except OSError:
        # Unlistable directory: report what was gathered so far.
        pass
    return total_size, file_count, subdirs
def _tree_size(root: str) -> Tuple[int, int]:
    """Return (total bytes, file count) of every regular file below root.

    Symbolic links are neither followed nor counted, and unreadable
    entries are ignored, so the figures are a best-effort lower bound.
    """
    total_bytes = 0
    total_files = 0
    pending = [root]
    # Iterative walk: avoids deep recursion and keeps one handle open at a time.
    while pending:
        current = pending.pop()
        try:
            with os.scandir(current) as children:
                for child in children:
                    try:
                        if child.is_file(follow_symlinks=False):
                            total_bytes += child.stat(follow_symlinks=False).st_size
                            total_files += 1
                        elif child.is_dir(follow_symlinks=False):
                            pending.append(child.path)
                    except OSError:
                        continue
        except OSError:
            continue
    return total_bytes, total_files


def _collect_entries(path, file_threshold: int) -> Optional[list]:
    """Measure each direct child of path; return None if path is unlistable."""
    entries = []
    try:
        with os.scandir(path) as children:
            for child in children:
                try:
                    if child.is_file(follow_symlinks=False):
                        entries.append({
                            'name': child.name,
                            'path': child.path,
                            'size': child.stat(follow_symlinks=False).st_size,
                            'type': 'file',
                        })
                    elif child.is_dir(follow_symlinks=False):
                        if should_skip(child.name):
                            # Known "many small files" directory: report
                            # its full size but never expand it.
                            dir_size, file_count = _tree_size(child.path)
                            entries.append({
                                'name': child.name,
                                'path': child.path,
                                'size': dir_size,
                                'type': 'skipped_dir',
                                'file_count': file_count,
                                'reason': 'known_pattern',
                            })
                        else:
                            # file_count is the number of files directly in
                            # the directory (used for the "fragmented"
                            # label); the size covers the whole subtree so
                            # that nested data is not under-reported.
                            direct_size, direct_count, subdirs = get_dir_info(child.path)
                            dir_size = direct_size + sum(
                                _tree_size(sub)[0] for sub in subdirs
                            )
                            fragmented = direct_count > file_threshold
                            entries.append({
                                'name': child.name,
                                'path': child.path,
                                'size': dir_size,
                                'type': 'fragmented_dir' if fragmented else 'dir',
                                'file_count': direct_count,
                            })
                except OSError:
                    # Unreadable entry: leave it out of the listing.
                    continue
    except OSError:
        return None
    return entries


def _entry_label(entry: dict) -> str:
    """Return the icon/annotation shown in front of an entry's name."""
    kind = entry['type']
    if kind == 'file':
        return "📄"
    if kind == 'skipped_dir':
        return f"📦 [{entry['file_count']} files, skipped]"
    if kind == 'fragmented_dir':
        return f"📁 [{entry['file_count']} files, fragmented]"
    return "📁"


def scan_directory(
    path: Path,
    depth: int = 0,
    max_depth: int = -1,
    skip_patterns: Optional[Set[str]] = None,
    file_threshold: int = DEFAULT_FILE_THRESHOLD,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD,
    all_results: list = None,
    prefix: str = "",
    top_n: Optional[int] = None,
) -> list:
    """Print a size-sorted tree of path and collect the measured entries.

    Each level lists files and directories, largest first. Directories
    matched by should_skip() are shown as "skipped", directories holding
    more than file_threshold files directly are shown as "fragmented",
    and only the remaining directories larger than size_threshold are
    expanded recursively.

    Args:
        path: Directory to scan.
        depth: Current recursion depth (0 for the root call).
        max_depth: Maximum number of levels to list; values <= 0 mean
            unlimited.
        skip_patterns: Forwarded to recursive calls only.
            NOTE(review): name matching is done by should_skip(), which
            uses the module-level SKIP_PATTERNS, so this argument does
            not currently change which directories are skipped.
        file_threshold: Direct-file count above which a directory is
            labelled fragmented and not expanded.
        size_threshold: Only directories larger than this many bytes are
            expanded.
        all_results: List that receives the entry dicts of every listed
            level; a new list is created when omitted.
        prefix: Indentation prefix used to draw the tree.
        top_n: Show at most this many entries per level and summarise the
            rest on one line; None or a value <= 0 shows everything.

    Returns:
        all_results, extended with one dict per measured entry (keys:
        name, path, size, type, and file_count for directories).
    """
    if skip_patterns is None:
        skip_patterns = SKIP_PATTERNS
    if all_results is None:
        all_results = []
    if max_depth > 0 and depth >= max_depth:
        return all_results

    entries = _collect_entries(path, file_threshold)
    if entries is None:
        # The directory itself could not be listed.
        return all_results

    entries.sort(key=lambda item: item['size'], reverse=True)
    all_results.extend(entries)

    if depth == 0:
        total_size = sum(item['size'] for item in entries)
        print(f"\n📁 {path}")
        print(f" 总大小: {format_size(total_size)}")
        print("-" * 60)

    if top_n is not None and top_n > 0:
        shown = entries[:top_n]
    else:
        shown = entries
    hidden = entries[len(shown):]

    for index, entry in enumerate(shown):
        # The summary line (if any) takes the closing "└──" slot.
        is_last = index == len(shown) - 1 and not hidden
        connector = "└── " if is_last else "├── "
        # Children of a non-last entry keep a vertical guide so that the
        # nesting stays readable; both variants are four columns wide.
        child_prefix = prefix + ("    " if is_last else "│   ")
        label = _entry_label(entry)
        name = entry['name']
        size_str = format_size(entry['size'])
        print(f"{prefix}{connector}{label} {name}: {size_str}")

        # Sub-tree sizes are recomputed on each expanded level; only
        # directories above the size threshold pay that cost.
        if entry['type'] == 'dir' and entry['size'] > size_threshold:
            scan_directory(
                Path(entry['path']),
                depth=depth + 1,
                max_depth=max_depth,
                skip_patterns=skip_patterns,
                file_threshold=file_threshold,
                size_threshold=size_threshold,
                all_results=all_results,
                prefix=child_prefix,
                top_n=top_n,
            )

    if hidden:
        hidden_size = sum(item['size'] for item in hidden)
        print(f"{prefix}└── ... 其余 {len(hidden)} 项: {format_size(hidden_size)}")

    return all_results


def scan_and_report(
    path: str,
    max_depth: int = -1,
    top_n: int = DEFAULT_TOP_N,
    file_threshold: int = DEFAULT_FILE_THRESHOLD,
    size_threshold: int = DEFAULT_SIZE_THRESHOLD
):
    """Validate path, print the report header, run the scan, print the footer.

    Args:
        path: Directory to scan; resolved to an absolute path first.
        max_depth: Maximum number of levels to list; values <= 0 mean
            unlimited.
        top_n: Maximum number of entries shown per level.
        file_threshold: Direct-file count above which a directory is
            labelled fragmented.
        size_threshold: Only directories larger than this many bytes are
            expanded.

    Exits the process with status 1 (message on stderr) when path does not
    exist or is not a directory.
    """
    target = Path(path).resolve()
    if not target.exists():
        print(f"错误: 路径不存在 - {path}", file=sys.stderr)
        sys.exit(1)
    if not target.is_dir():
        print(f"错误: 不是目录 - {path}", file=sys.stderr)
        sys.exit(1)

    rule = '=' * 60
    print(f"\n{rule}")
    print(" 磁盘空间扫描报告")
    print(rule)
    print(f" 目标: {target}")
    print(f" 文件数阈值: {file_threshold} (超过视为零碎目录)")
    print(f" 大小阈值: {format_size(size_threshold)} (小于此值不深入)")
    if max_depth > 0:
        print(f" 最大深度: {max_depth}")
    else:
        print(" 最大深度: 无限制")
    print(rule)

    scan_directory(
        target,
        max_depth=max_depth,
        file_threshold=file_threshold,
        size_threshold=size_threshold,
        top_n=top_n,
    )

    print(f"\n{rule}")
    print(" 扫描完成")
    print(f"{rule}\n")
def _parse_size(text: str) -> int:
    """Convert a size such as '512', '200K', '10M', '1.5G' or '10MB' to bytes.

    The suffix is case-insensitive, uses multiples of 1024, and may be
    followed by an optional 'B'.

    Raises:
        ValueError: if text is empty, not numeric, negative or not finite.
    """
    units = {'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3}
    cleaned = text.strip().upper()
    # Accept "10MB" / "512B" as well as the bare "10M" / "512".
    if len(cleaned) > 1 and cleaned.endswith('B'):
        cleaned = cleaned[:-1]
    multiplier = 1
    if cleaned and cleaned[-1] in units:
        multiplier = units[cleaned[-1]]
        cleaned = cleaned[:-1]
    number = float(cleaned)  # raises ValueError for '' or non-numeric text
    # The chained comparison is False for NaN, negatives and infinity.
    if not 0 <= number < float('inf'):
        raise ValueError(f"size out of range: {text!r}")
    return int(number * multiplier)


def main():
    """Command-line entry point: parse the arguments and run the scan.

    An invalid --size-threshold value is reported through argparse
    (usage message on stderr, exit status 2) instead of a traceback.
    """
    parser = argparse.ArgumentParser(
        description='磁盘大文件扫描工具 - 智能扫描找出大文件和大目录',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
示例:
# 扫描当前目录
python disk_scanner.py .
# 扫描指定目录限制深度3层
python disk_scanner.py C:\\Users -d 3
# 自定义阈值
python disk_scanner.py D:\\ -f 200 -s 50M
'''
    )
    parser.add_argument('path', help='要扫描的目录路径')
    parser.add_argument('-d', '--depth', type=int, default=-1,
                        help='最大扫描深度(默认无限制)')
    parser.add_argument('-n', '--top', type=int, default=DEFAULT_TOP_N,
                        help=f'每层显示前N个条目默认{DEFAULT_TOP_N}')
    parser.add_argument('-f', '--file-threshold', type=int, default=DEFAULT_FILE_THRESHOLD,
                        help=f'文件数阈值,超过视为零碎目录(默认{DEFAULT_FILE_THRESHOLD})')
    parser.add_argument('-s', '--size-threshold', type=str, default='10M',
                        help='大小阈值小于此值不深入扫描默认10M支持K/M/G后缀')
    args = parser.parse_args()

    try:
        size_threshold = _parse_size(args.size_threshold)
    except ValueError:
        # parser.error() prints the usage plus this message and exits.
        parser.error(
            f"无效的大小阈值: {args.size_threshold!r} (示例: 512, 200K, 10M, 1.5G)"
        )

    scan_and_report(
        args.path,
        max_depth=args.depth,
        top_n=args.top,
        file_threshold=args.file_threshold,
        size_threshold=size_threshold
    )
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()