Python
Guia completo de integração com o ImmutableLog em Python. Escolha a abordagem que melhor se encaixa no seu projeto: middleware automático para frameworks web, integração direta com a biblioteca `requests`, ou decorators para instrumentar funções e endpoints específicos.
Middlewares para frameworks
A forma mais rápida de integrar o ImmutableLog é via middleware. Toda requisição HTTP é capturada automaticamente — erros, sucessos, latência e contexto do usuário — sem alterar nenhuma rota ou lógica de negócio. Clique no framework para ver a documentação completa com o código do middleware pronto para usar.
Middleware via `MiddlewareMixin`. Configurado por `settings.py`. Captura `process_request`, `process_response` e `process_exception`.
# settings.py
MIDDLEWARE = [
...
"api_core.middleware.ImmutableLogAuditMiddleware",
]
IMTBL_API_KEY = "iml_live_..."
IMTBL_URL = "https://api.immutablelog.com"Ver documentação completa →
Middleware assíncrono via `BaseHTTPMiddleware` do Starlette. Configurado por parâmetros no `add_middleware()`.
from middleware import ImmutableLogAuditMiddleware
app.add_middleware(
ImmutableLogAuditMiddleware,
api_key=os.environ["IMTBL_API_KEY"],
api_url="https://api.immutablelog.com",
)Ver documentação completa →
Extensão Flask com padrão `init_app`. Usa hooks `before_request`, `after_request` e `teardown_request`. Suporta blueprints.
from middleware import ImmutableLogAudit
ImmutableLogAudit(
app,
api_key=os.environ["IMTBL_API_KEY"],
api_url="https://api.immutablelog.com",
)Ver documentação completa →
Integração com requests
Use a biblioteca `requests` do Python para enviar eventos diretamente ao ImmutableLog sem depender de um framework web. Ideal para workers, scripts de migração, jobs assíncronos, CLIs e qualquer código Python que precise registrar eventos no ledger.
Função helper send_event
Crie uma função utilitária reutilizável para encapsular a lógica de envio. Ela serializa o payload, gera os headers obrigatórios e faz o POST ao endpoint de ingestão.
import json
import logging
import os
import uuid
from datetime import datetime, timezone
import requests
logger = logging.getLogger(__name__)
IMTBL_API_KEY = os.environ.get("IMTBL_API_KEY", "")
IMTBL_URL = os.environ.get("IMTBL_URL", "https://api.immutablelog.com")
def send_event(
event_name: str,
payload: dict,
kind: str = "info",
service: str = "python-service",
env: str = "production",
) -> dict:
"""Envia um evento ao ImmutableLog e retorna a resposta da API."""
request_id = str(uuid.uuid4())
payload_str = json.dumps(
{**payload, "timestamp": datetime.now(timezone.utc).isoformat()},
ensure_ascii=False,
)
event = {
"payload": payload_str,
"meta": {
"type": kind,
"event_name": event_name,
"service": service,
"request_id": request_id,
"env": env,
},
}
headers = {
"Authorization": f"Bearer {IMTBL_API_KEY}",
"Content-Type": "application/json",
"Idempotency-Key": f"{event_name}-{request_id}",
"Request-Id": request_id,
}
response = requests.post(
f"{IMTBL_URL}/v1/events",
json=event,
headers=headers,
timeout=5,
)
response.raise_for_status()
return response.json()
# Uso / Usage
send_event(
event_name="payment.approved",
payload={
"payment_id": "pay_abc123",
"amount": 299.90,
"currency": "BRL",
"customer_id": "cust_42",
},
kind="success",
service="payments-service",
)Retry com backoff exponencial
Para ambientes de produção onde a disponibilidade é crítica, adicione retry automático com backoff exponencial. O ImmutableLog retorna `202` em sucesso e `429` quando o limite mensal é atingido — não faça retry em 429.
import time
def send_event_with_retry(
event_name: str,
payload: dict,
kind: str = "info",
service: str = "python-service",
max_retries: int = 3,
base_delay: float = 0.5,
) -> dict | None:
"""Envia evento com retry e backoff exponencial."""
for attempt in range(max_retries):
try:
response = requests.post(
f"{IMTBL_URL}/v1/events",
json=_build_event(event_name, payload, kind, service),
headers=_build_headers(event_name),
timeout=5,
)
# 429 = limite mensal — nao fazer retry
# 429 = monthly limit — do not retry
if response.status_code == 429:
logger.warning("ImmutableLog monthly limit reached: %s", response.json())
return None
response.raise_for_status()
return response.json()
except requests.RequestException as exc:
if attempt == max_retries - 1:
logger.error("ImmutableLog send failed after %d retries: %s", max_retries, exc)
return None
delay = base_delay * (2 ** attempt) # 0.5s, 1s, 2s
logger.warning("ImmutableLog retry %d/%d in %.1fs: %s", attempt + 1, max_retries, delay, exc)
time.sleep(delay)
return NoneEnvio em lote (batch)
Em workers de alta frequência ou jobs de processamento, acumule eventos em memória e envie em lote usando threads para não bloquear o fluxo principal. Cada evento ainda é enviado individualmente ao endpoint — use o pool de threads para paralelizar.
from concurrent.futures import ThreadPoolExecutor, as_completed
def send_events_batch(
events: list[dict],
max_workers: int = 5,
) -> list[dict]:
"""
Envia uma lista de eventos em paralelo usando thread pool.
Cada item de 'events' deve ter: event_name, payload, kind, service.
"""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(
send_event,
ev["event_name"],
ev["payload"],
ev.get("kind", "info"),
ev.get("service", "python-service"),
): ev
for ev in events
}
for future in as_completed(futures):
ev = futures[future]
try:
result = future.result()
results.append({"event": ev["event_name"], "tx_id": result.get("tx_id")})
except Exception as exc:
logger.warning("Batch send failed for %s: %s", ev["event_name"], exc)
return results
# Uso / Usage
events = [
{"event_name": "order.created", "payload": {"order_id": "ord_1"}, "kind": "success"},
{"event_name": "order.created", "payload": {"order_id": "ord_2"}, "kind": "success"},
{"event_name": "payment.failed", "payload": {"order_id": "ord_3"}, "kind": "error"},
]
results = send_events_batch(events, max_workers=3)
print(results)
# [{"event": "order.created", "tx_id": "..."}, ...]Decorators
Decorators permitem instrumentar funções específicas sem modificar sua lógica interna. Ideal para marcar operações críticas de negócio (ex: processar pagamento, criar usuário, enviar email) onde você quer garantir rastreabilidade independente do framework usado.
Decorator síncrono
Captura início, fim e exceções da função decorada. Em caso de erro, inclui o nome e mensagem da exceção no payload. O evento é enviado no bloco `finally` para garantir o registro mesmo quando a função lança uma exceção.
import functools
import json
import time
import uuid
from datetime import datetime, timezone
import requests
def audit_log(
event_name: str | None = None,
kind: str = "info",
service: str = "python-service",
):
"""Decorator para registrar chamadas de funcao no ImmutableLog."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
started_at = time.time()
exc_captured = None
try:
return func(*args, **kwargs)
except Exception as exc:
exc_captured = exc
raise
finally:
latency_ms = int((time.time() - started_at) * 1000)
name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
actual_kind = "error" if exc_captured else kind
payload = {
"id": str(uuid.uuid4()),
"kind": actual_kind,
"message": (
f"{name} failed: {type(exc_captured).__name__}"
if exc_captured
else f"{name} executed"
),
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {"latency_ms": latency_ms},
"function": {"module": func.__module__, "name": func.__qualname__},
}
if exc_captured:
payload["error"] = {
"exception": type(exc_captured).__name__,
"exception_message": str(exc_captured),
"retryable": False,
}
try:
send_event(name, payload, actual_kind, service)
except Exception:
pass # nunca propagar falha de auditoria / never propagate audit failure
return wrapper
return decorator
# Uso / Usage
@audit_log(event_name="payment.process", kind="success", service="payments-service")
def process_payment(payment_id: str, amount: float) -> dict:
# ... logica de negocio / business logic ...
return {"status": "approved", "payment_id": payment_id}
@audit_log(service="user-service")
def delete_user(user_id: int) -> None:
# event_name gerado automaticamente: "fn.mymodule.delete_user"
# event_name auto-generated: "fn.mymodule.delete_user"
...Decorator assíncrono
Versão assíncrona do decorator usando `asyncio`. O envio ao ImmutableLog é feito em uma thread separada via `asyncio.get_event_loop().run_in_executor()` para não bloquear o event loop da aplicação.
import asyncio
import functools
import time
import uuid
from datetime import datetime, timezone
def audit_log_async(
event_name: str | None = None,
kind: str = "info",
service: str = "python-service",
):
"""Decorator para funcoes async — envia evento sem bloquear o event loop."""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
started_at = time.time()
exc_captured = None
try:
return await func(*args, **kwargs)
except Exception as exc:
exc_captured = exc
raise
finally:
latency_ms = int((time.time() - started_at) * 1000)
name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
actual_kind = "error" if exc_captured else kind
payload = {
"id": str(uuid.uuid4()),
"kind": actual_kind,
"message": (
f"{name} failed: {type(exc_captured).__name__}"
if exc_captured
else f"{name} executed"
),
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {"latency_ms": latency_ms},
}
if exc_captured:
payload["error"] = {
"exception": type(exc_captured).__name__,
"exception_message": str(exc_captured),
}
# Executa o envio em thread separada para nao bloquear o event loop
# Runs sending in a separate thread to not block the event loop
loop = asyncio.get_event_loop()
loop.run_in_executor(
None,
lambda: send_event(name, payload, actual_kind, service),
)
return wrapper
return decorator
# Uso / Usage
@audit_log_async(event_name="invoice.generate", kind="success", service="billing-service")
async def generate_invoice(order_id: str) -> dict:
# ... logica async / async logic ...
return {"invoice_id": "inv_abc123"}Decorator com classe (configurável)
Para projetos maiores, use um decorator baseado em classe para centralizar a configuração (api_key, url, service) e reaproveitar a instância. Registre uma vez na inicialização da aplicação e use `@audit.log()` em qualquer função.
import functools
import os
import time
import uuid
from datetime import datetime, timezone
import requests as req_lib
class ImmutableLogAudit:
"""Cliente configuravel para auditoria com ImmutableLog."""
def __init__(
self,
api_key: str = "",
api_url: str = "https://api.immutablelog.com",
service: str = "python-service",
env: str = "production",
):
self.api_key = api_key or os.environ.get("IMTBL_API_KEY", "")
self.api_url = api_url
self.service = service
self.env = env
def log(
self,
event_name: str | None = None,
kind: str = "info",
):
"""Decorator para instrumentar qualquer funcao (sync ou async)."""
def decorator(func):
is_async = asyncio.iscoroutinefunction(func)
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
return await self._run_async(func, args, kwargs, event_name, kind)
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
return self._run_sync(func, args, kwargs, event_name, kind)
return async_wrapper if is_async else sync_wrapper
return decorator
def _run_sync(self, func, args, kwargs, event_name, kind):
started_at = time.time()
exc_captured = None
try:
return func(*args, **kwargs)
except Exception as exc:
exc_captured = exc
raise
finally:
self._emit(func, started_at, exc_captured, event_name, kind)
async def _run_async(self, func, args, kwargs, event_name, kind):
started_at = time.time()
exc_captured = None
try:
return await func(*args, **kwargs)
except Exception as exc:
exc_captured = exc
raise
finally:
loop = asyncio.get_event_loop()
loop.run_in_executor(
None, lambda: self._emit(func, started_at, exc_captured, event_name, kind)
)
def _emit(self, func, started_at, exc, event_name, kind):
try:
name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
actual_kind = "error" if exc else kind
latency_ms = int((time.time() - started_at) * 1000)
request_id = str(uuid.uuid4())
payload = {
"id": request_id,
"kind": actual_kind,
"message": f"{name} failed: {type(exc).__name__}" if exc else f"{name} executed",
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {"latency_ms": latency_ms},
}
if exc:
payload["error"] = {"exception": type(exc).__name__, "exception_message": str(exc)}
event = {
"payload": json.dumps(payload, ensure_ascii=False),
"meta": {"type": actual_kind, "event_name": name, "service": self.service, "env": self.env},
}
req_lib.post(
f"{self.api_url}/v1/events",
json=event,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"Idempotency-Key": f"{name}-{request_id}",
"Request-Id": request_id,
},
timeout=5,
)
except Exception:
pass
# Inicializar uma vez na aplicacao / Initialize once in the application
audit = ImmutableLogAudit(
api_key=os.environ["IMTBL_API_KEY"],
service="payments-service",
env="production",
)
# Usar em qualquer funcao sync ou async / Use on any sync or async function
@audit.log(event_name="payment.process", kind="success")
def process_payment(payment_id: str, amount: float) -> dict:
return {"status": "approved"}
@audit.log(event_name="invoice.generate")
async def generate_invoice(order_id: str) -> dict:
return {"invoice_id": "inv_abc123"}Esta documentação reflete o comportamento atual da API. Para dúvidas ou integrações avançadas, entre em contato com o time de suporte.
