Lab: API Integration
Real-world lab: Query cloud APIs (Kubernetes, Azure, or mock services), parse JSON responses, handle errors and retries, orchestrate multi-step workflows—cloud automation in action.
📋 Lab Overview
Your task: build a tool that queries a REST API (use the free JSONPlaceholder API or a mock service), performs multi-step operations, handles errors gracefully, and generates reports. Skills applied: HTTP requests, JSON parsing, error handling, retry logic, data transformation.
🎯 Objectives
- Make GET requests to APIs and parse JSON responses.
- Handle HTTP status codes and errors appropriately.
- Implement retry logic with exponential backoff.
- Chain multiple API calls (get list, then details for each item).
- Transform and aggregate API data.
- Write results to file (JSON or CSV).
💻 Example: Multi-Resource Cloud Orchestration
python
#!/usr/bin/env python3
"""
Cloud API integration: query resources, check status, generate reports.
Using JSONPlaceholder as mock API.
"""
import requests
import json
import time
import argparse
import logging
from typing import List, Dict, Any
# Timestamped, INFO-level logging for the whole script, configured once at import.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Root URL of the mock REST service; the endpoint helpers build their URLs from it.
BASE_URL = "https://jsonplaceholder.typicode.com"
def api_get(url, max_retries=3, backoff=2):
    """Make a GET request and return the decoded JSON body, retrying transient failures.

    Timeouts, connection-level errors, HTTP 429 and HTTP 5xx are treated as
    transient and retried. Any other non-200 status is treated as permanent
    and is not retried.

    Args:
        url: Absolute URL to request.
        max_retries: Total number of attempts (the first try included).
        backoff: Base of the exponential delay; after failed attempt ``n``
            the function waits ``backoff ** n`` seconds before trying again.

    Returns:
        The decoded JSON payload on HTTP 200, or ``None`` when the request
        fails permanently, the body is not valid JSON, or every attempt
        has been used up.
    """
    for attempt in range(1, max_retries + 1):
        logger.info(f"[Attempt {attempt}/{max_retries}] GET {url}")
        try:
            response = requests.get(url, timeout=10)
        except requests.exceptions.Timeout:
            logger.warning(f"Timeout, attempt {attempt}/{max_retries}")
        except requests.exceptions.RequestException as e:
            logger.warning(f"Request error: {e}, attempt {attempt}/{max_retries}")
        else:
            status = response.status_code
            if status == 200:
                try:
                    payload = response.json()
                except ValueError as e:
                    # Every JSON decode error raised by requests is a
                    # ValueError. A malformed body on a 200 is not a network
                    # fault, so fail gracefully instead of crashing or retrying.
                    logger.error(f"Invalid JSON in response from {url}: {e}")
                    return None
                logger.info(f"Success: {status}")
                return payload
            if status == 429:
                # Rate limited
                logger.warning("Rate limited, retrying...")
            elif status >= 500:
                # Server error, retry
                logger.warning(f"Server error {status}, retrying...")
            else:
                # Client error, don't retry
                logger.error(f"Client error {status}: {response.text}")
                return None

        # One shared exponential backoff for every transient failure, skipped
        # after the final attempt so the caller is not delayed for nothing.
        if attempt < max_retries:
            time.sleep(backoff ** attempt)

    logger.error(f"All {max_retries} retries exhausted")
    return None
def list_users() -> List[Dict[str, Any]]:
    """Fetch all users from the ``/users`` endpoint.

    Returns:
        The decoded user records, or an empty list when ``api_get`` yields
        nothing (``None`` or an empty payload).
    """
    return api_get(f"{BASE_URL}/users") or []
def get_user_posts(user_id: int) -> List[Dict[str, Any]]:
    """Fetch the posts whose ``userId`` matches ``user_id``.

    Returns:
        The decoded post records, or an empty list when ``api_get`` yields
        nothing (``None`` or an empty payload).
    """
    return api_get(f"{BASE_URL}/posts?userId={user_id}") or []
def get_post_comments(post_id: int) -> List[Dict[str, Any]]:
    """Fetch the comments whose ``postId`` matches ``post_id``.

    Returns:
        The decoded comment records, or an empty list when ``api_get`` yields
        nothing (``None`` or an empty payload).
    """
    return api_get(f"{BASE_URL}/comments?postId={post_id}") or []
