Hands-on Lab · Lesson 14 of 16

Lab: Automation Scripts

Real-world lab: Build a system administration script that monitors disk space, checks service health, and generates reports—practical DevOps automation.

📋 Lab Overview

Your task: create a system monitoring script that checks disk usage across mount points, verifies critical services are running, collects system metrics, and writes reports. Alternatively, build a file cleanup utility that finds and removes old files based on rules. Skills applied: subprocess, file I/O, data structures, error handling, logging, CLI.

🎯 Objectives

💻 Example Option A: System Monitoring Script

python
#!/usr/bin/env python3
"""
System monitoring script: check disk, services, memory.
"""
import subprocess
import json
import argparse
import logging
from pathlib import Path
from datetime import datetime

# Setup logging.
# Prefer a log file under /var/log/sysmon, but fall back to stderr when the
# directory cannot be created or written (typically when not running as
# root) so that a logging problem never prevents the monitor from running.
log_dir = Path("/var/log/sysmon")
_log_format = "%(asctime)s - %(levelname)s - %(message)s"
try:
    log_dir.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        filename=log_dir / "monitor.log",
        level=logging.INFO,
        format=_log_format,
    )
except OSError as e:
    # No handler was installed by the failed attempt, so this call takes
    # effect and routes records to stderr instead.
    logging.basicConfig(level=logging.INFO, format=_log_format)
    logging.warning(f"Cannot log to {log_dir} ({e}); logging to stderr instead")

