Back to Taskflow

Taskflow: A General-purpose Task-parallel Programming System

docs/data__pipeline_8hpp_source.html

4.1.0 · 20.4 KB
Original Source

| | Taskflow: A General-purpose Task-parallel Programming System |

Loading...

Searching...

No Matches

data_pipeline.hpp

1#pragma once

2

3#include "pipeline.hpp"

4

5

6namespace tf {

7

8// ----------------------------------------------------------------------------

9// Class Definition: DataPipe

10// ----------------------------------------------------------------------------

11

51template <typename Input, typename Output, typename C>

52class DataPipe {

53

54template <typename... Ps>

55friend class DataPipeline;

56

57public:

58

62using callable_t = C;

63

67using input_t = Input;

68

72using output_t = Output;

73

77DataPipe() = default;

78

86DataPipe(PipeType d, callable_t&& callable) :

87 _type{d}, _callable{std::forward<callable_t>(callable)} {

88 }

89

96PipeType type() const {

97return _type;

98 }

99

103void type(PipeType type) {

104 _type = type;

105 }

106

116template <typename U>

117void callable(U&& callable) {

118 _callable = std::forward<U>(callable);

119 }

120

121private:

122

123PipeType _type;

124

125callable_t _callable;

126};

127

170template <typename Input, typename Output, typename C>

171auto make_data_pipe(PipeType d, C&& callable) {

172return DataPipe<Input, Output, C>(d, std::forward<C>(callable));

173}

174

175// ----------------------------------------------------------------------------

176// Class Definition: DataPipeline

177// ----------------------------------------------------------------------------

178

253template <typename... Ps>

254class DataPipeline {

255

256static_assert(sizeof...(Ps)>0, "must have at least one pipe");

257

261struct Line {

262 std::atomic<size_t> join_counter;

263 };

264

268struct PipeMeta {

269PipeType type;

270 };

271

272

273public:

274

278using data_t = unique_variant_t<std::variant<std::conditional_t<

279 std::is_void_v<typename Ps::output_t>,

280 std::monostate,

281 std::decay_t<typename Ps::output_t>>...

282 >>;

283

295DataPipeline(size_t num_lines, Ps&&... ps);

296

308DataPipeline(size_t num_lines, std::tuple<Ps...>&& ps);

309

318size_t num_lines() const noexcept;

319

326 constexpr size_t num_pipes() const noexcept;

327

335void reset();

336

343size_t num_tokens() const noexcept;

344

351Graph& graph();

352

353 private:

354

355Graph _graph;

356

357size_t _num_tokens;

358

359 std::tuple<Ps...> _pipes;

360 std::array<PipeMeta, sizeof...(Ps)> _meta;

361 std::vector<std::array<Line, sizeof...(Ps)>> _lines;

362 std::vector<Task> _tasks;

363 std::vector<Pipeflow> _pipeflows;

364 std::vector<CachelineAligned<data_t>> _buffer;

365

366 template <size_t... I>

367 auto _gen_meta(std::tuple<Ps...>&&, std::index_sequence<I...>);

368

369void _on_pipe(Pipeflow&, NonpreemptiveRuntime&);

370void _build();

371};

372

373// constructor

374template <typename... Ps>

375DataPipeline<Ps...>::DataPipeline(size_t num_lines, Ps&&... ps) :

376 _pipes {std::make_tuple(std::forward<Ps>(ps)...)},

377 _meta {PipeMeta{ps.type()}...},

378 _lines (num_lines),

379 _tasks (num_lines + 1),

380 _pipeflows (num_lines),

381 _buffer (num_lines) {

382

383if(num_lines == 0) {

384 TF_THROW("must have at least one line");

385 }

386

387if(std::get<0>(_pipes).type() != PipeType::SERIAL) {

388 TF_THROW("first pipe must be serial");

389 }

390

391reset();

392 _build();

393}

394

395// constructor

396template <typename... Ps>

397DataPipeline<Ps...>::DataPipeline(size_t num_lines, std::tuple<Ps...>&& ps) :

398 _pipes {std::forward<std::tuple<Ps...>>(ps)},

399 _meta {_gen_meta(

400 std::forward<std::tuple<Ps...>>(ps), std::make_index_sequence<sizeof...(Ps)>{}

401 )},

402 _lines (num_lines),

403 _tasks (num_lines + 1),

404 _pipeflows (num_lines),

405 _buffer (num_lines) {

406

407if(num_lines == 0) {

408 TF_THROW("must have at least one line");

409 }

410

411if(std::get<0>(_pipes).type() != PipeType::SERIAL) {

412 TF_THROW("first pipe must be serial");

413 }

414

415 reset();

416 _build();

417}

418

419// Function: _get_meta

420template <typename... Ps>

421template <size_t... I>

422auto DataPipeline<Ps...>::_gen_meta(std::tuple<Ps...>&& ps, std::index_sequence<I...>) {

423return std::array{PipeMeta{std::get<I>(ps).type()}...};

424}

425

426// Function: num_lines

427template <typename... Ps>

428size_t DataPipeline<Ps...>::num_lines() const noexcept {

429return _pipeflows.size();

430}

431

432// Function: num_pipes

433template <typename... Ps>

434constexpr size_t DataPipeline<Ps...>::num_pipes() const noexcept {

435return sizeof...(Ps);

436}

437

438// Function: num_tokens

439template <typename... Ps>

440size_t DataPipeline<Ps...>::num_tokens() const noexcept {

441return _num_tokens;

442}

443

444// Function: graph

445template <typename... Ps>

446Graph& DataPipeline<Ps...>::graph() {

447return _graph;

448}

449

450// Function: reset

451template <typename... Ps>

452void DataPipeline<Ps...>::reset() {

453

454 _num_tokens = 0;

455

456for(size_t l = 0; l<num_lines(); l++) {

457 _pipeflows[l]._pipe = 0;

458 _pipeflows[l]._line = l;

459 }

460

461 _lines[0][0].join_counter.store(0, std::memory_order_relaxed);

462

463for(size_t l=1; l<num_lines(); l++) {

464for(size_t f=1; f<num_pipes(); f++) {

465 _lines[l][f].join_counter.store(

466static_cast<size_t>(_meta[f].type), std::memory_order_relaxed

467 );

468 }

469 }

470

471for(size_t f=1; f<num_pipes(); f++) {

472 _lines[0][f].join_counter.store(1, std::memory_order_relaxed);

473 }

474

475for(size_t l=1; l<num_lines(); l++) {

476 _lines[l][0].join_counter.store(

477static_cast<size_t>(_meta[0].type) - 1, std::memory_order_relaxed

478 );

479 }

480}

481

482// Procedure: _on_pipe

483template <typename... Ps>

484void DataPipeline<Ps...>::_on_pipe(Pipeflow& pf, NonpreemptiveRuntime&) {

485

486 visit_tuple([&](auto&& pipe){

487

488using data_pipe_t = std::decay_t<decltype(pipe)>;

489using callable_t = typename data_pipe_t::callable_t;

490using input_t = std::decay_t<typename data_pipe_t::input_t>;

491using output_t = std::decay_t<typename data_pipe_t::output_t>;

492

493// first pipe

494if constexpr (std::is_invocable_v<callable_t, Pipeflow&>) {

495// -> void {}, i.e., we only have one pipe

496if constexpr (std::is_void_v<output_t>) {

497 pipe._callable(pf);

498// -> output_t {}

499 } else {

500 _buffer[pf._line].data = pipe._callable(pf);

501 }

502 }

503// other pipes without pipeflow in the second argument

504else if constexpr (std::is_invocable_v<callable_t, std::add_lvalue_reference_t<input_t> >) {

505// -> void {}, i.e., the last pipe

506if constexpr (std::is_void_v<output_t>) {

507 pipe._callable(std::get<input_t>(_buffer[pf._line].data));

508// -> output_t {}

509 } else {

510 _buffer[pf._line].data = pipe._callable(

511 std::get<input_t>(_buffer[pf._line].data)

512 );

513 }

514 }

515// other pipes with pipeflow in the second argument

516else if constexpr (std::is_invocable_v<callable_t, input_t&, Pipeflow&>) {

517// [](input_t&, tf::Pipeflow&) -> void {}

518if constexpr (std::is_void_v<output_t>) {

519 pipe._callable(std::get<input_t>(_buffer[pf._line].data), pf);

520// [](input_t&, tf::Pipeflow&) -> output_t {}

521 } else {

522 _buffer[pf._line].data = pipe._callable(

523 std::get<input_t>(_buffer[pf._line].data), pf

524 );

525 }

526 }

527//else if constexpr(std::is_invocable_v<callable_t, Pipeflow&, NonpreemptiveRuntime&>) {

528// pipe._callable(pf, rt);

529//}

530else {

531static_assert(dependent_false_v<callable_t>, "un-supported pipe callable type");

532 }

533 }, _pipes, pf._pipe);

534}

535

536// Procedure: _build

537template <typename... Ps>

538void DataPipeline<Ps...>::_build() {

539

540using namespace std::literals::string_literals;

541

542 FlowBuilder fb(_graph);

543

544// init task

545 _tasks[0] = fb.emplace(this {

546return static_cast<int>(_num_tokens % num_lines());

547 }).name("cond");

548

549// line task

550for(size_t l = 0; l < num_lines(); l++) {

551

552 _tasks[l + 1] = fb.emplace([this, l] (tf::NonpreemptiveRuntime& rt) mutable {

553

554auto pf = &_pipeflows[l];

555

556 pipeline:

557

558 _lines[pf->_line][pf->_pipe].join_counter.store(

559static_cast<size_t>(_meta[pf->_pipe].type), std::memory_order_relaxed

560 );

561

562if (pf->_pipe == 0) {

563 pf->_token = _num_tokens;

564if (pf->_stop = false, _on_pipe(*pf, rt); pf->_stop == true) {

565// here, the pipeline is not stopped yet because other

566// lines of tasks may still be running their last stages

567return;

568 }

569 ++_num_tokens;

570 }

571else {

572 _on_pipe(*pf, rt);

573 }

574

575size_t c_f = pf->_pipe;

576size_t n_f = (pf->_pipe + 1) % num_pipes();

577size_t n_l = (pf->_line + 1) % num_lines();

578

579 pf->_pipe = n_f;

580

581// ---- scheduling starts here ----

582// Notice that the shared variable f must not be changed after this

583// point because it can result in data race due to the following

584// condition:

585//

586// a -> b

587// | |

588// v v

589// c -> d

590//

591// d will be spawned by either c or b, so if c changes f but b spawns d

592// then data race on f will happen

593

594 std::array<int, 2> retval;

595size_t n = 0;

596

597// downward dependency

598if(_meta[c_f].type == PipeType::SERIAL &&

599 _lines[n_l][c_f].join_counter.fetch_sub(

600 1, std::memory_order_acq_rel) == 1

601 ) {

602 retval[n++] = 1;

603 }

604

605// forward dependency

606if(_lines[pf->_line][n_f].join_counter.fetch_sub(

607 1, std::memory_order_acq_rel) == 1

608 ) {

609 retval[n++] = 0;

610 }

611

612// notice that the task index starts from 1

613switch(n) {

614case 2: {

615 rt.schedule(_tasks[n_l+1]);

616goto pipeline;

617 }

618case 1: {

619// downward dependency

620if (retval[0] == 1) {

621 pf = &_pipeflows[n_l];

622 }

623// forward dependency

624goto pipeline;

625 }

626 }

627 }).name("rt-"s + std::to_string(l));

628

629 _tasks[0].precede(_tasks[l+1]);

630 }

631}

632

633

634} // end of namespace tf -----------------------------------------------------

