"C 盘满了"——这句话大概是 IT 运维中听到最多的求助了。与其每次手动清理,不如让 Python 自动搞定。
磁盘空间问题的真相
企业环境中磁盘空间不足的常见原因:
- 日志文件膨胀:IIS 日志、应用日志、数据库日志没人清理
- 临时文件堆积:Windows 临时目录、浏览器缓存、安装残留
- 重复文件:同一个文件被复制了 N 份到不同位置
- 大文件遗忘:测试时拷入的 ISO、数据库备份、录像文件
- 回收站和缩略图缓存:用户删了文件但没清回收站
手动排查?右键→属性→看看哪个盘满了→再一层层找……效率极低。
基础篇:磁盘空间全景扫描
import os
import shutil
from pathlib import Path
from collections import defaultdict
def get_disk_usage(drives=None):
    """Print a usage table for disk partitions and return the raw numbers.

    Sizes are read with ``shutil.disk_usage`` instead of parsing ``wmic``
    output: ``wmic ... get caption,size,freespace`` prints its columns in
    alphabetical order (Caption, FreeSpace, Size), so positional parsing
    swaps total and free space, and the tool is deprecated on current
    Windows releases.

    Args:
        drives: Optional iterable of drives or paths to inspect, e.g.
            ``["C:", "D:"]``. When ``None``, every drive letter from A: to Z:
            whose root exists is inspected.

    Returns:
        A list with one dict per readable drive, holding the keys ``drive``,
        ``total_gb``, ``used_gb``, ``free_gb`` and ``usage_percent``.
        Drives that cannot be queried are skipped.
    """
    if drives is None:
        drives = [
            f"{letter}:"
            for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            if os.path.exists(f"{letter}:\\")
        ]

    print("=" * 60)
    print(f"{'盘符':<8}{'总容量':<12}{'已用':<12}{'可用':<12}{'使用率':<8}")
    print("=" * 60)

    results = []
    for drive in drives:
        # A bare "C:" means "current directory on C:"; query the root instead.
        is_bare_letter = len(drive) == 2 and drive[1] == ":"
        target = drive + "\\" if is_bare_letter else drive
        try:
            usage = shutil.disk_usage(target)
        except OSError:
            # Drive without media, disconnected share, missing path, ...
            continue

        total = usage.total  # bytes
        free = usage.free
        used = total - free
        usage_percent = (used / total * 100) if total > 0 else 0

        total_gb = total / 1024**3
        used_gb = used / 1024**3
        free_gb = free / 1024**3

        # Traffic-light marker based on how full the drive is.
        if usage_percent >= 90:
            icon = "🔴"
        elif usage_percent >= 80:
            icon = "🟡"
        else:
            icon = "🟢"

        print(f"{icon} {drive:<6}{total_gb:>8.1f} GB{used_gb:>8.1f} GB"
              f"{free_gb:>8.1f} GB{usage_percent:>6.1f}%")
        results.append({
            "drive": drive,
            "total_gb": total_gb,
            "used_gb": used_gb,
            "free_gb": free_gb,
            "usage_percent": usage_percent,
        })
    return results
