Back to Taskflow

Taskflow: A General-purpose Task-parallel Programming System

docs/runtime_8hpp_source.html

4.1.0 - 27.1 KB
Original Source

Taskflow: A General-purpose Task-parallel Programming System

Loading...

Searching...

No Matches

runtime.hpp

1#pragma once

2

3#include "executor.hpp"

4

5namespace tf {

6

7// ------------------------------------------------------------------------------------------------

8// class: Runtime

9// ------------------------------------------------------------------------------------------------

10

47class Runtime {

48

49friend class Executor;

50friend class FlowBuilder;

51friend class PreemptionGuard;

52friend class Algorithm;

53

54public:

55

71 Executor& executor();

72

76inline Worker& worker();

77

118void schedule(Task task);

119

120// ----------------------------------------------------------------------------------------------

121// async methods

122// ----------------------------------------------------------------------------------------------

123

156template <typename F>

157auto async(F&& f);

158

179template <typename P, typename F>

180auto async(P&& params, F&& f);

181

182// ----------------------------------------------------------------------------------------------

183// silent async methods

184// ----------------------------------------------------------------------------------------------

185

206template <typename F>

207void silent_async(F&& f);

208

225template <typename P, typename F>

226void silent_async(P&& params, F&& f);

227

228// ----------------------------------------------------------------------------------------------

229// dependent async methods

230// ----------------------------------------------------------------------------------------------

231

266template <typename F, typename... Tasks>

267requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

268auto dependent_async(F&& func, Tasks&&... tasks);

269

308template <TaskParamsLike P, typename F, typename... Tasks>

309requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

310auto dependent_async(P&& params, F&& func, Tasks&&... tasks);

311

349template <typename F, typename I>

350requires (!std::same_as<std::decay_t<I>, AsyncTask>)

351auto dependent_async(F&& func, I first, I last);

352

394template <TaskParamsLike P, typename F, typename I>

395requires (!std::same_as<std::decay_t<I>, AsyncTask>)

396auto dependent_async(P&& params, F&& func, I first, I last);

397

398// ----------------------------------------------------------------------------------------------

399// silent dependent async methods

400// ----------------------------------------------------------------------------------------------

401

429template <typename F, typename... Tasks>

430requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

431tf::AsyncTask silent_dependent_async(F&& func, Tasks&&... tasks);

432

464template <TaskParamsLike P, typename F, typename... Tasks>

465requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

466tf::AsyncTask silent_dependent_async(P&& params, F&& func, Tasks&&... tasks);

467

500template <typename F, typename I>

501requires (!std::same_as<std::decay_t<I>, AsyncTask>)

502tf::AsyncTask silent_dependent_async(F&& func, I first, I last);

503

538template <TaskParamsLike P, typename F, typename I>

539requires (!std::same_as<std::decay_t<I>, AsyncTask>)

540tf::AsyncTask silent_dependent_async(P&& params, F&& func, I first, I last);

541

542

543

544// ----------------------------------------------------------------------------------------------

545// cooperative execution methods

546// ----------------------------------------------------------------------------------------------

547

578void corun();

579

583void corun_all();

584

588bool is_cancelled();

589

590private:

591

595explicit Runtime(Executor&, Worker&, Node*);

596

600 Executor& _executor;

601

605Worker& _worker;

606

610 Node* _node;

611};

612

613// constructor

614inline Runtime::Runtime(Executor& executor, Worker& worker, Node* node) :

615 _executor {executor},

616 _worker {worker},

617 _node {node} {

618}

619

620// Function: executor

621inline Executor& Runtime::executor() {

622return _executor;

623}

624

625// Function: worker

626inline Worker& Runtime::worker() {

627return _worker;

628}

629

630// Procedure: schedule

631inline void Runtime::schedule(Task task) {

632

633auto node = task._node;

634// need to keep the invariant: when scheduling a task, the task must have

635// zero dependency (join counter is 0)

636// or we can encounter a bug when inserting a nested flow (e.g., module task)

637 node->_join_counter.store(0, std::memory_order_relaxed);

638

639auto& j = node->_parent ? node->_parent->_join_counter :

640 node->_topology->_join_counter;

641 j.fetch_add(1, std::memory_order_relaxed);

642 _executor._schedule(_worker, node);

643}

644

645// Function: corun

646inline void Runtime::corun() {

647 {

648 ExplicitAnchorGuard anchor(_node);

649 _executor._corun_until(_worker, [this] () -> bool {

650return _node->_join_counter.load(std::memory_order_acquire) == 1;

651 });

652 }

653 _node->_rethrow_exception();

654}

655

656// Function: corun_all

657inline void Runtime::corun_all() {

658corun();

659}

660

661inline bool Runtime::is_cancelled() {

662return _node->_is_parent_cancelled();

663}

664

665// ------------------------------------------------------------------------------------------------

666// Runtime::silent_async

667// ------------------------------------------------------------------------------------------------

668

669// Function: silent_async

670template <typename F>

671void Runtime::silent_async(F&& f) {

672silent_async(DefaultTaskParams{}, std::forward<F>(f));

673}

674

675// Function: silent_async

676template <typename P, typename F>

677void Runtime::silent_async(P&& params, F&& f) {

678 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);

679 _executor._silent_async(

680 std::forward<P>(params), std::forward<F>(f), _node->_topology, _node

681 );

682}

