Skip to content

QuerySet & Manager

dorm.queryset.QuerySet

Bases: Generic[_T]

Lazy, chainable query API compatible with Django's QuerySet. Supports both synchronous and asynchronous execution.

cache(timeout: int | None = None, *, using: str = 'default') -> QuerySet[_T]

Cache this queryset's full result set in the configured :mod:dorm.cache backend (typically Redis) for timeout seconds.

Cheap-mode: hot queries (configuration tables, feature flags, listing pages) avoid the round-trip when the cache is warm. The first invocation runs the query, serialises every column of every row, and SETs the bytes blob under a SHA-1 key derived from the model + final SQL + bound parameters. Subsequent calls within timeout seconds hydrate model instances from the cached bytes — no DB round-trip.

Auto-invalidation: every Model.save() / Model.delete() (and the matching async variants) fires post_save / post_delete. The signal handler in :mod:dorm.cache.invalidation deletes every cached queryset key for the affected model class, so writers never observe a stale cached read.

timeout=None falls back to the backend's configured TTL (default 300 s). timeout=0 caches indefinitely. using selects the cache alias in settings.CACHES; defaults to "default".

Returns a clone — chaining .cache() doesn't mutate the source queryset, matching every other QuerySet API.

distinct(*fields: str) -> QuerySet[_T]

Plain distinct() deduplicates whole rows. With *fields on PostgreSQL it emits SELECT DISTINCT ON (col1, col2) … — the canonical "first row per group" pattern.

Common use: pick the most recent order per customer::

(Order.objects
    .order_by("customer_id", "-created_at")
    .distinct("customer_id"))

DISTINCT ON is PostgreSQL-only. Calling with arguments against SQLite raises :class:NotImplementedError so the limitation surfaces at queryset-build time rather than at first execute.

Schedule batched relation loading for fields.

Each entry is either a plain string (the relation field name) or a :class:Prefetch describing a custom queryset and/or an alternate to_attr. The two forms can be mixed in a single call::

Author.objects.prefetch_related(
    "tags",                                    # plain
    Prefetch("books", queryset=published_qs),  # filtered
)

only(*fields: str) -> QuerySet[_T]

Restrict the SELECT projection to a subset of columns.

Bare names (only("name", "age")) restrict the parent model. Dotted paths (only("name", "publisher__country")) restrict a select_related-joined relation as well — the related-side projection drops every column not listed, keeping the JOIN narrow when you only want a label or two from a fat related table. The PK column of every restricted side is always implicitly included so hydrated instances keep their identity.

defer(*fields: str) -> QuerySet[_T]

Inverse of :meth:only: drop the named columns from the SELECT.

Same dotted-path semantics — defer("publisher__bio") keeps every other column of the related publisher row but excludes bio from the JOIN projection.

lookup(column: str | None = None) -> Any

Shortcut for __in=Subquery(qs.values("col")).

Turns a queryset into a scalar subquery that's ready to plug into an IN filter on the outer query::

top10 = Article.objects.order_by("-score")[:10]
comments = Comment.objects.filter(article_id__in=top10.lookup("pk"))

Without column, the wrapped queryset's existing SELECT clause is reused — pre-call .values(...) if it isn't already projecting a single column.

dates(field: str, kind: str, order: str = 'ASC') -> QuerySet

Return distinct truncated date values for field.

kind — one of "day", "week", "month", "year". Truncates each fetched value in Python and de-duplicates so a chronological archive list is one DB round-trip.

Returns a materialized :class:~dorm.QuerySet of date values (already cached; further chainable methods create a fresh query).

datetimes(field: str, kind: str, order: str = 'ASC') -> QuerySet

Distinct truncated datetimes for field.

kind"hour", "minute", "second" in addition to every kind :meth:dates accepts. Returns a materialized :class:~dorm.QuerySet of datetime values.

adates(field: str, kind: str, order: str = 'ASC') -> QuerySet async

Async counterpart of :meth:dates. Returns a materialized :class:~dorm.QuerySet (already cached).

