Back to Taskflow

Taskflow: A General

docs/executor_8hpp_source.html

4.1.0 | 72.9 KB
Original Source

| | Taskflow: A General-purpose Task-parallel Programming System |

Loading...

Searching...

No Matches

executor.hpp

1#pragma once

2

3#include "../observer/tfprof.hpp"

4#include "taskflow.hpp"

5#include "async_task.hpp"

6

11

12namespace tf {

13

14// ----------------------------------------------------------------------------

15// Executor Definition

16// ----------------------------------------------------------------------------

17

62class Executor {

63

64friend class FlowBuilder;

65friend class Subflow;

66friend class Runtime;

67friend class NonpreemptiveRuntime;

68friend class Algorithm;

69friend class TaskGroup;

70

71public:

72

91explicit Executor(

92size_t N = std::thread::hardware_concurrency(),

93 std::shared_ptr<WorkerInterface> wif = nullptr

94 );

95

103~Executor();

104

123tf::Future<void> run(Taskflow& taskflow);

124

144tf::Future<void> run(Taskflow&& taskflow);

145

168template<typename C>

169tf::Future<void> run(Taskflow& taskflow, C&& callable);

170

195template<typename C>

196tf::Future<void> run(Taskflow&& taskflow, C&& callable);

197

217tf::Future<void> run_n(Taskflow& taskflow, size_t N);

218

241tf::Future<void> run_n(Taskflow&& taskflow, size_t N);

242

268template<typename C>

269tf::Future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);

270

296template<typename C>

297tf::Future<void> run_n(Taskflow&& taskflow, size_t N, C&& callable);

298

322template<typename P>

323tf::Future<void> run_until(Taskflow& taskflow, P&& pred);

324

350template<typename P>

351tf::Future<void> run_until(Taskflow&& taskflow, P&& pred);

352

379template<typename P, typename C>

380tf::Future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);

381

410template<typename P, typename C>

411tf::Future<void> run_until(Taskflow&& taskflow, P&& pred, C&& callable);

412

453template <typename T>

454void corun(T& target);

455

484template <typename P>

485void corun_until(P&& predicate);

486

500void wait_for_all();

501

512size_t num_workers() const noexcept;

513

520size_t num_waiters() const noexcept;

521

525size_t num_queues() const noexcept;

526

540size_t num_topologies() const;

541

558Worker* this_worker();

559

577int this_worker_id() const;

578

579// --------------------------------------------------------------------------

580// Observer methods

581// --------------------------------------------------------------------------

582

600 template <typename Observer, typename... ArgsT>

601 std::shared_ptr<Observer> make_observer(ArgsT&&... args);

602

608 template <typename Observer>

609void remove_observer(std::shared_ptr<Observer> observer);

610

614size_t num_observers() const noexcept;

615

616// --------------------------------------------------------------------------

617// Async Task Methods

618// --------------------------------------------------------------------------

619

645 template <typename P, typename F>

646 auto async(P&& params, F&& func);

647

671 template <typename F>

672 auto async(F&& func);

673

697 template <typename P, typename F>

698void silent_async(P&& params, F&& func);

699

722 template <typename F>

723void silent_async(F&& func);

724

725// --------------------------------------------------------------------------

726// Silent Dependent Async Methods

727// --------------------------------------------------------------------------

728

756 template <typename F, typename... Tasks>

757requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

758tf::AsyncTask silent_dependent_async(F&& func, Tasks&&... tasks);

759

791 template <TaskParamsLike P, typename F, typename... Tasks>

792 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

793tf::AsyncTask silent_dependent_async(P&& params, F&& func, Tasks&&... tasks);

794

827 template <typename F, typename I>

828requires (!std::same_as<std::decay_t<I>, AsyncTask>)

829tf::AsyncTask silent_dependent_async(F&& func, I first, I last);

830

865 template <TaskParamsLike P, typename F, typename I>

866 requires (!std::same_as<std::decay_t<I>, AsyncTask>)

867tf::AsyncTask silent_dependent_async(P&& params, F&& func, I first, I last);

868

869// --------------------------------------------------------------------------

870// Dependent Async Methods

871// --------------------------------------------------------------------------

872

910 template <typename F, typename... Tasks>

911requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

912 auto dependent_async(F&& func, Tasks&&... tasks);

913

955 template <TaskParamsLike P, typename F, typename... Tasks>

956 requires (std::same_as<std::decay_t<Tasks>, AsyncTask> && ...)

957 auto dependent_async(P&& params, F&& func, Tasks&&... tasks);

958

999 template <typename F, typename I>

1000requires (!std::same_as<std::decay_t<I>, AsyncTask>)

1001 auto dependent_async(F&& func, I first, I last);

1002

1047 template <TaskParamsLike P, typename F, typename I>

1048 requires (!std::same_as<std::decay_t<I>, AsyncTask>)

1049 auto dependent_async(P&& params, F&& func, I first, I last);

1050

1051// ----------------------------------------------------------------------------------------------

1052// Task Group

1053// ----------------------------------------------------------------------------------------------

1054

1100 TaskGroup task_group();

1101

1102 private:

1103

1104 struct Buffer {

1105 std::mutex mutex;

1106UnboundedWSQ<Node*> queue;

1107 };

1108

1109 std::vector<Worker> _workers;

1110 std::vector<Buffer> _buffers;

1111

1112// notifier's state variable and num_topologies should sit on different cachelines

1113// or the false sharing can cause serious performance drop

1114alignas(TF_CACHELINE_SIZE) DefaultNotifier _notifier;

1115alignas(TF_CACHELINE_SIZE) std::atomic<size_t> _num_topologies {0};

1116

1117 std::unordered_map<std::thread::id, Worker*> _t2w;

1118 std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;

1119

1120void _shutdown();

1121void _observer_prologue(Worker&, Node*);

1122void _observer_epilogue(Worker&, Node*);

1123void _spawn(size_t, std::shared_ptr<WorkerInterface>);

1124void _exploit_task(Worker&, Node*&);

1125bool _explore_task(Worker&, Node*&);

1126void _schedule(Worker&, Node*);

1127void _schedule(Node*);

1128void _schedule_graph(Worker&, Graph&, Topology*, NodeBase*);

1129void _spill(Node*);

1130void _set_up_topology(Worker*, Topology*);

1131void _tear_down_topology(Worker&, Topology*, Node*&);

1132void _tear_down_async(Worker&, Node*, Node*&);

1133void _tear_down_dependent_async(Worker&, Node*, Node*&);

1134void _tear_down_nonasync(Worker&, Node*, Node*&);

1135void _tear_down_invoke(Worker&, Node*, Node*&);

1136void _increment_topology();

1137void _decrement_topology();

1138void _invoke(Worker&, Node*);

1139void _invoke_static_task(Worker&, Node*);

1140void _invoke_nonpreemptive_runtime_task(Worker&, Node*);

1141void _invoke_condition_task(Worker&, Node*, SmallVector<int>&);

1142void _invoke_multi_condition_task(Worker&, Node*, SmallVector<int>&);

1143void _process_dependent_async(Node*, tf::AsyncTask&, size_t&);

1144void _process_exception(Worker&, Node*);

1145void _update_cache(Worker&, Node*&, Node*);

1146void _corun_graph(Worker&, Graph&, Topology*, NodeBase*);

1147

1148bool _wait_for_task(Worker&, Node*&);

1149bool _invoke_subflow_task(Worker&, Node*);

1150bool _invoke_module_task(Worker&, Node*);

1151bool _invoke_adopted_module_task(Worker&, Node*);

