docs/TextProcessingPipeline.html
We study a text processing pipeline that finds the most frequent character of each string from an input source. Parallelism arises in the form of a three-stage pipeline that transforms each input string into a final pair type.
Given an input vector of strings, we want to compute the most frequent character for each string using a series of transform operations. For example:
# input strings
abade
ddddf
eefge
xyzzd
ijjjj
jiiii
kkijk

# output
a:2
d:4
e:3
z:2
j:4
i:4
k:3
We decompose the algorithm into three stages:
1. read a std::string from the input vector
2. build a std::unordered_map<char, size_t> frequency map from the string
3. reduce the map to a std::pair<char, size_t> of the most frequent character and its count

The first and the third stages process inputs and generate results serially, while the second stage can run in parallel. The algorithm is a perfect fit for pipeline parallelism, as different stages can overlap with each other in time across parallel lines.
We create a pipeline of three pipes (stages) and two parallel lines to solve the problem. The number of parallel lines is a tunable parameter. In most cases, we can just use std::thread::hardware_concurrency as the line count. The first pipe reads an input string from the vector in order, the second pipe transforms the input string from the first pipe to a frequency map in parallel, and the third pipe reduces the frequency map to find the most frequent character. The overall implementation is shown below:
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>

// Function: format the map
std::string format_map(const std::unordered_map<char, size_t>& map) {
  std::ostringstream oss;
  for(const auto& [i, j] : map) {
    oss << i << ':' << j << ' ';
  }
  return oss.str();
}

int main() {

  tf::Taskflow taskflow("text-filter pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;

  // input data
  std::vector<std::string> input = {
    "abade",
    "ddddf",
    "eefge",
    "xyzzd",
    "ijjjj",
    "jiiii",
    "kkijk"
  };

  // custom data storage
  using data_type = std::variant<
    std::string, std::unordered_map<char, size_t>, std::pair<char, size_t>
  >;
  std::array<data_type, num_lines> mybuffer;

  // the pipeline consists of three pipes (serial-parallel-serial)
  // and up to two concurrent scheduling tokens
  tf::Pipeline pl(num_lines,
    // first pipe processes the input data
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == input.size()) {
        pf.stop();
      }
      else {
        printf("stage 1: input token = %s\n", input[pf.token()].c_str());
        mybuffer[pf.line()] = input[pf.token()];
      }
    }},

    // second pipe counts the frequency of each character
    tf::Pipe{tf::PipeType::PARALLEL, [&](tf::Pipeflow& pf) {
      std::unordered_map<char, size_t> map;
      for(auto c : std::get<std::string>(mybuffer[pf.line()])) {
        map[c]++;
      }
      printf("stage 2: map = %s\n", format_map(map).c_str());
      mybuffer[pf.line()] = map;
    }},

    // third pipe reduces the most frequent character
    tf::Pipe{tf::PipeType::SERIAL, [&mybuffer](tf::Pipeflow& pf) {
      auto& map = std::get<std::unordered_map<char, size_t>>(mybuffer[pf.line()]);
      auto sol = std::max_element(map.begin(), map.end(), [](auto& a, auto& b){
        return a.second < b.second;
      });
      printf("stage 3: %c:%zu\n", sol->first, sol->second);
      // not necessary to store the last-stage data, just for demo purpose
      mybuffer[pf.line()] = *sol;
    }}
  );

  // build the pipeline graph using composition
  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("starting pipeline");
  tf::Task task = taskflow.composed_of(pl).name("pipeline");
  tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                          .name("pipeline stopped");

  // create task dependency
  init.precede(task);
  task.precede(stop);

  // dump the pipeline graph structure (with composition)
  taskflow.dump(std::cout);

  // run the pipeline
  executor.run(taskflow).wait();

  return 0;
}
Taskflow does not provide any data abstraction for pipeline scheduling, but instead gives users full control over data management in their applications. In this example, we create a one-dimensional buffer of a std::variant data type to store the output of each pipe in a uniform storage:
using data_type = std::variant<
  std::string, std::unordered_map<char, size_t>, std::pair<char, size_t>
>;
std::array<data_type, num_lines> mybuffer;
The first pipe reads one string and stores it in the corresponding entry of the buffer, mybuffer[pf.line()]. Since we read each string in order, we declare the pipe as a serial type:
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
  if(pf.token() == input.size()) {
    pf.stop();
  }
  else {
    mybuffer[pf.line()] = input[pf.token()];
    printf("stage 1: input token = %s\n", input[pf.token()].c_str());
  }
}},
The second pipe fetches the input string from the previous pipe and transforms it into a frequency map that records the occurrence of each character in the string. As multiple transforms can operate simultaneously, we declare the pipe as a parallel type:
tf::Pipe{tf::PipeType::PARALLEL, [&](tf::Pipeflow& pf) {
  std::unordered_map<char, size_t> map;
  for(auto c : std::get<std::string>(mybuffer[pf.line()])) {
    map[c]++;
  }
  mybuffer[pf.line()] = map;
  printf("stage 2: map = %s\n", format_map(map).c_str());
}}
Similarly, the third pipe fetches the frequency map from the previous pipe and reduces it to find the most frequent character. We do not have to store the result in the buffer; it could go to any place defined by the application (e.g., an output file). As we want to output the results in the same order as the input, we declare the pipe as a serial type:
tf::Pipe{tf::PipeType::SERIAL, [&mybuffer](tf::Pipeflow& pf) {
  auto& map = std::get<std::unordered_map<char, size_t>>(mybuffer[pf.line()]);
  auto sol = std::max_element(map.begin(), map.end(), [](auto& a, auto& b){
    return a.second < b.second;
  });
  printf("stage 3: %c:%zu\n", sol->first, sol->second);
}}
To build the taskflow graph for the pipeline, we create a module task out of the pipeline structure and connect it with two tasks that output messages before and after the pipeline:
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                        .name("starting pipeline");
tf::Task task = taskflow.composed_of(pl).name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
                        .name("pipeline stopped");
init.precede(task);
task.precede(stop);
Finally, we submit the taskflow to the executor and run it once:
executor.run(taskflow).wait();
As the second stage is a parallel pipe, the outputs of different lines may interleave. One possible result is shown below:
ready
stage 1: input token = abade
stage 1: input token = ddddf
stage 2: map = f:1 d:4
stage 2: map = e:1 d:1 a:2 b:1
stage 3: a:2
stage 1: input token = eefge
stage 2: map = g:1 e:3 f:1
stage 3: d:4
stage 1: input token = xyzzd
stage 3: e:3
stage 1: input token = ijjjj
stage 2: map = z:2 x:1 d:1 y:1
stage 3: z:2
stage 1: input token = jiiii
stage 2: map = j:4 i:1
stage 3: j:4
stage 2: map = i:4 j:1
stage 1: input token = kkijk
stage 3: i:4
stage 2: map = j:1 k:3 i:1
stage 3: k:3
stopped
We can see seven outputs at the third stage that show the most frequent character for each of the seven strings in order (a:2, d:4, e:3, z:2, j:4, i:4, k:3). The taskflow graph of this pipeline workload is shown below:
[Taskflow graph "Text Processing Pipeline": the task "starting pipeline" precedes the module task "pipeline [m1]", which precedes the task "pipeline stopped"; internally, the pipeline module contains a condition task branching to two runtime tasks, rt-0 and rt-1, one per parallel line.]