docs/DataParallelPipeline.html
Taskflow provides another variant, tf::DataPipeline, on top of tf::Pipeline (see Task-parallel Pipeline) to help you implement data-parallel pipeline algorithms while leaving data management to Taskflow. We recommend finishing Task-parallel Pipeline before learning tf::DataPipeline.
You need to include the header file, taskflow/algorithm/data_pipeline.hpp, for implementing data-parallel pipeline algorithms.
#include <taskflow/algorithm/data_pipeline.hpp>
Similar to creating a task-parallel pipeline (tf::Pipeline), there are three steps to create a data-parallel pipeline application: (1) define the data pipes using tf::make_data_pipe, (2) create a tf::DataPipeline from a line count and those pipes, and (3) compose the pipeline into a taskflow and run it with an executor.
The following example creates a data-parallel pipeline that generates a total of five dataflow tokens, going from void to int at the first stage, from int to std::string at the second stage, and from std::string to void at the final stage. Data storage between stages is automatically managed by tf::DataPipeline.
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/data_pipeline.hpp>

int main() {

  // dataflow => void -> int -> std::string -> void
  tf::Taskflow taskflow("pipeline");
  tf::Executor executor;

  const size_t num_lines = 4;

  // create a pipeline graph
  tf::DataPipeline pl(num_lines,
    tf::make_data_pipe<void, int>(tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) -> int {
      if(pf.token() == 5) {
        pf.stop();
        return 0;
      }
      else {
        printf("first pipe returns %lu\n", pf.token());
        return pf.token();
      }
    }),
    tf::make_data_pipe<int, std::string>(tf::PipeType::SERIAL, [](int& input) {
      printf("second pipe returns a string of %d\n", input + 100);
      return std::to_string(input + 100);
    }),
    tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL, [](std::string& input) {
      printf("third pipe receives the input string %s\n", input.c_str());
    })
  );

  // build the pipeline graph using composition
  taskflow.composed_of(pl).name("pipeline");

  // dump the pipeline graph structure (with composition)
  taskflow.dump(std::cout);

  // run the pipeline
  executor.run(taskflow).wait();

  return 0;
}
The interface of tf::DataPipeline is very similar to tf::Pipeline, except that the library transparently manages the dataflow between pipes. To create a stage in a data-parallel pipeline, you should always use the helper function tf::make_data_pipe:
tf::make_data_pipe<int, std::string>(tf::PipeType::SERIAL, [](int& input) {
  return std::to_string(input + 100);
});
The helper function takes a pair of input and output types in its template arguments. Both types are always decayed to their original form using std::decay (e.g., const int& becomes int) for storage purposes. In terms of function arguments, the first argument specifies the type of this data pipe, which can be either tf::PipeType::SERIAL or tf::PipeType::PARALLEL, and the second argument is a callable for the pipeline scheduler to invoke. The callable must take the input data type as its first argument and return a value of the output data type. Additionally, the callable can take a tf::Pipeflow reference as its second argument, which allows you to query the runtime information of a stage task, such as its line number and token number.
tf::make_data_pipe<int, std::string>(tf::PipeType::SERIAL, [](int& input, tf::Pipeflow& pf) {
  printf("token=%lu, line=%lu\n", pf.token(), pf.line());
  return std::to_string(input + 100);
})
For the first pipe, the input type must always be void and the callable must take a tf::Pipeflow reference as its argument. In this example, we stop the pipeline after generating five tokens.
tf::make_data_pipe<void, int>(tf::PipeType::SERIAL, [](tf::Pipeflow& pf) -> int {
  if(pf.token() == 5) {
    pf.stop();
    return 0;  // returns a dummy value
  }
  else {
    return pf.token();
  }
}),
Similarly, the output type of the last pipe should be void as no more data will go out of the final pipe.
tf::make_data_pipe<std::string, void>(tf::PipeType::SERIAL, [](std::string& input) {
  std::cout << input << std::endl;
})
Finally, you need to compose the pipeline graph by creating a module task (i.e., tf::Taskflow::composed_of).
// build the pipeline graph using composition
taskflow.composed_of(pl).name("pipeline");

// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);

// run the pipeline
executor.run(taskflow).wait();
(Dumped graph: the taskflow contains one module task, pipeline [m1], whose underlying pipeline graph consists of a condition task feeding the four line tasks rt-0 through rt-3.)
By default, tf::DataPipeline uses std::variant to store a type-safe union of all input and output data types extracted from the given data pipes. To avoid false sharing, each line keeps its variant aligned to the cacheline size. When invoking a pipe callable, the input data is acquired by reference from the variant using std::get. When the callable returns, the output data is stored back into the variant using the assignment operator.