Back to Taskflow

Taskflow: A General-purpose Task-parallel Programming System

docs/pipeline_8hpp_source.html

4.1.0 · 35.7 KB
Original Source

| | Taskflow: A General-purpose Task-parallel Programming System |

Loading...

Searching...

No Matches

pipeline.hpp

1#pragma once

2

3#include "../taskflow.hpp"

4

9

10namespace tf {

11

12

13// ----------------------------------------------------------------------------

14// Class Definition: Pipeflow

15// ----------------------------------------------------------------------------

16

43class Pipeflow {

44

45template <typename... Ps>

46friend class Pipeline;

47

48template <typename P>

49friend class ScalablePipeline;

50

51template <typename... Ps>

52friend class DataPipeline;

53

54public:

55

59Pipeflow() = default;

60

64size_t line() const {

65return _line;

66 }

67

71size_t pipe() const {

72return _pipe;

73 }

74

78size_t token() const {

79return _token;

80 }

81

88void stop() {

89if(_pipe != 0) {

90 TF_THROW("only the first pipe can stop the token");

91 }

92 _stop = true;

93 }

94

95private:

96

97// Regular data

98size_t _line;

99size_t _pipe;

100size_t _token;

101bool _stop;

102};

103

104// ----------------------------------------------------------------------------

105// Class Definition: PipeType

106// ----------------------------------------------------------------------------

107

/**
@enum PipeType

@brief enumeration of all pipe types

The underlying integer values are not arbitrary: the pipeline reset and build
routines cast them to size_t and store them as join counters, so the values
must not be renumbered.
*/
enum class PipeType : int {
  /** @brief parallel type */
  PARALLEL = 1,
  /** @brief serial type */
  SERIAL   = 2
};

119

120// ----------------------------------------------------------------------------

121// Class Definition: Pipe

122// ----------------------------------------------------------------------------

123

143template <typename C = std::function<void(tf::Pipeflow&)>>

144class Pipe {

145

146template <typename... Ps>

147friend class Pipeline;

148

149template <typename P>

150friend class ScalablePipeline;

151

152public:

153

157using callable_t = C;

158

162Pipe() = default;

163

181Pipe(PipeType d, C&& callable) :

182 _type{d}, _callable{std::forward<C>(callable)} {

183 }

184

190PipeType type() const {

191return _type;

192 }

193

199void type(PipeType type) {

200 _type = type;

201 }

202

211template <typename U>

212void callable(U&& callable) {

213 _callable = std::forward<U>(callable);

214 }

215

216private:

217

218PipeType _type;

219

220 C _callable;

221};

222

223// ----------------------------------------------------------------------------

224// Class Definition: Pipeline

225// ----------------------------------------------------------------------------

226

306template <typename... Ps>

307class Pipeline {

308

309static_assert(sizeof...(Ps)>0, "must have at least one pipe");

310

314struct Line {

315 std::atomic<size_t> join_counter;

316 };

317

321struct PipeMeta {

322PipeType type;

323 };

324

325public:

326

338Pipeline(size_t num_lines, Ps&&... ps);

339

351Pipeline(size_t num_lines, std::tuple<Ps...>&& ps);

352

361size_t num_lines() const noexcept;

362

369constexpr size_t num_pipes() const;

370

378void reset();

379

386size_t num_tokens() const noexcept;

387

394Graph& graph();

395

396

397private:

398

399Graph _graph;

400

401size_t _num_tokens;

402

403 std::tuple<Ps...> _pipes;

404 std::array<PipeMeta, sizeof...(Ps)> _meta;

405 std::vector<std::array<Line, sizeof...(Ps)>> _lines;

406 std::vector<Task> _tasks;

407 std::vector<Pipeflow> _pipeflows;

408

409template <size_t... I>

410auto _gen_meta(std::tuple<Ps...>&&, std::index_sequence<I...>);

411

412void _on_pipe(Pipeflow&, NonpreemptiveRuntime&);

413void _build();

414};

415

416// constructor

417template <typename... Ps>

418Pipeline<Ps...>::Pipeline(size_t num_lines, Ps&&... ps) :

419 _pipes {std::make_tuple(std::forward<Ps>(ps)...)},

420 _meta {PipeMeta{ps.type()}...},

421 _lines (num_lines),

422 _tasks (num_lines + 1),

423 _pipeflows (num_lines) {

424

425if(num_lines == 0) {

426 TF_THROW("must have at least one line");

427 }

428

429if(std::get<0>(_pipes).type() != PipeType::SERIAL) {

430 TF_THROW("first pipe must be serial");

431 }

432

433reset();

434 _build();

435}

436

437// constructor

438template <typename... Ps>

439Pipeline<Ps...>::Pipeline(size_t num_lines, std::tuple<Ps...>&& ps) :

440 _pipes {std::forward<std::tuple<Ps...>>(ps)},

441 _meta {_gen_meta(

442 std::forward<std::tuple<Ps...>>(ps), std::make_index_sequence<sizeof...(Ps)>{}

443 )},

444 _lines (num_lines),

445 _tasks (num_lines + 1),

446 _pipeflows (num_lines) {

447

448if(num_lines == 0) {

449 TF_THROW("must have at least one line");

450 }

451

452if(std::get<0>(_pipes).type() != PipeType::SERIAL) {

453 TF_THROW("first pipe must be serial");

454 }

455

456 reset();

457 _build();

458}

459

460// Function: _get_meta

461template <typename... Ps>

462template <size_t... I>

463auto Pipeline<Ps...>::_gen_meta(std::tuple<Ps...>&& ps, std::index_sequence<I...>) {

464return std::array{PipeMeta{std::get<I>(ps).type()}...};

465}

466

467// Function: num_lines

468template <typename... Ps>

469size_t Pipeline<Ps...>::num_lines() const noexcept {

470return _pipeflows.size();

471}

472

473// Function: num_pipes

474template <typename... Ps>

475constexpr size_t Pipeline<Ps...>::num_pipes() const {

476return sizeof...(Ps);

477}

478

479// Function: num_tokens

480template <typename... Ps>

481size_t Pipeline<Ps...>::num_tokens() const noexcept {

482return _num_tokens;

483}

484

485// Function: graph

486template <typename... Ps>

487Graph& Pipeline<Ps...>::graph() {

488return _graph;

489}

490

491// Function: reset

492template <typename... Ps>

493void Pipeline<Ps...>::reset() {

494

495 _num_tokens = 0;

496

497for(size_t l = 0; l<num_lines(); l++) {

498 _pipeflows[l]._pipe = 0;

499 _pipeflows[l]._line = l;

500 }

501

502 _lines[0][0].join_counter.store(0, std::memory_order_relaxed);

503

504for(size_t l=1; l<num_lines(); l++) {

505for(size_t f=1; f<num_pipes(); f++) {

506 _lines[l][f].join_counter.store(

507static_cast<size_t>(_meta[f].type), std::memory_order_relaxed

508 );

509 }

510 }

511

512for(size_t f=1; f<num_pipes(); f++) {

513 _lines[0][f].join_counter.store(1, std::memory_order_relaxed);

514 }

515

516for(size_t l=1; l<num_lines(); l++) {

517 _lines[l][0].join_counter.store(

518static_cast<size_t>(_meta[0].type) - 1, std::memory_order_relaxed

519 );

520 }

521}

522

523// Procedure: _on_pipe

524template <typename... Ps>

525void Pipeline<Ps...>::_on_pipe(Pipeflow& pf, NonpreemptiveRuntime& rt) {

526 visit_tuple([&](auto&& pipe){

527using callable_t = typename std::decay_t<decltype(pipe)>::callable_t;

528if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {

529 pipe._callable(pf);

530 }

531else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {

532 pipe._callable(pf, rt);

533 }

534else {

535static_assert(dependent_false_v<callable_t>, "un-supported pipe callable type");

536 }

537 }, _pipes, pf._pipe);

538}

539

540// Procedure: _build

541template <typename... Ps>

542void Pipeline<Ps...>::_build() {

543

544using namespace std::literals::string_literals;

545

546 FlowBuilder fb(_graph);

547

548// init task

549 _tasks[0] = fb.emplace(this {

550return static_cast<int>(_num_tokens % num_lines());

551 }).name("cond");

552

553// line task

554for(size_t l = 0; l < num_lines(); l++) {

555

556 _tasks[l + 1] = fb.emplace([this, l] (tf::NonpreemptiveRuntime& rt) mutable {

557

558auto pf = &_pipeflows[l];

559

560 pipeline:

561

562 _lines[pf->_line][pf->_pipe].join_counter.store(

563static_cast<size_t>(_meta[pf->_pipe].type), std::memory_order_relaxed

564 );

565

566// First pipe does all jobs of initialization and token dependencies

567if (pf->_pipe == 0) {

568 pf->_token = _num_tokens;

569if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {

570// here, the pipeline is not stopped yet because other

571// lines of tasks may still be running their last stages

572return;

573 }

574 ++_num_tokens;

575 }

576else {

577 _on_pipe(*pf, rt);

578 }

579

580size_t c_f = pf->_pipe;

581size_t n_f = (pf->_pipe + 1) % num_pipes();

582size_t n_l = (pf->_line + 1) % num_lines();

583

584 pf->_pipe = n_f;

585

586// ---- scheduling starts here ----

587// Notice that the shared variable f must not be changed after this

588// point because it can result in data race due to the following

589// condition:

590//

591// a -> b

592// | |

593// v v

594// c -> d

595//

596// d will be spawned by either c or b, so if c changes f but b spawns d

597// then data race on f will happen

598

599 std::array<int, 2> retval;

600size_t n = 0;

601

602// downward dependency

603if(_meta[c_f].type == PipeType::SERIAL &&

604 _lines[n_l][c_f].join_counter.fetch_sub(

605 1, std::memory_order_acq_rel) == 1

606 ) {

607 retval[n++] = 1;

608 }

609

610// forward dependency

611if(_lines[pf->_line][n_f].join_counter.fetch_sub(

612 1, std::memory_order_acq_rel) == 1

613 ) {

614 retval[n++] = 0;

615 }

616

617// notice that the task index starts from 1

618switch(n) {

619case 2: {

620 rt.schedule(_tasks[n_l+1]);

621goto pipeline;

622 }

623case 1: {

624// downward dependency

625if (retval[0] == 1) {

626 pf = &_pipeflows[n_l];

627 }

628// forward dependency

629goto pipeline;

630 }

631 }

632 }).name("nprt-"s + std::to_string(l));

633

634 _tasks[0].precede(_tasks[l+1]);

635 }

636}

637

638// ----------------------------------------------------------------------------

639// Class Definition: ScalablePipeline

640// ----------------------------------------------------------------------------

641

774template <typename P>

775class ScalablePipeline {

776

780struct Line {

781 std::atomic<size_t> join_counter;

782 };

783

784

785public:

786

790using pipe_t = typename std::iterator_traits<P>::value_type;

791

795ScalablePipeline() = default;

796

806ScalablePipeline(size_t num_lines);

807

824ScalablePipeline(size_t num_lines, P first, P last);

825

829ScalablePipeline(const ScalablePipeline&) = delete;

830

839ScalablePipeline(ScalablePipeline&& rhs);

840

844ScalablePipeline& operator = (const ScalablePipeline&) = delete;

845

854ScalablePipeline& operator = (ScalablePipeline&& rhs);

855

864size_t num_lines() const noexcept;

865

872size_t num_pipes() const noexcept;

873

880void reset();

881

897void reset(P first, P last);

898

916void reset(size_t num_lines, P first, P last);

917

924size_t num_tokens() const noexcept;

925

932Graph& graph();

933

934 private:

935

936Graph _graph;

937

938size_t _num_tokens{0};

939

940 std::vector<P> _pipes;

941 std::vector<Task> _tasks;

942 std::vector<Pipeflow> _pipeflows;

943 std::unique_ptr<Line[]> _lines;

944

945void _on_pipe(Pipeflow&, NonpreemptiveRuntime&);

946void _build();

947

948 Line& _line(size_t, size_t);

949};

950

951// constructor

952template <typename P>

953ScalablePipeline<P>::ScalablePipeline(size_t num_lines) :

954 _tasks (num_lines + 1),

955 _pipeflows (num_lines) {

956

957if(num_lines == 0) {

958 TF_THROW("must have at least one line");

959 }

960

961 _build();

962}

963

964// constructor

965template <typename P>

966ScalablePipeline<P>::ScalablePipeline(size_t num_lines, P first, P last) :

967 _tasks (num_lines + 1),

968 _pipeflows (num_lines) {

969

970if(num_lines == 0) {

971 TF_THROW("must have at least one line");

972 }

973

974reset(first, last);

975 _build();

976}

977

978// move constructor

979template <typename P>

980ScalablePipeline<P>::ScalablePipeline(ScalablePipeline&& rhs):

981 _num_tokens {rhs._num_tokens},

982 _pipes {std::move(rhs._pipes)},

983 _pipeflows {std::move(rhs._pipeflows)},

984 _lines {std::move(rhs._lines)} {

985

986 _graph.clear();

987 _tasks.resize(_pipeflows.size()+1);

988 rhs._num_tokens = 0;

989 rhs._tasks.clear();

990 _build();

991}

992

993// move assignment operator

994template <typename P>

995ScalablePipeline<P>& ScalablePipeline<P>::operator =(ScalablePipeline&& rhs) {

996 _num_tokens = rhs._num_tokens;

997 _pipes = std::move(rhs._pipes);

998 _pipeflows = std::move(rhs._pipeflows);

999 _lines = std::move(rhs._lines);

1000

1001 _graph.clear();

1002 _tasks.resize(_pipeflows.size()+1);

1003

1004 rhs._num_tokens = 0;

1005 rhs._tasks.clear();

1006 _build();

1007return *this;

1008}

1009

1010// Function: num_lines

1011template <typename P>

1012size_t ScalablePipeline<P>::num_lines() const noexcept {

1013return _pipeflows.size();

1014}

1015

1016// Function: num_pipes

1017template <typename P>

1018size_t ScalablePipeline<P>::num_pipes() const noexcept {

1019return _pipes.size();

1020}

1021

1022// Function: num_tokens

1023template <typename P>

1024size_t ScalablePipeline<P>::num_tokens() const noexcept {

1025return _num_tokens;

1026}

1027

1028// Function: graph

1029template <typename P>

1030Graph& ScalablePipeline<P>::graph() {

1031return _graph;

1032}

1033

1034// Function: _line

1035template <typename P>

1036typename ScalablePipeline<P>::Line& ScalablePipeline<P>::_line(size_t l, size_t p) {

1037return _lines[l*num_pipes() + p];

1038}

1039

1040template <typename P>

1041void ScalablePipeline<P>::reset(size_t num_lines, P first, P last) {

1042

1043if(num_lines == 0) {

1044 TF_THROW("must have at least one line");

1045 }

1046

1047 _graph.clear();

1048 _tasks.resize(num_lines + 1);

1049 _pipeflows.resize(num_lines);

1050

1051reset(first, last);

1052

1053 _build();

1054}

1055

1056// Function: reset

1057template <typename P>

1058void ScalablePipeline<P>::reset(P first, P last) {

1059

1060size_t num_pipes = static_cast<size_t>(std::distance(first, last));

1061

1062if(num_pipes == 0) {

1063 TF_THROW("pipeline cannot be empty");

1064 }

1065

1066if(first->type() != PipeType::SERIAL) {

1067 TF_THROW("first pipe must be serial");

1068 }

1069

1070 _pipes.resize(num_pipes);

1071

1072size_t i=0;

1073for(auto itr = first; itr != last; itr++) {

1074 _pipes[i++] = itr;

1075 }

1076

1077 _lines = std::make_unique<Line[]>(num_lines() * _pipes.size());

1078

1079reset();

1080}

1081

1082// Function: reset

1083template <typename P>

1084void ScalablePipeline<P>::reset() {

1085

1086 _num_tokens = 0;

1087

1088for(size_t l = 0; l<num_lines(); l++) {

1089 _pipeflows[l]._pipe = 0;

1090 _pipeflows[l]._line = l;

1091 }

1092

1093 _line(0, 0).join_counter.store(0, std::memory_order_relaxed);

1094

1095for(size_t l=1; l<num_lines(); l++) {

1096for(size_t f=1; f<num_pipes(); f++) {

1097 _line(l, f).join_counter.store(

1098static_cast<size_t>(_pipes[f]->type()), std::memory_order_relaxed

1099 );

1100 }

1101 }

1102

1103for(size_t f=1; f<num_pipes(); f++) {

1104 _line(0, f).join_counter.store(1, std::memory_order_relaxed);

1105 }

1106

1107for(size_t l=1; l<num_lines(); l++) {

1108 _line(l, 0).join_counter.store(

1109static_cast<size_t>(_pipes[0]->type()) - 1, std::memory_order_relaxed

1110 );

1111 }

1112}

1113

1114// Procedure: _on_pipe

1115template <typename P>

1116void ScalablePipeline<P>::_on_pipe(Pipeflow& pf, NonpreemptiveRuntime& rt) {

1117

1118using callable_t = typename pipe_t::callable_t;

1119

1120if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {

1121 _pipes[pf._pipe]->_callable(pf);

1122 }

1123else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {

1124 _pipes[pf._pipe]->_callable(pf, rt);

1125 }

1126else {

1127static_assert(dependent_false_v<callable_t>, "un-supported pipe callable type");

1128 }

1129}

1130

1131// Procedure: _build

1132template <typename P>

1133void ScalablePipeline<P>::_build() {

1134

1135using namespace std::literals::string_literals;

1136

1137FlowBuilder fb(_graph);

1138

1139// init task

1140 _tasks[0] = fb.emplace(this {

1141return static_cast<int>(_num_tokens % num_lines());

1142 }).name("cond");

1143

1144// line task

1145for(size_t l = 0; l < num_lines(); l++) {

1146

1147 _tasks[l + 1] = fb.emplace([this, l] (tf::NonpreemptiveRuntime& rt) mutable {

1148

1149auto pf = &_pipeflows[l];

1150

1151 pipeline:

1152

1153 _line(pf->_line, pf->_pipe).join_counter.store(

1154static_cast<size_t>(_pipes[pf->_pipe]->type()), std::memory_order_relaxed

1155 );

1156

1157// First pipe does all jobs of initialization and token dependencies

1158if (pf->_pipe == 0) {

1159 pf->_token = _num_tokens;

1160if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {

1161// here, the pipeline is not stopped yet because other

1162// lines of tasks may still be running their last stages

1163return;

1164 }

1165 ++_num_tokens;

1166 }

1167else {

1168 _on_pipe(*pf, rt);

1169 }

1170

1171size_t c_f = pf->_pipe;

1172size_t n_f = (pf->_pipe + 1) % num_pipes();

1173size_t n_l = (pf->_line + 1) % num_lines();

1174

1175 pf->_pipe = n_f;

1176

1177// ---- scheduling starts here ----

1178// Notice that the shared variable f must not be changed after this

1179// point because it can result in data race due to the following

1180// condition:

1181//

1182// a -> b

1183// | |

1184// v v

1185// c -> d

1186//

1187// d will be spawned by either c or b, so if c changes f but b spawns d

1188// then data race on f will happen

1189

1190 std::array<int, 2> retval;

1191size_t n = 0;

1192

1193// downward dependency

1194if(_pipes[c_f]->type() == PipeType::SERIAL &&

1195 _line(n_l, c_f).join_counter.fetch_sub(

1196 1, std::memory_order_acq_rel) == 1

1197 ) {

1198 retval[n++] = 1;

1199 }

1200

1201// forward dependency

1202if(_line(pf->_line, n_f).join_counter.fetch_sub(

1203 1, std::memory_order_acq_rel) == 1

1204 ) {

1205 retval[n++] = 0;

1206 }

1207

1208// notice that the task index starts from 1

1209switch(n) {

1210case 2: {

1211 rt.schedule(_tasks[n_l+1]);

1212goto pipeline;

1213 }

1214case 1: {

1215if (retval[0] == 1) {

1216 pf = &_pipeflows[n_l];

1217 }

1218goto pipeline;

1219 }

1220 }

1221 }).name("nprt-"s + std::to_string(l));

1222

1223 _tasks[0].precede(_tasks[l+1]);

1224 }

1225}

1226

1227} // end of namespace tf -----------------------------------------------------