1152bool _invoke_module_task_impl(Worker&, Node*, Graph&);

1153bool _invoke_async_task(Worker&, Node*);

1154bool _invoke_dependent_async_task(Worker&, Node*);

1155bool _invoke_runtime_task(Worker&, Node*);

1156bool _invoke_runtime_task_impl(Worker&, Node*, std::function<void(Runtime&)>&);

1157bool _invoke_runtime_task_impl(Worker&, Node*, std::function<void(Runtime&, bool)>&);

1158

1159size_t _set_up_graph(Graph&, Topology*, NodeBase*);

1160

1161template <typename P>

1162void _corun_until(Worker&, P&&);

1163

1164template <typename I>

1165void _bulk_schedule(Worker&, I, size_t);

1166

1167template <typename I>

1168void _bulk_schedule(I, size_t);

1169

1170template <typename I>

1171void _bulk_spill(I, size_t);

1172

1173template <typename I>

1174void _bulk_spill_round_robin(I, size_t);

1175

1176template <size_t N>

1177void _bulk_update_cache(Worker&, Node*&, Node*, std::array<Node*, N>&, size_t&);

1178

1179template <typename P, typename F>

1180auto _async(P&&, F&&, Topology*, NodeBase*);

1181

1182template <typename P, typename F>

1183void _silent_async(P&&, F&&, Topology*, NodeBase*);

1184

1185template <TaskParamsLike P, typename F, typename I>

1186requires (!std::same_as<std::decay_t<I>, AsyncTask>)

1187auto _dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);

1188

1189template <TaskParamsLike P, typename F, typename I>

1190requires (!std::same_as<std::decay_t<I>, AsyncTask>)

1191auto _silent_dependent_async(P&&, F&&, I, I, Topology*, NodeBase*);

1192

1193template <typename... ArgsT>

1194void _schedule_async_task(ArgsT&&...);

1195

1196template <typename I, typename... ArgsT>

1197 AsyncTask _schedule_dependent_async_task(I, I, size_t, ArgsT&&...);

1198};

1199

1200#ifndef DOXYGEN_GENERATING_OUTPUT

1201

1202// Constructor

1203inline Executor::Executor(size_t N, std::shared_ptr<WorkerInterface> wif) :

1204 _workers (N),

1205 _buffers (std::bit_width(N)), // Empirically, we find that log2(N) performs best.

1206 _notifier (N) {

1207

1208if(N == 0) {

1209 TF_THROW("executor must define at least one worker");

1210 }

1211

1212// If spawning N threads fails, shut down any created threads before

1213// rethrowing the exception.

1214#ifndef TF_DISABLE_EXCEPTION_HANDLING

1215try {

1216#endif

1217 _spawn(N, std::move(wif));

1218#ifndef TF_DISABLE_EXCEPTION_HANDLING

1219 }

1220catch(...) {

1221 _shutdown();

1222 std::rethrow_exception(std::current_exception());

1223 }

1224#endif

1225

1226// initialize the default observer if requested

1227if(has_env(TF_ENABLE_PROFILER)) {

1228 TFProfManager::get()._manage(make_observer<TFProfObserver>());

1229 }

1230}

1231

1232// Destructor

1233inline Executor::~Executor() {

1234 _shutdown();

1235}

1236

1237// Function: _shutdown

1238inline void Executor::_shutdown() {

1239

1240// wait for all topologies to complete

1241wait_for_all();

1242

1243// shut down the scheduler

1244for(size_t i=0; i<_workers.size(); ++i) {

1245 _workers[i]._done.test_and_set(std::memory_order_relaxed);

1246 }

1247

1248 _notifier.notify_all();

1249

1250// Only join the thread if it is joinable, as std::thread construction

1251// may fail and throw an exception.

1252for(auto& w : _workers) {

1253if(w._thread.joinable()) {

1254 w._thread.join();

1255 }

1256 }

1257}

1258

1259// Function: num_workers

1260inline size_t Executor::num_workers() const noexcept {

1261return _workers.size();

1262}

1263

1264// Function: num_waiters

1265inline size_t Executor::num_waiters() const noexcept {

1266return _notifier.num_waiters();

1267}

1268

1269// Function: num_queues

1270inline size_t Executor::num_queues() const noexcept {

1271return _workers.size() + _buffers.size();

1272}

1273

1274// Function: num_topologies

1275inline size_t Executor::num_topologies() const {

1276return _num_topologies.load(std::memory_order_relaxed);

1277}

1278

1279// Function: this_worker

1280inline Worker* Executor::this_worker() {

1281auto itr = _t2w.find(std::this_thread::get_id());

1282return itr == _t2w.end() ? nullptr : itr->second;

1283}

1284

1285// Function: this_worker_id

1286inline int Executor::this_worker_id() const {

1287auto i = _t2w.find(std::this_thread::get_id());

1288return i == _t2w.end() ? -1 : static_cast<int>(i->second->_id);

1289}

1290

1291// Procedure: _spawn

1292inline void Executor::_spawn(size_t N, std::shared_ptr<WorkerInterface> wif) {

1293

1294for(size_t id=0; id<N; ++id) {

1295 _workers[id]._thread = std::thread([&, id, wif] () {

1296

1297auto& worker = _workers[id];

1298

1299 worker._id = id;

1300 worker._sticky_victim = id;

1301 worker._rdgen.seed(static_cast<uint32_t>(std::hash<std::thread::id>()(std::this_thread::get_id())));

1302

1303// before entering the work-stealing loop, call the scheduler prologue

1304if(wif) {

1305 wif->scheduler_prologue(worker);

1306 }

1307

1308 Node* t = nullptr;

1309 std::exception_ptr ptr = nullptr;

1310

1311// must use 1 as condition instead of !done because

1312// the previous worker may stop while the following workers

1313// are still preparing for entering the scheduling loop

1314#ifndef TF_DISABLE_EXCEPTION_HANDLING

1315try {

1316#endif

1317// work-stealing loop

1318while(1) {

1319

1320// drains out the local queue first

1321 _exploit_task(worker, t);

1322

1323// steals and waits for tasks

1324if(_wait_for_task(worker, t) == false) {

1325break;

1326 }

1327 }

1328

1329#ifndef TF_DISABLE_EXCEPTION_HANDLING

1330 }

1331catch(...) {

1332 ptr = std::current_exception();

1333 }

1334#endif

1335

1336// call the user-specified epilogue function

1337if(wif) {

1338 wif->scheduler_epilogue(worker, ptr);

1339 }

1340

1341 });

1342

1343// We avoid using thread-local storage to track the mapping between a thread

1344// and its corresponding worker in an executor. On Windows, thread-local

1345// storage can be unreliable in certain situations (see issue #727).

1346//

1347// Instead, we maintain a per-executor mapping from threads to workers.

1348// This approach has an additional advantage: according to the C++ Standard,

1349// std::thread::id uniquely identifies a thread object. Therefore, once the map

1350// returns a valid worker, we can be certain that the worker belongs to this

1351// executor. This eliminates the need for additional executor validation

1352// required by using thread-local storage.

1353//

1354// Example:

1355//

1356// Worker* w = this_worker();

1357// // Using thread-local storage, we would need additional executor validation:

1358// if (w == nullptr || w->_executor != this) { /* caller is not a worker of this executor */ }

1359//

1360// // Using per-executor mapping, it suffices to check:

1361// if (w == nullptr) { /* caller is not a worker of this executor */ }

1362//

1363 _t2w.emplace(_workers[id]._thread.get_id(), &_workers[id]);

1364 }

1365}

1366

1367// Function: _explore_task

