Pular para o conteúdo principal
ImmutableLog logo
VoltarPython

Python

Guia completo de integração com o ImmutableLog em Python. Escolha a abordagem que melhor se encaixa no seu projeto: middleware automático para frameworks web, integração direta com a biblioteca `requests`, ou decorators para instrumentar funções e endpoints específicos.

Integração com requests

Use a biblioteca `requests` do Python para enviar eventos diretamente ao ImmutableLog sem depender de um framework web. Ideal para workers, scripts de migração, jobs assíncronos, CLIs e qualquer código Python que precise registrar eventos no ledger.

Função helper send_event

Crie uma função utilitária reutilizável para encapsular a lógica de envio. Ela serializa o payload, gera os headers obrigatórios e faz o POST ao endpoint de ingestão.

python
import json
import logging
import os
import uuid
from datetime import datetime, timezone

import requests

logger = logging.getLogger(__name__)

IMTBL_API_KEY = os.environ.get("IMTBL_API_KEY", "")
IMTBL_URL = os.environ.get("IMTBL_URL", "https://api.immutablelog.com")


def send_event(
    event_name: str,
    payload: dict,
    kind: str = "info",
    service: str = "python-service",
    env: str = "production",
) -> dict:
    """Envia um evento ao ImmutableLog e retorna a resposta da API."""
    request_id = str(uuid.uuid4())
    payload_str = json.dumps(
        {**payload, "timestamp": datetime.now(timezone.utc).isoformat()},
        ensure_ascii=False,
    )

    event = {
        "payload": payload_str,
        "meta": {
            "type": kind,
            "event_name": event_name,
            "service": service,
            "request_id": request_id,
            "env": env,
        },
    }

    headers = {
        "Authorization": f"Bearer {IMTBL_API_KEY}",
        "Content-Type": "application/json",
        "Idempotency-Key": f"{event_name}-{request_id}",
        "Request-Id": request_id,
    }

    response = requests.post(
        f"{IMTBL_URL}/v1/events",
        json=event,
        headers=headers,
        timeout=5,
    )
    response.raise_for_status()
    return response.json()


# Uso / Usage
send_event(
    event_name="payment.approved",
    payload={
        "payment_id": "pay_abc123",
        "amount": 299.90,
        "currency": "BRL",
        "customer_id": "cust_42",
    },
    kind="success",
    service="payments-service",
)

Retry com backoff exponencial

Para ambientes de produção onde a disponibilidade é crítica, adicione retry automático com backoff exponencial. O ImmutableLog retorna `202` em sucesso e `429` quando o limite mensal é atingido — não faça retry em 429.

python
import time

def send_event_with_retry(
    event_name: str,
    payload: dict,
    kind: str = "info",
    service: str = "python-service",
    max_retries: int = 3,
    base_delay: float = 0.5,
) -> dict | None:
    """Envia evento com retry e backoff exponencial."""
    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{IMTBL_URL}/v1/events",
                json=_build_event(event_name, payload, kind, service),
                headers=_build_headers(event_name),
                timeout=5,
            )

            # 429 = limite mensal — nao fazer retry
            # 429 = monthly limit — do not retry
            if response.status_code == 429:
                logger.warning("ImmutableLog monthly limit reached: %s", response.json())
                return None

            response.raise_for_status()
            return response.json()

        except requests.RequestException as exc:
            if attempt == max_retries - 1:
                logger.error("ImmutableLog send failed after %d retries: %s", max_retries, exc)
                return None

            delay = base_delay * (2 ** attempt)  # 0.5s, 1s, 2s
            logger.warning("ImmutableLog retry %d/%d in %.1fs: %s", attempt + 1, max_retries, delay, exc)
            time.sleep(delay)

    return None

Envio em lote (batch)

Em workers de alta frequência ou jobs de processamento, acumule eventos em memória e envie em lote usando threads para não bloquear o fluxo principal. Cada evento ainda é enviado individualmente ao endpoint — use o pool de threads para paralelizar.

python
from concurrent.futures import ThreadPoolExecutor, as_completed