1228

1229

1230

1231

1232

tf::FlowBuilder

class to build a task dependency graph

Definition flow_builder.hpp:22

tf::Graph

class to create a graph object

Definition graph.hpp:47

tf::Graph::clear

void clear()

clears the graph

Definition graph.hpp:929

tf::Pipe::type

PipeType type() const

queries the type of the pipe

Definition pipeline.hpp:190

tf::Pipe::callable

void callable(U &&callable)

assigns a new callable to the pipe

Definition pipeline.hpp:212

tf::Pipe::callable_t

C callable_t

alias of the callable type

Definition pipeline.hpp:157

tf::Pipe::Pipe

Pipe()=default

default constructor

tf::Pipe::type

void type(PipeType type)

assigns a new type to the pipe

Definition pipeline.hpp:199

tf::Pipe::Pipe

Pipe(PipeType d, C &&callable)

constructs the pipe object

Definition pipeline.hpp:181

tf::Pipeflow

class to create a pipeflow object used by the pipe callable

Definition pipeline.hpp:43

tf::Pipeflow::token

size_t token() const

queries the token identifier

Definition pipeline.hpp:78

tf::Pipeflow::pipe

size_t pipe() const

queries the pipe identifier of the present token

Definition pipeline.hpp:71

tf::Pipeflow::Pipeflow

