Workflow Engine Architecture¶
This document describes the internal architecture of the ValidationRunService, which orchestrates the execution of validation workflows.
Overview¶
The Workflow Engine is responsible for:
- Iterating through WorkflowSteps defined in a Workflow
- Executing each step against a Submission
- Recording the results (ValidationFinding, status updates)
- Handling workflow-level status transitions
Architecture: Two-Layer Dispatch¶
The workflow engine uses a two-layer dispatch pattern:
- ValidationRunService - Orchestrates the workflow loop and delegates individual steps
- Processors/Handlers - Execute individual steps based on their type
ValidationRunService.execute_workflow_steps()
│
├── For validator steps:
│   │
│   └── ValidationStepProcessor
│       ├── SimpleValidationProcessor (JSON, XML, Basic, AI)
│       └── AdvancedValidationProcessor (EnergyPlus, FMU)
│
└── For action steps:
    │
    └── StepHandler
        ├── SlackMessageActionHandler
        ├── SignedCredentialActionHandler
        └── ...
Validator Step Execution: The Processor Pattern¶
Validator steps are executed through the ValidationStepProcessor abstraction. This provides a clean separation between:
- Workflow orchestration (ValidationRunService) - loops, aggregation, status management
- Step lifecycle (Processors) - call validator, persist findings, handle errors
- Validation logic (Validators) - schema checking, AI prompts, assertions
How Validator Steps Execute¶
# Inside StepOrchestrator.execute_workflow_steps()
for step in workflow_steps:
    step_run = self._start_step_run(validation_run, step)
    if step.validator:
        # Use processors for validator steps
        result: StepProcessingResult = self._execute_validator_step(
            validation_run=validation_run,
            step_run=step_run,
        )
    else:
        # Use existing handler flow for action steps
        validation_result = self.execute_workflow_step(step=step, ...)
The _execute_validator_step() method delegates to the appropriate processor and returns a typed StepProcessingResult:
def _execute_validator_step(self, validation_run, step_run) -> StepProcessingResult:
    # Imported inside the method, as in the original snippet
    from validibot.validations.services.step_processor import get_step_processor

    processor = get_step_processor(validation_run, step_run)
    return processor.execute()
Processor Types¶
| Processor | Validator Types | Execution Mode |
|---|---|---|
| SimpleValidationProcessor | Basic, JSON Schema, XML Schema, AI | Synchronous, inline |
| AdvancedValidationProcessor | EnergyPlus, FMU, custom | Sync (Docker) or Async (Cloud Run) |
For detailed documentation on processors, see Validation Step Processor Architecture.
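The mapping in the table above can be pictured as a small factory. The following is a minimal sketch only: `pick_processor`, the type labels in `SIMPLE_TYPES`, and the constructor signatures are hypothetical stand-ins (the real `get_step_processor` takes a validation run and a step run), and the advanced processor here models only the async case.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepProcessingResult:
    # passed=None means "launched, verdict pending"; True/False is final.
    passed: Optional[bool]


class ValidationStepProcessor:
    """Stand-in base class for the processor abstraction."""

    def __init__(self, validator_type: str) -> None:
        self.validator_type = validator_type

    def execute(self) -> StepProcessingResult:
        raise NotImplementedError


class SimpleValidationProcessor(ValidationStepProcessor):
    def execute(self) -> StepProcessingResult:
        # Inline, synchronous validation: the verdict is known immediately.
        return StepProcessingResult(passed=True)


class AdvancedValidationProcessor(ValidationStepProcessor):
    def execute(self) -> StepProcessingResult:
        # Async backend: the job is launched and the verdict arrives later.
        return StepProcessingResult(passed=None)


# Hypothetical labels; the project's actual validator type values may differ.
SIMPLE_TYPES = {"basic", "json_schema", "xml_schema", "ai"}


def pick_processor(validator_type: str) -> ValidationStepProcessor:
    """Choose a processor class from the validator type."""
    if validator_type in SIMPLE_TYPES:
        return SimpleValidationProcessor(validator_type)
    # EnergyPlus, FMU, and custom validators take the advanced path.
    return AdvancedValidationProcessor(validator_type)
```

Keeping the choice in one function means the orchestration loop never needs to know which validator types exist.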
Action Step Execution: The Handler Pattern¶
Action steps (non-validation operations) use the StepHandler protocol for extensibility.
Core Components¶
1. Protocol (StepHandler)¶
All execution logic must implement the StepHandler protocol defined in validibot/actions/protocols.py:
- RunContext: Contains the ValidationRun, WorkflowStep, and shared signals.
- StepResult: Standardized output indicating pass/fail, issues, and statistics.
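As a rough illustration of the shape such a protocol could take, here is a sketch using `typing.Protocol`. The field names on `RunContext` and `StepResult` (other than `workflow_signals` and `downstream_signals`, which this document mentions) and the `EchoHandler` class are assumptions for illustration, not the definitions in `validibot/actions/protocols.py`.

```python
from dataclasses import dataclass, field
from typing import Any, Protocol


@dataclass
class RunContext:
    validation_run: Any  # stand-in for a ValidationRun instance
    step: Any            # stand-in for a WorkflowStep instance
    workflow_signals: dict[str, Any] = field(default_factory=dict)
    downstream_signals: dict[str, Any] = field(default_factory=dict)


@dataclass
class StepResult:
    passed: bool
    issues: list[str] = field(default_factory=list)   # assumed field name
    stats: dict[str, Any] = field(default_factory=dict)  # assumed field name


class StepHandler(Protocol):
    def execute(self, context: RunContext) -> StepResult: ...


class EchoHandler:
    """Hypothetical handler; it satisfies StepHandler structurally."""

    def execute(self, context: RunContext) -> StepResult:
        # Report how many workflow signals were visible to this step.
        return StepResult(
            passed=True,
            stats={"signals_seen": len(context.workflow_signals)},
        )
```

Because `Protocol` uses structural typing, a handler only needs a matching `execute()` method; it does not have to inherit from anything.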
2. Dispatcher (ValidationRunService)¶
For action steps, the service:
- Resolves the appropriate implementation (Action subclass)
- Looks up the registered StepHandler
- Invokes handler.execute(context)
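The three dispatch steps above can be sketched with a plain dictionary registry. This is an assumption-laden illustration: the registry shape, the `"slack_message"` key, the dict-based context, and the stand-in handler body are all hypothetical, and the real handler would call Slack rather than record a message.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StepResult:
    passed: bool
    issues: list[str] = field(default_factory=list)


class SlackMessageActionHandler:
    """Stand-in that records the message instead of calling Slack."""

    def execute(self, context: dict[str, Any]) -> StepResult:
        context.setdefault("sent", []).append(context["action"]["message"])
        return StepResult(passed=True)


# Hypothetical registry: action type label -> handler class.
HANDLER_REGISTRY: dict[str, type] = {
    "slack_message": SlackMessageActionHandler,
}


def dispatch_action_step(context: dict[str, Any]) -> StepResult:
    # 1. Resolve the action attached to the step.
    action_type = context["action"]["type"]
    # 2. Look up the registered handler for that action type.
    handler_cls = HANDLER_REGISTRY.get(action_type)
    if handler_cls is None:
        return StepResult(
            passed=False,
            issues=[f"No handler registered for {action_type!r}"],
        )
    # 3. Invoke the handler with the run context.
    return handler_cls().execute(context)
```

Returning a failed `StepResult` for an unknown type, rather than raising, is one possible design choice; it keeps the failure visible in the run's findings.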
Available Handlers¶
| Handler | Purpose |
|---|---|
| SlackMessageActionHandler | Sends Slack notifications |
| SignedCredentialActionHandler | Generates and attaches credentials |
Async Validator Completion: Callbacks¶
When advanced validators run on async backends (like GCP Cloud Run), execution follows a two-phase pattern:
Phase 1: Launch (ValidationRunService)¶
- Processor calls engine.validate(), which launches the container
- Container job starts running on Cloud Run
- Processor returns StepProcessingResult(passed=None)
- Run stays in RUNNING status, waiting for the callback
Phase 2: Complete (ValidationCallbackService)¶
- Container completes and POSTs a callback to /api/internal/callbacks/validation/
- ValidationCallbackService downloads the output envelope from cloud storage
- Creates the processor and calls processor.complete_from_callback(output_envelope)
- Processor finalizes the step and either:
  - Resumes the workflow with the next step, OR
  - Finalizes the run as SUCCEEDED/FAILED
# Inside ValidationCallbackService._process_callback()
from validibot.validations.services.step_processor import get_step_processor
processor = get_step_processor(run, step_run)
processor.complete_from_callback(output_envelope)
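To make the two-phase contract concrete, the sketch below models a processor whose `execute()` returns a pending result and whose `complete_from_callback()` produces the final verdict. The class name, the status strings on the step, and the `"errors"` key in the envelope are assumptions for illustration; the real output envelope format is not described here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepProcessingResult:
    passed: Optional[bool]  # None = launched, awaiting callback


class AsyncProcessorSketch:
    def __init__(self) -> None:
        self.step_status = "PENDING"

    def execute(self) -> StepProcessingResult:
        # Phase 1: launch the container job and return without a verdict.
        self.step_status = "RUNNING"
        return StepProcessingResult(passed=None)

    def complete_from_callback(self, output_envelope: dict) -> StepProcessingResult:
        # Phase 2: derive the verdict from the downloaded envelope.
        # The "errors" key is a hypothetical envelope field.
        passed = not output_envelope.get("errors")
        self.step_status = "PASSED" if passed else "FAILED"
        return StepProcessingResult(passed=passed)
```

The orchestrator can treat `passed is None` as the signal to stop iterating and leave the run in `RUNNING` until the callback arrives.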
Extending the System¶
Adding a New Validator Type¶
- Create the validator in validibot/validations/validators/:
  - Extend BaseValidator
  - Implement the validate() method
  - For container-based validators, implement post_execute_validate() too
- Register the validator by adding a config.py to your validator sub-package.
- Update the processor factory (if needed) in step_processor/factory.py.
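For the first step, a new validator might look roughly like the sketch below. The `BaseValidator` shown is a local stand-in, and `ValidationOutcome`, `RequiredKeysValidator`, and the `validate(submission)` signature are hypothetical; the project's real base class and return type may differ. Registration via config.py is not shown because its format is not described here.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


@dataclass
class ValidationOutcome:  # hypothetical result type
    passed: bool
    findings: list[str] = field(default_factory=list)


class BaseValidator(ABC):
    """Stand-in for the project's base class; the real interface may differ."""

    @abstractmethod
    def validate(self, submission: dict) -> ValidationOutcome: ...


class RequiredKeysValidator(BaseValidator):
    """Hypothetical validator: fails when required top-level keys are absent."""

    def __init__(self, required_keys: list[str]) -> None:
        self.required_keys = required_keys

    def validate(self, submission: dict) -> ValidationOutcome:
        missing = [key for key in self.required_keys if key not in submission]
        findings = [f"Missing required key: {key}" for key in missing]
        return ValidationOutcome(passed=not missing, findings=findings)
```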
Adding a New Action Type¶
- Define the Model: Create a new Action subclass in validibot/actions/models.py
- Implement Handler: Create a class implementing StepHandler in validibot/actions/handlers.py
- Register: Map the action type to your handler in validibot/actions/registry.py
Execution Flow Summary¶
┌────────────────────────────────────────────────────────────────────┐
│                      API Request Arrives                           │
└─────────────────────────────┬──────────────────────────────────────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────────────┐
│  ValidationRunService.execute_workflow_steps()                     │
│                                                                    │
│  1. Mark run as RUNNING                                            │
│  2. Log VALIDATION_RUN_STARTED event                               │
│  3. For each workflow step:                                        │
│     a. Create/get ValidationStepRun                                │
│     b. Route to processor (validator) or handler (action)          │
│     c. Aggregate metrics                                           │
│  4. Build run summary                                              │
│  5. Finalize run status                                            │
│  6. Log VALIDATION_RUN_SUCCEEDED/FAILED event                      │
└────────────────────────────────────────────────────────────────────┘
                              │
              ┌───────────────┴───────────────┐
              │                               │
              ▼                               ▼
 ┌─────────────────────────┐     ┌─────────────────────────┐
 │  Validator Step         │     │  Action Step            │
 │                         │     │                         │
 │  get_step_processor()   │     │  get_action_handler()   │
 │  processor.execute()    │     │  handler.execute()      │
 └─────────────────────────┘     └─────────────────────────┘
Signal Flow Through Workflow Execution¶
Signals flow through a workflow execution in a defined sequence. Understanding this sequence is essential for debugging assertion failures and for writing cross-step assertions.
Phase 1: Workflow-level signals resolved before steps run¶
Before any step executes, StepOrchestrator._resolve_workflow_signals() reads
the workflow's WorkflowSignalMapping rows and resolves each source path
against the submission data. The result is a dict of {name: value} pairs
stored in RunContext.workflow_signals.
If any mapping with on_missing="error" fails to resolve and has no default
value, a SignalResolutionError is raised and the run fails before any step
is attempted.
The resolved signals are available in the s (signal) namespace for all
steps in the workflow. For example, a mapping name="target_eui",
source_path="metadata.target_eui_kwh_m2" becomes accessible as
s.target_eui in every step's CEL expressions.
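The resolution rule described above can be sketched as follows. `SignalMapping` is a simplified stand-in for a WorkflowSignalMapping row, `lookup_path` is a hypothetical helper, and the behaviour for modes other than `on_missing="error"` (resolving to `None`) is an assumption, since the text only describes the error case.

```python
from dataclasses import dataclass
from typing import Any

_MISSING = object()  # sentinel: distinguishes "absent" from a real None value


class SignalResolutionError(Exception):
    pass


@dataclass
class SignalMapping:
    name: str
    source_path: str
    on_missing: str = "error"
    default: Any = _MISSING


def lookup_path(data: Any, path: str) -> Any:
    """Walk a dotted path through nested dicts; return _MISSING if absent."""
    current = data
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return _MISSING
        current = current[part]
    return current


def resolve_workflow_signals(mappings: list[SignalMapping], submission: dict) -> dict:
    resolved = {}
    for mapping in mappings:
        value = lookup_path(submission, mapping.source_path)
        if value is _MISSING:
            if mapping.default is not _MISSING:
                value = mapping.default
            elif mapping.on_missing == "error":
                raise SignalResolutionError(
                    f"{mapping.name}: path {mapping.source_path!r} not found"
                )
            else:
                value = None  # assumption: non-error modes resolve to None
        resolved[mapping.name] = value
    return resolved
```

With the example mapping from the text, a submission of `{"metadata": {"target_eui_kwh_m2": 95}}` resolves to `{"target_eui": 95}`.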
Phase 2: Signals available in the s namespace for all steps¶
When _build_cel_context() runs for each step, the s / signal namespace
is populated from three sources (in priority order):
- Workflow-level signals from RunContext.workflow_signals (highest priority -- these represent the author's explicit domain vocabulary)
- Promoted validator outputs injected by _inject_promoted_outputs() from SignalDefinition rows with non-empty signal_name
- Step-bound input signals resolved from StepSignalBinding rows (only during input-stage assertion evaluation)
The p / payload namespace contains the raw submission data. The o /
output namespace contains this step's declared output signals (populated
from the validator output during output-stage assertion evaluation).
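One way to picture the priority order for the `s` namespace is a layered dictionary merge. This sketch assumes that "priority" means the higher-priority source wins when two sources define the same name; the function name and argument shapes are hypothetical.

```python
from typing import Any, Optional


def build_signal_namespace(
    workflow_signals: dict[str, Any],
    promoted_outputs: dict[str, Any],
    step_inputs: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Merge the three sources so that higher-priority names win."""
    namespace: dict[str, Any] = {}
    # Apply the lowest priority first; later updates overwrite on collision.
    namespace.update(step_inputs or {})   # step-bound inputs (lowest)
    namespace.update(promoted_outputs)    # promoted validator outputs
    namespace.update(workflow_signals)    # workflow-level signals (highest)
    return namespace
```

Under this assumption, a workflow-level `target_eui` would shadow a promoted output of the same name.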
Phase 3: Promoted outputs reconstructed before each step¶
_inject_promoted_outputs() runs inside _build_cel_context() for each step
(not once per run). It queries SignalDefinition rows across all steps in the
workflow that have a non-empty signal_name and direction=OUTPUT. For each,
it looks up the producing step's output values in the run summary.
This means promoted outputs from step N are available as s.<signal_name> in
step N+1, N+2, and so on -- but not in step N itself (the producing step
accesses its own output via o.<contract_key>).
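The sketch below illustrates why a promoted output is visible only to later steps: the lookup reads the run summary, and a step's output is not in the summary until that step has completed. The dict-based definition rows and the `step_key` / `contract_key` field names are assumptions standing in for SignalDefinition rows.

```python
from typing import Any


def collect_promoted_outputs(
    signal_definitions: list[dict[str, Any]],
    run_summary: dict[str, Any],
) -> dict[str, Any]:
    """Build {signal_name: value} from outputs already stored in the summary."""
    promoted: dict[str, Any] = {}
    steps = run_summary.get("steps", {})
    for definition in signal_definitions:
        # Only OUTPUT definitions with a non-empty signal_name are promoted.
        if definition.get("direction") != "OUTPUT" or not definition.get("signal_name"):
            continue
        output = steps.get(definition["step_key"], {}).get("output", {})
        # A step that has not finished yet has no stored output, so it is skipped.
        if definition["contract_key"] in output:
            promoted[definition["signal_name"]] = output[definition["contract_key"]]
    return promoted
```

Rebuilding this mapping for every step, rather than caching it once per run, is what lets each step see everything produced before it.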
Phase 4: Cross-step output access via steps¶
After each step completes, store_signals() persists its output dict at
run.summary["steps"][step_key]["output"]. Before the next step runs,
_extract_downstream_signals() reads the summary and builds the steps
namespace:
{
  "steps": {
    "envelope_check": {
      "output": {
        "floor_area_m2": 10000.0,
        "wall_r_value": 18.0
      }
    },
    "energyplus_sim": {
      "output": {
        "site_eui_kwh_m2": 75.2,
        "site_electricity_kwh": 12345.0
      }
    }
  }
}
Downstream steps can access any prior step's output via the full path:
steps.envelope_check.output.floor_area_m2. This is available alongside
promoted outputs -- the steps namespace provides the raw access path while
s.<signal_name> provides the author-friendly alias.
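A minimal sketch of the store-then-extract cycle follows. The function names mirror those in the text but are written here as free functions over a plain dict summary, and `get_path` is a hypothetical helper standing in for how a CEL expression would traverse the namespace.

```python
from typing import Any


def store_signals(summary: dict[str, Any], step_key: str, output: dict[str, Any]) -> None:
    """Persist a step's output at summary["steps"][step_key]["output"]."""
    summary.setdefault("steps", {}).setdefault(step_key, {})["output"] = dict(output)


def extract_downstream_signals(summary: dict[str, Any]) -> dict[str, Any]:
    """Build the `steps` namespace from outputs stored so far (a snapshot copy)."""
    return {
        "steps": {
            key: {"output": dict(entry.get("output", {}))}
            for key, entry in summary.get("steps", {}).items()
        }
    }


def get_path(context: dict[str, Any], dotted: str) -> Any:
    """Follow a dotted path such as steps.envelope_check.output.floor_area_m2."""
    current: Any = context
    for part in dotted.split("."):
        current = current[part]
    return current
```

Because the namespace is rebuilt from the summary before each step, it only ever contains steps that have already completed.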
Signal flow diagram¶
                   Submission data arrives
                              |
                              v
                 resolve_workflow_signals()
                (WorkflowSignalMapping rows)
                              |
                              v
                 RunContext.workflow_signals
                  = {"target_eui": 95, ...}
                              |
        +---------------------+---------------------+
        |                     |                     |
        v                     v                     v
     Step 1                Step 2                Step 3
        |                     |                     |
_build_cel_context    _build_cel_context    _build_cel_context
        |                     |                     |
s: workflow sigs      s: workflow sigs      s: workflow sigs
   + step inputs         + step inputs         + step inputs
                         + promoted from 1     + promoted from 1,2
o: step 1 output      o: step 2 output      o: step 3 output
steps: {}             steps: {step1}        steps: {step1, step2}
p: raw payload        p: raw payload        p: raw payload
        |                     |                     |
 store_signals()       store_signals()       store_signals()
 summary.steps.        summary.steps.        summary.steps.
 step1.output          step2.output          step3.output
Key implementation files¶
| File | Responsibility |
|---|---|
| validations/services/signal_resolution.py | resolve_workflow_signals() -- pre-step resolution |
| validations/services/step_orchestrator.py | _resolve_workflow_signals() and _extract_downstream_signals() |
| validations/validators/base/base.py | _build_cel_context() and _inject_promoted_outputs() |
| validations/services/step_processor/base.py | store_signals() -- persist outputs to run summary |
| actions/protocols.py | RunContext dataclass with workflow_signals and downstream_signals |
For the full signal model reference, see Signals.
Related Documentation¶
- Validation Step Processor Architecture - Deep dive into processor pattern
- Validator Architecture - Execution backends and deployment
- How Validibot Works - End-to-end system overview
- Signals - Signal models, CEL namespaces, and resolution