Async Dispatch Architecture¶
Validibot runs on three different deployment targets — tests, Docker
Compose, and GCP Cloud Run. Each has a different mechanism for
moving work out of a synchronous request: tests run inline, Docker
Compose uses Celery with a Redis broker, GCP uses Cloud Tasks
calling a worker-only HTTP endpoint. Rather than scattering
DEPLOYMENT_TARGET == "gcp" branches throughout the codebase, we
use a dispatcher abstraction: callers describe what they want
to happen, and a registry picks the right how for the current
deployment.
Two dispatcher hierarchies exist today:
| Domain | Package | Used for |
|---|---|---|
| Validation runs | `validibot/core/tasks/dispatch/` | Launching a validation workflow execution |
| Tracking events | `validibot/tracking/dispatch/` | Recording login/logout + app-event rows out of band |
They share no base class. Their payloads differ, and forcing a
common ancestor would mean polluting both with fields the other
doesn't need. What they do share is the pattern — same ABC shape,
same registry + lru_cache factory, same "dispatch() must not
raise for predictable failures" contract. Reading one teaches you
the other.
Why a dispatcher, not just if DEPLOYMENT_TARGET == ...¶
Before the tracking refactor, validibot.tracking.signals
unconditionally called log_tracking_event_task.delay(). On GCP
there is no Redis broker — Celery tried to connect to
localhost:6379, refused the connection, and the whole 2FA login
request returned 500. Two lessons came out of that:
- Transport choice is a deployment concern. Application code should describe the intent ("record this event") and leave the mechanism (Celery, Cloud Tasks, inline) to a layer that knows the target.
- The auth-path critical section must never die because a queue is unavailable. The dispatcher contract enforces this: all transport failures must return a response with `error` set, not raise.
Anatomy of a dispatcher package¶
Both packages have the same four-file shape:
```
<domain>/dispatch/
├── __init__.py    re-exports the public names
├── base.py        ABC + request/response dataclasses
├── registry.py    factory: picks one dispatcher per DEPLOYMENT_TARGET
└── <target>.py    one file per backend (cloud_tasks, celery, inline, …)
```
base.py — contract¶
- `Request` dataclass — primitives only (PKs, not ORM instances). Task-queue serialisers require JSON, and resolving a stale ORM instance from the signal-thread transaction could race with the worker-side read.
- `DispatchResponse` dataclass — `task_id` (full Cloud Tasks resource name / Celery UUID / `None` for inline), `is_sync`, optional `error`.
- `Dispatcher` ABC — `dispatcher_name`, `is_sync`, `is_available()`, `dispatch(request)`.
The core rule, repeated in every base.py: dispatch() must not
raise for transient or predictable failures. Populate
response.error instead.
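As a concrete sketch of that contract, the tracking domain's base.py looks roughly like this. The request field names (`user_pk`, `event_type`) are illustrative, not the repo's exact schema:

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class TrackingDispatchRequest:
    # Primitives only: PKs and plain strings survive JSON serialisation
    # to any queue and cannot race with the worker-side read.
    user_pk: int | None
    event_type: str


@dataclass
class DispatchResponse:
    # Full Cloud Tasks resource name, Celery UUID, or None for inline.
    task_id: str | None
    is_sync: bool
    error: str | None = None


class TrackingDispatcher(ABC):
    dispatcher_name: str = "abstract"
    is_sync: bool = False

    @abstractmethod
    def is_available(self) -> bool: ...

    @abstractmethod
    def dispatch(self, request: TrackingDispatchRequest) -> DispatchResponse:
        """Must not raise for predictable failures; set response.error instead."""
```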
registry.py — deployment-target routing¶
```python
@lru_cache(maxsize=1)
def get_tracking_dispatcher() -> TrackingDispatcher:
    target = getattr(settings, "DEPLOYMENT_TARGET", "test")
    ...
```
lru_cache means the dispatcher is built once per process. Clear
with clear_tracking_dispatcher_cache() in tests that need to swap
the target. The validation-run registry works identically.
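A self-contained sketch of the full registry pattern, with stub dispatcher classes and a `SimpleNamespace` standing in for `django.conf.settings` so it runs outside Django; the branch structure mirrors the deployment targets named in this doc:

```python
from functools import lru_cache
from types import SimpleNamespace

# Stand-in for django.conf.settings; the real registry imports it.
settings = SimpleNamespace(DEPLOYMENT_TARGET="test")


class InlineTrackingDispatcher:
    dispatcher_name = "inline"


class CeleryTrackingDispatcher:
    dispatcher_name = "celery"


class CloudTasksTrackingDispatcher:
    dispatcher_name = "cloud_tasks"


@lru_cache(maxsize=1)
def get_tracking_dispatcher():
    target = getattr(settings, "DEPLOYMENT_TARGET", "test")
    if target == "gcp":
        return CloudTasksTrackingDispatcher()
    if target == "docker_compose":
        return CeleryTrackingDispatcher()
    return InlineTrackingDispatcher()


def clear_tracking_dispatcher_cache():
    # Tests that swap DEPLOYMENT_TARGET must drop the per-process singleton.
    get_tracking_dispatcher.cache_clear()
```

Note the `lru_cache` gotcha the cache-clear helper exists for: changing `DEPLOYMENT_TARGET` alone does nothing, because the zero-argument factory keeps returning the cached instance.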
One backend file per target¶
- Inline / test — calls the service directly; `is_sync=True`. Used by pytest and by a developer running `just local up` without Celery.
- Celery — wraps `task.delay()` with a broad `try/except` that catches broker failures and logs. `is_sync=False`.
- Cloud Tasks — builds a `tasks_v2.Task` with an OIDC token addressed to a worker-only HTTP endpoint. `is_sync=False`. The worker receives the task, verifies the OIDC token via `CloudTasksOIDCAuthentication` (signature + audience + SA allowlist), and calls the same service method inline.
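The Celery backend's error handling is the heart of the "must not raise" contract. In this hedged sketch, the stub `_delay` simulates the exact failure from the 2FA incident (no Redis broker on GCP); the real backend calls `log_tracking_event_task.delay()` instead:

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class DispatchResponse:  # redefined minimally so the snippet stands alone
    task_id: object = None
    is_sync: bool = False
    error: object = None


class CeleryTrackingDispatcher:
    dispatcher_name = "celery"
    is_sync = False

    def dispatch(self, payload):
        try:
            # Real backend: log_tracking_event_task.delay(**payload).
            task_id = self._delay(payload)
            return DispatchResponse(task_id=task_id, is_sync=False)
        except Exception as exc:  # broad on purpose: a dead broker must not raise
            logger.warning("tracking dispatch failed: %s", exc)
            return DispatchResponse(error=str(exc))

    def _delay(self, payload):
        # Stub that reproduces the GCP failure mode: Redis is not there.
        raise ConnectionError("Error 111 connecting to localhost:6379")
```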
Worker-side endpoints¶
Every Cloud Tasks dispatcher has a matching worker endpoint. They
all sit behind WorkerOnlyAPIView, which:
- Returns 404 from any non-worker instance (`APP_IS_WORKER=False`), so a public probe that guesses the URL learns nothing.
- Delegates authentication to `get_worker_auth_classes()`, which selects Cloud Run IAM + OIDC on `gcp` and the shared-secret `WORKER_API_KEY` header on `docker_compose`.
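A framework-free sketch of the two decisions above. The real code subclasses a DRF `APIView` and returns actual authentication classes; the strings and the `WorkerAPIKeyAuthentication` name here are placeholders:

```python
from __future__ import annotations


def worker_only_gate(app_is_worker: bool) -> int | None:
    # Non-worker instances answer 404, so a probe cannot distinguish
    # the worker endpoint from any other missing page.
    return None if app_is_worker else 404


def get_worker_auth_classes(deployment_target: str) -> list[str]:
    # Strings stand in for the real authentication classes.
    if deployment_target == "gcp":
        return ["CloudTasksOIDCAuthentication"]  # Cloud Run IAM + OIDC
    if deployment_target == "docker_compose":
        return ["WorkerAPIKeyAuthentication"]  # shared-secret WORKER_API_KEY header
    return []  # inline targets never cross an HTTP boundary
```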
Current Cloud Tasks targets:
| URL | Dispatcher | View |
|---|---|---|
| `POST /api/v1/execute-validation-run/` | `GoogleCloudTasksDispatcher` | `ExecuteValidationRunView` |
| `POST /api/v1/tasks/tracking/log-event/` | `CloudTasksTrackingDispatcher` | `LogTrackingEventView` |
| `POST /api/v1/scheduled/*` | Cloud Scheduler (not a dispatcher, but same auth pattern) | `SendPeriodicEmailsView`, etc. |
URL path as module constant. The tracking dispatcher exports
WORKER_ENDPOINT_PATH = "/api/v1/tasks/tracking/log-event/" at
module level. The router imports the same constant. If the path
drifts, the build fails rather than one side silently posting to a
404.
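The pattern, sketched below with an illustrative `build_task_url` helper (the constant is the real exported name; the helper and base URL are assumptions): both sides import one name, so a path change is a one-line edit that cannot drift.

```python
# Exported from the tracking dispatch package; the router imports it too.
WORKER_ENDPOINT_PATH = "/api/v1/tasks/tracking/log-event/"


def build_task_url(worker_base_url: str) -> str:
    # Dispatcher side: absolute URL for the Cloud Tasks HTTP target.
    return worker_base_url.rstrip("/") + WORKER_ENDPOINT_PATH


# Router side: Django's path() wants the route without a leading slash.
route = WORKER_ENDPOINT_PATH.lstrip("/")
```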
transaction.on_commit is the caller's job¶
The dispatcher layer doesn't wrap calls in
transaction.on_commit — that's caller context. Signal receivers
and service methods that enqueue work inside a DB transaction call
transaction.on_commit(lambda: dispatcher.dispatch(req)) themselves.
This keeps dispatchers transport-only and leaves the transaction
question with whoever actually has the transaction.
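The caller-side pattern can be sketched like this, with a tiny stub for `django.db.transaction` so the snippet runs stand-alone; the stub's hook list imitates Django's behaviour of running `on_commit` callbacks only after the transaction actually commits:

```python
# Stub for django.db.transaction: hooks queue up and run only on "commit".
_hooks = []


class transaction:
    @staticmethod
    def on_commit(func):
        _hooks.append(func)


def fake_commit():
    while _hooks:
        _hooks.pop(0)()


class RecordingDispatcher:
    def __init__(self):
        self.dispatched = []

    def dispatch(self, req):
        self.dispatched.append(req)


def enqueue_after_commit(dispatcher, req):
    # Caller-side pattern: if the surrounding transaction rolls back,
    # the lambda never runs and nothing is enqueued.
    transaction.on_commit(lambda: dispatcher.dispatch(req))
```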
See validibot/tracking/signals.py:_enqueue_tracking_event for the
canonical example, including the last-resort try/except around
the dispatcher call. The dispatcher contract promises no transient
exception escapes, but a genuine programming error (bad import,
attribute error) could still surface. Auth must not 500 because of
a tracking bug — the safety net logs with exc_info=True and
returns.
Adding a new backend¶
Concretely, adding AWS SQS support for the tracking domain would mean:
- New file: `validibot/tracking/dispatch/sqs.py` with `SQSTrackingDispatcher(TrackingDispatcher)`.
- New branch in `registry.py` when `DEPLOYMENT_TARGET == "aws"`.
- New worker endpoint if the queue needs one (or a worker process polling the queue, in which case no endpoint).
- Tests in `validibot/tracking/tests/test_dispatch.py` covering registry selection, success, config-missing, and client-error paths.
Signal receivers and service callers change nothing.
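A hypothetical sketch of that first step. Everything AWS-specific here is an assumption (boto3 client, queue-URL config, payload shape); the point is that the new backend honours the same contract: the boto3 import is deferred so non-AWS targets never need it, and every failure is folded into `response.error`:

```python
from __future__ import annotations

import json
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class DispatchResponse:  # redefined so the sketch stands alone
    task_id: str | None
    is_sync: bool
    error: str | None = None


class SQSTrackingDispatcher:
    dispatcher_name = "sqs"
    is_sync = False

    def __init__(self, queue_url: str | None):
        self.queue_url = queue_url

    def is_available(self) -> bool:
        return bool(self.queue_url)

    def dispatch(self, payload: dict) -> DispatchResponse:
        if not self.queue_url:
            return DispatchResponse(None, False, error="SQS queue URL not configured")
        try:
            import boto3  # deferred: non-AWS deployments never import it

            client = boto3.client("sqs")
            resp = client.send_message(
                QueueUrl=self.queue_url, MessageBody=json.dumps(payload)
            )
            return DispatchResponse(resp["MessageId"], False)
        except Exception as exc:  # client/network errors must not raise
            logger.warning("SQS dispatch failed: %s", exc)
            return DispatchResponse(None, False, error=str(exc))
```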
Extending to a new domain¶
If you have a new class of work that needs the same target-agnostic routing:
- Create `<domain>/dispatch/` with the four-file shape.
- Mirror the ABC / dataclasses / registry from one of the existing packages — they're deliberately parallel.
- Add a worker endpoint under `WorkerOnlyAPIView` if Cloud Tasks is in the target list.
- Update this document's registry table.
Resist the temptation to share a base class across domains. The cost (coupling two request shapes) outweighs the benefit (one fewer ABC file).
Related docs¶
- Service Architecture — in-process service layer that dispatchers hand off to.
- Execution Backends — how validator containers actually run on each deployment target; sits one layer below the dispatcher.