Pular para o conteúdo principal
ImmutableLog logo
Voltar
FastAPIPythonAsync

Integração com FastAPI

Integre o ImmutableLog em qualquer aplicação FastAPI com um middleware assíncrono baseado no Starlette. Captura automaticamente todas as requisições HTTP — erros, sucessos e exceções não tratadas — sem modificar suas rotas.

Instalação

O middleware depende apenas do `requests` (para envio síncrono em thread separada) e do Starlette, já incluso no FastAPI. Copie o arquivo `middleware.py` para dentro do seu projeto.

bash
pip install requests fastapi

Código do Middleware

Cole o código abaixo em `middleware.py` no seu projeto. O middleware herda de `BaseHTTPMiddleware` do Starlette e intercepta o ciclo de vida completo de cada requisição de forma assíncrona.

python
# middleware.py
import hashlib
import json
import logging
import time
import uuid
from datetime import datetime, timezone

import requests
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.types import ASGIApp

logger = logging.getLogger(__name__)

MAX_PAYLOAD_BYTES = 12_000


def clamp_payload(payload: dict) -> str:
    s = json.dumps(payload, ensure_ascii=False)
    if len(s.encode("utf-8")) <= MAX_PAYLOAD_BYTES:
        return s
    payload = dict(payload)
    payload.pop("error", None)
    payload.pop("request_body", None)
    s2 = json.dumps(payload, ensure_ascii=False)
    if len(s2.encode("utf-8")) <= MAX_PAYLOAD_BYTES:
        return s2
    return json.dumps({
        "id": payload.get("id"),
        "kind": payload.get("kind"),
        "message": (payload.get("message") or "event")[:500],
        "timestamp": payload.get("timestamp"),
    }, ensure_ascii=False)


def hash_body(body: bytes) -> str:
    return hashlib.sha256(body).hexdigest()


def severity_from(status_code: int | None, exc: Exception | None) -> str:
    if exc is not None:
        return "error"
    if status_code is None:
        return "info"
    if status_code >= 400:
        return "error"
    if status_code >= 300:
        return "info"
    if status_code >= 200:
        return "success"
    return "info"


def get_severity_level(kind: str, status_code: int | None) -> str:
    if kind == "error":
        return "high"
    elif kind == "warning":
        return "medium"
    return "low"


class ImmutableLogAuditMiddleware(BaseHTTPMiddleware):
    def __init__(
        self,
        app: ASGIApp,
        api_key: str,
        api_url: str,
        service_name: str = "fastapi-service",
        env: str = "production",
        skip_paths: list[str] | None = None,
    ):
        super().__init__(app)
        self.api_key = api_key
        self.api_url = api_url
        self.service_name = service_name
        self.env = env
        self.skip_paths = set(skip_paths or ["/health", "/healthz"])

    async def dispatch(self, request: Request, call_next):
        if request.url.path in self.skip_paths:
            return await call_next(request)

        started_at = time.time()
        request_id = request.headers.get("x-request-id") or str(uuid.uuid4())
        body = await request.body()

        exc_captured = None
        response = None
        try:
            response = await call_next(request)
        except Exception as exc:
            exc_captured = exc
            self._emit(request, body, None, exc, started_at, request_id)
            raise

        self._emit(request, body, response, None, started_at, request_id)
        response.headers["x-request-id"] = request_id
        return response

    def _emit(self, request: Request, body: bytes, response, exc, started_at: float, request_id: str):
        try:
            latency_ms = int((time.time() - started_at) * 1000)
            status_code = response.status_code if response else (500 if exc else None)
            kind = severity_from(status_code, exc)

            event_name = getattr(request.state, "imtbl_event_name", None) or f"http.{request.method}.{request.url.path}"

            user_context = {
                "ip": request.client.host if request.client else "unknown",
                "user_agent": request.headers.get("user-agent", "unknown"),
            }

            payload = {
                "id": str(uuid.uuid4()),
                "kind": kind,
                "message": self._generate_message(request, status_code, exc, kind),
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "context": user_context,
                "request": {
                    "request_id": request_id,
                    "method": request.method,
                    "path": str(request.url.path),
                    "query_params": dict(request.query_params) or None,
                },
                "metrics": {"latency_ms": latency_ms, "status_code": status_code},
                "severity": get_severity_level(kind, status_code),
            }

            if kind == "error":
                error_details = {
                    "status_code": status_code,
                    "retryable": status_code in [408, 429, 500, 502, 503, 504] if status_code else False,
                }
                if exc:
                    error_details["exception"] = type(exc).__name__
                    error_details["exception_message"] = str(exc)
                if body:
                    error_details["request_body_hash"] = hash_body(body)
                    if len(body) < 2000:
                        error_details["request_body"] = body.decode("utf-8", errors="replace")
                payload["error"] = error_details
            elif kind == "success":
                payload["success"] = {"status_code": status_code, "result": "ok" if status_code == 200 else "processed"}
            elif kind == "info":
                payload["info"] = {"status_code": status_code, "action": request.method.lower()}

            meta = {
                "type": kind,
                "event_name": event_name,
                "service": self.service_name,
                "request_id": request_id,
                "env": self.env,
            }

            event = {"payload": clamp_payload(payload), "meta": meta}

            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
                "Idempotency-Key": f"{event_name}-{request_id}",
                "Request-Id": request_id,
                "X-Client-TZ": "UTC",
                "X-Client-Offset-Minutes": "0",
            }

            requests.post(f"{self.api_url}/v1/events", json=event, headers=headers, timeout=5)

        except Exception as e:
            logger.warning("ImmutableLogAuditMiddleware _emit failed: %s", e)

    def _generate_message(self, request: Request, status_code, exc, kind: str) -> str:
        method = request.method
        path = str(request.url.path)
        if exc:
            return f"{method} {path} failed with exception: {type(exc).__name__}"
        if kind == "error":
            return f"{method} {path} returned error {status_code}"
        elif kind == "success":
            return f"{method} {path} completed successfully"
        return f"{method} {path} processed"