# 使用
# disks = get_disk_usage()
实战一:大文件扫描器
快速找出占用空间最多的文件:
def scan_large_files(
    directory,
    min_size_mb=100,
    top_n=20,
    exclude_dirs=None
):
    """Find the largest files below ``directory``.

    Args:
        directory: Root directory to scan recursively.
        min_size_mb: Only files of at least this many megabytes are reported.
        top_n: How many of the largest files to print.
        exclude_dirs: Directories to skip entirely, including everything
            below them. Defaults to the Windows system and program
            directories.

    Returns:
        A list of dicts (``path``, ``size_mb``, ``size_bytes``, ``modified``)
        for every matching file, sorted by size in descending order. The
        list is not truncated to ``top_n``; only the printed table is.
    """
    from datetime import datetime

    if exclude_dirs is None:
        exclude_dirs = [
            r"C:\Windows", r"C:\Program Files",
            r"C:\Program Files (x86)",
        ]

    # Normalise once. normcase makes the comparison case-insensitive on
    # Windows, where the file system is too.
    excluded = [os.path.normcase(os.path.normpath(e)) for e in exclude_dirs]

    def _is_excluded(path):
        """Return True if path is an excluded directory or lies inside one."""
        candidate = os.path.normcase(os.path.normpath(path))
        # Match on a path-component boundary so that "C:\Program Files"
        # does not also exclude "C:\Program FilesX".
        return any(
            candidate == excl
            or candidate.startswith(excl.rstrip(os.sep) + os.sep)
            for excl in excluded
        )

    large_files = []
    min_size_bytes = min_size_mb * 1024 * 1024
    print(f"正在扫描 {directory}(最小 {min_size_mb} MB)...")

    for root, dirs, files in os.walk(directory):
        # Prune in place so os.walk never descends into excluded trees.
        dirs[:] = [d for d in dirs if not _is_excluded(os.path.join(root, d))]

        for file in files:
            filepath = os.path.join(root, file)
            try:
                size = os.path.getsize(filepath)
                if size < min_size_bytes:
                    continue
                mtime = os.path.getmtime(filepath)
            except OSError:
                # Unreadable or vanished file: skip it, keep scanning.
                continue
            large_files.append({
                "path": filepath,
                "size_mb": round(size / 1024**2, 1),
                "size_bytes": size,
                "modified": mtime,
            })

    large_files.sort(key=lambda x: x["size_bytes"], reverse=True)

    print(f"\n发现 {len(large_files)} 个大文件,TOP {top_n}:\n")
    for f in large_files[:top_n]:
        mod_time = datetime.fromtimestamp(f["modified"]).strftime("%Y-%m-%d")
        print(f" {f['size_mb']:>8.1f} MB {mod_time} {f['path']}")
    return large_files
# 使用:扫描 D 盘超过 500MB 的文件
# large_files = scan_large_files("D:\\", min_size_mb=500, top_n=30)
实战二:目录大小分析
找出哪些目录占用了最多空间:
def get_directory_sizes(
    root_dir,
    max_depth=2,
    top_n=15,
    exclude_dirs=None
):
    """Report which directories below ``root_dir`` hold the most data.

    Each reported directory's size is recursive: it includes every file
    below it, however deep. ``max_depth`` only limits which directories get
    their own row, not how deep the scan goes.

    Args:
        root_dir: Directory to analyse. It is reported itself at depth 0.
        max_depth: Deepest directory level, relative to ``root_dir``, that
            gets its own entry.
        top_n: Number of rows to print.
        exclude_dirs: Directories to skip entirely, including their contents.

    Returns:
        A list of ``(path, size_in_bytes)`` tuples for every directory up to
        ``max_depth``, sorted by size in descending order.
    """
    if exclude_dirs is None:
        exclude_dirs = []
    excluded = [os.path.normcase(os.path.normpath(e)) for e in exclude_dirs]

    def _is_excluded(path):
        """Return True if path is an excluded directory or lies inside one."""
        candidate = os.path.normcase(os.path.normpath(path))
        return any(
            candidate == excl
            or candidate.startswith(excl.rstrip(os.sep) + os.sep)
            for excl in excluded
        )

    dir_sizes = defaultdict(int)
    print(f"正在分析 {root_dir} 的目录大小...")

    for root, dirs, files in os.walk(root_dir):
        # Prune excluded subtrees before os.walk descends into them.
        dirs[:] = [d for d in dirs if not _is_excluded(os.path.join(root, d))]

        current_size = 0
        for file in files:
            try:
                current_size += os.path.getsize(os.path.join(root, file))
            except OSError:
                pass  # unreadable or vanished file: leave it out of the total

        # Credit these bytes to this directory and to each ancestor, but only
        # for levels that are shallow enough to be reported.
        rel_path = os.path.relpath(root, root_dir)
        parts = [] if rel_path == "." else rel_path.split(os.sep)
        for depth in range(min(len(parts), max_depth) + 1):
            dir_sizes[os.path.join(root_dir, *parts[:depth])] += current_size

    sorted_dirs = sorted(dir_sizes.items(), key=lambda x: x[1], reverse=True)
    print(f"\nTOP {top_n} 目录:\n")

    # Sizes are recursive, so summing them would count nested data several
    # times. The root's own total is the correct denominator.
    total = dir_sizes.get(root_dir, 0)
    for path, size in sorted_dirs[:top_n]:
        size_gb = size / 1024**3
        percent = (size / total * 100) if total > 0 else 0
        bar_len = 30
        filled = int(bar_len * percent / 100)
        bar = "█" * filled + "░" * (bar_len - filled)
        print(f" {size_gb:>8.2f} GB {bar} {percent:>5.1f}%")
        print(f" {path}")
    return sorted_dirs
