Python
Complete integration guide for ImmutableLog in Python. Choose the approach that best fits your project: automatic middleware for web frameworks, direct integration with the `requests` library, or decorators to instrument specific functions and endpoints.
Framework middlewares
The fastest way to integrate ImmutableLog is via middleware. Every HTTP request is automatically captured — errors, successes, latency, and user context — without changing any route or business logic. Click the framework to see the full documentation with ready-to-use middleware code.
Django: middleware built on `MiddlewareMixin` and configured in `settings.py`. Captures `process_request`, `process_response`, and `process_exception`.
# settings.py
MIDDLEWARE = [
...
"api_core.middleware.ImmutableLogAuditMiddleware",
]
IMTBL_API_KEY = "iml_live_..."
IMTBL_URL = "https://api.immutablelog.com"

View full documentation →
FastAPI / Starlette: async middleware built on Starlette's `BaseHTTPMiddleware`. Configured through the parameters passed to `add_middleware()`.
from middleware import ImmutableLogAuditMiddleware
app.add_middleware(
ImmutableLogAuditMiddleware,
api_key=os.environ["IMTBL_API_KEY"],
api_url="https://api.immutablelog.com",
)

View full documentation →
Flask extension with `init_app` pattern. Uses `before_request`, `after_request`, and `teardown_request` hooks. Supports blueprints.
from middleware import ImmutableLogAudit
ImmutableLogAudit(
app,
api_key=os.environ["IMTBL_API_KEY"],
api_url="https://api.immutablelog.com",
)