adatetimes(field: str, kind: str, order: str = 'ASC') -> QuerySet async

Async counterpart of :meth:datetimes. Returns a materialized :class:~dorm.QuerySet (already cached).

alias(**kwargs: Any) -> QuerySet[_T]

Add named expressions usable in :meth:filter, :meth:exclude and :meth:order_by without including them in the SELECT list.

Same shape as :meth:annotate — pass name=expression pairs — but the expression is not projected into the result rows. Use this when you only need the value to build a predicate or sort and don't care about reading it back, so you skip the bandwidth and per-row hydration cost::

qs = (
    Author.objects
    .alias(book_count=Count("books"))
    .filter(book_count__gte=5)
)

alias() and :meth:annotate can be mixed freely; aliased names that you later promote to a SELECT can be re-declared via annotate(name=F("name")) (Django pattern).

select_for_update(*, skip_locked: bool = False, no_wait: bool = False, of: tuple[str, ...] | list[str] | None = None) -> QuerySet[_T]

Lock the rows returned by this query until the surrounding transaction ends. Must be called inside an :func:atomic / :func:aatomic block — otherwise PostgreSQL silently treats it as a no-op (the lock would be released immediately at autocommit).

Parameters:

Name Type Description Default
skip_locked bool

Skip rows already locked by other transactions instead of waiting. The canonical "task queue" pattern — each worker pops the next unlocked job.

False
no_wait bool

Raise immediately if any matched row is locked, instead of waiting. Useful for bailing fast on contention.

False
of tuple[str, ...] | list[str] | None

Tuple of relation names (typically table aliases from joins) to lock; defaults to all referenced tables. Use this with select_related to lock only the parent row without also locking joined parent tables.

None

skip_locked and no_wait are mutually exclusive — passing both raises ValueError. Both are PostgreSQL-only; on SQLite they raise NotImplementedError so the caller learns instead of silently getting a different lock model.

with_cte(**named_ctes: 'QuerySet[Any] | CTE') -> QuerySet[_T]

Attach one or more Common Table Expressions to this queryset.

Each name=value pair is emitted as WITH name AS (body) ahead of the main SELECT. value is either a queryset (compiled and bound with the outer query) or a :class:CTE carrying a raw SQL body — used when the body needs features the queryset builder doesn't model (recursive walks, LATERAL joins, GROUPING SETS).

Plain example::

recent = Order.objects.filter(created_at__gte=cutoff)
Customer.objects.with_cte(recent_orders=recent).filter(...)

Recursive example::

from dorm import CTE
qs = Category.objects.with_cte(
    tree=CTE(
        '''
        SELECT id, parent_id FROM categories WHERE parent_id IS NULL
        UNION ALL
        SELECT c.id, c.parent_id FROM categories c
        JOIN tree t ON c.parent_id = t.id
        ''',
        recursive=True,
    ),
).raw("SELECT * FROM tree")

CTE names are validated as SQL identifiers. The bodies see the same parameter dialect as the outer query — the outer placeholder rewrite covers everything in one pass.

cursor_paginate(*, after: dict[str, Any] | None = None, order_by: str = 'pk', page_size: int = 50) -> 'CursorPage[_T]'

Keyset (cursor) pagination — stable across writes and orders of magnitude faster than OFFSET for deep pages.

Parameters:

Name Type Description Default
after dict[str, Any] | None

an optional cursor dict from the previous page's :attr:CursorPage.next_cursor. None returns the first page.

None
order_by str

a single field name. Prefix with - for descending. Defaults to the model's primary key. The ordering must include something unique (typically the PK) — otherwise after can't reliably resume across ties.

'pk'
page_size int

number of rows to return.

50

Example::

page = Article.objects.cursor_paginate(
    order_by="-created_at", page_size=20
)
# Send page.items + page.next_cursor to the client.
# On the next request:
page = Article.objects.cursor_paginate(
    order_by="-created_at", page_size=20, after=cursor
)

