Back to Taskflow

Taskflow: A General

docs/core_2taskflow_8hpp_source.html

4.1.019.5 KB
Original Source

| | Taskflow: A General-purpose Task-parallel Programming System |

Loading...

Searching...

No Matches

taskflow.hpp

1#pragma once

2

3#include "flow_builder.hpp"

4

9

10namespace tf {

11

12// ----------------------------------------------------------------------------

13

64class Taskflow : public FlowBuilder {

65

66friend class Topology;

67friend class Executor;

68friend class FlowBuilder;

69friend class Subflow;

70

71struct Dumper {

72size_t id;

73 std::stack<std::tuple<const Node*, const Graph*, size_t>> stack; // added depth

74 std::unordered_map<const Graph*, size_t> visited;

75 };

76

77public:

78

87Taskflow(const std::string& name);

88

92Taskflow();

93

109Taskflow(Taskflow&& rhs);

110

126Taskflow& operator =(Taskflow&& rhs);

127

156~Taskflow() = default;

157

184void dump(std::ostream& ostream) const;

185

192 std::string dump() const;

193

219size_t num_tasks() const;

220

233bool empty() const;

234

243void name(const std::string&);

244

253const std::string& name() const;

254

262void clear();

263

277template <typename V>

278void for_each_task(V&& visitor) const;

279

313void remove_dependency(Task from, Task to);

314

322Graph& graph();

323

324private:

325

326mutable std::mutex _mutex;

327

328 std::string _name;

329

330Graph _graph;

331

332 std::queue<std::shared_ptr<Topology>> _topologies;

333

334void _dump(std::ostream&, const Graph*) const;

335void _dump(std::ostream&, const Node*, Dumper&, size_t level) const;

336void _dump(std::ostream&, const Graph*, Dumper&, size_t level) const;

337

338size_t _fetch_enqueue(std::shared_ptr<Topology>);

339};

340

341// Constructor

342inline Taskflow::Taskflow(const std::string& name) :

343 FlowBuilder {_graph},

344 _name {name} {

345}

346

347// Constructor

348inline Taskflow::Taskflow() : FlowBuilder{_graph} {

349}

350

351// Move constructor

352inline Taskflow::Taskflow(Taskflow&& rhs) : FlowBuilder{_graph} {

353 std::scoped_lock<std::mutex> lock(rhs._mutex);

354 _name = std::move(rhs._name);

355 _graph = std::move(rhs._graph);

356 _topologies = std::move(rhs._topologies);

357}

358

359// Move assignment

360inline Taskflow& Taskflow::operator =(Taskflow&& rhs) {

361if(this != &rhs) {

362 std::scoped_lock<std::mutex, std::mutex> lock(_mutex, rhs._mutex);

363 _name = std::move(rhs._name);

364 _graph = std::move(rhs._graph);

365 _topologies = std::move(rhs._topologies);

366 }

367return *this;

368}

369

370// Function:

371inline void Taskflow::clear() {

372 _graph.clear();

373}

374

375// Function: num_tasks

376inline size_t Taskflow::num_tasks() const {

377return _graph.size();

378}

379

380// Function: empty

381inline bool Taskflow::empty() const {

382return _graph.empty();

383}

384

385// Function: name

386inline void Taskflow::name(const std::string &name) {

387 _name = name;

388}

389

390// Function: name

391inline const std::string& Taskflow::name() const {

392return _name;

393}

394

395// Function: graph

396inline Graph& Taskflow::graph() {

397return _graph;

398}

399

400// Function: for_each_task

401template <typename V>

402void Taskflow::for_each_task(V&& visitor) const {

403for(auto itr = _graph.begin(); itr != _graph.end(); ++itr) {

404 visitor(Task(*itr));

405 }

406}

407

408// Function: remove_dependency

409inline void Taskflow::remove_dependency(Task from, Task to) {

410// remove "to" from the succcessor list of "from"

411 from._node->_remove_successors(to._node);

412

413// remove "from" from the predecessor list of "to"

414 to._node->_remove_predecessors(from._node);

415}

416

417// Function: _fetch_enqueue

418inline size_t Taskflow::_fetch_enqueue(std::shared_ptr<Topology> tpg) {

419 std::lock_guard<std::mutex> lock(_mutex);

420auto pre_size = _topologies.size();

421 _topologies.emplace(std::move(tpg));

422return pre_size;

423}

424

425// Function: dump

426inline std::string Taskflow::dump() const {

427 std::ostringstream oss;

428dump(oss);

429return oss.str();

430}

431

432// Function: dump

433inline void Taskflow::dump(std::ostream& os) const {

434 os << "digraph Taskflow {\n";

435 os << " compound=true;\n"; // required for lhead/ltail cluster edges

436 _dump(os, &_graph);

437 os << "}\n";

438}

439

440// Function: _dump (top-level — iterates module stack)

441inline void Taskflow::_dump(std::ostream& os, const Graph* top) const {

442

443 Dumper dumper;

444

445 dumper.id = 0;

446 dumper.stack.push({nullptr, top, 1});

447 dumper.visited[top] = dumper.id++;

448

449while(!dumper.stack.empty()) {

450

451auto [p, f, depth] = dumper.stack.top();

452 dumper.stack.pop();

453

454 std::string ind(depth * 2, ' ');

455 std::string ind2((depth + 1) * 2, ' ');

456

457 os << ind << "subgraph cluster_p" << f << " {\n";

458 os << ind2 << "label="";

459

460// n-level module

461if(p) {

462if (p->_name.empty()) os << 'm' << dumper.visited[f];

463else os << p->name();

464 }

465// top-level taskflow graph

466else {

467 os << "Taskflow: ";

468if(_name.empty()) os << 'p' << this;

469else os << _name;

470 }

471

472 os << "";\n";

473

474 _dump(os, f, dumper, depth + 1);

475 os << ind << "}\n";

476 }

477}

478

479// Function: _dump (single node)

480inline void Taskflow::_dump(

481 std::ostream& os, const Node* node, Dumper& dumper, size_t level

  1. const {

483

484 std::string ind(level * 2, ' ');

485

486// label of the node

487 os << ind << 'p' << node << "[label="";

488if(node->_name.empty()) os << 'p' << node;

489else os << node->_name;

490 os << "" ";

491

492// shape of the node

493switch(node->_handle.index()) {

494

495case Node::CONDITION:

496case Node::MULTI_CONDITION:

497 os << "shape=diamond color=black fillcolor=aquamarine style=filled";

498break;

499

500default:

501break;

502 }

503

504 os << "];\n";

505

506for(size_t s=0; s<node->_num_successors; ++s) {

507if(node->_is_conditioner()) {

508 os << ind << 'p' << node << " -> p" << node->_edges[s]

509 << " [style=dashed label="" << s << ""];\n";

510 } else {

511 os << ind << 'p' << node << " -> p" << node->_edges[s] << ";\n";

512 }

513 }

514

515// node info

516switch(auto hid = node->_handle.index(); hid) {

517

518case Node::SUBFLOW:

519case Node::ADOPTED_MODULE: {

520

521auto& g = (hid == Node::SUBFLOW) ?

522 std::get_if<Node::Subflow>(&node->_handle)->subgraph :

523 std::get_if<Node::AdoptedModule>(&node->_handle)->graph;

524

525if(!g.empty()) {

526 std::string ind2((level + 1) * 2, ' ');

527

528 os << ind << "subgraph cluster_p" << node << " {\n";

529 os << ind2 << ((hid == Node::SUBFLOW) ? "label="Subflow: "

530 : "label="AdoptedModule: ");

531if(node->_name.empty()) os << 'p' << node;

532else os << node->_name;

533 os << "";\n";

534 os << ind2 << "color=blue;\n";

535

536 _dump(os, &g, dumper, level + 1);

537 os << ind << "}\n";

538

539// Single cluster-level join edge: subflow cluster → parent node.

540// ltail clips the arrow tail at the cluster boundary.

541auto first = *g.begin();

542 os << ind << 'p' << first << " -> p" << node

543 << " [ltail=cluster_p" << node

544 << " style=dashed color=blue];\n";

545 }

546 }

547break;

548

549default:

550break;

551 }

552}

553

554// Function: _dump (graph — iterates nodes)

555inline void Taskflow::_dump(

556 std::ostream& os, const Graph* graph, Dumper& dumper, size_t level

  1. const {

558

559 std::string ind(level * 2, ' ');

560

561for(auto itr = graph->begin(); itr != graph->end(); ++itr) {

562

563 Node* n = *itr;

564

565// regular task

566if(n->_handle.index() != Node::MODULE) {

567 _dump(os, n, dumper, level);

568 }

569// module task

570else {

571auto mgraph = &(std::get_if<Node::Module>(&n->_handle)->graph);

572

573 os << ind << 'p' << n << "[shape=box3d, color=blue, label="";

574if(n->_name.empty()) os << 'p' << n;

575else os << n->_name;

576

577if(dumper.visited.find(mgraph) == dumper.visited.end()) {

578 dumper.visited[mgraph] = dumper.id++;

579 dumper.stack.push({n, mgraph, level});

580 }

581

582if(n->_name.empty()) os << " [m" << dumper.visited[mgraph] << "]";

583 os << ""];\n";

584

585for(size_t i=0; i<n->_num_successors; ++i) {

586 os << ind << 'p' << n << " -> " << 'p' << n->_edges[i] << ";\n";

587 }

588 }

589 }

590}

591

592// ----------------------------------------------------------------------------

593// class definition: Future

594// ----------------------------------------------------------------------------

595

629template <typename T>

630class Future : public std::future<T> {

631

632friend class Executor;

633friend class Subflow;

634friend class Runtime;

635

636public:

637

641Future() = default;

642

646Future(const Future&) = delete;

647

651Future(Future&&) = default;

652

656Future(std::future<T>&&);

657

661Future& operator =(const Future&) = delete;

662

666Future& operator =(Future&&) = default;

667

700bool cancel();

701

702private:

703

704 std::weak_ptr<Topology> _topology;

705

706Future(std::future<T>&&, std::weak_ptr<Topology>);

707};

708

709template <typename T>

710Future<T>::Future(std::future<T>&& f, std::weak_ptr<Topology> p) :

711 std::future<T> {std::move(f)},

712 _topology {std::move(p)} {

713}

714

715template <typename T>

716Future<T>::Future(std::future<T>&& f) : std::future<T> {std::move(f)} {

717}

718

719// Function: cancel

720template <typename T>

721bool Future<T>::cancel() {

722if(auto ptr = _topology.lock(); ptr) {

723 ptr->_estate.fetch_or(ESTATE::CANCELLED, std::memory_order_relaxed);

724return true;

725 }

726return false;

727}

728

729

730} // end of namespace tf. ---------------------------------------------------

tf::Future::cancel

bool cancel()

cancels the execution of the running taskflow associated with this future object

Definition taskflow.hpp:721

tf::Future::operator=

Future & operator=(const Future &)=delete

disabled copy assignment

tf::Future::Future

Future()=default

default constructor

tf::Future::Future

Future(const Future &)=delete

disabled copy constructor

tf::Future::Future

Future(Future &&)=default

default move constructor

tf::Graph

class to create a graph object

Definition graph.hpp:47

tf::Task

class to create a task handle over a taskflow node

Definition task.hpp:569

tf::Taskflow::clear

void clear()

clears the associated task dependency graph

Definition taskflow.hpp:371

tf::Taskflow::empty

bool empty() const

queries if this taskflow is empty (has no tasks)

Definition taskflow.hpp:381

tf::Taskflow::remove_dependency

void remove_dependency(Task from, Task to)

removes dependencies that go from task from to task to

Definition taskflow.hpp:409

tf::Taskflow::for_each_task

void for_each_task(V &&visitor) const

applies a visitor to each task in this taskflow

Definition taskflow.hpp:402

tf::Taskflow::Taskflow

Taskflow(const std::string &name)

constructs a taskflow with the given name

Definition taskflow.hpp:342

tf::Taskflow::graph

Graph & graph()

returns a reference to the underlying graph object

Definition taskflow.hpp:396

tf::Taskflow::name

const std::string & name() const

queries the name of this taskflow

Definition taskflow.hpp:391

tf::Taskflow::dump

std::string dump() const

dumps the taskflow to a std::string of DOT format

Definition taskflow.hpp:426

tf::Taskflow::operator=

Taskflow & operator=(Taskflow &&rhs)

move assignment operator

Definition taskflow.hpp:360

tf::Taskflow::~Taskflow

~Taskflow()=default

default destructor

tf::Taskflow::name

void name(const std::string &)

assigns a new name to this taskflow

Definition taskflow.hpp:386

tf::Taskflow::Taskflow

Taskflow()

constructs a taskflow

Definition taskflow.hpp:348

tf::Taskflow::num_tasks

size_t num_tasks() const

queries the number of tasks in this taskflow

Definition taskflow.hpp:376

tf

taskflow namespace

Definition small_vector.hpp:20