FastAPI Integration
Integrate ImmutableLog into any FastAPI application with an async middleware based on Starlette. Automatically captures all HTTP requests — errors, successes, and unhandled exceptions — without modifying your routes.
Installation
The middleware depends only on `requests` (a synchronous HTTP client used to send events) and Starlette, which is already included in FastAPI. Copy the `middleware.py` file into your project.
pip install requests fastapi
Middleware Code
Paste the code below into `middleware.py` in your project. The middleware inherits from Starlette's `BaseHTTPMiddleware` and asynchronously intercepts the complete lifecycle of each request.
# middleware.py
import asyncio
import hashlib
import json
import logging
import time
import uuid
from datetime import datetime, timezone

import requests
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.types import ASGIApp
logger = logging.getLogger(__name__)
# Default upper bound, in UTF-8 encoded bytes, for one serialized event payload.
MAX_PAYLOAD_BYTES = 12_000


def clamp_payload(payload: dict, max_bytes: int = MAX_PAYLOAD_BYTES) -> str:
    """Serialize *payload* to JSON, shrinking it when it exceeds *max_bytes*.

    The size is measured on the UTF-8 encoding of the JSON text. Three forms
    are tried in order, and the first one that fits is returned:

    1. the full payload;
    2. the payload without its top-level ``"error"`` and ``"request_body"``
       entries;
    3. a minimal record holding only ``id``, ``kind``, ``message`` (cut to
       500 characters) and ``timestamp``.

    The minimal record is returned even if it is still larger than
    *max_bytes*; it is the last resort.

    Args:
        payload: Event payload. It is not modified.
        max_bytes: Size limit in bytes; defaults to ``MAX_PAYLOAD_BYTES``.

    Returns:
        The JSON text of the first form that fits, or of the minimal record.
    """

    def _fits(text: str) -> bool:
        return len(text.encode("utf-8")) <= max_bytes

    full = json.dumps(payload, ensure_ascii=False)
    if _fits(full):
        return full

    # Work on a filtered copy so the caller's dict is left untouched.
    trimmed = {
        key: value
        for key, value in payload.items()
        if key not in ("error", "request_body")
    }
    reduced = json.dumps(trimmed, ensure_ascii=False)
    if _fits(reduced):
        return reduced

    # str() guards the slice against a non-string message (e.g. an int).
    message = str(trimmed.get("message") or "event")[:500]
    minimal = {
        "id": trimmed.get("id"),
        "kind": trimmed.get("kind"),
        "message": message,
        "timestamp": trimmed.get("timestamp"),
    }
    return json.dumps(minimal, ensure_ascii=False)
def hash_body(body: bytes) -> str:
    """Return the SHA-256 digest of *body* as a lowercase hex string."""
    return hashlib.sha256(body).hexdigest()
