Lab: Automation Scripts
Real-world lab: Build a system administration script that monitors disk space, checks service health, and generates reports—practical DevOps automation.
📋 Lab Overview
Your task: create a system monitoring script that checks disk usage across mount points, verifies that critical services are running, collects system metrics, and writes reports. Alternatively, build a file cleanup utility that finds and removes old files based on rules. Skills applied: subprocess, file I/O, data structures, error handling, logging, CLI.
🎯 Objectives
- Use subprocess to call system commands (df, systemctl, etc.).
- Parse command output and extract relevant data.
- Implement alert thresholds (disk >80%, service down).
- Write results to log files with timestamps.
- Handle command failures gracefully.
- Build a CLI with argparse for user configuration.
💻 Example Option A: System Monitoring Script
python
#!/usr/bin/env python3
"""
System monitoring script: check disk, services, memory.
"""
import subprocess
import json
import argparse
import logging
from pathlib import Path
from datetime import datetime
# Setup logging: prefer the system log directory, but fall back to stderr
# when it cannot be created or written (e.g. when running unprivileged),
# so the script does not crash before it has parsed its arguments.
log_dir = Path("/var/log/sysmon")
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
try:
    # parents=True so a missing parent directory is not fatal.
    log_dir.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        filename=log_dir / "monitor.log",
        level=logging.INFO,
        format=_LOG_FORMAT,
    )
except OSError as exc:
    logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
    logging.warning(f"Cannot log to {log_dir} ({exc}); logging to stderr instead")