683

684// ------------------------------------------------------------------------------------------------

685// Runtime::async

686// ------------------------------------------------------------------------------------------------

687

688// Function: async

689template <typename F>

690auto Runtime::async(F&& f) {

691return async(DefaultTaskParams{}, std::forward<F>(f));

692}

693

694// Function: async

695template <typename P, typename F>

696auto Runtime::async(P&& params, F&& f) {

697 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);

698return _executor._async(

699 std::forward<P>(params), std::forward<F>(f), _node->_topology, _node

700 );

701}

702

703// ------------------------------------------------------------------------------------------------

704// silent dependent async

705// ------------------------------------------------------------------------------------------------

706

707// Function: silent_dependent_async

708template <typename F, typename... Tasks>

709requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

710tf::AsyncTask Runtime::silent_dependent_async(F&& func, Tasks&&... tasks) {

711return silent_dependent_async(

712DefaultTaskParams{}, std::forward<F>(func), std::forward<Tasks>(tasks)...

713 );

714}

715

716// Function: silent_dependent_async

717template <TaskParamsLike P, typename F, typename... Tasks>

718requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

719tf::AsyncTask Runtime::silent_dependent_async(

720 P&& params, F&& func, Tasks&&... tasks

721){

722 std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };

723return silent_dependent_async(

724 std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()

725 );

726}

727

728// Function: silent_dependent_async

729template <typename F, typename I>

730requires (!std::same_as<std::decay_t<I>, AsyncTask>)

731tf::AsyncTask Runtime::silent_dependent_async(F&& func, I first, I last) {

732return silent_dependent_async(DefaultTaskParams{}, std::forward<F>(func), first, last);

733}

734

735// Function: silent_dependent_async

736template <TaskParamsLike P, typename F, typename I>

737requires (!std::same_as<std::decay_t<I>, AsyncTask>)

738tf::AsyncTask Runtime::silent_dependent_async(

739 P&& params, F&& func, I first, I last

740){

741 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);

742return _executor._silent_dependent_async(

743 std::forward<P>(params), std::forward<F>(func), first, last, _node->_topology, _node

744 );

745}

746

747// ------------------------------------------------------------------------------------------------

748// dependent async

749// ------------------------------------------------------------------------------------------------

750

751// Function: dependent_async

752template <typename F, typename... Tasks>

753requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

754auto Runtime::dependent_async(F&& func, Tasks&&... tasks) {

755return dependent_async(DefaultTaskParams{}, std::forward<F>(func), std::forward<Tasks>(tasks)...);

756}

757

758// Function: dependent_async

759template <TaskParamsLike P, typename F, typename... Tasks>

760requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

761auto Runtime::dependent_async(P&& params, F&& func, Tasks&&... tasks) {

762 std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };

763return dependent_async(

764 std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()

765 );

766}

767

768// Function: dependent_async

769template <typename F, typename I>

770requires (!std::same_as<std::decay_t<I>, AsyncTask>)

771auto Runtime::dependent_async(F&& func, I first, I last) {

772return dependent_async(DefaultTaskParams{}, std::forward<F>(func), first, last);

773}

774

775// Function: dependent_async

776template <TaskParamsLike P, typename F, typename I>

777requires (!std::same_as<std::decay_t<I>, AsyncTask>)