# 使用
# dirs = get_directory_sizes(r"C:\Users", max_depth=2, exclude_dirs=[r"C:\Users\Default"])
实战三:重复文件检测
清理重复文件可以释放大量空间:
import hashlib
def get_file_hash(filepath, block_size=65536):
    """Return the hexadecimal MD5 digest of a file's contents.

    The file is read in chunks of ``block_size`` bytes, so large files are
    never loaded into memory at once. Returns ``None`` when the file cannot
    be opened or read.
    """
    digest = hashlib.md5()
    try:
        with open(filepath, "rb") as stream:
            while True:
                chunk = stream.read(block_size)
                if chunk == b"":
                    break
                digest.update(chunk)
    except OSError:  # PermissionError is a subclass of OSError
        return None
    return digest.hexdigest()
def find_duplicate_files(directory, min_size_kb=100):
    """Find files with identical content below ``directory``.

    Files are grouped by size first, and only files that share a size are
    hashed with ``get_file_hash``. Hashes are compared within one size
    group, so two files count as duplicates only if both size and MD5 match.

    Args:
        directory: Root directory to scan recursively.
        min_size_kb: Files smaller than this many kilobytes are ignored.

    Returns:
        A dict mapping each MD5 hex digest to the list of paths (two or
        more) that share that content.
    """
    min_size_bytes = min_size_kb * 1024

    # Step 1: group candidate files by size.
    print("第一步:按文件大小分组...")
    size_groups = defaultdict(list)
    for root, _dirs, files in os.walk(directory):
        for file in files:
            filepath = os.path.join(root, file)
            try:
                size = os.path.getsize(filepath)
            except OSError:
                continue  # unreadable or vanished file
            if size >= min_size_bytes:
                size_groups[size].append(filepath)

    potential_duplicates = {
        size: paths for size, paths in size_groups.items()
        if len(paths) >= 2
    }
    print(f"发现 {len(potential_duplicates)} 组大小相同的文件")

    # Step 2: confirm with MD5 inside each size group. The size is kept with
    # every group so the statistics below never have to stat a file again;
    # a file may have been removed in the meantime.
    print("第二步:计算 MD5 确认重复...")
    groups = []  # (hash, size_in_bytes, [paths])
    for size, paths in potential_duplicates.items():
        by_hash = defaultdict(list)
        for filepath in paths:
            file_hash = get_file_hash(filepath)
            if file_hash:
                by_hash[file_hash].append(filepath)
        groups.extend(
            (file_hash, size, same)
            for file_hash, same in by_hash.items()
            if len(same) >= 2
        )

    true_duplicates = {file_hash: same for file_hash, _size, same in groups}

    # One copy per group is kept; every further copy is wasted space.
    total_wasted = sum(size * (len(same) - 1) for _h, size, same in groups)
    print(f"\n发现 {len(true_duplicates)} 组重复文件")
    print(f"可释放空间: {total_wasted / 1024**3:.2f} GB\n")

    # Show the ten groups that waste the most space.
    ranked = sorted(
        groups,
        key=lambda g: g[1] * (len(g[2]) - 1),
        reverse=True
    )
    for _h, size, same in ranked[:10]:
        wasted = size * (len(same) - 1)
        print(f" 重复 {len(same)} 份, 各 {size/1024**2:.1f} MB, "
              f"浪费 {wasted/1024**2:.1f} MB")
        for f in same:
            print(f" {f}")
        print()
    return true_duplicates
def cleanup_duplicates(duplicate_map, dry_run=True):
    """Delete redundant copies, keeping the first file of every group.

    Args:
        duplicate_map: Dict mapping a hash to a list of paths with identical
            content, as returned by ``find_duplicate_files``.
        dry_run: When True (the default) nothing is deleted and the function
            only reports what it would remove.

    Returns:
        Number of bytes freed, or that would be freed in a dry run. Files
        that are missing or cannot be removed are reported and not counted.
    """
    total_freed = 0
    for files in duplicate_map.values():
        if not files:
            continue
        keep, remove = files[0], files[1:]

        # Safety net: if the copy we intend to keep has disappeared since the
        # scan, deleting the others could destroy the last remaining copy.
        if not os.path.isfile(keep):
            print(f" [失败] {keep}: 保留的文件不存在,已跳过该组")
            continue

        for filepath in remove:
            try:
                size = os.path.getsize(filepath)
                if not dry_run:
                    os.remove(filepath)
            except OSError as e:
                print(f" [失败] {filepath}: {e}")
                continue
            total_freed += size
            label = "[模拟删除]" if dry_run else "[已删除]"
            print(f" {label} {filepath} ({size/1024**2:.1f} MB)")

    action = "可释放" if dry_run else "已释放"
    print(f"\n{action}空间: {total_freed / 1024**3:.2f} GB")
    if dry_run:
        print("\n⚠️ 这是模拟运行,没有实际删除文件。")
        print("确认无误后,设置 dry_run=False 重新运行。")
    return total_freed
