docs/ScalableTaskParallelPipeline.html
Unlike tf::Pipeline (see Task-parallel Pipeline), which instantiates all pipes at construction time, tf::ScalablePipeline lets you assign pipes at runtime using range iterators. A scalable pipeline is therefore better suited to applications whose pipeline structure depends on runtime variables.
You need to include the header file, taskflow/algorithm/pipeline.hpp, for creating a scalable pipeline scheduling framework.
    #include <taskflow/algorithm/pipeline.hpp>
Similar to tf::Pipeline, tf::ScalablePipeline is a composable graph object that implements a pipeline scheduling framework in a taskflow. The key difference is that a scalable pipeline accepts variable assignments of pipes rather than instantiating all pipes at construction or programming time. Users define a linear range of pipes, each of the same callable type, and apply that range to construct a scalable pipeline. Between successive runs, users can reset the pipeline to a different range of pipes. The following code creates a scalable pipeline that uses four parallel lines to schedule tokens through three serial pipes in the given vector, and then resets that pipeline to a new range of five serial pipes:
    tf::Taskflow taskflow("pipeline");
    tf::Executor executor;

    const size_t num_lines = 4;

    // create data storage
    std::array<int, num_lines> buffer;

    // define the pipe callable
    auto pipe_callable = [&buffer] (tf::Pipeflow& pf) mutable {
      switch(pf.pipe()) {
        // first stage generates only 5 scheduling tokens and saves the
        // token number into the buffer.
        case 0: {
          if(pf.token() == 5) {
            pf.stop();
          }
          else {
            printf("stage 1: input token = %zu\n", pf.token());
            buffer[pf.line()] = pf.token();
          }
          return;
        }
        break;

        // other stages propagate the previous result to this pipe and
        // increment it by one
        default: {
          printf("stage %zu: input buffer[%zu] = %d\n", pf.pipe(), pf.line(), buffer[pf.line()]);
          buffer[pf.line()] = buffer[pf.line()] + 1;
        }
        break;
      }
    };

    // create a vector of three pipes
    std::vector< tf::Pipe<std::function<void(tf::Pipeflow&)>> > pipes;

    for(size_t i=0; i<3; i++) {
      pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
    }

    // create a pipeline of four parallel lines based on the given vector of pipes
    tf::ScalablePipeline pl(num_lines, pipes.begin(), pipes.end());

    // build the pipeline graph using composition
    tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                            .name("starting pipeline");
    tf::Task task = taskflow.composed_of(pl)
                            .name("pipeline");
    tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                            .name("pipeline stopped");

    // create task dependency
    init.precede(task);
    task.precede(stop);

    // dump the pipeline graph structure (with composition)
    taskflow.dump(std::cout);

    // run the pipeline
    executor.run(taskflow).wait();

    // reset the pipeline to a new range of five pipes and starts from
    // the initial state (i.e., token counts from zero)
    for(size_t i=0; i<2; i++) {
      pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
    }
    pl.reset(pipes.begin(), pipes.end());

    executor.run(taskflow).wait();
The program defines a uniform pipe type of tf::Pipe<std::function<void(tf::Pipeflow&)>> and keeps all pipes in a vector that can change at runtime. Then, it constructs a scalable pipeline using two range iterators, [first, last), that point to the beginning and the end of the pipe vector, resulting in a pipeline of three serial stages:
[Figure: task graph of the scalable pipeline, with four parallel lines (cluster0 to cluster3), each passing through three serial pipes (pipe-0, pipe-1, pipe-2).]
Then, the program appends two more pipes to the vector and resets the pipeline to the new range, which now covers all five pipes, resulting in a pipeline of five serial stages:
[Figure: task graph of the scalable pipeline after the reset, with four parallel lines (cluster0 to cluster3), each passing through five serial pipes (pipe-0 to pipe-4).]
When a scalable pipeline is reset to a new range, it starts from the initial state as if it had just been constructed, i.e., the token number counts from zero.
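To make the reset semantics concrete, here is a minimal, single-threaded sketch in standard C++. It is not Taskflow's implementation and involves no parallel scheduling. `ToyStage`, `ToyPipeline`, `toy_callable`, and `demo_reset` are hypothetical names introduced only for illustration. The stage logic mirrors the buffer example above: the first stage stores the token number and every later stage adds one.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical stand-in for a pipe: every stage shares one callable type,
// so all stages can live in a single container that grows at runtime.
using ToyStage =
    std::function<void(std::size_t stage, std::size_t token, int& slot)>;

// Toy, sequential model of a resettable pipeline (illustration only).
class ToyPipeline {
 public:
  using Iter = std::vector<ToyStage>::iterator;

  ToyPipeline(Iter first, Iter last) { reset(first, last); }

  // Rebind to a new range [first, last) and return to the initial state.
  void reset(Iter first, Iter last) {
    first_ = first;
    last_ = last;
    num_tokens_ = 0;  // token numbering restarts from zero
  }

  // Push `count` tokens through every stage; return each token's final value.
  std::vector<int> run(std::size_t count) {
    std::vector<int> results;
    for (std::size_t i = 0; i < count; ++i) {
      int slot = 0;
      std::size_t stage = 0;
      for (Iter it = first_; it != last_; ++it) {
        (*it)(stage++, num_tokens_, slot);
      }
      results.push_back(slot);
      ++num_tokens_;
    }
    return results;
  }

  std::size_t num_tokens() const { return num_tokens_; }

 private:
  Iter first_;
  Iter last_;
  std::size_t num_tokens_ = 0;
};

// Stage 0 stores the token number; later stages increment the value by one.
inline void toy_callable(std::size_t stage, std::size_t token, int& slot) {
  if (stage == 0) {
    slot = static_cast<int>(token);
  } else {
    ++slot;
  }
}

// Runs two tokens through three stages, then resets to five stages.
inline std::vector<int> demo_reset() {
  std::vector<ToyStage> stages(3, ToyStage(toy_callable));
  ToyPipeline pl(stages.begin(), stages.end());
  std::vector<int> out = pl.run(2);
  assert(pl.num_tokens() == 2);

  // Growing the vector may reallocate and invalidate the old iterators,
  // so the pipeline must be reset with freshly obtained iterators.
  stages.insert(stages.end(), 2, ToyStage(toy_callable));
  pl.reset(stages.begin(), stages.end());
  assert(pl.num_tokens() == 0);

  std::vector<int> again = pl.run(2);
  out.insert(out.end(), again.begin(), again.end());
  return out;
}
```

In this sketch, calling `demo_reset()` yields the values 2 and 3 from the three-stage run, followed by 4 and 5 from the five-stage run. The second pair starts again from token 0 because `reset` clears the token counter.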
It is possible to create a scalable pipeline as a placeholder using the constructor tf::ScalablePipeline(size_t num_lines) and reset it to another range later in the application. The following code creates a task to emplace a range of pipes and reset the pipeline to that range, before running the pipeline task:
    tf::Executor executor;
    tf::Taskflow taskflow;

    size_t num_pipes = 10;
    size_t num_lines = 10;

    std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;
    tf::ScalablePipeline<typename decltype(pipes)::iterator> spl(num_lines);

    // emplace the pipes and bind the placeholder pipeline to them at runtime
    tf::Task init = taskflow.emplace([&](){
      for(size_t i=0; i<num_pipes; i++) {
        pipes.emplace_back(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
          if(pf.pipe() == 0 && pf.token() == 1024) {
            pf.stop();
            return;
          }
        });
      }
      spl.reset(pipes.begin(), pipes.end());
    }).name("init");

    tf::Task pipeline = taskflow.composed_of(spl).name("pipeline");
    pipeline.succeed(init);
    executor.run(taskflow).wait();
The task graph of this program is shown below:
[Figure: task graph in which the task "init pipes" precedes the module task "pipeline [m1]"; inside the module, a condition task "cond" branches to ten runtime tasks, rt-0 through rt-9.]
Similarly, you can create an empty scalable pipeline using the default constructor tf::ScalablePipeline() and reset it later in your program.
    std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;
    tf::ScalablePipeline<typename decltype(pipes)::iterator> spl;

    // create pipes ...

    spl.reset(num_lines, pipes.begin(), pipes.end());
When a range is assigned to a scalable pipeline, the pipeline copies every pipe iterator in that range into an internal vector. This makes invoking a pipe callable a random-access operation regardless of the pipe container type. Taskflow places few restrictions on the iterator type: the pipes only need to be traversable in sequential order using the postfix increment operator, ++.
    // use vector to store pipes
    std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> vector;
    tf::ScalablePipeline spl1(num_lines, vector.begin(), vector.end());

    // use list to store pipes
    std::list<tf::Pipe<std::function<void(tf::Pipeflow&)>>> list;
    tf::ScalablePipeline spl2(num_lines, list.begin(), list.end());
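The iterator-caching idea described above can be sketched in standard C++ as follows. This is only an illustration of the technique under simple assumptions, not Taskflow's internal code. `IteratorCache` and `demo_cache` are hypothetical names, and plain strings stand in for pipes.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <list>
#include <string>
#include <vector>

// Hypothetical helper (not part of Taskflow): stores every iterator of a
// range so that element i is reachable in constant time, even when the
// underlying container (e.g., std::list) has no random access.
template <typename Iter>
class IteratorCache {
 public:
  IteratorCache(Iter first, Iter last) {
    // Only != and the postfix ++ are required of the iterator type.
    for (Iter it = first; it != last; it++) {
      cache_.push_back(it);
    }
  }

  std::size_t size() const { return cache_.size(); }

  // Constant-time lookup through the cached iterator.
  typename std::iterator_traits<Iter>::reference
  operator[](std::size_t i) const {
    assert(i < cache_.size());
    return *cache_[i];
  }

 private:
  std::vector<Iter> cache_;
};

// Indexes into a vector-backed and a list-backed range the same way.
inline std::string demo_cache() {
  std::vector<std::string> v{"a", "b", "c"};
  std::list<std::string> l{"x", "y", "z"};

  IteratorCache<std::vector<std::string>::iterator> cv(v.begin(), v.end());
  IteratorCache<std::list<std::string>::iterator> cl(l.begin(), l.end());

  return cv[2] + cl[1];  // third vector element, second list element
}
```

The cache stores iterators rather than copies of the elements, so the underlying container must outlive the cache and must not be modified in a way that invalidates those iterators. The same caution plausibly applies to any range handed to a pipeline that keeps iterators internally.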