FastAPI integration¶
dorm ships a Pydantic v2 adapter that generates schemas from your models — so you can use a single source of truth for both your tables and your FastAPI request / response bodies.
Installation¶
Pick the backend extra that matches your database — sqlite (pulls
aiosqlite for the async path) or postgresql (pulls psycopg
with the connection pool). Add pydantic for the Pydantic v2 schema
adapter that this guide builds on:
# SQLite + Pydantic schemas
uv pip install 'djanorm[sqlite,pydantic]'
# PostgreSQL + Pydantic schemas
uv pip install 'djanorm[postgresql,pydantic]'
There is no separate async extra — the async drivers (aiosqlite,
psycopg) ship under the same sqlite / postgresql extras as
their sync counterparts, so a single install covers both modes.
App lifespan¶
This step is optional but recommended for production. Configure dorm in a FastAPI lifespan and clean up on shutdown:
from contextlib import asynccontextmanager
from fastapi import FastAPI
import dorm
from dorm.db.connection import close_all_async
@asynccontextmanager
async def lifespan(app: FastAPI):
dorm.configure(
DATABASES={
"default": {
"ENGINE": "postgresql",
"NAME": "myapp",
"USER": "myapp",
"PASSWORD": "...",
"HOST": "localhost",
"PORT": 5432,
}
}
)
yield
await close_all_async()
app = FastAPI(lifespan=lifespan)
close_all_async() drains every async pool. Without it FastAPI's
graceful shutdown can hang on lingering connections.
Schemas¶
Two ways to derive Pydantic schemas from a dorm model.
schema_for(...) — quick one-liner¶
from dorm.contrib.pydantic import schema_for
from .models import Author
AuthorOut = schema_for(Author)
AuthorIn = schema_for(Author, exclude=("id",))
AuthorPatch = schema_for(Author, optional=("name", "age", "email"))
| Argument | Effect |
|---|---|
| `name=` | class name (default `f"{Model.__name__}Schema"`) |
| `exclude=` | tuple of field names to drop |
| `only=` | tuple of field names to keep (mutually exclusive with `exclude`) |
| `optional=` | mark these fields as `Optional[...] = None` (PATCH bodies) |
| `base=` | custom `BaseModel` to inherit (e.g. for shared config) |
from_attributes=True is set automatically, so you can pass a dorm
instance straight to Schema.model_validate(obj) or use it as a
FastAPI response_model.
create_schema_for(...) / update_schema_for(...) — request bodies¶
For typical CRUD endpoints the input shape diverges from the output. The two helpers cut the boilerplate:
from dorm.contrib.pydantic import (
create_schema_for, update_schema_for, schema_for,
)
AuthorOut = schema_for(Author) # response_model — full row
AuthorCreate = create_schema_for(Author) # POST body — no auto-PK / no GeneratedField
AuthorUpdate = update_schema_for(Author) # PATCH body — every field optional with default None
`create_schema_for` drops auto-incrementing PKs and `GeneratedField` columns automatically (the server fills them). Required fields stay required. Defaults still propagate — a column with `default=False` is optional in Pydantic terms, with the real default. `update_schema_for` additionally flips every remaining field to `T | None` with default `None`, so the caller can omit any subset. In the handler, use `payload.model_dump(exclude_unset=True)` to iterate only the fields the client actually sent.
@app.patch("/authors/{pk}")
async def patch(pk: int, payload: AuthorUpdate):
author = await Author.objects.aget(pk=pk)
for k, v in payload.model_dump(exclude_unset=True).items():
setattr(author, k, v)
await author.asave()
return AuthorOut.model_validate(author)
DormSchema — declarative class¶
from dorm.contrib.pydantic import DormSchema
from .models import Author, Publisher
class PublisherOut(DormSchema):
class Meta:
model = Publisher
fields = ("id", "name")
class AuthorOut(DormSchema):
bio_url: str | None = None # extra field declared explicitly
class Meta:
model = Author
exclude = ("internal_notes",)
nested = {"publisher": PublisherOut} # FK → nested schema
The metaclass walks Author._meta.fields and adds annotations for
every column, unless you've already declared one. Pass
fields=("a", "b") to whitelist, exclude=("c",) to blacklist, or
optional=("a",) for PATCH-style schemas. nested= swaps a FK or
M2M for a sub-schema (otherwise FKs serialize as their integer PK).
A complete CRUD route¶
from fastapi import APIRouter, HTTPException
from dorm.contrib.pydantic import DormSchema
from .models import Author
class AuthorIn(DormSchema):
class Meta:
model = Author
exclude = ("id",)
class AuthorOut(DormSchema):
class Meta:
model = Author
router = APIRouter(prefix="/authors", tags=["authors"])
@router.post("", response_model=AuthorOut, status_code=201)
async def create_author(payload: AuthorIn) -> Author:
return await Author.objects.acreate(**payload.model_dump())
@router.get("/{author_id}", response_model=AuthorOut)
async def get_author(author_id: int) -> Author:
author = await Author.objects.aget_or_none(pk=author_id)
if author is None:
raise HTTPException(404, "Not found")
return author
@router.get("", response_model=list[AuthorOut])
async def list_authors() -> list[Author]:
return Author.objects.all()
@router.patch("/{author_id}", response_model=AuthorOut)
async def patch_author(author_id: int, payload: AuthorIn) -> Author:
fields = payload.model_dump(exclude_unset=True)
n = await Author.objects.filter(pk=author_id).aupdate(**fields)
if not n:
raise HTTPException(404, "Not found")
return await Author.objects.aget(pk=author_id)
@router.delete("/{author_id}", status_code=204)
async def delete_author(author_id: int) -> None:
n, _ = await Author.objects.filter(pk=author_id).adelete()
if not n:
raise HTTPException(404, "Not found")
File uploads¶
FileField integrates cleanly with FastAPI's UploadFile. The same
endpoint code works against FileSystemStorage (local disk) and
S3Storage (AWS / MinIO / R2) — only settings.STORAGES changes.
Model + schema¶
import dorm
from dorm.contrib.pydantic import DormSchema
class Document(dorm.Model):
name = dorm.CharField(max_length=100)
attachment = dorm.FileField(upload_to="docs/%Y/%m/", null=True, blank=True)
class Meta:
db_table = "documents"
class DocumentOut(DormSchema):
"""The Pydantic interop's BeforeValidator unwraps the FieldFile
descriptor to the storage name (a plain string) automatically —
no custom serialiser needed."""
url: str | None = None # explicit override, populated in the route
class Meta:
model = Document
Upload endpoint¶
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
router = APIRouter(prefix="/documents")
@router.post("", response_model=DocumentOut)
async def upload_document(
name: str = Form(...),
file: UploadFile = File(...),
):
"""Accept a multipart upload, persist the bytes via the configured
storage, and return the saved row + a downloadable URL.
``UploadFile`` exposes a SpooledTemporaryFile under ``.file``;
wrapping it in :class:`dorm.File` lets dorm read the content
chunked instead of loading the whole upload into RAM at once.
"""
if not file.filename:
raise HTTPException(400, "Missing filename")
doc = Document(name=name)
doc.attachment = dorm.File(file.file, name=file.filename)
await doc.asave()
out = DocumentOut.model_validate(doc)
out.url = doc.attachment.url
return out
A multipart request like the one above (a `name` form field plus an attached PDF) returns:
{
"id": 1,
"name": "Q1 Report",
"attachment": "docs/2026/04/q1.pdf",
"url": "/media/docs/2026/04/q1.pdf"
}
— with FileSystemStorage. Swap STORAGES to S3Storage and url
becomes a presigned https://bucket.s3.amazonaws.com/...?X-Amz-...
link the browser can fetch directly. The endpoint code doesn't change.
Listing + presigned URLs¶
@router.get("", response_model=list[DocumentOut])
async def list_documents():
docs = Document.objects.order_by("-id")
return [
DocumentOut.model_validate(d).model_copy(
update={"url": d.attachment.url if d.attachment else None}
)
for d in docs
]
For S3, each .url is a fresh presigned URL — by default 1 hour TTL.
Adjust the expiry per call by re-instantiating the storage with a
different querystring_expire, or use custom_domain= for permanent
public-CDN links.
Streaming download (when you don't want a public URL)¶
For private storage where you authenticate downloads in your app (rather than handing out S3 presigned URLs), stream through FastAPI:
from fastapi.responses import StreamingResponse
@router.get("/{doc_id}/download")
async def download_document(doc_id: int):
doc = await Document.objects.aget(pk=doc_id)
if not doc.attachment:
raise HTTPException(404, "No file attached")
handle = await doc.attachment.aopen("rb")
return StreamingResponse(
handle.chunks(), # 64 KiB chunks
media_type="application/octet-stream",
headers={
"Content-Disposition": f'attachment; filename="{doc.name}"',
"Content-Length": str(doc.attachment.size),
},
)
File.chunks() is implemented for both backends, so the same handler
streams from local disk and from S3's get_object body.
Serving a local MEDIA_ROOT in development¶
FileSystemStorage only writes the bytes — serving them is your
framework's job. For dev, mount the location at the URL prefix you
configured as base_url:
from fastapi.staticfiles import StaticFiles
app.mount("/media", StaticFiles(directory="/var/app/media"), name="media")
In production, hand this off to nginx / CloudFront / the relevant CDN — see Production: file storage.
Deleting a file with the row¶
FieldFile.delete() removes the bytes from storage. Wire it in your
delete handler so a DELETE /documents/{id} doesn't leave orphans:
@router.delete("/{doc_id}", status_code=204)
async def delete_document(doc_id: int):
doc = await Document.objects.aget(pk=doc_id)
if doc.attachment:
await doc.attachment.adelete(save=False) # delete file, don't re-save the row
await doc.adelete()
save=False skips the redundant UPDATE that would otherwise persist
the cleared column right before the row is removed entirely.
Health check endpoint¶
ahealth_check() returns {"status": "ok", "alias": ..., "latency_ms": ..., "pool": {...}},
or status="error" with the exception detail. Wire it to your
orchestrator's liveness/readiness probe.
Async dependency for transactions¶
from fastapi import Depends
from dorm.transaction import aatomic
async def db_tx():
async with aatomic():
yield
@router.post("/transfer")
async def transfer(payload: TransferIn, _: None = Depends(db_tx)):
...
Anything that runs inside transfer is now wrapped in a single
transaction; on exception, FastAPI's exception handler still fires
after the rollback.
Pitfalls¶
- Don't reuse a single dorm `Model` instance across concurrent requests — instances are mutable and `save()` reads `__dict__`. Each request handler should fetch its own.
- Blocking on sync ORM calls in async routes — `Author.objects.all()` in an `async def` is fine for tiny dev work but ties up the event loop on every query. Use the `a*` variants in production. Or enforce the pattern at the model level with `dorm.contrib.asyncmodel.AsyncModel` — sync calls raise `AsyncOnlyError` directly.
- `response_model` validation cost — Pydantic re-validates on output. For very high-throughput endpoints, set `response_model_exclude_unset=True`, or skip `response_model` and return a `JSONResponse` directly.
Streaming exports — direct StreamingResponse (4.0+)¶
For exporting large querysets without materialising them:
from fastapi.responses import StreamingResponse
from dorm.contrib.streaming import astream_jsonl, astream_csv
@app.get("/orders/export.jsonl")
async def export_jsonl():
qs = Order.objects.afilter(status="completed")
return StreamingResponse(
astream_jsonl(qs, chunk_size=1000),
media_type="application/x-ndjson",
)
@app.get("/orders/export.csv")
async def export_csv():
qs = Order.objects.afilter(status="completed").values(
"id", "amount", "currency", "created_at"
)
return StreamingResponse(
astream_csv(qs),
media_type="text/csv",
headers={"Content-Disposition": 'attachment; filename="orders.csv"'},
)
Memory-bounded — 10M rows export with ~50 MB RSS. Special types (datetime, Decimal, UUID, Enum, bytes) serialise cleanly.
Query budget — protect SLA (4.0+)¶
import dorm
@app.get("/heavy")
async def heavy_handler():
async with dorm.abudget(timeout_ms=200, max_rows=10_000):
rows = await Order.objects.afilter(status="pending")
return {"orders": [r.id for r in rows]}
timeout_ms aborts via statement_timeout; max_rows aborts
client-side. Trade-off: the block becomes an implicit aatomic()
on PG.
N+1 detector as middleware (4.0+)¶
from dorm.contrib.nplusone import detect
@app.middleware("http")
async def nplus_one_middleware(request, call_next):
with detect(raise_on_detect=False) as d:
response = await call_next(request)
if d.findings:
log.warning("N+1 detected on %s: %s", request.url.path, d.report())
return response
raise_on_detect=True for strict test mode.
Idempotency keys (4.0+)¶
Client retries with the same Idempotency-Key → respond with the
cached body:
from fastapi import Header
from dorm.contrib.idempotency import IdempotencyRecord, idempotency_key
class IdpEntry(IdempotencyRecord):
class Meta:
db_table = "idempotency_entries"
@app.post("/payments")
async def create_payment(
body: PaymentIn,
idempotency_key_header: str = Header(alias="Idempotency-Key"),
):
with idempotency_key(idempotency_key_header, model=IdpEntry) as ctx:
if ctx.replay:
return JSONResponse(
ctx.cached_response,
status_code=ctx.cached_status_code or 200,
)
result = process_payment(body)
ctx.store(result, status_code=201)
return JSONResponse(result, status_code=201)
The block runs in atomic() — outbox row + business write commit
together.
Real-time via LISTEN/NOTIFY + WebSocket (4.0+)¶
from fastapi import WebSocket
from dorm.contrib.listen_notify import listen, anotify
@app.websocket("/orders/stream")
async def stream_orders(ws: WebSocket):
await ws.accept()
async with listen("orders") as channel:
async for n in channel:
await ws.send_text(n.payload)
@app.post("/orders")
async def create_order(body: OrderIn):
order = await Order.objects.acreate(**body.dict())
await anotify("orders", order.json())
return order
PostgreSQL-only.
Multi-tenancy middleware (4.0+)¶
from dorm.contrib.tenants_row import current_tenant
@app.middleware("http")
async def tenant_middleware(request, call_next):
tenant = request.headers.get("X-Tenant-ID")
if not tenant:
return JSONResponse({"detail": "missing tenant"}, status_code=400)
with current_tenant(tenant):
return await call_next(request)
Any ORM query inside the handler picks up the tenant automatically.