Async Dispatch Architecture
Validibot runs on three deployment targets: tests, Docker Compose,
and GCP Cloud Run. Each moves work out of a synchronous request in a
different way: tests run it inline, Docker Compose uses Celery with a
Redis broker, and GCP uses Cloud Tasks to call a worker-only HTTP
endpoint. Rather than scattering DEPLOYMENT_TARGET == "gcp" branches
throughout the codebase, we use a dispatcher abstraction: callers
describe what should happen, and a registry picks how it happens on
the current deployment.
Two dispatcher hierarchies exist today:
| Domain | Package | Used for |
|---|---|---|
| Validation runs | validibot/core/tasks/dispatch/ | Launching a validation workflow execution |
| Tracking events | validibot/tracking/dispatch/ | Recording login/logout + app-event rows out of band |
They share no base class. Their payloads differ, and forcing a
common ancestor would mean polluting both with fields the other
doesn't need. What they do share is the pattern — same ABC shape,
same registry + lru_cache factory, same "dispatch() must not
raise for predictable failures" contract. Reading one teaches you
the other.
Why a dispatcher, not just if DEPLOYMENT_TARGET == ...
Before the tracking refactor, validibot.tracking.signals
unconditionally called log_tracking_event_task.delay(). GCP has no
Redis broker, so Celery tried to connect to localhost:6379, the
connection was refused, and the whole 2FA login request returned a
500. Two lessons came out of that:
- Transport choice is a deployment concern. Application code should describe the intent ("record this event") and leave the mechanism (Celery, Cloud Tasks, inline) to a layer that knows the target.
- The auth-path critical section must never fail because a queue is unavailable. The dispatcher contract enforces this: every transport failure must produce a response with error set, not raise.
Anatomy of a dispatcher package
Both packages have the same four-file shape:
```
<domain>/dispatch/
├── __init__.py    re-exports the public names
├── base.py        ABC + request/response dataclasses
├── registry.py    factory: picks one dispatcher per DEPLOYMENT_TARGET
└── <target>.py    one file per backend (cloud_tasks, celery, inline, …)
```
base.py: contract
- *Request dataclass: primitives only (PKs, not ORM instances). Task-queue serialisers require JSON-serialisable payloads, and resolving a stale ORM instance from the signal-thread transaction could race with the worker-side read.
- *DispatchResponse dataclass: task_id (full Cloud Tasks resource name / Celery UUID / None for inline), is_sync, optional error.
- *Dispatcher ABC: dispatcher_name, is_sync, is_available(), dispatch(request).
The core rule, repeated in every base.py: dispatch() must not
raise for transient or predictable failures. Populate
response.error instead.
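To make the contract concrete, here is a minimal sketch of a tracking base.py in that shape. TrackingDispatcher and the response fields come from the description above; the class names TrackingEventRequest and TrackingDispatchResponse, and the request fields (event_type, user_id, payload), are hypothetical placeholders rather than the real definitions.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TrackingEventRequest:
    """Hypothetical request: primitives only, so it serialises cleanly to JSON."""

    event_type: str
    user_id: int | None = None  # a primary key, never a User instance
    payload: dict[str, Any] = field(default_factory=dict)


@dataclass
class TrackingDispatchResponse:
    task_id: str | None  # Cloud Tasks resource name, Celery UUID, or None (inline)
    is_sync: bool
    error: str | None = None  # populated instead of raising


class TrackingDispatcher(ABC):
    """Transport-only contract that every tracking backend implements."""

    dispatcher_name: str = "abstract"
    is_sync: bool = False

    @abstractmethod
    def is_available(self) -> bool:
        """Report whether this backend has the configuration it needs."""

    @abstractmethod
    def dispatch(self, request: TrackingEventRequest) -> TrackingDispatchResponse:
        """Hand the request to the transport.

        Must not raise for transient or predictable failures; return a
        response with ``error`` set instead.
        """
```

Because the request holds only primitives, dataclasses.asdict(request) is already a JSON-ready payload for any backend.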
registry.py: deployment-target routing
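```python
from functools import lru_cache
from django.conf import settings
from .base import TrackingDispatcher

@lru_cache(maxsize=1)
def get_tracking_dispatcher() -> TrackingDispatcher:
    target = getattr(settings, "DEPLOYMENT_TARGET", "test")
    ...
```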
lru_cache means the dispatcher is built once per process. Clear
with clear_tracking_dispatcher_cache() in tests that need to swap
the target. The validation-run registry works identically.
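The routing behind that factory can be sketched as follows, under two assumptions: a SimpleNamespace stands in for django.conf.settings, and the backend classes are empty placeholders (InlineTrackingDispatcher and CeleryTrackingDispatcher are hypothetical names). The mapping of "test", "docker_compose", and "gcp" to inline, Celery, and Cloud Tasks follows the deployment description at the top of this page; raising on an unknown target is this sketch's choice, not documented behaviour.

```python
from functools import lru_cache
from types import SimpleNamespace

# Stand-in for django.conf.settings so the sketch runs on its own.
settings = SimpleNamespace(DEPLOYMENT_TARGET="test")


class InlineTrackingDispatcher:  # hypothetical placeholder
    dispatcher_name = "inline"


class CeleryTrackingDispatcher:  # hypothetical placeholder
    dispatcher_name = "celery"


class CloudTasksTrackingDispatcher:  # placeholder for the real Cloud Tasks backend
    dispatcher_name = "cloud_tasks"


_BACKENDS = {
    "test": InlineTrackingDispatcher,
    "docker_compose": CeleryTrackingDispatcher,
    "gcp": CloudTasksTrackingDispatcher,
}


@lru_cache(maxsize=1)
def get_tracking_dispatcher():
    target = getattr(settings, "DEPLOYMENT_TARGET", "test")
    backend = _BACKENDS.get(target)
    if backend is None:
        raise ValueError(f"No tracking dispatcher for DEPLOYMENT_TARGET={target!r}")
    return backend()


def clear_tracking_dispatcher_cache() -> None:
    """Forget the cached instance so the next call re-reads DEPLOYMENT_TARGET."""
    get_tracking_dispatcher.cache_clear()
```

The cache is why a test that flips DEPLOYMENT_TARGET has to call clear_tracking_dispatcher_cache(): until it does, get_tracking_dispatcher() keeps returning the instance built for the old target.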
One backend file per target
- Inline / test: calls the service directly; is_sync=True. Used by pytest and by a developer running just local up without Celery.
- Celery: wraps task.delay() in a broad try/except that catches broker failures and logs them. is_sync=False.
- Cloud Tasks: builds a tasks_v2.Task with an OIDC token addressed to a worker-only HTTP endpoint. is_sync=False. The worker receives the task, verifies the OIDC token via CloudTasksOIDCAuthentication (signature + audience + SA allowlist), and calls the same service method inline.
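A sketch of the Celery backend's failure handling, assuming a task object with Celery's .delay() (which returns an AsyncResult carrying .id). The task is injected through the constructor so the example runs without Celery or a broker; the request and response classes are trimmed, hypothetical versions of the ones in base.py.

```python
from __future__ import annotations

import logging
from dataclasses import asdict, dataclass

logger = logging.getLogger(__name__)


@dataclass
class TrackingEventRequest:  # hypothetical fields
    event_type: str
    user_id: int | None = None


@dataclass
class TrackingDispatchResponse:
    task_id: str | None
    is_sync: bool
    error: str | None = None


class CeleryTrackingDispatcher:
    """Sketch: broker failures are logged and returned, never raised."""

    dispatcher_name = "celery"
    is_sync = False

    def __init__(self, task) -> None:
        # Any object with a Celery-style .delay(**kwargs) returning something with .id.
        self._task = task

    def is_available(self) -> bool:
        return self._task is not None

    def dispatch(self, request: TrackingEventRequest) -> TrackingDispatchResponse:
        try:
            result = self._task.delay(**asdict(request))
        except Exception as exc:  # deliberately broad: refused connection, DNS, auth...
            logger.warning("Tracking enqueue via Celery failed", exc_info=True)
            return TrackingDispatchResponse(task_id=None, is_sync=False, error=str(exc))
        return TrackingDispatchResponse(task_id=str(result.id), is_sync=False)
```

With this shape, the GCP incident above would have produced a response with error="Connection refused" (or similar) instead of a 500 on the login request.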
Worker-side endpoints
Every Cloud Tasks dispatcher has a matching worker endpoint. They
all sit behind WorkerOnlyAPIView, which:
- Returns 404 from any non-worker instance (APP_IS_WORKER=False), so a public probe that guesses the URL learns nothing.
- Delegates authentication to get_worker_auth_classes(), which selects Cloud Run IAM + OIDC on gcp and the shared-secret WORKER_API_KEY header on docker_compose.
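The gating order can be sketched in plain Python. This is not WorkerOnlyAPIView itself: WorkerConfig, check_worker_request, the X-Worker-Api-Key header name, and the 403 status for failed authentication are all hypothetical, and the Cloud Run IAM + OIDC checks are reduced to an injected verify_oidc callable.

```python
from __future__ import annotations

import hmac
from dataclasses import dataclass
from typing import Callable, Mapping


@dataclass
class WorkerConfig:  # hypothetical bundle of the relevant settings
    app_is_worker: bool  # APP_IS_WORKER
    deployment_target: str  # DEPLOYMENT_TARGET
    worker_api_key: str = ""  # WORKER_API_KEY


def _api_key_matches(expected: str, supplied: str) -> bool:
    if not expected:  # an unset key must never authenticate anyone
        return False
    # compare_digest avoids content-based short-circuiting, limiting timing leaks
    return hmac.compare_digest(expected.encode(), supplied.encode())


def check_worker_request(
    config: WorkerConfig,
    headers: Mapping[str, str],
    verify_oidc: Callable[[Mapping[str, str]], bool],
) -> int:
    """Return the status a worker-only view would respond with (200 = proceed)."""
    if not config.app_is_worker:
        return 404  # same answer as a URL that does not exist
    if config.deployment_target == "gcp":
        ok = verify_oidc(headers)  # stands in for Cloud Run IAM + OIDC verification
    elif config.deployment_target == "docker_compose":
        ok = _api_key_matches(config.worker_api_key, headers.get("X-Worker-Api-Key", ""))
    else:
        ok = False
    return 200 if ok else 403
```

Checking APP_IS_WORKER before any authentication is what lets a public instance return the same 404 as a nonexistent URL, whatever credentials the caller presents.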
Current Cloud Tasks targets:
| URL | Dispatcher | View |
|---|---|---|
| POST /api/v1/execute-validation-run/ | GoogleCloudTasksDispatcher | ExecuteValidationRunView |
| POST /api/v1/tasks/tracking/log-event/ | CloudTasksTrackingDispatcher | LogTrackingEventView |
| POST /api/v1/scheduled/* | Cloud Scheduler (not a dispatcher, but same auth pattern) | SendPeriodicEmailsView, etc. |
Persisted tracking events are available to authorized operators at
/admin/tracking/trackingevent/. The admin is deliberately read-only:
events can be searched and filtered for dispatch or dashboard diagnostics, but
they cannot be added, changed, or deleted outside the tracking service.
URL path as module constant. The tracking dispatcher exports
WORKER_ENDPOINT_PATH = "/api/v1/tasks/tracking/log-event/" at
module level, and the router imports the same constant. Because both
sides read one value, the paths cannot drift apart; renaming or
removing the constant breaks the import, so the build fails instead
of one side silently posting to a 404.
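As an illustration of the constant in use, here is a sketch of how a Cloud Tasks dispatcher might turn WORKER_ENDPOINT_PATH into a task. The dict shape is the one google-cloud-tasks accepts for tasks_v2.Task (http_request with url, headers, body, and oidc_token); the worker base URL, the use of that base URL as the OIDC audience, and the helper names build_tracking_task and enqueue are assumptions. The client is injected so the sketch runs without GCP credentials.

```python
from __future__ import annotations

import json
from urllib.parse import urljoin

# Module-level constant; in this pattern the URL router imports the same name.
WORKER_ENDPOINT_PATH = "/api/v1/tasks/tracking/log-event/"


def build_tracking_task(payload: dict, worker_base_url: str, service_account_email: str) -> dict:
    """Build a task in the dict form google-cloud-tasks accepts for tasks_v2.Task."""
    return {
        "http_request": {
            # http_method is omitted: Cloud Tasks HTTP targets default to POST.
            "url": urljoin(worker_base_url, WORKER_ENDPOINT_PATH),
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(payload).encode("utf-8"),
            "oidc_token": {
                "service_account_email": service_account_email,
                # Assumed audience; the worker must verify against the same value.
                "audience": worker_base_url,
            },
        }
    }


def enqueue(client, parent: str, task: dict) -> str:
    """client: a tasks_v2.CloudTasksClient in production, a fake in tests."""
    created = client.create_task(request={"parent": parent, "task": task})
    return created.name  # full resource name, reported as task_id
```

In production, parent would typically come from client.queue_path(project, location, queue), and the returned name is the full resource name that the response reports as task_id.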
transaction.on_commit is the caller's job
The dispatcher layer doesn't wrap calls in
transaction.on_commit — that's caller context. Signal receivers
and service methods that enqueue work inside a DB transaction call
transaction.on_commit(lambda: dispatcher.dispatch(req)) themselves.
This keeps dispatchers transport-only and leaves the transaction
question with whoever actually has the transaction.
See validibot/tracking/signals.py:_enqueue_tracking_event for the
canonical example, including the last-resort try/except around
the dispatcher call. The dispatcher contract promises no transient
exception escapes, but a genuine programming error (bad import,
attribute error) could still surface. Auth must never return a 500
because of a tracking bug, so the safety net logs with exc_info=True
and returns.
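A sketch of that caller-side pattern follows. It is loosely modelled on the description of _enqueue_tracking_event, not copied from it: enqueue_tracking_event is a hypothetical name, and on_commit is passed in so the example runs outside Django (in the real code it would be django.db.transaction.on_commit). The safety net sits inside the deferred callback, because that is where dispatch() actually runs.

```python
from __future__ import annotations

import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


def enqueue_tracking_event(
    dispatcher: Any,
    request: Any,
    on_commit: Callable[[Callable[[], None]], None],
) -> None:
    """Defer dispatch until commit; never let a tracking failure propagate."""

    def _send() -> None:
        try:
            response = dispatcher.dispatch(request)
        except Exception:
            # Last-resort net: dispatch() promises not to raise for predictable
            # failures, but a programming error could still escape.
            logger.error("Tracking dispatch crashed", exc_info=True)
            return
        if response.error:
            logger.warning("Tracking dispatch failed: %s", response.error)

    # Django's transaction.on_commit runs _send after the surrounding
    # transaction commits, or immediately when no transaction is open.
    on_commit(_send)
```

The dispatcher stays ignorant of transactions; only the caller, which knows whether it is inside one, decides when the work is handed off.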
Adding a new backend
Concretely, adding AWS SQS support for the tracking domain would mean:
- New file: validibot/tracking/dispatch/sqs.py with SQSTrackingDispatcher(TrackingDispatcher).
- New branch in registry.py when DEPLOYMENT_TARGET == "aws".
- New worker endpoint if the queue needs one (or a worker process polling the queue, in which case no endpoint).
- Tests in validibot/tracking/tests/test_dispatch.py covering registry selection, success, missing config, and client errors.
Signal receivers and service callers change nothing.
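For the hypothetical SQS case, the new backend might look like the sketch below. It uses boto3's SQS send_message(QueueUrl=..., MessageBody=...), whose response includes a MessageId; the client and queue URL are injected, the request fields and error strings are invented, and the class does not subclass TrackingDispatcher only so the example stays self-contained. Its three outcomes line up with the missing-config, success, and client-error tests listed above.

```python
from __future__ import annotations

import json
import logging
from dataclasses import asdict, dataclass

logger = logging.getLogger(__name__)


@dataclass
class TrackingEventRequest:  # hypothetical fields
    event_type: str
    user_id: int | None = None


@dataclass
class TrackingDispatchResponse:
    task_id: str | None
    is_sync: bool
    error: str | None = None


class SQSTrackingDispatcher:
    """Hypothetical AWS backend; in the package it would subclass TrackingDispatcher."""

    dispatcher_name = "sqs"
    is_sync = False

    def __init__(self, client, queue_url: str | None) -> None:
        # client: e.g. boto3.client("sqs"); injected so tests can pass a fake.
        self._client = client
        self._queue_url = queue_url

    def is_available(self) -> bool:
        return bool(self._queue_url)

    def dispatch(self, request: TrackingEventRequest) -> TrackingDispatchResponse:
        if not self.is_available():
            return TrackingDispatchResponse(None, False, error="SQS queue URL not configured")
        try:
            resp = self._client.send_message(
                QueueUrl=self._queue_url, MessageBody=json.dumps(asdict(request))
            )
        except Exception as exc:  # a real backend might narrow this to botocore errors
            logger.warning("SQS enqueue failed", exc_info=True)
            return TrackingDispatchResponse(None, False, error=str(exc))
        return TrackingDispatchResponse(resp["MessageId"], False)
```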
Extending to a new domain
If you have a new class of work that needs the same target-agnostic routing:
- Create <domain>/dispatch/ with the four-file shape.
- Mirror the ABC / dataclasses / registry from one of the existing packages; they're deliberately parallel.
- Add a worker endpoint under WorkerOnlyAPIView if Cloud Tasks is in the target list.
- Update this document's registry table.
Resist the temptation to share a base class across domains. The cost (coupling two request shapes) outweighs the benefit (one fewer ABC file).
Related docs
- Service Architecture — in-process service layer that dispatchers hand off to.
- Execution Backends — how validator containers actually run on each deployment target; sits one layer below the dispatcher.