docs/runtime_8hpp_source.html
| | Taskflow: A General-purpose Task-parallel Programming System |
Loading...
Searching...
No Matches
runtime.hpp
1#pragma once
2
3#include "executor.hpp"
4
5namespace tf {
6
7// ------------------------------------------------------------------------------------------------
8// class: Runtime
9// ------------------------------------------------------------------------------------------------
10
47class Runtime {
48
49friend class Executor;
50friend class FlowBuilder;
51friend class PreemptionGuard;
52friend class Algorithm;
53
54public:
55
71 Executor& executor();
72
77
119
120// ----------------------------------------------------------------------------------------------
121// async methods
122// ----------------------------------------------------------------------------------------------
123
156template <typename F>
157auto async(F&& f);
158
179template <typename P, typename F>
180auto async(P&& params, F&& f);
181
182// ----------------------------------------------------------------------------------------------
183// silent async methods
184// ----------------------------------------------------------------------------------------------
185
206template <typename F>
207void silent_async(F&& f);
208
225template <typename P, typename F>
226void silent_async(P&& params, F&& f);
227
228// ----------------------------------------------------------------------------------------------
229// dependent async methods
230// ----------------------------------------------------------------------------------------------
231
266template <typename F, typename... Tasks>
267requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
268auto dependent_async(F&& func, Tasks&&... tasks);
269
308template <TaskParamsLike P, typename F, typename... Tasks>
309requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
310auto dependent_async(P&& params, F&& func, Tasks&&... tasks);
311
349template <typename F, typename I>
350requires (!std::same_as<std::decay_t<I>, AsyncTask>)
351auto dependent_async(F&& func, I first, I last);
352
394template <TaskParamsLike P, typename F, typename I>
395requires (!std::same_as<std::decay_t<I>, AsyncTask>)
396auto dependent_async(P&& params, F&& func, I first, I last);
397
398// ----------------------------------------------------------------------------------------------
399// silent dependent async methods
400// ----------------------------------------------------------------------------------------------
401
429template <typename F, typename... Tasks>
430requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
431tf::AsyncTask silent_dependent_async(F&& func, Tasks&&... tasks);
432
464template <TaskParamsLike P, typename F, typename... Tasks>
465requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
466tf::AsyncTask silent_dependent_async(P&& params, F&& func, Tasks&&... tasks);
467
500template <typename F, typename I>
501requires (!std::same_as<std::decay_t<I>, AsyncTask>)
502tf::AsyncTask silent_dependent_async(F&& func, I first, I last);
503
538template <TaskParamsLike P, typename F, typename I>
539requires (!std::same_as<std::decay_t<I>, AsyncTask>)
540tf::AsyncTask silent_dependent_async(P&& params, F&& func, I first, I last);
541
542
543
544// ----------------------------------------------------------------------------------------------
545// cooperative execution methods
546// ----------------------------------------------------------------------------------------------
547
578void corun();
579
583void corun_all();
584
588bool is_cancelled();
589
590private:
591
595explicit Runtime(Executor&, Worker&, Node*);
596
600 Executor& _executor;
601
605Worker& _worker;
606
610 Node* _node;
611};
612
613// constructor
614inline Runtime::Runtime(Executor& executor, Worker& worker, Node* node) :
615 _executor {executor},
616 _worker {worker},
617 _node {node} {
618}
619
620// Function: executor
621inline Executor& Runtime::executor() {
622return _executor;
623}
624
625// Function: worker
626inline Worker& Runtime::worker() {
627return _worker;
628}
629
630// Procedure: schedule
631inline void Runtime::schedule(Task task) {
632
633auto node = task._node;
634// need to keep the invariant: when scheduling a task, the task must have
635// zero dependency (join counter is 0)
636// or we can encounter bug when inserting a nested flow (e.g., module task)
637 node->_join_counter.store(0, std::memory_order_relaxed);
638
639auto& j = node->_parent ? node->_parent->_join_counter :
640 node->_topology->_join_counter;
641 j.fetch_add(1, std::memory_order_relaxed);
642 _executor._schedule(_worker, node);
643}
644
645// Function: corun
646inline void Runtime::corun() {
647 {
648 ExplicitAnchorGuard anchor(_node);
649 _executor._corun_until(_worker, [this] () -> bool {
650return _node->_join_counter.load(std::memory_order_acquire) == 1;
651 });
652 }
653 _node->_rethrow_exception();
654}
655
656// Function: corun_all
657inline void Runtime::corun_all() {
658corun();
659}
660
661inline bool Runtime::is_cancelled() {
662return _node->_is_parent_cancelled();
663}
664
665// ------------------------------------------------------------------------------------------------
666// Runtime::silent_async
667// ------------------------------------------------------------------------------------------------
668
669// Function: silent_async
670template <typename F>
671void Runtime::silent_async(F&& f) {
672silent_async(DefaultTaskParams{}, std::forward<F>(f));
673}
674
675// Function: silent_async
676template <typename P, typename F>
677void Runtime::silent_async(P&& params, F&& f) {
678 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
679 _executor._silent_async(
680 std::forward<P>(params), std::forward<F>(f), _node->_topology, _node
681 );
682}
683
684// ------------------------------------------------------------------------------------------------
685// Runtime::async
686// ------------------------------------------------------------------------------------------------
687
688// Function: async
689template <typename F>
690auto Runtime::async(F&& f) {
691return async(DefaultTaskParams{}, std::forward<F>(f));
692}
693
694// Function: async
695template <typename P, typename F>
696auto Runtime::async(P&& params, F&& f) {
697 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
698return _executor._async(
699 std::forward<P>(params), std::forward<F>(f), _node->_topology, _node
700 );
701}
702
703// ------------------------------------------------------------------------------------------------
704// silent dependent async
705// ------------------------------------------------------------------------------------------------
706
707// Function: silent_dependent_async
708template <typename F, typename... Tasks>
709requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
710tf::AsyncTask Runtime::silent_dependent_async(F&& func, Tasks&&... tasks) {
711return silent_dependent_async(
712DefaultTaskParams{}, std::forward<F>(func), std::forward<Tasks>(tasks)...
713 );
714}
715
716// Function: silent_dependent_async
717template <TaskParamsLike P, typename F, typename... Tasks>
718requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
719tf::AsyncTask Runtime::silent_dependent_async(
720 P&& params, F&& func, Tasks&&... tasks
721){
722 std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
723return silent_dependent_async(
724 std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
725 );
726}
727
728// Function: silent_dependent_async
729template <typename F, typename I>
730requires (!std::same_as<std::decay_t<I>, AsyncTask>)
731tf::AsyncTask Runtime::silent_dependent_async(F&& func, I first, I last) {
732return silent_dependent_async(DefaultTaskParams{}, std::forward<F>(func), first, last);
733}
734
735// Function: silent_dependent_async
736template <TaskParamsLike P, typename F, typename I>
737requires (!std::same_as<std::decay_t<I>, AsyncTask>)
738tf::AsyncTask Runtime::silent_dependent_async(
739 P&& params, F&& func, I first, I last
741 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
742return _executor._silent_dependent_async(
743 std::forward<P>(params), std::forward<F>(func), first, last, _node->_topology, _node
744 );
745}
746
747// ------------------------------------------------------------------------------------------------
748// dependent async
749// ------------------------------------------------------------------------------------------------
750
751// Function: dependent_async
752template <typename F, typename... Tasks>
753requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
754auto Runtime::dependent_async(F&& func, Tasks&&... tasks) {
755return dependent_async(DefaultTaskParams{}, std::forward<F>(func), std::forward<Tasks>(tasks)...);
756}
757
758// Function: dependent_async
759template <TaskParamsLike P, typename F, typename... Tasks>
760requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)
761auto Runtime::dependent_async(P&& params, F&& func, Tasks&&... tasks) {
762 std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
763return dependent_async(
764 std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
765 );
766}
767
768// Function: dependent_async
769template <typename F, typename I>
770requires (!std::same_as<std::decay_t<I>, AsyncTask>)
771auto Runtime::dependent_async(F&& func, I first, I last) {
772return dependent_async(DefaultTaskParams{}, std::forward<F>(func), first, last);
773}
774
775// Function: dependent_async
776template <TaskParamsLike P, typename F, typename I>
777requires (!std::same_as<std::decay_t<I>, AsyncTask>)
778auto Runtime::dependent_async(P&& params, F&& func, I first, I last) {
779 _node->_join_counter.fetch_add(1, std::memory_order_relaxed);
780return _executor._dependent_async(
781 std::forward<P>(params), std::forward<F>(func), first, last, _node->_topology, _node
782 );
783}
784
785// ----------------------------------------------------------------------------
786// Executor Forward Declaration
787// ----------------------------------------------------------------------------
788
789// Procedure: _invoke_runtime_task
790inline bool Executor::_invoke_runtime_task(Worker& worker, Node* node) {
791return _invoke_runtime_task_impl(
792 worker, node, std::get_if<Node::Runtime>(&node->_handle)->work
793 );
794}
795
796// Function: _invoke_runtime_task_impl
797inline bool Executor::_invoke_runtime_task_impl(
798 Worker& worker, Node* node, std::function<void(Runtime&)>& work
800// first time
801if((node->_nstate & NSTATE::PREEMPTED) == 0) {
802
803 Runtime rt(*this, worker, node);
804
805 node->_nstate |= (NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
806
807 node->_join_counter.fetch_add(1, std::memory_order_release);
808
809 _observer_prologue(worker, node);
810 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
811 work(rt);
812 });
813 _observer_epilogue(worker, node);
814
815// Last one to leave the runtime; no need to preempt this runtime.
816if(node->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
817 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
818 }
819// There are still child tasks running; need to preempt this runtime.
820// Here, we cannot let caller check the state from node->_nstate due to data race,
821// but return a stateless boolean to indicate preemption.
822// Ex: if preempted, another task may finish real quck and insert this parent task
823// again into the scheduling queue. When running this parent task, it will jump to
824// else branch below and modify tne nstate, thus incuring data race.
825else {
826return true;
827 }
828 }
829// second time - previously preempted
830else {
831 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
832 }
833return false;
834}
835
836// Function: _invoke_runtime_task_impl
837inline bool Executor::_invoke_runtime_task_impl(
838Worker& worker, Node* node, std::function<void(Runtime&, bool)>& work
840
841 Runtime rt(*this, worker, node);
842
843// first time
844if((node->_nstate & NSTATE::PREEMPTED) == 0) {
845
846 node->_nstate |= (NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
847 node->_join_counter.fetch_add(1, std::memory_order_release);
848
849 _observer_prologue(worker, node);
850 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
851 work(rt, false);
852 });
853 _observer_epilogue(worker, node);
854
855// Last one to leave this runtime; no need to preempt this runtime
856if(node->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
857 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
858 }
859// Here, we cannot let caller check the state from node->_nstate due to data race,
860// but return a stateless boolean to indicate preemption.
861// Ex: if preempted, another task may finish real quck and insert this parent task
862// again into the scheduling queue. When running this parent task, it will jump to
863// else branch below and modify tne nstate, thus incuring data race.
864else {
865return true;
866 }
867 }
868// second time - previously preempted
869else {
870 node->_nstate &= ~(NSTATE::PREEMPTED | NSTATE::IMPLICITLY_ANCHORED);
871 }
872
873// clean up outstanding work (e.g., exception)
874 work(rt, true);
875
876return false;
877}
878
879// ------------------------------------------------------------------------------------------------
880// class: NonpreemptiveRuntime (internal use only)
881// ------------------------------------------------------------------------------------------------
882
886class NonpreemptiveRuntime {
887
888friend class Executor;
889
890public:
891
895void schedule(Task task);
896
897private:
898
902explicit NonpreemptiveRuntime(Executor& executor, Worker& worker) :
903 _executor {executor}, _worker {worker}{
904 }
905
909 Executor& _executor;
910
914 Worker& _worker;
915};
916
917// Procedure: schedule
918inline void NonpreemptiveRuntime::schedule(Task task) {
919
920auto node = task._node;
921// need to keep the invariant: when scheduling a task, the task must have
922// zero dependency (join counter is 0)
923// or we can encounter bug when inserting a nested flow (e.g., module task)
924 node->_join_counter.store(0, std::memory_order_relaxed);
925
926auto& j = node->_parent ? node->_parent->_join_counter :
927 node->_topology->_join_counter;
928 j.fetch_add(1, std::memory_order_relaxed);
929 _executor._schedule(_worker, node);
930}
931
932// ----------------------------------------------------------------------------
933// Executor Forward Declaration
934// ----------------------------------------------------------------------------
935
936// Procedure: _invoke_nonpreemptive_runtime_task
937inline void Executor::_invoke_nonpreemptive_runtime_task(Worker& worker, Node* node) {
938 _observer_prologue(worker, node);
939 tf::NonpreemptiveRuntime nprt(*this, worker);
940 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {
941 std::get_if<Node::NonpreemptiveRuntime>(&node->_handle)->work(nprt);
942 });
943 _observer_epilogue(worker, node);
944}
945
946
947// Function: run_until
948template <typename P, typename C>
949tf::Future<void> Executor::run_until(Taskflow&& f, P&& p, C&& c) {
950
951// No need to create a real topology but returns an dummy future for invariant.
952if(f.empty() || p()) {
953 c();
954 std::promise<void> promise;
955 promise.set_value();
956return tf::Future<void>(promise.get_future());
957 }
958
959 _increment_topology();
960
961auto g = std::make_unique<Taskflow>(std::move(f));
962
963// creates a topology for this run
964auto t = std::make_shared<Topology>(*g, std::forward<P>(p), std::forward<C>(c));
965//auto t = std::make_shared<DerivedTopology<P, C>>(*g, std::forward<P>(p), std::forward<C>(c));
966
967// need to create future before the topology got torn down quickly
968tf::Future<void> future(t->_promise.get_future(), t);
969
970// creates a silent-async that holds the taskflow
971silent_async([g=MoC{std::move(g)}, t](tf::Runtime& rt) mutable {
972 t->_parent = rt._node;
973 t->_parent->_join_counter.fetch_add(1, std::memory_order_release);
974if(g.object->_fetch_enqueue(t) == 0) {
975 rt._executor._schedule_graph(
976 rt._worker, g.object->_graph, t.get(), t.get()
977 );
978 }
979 });
980
981return future;
982}
983
984} // end of namespace tf -----------------------------------------------------
class to hold a dependent asynchronous task with shared ownership
Definition async_task.hpp:45
class to create an empty task parameter for compile-time optimization
Definition graph.hpp:191
class to create an executor
Definition executor.hpp:62
void silent_async(P &¶ms, F &&func)
similar to tf::Executor::async but does not return a future object
tf::Future< void > run_until(Taskflow &taskflow, P &&pred)
runs a taskflow multiple times until the predicate becomes true
class to access the result of an execution
Definition taskflow.hpp:630
class to create a runtime task
Definition runtime.hpp:47
tf::Runtime::silent_dependent_async
tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
Definition runtime.hpp:710
void silent_async(F &&f)
runs the given function asynchronously without returning any future object
Definition runtime.hpp:671
bool is_cancelled()
queries if this runtime task has been cancelled
Definition runtime.hpp:661
Executor & executor()
obtains the running executor
Definition runtime.hpp:621
auto async(F &&f)
runs the given callable asynchronously
Definition runtime.hpp:690
void schedule(Task task)
schedules an active task immediately to the worker's queue
Definition runtime.hpp:631
void corun()
corun all tasks spawned by this runtime with other workers
Definition runtime.hpp:646
auto dependent_async(F &&func, Tasks &&... tasks)
runs the given function asynchronously when the given predecessors finish
Definition runtime.hpp:754
Worker & worker()
acquire a reference to the underlying worker
Definition runtime.hpp:626
void corun_all()
equivalent to tf::Runtime::corun - just an alias for legacy purpose
Definition runtime.hpp:657
class to create a task handle over a taskflow node
Definition task.hpp:569
class to create a taskflow object
Definition taskflow.hpp:64
class to create a worker in an executor
Definition worker.hpp:55
determines if a type is a task parameter type
Definition graph.hpp:202
taskflow namespace
Definition small_vector.hpp:20