def get_disk_usage():
    """Get disk usage for all mount points.

    Runs ``df -P`` and parses its output. The ``-P`` (POSIX) flag keeps every
    filesystem on a single line with a fixed six-column layout, which makes
    the output safe to split on whitespace.

    Returns:
        dict: Maps each mount point to
        ``{"filesystem": <device name>, "percent": <int used percent>}``.
        Returns an empty dict when ``df`` cannot be started or exits with a
        non-zero status; the failure is logged.
    """
    try:
        result = subprocess.run(
            ["df", "-P"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable ``df`` binary.
        logging.error(f"Failed to get disk usage: {e}")
        return {}

    usage = {}
    for line in result.stdout.splitlines()[1:]:  # skip header
        # Limit the split to six fields so a mount point that contains
        # spaces stays in one piece in the last field.
        parts = line.split(None, 5)
        if len(parts) < 6:
            continue

        try:
            used_percent = int(parts[4].rstrip("%"))
        except ValueError:
            # Some pseudo filesystems report "-" instead of a percentage.
            continue

        usage[parts[5].rstrip()] = {
            "filesystem": parts[0],
            "percent": used_percent,
        }

    return usage

def check_service_status(services):
    """Report which of the given services systemd considers active.

    Each name is passed to ``systemctl is-active``; an exit code of 0 means
    the unit is active. Any failure to run the command is logged and the
    service is reported as not running.

    Args:
        services: Iterable of service names to query.

    Returns:
        dict: Maps each service name to True (active) or False.
    """
    def _is_active(service):
        try:
            probe = subprocess.run(
                ["systemctl", "is-active", service],
                capture_output=True,
                text=True
            )
        except Exception as e:
            logging.error(f"Failed to check {service}: {e}")
            return False
        # systemctl exits with 0 only for an active unit
        return probe.returncode == 0

    return {service: _is_active(service) for service in services}

def _human_size(num_bytes):
    """Format a byte count with binary units, e.g. 1610612736 -> '1.5Gi'."""
    units = ("B", "Ki", "Mi", "Gi", "Ti", "Pi")
    size = float(num_bytes)
    index = 0
    while size >= 1024 and index < len(units) - 1:
        size /= 1024
        index += 1
    return f"{size:.1f}{units[index]}"


def get_memory_usage():
    """Get memory usage totals and the used percentage.

    Runs ``free -b`` so the totals are plain byte counts. Human-readable
    output (``free -h``) uses suffixes such as ``Gi``/``Mi`` and fractional
    values, which cannot be converted to numbers reliably.

    Returns:
        dict: ``{"total": str, "used": str, "percent": float}`` where
        ``total`` and ``used`` are human-readable sizes and ``percent`` is
        used/total rounded to one decimal place. Returns an empty dict when
        ``free`` cannot be run or its output cannot be parsed; the failure
        is logged.
    """
    try:
        result = subprocess.run(
            ["free", "-b"],
            capture_output=True,
            text=True,
            check=True,
        )
        # Second line is the "Mem:" row: label, total, used, free, ...
        mem_fields = result.stdout.strip().split("\n")[1].split()
        total_bytes = int(mem_fields[1])
        used_bytes = int(mem_fields[2])
    except (subprocess.CalledProcessError, OSError, ValueError, IndexError) as e:
        logging.error(f"Failed to get memory: {e}")
        return {}

    percent = round(used_bytes / total_bytes * 100, 1) if total_bytes > 0 else 0

    return {
        "total": _human_size(total_bytes),
        "used": _human_size(used_bytes),
        "percent": percent,
    }

def main():
    """Collect system metrics, evaluate alert conditions, and emit a report.

    Command-line options:
        --disk-threshold  Disk usage percentage above which a mount point
                          raises an alert (default: 80).
        --services        Service names to check (default: nginx, postgres).
        --output          Path of a JSON file to write; when omitted the
                          report is printed to stdout.

    The report is a JSON object with the keys ``timestamp``, ``disk``,
    ``services``, ``memory`` and ``alerts``. Alerts are also written to the
    log as warnings.
    """
    parser = argparse.ArgumentParser(description="System monitoring script")
    # argparse %-formats help strings, so a literal percent sign must be
    # written as "%%"; a bare "%" makes --help fail with ValueError.
    parser.add_argument("--disk-threshold", type=int, default=80, help="Disk usage threshold %%")
    parser.add_argument("--services", nargs="+", default=["nginx", "postgres"], help="Services to check")
    parser.add_argument("--output", help="Output JSON file")
    args = parser.parse_args()

    logging.info("Starting system monitoring")

    # Collect metrics
    disk = get_disk_usage()
    services = check_service_status(args.services)
    memory = get_memory_usage()

    # Check thresholds
    alerts = []
    for mount, usage in disk.items():
        if usage["percent"] > args.disk_threshold:
            alerts.append(f"ALERT: {mount} disk usage {usage['percent']}% (threshold: {args.disk_threshold}%)")
            logging.warning(f"Disk alert on {mount}: {usage['percent']}%")

    for service, active in services.items():
        if not active:
            alerts.append(f"ALERT: Service {service} is not running")
            logging.warning(f"Service {service} is down")

    # Output
    report = {
        "timestamp": datetime.now().isoformat(),
        "disk": disk,
        "services": services,
        "memory": memory,
        "alerts": alerts,
    }

    if args.output:
        # Explicit encoding keeps the report identical across locales.
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
    else:
        print(json.dumps(report, indent=2))

    if alerts:
        logging.error(f"Found {len(alerts)} alerts")
    else:
        logging.info("All systems healthy")


if __name__ == "__main__":
    main()

💻 Example Option B: File Cleanup Script

python
#!/usr/bin/env python3
"""
File cleanup utility: remove old files based on age and pattern.
"""
import argparse
import logging
from pathlib import Path
from datetime import datetime, timedelta

# Module-level logger; records go to the root handler configured below
# (INFO and above, timestamped).
logger = logging.getLogger(__name__)
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

def find_old_files(directory, pattern="*", days_old=30):
    """Find files older than N days matching pattern.

    The directory is searched recursively. A file is "old" when its
    modification time is earlier than ``days_old`` days before now.

    Args:
        directory: Directory to scan.
        pattern: Glob pattern applied recursively (e.g. ``*.log``).
        days_old: Minimum age in days for a file to be reported.

    Returns:
        list[dict]: One ``{"path": str, "mtime": ISO-8601 str, "size": int}``
        entry per matching file. Empty when the directory does not exist or
        nothing matches.
    """
    target_dir = Path(directory)
    if not target_dir.is_dir():
        logger.error(f"Directory not found: {directory}")
        return []

    cutoff_time = datetime.now() - timedelta(days=days_old)
    old_files = []

    for file_path in target_dir.rglob(pattern):
        if not file_path.is_file():
            continue

        # Stat once so mtime and size come from the same snapshot, and so a
        # file that vanishes or is unreadable mid-scan is skipped instead of
        # aborting the whole scan.
        try:
            info = file_path.stat()
        except OSError as e:
            logger.warning(f"Skipping {file_path}: {e}")
            continue

        mtime = datetime.fromtimestamp(info.st_mtime)
        if mtime < cutoff_time:
            old_files.append({
                "path": str(file_path),
                "mtime": mtime.isoformat(),
                "size": info.st_size,
            })

    return old_files

def delete_files(files, dry_run=True):
    """Remove the listed files, or only report them when dry_run is set.

    Args:
        files: Iterable of dicts with a "path" and a "size" entry.
        dry_run: When True nothing is removed; each file is only logged.

    Returns:
        dict: Counters "deleted" (files processed without error, including
        dry-run entries), "failed", and "total_freed" (sum of the sizes of
        the processed files).
    """
    summary = {"deleted": 0, "failed": 0, "total_freed": 0}

    for file_info in files:
        try:
            file_path = Path(file_info["path"])
            size = file_info["size"]

            if not dry_run:
                file_path.unlink()
                logger.info(f"Deleted {file_path} ({size} bytes)")
            else:
                logger.info(f"[DRY RUN] Would delete {file_path} ({size} bytes)")

            summary["deleted"] += 1
            summary["total_freed"] += size
        except Exception as e:
            # Keep going: one undeletable file must not stop the run.
            logger.error(f"Failed to delete {file_info['path']}: {e}")
            summary["failed"] += 1

    return summary

def main():
    """Scan a directory for old files and delete them (dry-run by default).

    Command-line arguments:
        directory  Directory to scan recursively.
        --pattern  Glob pattern of files to consider (default: ``*``).
        --days     Minimum file age in days (default: 30); must not be
                   negative.
        --delete   Actually remove the files; without it the run only
                   reports what would be removed.
    """
    parser = argparse.ArgumentParser(description="File cleanup utility")
    parser.add_argument("directory", help="Directory to scan")
    parser.add_argument("--pattern", default="*", help="File pattern (e.g., *.log)")
    parser.add_argument("--days", type=int, default=30, help="Files older than N days")
    parser.add_argument("--delete", action="store_true", help="Actually delete (default: dry-run)")
    args = parser.parse_args()

    # A negative age puts the cutoff in the future, which would match (and
    # with --delete, remove) every file under the directory.
    if args.days < 0:
        parser.error("--days must be zero or a positive number of days")

    dry_run = not args.delete

    logger.info(f"Scanning {args.directory} for files matching {args.pattern} older than {args.days} days")

    old_files = find_old_files(args.directory, args.pattern, args.days)
    logger.info(f"Found {len(old_files)} files to delete")

    if not old_files:
        print("No files to delete")
        return

    result = delete_files(old_files, dry_run=dry_run)

    if dry_run:
        # Nothing was removed, so do not report the counts as deletions.
        print(f"\nWould delete: {result['deleted']}, Failed: {result['failed']}, Would free: {result['total_freed']} bytes")
        print("\n[DRY RUN] Use --delete to actually remove files")
    else:
        print(f"\nDeleted: {result['deleted']}, Failed: {result['failed']}, Freed: {result['total_freed']} bytes")


if __name__ == "__main__":
    main()

🧪 Test Cases

  1. Normal operation: run with default settings, verify output format.
  2. Alerts triggered: force a condition (high disk, service down), verify alert appears.
  3. Error handling: run with an invalid directory or service name and verify graceful failure.
  4. CLI flags: test with different threshold, services, output formats.

🚀 Enhancements

  1. Email alerts when thresholds exceeded.
  2. Integration with a monitoring system (send metrics to Prometheus, CloudWatch).
  3. Schedule script with cron for periodic monitoring.
  4. Backup important files before deletion.
  5. Configuration file support (YAML/JSON) for thresholds.