docs/ExecuteTaskflow.html
After you create a task dependency graph, you need to submit it to threads for execution. In this chapter, we will show you how to execute a task dependency graph.
To execute a taskflow, you need to create an executor of type tf::Executor. An executor is a thread-safe object that manages a set of worker threads and executes tasks through an efficient work-stealing algorithm. Issuing a call to run a taskflow creates a topology, a data structure that keeps track of the execution status of a running graph. The constructor of tf::Executor takes an unsigned integer N, the number of worker threads to spawn; the default value is std::thread::hardware_concurrency().
tf::Executor executor1;     // create an executor with the number of workers
                            // equal to std::thread::hardware_concurrency
tf::Executor executor2(4);  // create an executor of 4 worker threads
Taskflow designs a highly efficient work-stealing algorithm to schedule and run tasks in an executor. Work-stealing is a dynamic scheduling algorithm widely used in parallel computing to distribute and balance workload among multiple threads or cores. Specifically, within an executor, each worker maintains its own local queue of tasks. When a worker finishes its own tasks, instead of becoming idle or going to sleep, it (the thief) tries to steal a task from the queue of another worker (the victim). The figure below illustrates the idea of work-stealing:
The key advantage of work-stealing lies in its decentralized nature and efficiency. Most of the time, worker threads work on their local queues without contention. Stealing only occurs when a worker becomes idle, minimizing the overhead associated with synchronization and task distribution. This decentralized strategy effectively balances the workload, ensuring that idle workers are put to work and that the overall computation progresses efficiently. That being said, the internal scheduling mechanisms in tf::Executor are not trivial, and it's not easy to explain every detail in just a few sentences. If you're interested in learning more about the technical details, please refer to our paper published in 2022 IEEE Transactions on Parallel and Distributed Systems (TPDS).
tf::Executor provides a set of run_* methods, tf::Executor::run, tf::Executor::run_n, and tf::Executor::run_until to run a taskflow for one time, multiple times, or until a given predicate evaluates to true. All methods accept an optional callback to invoke after the execution completes, and return a tf::Future for users to access the execution status. The code below shows several ways to run a taskflow.
 1: // Declare an executor and a taskflow
 2: tf::Executor executor;
 3: tf::Taskflow taskflow;
 4:
 5: // Add three tasks into the taskflow
 6: tf::Task A = taskflow.emplace([] () { std::cout << "This is TaskA\n"; });
 7: tf::Task B = taskflow.emplace([] () { std::cout << "This is TaskB\n"; });
 8: tf::Task C = taskflow.emplace([] () { std::cout << "This is TaskC\n"; });
 9:
10: // Build precedence between tasks
11: A.precede(B, C);
12:
13: tf::Future<void> fu = executor.run(taskflow);
14: fu.wait();  // block until the execution completes
15:
16: executor.run(taskflow, [](){ std::cout << "end of 1 run"; }).wait();
17: executor.run_n(taskflow, 4);
18: executor.wait_for_all();  // block until all associated executions finish
19: executor.run_n(taskflow, 4, [](){ std::cout << "end of 4 runs"; }).wait();
20: executor.run_until(taskflow, [cnt=0] () mutable { return ++cnt == 10; });
Lines 6–8 create a taskflow consisting of three tasks, A, B, and C. Lines 13–14 execute the taskflow once and block until its completion. Line 16 runs the taskflow once and registers a callback that is invoked when the execution finishes. Lines 17–18 execute the taskflow four times and use tf::Executor::wait_for_all to wait for all executions to complete. Line 19 again runs the taskflow four times but invokes a callback at the end of the fourth execution. Finally, line 20 repeatedly runs the taskflow until the specified predicate returns true.
While it is possible to submit the same taskflow to an executor multiple times, these multiple runs will be synchronized to a sequential chain of executions in the order of their submissions. For example, the three submissions below will finish in order, where execution #1 completes before #2, and execution #2 completes before execution #3:
executor.run(taskflow);        // execution #1
executor.run_n(taskflow, 10);  // execution #2
executor.run(taskflow);        // execution #3
executor.wait_for_all();       // execution #1 -> execution #2 -> execution #3
However, there is no deterministic order for different taskflows submitted simultaneously. For example, the three submissions below may finish in an arbitrary order, since the three taskflows are distinct:
executor.run(taskflow1);        // execution 1
executor.run_n(taskflow2, 10);  // execution 2
executor.run(taskflow3);        // execution 3
executor.wait_for_all();        // no order guarantee among the three taskflows
A running taskflow must remain alive for the duration of its execution because the executor does not take ownership of the taskflow. It is your responsibility to ensure that a taskflow is not destroyed while it is still running. For example, the code below can result in undefined behavior.
tf::Executor executor;  // create an executor