1368inline bool Executor::_explore_task(Worker& w, Node*& t) {

1369

1370// Fast path: if no topologies are live, all queues are guaranteed empty

1371// by the executor's invariant (num_topologies reaches zero only after all

1372// nodes have been scheduled and their queues flushed). Skip the entire

1373// steal loop and return immediately so the caller enters _wait_for_task

1374// to sleep. relaxed ordering is sufficient — this is a hint, and any

1375// missed update is caught safely by the 2PC guard in _wait_for_task.

1376if(_num_topologies.load(std::memory_order_relaxed) == 0) {

1377return true;

1378 }

1379

1380const size_t MAX_VICTIM = num_queues(); // guaranteed >= 2 by constructor

1381const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);

1382

1383// local aliases for steal protocol sentinels — these are properties of the

1384// steal protocol, not of any specific queue type

1385size_t num_steals = 0;

1386size_t vtm = w._sticky_victim;

1387

1388while(true) {

1389

1390 t = (vtm < _workers.size())

1391 ? _workers[vtm]._wsq.steal()

1392 : _buffers[vtm - _workers.size()].queue.steal();

1393

1394if(t) {

1395 w._sticky_victim = vtm;

1396break;

1397 }

1398

1399// EMPTY: pick a new victim excluding self since our own queue is likely empty.

1400// map [0, MAX_VICTIM-1) over [0, MAX_VICTIM) \ {w._id} — always safe since MAX_VICTIM >= 2.

1401 vtm = w._rdgen() % (MAX_VICTIM - 1);

1402if(vtm >= w._id) vtm++;

1403

1404if(++num_steals > MAX_STEALS) {

1405 std::this_thread::yield();

1406if(num_steals > 150 + MAX_STEALS) {

1407break;

1408 }

1409 }

1410

1411if(w._done.test(std::memory_order_relaxed)) {

1412return false;

1413 }

1414 }

1415

1416return true;

1417}

1418

1419/*

1420// Function: _explore_task

1421inline bool Executor::_explore_task(Worker& w, Node*& t) {

1422

1423 // Fast path: if no topologies are live, all queues are guaranteed empty

1424 // by the executor's invariant (num_topologies reaches zero only after all

1425 // nodes have been scheduled and their queues flushed). Skip the entire

1426 // steal loop and return immediately so the caller enters _wait_for_task

1427 // to sleep. relaxed ordering is sufficient — this is a hint, and any

1428 // missed update is caught safely by the 2PC guard in _wait_for_task.

1429 if(_num_topologies.load(std::memory_order_relaxed) == 0) {

1430 return true;

1431 }

1432

1433 //assert(!t);

1434 const size_t MAX_VICTIM = num_queues();

1435 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);

1436

1437 size_t num_steals = 0;

1438 size_t vtm = w._sticky_victim;

1439

1440 // Make the worker steal immediately from the assigned victim.

1441 while(true) {

1442

1443 // If the worker's victim thread is within the worker pool, steal from the worker's queue.

1444 // Otherwise, steal from the buffer, adjusting the victim index based on the worker pool size.

1445 t = (vtm < _workers.size())

1446 ? _workers[vtm]._wsq.steal()

1447 : _buffers[vtm - _workers.size()].queue.steal();

1448

1449 if(t) {

1450 w._sticky_victim = vtm;

1451 break;

1452 }

1453

1454 // Increment the steal count, and if it exceeds MAX_STEALS, yield the thread.

1455 // If the number of empty steals reaches MAX_STEALS, exit the loop.

1456 if (++num_steals > MAX_STEALS) {

1457 std::this_thread::yield();

1458 if(num_steals > 150 + MAX_STEALS) {

1459 break;

1460 }

1461 }

1462

1463 if(w._done.test(std::memory_order_relaxed)) {

1464 return false;

1465 }

1466

1467 // Randomly generate the next victim.

1468 vtm = w._rdgen() % MAX_VICTIM;

1469 }

1470 return true;

1471}

1472*/

1473

1474// Procedure: _exploit_task

1475inline void Executor::_exploit_task(Worker& w, Node*& t) {

1476while(t) {

1477 _invoke(w, t);

1478 t = w._wsq.pop();

1479 }

1480}

1481

1482// Function: _wait_for_task

1483inline bool Executor::_wait_for_task(Worker& w, Node*& t) {

1484

1485 explore_task:

1486

1487if(_explore_task(w, t) == false) {

1488return false;

1489 }

1490

1491// Go exploit the task if we successfully steal one.

1492if(t) {

1493return true;

1494 }

1495

1496// Entering the 2PC guard as all queues are likely empty after many stealing attempts.

1497 _notifier.prepare_wait(w._id);

1498

1499// Fast path: if no topologies are live, all queues are guaranteed empty.

1500// Skip the O(N) buffer and worker queue scans and go directly to sleep.

1501// This is safe because prepare_wait has already been called — any notify

1502// that arrives after this check but before commit_wait will be caught by

1503// the 2PC guarantee of the notifier.

1504if(_num_topologies.load(std::memory_order_relaxed) == 0) {

1505// still check done flag before committing to sleep

1506if(w._done.test(std::memory_order_relaxed)) {

1507 _notifier.cancel_wait(w._id);

1508return false;

1509 }

1510 _notifier.commit_wait(w._id);

1511goto explore_task;

1512 }

1513

1514// Condition #1: buffers should be empty

1515for(size_t b=0; b<_buffers.size(); ++b) {

1516if(!_buffers[b].queue.empty()) {

1517 _notifier.cancel_wait(w._id);

1518 w._sticky_victim = b + _workers.size();

1519goto explore_task;

1520 }

1521 }

1522

1523// Condition #2: worker queues should be empty

1524// Note: We need to use index-based looping to avoid data race with _spawn

1525// which initializes other worker data structure at the same time.

1526// Also, due to the property of a work-stealing queue, we don't need to check

1527// this worker's work-stealing queue.

1528for(size_t k=0; k<_workers.size()-1; ++k) {

1529if(size_t vtm = k + (k >= w._id); !_workers[vtm]._wsq.empty()) {

1530 _notifier.cancel_wait(w._id);

1531 w._sticky_victim = vtm;

1532goto explore_task;

1533 }

1534 }

1535

1536// Condition #3: worker should be alive

1537if(w._done.test(std::memory_order_relaxed)) {

1538 _notifier.cancel_wait(w._id);

1539return false;

1540 }

1541

1542// Now I really need to relinquish myself to others.

1543 _notifier.commit_wait(w._id);

1544goto explore_task;

1545}

1546

1547// Function: make_observer

1548template<typename Observer, typename... ArgsT>

1549std::shared_ptr<Observer> Executor::make_observer(ArgsT&&... args) {

1550

1551static_assert(

1552 std::is_base_of_v<ObserverInterface, Observer>,

1553"Observer must be derived from ObserverInterface"

1554 );

1555

1556// use a local variable to mimic the constructor

1557auto ptr = std::make_shared<Observer>(std::forward<ArgsT>(args)...);

1558

1559 ptr->set_up(_workers.size());

1560

1561 _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));

1562

1563return ptr;

1564}

1565

1566// Procedure: remove_observer

1567template <typename Observer>

1568void Executor::remove_observer(std::shared_ptr<Observer> ptr) {

1569

1570static_assert(

1571 std::is_base_of_v<ObserverInterface, Observer>,

1572"Observer must be derived from ObserverInterface"

1573 );

1574

1575 _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));

1576}

1577

1578// Function: num_observers

1579inline size_t Executor::num_observers() const noexcept {

1580return _observers.size();

1581}

1582

1583// Procedure: _spill

