Back to Taskflow

Include the Header

docs/ScalableTaskParallelPipeline.html

4.1.012.3 KB
Original Source

| | Taskflow: A General-purpose Task-parallel Programming System |

Loading...

Searching...

No Matches

Scalable Task-parallel Pipeline

Unlike tf::Pipeline (see Task-parallel Pipeline) that instantiates all pipes at the construction time, Taskflow provides a scalable alternative called tf::ScalablePipeline to allow variable assignments of pipes using range iterators. A scalable pipeline is thus more flexible for applications to create a pipeline scheduling framework whose pipeline structure depends on runtime variables.

Include the Header

You need to include the header file, taskflow/algorithm/pipeline.hpp, for creating a scalable pipeline scheduling framework.

#include <taskflow/algorithm/pipeline.hpp>

Create a Scalable Pipeline Module Task

Similar to tf::Pipeline, tf::ScalablePipeline is a composable graph object to implement a pipeline scheduling framework in a taskflow. The key difference between tf::Pipeline and tf::ScalablePipeline is that a scalable pipeline can accept variable assignments of pipes rather than instantiating all pipes at construction or programming time. Users define a linear range of pipes, each of the same callable type, and apply that range to construct a scalable pipeline. Between successive runs, users can reset the pipeline to a different range of pipes. The following code creates a scalable pipeline that uses four parallel lines to schedule tokens through three serial pipes in the given vector, then resetting that pipeline to a new range of five serial pipes:

tf::Taskflow taskflow("pipeline");

tf::Executor executor;

const size_t num_lines = 4;

// create data storage

std::array<int, num_lines> buffer;

// define the pipe callable

auto pipe_callable = [&buffer] (tf::Pipeflow& pf) mutable {

switch(pf.pipe()) {

// first stage generates only 5 scheduling tokens and saves the

// token number into the buffer.

case 0: {

if(pf.token() == 5) {

pf.stop();

}

else {

printf("stage 1: input token = %zu\n", pf.token());

buffer[pf.line()] = pf.token();

}

return;

}

break;

// other stages propagate the previous result to this pipe and

// increment it by one

default: {

printf(

"stage %zu: input buffer[%zu] = %d\n", pf.pipe(), pf.line(), buffer[pf.line()]

);

buffer[pf.line()] = buffer[pf.line()] + 1;

}

break;

}

};

// create a vector of three pipes

std::vector< tf::Pipe<std::function<void(tf::Pipeflow&)>> > pipes;

for(size_t i=0; i<3; i++) {

pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);

}

// create a pipeline of four parallel lines based on the given vector of pipes

tf::ScalablePipeline pl(num_lines, pipes.begin(), pipes.end());

// build the pipeline graph using composition

tf::Task init = taskflow.emplace({ std::cout << "ready\n"; })

.name("starting pipeline");

tf::Task task = taskflow.composed_of(pl)

.name("pipeline");

tf::Task stop = taskflow.emplace({ std::cout << "stopped\n"; })

.name("pipeline stopped");

// create task dependency

init.precede(task);

task.precede(stop);

// dump the pipeline graph structure (with composition)

taskflow.dump(std::cout);

// run the pipeline

executor.run(taskflow).wait();

// reset the pipeline to a new range of five pipes and starts from

// the initial state (i.e., token counts from zero)

for(size_t i=0; i<2; i++) {

pipes.emplace_back(tf::PipeType::SERIAL, pipe_callable);

}

pl.reset(pipes.begin(), pipes.end());

executor.run(taskflow).wait();

tf::Executor

class to create an executor

Definition executor.hpp:62

tf::Executor::run

tf::Future< void > run(Taskflow &taskflow)

runs a taskflow once

tf::Pipeflow

class to create a pipeflow object used by the pipe callable

Definition pipeline.hpp:43

tf::Task::dump

void dump(std::ostream &ostream) const

dumps the task through an output stream

Definition task.hpp:1482

tf::Task::precede

Task & precede(Ts &&... tasks)

adds precedence links from this to other tasks

Definition task.hpp:1258

tf::Taskflow

class to create a taskflow object

Definition taskflow.hpp:64

tf::PipeType::SERIAL

@ SERIAL

serial type

Definition pipeline.hpp:117

