Data Pipelines and Feature Engineering for AIOps
Build the data foundations that feed AI: log ingestion, metric normalization, feature extraction, and quality validation — because garbage in, garbage out.
🧒 Simple Explanation (ELI5)
Imagine you're a chef making a gourmet meal. Raw ingredients arrive dirty and unprocessed — vegetables with soil, meat in packaging, spices unmeasured. Before cooking, you wash, chop, weigh, and organise everything. AI is the chef; data pipelines do the prep work. Without clean, structured ingredients, the chef produces terrible food no matter how skilled they are.
🔧 Why Data Pipelines Are the Foundation of AIOps
- AI quality equals data quality: 80% of ML project failures trace back to bad input data, not bad algorithms.
- Scale: A single Kubernetes cluster generates 50M+ log lines/day. You can't process raw JSONL at this scale without structured ingestion.
- Feature freshness: A stale feature (15-minute-old CPU reading) causes wrong real-time anomaly decisions. Pipeline latency directly impacts alert accuracy.
- Schema enforcement: If a log format changes (a team adds a new field), the AI input breaks silently unless you validate schemas.
- Audit and compliance: You must be able to replay historical data and reproduce AI decisions — this requires immutable pipeline storage.
🌍 Real-world Analogy
A hospital's electronic records system collects data from blood pressure monitors, ECG machines, temperature sensors, and nurse notes. Before a doctor reviews a patient, all these signals are normalised (blood pressure always in mmHg, not kPa), timestamped, and organised in a timeline. That's feature engineering: raw heterogeneous signals transformed into a consistent, structured form that a physician (or ML model) can reason about correctly.
⚙️ How AIOps Data Pipelines Work
Stage 1: Data Collection
- Logs: Fluentd/Fluent Bit → Kafka → Elasticsearch / Azure Log Analytics
- Metrics: Prometheus scrape → Thanos/Cortex for long-term storage → PromQL queries
- Traces: OpenTelemetry SDK → Jaeger / Azure Application Insights
- Events: Kubernetes events, deployment webhooks, config change notifications
Stage 2: Parsing and Normalization
- Parse unstructured log lines (regex, Grok patterns) into structured JSON
- Normalise timestamps to UTC ISO 8601
- Standardise severity levels: WARN, WARNING, CAUTION → all become WARN
- Extract service name, pod, namespace from log metadata
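The normalisation steps above can be sketched in a few lines of Python. This is a minimal illustration, not a production parser: the log line layout, the `LOG_PATTERN` regex, and the `SEVERITY_MAP` table are assumptions made for the example, and real pipelines usually do this work in Fluent Bit or Grok rules.

```python
import json
import re
from datetime import datetime, timezone
from typing import Optional

# Hypothetical line layout: "<ISO timestamp> <LEVEL> [<service>] <message>"
LOG_PATTERN = re.compile(
    r"^(?P<ts>\S+)\s+(?P<level>[A-Za-z]+)\s+\[(?P<service>[^\]]+)\]\s+(?P<message>.*)$"
)

# Collapse severity synonyms onto one canonical label.
SEVERITY_MAP = {"WARN": "WARN", "WARNING": "WARN", "CAUTION": "WARN"}


def parse_log_line(line: str) -> Optional[dict]:
    """Return a normalised record, or None if the line does not match."""
    match = LOG_PATTERN.match(line.strip())
    if match is None:
        return None
    ts = datetime.fromisoformat(match["ts"])
    if ts.tzinfo is None:
        # Assumption for this sketch: naive timestamps are already UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    level = match["level"].upper()
    return {
        "timestamp": ts.astimezone(timezone.utc).isoformat(),
        "severity": SEVERITY_MAP.get(level, level),
        "service": match["service"],
        "message": match["message"],
    }


record = parse_log_line(
    "2026-01-01T10:15:30+02:00 warning [payment-api] retrying upstream call"
)
print(json.dumps(record))
```

Returning `None` for unparseable lines, rather than raising, lets the caller count and alert on the parse-failure rate instead of dropping the whole batch.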
Stage 3: Feature Engineering
Features are the AI inputs — they must capture signal, not noise:
| Raw Signal | Engineered Feature | Why It's Better |
|---|---|---|
| CPU % at timestamp | CPU % relative to 24h p95 baseline | Accounts for normal load variation |
| Error log count | Error rate per minute (rate of change) | Spike detection vs slow accumulation |
| Absolute latency | Latency as % deviation from hourly average | Context-aware, adapts to service type |
| Raw timestamp | Hour of day, day of week, is_business_hours | Cyclical pattern awareness |
| Deployment happened | Minutes since last deployment | Correlate incidents with change events |
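
The last row of the table, minutes since last deployment, is not covered by the main script later in this section. One possible way to compute it is an as-of join between metric timestamps and deployment events. The timestamps and the `deployments` frame below are made up for illustration; in practice they might come from deployment webhooks.

```python
import pandas as pd

# Synthetic sample timestamps (both frames are built the same way so the
# join keys share one dtype).
metrics = pd.DataFrame({
    "timestamp": pd.to_datetime(
        ["2026-01-01 00:00", "2026-01-01 00:30", "2026-01-01 01:00",
         "2026-01-01 01:30", "2026-01-01 02:00", "2026-01-01 02:30"],
        utc=True,
    ),
})

# Hypothetical deployment events.
deployments = pd.DataFrame({
    "deployed_at": pd.to_datetime(["2026-01-01 00:45", "2026-01-01 02:00"], utc=True),
})

# For each sample, attach the most recent deployment at or before it.
joined = pd.merge_asof(
    metrics.sort_values("timestamp"),
    deployments.sort_values("deployed_at"),
    left_on="timestamp",
    right_on="deployed_at",
    direction="backward",
)
joined["minutes_since_deploy"] = (
    (joined["timestamp"] - joined["deployed_at"]).dt.total_seconds() / 60
)
print(joined)
```

Samples taken before the first known deployment get a missing value, which the pipeline then has to impute or drop explicitly.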
Stage 4: Quality Validation
- Null checks: Missing values must be handled (impute, skip, or alert)
- Cardinality limits: A field with 100,000 unique pod names will break most models
- Distribution monitoring: Alert if input feature distribution drifts from training baseline
- Freshness checks: Reject stale features older than your SLO window
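Null and freshness checks appear in the main script later in this section. For the cardinality limit, one hedged option is to keep only the most frequent values of a field and fold the rest into a single bucket. The helper name `cap_cardinality` and the `__other__` label are assumptions for this sketch.

```python
import pandas as pd


def cap_cardinality(values: pd.Series, max_categories: int = 3,
                    other: str = "__other__") -> pd.Series:
    """Keep the most frequent categories and fold the rest into one bucket."""
    top = values.value_counts().nlargest(max_categories).index
    return values.where(values.isin(top), other)


pods = pd.Series(
    ["api-1", "api-1", "api-2", "api-2", "api-3", "api-3", "api-4", "api-5"]
)
capped = cap_cardinality(pods, max_categories=3)
print(capped.value_counts())
```

For identifiers such as pod names, aggregating to a coarser key (service or deployment) before modelling is often a better fit than bucketing; the sketch only shows the mechanical cap.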
📊 Visual: AIOps Data Pipeline Architecture
[Diagram: AIOps data pipeline architecture. Stage annotations, in order: JSONL, syslog, nginx; Parse + tag; Buffer + fan-out; Extract, normalize; Anomaly / classify; Baseline-relative; Redis / Table Storage; Schema, freshness.]
⌨️ Feature Engineering Pipeline in Python
"""
AIOps feature engineering pipeline.
Ingests raw Prometheus metric samples and produces ML-ready features.
"""
from datetime import datetime, timezone

import numpy as np
import pandas as pd


# ── Simulate raw Prometheus metrics (as if queried via prom API) ──────────────
def load_raw_metrics() -> pd.DataFrame:
    """Simulate 24h of one-minute samples (stand-in for a Prometheus or CSV export)."""
    np.random.seed(42)
    # End the series at the current minute so the freshness check below can pass.
    end = pd.Timestamp.now(tz='UTC').floor('min')
    timestamps = pd.date_range(end=end, periods=1440, freq='1min')
    return pd.DataFrame({
        'timestamp': timestamps,
        'cpu_pct': np.random.normal(45, 8, 1440).clip(5, 100),
        'memory_pct': np.random.normal(60, 5, 1440).clip(10, 100),
        'error_count': np.random.poisson(2, 1440),  # avg 2 errors/min
        'request_rate': np.random.normal(500, 50, 1440).clip(100, 2000),
        'latency_ms': np.random.normal(120, 15, 1440).clip(1, None),  # synthetic latency
        'service': 'payment-api'
    })


# ── Feature Engineering ─────────────────────────────────────────────────────
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Transform raw metrics into ML-ready features."""
    df = df.sort_values('timestamp').copy()

    # Rolling baselines (24h window = 1440 one-minute rows)
    window = min(1440, len(df))
    df['cpu_baseline_p95'] = (
        df['cpu_pct'].rolling(window, min_periods=min(30, window)).quantile(0.95)
    )
    df['cpu_vs_baseline'] = df['cpu_pct'] / df['cpu_baseline_p95'].clip(lower=1)

    # Rate-of-change features (detect spikes)
    df['error_rate_1m'] = df['error_count']                    # per minute
    df['error_rate_5m'] = df['error_count'].rolling(5).mean()  # 5-min rolling mean
    df['error_spike'] = df['error_rate_1m'] / df['error_rate_5m'].clip(lower=0.1)

    # Cyclical time features (encode as sin/cos to preserve circularity)
    df['hour'] = df['timestamp'].dt.hour
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    # between() is inclusive on both ends: hours 8 through 18 (UTC)
    df['is_business_hours'] = df['hour'].between(8, 18).astype(int)
    df['day_of_week'] = df['timestamp'].dt.dayofweek  # 0=Mon, 6=Sun
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)

    # Latency deviation from the rolling hourly average (percent)
    latency_baseline = df['latency_ms'].rolling(60).mean()
    df['latency_deviation_pct'] = (
        (df['latency_ms'] - latency_baseline) / latency_baseline.clip(lower=1)
    ) * 100

    # Drop warm-up rows where a rolling window is not yet full
    return df.dropna()


# ── Data Quality Validation ──────────────────────────────────────────────────
def validate_features(df: pd.DataFrame, max_age_seconds: int = 300) -> dict:
    """Run quality checks before feeding features to ML model."""
    issues = []

    # Freshness: the newest row must be younger than max_age_seconds
    latest_ts = df['timestamp'].max()
    age = (datetime.now(timezone.utc) - latest_ts).total_seconds()
    if age > max_age_seconds:
        issues.append(f"STALE: latest feature is {age:.0f}s old (max {max_age_seconds}s)")

    # Nulls: report every column that still contains missing values
    null_counts = df.isnull().sum()
    for col, count in null_counts[null_counts > 0].items():
        issues.append(f"NULLS: {col} has {count} null values ({count/len(df)*100:.1f}%)")

    # Range: CPU is expected on a 0-100 percentage scale
    if ((df['cpu_pct'] < 0) | (df['cpu_pct'] > 100)).any():
        issues.append("RANGE: cpu_pct contains values outside 0-100%")

    return {'valid': len(issues) == 0, 'issues': issues, 'rows': len(df)}


# ── Run Pipeline ─────────────────────────────────────────────────────────────
raw = load_raw_metrics()
features = engineer_features(raw)
validation = validate_features(features)
print(f"Pipeline output: {validation['rows']} rows")
print(f"Valid: {validation['valid']}")
print(f"Feature columns: {[c for c in features.columns if c not in ['timestamp','service']]}")
🧪 Hands-on
- Run the feature engineering script above. Observe the output features and verify that `cpu_vs_baseline` stays below 1.0 for normal data (roughly 0.8 on the synthetic data, because the baseline is the rolling p95 rather than the mean).
- Inject an artificial anomaly: set `df.loc[1000:1005, 'cpu_pct'] = 95` on the raw DataFrame before calling `engineer_features`. Verify `cpu_vs_baseline` spikes to ~1.6 for those rows.
- Export 1 day of real Prometheus metrics using `promtool query range` and run the pipeline on real data.
- Add a new feature: `minutes_since_last_deployment`. Use a mock deployment timestamp list to compute this. Observe how it creates a strong signal around incident times.
- Test the validation function by introducing a null in the CPU column of the engineered features (after `engineer_features` has run, since its final `dropna()` would otherwise remove the row). Verify the validator catches it and reports it correctly.
Always encode time features as sin/cos pairs (e.g., hour_sin = sin(2π * hour/24)). If you use raw hour values (0–23), the model treats midnight (0) and 11pm (23) as maximally far apart, when they're actually adjacent time points. Sin/cos encoding preserves the circular nature of time.
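A quick numeric check makes the point concrete. The sketch below (the helper name `encode_hour` is just for illustration) measures the distance between encoded hours: 23:00 and 00:00 end up about 0.26 apart, while 12:00 and 00:00 sit at the maximum distance of 2.0.

```python
import math


def encode_hour(hour: int):
    """Map an hour (0-23) onto the unit circle as (sin, cos)."""
    angle = 2 * math.pi * hour / 24
    return (math.sin(angle), math.cos(angle))


# Distance between encoded points reflects distance on the clock face.
d_23_to_0 = math.dist(encode_hour(23), encode_hour(0))
d_12_to_0 = math.dist(encode_hour(12), encode_hour(0))
print(f"23:00 vs 00:00 -> {d_23_to_0:.3f}")
print(f"12:00 vs 00:00 -> {d_12_to_0:.3f}")
```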
🎮 Try It Yourself
- Implement sin/cos time encoding: Write a Python function that takes a `datetime` object and returns `hour_sin`, `hour_cos`, `day_sin`, `day_cos`. Verify that midnight and 23:00 produce similar values (close in circular distance), unlike raw 0 vs 23.
- Design a feature set for a Kubernetes CPU anomaly detector. List at least 6 features you would engineer from raw Prometheus metrics. Think about: time, rolling windows, baseline deltas, inter-service relationships, recent deployments.
- Schema validation test: Take the feature pipeline from the code section above. Introduce a deliberate bug: pass a CPU value of `1.2` (a fraction instead of a percentage). A plain 0-100 range check accepts this value, so extend the validation function with a scale check (for example, flag a batch whose median CPU is below 1) and verify it rejects the data before it enters the model.
- End-to-end drill: Export a few hours of Prometheus data as CSV (or generate synthetic data: `np.random.normal(45, 8, 360)` gives 360 one-minute samples, i.e. 6 hours). Run the full pipeline: ingest → validate → engineer features → output a DataFrame ready for Isolation Forest training. Confirm zero nulls in the output.
K8s reality check: In AKS, a common setup runs the feature pipeline as a Kubernetes CronJob every 5 minutes, writing feature vectors to Azure Blob Storage. The anomaly model reads from this storage on each inference. If the CronJob fails, stale features silently degrade model accuracy, so add a feature_age_minutes check that alerts if features are more than 15 minutes old.
🧠 Debugging Scenario
Problem: AI anomaly detector runs perfectly for 2 weeks, then starts firing false alerts constantly. You haven't changed the model.
- Investigation: Compare feature distributions in your monitoring dashboard from last week vs this week. `cpu_vs_baseline` is now near 0.0 for all rows instead of its usual value near 1.0.
- Root cause: A team renamed the Prometheus metric from `cpu_usage_percent` to `node_cpu_utilization_ratio`, switching from a 0-100 scale to 0-1. The feature pipeline ingests the new metric name, but the value is now 0.45 instead of 45. Until the rolling baseline has fully turned over it still holds 0-100 values, so each new reading divided by it lands near zero. The model was trained on ratios near 1.0, so it treats almost every row as anomalous.
- Fix: Add schema versioning to your pipeline. Validate metric value ranges at ingestion: `assert 0 <= cpu_value <= 100`. Because 0.45 still passes that range check, also compare each batch against the expected scale (for example, alert when the median drops by an order of magnitude). Use a feature contract that catches when expected ranges change.
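A feature contract for this scenario could look roughly like the sketch below. The `CONTRACT` dictionary, its field names, and the median band are assumptions chosen for illustration. A 0-1 value such as 0.45 sits inside a 0-100 range, so the sketch adds a check on the batch median.

```python
import numpy as np

# Hypothetical contract: hard bounds plus a plausible band for the batch median.
CONTRACT = {
    "cpu_pct": {"min": 0.0, "max": 100.0, "median_low": 5.0, "median_high": 95.0},
}


def check_contract(name: str, values) -> list:
    """Return a list of contract violations for one batch of a metric."""
    spec = CONTRACT[name]
    arr = np.asarray(values, dtype=float)
    issues = []
    if arr.min() < spec["min"] or arr.max() > spec["max"]:
        issues.append(f"RANGE: {name} outside [{spec['min']}, {spec['max']}]")
    median = float(np.median(arr))
    if not spec["median_low"] <= median <= spec["median_high"]:
        issues.append(f"SCALE: {name} median {median:.2f} outside expected band")
    return issues


print(check_contract("cpu_pct", [42, 45, 51]))        # old 0-100 scale
print(check_contract("cpu_pct", [0.42, 0.45, 0.51]))  # new 0-1 scale
```

The band has to be tuned per metric; a service that idles near 0% CPU would need a different scale signal, such as a comparison against last week's median.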
🎯 Interview Questions
Beginner
Feature engineering transforms raw signals (CPU%, log counts) into ML-ready inputs that capture meaningful patterns. In AIOps, a raw CPU reading of 80% tells the model nothing about whether that's normal; the same reading expressed relative to the service's baseline does. Good features encode domain knowledge (business hours, deployment recency) that the model can't extract from raw values alone.
A data pipeline ingests raw data from sources (logs, metrics, traces), transforms it through parsing and normalisation, extracts ML features, validates quality, and delivers structured inputs to the ML model. Main stages: collection (Fluentd/Prometheus), parsing/normalisation, feature extraction, validation, and storage/serving.
Stale features make the AI act on outdated information. A 10-minute-old CPU reading may not reflect a memory leak that started 5 minutes ago. The model may label a currently critical situation as "normal" based on old data. Define a maximum feature age (e.g., 2 minutes for real-time anomaly detection) and reject or flag inputs that exceed it.
Normalisation ensures models aren't biased by scale differences. CPU at 65 and memory at 65 have the same numerical value, but one might mean 65% of total CPU and the other 65 GB of 256 GB of memory. Normalising both to 0-1 baseline-relative values lets the model treat both features consistently.
The principle that an AI system's output quality is bounded by its input quality. A perfectly trained model will produce wrong predictions if fed null values, wrong-scale metrics, or mislabeled log events. Data quality issues account for the majority of production AI failures — this is why data pipeline investment is as important as model investment.
Intermediate
Monitor the statistical distribution of each feature over time: mean, standard deviation, min/max, and percentiles (p50, p95, p99). Use statistical tests like Population Stability Index (PSI) or the Kolmogorov-Smirnov test to detect when production distributions diverge significantly from training distributions. A common rule of thumb treats PSI between 0.1 and 0.25 as moderate drift worth investigating and PSI above 0.25 as significant drift that warrants retraining.
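PSI is simple enough to compute by hand. The following is a minimal sketch, assuming quantile bins taken from the training sample and a small epsilon to avoid taking the log of zero; the function name and the synthetic samples are illustrative only.

```python
import numpy as np


def population_stability_index(expected, actual, bins: int = 10) -> float:
    """PSI between a training (expected) and a production (actual) sample."""
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)
    # Interior bin edges come from quantiles of the training sample; values
    # outside the training range fall into the first or last bin.
    inner_edges = np.quantile(expected, np.linspace(0, 1, bins + 1))[1:-1]
    exp_counts = np.bincount(
        np.searchsorted(inner_edges, expected, side="right"), minlength=bins
    )
    act_counts = np.bincount(
        np.searchsorted(inner_edges, actual, side="right"), minlength=bins
    )
    eps = 1e-6  # avoids log(0) when a bin is empty
    exp_frac = np.clip(exp_counts / len(expected), eps, None)
    act_frac = np.clip(act_counts / len(actual), eps, None)
    return float(np.sum((act_frac - exp_frac) * np.log(act_frac / exp_frac)))


rng = np.random.default_rng(0)
train = rng.normal(45, 8, 5000)      # training-time CPU readings
stable = rng.normal(45, 8, 5000)     # production, same distribution
shifted = rng.normal(60, 8, 5000)    # production after a load shift

print(f"stable:  {population_stability_index(train, stable):.4f}")
print(f"shifted: {population_stability_index(train, shifted):.4f}")
```

Quantile bins keep every training bin equally populated, which makes the score less sensitive to sparse tails than fixed-width bins.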
A feature store has online (low-latency) and offline (historical) layers. Online store (Redis): serves real-time features in <10ms for inference. Offline store (Azure Data Lake/S3): stores historical features for training. Key design elements: feature versioning (timestamped), point-in-time correct joins (never use future data for past training), and consistent transformations between train and serve pipelines.
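The point-in-time join is the part that is easiest to get wrong. A small sketch with `pandas.merge_asof` shows the idea: each label only sees the latest feature row at or before its own timestamp. The frames, column names, and the 10-minute tolerance are made up for illustration.

```python
import pandas as pd

# Feature snapshots written by the pipeline (hypothetical values).
features = pd.DataFrame({
    "service": ["payment-api"] * 3,
    "feature_ts": pd.to_datetime(
        ["2026-01-01 10:00", "2026-01-01 10:05", "2026-01-01 10:10"], utc=True
    ),
    "cpu_vs_baseline": [0.8, 0.9, 1.6],
})

# Training labels, e.g. whether an incident was open at that moment.
labels = pd.DataFrame({
    "service": ["payment-api", "payment-api"],
    "label_ts": pd.to_datetime(["2026-01-01 10:07", "2026-01-01 10:12"], utc=True),
    "is_incident": [0, 1],
})

# direction="backward" never looks into the future; tolerance drops
# matches that are too old to be meaningful.
training = pd.merge_asof(
    labels.sort_values("label_ts"),
    features.sort_values("feature_ts"),
    left_on="label_ts",
    right_on="feature_ts",
    by="service",
    direction="backward",
    tolerance=pd.Timedelta("10min"),
)
print(training[["label_ts", "feature_ts", "cpu_vs_baseline", "is_incident"]])
```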
Train-serve skew occurs when features at serving time differ from features at training time — same name, different calculation. Common causes: different code paths for train vs serve pipelines, timezone differences (training in UTC, serving in local time), or different null handling. Prevention: use the same feature transformation code in both pipelines (a single library), and monitor feature value distributions at both stages.
Scenario-based
Check the data pipeline first: compare before-upgrade vs after-upgrade feature distributions. The K8s upgrade may have changed metric names, namespaces, label schemas, or exposure paths. Use feature monitoring dashboards to identify which features drifted. Then check if log formats changed (new container runtime, new log level names). Treat the upgrade as a data contract break and run your validation suite against the new environment.
Use schema versioning with forward compatibility. Define contracts in a schema registry (Avro/Protobuf). On schema change detection: parse both old and new formats, map to a canonical internal schema. Add an alert when unknown schema versions are detected in ingestion. Test all format changes in staging with production data replay before deploying to the AI system.
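The mapping step can be illustrated without a registry. The sketch below uses plain dictionaries and a hypothetical `schema_version` field; the v1 and v2 field names are invented for the example, and a real system would take them from the Avro or Protobuf definitions.

```python
from typing import Any, Callable, Dict

Record = Dict[str, Any]


class UnknownSchemaVersion(ValueError):
    """Raised so ingestion can alert instead of guessing at a new format."""


def _from_v1(raw: Record) -> Record:
    # v1 (hypothetical): CPU reported as a percentage under "cpu".
    return {"service": raw["svc"], "cpu_pct": float(raw["cpu"])}


def _from_v2(raw: Record) -> Record:
    # v2 (hypothetical): CPU reported as a 0-1 ratio under "cpu_ratio".
    return {"service": raw["service"], "cpu_pct": float(raw["cpu_ratio"]) * 100.0}


ADAPTERS: Dict[int, Callable[[Record], Record]] = {1: _from_v1, 2: _from_v2}


def to_canonical(raw: Record) -> Record:
    """Map any known schema version onto the canonical internal schema."""
    version = raw.get("schema_version")
    adapter = ADAPTERS.get(version)
    if adapter is None:
        raise UnknownSchemaVersion(f"unknown schema_version: {version!r}")
    return adapter(raw)


old = to_canonical({"schema_version": 1, "svc": "payment-api", "cpu": 50})
new = to_canonical({"schema_version": 2, "service": "payment-api", "cpu_ratio": 0.5})
print(old, new)
```

Raising on unknown versions is deliberate: a loud failure at ingestion is cheaper than a model quietly scoring data in the wrong units.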
Use a cold-start strategy: 1) Apply bootstrap baselines from similar services of the same architecture type. 2) Use conservative thresholds (wider bounds) for the first 1-2 weeks. 3) Accumulate real data for 4+ weeks before training a service-specific model. 4) Flag the service as "learning mode" in alerting — route to human review rather than auto-action. 5) After 4 weeks, train and deploy a custom model and tighten the thresholds.
🌐 Real-world Usage
Netflix's Mantis platform processes operational event streams at very large scale (Netflix has described it as handling trillions of events per day) through a real-time feature pipeline before feeding anomaly models. Azure Monitor ingests metrics from millions of resources, computes per-resource dynamic baselines, and feeds them to ML models that power Smart Detection alerts. LinkedIn's AIOps platform uses a centralised feature store that serves 200+ ML models from a consistent set of infrastructure features, preventing train-serve skew across teams.
📝 Summary
A data pipeline is the foundation of every AIOps system — even the best ML model is useless with bad input. Build pipelines with schema validation, freshness checks, and feature distribution monitoring. Use baseline-relative features over raw values. Encode time cyclically. Monitor for data drift as aggressively as you monitor model performance.