Skip to main content
ImmutableLog logo
BackPython

Python

Complete integration guide for ImmutableLog in Python. Choose the approach that best fits your project: automatic middleware for web frameworks, direct integration with the `requests` library, or decorators to instrument specific functions and endpoints.

Integration with requests

Use Python's `requests` library to send events directly to ImmutableLog without depending on a web framework. Ideal for workers, migration scripts, async jobs, CLIs, and any Python code that needs to record events in the ledger.

Helper function send_event

Create a reusable utility function to encapsulate the sending logic. It serializes the payload, generates required headers, and POSTs to the ingestion endpoint.

python
import json
import logging
import os
import uuid
from datetime import datetime, timezone

import requests

logger = logging.getLogger(__name__)

# Credentials and endpoint come from the environment so no secrets live in code.
IMTBL_API_KEY = os.environ.get("IMTBL_API_KEY", "")
IMTBL_URL = os.environ.get("IMTBL_URL", "https://api.immutablelog.com")


def send_event(
    event_name: str,
    payload: dict,
    kind: str = "info",
    service: str = "python-service",
    env: str = "production",
) -> dict:
    """Send a single event to ImmutableLog and return the API response.

    The payload is stamped with a UTC timestamp, JSON-serialized, wrapped in
    the ledger event envelope, and POSTed to the ingestion endpoint. Raises
    ``requests.HTTPError`` for non-2xx responses.
    """
    request_id = str(uuid.uuid4())

    # Stamp the payload with the submission time before serializing.
    enriched = dict(payload)
    enriched["timestamp"] = datetime.now(timezone.utc).isoformat()
    serialized = json.dumps(enriched, ensure_ascii=False)

    envelope = {
        "payload": serialized,
        "meta": {
            "type": kind,
            "event_name": event_name,
            "service": service,
            "request_id": request_id,
            "env": env,
        },
    }

    request_headers = {
        "Authorization": f"Bearer {IMTBL_API_KEY}",
        "Content-Type": "application/json",
        # The same id in both headers lets the server dedupe and trace the call.
        "Idempotency-Key": f"{event_name}-{request_id}",
        "Request-Id": request_id,
    }

    resp = requests.post(
        f"{IMTBL_URL}/v1/events",
        json=envelope,
        headers=request_headers,
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()


# Usage: record an approved payment in the ledger.
send_event(
    event_name="payment.approved",
    payload={
        "payment_id": "pay_abc123",
        "amount": 299.90,
        "currency": "BRL",
        "customer_id": "cust_42",
    },
    kind="success",
    service="payments-service",
)

Retry with exponential backoff

For production environments where availability is critical, add automatic retry with exponential backoff. ImmutableLog returns `202` on success and `429` when the monthly limit is reached — do not retry on 429.

python
import time

def send_event_with_retry(
    event_name: str,
    payload: dict,
    kind: str = "info",
    service: str = "python-service",
    max_retries: int = 3,
    base_delay: float = 0.5,
    env: str = "production",
) -> dict | None:
    """Send an event with automatic retry and exponential backoff.

    The event envelope and headers are built once, before the loop, so every
    attempt reuses the same Idempotency-Key and the server can safely
    deduplicate retries. Retries cover network failures and 5xx server
    errors only; 429 (monthly limit) and other 4xx client errors are
    permanent for this request and are not retried.

    Returns the API response dict on success, or None when the limit was
    reached, the request was rejected, or every attempt failed.
    """
    request_id = str(uuid.uuid4())
    event = {
        "payload": json.dumps(
            {**payload, "timestamp": datetime.now(timezone.utc).isoformat()},
            ensure_ascii=False,
        ),
        "meta": {
            "type": kind,
            "event_name": event_name,
            "service": service,
            "request_id": request_id,
            "env": env,
        },
    }
    headers = {
        "Authorization": f"Bearer {IMTBL_API_KEY}",
        "Content-Type": "application/json",
        "Idempotency-Key": f"{event_name}-{request_id}",
        "Request-Id": request_id,
    }

    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{IMTBL_URL}/v1/events",
                json=event,
                headers=headers,
                timeout=5,
            )

            # 429 = monthly limit — never retry. Use .text, not .json(),
            # because an error body is not guaranteed to be valid JSON.
            if response.status_code == 429:
                logger.warning("ImmutableLog monthly limit reached: %s", response.text)
                return None

            # Other 4xx responses are request errors; retrying cannot fix them.
            if 400 <= response.status_code < 500:
                logger.error(
                    "ImmutableLog rejected event %s: HTTP %d",
                    event_name,
                    response.status_code,
                )
                return None

            response.raise_for_status()
            return response.json()

        except requests.RequestException as exc:
            if attempt == max_retries - 1:
                logger.error("ImmutableLog send failed after %d retries: %s", max_retries, exc)
                return None

            delay = base_delay * (2 ** attempt)  # 0.5s, 1s, 2s
            logger.warning("ImmutableLog retry %d/%d in %.1fs: %s", attempt + 1, max_retries, delay, exc)
            time.sleep(delay)

    return None