View full documentation →
Integration with requests
Use Python's `requests` library to send events directly to ImmutableLog without depending on a web framework. Ideal for workers, migration scripts, async jobs, CLIs, and any Python code that needs to record events in the ledger.
Helper function send_event
Create a reusable utility function to encapsulate the sending logic. It serializes the payload, generates required headers, and POSTs to the ingestion endpoint.
import json
import logging
import os
import uuid
from datetime import datetime, timezone
import requests
# Module-level logger, named after the module that defines these helpers.
logger = logging.getLogger(__name__)
# API key read from the environment; falls back to an empty string when unset.
IMTBL_API_KEY = os.environ.get("IMTBL_API_KEY", "")
# Base URL of the API; can be overridden with the IMTBL_URL environment variable.
IMTBL_URL = os.environ.get("IMTBL_URL", "https://api.immutablelog.com")
def send_event(
    event_name: str,
    payload: dict,
    kind: str = "info",
    service: str = "python-service",
    env: str = "production",
) -> dict:
    """Send a single event to ImmutableLog and return the decoded API response.

    The payload is stamped with the current UTC time under the "timestamp"
    key (replacing any value the caller supplied), serialized to a JSON
    string and wrapped in an envelope together with the event metadata.

    Args:
        event_name: Name stored in the metadata; also prefixes the
            idempotency key.
        payload: Event data; must be JSON-serializable.
        kind: Value sent as the metadata "type".
        service: Name of the emitting service.
        env: Environment label stored in the metadata.

    Returns:
        The JSON body of the API response.

    Raises:
        requests.HTTPError: Raised by ``raise_for_status()`` when the API
            answers with an error status.
    """
    # One identifier per call, shared by the metadata and both headers.
    correlation_id = str(uuid.uuid4())

    stamped_payload = {
        **payload,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    metadata = {
        "type": kind,
        "event_name": event_name,
        "service": service,
        "request_id": correlation_id,
        "env": env,
    }
    body = {
        # The payload travels as a JSON string, not as a nested object.
        "payload": json.dumps(stamped_payload, ensure_ascii=False),
        "meta": metadata,
    }
    request_headers = {
        "Authorization": f"Bearer {IMTBL_API_KEY}",
        "Content-Type": "application/json",
        "Idempotency-Key": f"{event_name}-{correlation_id}",
        "Request-Id": correlation_id,
    }

    reply = requests.post(
        f"{IMTBL_URL}/v1/events",
        json=body,
        headers=request_headers,
        timeout=5,
    )
    reply.raise_for_status()
    return reply.json()
# Uso / Usage
send_event(
event_name="payment.approved",
payload={
"payment_id": "pay_abc123",
"amount": 299.90,
"currency": "BRL",
"customer_id": "cust_42",
},
kind="success",
service="payments-service",
)

Retry with exponential backoff
For production environments where availability is critical, add automatic retry with exponential backoff. ImmutableLog returns `202` on success and `429` when the monthly limit is reached — do not retry on 429.
import time
def send_event_with_retry(
    event_name: str,
    payload: dict,
    kind: str = "info",
    service: str = "python-service",
    max_retries: int = 3,
    base_delay: float = 0.5,
    env: str = "production",
) -> dict | None:
    """Send an event, retrying failed attempts with exponential backoff.

    The request body and headers are built once, before the first attempt,
    so every retry carries the same Request-Id and Idempotency-Key.

    Args:
        event_name: Name stored in the metadata; also prefixes the
            idempotency key.
        payload: Event data; must be JSON-serializable.
        kind: Value sent as the metadata "type".
        service: Name of the emitting service.
        max_retries: Total number of attempts (not additional retries).
        base_delay: Delay in seconds before the second attempt; doubled
            after every further failure.
        env: Environment label stored in the metadata.

    Returns:
        The JSON body of the API response, or None when the API answers
        429, when every attempt failed, or when ``max_retries`` is not
        positive.
    """
    request_id = str(uuid.uuid4())
    event = {
        # The payload travels as a JSON string, stamped with the send time.
        "payload": json.dumps(
            {**payload, "timestamp": datetime.now(timezone.utc).isoformat()},
            ensure_ascii=False,
        ),
        "meta": {
            "type": kind,
            "event_name": event_name,
            "service": service,
            "request_id": request_id,
            "env": env,
        },
    }
    headers = {
        "Authorization": f"Bearer {IMTBL_API_KEY}",
        "Content-Type": "application/json",
        "Idempotency-Key": f"{event_name}-{request_id}",
        "Request-Id": request_id,
    }

    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{IMTBL_URL}/v1/events",
                json=event,
                headers=headers,
                timeout=5,
            )
            # 429 = monthly limit — do not retry
            if response.status_code == 429:
                # Log the raw body: decoding it as JSON could itself fail.
                logger.warning("ImmutableLog monthly limit reached: %s", response.text)
                return None
            response.raise_for_status()
            return response.json()
        except requests.RequestException as exc:
            if attempt == max_retries - 1:
                logger.error("ImmutableLog send failed after %d retries: %s", max_retries, exc)
                return None
            # With the defaults: 0.5s after the first failure, 1s after the second.
            delay = base_delay * (2 ** attempt)
            logger.warning("ImmutableLog retry %d/%d in %.1fs: %s", attempt + 1, max_retries, delay, exc)
            time.sleep(delay)
    return None

Batch sending
In high-frequency workers or processing jobs, accumulate events in memory and send in batch using threads to avoid blocking the main flow. Each event is still sent individually to the endpoint — use the thread pool to parallelize.
from concurrent.futures import ThreadPoolExecutor, as_completed
def send_events_batch(
    events: list[dict],
    max_workers: int = 5,
) -> list[dict]:
    """Send several events concurrently through a thread pool.

    Each entry of ``events`` is dispatched with its own ``send_event`` call.
    An entry must provide "event_name" and "payload"; "kind" and "service"
    are optional and default to "info" and "python-service".

    Args:
        events: Event descriptions to send.
        max_workers: Size of the thread pool.

    Returns:
        One ``{"event": ..., "tx_id": ...}`` dict per event that was sent
        successfully, in completion order. Failed sends are logged as
        warnings and left out of the result.
    """
    delivered = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map every future back to the event it was created for.
        pending = {}
        for item in events:
            job = pool.submit(
                send_event,
                item["event_name"],
                item["payload"],
                item.get("kind", "info"),
                item.get("service", "python-service"),
            )
            pending[job] = item

        for job in as_completed(pending):
            source = pending[job]
            try:
                reply = job.result()
                entry = {"event": source["event_name"], "tx_id": reply.get("tx_id")}
            except Exception as exc:
                logger.warning("Batch send failed for %s: %s", source["event_name"], exc)
            else:
                delivered.append(entry)
    return delivered