635

636

637

638

639

tf::CachelineAligned

class to ensure cacheline-aligned storage for an object.

Definition os.hpp:219

tf::DataPipe

class to create a stage in a data-parallel pipeline

Definition data_pipeline.hpp:52

tf::DataPipe::DataPipe

DataPipe(PipeType d, callable_t &&callable)

constructs a data pipe

Definition data_pipeline.hpp:86

tf::DataPipe::callable

void callable(U &&callable)

assigns a new callable to the data pipe

Definition data_pipeline.hpp:117

tf::DataPipe::callable_t

C callable_t

callable type of the data pipe

Definition data_pipeline.hpp:62

tf::DataPipe::type

PipeType type() const

queries the type of the data pipe

Definition data_pipeline.hpp:96

tf::DataPipe::output_t

Output output_t

output type of the data pipe

Definition data_pipeline.hpp:72

tf::DataPipe::DataPipe

DataPipe()=default

default constructor

tf::DataPipe::input_t

Input input_t

input type of the data pipe

Definition data_pipeline.hpp:67

tf::DataPipe::type

void type(PipeType type)

assigns a new type to the data pipe

Definition data_pipeline.hpp:103

tf::DataPipeline::DataPipeline

DataPipeline(size_t num_lines, Ps &&... ps)

constructs a data-parallel pipeline object

