QuerySet & Manager¶
dorm.queryset.QuerySet
¶
Bases: Generic[_T]
Lazy, chainable query API compatible with Django's QuerySet. Supports both synchronous and asynchronous execution.
cache(timeout: int | None = None, *, using: str = 'default') -> QuerySet[_T]
¶
Cache this queryset's full result set in the configured
:mod:dorm.cache backend (typically Redis) for timeout
seconds.
Cheap-mode: hot queries (configuration tables, feature
flags, listing pages) avoid the round-trip when the cache
is warm. The first invocation runs the query, serialises
every column of every row, and SETs the bytes blob under a
SHA-1 key derived from the model + final SQL + bound
parameters. Subsequent calls within timeout seconds
hydrate model instances from the cached bytes — no DB
round-trip.
Auto-invalidation: every Model.save() /
Model.delete() (and the matching async variants) fires
post_save / post_delete. The signal handler in
:mod:dorm.cache.invalidation deletes every cached
queryset key for the affected model class, so writers
never observe a stale cached read.
timeout=None falls back to the backend's configured
TTL (default 300 s). timeout=0 caches indefinitely.
using selects the cache alias in settings.CACHES;
defaults to "default".
Returns a clone — chaining .cache() doesn't mutate
the source queryset, matching every other QuerySet API.
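The SHA-1 keying scheme described above can be sketched as follows; the helper name and key prefix are assumptions for illustration, not dorm's actual internals:

```python
import hashlib
import pickle

def queryset_cache_key(model_name: str, sql: str, params: tuple) -> str:
    # Key = SHA-1 over model + final SQL + bound parameters, so two
    # querysets that compile to identical SQL share one cache entry.
    payload = pickle.dumps((model_name, sql, params))
    return "dorm:qs:" + hashlib.sha1(payload).hexdigest()

key = queryset_cache_key("Author", "SELECT * FROM authors WHERE age >= ?", (18,))
```

Because the key covers the bound parameters too, `filter(age__gte=18)` and `filter(age__gte=21)` occupy separate cache entries.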
distinct(*fields: str) -> QuerySet[_T]
¶
Plain distinct() deduplicates whole rows. With *fields
on PostgreSQL it emits SELECT DISTINCT ON (col1, col2) … —
the canonical "first row per group" pattern.
Common use: pick the most recent order per customer::
(Order.objects
    .order_by("customer_id", "-created_at")
    .distinct("customer_id"))
DISTINCT ON is PostgreSQL-only. Calling with arguments
against SQLite raises :class:NotImplementedError so the
limitation surfaces at queryset-build time rather than at
first execute.
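The "first row per group" result has a portable equivalent via window functions; a self-contained sqlite3 sketch of the Order example above (table and column names assumed):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INT, created_at TEXT);
    INSERT INTO orders VALUES
        (1, 1, '2024-01-01'), (2, 1, '2024-03-01'),
        (3, 2, '2024-02-01');
""")
# Equivalent of DISTINCT ON (customer_id) ... ORDER BY customer_id, created_at DESC:
# rank rows within each customer, keep only rank 1.
rows = conn.execute("""
    SELECT id, customer_id FROM (
        SELECT id, customer_id,
               ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY created_at DESC) AS rn
        FROM orders
    ) WHERE rn = 1
    ORDER BY customer_id
""").fetchall()
# Most recent order per customer: order 2 for customer 1, order 3 for customer 2.
```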
prefetch_related(*fields: 'str | Prefetch') -> QuerySet[_T]
¶
Schedule batched relation loading for fields.
Each entry is either a plain string (the relation field name)
or a :class:Prefetch describing a custom queryset and/or an
alternate to_attr. The two forms can be mixed in a single
call::
Author.objects.prefetch_related(
    "tags",                                    # plain
    Prefetch("books", queryset=published_qs),  # filtered
)
only(*fields: str) -> QuerySet[_T]
¶
Restrict the SELECT projection to a subset of columns.
Bare names (only("name", "age")) restrict the parent
model. Dotted paths (only("name", "publisher__country"))
restrict a select_related-joined relation as well — the
related-side projection drops every column not listed,
keeping the JOIN narrow when you only want a label or two
from a fat related table. The PK column of every restricted
side is always implicitly included so hydrated instances
keep their identity.
defer(*fields: str) -> QuerySet[_T]
¶
Inverse of :meth:only: drop the named columns from the
SELECT.
Same dotted-path semantics — defer("publisher__bio")
keeps every other column of the related publisher row but
excludes bio from the JOIN projection.
dates(field: str, kind: str, order: str = 'ASC') -> list
¶
Return distinct truncated date values for field.
kind — one of "day", "week", "month",
"year". Truncates each fetched value in Python and
de-duplicates so a chronological archive list is one DB
round-trip.
Returns list[date] rather than a chainable QuerySet —
the Django shape relies on a SQL-level GROUP BY of an
annotate-derived column, which the dorm queryset
compiler currently doesn't accept cleanly. The Python
materialisation is correct on every backend; the chainable
variant lands once the compiler grows that path.
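A minimal sketch of the Python-side truncation and de-duplication described above (helper names are illustrative, not dorm's code):

```python
from datetime import date

def truncate(d: date, kind: str) -> date:
    # Truncate a date to the start of its day/week/month/year bucket.
    if kind == "day":
        return d
    if kind == "week":
        return date.fromordinal(d.toordinal() - d.weekday())  # Monday of that week
    if kind == "month":
        return d.replace(day=1)
    if kind == "year":
        return d.replace(month=1, day=1)
    raise ValueError(f"unknown kind: {kind!r}")

def distinct_dates(values, kind, order="ASC"):
    # One pass: truncate every fetched value, de-duplicate via a set, sort.
    out = sorted({truncate(v, kind) for v in values})
    return out if order == "ASC" else out[::-1]

months = distinct_dates(
    [date(2024, 5, 3), date(2024, 5, 20), date(2024, 6, 1)], "month"
)
```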
datetimes(field: str, kind: str, order: str = 'ASC') -> list
¶
Distinct truncated datetimes for field.
kind — "hour", "minute", "second" in addition
to every kind :meth:dates accepts. Returns a Python list
(see :meth:dates for why).
adates(field: str, kind: str, order: str = 'ASC') -> list
async
¶
Async counterpart of :meth:dates. Same Python-side
truncation + de-duplication; the only async work is the
underlying avalues_list(...) round-trip.
adatetimes(field: str, kind: str, order: str = 'ASC') -> list
async
¶
Async counterpart of :meth:datetimes.
alias(**kwargs: Any) -> QuerySet[_T]
¶
Add named expressions usable in :meth:filter, :meth:exclude
and :meth:order_by without including them in the SELECT list.
Same shape as :meth:annotate — pass name=expression pairs —
but the expression is not projected into the result rows. Use
this when you only need the value to build a predicate or sort
and don't care about reading it back, so you skip the bandwidth
and per-row hydration cost::
qs = (
    Author.objects
    .alias(book_count=Count("books"))
    .filter(book_count__gte=5)
)
alias() and :meth:annotate can be mixed freely; aliased
names that you later promote to a SELECT can be re-declared via
annotate(name=F("name")) (Django pattern).
select_for_update(*, skip_locked: bool = False, no_wait: bool = False, of: tuple[str, ...] | list[str] | None = None) -> QuerySet[_T]
¶
Lock the rows returned by this query until the surrounding
transaction ends. Must be called inside an :func:atomic /
:func:aatomic block — otherwise PostgreSQL silently treats it as
a no-op (the lock would be released immediately at autocommit).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| skip_locked | bool | Skip rows already locked by other transactions instead of waiting. The canonical "task queue" pattern — each worker pops the next unlocked job. | False |
| no_wait | bool | Raise immediately if any matched row is locked, instead of waiting. Useful for bailing fast on contention. | False |
| of | tuple[str, ...] \| list[str] \| None | Tuple of relation names (typically table aliases from joins) to lock; defaults to all referenced tables. | None |
skip_locked and no_wait are mutually exclusive — passing
both raises ValueError. Both are PostgreSQL-only; on SQLite
they raise NotImplementedError so the caller learns instead
of silently getting a different lock model.
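The flag handling can be pictured as a small clause builder; a hypothetical sketch mirroring the mutual-exclusion rule above, not dorm's compiler:

```python
def for_update_clause(*, skip_locked=False, no_wait=False, of=None):
    # skip_locked and no_wait are mutually exclusive contention policies.
    if skip_locked and no_wait:
        raise ValueError("skip_locked and no_wait are mutually exclusive")
    clause = "FOR UPDATE"
    if of:
        # Lock only the named relations instead of every referenced table.
        clause += " OF " + ", ".join(of)
    if skip_locked:
        clause += " SKIP LOCKED"
    elif no_wait:
        clause += " NOWAIT"
    return clause
```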
with_cte(**named_ctes: 'QuerySet[Any] | CTE') -> QuerySet[_T]
¶
Attach one or more Common Table Expressions to this queryset.
Each name=value pair is emitted as WITH name AS (body)
ahead of the main SELECT. value is either a queryset
(compiled and bound with the outer query) or a :class:CTE
carrying a raw SQL body — used when the body needs features
the queryset builder doesn't model (recursive walks, LATERAL
joins, GROUPING SETS).
Plain example::
recent = Order.objects.filter(created_at__gte=cutoff)
Customer.objects.with_cte(recent_orders=recent).filter(...)
Recursive example::
from dorm import CTE

qs = Category.objects.with_cte(
    tree=CTE(
        '''
        SELECT id, parent_id FROM categories WHERE parent_id IS NULL
        UNION ALL
        SELECT c.id, c.parent_id FROM categories c
        JOIN tree t ON c.parent_id = t.id
        ''',
        recursive=True,
    ),
).raw("SELECT * FROM tree")
CTE names are validated as SQL identifiers. The bodies see the same parameter dialect as the outer query — the outer placeholder rewrite covers everything in one pass.
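The recursive body above is plain SQL, so the same walk runs on any backend with WITH RECURSIVE support; a self-contained sqlite3 sketch (schema assumed from the example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE categories (id INTEGER PRIMARY KEY, parent_id INT);
    INSERT INTO categories VALUES (1, NULL), (2, 1), (3, 2);
""")
rows = conn.execute("""
    WITH RECURSIVE tree AS (
        SELECT id, parent_id FROM categories WHERE parent_id IS NULL
        UNION ALL
        SELECT c.id, c.parent_id FROM categories c
        JOIN tree t ON c.parent_id = t.id
    )
    SELECT id FROM tree ORDER BY id
""").fetchall()
# Walks the whole chain from the root: category ids 1, 2, 3.
```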
cursor_paginate(*, after: dict[str, Any] | None = None, order_by: str = 'pk', page_size: int = 50) -> 'CursorPage[_T]'
¶
Keyset (cursor) pagination — stable across writes and orders
of magnitude faster than OFFSET for deep pages.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| after | dict[str, Any] \| None | an optional cursor dict from the previous page's :attr:next_cursor. | None |
| order_by | str | a single field name. Prefix with "-" for descending order. | 'pk' |
| page_size | int | number of rows to return. | 50 |
Example::
page = Article.objects.cursor_paginate(
    order_by="-created_at", page_size=20
)
# Send page.items + page.next_cursor to the client.
# On the next request:
page = Article.objects.cursor_paginate(
    order_by="-created_at", page_size=20, after=cursor
)
Implementation: emits WHERE col > :v (asc) /
col < :v (desc) and LIMIT page_size. next_cursor is
the last row's order_by field value, or None when the
page wasn't full (no more rows).
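The WHERE/LIMIT mechanics can be sketched directly against sqlite3 (schema and helper are illustrative, not dorm's implementation):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE articles (id INTEGER PRIMARY KEY);
    INSERT INTO articles VALUES (1), (2), (3), (4), (5);
""")

def cursor_paginate(conn, *, after=None, page_size=2):
    # Descending keyset page: WHERE id < :cursor ... LIMIT page_size.
    if after is None:
        rows = conn.execute(
            "SELECT id FROM articles ORDER BY id DESC LIMIT ?", (page_size,)
        ).fetchall()
    else:
        rows = conn.execute(
            "SELECT id FROM articles WHERE id < ? ORDER BY id DESC LIMIT ?",
            (after["id"], page_size),
        ).fetchall()
    items = [r[0] for r in rows]
    # next_cursor is the last row's key, or None when the page wasn't full.
    next_cursor = {"id": items[-1]} if len(items) == page_size else None
    return items, next_cursor

page1, cur1 = cursor_paginate(conn)              # newest two rows
page2, cur2 = cursor_paginate(conn, after=cur1)  # next two, no OFFSET scan
```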
acursor_paginate(*, after: dict[str, Any] | None = None, order_by: str = 'pk', page_size: int = 50) -> 'CursorPage[_T]'
async
¶
Async counterpart of :meth:cursor_paginate.
__await__()
¶
Materialize the queryset asynchronously: rows = await qs.
Lets users compose chainable methods (values(), filter(),
order_by()...) and consume the result with a single await,
instead of having to call a terminal avalues()/alist().
Honours .cache(timeout=…) — cached results are returned
without an async iteration; on a miss the rows are stored
for the next call.
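The single-await ergonomics come from implementing __await__ on the queryset; a toy stand-in showing the mechanism (not dorm's implementation):

```python
import asyncio

class LazyRows:
    # Awaiting the object itself materialises it, so callers can write
    # `rows = await qs` instead of calling a terminal alist()/avalues().
    def __init__(self, rows):
        self._rows = rows

    async def _materialise(self):
        await asyncio.sleep(0)  # stands in for the async DB round-trip
        return list(self._rows)

    def __await__(self):
        # Delegate to the coroutine's own awaiter.
        return self._materialise().__await__()

async def main():
    qs = LazyRows([1, 2, 3])
    return await qs

rows = asyncio.run(main())
```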
explain(*, analyze: bool = False, format: str | None = None, verbose: bool = False) -> str
¶
Return the database's query plan for this queryset.
On PostgreSQL, analyze=True runs the query and includes
actual timing / row counts (ANALYZE TRUE, BUFFERS TRUE).
format selects the EXPLAIN output dialect — "text"
(default), "json", "yaml", "xml". verbose=True
adds the VERBOSE flag, surfacing fully-qualified relation
names + output column lists. On SQLite, EXPLAIN QUERY PLAN
is used; analyze / format / verbose are ignored
(SQLite has no equivalents).
Useful for diagnosing slow production queries::
slow_qs = Author.objects.filter(age__gte=18).select_related("publisher")
print(slow_qs.explain(analyze=True))
print(slow_qs.explain(format="json")) # for tooling
aexplain(*, analyze: bool = False, format: str | None = None, verbose: bool = False) -> str
async
¶
Async counterpart of :meth:explain.
iterator(chunk_size: int | None = None) -> Iterator[_T]
¶
Stream results one by one without populating the result cache.
When chunk_size is given, uses a server-side cursor on PG
(so the entire result set never lands in client memory) and
cursor.arraysize on SQLite. Without chunk_size, the
previous all-rows-then-iterate path is preserved for back-compat.
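The chunked path corresponds to fetchmany-style streaming; an illustrative sqlite3 sketch of the same shape:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE nums (n INTEGER);
    INSERT INTO nums VALUES (1), (2), (3), (4), (5);
""")

def iterator(conn, sql, chunk_size=2):
    # Pull chunk_size rows at a time instead of fetchall(), so the full
    # result set never sits in client memory at once.
    cur = conn.execute(sql)
    cur.arraysize = chunk_size
    while True:
        chunk = cur.fetchmany()
        if not chunk:
            return
        yield from chunk

values = [n for (n,) in iterator(conn, "SELECT n FROM nums ORDER BY n")]
```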
aiterator(chunk_size: int | None = None) -> AsyncIterator[_T]
async
¶
Async stream results one by one without populating the result cache.
Same chunk_size semantics as :meth:iterator.
update_or_create(defaults: dict[str, Any] | None = None, create_defaults: dict[str, Any] | None = None, **kwargs: Any) -> tuple[_T, bool]
¶
Look up an object; update it if found, otherwise create it.
defaults applies on the update branch when an existing
row matches kwargs. create_defaults applies on the
create branch when no row matches. Without
create_defaults (or with it None), Django parity is
preserved: defaults applies on both branches, mirroring
the pre-3.3 behaviour.
create_defaults (added in Django 5.0) lets callers supply
a different value set for new rows than for updates — common
when a column has a server-managed default that should NOT
be overwritten on update (a created_at timestamp, for
example).
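The branch selection can be sketched over an in-memory store (hypothetical helper, mirroring the defaults / create_defaults rules above):

```python
def update_or_create(store, defaults=None, create_defaults=None, **kwargs):
    # `store` maps lookup tuples to row dicts; it stands in for the table.
    key = tuple(sorted(kwargs.items()))
    if key in store:
        store[key].update(defaults or {})  # update branch
        return store[key], False
    # Create branch: create_defaults wins when given; otherwise Django
    # parity — `defaults` applies on both branches.
    row = dict(kwargs)
    row.update(create_defaults if create_defaults is not None else (defaults or {}))
    store[key] = row
    return row, True

store = {}
row, created = update_or_create(
    store,
    defaults={"views": 0},
    create_defaults={"views": 0, "created_by": "import"},
    slug="intro",
)
```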
bulk_create(objs: list[_T], batch_size: int = 1000, *, ignore_conflicts: bool = False, update_conflicts: bool = False, update_fields: list[str] | None = None, unique_fields: list[str] | None = None, returning: list[str] | None = None) -> list[_T]
¶
Insert objs in batches of batch_size.
With ignore_conflicts=True, duplicate-key conflicts are
silently skipped (ON CONFLICT DO NOTHING). With
update_conflicts=True, the conflicting row is updated —
the canonical "upsert" pattern. unique_fields is required
when updating so the conflict target is unambiguous; if
update_fields is omitted, every non-PK / non-unique column
is updated, which is almost always what you want for an idempotent
sync from an external source.
returning=[<field>, …] asks the database to send back the
listed columns for each newly-inserted row (PG and SQLite ≥ 3.35
— RETURNING clause). The values are written back onto the
corresponding objects in objs, useful when the column carries
a DB-side default (DEFAULT now()), is a GeneratedField,
or is otherwise computed server-side. MySQL has no RETURNING
on INSERT — the call raises :class:NotImplementedError. Cannot
be combined with ignore_conflicts / update_conflicts
because the row alignment between objs and the rows the DB
actually wrote is no longer 1:1.
Note: when conflicts are skipped or updated, returned PKs may be
None for affected rows (the database doesn't report which
rows actually wrote new data). Re-fetch by unique_fields if
you need the full set of PKs back.
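The upsert plumbing maps directly to SQL; a sqlite3 (≥ 3.24) sketch with an assumed schema, standing in for update_conflicts=True with unique_fields=["email"] and update_fields=["name"]:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE authors (email TEXT PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO authors VALUES (?, ?)",
    [("a@x.io", "Ann"), ("b@x.io", "Bob")],
)

# Upsert batch: existing email rows get their name updated, new rows insert.
conn.executemany(
    "INSERT INTO authors VALUES (?, ?) "
    "ON CONFLICT (email) DO UPDATE SET name = excluded.name",
    [("a@x.io", "Anne"), ("c@x.io", "Cy")],
)
names = dict(conn.execute("SELECT email, name FROM authors ORDER BY email"))
```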
bulk_update(objs: list[_T], fields: list[str], batch_size: int = 1000) -> int
¶
Update fields on objs with a single UPDATE ... SET col = CASE pk
WHEN ... statement per batch (one round-trip per batch_size
objects, instead of one per object).
Raises :class:ValueError if fields is empty — without
columns to set, the generated SQL would be malformed (UPDATE
… WHERE … with no SET clause), so we fail fast at the
Python boundary instead of at the database parser.
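The CASE statement shape described above, generated and executed against sqlite3 (schema and helper are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE books (id INTEGER PRIMARY KEY, price INT);
    INSERT INTO books VALUES (1, 10), (2, 20), (3, 30);
""")

def bulk_update_sql(updates):  # updates: {pk: new_price}
    # One UPDATE per batch: SET price = CASE id WHEN pk THEN value ... END,
    # scoped by WHERE id IN (...) so unlisted rows are untouched.
    whens = " ".join("WHEN ? THEN ?" for _ in updates)
    pks = ", ".join("?" for _ in updates)
    sql = f"UPDATE books SET price = CASE id {whens} END WHERE id IN ({pks})"
    params = [x for pk, v in updates.items() for x in (pk, v)] + list(updates)
    return sql, params

sql, params = bulk_update_sql({1: 11, 3: 33})
conn.execute(sql, params)
prices = dict(conn.execute("SELECT id, price FROM books ORDER BY id"))
```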
aupdate_or_create(defaults: dict[str, Any] | None = None, create_defaults: dict[str, Any] | None = None, **kwargs: Any) -> tuple[_T, bool]
async
¶
Async counterpart of :meth:update_or_create. Same
defaults / create_defaults semantics.
abulk_create(objs: list[_T], batch_size: int = 1000, *, ignore_conflicts: bool = False, update_conflicts: bool = False, update_fields: list[str] | None = None, unique_fields: list[str] | None = None, returning: list[str] | None = None) -> list[_T]
async
¶
Async counterpart of :meth:bulk_create. See the sync version
for ignore_conflicts / update_conflicts / returning
semantics.
abulk_update(objs: list[_T], fields: list[str], batch_size: int = 1000) -> int
async
¶
Async version of :meth:bulk_update. Same single-query batching
strategy: one UPDATE statement per batch of batch_size objects.
dorm.manager.BaseManager
¶
Bases: Generic[_T]
using(alias: str) -> QuerySet[_T]
¶
Manager-level shortcut for Manager.all().using(alias).
Mirrors Django's pattern where Model.objects.using("replica")
returns a queryset bound to alias in one call. Equivalent to
Manager.get_queryset().using(alias).
cache_get(*, pk: Any, timeout: int | None = None, using: str = 'default') -> _T
¶
Read a single row by primary key, going through the cache layer first.
Calling cache_get lazy-attaches the queryset-cache
invalidation handlers (post_save / post_delete ➜
bump_model_cache_version) — without this step, a
subsequent Model.save() would NOT bump the per-model
version and the row cache would return the stale snapshot.
Lookup flow:

- Build a cache key namespaced by the model's app/name and the per-model invalidation version. A racing Model.save() bumps the version, so the entry written by an in-flight reader points at a key no later read will ask for — same anti-stale-read invariant the queryset cache uses.
- get → unpickle → return when present (HMAC verified inside :func:dorm.cache.verify_payload; tampered or unsigned blobs are dropped silently and treated as a miss).
- On miss, fall through to Manager.get(pk=…), write the pickle of the result back into the cache, return.
Cache miss is silent — the database read is the source of
truth. Cache outages also fall through; the queryset layer's
try / except policy applies here too.
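The version-bump invariant can be sketched with a plain dict as the cache backend (all names hypothetical):

```python
import pickle

cache, versions = {}, {}

def row_key(model, pk):
    # The key embeds the per-model version; bumping it orphans every old entry.
    return f"row:{model}:v{versions.get(model, 0)}:{pk}"

def cache_get(model, pk, fetch):
    key = row_key(model, pk)
    if key in cache:
        return pickle.loads(cache[key])  # hit: hydrate from cached bytes
    obj = fetch(pk)                      # miss: the DB read is the source of truth
    cache[key] = pickle.dumps(obj)
    return obj

def bump_model_cache_version(model):
    # post_save / post_delete handler: later reads use a fresh key.
    versions[model] = versions.get(model, 0) + 1

first = cache_get("Author", 1, lambda pk: {"pk": pk, "name": "Ann"})
bump_model_cache_version("Author")  # a write happened
fresh = cache_get("Author", 1, lambda pk: {"pk": pk, "name": "Anne"})
```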
cache_get_many(*, pks: list[Any], timeout: int | None = None, using: str = 'default') -> dict[Any, _T]
¶
Fetch many rows by primary key, going through the cache for
each. Misses are batched into a single IN (...) query
instead of N round-trips, then written back to the cache.
Returns a {pk: instance} dict — pks not found in the DB
are simply absent.
acache_get(*, pk: Any, timeout: int | None = None, using: str = 'default') -> _T
async
¶
Async counterpart of :meth:cache_get.
acache_get_many(*, pks: list[Any], timeout: int | None = None, using: str = 'default') -> dict[Any, _T]
async
¶
Async counterpart of :meth:cache_get_many.
from_queryset(queryset_class: type, class_name: str | None = None) -> type['BaseManager[_T]']
classmethod
¶
Build a Manager subclass that proxies methods of queryset_class.
The canonical Django pattern for adding query-language methods to a manager. Usage::
class PublishedQuerySet(dorm.QuerySet):
    def published(self):
        return self.filter(is_active=True)

    def recent(self, days=30):
        cutoff = ...
        return self.filter(created_at__gte=cutoff)

class Author(dorm.Model):
    ...
    objects = dorm.Manager.from_queryset(PublishedQuerySet)()

# Now both work end-to-end:
Author.objects.published().recent()
Author.objects.filter(...).published()  # via get_queryset
Mechanics: the generated subclass overrides get_queryset to
instantiate queryset_class, and reflects every public method
on queryset_class (anything not starting with _) as a
manager-level passthrough that calls self.get_queryset().method(...).
That mirrors how the default Manager already proxies the built-in
QuerySet API.
class_name customises the generated class's __name__ for
nicer reprs in tracebacks; defaults to f"{cls.__name__}From{queryset_class.__name__}".
dorm.manager.Manager
¶
Bases: BaseManager[_T]
dorm.queryset.CombinedQuerySet
¶
dorm.queryset.RawQuerySet
¶
Bases: Generic[_T]
Executes a raw SQL query and hydrates the results as model instances. Columns returned by the query are mapped to field attnames; unknown columns are stored as plain attributes on the instance.
.. warning::
raw_sql is sent to the database verbatim. Never build it by
string-interpolating user input — use placeholders (%s for
PostgreSQL / SQLite, or $1 / $2 for the dorm builder, which
this class adapts) and pass values via params::
# SAFE
Author.objects.raw("SELECT * FROM authors WHERE id = %s", [user_id])
# UNSAFE — direct string concatenation defeats parameterisation
Author.objects.raw(f"SELECT * FROM authors WHERE id = {user_id}")
For dynamic identifiers (table or column names that aren't fixed at coding time), validate them against an allowlist before splicing.