def severity_from(status_code: int | None, exc: Exception | None) -> str:
if exc is not None:
return "error"
if status_code is None:
return "info"
if status_code >= 400:
return "error"
if status_code >= 300:
return "info"
if status_code >= 200:
return "success"
return "info"
def get_severity_level(kind: str, status_code: int | None) -> str:
if kind == "error":
return "high"
elif kind == "warning":
return "medium"
return "low"
class ImmutableLogAuditMiddleware(BaseHTTPMiddleware):
    """Starlette/FastAPI middleware that reports each HTTP request to ImmutableLog.

    For every request whose path is not in ``skip_paths`` the middleware
    builds one audit event (success, info or error) and POSTs it to
    ``<api_url>/v1/events``. The blocking ``requests`` call is handed to the
    event loop's default executor so it does not stall the loop. Reporting is
    best-effort: failures are logged as warnings and never affect the
    response.

    Args:
        app: The ASGI application being wrapped.
        api_key: Token sent as ``Authorization: Bearer <api_key>``.
        api_url: Base URL of the ImmutableLog API; trailing slashes are
            ignored.
        service_name: Value reported as ``meta.service``.
        env: Value reported as ``meta.env``.
        skip_paths: Exact paths that are not audited. Defaults to
            ``/health`` and ``/healthz`` when omitted; pass an empty list to
            audit every path.
    """

    # Status codes reported with ``"retryable": True`` in error events.
    _RETRYABLE_STATUS_CODES = frozenset({408, 429, 500, 502, 503, 504})

    def __init__(
        self,
        app: ASGIApp,
        api_key: str,
        api_url: str,
        service_name: str = "fastapi-service",
        env: str = "production",
        skip_paths: list[str] | None = None,
    ):
        super().__init__(app)
        self.api_key = api_key
        # Strip trailing slashes so "<api_url>/v1/events" never contains "//".
        self.api_url = api_url.rstrip("/")
        self.service_name = service_name
        self.env = env
        # Fall back to the defaults only when the argument is omitted, so an
        # explicit empty list really disables path skipping.
        if skip_paths is None:
            skip_paths = ["/health", "/healthz"]
        self.skip_paths = set(skip_paths)

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler and emit one audit event for the request.

        Exceptions raised by the handler are reported as error events and
        then re-raised unchanged. The request id (taken from the incoming
        ``x-request-id`` header, or generated) is echoed on the response.
        """
        if request.url.path in self.skip_paths:
            return await call_next(request)

        started_at = time.time()
        request_id = request.headers.get("x-request-id") or str(uuid.uuid4())
        # Read the body before the handler runs so error events can include it.
        body = await request.body()

        try:
            response = await call_next(request)
        except Exception as exc:
            self._emit(request, body, None, exc, started_at, request_id)
            raise

        self._emit(request, body, response, None, started_at, request_id)
        response.headers["x-request-id"] = request_id
        return response

    def _emit(self, request: Request, body: bytes, response, exc, started_at: float, request_id: str):
        """Build the audit event for one request and schedule its delivery.

        Never raises: any failure while building or scheduling the event is
        logged as a warning.
        """
        try:
            latency_ms = int((time.time() - started_at) * 1000)
            if response is not None:
                status_code = response.status_code
            elif exc is not None:
                # No response was produced; report the failure as a 500.
                status_code = 500
            else:
                status_code = None

            kind = severity_from(status_code, exc)
            # A route may override the event name through request.state.
            event_name = (
                getattr(request.state, "imtbl_event_name", None)
                or f"http.{request.method}.{request.url.path}"
            )
            user_context = {
                "ip": request.client.host if request.client else "unknown",
                "user_agent": request.headers.get("user-agent", "unknown"),
            }
            payload = {
                "id": str(uuid.uuid4()),
                "kind": kind,
                "message": self._generate_message(request, status_code, exc, kind),
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "context": user_context,
                "request": {
                    "request_id": request_id,
                    "method": request.method,
                    "path": str(request.url.path),
                    "query_params": dict(request.query_params) or None,
                },
                "metrics": {"latency_ms": latency_ms, "status_code": status_code},
                "severity": get_severity_level(kind, status_code),
            }

            if kind == "error":
                error_details = {
                    "status_code": status_code,
                    "retryable": status_code in self._RETRYABLE_STATUS_CODES,
                }
                if exc is not None:
                    error_details["exception"] = type(exc).__name__
                    error_details["exception_message"] = str(exc)
                if body:
                    error_details["request_body_hash"] = hash_body(body)
                    # NOTE(review): small bodies of failed requests are sent
                    # verbatim and may contain secrets; consider redacting.
                    if len(body) < 2000:
                        error_details["request_body"] = body.decode("utf-8", errors="replace")
                payload["error"] = error_details
            elif kind == "success":
                payload["success"] = {
                    "status_code": status_code,
                    "result": "ok" if status_code == 200 else "processed",
                }
            elif kind == "info":
                payload["info"] = {"status_code": status_code, "action": request.method.lower()}

            meta = {
                "type": kind,
                "event_name": event_name,
                "service": self.service_name,
                "request_id": request_id,
                "env": self.env,
            }
            event = {"payload": clamp_payload(payload), "meta": meta}
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
                "Idempotency-Key": f"{event_name}-{request_id}",
                "Request-Id": request_id,
                "X-Client-TZ": "UTC",
                "X-Client-Offset-Minutes": "0",
            }
            self._schedule_send(event, headers)
        except Exception as e:
            logger.warning("ImmutableLogAuditMiddleware _emit failed: %s", e)

    def _schedule_send(self, event: dict, headers: dict) -> None:
        """Deliver the event off the event loop when one is running."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop (called from synchronous code): send inline.
            self._send(event, headers)
            return
        # Fire and forget: _send handles and logs its own errors.
        loop.run_in_executor(None, self._send, event, headers)

    def _send(self, event: dict, headers: dict) -> None:
        """POST one event to ImmutableLog; log, never raise, on failure."""
        try:
            reply = requests.post(
                f"{self.api_url}/v1/events", json=event, headers=headers, timeout=5
            )
            if reply.status_code >= 400:
                logger.warning(
                    "ImmutableLogAuditMiddleware delivery rejected: HTTP %s",
                    reply.status_code,
                )
        except Exception as e:
            logger.warning("ImmutableLogAuditMiddleware _emit failed: %s", e)

    def _generate_message(self, request: Request, status_code, exc, kind: str) -> str:
        """Return the human-readable summary line stored in the event."""
        method = request.method
        path = str(request.url.path)
        if exc is not None:
            return f"{method} {path} failed with exception: {type(exc).__name__}"
        if kind == "error":
            return f"{method} {path} returned error {status_code}"
        if kind == "success":
            return f"{method} {path} completed successfully"
        return f"{method} {path} processed"