Definition data_pipeline.hpp:375

tf::DataPipeline::num_tokens

size_t num_tokens() const noexcept

queries the number of generated tokens in the pipeline

Definition data_pipeline.hpp:440

tf::DataPipeline::num_pipes

constexpr size_t num_pipes() const noexcept

queries the number of pipes

Definition data_pipeline.hpp:434

tf::DataPipeline::num_lines

size_t num_lines() const noexcept

queries the number of parallel lines

Definition data_pipeline.hpp:428

tf::DataPipeline::reset

void reset()

resets the pipeline

Definition data_pipeline.hpp:452

tf::DataPipeline::graph

Graph & graph()

obtains the graph object associated with the pipeline construct

Definition data_pipeline.hpp:446

tf::DataPipeline::data_t

unique_variant_t< std::variant< std::conditional_t< std::is_void_v< typename Ps::output_t >, std::monostate, std::decay_t< typename Ps::output_t > >... > > data_t

internal storage type for each data token (default std::variant)

Definition data_pipeline.hpp:278

tf::Graph

class to create a graph object

Definition graph.hpp:47

tf::Pipeflow

class to create a pipeflow object used by the pipe callable

Definition pipeline.hpp:43

tf::Task

class to create a task handle over a taskflow node

Definition task.hpp:569

tf

taskflow namespace

Definition small_vector.hpp:20

tf::make_data_pipe

auto make_data_pipe(PipeType d, C &&callable)

function to construct a data pipe (tf::DataPipe)

Definition data_pipeline.hpp:171

tf::PipeType

PipeType

enumeration of all pipe types

Definition pipeline.hpp:113

tf::PipeType::SERIAL

@ SERIAL

serial type

Definition pipeline.hpp:117