# 使用
# dups = find_duplicate_files(r"D:\Data", min_size_kb=1024)
# cleanup_duplicates(dups, dry_run=True) # 先模拟
# cleanup_duplicates(dups, dry_run=False) # 确认后删除
实战四:智能清理策略
不同类型的垃圾文件需要不同的清理策略:
from datetime import datetime, timedelta
import re
class DiskCleaner:
    """Disk cleaner for temp directories, old logs and the recycle bin.

    Every destructive step honours ``dry_run``: when it is True (the
    default) nothing is deleted and the methods only report what they would
    remove. Per-category totals are collected in ``self.report``, a mapping
    of category name to ``{"count": int, "size": int}`` with sizes in bytes.
    """

    # Directories whose contents clean_temp_directories() removes.
    CLEANUP_TARGETS = {
        "Windows 临时文件": r"C:\Windows\Temp",
        "用户临时文件": os.path.join(os.environ.get("TEMP", ""), ""),
        "缩略图缓存": os.path.join(
            os.environ.get("LOCALAPPDATA", ""), "Microsoft", "Windows", "Explorer"
        ),
        "Windows 更新缓存": r"C:\Windows\SoftwareDistribution\Download",
        "Prefetch 缓存": r"C:\Windows\Prefetch",
    }
    # File-name patterns considered disposable. Not referenced by any method
    # of this class; kept as reference data for callers.
    CLEANUP_PATTERNS = {
        "日志文件": [r"\.log$"],
        "临时文件": [r"\.tmp$", r"\.temp$", r"~\$"],
        "缓存文件": [r"\.cache$", r"\.bak$"],
        "缩略图数据库": [r"thumbcache_.*\.db$"],
    }
    # Directory-name patterns considered disposable. Also not referenced by
    # any method of this class.
    CLEANUP_DIR_PATTERNS = {
        "node_modules": r"node_modules$",
        "__pycache__": r"__pycache__$",
        ".pytest_cache": r"\.pytest_cache$",
        "pip cache": r"pip[\\/]cache$",
    }
    # Report key under which clean_old_logs() stores its totals.
    _LOG_REPORT_KEY = "旧日志"

    def __init__(self, dry_run=True):
        """Create a cleaner; ``dry_run=True`` reports without deleting."""
        self.dry_run = dry_run
        self.report = defaultdict(lambda: {"count": 0, "size": 0})

    @staticmethod
    def _tree_size(directory):
        """Return the total size in bytes of all readable files below directory."""
        total = 0
        for dirpath, _dirnames, filenames in os.walk(directory):
            for filename in filenames:
                try:
                    total += os.path.getsize(os.path.join(dirpath, filename))
                except OSError:
                    pass  # unreadable file: leave it out of the total
        return total

    def clean_temp_directories(self):
        """Empty every directory listed in ``CLEANUP_TARGETS``.

        Top-level entries are removed one by one. Entries that cannot be
        removed, such as files in use, are skipped and not counted. Results
        are stored in ``self.report`` under the target's name.
        """
        print("\n=== 清理临时目录 ===")
        action = "可清理" if self.dry_run else "已清理"
        for name, path in self.CLEANUP_TARGETS.items():
            # An unset environment variable yields an empty or relative path.
            # Never run a destructive clean-up relative to the working
            # directory.
            if not os.path.isabs(path) or not os.path.isdir(path):
                continue
            try:
                entries = os.listdir(path)
            except OSError as e:
                # Typically a protected system directory without admin rights.
                print(f" {name}: 无法访问 ({e})")
                continue

            total_size = 0
            file_count = 0
            for item in entries:
                item_path = os.path.join(path, item)
                try:
                    if os.path.isfile(item_path):
                        size = os.path.getsize(item_path)
                        if not self.dry_run:
                            os.remove(item_path)
                    elif os.path.isdir(item_path):
                        size = self._tree_size(item_path)
                        if not self.dry_run:
                            shutil.rmtree(item_path)
                    else:
                        continue
                except OSError:
                    # Best effort: locked or protected entries are expected in
                    # temp directories. Count only what was actually handled.
                    continue
                total_size += size
                file_count += 1

            self.report[name]["count"] = file_count
            self.report[name]["size"] = total_size
            if file_count > 0:
                print(f" {name}: {action} {file_count} 项 "
                      f"({total_size/1024/1024:.1f} MB)")

    def clean_old_logs(self, days=30, log_dirs=None):
        """Remove ``*.log`` files not modified within the last ``days`` days.

        Args:
            days: Age threshold in days, based on modification time.
            log_dirs: Directories to scan recursively. Defaults to the IIS
                and Windows event log directories.

        The totals are stored in ``self.report`` so that
        ``run_full_cleanup`` includes them in its grand total.
        """
        print(f"\n=== 清理 {days} 天前的日志 ===")
        if log_dirs is None:
            log_dirs = [
                r"C:\inetpub\logs",
                r"C:\Windows\System32\winevt\Logs",
            ]
        cutoff = time.time() - (days * 86400)
        total_cleaned = 0
        cleaned_count = 0
        for log_dir in log_dirs:
            if not os.path.exists(log_dir):
                continue
            for root, _dirs, files in os.walk(log_dir):
                for file in files:
                    if not file.endswith(".log"):
                        continue
                    filepath = os.path.join(root, file)
                    try:
                        mtime = os.path.getmtime(filepath)
                        if mtime >= cutoff:
                            continue
                        size = os.path.getsize(filepath)
                        if not self.dry_run:
                            os.remove(filepath)
                    except OSError:
                        continue  # locked or protected log: skip, don't count
                    if self.dry_run:
                        mod_date = datetime.fromtimestamp(mtime).strftime("%Y-%m-%d")
                        print(f" [模拟] {filepath} ({size/1024/1024:.1f} MB, {mod_date})")
                    total_cleaned += size
                    cleaned_count += 1

        self.report[self._LOG_REPORT_KEY]["count"] = cleaned_count
        self.report[self._LOG_REPORT_KEY]["size"] = total_cleaned
        action = "可释放" if self.dry_run else "已释放"
        print(f"\n日志清理: {action} {total_cleaned/1024/1024:.1f} MB")

    def clean_recycle_bin(self):
        """Empty the recycle bin of all drives through the Windows shell API.

        In dry-run mode only a notice is printed. On platforms without
        ``ctypes.windll`` the failure is reported instead of raised.
        """
        import ctypes
        print("\n=== 清空回收站 ===")
        if self.dry_run:
            print(" [模拟] 清空回收站")
            return
        try:
            # 7 = SHERB_NOCONFIRMATION | SHERB_NOPROGRESSUI | SHERB_NOSOUND
            result = ctypes.windll.shell32.SHEmptyRecycleBinW(None, None, 7)
        except (AttributeError, OSError) as e:
            print(f" 清空回收站失败: {e}")
            return
        if result == 0:
            print(" 回收站已清空")
        else:
            print(f" 清空回收站返回: {result}")

    def run_full_cleanup(self):
        """Run all clean-up steps and print a summary.

        Returns:
            Total number of bytes freed (or that would be freed in a dry
            run) across temp directories and old logs. The recycle bin size
            is not measured and therefore not included.
        """
        mode = "模拟" if self.dry_run else "实际"
        print(f"{'=' * 60}")
        print(f" 磁盘清理 ({mode}模式)")
        print(f"{'=' * 60}")
        self.clean_temp_directories()
        self.clean_old_logs(days=30)
        self.clean_recycle_bin()

        total_size = sum(item["size"] for item in self.report.values())
        print(f"\n{'=' * 60}")
        action = "总计可释放" if self.dry_run else "总计已释放"
        print(f" {action}: {total_size / 1024 / 1024:.1f} MB")
        print(f"{'=' * 60}")
        if self.dry_run:
            print("\n确认无误后,使用 DiskCleaner(dry_run=False) 执行实际清理")
        return total_size