Batch sending

In high-frequency workers or processing jobs, accumulate events in memory and send in batch using threads to avoid blocking the main flow. Each event is still sent individually to the endpoint — use the thread pool to parallelize.

python
from concurrent.futures import ThreadPoolExecutor, as_completed

def send_events_batch(
    events: list[dict],
    max_workers: int = 5,
) -> list[dict]:
    """Send a list of events in parallel through a thread pool.

    Each item in *events* must provide 'event_name' and 'payload', and may
    optionally provide 'kind' and 'service'. Returns one summary dict
    (event name + tx_id) per successfully sent event; failed sends are
    logged as warnings and omitted from the result.
    """
    outcomes: list[dict] = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {}
        for item in events:
            future = pool.submit(
                send_event,
                item["event_name"],
                item["payload"],
                item.get("kind", "info"),
                item.get("service", "python-service"),
            )
            pending[future] = item

        # Collect in completion order so slow sends don't block fast ones.
        for done in as_completed(pending):
            item = pending[done]
            try:
                response = done.result()
            except Exception as exc:
                logger.warning("Batch send failed for %s: %s", item["event_name"], exc)
            else:
                outcomes.append({"event": item["event_name"], "tx_id": response.get("tx_id")})

    return outcomes


# Usage: fan out three events across the thread pool.
events = [
    {"event_name": "order.created",  "payload": {"order_id": "ord_1"}, "kind": "success"},
    {"event_name": "order.created",  "payload": {"order_id": "ord_2"}, "kind": "success"},
    {"event_name": "payment.failed", "payload": {"order_id": "ord_3"}, "kind": "error"},
]

results = send_events_batch(events, max_workers=3)
print(results)
# [{"event": "order.created", "tx_id": "..."}, ...]

Decorators

Decorators allow you to instrument specific functions without modifying their internal logic. Ideal for marking critical business operations (e.g., process payment, create user, send email) where you want to ensure traceability regardless of the framework used.

Synchronous decorator

Captures start, end, and exceptions from the decorated function. On error, includes the exception class and message in the payload. The event is sent in the `finally` block to ensure recording even when the function raises an exception.

python
import functools
import json
import time
import uuid
from datetime import datetime, timezone

import requests


def audit_log(
    event_name: str | None = None,
    kind: str = "info",
    service: str = "python-service",
):
    """Decorator para registrar chamadas de funcao no ImmutableLog."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            started_at = time.time()
            exc_captured = None

            try:
                return func(*args, **kwargs)
            except Exception as exc:
                exc_captured = exc
                raise
            finally:
                latency_ms = int((time.time() - started_at) * 1000)
                name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
                actual_kind = "error" if exc_captured else kind

                payload = {
                    "id": str(uuid.uuid4()),
                    "kind": actual_kind,
                    "message": (
                        f"{name} failed: {type(exc_captured).__name__}"
                        if exc_captured
                        else f"{name} executed"
                    ),
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "metrics": {"latency_ms": latency_ms},
                    "function": {"module": func.__module__, "name": func.__qualname__},
                }

                if exc_captured:
                    payload["error"] = {
                        "exception": type(exc_captured).__name__,
                        "exception_message": str(exc_captured),
                        "retryable": False,
                    }

                try:
                    send_event(name, payload, actual_kind, service)
                except Exception:
                    pass  # nunca propagar falha de auditoria / never propagate audit failure

        return wrapper
    return decorator


# Usage
@audit_log(event_name="payment.process", kind="success", service="payments-service")
def process_payment(payment_id: str, amount: float) -> dict:
    # ... business logic ...
    return {"status": "approved", "payment_id": payment_id}


@audit_log(service="user-service")
def delete_user(user_id: int) -> None:
    # event_name is auto-generated: "fn.mymodule.delete_user"
    ...

Asynchronous decorator

Asynchronous version of the decorator using `asyncio`. Sending to ImmutableLog is done in a separate thread via `asyncio.get_event_loop().run_in_executor()` to avoid blocking the application's event loop.

python
import asyncio
import functools
import time
import uuid
from datetime import datetime, timezone


def audit_log_async(
    event_name: str | None = None,
    kind: str = "info",
    service: str = "python-service",
):
    """Decorator para funcoes async — envia evento sem bloquear o event loop."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            started_at = time.time()
            exc_captured = None

            try:
                return await func(*args, **kwargs)
            except Exception as exc:
                exc_captured = exc
                raise
            finally:
                latency_ms = int((time.time() - started_at) * 1000)
                name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
                actual_kind = "error" if exc_captured else kind

                payload = {
                    "id": str(uuid.uuid4()),
                    "kind": actual_kind,
                    "message": (
                        f"{name} failed: {type(exc_captured).__name__}"
                        if exc_captured
                        else f"{name} executed"
                    ),
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "metrics": {"latency_ms": latency_ms},
                }

                if exc_captured:
                    payload["error"] = {
                        "exception": type(exc_captured).__name__,
                        "exception_message": str(exc_captured),
                    }

                # Executa o envio em thread separada para nao bloquear o event loop
                # Runs sending in a separate thread to not block the event loop
                loop = asyncio.get_event_loop()
                loop.run_in_executor(
                    None,
                    lambda: send_event(name, payload, actual_kind, service),
                )

        return wrapper
    return decorator


