docs/data__pipeline_8hpp_source.html
| | Taskflow: A General-purpose Task-parallel Programming System |
Loading...
Searching...
No Matches
data_pipeline.hpp
1#pragma once
2
3#include "pipeline.hpp"
4
5
6namespace tf {
7
8// ----------------------------------------------------------------------------
9// Class Definition: DataPipe
10// ----------------------------------------------------------------------------
11
// NOTE(review): this listing of class DataPipe is incomplete. Per the symbol
// index at the end of this page, the class head (`class DataPipe {`, line 52),
// `input_t` (line 67), `output_t` (line 72), `DataPipe() = default`,
// `PipeType type() const` (line 96) and `void type(PipeType)` (line 103) are not
// shown here; only some of their bodies survive below.
//
// DataPipe<Input, Output, C>: one stage of a data-parallel pipeline. It holds
// the stage's PipeType and a callable of type C.
51template <typename Input, typename Output, typename C>
53
// DataPipeline reads the private members (_callable) of its pipes directly.
54template <typename... Ps>
55friend class DataPipeline;
56
57public:
58
// Type of the stored callable. It is exactly C, so when C is an lvalue
// reference type the pipe stores a reference rather than a copy.
62using callable_t = C;
63
68
73
78
// Constructs a data pipe of type `d`, forwarding `callable` into the stored
// callable.
86DataPipe(PipeType d, callable_t&& callable) :
87 _type{d}, _callable{std::forward<callable_t>(callable)} {
88 }
89
// (body of `PipeType type() const`) returns the pipe's type
97return _type;
98 }
99
// (body of `void type(PipeType type)`) replaces the pipe's type
104 _type = type;
105 }
106
// Replaces the stored callable by assigning `std::forward<U>(callable)` to it.
116template <typename U>
117void callable(U&& callable) {
118 _callable = std::forward<U>(callable);
119 }
120
121private:
122
// the pipe's type (e.g. PipeType::SERIAL)
123PipeType _type;
124
// the user callable invoked when this stage runs
125callable_t _callable;
126};
127
170template <typename Input, typename Output, typename C>
171auto make_data_pipe(PipeType d, C&& callable) {
172return DataPipe<Input, Output, C>(d, std::forward<C>(callable));
173}
174
175// ----------------------------------------------------------------------------
176// Class Definition: DataPipeline
177// ----------------------------------------------------------------------------
178
// DataPipeline<Ps...>: a data-parallel pipeline made of the pipes Ps...,
// executed over a fixed number of parallel lines. Each line owns one data slot
// (_buffer) and one pipeflow (_pipeflows). Every (line, pipe) pair has a join
// counter (_lines) that is used to decide when that stage may run.
253template <typename... Ps>
254class DataPipeline {
255
256static_assert(sizeof...(Ps)>0, "must have at least one pipe");
257
// per-(line, pipe) synchronization state; see reset() and _build()
261struct Line {
262 std::atomic<size_t> join_counter;
263 };
264
// cached per-pipe metadata: the pipe's PipeType
268struct PipeMeta {
269PipeType type;
270 };
271
272
273public:
274
// Storage type of one line's data slot: a variant over each pipe's decayed
// output type, with std::monostate standing in for void outputs; duplicates
// are removed by unique_variant_t.
278using data_t = unique_variant_t<std::variant<std::conditional_t<
279 std::is_void_v<typename Ps::output_t>,
280 std::monostate,
281 std::decay_t<typename Ps::output_t>>...
282 >>;
283
// Both constructors throw (TF_THROW) if num_lines is 0 or if the first pipe is
// not PipeType::SERIAL.
295DataPipeline(size_t num_lines, Ps&&... ps);
296
308DataPipeline(size_t num_lines, std::tuple<Ps...>&& ps);
309
318size_t num_lines() const noexcept;
319
326 constexpr size_t num_pipes() const noexcept;
327
335void reset();
336
343size_t num_tokens() const noexcept;
344
// NOTE(review): the declaration of `Graph& graph();` (defined out of class
// further below) is missing from this listing.
352
353 private:
354
355Graph _graph;
356
// number of tokens produced by the first pipe since the last reset()
357size_t _num_tokens;
358
// NOTE: _pipes is declared before _meta, so it is initialized first.
359 std::tuple<Ps...> _pipes;
360 std::array<PipeMeta, sizeof...(Ps)> _meta;
// join counters, indexed [line][pipe]
361 std::vector<std::array<Line, sizeof...(Ps)>> _lines;
// _tasks[0] is the condition task; _tasks[l + 1] is the task of line l
362 std::vector<Task> _tasks;
// one pipeflow per line
363 std::vector<Pipeflow> _pipeflows;
// one data slot per line, indexed by Pipeflow::_line
364 std::vector<CachelineAligned<data_t>> _buffer;
365
366 template <size_t... I>
367 auto _gen_meta(std::tuple<Ps...>&&, std::index_sequence<I...>);
368
369void _on_pipe(Pipeflow&, NonpreemptiveRuntime&);
370void _build();
371};
372
373// constructor
374template <typename... Ps>
375DataPipeline<Ps...>::DataPipeline(size_t num_lines, Ps&&... ps) :
376 _pipes {std::make_tuple(std::forward<Ps>(ps)...)},
377 _meta {PipeMeta{ps.type()}...},
378 _lines (num_lines),
379 _tasks (num_lines + 1),
380 _pipeflows (num_lines),
381 _buffer (num_lines) {
382
383if(num_lines == 0) {
384 TF_THROW("must have at least one line");
385 }
386
387if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
388 TF_THROW("first pipe must be serial");
389 }
390
391reset();
392 _build();
393}
394
395// constructor
396template <typename... Ps>
397DataPipeline<Ps...>::DataPipeline(size_t num_lines, std::tuple<Ps...>&& ps) :
398 _pipes {std::forward<std::tuple<Ps...>>(ps)},
399 _meta {_gen_meta(
400 std::forward<std::tuple<Ps...>>(ps), std::make_index_sequence<sizeof...(Ps)>{}
401 )},
402 _lines (num_lines),
403 _tasks (num_lines + 1),
404 _pipeflows (num_lines),
405 _buffer (num_lines) {
406
407if(num_lines == 0) {
408 TF_THROW("must have at least one line");
409 }
410
411if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
412 TF_THROW("first pipe must be serial");
413 }
414
415 reset();
416 _build();
417}
418
419// Function: _get_meta
420template <typename... Ps>
421template <size_t... I>
422auto DataPipeline<Ps...>::_gen_meta(std::tuple<Ps...>&& ps, std::index_sequence<I...>) {
423return std::array{PipeMeta{std::get<I>(ps).type()}...};
424}
425
426// Function: num_lines
427template <typename... Ps>
428size_t DataPipeline<Ps...>::num_lines() const noexcept {
429return _pipeflows.size();
430}
431
432// Function: num_pipes
433template <typename... Ps>
434constexpr size_t DataPipeline<Ps...>::num_pipes() const noexcept {
435return sizeof...(Ps);
436}
437
438// Function: num_tokens
439template <typename... Ps>
440size_t DataPipeline<Ps...>::num_tokens() const noexcept {
441return _num_tokens;
442}
443
444// Function: graph
445template <typename... Ps>
446Graph& DataPipeline<Ps...>::graph() {
447return _graph;
448}
449
450// Function: reset
451template <typename... Ps>
452void DataPipeline<Ps...>::reset() {
453
454 _num_tokens = 0;
455
456for(size_t l = 0; l<num_lines(); l++) {
457 _pipeflows[l]._pipe = 0;
458 _pipeflows[l]._line = l;
459 }
460
461 _lines[0][0].join_counter.store(0, std::memory_order_relaxed);
462
463for(size_t l=1; l<num_lines(); l++) {
464for(size_t f=1; f<num_pipes(); f++) {
465 _lines[l][f].join_counter.store(
466static_cast<size_t>(_meta[f].type), std::memory_order_relaxed
467 );
468 }
469 }
470
471for(size_t f=1; f<num_pipes(); f++) {
472 _lines[0][f].join_counter.store(1, std::memory_order_relaxed);
473 }
474
475for(size_t l=1; l<num_lines(); l++) {
476 _lines[l][0].join_counter.store(
477static_cast<size_t>(_meta[0].type) - 1, std::memory_order_relaxed
478 );
479 }
480}
481
482// Procedure: _on_pipe
483template <typename... Ps>
484void DataPipeline<Ps...>::_on_pipe(Pipeflow& pf, NonpreemptiveRuntime&) {
485
486 visit_tuple([&](auto&& pipe){
487
488using data_pipe_t = std::decay_t<decltype(pipe)>;
489using callable_t = typename data_pipe_t::callable_t;
490using input_t = std::decay_t<typename data_pipe_t::input_t>;
491using output_t = std::decay_t<typename data_pipe_t::output_t>;
492
493// first pipe
494if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {
495// -> void {}, i.e., we only have one pipe
496if constexpr (std::is_void_v<output_t>) {
497 pipe._callable(pf);
499 } else {
500 _buffer[pf._line].data = pipe._callable(pf);
501 }
502 }
503// other pipes without pipeflow in the second argument
504else if constexpr (std::is_invocable_v<callable_t, std::add_lvalue_reference_t<input_t> >) {
505// -> void {}, i.e., the last pipe
506if constexpr (std::is_void_v<output_t>) {
507 pipe._callable(std::get<input_t>(_buffer[pf._line].data));
509 } else {
510 _buffer[pf._line].data = pipe._callable(
511 std::get<input_t>(_buffer[pf._line].data)
512 );
513 }
514 }
515// other pipes with pipeflow in the second argument
516else if constexpr (std::is_invocable_v<callable_t, input_t&, Pipeflow&>) {
517// [](input_t&, tf::Pipeflow&) -> void {}
518if constexpr (std::is_void_v<output_t>) {
519 pipe._callable(std::get<input_t>(_buffer[pf._line].data), pf);
520// [](input_t&, tf::Pipeflow&) -> output_t {}
521 } else {
522 _buffer[pf._line].data = pipe._callable(
523 std::get<input_t>(_buffer[pf._line].data), pf
524 );
525 }
526 }
527//else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {
528// pipe._callable(pf, rt);
529//}
530else {
531static_assert(dependent_false_v<callable_t>, "un-supported pipe callable type");
532 }
533 }, _pipes, pf._pipe);
534}
535
536// Procedure: _build
// Populates _graph with one condition task (_tasks[0]) and one task per line
// (_tasks[l + 1] for line l). The condition task precedes every line task, in
// line order.
537template <typename... Ps>
538void DataPipeline<Ps...>::_build() {
539
540using namespace std::literals::string_literals;
541
542 FlowBuilder fb(_graph);
543
544// init task
// Returns _num_tokens % num_lines() as an int. Presumably, as a condition task,
// this selects which successor (line task) starts -- TODO confirm against
// FlowBuilder semantics.
// NOTE(review): this lambda's capture/parameter list appears to have been
// lost in this listing (only `this` survives); presumably `[this]()` --
// confirm against the header.
545 _tasks[0] = fb.emplace(this {
546return static_cast<int>(_num_tokens % num_lines());
547 }).name("cond");
548
549// line task
550for(size_t l = 0; l < num_lines(); l++) {
551
// Each line task loops via `goto pipeline`. After each stage it either
// continues forward on the same line, or switches `pf` to the next line
// (downward), or schedules the next line's task, or returns.
552 _tasks[l + 1] = fb.emplace([this, l] (tf::NonpreemptiveRuntime& rt) mutable {
553
554auto pf = &_pipeflows[l];
555
556 pipeline:
557
// re-arm the join counter of the slot about to run to its pipe's type value
558 _lines[pf->_line][pf->_pipe].join_counter.store(
559static_cast<size_t>(_meta[pf->_pipe].type), std::memory_order_relaxed
560 );
561
// First pipe: stamp the token id and run with _stop cleared. If the callable
// set _stop, this task returns without counting a new token.
562if (pf->_pipe == 0) {
563 pf->_token = _num_tokens;
564if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {
565// here, the pipeline is not stopped yet because other
566// lines of tasks may still be running their last stages
567return;
568 }
569 ++_num_tokens;
570 }
571else {
572 _on_pipe(*pf, rt);
573 }
574
// c_f: pipe just run; n_f: next pipe on this line (wraps to 0);
// n_l: next line (wraps to 0)
575size_t c_f = pf->_pipe;
576size_t n_f = (pf->_pipe + 1) % num_pipes();
577size_t n_l = (pf->_line + 1) % num_lines();
578
579 pf->_pipe = n_f;
580
581// ---- scheduling starts here ----
582// Notice that the shared variable f must not be changed after this
583// point because it can result in data race due to the following
584// condition:
585//
586// a -> b
587// | |
588// v v
589// c -> d
590//
591// d will be spawned by either c or b, so if c changes f but b spawns d
592// then data race on f will happen
593
// retval lists the successors whose join counter reached zero:
// 1 = same pipe on the next line (downward), 0 = next pipe on this line (forward)
594 std::array<int, 2> retval;
595size_t n = 0;
596
597// downward dependency
// only a SERIAL pipe orders the same stage across consecutive lines
598if(_meta[c_f].type == PipeType::SERIAL &&
599 _lines[n_l][c_f].join_counter.fetch_sub(
600 1, std::memory_order_acq_rel) == 1
601 ) {
602 retval[n++] = 1;
603 }
604
605// forward dependency
606if(_lines[pf->_line][n_f].join_counter.fetch_sub(
607 1, std::memory_order_acq_rel) == 1
608 ) {
609 retval[n++] = 0;
610 }
611
// n == 2: schedule the next line's task and keep going forward here;
// n == 1: continue in this task with whichever successor is ready;
// n == 0: nothing is ready yet, so this task returns.
612// notice that the task index starts from 1
613switch(n) {
614case 2: {
615 rt.schedule(_tasks[n_l+1]);
616goto pipeline;
617 }
618case 1: {
619// downward dependency
620if (retval[0] == 1) {
621 pf = &_pipeflows[n_l];
622 }
623// forward dependency
624goto pipeline;
625 }
626 }
627 }).name("rt-"s + std::to_string(l));
628
629 _tasks[0].precede(_tasks[l+1]);
630 }
631}
632
633
634} // end of namespace tf -----------------------------------------------------
635
636
637
638
639
class to ensure cacheline-aligned storage for an object.
Definition os.hpp:219
class to create a stage in a data-parallel pipeline
Definition data_pipeline.hpp:52
DataPipe(PipeType d, callable_t &&callable)
constructs a data pipe
Definition data_pipeline.hpp:86
void callable(U &&callable)
assigns a new callable to the data pipe
Definition data_pipeline.hpp:117
C callable_t
callable type of the data pipe
Definition data_pipeline.hpp:62
PipeType type() const
queries the type of the data pipe
Definition data_pipeline.hpp:96
Output output_t
output type of the data pipe
Definition data_pipeline.hpp:72
DataPipe()=default
default constructor
Input input_t
input type of the data pipe
Definition data_pipeline.hpp:67
void type(PipeType type)
assigns a new type to the data pipe
Definition data_pipeline.hpp:103
tf::DataPipeline::DataPipeline
DataPipeline(size_t num_lines, Ps &&... ps)
constructs a data-parallel pipeline object
Definition data_pipeline.hpp:375
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition data_pipeline.hpp:440
constexpr size_t num_pipes() const noexcept
queries the number of pipes
Definition data_pipeline.hpp:434
size_t num_lines() const noexcept
queries the number of parallel lines
Definition data_pipeline.hpp:428
void reset()
resets the pipeline
Definition data_pipeline.hpp:452
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition data_pipeline.hpp:446
unique_variant_t< std::variant< std::conditional_t< std::is_void_v< typename Ps::output_t >, std::monostate, std::decay_t< typename Ps::output_t > >... > > data_t
internal storage type for each data token (default std::variant)
Definition data_pipeline.hpp:278
class to create a graph object
Definition graph.hpp:47
class to create a pipeflow object used by the pipe callable
Definition pipeline.hpp:43
class to create a task handle over a taskflow node
Definition task.hpp:569
taskflow namespace
Definition small_vector.hpp:20
auto make_data_pipe(PipeType d, C &&callable)
function to construct a data pipe (tf::DataPipe)
Definition data_pipeline.hpp:171
PipeType
enumeration of all pipe types
Definition pipeline.hpp:113
@ SERIAL
serial type
Definition pipeline.hpp:117