Limite: Limite de payload: 12KB por evento. Campos grandes são removidos automaticamente se o limite for ultrapassado.

Registro e Configuração

Diferente do Django, o FastAPI não usa um arquivo `settings.py` centralizado. O middleware é configurado diretamente via parâmetros no momento do registro com `app.add_middleware()`. Use variáveis de ambiente para carregar as credenciais com segurança.

python
import os
from fastapi import FastAPI
from middleware import ImmutableLogAuditMiddleware

app = FastAPI()

app.add_middleware(
    ImmutableLogAuditMiddleware,
    api_key=os.environ.get("IMTBL_API_KEY", ""),
    api_url=os.environ.get("IMTBL_URL", "https://api.immutablelog.com"),
    service_name=os.environ.get("IMTBL_SERVICE_NAME", "fastapi-service"),
    env=os.environ.get("IMTBL_ENV", "production"),
    skip_paths=["/health", "/healthz", "/metrics"],
)

Nunca passe o token diretamente no código. Use variáveis de ambiente (.env) e carregue-as via os.environ.get() ou bibliotecas como python-dotenv.

Como funciona

O método `dispatch` intercepta cada requisição. O body é lido antes de repassar ao handler para permitir inclusão em eventos de erro. A resposta passa pelo handler e o evento é emitido ao ImmutableLog. Exceções não tratadas são capturadas, o evento é emitido e a exceção é re-lançada para o tratamento padrão do FastAPI.

await request.body()

Body lido antes do handler para captura em eventos de erro

await call_next(request)

Handler executado; latência calculada; evento emitido na saída

except Exception → re-raise

Exceções capturadas, evento de erro emitido e exceção re-lançada

O header x-request-id é injetado na resposta para rastreabilidade end-to-end entre sistemas.

Nomes de evento customizados

Use `request.state.imtbl_event_name` dentro de qualquer rota para sobrescrever o nome do evento gerado automaticamente. Isso permite rastreabilidade granular por evento de negócio no dashboard.

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.post("/checkout")
async def process_checkout(request: Request):
    # Sobrescreve o nome do evento para esta rota
    # Override the event name for this route
    request.state.imtbl_event_name = "payment.checkout.initiated"

    # ... logica de negocio / business logic ...

    return JSONResponse({"status": "ok"})

Se não definido, o padrão é http.METHOD.path (ex: http.POST./checkout).

Exclusão de rotas

Passe uma lista de paths em `skip_paths` no registro do middleware para ignorar rotas específicas (ex: health checks). Por padrão, `/health` e `/healthz` são ignorados automaticamente.

python
app.add_middleware(
    ImmutableLogAuditMiddleware,
    api_key=os.environ["IMTBL_API_KEY"],
    api_url="https://api.immutablelog.com",
    # Rotas ignoradas / Ignored routes
    skip_paths=["/health", "/healthz", "/metrics", "/readyz"],
)

Nota sobre envio assíncrono

O envio ao ImmutableLog usa `requests` (síncrono) em um contexto assíncrono. Para produção com alta carga, recomendamos substituir por `httpx.AsyncClient` para não bloquear o event loop. O exemplo abaixo mostra ambas as abordagens.

python
# Alternativa async com httpx (recomendada para alta carga)
# Async alternative with httpx (recommended for high load)
import httpx
import asyncio

# Na classe ImmutableLogAuditMiddleware, substitua requests.post por:
# In ImmutableLogAuditMiddleware class, replace requests.post with:

async def _emit_async(self, ...):
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{self.api_url}/v1/events",
            json=event,
            headers=headers,
            timeout=5,
        )

# Ou use asyncio.get_event_loop().run_in_executor() para manter requests:
# Or use asyncio.get_event_loop().run_in_executor() to keep requests:

loop = asyncio.get_event_loop()
await loop.run_in_executor(
    None,
    lambda: requests.post(url, json=event, headers=headers, timeout=5)
)

Para instalar o httpx: pip install httpx

Esta documentação reflete o comportamento atual da integração. Para dúvidas ou integrações avançadas, entre em contato com o time de suporte.