Lab: Log Analysis and Parsing
Real-world lab: parse Kubernetes and system logs, extract errors, aggregate failure patterns, and generate reports. This is one of the most common DevOps tasks.
📋 Lab Overview
Your task: build a log analysis tool that reads application logs, parses structured entries, identifies failures, groups them by service and error type, and generates a summary report. Skills applied: file I/O, regex, data structures, control flow, functions.
🎯 Objectives
- Read log files line by line (memory-efficient).
- Parse timestamps, log levels, service names, and error messages using regex.
- Group errors by service and error type in a nested dictionary.
- Calculate aggregate statistics (total errors, error rate per service).
- Generate a formatted report (text or JSON).
- Handle edge cases (malformed lines, missing fields, file not found).
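The grouping and per-service error rate objectives can be sketched roughly as follows. This is a minimal illustration rather than the lab's solution: the `records` list of `(service, level)` pairs and the `error_rate_by_service` helper are hypothetical names introduced here, and the sketch assumes the lines have already been parsed.

```python
from collections import defaultdict


def error_rate_by_service(records):
    """Return {service: percent of that service's entries that are ERROR}."""
    totals = defaultdict(int)  # entries seen per service
    errors = defaultdict(int)  # ERROR entries per service
    for service, level in records:
        totals[service] += 1
        if level == "ERROR":
            errors[service] += 1
    # .get() avoids adding empty keys to the errors defaultdict
    return {svc: errors.get(svc, 0) / count * 100 for svc, count in totals.items()}


# Hypothetical, already-parsed (service, level) pairs
records = [
    ("database", "ERROR"),
    ("database", "ERROR"),
    ("database", "INFO"),
    ("nginx", "INFO"),
    ("nginx", "WARN"),
    ("message-queue", "ERROR"),
]

rates = error_rate_by_service(records)
for service, rate in sorted(rates.items()):
    print(f"{service}: {rate:.1f}% errors")
```

Dividing by each service's own entry count, rather than by the total number of lines, keeps a quiet service with a few failures from being hidden by a noisy healthy one.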
📝 Sample Log Data
log
2024-01-15T10:30:45Z [INFO] nginx: starting
2024-01-15T10:30:46Z [INFO] api-gateway: received request GET /health from 192.168.1.100
2024-01-15T10:30:47Z [ERROR] database: connection timeout after 30s retry in 5s...
2024-01-15T10:30:50Z [ERROR] database: connection timeout after 30s retry in 5s...
2024-01-15T10:30:52Z [INFO] database: reconnected successfully
2024-01-15T10:30:55Z [ERROR] api-gateway: authentication failed for user admin@example.com
2024-01-15T10:31:00Z [WARN] nginx: memory usage 85% threshold
2024-01-15T10:31:05Z [ERROR] message-queue: failed to publish message, queue full
2024-01-15T10:31:10Z [ERROR] api-gateway: authentication failed for user test@example.com
2024-01-15T10:31:15Z [INFO] cache: service started
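Before writing the full parser, it can help to try a pattern against a couple of lines in isolation. The sketch below assumes every entry has the shape `<timestamp> [<LEVEL>] <service>: <message>`; the named groups and the `split_fields` helper are illustrative choices made here, not something the lab prescribes.

```python
import re

# Assumed line shape: <timestamp> [<LEVEL>] <service>: <message>
LINE_RE = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\s"
    r"\[(?P<level>\w+)\]\s"
    r"(?P<service>[^:]+):\s"
    r"(?P<message>.+)"
)


def split_fields(line):
    """Return the named fields of one log line, or None if it does not match."""
    match = LINE_RE.match(line.strip())
    return match.groupdict() if match else None


good = split_fields(
    "2024-01-15T10:30:55Z [ERROR] api-gateway: authentication failed for user admin@example.com"
)
bad = split_fields("this line has no timestamp or level")

print(good)
print(bad)
```

Named groups make the result self-describing, which can be easier to debug than positional groups when the pattern changes.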
💻 Complete Solution (Step-by-step)
python
#!/usr/bin/env python3
"""
Log Analysis Tool: Parse logs, extract errors, generate reports.
"""
import json
import re
from collections import defaultdict
from pathlib import Path

# Regex pattern for lines shaped like: <timestamp> [<LEVEL>] <service>: <message>
LOG_PATTERN = r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)\s\[(\w+)\]\s([^:]+):\s(.+)"


def parse_log_line(line):
    """
    Parse a single log line.

    Returns a dict with timestamp, level, service, message.
    Returns None if the line does not match the expected format.
    """
    match = re.match(LOG_PATTERN, line.strip())
    if not match:
        return None
    timestamp_str, level, service, message = match.groups()
    return {
        "timestamp": timestamp_str,
        "level": level.upper(),
        "service": service.strip(),
        "message": message.strip(),
    }


def extract_error_details(message):
    """
    Categorize an error message by keyword and return the category name.

    Checks run in order, so "connection timeout" is reported as "timeout".
    """
    message_lower = message.lower()
    if "timeout" in message_lower:
        return "timeout"
    elif "connection" in message_lower:
        return "connection_error"
    elif "auth" in message_lower:  # also matches "authentication"
        return "authentication"
    elif "memory" in message_lower:
        return "memory_pressure"
    elif "queue" in message_lower or "full" in message_lower:
        return "queue_full"
    else:
        return "unknown"


def analyze_logs(log_file_path):
    """
    Analyze a log file and return an error summary.

    Returns a dict with statistics and grouped errors, or a dict with a
    single "error" key if the file cannot be read.
    """
    errors_by_service = defaultdict(lambda: defaultdict(int))
    errors_by_type = defaultdict(int)
    total_lines = 0
    total_errors = 0

    try:
        # errors="replace" keeps undecodable bytes from aborting the run
        with open(log_file_path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:  # the file object yields one line at a time
                total_lines += 1
                parsed = parse_log_line(line)
                if not parsed:
                    continue  # Skip malformed lines

                # Count errors
                if parsed["level"] == "ERROR":
                    total_errors += 1
                    service = parsed["service"]
                    error_type = extract_error_details(parsed["message"])
                    errors_by_service[service][error_type] += 1
                    errors_by_type[error_type] += 1
    except FileNotFoundError:
        return {"error": f"Log file not found: {log_file_path}"}
    except OSError as e:
        return {"error": f"Error reading log file: {e}"}

    return {
        "total_lines": total_lines,
        "total_errors": total_errors,
        # Rate is relative to all lines read, including skipped ones
        "error_rate_percent": (total_errors / total_lines * 100) if total_lines > 0 else 0,
        # Convert nested defaultdicts to plain dicts for clean printing and JSON
        "errors_by_service": {svc: dict(types) for svc, types in errors_by_service.items()},
        "errors_by_type": dict(errors_by_type),
    }


def print_report(analysis):
    """Print a human-readable report."""
    if "error" in analysis:
        print(f"Error: {analysis['error']}")
        return

    print("=" * 60)
    print("LOG ANALYSIS REPORT")
    print("=" * 60)
    print(f"Total lines: {analysis['total_lines']}")
    print(f"Total errors: {analysis['total_errors']}")
    print(f"Error rate: {analysis['error_rate_percent']:.2f}%")
    print()

    print("Errors by Service:")
    print("-" * 40)
    for service, errors in analysis["errors_by_service"].items():
        total = sum(errors.values())
        print(f"  {service}: {total} errors")
        for error_type, count in errors.items():
            print(f"    - {error_type}: {count}")
    print()

    print("Errors by Type:")
    print("-" * 40)
    for error_type, count in analysis["errors_by_type"].items():
        print(f"  {error_type}: {count}")


def save_json_report(analysis, output_file):
    """Save analysis as JSON."""
    with open(output_file, "w") as f:
        json.dump(analysis, f, indent=2)


# Main execution
if __name__ == "__main__":
    # Create sample log file (every line follows "<service>: <message>")
    sample_log = Path("app.log")
    sample_log_content = """2024-01-15T10:30:45Z [INFO] nginx: starting
2024-01-15T10:30:46Z [INFO] api-gateway: received request GET /health from 192.168.1.100
2024-01-15T10:30:47Z [ERROR] database: connection timeout after 30s retry in 5s...
2024-01-15T10:30:50Z [ERROR] database: connection timeout after 30s retry in 5s...
2024-01-15T10:30:52Z [INFO] database: reconnected successfully
2024-01-15T10:30:55Z [ERROR] api-gateway: authentication failed for user admin@example.com
2024-01-15T10:31:00Z [WARN] nginx: memory usage 85% threshold
2024-01-15T10:31:05Z [ERROR] message-queue: failed to publish message, queue full
2024-01-15T10:31:10Z [ERROR] api-gateway: authentication failed for user test@example.com
2024-01-15T10:31:15Z [INFO] cache: service started"""
    sample_log.write_text(sample_log_content)

    # Analyze
    analysis = analyze_logs(sample_log)

    # Print report
    print_report(analysis)

    # Save JSON report
    save_json_report(analysis, "report.json")
    print("\nJSON report saved to report.json")
🧪 Test Cases
- Normal case: Parse valid log file, verify error counts and types match.
- Edge cases: (a) File not found, (b) malformed lines (missing fields), (c) empty file, (d) no errors (all INFO lines).
- Accuracy: Verify that the regex extracts all fields correctly and that error categorization works.
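The edge cases above might be exercised along these lines. To keep the sketch self-contained, `count_errors` is a trimmed-down, hypothetical stand-in for the lab's `analyze_logs`, and `run_case` is a helper invented here; the same assertions could be adapted to the real functions.

```python
import re
import tempfile
from pathlib import Path

# Simplified pattern for the stand-in: <timestamp> [<LEVEL>] <service>: <message>
PATTERN = re.compile(r"(\S+)\s\[(\w+)\]\s([^:]+):\s(.+)")


def count_errors(path):
    """Return (total_lines, error_count), or None if the file is missing."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            total = errors = 0
            for line in f:
                total += 1
                match = PATTERN.match(line.strip())
                if match and match.group(2).upper() == "ERROR":
                    errors += 1
            return total, errors
    except FileNotFoundError:
        return None


def run_case(content):
    """Write content to a temporary log file and analyze it."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "case.log"
        path.write_text(content, encoding="utf-8")
        return count_errors(path)


def test_edge_cases():
    # (a) file not found
    assert count_errors("/nonexistent/dir/missing.log") is None
    # (b) malformed line is counted as a line but not as an error
    assert run_case("garbage line\n2024-01-15T10:30:47Z [ERROR] database: timeout\n") == (2, 1)
    # (c) empty file
    assert run_case("") == (0, 0)
    # (d) no errors, only INFO
    assert run_case("2024-01-15T10:30:45Z [INFO] nginx: starting\n") == (1, 0)


test_edge_cases()
print("all edge cases passed")
```

Writing each case to a temporary directory keeps the tests from touching real log files and cleans up automatically.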
🚀 Enhancements
- Use argparse to accept the log file path, output format (text/json), and error threshold from the CLI.
- Time-window filtering: analyze only errors from the last hour (parse timestamps).
- Alert threshold: flag if the error rate exceeds 5% or a specific service has more than 10 errors.
- Send results to an external log aggregation API (for example, a monitoring system).
- Process multiple log files and merge reports.
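A few of these enhancements could start from a sketch like the one below, which covers CLI arguments, time-window filtering, and alert thresholds. The flag names, the `is_recent` and `find_alerts` helpers, and the hand-written `analysis` dict are all assumptions made for illustration; only the dict keys are taken from the output of `analyze_logs`.

```python
import argparse
from datetime import datetime, timedelta, timezone


def build_parser():
    """CLI sketch; the flag names are illustrative, not prescribed by the lab."""
    parser = argparse.ArgumentParser(description="Analyze a log file and report errors.")
    parser.add_argument("log_file", help="path to the log file")
    parser.add_argument("--format", choices=["text", "json"], default="text",
                        help="report format")
    parser.add_argument("--max-error-rate", type=float, default=5.0,
                        help="alert when the overall error rate (percent) exceeds this")
    parser.add_argument("--max-service-errors", type=int, default=10,
                        help="alert when one service has more errors than this")
    parser.add_argument("--since-minutes", type=int, default=60,
                        help="only consider entries newer than this many minutes")
    return parser


def is_recent(timestamp_str, now, window):
    """True if a UTC timestamp like 2024-01-15T10:30:45Z lies within `window` before `now`."""
    ts = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return now - window <= ts <= now


def find_alerts(analysis, max_error_rate, max_service_errors):
    """Return alert strings for a dict shaped like the analyze_logs() result."""
    alerts = []
    if analysis["error_rate_percent"] > max_error_rate:
        alerts.append(f"error rate {analysis['error_rate_percent']:.1f}% exceeds {max_error_rate}%")
    for service, errors in analysis["errors_by_service"].items():
        total = sum(errors.values())
        if total > max_service_errors:
            alerts.append(f"{service} has {total} errors (limit {max_service_errors})")
    return alerts


# Parse an explicit argument list so the sketch runs without a real command line
args = build_parser().parse_args(["app.log", "--format", "json", "--max-service-errors", "1"])
now = datetime(2024, 1, 15, 11, 0, 0, tzinfo=timezone.utc)  # fixed "now" for a repeatable demo
window = timedelta(minutes=args.since_minutes)

# Hand-written analysis dict, for illustration only
analysis = {
    "error_rate_percent": 50.0,
    "errors_by_service": {"database": {"timeout": 2}, "message-queue": {"queue_full": 1}},
}

print(is_recent("2024-01-15T10:30:45Z", now, window))
print(find_alerts(analysis, args.max_error_rate, args.max_service_errors))
```

Passing `now` in as a parameter, instead of calling `datetime.now()` inside the helper, makes the time-window check straightforward to test.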
💡 Hints
- Use regex groups to extract components; test pattern with sample lines first.
- collections.defaultdict simplifies nested grouping.
- Iterate over the file object directly (for line in f:) so lines are read lazily; do not load the entire file into memory for large files.
- Handle parsing errors gracefully; a few malformed lines should not crash the script.
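The last two hints can be combined in a small generator, sketched here under a few assumptions: `iter_entries` and the `skipped` list are hypothetical names, the pattern is a simplified version of the lab's, and `io.StringIO` stands in for an open log file.

```python
import io
import re

# Simplified pattern: <timestamp> [<LEVEL>] <service>: <message>
LINE_RE = re.compile(r"(\S+)\s\[(\w+)\]\s([^:]+):\s(.+)")


def iter_entries(lines, skipped):
    """Lazily yield (timestamp, level, service, message) tuples.

    Line numbers of malformed lines are appended to `skipped`
    instead of raising, so one bad line does not stop the run.
    """
    for number, line in enumerate(lines, start=1):
        match = LINE_RE.match(line.strip())
        if match is None:
            skipped.append(number)  # remember where parsing failed, keep going
            continue
        yield match.groups()


# io.StringIO stands in for an open file; any iterable of lines works
fake_file = io.StringIO(
    "2024-01-15T10:30:47Z [ERROR] database: connection timeout after 30s\n"
    "### corrupted line ###\n"
    "2024-01-15T10:30:52Z [INFO] database: reconnected successfully\n"
)

skipped = []
entries = list(iter_entries(fake_file, skipped))
print(len(entries), "parsed;", "skipped line numbers:", skipped)
```

Because the generator consumes one line at a time, memory use stays flat regardless of file size, and recording the skipped line numbers makes malformed input visible rather than silently dropped.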