Implementation: emits WHERE col > :v (asc) / col < :v (desc) and LIMIT page_size. next_cursor is the last row's order_by field value, or None when the page wasn't full (no more rows).

acursor_paginate(*, after: dict[str, Any] | None = None, order_by: str = 'pk', page_size: int = 50) -> 'CursorPage[_T]' async

Async counterpart of :meth:cursor_paginate.

__await__()

Materialize the queryset asynchronously: rows = await qs. Lets users compose chainable methods (values(), filter(), order_by()...) and consume the result with a single await, instead of having to call a terminal avalues()/alist().

Honours .cache(timeout=…) — cached results are returned without an async iteration; on a miss the rows are stored for the next call.

alist() -> list[_T] async

Materialize the queryset asynchronously and return a list.

Explicit alternative to await qs. Useful when a linter discourages await on a non-async method's return value or when the intent to materialise should be obvious::

rows = await Author.objects.filter(age__gte=18).alist()

explain(*, analyze: bool = False, format: str | None = None, verbose: bool = False) -> str

Return the database's query plan for this queryset.

On PostgreSQL, analyze=True runs the query and includes actual timing / row counts (ANALYZE TRUE, BUFFERS TRUE). format selects the EXPLAIN output dialect — "text" (default), "json", "yaml", "xml". verbose=True adds the VERBOSE flag, surfacing fully-qualified relation names + output column lists. On SQLite, EXPLAIN QUERY PLAN is used; analyze / format / verbose are ignored (SQLite has no equivalents).

Useful for diagnosing slow production queries::

slow_qs = Author.objects.filter(age__gte=18).select_related("publisher")
print(slow_qs.explain(analyze=True))
print(slow_qs.explain(format="json"))     # for tooling

aexplain(*, analyze: bool = False, format: str | None = None, verbose: bool = False) -> str async

Async counterpart of :meth:explain.

iterator(chunk_size: int | None = None) -> Iterator[_T]

Stream results one by one without populating the result cache.

When chunk_size is given, uses a server-side cursor on PG (so the entire result set never lands in client memory) and cursor.arraysize on SQLite. Without chunk_size, the previous all-rows-then-iterate path is preserved for back-compat.

aiterator(chunk_size: int | None = None) -> AsyncIterator[_T] async

Async stream results one by one without populating the result cache.

Same chunk_size semantics as :meth:iterator.

update_or_create(defaults: dict[str, Any] | None = None, create_defaults: dict[str, Any] | None = None, **kwargs: Any) -> tuple[_T, bool]

Look up an object; update it if found, otherwise create it.

defaults applies on the update branch when an existing row matches kwargs. create_defaults applies on the create branch when no row matches. Without create_defaults (or with it None), Django parity is preserved: defaults applies on both branches, mirroring the pre-3.3 behaviour.

create_defaults (added in Django 5.0) lets callers supply a different value set for new rows than for updates — common when a column has a server-managed default that should NOT be overwritten on update (a created_at timestamp, for example).

delete_batched(batch_size: int = 1000) -> int

Delete every row matching the queryset in chunks of batch_size, walking the table via keyset pagination on the primary key. Each chunk runs in its own :func:dorm.transaction.atomic block so a long-running purge doesn't hold a single transaction (and the associated row locks / WAL) for the duration of the operation.

Each batch dispatches through :meth:delete, so reverse-FK descriptors (on_delete=CASCADE / SET_NULL / SET_DEFAULT / PROTECT) fire as usual per batch. On tables with deep cascade fan-out the batched call can amplify the work; trim the cascade tree first (delete_batched on the child tables) when that matters.

Returns the total number of rows deleted in the parent table. The per-model breakdown that :meth:delete exposes is collapsed to a single integer — call :meth:delete directly when you need the detailed accounting.

update_batched(batch_size: int = 1000, **kwargs: Any) -> int

Apply UPDATE … SET ... in chunks of batch_size, walking the queryset by primary key. Same trade-off as :meth:delete_batched: shorter locks / smaller WAL chunks in exchange for forfeiting the single-statement atomicity guarantee. Each batch is its own :func:dorm.transaction.atomic.

