Taskflow: A General-purpose Task-parallel Programming System
Scalable Task-parallel Pipeline
Unlike tf::Pipeline (see Task-parallel Pipeline), which instantiates all pipes at construction time, tf::ScalablePipeline is a scalable alternative that lets you assign a variable number of pipes through a range of iterators. A scalable pipeline is therefore better suited to applications whose pipeline structure depends on runtime variables.
You need to include the header file, taskflow/algorithm/pipeline.hpp, for creating a scalable pipeline scheduling framework.
#include <taskflow/algorithm/pipeline.hpp>
Similar to tf::Pipeline, tf::ScalablePipeline is a composable graph object that implements a pipeline scheduling framework in a taskflow. The key difference is that a scalable pipeline accepts a variable assignment of pipes rather than instantiating all pipes at construction or programming time. Users define a linear range of pipes, each of the same callable type, and apply that range to construct a scalable pipeline. Between successive runs, users can reset the pipeline to a different range of pipes. The following code creates a scalable pipeline that uses four parallel lines to schedule tokens through the three serial pipes in the given vector, and then resets that pipeline to a new range of five serial pipes:
tf::Taskflow taskflow("pipeline");
tf::Executor executor;

const size_t num_lines = 4;

// create data storage
std::array<int, num_lines> buffer;

// define the pipe callable
auto pipe_callable = [&buffer] (tf::Pipeflow& pf) mutable {
  switch(pf.pipe()) {
    // first stage generates only 5 scheduling tokens and saves the
    // token number into the buffer.
    case 0: {
      if(pf.token() == 5) {
        pf.stop();
      }
      else {
        printf("stage 1: input token = %zu\n", pf.token());
        buffer[pf.line()] = pf.token();
      }
      return;
    }
    break;

    // other stages propagate the previous result to this pipe and
    // increment it by one
    default: {
      printf(
        "stage %zu: input buffer[%zu] = %d\n", pf.pipe(), pf.line(), buffer[pf.line()]
      );
      buffer[pf.line()] = buffer[pf.line()] + 1;
    }
    break;
  }
};

// create a vector of three pipes
std::vector< tf::Pipe<std::function<void(tf::Pipeflow&)>> > pipes;

for(size_t i=0; i<3; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}

// create a pipeline of four parallel lines based on the given vector of pipes
tf::ScalablePipeline pl(num_lines, pipes.begin(), pipes.end());

// build the pipeline graph using composition
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                        .name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
                        .name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                        .name("pipeline stopped");

// create task dependency
init.precede(task);
task.precede(stop);

// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);

// run the pipeline
executor.run(taskflow).wait();

// reset the pipeline to a new range of five pipes and start from
// the initial state (i.e., token counts from zero)
for(size_t i=0; i<2; i++) {
  pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);
}
pl.reset(pipes.begin(), pipes.end());

executor.run(taskflow).wait();
The program defines a uniform pipe type, tf::Pipe<std::function<void(tf::Pipeflow&)>>, and keeps all pipes in a vector that can change between runs. It then constructs a scalable pipeline from two range iterators, [first, last), that point to the beginning and the end of the pipe vector, resulting in a pipeline of three serial stages.
Next, the program appends two more pipes to the vector and resets the pipeline to the new range, which now covers all five pipes, resulting in a pipeline of five serial stages.
When resetting a scalable pipeline to a new range, it will start from the initial state as if it has just been constructed, i.e., the token number counts from zero.
Note: Unlike tf::Pipeline, which keeps the given pipes in a std::tuple object, tf::ScalablePipeline does not own the given pipes but maintains a vector of iterators to each pipe in the given range. It is your responsibility to keep those pipe objects alive during the execution of the pipeline task.
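The reset and ownership behavior described above can be modeled without Taskflow. The sketch below is a standard-library-only illustration, not Taskflow's implementation: `ToyStage` and `ToyRangeView` are hypothetical names, and a simple token counter is assumed to stand in for the pipeline state. It shows a view that stores iterators rather than copies and returns to its initial state on every reset.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical stand-in for a pipe callable: transforms an int.
using ToyStage = std::function<int(int)>;

// Hypothetical non-owning view over a range of stages (not a Taskflow type).
template <typename It>
class ToyRangeView {
 public:
  ToyRangeView(It first, It last) { reset(first, last); }

  // Point the view at a new range and return to the initial state.
  void reset(It first, It last) {
    stages_.clear();
    for (It it = first; it != last; it++) {
      stages_.push_back(it);  // store the iterator, not a copy of the stage
    }
    num_tokens_ = 0;  // token numbering starts over from zero
  }

  // Pass one token through every stage in order and return the result.
  int process(int value) {
    assert(!stages_.empty());  // a view with no stages has nothing to run
    for (It it : stages_) {
      value = (*it)(value);
    }
    ++num_tokens_;
    return value;
  }

  std::size_t num_pipes() const { return stages_.size(); }
  std::size_t num_tokens() const { return num_tokens_; }

 private:
  std::vector<It> stages_;      // iterators into a container owned elsewhere
  std::size_t num_tokens_ = 0;  // tokens processed since the last reset
};
```

Because the view holds only iterators, the container must outlive it. In addition, appending to a std::vector may reallocate its storage and invalidate iterators obtained earlier, so in this model the view has to be reset to the new range after the vector grows and before it is used again.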
It is possible to create a scalable pipeline as a placeholder using the constructor tf::ScalablePipeline(size_t num_lines) and reset it to another range later in the application. The following code creates a task to emplace a range of pipes and reset the pipeline to that range, before running the pipeline task:
tf::Executor executor;
tf::Taskflow taskflow;

size_t num_pipes = 10;
size_t num_lines = 10;

std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;
tf::ScalablePipeline<typename decltype(pipes)::iterator> spl(num_lines);

tf::Task init = taskflow.emplace([&](){
  for(size_t i=0; i<num_pipes; i++) {
    pipes.emplace_back(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.pipe() == 0 && pf.token() == 1024) {
        pf.stop();
        return;
      }
    });
  }
  spl.reset(pipes.begin(), pipes.end());
}).name("init");

tf::Task pipeline = taskflow.composed_of(spl).name("pipeline");
pipeline.succeed(init);
executor.run(taskflow).wait();
The task graph of this program is shown below:
Note: It is your responsibility to ensure a scalable pipeline has a valid structure before running it. A valid pipeline must have at least one parallel line and one pipe, where the first pipe is a serial type.
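One way to guard against an invalid structure is to check the three conditions before assigning a range. The following is a minimal sketch under stated assumptions: `ToyPipeType`, `ToyPipe`, and `has_valid_structure` are hypothetical names written against the standard library only, and they are not part of Taskflow.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-ins for a pipe and its type (not Taskflow types).
enum class ToyPipeType { PARALLEL, SERIAL };

struct ToyPipe {
  ToyPipeType type;
};

// Returns true only if the structure satisfies the three stated rules.
template <typename It>
bool has_valid_structure(std::size_t num_lines, It first, It last) {
  if (num_lines == 0) return false;            // at least one parallel line
  if (first == last) return false;             // at least one pipe
  return first->type == ToyPipeType::SERIAL;   // first pipe must be serial
}
```

A caller could run such a check right before resetting a pipeline to a new range and refuse to run the pipeline task if it returns false.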
Similarly, you can create an empty scalable pipeline using the default constructor tf::ScalablePipeline() and reset it later in your program.
std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;
tf::ScalablePipeline<typename decltype(pipes)::iterator> spl;
// create pipes ...
spl.reset(num_lines, pipes.begin(), pipes.end());
When you assign a range to a scalable pipeline, the pipeline copies an iterator to every pipe in that range into an internal vector. This organization makes invoking a pipe callable a random-access operation, regardless of the pipe container type. Taskflow places few restrictions on the iterator type, as long as the pipes can be traversed in sequential order using the postfix increment operator, ++.
// use vector to store pipes
std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> vector;
tf::ScalablePipeline spl1(num_lines, vector.begin(), vector.end());
// use list to store pipes
std::list<tf::Pipe<std::function<void(tf::Pipeflow&)>>> list;
tf::ScalablePipeline spl2(num_lines, list.begin(), list.end());
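To see why an internal vector of iterators gives indexed access even over a std::list, consider the following standard-library-only sketch. It is an illustration of the idea rather than Taskflow's code: `ToyCallable`, `collect_iterators`, `invoke_at`, and `list_demo` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <vector>

// Hypothetical stand-in for a pipe callable.
using ToyCallable = std::function<int(int)>;

// Walk [first, last) once with postfix ++ and remember an iterator per element.
template <typename It>
std::vector<It> collect_iterators(It first, It last) {
  std::vector<It> table;
  for (It it = first; it != last; it++) {
    table.push_back(it);
  }
  return table;
}

// Invoke the callable at position `index` in constant time through the table.
template <typename It>
int invoke_at(const std::vector<It>& table, std::size_t index, int value) {
  assert(index < table.size());
  return (*table[index])(value);
}

// Usage: std::list has no operator[], yet the third callable is reached by index.
inline int list_demo() {
  std::list<ToyCallable> pipes;
  pipes.push_back([](int x) { return x + 1; });
  pipes.push_back([](int x) { return x * 2; });
  pipes.push_back([](int x) { return x - 3; });
  auto table = collect_iterators(pipes.begin(), pipes.end());
  return invoke_at(table, 2, 5);  // calls the third callable with 5
}
```

The one-time linear walk happens when the range is assigned; every later lookup is a plain vector index, so the cost of the container's own traversal is not paid again per invocation.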