pkg/cmd/tef/README.md
TEF is a workflow orchestration framework for building multi-step task execution systems. It provides core abstractions for defining workflows with sequential execution, parallel execution, conditional branching, failure handling, and automatic validation.
Every TEF workflow follows this structure:
TEF is framework-agnostic: the core Plan definitions have no dependency on any specific orchestration engine. Swapping execution engines only requires implementing the PlannerManager interface.
Current Status: Core framework is complete (interfaces, validation, task types). Orchestration engine integration, CLI commands, and plan implementations are pending.
Before diving into examples, here are the key terms:
Registry interface. Plans describe what tasks to run and how they connect.

Here's a simple plan that demonstrates TEF's core features:
package demo

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/cmd/tef/planners"
	"github.com/spf13/cobra"
)

type DemoPlan struct{}

// Input data for the plan
type DemoInput struct {
	Message string `json:"message"`
	Count   int    `json:"count"`
}

// PrepareExecution sets up necessary resources for plan execution
func (d *DemoPlan) PrepareExecution(ctx context.Context) error {
	return nil
}

// GetPlanName returns the plan identifier
func (d *DemoPlan) GetPlanName() string {
	return "demo"
}

// GetPlanDescription returns a human-readable description
func (d *DemoPlan) GetPlanDescription() string {
	return "Demonstrates TEF's core features: sequential execution, conditional branching, parallel execution, and sleep tasks"
}

// GetPlanVersion returns the workflow version
func (d *DemoPlan) GetPlanVersion() int {
	return 1
}

// ParsePlanInput parses JSON input
func (d *DemoPlan) ParsePlanInput(input string) (interface{}, error) {
	var data DemoInput
	if err := json.Unmarshal([]byte(input), &data); err != nil {
		return nil, err
	}
	return &data, nil
}

// AddStartWorkerCmdFlags adds plan-specific flags to worker commands
func (d *DemoPlan) AddStartWorkerCmdFlags(cmd *cobra.Command) {
	// No custom flags for this demo plan
}

// GeneratePlan builds the workflow structure
func (d *DemoPlan) GeneratePlan(ctx context.Context, p planners.Planner) {
	// Register executor functions
	p.RegisterExecutor(ctx, &planners.Executor{
		Name:        "print message",
		Description: "Prints a message",
		Func:        printMessage,
	})
	p.RegisterExecutor(ctx, &planners.Executor{
		Name:        "check count",
		Description: "Checks if count is positive",
		Func:        checkCount,
	})
	p.RegisterExecutor(ctx, &planners.Executor{
		Name:        "process parallel",
		Description: "Processes data in parallel branch",
		Func:        processParallel,
	})
	p.RegisterExecutor(ctx, &planners.Executor{
		Name:        "wait time",
		Description: "Returns wait duration",
		Func:        waitTime,
	})

	// 1. SEQUENTIAL EXECUTION: Print message
	task1 := p.NewExecutionTask(ctx, "print message")
	task1.ExecutorFn = printMessage

	// 2. CONDITIONAL BRANCHING: Check if count is positive
	conditionTask := p.NewConditionTask(ctx, "check count")
	conditionTask.ExecutorFn = checkCount // Returns (bool, error)

	// 3. PARALLEL EXECUTION: Process two branches concurrently
	forkTask := p.NewForkTask(ctx, "parallel processing")
	forkJoin := p.NewForkJoinTask(ctx, "fork join")
	branch1 := p.NewExecutionTask(ctx, "process branch 1")
	branch1.ExecutorFn = processParallel
	branch1.Next = forkJoin
	branch2 := p.NewExecutionTask(ctx, "process branch 2")
	branch2.ExecutorFn = processParallel
	branch2.Next = forkJoin
	forkTask.Tasks = []planners.Task{branch1, branch2}
	forkTask.Join = forkJoin // All branches must converge to this join point

	// 4. END: Workflow termination
	endTask := p.NewEndTask(ctx, "end")

	// Wire tasks together with FAILURE HANDLING
	task1.Next = conditionTask
	task1.Fail = endTask // On failure, end workflow

	// Conditional paths
	conditionTask.Then = forkTask // If count > 0, run parallel tasks
	conditionTask.Else = endTask  // If count <= 0, end

	// After parallel work
	forkTask.Next = endTask
	forkTask.Fail = endTask

	// Register the plan (first task, output task)
	p.RegisterPlan(ctx, task1, task1)
}

// Executor functions
func printMessage(ctx context.Context, input *DemoInput) (string, error) {
	fmt.Printf("Message: %s\n", input.Message)
	return "printed", nil
}

func checkCount(ctx context.Context, input *DemoInput) (bool, error) {
	return input.Count > 0, nil
}

func processParallel(ctx context.Context, input *DemoInput) (string, error) {
	// Parallel processing work
	return "processed", nil
}

func waitTime(ctx context.Context, input *DemoInput) (time.Duration, error) {
	return 5 * time.Second, nil
}
TEF can generate visual representations of your workflow as DOT graphs and PNG images:
./dev build tef
./bin/tef gen-view demo
This creates:
- demo.dot - DOT format graph definition
- demo.png - Visual diagram of the workflow

Requirements: Graphviz must be installed (brew install graphviz on macOS, apt install graphviz on Linux).
The visualization shows:
--with-failure-paths flag)

Use this during development to verify your workflow structure and ensure all branches converge correctly.
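To give a feel for what a DOT graph definition looks like, here is a minimal sketch that renders a list of labeled edges as DOT text. The `edge` type and `toDOT` function are hypothetical names introduced for illustration; the actual output of `gen-view` may be structured differently.

```go
package main

import (
	"fmt"
	"strings"
)

// edge is a hypothetical stand-in for one connection between two tasks.
type edge struct{ from, to, label string }

// toDOT renders the edges as a Graphviz digraph. Names and labels are
// emitted as double-quoted strings, which DOT accepts as identifiers.
func toDOT(name string, edges []edge) string {
	var b strings.Builder
	fmt.Fprintf(&b, "digraph %q {\n", name)
	for _, e := range edges {
		fmt.Fprintf(&b, "  %q -> %q [label=%q];\n", e.from, e.to, e.label)
	}
	b.WriteString("}\n")
	return b.String()
}

func main() {
	// A hand-written subset of the demo plan's wiring.
	fmt.Print(toDOT("demo", []edge{
		{"print message", "check count", "next"},
		{"check count", "parallel processing", "then"},
		{"check count", "end", "else"},
		{"parallel processing", "end", "next"},
	}))
}
```

Piping text like this through Graphviz (for example `dot -Tpng`) produces a PNG, which is why Graphviz is listed as a requirement.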
Let's walk through each part of the example:
type DemoPlan struct{}

func (d *DemoPlan) GetPlanName() string {
	return "demo"
}
Every plan implements the Registry interface. At minimum, it needs a name and a way to parse input.
func (d *DemoPlan) GetPlanDescription() string {
	return "Demonstrates TEF's core features: sequential execution, conditional branching, parallel execution, and sleep tasks"
}

func (d *DemoPlan) GetPlanVersion() int {
	return 1
}
GetPlanDescription() returns a human-readable description that helps users and operators understand the plan's purpose. This description is exposed through the TEF API and CLI for documentation and discovery.
GetPlanVersion() returns the workflow version number, which is critical for managing backward compatibility:
func (d *DemoPlan) ParsePlanInput(input string) (interface{}, error) {
	var data DemoInput
	if err := json.Unmarshal([]byte(input), &data); err != nil {
		return nil, err
	}
	return &data, nil
}
Plans receive input as JSON strings. The framework validates and parses this input before execution.
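The parsing step can be tried in isolation. The sketch below reuses the `DemoInput` struct from the example; `parseDemoInput` is a hypothetical standalone helper that mirrors the body of `ParsePlanInput` so it runs without the TEF packages.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DemoInput mirrors the input struct from the example plan.
type DemoInput struct {
	Message string `json:"message"`
	Count   int    `json:"count"`
}

// parseDemoInput decodes a JSON string the same way the example's
// ParsePlanInput does, wrapping any decode error with context.
func parseDemoInput(input string) (*DemoInput, error) {
	var data DemoInput
	if err := json.Unmarshal([]byte(input), &data); err != nil {
		return nil, fmt.Errorf("parsing demo input: %w", err)
	}
	return &data, nil
}

func main() {
	in, err := parseDemoInput(`{"message": "Hello", "count": 5}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("message=%q count=%d\n", in.Message, in.Count)

	// Malformed JSON surfaces as an error rather than a zero-valued struct.
	if _, err := parseDemoInput(`{"message": `); err != nil {
		fmt.Println("rejected malformed input")
	}
}
```

Note that `json.Unmarshal` silently ignores unknown fields and leaves missing fields at their zero value, so a plan that needs stricter checks would have to add them itself.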
p.RegisterExecutor(ctx, &planners.Executor{
	Name:        "print message",
	Description: "Prints a message",
	Func:        printMessage,
})
All executor functions must be registered before they can be referenced in tasks. The framework validates that executors exist and have correct signatures.
task1 := p.NewExecutionTask(ctx, "print message")
task1.ExecutorFn = printMessage
task1.Next = conditionTask // Next task on success
task1.Fail = endTask // Failure handler
ExecutionTask runs an executor and proceeds to the Next task on success or Fail task on error.
conditionTask := p.NewConditionTask(ctx, "check count")
conditionTask.ExecutorFn = checkCount // Must return (bool, error)
conditionTask.Then = forkTask // If true
conditionTask.Else = endTask // If false
ConditionTask evaluates a boolean executor and follows the Then or Else path.
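The routing rules for ExecutionTask (Next on success, Fail on error) and ConditionTask (Then or Else) can be illustrated with a toy model. Everything in this sketch is hypothetical: `step`, `walk`, and `buildDemo` are not TEF types, and stopping the walk when a condition returns an error is an assumption, not documented TEF behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical, simplified task used only for this sketch.
type step struct {
	name       string
	run        func() error         // set for execution steps
	cond       func() (bool, error) // set for condition steps
	next, fail *step                // followed after run succeeds / fails
	then, els  *step                // followed when cond is true / false
}

// walk follows the routing rules until it reaches a step with no successor
// and returns the names of the visited steps in order.
func walk(s *step) ([]string, error) {
	var visited []string
	for s != nil {
		visited = append(visited, s.name)
		switch {
		case s.cond != nil:
			ok, err := s.cond()
			if err != nil {
				return visited, err // assumption: a failing condition aborts the walk
			}
			if ok {
				s = s.then
			} else {
				s = s.els
			}
		case s.run != nil:
			if err := s.run(); err != nil {
				s = s.fail
			} else {
				s = s.next
			}
		default:
			s = nil // end step: nothing left to follow
		}
	}
	return visited, nil
}

// buildDemo wires a toy version of the demo plan; the fork is collapsed
// into a single "parallel work" step to keep the sketch short.
func buildDemo(count int, failFirst bool) *step {
	end := &step{name: "end"}
	work := &step{name: "parallel work", run: func() error { return nil }, next: end, fail: end}
	check := &step{
		name: "check count",
		cond: func() (bool, error) { return count > 0, nil },
		then: work,
		els:  end,
	}
	return &step{
		name: "print message",
		run: func() error {
			if failFirst {
				return errors.New("print failed")
			}
			return nil
		},
		next: check,
		fail: end,
	}
}

func main() {
	for _, count := range []int{5, 0} {
		path, err := walk(buildDemo(count, false))
		fmt.Println(count, path, err)
	}
}
```

With a positive count the walk passes through the work step before ending; with a count of zero it goes from the condition straight to the end step.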
forkTask := p.NewForkTask(ctx, "parallel processing")
forkJoin := p.NewForkJoinTask(ctx, "fork join")
branch1 := p.NewExecutionTask(ctx, "process branch 1")
branch1.Next = forkJoin
branch2 := p.NewExecutionTask(ctx, "process branch 2")
branch2.Next = forkJoin
forkTask.Tasks = []planners.Task{branch1, branch2}
forkTask.Join = forkJoin // Synchronization point for all branches
ForkTask executes multiple branches concurrently. All branches must converge to the specified Join point (a ForkJoinTask) before execution continues to the fork's Next task. The Join ForkJoinTask acts as a synchronization barrier, not as termination of the entire execution path.
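The barrier semantics of the join point resemble a `sync.WaitGroup` in plain Go. The following sketch is only an analogy under that assumption (`runFork` is a hypothetical helper, and how an orchestration engine actually schedules branches is engine-specific): all branches run concurrently, and control returns to the caller only after every branch has finished.

```go
package main

import (
	"fmt"
	"sync"
)

// runFork runs every branch concurrently and returns only after all of them
// have finished, which is the role the join point plays for a fork.
func runFork(branches []func() string) []string {
	results := make([]string, len(branches))
	var wg sync.WaitGroup
	for i, branch := range branches {
		wg.Add(1)
		go func(i int, branch func() string) {
			defer wg.Done()
			results[i] = branch() // each goroutine writes only its own slot
		}(i, branch)
	}
	wg.Wait() // join: blocks until every branch has called Done
	return results
}

func main() {
	results := runFork([]func() string{
		func() string { return "branch 1 processed" },
		func() string { return "branch 2 processed" },
	})
	// Only at this point would execution continue to the fork's Next task.
	for _, r := range results {
		fmt.Println(r)
	}
}
```

Reaching the barrier does not end the workflow; it only marks the point where the fork's Next task becomes eligible to run.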
endTask := p.NewEndTask(ctx, "end")
All execution paths must end with an EndTask. EndTask serves two purposes:
The framework validates that all branches converge to appropriate EndTasks.
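One way such a convergence check could work is a depth-first traversal that rejects any path that stops at a non-end task. This is a sketch on a hypothetical `node` type, not TEF's validator; treating cycles as errors is an additional assumption made so that the traversal always terminates.

```go
package main

import "fmt"

// node is a hypothetical, simplified stand-in for a task.
type node struct {
	name  string
	isEnd bool
	next  []*node // all outgoing edges (Next, Fail, Then, Else, ...)
}

// checkTerminates returns an error if some path from start stops at a
// non-end node, references a nil node, or loops back onto itself.
func checkTerminates(start *node) error {
	onPath := map[*node]bool{} // nodes on the current DFS path
	done := map[*node]bool{}   // nodes already proven to reach an end
	var visit func(n *node) error
	visit = func(n *node) error {
		switch {
		case n == nil:
			return fmt.Errorf("nil task reference")
		case done[n]:
			return nil
		case onPath[n]:
			return fmt.Errorf("cycle detected at task %q", n.name)
		case n.isEnd:
			done[n] = true
			return nil
		case len(n.next) == 0:
			return fmt.Errorf("task %q does not lead to an end task", n.name)
		}
		onPath[n] = true
		for _, m := range n.next {
			if err := visit(m); err != nil {
				return err
			}
		}
		onPath[n] = false
		done[n] = true
		return nil
	}
	return visit(start)
}

func main() {
	end := &node{name: "end", isEnd: true}
	fork := &node{name: "parallel processing", next: []*node{end}}
	check := &node{name: "check count", next: []*node{fork, end}}
	first := &node{name: "print message", next: []*node{check, end}}
	fmt.Println("demo plan:", checkTerminates(first))

	// A branch that forgets to wire its successor is reported.
	dangling := &node{name: "orphan"}
	broken := &node{name: "start", next: []*node{dangling, end}}
	fmt.Println("broken plan:", checkTerminates(broken))
}
```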
The example above shows the most common task types. TEF also supports:
See the Plan Development Guide for complete documentation of all task types.
TEF requires an orchestration engine (e.g., Temporal) to execute workflows. The core framework provides the abstractions; you need to implement the PlannerManager interface for your chosen engine.
What's Implemented:
What's TODO:
See TODO.md for the complete list of pending work.
# From the cockroach repository root
./dev build tef
# The binary will be available at: ./bin/tef
Note: The current binary has minimal functionality until the orchestration engine and CLI commands are implemented.
- pkg/cmd/tef/plans/myplan/
- planners.Registry interface (see example above)
- pkg/cmd/tef/plans/registry.go

See the Plan Development Guide for detailed instructions.
Once implemented, the workflow would be:
# Terminal 1: Start a worker
./bin/tef start-worker demo --plan-variant dev
# Terminal 2: Execute the plan
./bin/tef execute demo '{"message": "Hello", "count": 5}' dev
# Returns: Workflow ID for tracking
# Check status
./bin/tef status demo <workflow-id>
TEF follows key design principles:
Framework Independence: Core abstractions have zero dependency on any orchestration engine. The entire execution layer can be swapped by implementing the PlannerManager interface.
Comprehensive Validation: Plans are validated at creation time:
Type Safety: Executor function signatures are validated using reflection. ConditionTask executors must return (bool, error), etc.
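A reflection-based signature check of this kind might look like the sketch below. `validateConditionFn` is a hypothetical helper written for illustration, not the function TEF uses; it only checks the shape described above: a function whose first parameter is `context.Context` and whose results are `(bool, error)`.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// validateConditionFn reports whether fn has the shape
// func(ctx context.Context, ...) (bool, error).
func validateConditionFn(fn interface{}) error {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return fmt.Errorf("executor must be a function, got %v", t)
	}
	if t.NumIn() < 1 || t.In(0) != ctxType {
		return fmt.Errorf("first parameter must be context.Context")
	}
	if t.NumOut() != 2 {
		return fmt.Errorf("expected 2 return values, got %d", t.NumOut())
	}
	if t.Out(0).Kind() != reflect.Bool {
		return fmt.Errorf("first return value must be bool, got %v", t.Out(0))
	}
	if t.Out(1) != errType {
		return fmt.Errorf("second return value must be error, got %v", t.Out(1))
	}
	return nil
}

// Two sample executors with simplified inputs.
func checkCount(ctx context.Context, count int) (bool, error) { return count > 0, nil }
func printMessage(ctx context.Context, msg string) (string, error) { return "printed", nil }

func main() {
	fmt.Println("checkCount:", validateConditionFn(checkCount))
	fmt.Println("printMessage:", validateConditionFn(printMessage))
}
```

Checking signatures when the plan is built means a mismatched executor is reported before any workflow runs, rather than failing partway through an execution.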
Workflow Versioning: Plans support versioning via GetPlanVersion(). Increment the version for backward-incompatible changes.
For detailed architecture documentation, see architecture.md.
- pkg/cmd/tef/planners/definitions.go - Core interface definitions
- pkg/cmd/tef/planners/planner.go - BasePlanner implementation and validation
- pkg/cmd/tef/planners/tasks.go - All task type definitions
- pkg/cmd/tef/plans/registry.go - Plan registration

| Task Type | Purpose | Key Signature |
|---|---|---|
| ExecutionTask | Run executor function | func(ctx, input, ...params) (output, error) |
| ForkTask | Parallel execution | Multiple branches, all converge to Join (ForkJoinTask) |
| ConditionTask | Conditional branching | func(ctx, input, ...params) (bool, error) |
| CallbackTask | External async work | ExecutionFn + ResultProcessorFn |
| ChildWorkflowTask | Execute child plan | func(ctx, planInfo, input, ...params) (ChildTaskInfo, error) |
| ForkJoinTask | Fork synchronization | Synchronization point where fork branches converge |
| EndTask | Termination | Marks end of execution path |
For complete validation rules and error messages, see the Plan Development Guide.