Workflow Engine Architecture

This document describes the internal architecture of the ValidationRunService, which orchestrates the execution of validation workflows.

Overview

The Workflow Engine is responsible for:

  1. Iterating through WorkflowSteps defined in a Workflow
  2. Executing each step against a Submission
  3. Recording the results (ValidationFinding, status updates)
  4. Handling workflow-level status transitions

Architecture: Two-Layer Dispatch

The workflow engine uses a two-layer dispatch pattern:

  1. ValidationRunService - Orchestrates the workflow loop and delegates individual steps
  2. Processors/Handlers - Execute individual steps based on their type

ValidationRunService.execute_workflow_steps()
        ├── For validator steps:
        │       │
        │       └── ValidationStepProcessor
        │               ├── SimpleValidationProcessor (JSON, XML, Basic, AI)
        │               └── AdvancedValidationProcessor (EnergyPlus, FMU)
        └── For action steps:
                └── StepHandler
                        ├── SlackMessageActionHandler
                        ├── SignedCredentialActionHandler
                        └── ...

Validator Step Execution: The Processor Pattern

Validator steps are executed through the ValidationStepProcessor abstraction. This provides a clean separation between:

  • Workflow orchestration (ValidationRunService) - loops, aggregation, status management
  • Step lifecycle (Processors) - call validator, persist findings, handle errors
  • Validation logic (Validators) - schema checking, AI prompts, assertions

How Validator Steps Execute

# Inside StepOrchestrator.execute_workflow_steps()
for step in workflow_steps:
    step_run = self._start_step_run(validation_run, step)

    if step.validator:
        # Use processors for validator steps
        result: StepProcessingResult = self._execute_validator_step(
            validation_run=validation_run,
            step_run=step_run,
        )
    else:
        # Use existing handler flow for action steps
        validation_result = self.execute_workflow_step(step=step, ...)

The _execute_validator_step() method delegates to the appropriate processor and returns a typed StepProcessingResult:

def _execute_validator_step(self, validation_run, step_run) -> StepProcessingResult:
    from validibot.validations.services.step_processor import get_step_processor

    processor = get_step_processor(validation_run, step_run)
    return processor.execute()

Processor Types

Processor                    Validator Types                     Execution Mode
---------------------------  ----------------------------------  ----------------------------------
SimpleValidationProcessor    Basic, JSON Schema, XML Schema, AI  Synchronous, inline
AdvancedValidationProcessor  EnergyPlus, FMU, custom             Sync (Docker) or Async (Cloud Run)

For detailed documentation on processors, see Validation Step Processor Architecture.
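The mapping from validator type to processor can be pictured with a small factory. This is a minimal sketch, not the actual get_step_processor(): the ValidationType members, the constructor arguments, and the pick_processor name are assumptions made for illustration.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ValidationType(Enum):
    # Hypothetical subset of validator types, for illustration only.
    BASIC = "BASIC"
    JSON_SCHEMA = "JSON_SCHEMA"
    AI = "AI"
    ENERGYPLUS = "ENERGYPLUS"
    FMU = "FMU"


@dataclass
class StepProcessingResult:
    # passed=None stands for "launched, verdict not known yet".
    passed: Optional[bool]


class SimpleValidationProcessor:
    def __init__(self, validation_type: ValidationType):
        self.validation_type = validation_type

    def execute(self) -> StepProcessingResult:
        # Inline execution: a verdict is available immediately.
        return StepProcessingResult(passed=True)


class AdvancedValidationProcessor:
    def __init__(self, validation_type: ValidationType):
        self.validation_type = validation_type

    def execute(self) -> StepProcessingResult:
        # Container execution may be async, so no verdict is returned here.
        return StepProcessingResult(passed=None)


# Types routed to the advanced processor; everything else is "simple".
ADVANCED_TYPES = {ValidationType.ENERGYPLUS, ValidationType.FMU}


def pick_processor(validation_type: ValidationType):
    """Hypothetical stand-in for the processor factory."""
    if validation_type in ADVANCED_TYPES:
        return AdvancedValidationProcessor(validation_type)
    return SimpleValidationProcessor(validation_type)
```

Keeping the routing decision in one set means a new container-based type only needs one extra entry, which matches the factory update described under "Adding a New Validator Type".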

Action Step Execution: The Handler Pattern

Action steps (non-validation operations) use the StepHandler protocol for extensibility.

Core Components

1. Protocol (StepHandler)

Every action handler must implement the StepHandler protocol defined in validibot/actions/protocols.py:

class StepHandler(Protocol):
    def execute(self, run_context: RunContext) -> StepResult:
        ...

  • RunContext: Contains the ValidationRun, WorkflowStep, and shared signals.
  • StepResult: Standardized output indicating pass/fail, issues, and statistics.
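A handler that satisfies the protocol might look like the following. This is a sketch under stated assumptions: the field names on RunContext and StepResult, the EchoActionHandler class, and the step's message attribute are all hypothetical and only mirror the roles described above.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import Any, Protocol


@dataclass
class RunContext:
    # Hypothetical fields standing in for the run, the step, and shared signals.
    validation_run: Any
    step: Any
    workflow_signals: dict[str, Any] = field(default_factory=dict)


@dataclass
class StepResult:
    # Hypothetical fields: pass/fail, issues, and statistics.
    passed: bool
    issues: list[str] = field(default_factory=list)
    stats: dict[str, Any] = field(default_factory=dict)


class StepHandler(Protocol):
    def execute(self, run_context: RunContext) -> StepResult: ...


class EchoActionHandler:
    """Hypothetical handler: passes when the step carries a non-empty message."""

    def execute(self, run_context: RunContext) -> StepResult:
        message = getattr(run_context.step, "message", "")
        if not message:
            return StepResult(passed=False, issues=["step has no message"])
        return StepResult(passed=True, stats={"message_length": len(message)})


# Usage: the handler needs no base class, only a matching execute() signature.
handler: StepHandler = EchoActionHandler()
ok = handler.execute(RunContext(validation_run=None, step=SimpleNamespace(message="hello")))
bad = handler.execute(RunContext(validation_run=None, step=SimpleNamespace()))
```

Because StepHandler is a Protocol, conformance is structural: any class with a compatible execute() method qualifies without inheriting from it.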

2. Dispatcher (ValidationRunService)

For action steps, the service:

  • Resolves the appropriate implementation (Action subclass)
  • Looks up the registered StepHandler
  • Invokes handler.execute(context)

Available Handlers

Handler                        Purpose
-----------------------------  ----------------------------------
SlackMessageActionHandler      Sends Slack notifications
SignedCredentialActionHandler  Generates and attaches credentials

Async Validator Completion: Callbacks

When advanced validators run on async backends (like GCP Cloud Run), execution follows a two-phase pattern:

Phase 1: Launch (ValidationRunService)

  1. Processor calls engine.validate(), which launches the container
  2. Container job starts running on Cloud Run
  3. Processor returns StepProcessingResult(passed=None)
  4. Run stays in RUNNING status, waiting for the callback

Phase 2: Complete (ValidationCallbackService)

  1. Container completes and POSTs callback to /api/internal/callbacks/validation/
  2. ValidationCallbackService downloads output envelope from cloud storage
  3. Creates processor and calls processor.complete_from_callback(output_envelope)
  4. Processor finalizes the step and either:
       • Resumes the workflow with the next step, OR
       • Finalizes the run as SUCCEEDED/FAILED

# Inside ValidationCallbackService._process_callback()
from validibot.validations.services.step_processor import get_step_processor

processor = get_step_processor(run, step_run)
processor.complete_from_callback(output_envelope)
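The two phases can be sketched as a processor with a launch method and a completion method. This is an illustration only: the AsyncProcessorSketch class, its status strings, and the {"status": ...} envelope shape are assumptions, not the real output envelope format.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepProcessingResult:
    # None means the step was launched and is awaiting its callback.
    passed: Optional[bool]


class AsyncProcessorSketch:
    """Hypothetical processor showing the launch/complete split."""

    def __init__(self) -> None:
        self.step_status = "PENDING"

    def execute(self) -> StepProcessingResult:
        # Phase 1: launch the job and return without a verdict.
        self.step_status = "RUNNING"
        return StepProcessingResult(passed=None)

    def complete_from_callback(self, output_envelope: dict) -> StepProcessingResult:
        # Phase 2: the envelope (assumed shape) carries the final outcome.
        passed = output_envelope.get("status") == "success"
        self.step_status = "PASSED" if passed else "FAILED"
        return StepProcessingResult(passed=passed)


processor = AsyncProcessorSketch()
launched = processor.execute()
finished = processor.complete_from_callback({"status": "success"})
```

The caller can treat passed=None as the signal to leave the run in RUNNING and stop iterating until the callback arrives.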

Extending the System

Adding a New Validator Type

  1. Create the validator in validibot/validations/validators/:
       • Extend BaseValidator
       • Implement the validate() method
       • For container-based validators, implement post_execute_validate() too

  2. Register the validator by adding a config.py to your validator sub-package:

    # validations/validators/my_validator/config.py
    config = ValidatorConfig(
        validation_type="MY_VALIDATOR",
        validator_class="validibot.validations.validators.my_validator.validator.MyValidator",
        ...
    )

  3. Update the processor factory (if needed) in step_processor/factory.py:

    advanced_types = {
        ValidationType.ENERGYPLUS,
        ValidationType.FMU,
        ValidationType.MY_VALIDATOR,  # Add here if container-based or compute-intensive
    }
    

Adding a New Action Type

  1. Define the Model: Create a new Action subclass in validibot/actions/models.py
  2. Implement Handler: Create a class implementing StepHandler in validibot/actions/handlers.py
  3. Register: Map the action type to your handler in validibot/actions/registry.py:

# validibot/actions/registry.py
register_action_handler(MyActionType.CUSTOM, MyCustomHandler)
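A registry of this kind is often just a dictionary keyed by action type. The sketch below assumes that shape; the module-level _ACTION_HANDLERS dict, the string action types, the duplicate-registration check, and the internals of both functions are hypothetical and may differ from validibot/actions/registry.py.

```python
# Hypothetical in-memory registry mapping action types to handler classes.
_ACTION_HANDLERS: dict[str, type] = {}


def register_action_handler(action_type: str, handler_cls: type) -> None:
    # Assumption: registering the same type twice is treated as a mistake.
    if action_type in _ACTION_HANDLERS:
        raise ValueError(f"handler already registered for {action_type!r}")
    _ACTION_HANDLERS[action_type] = handler_cls


def get_action_handler(action_type: str):
    # Instantiate a fresh handler per lookup so handlers hold no shared state.
    try:
        handler_cls = _ACTION_HANDLERS[action_type]
    except KeyError:
        raise LookupError(f"no handler registered for {action_type!r}") from None
    return handler_cls()


class MyCustomHandler:
    """Hypothetical handler used only to exercise the registry."""

    def execute(self, run_context):
        return "handled"


register_action_handler("CUSTOM", MyCustomHandler)
```

Failing loudly on an unknown action type keeps a misconfigured workflow from silently skipping a step.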

Execution Flow Summary

┌────────────────────────────────────────────────────────────────────┐
│                     API Request Arrives                             │
└─────────────────────────────┬──────────────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────────────┐
│          ValidationRunService.execute_workflow_steps()              │
│                                                                     │
│   1. Mark run as RUNNING                                            │
│   2. Log VALIDATION_RUN_STARTED event                               │
│   3. For each workflow step:                                        │
│      a. Create/get ValidationStepRun                                │
│      b. Route to processor (validator) or handler (action)          │
│      c. Aggregate metrics                                           │
│   4. Build run summary                                              │
│   5. Finalize run status                                            │
│   6. Log VALIDATION_RUN_SUCCEEDED/FAILED event                      │
└────────────────────────────────────────────────────────────────────┘
              ┌───────────────┴───────────────┐
              │                               │
              ▼                               ▼
┌─────────────────────────┐     ┌─────────────────────────┐
│   Validator Step        │     │   Action Step           │
│                         │     │                         │
│   get_step_processor()  │     │   get_action_handler()  │
│   processor.execute()   │     │   handler.execute()     │
└─────────────────────────┘     └─────────────────────────┘
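The numbered steps in the diagram can be condensed into a short loop. This is a simplified sketch, not the service's code: the function signature, the run_step callable, and the choice to stop at the first failing step are assumptions made to keep the example small.

```python
from typing import Callable, Optional


def execute_workflow_steps(
    steps: list[str],
    run_step: Callable[[str], Optional[bool]],
) -> tuple[str, dict[str, Optional[bool]], list[str]]:
    """Hypothetical loop: returns (run status, per-step results, logged events)."""
    events = ["VALIDATION_RUN_STARTED"]
    status = "RUNNING"
    results: dict[str, Optional[bool]] = {}

    for step in steps:
        passed = run_step(step)  # routed to a processor or handler in practice
        results[step] = passed
        if passed is None:
            # Async step launched: leave the run RUNNING and wait for the callback.
            return status, results, events
        if passed is False:
            # Assumption: the first failing step ends the run.
            status = "FAILED"
            break
    else:
        # The loop finished without a failure or a pending step.
        status = "SUCCEEDED"

    events.append(f"VALIDATION_RUN_{status}")
    return status, results, events
```

Returning early on passed=None is what lets the callback path pick the workflow up again at the next step.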

Signal Flow Through Workflow Execution

Signals flow through a workflow execution in a defined sequence. Understanding this sequence is essential for debugging assertion failures and for writing cross-step assertions.

Phase 1: Workflow-level signals resolved before steps run

Before any step executes, StepOrchestrator._resolve_workflow_signals() reads the workflow's WorkflowSignalMapping rows and resolves each source path against the submission data. The result is a dict of {name: value} pairs stored in RunContext.workflow_signals.

If any mapping with on_missing="error" fails to resolve and has no default value, a SignalResolutionError is raised and the run fails before any step is attempted.

The resolved signals are available in the s (signal) namespace for all steps in the workflow. For example, a mapping name="target_eui", source_path="metadata.target_eui_kwh_m2" becomes accessible as s.target_eui in every step's CEL expressions.
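Pre-step resolution can be sketched as a dotted-path lookup with a default and an error policy. The SignalMapping dataclass, the lookup helper, and the fallback to None for non-error policies are assumptions; only the on_missing="error" behaviour and the SignalResolutionError name come from the description above.

```python
from dataclasses import dataclass
from typing import Any

_MISSING = object()  # sentinel so that None can be a legitimate value


class SignalResolutionError(Exception):
    pass


@dataclass
class SignalMapping:
    # Hypothetical stand-in for a WorkflowSignalMapping row.
    name: str
    source_path: str
    on_missing: str = "error"
    default: Any = _MISSING


def lookup(data: Any, path: str) -> Any:
    """Walk a dotted path through nested dicts; return _MISSING if absent."""
    current = data
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return _MISSING
        current = current[part]
    return current


def resolve_workflow_signals(mappings: list[SignalMapping], submission: dict) -> dict:
    resolved = {}
    for mapping in mappings:
        value = lookup(submission, mapping.source_path)
        if value is _MISSING:
            if mapping.default is not _MISSING:
                value = mapping.default
            elif mapping.on_missing == "error":
                raise SignalResolutionError(f"cannot resolve {mapping.source_path!r}")
            else:
                value = None  # assumption: other policies yield a null signal
        resolved[mapping.name] = value
    return resolved


signals = resolve_workflow_signals(
    [SignalMapping("target_eui", "metadata.target_eui_kwh_m2")],
    {"metadata": {"target_eui_kwh_m2": 95}},
)
```

With this input, signals maps target_eui to 95, which is the value a CEL expression would read as s.target_eui.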

Phase 2: Signals available in the s namespace for all steps

When _build_cel_context() runs for each step, the s / signal namespace is populated from three sources (in priority order):

  1. Workflow-level signals from RunContext.workflow_signals (highest priority -- these represent the author's explicit domain vocabulary)
  2. Promoted validator outputs injected by _inject_promoted_outputs() from SignalDefinition rows with non-empty signal_name
  3. Step-bound input signals resolved from StepSignalBinding rows (only during input-stage assertion evaluation)

The p / payload namespace contains the raw submission data. The o / output namespace contains this step's declared output signals (populated from the validator output during output-stage assertion evaluation).
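One way to read the priority order is that a higher-priority source wins when two sources define the same name. The sketch below assumes exactly that collision rule; the build_signal_namespace function and its parameters are hypothetical and do not reproduce _build_cel_context().

```python
from typing import Any


def build_signal_namespace(
    workflow_signals: dict[str, Any],
    promoted_outputs: dict[str, Any],
    step_inputs: dict[str, Any],
) -> dict[str, Any]:
    """Merge the three sources so that higher-priority names win on collision."""
    namespace: dict[str, Any] = {}
    # Apply the lowest priority first; each later update overrides earlier keys.
    namespace.update(step_inputs)        # priority 3
    namespace.update(promoted_outputs)   # priority 2
    namespace.update(workflow_signals)   # priority 1 (highest)
    return namespace


s = build_signal_namespace(
    workflow_signals={"target_eui": 95},
    promoted_outputs={"target_eui": 80, "site_eui": 75.2},
    step_inputs={"floor_area": 10000.0},
)
```

Here the workflow-level target_eui overrides the promoted output of the same name, while site_eui and floor_area pass through unchanged.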

Phase 3: Promoted outputs reconstructed before each step

_inject_promoted_outputs() runs inside _build_cel_context() for each step (not once per run). It queries SignalDefinition rows across all steps in the workflow that have a non-empty signal_name and direction=OUTPUT. For each, it looks up the producing step's output values in the run summary.

This means promoted outputs from step N are available as s.<signal_name> in step N+1, N+2, and so on -- but not in step N itself (the producing step accesses its own output via o.<contract_key>).
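The "later steps only" rule can be illustrated by filtering promotions to steps that ran earlier. This is a sketch with hypothetical names: the promoted_outputs_for function and the (step key, contract key, signal name) tuples stand in for the SignalDefinition query.

```python
from typing import Any


def promoted_outputs_for(
    step_index: int,
    step_order: list[str],
    promotions: list[tuple[str, str, str]],
    summary: dict[str, Any],
) -> dict[str, Any]:
    """Collect promoted outputs visible to the step at step_index.

    promotions holds (producing_step_key, contract_key, signal_name) tuples.
    """
    earlier_steps = set(step_order[:step_index])
    visible: dict[str, Any] = {}
    for step_key, contract_key, signal_name in promotions:
        if step_key not in earlier_steps:
            continue  # the producing step has not finished before this step
        output = summary.get("steps", {}).get(step_key, {}).get("output", {})
        if contract_key in output:
            visible[signal_name] = output[contract_key]
    return visible


step_order = ["envelope_check", "energyplus_sim", "report"]
promotions = [("energyplus_sim", "site_eui_kwh_m2", "site_eui")]
summary = {"steps": {"energyplus_sim": {"output": {"site_eui_kwh_m2": 75.2}}}}

in_producer = promoted_outputs_for(1, step_order, promotions, summary)
in_next_step = promoted_outputs_for(2, step_order, promotions, summary)
```

The producing step sees no promoted alias for its own output, while the following step sees site_eui.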

Phase 4: Cross-step output access via steps

After each step completes, store_signals() persists its output dict at run.summary["steps"][step_key]["output"]. Before the next step runs, _extract_downstream_signals() reads the summary and builds the steps namespace:

{
  "steps": {
    "envelope_check": {
      "output": {
        "floor_area_m2": 10000.0,
        "wall_r_value": 18.0
      }
    },
    "energyplus_sim": {
      "output": {
        "site_eui_kwh_m2": 75.2,
        "site_electricity_kwh": 12345.0
      }
    }
  }
}

Downstream steps can access any prior step's output via the full path: steps.envelope_check.output.floor_area_m2. This is available alongside promoted outputs -- the steps namespace provides the raw access path while s.<signal_name> provides the author-friendly alias.
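The persist-then-read cycle can be sketched with plain dictionaries. The function bodies below are assumptions that mirror only the summary layout shown above; the real store_signals() and _extract_downstream_signals() operate on the run model.

```python
from typing import Any


def store_signals(summary: dict[str, Any], step_key: str, output: dict[str, Any]) -> None:
    """Persist a step's output at summary["steps"][step_key]["output"]."""
    summary.setdefault("steps", {}).setdefault(step_key, {})["output"] = dict(output)


def extract_downstream_signals(summary: dict[str, Any]) -> dict[str, Any]:
    """Build the steps namespace from every step recorded so far."""
    return {
        "steps": {
            key: {"output": dict(entry.get("output", {}))}
            for key, entry in summary.get("steps", {}).items()
        }
    }


def get_path(context: dict[str, Any], dotted_path: str) -> Any:
    """Resolve a path such as steps.envelope_check.output.floor_area_m2."""
    current: Any = context
    for part in dotted_path.split("."):
        current = current[part]
    return current


summary: dict[str, Any] = {}
store_signals(summary, "envelope_check", {"floor_area_m2": 10000.0, "wall_r_value": 18.0})
context = extract_downstream_signals(summary)
floor_area = get_path(context, "steps.envelope_check.output.floor_area_m2")
```

Copying the output dicts when building the namespace keeps a downstream step from mutating the stored summary by accident.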

Signal flow diagram

                    Submission data arrives
                            |
                            v
              resolve_workflow_signals()
              (WorkflowSignalMapping rows)
                            |
                            v
                  RunContext.workflow_signals
                  = {"target_eui": 95, ...}
                            |
            +---------------+---------------+
            |               |               |
            v               v               v
         Step 1          Step 2          Step 3
            |               |               |
     _build_cel_context  _build_cel_context  _build_cel_context
            |               |               |
     s: workflow sigs   s: workflow sigs   s: workflow sigs
        + step inputs      + step inputs      + step inputs
                            + promoted from 1  + promoted from 1,2
     o: step 1 output   o: step 2 output   o: step 3 output
     steps: {}           steps: {step1}     steps: {step1, step2}
     p: raw payload      p: raw payload     p: raw payload
            |               |               |
     store_signals()     store_signals()    store_signals()
     summary.steps.      summary.steps.     summary.steps.
       step1.output        step2.output       step3.output

Key implementation files

File                                         Responsibility
-------------------------------------------  --------------------------------------------------------------
validations/services/signal_resolution.py    resolve_workflow_signals() -- pre-step resolution
validations/services/step_orchestrator.py    _resolve_workflow_signals() and _extract_downstream_signals()
validations/validators/base/base.py          _build_cel_context() and _inject_promoted_outputs()
validations/services/step_processor/base.py  store_signals() -- persist outputs to run summary
actions/protocols.py                         RunContext dataclass with workflow_signals and downstream_signals

For the full signal model reference, see Signals.