dorm.contrib.outbox¶
Transactional outbox pattern.
See Outbox pattern for recipes.
API¶
dorm.contrib.outbox.OutboxEvent
¶
Bases: Model
Minimal abstract model for the outbox table.
Subclass with a concrete Meta.db_table and run makemigrations
to materialise. Override the field definitions if you need different
types — the OutboxRelay only depends on the column names
(id, event_type, payload, status, created_at).
dorm.contrib.outbox.OutboxRelay
¶
Polling worker that drains an outbox table.
Each iteration:
- Opens a transaction.
SELECT ... FOR UPDATE SKIP LOCKEDof up tobatch_sizepending rows. Other relay workers running against the same outbox naturally pick up disjoint rows — horizontal scaling is free.- Calls handler(row) for each. The handler returns a truthy value on success, falsy on failure. Any raised exception is treated as a failure.
- Successful rows are marked
status='published'withpublished_at = now(). Failures bumpattemptsand storelast_errorfor inspection. - After
max_attemptsfailures, the row is moved tostatus='dead'so it stops blocking the queue. Operators drain the dead-letter pile manually.
The relay is a plain blocking loop. Run it in its own process
(python -m worker) or under supervisor/systemd. Async users
can call :meth:arun instead.
stop() -> None
¶
Request the relay loop to exit after the current batch.
drain_once(handler: Callable[[Any], Any]) -> int
¶
Run a single batch. Returns the number of successfully published events. Useful to drive the relay from a test or from an external scheduler instead of the blocking loop.
Concurrency model: SELECT ... FOR UPDATE SKIP LOCKED on
PostgreSQL — multiple relays naturally pick disjoint rows.
On backends without SKIP LOCKED support (SQLite, MySQL <
8.0) the helper falls back to a plain SELECT inside the same
transaction; running multiple relay processes against those
backends will sometimes process the same event twice — make
the handler idempotent.
run(handler: Callable[[Any], Any]) -> None
¶
Block forever, calling handler for each pending row.
Installs a SIGTERM / SIGINT handler that calls self.stop
so the loop exits cleanly between batches; partial work is
already committed by atomic() and won't be lost. The
handler returns True to mark a row published or False
to retry it later.
adrain_once(handler: Callable[[Any], Any]) -> int
async
¶
Async equivalent of :meth:drain_once. Drains a single
batch via the async ORM path; await the handler when it is
a coroutine function.
arun(handler: Callable[[Any], Any]) -> None
async
¶
Async equivalent of :meth:run. Loops via
:meth:adrain_once, sleeping between empty batches.
Stop with self.stop() from another task — there is no
SIGTERM hook here because async stacks typically register
their own; mirroring the sync helper would surprise FastAPI
/ Litestar / aiohttp users that already wired a graceful-
shutdown lifespan.
dorm.contrib.outbox.record_event(model: type[OutboxEvent], event_type: str, payload: dict[str, Any] | None = None, *, using: str | None = None) -> OutboxEvent
¶
Insert an outbox row inside the active transaction.
Calling without an active atomic() works but defeats the
pattern — the whole point is to share a transaction with the
business write. The helper logs a warning in that case.
dorm.contrib.outbox.arecord_event(model: type[OutboxEvent], event_type: str, payload: dict[str, Any] | None = None, *, using: str | None = None) -> OutboxEvent
async
¶
Async counterpart of :func:record_event.
Mirrors the sync version's tx-detection: logs a warning when
called outside an active aatomic() block so the dual-write
invariant isn't silently broken.
dorm.contrib.outbox.serialize_payload(payload: Any) -> str
¶
Helper: deterministic JSON encoding for outbox payloads.
Sort keys + UTF-8 default + tuple-as-list make the output stable across Python versions, which matters when an idempotent consumer dedupes by hash.