Cookbook

Practical recipes for common situations. Each one is a complete, copy-pastable snippet.

Idempotent get-or-create with a unique constraint

from dorm.exceptions import IntegrityError

def get_or_create_email(email: str, name: str) -> Author:
    obj, _ = Author.objects.get_or_create(
        email=email,
        defaults={"name": name},
    )
    return obj

get_or_create runs inside a transaction, so two concurrent callers can't both insert. If the unique constraint isn't on the lookup keys, the safer pattern is to catch IntegrityError and re-get, as sketched below.
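
A minimal sketch of that catch-and-re-get fallback (here the uniqueness is assumed to be on email, so the re-get uses it):

def create_or_fetch(email: str, name: str) -> Author:
    try:
        return Author.objects.create(email=email, name=name)
    except IntegrityError:
        # A concurrent caller won the INSERT race; fetch the row they created.
        return Author.objects.get(email=email)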

Atomic counter increment

from dorm import F

# Race-free, single SQL UPDATE.
Post.objects.filter(pk=42).update(views=F("views") + 1)

Never post.views += 1; post.save() — that's read-modify-write and loses concurrent increments.

Pagination

def paginate(qs, page: int, page_size: int = 20):
    start = (page - 1) * page_size
    return list(qs.order_by("id")[start:start + page_size])

Always pair a slice with order_by(...) — without explicit ordering the same row can show up on two pages or none.

For large datasets, keyset pagination is far faster than OFFSET:

def page_after(qs, last_id: int, page_size: int = 20):
    return list(
        qs.filter(id__gt=last_id).order_by("id")[:page_size]
    )

Soft delete

There's a built-in mixin in dorm.contrib.softdelete:

from dorm.contrib.softdelete import SoftDeleteModel
import dorm

class Post(SoftDeleteModel):
    title = dorm.CharField(max_length=200)

# Three managers:
Post.objects                          # only live rows
Post.all_objects                      # everything
Post.deleted_objects                  # only soft-deleted

# Soft delete by default; pass hard=True for a real DELETE:
post.delete()                         # UPDATE … SET deleted_at = now()
post.delete(hard=True)                # DELETE FROM …
post.restore()                        # clear deleted_at

# Async parity:
await post.adelete()
await post.arestore()

Caveats: on_delete=CASCADE does not cascade through soft deletes (children remain visible to Post.objects). And database-level UNIQUE constraints don't know about deleted_at — use a partial index (UNIQUE … WHERE deleted_at IS NULL) at the schema level if you need "unique among live rows only".
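
One way to create that partial index is ad-hoc DDL through the connection (PostgreSQL syntax; the table and column names here are examples, and a migration is usually the better home for the statement):

from dorm.db.connection import get_connection

# "Unique among live rows only": the index ignores soft-deleted rows.
get_connection().execute(
    "CREATE UNIQUE INDEX IF NOT EXISTS post_title_live_uniq "
    "ON post (title) WHERE deleted_at IS NULL"
)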

Audit log via signals

from dorm.signals import post_save, post_delete

def write_audit(sender, instance, created=None, **kwargs):
    # post_delete typically doesn't send `created`; default it and derive the action.
    if created is None:
        action = "deleted"
    else:
        action = "created" if created else "updated"
    AuditLog.objects.create(
        model=sender.__name__,
        pk=instance.pk,
        action=action,
        payload=model_to_dict(instance),  # model_to_dict: your own serialisation helper
    )

post_save.connect(write_audit, sender=Article)
post_delete.connect(write_audit, sender=Article)

For high-volume tables prefer the database-level WAL → Kafka pattern rather than Python signals — signals fire in-process and add latency.

Bulk insert with deduplication

existing = set(
    User.objects.filter(email__in=[u.email for u in batch])
                .values_list("email", flat=True)
)
new_users = [u for u in batch if u.email not in existing]
User.objects.bulk_create(new_users, batch_size=500)

Or, on PG only, push the dedup into the database:

from dorm.db.connection import get_connection
get_connection().execute(
    "INSERT INTO users (email, name) VALUES %s ON CONFLICT (email) DO NOTHING",
    rows,
)

select_for_update lock

from dorm import transaction

with transaction.atomic():
    account = (
        Account.objects.select_for_update()
                       .get(pk=account_id)
    )
    account.balance -= amount
    account.save(update_fields=["balance"])

SELECT ... FOR UPDATE holds row locks until the transaction commits — use it whenever you do read-then-write under contention.

Custom manager / queryset method

import dorm
from dorm import QuerySet, Now  # import paths assumed; adjust to your dorm version

class PublishedQuerySet(QuerySet):
    def published(self):
        return self.filter(published=True, published_at__lte=Now())

class Article(dorm.Model):
    title = dorm.CharField(max_length=200)
    published = dorm.BooleanField(default=False)
    published_at = dorm.DateTimeField(null=True)

    objects = PublishedQuerySet.as_manager()

# Article.objects.published().filter(author=...)

The result is still a QuerySet, so you keep chaining.

Streaming a million rows for an export

import csv

with open("authors.csv", "w") as f:
    w = csv.writer(f)
    w.writerow(["id", "name", "email"])
    for a in Author.objects.order_by("id").iterator(chunk_size=5000):
        w.writerow([a.id, a.name, a.email])

iterator(chunk_size=N) opens a server-side cursor on PG and arraysize-streams on SQLite, so memory stays flat.

Multi-tenant per-schema routing

class TenantRouter:
    def db_for_read(self, model, **hints):
        return _current_tenant_alias()
    def db_for_write(self, model, **hints):
        return _current_tenant_alias()

Set the tenant alias from a middleware (web) or context manager (workers) and route everything through it. Combine with one set of migrations applied per alias.
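
A minimal sketch of the alias plumbing, assuming a ContextVar carries the current tenant (the helper names here are illustrative):

import contextvars
from contextlib import contextmanager

_tenant_alias = contextvars.ContextVar("tenant_alias", default="default")

def _current_tenant_alias() -> str:
    return _tenant_alias.get()

@contextmanager
def use_tenant(alias: str):
    # Workers: `with use_tenant("acme"): ...`; web middleware would set/reset
    # the same ContextVar around each request.
    token = _tenant_alias.set(alias)
    try:
        yield
    finally:
        _tenant_alias.reset(token)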

Per-tenant migration runner (PG, 3.2+)

PostgreSQL lets many tenants share one database via schemas. dorm.contrib.tenants ships helpers + CLI flags that bootstrap a schema and apply every migration against it:

from dorm.contrib.tenants import (
    register_tenant,
    migrate_tenant,
    migrate_all_tenants,
)

# At app startup — register every tenant the app knows about.
for name in ("acme", "globex", "initech"):
    register_tenant(name)

# One-off bootstrap of a new tenant:
migrate_tenant("newtenant")

# CI / deploy step — migrate every registered tenant:
results: dict[str, str] = migrate_all_tenants()
for tenant, status in results.items():
    print(tenant, status)

CLI equivalents:

dorm migrate --tenant acme
dorm migrate --all-tenants

The runner pins a single connection inside an atomic() block before swapping search_path, so the DDL lands on the same connection the schema switch ran on (the PG pool would otherwise hand out a fresh connection pointing at public).

migrate_all_tenants() returns {tenant: "ok" | "error: ..."} so a CI gate can fail loud on any partial failure without aborting the whole batch mid-run. PG-only — ensure_schema / migrate_tenant raise NotImplementedError on SQLite / MySQL.
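
A sketch of that CI gate over the returned mapping:

results = migrate_all_tenants()
failed = {tenant: status for tenant, status in results.items() if status != "ok"}
if failed:
    raise SystemExit(f"tenant migrations failed: {failed}")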

Connection-pool autoscaling

import asyncio
from dorm.contrib.pool_autoscale import autoscale_pool, read_pool_stats


async def autoscale_loop() -> None:
    while True:
        autoscale_pool(
            target_utilization=0.7,
            min_floor=2,
            max_ceiling=20,
            step=2,
        )
        await asyncio.sleep(10)


# Inside FastAPI's startup hook:
@app.on_event("startup")
async def _start_autoscale() -> None:
    asyncio.create_task(autoscale_loop())

read_pool_stats() returns a PoolStats dataclass with utilization, in_use, waiting, etc. for dashboards. Live resizing is PG-only (via psycopg-pool); on SQLite / MySQL the call is a silent no-op.
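
For example, a debug endpoint that surfaces those stats for a dashboard scrape; the route and response shape here are illustrative, and the attribute names follow the fields listed above:

from dorm.contrib.pool_autoscale import read_pool_stats

@app.get("/debug/pool")
async def pool_stats():
    stats = read_pool_stats()
    return {
        "utilization": stats.utilization,
        "in_use": stats.in_use,
        "waiting": stats.waiting,
    }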

Pre-open the connection pool at startup (3.3+)

Cold starts pay the connect-and-handshake cost on the first request. warmup_pool opens up to target connections eagerly so the first checkout is hot:

import asyncio
from dorm.contrib.pool_autoscale import warmup_pool


@app.on_event("startup")
async def _warmup():
    n = warmup_pool(target=4)
    print(f"pool warmed up: {n} connection(s) ready")

PostgreSQL only — the helper short-circuits on SQLite / MySQL (returns 0).

Ad-hoc DDL via SchemaEditor (3.3+)

For tooling that needs imperative DDL outside a migration file (REPL exploration, fixture loaders, drift-repair scripts):

import dorm
from dorm.db.connection import get_connection
from dorm.migrations.schema import SchemaEditor


with SchemaEditor(get_connection()) as se:
    se.create_model(Article)
    se.add_field(Article, "summary", dorm.TextField(null=True))
    se.alter_field(Article, "title", dorm.CharField(max_length=400))

The editor delegates to the same migration ops the executor uses, so the imperative path produces identical SQL to a migration file running the same operation. Migrations remain the canonical way to evolve a schema (file-based history, recorder, dry-run, lint); SchemaEditor exists for the imperative cases that don't fit a migration cleanly.

Cascading soft delete (3.3+)

SoftDeleteModel.delete(cascade=True) walks every reverse-FK relation whose source model is also a SoftDeleteModel and soft-deletes each child:

from dorm.contrib.softdelete import SoftDeleteModel


class Article(SoftDeleteModel):
    title: str = dorm.CharField(max_length=200)


class Comment(SoftDeleteModel):
    article = dorm.ForeignKey(Article, on_delete=dorm.CASCADE, related_name="comments")
    body: str = dorm.TextField()


art = Article.objects.first()
art.delete(cascade=True)        # soft-deletes art + its Comments

Children whose source model isn't a SoftDeleteModel are skipped — a hard cascade would contradict the "soft" promise. Wire it yourself when you need a mixed cascade.
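
A sketch of wiring a mixed cascade by hand, where Attachment is a hypothetical plain (non-soft-delete) model that should be hard-deleted alongside its soft-deleted Article:

from dorm import transaction

def delete_article(article: Article) -> None:
    with transaction.atomic():
        Attachment.objects.filter(article=article).delete()  # hard delete the plain children
        article.delete(cascade=True)                          # soft-delete article + soft children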

Async migration executor

For boot-time migrations in async stacks (FastAPI startup, Lambda handler, edge runtime) where the sync executor would block the event loop:

from dorm.db.connection import get_connection
from dorm.migrations.aexecutor import AsyncMigrationExecutor


async def run_startup_migrations() -> None:
    executor = AsyncMigrationExecutor(get_connection())
    await executor.amigrate("blog", "blog/migrations")
    await executor.amigrate("users", "users/migrations")

The wrapper offloads each call to asyncio.to_thread so correctness (advisory lock, dry-run capture, recorder semantics) matches the sync path one-to-one. Use arollback, amigrate_to, and ashow_migrations for the same async treatment of the other executor methods.

Read-after-write with a replica

# Routing ensures reads go to the replica — but right after a write,
# the replica may not have the row yet. Read from the primary explicitly:
new = Post.objects.using("default").get(pk=new_pk)

Always read your own writes from the primary; rely on the replica for everything older than a few seconds.
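
To make reading your own writes systematic, one option is to pin reads to the primary for a short window after each write. A sketch, assuming "default" is the primary alias and "replica" the read alias:

import time
import contextvars

_last_write_at = contextvars.ContextVar("last_write_at", default=0.0)
PIN_SECONDS = 5.0

def mark_write() -> None:
    _last_write_at.set(time.monotonic())

def read_alias() -> str:
    recently_wrote = time.monotonic() - _last_write_at.get() < PIN_SECONDS
    return "default" if recently_wrote else "replica"

# Post.objects.using(read_alias()).get(pk=new_pk)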

Testing fixtures

import pytest, dorm
from dorm.db.connection import close_all
from dorm.test import transactional_db, atransactional_db  # noqa: F401

@pytest.fixture(scope="session", autouse=True)
def configure_dorm():
    dorm.configure(DATABASES={"default": {"ENGINE": "sqlite", "NAME": ":memory:"}})
    yield
    close_all()

@pytest.fixture
def author():
    return Author.objects.create(name="Alice", age=30)


def test_something(transactional_db, author):
    Author.objects.create(name="Mallory", age=99)
    assert Author.objects.count() == 2
    # Both rows are rolled back automatically when the test exits.

transactional_db (sync) and atransactional_db (pytest-asyncio) wrap each test in an atomic() block that rolls back on exit, so you avoid the DROP TABLE / CREATE TABLE churn between tests. For unittest-style suites use dorm.test.DormTestCase as a mixin:

import unittest
from dorm.test import DormTestCase

class AuthorTests(DormTestCase, unittest.TestCase):
    def test_create(self):
        Author.objects.create(name="Alice", age=30)
        # rolled back at tearDown

For PostgreSQL integration tests, use testcontainers to spin up a throwaway Postgres per session.
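
A sketch of that session-scoped fixture; mapping the container's connection URL into dorm.configure(DATABASES=...) depends on your settings layout:

import pytest
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="session")
def pg_url():
    # One throwaway Postgres for the whole test session; torn down afterwards.
    with PostgresContainer("postgres:16") as pg:
        yield pg.get_connection_url()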

Asserting query counts (assertNumQueries, assertMaxQueries) (3.0+)

Pin the exact number of queries a code path issues — or the upper bound — to defend against N+1 regressions. Both the context-manager and decorator forms work on sync and async def tests:

from dorm.test import (
    assertNumQueries,
    assertMaxQueries,
    assertNumQueriesFactory,
    assertMaxQueriesFactory,
)

def test_list_view(transactional_db):
    with assertNumQueries(2) as ctx:
        list(Article.objects.select_related("author")[:10])
    assert ctx.count == 2  # also exposed for richer assertions

@assertMaxQueriesFactory(5)
def test_dashboard(transactional_db):
    # No N+1: dashboard must issue ≤ 5 queries.
    render_dashboard()

@assertNumQueriesFactory(1)
async def test_async_one_query():
    await Article.objects.acount()

Per-task isolation via ContextVar — concurrent asyncio.gather tasks see independent counters, so queries issued by overlapping tasks don't bleed into each other's assertions.

override_settings + setUpTestData (3.1+)

dorm.test.override_settings is both a context manager and a decorator (sync + async aware). On exit it reverts every mutated key, deleting any key that didn't exist before the override:

from dorm.test import override_settings

with override_settings(SLOW_QUERY_MS=10):
    run_some_queries()

@override_settings(USE_TZ=True, TIME_ZONE="Europe/Madrid")
def test_tz_aware():
    ...

@override_settings(SLOW_QUERY_MS=42)
async def test_async_path():
    ...

setUpTestData is a class decorator factory that runs a cls -> dict callable once at class construction and attaches every entry as a class attribute — Django parity, without inheriting from unittest.TestCase:

from dorm.test import setUpTestData

def _data(cls):
    return {
        "alice": Author.objects.create(name="Alice", age=30),
        "bob":   Author.objects.create(name="Bob",   age=40),
    }

@setUpTestData(_data)
class TestAuthor:
    def test_alice_age(self):
        assert self.alice.age == 30

Tests must roll back any mutation they make to the shared rows (or rebuild via a per-test fixture) — same contract Django enforces.

Switching FileField between local disk and S3 with no code changes

FileField reads the storage backend from settings.STORAGES at every access, so swapping local disk → MinIO → AWS S3 is a config change, not a code change.

class Document(dorm.Model):
    name = dorm.CharField(max_length=100)
    attachment = dorm.FileField(upload_to="docs/%Y/%m/", null=True, blank=True)

Local disk — the default if STORAGES is unset, useful for tests / SQLite / single-machine dev:

STORAGES = {
    "default": {
        "BACKEND": "dorm.storage.FileSystemStorage",
        "OPTIONS": {"location": "/var/app/media", "base_url": "/media/"},
    }
}

MinIO — for local-dev parity with production S3 without an AWS account. Run docker run -d -p 9000:9000 minio/minio server /data once and:

STORAGES = {
    "default": {
        "BACKEND": "dorm.contrib.storage.s3.S3Storage",
        "OPTIONS": {
            "bucket_name": "dev-uploads",
            "endpoint_url": "http://localhost:9000",
            "access_key": "minioadmin",
            "secret_key": "minioadmin",
            "region_name": "us-east-1",
            "signature_version": "s3v4",
            "addressing_style": "path",
        },
    }
}

AWS S3 in production — boto3 picks up credentials from the ambient chain (IAM role on EC2/ECS/Lambda, env vars, ~/.aws/):

STORAGES = {
    "default": {
        "BACKEND": "dorm.contrib.storage.s3.S3Storage",
        "OPTIONS": {
            "bucket_name": "myapp-prod-uploads",
            "region_name": "eu-west-1",
            "default_acl": "private",
            "querystring_auth": True,
            "querystring_expire": 3600,
        },
    }
}

The application code — doc.attachment = ContentFile(...), doc.save(), doc.attachment.url, doc.attachment.delete() — is identical in all three configurations. Drive the choice from the environment so the same image deploys everywhere:

# settings.py
import os

if os.environ.get("DORM_STORAGE") == "s3":
    STORAGES = {
        "default": {
            "BACKEND": "dorm.contrib.storage.s3.S3Storage",
            "OPTIONS": {
                "bucket_name": os.environ["S3_BUCKET"],
                "region_name": os.environ.get("AWS_REGION", "us-east-1"),
                "default_acl": "private",
            },
        }
    }
else:
    STORAGES = {
        "default": {
            "BACKEND": "dorm.storage.FileSystemStorage",
            "OPTIONS": {"location": os.environ.get("MEDIA_ROOT", "media")},
        }
    }

Common gotcha: switching to S3 doesn't move existing files — the names are stored verbatim, so a row pointing at docs/2026/04/q1.pdf on local disk will look up the same key on S3. Either back-fill the bucket from the local volume before the cutover, or write a one-off RunPython migration that re-uploads each file through the new storage, as sketched below.
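
A sketch of that back-fill; the RunPython wiring itself is omitted, and the media root, ContentFile import, and attachment.name attribute are assumptions to adapt to your setup:

import os

def reupload_documents(media_root: str = "/var/app/media") -> None:
    for doc in Document.objects.iterator(chunk_size=500):
        if not doc.attachment:
            continue
        name = doc.attachment.name  # key stored verbatim, e.g. docs/2026/04/q1.pdf
        with open(os.path.join(media_root, name), "rb") as fh:
            doc.attachment = ContentFile(fh.read(), name=name)
            doc.save(update_fields=["attachment"])  # re-saves through the configured S3 backend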

OpenTelemetry instrumentation

dorm.contrib.otel hooks the pre_query / post_query signals so every ORM query becomes an OTel span without per-call-site changes.

Setup

Install the OTel API (and an SDK if you want to ship spans anywhere):

pip install opentelemetry-api opentelemetry-sdk
# Optional exporters:
pip install opentelemetry-exporter-otlp     # → OTLP collector / Jaeger / Honeycomb
pip install opentelemetry-exporter-jaeger   # → Jaeger directly

In your app startup:

from dorm.contrib.otel import instrument

instrument()                                 # default tracer name "dorm"
# instrument(tracer_name="myapp.dorm")       # custom tracer

That's it — every query the ORM runs now produces a span.

Span attributes

Each span carries:

Attribute            Value
span name            db.<vendor> (db.postgresql / db.sqlite)
db.system            "postgresql" or "sqlite"
db.statement         the SQL text, truncated to 1024 chars
db.dorm.elapsed_ms   wall-clock duration (set on post_query, before end())
db.dorm.error        exception class name (only on failure)
span status          ERROR on failure, UNSET (the default) on success

The 1KB statement truncation keeps massive bulk_create SQL out of your traces; if you need the full SQL, log it separately via dorm.queries.

Wiring an exporter (Jaeger via OTLP)

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider(resource=Resource({SERVICE_NAME: "myapp"}))
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4317"))
)
trace.set_tracer_provider(provider)

# Now wire dorm into it:
from dorm.contrib.otel import instrument
instrument()

instrument() is idempotent — calling it twice replaces the previous wiring instead of producing double spans, so it's safe to call again on hot reload.

Disconnecting

from dorm.contrib.otel import uninstrument
uninstrument()  # detaches the receivers; future queries are not traced

Useful in test teardown or in libraries that want to opt out for a specific code path.

Caveats

  • Optional dependency. instrument() raises a clear ImportError if opentelemetry-api isn't installed. Production deploys without the package keep working — just no spans.
  • Spans are produced synchronously on the calling thread, with no dedicated background pump. The BatchSpanProcessor (the typical SDK config) absorbs the export latency, so the caller only pays the in-process cost of creating the span.
  • Sensitive data and db.statement. The SQL recorded is the literal query text (%s placeholders, params separate), so the attribute does NOT include the bound parameters; those go through dorm.db.utils._mask_params in DEBUG logs only. If you also want the params on the span, write a custom receiver instead of calling instrument().

PostgreSQL pub/sub via LISTEN / NOTIFY

PostgreSQL's NOTIFY / LISTEN give you pub-sub on the database itself — no Redis required for fan-out workloads at modest scale. The dorm async wrapper exposes both ends.

Publishing

from dorm import transaction
from dorm.db.connection import get_async_connection

conn = get_async_connection()

async def create_order(payload):
    async with transaction.aatomic():
        order = await Order.objects.acreate(**payload)
        await conn.notify("orders.created", str(order.pk))
        # NOTIFY is delivered AFTER COMMIT — subscribers never see
        # work that ends up rolled back.

The channel name is validated as a SQL identifier ([A-Za-z_][A-Za-z0-9_]*, ≤ 63 chars), so a channel name from user input can't smuggle SQL. The payload is bound as a parameter, so no escaping is needed.

Payload limits. PostgreSQL's NOTIFY payload is capped at 8000 bytes by default (compile-time NOTIFY_PAYLOAD_MAX). Don't serialise the whole row — pass a pk and let the listener fetch.

Subscribing

async def consume_orders():
    conn = get_async_connection()
    async for msg in conn.listen("orders.created"):
        order = await Order.objects.aget(pk=int(msg.payload))
        await dispatch(order)

msg is a psycopg.Notify with channel, payload, and pid attributes. The async iterator never returns by itself — break out when you want to stop:

async for msg in conn.listen("orders.created"):
    if shutdown_event.is_set():
        break
    await dispatch(msg)

Connection ownership

listen() opens its own dedicated psycopg connection (LISTEN registers a session-scoped notification handler — the connection must outlive the subscription). It does NOT pull from the pool, so a long-lived listener doesn't tie up a pool slot.

When you break out of the iterator, the connection is closed in a finally block. If your worker dies hard, the listener connection lingers until the server's idle-session timeout (if one is configured) or TCP keepalives reap it.

Reconnection / retries

The current listen() does not auto-reconnect on connection loss. Wrap it in your own retry loop if you need that:

import asyncio
import logging
import psycopg
from dorm.db.connection import get_async_connection

logger = logging.getLogger(__name__)

async def reliable_listener(channel: str, handler):
    while True:
        try:
            async for msg in get_async_connection().listen(channel):
                await handler(msg)
        except (psycopg.OperationalError, psycopg.InterfaceError) as exc:
            logger.warning("listener disconnected: %s — reconnecting", exc)
            await asyncio.sleep(1.0)

Keep in mind: notifications fired while disconnected are lost. PG doesn't queue them. For at-least-once semantics, pair NOTIFY with an outbox table you poll on reconnect.
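
A sketch of the catch-up step to run on (re)connect, assuming a hypothetical Outbox model with payload and processed_at columns:

from datetime import datetime, timezone

async def drain_outbox(handler) -> None:
    # Process everything that accumulated while the listener was down,
    # then go back to LISTENing for live notifications.
    while True:
        row = await Outbox.objects.filter(processed_at=None).order_by("pk").afirst()
        if row is None:
            break
        await handler(row.payload)
        await Outbox.objects.filter(pk=row.pk).aupdate(
            processed_at=datetime.now(timezone.utc)
        )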

Common patterns

Cache invalidation across replicas:

async with transaction.aatomic():
    await User.objects.filter(pk=user_id).aupdate(**changes)
    await conn.notify("user.invalidate", str(user_id))
# Every node listening drops its local cache for that user.

Worker queue (one-of-N consumers):

NOTIFY broadcasts to all listeners. For exclusive-consumer patterns, use a row-based queue with select_for_update(skip_locked=True) and treat NOTIFY purely as a wakeup signal:

# Producer
async with transaction.aatomic():
    job = await Job.objects.acreate(payload=payload, status="pending")
    await conn.notify("jobs.wakeup", "")

# Consumer
async for _ in conn.listen("jobs.wakeup"):
    while True:
        async with transaction.aatomic():
            job = await (
                Job.objects.filter(status="pending")
                .select_for_update(skip_locked=True)
                .afirst()
            )
            if job is None:
                break
            await job.run()
            await job.adelete()