def analyze_users(users: List[Dict]) -> Dict[str, Any]:
    """Build per-user post and comment statistics.

    For each user this fetches the user's posts, then the comments of every
    post (one request per post), and records the counts.

    Args:
        users: User records; each must provide ``id``, ``name`` and ``email``.

    Returns:
        A dict with ``total_users`` and ``users_detail``, the latter holding
        one summary dict per user in input order.
    """
    logger.info(f"Analyzing {len(users)} users")

    details = []
    for user in users:
        uid = user["id"]
        logger.info(f"Processing user {uid}: {user['name']}")

        posts = get_user_posts(uid)
        # One comments request per post, issued in post order.
        comment_counts = [len(get_post_comments(post["id"])) for post in posts]
        total = sum(comment_counts)

        details.append({
            "id": uid,
            "name": user["name"],
            "email": user["email"],
            "posts": len(posts),
            "total_comments": total,
            # Guard against users with no posts.
            "avg_comments": total / len(posts) if posts else 0,
            "max_comments_on_post": max(comment_counts, default=0),
        })

    return {"total_users": len(users), "users_detail": details}
def save_report(analysis: Dict, output_file: str):
    """Write the analysis to ``output_file`` as indented JSON.

    Args:
        analysis: JSON-serializable report data.
        output_file: Destination path; an existing file is overwritten.

    Raises:
        TypeError: If ``analysis`` holds a value ``json`` cannot encode.
            The destination file is left untouched in that case.
        OSError: If the file cannot be opened or written.
    """
    # Serialize first: opening with "w" truncates the target, so encoding
    # inside the ``with`` would wipe a previous report (and leave partial
    # JSON behind) whenever serialization fails midway.
    payload = json.dumps(analysis, indent=2)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(payload)
    logger.info(f"Report saved to {output_file}")
def print_summary(analysis: Dict):
    """Print a console report listing the five users with the most comments.

    Args:
        analysis: Report dict providing ``total_users`` and ``users_detail``;
            each detail entry must provide ``name``, ``posts``,
            ``total_comments`` and ``avg_comments``.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("USER ENGAGEMENT ANALYSIS REPORT")
    print(rule)
    print(f"Total users: {analysis['total_users']}")
    print()
    print("Top users by total comments:")
    print("-" * 60)

    # Rank a copy so the caller's list keeps its order; the sort is stable,
    # so users with equal totals stay in their original relative order.
    ranked = list(analysis["users_detail"])
    ranked.sort(key=lambda entry: entry["total_comments"], reverse=True)

    for entry in ranked[:5]:
        left = f" {entry['name']:<20} Posts: {entry['posts']:>3} "
        right = f"Comments: {entry['total_comments']:>4} Avg: {entry['avg_comments']:>5.1f}"
        print(left + right)
def main():
    """Run the lab end to end: fetch users, analyze them, save and print the report.

    Command-line options:
        --output: Path of the JSON report (default ``report.json``).
        --max-users: Upper bound on users to analyze (default 5); must not
            be negative.

    Returns:
        ``0`` on success, ``1`` when the user list could not be fetched, so
        that callers and shells can detect the failure.
    """
    parser = argparse.ArgumentParser(description="Cloud API integration lab")
    parser.add_argument("--output", default="report.json", help="Output JSON file")
    parser.add_argument("--max-users", type=int, default=5, help="Max users to analyze")
    args = parser.parse_args()
    # A negative bound would slice users off the *end* of the list
    # (users[:-n]) instead of limiting the count, so reject it up front.
    if args.max_users < 0:
        parser.error("--max-users must be non-negative")

    logger.info("Starting API integration lab")

    # Get users
    logger.info("Fetching user list...")
    users = list_users()
    if not users:
        logger.error("Failed to fetch users")
        return 1

    # Limit for testing
    users = users[:args.max_users]

    # Analyze
    analysis = analyze_users(users)

    # Save and print
    save_report(analysis, args.output)
    print_summary(analysis)
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so a failed run exits non-zero.
    raise SystemExit(main())
🧪 Test Cases
- Success path: API calls succeed, data processed correctly.
- Retry logic: simulate timeout/500 error, verify retry mechanism.
- Rate limiting: mock 429 response, verify backoff.
- Error handling: 404 responses, invalid JSON—verify graceful failure.
- Data chaining: verify multi-step calls work (users → posts → comments).
🚀 Enhancements
- Add authentication (Bearer tokens, API keys from environment).
- Implement caching to avoid redundant API calls.
- Export results to CSV instead of JSON.
- Filter results by criteria (only users with >10 posts, etc.).
- Upload results to a server or send via webhook.
- Parallel requests using threading/asyncio for faster execution.