docs/pipeline_8hpp_source.html
| | Taskflow: A General-purpose Task-parallel Programming System |
Loading...
Searching...
No Matches
pipeline.hpp
1#pragma once
2
3#include "../taskflow.hpp"
4
9
10namespace tf {
11
12
13// ----------------------------------------------------------------------------
14// Class Definition: Pipeflow
15// ----------------------------------------------------------------------------
16
44
45template <typename... Ps>
46friend class Pipeline;
47
48template <typename P>
49friend class ScalablePipeline;
50
51template <typename... Ps>
52friend class DataPipeline;
53
54public:
55
60
65return _line;
66 }
67
72return _pipe;
73 }
74
79return _token;
80 }
81
89if(_pipe != 0) {
90 TF_THROW("only the first pipe can stop the token");
91 }
92 _stop = true;
93 }
94
95private:
96
97// Regular data
98size_t _line;
99size_t _pipe;
100size_t _token;
101bool _stop;
102};
103
104// ----------------------------------------------------------------------------
105// Class Definition: PipeType
106// ----------------------------------------------------------------------------
107
113enum class PipeType : int {
118};
119
120// ----------------------------------------------------------------------------
121// Class Definition: Pipe
122// ----------------------------------------------------------------------------
123
143template <typename C = std::function<void(tf::Pipeflow&)>>
145
146template <typename... Ps>
147friend class Pipeline;
148
149template <typename P>
150friend class ScalablePipeline;
151
152public:
153
157using callable_t = C;
158
163
181Pipe(PipeType d, C&& callable) :
182 _type{d}, _callable{std::forward<C>(callable)} {
183 }
184
191return _type;
192 }
193
200 _type = type;
201 }
202
211template <typename U>
212void callable(U&& callable) {
213 _callable = std::forward<U>(callable);
214 }
215
216private:
217
218PipeType _type;
219
220 C _callable;
221};
222
223// ----------------------------------------------------------------------------
224// Class Definition: Pipeline
225// ----------------------------------------------------------------------------
226
306template <typename... Ps>
308
309static_assert(sizeof...(Ps)>0, "must have at least one pipe");
310
314struct Line {
315 std::atomic<size_t> join_counter;
316 };
317
321struct PipeMeta {
322PipeType type;
323 };
324
325public:
326
338Pipeline(size_t num_lines, Ps&&... ps);
339
351Pipeline(size_t num_lines, std::tuple<Ps...>&& ps);
352
361size_t num_lines() const noexcept;
362
369constexpr size_t num_pipes() const;
370
378void reset();
379
386size_t num_tokens() const noexcept;
387
395
396
397private:
398
399Graph _graph;
400
401size_t _num_tokens;
402
403 std::tuple<Ps...> _pipes;
404 std::array<PipeMeta, sizeof...(Ps)> _meta;
405 std::vector<std::array<Line, sizeof...(Ps)>> _lines;
406 std::vector<Task> _tasks;
407 std::vector<Pipeflow> _pipeflows;
408
409template <size_t... I>
410auto _gen_meta(std::tuple<Ps...>&&, std::index_sequence<I...>);
411
412void _on_pipe(Pipeflow&, NonpreemptiveRuntime&);
413void _build();
414};
415
416// constructor
417template <typename... Ps>
418Pipeline<Ps...>::Pipeline(size_t num_lines, Ps&&... ps) :
419 _pipes {std::make_tuple(std::forward<Ps>(ps)...)},
420 _meta {PipeMeta{ps.type()}...},
421 _lines (num_lines),
422 _tasks (num_lines + 1),
423 _pipeflows (num_lines) {
424
425if(num_lines == 0) {
426 TF_THROW("must have at least one line");
427 }
428
429if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
430 TF_THROW("first pipe must be serial");
431 }
432
433reset();
434 _build();
435}
436
437// constructor
438template <typename... Ps>
439Pipeline<Ps...>::Pipeline(size_t num_lines, std::tuple<Ps...>&& ps) :
440 _pipes {std::forward<std::tuple<Ps...>>(ps)},
441 _meta {_gen_meta(
442 std::forward<std::tuple<Ps...>>(ps), std::make_index_sequence<sizeof...(Ps)>{}
443 )},
444 _lines (num_lines),
445 _tasks (num_lines + 1),
446 _pipeflows (num_lines) {
447
448if(num_lines == 0) {
449 TF_THROW("must have at least one line");
450 }
451
452if(std::get<0>(_pipes).type() != PipeType::SERIAL) {
453 TF_THROW("first pipe must be serial");
454 }
455
456 reset();
457 _build();
458}
459
460// Function: _get_meta
461template <typename... Ps>
462template <size_t... I>
463auto Pipeline<Ps...>::_gen_meta(std::tuple<Ps...>&& ps, std::index_sequence<I...>) {
464return std::array{PipeMeta{std::get<I>(ps).type()}...};
465}
466
467// Function: num_lines
468template <typename... Ps>
469size_t Pipeline<Ps...>::num_lines() const noexcept {
470return _pipeflows.size();
471}
472
473// Function: num_pipes
474template <typename... Ps>
475constexpr size_t Pipeline<Ps...>::num_pipes() const {
476return sizeof...(Ps);
477}
478
479// Function: num_tokens
480template <typename... Ps>
481size_t Pipeline<Ps...>::num_tokens() const noexcept {
482return _num_tokens;
483}
484
485// Function: graph
486template <typename... Ps>
487Graph& Pipeline<Ps...>::graph() {
488return _graph;
489}
490
491// Function: reset
492template <typename... Ps>
493void Pipeline<Ps...>::reset() {
494
495 _num_tokens = 0;
496
497for(size_t l = 0; l<num_lines(); l++) {
498 _pipeflows[l]._pipe = 0;
499 _pipeflows[l]._line = l;
500 }
501
502 _lines[0][0].join_counter.store(0, std::memory_order_relaxed);
503
504for(size_t l=1; l<num_lines(); l++) {
505for(size_t f=1; f<num_pipes(); f++) {
506 _lines[l][f].join_counter.store(
507static_cast<size_t>(_meta[f].type), std::memory_order_relaxed
508 );
509 }
510 }
511
512for(size_t f=1; f<num_pipes(); f++) {
513 _lines[0][f].join_counter.store(1, std::memory_order_relaxed);
514 }
515
516for(size_t l=1; l<num_lines(); l++) {
517 _lines[l][0].join_counter.store(
518static_cast<size_t>(_meta[0].type) - 1, std::memory_order_relaxed
519 );
520 }
521}
522
523// Procedure: _on_pipe
524template <typename... Ps>
525void Pipeline<Ps...>::_on_pipe(Pipeflow& pf, NonpreemptiveRuntime& rt) {
526 visit_tuple([&](auto&& pipe){
527using callable_t = typename std::decay_t<decltype(pipe)>::callable_t;
528if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {
529 pipe._callable(pf);
530 }
531else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {
532 pipe._callable(pf, rt);
533 }
534else {
535static_assert(dependent_false_v<callable_t>, "un-supported pipe callable type");
536 }
537 }, _pipes, pf._pipe);
538}
539
540// Procedure: _build
541template <typename... Ps>
542void Pipeline<Ps...>::_build() {
543
544using namespace std::literals::string_literals;
545
546 FlowBuilder fb(_graph);
547
548// init task
549 _tasks[0] = fb.emplace(this {
550return static_cast<int>(_num_tokens % num_lines());
551 }).name("cond");
552
553// line task
554for(size_t l = 0; l < num_lines(); l++) {
555
556 _tasks[l + 1] = fb.emplace([this, l] (tf::NonpreemptiveRuntime& rt) mutable {
557
558auto pf = &_pipeflows[l];
559
560 pipeline:
561
562 _lines[pf->_line][pf->_pipe].join_counter.store(
563static_cast<size_t>(_meta[pf->_pipe].type), std::memory_order_relaxed
564 );
565
566// First pipe does all jobs of initialization and token dependencies
567if (pf->_pipe == 0) {
568 pf->_token = _num_tokens;
569if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {
570// here, the pipeline is not stopped yet because other
571// lines of tasks may still be running their last stages
572return;
573 }
574 ++_num_tokens;
575 }
576else {
577 _on_pipe(*pf, rt);
578 }
579
580size_t c_f = pf->_pipe;
581size_t n_f = (pf->_pipe + 1) % num_pipes();
582size_t n_l = (pf->_line + 1) % num_lines();
583
584 pf->_pipe = n_f;
585
586// ---- scheduling starts here ----
587// Notice that the shared variable f must not be changed after this
588// point because it can result in data race due to the following
589// condition:
590//
591// a -> b
592// | |
593// v v
594// c -> d
595//
596// d will be spawned by either c or b, so if c changes f but b spawns d
597// then data race on f will happen
598
599 std::array<int, 2> retval;
600size_t n = 0;
601
602// downward dependency
603if(_meta[c_f].type == PipeType::SERIAL &&
604 _lines[n_l][c_f].join_counter.fetch_sub(
605 1, std::memory_order_acq_rel) == 1
606 ) {
607 retval[n++] = 1;
608 }
609
610// forward dependency
611if(_lines[pf->_line][n_f].join_counter.fetch_sub(
612 1, std::memory_order_acq_rel) == 1
613 ) {
614 retval[n++] = 0;
615 }
616
617// notice that the task index starts from 1
618switch(n) {
619case 2: {
620 rt.schedule(_tasks[n_l+1]);
621goto pipeline;
622 }
623case 1: {
624// downward dependency
625if (retval[0] == 1) {
626 pf = &_pipeflows[n_l];
627 }
628// forward dependency
629goto pipeline;
630 }
631 }
632 }).name("nprt-"s + std::to_string(l));
633
634 _tasks[0].precede(_tasks[l+1]);
635 }
636}
637
638// ----------------------------------------------------------------------------
639// Class Definition: ScalablePipeline
640// ----------------------------------------------------------------------------
641
774template <typename P>
775class ScalablePipeline {
776
780struct Line {
781 std::atomic<size_t> join_counter;
782 };
783
784
785public:
786
790using pipe_t = typename std::iterator_traits<P>::value_type;
791
795ScalablePipeline() = default;
796
806ScalablePipeline(size_t num_lines);
807
824ScalablePipeline(size_t num_lines, P first, P last);
825
829ScalablePipeline(const ScalablePipeline&) = delete;
830
839ScalablePipeline(ScalablePipeline&& rhs);
840
844ScalablePipeline& operator = (const ScalablePipeline&) = delete;
845
854ScalablePipeline& operator = (ScalablePipeline&& rhs);
855
864size_t num_lines() const noexcept;
865
872size_t num_pipes() const noexcept;
873
880void reset();
881
897void reset(P first, P last);
898
916void reset(size_t num_lines, P first, P last);
917
924size_t num_tokens() const noexcept;
925
933
934 private:
935
936Graph _graph;
937
938size_t _num_tokens{0};
939
940 std::vector<P> _pipes;
941 std::vector<Task> _tasks;
942 std::vector<Pipeflow> _pipeflows;
943 std::unique_ptr<Line[]> _lines;
944
945void _on_pipe(Pipeflow&, NonpreemptiveRuntime&);
946void _build();
947
948 Line& _line(size_t, size_t);
949};
950
951// constructor
952template <typename P>
953ScalablePipeline<P>::ScalablePipeline(size_t num_lines) :
954 _tasks (num_lines + 1),
955 _pipeflows (num_lines) {
956
957if(num_lines == 0) {
958 TF_THROW("must have at least one line");
959 }
960
961 _build();
962}
963
964// constructor
965template <typename P>
966ScalablePipeline<P>::ScalablePipeline(size_t num_lines, P first, P last) :
967 _tasks (num_lines + 1),
968 _pipeflows (num_lines) {
969
970if(num_lines == 0) {
971 TF_THROW("must have at least one line");
972 }
973
974reset(first, last);
975 _build();
976}
977
978// move constructor
979template <typename P>
980ScalablePipeline<P>::ScalablePipeline(ScalablePipeline&& rhs):
981 _num_tokens {rhs._num_tokens},
982 _pipes {std::move(rhs._pipes)},
983 _pipeflows {std::move(rhs._pipeflows)},
984 _lines {std::move(rhs._lines)} {
985
986 _graph.clear();
987 _tasks.resize(_pipeflows.size()+1);
988 rhs._num_tokens = 0;
989 rhs._tasks.clear();
990 _build();
991}
992
993// move assignment operator
994template <typename P>
995ScalablePipeline<P>& ScalablePipeline<P>::operator =(ScalablePipeline&& rhs) {
996 _num_tokens = rhs._num_tokens;
997 _pipes = std::move(rhs._pipes);
998 _pipeflows = std::move(rhs._pipeflows);
999 _lines = std::move(rhs._lines);
1000
1001 _graph.clear();
1002 _tasks.resize(_pipeflows.size()+1);
1003
1004 rhs._num_tokens = 0;
1005 rhs._tasks.clear();
1006 _build();
1007return *this;
1008}
1009
1010// Function: num_lines
1011template <typename P>
1012size_t ScalablePipeline<P>::num_lines() const noexcept {
1013return _pipeflows.size();
1014}
1015
1016// Function: num_pipes
1017template <typename P>
1018size_t ScalablePipeline<P>::num_pipes() const noexcept {
1019return _pipes.size();
1020}
1021
1022// Function: num_tokens
1023template <typename P>
1024size_t ScalablePipeline<P>::num_tokens() const noexcept {
1025return _num_tokens;
1026}
1027
1028// Function: graph
1029template <typename P>
1030Graph& ScalablePipeline<P>::graph() {
1031return _graph;
1032}
1033
1034// Function: _line
1035template <typename P>
1036typename ScalablePipeline<P>::Line& ScalablePipeline<P>::_line(size_t l, size_t p) {
1037return _lines[l*num_pipes() + p];
1038}
1039
1040template <typename P>
1041void ScalablePipeline<P>::reset(size_t num_lines, P first, P last) {
1042
1043if(num_lines == 0) {
1044 TF_THROW("must have at least one line");
1045 }
1046
1047 _graph.clear();
1048 _tasks.resize(num_lines + 1);
1049 _pipeflows.resize(num_lines);
1050
1051reset(first, last);
1052
1053 _build();
1054}
1055
1056// Function: reset
1057template <typename P>
1058void ScalablePipeline<P>::reset(P first, P last) {
1059
1060size_t num_pipes = static_cast<size_t>(std::distance(first, last));
1061
1062if(num_pipes == 0) {
1063 TF_THROW("pipeline cannot be empty");
1064 }
1065
1066if(first->type() != PipeType::SERIAL) {
1067 TF_THROW("first pipe must be serial");
1068 }
1069
1070 _pipes.resize(num_pipes);
1071
1072size_t i=0;
1073for(auto itr = first; itr != last; itr++) {
1074 _pipes[i++] = itr;
1075 }
1076
1077 _lines = std::make_unique<Line[]>(num_lines() * _pipes.size());
1078
1079reset();
1080}
1081
1082// Function: reset
1083template <typename P>
1084void ScalablePipeline<P>::reset() {
1085
1086 _num_tokens = 0;
1087
1088for(size_t l = 0; l<num_lines(); l++) {
1089 _pipeflows[l]._pipe = 0;
1090 _pipeflows[l]._line = l;
1091 }
1092
1093 _line(0, 0).join_counter.store(0, std::memory_order_relaxed);
1094
1095for(size_t l=1; l<num_lines(); l++) {
1096for(size_t f=1; f<num_pipes(); f++) {
1097 _line(l, f).join_counter.store(
1098static_cast<size_t>(_pipes[f]->type()), std::memory_order_relaxed
1099 );
1100 }
1101 }
1102
1103for(size_t f=1; f<num_pipes(); f++) {
1104 _line(0, f).join_counter.store(1, std::memory_order_relaxed);
1105 }
1106
1107for(size_t l=1; l<num_lines(); l++) {
1108 _line(l, 0).join_counter.store(
1109static_cast<size_t>(_pipes[0]->type()) - 1, std::memory_order_relaxed
1110 );
1111 }
1112}
1113
1114// Procedure: _on_pipe
1115template <typename P>
1116void ScalablePipeline<P>::_on_pipe(Pipeflow& pf, NonpreemptiveRuntime& rt) {
1117
1118using callable_t = typename pipe_t::callable_t;
1119
1120if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {
1121 _pipes[pf._pipe]->_callable(pf);
1122 }
1123else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {
1124 _pipes[pf._pipe]->_callable(pf, rt);
1125 }
1126else {
1127static_assert(dependent_false_v<callable_t>, "un-supported pipe callable type");
1128 }
1129}
1130
1131// Procedure: _build
1132template <typename P>
1133void ScalablePipeline<P>::_build() {
1134
1135using namespace std::literals::string_literals;
1136
1137FlowBuilder fb(_graph);
1138
1139// init task
1140 _tasks[0] = fb.emplace(this {
1141return static_cast<int>(_num_tokens % num_lines());
1142 }).name("cond");
1143
1144// line task
1145for(size_t l = 0; l < num_lines(); l++) {
1146
1147 _tasks[l + 1] = fb.emplace([this, l] (tf::NonpreemptiveRuntime& rt) mutable {
1148
1149auto pf = &_pipeflows[l];
1150
1151 pipeline:
1152
1153 _line(pf->_line, pf->_pipe).join_counter.store(
1154static_cast<size_t>(_pipes[pf->_pipe]->type()), std::memory_order_relaxed
1155 );
1156
1157// First pipe does all jobs of initialization and token dependencies
1158if (pf->_pipe == 0) {
1159 pf->_token = _num_tokens;
1160if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {
1161// here, the pipeline is not stopped yet because other
1162// lines of tasks may still be running their last stages
1163return;
1164 }
1165 ++_num_tokens;
1166 }
1167else {
1168 _on_pipe(*pf, rt);
1169 }
1170
1171size_t c_f = pf->_pipe;
1172size_t n_f = (pf->_pipe + 1) % num_pipes();
1173size_t n_l = (pf->_line + 1) % num_lines();
1174
1175 pf->_pipe = n_f;
1176
1177// ---- scheduling starts here ----
1178// Notice that the shared variable f must not be changed after this
1179// point because it can result in data race due to the following
1180// condition:
1181//
1182// a -> b
1183// | |
1184// v v
1185// c -> d
1186//
1187// d will be spawned by either c or b, so if c changes f but b spawns d
1188// then data race on f will happen
1189
1190 std::array<int, 2> retval;
1191size_t n = 0;
1192
1193// downward dependency
1194if(_pipes[c_f]->type() == PipeType::SERIAL &&
1195 _line(n_l, c_f).join_counter.fetch_sub(
1196 1, std::memory_order_acq_rel) == 1
1197 ) {
1198 retval[n++] = 1;
1199 }
1200
1201// forward dependency
1202if(_line(pf->_line, n_f).join_counter.fetch_sub(
1203 1, std::memory_order_acq_rel) == 1
1204 ) {
1205 retval[n++] = 0;
1206 }
1207
1208// notice that the task index starts from 1
1209switch(n) {
1210case 2: {
1211 rt.schedule(_tasks[n_l+1]);
1212goto pipeline;
1213 }
1214case 1: {
1215if (retval[0] == 1) {
1216 pf = &_pipeflows[n_l];
1217 }
1218goto pipeline;
1219 }
1220 }
1221 }).name("nprt-"s + std::to_string(l));
1222
1223 _tasks[0].precede(_tasks[l+1]);
1224 }
1225}
1226
1227} // end of namespace tf -----------------------------------------------------
1228
1229
1230
1231
1232
class to build a task dependency graph
Definition flow_builder.hpp:22
class to create a graph object
Definition graph.hpp:47
void clear()
clears the graph
Definition graph.hpp:929
PipeType type() const
queries the type of the pipe
Definition pipeline.hpp:190
void callable(U &&callable)
assigns a new callable to the pipe
Definition pipeline.hpp:212
C callable_t
alias of the callable type
Definition pipeline.hpp:157
Pipe()=default
default constructor
void type(PipeType type)
assigns a new type to the pipe
Definition pipeline.hpp:199
Pipe(PipeType d, C &&callable)
constructs the pipe object
Definition pipeline.hpp:181
class to create a pipeflow object used by the pipe callable
Definition pipeline.hpp:43
size_t token() const
queries the token identifier
Definition pipeline.hpp:78
size_t pipe() const
queries the pipe identifier of the present token
Definition pipeline.hpp:71
Pipeflow()=default
default constructor
void stop()
stops the pipeline scheduling
Definition pipeline.hpp:88
size_t line() const
queries the line identifier of the present token
Definition pipeline.hpp:64
void reset()
resets the pipeline
Definition pipeline.hpp:493
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:487
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:469
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:481
Pipeline(size_t num_lines, Ps &&... ps)
constructs a pipeline object
Definition pipeline.hpp:418
constexpr size_t num_pipes() const
queries the number of pipes
Definition pipeline.hpp:475
tf::ScalablePipeline::ScalablePipeline
ScalablePipeline(const ScalablePipeline &)=delete
disabled copy constructor
tf::ScalablePipeline::ScalablePipeline
ScalablePipeline()=default
default constructor
Graph & graph()
obtains the graph object associated with the pipeline construct
Definition pipeline.hpp:1030
tf::ScalablePipeline::operator=
ScalablePipeline & operator=(const ScalablePipeline &)=delete
disabled copy assignment operator
tf::ScalablePipeline::num_lines
size_t num_lines() const noexcept
queries the number of parallel lines
Definition pipeline.hpp:1012
tf::ScalablePipeline::num_tokens
size_t num_tokens() const noexcept
queries the number of generated tokens in the pipeline
Definition pipeline.hpp:1024
tf::ScalablePipeline::num_pipes
size_t num_pipes() const noexcept
queries the number of pipes
Definition pipeline.hpp:1018
void reset()
resets the pipeline
Definition pipeline.hpp:1084
typename std::iterator_traits< P >::value_type pipe_t
pipe type
Definition pipeline.hpp:790
taskflow namespace
Definition small_vector.hpp:20
PipeType
enumeration of all pipe types
Definition pipeline.hpp:113
@ SERIAL
serial type
Definition pipeline.hpp:117
@ PARALLEL
parallel type
Definition pipeline.hpp:115