Hands-on Lab, Lesson 13 of 16

Lab: Log Analysis and Parsing

Real-world lab: parse Kubernetes and system logs, extract errors, aggregate failure patterns, and generate reports. Log triage like this is one of the most common DevOps tasks.

📋 Lab Overview

Your task: build a log analysis tool that reads application logs, parses structured entries, identifies failures, groups them by service and error type, and generates a summary report. Skills applied: file I/O, regex, data structures, control flow, functions.

🎯 Objectives

📝 Sample Log Data

log
2024-01-15T10:30:45Z [INFO] nginx: starting
2024-01-15T10:30:46Z [INFO] api-gateway: received request GET /health from 192.168.1.100
2024-01-15T10:30:47Z [ERROR] database: connection timeout after 30s retry in 5s...
2024-01-15T10:30:50Z [ERROR] database: connection timeout after 30s retry in 5s...
2024-01-15T10:30:52Z [INFO] database: reconnected successfully
2024-01-15T10:30:55Z [ERROR] api-gateway: authentication failed for user admin@example.com
2024-01-15T10:31:00Z [WARN] nginx: memory usage 85% threshold
2024-01-15T10:31:05Z [ERROR] message-queue: failed to publish message, queue full
2024-01-15T10:31:10Z [ERROR] api-gateway: authentication failed for user test@example.com
2024-01-15T10:31:15Z [INFO] cache: service started
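
As a quick illustration of how one of these lines breaks into fields, here is a minimal sketch. The pattern, the `parse_entry` name, and the choice to convert the timestamp into a timezone-aware `datetime` are assumptions made for illustration, not part of the lab's required solution. It expects the `service: message` shape, so a line without that colon-separated prefix returns `None`.

```python
import re
from datetime import datetime, timezone

# Assumed line shape: <UTC timestamp> [LEVEL] service: message
ENTRY = re.compile(
    r"(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\s"
    r"\[(?P<level>\w+)\]\s"
    r"(?P<service>[\w.-]+):\s"
    r"(?P<message>.+)"
)

def parse_entry(line):
    """Return a dict of named fields, or None if the line does not match."""
    m = ENTRY.match(line.strip())
    if m is None:
        return None
    fields = m.groupdict()
    # The trailing Z means UTC, so attach that zone explicitly.
    fields["ts"] = datetime.strptime(fields["ts"], "%Y-%m-%dT%H:%M:%S").replace(
        tzinfo=timezone.utc
    )
    return fields

entry = parse_entry(
    "2024-01-15T10:30:55Z [ERROR] api-gateway: "
    "authentication failed for user admin@example.com"
)
# prints: ERROR api-gateway 2024-01-15T10:30:55+00:00
print(entry["level"], entry["service"], entry["ts"].isoformat())
```

Keeping the timestamp as a `datetime` rather than a string is optional here, but it makes later time-based filtering a simple comparison.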

💻 Complete Solution (Step-by-step)

python
#!/usr/bin/env python3
"""
Log Analysis Tool: Parse logs, extract errors, generate reports.
"""
import re
import json
from pathlib import Path
from collections import defaultdict

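# Expected line format: <UTC timestamp> [LEVEL] service: message
# The service name is limited to word characters, dots, and hyphens so a
# colon later in the message cannot be mistaken for the service separator.
LOG_PATTERN = r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\s\[(\w+)\]\s([\w.-]+):\s(.+)"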

def parse_log_line(line):
    """
    Parse a single log line.
    
    Returns dict with timestamp, level, service, message.
    Returns None if line does not match expected format.
    """
    match = re.match(LOG_PATTERN, line.strip())
    if not match:
        return None
    
    timestamp_str, level, service, message = match.groups()
    
    return {
        "timestamp": timestamp_str,
        "level": level.upper(),
        "service": service.strip(),
        "message": message.strip()
    }

def extract_error_details(message):
    """
    Classify an error message into a coarse error category.

    Checks run in order and the first match wins, so
    "connection timeout" is reported as "timeout".
    """
    text = message.lower()

    if "timeout" in text:
        return "timeout"
    elif "connection" in text:
        return "connection_error"
    elif "auth" in text:  # also matches "authentication"
        return "authentication"
    elif "memory" in text:
        return "memory_pressure"
    elif "queue" in text and "full" in text:  # avoid tagging e.g. "disk full"
        return "queue_full"
    else:
        return "unknown"

def analyze_logs(log_file_path):
    """
    Analyze log file and return error summary.
    
    Returns dict with statistics and grouped errors.
    """
    errors_by_service = defaultdict(lambda: defaultdict(int))
    total_lines = 0
    total_errors = 0
    errors_by_type = defaultdict(int)
    
    try:
        with open(log_file_path, "r") as f:
            for line in f:
                total_lines += 1
                
                parsed = parse_log_line(line)
                if not parsed:
                    continue  # Skip malformed lines
                
                # Count errors
                if parsed["level"] == "ERROR":
                    total_errors += 1
                    service = parsed["service"]
                    error_type = extract_error_details(parsed["message"])
                    
                    errors_by_service[service][error_type] += 1
                    errors_by_type[error_type] += 1
    
    except FileNotFoundError:
        return {"error": f"Log file not found: {log_file_path}"}
    except (OSError, UnicodeDecodeError) as e:
        # Catch only I/O and decoding failures so genuine bugs still surface
        return {"error": f"Error reading log file: {e}"}
    
    return {
        "total_lines": total_lines,
        "total_errors": total_errors,
        "error_rate_percent": (total_errors / total_lines * 100) if total_lines > 0 else 0,
        "errors_by_service": dict(errors_by_service),
        "errors_by_type": dict(errors_by_type)
    }

def print_report(analysis):
    """Print a human-readable report."""
    if "error" in analysis:
        print(f"Error: {analysis['error']}")
        return
    
    print("=" * 60)
    print("LOG ANALYSIS REPORT")
    print("=" * 60)
    print(f"Total lines: {analysis['total_lines']}")
    print(f"Total errors: {analysis['total_errors']}")
    print(f"Error rate: {analysis['error_rate_percent']:.2f}%")
    print()
    
    print("Errors by Service:")
    print("-" * 40)
    for service, errors in analysis["errors_by_service"].items():
        total = sum(errors.values())
        print(f"  {service}: {total} errors")
        for error_type, count in errors.items():
            print(f"    - {error_type}: {count}")
    print()
    
    print("Errors by Type:")
    print("-" * 40)
    for error_type, count in analysis["errors_by_type"].items():
        print(f"  {error_type}: {count}")

def save_json_report(analysis, output_file):
    """Save analysis as JSON."""
    with open(output_file, "w") as f:
        json.dump(analysis, f, indent=2)

# Main execution
if __name__ == "__main__":
    # Create sample log file
    sample_log = Path("app.log")
    sample_log_content = """2024-01-15T10:30:45Z [INFO] nginx: starting
2024-01-15T10:30:46Z [INFO] api-gateway: received request GET /health from 192.168.1.100
2024-01-15T10:30:47Z [ERROR] database: connection timeout after 30s retry in 5s...
2024-01-15T10:30:50Z [ERROR] database: connection timeout after 30s retry in 5s...
2024-01-15T10:30:52Z [INFO] database: reconnected successfully
2024-01-15T10:30:55Z [ERROR] api-gateway: authentication failed for user admin@example.com
2024-01-15T10:31:00Z [WARN] nginx: memory usage 85% threshold
2024-01-15T10:31:05Z [ERROR] message-queue: failed to publish message, queue full
2024-01-15T10:31:10Z [ERROR] api-gateway: authentication failed for user test@example.com
2024-01-15T10:31:15Z [INFO] cache: service started"""
    
    sample_log.write_text(sample_log_content)
    
    # Analyze
    analysis = analyze_logs(sample_log)
    
    # Print report
    print_report(analysis)
    
    # Save JSON report
    save_json_report(analysis, "report.json")
    print("\nJSON report saved to report.json")

🧪 Test Cases

  1. Normal case: parse a valid log file and verify that the error counts and types match what you expect.
  2. Edge cases: (a) file not found, (b) malformed lines (missing fields), (c) empty file, (d) no errors (all INFO lines).
  3. Accuracy: verify that the regex extracts every field correctly and that error categorization assigns the right type.
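
The cases above can be sketched as plain assert-based checks. To keep the snippet runnable on its own, it uses a condensed stand-in named `count_errors` (a hypothetical helper, not the lab's `analyze_logs`); in practice you would import your own functions and assert on their return values instead.

```python
import re
import tempfile
from collections import Counter
from pathlib import Path

# Condensed stand-in for the lab solution so this snippet runs on its own.
LINE = re.compile(r"\S+Z\s\[(\w+)\]\s([\w.-]+):\s(.+)")

def count_errors(path):
    """Return (total_lines, Counter of ERROR lines per service).

    Returns None when the file does not exist.
    """
    try:
        text = Path(path).read_text(encoding="utf-8")
    except FileNotFoundError:
        return None
    lines = text.splitlines()
    per_service = Counter()
    for line in lines:
        m = LINE.match(line.strip())
        if m and m.group(1).upper() == "ERROR":
            per_service[m.group(2)] += 1
    return len(lines), per_service

def run_checks():
    with tempfile.TemporaryDirectory() as tmp:
        tmp = Path(tmp)

        # 1. Normal case: two errors for one service, one for another.
        normal = tmp / "normal.log"
        normal.write_text(
            "2024-01-15T10:30:47Z [ERROR] database: connection timeout\n"
            "2024-01-15T10:30:50Z [ERROR] database: connection timeout\n"
            "2024-01-15T10:30:55Z [ERROR] api-gateway: authentication failed\n"
            "2024-01-15T10:31:15Z [INFO] cache: service started\n",
            encoding="utf-8",
        )
        total, errors = count_errors(normal)
        assert total == 4
        assert dict(errors) == {"database": 2, "api-gateway": 1}

        # 2a. A missing file is reported, not raised.
        assert count_errors(tmp / "missing.log") is None

        # 2b. Malformed lines are counted as lines but never as errors.
        bad = tmp / "bad.log"
        bad.write_text("garbage\n[ERROR] no timestamp here\n", encoding="utf-8")
        assert count_errors(bad) == (2, Counter())

        # 2c. Empty file.
        empty = tmp / "empty.log"
        empty.write_text("", encoding="utf-8")
        assert count_errors(empty) == (0, Counter())

        # 2d. No errors: INFO-only input.
        info = tmp / "info.log"
        info.write_text(
            "2024-01-15T10:30:45Z [INFO] nginx: starting\n", encoding="utf-8"
        )
        assert count_errors(info) == (1, Counter())
    return True

print("all checks passed" if run_checks() else "checks failed")
```

Writing each fixture into a temporary directory keeps the checks independent of any `app.log` already on disk.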

🚀 Enhancements

  1. Add argparse so the log file path, output format (text/json), and error threshold can be set from the CLI.
  2. Time-window filtering: analyze only errors from the last hour (this requires parsing the timestamps).
  3. Alert threshold: flag the run if the error rate exceeds 5% or a specific service has more than 10 errors.
  4. Send the results to an external log aggregation API (for example, a monitoring system).
  5. Process multiple log files and merge the reports.
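
The first three enhancements can be sketched with the standard library alone. Everything named here (`build_parser`, `within_window`, `find_alerts`, and the flag names) is a hypothetical illustration rather than a prescribed design. The default thresholds come from item 3, and `find_alerts` assumes an analysis dict with `error_rate_percent` and `errors_by_service` keys, as returned by `analyze_logs`.

```python
import argparse
from datetime import datetime, timedelta, timezone

def build_parser():
    """CLI for enhancement 1; the flag names are illustrative assumptions."""
    parser = argparse.ArgumentParser(description="Analyze application logs")
    parser.add_argument("log_file", help="path to the log file")
    parser.add_argument("--format", choices=["text", "json"], default="text")
    parser.add_argument("--error-threshold", type=float, default=5.0,
                        help="alert when the error rate (percent) exceeds this")
    return parser

def within_window(timestamp_str, now, window=timedelta(hours=1)):
    """Enhancement 2: True if a '...Z' timestamp falls within `window` before `now`."""
    ts = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%SZ")
    ts = ts.replace(tzinfo=timezone.utc)  # trailing Z means UTC
    return now - window <= ts <= now

def find_alerts(analysis, rate_threshold=5.0, per_service_limit=10):
    """Enhancement 3: return a list of human-readable alert strings."""
    alerts = []
    if analysis["error_rate_percent"] > rate_threshold:
        alerts.append(f"error rate {analysis['error_rate_percent']:.1f}% "
                      f"exceeds {rate_threshold}%")
    for service, by_type in analysis["errors_by_service"].items():
        total = sum(by_type.values())
        if total > per_service_limit:
            alerts.append(f"{service} has {total} errors")
    return alerts

# Passing an explicit argument list keeps the demo independent of sys.argv.
args = build_parser().parse_args(["app.log", "--format", "json"])
print(args.log_file, args.format, args.error_threshold)

# A fixed "now" makes the window check reproducible.
now = datetime(2024, 1, 15, 11, 0, 0, tzinfo=timezone.utc)
print(within_window("2024-01-15T10:30:47Z", now))  # inside the last hour
print(within_window("2024-01-15T09:30:47Z", now))  # older than one hour

print(find_alerts({"error_rate_percent": 50.0,
                   "errors_by_service": {"database": {"timeout": 2}}}))
```

In a real run you would pass `datetime.now(timezone.utc)` as `now` and apply `within_window` to each parsed entry before counting it.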

💡 Hints