Recetario¶
Recetas prácticas para situaciones comunes. Cada una es un snippet completo y listo para copiar.
get-or-create idempotente con un constraint único¶
from dorm.exceptions import IntegrityError
def get_or_create_email(email: str, name: str) -> Author:
obj, _ = Author.objects.get_or_create(
email=email,
defaults={"name": name},
)
return obj
get_or_create corre dentro de una transacción para que dos
llamadas concurrentes no inserten ambas. Si el constraint único no
está en las claves de lookup, el patrón seguro es capturar
IntegrityError y volver a hacer get.
Incremento atómico de contador¶
from dorm import F
# Sin race, un único UPDATE SQL.
Post.objects.filter(pk=42).update(views=F("views") + 1)
Nunca hagas post.views += 1; post.save() — eso es
read-modify-write y pierde incrementos concurrentes.
Paginación¶
def paginate(qs, page: int, page_size: int = 20):
start = (page - 1) * page_size
return list(qs.order_by("id")[start:start + page_size])
Acompaña siempre un slice con order_by(...) — sin orden explícito
la misma fila puede aparecer en dos páginas o en ninguna.
Para datasets grandes, la paginación por keyset es mucho más
rápida que OFFSET:
def page_after(qs, last_id: int, page_size: int = 20):
return list(
qs.filter(id__gt=last_id).order_by("id")[:page_size]
)
Soft delete¶
Hay un mixin listo en dorm.contrib.softdelete:
from dorm.contrib.softdelete import SoftDeleteModel
import dorm
class Post(SoftDeleteModel):
title = dorm.CharField(max_length=200)
# Tres managers:
Post.objects # solo filas vivas
Post.all_objects # todo
Post.deleted_objects # solo soft-deleted
# Soft delete por defecto; pasa hard=True para un DELETE real:
post.delete() # UPDATE … SET deleted_at = now()
post.delete(hard=True) # DELETE FROM …
post.restore() # limpia deleted_at
# Paridad async:
await post.adelete()
await post.arestore()
Avisos: on_delete=CASCADE no cascadea por soft delete (los
hijos siguen visibles en Post.objects). Y los UNIQUE de la BD no
saben de deleted_at — usa un índice parcial
(UNIQUE … WHERE deleted_at IS NULL) en el schema si necesitas
"único entre filas vivas".
Audit log via señales¶
from dorm.signals import post_save, post_delete
def write_audit(sender, instance, created, **kwargs):
AuditLog.objects.create(
model=sender.__name__,
pk=instance.pk,
action="created" if created else "updated",
payload=model_to_dict(instance),
)
post_save.connect(write_audit, sender=Article)
post_delete.connect(write_audit, sender=Article)
Para tablas con mucho volumen, prefiere el patrón WAL → Kafka a nivel de base de datos en vez de señales Python — las señales disparan in-process y suman latencia.
Bulk insert con deduplicación¶
existing = set(
User.objects.filter(email__in=[u.email for u in batch])
.values_list("email", flat=True)
)
new_users = [u for u in batch if u.email not in existing]
User.objects.bulk_create(new_users, batch_size=500)
O, solo en PG, empuja la dedup a la BD:
from dorm.db.connection import get_connection
get_connection().execute(
"INSERT INTO users (email, name) VALUES %s ON CONFLICT (email) DO NOTHING",
rows,
)
Lock con select_for_update¶
with transaction.atomic():
account = (
Account.objects.select_for_update()
.get(pk=account_id)
)
account.balance -= amount
account.save(update_fields=["balance"])
SELECT ... FOR UPDATE mantiene row locks hasta que la transacción
commitea — úsalo cuando hagas read-then-write bajo contención.
Manager / queryset method propio¶
class PublishedQuerySet(QuerySet):
def published(self):
return self.filter(published=True, published_at__lte=Now())
class Article(dorm.Model):
title = dorm.CharField(max_length=200)
published = dorm.BooleanField(default=False)
published_at = dorm.DateTimeField(null=True)
objects = PublishedQuerySet.as_manager()
# Article.objects.published().filter(author=...)
El resultado sigue siendo un QuerySet, así que sigues encadenando.
Stream de un millón de filas para un export¶
import csv
with open("authors.csv", "w") as f:
w = csv.writer(f)
w.writerow(["id", "name", "email"])
for a in Author.objects.order_by("id").iterator(chunk_size=5000):
w.writerow([a.id, a.name, a.email])
iterator(chunk_size=N) abre un cursor server-side en PG y hace
streaming por arraysize en SQLite, así la memoria se mantiene plana.
Multi-tenant con routing por schema¶
class TenantRouter:
def db_for_read(self, model, **hints):
return _current_tenant_alias()
def db_for_write(self, model, **hints):
return _current_tenant_alias()
Fija el alias del tenant desde un middleware (web) o un context manager (workers) y enruta todo por ahí. Combínalo con un set de migraciones aplicado por alias.
Runner de migraciones por tenant (PG, 3.2+)¶
PostgreSQL expone la misma base a varios tenants via schemas.
dorm.contrib.tenants trae helpers + flags CLI que arrancan el
schema y aplican cada migración contra él:
from dorm.contrib.tenants import (
register_tenant,
migrate_tenant,
migrate_all_tenants,
)
# En el startup — registra cada tenant que conoce la app.
for name in ("acme", "globex", "initech"):
register_tenant(name)
# Bootstrap puntual de un tenant nuevo:
migrate_tenant("newtenant")
# Paso CI / deploy — migra cada tenant registrado:
results: dict[str, str] = migrate_all_tenants()
for tenant, status in results.items():
print(tenant, status)
Equivalentes CLI:
El runner pinna una sola conexión dentro de un atomic() antes de
cambiar search_path, así el DDL aterriza en la misma conexión
sobre la que se hizo el switch (el pool de PG si no daría una
conexión nueva apuntando a public).
migrate_all_tenants() devuelve {tenant: "ok" | "error: ..."}
para que un gate de CI pueda fallar fuerte ante cualquier fallo
parcial sin abortar el batch a mitad. Solo PG —
ensure_schema / migrate_tenant lanzan
NotImplementedError en SQLite / MySQL.
Autoscaling del connection pool¶
import asyncio
from dorm.contrib.pool_autoscale import autoscale_pool, read_pool_stats
async def autoscale_loop() -> None:
while True:
autoscale_pool(
target_utilization=0.7,
min_floor=2,
max_ceiling=20,
step=2,
)
await asyncio.sleep(10)
# En el startup hook de FastAPI:
@app.on_event("startup")
async def _start_autoscale() -> None:
asyncio.create_task(autoscale_loop())
read_pool_stats() devuelve un PoolStats con utilization,
in_use, waiting, etc. para dashboards. Resize en vivo solo
en PG (psycopg-pool); SQLite / MySQL hacen no-op silenciosamente.
Pre-abrir el connection pool al startup (3.3+)¶
Los cold starts pagan el coste de conectar+handshake en la primera
request. warmup_pool abre hasta target conexiones de forma
ansiosa para que el primer checkout esté caliente:
import asyncio
from dorm.contrib.pool_autoscale import warmup_pool
@app.on_event("startup")
async def _warmup():
n = warmup_pool(target=4)
print(f"pool calentado: {n} conexión(es) listas")
Solo PostgreSQL — en SQLite / MySQL el helper retorna 0
(no hay pool compartido).
DDL ad-hoc con SchemaEditor (3.3+)¶
Para tooling que necesita DDL imperativo fuera de un archivo de migración (REPL, fixtures, scripts de reparación de drift):
import dorm
from dorm.db.connection import get_connection
from dorm.migrations.schema import SchemaEditor
with SchemaEditor(get_connection()) as se:
se.create_model(Article)
se.add_field(Article, "summary", dorm.TextField(null=True))
se.alter_field(Article, "title", dorm.CharField(max_length=400))
El editor delega en las mismas ops de migración que usa el
executor, así que la ruta imperativa produce SQL idéntico al de
un archivo de migración ejecutando la misma operación. Las
migraciones siguen siendo la forma canónica de evolucionar el
esquema (historial en archivos, recorder, dry-run, lint);
SchemaEditor existe para los casos imperativos que no encajan
limpiamente en una migración.
Soft delete en cascada (3.3+)¶
SoftDeleteModel.delete(cascade=True) recorre cada relación
reverse-FK cuyo modelo origen sea también un SoftDeleteModel y
hace soft-delete de cada hijo:
from dorm.contrib.softdelete import SoftDeleteModel
class Article(SoftDeleteModel):
title: str = dorm.CharField(max_length=200)
class Comment(SoftDeleteModel):
article = dorm.ForeignKey(Article, on_delete=dorm.CASCADE, related_name="comments")
body: str = dorm.TextField()
art = Article.objects.first()
art.delete(cascade=True) # soft-borra art + sus Comments
Los hijos cuyo modelo origen no es SoftDeleteModel se saltan —
un cascade hard contradiría la promesa "soft". Si necesitas un
cascade mixto, cabléalo a mano.
Executor de migraciones async¶
Para migraciones de boot en stacks async (startup de FastAPI, handler de Lambda, edge runtime) donde el executor sync bloquearía el event loop:
from dorm.db.connection import get_connection
from dorm.migrations.aexecutor import AsyncMigrationExecutor
async def run_startup_migrations() -> None:
executor = AsyncMigrationExecutor(get_connection())
await executor.amigrate("blog", "blog/migrations")
await executor.amigrate("users", "users/migrations")
El wrapper delega cada llamada a asyncio.to_thread para que la
corrección (advisory lock, captura dry-run, semántica de recorder)
coincida 1-a-1 con la ruta sync. Usa arollback,
amigrate_to y ashow_migrations para el mismo trato async
del resto de métodos del executor.
Read-after-write con réplica¶
# El routing manda las lecturas a la réplica — pero justo tras una
# escritura, la réplica puede no tener aún la fila. Lee del primary
# explícitamente:
new = Post.objects.using("default").get(pk=new_pk)
Lee tus propias escrituras desde el primary; confía en la réplica para todo lo más antiguo de unos segundos.
Fixtures de testing¶
import pytest, dorm
from dorm.db.connection import close_all
from dorm.test import transactional_db, atransactional_db # noqa: F401
@pytest.fixture(scope="session", autouse=True)
def configure_dorm():
dorm.configure(DATABASES={"default": {"ENGINE": "sqlite", "NAME": ":memory:"}})
yield
close_all()
@pytest.fixture
def author():
return Author.objects.create(name="Alice", age=30)
def test_something(transactional_db, author):
Author.objects.create(name="Mallory", age=99)
assert Author.objects.count() == 2
# Las dos filas se hacen rollback solas al salir el test.
transactional_db (sync) y atransactional_db (pytest-asyncio)
envuelven cada test en un atomic() que hace rollback al salir,
ahorrándote el DROP TABLE / CREATE TABLE entre tests. Para
suites con unittest usa el mixin dorm.test.DormTestCase:
import unittest
from dorm.test import DormTestCase
class AuthorTests(DormTestCase, unittest.TestCase):
def test_create(self):
Author.objects.create(name="Alice", age=30)
# rollback en tearDown
Para tests de integración con PostgreSQL, usa
testcontainers
para levantar un Postgres efímero por sesión.
Asertar conteo de queries (assertNumQueries, assertMaxQueries) (3.0+)¶
Fija el número exacto de queries de un code path — o el techo —
para defenderte de regresiones N+1. Tanto la forma context manager
como decorator funcionan en tests sync y async def:
from dorm.test import (
assertNumQueries,
assertMaxQueries,
assertNumQueriesFactory,
assertMaxQueriesFactory,
)
def test_list_view(transactional_db):
with assertNumQueries(2) as ctx:
list(Article.objects.select_related("author")[:10])
assert ctx.count == 2 # también expuesto para asserciones extra
@assertMaxQueriesFactory(5)
def test_dashboard(transactional_db):
# Sin N+1: dashboard tiene que emitir ≤ 5 queries.
render_dashboard()
@assertNumQueriesFactory(1)
async def test_async_una_query():
await Article.objects.acount()
Aislamiento per-task via ContextVar — tasks lanzadas con
asyncio.gather ven contadores independientes, así dos tests
concurrentes bajo pytest-xdist no contaminan asserciones.
override_settings + setUpTestData (3.1+)¶
dorm.test.override_settings es a la vez context manager y
decorador (sync + async aware). Revierte cada clave mutada al
salir — incluyendo borrar claves que no existían antes:
from dorm.test import override_settings
with override_settings(SLOW_QUERY_MS=10):
run_some_queries()
@override_settings(USE_TZ=True, TIME_ZONE="Europe/Madrid")
def test_tz_aware():
...
@override_settings(SLOW_QUERY_MS=42)
async def test_async_path():
...
setUpTestData es un decorator factory de clase que corre un
callable cls -> dict una sola vez al construir la clase y pega
cada entrada como atributo de clase — paridad Django, sin
heredar de unittest.TestCase:
from dorm.test import setUpTestData
def _data(cls):
return {
"alice": Author.objects.create(name="Alice", age=30),
"bob": Author.objects.create(name="Bob", age=40),
}
@setUpTestData(_data)
class TestAuthor:
def test_alice_age(self):
assert self.alice.age == 30
Los tests deben revertir cualquier mutación sobre las filas compartidas (o reconstruir vía fixture per-test) — mismo contrato Django.
Cambiar FileField entre disco local y S3 sin tocar código¶
FileField lee el storage backend desde settings.STORAGES en
cada acceso, así que pasar de disco local → MinIO → AWS S3 es un
cambio de configuración, no de código.
class Document(dorm.Model):
name = dorm.CharField(max_length=100)
attachment = dorm.FileField(upload_to="docs/%Y/%m/", null=True, blank=True)
Disco local — el default si STORAGES no se define, útil para
tests / SQLite / dev en una sola máquina:
STORAGES = {
"default": {
"BACKEND": "dorm.storage.FileSystemStorage",
"OPTIONS": {"location": "/var/app/media", "base_url": "/media/"},
}
}
MinIO — para paridad de dev local con producción S3 sin cuenta
AWS. Lanza docker run -d -p 9000:9000 minio/minio server /data una
vez y:
STORAGES = {
"default": {
"BACKEND": "dorm.contrib.storage.s3.S3Storage",
"OPTIONS": {
"bucket_name": "dev-uploads",
"endpoint_url": "http://localhost:9000",
"access_key": "minioadmin",
"secret_key": "minioadmin",
"region_name": "us-east-1",
"signature_version": "s3v4",
"addressing_style": "path",
},
}
}
AWS S3 en producción — boto3 toma las credenciales de la cadena
ambiente (rol IAM en EC2/ECS/Lambda, env vars, ~/.aws/):
STORAGES = {
"default": {
"BACKEND": "dorm.contrib.storage.s3.S3Storage",
"OPTIONS": {
"bucket_name": "myapp-prod-uploads",
"region_name": "eu-west-1",
"default_acl": "private",
"querystring_auth": True,
"querystring_expire": 3600,
},
}
}
El código de la aplicación — doc.attachment = ContentFile(...),
doc.save(), doc.attachment.url, doc.attachment.delete() — es
idéntico en las tres configuraciones. Pilota la elección desde el
entorno para que la misma imagen despliegue en todos lados:
# settings.py
import os
if os.environ.get("DORM_STORAGE") == "s3":
STORAGES = {
"default": {
"BACKEND": "dorm.contrib.storage.s3.S3Storage",
"OPTIONS": {
"bucket_name": os.environ["S3_BUCKET"],
"region_name": os.environ.get("AWS_REGION", "us-east-1"),
"default_acl": "private",
},
}
}
else:
STORAGES = {
"default": {
"BACKEND": "dorm.storage.FileSystemStorage",
"OPTIONS": {"location": os.environ.get("MEDIA_ROOT", "media")},
}
}
Trampa común: al cambiar a S3, no esperes que las filas existentes
se migren solas — los nombres se almacenan tal cual, así que una
fila apuntando a docs/2026/04/q1.pdf en disco local buscará la
misma clave en S3. O bien rellenas el bucket desde el volumen local
antes del cutover, o escribes una migración RunPython puntual que
re-suba cada archivo a través del nuevo storage.
Instrumentación con OpenTelemetry¶
dorm.contrib.otel engancha las señales pre_query / post_query
para que cada query del ORM se convierta en un span de OTel sin
cambios en cada call site.
Setup¶
Instala la API de OTel (y un SDK si quieres mandar spans a algún sitio):
pip install opentelemetry-api opentelemetry-sdk
# Exporters opcionales:
pip install opentelemetry-exporter-otlp # → collector OTLP / Jaeger / Honeycomb
pip install opentelemetry-exporter-jaeger # → Jaeger directo
En el startup de tu app:
from dorm.contrib.otel import instrument
instrument() # tracer por defecto "dorm"
# instrument(tracer_name="miapp.dorm") # tracer custom
Ya está — cada query del ORM produce un span.
Atributos del span¶
Cada span lleva:
| Atributo | Valor |
|---|---|
| nombre del span | db.<vendor> (db.postgresql / db.sqlite) |
db.system |
"postgresql" o "sqlite" |
db.statement |
el SQL, truncado a 1024 chars |
db.dorm.elapsed_ms |
duración wall-clock (set en post_query, antes de end()) |
db.dorm.error |
nombre de clase de la excepción (solo en fallo) |
| status del span | ERROR en fallo, default UNSET en éxito |
El truncado a 1KB del statement mantiene el SQL gigante de
bulk_create fuera de tus traces; si necesitas el SQL completo,
loguéalo aparte vía dorm.queries.
Cablear un exporter (Jaeger via OTLP)¶
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
provider = TracerProvider(resource=Resource({SERVICE_NAME: "miapp"}))
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4317"))
)
trace.set_tracer_provider(provider)
# Ahora cablea dorm:
from dorm.contrib.otel import instrument
instrument()
instrument() es idempotente — llamarla dos veces reemplaza el
cableado anterior en lugar de producir spans duplicados, así que es
seguro en hot reload.
Desconectar¶
from dorm.contrib.otel import uninstrument
uninstrument() # desengancha los receivers; futuras queries no se trazan
Útil en teardown de tests o en librerías que quieran opt-out para un código path específico.
Avisos¶
- Dependencia opcional.
instrument()lanza unImportErrorclaro siopentelemetry-apino está instalado. Despliegues sin el paquete siguen funcionando — sin spans. - Los spans se producen sync en el thread llamante. No hay pump background dedicado. El BatchSpanProcessor (config típica del SDK) absorbe la latencia del export, así que el caller ve overhead de nanosegundos por span.
- Los params sensibles aterrizan en
db.statement. El SQL es el texto literal de la query (placeholders%s, params aparte). El atributo OTel no incluye los params bindeados — esos van pordorm.db.utils._mask_paramssolo en logs DEBUG. Si también quieres los params en el span, escribe un receiver custom en lugar de llamar ainstrument().
Pub/sub PostgreSQL con LISTEN / NOTIFY¶
NOTIFY / LISTEN de PostgreSQL te dan pub/sub en la propia BD —
sin Redis para workloads de fan-out a escala modesta. El wrapper
async de dorm expone los dos lados.
Publicar¶
from dorm import transaction
from dorm.db.connection import get_async_connection
conn = get_async_connection()
async def crear_order(payload):
async with transaction.aatomic():
order = await Order.objects.acreate(**payload)
await conn.notify("orders.created", str(order.pk))
# El NOTIFY se entrega TRAS el COMMIT — los suscriptores
# nunca ven trabajo que termine en rollback.
El channel se valida como identificador SQL
([A-Za-z_][A-Za-z0-9_]*, ≤ 63 chars), así un nombre de canal que
venga de input del usuario no puede colar SQL. El payload va como
parámetro bound, así que no hay que escaparlo.
Límite de payload. El payload de NOTIFY está capado a 8000
bytes por defecto (compile-time NOTIFY_PAYLOAD_MAX). No
serialices la fila entera — manda un pk y deja que el listener la
busque.
Suscribirse¶
async def consumer_orders():
conn = get_async_connection()
async for msg in conn.listen("orders.created"):
order = await Order.objects.aget(pk=int(msg.payload))
await dispatch(order)
msg es un psycopg.Notify con atributos channel, payload y
pid. El iterador async no termina por sí solo — rompe el bucle
cuando quieras parar:
async for msg in conn.listen("orders.created"):
if shutdown_event.is_set():
break
await dispatch(msg)
Propiedad de la conexión¶
listen() abre su propia conexión psycopg dedicada (LISTEN
registra un handler de notificación scoped a la sesión — la conexión
debe sobrevivir a la suscripción). NO tira del pool, así que un
listener de larga duración no ocupa un slot del pool.
Cuando rompes el bucle, la conexión se cierra en un bloque
finally. Si el worker muere a saco, la conexión del listener se
recolecta por el timeout de idle-connection de PG.
Reconexión / reintentos¶
El listen() actual no se reconecta solo ante pérdida de conexión.
Envuélvelo en tu propio loop de retry si necesitas eso:
async def listener_robusto(channel: str, handler):
while True:
try:
async for msg in get_async_connection().listen(channel):
await handler(msg)
except (psycopg.OperationalError, psycopg.InterfaceError) as exc:
logger.warning("listener desconectado: %s — reconectando", exc)
await asyncio.sleep(1.0)
Ten en cuenta: las notificaciones disparadas mientras está desconectado se pierden. PG no las encola. Para semántica at-least-once, combina NOTIFY con una tabla outbox que poll-eas al reconectar.
Patrones habituales¶
Invalidación de caché entre réplicas:
async with transaction.aatomic():
await User.objects.filter(pk=user_id).aupdate(**changes)
await conn.notify("user.invalidate", str(user_id))
# Cada nodo escuchando tira su caché local para ese user.
Cola de workers (consumidor uno-de-N):
NOTIFY hace broadcast a todos los listeners. Para patrones de
consumer exclusivo, usa una cola row-based con
select_for_update(skip_locked=True) y NOTIFY solo como señal de
wake-up:
# Productor
async with transaction.aatomic():
job = await Job.objects.acreate(payload=payload, status="pending")
await conn.notify("jobs.wakeup", "")
# Consumer
async for _ in conn.listen("jobs.wakeup"):
while True:
async with transaction.aatomic():
job = await (
Job.objects.filter(status="pending")
.select_for_update(skip_locked=True)
.afirst()
)
if job is None:
break
await job.run()
await job.adelete()