Beginner · Lesson 4 of 16

Data Pipelines and Feature Engineering for AIOps

Build the data foundations that feed AI: log ingestion, metric normalization, feature extraction, and quality validation — because garbage in, garbage out.

🧒 Simple Explanation (ELI5)

Imagine you're a chef making a gourmet meal. Raw ingredients arrive dirty and unprocessed — vegetables with soil, meat in packaging, spices unmeasured. Before cooking, you wash, chop, weigh, and organise everything. AI is the chef; data pipelines do the prep work. Without clean, structured ingredients, the chef produces terrible food no matter how skilled they are.

🔧 Why Data Pipelines Are the Foundation of AIOps

🌍 Real-world Analogy

A hospital's electronic records system collects data from blood pressure monitors, ECG machines, temperature sensors, and nurse notes. Before a doctor reviews a patient, all these signals are normalised (blood pressure always in mmHg, not kPa), timestamped, and organised in a timeline. That's feature engineering: raw heterogeneous signals transformed into a consistent, structured form that a physician (or ML model) can reason about correctly.

⚙️ How AIOps Data Pipelines Work

Stage 1: Data Collection
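Collecting metrics from Prometheus usually goes through its HTTP API. The sketch below assumes the JSON shape of a `/api/v1/query_range` response. That is a `matrix` result whose `values` are `[unix_seconds, "value"]` pairs, with the values sent as strings. The sketch flattens it into a long pandas DataFrame. The `prom_matrix_to_frame` helper and the sample payload are illustrative. The HTTP request itself is left out so the example runs offline.

```python
import pandas as pd

def prom_matrix_to_frame(payload: dict) -> pd.DataFrame:
    """Flatten a Prometheus range-query ('matrix') JSON response into long format."""
    if payload.get("status") != "success":
        raise ValueError(f"Prometheus query failed: {payload.get('error')}")
    rows = []
    for series in payload["data"]["result"]:
        labels = series["metric"]                 # label set, e.g. {"pod": "payment-api-1"}
        for ts, value in series["values"]:        # [unix_seconds, "value as string"]
            rows.append({**labels, "timestamp": ts, "value": float(value)})
    if not rows:
        return pd.DataFrame(columns=["timestamp", "value"])
    df = pd.DataFrame(rows)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True)
    return df

# Hypothetical payload in the shape returned by GET /api/v1/query_range (step = 60s)
sample = {
    "status": "success",
    "data": {
        "resultType": "matrix",
        "result": [
            {"metric": {"pod": "payment-api-1"},
             "values": [[1767225600, "41.5"], [1767225660, "43.0"]]},
            {"metric": {"pod": "payment-api-2"},
             "values": [[1767225600, "47.2"], [1767225660, "NaN"]]},
        ],
    },
}
raw_cpu = prom_matrix_to_frame(sample)
print(raw_cpu)
```

Prometheus can return undefined samples as the string "NaN". The sketch keeps them as NaN so that the validation stage decides how to treat them.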

Stage 2: Parsing and Normalization
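Parsing and normalisation map every source onto one canonical schema. Below is a minimal sketch for JSONL logs. It assumes hypothetical field names that differ between services: `ts`/`timestamp`, `latency_s`/`duration_ms`, and `msg`/`message`. Timestamps become UTC, log levels are mapped onto one vocabulary, and latency is always stored in milliseconds.

```python
import json
from datetime import datetime, timezone

# Hypothetical mapping from level spellings seen across services to one vocabulary
LEVEL_ALIASES = {"debug": "DEBUG", "info": "INFO", "warn": "WARNING", "warning": "WARNING",
                 "err": "ERROR", "error": "ERROR", "fatal": "CRITICAL", "critical": "CRITICAL"}

def normalize_log_line(line: str) -> dict:
    """Parse one JSONL log line and map it onto a canonical schema."""
    rec = json.loads(line)
    # Timestamp: accept epoch seconds or ISO-8601; always emit timezone-aware UTC
    raw_ts = rec["ts"] if "ts" in rec else rec["timestamp"]
    if isinstance(raw_ts, (int, float)):
        when = datetime.fromtimestamp(raw_ts, tz=timezone.utc)
    else:
        when = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
        if when.tzinfo is None:               # assumption: naive timestamps are UTC
            when = when.replace(tzinfo=timezone.utc)
        when = when.astimezone(timezone.utc)
    # Latency: some services log seconds, others milliseconds; store milliseconds
    if "latency_s" in rec:
        latency_ms = float(rec["latency_s"]) * 1000
    elif "duration_ms" in rec:
        latency_ms = float(rec["duration_ms"])
    else:
        latency_ms = None
    return {
        "timestamp": when,
        "service": rec.get("service", "unknown"),
        "level": LEVEL_ALIASES.get(str(rec.get("level", "info")).lower(), "UNKNOWN"),
        "latency_ms": latency_ms,
        "message": rec.get("msg", rec.get("message", "")),
    }

lines = [
    '{"ts": "2026-01-01T09:30:00+01:00", "level": "warn", "service": "payment-api", "latency_s": 0.25, "msg": "slow upstream"}',
    '{"timestamp": 1767225600, "level": "ERROR", "service": "checkout", "duration_ms": 80, "message": "timeout"}',
]
for line in lines:
    print(normalize_log_line(line))
```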

Stage 3: Feature Engineering

Features are the AI inputs — they must capture signal, not noise:

| Raw Signal | Engineered Feature | Why It's Better |
|---|---|---|
| CPU % at timestamp | CPU % relative to 24h p95 baseline | Accounts for normal load variation |
| Error log count | Error rate per minute (rate of change) | Spike detection vs slow accumulation |
| Absolute latency | Latency as % deviation from hourly average | Context-aware, adapts to service type |
| Raw timestamp | Hour of day, day of week, is_business_hours | Cyclical pattern awareness |
| Deployment happened | Minutes since last deployment | Correlate incidents with change events |
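The pipeline script in this lesson does not compute the last feature in the table. One way to derive it is a sorted search for the latest deployment at or before each sample. The sketch uses a hypothetical `minutes_since_last_deploy` helper and a made-up list of deployment times.

```python
import numpy as np
import pandas as pd

def minutes_since_last_deploy(timestamps, deploy_times) -> np.ndarray:
    """Minutes since the most recent deployment at or before each timestamp.

    Timestamps earlier than the first known deployment get NaN.
    """
    samples = pd.DatetimeIndex(pd.to_datetime(timestamps, utc=True))
    deploys = pd.DatetimeIndex(pd.to_datetime(deploy_times, utc=True)).sort_values()
    if len(deploys) == 0:
        return np.full(len(samples), np.nan)
    # Position of the last deployment <= each sample (-1 means none yet)
    pos = deploys.searchsorted(samples, side="right") - 1
    last = deploys[np.clip(pos, 0, None)]
    minutes = np.array((samples - last) / pd.Timedelta(minutes=1), dtype=float)
    minutes[pos < 0] = np.nan
    return minutes

samples = pd.date_range("2026-01-01 09:00", periods=4, freq="30min", tz="UTC")
deploy_log = ["2026-01-01 09:10", "2026-01-01 10:05"]   # hypothetical deploy times (UTC)
print(minutes_since_last_deploy(samples, deploy_log))   # [nan 20. 50. 25.]
```

You can attach the result to a feature frame as a new column, with `deploy_log` taken from your CI/CD history. Decide how to fill rows that precede the first deployment (for example, with a large sentinel value) before validation, because a null check will flag them.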

Stage 4: Quality Validation
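Validation works well as an explicit data contract: each column has an expected type and a legal range, and both are checked before anything reaches the model. The sketch below uses a hypothetical `CONTRACT` dict and `check_contract` helper rather than any particular library. It also adds a unit-sanity rule, because a plain 0-100 range check cannot tell a CPU reading of 45% from the same value exported as the fraction 0.45.

```python
import pandas as pd

# Hypothetical data contract: expected dtype kind ('f' float, 'i' int) and legal range
CONTRACT = {
    "cpu_pct":     {"kind": "f", "min": 0.0, "max": 100.0},
    "memory_pct":  {"kind": "f", "min": 0.0, "max": 100.0},
    "error_count": {"kind": "i", "min": 0, "max": None},
}

def check_contract(df: pd.DataFrame, contract: dict = CONTRACT) -> list:
    """Return human-readable contract violations (an empty list means pass)."""
    issues = []
    for col, rule in contract.items():
        if col not in df.columns:
            issues.append(f"SCHEMA: missing column {col}")
            continue
        s = df[col]
        if s.dtype.kind != rule["kind"]:
            issues.append(f"SCHEMA: {col} has dtype {s.dtype}, expected kind '{rule['kind']}'")
            continue
        if rule["min"] is not None and (s < rule["min"]).any():
            issues.append(f"RANGE: {col} has values below {rule['min']}")
        if rule["max"] is not None and (s > rule["max"]).any():
            issues.append(f"RANGE: {col} has values above {rule['max']}")
    # Unit sanity: a percentage column that never exceeds 1.0 was probably exported
    # as a 0-1 fraction; the range checks above cannot catch this
    for col in ("cpu_pct", "memory_pct"):
        if col in df.columns and df[col].dtype.kind == "f" and len(df) and df[col].max() <= 1.0:
            issues.append(f"UNITS: {col} max is {df[col].max():.2f}; looks like a fraction")
    return issues

good = pd.DataFrame({"cpu_pct": [40.0, 55.5], "memory_pct": [60.0, 61.0], "error_count": [0, 3]})
print(check_contract(good))                                  # []
print(check_contract(good.assign(cpu_pct=[0.45, 0.52])))     # UNITS issue
print(check_contract(good.assign(cpu_pct=[40.0, 120.0])))    # RANGE issue
```

The unit rule is a heuristic: a genuinely idle service can also stay below 1% CPU. Treat it as a warning to investigate rather than a hard rejection.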

📊 Visual: AIOps Data Pipeline Architecture

End-to-End Data Pipeline for AIOps (diagram; components in the order shown)
Logs: 📋 Raw Logs (JSONL, syslog, nginx) → ⚙️ Fluent Bit (parse + tag) → 📨 Kafka / Event Hub (buffer + fan-out) → 🔧 Feature Engine (extract, normalize) → 🤖 ML Model (anomaly / classify)
Metrics: 📊 Prometheus Metrics → 📐 Normalization (baseline-relative) → 💾 Feature Store (Redis / Table Storage) → ✅ Validation (schema, freshness) → 🚨 Alert Engine

⌨️ Feature Engineering Pipeline in Python

python
"""
AIOps feature engineering pipeline.
Ingests raw Prometheus metric samples and produces ML-ready features.
"""
import pandas as pd
import numpy as np
from datetime import datetime, timezone

# ── Simulate raw Prometheus metrics (as if queried via prom API) ──────────────
def load_raw_metrics() -> pd.DataFrame:
    """Simulate 24h of 1-minute samples (stand-in for a Prometheus or CSV export)."""
    np.random.seed(42)
    # End the series at the current minute so the freshness check passes in the demo
    end = pd.Timestamp.now(tz='UTC').floor('min')
    timestamps = pd.date_range(end=end, periods=1440, freq='1min')
    return pd.DataFrame({
        'timestamp': timestamps,
        'cpu_pct': np.random.normal(45, 8, 1440).clip(5, 100),
        'memory_pct': np.random.normal(60, 5, 1440).clip(10, 100),
        'error_count': np.random.poisson(2, 1440),   # avg 2 errors/min
        'request_rate': np.random.normal(500, 50, 1440).clip(100, 2000),
        'latency_ms': np.random.normal(120, 15, 1440).clip(1, None),  # synthetic latency
        'service': 'payment-api'
    })

# ── Feature Engineering ─────────────────────────────────────────────────────
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Transform raw metrics into ML-ready features."""
    df = df.sort_values('timestamp').copy()

    # Rolling p95 baseline (up to 24h = 1440 one-minute samples). Because the
    # baseline is a p95, typical rows score below 1.0 and spikes score above it.
    window = min(1440, len(df))
    df['cpu_baseline_p95'] = df['cpu_pct'].rolling(window, min_periods=30).quantile(0.95)
    df['cpu_vs_baseline'] = df['cpu_pct'] / df['cpu_baseline_p95'].clip(lower=1)

    # Spike features: current error count vs the trailing 5-minute average
    df['error_rate_1m'] = df['error_count']                          # per minute
    df['error_rate_5m'] = df['error_count'].rolling(5).mean()        # 5-min rolling
    df['error_spike'] = df['error_rate_1m'] / df['error_rate_5m'].clip(lower=0.1)

    # Cyclical time features (encode as sin/cos to preserve circularity)
    df['hour'] = df['timestamp'].dt.hour   # UTC hour; convert if business hours are local
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    df['is_business_hours'] = df['hour'].between(8, 17).astype(int)   # 08:00-17:59
    df['day_of_week'] = df['timestamp'].dt.dayofweek   # 0=Mon, 6=Sun
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)

    # Latency deviation: % difference from the trailing 1-hour (60-sample) mean
    latency_baseline = df['latency_ms'].rolling(60).mean()
    df['latency_deviation_pct'] = (
        (df['latency_ms'] - latency_baseline) / latency_baseline.clip(lower=1)
    ) * 100

    return df.dropna()   # drop warm-up rows where rolling windows are still NaN

# ── Data Quality Validation ──────────────────────────────────────────────────
def validate_features(df: pd.DataFrame, max_age_seconds: int = 300) -> dict:
    """Run quality checks before feeding features to ML model."""
    issues = []

    latest_ts = df['timestamp'].max()
    age = (datetime.now(timezone.utc) - latest_ts).total_seconds()
    if age > max_age_seconds:
        issues.append(f"STALE: latest feature is {age:.0f}s old (max {max_age_seconds}s)")

    null_counts = df.isnull().sum()
    for col, count in null_counts[null_counts > 0].items():
        issues.append(f"NULLS: {col} has {count} null values ({count/len(df)*100:.1f}%)")

    if ((df['cpu_pct'] < 0) | (df['cpu_pct'] > 100)).any():
        issues.append("RANGE: cpu_pct contains values outside 0-100%")

    return {'valid': len(issues) == 0, 'issues': issues, 'rows': len(df)}

# ── Run Pipeline ─────────────────────────────────────────────────────────────
raw = load_raw_metrics()
features = engineer_features(raw)
validation = validate_features(features)

print(f"Pipeline output: {validation['rows']} rows")
print(f"Valid: {validation['valid']}")
print(f"Feature columns: {[c for c in features.columns if c not in ['timestamp','service']]}")

🧪 Hands-on

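  1. Run the feature engineering script above. Inspect the output features and check that cpu_vs_baseline stays below 1.0 for most rows (roughly 0.8 on average). It sits below 1.0 because the baseline is the rolling 95th percentile, not the mean.
  2. Inject an artificial anomaly: set raw.loc[1000:1005, 'cpu_pct'] = 95 before calling engineer_features(raw). Verify that cpu_vs_baseline jumps to about 1.6 for those rows (95 divided by a p95 baseline of roughly 58).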
  3. Export 1 day of real Prometheus metrics using promtool query range and run the pipeline on real data.
  4. Add a new feature: minutes_since_last_deployment. Use a mock deployment timestamp list to compute this. Observe how it creates a strong signal around incident times.
  5. Test the validation function by introducing a null in the CPU column. Verify the validator catches it and reports it correctly.
💡
Critical Production Pattern

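Encode cyclical time features as sin/cos pairs (e.g., hour_sin = sin(2π * hour/24), hour_cos = cos(2π * hour/24)). With raw hour values (0-23), the model sees midnight (0) and 11pm (23) as 23 units apart, the largest possible gap, even though they are adjacent hours. Sin/cos encoding places the hours on a circle, so 23:00 and 00:00 end up as close as any other neighbouring pair. You need both components, because sin alone gives 03:00 and 09:00 the same value.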
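A quick numerical check of this claim uses two illustrative helpers, `encode_hour` and `encoded_gap`. After encoding, midnight and 23:00 are exactly as close as any other pair of adjacent hours, and opposite hours are the farthest apart.

```python
import math

def encode_hour(hour: float) -> tuple:
    """Map an hour of day (0-24) to a point on the unit circle."""
    angle = 2 * math.pi * hour / 24
    return (math.sin(angle), math.cos(angle))

def encoded_gap(h1: float, h2: float) -> float:
    """Straight-line distance between two hours after sin/cos encoding."""
    return math.dist(encode_hour(h1), encode_hour(h2))

print(abs(0 - 23))                      # raw gap, midnight vs 23:00: 23
print(round(encoded_gap(0, 23), 3))     # 0.261
print(round(encoded_gap(0, 1), 3))      # 0.261, same as any other adjacent pair
print(round(encoded_gap(0, 12), 3))     # 2.0, the maximum (opposite side of the circle)
```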

🎮 Try It Yourself

🎮
Challenge: Build a Feature Pipeline for a K8s Anomaly Detector
  1. Implement sin/cos time encoding: Write a Python function that takes a datetime object and returns hour_sin, hour_cos, day_sin, day_cos. Verify that midnight and 23:00 produce similar values (close in circular distance), unlike raw 0 vs 23.
  2. Design a feature set for a Kubernetes CPU anomaly detector. List at least 6 features you would engineer from raw Prometheus metrics. Think about: time, rolling windows, baseline deltas, inter-service relationships, recent deployments.
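  3. Schema validation test: Take the feature pipeline from the code section above and introduce two deliberate bugs: a CPU value of 120 (out of range) and a CPU column exported as fractions (0.45 instead of 45). A 0-100 range check catches the first but not the second. Extend the validation function with a unit-sanity check (for example, flag a percentage column whose maximum is 1.0 or below), then verify that both problems are reported before the data reaches the model.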
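  4. End-to-end drill: Export 1 hour of Prometheus data as CSV, or generate synthetic data at a 10-second step (np.random.normal(45, 8, 360) gives 360 samples). The pipeline's rolling windows count rows, not minutes, so review the window sizes: at a 10-second step, rolling(60) covers 10 minutes, not an hour. Run the full pipeline: ingest → validate → engineer features → output a DataFrame ready for Isolation Forest training. Confirm zero nulls in the output.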

K8s reality check: In a typical AKS setup, the feature pipeline might run as a Kubernetes CronJob every 5 minutes, writing feature vectors to Azure Blob Storage. The anomaly model then reads from this storage on each inference. If the CronJob fails, stale features silently degrade model accuracy. Add a feature_age_minutes check that alerts when the newest features are more than 15 minutes old.

🧠 Debugging Scenario

Problem: AI anomaly detector runs perfectly for 2 weeks, then starts firing false alerts constantly. You haven't changed the model.

🎯 Interview Questions

Beginner

What is feature engineering and why is it important for AIOps?

Feature engineering transforms raw signals (CPU%, log counts) into ML-ready inputs that capture meaningful patterns. In AIOps, a raw CPU reading of 80% tells the model nothing about whether that is normal for the service. The same reading expressed relative to the service's baseline (e.g., 1.4x its usual p95) does. Good features encode domain knowledge (business hours, deployment recency) that the model cannot extract from raw values alone.

What is a data pipeline and what are its main stages?

A data pipeline ingests raw data from sources (logs, metrics, traces), transforms it through parsing and normalisation, extracts ML features, validates quality, and delivers structured inputs to the ML model. Main stages: collection (Fluentd/Prometheus), parsing/normalisation, feature extraction, validation, and storage/serving.

What problems occur when features are stale (too old)?

Stale features make the AI act on outdated information. A 10-minute-old memory reading cannot reflect a leak that started 5 minutes ago, so the model may label a currently critical situation as "normal". Define a maximum feature age (e.g., 2 minutes for real-time anomaly detection) and reject or flag inputs that exceed it.

Why do we normalise raw metrics before feeding them to ML models?

Normalisation keeps models from being biased by differences in scale and units. CPU at 65 and memory at 65 share the same number, but one might mean 65% of CPU capacity and the other 65 GB of a 256 GB node (about 25%). Putting both on a common scale, such as fraction of capacity or deviation from each metric's own baseline, lets the model treat both features consistently.
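One common way to put metrics on the same scale is a z-score against each metric's own recent history. The sketch below uses made-up numbers and a hypothetical `zscore_vs_baseline` helper. CPU and memory both read 65, yet one is far outside its normal range and the other is routine.

```python
import numpy as np

def zscore_vs_baseline(value: float, history: np.ndarray) -> float:
    """Distance of `value` from its own baseline, in baseline standard deviations."""
    mu, sigma = history.mean(), history.std()
    return float((value - mu) / max(sigma, 1e-9))   # guard against a flat baseline

# Hypothetical recent history: CPU in percent, memory in GB (different units and scales)
cpu_history = np.array([40.0, 42.0, 44.0, 46.0, 48.0])
mem_history = np.array([40.0, 50.0, 60.0, 70.0, 80.0])

print(round(zscore_vs_baseline(65.0, cpu_history), 2))   # CPU at 65%: 7.42, far above normal
print(round(zscore_vs_baseline(65.0, mem_history), 2))   # memory at 65 GB: 0.35, routine
```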

What is "garbage in, garbage out" in the context of AI for DevOps?

The principle that an AI system's output quality is bounded by its input quality. A perfectly trained model will produce wrong predictions if fed null values, wrong-scale metrics, or mislabeled log events. Data quality issues are among the most common causes of production AI failures, which is why investment in the data pipeline matters as much as investment in the model.

Intermediate

How do you detect feature distribution drift in production?

Monitor the statistical distribution of each feature over time: mean, standard deviation, min/max, and percentiles (p50, p95, p99). Use drift measures such as the Population Stability Index (PSI) or the Kolmogorov-Smirnov test to detect when production distributions diverge significantly from training distributions. A common rule of thumb for PSI: below 0.1 is stable, 0.1-0.25 is moderate drift worth investigating, and above 0.25 is significant drift that usually calls for retraining.
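Here is a minimal PSI sketch. First, bin the training sample into deciles. Then compare the share of training (expected) and production (actual) values in each bin using PSI = Σ (actual% − expected%) × ln(actual% / expected%). Decile binning and a small epsilon (to keep empty bins from producing ln(0)) are common choices, not a fixed standard. The data below is synthetic.

```python
import numpy as np

def psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10,
        eps: float = 1e-6) -> float:
    """Population Stability Index of `actual` (production) vs `expected` (training)."""
    # Interior cut points at the training sample's quantiles (deciles by default)
    cuts = np.quantile(expected, np.linspace(0, 1, bins + 1)[1:-1])
    # Bin index 0..bins-1; values outside the training range fall into the end bins
    exp_counts = np.bincount(np.searchsorted(cuts, expected, side="right"), minlength=bins)
    act_counts = np.bincount(np.searchsorted(cuts, actual, side="right"), minlength=bins)
    exp_pct = np.clip(exp_counts / len(expected), eps, None)   # avoid log(0)
    act_pct = np.clip(act_counts / len(actual), eps, None)
    return float(np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct)))

rng = np.random.default_rng(0)
train_cpu = rng.normal(45, 8, 10_000)      # feature values seen at training time
stable_cpu = rng.normal(45, 8, 10_000)     # production sample, same distribution
shifted_cpu = rng.normal(55, 8, 10_000)    # production sample after a 10-point shift

print(f"PSI stable:  {psi(train_cpu, stable_cpu):.3f}")
print(f"PSI shifted: {psi(train_cpu, shifted_cpu):.3f}")
```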

How would you design a feature store for AIOps at scale?

A feature store has online (low-latency) and offline (historical) layers. Online store (Redis): serves real-time features in <10ms for inference. Offline store (Azure Data Lake/S3): stores historical features for training. Key design elements: feature versioning (timestamped), point-in-time correct joins (never use future data for past training), and consistent transformations between train and serve pipelines.
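Point-in-time correctness can be sketched with pandas `merge_asof`. Each training label is joined to the latest feature snapshot taken at or before the label time, never after. The frames, column names, and 10-minute tolerance are illustrative assumptions, not a feature-store API.

```python
import pandas as pd

# Hypothetical offline-store snapshots, one feature row every 5 minutes
feature_snapshots = pd.DataFrame({
    "ts": pd.to_datetime(["2026-01-01 10:00", "2026-01-01 10:05",
                          "2026-01-01 10:10", "2026-01-01 10:15"], utc=True),
    "service": "payment-api",
    "cpu_vs_baseline": [0.8, 0.9, 1.6, 0.7],
})
# Hypothetical labels: moments an operator marked "incident" (1) or "normal" (0)
labels = pd.DataFrame({
    "ts": pd.to_datetime(["2026-01-01 10:08", "2026-01-01 10:14"], utc=True),
    "service": "payment-api",
    "incident": [0, 1],
})

# Point-in-time join: each label gets the latest snapshot at or BEFORE its time.
# A nearest-match join would pair 10:14 with the 10:15 row, leaking future data.
training_set = pd.merge_asof(
    labels.sort_values("ts"),
    feature_snapshots.sort_values("ts"),
    on="ts",
    by="service",
    direction="backward",
    tolerance=pd.Timedelta("10min"),   # snapshots older than this count as missing (NaN)
)
print(training_set)
```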

What is the train-serve skew problem and how do you prevent it?

Train-serve skew occurs when features at serving time differ from features at training time — same name, different calculation. Common causes: different code paths for train vs serve pipelines, timezone differences (training in UTC, serving in local time), or different null handling. Prevention: use the same feature transformation code in both pipelines (a single library), and monitor feature value distributions at both stages.

Scenario-based

Your AI system worked for 3 months and now produces bad predictions after a Kubernetes upgrade. How do you investigate?

Check the data pipeline first: compare before-upgrade vs after-upgrade feature distributions. The K8s upgrade may have changed metric names, namespaces, label schemas, or exposure paths. Use feature monitoring dashboards to identify which features drifted. Then check if log formats changed (new container runtime, new log level names). Treat the upgrade as a data contract break and run your validation suite against the new environment.

How would you design a data pipeline that handles log format changes from teams without breaking the AI system?

Use schema versioning with forward compatibility. Define contracts in a schema registry (Avro/Protobuf). On schema change detection: parse both old and new formats, map to a canonical internal schema. Add an alert when unknown schema versions are detected in ingestion. Test all format changes in staging with production data replay before deploying to the AI system.
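Below is a hand-rolled sketch of the "map old and new formats to one canonical schema" step. It stands in for a real schema registry. Each `schema_version` gets its own parser, and unknown versions are quarantined instead of guessed. The v1/v2 field layouts and the helper names are hypothetical.

```python
import json
from typing import Callable, Dict, List, Optional

def parse_v1(rec: dict) -> dict:
    # Hypothetical v1 layout: {"time", "svc", "severity", "text"}
    return {"timestamp": rec["time"], "service": rec["svc"],
            "level": rec["severity"].upper(), "message": rec["text"]}

def parse_v2(rec: dict) -> dict:
    # Hypothetical v2 layout: renamed fields, service and level nested
    return {"timestamp": rec["@timestamp"], "service": rec["service"]["name"],
            "level": rec["log"]["level"].upper(), "message": rec["message"]}

PARSERS: Dict[int, Callable[[dict], dict]] = {1: parse_v1, 2: parse_v2}

def to_canonical(line: str, quarantine: List[str]) -> Optional[dict]:
    """Map a log line of any known schema version onto the canonical schema."""
    rec = json.loads(line)
    version = rec.get("schema_version", 1)   # assumption: unversioned lines are v1
    parser = PARSERS.get(version)
    if parser is None:
        # Unknown version: keep the raw line for review instead of guessing;
        # in production this is where an ingestion alert would fire
        quarantine.append(line)
        return None
    return parser(rec)

quarantine: List[str] = []
v1 = '{"time": "2026-01-01T00:00:00Z", "svc": "checkout", "severity": "warn", "text": "retrying"}'
v2 = ('{"schema_version": 2, "@timestamp": "2026-01-01T00:00:01Z", '
      '"service": {"name": "checkout"}, "log": {"level": "error"}, "message": "timeout"}')
v9 = '{"schema_version": 9, "payload": "new format nobody registered"}'
for line in (v1, v2, v9):
    print(to_canonical(line, quarantine))
print(f"quarantined: {len(quarantine)}")
```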

A new service joins your platform. How do you build a baseline for anomaly detection when you have no historical data?

Use a cold-start strategy: 1) Apply bootstrap baselines from similar services of the same architecture type. 2) Use conservative thresholds (wider bounds) for the first 1-2 weeks. 3) Accumulate real data for 4+ weeks before training a service-specific model. 4) Flag the service as "learning mode" in alerting — route to human review rather than auto-action. 5) After 4 weeks, train and deploy a custom model and tighten the thresholds.
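The cold-start steps can be sketched as a single threshold function. It borrows a peer baseline and routes alerts to human review until the service has enough history of its own. The 28-day learning period, the 5-sigma and 3-sigma band widths, and the `threshold_for` helper are illustrative assumptions. As a simplification of steps 2 and 3, the sketch keeps the wide band for the whole learning period.

```python
import statistics
from dataclasses import dataclass
from typing import Optional, Sequence

LEARNING_DAYS = 28            # hypothetical: ~4 weeks before trusting the service's own data
WIDE_K, NORMAL_K = 5.0, 3.0   # hypothetical band widths, in standard deviations

@dataclass
class Threshold:
    upper: float
    route: str                # "human_review" while learning, "auto" afterwards

def threshold_for(service_age_days: int, peer_values: Sequence[float],
                  own_values: Optional[Sequence[float]] = None) -> Threshold:
    """Upper alert bound for a metric, borrowing a peer baseline during cold start."""
    if service_age_days < LEARNING_DAYS or not own_values:
        mu, sd = statistics.fmean(peer_values), statistics.pstdev(peer_values)
        return Threshold(upper=mu + WIDE_K * sd, route="human_review")
    mu, sd = statistics.fmean(own_values), statistics.pstdev(own_values)
    return Threshold(upper=mu + NORMAL_K * sd, route="auto")

peers = [40.0, 44.0, 48.0, 52.0, 56.0]   # CPU % of similar services (hypothetical)
own = [30.0, 32.0, 34.0, 36.0, 38.0]     # the new service's own history (hypothetical)
print(threshold_for(3, peers))            # learning mode: wide band, human review
print(threshold_for(35, peers, own))      # after ~5 weeks: own baseline, tighter band
```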

🌐 Real-world Usage

Netflix's Mantis platform processes billions of operational events per second through a real-time feature pipeline before feeding anomaly models. Azure Monitor ingests metrics from millions of resources, computes per-resource dynamic baselines, and feeds them to ML models that power Smart Detection alerts. LinkedIn's AIOps platform uses a centralised feature store that serves 200+ ML models from a consistent set of infrastructure features — preventing train-serve skew across teams.

📝 Summary

A data pipeline is the foundation of every AIOps system — even the best ML model is useless with bad input. Build pipelines with schema validation, freshness checks, and feature distribution monitoring. Use baseline-relative features over raw values. Encode time cyclically. Monitor for data drift as aggressively as you monitor model performance.