
dorm.contrib.outbox

Transactional outbox pattern.

See Outbox pattern for recipes.

API

dorm.contrib.outbox.OutboxEvent

Bases: Model

Minimal abstract model for the outbox table.

Subclass with a concrete Meta.db_table and run makemigrations to materialise. Override the field definitions if you need different types — the OutboxRelay only depends on the column names (id, event_type, payload, status, created_at).
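
A concrete subclass might look like the following sketch (the Django-style `Meta` declaration and the table name are assumptions; only a concrete `db_table` is required by the description above):

```
from dorm.contrib.outbox import OutboxEvent

class BillingOutboxEvent(OutboxEvent):
    """Concrete outbox table for a hypothetical billing app."""

    class Meta:
        db_table = "billing_outbox"  # pick a concrete table, then run makemigrations
```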

dorm.contrib.outbox.OutboxRelay

Polling worker that drains an outbox table.

Each iteration:

  1. Opens a transaction.
  2. Selects up to batch_size pending rows with SELECT ... FOR UPDATE SKIP LOCKED. Other relay workers running against the same outbox naturally pick up disjoint rows, so horizontal scaling is free.
  3. Calls handler(row) for each. The handler returns a truthy value on success, falsy on failure. Any raised exception is treated as a failure.
  4. Successful rows are marked status='published' with published_at = now(). Failures bump attempts and store last_error for inspection.
  5. After max_attempts failures, the row is moved to status='dead' so it stops blocking the queue. Operators drain the dead-letter pile manually.
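
The five steps above can be sketched with the stdlib sqlite3 module. This is a simplified stand-in, not OutboxRelay itself: the schema is assumed from the column names in the class docstring, and the SKIP LOCKED clause is noted rather than used (SQLite has no equivalent).

```python
import sqlite3

MAX_ATTEMPTS = 3  # stand-in for the relay's max_attempts setting

def drain_once_sketch(conn, handler, batch_size=100):
    """One relay iteration against a plain SQLite outbox table."""
    published = 0
    with conn:  # step 1: the whole batch shares one transaction
        # Step 2: claim pending rows. On PostgreSQL this SELECT would add
        # FOR UPDATE SKIP LOCKED so concurrent relays pick disjoint rows.
        rows = conn.execute(
            "SELECT id, event_type, payload, attempts FROM outbox "
            "WHERE status = 'pending' ORDER BY id LIMIT ?",
            (batch_size,),
        ).fetchall()
        for row_id, event_type, payload, attempts in rows:
            try:
                ok = handler((row_id, event_type, payload))  # step 3
                error = None if ok else "handler returned falsy"
            except Exception as exc:  # a raised exception counts as failure
                ok, error = False, str(exc)
            if ok:  # step 4: success path
                conn.execute(
                    "UPDATE outbox SET status = 'published', "
                    "published_at = datetime('now') WHERE id = ?",
                    (row_id,),
                )
                published += 1
            else:  # failure path; step 5: dead-letter after MAX_ATTEMPTS
                attempts += 1
                status = "dead" if attempts >= MAX_ATTEMPTS else "pending"
                conn.execute(
                    "UPDATE outbox SET status = ?, attempts = ?, "
                    "last_error = ? WHERE id = ?",
                    (status, attempts, error, row_id),
                )
    return published
```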

The relay is a plain blocking loop. Run it in its own process (python -m worker) or under supervisor/systemd. Async users can call arun instead.

stop() -> None

Request the relay loop to exit after the current batch.

drain_once(handler: Callable[[Any], Any]) -> int

Run a single batch. Returns the number of successfully published events. Useful to drive the relay from a test or from an external scheduler instead of the blocking loop.

Concurrency model: SELECT ... FOR UPDATE SKIP LOCKED on PostgreSQL — multiple relays naturally pick disjoint rows. On backends without SKIP LOCKED support (SQLite, MySQL < 8.0) the helper falls back to a plain SELECT inside the same transaction; running multiple relay processes against those backends will sometimes process the same event twice — make the handler idempotent.
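
On those fallback backends a handler may therefore see the same event twice. One way to make it idempotent (a sketch, not part of the library) is to wrap it with a dedupe on the event id:

```python
def make_idempotent(handler, seen=None):
    """Wrap a handler so re-delivered events are acknowledged, not re-processed.

    `seen` stands in for a durable store (e.g. a unique-keyed table on the
    consumer side); an in-memory set only survives one process.
    """
    seen = set() if seen is None else seen

    def wrapper(row):
        event_id = row[0]  # rows start with the id column
        if event_id in seen:
            return True  # already handled: report success so the row is marked published
        result = handler(row)
        if result:
            seen.add(event_id)
        return result

    return wrapper
```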

run(handler: Callable[[Any], Any]) -> None

Block forever, calling handler for each pending row.

Installs a SIGTERM / SIGINT handler that calls self.stop so the loop exits cleanly between batches; partial work is already committed by atomic() and won't be lost. The handler returns True to mark a row published or False to retry it later.

adrain_once(handler: Callable[[Any], Any]) -> int async

Async equivalent of drain_once. Drains a single batch via the async ORM path; the handler is awaited when it is a coroutine function.

arun(handler: Callable[[Any], Any]) -> None async

Async equivalent of run. Loops via adrain_once, sleeping between empty batches.

Stop the loop by calling self.stop() from another task. There is no SIGTERM hook here because async stacks typically register their own; mirroring the sync helper would surprise FastAPI / Litestar / aiohttp users who have already wired a graceful-shutdown lifespan.
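
The stop-from-another-task pattern looks roughly like this. MiniRelay is a stand-in for OutboxRelay, reduced to the loop-and-stop mechanics:

```python
import asyncio

class MiniRelay:
    """Stand-in for OutboxRelay: loops until stop() is requested."""

    def __init__(self):
        self._stopping = False
        self.batches = 0

    def stop(self):
        self._stopping = True  # honoured between batches, like the real relay

    async def arun(self):
        while not self._stopping:
            self.batches += 1          # a real relay would drain one batch here
            await asyncio.sleep(0.01)  # sleep between (empty) batches

async def main():
    relay = MiniRelay()
    task = asyncio.create_task(relay.arun())
    await asyncio.sleep(0.05)
    relay.stop()   # e.g. from a shutdown hook / lifespan handler
    await task     # loop exits cleanly after the current batch
    return relay.batches

batches = asyncio.run(main())
```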

dorm.contrib.outbox.record_event(model: type[OutboxEvent], event_type: str, payload: dict[str, Any] | None = None, *, using: str | None = None) -> OutboxEvent

Insert an outbox row inside the active transaction.

Calling without an active atomic() works but defeats the pattern — the whole point is to share a transaction with the business write. The helper logs a warning in that case.
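
The invariant record_event protects can be sketched with raw sqlite3 (record_event and atomic() themselves are not used here, and the table names are illustrative): the business write and the outbox row share one transaction, so both land or neither does.

```python
import json
import sqlite3

def place_order(conn, customer, amount):
    """Business write plus outbox row in one transaction."""
    with conn:  # one transaction, like calling record_event inside atomic()
        conn.execute(
            "INSERT INTO orders (customer, amount) VALUES (?, ?)",
            (customer, amount),
        )
        conn.execute(
            "INSERT INTO outbox (event_type, payload, status) "
            "VALUES (?, ?, 'pending')",
            ("order.placed", json.dumps({"customer": customer, "amount": amount})),
        )
```

If either INSERT fails, the context manager rolls back both, which is exactly the dual-write guarantee the warning in the helper is there to protect.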

dorm.contrib.outbox.arecord_event(model: type[OutboxEvent], event_type: str, payload: dict[str, Any] | None = None, *, using: str | None = None) -> OutboxEvent async

Async counterpart of record_event.

Mirrors the sync version's tx-detection: logs a warning when called outside an active aatomic() block so the dual-write invariant isn't silently broken.

dorm.contrib.outbox.serialize_payload(payload: Any) -> str

Helper: deterministic JSON encoding for outbox payloads.

Sorted keys, UTF-8 output by default, and tuples encoded as lists make the output stable across Python versions, which matters when an idempotent consumer dedupes by hash.
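
An encoder of that shape can be approximated with the stdlib; the exact options serialize_payload uses are not documented here, so treat this as an assumption-laden sketch:

```python
import json

def serialize_payload_sketch(payload):
    """Stable JSON: sorted keys, compact separators, non-ASCII kept as UTF-8."""
    return json.dumps(
        payload,
        sort_keys=True,         # key order no longer depends on dict insertion order
        separators=(",", ":"),  # no whitespace variance between encoders
        ensure_ascii=False,     # emit UTF-8 instead of \uXXXX escapes
    )
```

Because tuples serialize as lists and keys are sorted, structurally equal payloads produce byte-identical strings, and therefore identical hashes on the consumer side.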