1584inline void Executor::_spill(Node* item) {

1585// Since pointers are aligned to 8 bytes, we perform a simple hash to avoid

1586// contention caused by hashing to the same slot.

1587auto b = (reinterpret_cast<uintptr_t>(item) >> 16) % _buffers.size();

1588 std::scoped_lock lock(_buffers[b].mutex);

1589 _buffers[b].queue.push(item);

1590}

1591

1592// Procedure: _bulk_spill (single batch to one buffer)

1593// Uses Knuth multiplicative hash on the first pointer to select a buffer,

1594// providing better bit diffusion than the shift-based approach, especially

1595// when the allocator returns pointers with regular low-bit patterns.

1596template <typename I>

1597void Executor::_bulk_spill(I first, size_t N) {

1598//assert(N != 0);

1599auto b = ((reinterpret_cast<uintptr_t>(*first) * 2654435761ULL) >> 32) % _buffers.size();

1600 std::scoped_lock lock(_buffers[b].mutex);

1601 _buffers[b].queue.bulk_push(first, N);

1602}

1603

1604// Procedure: _bulk_spill

1605// Distributes a batch of N spilled nodes across all buffers in round-robin

1606// order starting from a hash of the first node's pointer. Each buffer's lock

1607// is held only for its chunk, reducing contention compared to sending the

1608// entire batch to a single buffer.

1609template <typename I>

1610void Executor::_bulk_spill_round_robin(I first, size_t N) {

1611

1612// assert(N != 0);

1613const size_t B = _buffers.size();

1614const size_t start = ((reinterpret_cast<uintptr_t>(*first) * 2654435761ULL) >> 32) % B;

1615const size_t per_buf = (N + B - 1) / B;

1616size_t remaining = N;

1617for(size_t i = 0; i < B && remaining > 0; ++i) {

1618size_t b = (start + i) % B;

1619size_t chunk = std::min(per_buf, remaining);

1620 {

1621 std::scoped_lock lock(_buffers[b].mutex);

1622 _buffers[b].queue.bulk_push(first, chunk);

1623 }

1624// terminates early via remaining > 0, so we don't acquire unnecessary locks on empty chunks.

1625 remaining -= chunk;

1626 }

1627}

1628

1629// Procedure: _schedule

1630inline void Executor::_schedule(Worker& worker, Node* node) {

1631// starting at v3.5 we do not use any complicated notification mechanism

1632// as the experimental result has shown no significant advantage.

1633if(worker._wsq.try_push(node) == false) {

1634 _spill(node);

1635 }

1636 _notifier.notify_one();

1637}

1638

1639// Procedure: _schedule

1640inline void Executor::_schedule(Node* node) {

1641 _spill(node);

1642 _notifier.notify_one();

1643}

1644

1645// Procedure: _schedule

1646template <typename I>

1647void Executor::_bulk_schedule(Worker& worker, I first, size_t num_nodes) {

1648

1649if(num_nodes == 0) {

1650return;

1651 }

1652

1653// NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)).

1654// This is because when a node v is inserted into the queue, v can run and finish

1655// immediately. If v is the last node in the graph, it will tear down the parent task vector

1656// which cause the last ++first to fail. This problem is specific to MSVC which has a stricter

1657// iterator implementation in std::vector than GCC/Clang.

1658if(auto n = worker._wsq.try_bulk_push(first, num_nodes); n != num_nodes) {

1659 _bulk_spill(first, num_nodes - n);

1660 }

1661 _notifier.notify_n(num_nodes);

1662

1663// notify first before spilling to hopefully wake up workers earlier

1664// however, the experiment does not show any benefit for doing this.

1665//auto n = worker._wsq.try_bulk_push(first, num_nodes);

1666//_notifier.notify_n(n);

1667//_bulk_schedule(first + n, num_nodes - n);

1668}

1669

1670// Procedure: _schedule

1671template <typename I>

1672inline void Executor::_bulk_schedule(I first, size_t num_nodes) {

1673

1674if(num_nodes == 0) {

1675return;

1676 }

1677

1678// NOTE: We cannot use first/last in the for-loop (e.g., for(; first != last; ++first)).

1679// This is because when a node v is inserted into the queue, v can run and finish

1680// immediately. If v is the last node in the graph, it will tear down the parent task vector

1681// which cause the last ++first to fail. This problem is specific to MSVC which has a stricter

1682// iterator implementation in std::vector than GCC/Clang.

1683 _bulk_spill(first, num_nodes);

1684 _notifier.notify_n(num_nodes);

1685}

1686

1687// Function: _update_cache

1688TF_FORCE_INLINE void Executor::_update_cache(Worker& worker, Node*& cache, Node* node) {

1689if(cache) {

1690 _schedule(worker, cache);

1691 }

1692 cache = node;

1693}

1694

1695// Function: _bulk_update_cache

1696template <size_t N>