bulk_create(objs: list[_T], batch_size: int = 1000, *, ignore_conflicts: bool = False, update_conflicts: bool = False, update_fields: list[str] | None = None, unique_fields: list[str] | None = None, returning: list[str] | None = None) -> list[_T]

Insert objs in batches of batch_size.

With ignore_conflicts=True, duplicate-key conflicts are silently skipped (ON CONFLICT DO NOTHING). With update_conflicts=True, the conflicting row is updated — the canonical "upsert" pattern. unique_fields is required when updating so the conflict target is unambiguous; if update_fields is omitted, every non-PK / non-unique column is updated, which is almost always what you want for an idempotent sync from an external source.

returning=[<field>, …] asks the database to send back the listed columns for each newly-inserted row (PG and SQLite ≥ 3.35 — RETURNING clause). The values are written back onto the corresponding objects in objs, useful when the column carries a DB-side default (DEFAULT now()), is a GeneratedField, or is otherwise computed server-side. MySQL has no RETURNING on INSERT — the call raises :class:NotImplementedError. Cannot be combined with ignore_conflicts / update_conflicts because the row alignment between objs and the rows the DB actually wrote is no longer 1:1.

Note: when conflicts are skipped or updated, returned PKs may be None for affected rows (the database doesn't report which rows actually wrote new data). Re-fetch by unique_fields if you need the full set of PKs back.

bulk_update(objs: list[_T], fields: list[str], batch_size: int = 1000) -> int

Update fields on objs with a single UPDATE ... SET col = CASE pk WHEN ... statement per batch (one round-trip per batch_size objects, instead of one per object).

Raises :class:ValueError if fields is empty — without columns to set, the generated SQL would be malformed (UPDATE … WHERE … with no SET clause), so we fail fast at the Python boundary instead of at the database parser.

upsert(objs: list[_T], unique_fields: list[str], *, update_fields: list[str] | None = None, batch_size: int = 1000, returning: bool = False) -> list[_T]

Cross-vendor UPSERT (INSERT-or-UPDATE).

Sugar over :meth:bulk_create(update_conflicts=True) that smooths the per-backend differences:

  • PostgreSQL / SQLite emit INSERT ... ON CONFLICT (uniques) DO UPDATE SET ....
  • MySQL / MariaDB emit INSERT ... ON DUPLICATE KEY UPDATE (unique_fields is implied by the table's unique indexes).
  • libsql / DuckDB inherit the SQLite path.

Parameters:

Name Type Description Default
objs list[_T]

instances to upsert.

required
unique_fields list[str]

list of field names that determine row identity. Required — picking the conflict target wrong silently overwrites unrelated rows.

required
update_fields list[str] | None

optional column list to refresh on conflict. When None, every non-unique non-pk field is updated.

None
batch_size int

rows per INSERT statement; keeps per-statement parameter count bounded under PG / SQLite limits.

1000
returning bool

when True, returns the inserted-or-updated rows refreshed from the database (PG / SQLite only).

False

Returns the list of upserted instances (possibly empty when backends without RETURNING are used and returning=False).

aupsert(objs: list[_T], unique_fields: list[str], *, update_fields: list[str] | None = None, batch_size: int = 1000, returning: bool = False) -> list[_T] async

Async counterpart of :meth:upsert.

bulk_update_when(cases: list[tuple[Any, dict[str, Any]]], *, default: dict[str, Any] | None = None) -> int

Apply per-condition UPDATE ... SET col = CASE WHEN <cond> THEN <val> ... END against the queryset's current filter.

Parameters:

Name Type Description Default
cases list[tuple[Any, dict[str, Any]]]

list of (condition, values) tuples. The condition is a :class:~dorm.Q instance (or a dict interpreted as a Q kwargs payload); values is a {field_name: value} mapping with the assignments that fire when the condition matches.

required
default dict[str, Any] | None

optional {field_name: value} mapping used as the ELSE branch of every generated CASE. A field present in cases but absent from default keeps its current value (ELSE <col>) — matching the Django Case(default=F(col)) idiom.

None

Returns the number of rows touched (one row may match more than one condition; only the first wins, per SQL CASE semantics).

Example::

Article.objects.filter(published=True).bulk_update_when(
    [
        (Q(score__gte=90), {"label": "A", "featured": True}),
        (Q(score__gte=70), {"label": "B"}),
    ],
    default={"label": "C", "featured": False},
)

Compiles into a single UPDATE statement carrying one CASE per touched column — one round-trip total, regardless of how many fields appear in the cases.

aupdate_or_create(defaults: dict[str, Any] | None = None, create_defaults: dict[str, Any] | None = None, **kwargs: Any) -> tuple[_T, bool] async

Async counterpart of :meth:update_or_create. Same defaults / create_defaults semantics.

avalues(*fields: str) -> QuerySet[Any]

Async variant of :meth:values. Returns a lazily-evaluated :class:~dorm.QuerySet that can be iterated with async for or materialized with await.

avalues_list(*fields: str, flat: bool = False, named: bool = False) -> ValuesListQuerySet

Async variant of :meth:values_list. Returns a lazily-evaluated :class:~dorm.ValuesListQuerySet that can be iterated with async for or materialized with await.

abulk_create(objs: list[_T], batch_size: int = 1000, *, ignore_conflicts: bool = False, update_conflicts: bool = False, update_fields: list[str] | None = None, unique_fields: list[str] | None = None, returning: list[str] | None = None) -> list[_T] async

Async counterpart of :meth:bulk_create. See the sync version for ignore_conflicts / update_conflicts / returning semantics.

abulk_update_when(cases: list[tuple[Any, dict[str, Any]]], *, default: dict[str, Any] | None = None) -> int async

Async counterpart of :meth:bulk_update_when.

abulk_update(objs: list[_T], fields: list[str], batch_size: int = 1000) -> int async

Async version of :meth:bulk_update. Same single-query batching strategy: one UPDATE statement per batch of batch_size objects.

dorm.manager.BaseManager

Bases: Generic[_T]

using(alias: str) -> QuerySet[_T]

Manager-level shortcut for Manager.all().using(alias).

Mirrors Django's pattern where Model.objects.using("replica") returns a queryset bound to alias in one call. Equivalent to Manager.get_queryset().using(alias).

cache_get(*, pk: Any, timeout: int | None = None, using: str = 'default') -> _T

Read a single row by primary key, going through the cache layer first.

Calling cache_get lazy-attaches the queryset-cache invalidation handlers (post_save / post_deletebump_model_cache_version) — without this step, a subsequent Model.save() would NOT bump the per-model version and the row cache would return the stale snapshot.

Lookup flow:

  1. Build a cache key namespaced by the model's app/name and the per-model invalidation version. A racing Model.save() bumps the version, so the entry written by an in-flight reader points at a key no later read will ask for — same anti-stale-read invariant the queryset cache uses.
  2. get → unpickle → return when present (HMAC verified inside :func:dorm.cache.verify_payload; tampered or unsigned blobs are dropped silently and treated as a miss).
  3. On miss, fall through to Manager.get(pk=…), write the pickle of the result back into the cache, return.

Cache miss is silent — the database read is the source of truth. Cache outages also fall through; the queryset layer's try / except policy applies here too.

cache_get_many(*, pks: list[Any], timeout: int | None = None, using: str = 'default') -> dict[Any, _T]

Fetch many rows by primary key, going through the cache for each. Misses are batched into a single IN (...) query instead of N round-trips, then written back to the cache.

Returns a {pk: instance} dict — pks not found in the DB are simply absent.

acache_get(*, pk: Any, timeout: int | None = None, using: str = 'default') -> _T async

Async counterpart of :meth:cache_get.

acache_get_many(*, pks: list[Any], timeout: int | None = None, using: str = 'default') -> dict[Any, _T] async

Async counterpart of :meth:cache_get_many.

delete_batched(batch_size: int = 1000) -> int

Manager-level shortcut to :meth:QuerySet.delete_batched. Delete every row of the underlying table in keyset-paginated chunks.

update_batched(batch_size: int = 1000, **kwargs: Any) -> int

Manager-level shortcut to :meth:QuerySet.update_batched. Apply the update across the whole table in keyset-paginated chunks.

exists_or_create(defaults: dict[str, Any] | None = None, **kwargs: Any) -> tuple[bool, _T]

Return (exists, instance).

Skip the SELECT round-trip from :meth:get_or_create when callers only care whether a row with the given lookup exists — uses :meth:exists + a single :meth:create follow-up on miss.

create_or_update(defaults: dict[str, Any] | None = None, **kwargs: Any) -> tuple[_T, bool]

Alias of :meth:update_or_create with the (instance, created) return swapped to (created, instance) ordering favoured by some style guides. Identical semantics.

union_with(*others: Any, all: bool = False, order_by: list[str] | None = None) -> Any

Build a polymorphic UNION across this manager and others.

Each entry in others may be:

  • another :class:~dorm.Manager — its default queryset is used.
  • a :class:~dorm.QuerySet — used as-is.
  • a (model_or_queryset, {"column": "expr_or_alias", ...}) tuple — the mapping renames / projects columns so the per-branch SELECT lists agree (UNION's only requirement).

all=True emits UNION ALL (keep duplicates, no dedupe cost). order_by is appended on the outer query.

Returns a :class:~dorm.queryset.CombinedQuerySet whose iteration yields plain dict rows — the original model class identity is lost during UNION, so callers typically apply a type_marker annotation per branch to tell the rows apart.

cached(*, timeout: int | None = None, using: str = 'default')

Sugar for self.get_queryset().cache(timeout=...).

Returns a fresh queryset whose first materialisation populates the cache and whose subsequent materialisations re-use it. Drop-in for Model.objects.cached(timeout=60) chains that would otherwise read Model.objects.all().cache(timeout=60).

from_queryset(queryset_class: type, class_name: str | None = None) -> type['BaseManager[_T]'] classmethod

Build a Manager subclass that proxies methods of queryset_class.

The canonical Django pattern for adding query-language methods to a manager. Usage::

class PublishedQuerySet(dorm.QuerySet):
    def published(self):
        return self.filter(is_active=True)

    def recent(self, days=30):
        cutoff = ...
        return self.filter(created_at__gte=cutoff)

class Author(dorm.Model):
    ...
    objects = dorm.Manager.from_queryset(PublishedQuerySet)()

# Now both work end-to-end:
Author.objects.published().recent()
Author.objects.filter(...).published()  # via get_queryset

Mechanics: the generated subclass overrides get_queryset to instantiate queryset_class, and reflects every public method on queryset_class (anything not starting with _) as a manager-level passthrough that calls self.get_queryset().method(...). That mirrors how the default Manager already proxies the built-in QuerySet API.

class_name customises the generated class's __name__ for nicer reprs in tracebacks; defaults to f"{cls.__name__}From{queryset_class.__name__}".

dorm.manager.Manager

Bases: BaseManager[_T]

dorm.queryset.CombinedQuerySet

Bases: QuerySet[_T]

Produced by .union() / .intersection() / .difference().

dorm.queryset.RawQuerySet

Bases: Generic[_T]

Executes a raw SQL query and hydrates the results as model instances. Columns returned by the query are mapped to field attnames; unknown columns are stored as plain attributes on the instance.

.. warning:: raw_sql is sent to the database verbatim. Never build it by string-interpolating user input — use placeholders (%s for PostgreSQL / SQLite, or $1 / $2 for the dorm builder, which this class adapts) and pass values via params::

   # SAFE
   Author.objects.raw("SELECT * FROM authors WHERE id = %s", [user_id])

   # UNSAFE — direct string concatenation defeats parameterisation
   Author.objects.raw(f"SELECT * FROM authors WHERE id = {user_id}")

For dynamic identifiers (table or column names that aren't fixed at coding time), validate them against an allowlist before splicing.