docs/TaskflowProcessingPipeline.html
We study a taskflow processing pipeline that propagates a sequence of tokens through linearly dependent taskflows. The pipeline embeds a taskflow in each pipe to run a parallel algorithm using task graph parallelism.
Many complex and irregular pipeline applications require each pipe to run a parallel algorithm using task graph parallelism. We can formulate such applications as scheduling a sequence of tokens through linearly dependent taskflows. The following example illustrates the pipeline propagation of three scheduling tokens through three linearly dependent taskflows:
[Figure: propagation of three scheduling tokens through three linearly dependent taskflows. Each token k flows through taskflow1, taskflow2, and taskflow3 in order, and "taskflow i on token k" precedes "taskflow i on token k+1".]
[Figure: the three taskflows. taskflow1: A1 precedes B1 and C1; B1 and C1 precede D1. taskflow2: a linear chain A2, B2, C2, D2. taskflow3: A3 precedes B3, C3, and D3.]
Each pipe (stage) in the pipeline embeds a taskflow to perform a stage-specific parallel algorithm on an input scheduling token. Parallelism thus exists both within each taskflow and across the pipeline, combining task graph parallelism with pipeline parallelism.
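The scheduling constraints above can be stated precisely: token t at pipe p may start only after token t has finished pipe p-1 and, because every pipe here is serial, after token t-1 has finished pipe p. The following minimal sketch, independent of Taskflow, checks a proposed execution order against these two constraints (the function name valid_order and the token/pipe pair encoding are our own, for illustration only):

```cpp
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

// Each entry of `order` is a (token, pipe) pair in the order it executed.
// The order is valid if every (t, p) appears after (t, p-1) -- pipeline
// dependency -- and after (t-1, p) -- serial-pipe dependency.
bool valid_order(const std::vector<std::pair<int, int>>& order) {
  std::map<std::pair<int, int>, std::size_t> pos;
  for (std::size_t i = 0; i < order.size(); ++i) pos[order[i]] = i;
  for (auto [t, p] : order) {
    auto runs_before = [&](int tt, int pp) {
      auto it = pos.find({tt, pp});
      return it == pos.end() || it->second < pos[{t, p}];
    };
    if (p > 0 && !runs_before(t, p - 1)) return false;  // pipeline dependency
    if (t > 0 && !runs_before(t - 1, p)) return false;  // serial-pipe dependency
  }
  return true;
}
```

A wavefront order such as (0,0), (0,1), (1,0), (0,2), (1,1), (1,2) satisfies both constraints; running (1,0) before (0,0) violates the serial-pipe dependency.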
Using the example from the previous section, we create a pipeline of three serial pipes each running a taskflow on a sequence of five scheduling tokens. The overall implementation is shown below:
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
#include <array>
#include <cstdio>
#include <iostream>

// taskflow on the first pipe
void make_taskflow1(tf::Taskflow& tf) {
  auto [A1, B1, C1, D1] = tf.emplace(
    [](){ printf("A1\n"); },
    [](){ printf("B1\n"); },
    [](){ printf("C1\n"); },
    [](){ printf("D1\n"); }
  );
  A1.precede(B1, C1);
  D1.succeed(B1, C1);
}

// taskflow on the second pipe
void make_taskflow2(tf::Taskflow& tf) {
  auto [A2, B2, C2, D2] = tf.emplace(
    [](){ printf("A2\n"); },
    [](){ printf("B2\n"); },
    [](){ printf("C2\n"); },
    [](){ printf("D2\n"); }
  );
  tf.linearize({A2, B2, C2, D2});
}

// taskflow on the third pipe
void make_taskflow3(tf::Taskflow& tf) {
  auto [A3, B3, C3, D3] = tf.emplace(
    [](){ printf("A3\n"); },
    [](){ printf("B3\n"); },
    [](){ printf("C3\n"); },
    [](){ printf("D3\n"); }
  );
  A3.precede(B3, C3, D3);
}

int main() {

  tf::Taskflow taskflow("taskflow processing pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;
  const size_t num_pipes = 3;

  // define the taskflow storage
  // we use the pipe dimension because we create three 'serial' pipes
  std::array<tf::Taskflow, num_pipes> taskflows;

  // create three different taskflows for the three pipes
  make_taskflow1(taskflows[0]);
  make_taskflow2(taskflows[1]);
  make_taskflow3(taskflows[2]);

  // the pipeline consists of three serial pipes
  // and up to two concurrent scheduling tokens
  tf::Pipeline pl(num_lines,
    // first pipe runs taskflow1
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == 5) {
        pf.stop();
        return;
      }
      printf("begin token %zu\n", pf.token());
      executor.corun(taskflows[pf.pipe()]);
    }},
    // second pipe runs taskflow2
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }},
    // third pipe calls taskflow3
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      executor.corun(taskflows[pf.pipe()]);
    }}
  );

  // build the pipeline graph using composition
  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("starting pipeline");
  tf::Task task = taskflow.composed_of(pl)
                          .name("pipeline");
  tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                          .name("pipeline stopped");

  // create task dependency
  init.precede(task);
  task.precede(stop);

  // dump the pipeline graph structure (with composition)
  taskflow.dump(std::cout);

  // run the pipeline
  executor.run(taskflow).wait();

  return 0;
}
First, we define three taskflows for the three pipes in the pipeline:
// taskflow on the first pipe
void make_taskflow1(tf::Taskflow& tf) {
  auto [A1, B1, C1, D1] = tf.emplace(
    [](){ printf("A1\n"); },
    [](){ printf("B1\n"); },
    [](){ printf("C1\n"); },
    [](){ printf("D1\n"); }
  );
  A1.precede(B1, C1);
  D1.succeed(B1, C1);
}

// taskflow on the second pipe
void make_taskflow2(tf::Taskflow& tf) {
  auto [A2, B2, C2, D2] = tf.emplace(
    [](){ printf("A2\n"); },
    [](){ printf("B2\n"); },
    [](){ printf("C2\n"); },
    [](){ printf("D2\n"); }
  );
  tf.linearize({A2, B2, C2, D2});
}

// taskflow on the third pipe
void make_taskflow3(tf::Taskflow& tf) {
  auto [A3, B3, C3, D3] = tf.emplace(
    [](){ printf("A3\n"); },
    [](){ printf("B3\n"); },
    [](){ printf("C3\n"); },
    [](){ printf("D3\n"); }
  );
  A3.precede(B3, C3, D3);
}
As each taskflow corresponds to a pipe in the pipeline, we create a linear array to store the three taskflows:
std::array<tf::Taskflow, num_pipes> taskflows;
make_taskflow1(taskflows[0]);
make_taskflow2(taskflows[1]);
make_taskflow3(taskflows[2]);
Since the three pipes are serial, at most one scheduling token, and therefore at most one taskflow, runs at each pipe at any time. We can thus store the three taskflows in a linear array whose dimension equals the number of pipes. If any pipe were parallel, we would need a two-dimensional array (lines by pipes), since multiple taskflows at the same stage can then run simultaneously across parallel lines.
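For illustration, here is a minimal sketch of that two-dimensional layout, with std::string labels standing in for tf::Taskflow so the example stays self-contained (make_taskflow_storage is a hypothetical helper, not part of Taskflow):

```cpp
#include <array>
#include <cstddef>
#include <string>

// With num_lines parallel lines, up to num_lines tokens can occupy the same
// stage at once, so each (line, pipe) slot needs its own taskflow.
inline constexpr std::size_t num_lines = 2;
inline constexpr std::size_t num_pipes = 3;

std::array<std::array<std::string, num_pipes>, num_lines>
make_taskflow_storage() {
  std::array<std::array<std::string, num_pipes>, num_lines> taskflows;
  for (std::size_t l = 0; l < num_lines; ++l) {
    for (std::size_t p = 0; p < num_pipes; ++p) {
      // every line runs the same stage algorithm; real code would call
      // make_taskflow1/2/3 here instead of storing a label
      taskflows[l][p] = "taskflow" + std::to_string(p + 1);
    }
  }
  return taskflows;
}

// inside a pipe callable, one would then index by both coordinates:
//   executor.corun(taskflows[pf.line()][pf.pipe()]);
```

tf::Pipeflow::line() identifies the parallel line a token is scheduled on, just as tf::Pipeflow::pipe() identifies the stage.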
The pipe definition is straightforward: each pipe runs its corresponding taskflow, indexed into taskflows by the pipe identifier returned by tf::Pipeflow::pipe(). The first pipe stops the pipeline scheduling once it has processed five scheduling tokens:
// first pipe runs taskflow1
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  if(pf.token() == 5) {
    pf.stop();
    return;
  }
  printf("begin token %zu\n", pf.token());
  executor.corun(taskflows[pf.pipe()]);
}},
// second pipe runs taskflow2
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  executor.corun(taskflows[pf.pipe()]);
}},
// third pipe calls taskflow3
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  executor.corun(taskflows[pf.pipe()]);
}}
At each pipe, we use tf::Executor::corun to execute the corresponding taskflow and wait until the execution completes. This matters because the caller thread, i.e., the worker that invokes the pipe callable, must not block (as executor.run(taskflows[pf.pipe()]).wait() would); instead it keeps participating in the work-stealing loop of the scheduler, which avoids deadlock.
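To see why blocking would be dangerous, consider this toy single-worker model (plain C++, not Taskflow code; TinyExecutor and corun_pending are our own names). A task that blocked waiting on another queued task would starve the only worker; draining the queue cooperatively, which is the idea behind corun, keeps the worker productive:

```cpp
#include <functional>
#include <queue>
#include <string>
#include <utility>

// A single-worker "executor" that drains a task queue in FIFO order.
struct TinyExecutor {
  std::queue<std::function<void()>> tasks;
  void submit(std::function<void()> f) { tasks.push(std::move(f)); }
  // corun-style helper: the caller executes pending work itself
  void corun_pending() {
    while (!tasks.empty()) {
      auto f = std::move(tasks.front());
      tasks.pop();
      f();
    }
  }
};

std::string run_demo() {
  TinyExecutor exec;
  std::string log;
  exec.submit([&] {
    log += "outer-begin;";
    exec.submit([&] { log += "inner;"; });
    // A blocking wait here (the run().wait() style) would deadlock:
    // this IS the only worker. Cooperatively run the pending task instead.
    exec.corun_pending();
    log += "outer-end;";
  });
  exec.corun_pending();  // the "worker" enters its scheduling loop
  return log;
}
```

The inner task completes inside the outer one's cooperative drain, so run_demo yields the log "outer-begin;inner;outer-end;" instead of hanging.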
To build up the taskflow for the pipeline, we create a module task with the defined pipeline structure and connect it with two tasks that output helper messages before and after the pipeline:
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                        .name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
                        .name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                        .name("pipeline stopped");
init.precede(task);
task.precede(stop);
[Figure: the dumped taskflow graph. The module task "pipeline [m1]" sits between "starting pipeline" and "pipeline stopped"; inside the module, a condition task dispatches to two runtime tasks, rt-0 and rt-1, one per line.]
Finally, we submit the taskflow to the executor and run it once:
executor.run(taskflow).wait();
One possible output is shown below:
ready
begin token 0
A1
C1
B1
D1
begin token 1
A2
B2
A1
C1
B1
D1
C2
D2
A3
D3
C3
B3
begin token 2
A2
B2
C2
D2
A1
C1
B1
D1
A3
D3
C3
B3
A2
B2
C2
D2
begin token 3
A3
D3
C3
B3
A1
C1
B1
D1
begin token 4
A2
A1
C1
B1
D1
B2
C2
D2
A3
D3
C3
B3
A2
B2
C2
D2
A3
D3
C3
B3
stopped