1697TF_FORCE_INLINE void Executor::_bulk_update_cache(

1698Worker& worker, Node*& cache, Node* node, std::array<Node*, N>& array, size_t& n

  1. {

1700// experimental results show no benefit of using bulk_update_cache

1701if(cache) {

1702 array[n++] = cache;

1703if(n == N) {

1704 _bulk_schedule(worker, array, n);

1705 n = 0;

1706 }

1707 }

1708 cache = node;

1709}

1710

1711// Procedure: _invoke

1712inline void Executor::_invoke(Worker& worker, Node* node) {

1713

1714 #define TF_INVOKE_CONTINUATION() \

1715 if (cache) { \

1716 node = cache; \

1717 goto begin_invoke; \

1718 }

1719

1720 begin_invoke:

1721

1722 Node* cache {nullptr};

1723

1724// if this is the second invoke due to preemption, directly jump to invoke task

1725if(node->_nstate & NSTATE::PREEMPTED) {

1726goto invoke_task;

1727 }

1728

1729// If the work has been cancelled, there is no need to continue.

1730// Here, we do tear_down_invoke since async tasks may also get cancelled where

1731// we need to recycle the node.

1732if(node->_is_parent_cancelled()) {

1733 _tear_down_invoke(worker, node, cache);

1734 TF_INVOKE_CONTINUATION();

1735return;

1736 }

1737

1738// if acquiring semaphore(s) exists, acquire them first

1739if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {

1740 SmallVector<Node*> waiters;

1741if(!node->_acquire_all(waiters)) {

1742 _bulk_schedule(worker, waiters.begin(), waiters.size());

1743return;

1744 }

1745 }

1746

1747 invoke_task:

1748

1749 SmallVector<int> conds;

1750

1751// switch is faster than nested if-else due to jump table

1752switch(node->_handle.index()) {

1753// static task

1754case Node::STATIC:{

1755 _invoke_static_task(worker, node);

1756 }

1757break;

1758

1759// runtime task

1760case Node::RUNTIME:{

1761if(_invoke_runtime_task(worker, node)) {

1762return;

1763 }

1764 }

1765break;

1766

1767// non-preemptive runtime task

1768case Node::NONPREEMPTIVE_RUNTIME:{

1769 _invoke_nonpreemptive_runtime_task(worker, node);

1770 }

1771break;

1772

1773// subflow task

1774case Node::SUBFLOW: {

1775if(_invoke_subflow_task(worker, node)) {

1776return;

1777 }

1778 }

1779break;

1780

1781// condition task

1782case Node::CONDITION: {

1783 _invoke_condition_task(worker, node, conds);

1784 }

1785break;

1786

1787// multi-condition task

1788case Node::MULTI_CONDITION: {

1789 _invoke_multi_condition_task(worker, node, conds);

1790 }

1791break;

1792

1793// module task

1794case Node::MODULE: {

1795if(_invoke_module_task(worker, node)) {

1796return;

1797 }

1798 }

1799break;

1800

1801// adopted module task

1802case Node::ADOPTED_MODULE: {

1803if(_invoke_adopted_module_task(worker, node)) {

1804return;

1805 }

1806 }

1807break;

1808

1809// async task

1810case Node::ASYNC: {

1811if(_invoke_async_task(worker, node)) {

1812return;

1813 }

1814 _tear_down_async(worker, node, cache);

1815 TF_INVOKE_CONTINUATION();

1816return;

1817 }

1818break;

1819

1820// dependent async task

1821case Node::DEPENDENT_ASYNC: {

1822if(_invoke_dependent_async_task(worker, node)) {

1823return;

1824 }

1825 _tear_down_dependent_async(worker, node, cache);

1826 TF_INVOKE_CONTINUATION();

1827return;

1828 }

1829break;

1830

1831// monostate (placeholder)

1832default:

1833break;

1834 }

1835

1836// if releasing semaphores exist, release them

1837if(node->_semaphores && !node->_semaphores->to_release.empty()) {

1838 SmallVector<Node*> waiters;

1839 node->_release_all(waiters);

1840 _bulk_schedule(worker, waiters.begin(), waiters.size());

1841 }

1842

1843// Reset the join counter with strong dependencies to support cycles.

1844// + We must do this before scheduling the successors to avoid race

1845// condition on _predecessors.

1846// + We must use fetch_add instead of direct assigning

1847// because the user-level call on "invoke" may explicitly schedule

1848// this task again (e.g., pipeline) which can access the join_counter.

1849 node->_join_counter.fetch_add(

1850 node->_nstate & NSTATE::STRONG_DEPENDENCIES_MASK, std::memory_order_relaxed

1851 );

1852

1853// Invoke the task based on the corresponding type

1854switch(node->_handle.index()) {

1855

1856// condition and multi-condition tasks

1857case Node::CONDITION:

1858case Node::MULTI_CONDITION: {

1859for(auto cond : conds) {

1860if(cond >= 0 && static_cast<size_t>(cond) < node->_num_successors) {

1861auto s = node->_edges[cond];

1862// zeroing the join counter for invariant

1863 s->_join_counter.store(0, std::memory_order_relaxed);

1864 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);

1865 _update_cache(worker, cache, s);

1866 }

1867 }

1868 }

1869break;

1870

1871// non-condition task

1872default: {

1873for(size_t i=0; i<node->_num_successors; ++i) {

1874if(auto s = node->_edges[i]; s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {

1875 node->_parent->_join_counter.fetch_add(1, std::memory_order_relaxed);

1876 _update_cache(worker, cache, s);

1877 }

1878 }

1879 }

1880break;

1881 }

1882

1883// clean up the node after execution

1884 _tear_down_nonasync(worker, node, cache);

1885 TF_INVOKE_CONTINUATION();

1886}

1887

1888// Procedure: _tear_down_nonasync

1889inline void Executor::_tear_down_nonasync(Worker& worker, Node* node, Node*& cache) {

1890

1891// we must check parent first before subtracting the join counter,

1892// or it can introduce data race

1893if(auto parent = node->_parent; parent == node->_topology) {

1894if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {

1895 _tear_down_topology(worker, node->_topology, cache);

1896 }

1897 }

1898else {

1899// needs to fetch every data before join counter becomes zero at which

1900// the node may be deleted

1901auto state = parent->_nstate;

1902if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {

1903// this task is spawned from a preempted parent, so we need to resume it

1904if(state & NSTATE::PREEMPTED) {

1905 _update_cache(worker, cache, static_cast<Node*>(parent));

1906 }

1907 }

1908 }

1909}

1910

1911// Procedure: _tear_down_invoke

1912inline void Executor::_tear_down_invoke(Worker& worker, Node* node, Node*& cache) {

1913switch(node->_handle.index()) {

1914case Node::ASYNC:

1915 _tear_down_async(worker, node, cache);

1916break;

1917

1918case Node::DEPENDENT_ASYNC:

1919 _tear_down_dependent_async(worker, node, cache);

1920break;

1921

1922default:

1923 _tear_down_nonasync(worker, node, cache);

1924break;

1925 }

1926}

1927

1928// Procedure: _observer_prologue

1929inline void Executor::_observer_prologue(Worker& worker, Node* node) {

1930for(auto& observer : _observers) {

1931 observer->on_entry(WorkerView(worker), TaskView(*node));

1932 }

1933}

1934

1935// Procedure: _observer_epilogue

1936inline void Executor::_observer_epilogue(Worker& worker, Node* node) {

1937for(auto& observer : _observers) {

1938 observer->on_exit(WorkerView(worker), TaskView(*node));

1939 }

1940}

1941

1942// Procedure: _process_exception

1943inline void Executor::_process_exception(Worker&, Node* node) {

1944

1945// Finds the anchor and mark the entire path with exception,

1946// so recursive tasks can be cancelled properly.

1947// Since exception can come from asynchronous task (with runtime), the node itself can be anchored.

1948 NodeBase* ea = node; // explicit anchor

1949 NodeBase* ia = nullptr; // implicit anchor

1950

1951while(ea && (ea->_estate.load(std::memory_order_relaxed) & ESTATE::EXPLICITLY_ANCHORED) == 0) {

1952 ea->_estate.fetch_or(ESTATE::EXCEPTION, std::memory_order_relaxed);

1953// we only want the inner-most implicit anchor

1954if(ia == nullptr && (ea->_nstate & NSTATE::IMPLICITLY_ANCHORED)) {

1955 ia = ea;

1956 }

1957 ea = ea->_parent;

1958 }

1959

1960// flag used to ensure execution is caught in a thread-safe manner

1961constexpr static auto flag = ESTATE::EXCEPTION | ESTATE::CAUGHT;

1962

1963// The exception occurs under a blocking call (e.g., corun, join).

1964if(ea) {

1965// multiple tasks may throw, and we only take the first thrown exception

1966if((ea->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {

1967 ea->_exception_ptr = std::current_exception();

1968return;

1969 }

1970 }

1971// Implicit anchor has the lowest priority

1972else if(ia){

1973if((ia->_estate.fetch_or(flag, std::memory_order_relaxed) & ESTATE::CAUGHT) == 0) {

1974 ia->_exception_ptr = std::current_exception();

1975return;

1976 }

1977 }

1978

1979// For now, we simply store the exception in this node; this can happen in an

1980// execution that does not have any external control to capture the exception,

1981// such as silent async task without any parent.

1982 node->_exception_ptr = std::current_exception();

1983}

1984

1985// Procedure: _invoke_static_task

1986inline void Executor::_invoke_static_task(Worker& worker, Node* node) {

1987 _observer_prologue(worker, node);

1988 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {

1989 std::get_if<Node::Static>(&node->_handle)->work();

1990 });

1991 _observer_epilogue(worker, node);

1992}

1993

1994// Procedure: _invoke_subflow_task

1995inline bool Executor::_invoke_subflow_task(Worker& worker, Node* node) {

1996

1997auto& h = *std::get_if<Node::Subflow>(&node->_handle);

1998auto& g = h.subgraph;

1999

2000if((node->_nstate & NSTATE::PREEMPTED) == 0) {

2001

2002// set up the subflow

2003 Subflow sf(*this, worker, node, g);

2004

2005// invoke the subflow callable

2006 _observer_prologue(worker, node);

2007 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {

2008 h.work(sf);

2009 });

2010 _observer_epilogue(worker, node);

2011

2012// spawn the subflow if it is joinable and its graph is non-empty

2013// implicit join is faster than Subflow::join as it does not involve corun

2014if(sf.joinable() && !g.empty()) {

2015

2016// signal the executor to preempt this node

2017 node->_nstate |= NSTATE::PREEMPTED;

2018

2019// set up and schedule the graph

2020 _schedule_graph(worker, g, node->_topology, node);

2021return true;

2022 }

2023 }

2024else {

2025 node->_nstate &= ~NSTATE::PREEMPTED;

2026 }

2027

2028// The subflow has finished or joined.

2029// By default, we clear the subflow storage as applications can perform recursive

2030// subflow tasking which accumulates a huge amount of memory overhead, hampering

2031// the performance.

2032if((node->_nstate & NSTATE::RETAIN_SUBFLOW) == 0) {

2033 g.clear();

2034 }

2035

2036return false;

2037}

2038

2039// Procedure: _invoke_condition_task

2040inline void Executor::_invoke_condition_task(

2041Worker& worker, Node* node, SmallVector<int>& conds

  1. {

2043 _observer_prologue(worker, node);

2044 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {

2045auto& work = std::get_if<Node::Condition>(&node->_handle)->work;

2046 conds = { work() };

2047 });

2048 _observer_epilogue(worker, node);

2049}

2050

2051// Procedure: _invoke_multi_condition_task

2052inline void Executor::_invoke_multi_condition_task(

2053Worker& worker, Node* node, SmallVector<int>& conds

  1. {

2055 _observer_prologue(worker, node);

2056 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {

2057 conds = std::get_if<Node::MultiCondition>(&node->_handle)->work();

2058 });

2059 _observer_epilogue(worker, node);

2060}

2061

2062// Procedure: _invoke_module_task

2063inline bool Executor::_invoke_module_task(Worker& w, Node* node) {

2064return _invoke_module_task_impl(w, node, std::get_if<Node::Module>(&node->_handle)->graph);

2065}

2066

2067// Procedure: _invoke_adopted_module_task

2068inline bool Executor::_invoke_adopted_module_task(Worker& w, Node* node) {

2069return _invoke_module_task_impl(w, node, std::get_if<Node::AdoptedModule>(&node->_handle)->graph);

2070}

2071

2072// Procedure: _invoke_module_task_impl

2073inline bool Executor::_invoke_module_task_impl(Worker& w, Node* node, Graph& graph) {

2074

2075// No need to do anything for empty graph

2076if(graph.empty()) {

2077return false;

2078 }

2079

2080// first entry - not spawned yet

2081if((node->_nstate & NSTATE::PREEMPTED) == 0) {

2082// signal the executor to preempt this node

2083 node->_nstate |= NSTATE::PREEMPTED;

2084 _schedule_graph(w, graph, node->_topology, node);

2085return true;

2086 }

2087

2088// second entry - already spawned

2089 node->_nstate &= ~NSTATE::PREEMPTED;

2090

2091return false;

2092}

2093

2094

2095// Procedure: _invoke_async_task

2096inline bool Executor::_invoke_async_task(Worker& worker, Node* node) {

2097auto& work = std::get_if<Node::Async>(&node->_handle)->work;

2098switch(work.index()) {

2099// void()

2100case 0:

2101 _observer_prologue(worker, node);

2102 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {

2103 std::get_if<0>(&work)->operator()();

2104 });

2105 _observer_epilogue(worker, node);

2106break;

2107

2108// void(Runtime&)

2109case 1:

2110if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {

2111return true;

2112 }

2113break;

2114

2115// void(Runtime&, bool)

2116case 2:

2117if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {

2118return true;

2119 }

2120break;

2121 }

2122

2123return false;

2124}

