
Async Dispatch Architecture

Validibot runs on three deployment targets: tests, Docker Compose, and GCP Cloud Run. Each moves work out of a synchronous request in a different way. Tests run it inline, Docker Compose uses Celery with a Redis broker, and GCP uses Cloud Tasks to call a worker-only HTTP endpoint. Rather than scattering DEPLOYMENT_TARGET == "gcp" branches throughout the codebase, we use a dispatcher abstraction: callers describe what should happen, and a registry picks how it happens on the current deployment.

Two dispatcher hierarchies exist today:

| Domain | Package | Used for |
|---|---|---|
| Validation runs | validibot/core/tasks/dispatch/ | Launching a validation workflow execution |
| Tracking events | validibot/tracking/dispatch/ | Recording login/logout and app-event rows out of band |

They share no base class. Their payloads differ, and forcing a common ancestor would mean polluting both with fields the other doesn't need. What they do share is the pattern — same ABC shape, same registry + lru_cache factory, same "dispatch() must not raise for predictable failures" contract. Reading one teaches you the other.

Why a dispatcher, not just if DEPLOYMENT_TARGET == ...

Before the tracking refactor, validibot.tracking.signals unconditionally called log_tracking_event_task.delay(). GCP has no Redis broker, so Celery tried to connect to localhost:6379, the connection was refused, and the entire 2FA login request returned a 500. Two lessons came out of that:

  1. Transport choice is a deployment concern. Application code should describe the intent ("record this event") and leave the mechanism (Celery, Cloud Tasks, inline) to a layer that knows the target.
  2. The auth-path critical section must never die because a queue is unavailable. The dispatcher contract enforces this: every transport failure must produce a response with error set instead of raising.

Anatomy of a dispatcher package

Both packages have the same four-file shape:

<domain>/dispatch/
├── __init__.py           re-exports the public names
├── base.py               ABC + request/response dataclasses
├── registry.py           factory: picks one dispatcher per DEPLOYMENT_TARGET
└── <target>.py           one file per backend (cloud_tasks, celery, inline, …)

base.py — contract

  • *Request dataclass — primitives only (PKs, not ORM instances). Task-queue serialisers require JSON, and resolving a stale ORM instance from the signal-thread transaction could race with the worker-side read.
  • *DispatchResponse dataclass — task_id (full Cloud Tasks resource name / Celery UUID / None for inline), is_sync, optional error.
  • *Dispatcher ABC — dispatcher_name, is_sync, is_available(), dispatch(request).

The core rule, repeated in every base.py: dispatch() must not raise for transient or predictable failures. Populate response.error instead.
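A minimal sketch of a tracking base.py that follows this contract. TrackingDispatcher is the name the registry uses; TrackingEventRequest, TrackingDispatchResponse, the request fields, and the to_json() helper are hypothetical, chosen only to illustrate the shape. It is not the actual module.

```python
from __future__ import annotations

import json
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field


@dataclass(frozen=True)
class TrackingEventRequest:
    """Primitives only: PKs and plain values, never ORM instances."""

    event_type: str  # hypothetical field
    user_id: int | None = None  # a PK, not a User instance
    extra: dict[str, str] = field(default_factory=dict)  # hypothetical field

    def to_json(self) -> str:
        # Queue transports need JSON, which is why only primitives are allowed.
        return json.dumps(asdict(self))


@dataclass
class TrackingDispatchResponse:
    task_id: str | None  # Cloud Tasks resource name, Celery UUID, or None for inline
    is_sync: bool
    error: str | None = None  # populated instead of raising for predictable failures


class TrackingDispatcher(ABC):
    """Contract: dispatch() must not raise for transient or predictable failures."""

    dispatcher_name: str
    is_sync: bool

    @abstractmethod
    def is_available(self) -> bool:
        """Return True if this backend is configured and usable."""

    @abstractmethod
    def dispatch(self, request: TrackingEventRequest) -> TrackingDispatchResponse:
        """Hand the request to the transport; report failures via response.error."""
```

Keeping the request frozen and primitive-only means it can be serialised to JSON at the dispatch boundary without touching the database.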

registry.py — deployment-target routing

@lru_cache(maxsize=1)
def get_tracking_dispatcher() -> TrackingDispatcher:
    target = getattr(settings, "DEPLOYMENT_TARGET", "test")
    ...

lru_cache means the dispatcher is built once per process. Clear with clear_tracking_dispatcher_cache() in tests that need to swap the target. The validation-run registry works identically.
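A runnable sketch of the registry pattern. A SimpleNamespace stands in for django.conf.settings so it runs without Django. The stub classes InlineTrackingDispatcher and CeleryTrackingDispatcher are hypothetical, and so is raising ValueError for an unknown target. The "test", "docker_compose", and "gcp" target strings come from this document.

```python
from functools import lru_cache
from types import SimpleNamespace

# Stand-in for django.conf.settings so the sketch runs on its own.
settings = SimpleNamespace(DEPLOYMENT_TARGET="test")


class InlineTrackingDispatcher:  # hypothetical stub
    dispatcher_name = "inline"


class CeleryTrackingDispatcher:  # hypothetical stub
    dispatcher_name = "celery"


class CloudTasksTrackingDispatcher:  # stub standing in for the real class
    dispatcher_name = "cloud_tasks"


_DISPATCHERS_BY_TARGET = {
    "test": InlineTrackingDispatcher,
    "docker_compose": CeleryTrackingDispatcher,
    "gcp": CloudTasksTrackingDispatcher,
}


@lru_cache(maxsize=1)
def get_tracking_dispatcher():
    """Build the dispatcher for the current target once per process."""
    target = getattr(settings, "DEPLOYMENT_TARGET", "test")
    try:
        dispatcher_cls = _DISPATCHERS_BY_TARGET[target]
    except KeyError:
        # Assumption: fail loudly on an unknown target rather than guess.
        raise ValueError(f"No tracking dispatcher for DEPLOYMENT_TARGET={target!r}") from None
    return dispatcher_cls()


def clear_tracking_dispatcher_cache() -> None:
    """Drop the cached instance so the next call re-reads settings."""
    get_tracking_dispatcher.cache_clear()
```

Because the result is cached, changing settings.DEPLOYMENT_TARGET mid-process has no effect until clear_tracking_dispatcher_cache() runs. A test that swaps targets needs exactly that call.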

One backend file per target

  • Inline / test — calls the service directly; is_sync=True. Used by pytest and by a developer running just local up without Celery.
  • Celery — wraps task.delay() with a broad try/except that catches broker failures, logs them, and reports them through response.error. is_sync=False.
  • Cloud Tasks — builds a tasks_v2.Task with an OIDC token addressed to a worker-only HTTP endpoint. is_sync=False. The worker receives the task, verifies the OIDC token via CloudTasksOIDCAuthentication (signature + audience + SA allowlist), and calls the same service method inline.
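A sketch of the Celery backend's broad try/except. The task can be any object with Celery's delay() method, which returns an AsyncResult whose id is the task UUID. Passing the payload as keyword arguments, the standalone DispatchResponse, and the class layout are assumptions. The broker-down case is simulated with a fake task.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Any, Protocol

logger = logging.getLogger(__name__)


class SupportsDelay(Protocol):
    """Anything shaped like a Celery task: .delay() returns an AsyncResult-like object."""

    def delay(self, *args: Any, **kwargs: Any) -> Any: ...


@dataclass
class DispatchResponse:
    task_id: str | None
    is_sync: bool
    error: str | None = None


class CeleryTrackingDispatcher:
    dispatcher_name = "celery"
    is_sync = False

    def __init__(self, task: SupportsDelay):
        self._task = task

    def dispatch(self, payload: dict) -> DispatchResponse:
        try:
            result = self._task.delay(**payload)
        except Exception as exc:  # broker refused, DNS failure, serialisation error...
            # Broad on purpose: the caller may be on the login path.
            logger.warning("Celery dispatch failed: %s", exc, exc_info=True)
            return DispatchResponse(task_id=None, is_sync=False, error=str(exc))
        return DispatchResponse(task_id=result.id, is_sync=False)
```

On GCP, before the refactor, this is the branch that would have turned a refused localhost:6379 connection into a logged error instead of a 500.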

Worker-side endpoints

Every Cloud Tasks dispatcher has a matching worker endpoint. They all sit behind WorkerOnlyAPIView, which:

  1. Returns 404 from any non-worker instance (APP_IS_WORKER=False), so a public probe that guesses the URL learns nothing.
  2. Delegates authentication to get_worker_auth_classes(), which selects Cloud Run IAM + OIDC on gcp and the shared-secret WORKER_API_KEY header on docker_compose.
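The ordering matters: the 404 check runs before authentication, so a probe against a public instance gets the same answer as a mistyped URL. Below is a framework-free sketch of that ordering, with a shared-secret check standing in for the docker_compose path. The function names, the X-Worker-Api-Key header name, and the 403 status are assumptions. On gcp the authenticator would verify the OIDC token instead, which is not shown.

```python
from __future__ import annotations

import hmac
from typing import Callable, Mapping

Authenticator = Callable[[Mapping[str, str]], bool]

# Hypothetical header name; the real WORKER_API_KEY header may differ.
API_KEY_HEADER = "X-Worker-Api-Key"


def make_api_key_authenticator(expected_key: str) -> Authenticator:
    """Shared-secret check in the spirit of the docker_compose WORKER_API_KEY header."""

    def authenticate(headers: Mapping[str, str]) -> bool:
        supplied = headers.get(API_KEY_HEADER, "")
        # Constant-time comparison avoids leaking the key through timing.
        return hmac.compare_digest(supplied.encode(), expected_key.encode())

    return authenticate


def handle_worker_request(
    app_is_worker: bool,
    authenticate: Authenticator,
    headers: Mapping[str, str],
    handler: Callable[[], int],
) -> int:
    """Return an HTTP status code, mimicking the WorkerOnlyAPIView ordering."""
    # 1. Non-worker instances pretend the route does not exist, before any auth runs.
    if not app_is_worker:
        return 404
    # 2. Only on the worker does the target-specific authentication run.
    if not authenticate(headers):
        return 403  # assumption: the real status depends on the auth class
    return handler()
```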

Current Cloud Tasks targets:

| URL | Dispatcher | View |
|---|---|---|
| POST /api/v1/execute-validation-run/ | GoogleCloudTasksDispatcher | ExecuteValidationRunView |
| POST /api/v1/tasks/tracking/log-event/ | CloudTasksTrackingDispatcher | LogTrackingEventView |
| POST /api/v1/scheduled/* | Cloud Scheduler (not a dispatcher, but same auth pattern) | SendPeriodicEmailsView, etc. |

Persisted tracking events are available to authorized operators at /admin/tracking/trackingevent/. The admin is deliberately read-only: events can be searched and filtered for dispatch or dashboard diagnostics, but they cannot be added, changed, or deleted outside the tracking service.

URL path as module constant. The tracking dispatcher exports WORKER_ENDPOINT_PATH = "/api/v1/tasks/tracking/log-event/" at module level, and the router imports the same constant. With a single definition the two sides cannot drift apart: renaming or removing the constant breaks the import and the build fails, rather than one side silently posting to a 404.
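A sketch of one constant serving both sides. build_task_url() and route_pattern() are hypothetical helpers, the worker base URL in the usage is a placeholder, and mounting the route at the URLconf root is an assumption.

```python
from urllib.parse import urljoin

# Single source of truth, defined next to the tracking dispatcher.
WORKER_ENDPOINT_PATH = "/api/v1/tasks/tracking/log-event/"


def build_task_url(worker_base_url: str) -> str:
    """Dispatcher side: the absolute URL the Cloud Task will POST to."""
    # An absolute path replaces any path already on the base URL.
    return urljoin(worker_base_url, WORKER_ENDPOINT_PATH)


def route_pattern() -> str:
    """Router side: Django path() patterns are written without a leading slash."""
    # e.g. path(route_pattern(), LogTrackingEventView.as_view())
    return WORKER_ENDPOINT_PATH.lstrip("/")
```

With a base of "https://worker.example.run.app", build_task_url() yields "https://worker.example.run.app/api/v1/tasks/tracking/log-event/", and route_pattern() yields the same path without its leading slash.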

transaction.on_commit is the caller's job

The dispatcher layer doesn't wrap calls in transaction.on_commit — that's caller context. Signal receivers and service methods that enqueue work inside a DB transaction call transaction.on_commit(lambda: dispatcher.dispatch(req)) themselves. This keeps dispatchers transport-only and leaves the transaction question with whoever actually has the transaction.

See validibot/tracking/signals.py:_enqueue_tracking_event for the canonical example, including the last-resort try/except around the dispatcher call. The dispatcher contract promises no transient exception escapes, but a genuine programming error (bad import, attribute error) could still surface. Auth must not 500 because of a tracking bug — the safety net logs with exc_info=True and returns.
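A simplified sketch of that caller-side pattern, not the actual _enqueue_tracking_event. enqueue_tracking_event is a hypothetical name. on_commit and get_dispatcher are injected so the sketch runs without Django. In Django they would be django.db.transaction.on_commit and the registry's get_tracking_dispatcher.

```python
from __future__ import annotations

import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


def enqueue_tracking_event(
    request: Any,
    get_dispatcher: Callable[[], Any],
    on_commit: Callable[[Callable[[], None]], None],
) -> None:
    """Schedule a tracking dispatch for after the current transaction commits."""

    def _send() -> None:
        try:
            response = get_dispatcher().dispatch(request)
            if response.error:
                # Predictable failure, already captured by the dispatcher.
                logger.warning("Tracking dispatch failed: %s", response.error)
        except Exception:
            # Last-resort net: transport failures never raise by contract, but a
            # programming error could. Auth must not 500 because of tracking.
            logger.error("Unexpected error dispatching tracking event", exc_info=True)

    on_commit(_send)
```

Injecting on_commit makes the deferral visible in tests: nothing is dispatched until the callback runs. In Django, on_commit runs the callback immediately when no transaction is active.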

Adding a new backend

Concretely, adding AWS SQS support for the tracking domain would mean:

  1. New file: validibot/tracking/dispatch/sqs.py with SQSTrackingDispatcher(TrackingDispatcher).
  2. New branch in registry.py when DEPLOYMENT_TARGET == "aws".
  3. New worker endpoint if the queue needs one (or a worker process polling the queue, in which case no endpoint).
  4. Tests in validibot/tracking/tests/test_dispatch.py covering registry selection, success, config-missing, client-error.

Signal receivers and service callers need no changes.
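A sketch of steps 1 and 4 for the hypothetical SQS backend. The client is injected so production code could pass boto3.client("sqs") while tests pass a fake. send_message(QueueUrl=..., MessageBody=...) and the "MessageId" key in its response follow boto3's SQS API. The standalone dataclasses and error strings are hypothetical, and the real class would subclass TrackingDispatcher instead of standing alone. The three scenarios from step 4 (success, missing configuration, client error) each map to one branch of dispatch().

```python
from __future__ import annotations

import json
import logging
from dataclasses import asdict, dataclass
from typing import Any

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class TrackingEventRequest:  # hypothetical fields
    event_type: str
    user_id: int | None = None


@dataclass
class TrackingDispatchResponse:
    task_id: str | None
    is_sync: bool
    error: str | None = None


class SQSTrackingDispatcher:
    """Hypothetical AWS backend; `client` would be boto3.client("sqs") in production."""

    dispatcher_name = "sqs"
    is_sync = False

    def __init__(self, client: Any, queue_url: str | None):
        self._client = client
        self._queue_url = queue_url

    def is_available(self) -> bool:
        return bool(self._client and self._queue_url)

    def dispatch(self, request: TrackingEventRequest) -> TrackingDispatchResponse:
        if not self.is_available():
            # Config missing: report, don't raise.
            return TrackingDispatchResponse(None, False, error="SQS queue not configured")
        try:
            resp = self._client.send_message(
                QueueUrl=self._queue_url,
                MessageBody=json.dumps(asdict(request)),
            )
        except Exception as exc:  # botocore ClientError, endpoint errors, ...
            logger.warning("SQS dispatch failed: %s", exc, exc_info=True)
            return TrackingDispatchResponse(None, False, error=str(exc))
        return TrackingDispatchResponse(task_id=resp["MessageId"], is_sync=False)
```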

Extending to a new domain

If you have a new class of work that needs the same target-agnostic routing:

  1. Create <domain>/dispatch/ with the four-file shape.
  2. Mirror the ABC / dataclasses / registry from one of the existing packages — they're deliberately parallel.
  3. Add a worker endpoint under WorkerOnlyAPIView if Cloud Tasks is in the target list.
  4. Update this document's registry table.

Resist the temptation to share a base class across domains. The cost (coupling two request shapes) outweighs the benefit (one fewer ABC file).

  • Service Architecture — in-process service layer that dispatchers hand off to.
  • Execution Backends — how validator containers actually run on each deployment target; sits one layer below the dispatcher.