// create a taskflow whose lifetime is restricted by the block statement
{
  tf::Taskflow taskflow;

  // add tasks to the taskflow
  // ...

  // run the taskflow once
  executor.run(taskflow);

}  // leaving the block statement will destroy taskflow while it is running,
   // resulting in undefined behavior
Similarly, you should avoid modifying a taskflow while it is running:
tf::Taskflow taskflow;

// add tasks into the taskflow
// ...

// declare an executor
tf::Executor executor;

tf::Future<void> future = executor.run(taskflow);

// altering the taskflow while it is running leads to undefined behavior
taskflow.emplace([](){ std::cout << "Add a new task\n"; });
You must always keep a taskflow alive and must not modify it while it is running on an executor.
You can transfer the ownership of a taskflow to an executor and run it without wrangling with the lifetime issue of that taskflow. Each run_* method discussed in the previous section comes with an overload that takes a moved taskflow object.
tf::Taskflow taskflow;
tf::Executor executor;

taskflow.emplace([](){});

// let the executor manage the lifetime of the submitted taskflow
executor.run(std::move(taskflow));

// now taskflow has no tasks
assert(taskflow.num_tasks() == 0);
However, you should avoid moving a taskflow that is still running, as doing so results in undefined behavior.
tf::Taskflow taskflow;
tf::Executor executor;

taskflow.emplace([](){});

// the executor does not manage the lifetime of taskflow
executor.run(taskflow);

// error! you cannot move a taskflow while it is running
executor.run(std::move(taskflow));
The correct way to submit a taskflow with moved ownership to an executor is to ensure all previous runs have completed. The executor will automatically release the resources of a moved taskflow right after its execution completes.
// submit the taskflow and wait until it completes
executor.run(taskflow).wait();

// now it's safe to move the taskflow to the executor and run it
executor.run(std::move(taskflow));
Likewise, you cannot move a taskflow that is running on an executor. You must wait until all previous runs of that taskflow complete before moving it.
// submit the taskflow and wait until it completes
executor.run(taskflow).wait();

// now it's safe to move the taskflow to another
tf::Taskflow moved_taskflow(std::move(taskflow));
Each run variant of tf::Executor returns a tf::Future object that allows you to wait for the associated execution to complete. When tf::Future::wait is called, the caller blocks without making any progress until the underlying state becomes ready. However, this blocking design can lead to potential deadlocks, particularly when multiple taskflows are launched from within the internal workers of the same executor. For example, the following code creates a taskflow of 1,000 tasks, where each task runs another taskflow of 500 tasks in a blocking manner, leading to a potential deadlock problem:
tf::Executor executor(2);
tf::Taskflow taskflow;
std::array<tf::Taskflow, 1000> others;

for(size_t n=0; n<1000; n++) {
  for(size_t i=0; i<500; i++) {
    others[n].emplace([](){});
  }
  taskflow.emplace([&executor, &tf=others[n]](){
    // blocking the worker can cause deadlock if all workers are waiting
    // for their taskflows to complete without making any progress
    executor.run(tf).wait();
  });
}
executor.run(taskflow).wait();
To avoid this deadlock issue, tf::Executor provides a cooperative execution method, tf::Executor::corun, which allows a worker to execute a taskflow cooperatively with other workers within the same executor. Specifically, although the worker is blocking until corun finishes, it is cooperatively blocked and continues to execute the taskflow alongside other tasks in the executor's work-stealing loop. For instance, the following code with tf::Executor::corun is deadlock-free:
tf::Executor executor(2);
tf::Taskflow taskflow;
std::array<tf::Taskflow, 1000> others;
std::atomic<size_t> counter{0};

