Cookbook¶
Practical recipes for common situations. Each one is a complete, copy-pastable snippet.
Idempotent get-or-create with a unique constraint¶
from dorm.exceptions import IntegrityError
def get_or_create_email(email: str, name: str) -> Author:
    obj, _ = Author.objects.get_or_create(
        email=email,
        defaults={"name": name},
    )
    return obj
get_or_create runs inside a transaction, so two concurrent callers
can't both insert. If the unique constraint isn't on the lookup keys,
the safer pattern is to catch IntegrityError and re-get:
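A minimal sketch of that fallback, assuming Author's unique constraint is on email while the lookup key is name:

def get_or_create_by_name(name: str, email: str) -> Author:
    try:
        obj, _ = Author.objects.get_or_create(
            name=name,
            defaults={"email": email},
        )
        return obj
    except IntegrityError:
        # A concurrent caller inserted the same email first and the unique
        # constraint fired; the row exists now, so fetch the winner.
        return Author.objects.get(email=email)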
Atomic counter increment¶
from dorm import F
# Race-free, single SQL UPDATE.
Post.objects.filter(pk=42).update(views=F("views") + 1)
Never post.views += 1; post.save() — that's read-modify-write and
loses concurrent increments.
Pagination¶
def paginate(qs, page: int, page_size: int = 20):
    start = (page - 1) * page_size
    return list(qs.order_by("id")[start:start + page_size])
Always pair a slice with order_by(...) — without explicit ordering
the same row can show up on two pages or none.
For large datasets, keyset pagination is far faster than OFFSET:
def page_after(qs, last_id: int, page_size: int = 20):
    return list(
        qs.filter(id__gt=last_id).order_by("id")[:page_size]
    )
Soft delete¶
There's a built-in mixin in dorm.contrib.softdelete:
from dorm.contrib.softdelete import SoftDeleteModel
import dorm
class Post(SoftDeleteModel):
    title = dorm.CharField(max_length=200)
# Three managers:
Post.objects # only live rows
Post.all_objects # everything
Post.deleted_objects # only soft-deleted
# Soft delete by default; pass hard=True for a real DELETE:
post.delete() # UPDATE … SET deleted_at = now()
post.delete(hard=True) # DELETE FROM …
post.restore() # clear deleted_at
# Async parity:
await post.adelete()
await post.arestore()
Caveats: on_delete=CASCADE does not cascade through soft
deletes (children remain visible to Post.objects). And database-
level UNIQUE constraints don't know about deleted_at — use a
partial index (UNIQUE … WHERE deleted_at IS NULL) at the schema
level if you need "unique among live rows only".
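For example, on PostgreSQL the partial index can be created with plain DDL through the raw connection used elsewhere in this cookbook (the table and column names here are illustrative):

from dorm.db.connection import get_connection

get_connection().execute(
    "CREATE UNIQUE INDEX post_title_live_uniq "
    "ON posts (title) WHERE deleted_at IS NULL"
)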
Audit log via signals¶
from dorm.signals import post_save, post_delete
def write_audit(sender, instance, created=None, **kwargs):
    # `created` defaults to None so the same receiver also works for
    # post_delete, which doesn't send that argument.
    action = "deleted" if created is None else ("created" if created else "updated")
    AuditLog.objects.create(
        model=sender.__name__,
        pk=instance.pk,
        action=action,
        payload=model_to_dict(instance),
    )
post_save.connect(write_audit, sender=Article)
post_delete.connect(write_audit, sender=Article)
For high-volume tables prefer the database-level WAL → Kafka pattern rather than Python signals — signals fire in-process and add latency.
Bulk insert with deduplication¶
existing = set(
    User.objects.filter(email__in=[u.email for u in batch])
    .values_list("email", flat=True)
)
new_users = [u for u in batch if u.email not in existing]
User.objects.bulk_create(new_users, batch_size=500)
Or, on PG only, push the dedup into the database:
from dorm.db.connection import get_connection
get_connection().execute(
    "INSERT INTO users (email, name) VALUES %s ON CONFLICT (email) DO NOTHING",
    rows,
)
select_for_update lock¶
from dorm import transaction

with transaction.atomic():
    account = (
        Account.objects.select_for_update()
        .get(pk=account_id)
    )
    account.balance -= amount
    account.save(update_fields=["balance"])
SELECT ... FOR UPDATE holds row locks until the transaction
commits — use it whenever you do read-then-write under contention.
Custom manager / queryset method¶
class PublishedQuerySet(QuerySet):
    def published(self):
        return self.filter(published=True, published_at__lte=Now())

class Article(dorm.Model):
    title = dorm.CharField(max_length=200)
    published = dorm.BooleanField(default=False)
    published_at = dorm.DateTimeField(null=True)

    objects = PublishedQuerySet.as_manager()

# Article.objects.published().filter(author=...)
The result is still a QuerySet, so you keep chaining.
Streaming a million rows for an export¶
import csv
with open("authors.csv", "w") as f:
w = csv.writer(f)
w.writerow(["id", "name", "email"])
for a in Author.objects.order_by("id").iterator(chunk_size=5000):
w.writerow([a.id, a.name, a.email])
iterator(chunk_size=N) opens a server-side cursor on PG and
arraysize-streams on SQLite, so memory stays flat.
Multi-tenant per-schema routing¶
class TenantRouter:
    def db_for_read(self, model, **hints):
        return _current_tenant_alias()

    def db_for_write(self, model, **hints):
        return _current_tenant_alias()
Set the tenant alias from a middleware (web) or context manager (workers) and route everything through it. Combine with one set of migrations applied per alias.
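A minimal sketch of that plumbing with a ContextVar; the ASGI middleware shape and the resolve_tenant helper are illustrative, while _current_tenant_alias is the function the router above calls:

from contextvars import ContextVar

_tenant_alias: ContextVar[str] = ContextVar("tenant_alias", default="default")

def _current_tenant_alias() -> str:
    return _tenant_alias.get()

class TenantMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # Derive the alias from the Host header, a JWT claim, etc.
        token = _tenant_alias.set(resolve_tenant(scope))
        try:
            await self.app(scope, receive, send)
        finally:
            _tenant_alias.reset(token)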
Per-tenant migration runner (PG, 3.2+)¶
PostgreSQL can serve many tenants from a single database by giving
each tenant its own schema. dorm.contrib.tenants ships helpers plus
CLI flags that bootstrap a schema and apply every migration against it:
from dorm.contrib.tenants import (
    register_tenant,
    migrate_tenant,
    migrate_all_tenants,
)

# At app startup — register every tenant the app knows about.
for name in ("acme", "globex", "initech"):
    register_tenant(name)

# One-off bootstrap of a new tenant:
migrate_tenant("newtenant")

# CI / deploy step — migrate every registered tenant:
results: dict[str, str] = migrate_all_tenants()
for tenant, status in results.items():
    print(tenant, status)
CLI equivalents:
The runner pins a single connection inside an atomic() block
before swapping search_path, so the DDL lands on the same
connection the schema switch ran on (the PG pool would otherwise
hand out a fresh connection pointing at public).
migrate_all_tenants() returns {tenant: "ok" | "error: ..."} so
a CI gate can fail loud on any partial failure without aborting the
whole batch mid-run. PG-only — ensure_schema /
migrate_tenant raise NotImplementedError on SQLite / MySQL.
Connection-pool autoscaling¶
import asyncio
from dorm.contrib.pool_autoscale import autoscale_pool, read_pool_stats
async def autoscale_loop() -> None:
    while True:
        autoscale_pool(
            target_utilization=0.7,
            min_floor=2,
            max_ceiling=20,
            step=2,
        )
        await asyncio.sleep(10)

# Inside FastAPI's startup hook:
@app.on_event("startup")
async def _start_autoscale() -> None:
    asyncio.create_task(autoscale_loop())
read_pool_stats() returns a PoolStats dataclass with
utilization, in_use, waiting, etc. for dashboards. Live pool
resizing is PG-only (psycopg-pool); on SQLite / MySQL the call is a
silent no-op.
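For example, a small internal endpoint that feeds a dashboard; the route is illustrative and the attribute names are the ones listed above:

from dorm.contrib.pool_autoscale import read_pool_stats

@app.get("/internal/pool")
async def pool_stats():
    stats = read_pool_stats()
    return {
        "utilization": stats.utilization,
        "in_use": stats.in_use,
        "waiting": stats.waiting,
    }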
Pre-open the connection pool at startup (3.3+)¶
Cold starts pay the connect-and-handshake cost on the first
request. warmup_pool opens up to target connections eagerly so
the first checkout is hot:
import asyncio
from dorm.contrib.pool_autoscale import warmup_pool
@app.on_event("startup")
async def _warmup():
    n = warmup_pool(target=4)
    print(f"pool warmed up: {n} connection(s) ready")
PostgreSQL only — the helper short-circuits on SQLite / MySQL
(returns 0).
Ad-hoc DDL via SchemaEditor (3.3+)¶
For tooling that needs imperative DDL outside a migration file (REPL exploration, fixture loaders, drift-repair scripts):
import dorm
from dorm.db.connection import get_connection
from dorm.migrations.schema import SchemaEditor
with SchemaEditor(get_connection()) as se:
    se.create_model(Article)
    se.add_field(Article, "summary", dorm.TextField(null=True))
    se.alter_field(Article, "title", dorm.CharField(max_length=400))
The editor delegates to the same migration ops the executor uses,
so the imperative path produces identical SQL to a migration file
running the same operation. Migrations remain the canonical way
to evolve a schema (file-based history, recorder, dry-run, lint);
SchemaEditor exists for the imperative cases that don't fit a
migration cleanly.
Cascading soft delete (3.3+)¶
SoftDeleteModel.delete(cascade=True) walks every reverse-FK
relation whose source model is also a SoftDeleteModel and soft-
deletes each child:
import dorm
from dorm.contrib.softdelete import SoftDeleteModel

class Article(SoftDeleteModel):
    title: str = dorm.CharField(max_length=200)

class Comment(SoftDeleteModel):
    article = dorm.ForeignKey(Article, on_delete=dorm.CASCADE, related_name="comments")
    body: str = dorm.TextField()

art = Article.objects.first()
art.delete(cascade=True)  # soft-deletes art + its Comments
Children whose source model isn't a SoftDeleteModel are skipped
— a hard cascade would contradict the "soft" promise. Wire it
yourself when you need a mixed cascade.
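A sketch of such a mixed cascade, assuming a hypothetical Attachment child that is a plain dorm.Model you want hard-deleted alongside the soft-deleted Article:

from dorm import transaction

def delete_article(article: Article) -> None:
    with transaction.atomic():
        # Hard-delete the non-soft children first, then soft-cascade the rest.
        Attachment.objects.filter(article=article).delete()
        article.delete(cascade=True)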
Async migration executor¶
For boot-time migrations in async stacks (FastAPI startup, Lambda handler, edge runtime) where the sync executor would block the event loop:
from dorm.db.connection import get_connection
from dorm.migrations.aexecutor import AsyncMigrationExecutor
async def run_startup_migrations() -> None:
    executor = AsyncMigrationExecutor(get_connection())
    await executor.amigrate("blog", "blog/migrations")
    await executor.amigrate("users", "users/migrations")
The wrapper offloads each call to asyncio.to_thread so
correctness (advisory lock, dry-run capture, recorder semantics)
matches the sync path one-to-one. Use arollback,
amigrate_to, and ashow_migrations for the same async
treatment of the other executor methods.
Read-after-write with a replica¶
# Routing ensures reads go to the replica — but right after a write,
# the replica may not have the row yet. Read from the primary explicitly:
new = Post.objects.using("default").get(pk=new_pk)
Always read your own writes from the primary; rely on the replica for everything older than a few seconds.
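A minimal sketch of the routing half, reusing the router shape from the multi-tenant recipe; the "replica" alias name is an assumption, while "default" is the primary alias used above:

class PrimaryReplicaRouter:
    def db_for_read(self, model, **hints):
        return "replica"

    def db_for_write(self, model, **hints):
        return "default"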
Testing fixtures¶
import pytest, dorm
from dorm.db.connection import close_all
from dorm.test import transactional_db, atransactional_db # noqa: F401
@pytest.fixture(scope="session", autouse=True)
def configure_dorm():
    dorm.configure(DATABASES={"default": {"ENGINE": "sqlite", "NAME": ":memory:"}})
    yield
    close_all()

@pytest.fixture
def author():
    return Author.objects.create(name="Alice", age=30)

def test_something(transactional_db, author):
    Author.objects.create(name="Mallory", age=99)
    assert Author.objects.count() == 2
    # Both rows are rolled back automatically when the test exits.
transactional_db (sync) and atransactional_db (pytest-asyncio)
wrap each test in an atomic() block that rolls back on exit, so
you avoid the DROP TABLE / CREATE TABLE churn between tests. For
unittest-style suites use dorm.test.DormTestCase as a mixin:
import unittest
from dorm.test import DormTestCase
class AuthorTests(DormTestCase, unittest.TestCase):
    def test_create(self):
        Author.objects.create(name="Alice", age=30)
        # rolled back at tearDown
For PostgreSQL integration tests, use testcontainers to spin up a
throwaway Postgres per session.
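A sketch of such a session fixture; the PostgresContainer keyword names follow recent testcontainers releases, and the postgres DATABASES keys shown are assumptions, so check the exact spelling your dorm version expects:

import pytest
import dorm
from dorm.db.connection import close_all
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="session", autouse=True)
def configure_dorm_pg():
    with PostgresContainer("postgres:16", username="dorm", password="dorm", dbname="dorm") as pg:
        dorm.configure(DATABASES={"default": {
            "ENGINE": "postgresql",
            "NAME": "dorm",
            "USER": "dorm",
            "PASSWORD": "dorm",
            "HOST": pg.get_container_host_ip(),
            "PORT": int(pg.get_exposed_port(5432)),
        }})
        yield
        close_all()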
Asserting query counts (assertNumQueries, assertMaxQueries) (3.0+)¶
Pin the exact number of queries a code path issues — or the upper
bound — to defend against N+1 regressions. Both the context-manager
and decorator forms work on sync and async def tests:
from dorm.test import (
    assertNumQueries,
    assertMaxQueries,
    assertNumQueriesFactory,
    assertMaxQueriesFactory,
)

def test_list_view(transactional_db):
    with assertNumQueries(2) as ctx:
        list(Article.objects.select_related("author")[:10])
    assert ctx.count == 2  # also exposed for richer assertions

@assertMaxQueriesFactory(5)
def test_dashboard(transactional_db):
    # No N+1: dashboard must issue ≤ 5 queries.
    render_dashboard()

@assertNumQueriesFactory(1)
async def test_async_one_query():
    await Article.objects.acount()
Per-task isolation via ContextVar — concurrent
asyncio.gather tasks see independent counters, so two tests
running in parallel under pytest-xdist don't bleed into each
other's assertions.
override_settings + setUpTestData (3.1+)¶
dorm.test.override_settings is both a context manager and a
decorator (sync + async aware). It reverts every mutated key on exit,
deleting any key that didn't exist before the override:
from dorm.test import override_settings
with override_settings(SLOW_QUERY_MS=10):
    run_some_queries()

@override_settings(USE_TZ=True, TIME_ZONE="Europe/Madrid")
def test_tz_aware():
    ...

@override_settings(SLOW_QUERY_MS=42)
async def test_async_path():
    ...
setUpTestData is a class decorator factory that runs a
cls -> dict callable once at class construction and attaches
every entry as a class attribute — Django parity, without
inheriting from unittest.TestCase:
from dorm.test import setUpTestData
def _data(cls):
    return {
        "alice": Author.objects.create(name="Alice", age=30),
        "bob": Author.objects.create(name="Bob", age=40),
    }

@setUpTestData(_data)
class TestAuthor:
    def test_alice_age(self):
        assert self.alice.age == 30
Tests must roll back any mutation they make to the shared rows (or rebuild via a per-test fixture) — same contract Django enforces.
Switching FileField between local disk and S3 with no code changes¶
FileField reads the storage backend from settings.STORAGES at
every access, so swapping local disk → MinIO → AWS S3 is a config
change, not a code change.
class Document(dorm.Model):
    name = dorm.CharField(max_length=100)
    attachment = dorm.FileField(upload_to="docs/%Y/%m/", null=True, blank=True)
Local disk — the default if STORAGES is unset, useful for
tests / SQLite / single-machine dev:
STORAGES = {
    "default": {
        "BACKEND": "dorm.storage.FileSystemStorage",
        "OPTIONS": {"location": "/var/app/media", "base_url": "/media/"},
    }
}
MinIO — for local-dev parity with production S3 without an AWS
account. Run docker run -d -p 9000:9000 minio/minio server /data
once and:
STORAGES = {
    "default": {
        "BACKEND": "dorm.contrib.storage.s3.S3Storage",
        "OPTIONS": {
            "bucket_name": "dev-uploads",
            "endpoint_url": "http://localhost:9000",
            "access_key": "minioadmin",
            "secret_key": "minioadmin",
            "region_name": "us-east-1",
            "signature_version": "s3v4",
            "addressing_style": "path",
        },
    }
}
AWS S3 in production — boto3 picks up credentials from the
ambient chain (IAM role on EC2/ECS/Lambda, env vars, ~/.aws/):
STORAGES = {
    "default": {
        "BACKEND": "dorm.contrib.storage.s3.S3Storage",
        "OPTIONS": {
            "bucket_name": "myapp-prod-uploads",
            "region_name": "eu-west-1",
            "default_acl": "private",
            "querystring_auth": True,
            "querystring_expire": 3600,
        },
    }
}
The application code — doc.attachment = ContentFile(...),
doc.save(), doc.attachment.url, doc.attachment.delete() — is
identical in all three configurations. Drive the choice from the
environment so the same image deploys everywhere:
# settings.py
import os
if os.environ.get("DORM_STORAGE") == "s3":
STORAGES = {
"default": {
"BACKEND": "dorm.contrib.storage.s3.S3Storage",
"OPTIONS": {
"bucket_name": os.environ["S3_BUCKET"],
"region_name": os.environ.get("AWS_REGION", "us-east-1"),
"default_acl": "private",
},
}
}
else:
STORAGES = {
"default": {
"BACKEND": "dorm.storage.FileSystemStorage",
"OPTIONS": {"location": os.environ.get("MEDIA_ROOT", "media")},
}
}
Common gotcha: when switching to S3, don't expect existing rows to
migrate silently — the names are stored verbatim, so a row pointing
at docs/2026/04/q1.pdf on local disk will look up the same key on
S3. Either back-fill the bucket from the local volume before the
cutover, or write a one-off RunPython migration that re-uploads
each file through the new storage.
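A sketch of the re-upload pass such a migration would run; it only relies on the FileField usage shown above, assumes the old files are still readable on the local media root, and leaves the ContentFile import and the RunPython wrapper to your migration layout (whether ContentFile accepts a name keyword is also an assumption):

import os

def reupload_documents(old_media_root: str = "/var/app/media") -> None:
    # Re-save each attachment through the new (S3) storage, keeping the
    # stored name identical so existing rows keep resolving.
    for doc in Document.objects.exclude(attachment=None).iterator(chunk_size=500):
        local_path = os.path.join(old_media_root, doc.attachment.name)
        with open(local_path, "rb") as fh:
            doc.attachment = ContentFile(fh.read(), name=doc.attachment.name)
            doc.save(update_fields=["attachment"])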
OpenTelemetry instrumentation¶
dorm.contrib.otel hooks the pre_query / post_query signals so
every ORM query becomes an OTel span without per-call-site changes.
Setup¶
Install the OTel API (and an SDK if you want to ship spans anywhere):
pip install opentelemetry-api opentelemetry-sdk
# Optional exporters:
pip install opentelemetry-exporter-otlp # → OTLP collector / Jaeger / Honeycomb
pip install opentelemetry-exporter-jaeger # → Jaeger directly
In your app startup:
from dorm.contrib.otel import instrument
instrument() # default tracer name "dorm"
# instrument(tracer_name="myapp.dorm") # custom tracer
That's it — every query the ORM runs now produces a span.
Span attributes¶
Each span carries:
| Attribute | Value |
|---|---|
| span name | db.<vendor> (db.postgresql / db.sqlite) |
| db.system | "postgresql" or "sqlite" |
| db.statement | the SQL text, truncated to 1024 chars |
| db.dorm.elapsed_ms | wall-clock duration (set on post_query, before end()) |
| db.dorm.error | exception class name (only on failure) |
| span status | ERROR on failure, default UNSET on success |
The 1KB statement truncation keeps massive bulk_create SQL out of
your traces; if you need the full SQL, log it separately via
dorm.queries.
Wiring an exporter (Jaeger via OTLP)¶
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
provider = TracerProvider(resource=Resource({SERVICE_NAME: "myapp"}))
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4317"))
)
trace.set_tracer_provider(provider)
# Now wire dorm into it:
from dorm.contrib.otel import instrument
instrument()
instrument() is idempotent — calling it twice replaces the
previous wiring instead of producing double spans, so it's safe to
call again on hot reload.
Disconnecting¶
from dorm.contrib.otel import uninstrument
uninstrument() # detaches the receivers; future queries are not traced
Useful in test teardown or in libraries that want to opt out for a specific code path.
Caveats¶
- Optional dependency. instrument() raises a clear ImportError if opentelemetry-api isn't installed. Production deploys without the package keep working — just no spans.
- Spans are produced synchronously on the calling thread. No dedicated background pump. The BatchSpanProcessor (typical SDK config) absorbs the export latency, so the caller sees nanosecond overhead per span.
- Sensitive params land in db.statement. The SQL is the literal query text (%s placeholders, params separate). The OTel attribute does NOT include the bound parameters — those go through dorm.db.utils._mask_params in DEBUG logs only. If you also want the params on the span, write a custom receiver instead of calling instrument() (sketched below).
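A sketch of such a receiver; the payload keyword names used here (sql, params) are assumptions, so adapt them to whatever post_query actually sends in your dorm version:

from opentelemetry import trace
from dorm.signals import post_query

tracer = trace.get_tracer("myapp.dorm")

def span_with_params(sender, **kwargs):
    # Assumed payload keys; adjust to the real post_query arguments.
    sql = str(kwargs.get("sql", ""))[:1024]
    params = kwargs.get("params")
    with tracer.start_as_current_span("db.query") as span:
        span.set_attribute("db.statement", sql)
        if params is not None:
            span.set_attribute("db.statement.params", repr(params))

post_query.connect(span_with_params)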
PostgreSQL pub/sub via LISTEN / NOTIFY¶
PostgreSQL's NOTIFY / LISTEN give you pub-sub on the database
itself — no Redis required for fan-out workloads at modest scale.
The dorm async wrapper exposes both ends.
Publishing¶
from dorm import transaction
from dorm.db.connection import get_async_connection
conn = get_async_connection()
async def create_order(payload):
    async with transaction.aatomic():
        order = await Order.objects.acreate(**payload)
        await conn.notify("orders.created", str(order.pk))
        # NOTIFY is delivered AFTER COMMIT — subscribers never see
        # work that ends up rolled back.
channel is validated as a SQL identifier ([A-Za-z_][A-Za-z0-9_]*,
≤ 63 chars), so a channel name from user input can't smuggle SQL.
The payload is bound as a parameter, so no escaping needed.
Payload limits. PostgreSQL's NOTIFY payload is capped at
8000 bytes by default (compile-time NOTIFY_PAYLOAD_MAX). Don't
serialise the whole row — pass a pk and let the listener fetch.
Subscribing¶
async def consume_orders():
    conn = get_async_connection()
    async for msg in conn.listen("orders.created"):
        order = await Order.objects.aget(pk=int(msg.payload))
        await dispatch(order)
msg is a psycopg.Notify with channel, payload, and pid
attributes. The async iterator never returns by itself — break out
when you want to stop:
async for msg in conn.listen("orders.created"):
    if shutdown_event.is_set():
        break
    await dispatch(msg)
Connection ownership¶
listen() opens its own dedicated psycopg connection (LISTEN
registers a session-scoped notification handler — the connection
must outlive the subscription). It does NOT pull from the pool, so
a long-lived listener doesn't tie up a pool slot.
When you break out of the iterator, the connection is closed in a
finally block. If your worker dies hard, the listener connection
gets reaped by PG's idle-connection timeout.
Reconnection / retries¶
The current listen() does not auto-reconnect on connection loss.
Wrap it in your own retry loop if you need that:
import asyncio
import logging

import psycopg

logger = logging.getLogger(__name__)

async def reliable_listener(channel: str, handler):
    while True:
        try:
            async for msg in get_async_connection().listen(channel):
                await handler(msg)
        except (psycopg.OperationalError, psycopg.InterfaceError) as exc:
            logger.warning("listener disconnected: %s — reconnecting", exc)
            await asyncio.sleep(1.0)
Keep in mind: notifications fired while disconnected are lost. PG doesn't queue them. For at-least-once semantics, pair NOTIFY with an outbox table you poll on reconnect.
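A sketch of that pairing; the Outbox model and the handler plumbing are illustrative:

class Outbox(dorm.Model):
    channel = dorm.CharField(max_length=63)
    payload = dorm.TextField()
    processed = dorm.BooleanField(default=False)

async def publish(channel: str, payload: str) -> None:
    async with transaction.aatomic():
        await Outbox.objects.acreate(channel=channel, payload=payload)
        await conn.notify(channel, payload)

async def drain_outbox(channel: str, handler) -> None:
    # Call on every (re)connect to replay anything missed while disconnected.
    while True:
        row = await (
            Outbox.objects.filter(channel=channel, processed=False)
            .order_by("id")
            .afirst()
        )
        if row is None:
            break
        await handler(row.payload)
        await Outbox.objects.filter(pk=row.pk).aupdate(processed=True)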
Common patterns¶
Cache invalidation across replicas:
async with transaction.aatomic():
    await User.objects.filter(pk=user_id).aupdate(**changes)
    await conn.notify("user.invalidate", str(user_id))

# Every node listening drops its local cache for that user.
Worker queue (one-of-N consumers):
NOTIFY broadcasts to all listeners. For exclusive-consumer
patterns, use a row-based queue with select_for_update(skip_locked=True)
and use NOTIFY just as a wakeup signal:
# Producer
async with transaction.aatomic():
    job = await Job.objects.acreate(payload=payload, status="pending")
    await conn.notify("jobs.wakeup", "")

# Consumer
async for _ in conn.listen("jobs.wakeup"):
    while True:
        async with transaction.aatomic():
            job = await (
                Job.objects.filter(status="pending")
                .select_for_update(skip_locked=True)
                .afirst()
            )
            if job is None:
                break
            await job.run()
            await job.adelete()