Taskflow: A General-purpose Task-parallel Programming System


Text Processing Pipeline

We study a text processing pipeline that finds the most frequent character in each string from an input source, demonstrating how Taskflow's pipeline model overlaps serial and parallel stages to process a stream of tokens efficiently.

Problem Formulation

Given a vector of strings, we want to find the most frequent character in each string and output the result in the same order as the input. For example:

input

abade ddddf eefge xyzzd ijjjj jiiii kkijk

output (most frequent character : count)

a:2 d:4 e:3 z:2 j:4 i:4 k:3

We decompose the computation into three stages:

  1. Read (serial) — read one string from the input vector in order
  2. Count (parallel) — build a character-frequency map from the string
  3. Reduce (serial) — find the most frequent character from the map and output the result

Stage 1 must be serial so that strings are read in input order, and stage 3 must be serial so that results are emitted in that same order. Stage 2 works on each string independently, so it can run in parallel across multiple pipeline lines.

Creating the Pipeline

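We create a pipeline of three pipes (one per stage) with two parallel lines. More lines allow more tokens to be in flight at once, which can raise throughput at the cost of a larger buffer. In practice, the number of hardware threads reported by std::thread::hardware_concurrency() is a good default.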

```cpp
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>

#include <algorithm>
#include <array>
#include <cstdio>
#include <iostream>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>

// formats a character-frequency map as "c:n c:n ..." (iteration order is unspecified)
std::string format_map(const std::unordered_map<char, size_t>& map) {
  std::ostringstream oss;
  for(const auto& [c, n] : map) oss << c << ':' << n << ' ';
  return oss.str();
}

int main() {

  tf::Taskflow taskflow("text-pipeline");
  tf::Executor executor;

  const size_t num_lines = 2;

  std::vector<std::string> input = {
    "abade", "ddddf", "eefge", "xyzzd", "ijjjj", "jiiii", "kkijk"
  };

  // one buffer slot per pipeline line; each slot holds the data of the token on that line
  using data_type = std::variant<
    std::string,                       // written by stage 1
    std::unordered_map<char, size_t>,  // written by stage 2
    std::pair<char, size_t>            // written by stage 3
  >;
  std::array<data_type, num_lines> buffer;

  tf::Pipeline pl(num_lines,

    // stage 1 (serial): read the next input string
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      if(pf.token() == input.size()) {
        pf.stop();  // no more tokens: shut the pipeline down
      }
      else {
        printf("stage 1: %s\n", input[pf.token()].c_str());
        buffer[pf.line()] = input[pf.token()];
      }
    }},

    // stage 2 (parallel): build a character-frequency map
    tf::Pipe{tf::PipeType::PARALLEL, [&](tf::Pipeflow& pf) {
      std::unordered_map<char, size_t> map;
      for(char c : std::get<std::string>(buffer[pf.line()])) map[c]++;
      printf("stage 2: %s\n", format_map(map).c_str());
      buffer[pf.line()] = std::move(map);
    }},

    // stage 3 (serial): find and report the most frequent character
    tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
      auto& map = std::get<std::unordered_map<char, size_t>>(buffer[pf.line()]);
      auto sol = std::max_element(map.begin(), map.end(),
        [](const auto& a, const auto& b) { return a.second < b.second; }
      );
      printf("stage 3: %c:%zu\n", sol->first, sol->second);
      // copy the result out before overwriting the slot that owns the map
      std::pair<char, size_t> result = *sol;
      buffer[pf.line()] = result;
    }}
  );

  tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
                          .name("start");

  tf::Task pipe = taskflow.composed_of(pl)
                          .name("pipeline");

  tf::Task done = taskflow.emplace([](){ std::cout << "done\n"; })
                          .name("stop");

  init.precede(pipe);
  pipe.precede(done);

  executor.run(taskflow).wait();

  return 0;
}
```


Data Buffer

Taskflow gives users full control over data management in a pipeline. We allocate a one-dimensional buffer indexed by pipeline line:

std::array<data_type, num_lines> buffer;

A one-dimensional buffer is sufficient because Taskflow guarantees that at most one scheduling token is active per line at any time, so no two tokens will read or write the same buffer slot simultaneously.

Note: The stage callables operate on the data of input elements, not on the scheduling tokens themselves. The token index (tf::Pipeflow::token) identifies which input element is being processed, and the line index (tf::Pipeflow::line) identifies which buffer slot holds its data.

Sample Output

Because stage 2 is a parallel pipe and the two lines run concurrently, messages from different tokens can interleave, and the exact order differs from run to run. One possible execution trace:

```
ready
stage 1: abade
stage 1: ddddf
stage 2: f:1 d:4
stage 2: e:1 d:1 a:2 b:1
stage 3: a:2
stage 1: eefge
stage 2: g:1 e:3 f:1
stage 3: d:4
stage 1: xyzzd
stage 3: e:3
stage 1: ijjjj
stage 2: z:2 x:1 d:1 y:1
stage 3: z:2
stage 1: jiiii
stage 2: j:4 i:1
stage 3: j:4
stage 2: i:4 j:1
stage 1: kkijk
stage 3: i:4
stage 2: j:1 k:3 i:1
stage 3: k:3
done
```

The seven stage-3 outputs appear in input order (a:2, d:4, e:3, z:2, j:4, i:4, k:3) because stage 3 is a serial pipe, which processes tokens one at a time in token order. The pipeline task graph is shown below:

[Figure: pipeline task graph]