import time
# 使用示例
# cleaner = DiskCleaner(dry_run=True) # 先模拟
# cleaner.run_full_cleanup()
# cleaner = DiskCleaner(dry_run=False) # 确认后执行
# cleaner.run_full_cleanup()
实战五:磁盘空间监控与告警
import json
import os
import shutil
from datetime import datetime, timedelta
from pathlib import Path


class DiskMonitor:
    """Record disk usage snapshots, raise threshold alerts and project trends.

    Snapshots are kept in ``self.history`` and persisted as JSON in
    ``data_file``. Each snapshot is a dict with an ISO ``timestamp`` and a
    list of ``drives`` (``drive``, ``total_gb``, ``free_gb``,
    ``usage_percent``).
    """

    def __init__(self, data_file="disk_history.json"):
        """Load any existing history from ``data_file``."""
        self.data_file = Path(data_file)
        self.history = self._load_history()
        self.threshold_warning = 80   # warning threshold, percent used
        self.threshold_critical = 95  # critical threshold, percent used

    def _load_history(self):
        """Return the stored snapshot list, or an empty list if no file exists."""
        if self.data_file.exists():
            with open(self.data_file, "r", encoding="utf-8") as f:
                return json.load(f)
        return []

    def _save_history(self):
        """Write the snapshot list to ``data_file`` as UTF-8 JSON."""
        with open(self.data_file, "w", encoding="utf-8") as f:
            json.dump(self.history, f, ensure_ascii=False, indent=2)

    def record_snapshot(self, drives=None):
        """Measure the drives now, append the snapshot to the history and save.

        Sizes come from ``shutil.disk_usage``. Parsing
        ``wmic ... get caption,size,freespace`` positionally is unreliable
        because wmic prints its columns alphabetically (Caption, FreeSpace,
        Size), which swaps total and free space.

        Args:
            drives: Optional iterable of drives or paths to measure. When
                ``None``, every drive letter from A: to Z: whose root exists
                is measured.

        Returns:
            The snapshot dict that was recorded.
        """
        if drives is None:
            drives = [
                f"{letter}:"
                for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                if os.path.exists(f"{letter}:\\")
            ]

        snapshot = {
            "timestamp": datetime.now().isoformat(),
            "drives": [],
        }
        for drive in drives:
            # A bare "C:" means "current directory on C:"; query the root.
            is_bare_letter = len(drive) == 2 and drive[1] == ":"
            target = drive + "\\" if is_bare_letter else drive
            try:
                stats = shutil.disk_usage(target)
            except OSError:
                continue  # no media, disconnected share, missing path, ...
            total = stats.total
            free = stats.free
            usage = ((total - free) / total * 100) if total > 0 else 0
            snapshot["drives"].append({
                "drive": drive,
                "total_gb": round(total / 1024**3, 2),
                "free_gb": round(free / 1024**3, 2),
                "usage_percent": round(usage, 1),
            })
        self.history.append(snapshot)

        # Keep only the last 90 days. ISO timestamps sort chronologically.
        cutoff = (datetime.now() - timedelta(days=90)).isoformat()
        self.history = [
            h for h in self.history
            if h["timestamp"] >= cutoff
        ]
        self._save_history()
        return snapshot

    def check_alerts(self, snapshot=None):
        """Return alert dicts for drives above the warning or critical threshold.

        Args:
            snapshot: Snapshot to evaluate. When omitted, a new one is
                recorded first.

        Returns:
            A list of dicts with ``severity`` (``"CRITICAL"`` or
            ``"WARNING"``), ``drive`` and a human-readable ``message``.
        """
        if not snapshot:
            snapshot = self.record_snapshot()
        alerts = []
        for drive in snapshot["drives"]:
            usage = drive["usage_percent"]
            free_gb = drive["free_gb"]
            if usage >= self.threshold_critical:
                alerts.append({
                    "severity": "CRITICAL",
                    "drive": drive["drive"],
                    "message": (
                        f"{drive['drive']} 磁盘空间严重不足!"
                        f"已用 {usage}%,仅剩 {free_gb:.1f} GB"
                    ),
                })
            elif usage >= self.threshold_warning:
                alerts.append({
                    "severity": "WARNING",
                    "drive": drive["drive"],
                    "message": (
                        f"{drive['drive']} 磁盘空间不足警告:"
                        f"已用 {usage}%,剩余 {free_gb:.1f} GB"
                    ),
                })
        return alerts

    def analyze_trend(self, drive_letter, days=30):
        """Project when a drive will reach the alert thresholds.

        The growth rate is the difference between the first and the last
        snapshot in the window, divided by the time actually elapsed between
        them. The number of snapshots does not affect the rate.

        Args:
            drive_letter: Drive name as stored in the snapshots, e.g. ``"C:"``.
            days: Size of the look-back window in days.

        Returns:
            A report dict of pre-formatted strings, or ``None`` when fewer
            than two snapshots of the drive fall into the window.
        """
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()

        # Collect this drive's data points inside the window.
        data_points = []
        for h in self.history:
            if h["timestamp"] < cutoff:
                continue
            for d in h["drives"]:
                if d["drive"] == drive_letter:
                    data_points.append({
                        "time": h["timestamp"],
                        "usage": d["usage_percent"],
                        "free_gb": d["free_gb"],
                    })
                    break
        if len(data_points) < 2:
            print(f"数据不足(需要至少 2 个数据点,当前 {len(data_points)} 个)")
            return None

        data_points.sort(key=lambda p: p["time"])
        first = data_points[0]
        last = data_points[-1]
        usage_change = last["usage"] - first["usage"]
        free_change = last["free_gb"] - first["free_gb"]

        # Elapsed time in days between the first and the last sample.
        elapsed = (
            datetime.fromisoformat(last["time"])
            - datetime.fromisoformat(first["time"])
        )
        days_span = elapsed.total_seconds() / 86400
        daily_usage_increase = usage_change / days_span if days_span > 0 else 0

        # Estimated days until each threshold. Clamp at 0 so a drive that is
        # already past a threshold does not report a negative number.
        if daily_usage_increase > 0:
            days_to_warning = max(
                0.0,
                (self.threshold_warning - last["usage"]) / daily_usage_increase,
            )
            days_to_critical = max(
                0.0,
                (self.threshold_critical - last["usage"]) / daily_usage_increase,
            )
        else:
            days_to_warning = float('inf')
            days_to_critical = float('inf')

        report = {
            "drive": drive_letter,
            "period": f"最近 {days} 天",
            "data_points": len(data_points),
            "current_usage": f"{last['usage']}%",
            "current_free": f"{last['free_gb']} GB",
            "usage_change": f"+{usage_change:.1f}%" if usage_change > 0 else f"{usage_change:.1f}%",
            "free_change": f"{free_change:.1f} GB",
            "daily_increase": f"+{daily_usage_increase:.2f}%" if daily_usage_increase > 0 else "稳定",
            "days_to_warning": f"{days_to_warning:.0f} 天" if days_to_warning < 365 else "无风险",
            "days_to_critical": f"{days_to_critical:.0f} 天" if days_to_critical < 365 else "无风险",
        }
        print(f"\n=== {drive_letter} 磁盘使用趋势 ({report['period']}) ===")
        print(f" 当前使用率: {report['current_usage']}, 剩余: {report['current_free']}")
        print(f" 变化: 使用率 {report['usage_change']}, 空间 {report['free_change']}")
        print(f" 日均增长: {report['daily_increase']}")
        print(f" 预计触发警告: {report['days_to_warning']}")
        print(f" 预计磁盘满: {report['days_to_critical']}")
        return report


# Usage example
# monitor = DiskMonitor()
# monitor.record_snapshot()  # record a snapshot
# alerts = monitor.check_alerts()
# monitor.analyze_trend("C:", days=30)
小结
| 需求 | 方案 | 复杂度 |
|---|---|---|
| 磁盘全景 | wmic + shutil | 入门 |
| 大文件扫描 | os.walk + 大小排序 | 入门 |
| 目录大小分析 | os.walk + 深度控制 | 中等 |
| 重复文件检测 | 文件大小 + MD5 双重校验 | 中等 |
| 自动清理 | 按类型策略清理 | 中等 |
| 趋势分析 | 历史数据 + 首末数据点线性外推 | 进阶 |
| 告警通知 | 阈值检查 + 邮件/微信推送 | 进阶 |
磁盘空间管理是运维的基本功。建好监控,设好告警,自动清理,把"C 盘满了"这种低级问题消灭在萌芽状态。
以上就是Python实现智能磁盘监控并自动清理垃圾文件的详细内容,更多关于Python磁盘监控的资料请关注本站其它相关文章!
声明:本站(华域联盟www.cnhackhy.com)所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

评论(0)