2125

2126// Procedure: _invoke_dependent_async_task

2127inline bool Executor::_invoke_dependent_async_task(Worker& worker, Node* node) {

2128auto& work = std::get_if<Node::DependentAsync>(&node->_handle)->work;

2129switch(work.index()) {

2130// void()

2131case 0:

2132 _observer_prologue(worker, node);

2133 TF_EXECUTOR_EXCEPTION_HANDLER(worker, node, {

2134 std::get_if<0>(&work)->operator()();

2135 });

2136 _observer_epilogue(worker, node);

2137break;

2138

2139// void(Runtime&) - silent async

2140case 1:

2141if(_invoke_runtime_task_impl(worker, node, *std::get_if<1>(&work))) {

2142return true;

2143 }

2144break;

2145

2146// void(Runtime&, bool) - async

2147case 2:

2148if(_invoke_runtime_task_impl(worker, node, *std::get_if<2>(&work))) {

2149return true;

2150 }

2151break;

2152 }

2153return false;

2154}

2155

2156// Function: run

2157inline tf::Future<void> Executor::run(Taskflow& f) {

2158return run_n(f, 1, {});

2159}

2160

2161// Function: run

2162inline tf::Future<void> Executor::run(Taskflow&& f) {

2163return run_n(std::move(f), 1, {});

2164}

2165

2166// Function: run

2167template <typename C>

2168tf::Future<void> Executor::run(Taskflow& f, C&& c) {

2169return run_n(f, 1, std::forward<C>(c));

2170}

2171

2172// Function: run

2173template <typename C>

2174tf::Future<void> Executor::run(Taskflow&& f, C&& c) {

2175return run_n(std::move(f), 1, std::forward<C>(c));

2176}

2177

2178// Function: run_n

2179inline tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat) {

2180return run_n(f, repeat, {});

2181}

2182

2183// Function: run_n

2184inline tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat) {

2185return run_n(std::move(f), repeat, {});

2186}

2187

2188// Function: run_n

2189template <typename C>

2190tf::Future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {

2191return run_until(

2192 f, repeat mutable { return repeat-- == 0; }, std::forward<C>(c)

2193 );

2194}

2195

2196// Function: run_n

2197template <typename C>

2198tf::Future<void> Executor::run_n(Taskflow&& f, size_t repeat, C&& c) {

2199return run_until(

2200 std::move(f), repeat mutable { return repeat-- == 0; }, std::forward<C>(c)

2201 );

2202}

2203

2204// Function: run_until

2205template<typename P>

2206tf::Future<void> Executor::run_until(Taskflow& f, P&& pred) {

2207return run_until(f, std::forward<P>(pred), {});

2208}

2209

2210// Function: run_until

2211template<typename P>

2212tf::Future<void> Executor::run_until(Taskflow&& f, P&& pred) {

2213return run_until(std::move(f), std::forward<P>(pred), {});

2214}

2215

2216// Function: run_until

2217template <typename P, typename C>

2218tf::Future<void> Executor::run_until(Taskflow& f, P&& p, C&& c) {

2219

2220// No need to create a real topology but returns an dummy future for invariant.

2221if(f.empty() || p()) {

2222 c();

2223 std::promise<void> promise;

2224 promise.set_value();

2225return tf::Future<void>(promise.get_future());

2226 }

2227

2228 _increment_topology();

2229

2230// create a topology for this run

2231auto t = std::make_shared<Topology>(f, std::forward<P>(p), std::forward<C>(c));

2232//auto t = std::make_shared<DerivedTopology<P, C>>(f, std::forward<P>(p), std::forward<C>(c));

2233

2234// need to create future before the topology got torn down quickly

2235 tf::Future<void> future(t->_promise.get_future(), t);

2236

2237// modifying topology needs to be protected under the lock

2238if(f._fetch_enqueue(t) == 0) {

2239 _set_up_topology(this_worker(), t.get());

2240 }

2241

2242return future;

2243}

2244

