Skip to main content
ImmutableLog logo
Back
FastAPI · Python · Async

FastAPI Integration

Integrate ImmutableLog into any FastAPI application with an async middleware based on Starlette. Automatically captures all HTTP requests — errors, successes, and unhandled exceptions — without modifying your routes.

Installation

The middleware depends only on `requests` (used to send events to the ImmutableLog API over HTTP) and Starlette, which is already included in FastAPI. Copy the `middleware.py` file into your project.

bash
pip install requests fastapi

Middleware Code

Paste the code below into `middleware.py` in your project. The middleware inherits from Starlette's `BaseHTTPMiddleware` and asynchronously intercepts the complete lifecycle of each request.

python
# middleware.py
import asyncio
import hashlib
import json
import logging
import time
import uuid
from datetime import datetime, timezone

import requests
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.types import ASGIApp

logger = logging.getLogger(__name__)

MAX_PAYLOAD_BYTES = 12_000


def clamp_payload(payload: dict) -> str:
    s = json.dumps(payload, ensure_ascii=False)
    if len(s.encode("utf-8")) <= MAX_PAYLOAD_BYTES:
        return s
    payload = dict(payload)
    payload.pop("error", None)
    payload.pop("request_body", None)
    s2 = json.dumps(payload, ensure_ascii=False)
    if len(s2.encode("utf-8")) <= MAX_PAYLOAD_BYTES:
        return s2
    return json.dumps({
        "id": payload.get("id"),
        "kind": payload.get("kind"),
        "message": (payload.get("message") or "event")[:500],
        "timestamp": payload.get("timestamp"),
    }, ensure_ascii=False)


def hash_body(body: bytes) -> str:
    """Return the SHA-256 digest of ``body`` as a lowercase hex string."""
    digest = hashlib.sha256()
    digest.update(body)
    return digest.hexdigest()


def severity_from(status_code: int | None, exc: Exception | None) -> str:
    if exc is not None:
        return "error"
    if status_code is None:
        return "info"
    if status_code >= 400:
        return "error"
    if status_code >= 300:
        return "info"
    if status_code >= 200:
        return "success"
    return "info"


def get_severity_level(kind: str, status_code: int | None) -> str:
    if kind == "error":
        return "high"
    elif kind == "warning":
        return "medium"
    return "low"


class ImmutableLogAuditMiddleware(BaseHTTPMiddleware):
    """Audit middleware that reports every HTTP request to ImmutableLog.

    For each request whose path is not in ``skip_paths`` the middleware reads
    the body, runs the downstream handler, and posts one event describing the
    outcome (response status or unhandled exception) to
    ``{api_url}/v1/events``. Delivery is best-effort: any failure while
    building or sending the event is logged and never alters the response.
    """

    def __init__(
        self,
        app: ASGIApp,
        api_key: str,
        api_url: str,
        service_name: str = "fastapi-service",
        env: str = "production",
        skip_paths: list[str] | None = None,
    ):
        """Store the ImmutableLog credentials and reporting options.

        Args:
            app: The ASGI application being wrapped.
            api_key: Sent as a bearer token in the ``Authorization`` header.
            api_url: Base URL of the ImmutableLog API; ``/v1/events`` is
                appended when sending.
            service_name: Value of the ``service`` field in the event meta.
            env: Value of the ``env`` field in the event meta.
            skip_paths: Exact request paths that are not audited. ``None``
                selects ``/health`` and ``/healthz``; an empty list audits
                every path.
        """
        super().__init__(app)
        self.api_key = api_key
        self.api_url = api_url
        self.service_name = service_name
        self.env = env
        # Test against None rather than truthiness so that an explicit empty
        # list really disables path skipping instead of restoring the defaults.
        if skip_paths is None:
            skip_paths = ["/health", "/healthz"]
        self.skip_paths = set(skip_paths)

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler and emit one audit event for the request.

        The request body is read before the handler runs so that it can be
        attached to error events. The response gets an ``x-request-id``
        header carrying the incoming ``x-request-id`` or a fresh UUID.
        Exceptions raised by the handler are reported and then re-raised.
        """
        if request.url.path in self.skip_paths:
            return await call_next(request)

        # perf_counter is monotonic, so the latency cannot be skewed by
        # wall-clock adjustments that happen while the request is in flight.
        started_at = time.perf_counter()
        request_id = request.headers.get("x-request-id") or str(uuid.uuid4())
        body = await request.body()

        try:
            response = await call_next(request)
        except Exception as exc:
            # _emit performs a blocking HTTP call; run it in a worker thread
            # so other requests keep being served by the event loop.
            await asyncio.to_thread(self._emit, request, body, None, exc, started_at, request_id)
            raise

        await asyncio.to_thread(self._emit, request, body, response, None, started_at, request_id)
        response.headers["x-request-id"] = request_id
        return response

    def _emit(self, request: Request, body: bytes, response, exc, started_at: float, request_id: str):
        """Build the audit event for one request and POST it to ImmutableLog.

        This method blocks on network I/O (up to the 5 second timeout) and is
        therefore meant to be called from a worker thread. It never raises:
        every failure is logged as a warning.

        Args:
            request: The request being audited.
            body: Raw request body read before the handler ran.
            response: The handler's response, or ``None`` when it raised.
            exc: The exception raised by the handler, or ``None``.
            started_at: ``time.perf_counter()`` reading taken when the
                request started; used to compute the latency.
            request_id: Correlation id shared with the response header.
        """
        try:
            latency_ms = int((time.perf_counter() - started_at) * 1000)
            if response is not None:
                status_code = response.status_code
            else:
                # No response object: report 500 for an exception, else unknown.
                status_code = 500 if exc else None
            kind = severity_from(status_code, exc)

            # A route may override the default name via request.state.
            event_name = getattr(request.state, "imtbl_event_name", None) or f"http.{request.method}.{request.url.path}"

            user_context = {
                "ip": request.client.host if request.client else "unknown",
                "user_agent": request.headers.get("user-agent", "unknown"),
            }

            payload = {
                "id": str(uuid.uuid4()),
                "kind": kind,
                "message": self._generate_message(request, status_code, exc, kind),
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "context": user_context,
                "request": {
                    "request_id": request_id,
                    "method": request.method,
                    "path": str(request.url.path),
                    "query_params": dict(request.query_params) or None,
                },
                "metrics": {"latency_ms": latency_ms, "status_code": status_code},
                "severity": get_severity_level(kind, status_code),
            }

            if kind == "error":
                error_details = {
                    "status_code": status_code,
                    "retryable": status_code in (408, 429, 500, 502, 503, 504),
                }
                if exc:
                    error_details["exception"] = type(exc).__name__
                    error_details["exception_message"] = str(exc)
                if body:
                    error_details["request_body_hash"] = hash_body(body)
                    # NOTE(review): raw bodies can carry credentials or personal
                    # data; consider redacting before sending them off-site.
                    if len(body) < 2000:
                        error_details["request_body"] = body.decode("utf-8", errors="replace")
                payload["error"] = error_details
            elif kind == "success":
                payload["success"] = {"status_code": status_code, "result": "ok" if status_code == 200 else "processed"}
            elif kind == "info":
                payload["info"] = {"status_code": status_code, "action": request.method.lower()}

            meta = {
                "type": kind,
                "event_name": event_name,
                "service": self.service_name,
                "request_id": request_id,
                "env": self.env,
            }

            # The payload travels as a size-limited JSON string, not a nested object.
            event = {"payload": clamp_payload(payload), "meta": meta}

            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
                "Idempotency-Key": f"{event_name}-{request_id}",
                "Request-Id": request_id,
                "X-Client-TZ": "UTC",
                "X-Client-Offset-Minutes": "0",
            }

            # Strip a trailing slash so the endpoint never becomes "//v1/events".
            endpoint = f"{self.api_url.rstrip('/')}/v1/events"
            reply = requests.post(endpoint, json=event, headers=headers, timeout=5)
            if reply.status_code >= 400:
                # A rejected event would otherwise disappear without a trace.
                logger.warning("ImmutableLogAuditMiddleware event rejected: HTTP %s", reply.status_code)

        except Exception as e:
            # Auditing must never break the audited application.
            logger.warning("ImmutableLogAuditMiddleware _emit failed: %s", e)

    def _generate_message(self, request: Request, status_code, exc, kind: str) -> str:
        """Return the one-line human-readable summary stored in the event."""
        method = request.method
        path = str(request.url.path)
        if exc:
            return f"{method} {path} failed with exception: {type(exc).__name__}"
        if kind == "error":
            return f"{method} {path} returned error {status_code}"
        elif kind == "success":
            return f"{method} {path} completed successfully"
        return f"{method} {path} processed"

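Payload limit: 12 KB per event (12,000 bytes of UTF-8 encoded JSON). If an event exceeds the limit, its `error` details are removed first; if it is still too large, the event is reduced to its `id`, `kind`, `message` (truncated to 500 characters), and `timestamp`.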

Registration and Configuration

Unlike Django, FastAPI does not use a centralized `settings.py` file. The middleware is configured directly via parameters at registration time with `app.add_middleware()`. Use environment variables to load credentials securely.

python
import os

from fastapi import FastAPI

from middleware import ImmutableLogAuditMiddleware

# Every connection setting is read from the environment so that credentials
# stay out of the source code; the second argument is the fallback value.
_audit_options = {
    "api_key": os.environ.get("IMTBL_API_KEY", ""),
    "api_url": os.environ.get("IMTBL_URL", "https://api.immutablelog.com"),
    "service_name": os.environ.get("IMTBL_SERVICE_NAME", "fastapi-service"),
    "env": os.environ.get("IMTBL_ENV", "production"),
    # Requests to these exact paths are not audited.
    "skip_paths": ["/health", "/healthz", "/metrics"],
}

app = FastAPI()
app.add_middleware(ImmutableLogAuditMiddleware, **_audit_options)

Never hard-code the API key in your source code. Store it in an environment variable (for example in a `.env` file) and read it with `os.environ.get()` or a library such as python-dotenv.

How it works

The `dispatch` method intercepts each request. The body is read before passing to the handler to allow inclusion in error events. The response passes through the handler and the event is emitted to ImmutableLog. Unhandled exceptions are captured, the event is emitted, and the exception is re-raised for FastAPI's default handling.

await request.body()

Body read before handler for capture in error events

await call_next(request)

Handler executed; latency calculated; event emitted on exit

except Exception → re-raise

Exceptions captured, error event emitted, and exception re-raised

The x-request-id is injected into the response for end-to-end traceability across systems.

Custom event names

Use `request.state.imtbl_event_name` inside any route to override the automatically generated event name. This enables granular traceability by business event in the dashboard.

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.post("/checkout")
async def process_checkout(request: Request):
    # Give this route a business-level event name. The audit middleware looks
    # for this attribute on request.state and uses it in place of the default
    # "http.<METHOD>.<path>" name.
    request.state.imtbl_event_name = "payment.checkout.initiated"

    # ... business logic ...

    result = {"status": "ok"}
    return JSONResponse(result)

If not set, the default is http.METHOD.path (e.g., http.POST./checkout).

Path exclusion

Pass a list of paths in `skip_paths` when registering the middleware to ignore specific routes (e.g., health checks). By default, `/health` and `/healthz` are automatically ignored.

python
# Register the audit middleware with a custom list of excluded paths.
app.add_middleware(
    ImmutableLogAuditMiddleware,
    api_key=os.environ["IMTBL_API_KEY"],
    api_url="https://api.immutablelog.com",
    # Ignored routes: requests to these exact paths are not audited
    skip_paths=["/health", "/healthz", "/metrics", "/readyz"],
)

Note on async sending

Sending to ImmutableLog uses `requests` (synchronous) in an async context. For high-load production, we recommend replacing it with `httpx.AsyncClient` to avoid blocking the event loop. The example below shows both approaches.

python
# Async alternative with httpx (recommended for high load)
import asyncio

import httpx

# In the ImmutableLogAuditMiddleware class, replace requests.post with:

async def _emit_async(self, event: dict, headers: dict) -> None:
    # The request is awaited, so the event loop stays free while it is in flight.
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{self.api_url}/v1/events",
            json=event,
            headers=headers,
            timeout=5,
        )

# Or keep requests and run the blocking call in the default thread pool
# through run_in_executor():

async def _emit_in_executor(self, url: str, event: dict, headers: dict) -> None:
    # get_running_loop() is the call to use from inside a coroutine.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        lambda: requests.post(url, json=event, headers=headers, timeout=5),
    )

To install httpx: pip install httpx

This documentation reflects the current integration behavior. For questions or advanced integrations, contact the support team.