Taskflow: A General-purpose Task-parallel Programming System

Taskflow Processing Pipeline

We study a taskflow processing pipeline where each stage runs an entire task graph rather than a single function. This example demonstrates how Taskflow combines task graph parallelism inside each stage with pipeline parallelism across stages, achieving two levels of parallelism simultaneously.

Problem Formulation

Many real-world pipelines require each stage to run a parallel algorithm, not a single function. We model this as a sequence of tokens flowing through three serial pipes, where each pipe embeds a full taskflow graph:

Embedded content

Within each pipe, tasks inside the embedded taskflow run in parallel. Across pipes, the pipeline scheduler overlaps execution of different tokens in different stages. This produces two-level parallelism: intra-stage (task graph) and inter-stage (pipeline).

Implementation

We define three taskflows — one per stage — each with a different internal structure to illustrate the flexibility of this model. We then create a pipeline of three serial pipes, each running its corresponding taskflow via tf::Executor::corun:

#include <array>
#include <cstdio>
#include <iostream>

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>

// stage 1: diamond taskflow (A1 → {B1, C1} → D1)
void make_taskflow1(tf::Taskflow& tf) {
  auto [A1, B1, C1, D1] = tf.emplace(
    [](){ printf("A1\n"); },
    [](){ printf("B1\n"); },
    [](){ printf("C1\n"); },
    [](){ printf("D1\n"); }
  );
  A1.precede(B1, C1);
  D1.succeed(B1, C1);
}

// stage 2: linear chain taskflow (A2 → B2 → C2 → D2)
void make_taskflow2(tf::Taskflow& tf) {
  auto [A2, B2, C2, D2] = tf.emplace(
    [](){ printf("A2\n"); },
    [](){ printf("B2\n"); },
    [](){ printf("C2\n"); },
    [](){ printf("D2\n"); }
  );
  tf.linearize({A2, B2, C2, D2});
}

// stage 3: broadcast taskflow (A3 → {B3, C3, D3})
void make_taskflow3(tf::Taskflow& tf) {
  auto [A3, B3, C3, D3] = tf.emplace(
    [](){ printf("A3\n"); },
    [](){ printf("B3\n"); },
    [](){ printf("C3\n"); },
    [](){ printf("D3\n"); }
  );
  A3.precede(B3, C3, D3);
}

int main() {

  tf::Taskflow taskflow("taskflow pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;
  const size_t num_pipes = 3;

  // one taskflow per pipe (serial pipes → at most one token per pipe at a time)
  std::array<tf::Taskflow, num_pipes> taskflows;

  make_taskflow1(taskflows[0]);
  make_taskflow2(taskflows[1]);
  make_taskflow3(taskflows[2]);

  tf::Pipeline pl(num_lines,

    // stage 1: run taskflow1 for tokens 0 through 4, then stop the pipeline
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == 5) {
        pf.stop();
        return;
      }
      printf("token %zu enters stage 1\n", pf.token());
      executor.corun(taskflows[pf.pipe()]);
    }},

    // stage 2: run taskflow2
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }},

    // stage 3: run taskflow3
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }}
  );

  // outer task graph: start → pipeline → stop
  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("start");
  tf::Task pipe = taskflow.composed_of(pl)
                          .name("pipeline");
  tf::Task done = taskflow.emplace([](){ std::cout << "done\n"; })
                          .name("stop");

  init.precede(pipe);
  pipe.precede(done);

  executor.run(taskflow).wait();

  return 0;
}

Why corun Instead of run

Each pipe callable is itself executed by a worker thread. If we called executor.run(taskflows[...]).wait() inside the pipe, that worker would block waiting for the inner taskflow — preventing it from helping with other available tasks and potentially causing deadlock if all workers are blocked.

tf::Executor::corun avoids this: the calling worker stays active in the work-stealing loop while the inner taskflow executes, ensuring forward progress and preventing deadlock:

// correct: calling worker participates in executing the inner taskflow
executor.corun(taskflows[pf.pipe()]);

// wrong: calling worker blocks, may deadlock if all workers are waiting
executor.run(taskflows[pf.pipe()]).wait();
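The difference between a blocking wait and a cooperative wait can be illustrated with a toy model. The sketch below is not Taskflow's scheduler: `ToyWorker`, `help_until_done`, and `run_inner_cooperatively` are hypothetical names invented for illustration, and the model uses a single worker with a plain queue instead of work stealing. It only shows the idea that a worker waiting on inner tasks can keep draining the run queue itself, whereas a worker that merely blocked would leave those tasks unexecuted.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Toy single-worker run queue (hypothetical; NOT Taskflow's implementation).
struct ToyWorker {
  std::deque<std::function<void()>> queue;

  // Cooperative wait: keep running queued tasks until `pending` reaches zero.
  // Returns false if the queue drains while work is still pending.
  bool help_until_done(const std::size_t& pending) {
    while (pending != 0) {
      if (queue.empty()) return false;
      auto task = std::move(queue.front());
      queue.pop_front();
      task();  // the waiting worker executes the inner task itself
    }
    return true;
  }
};

// Models an outer task that submits `n` inner tasks and waits cooperatively.
// Returns the number of inner tasks that were executed.
inline std::size_t run_inner_cooperatively(std::size_t n) {
  ToyWorker worker;
  std::size_t pending = n;
  std::size_t executed = 0;
  for (std::size_t i = 0; i < n; ++i) {
    worker.queue.push_back([&] { ++executed; --pending; });
  }
  bool ok = worker.help_until_done(pending);
  assert(ok);
  (void)ok;
  return executed;
}
```

In this model, a worker that blocked without calling `help_until_done` would never pop the inner tasks, so `pending` would never reach zero. That is the single-worker analogue of every worker being stuck in `wait()`.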

Taskflow Storage

Since all three pipes are serial, at most one token occupies each stage at any time. A one-dimensional array of taskflows — one per stage — is therefore sufficient:

std::array<tf::Taskflow, num_pipes> taskflows;

If any pipe were declared parallel, multiple tokens could occupy the same stage simultaneously on different lines, requiring two-dimensional storage of size num_lines × num_pipes.
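A minimal sketch of such two-dimensional storage is given below. `GraphGrid` is a hypothetical helper, not part of Taskflow, and `Graph` is left as a template parameter so the sketch compiles with the standard library alone. In the example on this page it would be instantiated with tf::Taskflow.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Hypothetical helper: one graph slot per (line, pipe) pair, so tokens that
// occupy the same pipe on different lines never share a graph object.
template <typename Graph, std::size_t NumLines, std::size_t NumPipes>
struct GraphGrid {
  std::array<std::array<Graph, NumPipes>, NumLines> slots;

  // Returns the graph owned by the given line and pipe.
  Graph& at(std::size_t line, std::size_t pipe) {
    assert(line < NumLines && pipe < NumPipes);
    return slots[line][pipe];
  }

  // Total number of slots: NumLines * NumPipes.
  static constexpr std::size_t size() { return NumLines * NumPipes; }
};
```

Inside a pipe callable, the lookup would then use both coordinates of the token, for example `executor.corun(grid.at(pf.line(), pf.pipe()))`, where `grid` is assumed to be a `GraphGrid<tf::Taskflow, num_lines, num_pipes>` whose slots were populated beforehand.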

Task Graph

The outer task graph, including pipeline composition, is shown below:

Embedded content