for(size_t n=0; n<1000; n++) {
  for(size_t i=0; i<500; i++) {
    others[n].emplace([&counter](){ counter++; });
  }
  taskflow.emplace([&executor, &tf=others[n]](){
    // the calling worker coruns the taskflow cooperatively with other workers
    executor.corun(tf);
  });
}
executor.run(taskflow).wait();
Similar to tf::Executor::corun, tf::Executor::corun_until provides a more general mechanism for cooperative execution. It allows the calling worker to run tasks cooperatively with other workers until a specified condition becomes true. This function is especially useful for implementing custom synchronization or polling conditions, where the worker continues making progress by executing available tasks in the executor while periodically checking for the termination condition.
taskflow.emplace([&](){
  // launch asynchronous work that completes at some point in the future
  auto fu = std::async([](){
    std::this_thread::sleep_for(std::chrono::seconds(100));
  });
  // poll the future while cooperatively running other tasks in the executor
  executor.corun_until([&fu](){
    return fu.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
  });
});
All run_* methods of tf::Executor are thread-safe. You can safely call these methods from multiple threads to execute different taskflows concurrently. For instance, the code below launches ten threads, each executing a different taskflow concurrently on the same executor, and waits for all executions to complete:
tf::Executor executor;
std::array<tf::Taskflow, 10> taskflows;

std::vector<std::thread> threads;
for(int i=0; i<10; ++i) {
  threads.emplace_back([i, &executor, &taskflows](){
    executor.run(taskflows[i]);  // thread i runs taskflow i
  });
}
for(auto& t : threads) {
  t.join();  // make sure all runs have been submitted
}
executor.wait_for_all();
Each worker thread in a tf::Executor is assigned a unique integer identifier in the range [0, N), where N is the number of worker threads in the executor. You can query the identifier of the calling thread using tf::Executor::this_worker_id. If the calling thread is not a worker of the executor, the method returns -1. This functionality is particularly useful for establishing a one-to-one mapping between worker threads and application-specific data structures.
std::vector<int> data[8];  // one worker-specific data vector per worker
tf::Taskflow taskflow;
tf::Executor executor(8);  // an executor of eight workers

assert(executor.this_worker_id() == -1);  // the main thread is not a worker

taskflow.emplace([&](){
  int id = executor.this_worker_id();  // in the range [0, 8)
  auto& vec = data[id];                // worker id processes data[id]
  // ...
});
You can observe when a worker thread enters and leaves the execution context of a task using tf::ObserverInterface, an interface class that provides a set of methods for you to define what to do at these points.
class ObserverInterface {
  virtual ~ObserverInterface() = default;
  virtual void set_up(size_t num_workers) = 0;
  virtual void on_entry(tf::WorkerView worker_view, tf::TaskView task_view) = 0;
  virtual void on_exit(tf::WorkerView worker_view, tf::TaskView task_view) = 0;
};
There are three methods you must define in your derived class: tf::ObserverInterface::set_up, tf::ObserverInterface::on_entry, and tf::ObserverInterface::on_exit. The method tf::ObserverInterface::set_up is a constructor-like method that the executor calls when the observer is constructed, passing the number of workers in the executor as its argument. You may use it to preallocate or initialize data storage, e.g., an independent vector for each worker. The methods tf::ObserverInterface::on_entry and tf::ObserverInterface::on_exit are called by a worker thread before and after it executes a task, respectively. Both methods provide immutable access to the underlying worker and the running task through tf::WorkerView and tf::TaskView. You may use them, for example, to record timepoints and calculate the elapsed time of a task.
You can associate an executor with one or multiple observers using tf::Executor::make_observer. We use std::shared_ptr to manage the ownership of an observer. The executor loops through each observer and invokes the corresponding methods accordingly.
#include <taskflow/taskflow.hpp>

struct MyObserver : public tf::ObserverInterface {

  MyObserver(const std::string& name) {
    std::cout << "constructing observer " << name << '\n';
  }

  void set_up(size_t num_workers) override final {
    std::cout << "setting up observer with " << num_workers << " workers\n";
  }

  void on_entry(tf::WorkerView w, tf::TaskView tv) override final {
    std::ostringstream oss;
    oss << "worker " << w.id() << " ready to run " << tv.name() << '\n';
    std::cout << oss.str();
  }

  void on_exit(tf::WorkerView w, tf::TaskView tv) override final {
    std::ostringstream oss;
    oss << "worker " << w.id() << " finished running " << tv.name() << '\n';
    std::cout << oss.str();
  }
};

