Referencia de la API del Cliente¶
TaskQueue¶
Cliente síncrono para definir y encolar tareas.
__init__(server_url, queue_name, timeout, api_key, default_ttl_seconds)¶
| Parámetro | Tipo | Por defecto | Descripción |
|---|---|---|---|
server_url |
str |
"http://127.0.0.1:8001" |
URL del servidor lapinq |
queue_name |
str |
"default" |
Nombre de cola por defecto |
timeout |
float |
30.0 |
Timeout de petición HTTP en segundos |
api_key |
str \| None |
None |
API key para cabecera X-API-Key |
default_ttl_seconds |
float \| None |
None |
TTL por defecto para tareas sin ttl_seconds explícito |
task(name=None, queue_name=None, scheduled_at=None, max_retries=None, priority=0, ttl_seconds=None, metadata=None, retry_delay=None, retry_backoff=None, webhook_url=None)¶
Decorador que registra una función como tarea. Se puede usar con o sin paréntesis.
| Parámetro | Tipo | Por defecto | Descripción |
|---|---|---|---|
name |
str \| None |
Nombre de la función | Nombre explícito de la tarea |
queue_name |
str \| None |
De __init__ |
Cola donde encolar |
scheduled_at |
str \| None |
None |
Datetime ISO 8601 para ejecución retardada |
max_retries |
int \| None |
3 |
Máx. reintentos al fallar |
priority |
int |
0 |
Mayor = se ejecuta primero |
ttl_seconds |
int \| None |
None |
Autoeliminar tarea tras N segundos; 0 = no persistir |
metadata |
dict \| None |
None |
Pares clave-valor arbitrarios almacenados como JSONB |
retry_delay |
float \| None |
None |
Espera fija entre reintentos (segundos) |
retry_backoff |
bool \| None |
None |
Backoff exponencial (por defecto true) |
webhook_url |
str \| None |
None |
URL llamada vía POST al completar/fallar |
La función decorada:
- Sigue siendo invocable como la función original
- Obtiene un método .queue() que devuelve un objeto TaskRef
- Obtiene un atributo .task_name
batch_enqueue(tasks)¶
Encola múltiples tareas en una sola petición.
| Parámetro | Tipo | Descripción |
|---|---|---|
tasks |
list[dict] |
Lista de payloads de tareas |
Devuelve httpx.Response.
close()¶
Cierra la conexión HTTP subyacente.
AsyncTaskQueue¶
Cliente asíncrono. Misma interfaz que TaskQueue pero usa httpx.AsyncClient.
__init__(server_url, queue_name, timeout, api_key, default_ttl_seconds)¶
Mismos parámetros que TaskQueue.
task(name=None, queue_name=None, scheduled_at=None, max_retries=None, priority=0, ttl_seconds=None, metadata=None, retry_delay=None, retry_backoff=None, webhook_url=None)¶
Mismos parámetros que TaskQueue. Añade una corrutina .aqueue() que devuelve un objeto TaskRef.
async batch_enqueue(tasks)¶
Versión asíncrona de batch_enqueue. Devuelve httpx.Response.
async close()¶
Cierra la conexión HTTP asíncrona subyacente.
TaskRef¶
Devuelto por .queue() y .aqueue(). Envuelve la respuesta HTTP del servidor.
| Atributo / Método | Descripción |
|---|---|
.task_id |
UUID de la tarea (o None para tareas con TTL cero) |
.json() |
Cuerpo de la respuesta como dict |
.status_code |
Código de estado HTTP |
.wait(timeout=30) |
Sondea hasta que la tarea se complete o expire el timeout (síncrono) |
await .awaitait(timeout=30) |
Sondea hasta que la tarea se complete o expire el timeout (asíncrono) |
Ejemplos¶
from lapinq import TaskQueue, AsyncTaskQueue
# Cliente síncrono
tasks = TaskQueue(server_url="http://localhost:8001")
@tasks.task(name="tarea_sync")
def tarea_sync(x: int):
return x * 2
ref = tarea_sync.queue(x=42)
print(ref.task_id) # "uuid-here"
print(ref.wait(timeout=30)) # {"status": "completed", "result": "84", ...}
# Cliente asíncrono
async_tasks = AsyncTaskQueue(server_url="http://localhost:8001")
@async_tasks.task(name="tarea_async")
async def tarea_async(x: int):
return x * 2
ref = await tarea_async.aqueue(x=42)
print(ref.task_id) # "uuid-here"
result = await ref.awaitait(timeout=30)
print(result["status"]) # "completed"
await async_tasks.close()