The program defines a uniform pipe type of tf::Pipe<std::function<void(tf::Pipeflow&)>> and keep all pipes in a vector that is amenable to change. Then, it constructs a scalable pipeline using two range iterators, [first, last), that point to the beginning and the end of the pipe vector, resulting in a pipeline of three serial stages:

Embedded content

Then, the program appends another two pipes into the vector and resets the pipeline to the new range of two additional pipes, resulting in a pipeline of five serial stages:

Embedded content

When resetting a scalable pipeline to a new range, it will start from the initial state as if it has just been constructed, i.e., the token number counts from zero.

NoteUnlike tf::Pipeline that keeps the given pipes in a std::tuple object, tf::ScalablePipeline does not own the given pipe but maintains a vector of iterators to each pipe in the given range. It is your responsibility to keep those pipe objects alive during the execution of the pipeline task.

Reset a Placeholder Scalable Pipeline

It is possible to create a scalable pipeline as a placeholder using the constructor tf::ScalablePipeline(size_t num_lines) and reset it to another range later in the application. The following code creates a task to emplace a range of pipes and reset the pipeline to that range, before running the pipeline task:

tf::Executor executor;

tf::Taskflow taskflow;

size_t num_pipes = 10;

size_t num_lines = 10;

std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;

tf::ScalablePipeline<typename decltype(pipes)::iterator> spl(num_lines);

tf::Task init = taskflow.emplace(&{

for(size_t i=0; i<num_pipes; i++) {

pipes.emplace_back(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {

if(pf.pipe() == 0 && pf.token() == 1024) {

pf.stop();

return;

}

});

}

spl.reset(pipes.begin(), pipes.end());

}).name("init");

tf::Task pipeline = taskflow.composed_of(spl).name("pipeline");

pipeline.succeed(init);

executor.run(taskflow).wait();

tf::FlowBuilder::emplace

Task emplace(C &&callable)

creates a static task

Definition flow_builder.hpp:1571

tf::FlowBuilder::composed_of

Task composed_of(T &object)

creates a module task for the target object

Definition flow_builder.hpp:1621

tf::Pipe

class to create a pipe object for a pipeline stage

Definition pipeline.hpp:144

tf::Pipeflow::token

size_t token() const

queries the token identifier

Definition pipeline.hpp:78

tf::Pipeflow::pipe

size_t pipe() const

queries the pipe identifier of the present token

Definition pipeline.hpp:71

tf::Pipeflow::stop

void stop()

stops the pipeline scheduling

Definition pipeline.hpp:88

tf::ScalablePipeline

class to create a scalable pipeline object

Definition pipeline.hpp:775

tf::Task

class to create a task handle over a taskflow node

Definition task.hpp:569

tf::Task::name

const std::string & name() const

queries the name of the task

Definition task.hpp:1388

tf::Task::succeed

Task & succeed(Ts &&... tasks)

adds precedence links from other tasks to this

Definition task.hpp:1266

The task graph of this program is shown below:

Embedded content

NoteIt is your responsibility to ensure a scalable pipeline has a valid structure before running it. A valid pipeline must have at least one parallel line and one pipe, where the first pipe is a serial type.

Similarly, you can create an empty scalable pipeline using the default constructor tf::ScalablePipeline() and reset it later in your program.

std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> pipes;

tf::ScalablePipeline<typename decltype(pipes)::iterator> spl;

// create pipes ...

spl.reset(num_lines, pipes.begin(), pipes.end());

tf::ScalablePipeline::reset

void reset()

resets the pipeline

Definition pipeline.hpp:1084

Use Other Iterator Types

When assigning a range to a scalable pipeline, the pipeline fetches all pipe iterators in that range to an internal vector. This organization allows invoking a pipe callable to be a random accessible operation, regardless of the pipe container type. Taskflow does not have much restriction on the iterator type, as long as these pipes can be iterated in a sequential order using the postfix increment operator, ++.

// use vector to store pipes

std::vector<tf::Pipe<std::function<void(tf::Pipeflow&)>>> vector;

tf::ScalablePipeline spl1(num_lines, vector.begin(), vector.end());

// use list to store pipes

std::list<tf::Pipe<std::function<void(tf::Pipeflow&)>>> list;

tf::ScalablePipeline spl2(num_lines, list.begin(), list.end());