Pipeflow()=default

default constructor

tf::Pipeflow::stop

void stop()

stops the pipeline scheduling

Definition pipeline.hpp:88

tf::Pipeflow::line

size_t line() const

queries the line identifier of the present token

Definition pipeline.hpp:64

tf::Pipeline::reset

void reset()

resets the pipeline

Definition pipeline.hpp:493

tf::Pipeline::graph

Graph & graph()

obtains the graph object associated with the pipeline construct

Definition pipeline.hpp:487

tf::Pipeline::num_lines

size_t num_lines() const noexcept

queries the number of parallel lines

Definition pipeline.hpp:469

tf::Pipeline::num_tokens

size_t num_tokens() const noexcept

queries the number of generated tokens in the pipeline

Definition pipeline.hpp:481

tf::Pipeline::Pipeline

Pipeline(size_t num_lines, Ps &&... ps)

constructs a pipeline object

Definition pipeline.hpp:418

tf::Pipeline::num_pipes

constexpr size_t num_pipes() const

queries the number of pipes

Definition pipeline.hpp:475

tf::ScalablePipeline::ScalablePipeline

ScalablePipeline(const ScalablePipeline &)=delete

disabled copy constructor

tf::ScalablePipeline::ScalablePipeline

ScalablePipeline()=default