778auto Runtime::dependent_async(P&& params, F&& func, I first, I last) {

779 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);

780return _executor._dependent_async(

781 std::forward<P>(params), std::forward<F>(func), first, last, _node->_topology, _node

782 );

783}

784

785// ----------------------------------------------------------------------------

786// Executor Forward Declaration

787// ----------------------------------------------------------------------------

788

789// Procedure: _invoke_runtime_task

790inline bool Executor::_invoke_runtime_task(Worker& worker, Node* node) {

791return _invoke_runtime_task_impl(

792 worker, node, std::get_if<Node::Runtime>(&node->_handle)->work

793 );

794}

795

796// Function: _invoke_runtime_task_impl

797inline bool Executor::_invoke_runtime_task_impl(

798 Worker& worker, Node* node, std::function<void(Runtime&)>& work

799){

800// first time

801if((node->_nstate & NSTATE::PREEMPTED) == 0) {

802

803 Runtime rt(*this, worker, node);

804

805 node->_nstate |= (NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);

806

807 node->_join_counter.fetch_add(1, std::memory_order_release);

808

809 _observer_prologue(worker, node);

810 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {

811 work(rt);

812 });

813 _observer_epilogue(worker, node);

814

815// Last one to leave the runtime; no need to preempt this runtime.

816if(node->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {

817 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);

818 }

819// There are still child tasks running; need to preempt this runtime.

820// Here, we cannot let caller check the state from node->_nstate due to data race,

821// but return a stateless boolean to indicate preemption.

822// Ex: if preempted, another task may finish really quickly and insert this parent task

823// again into the scheduling queue. When running this parent task, it will jump to

824// else branch below and modify the nstate, thus incurring a data race.

825else {

826return true;

827 }

828 }

829// second time - previously preempted

830else {

831 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);

832 }

833return false;

834}

835

836// Function: _invoke_runtime_task_impl

837inline bool Executor::_invoke_runtime_task_impl(

838Worker& worker, Node* node, std::function<void(Runtime&, bool)>& work

839){

840

841 Runtime rt(*this, worker, node);

842

843// first time

844if((node->_nstate & NSTATE::PREEMPTED) == 0) {

845

846 node->_nstate |= (NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);

847 node->_join_counter.fetch_add(1, std::memory_order_release);

848

849 _observer_prologue(worker, node);

850 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {

851 work(rt, false);

852 });

853 _observer_epilogue(worker, node);

854

855// Last one to leave this runtime; no need to preempt this runtime

856if(node->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {

857 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);

858 }

859// Here, we cannot let caller check the state from node->_nstate due to data race,

860// but return a stateless boolean to indicate preemption.

861// Ex: if preempted, another task may finish really quickly and insert this parent task

862// again into the scheduling queue. When running this parent task, it will jump to

863// else branch below and modify the nstate, thus incurring a data race.

864else {

865return true;

866 }

867 }

868// second time - previously preempted

869else {

870 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);

871 }

872

873// clean up outstanding work (e.g., exception)

874 work(rt, true);

875

876return false;

877}

878

879// ------------------------------------------------------------------------------------------------

880// class: NonpreemptiveRuntime (internal use only)

881// ------------------------------------------------------------------------------------------------

882

886class NonpreemptiveRuntime {

887

888friend class Executor;

889

890public:

891

895void schedule(Task task);

896

897private:

898

902explicit NonpreemptiveRuntime(Executor& executor, Worker& worker) :

903 _executor {executor}, _worker {worker}{

904 }

905

909 Executor& _executor;

910

914 Worker& _worker;

915};

916

917// Procedure: schedule

918inline void NonpreemptiveRuntime::schedule(Task task) {

919

920auto node = task._node;

921// need to keep the invariant: when scheduling a task, the task must have

922// zero dependency (join counter is 0)

923// or we can encounter a bug when inserting a nested flow (e.g., module task)

924 node->_join_counter.store(0, std::memory_order_relaxed);

925

926auto& j = node->_parent ? node->_parent->_join_counter :

927 node->_topology->_join_counter;

928 j.fetch_add(1, std::memory_order_relaxed);

929 _executor._schedule(_worker, node);

930}

931

932// ----------------------------------------------------------------------------

933// Executor Forward Declaration

934// ----------------------------------------------------------------------------

935

