Saltar a contenido

dorm.contrib.outbox_cdc

Publicadores acoplados a broker para OutboxRelay. Handlers drop-in que reenvían eventos outbox a Kafka, NATS, Redis Streams o un logger.

API

dorm.contrib.outbox_cdc.LoggingPublisher

Bases: _BasePublisher

Drop-in publisher that just writes the encoded row to a logger. Useful for end-to-end tests + local dev workflows that don't want to spin up a real broker.

The handler always returns True so the relay marks the row as published — the assumption is "if logging fails, you have bigger problems than a stuck outbox row".

dorm.contrib.outbox_cdc.KafkaPublisher

Bases: _BasePublisher

Sync Kafka publisher backed by :mod:kafka-python.

Parameters:

Name Type Description Default
bootstrap_servers Any

comma-separated host:port list or list of tuples — passed straight to KafkaProducer.

required
topic_resolver Callable[[Any], str] | None

optional row-to-topic callable. Defaults to using row.event_type as the topic.

None
producer_kwargs dict[str, Any] | None

extra kwargs forwarded to KafkaProducer (e.g. acks="all" for stronger durability).

None

close() -> None

Close the underlying producer's sockets. Call before process exit to flush any buffered records.

dorm.contrib.outbox_cdc.NatsPublisher

Bases: _BasePublisher

Async NATS publisher backed by :mod:nats-py.

Connection is lazy + reused across calls — the first __call__ establishes the connection. Use :meth:aclose from the saga's teardown to release the socket.

Designed to plug into OutboxRelay.arun().

dorm.contrib.outbox_cdc.RedisStreamPublisher

Bases: _BasePublisher

Sync Redis Streams publisher backed by :mod:redis.

Each event becomes one entry on a stream whose key is the resolved topic. Use a single-shard stream for strict ordering or stream-per-tenant for parallelism.

__init__(client: Any, *, topic_resolver: Callable[[Any], str] | None = None, max_len: int | None = None) -> None

client is a connected redis.Redis instance.

max_len (optional) caps the stream length using MAXLEN ~ — the approximate trim that Redis can do without scanning the whole stream. Pass it to enforce retention without an external cron.