# Uso / Usage
events = [
{"event_name": "order.created", "payload": {"order_id": "ord_1"}, "kind": "success"},
{"event_name": "order.created", "payload": {"order_id": "ord_2"}, "kind": "success"},
{"event_name": "payment.failed", "payload": {"order_id": "ord_3"}, "kind": "error"},
]
results = send_events_batch(events, max_workers=3)
print(results)
# [{"event": "order.created", "tx_id": "..."}, ...]

Decorators
Decorators allow you to instrument specific functions without modifying their internal logic. Ideal for marking critical business operations (e.g., process payment, create user, send email) where you want to ensure traceability regardless of the framework used.
Synchronous decorator
Captures start, end, and exceptions from the decorated function. On error, includes the exception class and message in the payload. The event is sent in the `finally` block to ensure recording even when the function raises an exception.
import functools
import json
import time
import uuid
from datetime import datetime, timezone
import requests
def audit_log(
event_name: str | None = None,
kind: str = "info",
service: str = "python-service",
):
"""Decorator para registrar chamadas de funcao no ImmutableLog."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
started_at = time.time()
exc_captured = None
try:
return func(*args, **kwargs)
except Exception as exc:
exc_captured = exc
raise
finally:
latency_ms = int((time.time() - started_at) * 1000)
name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
actual_kind = "error" if exc_captured else kind
payload = {
"id": str(uuid.uuid4()),
"kind": actual_kind,
"message": (
f"{name} failed: {type(exc_captured).__name__}"
if exc_captured
else f"{name} executed"
),
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {"latency_ms": latency_ms},
"function": {"module": func.__module__, "name": func.__qualname__},
}
if exc_captured:
payload["error"] = {
"exception": type(exc_captured).__name__,
"exception_message": str(exc_captured),
"retryable": False,
}
try:
send_event(name, payload, actual_kind, service)
except Exception:
pass # nunca propagar falha de auditoria / never propagate audit failure
return wrapper
return decorator
# Uso / Usage
@audit_log(event_name="payment.process", kind="success", service="payments-service")
def process_payment(payment_id: str, amount: float) -> dict:
# ... logica de negocio / business logic ...
return {"status": "approved", "payment_id": payment_id}
@audit_log(service="user-service")
def delete_user(user_id: int) -> None:
# event_name gerado automaticamente: "fn.mymodule.delete_user"
# event_name auto-generated: "fn.mymodule.delete_user"
...

Asynchronous decorator
Asynchronous version of the decorator, built on `asyncio`. The send to ImmutableLog is handed off to a worker thread through the event loop's `run_in_executor()`, so the HTTP call never blocks the application's event loop.
import asyncio
import functools
import time
import uuid
from datetime import datetime, timezone
def audit_log_async(
event_name: str | None = None,
kind: str = "info",
service: str = "python-service",
):
"""Decorator para funcoes async — envia evento sem bloquear o event loop."""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
started_at = time.time()
exc_captured = None
try:
return await func(*args, **kwargs)
except Exception as exc:
exc_captured = exc
raise
finally:
latency_ms = int((time.time() - started_at) * 1000)
name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
actual_kind = "error" if exc_captured else kind
payload = {
"id": str(uuid.uuid4()),
"kind": actual_kind,
"message": (
f"{name} failed: {type(exc_captured).__name__}"
if exc_captured
else f"{name} executed"
),
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {"latency_ms": latency_ms},
}
if exc_captured:
payload["error"] = {
"exception": type(exc_captured).__name__,
"exception_message": str(exc_captured),
}
# Executa o envio em thread separada para nao bloquear o event loop
# Runs sending in a separate thread to not block the event loop
loop = asyncio.get_event_loop()
loop.run_in_executor(
None,
lambda: send_event(name, payload, actual_kind, service),
)
return wrapper
return decorator
# Uso / Usage
@audit_log_async(event_name="invoice.generate", kind="success", service="billing-service")
async def generate_invoice(order_id: str) -> dict:
# ... logica async / async logic ...
return {"invoice_id": "inv_abc123"}

Class-based decorator (configurable)
For larger projects, use a class-based decorator to centralize configuration (api_key, url, service) and reuse the instance. Register once at application startup and use `@audit.log()` on any function.
import asyncio
import functools
import json
import logging
import os
import time
import uuid
from datetime import datetime, timezone

import requests as req_lib
class ImmutableLogAudit:
"""Cliente configuravel para auditoria com ImmutableLog."""
def __init__(
self,
api_key: str = "",
api_url: str = "https://api.immutablelog.com",
service: str = "python-service",
env: str = "production",
):
self.api_key = api_key or os.environ.get("IMTBL_API_KEY", "")
self.api_url = api_url
self.service = service
self.env = env
def log(
self,
event_name: str | None = None,
kind: str = "info",
):
"""Decorator para instrumentar qualquer funcao (sync ou async)."""
def decorator(func):
is_async = asyncio.iscoroutinefunction(func)
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
return await self._run_async(func, args, kwargs, event_name, kind)
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
return self._run_sync(func, args, kwargs, event_name, kind)
return async_wrapper if is_async else sync_wrapper
return decorator
def _run_sync(self, func, args, kwargs, event_name, kind):
started_at = time.time()
exc_captured = None
try:
return func(*args, **kwargs)
except Exception as exc:
exc_captured = exc
raise
finally:
self._emit(func, started_at, exc_captured, event_name, kind)
async def _run_async(self, func, args, kwargs, event_name, kind):
started_at = time.time()
exc_captured = None
try:
return await func(*args, **kwargs)
except Exception as exc:
exc_captured = exc
raise
finally:
loop = asyncio.get_event_loop()
loop.run_in_executor(
None, lambda: self._emit(func, started_at, exc_captured, event_name, kind)
)
def _emit(self, func, started_at, exc, event_name, kind):
try:
name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
actual_kind = "error" if exc else kind
latency_ms = int((time.time() - started_at) * 1000)
request_id = str(uuid.uuid4())
payload = {
"id": request_id,
"kind": actual_kind,
"message": f"{name} failed: {type(exc).__name__}" if exc else f"{name} executed",
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {"latency_ms": latency_ms},
}
if exc:
payload["error"] = {"exception": type(exc).__name__, "exception_message": str(exc)}
event = {
"payload": json.dumps(payload, ensure_ascii=False),
"meta": {"type": actual_kind, "event_name": name, "service": self.service, "env": self.env},
}
req_lib.post(
f"{self.api_url}/v1/events",
json=event,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"Idempotency-Key": f"{name}-{request_id}",
"Request-Id": request_id,
},
timeout=5,
)
except Exception:
pass
# Inicializar uma vez na aplicacao / Initialize once in the application
audit = ImmutableLogAudit(
api_key=os.environ["IMTBL_API_KEY"],
service="payments-service",
env="production",
)
# Usar em qualquer funcao sync ou async / Use on any sync or async function
@audit.log(event_name="payment.process", kind="success")
def process_payment(payment_id: str, amount: float) -> dict:
return {"status": "approved"}
@audit.log(event_name="invoice.generate")
async def generate_invoice(order_id: str) -> dict:
return {"invoice_id": "inv_abc123"}

This documentation reflects the current API behavior. For questions or advanced integrations, contact the support team.
