pkg/cmd/tef/architecture.md
This document explains the technical architecture and code organization of the Task Execution Framework (TEF).
For a high-level overview and getting started guide, see README.md.
TEF follows a layered, interface-based architecture that separates workflow definition from execution:
┌────────────────────────────────────────────────────────────┐
│ Core Abstractions │
│ - Planner interface │
│ - Registry interface │
│ - PlanExecutor & SharedPlanService interfaces │
│ - Task type definitions │
└────────────────────┬───────────────────────────────────────┘
│
┌────────────────────┴───────────────────────────────────────┐
│ BasePlanner Implementation │
│ - Task registration and validation │
│ - Cycle detection │
│ - Convergence validation │
│ - Executor registration │
└────────────────────┬───────────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
┌─────────┴─────────┐ ┌────────┴──────────────────────┐
│ Plans │ │ Execution Engine │
│ - Plan defs │ │ - PlannerManager impl │
│ - Executors │ │ - Worker runtime │
│ - Registry │ │ - Plan execution │
└───────────────────┘ └───────────────────────────────┘
Status:
pkg/cmd/tef/
├── main.go # Entry point
├── README.md # User documentation
├── architecture.md # This file
├── CLI.md # CLI reference
├── API.md # REST API reference
├── TODO.md # Pending work
│
├── cli/ # CLI command generation
│ ├── commands.go # TODO: Implement initializeWorkerCLI()
│ └── initializer.go # CLI initialization structure
│
├── planners/ # Core abstractions (✅ fully implemented)
│ ├── definitions.go # Executor, PlannerManager, Registry interfaces
│ ├── planner.go # BasePlanner implementation
│ ├── tasks.go # Task type definitions (7 types)
│ ├── status.go # Execution status types
│ ├── utils.go # Helper functions
│ ├── logger.go # Logger interface and implementation
│ ├── plan_registry.go # Plan registry management
│ ├── *_test.go # Comprehensive tests
│ │
│ └── mock/ # Generated mocks for testing
│ └── *.go
│
└── plans/ # Plan definitions (❌ TODO)
└── registry.go # Registry stub
NOT YET CREATED:
├── planners/temporal/ # Temporal-specific implementation
│ ├── manager.go # Implement PlannerManager for Temporal
│ ├── workflow.go # Temporal workflow definitions
│ └── status.go # Temporal status queries
│
├── plans/demo/ # Example plan implementations
│ └── plan.go
│
└── api/ # REST API (optional)
├── server.go
└── handlers/
Defined in pkg/cmd/tef/planners/definitions.go:
Planner Interface:
type Planner interface {
RegisterExecutor(ctx context.Context, executor *Executor)
RegisterPlan(ctx context.Context, first, output Task)
NewExecutionTask(ctx context.Context, name string) *ExecutionTask
NewForkTask(ctx context.Context, name string) *ForkTask
NewConditionTask(ctx context.Context, name string) *ConditionTask
NewCallbackTask(ctx context.Context, name string) *CallbackTask
NewChildWorkflowTask(ctx context.Context, name string) *ChildWorkflowTask
NewEndTask(ctx context.Context, name string) *EndTask
}
Registry Interface:
type Registry interface {
PrepareExecution(ctx context.Context) error
GetPlanName() string
GetPlanDescription() string
GetWorkflowVersion() int
ParsePlanInput(input string) (interface{}, error)
AddStartWorkerCmdFlags(cmd *cobra.Command)
GeneratePlan(ctx context.Context, p Planner)
}
PlannerManager Interface:
type PlannerManager interface {
StartWorker(ctx context.Context, planVariant string) error
ExecutePlan(ctx context.Context, input interface{}, planID string) (string, error)
GetBasePlanner() *BasePlanner
GetExecutionStatus(ctx context.Context, planID, workflowID string) (*ExecutionStatus, error)
ListExecutions(ctx context.Context, planID string) ([]*WorkflowExecutionInfo, error)
ListAllPlans(ctx context.Context) ([]PlanInfo, error)
}
The BasePlanner (pkg/cmd/tef/planners/planner.go) provides:
TasksRegistry map of all tasksExecutorRegistry map of all executorsKey Data Structures:
type BasePlanner struct {
Registry Registry
First Task
Output Task
TasksRegistry map[string]Task
ExecutorRegistry map[string]*Executor
}
Seven task types defined in pkg/cmd/tef/planners/tasks.go:
Type Hierarchy:
baseTask (name, taskName)
├── ExecutionTask (extends stepTask)
├── ForkTask (extends stepTask)
├── ConditionTask
├── CallbackTask (extends stepTask)
├── ChildWorkflowTask (extends stepTask)
└── EndTask
stepTask (extends baseTask, adds Next/Fail)
Validation per Task Type:
Plan Author
│
├─► Implements Registry interface
│ ├─ GetPlanName()
│ ├─ GetPlanDescription()
│ ├─ GetWorkflowVersion()
│ ├─ ParsePlanInput()
│ ├─ GeneratePlan(ctx, Planner)
│ ├─ PrepareExecution(ctx)
│ └─ AddStartWorkerCmdFlags(cmd)
│
└─► Registers in RegisterPlans() (plans/registry.go)
./bin/tef start-worker myplan --plan-variant dev
│
├─► CLI Command (cli/commands.go - initializeWorkerCLI)
│
├─► NewPlannerManager(registry, orchestrationConfig)
│ │
│ └─► NewBasePlanner(ctx, registry)
│ │
│ ├─► registry.GeneratePlan(ctx, planner)
│ │ ├─ Create tasks using Planner interface
│ │ ├─ Register executors
│ │ └─ Define task graph
│ │
│ └─► Validate plan
│ ├─ Check for cycles using DFS
│ ├─ Ensure convergence to common EndTasks
│ ├─ Validate executor registration
│ ├─ Validate executor function signatures
│ └─ Validate task configuration
│
└─► manager.StartWorker(ctx, planID)
├─► Connect to orchestration engine
├─► Create worker for task queue
├─► Register workflows and activities
└─► Start listening for executions
./bin/tef execute myplan '{"param": "value"}' tef.myplan.dev
│
├─► CLI Command (cli/commands.go)
│
├─► registry.ParsePlanInput(inputJSON)
│
├─► NewPlannerManager(registry, orchestrationConfig)
│
└─► manager.ExecutePlan(ctx, input, planID)
│
├─► Connect to orchestration engine
├─► Check for active workers on task queue
├─► Generate unique workflow ID
└─► Start workflow execution
│
└─► Worker picks up workflow
│
└─► Execute task graph using BasePlanner structure
├─ Run executors based on task types
├─ Handle forks (parallel execution)
├─ Handle conditionals (if/then/else)
├─ Handle sleeps (duration-based delays)
├─ Handle callback tasks (wait for signals)
├─ Handle child workflows (synchronous sub-plans)
└─ Follow Next/Fail paths
The BasePlanner performs comprehensive validation when a plan is created.
Uses depth-first search to detect cycles:
func (sr *BasePlanner) validateTaskChain(task Task, visited map[string]bool) error {
if visited[task.Name()] {
return errors.Errorf("cyclic dependency detected: task %s appears in its own execution path", task.Name())
}
visited[task.Name()] = true
// Recursively validate next tasks
// ...
}
Algorithm:
Ensures all branches converge to the same EndTask:
func findEndTask(task Task) *EndTask {
// Traverse Next path until we hit an EndTask
// ...
}
Algorithm:
Validates executor registration and function signatures:
func (sr *BasePlanner) validateExecutorSignature(executor *Executor, taskType TaskType) error {
// Use reflection to inspect function signature
// Validate parameter types and return types
// ...
}
Checks:
TEF's core abstractions have zero dependencies on any orchestration engine:
Planner interface: Framework-agnostic task creationRegistry interface: Framework-agnostic plan definitionTask types: Pure data structures with no framework dependenciesBasePlanner: Validation logic independent of execution engineThe only place framework-specific code lives is in the PlannerManager implementation (e.g., planners/temporal/).
To integrate TEF with an orchestration engine:
Create implementation directory: pkg/cmd/tef/planners/myengine/
Implement PlannerManager:
type manager struct {
basePlanner *planners.BasePlanner
// Your engine-specific client/config
}
func (m *manager) StartWorker(ctx context.Context, planVariant string) error {
// Start your engine's worker
}
func (m *manager) ExecutePlan(ctx context.Context, input interface{}, planID string) (string, error) {
// Execute using your engine
}
// ... implement other methods
Translate task graph to engine's format:
basePlanner.TasksRegistrybasePlanner.FirstImplement CLI initialization:
func initializeWorkerCLI(ctx context.Context, rootCmd *cobra.Command, registries []planners.Registry) {
for _, r := range registries {
manager, err := myengine.NewPlannerManager(ctx, r, myEngineConfig)
// Generate commands for this plan
}
}
Key Insight: Plan definitions, task types, validation logic, and BasePlanner remain unchanged when switching engines. Only the PlannerManager implementation needs to be engine-specific.
Define the task struct in pkg/cmd/tef/planners/tasks.go:
type MyCustomTask struct {
stepTask // or baseTask
// Custom fields
}
func (t *MyCustomTask) Type() TaskType { return TaskTypeMyCustom }
func (t *MyCustomTask) validate() error { /* validation logic */ }
Add factory method to Planner interface:
type Planner interface {
// ... existing methods
NewMyCustomTask(ctx context.Context, name string) *MyCustomTask
}
Implement in BasePlanner:
func (sr *BasePlanner) NewMyCustomTask(ctx context.Context, name string) *MyCustomTask {
task := &MyCustomTask{}
task.taskName = name
sr.registerTask(ctx, task)
return task
}
Update validation logic in BasePlanner.validateTaskChain()
Implement execution logic in your orchestration engine implementation
See README.md and plans/README.md for complete instructions.
ChildWorkflowTaskplanners/definitions.go: All interface definitionsplanners/planner.go: BasePlanner implementation and validationplanners/tasks.go: Task type definitionsplanners/status.go: Execution status typesplans/registry.go: Plan registrationcli/commands.go: CLI command generation (TODO)