def send_events_batch(
    events: list[dict],
    max_workers: int = 5,
) -> list[dict]:
    """
    Envia uma lista de eventos em paralelo usando thread pool.
    Cada item de 'events' deve ter: event_name, payload, kind, service.
    """
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                send_event,
                ev["event_name"],
                ev["payload"],
                ev.get("kind", "info"),
                ev.get("service", "python-service"),
            ): ev
            for ev in events
        }

        for future in as_completed(futures):
            ev = futures[future]
            try:
                result = future.result()
                results.append({"event": ev["event_name"], "tx_id": result.get("tx_id")})
            except Exception as exc:
                logger.warning("Batch send failed for %s: %s", ev["event_name"], exc)

    return results


# Uso / Usage
events = [
    {"event_name": "order.created",  "payload": {"order_id": "ord_1"}, "kind": "success"},
    {"event_name": "order.created",  "payload": {"order_id": "ord_2"}, "kind": "success"},
    {"event_name": "payment.failed", "payload": {"order_id": "ord_3"}, "kind": "error"},
]

results = send_events_batch(events, max_workers=3)
print(results)
# [{"event": "order.created", "tx_id": "..."}, ...]

Decorators

Decorators permitem instrumentar funções específicas sem modificar sua lógica interna. Ideal para marcar operações críticas de negócio (ex: processar pagamento, criar usuário, enviar email) onde você quer garantir rastreabilidade independente do framework usado.

Decorator síncrono

Captura início, fim e exceções da função decorada. Em caso de erro, inclui o nome e mensagem da exceção no payload. O evento é enviado no bloco `finally` para garantir o registro mesmo quando a função lança uma exceção.

python
import functools
import json
import time
import uuid
from datetime import datetime, timezone

import requests


def audit_log(
    event_name: str | None = None,
    kind: str = "info",
    service: str = "python-service",
):
    """Decorator para registrar chamadas de funcao no ImmutableLog."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            started_at = time.time()
            exc_captured = None

            try:
                return func(*args, **kwargs)
            except Exception as exc:
                exc_captured = exc
                raise
            finally:
                latency_ms = int((time.time() - started_at) * 1000)
                name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
                actual_kind = "error" if exc_captured else kind

                payload = {
                    "id": str(uuid.uuid4()),
                    "kind": actual_kind,
                    "message": (
                        f"{name} failed: {type(exc_captured).__name__}"
                        if exc_captured
                        else f"{name} executed"
                    ),
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "metrics": {"latency_ms": latency_ms},
                    "function": {"module": func.__module__, "name": func.__qualname__},
                }

                if exc_captured:
                    payload["error"] = {
                        "exception": type(exc_captured).__name__,
                        "exception_message": str(exc_captured),
                        "retryable": False,
                    }

                try:
                    send_event(name, payload, actual_kind, service)
                except Exception:
                    pass  # nunca propagar falha de auditoria / never propagate audit failure

        return wrapper
    return decorator


# Uso / Usage
@audit_log(event_name="payment.process", kind="success", service="payments-service")
def process_payment(payment_id: str, amount: float) -> dict:
    # ... logica de negocio / business logic ...
    return {"status": "approved", "payment_id": payment_id}


@audit_log(service="user-service")
def delete_user(user_id: int) -> None:
    # event_name gerado automaticamente: "fn.mymodule.delete_user"
    # event_name auto-generated: "fn.mymodule.delete_user"
    ...

Decorator assíncrono

Versão assíncrona do decorator usando `asyncio`. O envio ao ImmutableLog é feito em uma thread separada via `asyncio.get_event_loop().run_in_executor()` para não bloquear o event loop da aplicação.

python
import asyncio
import functools
import time
import uuid
from datetime import datetime, timezone


def audit_log_async(
    event_name: str | None = None,
    kind: str = "info",
    service: str = "python-service",
):
    """Decorator para funcoes async — envia evento sem bloquear o event loop."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            started_at = time.time()
            exc_captured = None

            try:
                return await func(*args, **kwargs)
            except Exception as exc:
                exc_captured = exc
                raise
            finally:
                latency_ms = int((time.time() - started_at) * 1000)
                name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
                actual_kind = "error" if exc_captured else kind

                payload = {
                    "id": str(uuid.uuid4()),
                    "kind": actual_kind,
                    "message": (
                        f"{name} failed: {type(exc_captured).__name__}"
                        if exc_captured
                        else f"{name} executed"
                    ),
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "metrics": {"latency_ms": latency_ms},
                }

                if exc_captured:
                    payload["error"] = {
                        "exception": type(exc_captured).__name__,
                        "exception_message": str(exc_captured),
                    }

                # Executa o envio em thread separada para nao bloquear o event loop
                # Runs sending in a separate thread to not block the event loop
                loop = asyncio.get_event_loop()
                loop.run_in_executor(
                    None,
                    lambda: send_event(name, payload, actual_kind, service),
                )

        return wrapper
    return decorator


