Integração com FastAPI
Integre o ImmutableLog em qualquer aplicação FastAPI com um middleware assíncrono baseado no Starlette. Captura automaticamente todas as requisições HTTP — erros, sucessos e exceções não tratadas — sem modificar suas rotas.
Instalação
O middleware depende apenas do `requests` (para envio síncrono em thread separada) e do Starlette, já incluso no FastAPI. Copie o arquivo `middleware.py` para dentro do seu projeto.
pip install requests fastapiCódigo do Middleware
Cole o código abaixo em `middleware.py` no seu projeto. O middleware herda de `BaseHTTPMiddleware` do Starlette e intercepta o ciclo de vida completo de cada requisição de forma assíncrona.
# middleware.py
import hashlib
import json
import logging
import time
import uuid
from datetime import datetime, timezone
import requests
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.types import ASGIApp
logger = logging.getLogger(__name__)
MAX_PAYLOAD_BYTES = 12_000
def clamp_payload(payload: dict) -> str:
s = json.dumps(payload, ensure_ascii=False)
if len(s.encode("utf-8")) <= MAX_PAYLOAD_BYTES:
return s
payload = dict(payload)
payload.pop("error", None)
payload.pop("request_body", None)
s2 = json.dumps(payload, ensure_ascii=False)
if len(s2.encode("utf-8")) <= MAX_PAYLOAD_BYTES:
return s2
return json.dumps({
"id": payload.get("id"),
"kind": payload.get("kind"),
"message": (payload.get("message") or "event")[:500],
"timestamp": payload.get("timestamp"),
}, ensure_ascii=False)
def hash_body(body: bytes) -> str:
return hashlib.sha256(body).hexdigest()
def severity_from(status_code: int | None, exc: Exception | None) -> str:
if exc is not None:
return "error"
if status_code is None:
return "info"
if status_code >= 400:
return "error"
if status_code >= 300:
return "info"
if status_code >= 200:
return "success"
return "info"
def get_severity_level(kind: str, status_code: int | None) -> str:
if kind == "error":
return "high"
elif kind == "warning":
return "medium"
return "low"
class ImmutableLogAuditMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app: ASGIApp,
api_key: str,
api_url: str,
service_name: str = "fastapi-service",
env: str = "production",
skip_paths: list[str] | None = None,
):
super().__init__(app)
self.api_key = api_key
self.api_url = api_url
self.service_name = service_name
self.env = env
self.skip_paths = set(skip_paths or ["/health", "/healthz"])
async def dispatch(self, request: Request, call_next):
if request.url.path in self.skip_paths:
return await call_next(request)
started_at = time.time()
request_id = request.headers.get("x-request-id") or str(uuid.uuid4())
body = await request.body()
exc_captured = None
response = None
try:
response = await call_next(request)
except Exception as exc:
exc_captured = exc
self._emit(request, body, None, exc, started_at, request_id)
raise
self._emit(request, body, response, None, started_at, request_id)
response.headers["x-request-id"] = request_id
return response
def _emit(self, request: Request, body: bytes, response, exc, started_at: float, request_id: str):
try:
latency_ms = int((time.time() - started_at) * 1000)
status_code = response.status_code if response else (500 if exc else None)
kind = severity_from(status_code, exc)
event_name = getattr(request.state, "imtbl_event_name", None) or f"http.{request.method}.{request.url.path}"
user_context = {
"ip": request.client.host if request.client else "unknown",
"user_agent": request.headers.get("user-agent", "unknown"),
}
payload = {
"id": str(uuid.uuid4()),
"kind": kind,
"message": self._generate_message(request, status_code, exc, kind),
"timestamp": datetime.now(timezone.utc).isoformat(),
"context": user_context,
"request": {
"request_id": request_id,
"method": request.method,
"path": str(request.url.path),
"query_params": dict(request.query_params) or None,
},
"metrics": {"latency_ms": latency_ms, "status_code": status_code},
"severity": get_severity_level(kind, status_code),
}
if kind == "error":
error_details = {
"status_code": status_code,
"retryable": status_code in [408, 429, 500, 502, 503, 504] if status_code else False,
}
if exc:
error_details["exception"] = type(exc).__name__
error_details["exception_message"] = str(exc)
if body:
error_details["request_body_hash"] = hash_body(body)
if len(body) < 2000:
error_details["request_body"] = body.decode("utf-8", errors="replace")
payload["error"] = error_details
elif kind == "success":
payload["success"] = {"status_code": status_code, "result": "ok" if status_code == 200 else "processed"}
elif kind == "info":
payload["info"] = {"status_code": status_code, "action": request.method.lower()}
meta = {
"type": kind,
"event_name": event_name,
"service": self.service_name,
"request_id": request_id,
"env": self.env,
}
event = {"payload": clamp_payload(payload), "meta": meta}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"Idempotency-Key": f"{event_name}-{request_id}",
"Request-Id": request_id,
"X-Client-TZ": "UTC",
"X-Client-Offset-Minutes": "0",
}
requests.post(f"{self.api_url}/v1/events", json=event, headers=headers, timeout=5)
except Exception as e:
logger.warning("ImmutableLogAuditMiddleware _emit failed: %s", e)
def _generate_message(self, request: Request, status_code, exc, kind: str) -> str:
method = request.method
path = str(request.url.path)
if exc:
return f"{method} {path} failed with exception: {type(exc).__name__}"
if kind == "error":
return f"{method} {path} returned error {status_code}"
elif kind == "success":
return f"{method} {path} completed successfully"
return f"{method} {path} processed"Limite: Limite de payload: 12KB por evento. Campos grandes são removidos automaticamente se o limite for ultrapassado.
Registro e Configuração
Diferente do Django, o FastAPI não usa um arquivo `settings.py` centralizado. O middleware é configurado diretamente via parâmetros no momento do registro com `app.add_middleware()`. Use variáveis de ambiente para carregar as credenciais com segurança.
import os
from fastapi import FastAPI
from middleware import ImmutableLogAuditMiddleware
app = FastAPI()
app.add_middleware(
ImmutableLogAuditMiddleware,
api_key=os.environ.get("IMTBL_API_KEY", ""),
api_url=os.environ.get("IMTBL_URL", "https://api.immutablelog.com"),
service_name=os.environ.get("IMTBL_SERVICE_NAME", "fastapi-service"),
env=os.environ.get("IMTBL_ENV", "production"),
skip_paths=["/health", "/healthz", "/metrics"],
)Nunca passe o token diretamente no código. Use variáveis de ambiente (.env) e carregue-as via os.environ.get() ou bibliotecas como python-dotenv.
Como funciona
O método `dispatch` intercepta cada requisição. O body é lido antes de repassar ao handler para permitir inclusão em eventos de erro. A resposta passa pelo handler e o evento é emitido ao ImmutableLog. Exceções não tratadas são capturadas, o evento é emitido e a exceção é re-lançada para o tratamento padrão do FastAPI.
await request.body()
Body lido antes do handler para captura em eventos de erro
await call_next(request)
Handler executado; latência calculada; evento emitido na saída
except Exception → re-raise
Exceções capturadas, evento de erro emitido e exceção re-lançada
O header x-request-id é injetado na resposta para rastreabilidade end-to-end entre sistemas.
Nomes de evento customizados
Use `request.state.imtbl_event_name` dentro de qualquer rota para sobrescrever o nome do evento gerado automaticamente. Isso permite rastreabilidade granular por evento de negócio no dashboard.
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.post("/checkout")
async def process_checkout(request: Request):
# Sobrescreve o nome do evento para esta rota
# Override the event name for this route
request.state.imtbl_event_name = "payment.checkout.initiated"
# ... logica de negocio / business logic ...
return JSONResponse({"status": "ok"})Se não definido, o padrão é http.METHOD.path (ex: http.POST./checkout).
Exclusão de rotas
Passe uma lista de paths em `skip_paths` no registro do middleware para ignorar rotas específicas (ex: health checks). Por padrão, `/health` e `/healthz` são ignorados automaticamente.
app.add_middleware(
ImmutableLogAuditMiddleware,
api_key=os.environ["IMTBL_API_KEY"],
api_url="https://api.immutablelog.com",
# Rotas ignoradas / Ignored routes
skip_paths=["/health", "/healthz", "/metrics", "/readyz"],
)Nota sobre envio assíncrono
O envio ao ImmutableLog usa `requests` (síncrono) em um contexto assíncrono. Para produção com alta carga, recomendamos substituir por `httpx.AsyncClient` para não bloquear o event loop. O exemplo abaixo mostra ambas as abordagens.
# Alternativa async com httpx (recomendada para alta carga)
# Async alternative with httpx (recommended for high load)
import httpx
import asyncio
# Na classe ImmutableLogAuditMiddleware, substitua requests.post por:
# In ImmutableLogAuditMiddleware class, replace requests.post with:
async def _emit_async(self, ...):
async with httpx.AsyncClient() as client:
await client.post(
f"{self.api_url}/v1/events",
json=event,
headers=headers,
timeout=5,
)
# Ou use asyncio.get_event_loop().run_in_executor() para manter requests:
# Or use asyncio.get_event_loop().run_in_executor() to keep requests:
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
lambda: requests.post(url, json=event, headers=headers, timeout=5)
)Para instalar o httpx: pip install httpx
Esta documentação reflete o comportamento atual da integração. Para dúvidas ou integrações avançadas, entre em contato com o time de suporte.