# Limit: Payload limit: 12KB per event. Large fields are automatically removed if the limit is exceeded.
Registration and Configuration
Unlike Django, FastAPI does not use a centralized `settings.py` file. The middleware is configured directly via parameters at registration time with `app.add_middleware()`. Use environment variables to load credentials securely.
import os
from fastapi import FastAPI
from middleware import ImmutableLogAuditMiddleware
app = FastAPI()
app.add_middleware(
ImmutableLogAuditMiddleware,
api_key=os.environ.get("IMTBL_API_KEY", ""),
api_url=os.environ.get("IMTBL_URL", "https://api.immutablelog.com"),
service_name=os.environ.get("IMTBL_SERVICE_NAME", "fastapi-service"),
env=os.environ.get("IMTBL_ENV", "production"),
skip_paths=["/health", "/healthz", "/metrics"],
)
Never pass the token directly in code. Use environment variables (.env) and load them via os.environ.get() or libraries like python-dotenv.
How it works
The `dispatch` method intercepts each request. The body is read before passing to the handler to allow inclusion in error events. The response passes through the handler and the event is emitted to ImmutableLog. Unhandled exceptions are captured, the event is emitted, and the exception is re-raised for FastAPI's default handling.
await request.body()
Body read before handler for capture in error events
await call_next(request)
Handler executed; latency calculated; event emitted on exit
except Exception → re-raise
Exceptions captured, error event emitted, and exception re-raised
The x-request-id is injected into the response for end-to-end traceability across systems.
Custom event names
Use `request.state.imtbl_event_name` inside any route to override the automatically generated event name. This enables granular traceability by business event in the dashboard.
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.post("/checkout")
async def process_checkout(request: Request):
    """Example route that sets a custom ImmutableLog event name."""
    # Override the event name for this route. If not set, the default is
    # http.METHOD.path (e.g., http.POST./checkout).
    request.state.imtbl_event_name = "payment.checkout.initiated"
    # ... business logic ...
    return JSONResponse({"status": "ok"})
Path exclusion
Pass a list of paths in `skip_paths` when registering the middleware to ignore specific routes (e.g., health checks). By default, `/health` and `/healthz` are automatically ignored.
app.add_middleware(
ImmutableLogAuditMiddleware,
api_key=os.environ["IMTBL_API_KEY"],
api_url="https://api.immutablelog.com",
# Rotas ignoradas / Ignored routes
skip_paths=["/health", "/healthz", "/metrics", "/readyz"],
)
Note on async sending
Sending to ImmutableLog uses `requests` (synchronous) in an async context. For high-load production, we recommend replacing it with `httpx.AsyncClient` to avoid blocking the event loop. The example below shows both approaches.
# Async alternative with httpx (recommended for high load)
import asyncio

import httpx


# In the ImmutableLogAuditMiddleware class, replace requests.post with:
async def _emit_async(self, event: dict, headers: dict) -> None:
    """POST one event with httpx; the await yields to the event loop."""
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{self.api_url}/v1/events",
            json=event,
            headers=headers,
            timeout=5,
        )


# Or keep requests and run the blocking call in the default executor:
async def _emit_in_executor(self, event: dict, headers: dict) -> None:
    """POST one event with requests from a worker thread."""
    # get_running_loop() is the supported way to get the loop in a coroutine.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        lambda: requests.post(
            f"{self.api_url}/v1/events", json=event, headers=headers, timeout=5
        ),
    )


# To install httpx: pip install httpx
This documentation reflects the current integration behavior. For questions or advanced integrations, contact the support team.