default constructor

tf::ScalablePipeline::graph

Graph & graph()

obtains the graph object associated with the pipeline construct

Definition pipeline.hpp:1030

tf::ScalablePipeline::operator=

ScalablePipeline & operator=(const ScalablePipeline &)=delete

disabled copy assignment operator

tf::ScalablePipeline::num_lines

size_t num_lines() const noexcept

queries the number of parallel lines

Definition pipeline.hpp:1012

tf::ScalablePipeline::num_tokens

size_t num_tokens() const noexcept

queries the number of generated tokens in the pipeline

Definition pipeline.hpp:1024

tf::ScalablePipeline::num_pipes

size_t num_pipes() const noexcept

queries the number of pipes

Definition pipeline.hpp:1018

tf::ScalablePipeline::reset

void reset()

resets the pipeline

Definition pipeline.hpp:1084

tf::ScalablePipeline::pipe_t

typename std::iterator_traits< P >::value_type pipe_t

pipe type

Definition pipeline.hpp:790

tf

taskflow namespace

Definition small_vector.hpp:20

tf::PipeType

PipeType

enumeration of all pipe types

Definition pipeline.hpp:113

tf::PipeType::SERIAL

@ SERIAL

serial type

Definition pipeline.hpp:117

tf::PipeType::PARALLEL

@ PARALLEL

parallel type

Definition pipeline.hpp:115