# Uso / Usage
@audit_log_async(event_name="invoice.generate", kind="success", service="billing-service")
async def generate_invoice(order_id: str) -> dict:
    # ... logica async / async logic ...
    return {"invoice_id": "inv_abc123"}

Decorator com classe (configurável)

Para projetos maiores, use um decorator baseado em classe para centralizar a configuração (api_key, url, service) e reaproveitar a instância. Registre uma vez na inicialização da aplicação e use `@audit.log()` em qualquer função.

python
import functools
import os
import time
import uuid
from datetime import datetime, timezone

import requests as req_lib


class ImmutableLogAudit:
    """Cliente configuravel para auditoria com ImmutableLog."""

    def __init__(
        self,
        api_key: str = "",
        api_url: str = "https://api.immutablelog.com",
        service: str = "python-service",
        env: str = "production",
    ):
        self.api_key = api_key or os.environ.get("IMTBL_API_KEY", "")
        self.api_url = api_url
        self.service = service
        self.env = env

    def log(
        self,
        event_name: str | None = None,
        kind: str = "info",
    ):
        """Decorator para instrumentar qualquer funcao (sync ou async)."""
        def decorator(func):
            is_async = asyncio.iscoroutinefunction(func)

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                return await self._run_async(func, args, kwargs, event_name, kind)

            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                return self._run_sync(func, args, kwargs, event_name, kind)

            return async_wrapper if is_async else sync_wrapper
        return decorator

    def _run_sync(self, func, args, kwargs, event_name, kind):
        started_at = time.time()
        exc_captured = None
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            exc_captured = exc
            raise
        finally:
            self._emit(func, started_at, exc_captured, event_name, kind)

    async def _run_async(self, func, args, kwargs, event_name, kind):
        started_at = time.time()
        exc_captured = None
        try:
            return await func(*args, **kwargs)
        except Exception as exc:
            exc_captured = exc
            raise
        finally:
            loop = asyncio.get_event_loop()
            loop.run_in_executor(
                None, lambda: self._emit(func, started_at, exc_captured, event_name, kind)
            )

    def _emit(self, func, started_at, exc, event_name, kind):
        try:
            name = event_name or f"fn.{func.__module__}.{func.__qualname__}"
            actual_kind = "error" if exc else kind
            latency_ms = int((time.time() - started_at) * 1000)
            request_id = str(uuid.uuid4())

            payload = {
                "id": request_id,
                "kind": actual_kind,
                "message": f"{name} failed: {type(exc).__name__}" if exc else f"{name} executed",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "metrics": {"latency_ms": latency_ms},
            }
            if exc:
                payload["error"] = {"exception": type(exc).__name__, "exception_message": str(exc)}

            event = {
                "payload": json.dumps(payload, ensure_ascii=False),
                "meta": {"type": actual_kind, "event_name": name, "service": self.service, "env": self.env},
            }
            req_lib.post(
                f"{self.api_url}/v1/events",
                json=event,
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                    "Idempotency-Key": f"{name}-{request_id}",
                    "Request-Id": request_id,
                },
                timeout=5,
            )
        except Exception:
            pass


# Inicializar uma vez na aplicacao / Initialize once in the application
audit = ImmutableLogAudit(
    api_key=os.environ["IMTBL_API_KEY"],
    service="payments-service",
    env="production",
)


# Usar em qualquer funcao sync ou async / Use on any sync or async function
@audit.log(event_name="payment.process", kind="success")
def process_payment(payment_id: str, amount: float) -> dict:
    return {"status": "approved"}


@audit.log(event_name="invoice.generate")
async def generate_invoice(order_id: str) -> dict:
    return {"invoice_id": "inv_abc123"}

Esta documentação reflete o comportamento atual da API. Para dúvidas ou integrações avançadas, entre em contato com o time de suporte.