def get_disk_usage():
    """Get disk usage for all mount points.

    Runs ``df -P`` and parses one record per mounted filesystem.

    Returns:
        dict: Maps each mount point to ``{"filesystem": str, "percent": int}``.
        Empty when ``df`` cannot be run or exits with an error. Lines that
        cannot be parsed are skipped.
    """
    try:
        # -P (POSIX format) prints every filesystem on exactly one line, so a
        # long device name cannot wrap and shift the columns.
        result = subprocess.run(
            ["df", "-P"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable ``df`` binary.
        logging.error(f"Failed to get disk usage: {e}")
        return {}

    usage = {}
    for line in result.stdout.strip().split("\n")[1:]:  # skip header
        parts = line.split()
        # Six columns are required: filesystem, size, used, available,
        # use%, mount point.
        if len(parts) < 6:
            continue
        try:
            used_percent = int(parts[4].rstrip("%"))
        except ValueError:
            # The percentage column is not always numeric (e.g. "-").
            continue
        # Re-join trailing fields so a mount point containing spaces is kept
        # whole instead of being cut at the first space.
        mount_point = " ".join(parts[5:])
        usage[mount_point] = {
            "filesystem": parts[0],
            "percent": used_percent,
        }
    return usage
def check_service_status(services):
    """Report, for each named service, whether systemd considers it active.

    Args:
        services: Iterable of service names to query.

    Returns:
        dict: Maps each service name to ``True`` when
        ``systemctl is-active`` exits with status 0, otherwise ``False``.
        A service whose check raises is logged and reported as ``False``.
    """

    def _is_active(name):
        # A failure to run the command is treated the same as "not running".
        try:
            proc = subprocess.run(
                ["systemctl", "is-active", name],
                capture_output=True,
                text=True,
            )
        except Exception as e:
            logging.error(f"Failed to check {name}: {e}")
            return False
        return proc.returncode == 0  # 0 = active

    return {name: _is_active(name) for name in services}
def _format_bytes(num_bytes):
    """Render a byte count as a short human-readable string such as ``7.7G``."""
    units = ("B", "K", "M", "G", "T", "P")
    value = float(num_bytes)
    idx = 0
    while value >= 1024 and idx < len(units) - 1:
        value /= 1024
        idx += 1
    return f"{value:.1f}{units[idx]}"


def get_memory_usage():
    """Get memory usage percentage.

    Returns:
        dict: ``{"total": str, "used": str, "percent": float}`` where
        ``total`` and ``used`` are human-readable sizes and ``percent`` is
        rounded to one decimal place. Empty when ``free`` cannot be run or
        its output cannot be parsed.
    """
    try:
        # Ask for raw bytes: the human-readable (-h) output uses fractional
        # values and varying unit suffixes ("7.7G", "15Gi", "512Mi") that
        # int() cannot parse and that are not comparable across units.
        result = subprocess.run(
            ["free", "-b"],
            capture_output=True,
            text=True,
            check=True,
        )
        # Line 0 is the column header; line 1 is the memory row.
        mem_line = result.stdout.strip().split("\n")[1].split()
        total_bytes = int(mem_line[1])
        used_bytes = int(mem_line[2])
    except (subprocess.CalledProcessError, OSError, ValueError, IndexError) as e:
        logging.error(f"Failed to get memory: {e}")
        return {}

    percent = round(used_bytes / total_bytes * 100, 1) if total_bytes > 0 else 0
    return {
        "total": _format_bytes(total_bytes),
        "used": _format_bytes(used_bytes),
        "percent": percent,
    }
def main():
    """Collect metrics, evaluate alert thresholds, and emit a JSON report.

    Command-line options:
        --disk-threshold: usage percentage above which a mount point raises
            an alert (default 80).
        --services: service names to check (default: nginx postgres).
        --output: path of the JSON report; the report is printed to stdout
            when this option is omitted.
    """
    parser = argparse.ArgumentParser(description="System monitoring script")
    # argparse %-formats help strings, so a literal percent sign must be
    # written as "%%"; a bare "%" raises ValueError when help is rendered.
    parser.add_argument(
        "--disk-threshold", type=int, default=80, help="Disk usage threshold %%"
    )
    parser.add_argument(
        "--services", nargs="+", default=["nginx", "postgres"], help="Services to check"
    )
    parser.add_argument("--output", help="Output JSON file")
    args = parser.parse_args()

    logging.info("Starting system monitoring")

    # Collect metrics
    disk = get_disk_usage()
    services = check_service_status(args.services)
    memory = get_memory_usage()

    # Check thresholds
    alerts = []
    for mount, usage in disk.items():
        if usage["percent"] > args.disk_threshold:
            alerts.append(f"ALERT: {mount} disk usage {usage['percent']}% (threshold: {args.disk_threshold}%)")
            logging.warning(f"Disk alert on {mount}: {usage['percent']}%")
    for service, active in services.items():
        if not active:
            alerts.append(f"ALERT: Service {service} is not running")
            logging.warning(f"Service {service} is down")

    # Output
    report = {
        "timestamp": datetime.now().isoformat(),
        "disk": disk,
        "services": services,
        "memory": memory,
        "alerts": alerts,
    }
    if args.output:
        # Explicit encoding keeps the report independent of the locale.
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
    else:
        print(json.dumps(report, indent=2))

    if alerts:
        logging.error(f"Found {len(alerts)} alerts")
    else:
        logging.info("All systems healthy")


if __name__ == "__main__":
    main()
💻 Example Option B: File Cleanup Script
python
#!/usr/bin/env python3
"""
File cleanup utility: remove old files based on age and pattern.
"""
import argparse
import logging
from pathlib import Path
from datetime import datetime, timedelta
# Console logging at INFO level; each record carries a timestamp and severity.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by the functions in this script.
logger = logging.getLogger(__name__)
def find_old_files(directory, pattern="*", days_old=30):
    """Find files older than N days matching pattern.

    Args:
        directory: Root directory, scanned recursively.
        pattern: Glob pattern handed to ``Path.rglob``.
        days_old: Minimum age in days, measured from the modification time.

    Returns:
        list[dict]: One ``{"path": str, "mtime": str, "size": int}`` record
        per matching file, with ``mtime`` in ISO format. Empty when
        ``directory`` is not an existing directory.
    """
    target_dir = Path(directory)
    # is_dir() also rejects a path that exists but is a regular file.
    if not target_dir.is_dir():
        logger.error(f"Directory not found: {directory}")
        return []

    cutoff_time = datetime.now() - timedelta(days=days_old)
    old_files = []
    for file_path in target_dir.rglob(pattern):
        try:
            if not file_path.is_file():
                continue
            # A single stat() call supplies both mtime and size, so the two
            # values describe the same state of the file.
            info = file_path.stat()
        except OSError as e:
            # The file vanished or is unreadable; skip it rather than abort
            # the whole scan.
            logger.warning(f"Skipping {file_path}: {e}")
            continue
        mtime = datetime.fromtimestamp(info.st_mtime)
        if mtime < cutoff_time:
            old_files.append({
                "path": str(file_path),
                "mtime": mtime.isoformat(),
                "size": info.st_size,
            })
    return old_files
def delete_files(files, dry_run=True):
    """Remove the listed files, or only report them when ``dry_run`` is set.

    Args:
        files: Records with ``"path"`` and ``"size"`` keys.
        dry_run: When true, nothing is removed; each file is only logged.

    Returns:
        dict: ``{"deleted", "failed", "total_freed"}`` counters. In a dry run
        ``deleted`` and ``total_freed`` describe what would be removed.
    """
    summary = {"deleted": 0, "failed": 0, "total_freed": 0}
    for entry in files:
        try:
            target = Path(entry["path"])
            nbytes = entry["size"]
            if not dry_run:
                target.unlink()
                logger.info(f"Deleted {target} ({nbytes} bytes)")
            else:
                logger.info(f"[DRY RUN] Would delete {target} ({nbytes} bytes)")
            # Counted in both modes so a dry run previews the outcome.
            summary["deleted"] += 1
            summary["total_freed"] += nbytes
        except Exception as e:
            logger.error(f"Failed to delete {entry['path']}: {e}")
            summary["failed"] += 1
    return summary
def main():
    """Parse the command line, scan for old files, and delete or list them.

    Files are only removed when ``--delete`` is given; otherwise the run is
    a dry run that logs what would be removed.
    """
    cli = argparse.ArgumentParser(description="File cleanup utility")
    cli.add_argument("directory", help="Directory to scan")
    cli.add_argument("--pattern", default="*", help="File pattern (e.g., *.log)")
    cli.add_argument("--days", type=int, default=30, help="Files older than N days")
    cli.add_argument("--delete", action="store_true", help="Actually delete (default: dry-run)")
    opts = cli.parse_args()

    logger.info(f"Scanning {opts.directory} for files matching {opts.pattern} older than {opts.days} days")
    candidates = find_old_files(opts.directory, opts.pattern, opts.days)
    logger.info(f"Found {len(candidates)} files to delete")

    # Nothing matched: report and stop.
    if not candidates:
        print("No files to delete")
        return

    is_dry_run = not opts.delete
    summary = delete_files(candidates, dry_run=is_dry_run)
    print(f"\nDeleted: {summary['deleted']}, Failed: {summary['failed']}, Freed: {summary['total_freed']} bytes")
    if is_dry_run:
        print("\n[DRY RUN] Use --delete to actually remove files")


if __name__ == "__main__":
    main()
🧪 Test Cases
- Normal operation: run with default settings, verify output format.
- Alerts triggered: force a condition (high disk, service down), verify alert appears.
- Error handling: run with an invalid directory or service name and verify that the script fails gracefully.
- CLI flags: test with different threshold, services, output formats.
🚀 Enhancements
- Email alerts when thresholds exceeded.
- Integration with monitoring system (send metrics to Prometheus, CloudWatch).
- Schedule script with cron for periodic monitoring.
- Backup important files before deletion.
- Configuration file support (YAML/JSON) for thresholds.