2245

2246

2247// Function: corun_until

2248template <typename P>

2249void Executor::corun_until(P&& predicate) {

2250

2251 Worker* w = this_worker();

2252if(w == nullptr) {

2253 TF_THROW("corun_until must be called by a worker of the executor");

2254 }

2255

2256 _corun_until(*w, std::forward<P>(predicate));

2257}

2258

2259/*

2260// Function: _corun_until

2261template <typename P>

2262void Executor::_corun_until(Worker& w, P&& stop_predicate) {

2263

2264 const size_t MAX_VICTIM = num_queues();

2265 const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);

2266

2267 bool stop = false;

2268

2269 while(!stop && !(stop = stop_predicate())) {

2270

2271 // try local queue first — only one task at a time to avoid deep

2272 // recursive corun calls causing stack overflow

2273 if(auto t = w._wsq.pop(); t) {

2274 _invoke(w, t);

2275 continue;

2276 }

2277

2278 // local queue empty: steal from others until stop_predicate or stolen.

2279 // stop is set by the inner loop condition so when predicate becomes true

2280 // the outer loop exits immediately without calling stop_predicate again.

2281 size_t num_steals = 0;

2282 size_t vtm = w._sticky_victim;

2283

2284 while(!(stop = stop_predicate())) {

2285

2286 auto t = (vtm < _workers.size())

2287 ? _workers[vtm]._wsq.steal()

2288 : _buffers[vtm - _workers.size()].queue.steal();

2289

2290 if(t) {

2291 // STOLEN: invoke task then return to outer loop to re-check

2292 // local queue and stop_predicate

2293 _invoke(w, t);

2294 w._sticky_victim = vtm;

2295 break;

2296 }

2297

2298 // pick a new victim excluding self

2299 vtm = w._rdgen() % (MAX_VICTIM - 1);

2300 if(vtm >= w._id) vtm++;

2301

2302 if(++num_steals > MAX_STEALS) {

2303 // unlike _explore_task we cannot sleep here — the calling worker

2304 // is blocked inside a task and must keep making progress to avoid

2305 // deadlock. yield to let other threads run and make progress.

2306 std::this_thread::yield();

2307 }

2308 }

2309 }

2310}*/

2311

2312// Function: _corun_until

2313template <typename P>

2314void Executor::_corun_until(Worker& w, P&& stop_predicate) {

2315

2316const size_t MAX_VICTIM = num_queues();

2317const size_t MAX_STEALS = ((MAX_VICTIM + 1) << 1);

2318

2319 exploit:

2320

2321while(!stop_predicate()) {

2322

2323// here we don't do while-loop to drain out the local queue as it can

2324// potentially enter a very deep recursive corun, cuasing stack overflow

2325if(auto t = w._wsq.pop(); t) {

2326 _invoke(w, t);

2327 }

2328else {

2329size_t num_steals = 0;

2330size_t vtm = w._sticky_victim;

2331

2332 explore:

2333

2334 t = (vtm < _workers.size())

2335 ? _workers[vtm]._wsq.steal()

2336 : _buffers[vtm-_workers.size()].queue.steal();

2337

2338if(t) {

2339 _invoke(w, t);

2340 w._sticky_victim = vtm;

2341goto exploit;

2342 }

2343else if(!stop_predicate()) {

2344if(++num_steals > MAX_STEALS) {

2345 std::this_thread::yield();

2346 }

2347 vtm = w._rdgen() % MAX_VICTIM;

2348goto explore;

2349 }

2350else {

2351break;

2352 }

2353 }

2354 }

2355}

2356

2357// Function: corun

2358template <typename T>

2359void Executor::corun(T& target) {

2360

2361 Worker* w = this_worker();

2362if(w == nullptr) {

2363 TF_THROW("corun must be called by a worker of the executor");

2364 }

2365

2366 NodeBase anchor;

2367 _corun_graph(*w, retrieve_graph(target), nullptr, &anchor);

2368}

2369

2370// Procedure: _corun_graph

2371inline void Executor::_corun_graph(Worker& w, Graph& g, Topology* tpg, NodeBase* p) {

2372

2373// empty graph

2374if(g.empty()) {

2375return;

2376 }

2377

2378// anchor this parent as the blocking point

2379 {

2380 ExplicitAnchorGuard anchor(p);

2381 _schedule_graph(w, g, tpg, p);

2382 _corun_until(w, [p] () -> bool {

2383return p->_join_counter.load(std::memory_order_acquire) == 0; }

2384 );

2385 }

2386

2387// rethrow the exception to the caller

2388 p->_rethrow_exception();

2389}

2390

2391// Procedure: _increment_topology

2392inline void Executor::_increment_topology() {

2393 _num_topologies.fetch_add(1, std::memory_order_relaxed);

2394}

2395

2396// Procedure: _decrement_topology

2397inline void Executor::_decrement_topology() {

2398if(_num_topologies.fetch_sub(1, std::memory_order_acq_rel) == 1) {

2399 _num_topologies.notify_all();

2400 }

2401}

2402

2403// Procedure: wait_for_all

2404inline void Executor::wait_for_all() {

2405size_t n = _num_topologies.load(std::memory_order_acquire);

2406while(n != 0) {

2407 _num_topologies.wait(n, std::memory_order_acquire);

2408 n = _num_topologies.load(std::memory_order_acquire);

2409 }

2410}

2411

2412// Function: _schedule_graph

2413inline void Executor::_schedule_graph(

2414Worker& worker, Graph& graph, Topology* tpg, NodeBase* parent

  1. {

2416size_t num_srcs = _set_up_graph(graph, tpg, parent);

2417 parent->_join_counter.fetch_add(num_srcs, std::memory_order_relaxed);

2418 _bulk_schedule(worker, graph.begin(), num_srcs);

2419}

2420

2421// Function: _set_up_topology

2422inline void Executor::_set_up_topology(Worker* w, Topology* tpg) {

2423// ---- under taskflow lock ----

2424auto& g = tpg->_taskflow._graph;

2425size_t num_srcs = _set_up_graph(g, tpg, tpg);

2426 tpg->_join_counter.store(num_srcs, std::memory_order_relaxed);

2427 w ? _bulk_schedule(*w, g.begin(), num_srcs) : _bulk_schedule(g.begin(), num_srcs);

2428}

2429

2430// Function: _set_up_graph

2431inline size_t Executor::_set_up_graph(Graph& graph, Topology* tpg, NodeBase* parent) {

2432

2433auto first = graph.begin();

2434auto last = graph.end();

2435auto send = first;

2436for(; first != last; ++first) {

2437

2438auto node = *first;

2439 node->_topology = tpg;

2440 node->_parent = parent;

2441 node->_nstate = NSTATE::NONE;

2442 node->_estate.store(ESTATE::NONE, std::memory_order_relaxed);

2443 node->_set_up_join_counter();

2444 node->_exception_ptr = nullptr;

2445

2446// move source to the first partition

2447// root, root, root, v1, v2, v3, v4, ...

2448if(node->num_predecessors() == 0) {

2449 std::iter_swap(send++, first);

2450 }

2451 }

2452return send - graph.begin();

2453}

2454

2455// Function: _tear_down_topology

2456inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg, Node*& cache) {

2457

2458auto &f = tpg->_taskflow;

2459

2460//assert(&tpg == &(f._topologies.front()));

2461

2462// case 1: we still need to run the topology again

2463//if(!tpg->_exception_ptr && !tpg->cancelled() && !tpg->predicate()) {

2464if(!tpg->cancelled() && !tpg->_predicate()) {

2465//assert(tpg->_join_counter == 0);

2466//std::lock_guard<std::mutex> lock(f._mutex);

2467 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);

2468 }