# Usage
@audit_log_async(event_name="invoice.generate", kind="success", service="billing-service")
async def generate_invoice(order_id: str) -> dict:
    # ... async business logic ...
    return {"invoice_id": "inv_abc123"}

Class-based decorator (configurable)

For larger projects, use a class-based decorator to centralize configuration (api_key, url, service) and reuse the instance. Register once at application startup and use `@audit.log()` on any function.

python
import asyncio
import functools
import json
import os
import time
import uuid
from datetime import datetime, timezone

import requests as req_lib


class ImmutableLogAudit:
    """Cliente configuravel para auditoria com ImmutableLog."""

    def __init__(
        self,
        api_key: str = "",
        api_url: str = "https://api.immutablelog.com",
        service: str = "python-service",
        env: str = "production",
    ):
        self.api_key = api_key or os.environ.get("IMTBL_API_KEY", "")
        self.api_url = api_url
        self.service = service
        self.env = env

    def log(
        self,
        event_name: str | None = None,
        kind: str = "info",
    ):
        """Decorator para instrumentar qualquer funcao (sync ou async)."""
        def decorator(func):
            is_async = asyncio.iscoroutinefunction(func)

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                return await self._run_async(func, args, kwargs, event_name, kind)

            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                return self._run_sync(func, args, kwargs, event_name, kind)

            return async_wrapper if is_async else sync_wrapper
        return decorator

    def _run_sync(self, func, args, kwargs, event_name, kind):
        started_at = time.time()
        exc_captured = None
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            exc_captured = exc
            raise
        finally:
            self._emit(func, started_at, exc_captured, event_name, kind)

    async def _run_async(self, func, args, kwargs, event_name, kind):
        started_at = time.time()
        exc_captured = None
        try:
            return await func(*args, **kwargs)
        except Exception as exc:
            exc_captured = exc
            raise
        finally:
            loop = asyncio.get_event_loop()
            loop.run_in_executor(
                None, lambda: self._emit(func, started_at, exc_captured, event_name, kind)
            )

    def _emit(self, func, started_at, exc, event_name, kind):
        try:
            name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
            actual_kind = "error" if exc else kind
            latency_ms = int((time.time() - started_at) * 1000)
            request_id = str(uuid.uuid4())

            payload = {
                "id": request_id,
                "kind": actual_kind,
                "message": f"{name} failed: {type(exc).__name__}" if exc else f"{name} executed",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "metrics": {"latency_ms": latency_ms},
            }
            if exc:
                payload["error"] = {"exception": type(exc).__name__, "exception_message": str(exc)}

            event = {
                "payload": json.dumps(payload, ensure_ascii=False),
                "meta": {"type": actual_kind, "event_name": name, "service": self.service, "env": self.env},
            }
            req_lib.post(
                f"{self.api_url}/v1/events",
                json=event,
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                    "Idempotency-Key": f"{name}-{request_id}",
                    "Request-Id": request_id,
                },
                timeout=5,
            )
        except Exception:
            pass


# Initialize once in the application.
audit = ImmutableLogAudit(
    api_key=os.environ["IMTBL_API_KEY"],
    service="payments-service",
    env="production",
)


# Use on any sync or async function.
@audit.log(event_name="payment.process", kind="success")
def process_payment(payment_id: str, amount: float) -> dict:
    return {"status": "approved"}


@audit.log(event_name="invoice.generate")
async def generate_invoice(order_id: str) -> dict:
    return {"invoice_id": "inv_abc123"}

This documentation reflects the current API behavior. For questions or advanced integrations, contact the support team.