int main(){

  tf::Executor executor(4);

  // Create a taskflow of eight tasks
  tf::Taskflow taskflow;
  auto A = taskflow.emplace([] () { std::cout << "1\n"; }).name("A");
  auto B = taskflow.emplace([] () { std::cout << "2\n"; }).name("B");
  auto C = taskflow.emplace([] () { std::cout << "3\n"; }).name("C");
  auto D = taskflow.emplace([] () { std::cout << "4\n"; }).name("D");
  auto E = taskflow.emplace([] () { std::cout << "5\n"; }).name("E");
  auto F = taskflow.emplace([] () { std::cout << "6\n"; }).name("F");
  auto G = taskflow.emplace([] () { std::cout << "7\n"; }).name("G");
  auto H = taskflow.emplace([] () { std::cout << "8\n"; }).name("H");

  // create an observer
  std::shared_ptr<MyObserver> observer =
    executor.make_observer<MyObserver>("MyObserver");

  // run the taskflow
  executor.run(taskflow).get();

  // remove the observer (optional)
  executor.remove_observer(std::move(observer));

  return 0;
}
The above code produces the following output:
constructing observer MyObserver
setting up observer with 4 workers
worker 2 ready to run A
1
worker 2 finished running A
worker 2 ready to run B
2
worker 1 ready to run C
worker 2 finished running B
3
worker 2 ready to run D
worker 3 ready to run E
worker 1 finished running C
4
5
worker 1 ready to run F
worker 2 finished running D
worker 3 finished running E
6
worker 2 ready to run G
worker 3 ready to run H
worker 1 finished running F
7
8
worker 2 finished running G
worker 3 finished running H
It is expected that the lines written to std::cout interleave with each other, as four workers participate in task scheduling. However, for each task, the ready message always appears before the task's own output (e.g., its number), which in turn appears before the finished message.
You can customize the properties of each worker thread in an executor, such as assigning thread-processor affinity before a worker enters the scheduler loop or post-processing information after it leaves, by passing an instance of a class derived from tf::WorkerInterface to the executor. The example below uses tf::WorkerInterface to pin each worker to the CPU core whose index equals the worker's id on a Linux platform:
// affine the given thread to the given core index (Linux-specific)
bool affine(std::thread& thread, unsigned int core_id) {
  cpu_set_t cpuset;
  CPU_ZERO(&cpuset);
  CPU_SET(core_id, &cpuset);
  pthread_t native_handle = thread.native_handle();
  return pthread_setaffinity_np(native_handle, sizeof(cpu_set_t), &cpuset) == 0;
}

class CustomWorkerBehavior : public tf::WorkerInterface {

  public:

  // called before the worker enters the scheduling loop
  void scheduler_prologue(tf::Worker& w) override {
    printf("worker %lu prepares to enter the work-stealing loop\n", w.id());

    // now affine the worker to a particular CPU core equal to its id
    if(affine(w.thread(), w.id())) {
      printf("successfully affines worker %lu to CPU core %lu\n", w.id(), w.id());
    }
    else {
      printf("failed to affine worker %lu to CPU core %lu\n", w.id(), w.id());
    }
  }

  // called after the worker leaves the scheduling loop
  void scheduler_epilogue(tf::Worker& w, std::exception_ptr) override {
    printf("worker %lu left the work-stealing loop\n", w.id());
  }
};

int main() {
  tf::Executor executor(4, tf::make_worker_interface<CustomWorkerBehavior>());
  return 0;
}
When running the program, we see one possible output as follows:
worker 3 prepares to enter the work-stealing loop
successfully affines worker 3 to CPU core 3
worker 3 left the work-stealing loop
worker 0 prepares to enter the work-stealing loop
successfully affines worker 0 to CPU core 0
worker 0 left the work-stealing loop
worker 1 prepares to enter the work-stealing loop
worker 2 prepares to enter the work-stealing loop
successfully affines worker 1 to CPU core 1
worker 1 left the work-stealing loop
successfully affines worker 2 to CPU core 2
worker 2 left the work-stealing loop
When you create an executor, it spawns a set of worker threads to run tasks using a work-stealing scheduling algorithm. The execution logic of the scheduler and its interaction with each spawned worker via tf::WorkerInterface is given below:
for(size_t n=0; n<num_workers; n++) {
  create_thread([](Worker& worker){

    // enter the scheduling loop
    // Here, WorkerInterface::scheduler_prologue is invoked, if any
    worker_interface->scheduler_prologue(worker);

    try {
      while(1) {
        perform_work_stealing_algorithm();
        if(stop) {
          break;
        }
      }
    } catch(...) {
      exception_ptr = std::current_exception();
    }

    // leave the scheduling loop and join this worker thread
    // Here, WorkerInterface::scheduler_epilogue is invoked, if any
    worker_interface->scheduler_epilogue(worker, exception_ptr);
  });
}