2469// case 2: the final run of this topology

2470else {

2471

2472// invoke the callback after each run

2473 tpg->_on_finish();

2474

2475// there is another topologies to run

2476if(std::unique_lock<std::mutex> lock(f._mutex); f._topologies.size()>1) {

2477

2478auto fetched_tpg {std::move(f._topologies.front())};

2479//assert(fetched_tpg.get() == tpg);

2480

2481 f._topologies.pop();

2482 tpg = f._topologies.front().get();

2483

2484 lock.unlock();

2485

2486// Soon after we carry out the promise, the associate taskflow may got destroyed

2487// from the user side, and we should never tough it again.

2488 fetched_tpg->_carry_out_promise();

2489

2490// decrement the topology

2491 _decrement_topology();

2492

2493 _schedule_graph(worker, tpg->_taskflow._graph, tpg, tpg);

2494 }

2495else {

2496//assert(f._topologies.size() == 1);

2497

2498auto fetched_tpg {std::move(f._topologies.front())};

2499//assert(fetched_tpg.get() == tpg);

2500

2501 f._topologies.pop();

2502

2503 lock.unlock();

2504

2505// Soon after we carry out the promise, the associate taskflow may got destroyed

2506// from the user side, and we should never tough it again.

2507 fetched_tpg->_carry_out_promise();

2508

2509 _decrement_topology();

2510

2511// remove the parent that owns the moved taskflow so the storage can be freed

2512if(auto parent = fetched_tpg->_parent; parent) {

2513//auto state = parent->_nstate;

2514if(parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {

2515// this async is spawned from a preempted parent, so we need to resume it

2516//if(state & NSTATE::PREEMPTED) {

2517 _update_cache(worker, cache, static_cast<Node*>(parent));

2518//}

2519 }

2520 }

2521 }

2522 }

2523}

2524

2525// ############################################################################

2526// Forward Declaration: Subflow

2527// ############################################################################

2528

2529inline void Subflow::join() {

2530

2531if(! joinable()) {

2532 TF_THROW("subflow already joined");

2533 }

2534

2535 _executor._corun_graph(_worker, _graph, _node->_topology, _node);

2536

2537// join here since corun graph may throw exception

2538 _node->_nstate |= NSTATE::JOINED_SUBFLOW;

2539}

2540

2541#endif

2542

2543

2544

2545

2546} // end of namespace tf -----------------------------------------------------

tf::AsyncTask

class to hold a dependent asynchronous task with shared ownership

Definition async_task.hpp:45

tf::Executor::silent_async

void silent_async(P &&params, F &&func)

similar to tf::Executor::async but does not return a future object

tf::Executor::silent_dependent_async

tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)

runs the given function asynchronously when the given predecessors finish

tf::Executor::run_until

tf::Future< void > run_until(Taskflow &taskflow, P &&pred)

runs a taskflow multiple times until the predicate becomes true

tf::Executor::corun_until

void corun_until(P &&predicate)

keeps running the work-stealing loop until the predicate returns true

tf::Executor::remove_observer

void remove_observer(std::shared_ptr< Observer > observer)

removes an observer from the executor

tf::Executor::dependent_async

auto dependent_async(F &&func, Tasks &&... tasks)

runs the given function asynchronously when the given predecessors finish

tf::Executor::run

tf::Future< void > run(Taskflow &&taskflow)

runs a moved taskflow once

tf::Executor::run

tf::Future< void > run(Taskflow &taskflow)

runs a taskflow once

tf::Executor::num_waiters

size_t num_waiters() const noexcept

queries the number of workers that are in the waiting loop

tf::Executor::run

tf::Future< void > run(Taskflow &&taskflow, C &&callable)

runs a moved taskflow once and invoke a callback upon completion

tf::Executor::~Executor

~Executor()

destructs the executor

tf::Executor::this_worker_id

int this_worker_id() const

queries the id of the caller thread within this executor

tf::Executor::num_queues

size_t num_queues() const noexcept

queries the number of work-stealing queues used by the executor

tf::Executor::run_n

tf::Future< void > run_n(Taskflow &taskflow, size_t N)

runs a taskflow for N times

tf::Executor::num_topologies

size_t num_topologies() const

queries the number of running topologies at the time of this call

tf::Executor::Executor

Executor(size_t N=std::thread::hardware_concurrency(), std::shared_ptr< WorkerInterface > wif=nullptr)

constructs the executor with N worker threads

tf::Executor::task_group

TaskGroup task_group()

creates a task group that executes a collection of asynchronous tasks

Definition task_group.hpp:875

tf::Executor::corun

void corun(T &target)

runs a target graph and waits until it completes using an internal worker of this executor

tf::Executor::num_workers

size_t num_workers() const noexcept

queries the number of worker threads

tf::Executor::run_until

tf::Future< void > run_until(Taskflow &&taskflow, P &&pred)

runs a moved taskflow and keeps running it until the predicate becomes true

tf::Executor::wait_for_all

void wait_for_all()

waits for all tasks to complete

tf::Executor::run_n

tf::Future< void > run_n(Taskflow &taskflow, size_t N, C &&callable)

runs a taskflow for N times and then invokes a callback

tf::Executor::run

tf::Future< void > run(Taskflow &taskflow, C &&callable)

runs a taskflow once and invoke a callback upon completion

tf::Executor::run_n

tf::Future< void > run_n(Taskflow &&taskflow, size_t N)

runs a moved taskflow for N times

tf::Executor::run_n

tf::Future< void > run_n(Taskflow &&taskflow, size_t N, C &&callable)

runs a moved taskflow for N times and then invokes a callback

tf::Executor::run_until

tf::Future< void > run_until(Taskflow &taskflow, P &&pred, C &&callable)

runs a taskflow multiple times until the predicate becomes true and then invokes the callback

tf::Executor::this_worker

Worker * this_worker()

queries pointer to the calling worker if it belongs to this executor, otherwise returns nullptr

tf::Executor::async

auto async(P &&params, F &&func)

creates a parameterized asynchronous task to run the given function

tf::Executor::make_observer

std::shared_ptr< Observer > make_observer(ArgsT &&... args)

constructs an observer to inspect the activities of worker threads

tf::Executor::num_observers

size_t num_observers() const noexcept

queries the number of observers

tf::Future

class to access the result of an execution

Definition taskflow.hpp:630

tf::Graph

class to create a graph object

Definition graph.hpp:47

tf::SmallVector

class to define a vector optimized for small array

Definition small_vector.hpp:931

tf::Subflow::join

void join()

enables the subflow to join its parent task

tf::Subflow::joinable

bool joinable() const noexcept

queries if the subflow is joinable

Definition flow_builder.hpp:1834

tf::Taskflow

class to create a taskflow object

Definition taskflow.hpp:64

tf::UnboundedWSQ

class to create a lock-free unbounded work-stealing queue

Definition wsq.hpp:105

tf::Worker

class to create a worker in an executor

Definition worker.hpp:55

tf::TaskParamsLike

determines if a type is a task parameter type

Definition graph.hpp:202

tf

taskflow namespace

Definition small_vector.hpp:20

tf::DefaultNotifier

NonblockingNotifier DefaultNotifier

the default notifier type used by Taskflow

Definition worker.hpp:38

tf::retrieve_graph

Graph & retrieve_graph(T &target)

retrieves a reference to the underlying tf::Graph from an object

Definition graph.hpp:1067

tf::has_env

bool has_env(const std::string &str)

checks whether an environment variable is defined

Definition os.hpp:310