936// Procedure: _invoke_nonpreemptive_runtime_task

937inline void Executor::_invoke_nonpreemptive_runtime_task(Worker& worker, Node* node) {

938 _observer_prologue(worker, node);

939 tf::NonpreemptiveRuntime nprt(*this, worker);

940 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {

941 std::get_if<Node::NonpreemptiveRuntime>(&node->_handle)->work(nprt);

942 });

943 _observer_epilogue(worker, node);

944}

945

946

947// Function: run_until

948template <typename P, typename C>

949tf::Future<void> Executor::run_until(Taskflow&& f, P&& p, C&& c) {

950

951// No need to create a real topology, but return a dummy future to keep the invariant.

952if(f.empty() || p()) {

953 c();

954 std::promise<void> promise;

955 promise.set_value();

956return tf::Future<void>(promise.get_future());

957 }

958

959 _increment_topology();

960

961auto g = std::make_unique<Taskflow>(std::move(f));

962

963// creates a topology for this run

964auto t = std::make_shared<Topology>(*g, std::forward<P>(p), std::forward<C>(c));

965//auto t = std::make_shared<DerivedTopology<P, C>>(*g, std::forward<P>(p), std::forward<C>(c));

966

967// need to create the future first, because the topology may be torn down quickly

968tf::Future<void> future(t->_promise.get_future(), t);

969

970// creates a silent-async that holds the taskflow

971silent_async([g=MoC{std::move(g)}, t](tf::Runtime& rt) mutable {

972 t->_parent = rt._node;

973 t->_parent->_join_counter.fetch_add(1, std::memory_order_release);

974if(g.object->_fetch_enqueue(t) == 0) {

975 rt._executor._schedule_graph(

976 rt._worker, g.object->_graph, t.get(), t.get()

977 );

978 }

979 });

980

981return future;

982}

983

984} // end of namespace tf -----------------------------------------------------

tf::AsyncTask

class to hold a dependent asynchronous task with shared ownership

Definition async_task.hpp:45

tf::DefaultTaskParams

class to create an empty task parameter for compile-time optimization

Definition graph.hpp:191

tf::Executor

class to create an executor

Definition executor.hpp:62

tf::Executor::silent_async

void silent_async(P &&params, F &&func)

similar to tf::Executor::async but does not return a future object

tf::Executor::run_until

tf::Future< void > run_until(Taskflow &taskflow, P &&pred)

runs a taskflow multiple times until the predicate becomes true

tf::Future

class to access the result of an execution

Definition taskflow.hpp:630

tf::Runtime

class to create a runtime task

Definition runtime.hpp:47

tf::Runtime::silent_dependent_async

tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)

runs the given function asynchronously when the given predecessors finish

Definition runtime.hpp:710

tf::Runtime::silent_async

void silent_async(F &&f)

runs the given function asynchronously without returning any future object

Definition runtime.hpp:671

tf::Runtime::is_cancelled

bool is_cancelled()

queries if this runtime task has been cancelled

Definition runtime.hpp:661

tf::Runtime::executor

Executor & executor()

obtains the running executor

Definition runtime.hpp:621

tf::Runtime::async

auto async(F &&f)

runs the given callable asynchronously

Definition runtime.hpp:690

tf::Runtime::schedule

void schedule(Task task)

schedules an active task immediately to the worker's queue

Definition runtime.hpp:631

tf::Runtime::corun

void corun()

corun all tasks spawned by this runtime with other workers

Definition runtime.hpp:646

tf::Runtime::dependent_async

auto dependent_async(F &&func, Tasks &&... tasks)

runs the given function asynchronously when the given predecessors finish

Definition runtime.hpp:754

tf::Runtime::worker

Worker & worker()

acquire a reference to the underlying worker

Definition runtime.hpp:626

tf::Runtime::corun_all

void corun_all()

equivalent to tf::Runtime::corun - just an alias for legacy purpose

Definition runtime.hpp:657

tf::Task

class to create a task handle over a taskflow node

Definition task.hpp:569

tf::Taskflow

class to create a taskflow object

Definition taskflow.hpp:64

tf::Worker

class to create a worker in an executor

Definition worker.hpp:55

tf::TaskParamsLike

determines if a type is a task parameter type

Definition graph.hpp:202

tf

taskflow namespace

Definition small_vector.hpp:20