QuerySet & Manager

dorm.queryset.QuerySet

Bases: Generic[_T]

Lazy, chainable query API compatible with Django's QuerySet. Supports both synchronous and asynchronous execution.

cache(timeout: int | None = None, *, using: str = 'default') -> QuerySet[_T]

Cache this queryset's full result set in the configured :mod:dorm.cache backend (typically Redis) for timeout seconds.

Cheap-mode: hot queries (configuration tables, feature flags, listing pages) avoid the round-trip when the cache is warm. The first invocation runs the query, serialises every column of every row, and SETs the bytes blob under a SHA-1 key derived from the model + final SQL + bound parameters. Subsequent calls within timeout seconds hydrate model instances from the cached bytes — no DB round-trip.

Auto-invalidation: every Model.save() / Model.delete() (and the matching async variants) fires post_save / post_delete. The signal handler in :mod:dorm.cache.invalidation deletes every cached queryset key for the affected model class, so writers never observe a stale cached read.

timeout=None falls back to the backend's configured TTL (default 300 s). timeout=0 caches indefinitely. using selects the cache alias in settings.CACHES; defaults to "default".

Returns a clone — chaining .cache() doesn't mutate the source queryset, matching every other QuerySet API.
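
A minimal usage sketch (the FeatureFlag model and its fields are illustrative, not part of dorm)::

flags = FeatureFlag.objects.filter(enabled=True).cache(timeout=60)

# Within 60 s the same chain hydrates from the cached bytes (no DB hit).
flags_again = FeatureFlag.objects.filter(enabled=True).cache(timeout=60)

# Any write to the model drops every cached queryset key for it:
flag = FeatureFlag.objects.get(name="beta_ui")
flag.enabled = False
flag.save()  # post_save handler deletes the FeatureFlag cache keys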

distinct(*fields: str) -> QuerySet[_T]

Plain distinct() deduplicates whole rows. With *fields on PostgreSQL it emits SELECT DISTINCT ON (col1, col2) … — the canonical "first row per group" pattern.

Common use: pick the most recent order per customer::

(Order.objects
    .order_by("customer_id", "-created_at")
    .distinct("customer_id"))

DISTINCT ON is PostgreSQL-only. Calling with arguments against SQLite raises :class:NotImplementedError so the limitation surfaces at queryset-build time rather than at first execute.

prefetch_related(*fields: str | Prefetch) -> QuerySet[_T]

Schedule batched relation loading for fields.

Each entry is either a plain string (the relation field name) or a :class:Prefetch describing a custom queryset and/or an alternate to_attr. The two forms can be mixed in a single call::

Author.objects.prefetch_related(
    "tags",                                    # plain
    Prefetch("books", queryset=published_qs),  # filtered
)

only(*fields: str) -> QuerySet[_T]

Restrict the SELECT projection to a subset of columns.

Bare names (only("name", "age")) restrict the parent model. Dotted paths (only("name", "publisher__country")) restrict a select_related-joined relation as well — the related-side projection drops every column not listed, keeping the JOIN narrow when you only want a label or two from a fat related table. The PK column of every restricted side is always implicitly included so hydrated instances keep their identity.

defer(*fields: str) -> QuerySet[_T]

Inverse of :meth:only: drop the named columns from the SELECT.

Same dotted-path semantics — defer("publisher__bio") keeps every other column of the related publisher row but excludes bio from the JOIN projection.
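
A short sketch of both projections (the Author/Publisher field names are assumed)::

# Narrow both sides of the JOIN; PK columns are kept implicitly.
labels = (
    Author.objects
    .select_related("publisher")
    .only("name", "publisher__country")
)

# Everything except the fat column.
authors = Author.objects.defer("bio")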

dates(field: str, kind: str, order: str = 'ASC') -> list

Return distinct truncated date values for field.

kind — one of "day", "week", "month", "year". Truncates each fetched value in Python and de-duplicates so a chronological archive list is one DB round-trip.

Returns list[date] rather than a chainable QuerySet — the Django shape relies on a SQL-level GROUP BY of an annotate-derived column, which the dorm queryset compiler currently doesn't accept cleanly. The Python materialisation is correct on every backend; the chainable variant lands once the compiler grows that path.
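
For example, a month archive in one round-trip (model and field name assumed)::

months = Article.objects.dates("published_at", "month", order="DESC")
for d in months:  # one truncated date per distinct month
    print(d.year, d.month)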

datetimes(field: str, kind: str, order: str = 'ASC') -> list

Distinct truncated datetimes for field.

kind"hour", "minute", "second" in addition to every kind :meth:dates accepts. Returns a Python list (see :meth:dates for why).

adates(field: str, kind: str, order: str = 'ASC') -> list async

Async counterpart of :meth:dates. Same Python-side truncation + de-duplication; the only async work is the underlying avalues_list(...) round-trip.

adatetimes(field: str, kind: str, order: str = 'ASC') -> list async

Async counterpart of :meth:datetimes.

alias(**kwargs: Any) -> QuerySet[_T]

Add named expressions usable in :meth:filter, :meth:exclude and :meth:order_by without including them in the SELECT list.

Same shape as :meth:annotate — pass name=expression pairs — but the expression is not projected into the result rows. Use this when you only need the value to build a predicate or sort and don't care about reading it back, so you skip the bandwidth and per-row hydration cost::

qs = (
    Author.objects
    .alias(book_count=Count("books"))
    .filter(book_count__gte=5)
)

alias() and :meth:annotate can be mixed freely; aliased names that you later promote to a SELECT can be re-declared via annotate(name=F("name")) (Django pattern).

select_for_update(*, skip_locked: bool = False, no_wait: bool = False, of: tuple[str, ...] | list[str] | None = None) -> QuerySet[_T]

Lock the rows returned by this query until the surrounding transaction ends. Must be called inside an :func:atomic / :func:aatomic block — otherwise PostgreSQL silently treats it as a no-op (the lock would be released immediately at autocommit).

Parameters:

skip_locked (bool, default False)
    Skip rows already locked by other transactions instead of waiting. The canonical "task queue" pattern — each worker pops the next unlocked job.

no_wait (bool, default False)
    Raise immediately if any matched row is locked, instead of waiting. Useful for bailing fast on contention.

of (tuple[str, ...] | list[str] | None, default None)
    Relation names (typically table aliases from joins) to lock; defaults to all referenced tables. Use this with select_related to lock only the parent row without also locking joined parent tables.

skip_locked and no_wait are mutually exclusive — passing both raises ValueError. Both are PostgreSQL-only; on SQLite they raise NotImplementedError so the caller learns instead of silently getting a different lock model.
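
A sketch of the task-queue pattern described above; the atomic import location and the Job model are assumptions, and first() is taken from the Django-compatible QuerySet surface::

from dorm import atomic  # assumed import location

with atomic():
    job = (
        Job.objects
        .filter(status="pending")
        .order_by("id")
        .select_for_update(skip_locked=True)  # skip rows other workers hold
        .first()
    )
    if job is not None:
        job.status = "running"
        job.save()
# Row locks release when the atomic block commits.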

with_cte(**named_ctes: 'QuerySet[Any] | CTE') -> QuerySet[_T]

Attach one or more Common Table Expressions to this queryset.

Each name=value pair is emitted as WITH name AS (body) ahead of the main SELECT. value is either a queryset (compiled and bound with the outer query) or a :class:CTE carrying a raw SQL body — used when the body needs features the queryset builder doesn't model (recursive walks, LATERAL joins, GROUPING SETS).

Plain example::

recent = Order.objects.filter(created_at__gte=cutoff)
Customer.objects.with_cte(recent_orders=recent).filter(...)

Recursive example::

from dorm import CTE
qs = Category.objects.with_cte(
    tree=CTE(
        '''
        SELECT id, parent_id FROM categories WHERE parent_id IS NULL
        UNION ALL
        SELECT c.id, c.parent_id FROM categories c
        JOIN tree t ON c.parent_id = t.id
        ''',
        recursive=True,
    ),
).raw("SELECT * FROM tree")

CTE names are validated as SQL identifiers. The bodies see the same parameter dialect as the outer query — the outer placeholder rewrite covers everything in one pass.

cursor_paginate(*, after: dict[str, Any] | None = None, order_by: str = 'pk', page_size: int = 50) -> 'CursorPage[_T]'

Keyset (cursor) pagination — stable across writes and orders of magnitude faster than OFFSET for deep pages.

Parameters:

after (dict[str, Any] | None, default None)
    An optional cursor dict from the previous page's :attr:CursorPage.next_cursor. None returns the first page.

order_by (str, default 'pk')
    A single field name. Prefix with - for descending. Defaults to the model's primary key. The ordering must include something unique (typically the PK) — otherwise after can't reliably resume across ties.

page_size (int, default 50)
    Number of rows to return.

Example::

page = Article.objects.cursor_paginate(
    order_by="-created_at", page_size=20
)
# Send page.items + page.next_cursor to the client.
# On the next request:
page = Article.objects.cursor_paginate(
    order_by="-created_at", page_size=20, after=cursor
)

Implementation: emits WHERE col > :v (asc) / col < :v (desc) and LIMIT page_size. next_cursor is the last row's order_by field value, or None when the page wasn't full (no more rows).

acursor_paginate(*, after: dict[str, Any] | None = None, order_by: str = 'pk', page_size: int = 50) -> 'CursorPage[_T]' async

Async counterpart of :meth:cursor_paginate.

__await__()

Materialize the queryset asynchronously: rows = await qs. Lets users compose chainable methods (values(), filter(), order_by()...) and consume the result with a single await, instead of having to call a terminal avalues()/alist().

Honours .cache(timeout=…) — cached results are returned without an async iteration; on a miss the rows are stored for the next call.
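
Illustrative sketch::

async def adult_names():
    # Awaiting the chain executes it; no terminal avalues()/alist() needed.
    rows = await Author.objects.filter(age__gte=18).values("name")
    return [r["name"] for r in rows]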

explain(*, analyze: bool = False, format: str | None = None, verbose: bool = False) -> str

Return the database's query plan for this queryset.

On PostgreSQL, analyze=True runs the query and includes actual timing / row counts (ANALYZE TRUE, BUFFERS TRUE). format selects the EXPLAIN output dialect — "text" (default), "json", "yaml", "xml". verbose=True adds the VERBOSE flag, surfacing fully-qualified relation names + output column lists. On SQLite, EXPLAIN QUERY PLAN is used; analyze / format / verbose are ignored (SQLite has no equivalents).

Useful for diagnosing slow production queries::

slow_qs = Author.objects.filter(age__gte=18).select_related("publisher")
print(slow_qs.explain(analyze=True))
print(slow_qs.explain(format="json"))     # for tooling

aexplain(*, analyze: bool = False, format: str | None = None, verbose: bool = False) -> str async

Async counterpart of :meth:explain.

iterator(chunk_size: int | None = None) -> Iterator[_T]

Stream results one by one without populating the result cache.

When chunk_size is given, uses a server-side cursor on PG (so the entire result set never lands in client memory) and cursor.arraysize on SQLite. Without chunk_size, the previous all-rows-then-iterate path is preserved for back-compat.
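
Sketch of streaming a large table (the export helper is hypothetical)::

for author in Author.objects.filter(is_active=True).iterator(chunk_size=2000):
    export(author)  # hypothetical per-row consumer; client memory stays flat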

aiterator(chunk_size: int | None = None) -> AsyncIterator[_T] async

Async stream results one by one without populating the result cache.

Same chunk_size semantics as :meth:iterator.

update_or_create(defaults: dict[str, Any] | None = None, create_defaults: dict[str, Any] | None = None, **kwargs: Any) -> tuple[_T, bool]

Look up an object; update it if found, otherwise create it.

defaults applies on the update branch when an existing row matches kwargs. create_defaults applies on the create branch when no row matches. When create_defaults is omitted (or passed as None), Django parity is preserved: defaults applies on both branches, mirroring the pre-3.3 behaviour.

create_defaults (added in Django 5.0) lets callers supply a different value set for new rows than for updates — common when a column has a server-managed default that should NOT be overwritten on update (a created_at timestamp, for example).
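
Sketch (the Device model and its timestamp fields are assumed)::

from datetime import datetime, timezone

now = datetime.now(timezone.utc)
obj, created = Device.objects.update_or_create(
    serial="A1B2C3",
    defaults={"last_seen": now},                            # update branch
    create_defaults={"first_seen": now, "last_seen": now},  # create branch
)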

bulk_create(objs: list[_T], batch_size: int = 1000, *, ignore_conflicts: bool = False, update_conflicts: bool = False, update_fields: list[str] | None = None, unique_fields: list[str] | None = None, returning: list[str] | None = None) -> list[_T]

Insert objs in batches of batch_size.

With ignore_conflicts=True, duplicate-key conflicts are silently skipped (ON CONFLICT DO NOTHING). With update_conflicts=True, the conflicting row is updated — the canonical "upsert" pattern. unique_fields is required when updating so the conflict target is unambiguous; if update_fields is omitted, every non-PK / non-unique column is updated, which is almost always what you want for an idempotent sync from an external source.

returning=[<field>, …] asks the database to send back the listed columns for each newly-inserted row (PG and SQLite ≥ 3.35 — RETURNING clause). The values are written back onto the corresponding objects in objs, useful when the column carries a DB-side default (DEFAULT now()), is a GeneratedField, or is otherwise computed server-side. MySQL has no RETURNING on INSERT — the call raises :class:NotImplementedError. Cannot be combined with ignore_conflicts / update_conflicts because the row alignment between objs and the rows the DB actually wrote is no longer 1:1.

Note: when conflicts are skipped or updated, returned PKs may be None for affected rows (the database doesn't report which rows actually wrote new data). Re-fetch by unique_fields if you need the full set of PKs back.
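
Upsert sketch (the Product model and the feed iterable are assumptions)::

rows = [Product(sku=r["sku"], price=r["price"]) for r in feed]
Product.objects.bulk_create(
    rows,
    update_conflicts=True,
    unique_fields=["sku"],    # conflict target
    update_fields=["price"],  # columns rewritten when a sku already exists
)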

bulk_update(objs: list[_T], fields: list[str], batch_size: int = 1000) -> int

Update fields on objs with a single UPDATE ... SET col = CASE pk WHEN ... statement per batch (one round-trip per batch_size objects, instead of one per object).

Raises :class:ValueError if fields is empty — without columns to set, the generated SQL would be malformed (UPDATE … WHERE … with no SET clause), so we fail fast at the Python boundary instead of at the database parser.
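
Sketch::

authors = list(Author.objects.filter(needs_review=True))
for a in authors:
    a.reviewed = True
# One CASE-based UPDATE per 500 objects instead of one UPDATE per object.
Author.objects.bulk_update(authors, ["reviewed"], batch_size=500)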

aupdate_or_create(defaults: dict[str, Any] | None = None, create_defaults: dict[str, Any] | None = None, **kwargs: Any) -> tuple[_T, bool] async

Async counterpart of :meth:update_or_create. Same defaults / create_defaults semantics.

abulk_create(objs: list[_T], batch_size: int = 1000, *, ignore_conflicts: bool = False, update_conflicts: bool = False, update_fields: list[str] | None = None, unique_fields: list[str] | None = None, returning: list[str] | None = None) -> list[_T] async

Async counterpart of :meth:bulk_create. See the sync version for ignore_conflicts / update_conflicts / returning semantics.

abulk_update(objs: list[_T], fields: list[str], batch_size: int = 1000) -> int async

Async version of :meth:bulk_update. Same single-query batching strategy: one UPDATE statement per batch of batch_size objects.

dorm.manager.BaseManager

Bases: Generic[_T]

using(alias: str) -> QuerySet[_T]

Manager-level shortcut for Manager.all().using(alias).

Mirrors Django's pattern where Model.objects.using("replica") returns a queryset bound to alias in one call. Equivalent to Manager.get_queryset().using(alias).
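
For example, assuming a "replica" alias is configured::

recent = Author.objects.using("replica").filter(is_active=True)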

cache_get(*, pk: Any, timeout: int | None = None, using: str = 'default') -> _T

Read a single row by primary key, going through the cache layer first.

Calling cache_get lazy-attaches the queryset-cache invalidation handlers (post_save / post_delete → bump_model_cache_version) — without this step, a subsequent Model.save() would NOT bump the per-model version and the row cache would return the stale snapshot.

Lookup flow:

  1. Build a cache key namespaced by the model's app/name and the per-model invalidation version. A racing Model.save() bumps the version, so the entry written by an in-flight reader points at a key no later read will ask for — same anti-stale-read invariant the queryset cache uses.
  2. get → unpickle → return when present (HMAC verified inside :func:dorm.cache.verify_payload; tampered or unsigned blobs are dropped silently and treated as a miss).
  3. On miss, fall through to Manager.get(pk=…), write the pickle of the result back into the cache, return.

Cache miss is silent — the database read is the source of truth. Cache outages also fall through; the queryset layer's try / except policy applies here too.
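
Sketch::

author = Author.objects.cache_get(pk=42, timeout=120)
# A later author.save() bumps the per-model version, so the next
# cache_get(pk=42) misses and re-reads from the database.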

cache_get_many(*, pks: list[Any], timeout: int | None = None, using: str = 'default') -> dict[Any, _T]

Fetch many rows by primary key, going through the cache for each. Misses are batched into a single IN (...) query instead of N round-trips, then written back to the cache.

Returns a {pk: instance} dict — pks not found in the DB are simply absent.
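
Sketch::

found = Author.objects.cache_get_many(pks=[1, 2, 3])
missing = [pk for pk in (1, 2, 3) if pk not in found]  # absent from the DB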

acache_get(*, pk: Any, timeout: int | None = None, using: str = 'default') -> _T async

Async counterpart of :meth:cache_get.

acache_get_many(*, pks: list[Any], timeout: int | None = None, using: str = 'default') -> dict[Any, _T] async

Async counterpart of :meth:cache_get_many.

from_queryset(queryset_class: type, class_name: str | None = None) -> type['BaseManager[_T]'] classmethod

Build a Manager subclass that proxies methods of queryset_class.

The canonical Django pattern for adding query-language methods to a manager. Usage::

class PublishedQuerySet(dorm.QuerySet):
    def published(self):
        return self.filter(is_active=True)

    def recent(self, days=30):
        cutoff = ...
        return self.filter(created_at__gte=cutoff)

class Author(dorm.Model):
    ...
    objects = dorm.Manager.from_queryset(PublishedQuerySet)()

# Now both work end-to-end:
Author.objects.published().recent()
Author.objects.filter(...).published()  # via get_queryset

Mechanics: the generated subclass overrides get_queryset to instantiate queryset_class, and reflects every public method on queryset_class (anything not starting with _) as a manager-level passthrough that calls self.get_queryset().method(...). That mirrors how the default Manager already proxies the built-in QuerySet API.

class_name customises the generated class's __name__ for nicer reprs in tracebacks; defaults to f"{cls.__name__}From{queryset_class.__name__}".

dorm.manager.Manager

Bases: BaseManager[_T]

dorm.queryset.CombinedQuerySet

Bases: QuerySet[_T]

Produced by .union() / .intersection() / .difference().

dorm.queryset.RawQuerySet

Bases: Generic[_T]

Executes a raw SQL query and hydrates the results as model instances. Columns returned by the query are mapped to field attnames; unknown columns are stored as plain attributes on the instance.

.. warning:: raw_sql is sent to the database verbatim. Never build it by string-interpolating user input — use placeholders (%s for PostgreSQL / SQLite, or $1 / $2 for the dorm builder, which this class adapts) and pass values via params::

   # SAFE
   Author.objects.raw("SELECT * FROM authors WHERE id = %s", [user_id])

   # UNSAFE — direct string concatenation defeats parameterisation
   Author.objects.raw(f"SELECT * FROM authors WHERE id = {user_id}")

For dynamic identifiers (table or column names that aren't fixed at coding time), validate them against an allowlist before splicing.