docs/graphtraversal.html
We study the graph traversal problem: visiting every vertex of a graph in parallel while respecting the dependencies defined by its edges. Graph traversal is a fundamental building block of many graph applications, especially large-scale graph analytics.
Given a directed acyclic graph (DAG), i.e., a graph that has no cycles, we would like to visit each vertex without violating the dependency constraints defined by its edges. The following figure shows a graph of six vertices and seven edges. Each vertex represents a particular task, and each edge represents a dependency between two tasks.
[Figure: a DAG of six tasks and seven edges: Task1→Task2, Task1→Task3, Task1→Task4, Task2→Task5, Task3→Task5, Task4→Task6, Task5→Task6]
Traversing the above graph in parallel, the maximum parallelism we can obtain is three: once Task1 finishes, we can run Task2, Task3, and Task4 in parallel.
We define the data structure of our graph. The graph is represented by an array of nodes of the following structure:
```cpp
struct Node {
  std::string name;
  size_t idx;                          // index of the node in the array
  bool visited {false};
  std::atomic<size_t> dependents {0};  // number of incoming edges
  std::vector<Node*> successors;       // outgoing edges

  void precede(Node& n) {
    successors.emplace_back(&n);
    n.dependents++;
  }
};
```
Based on the data structure, we randomly generate a DAG using ordered edges.
```cpp
std::unique_ptr<Node[]> make_dag(size_t num_nodes, size_t max_degree) {
  std::unique_ptr<Node[]> nodes(new Node[num_nodes]);
  // make sure nodes are in a clean state
  for(size_t i=0; i<num_nodes; i++) {
    nodes[i].idx = i;
    nodes[i].name = std::to_string(i);
  }
  // create a DAG by randomly inserting ordered edges
  for(size_t i=0; i<num_nodes; i++) {
    size_t degree {0};
    for(size_t j=i+1; j<num_nodes && degree < max_degree; j++) {
      if(std::rand() % 2 == 1) {
        nodes[i].precede(nodes[j]);
        degree++;
      }
    }
  }
  return nodes;
}
```
The function make_dag accepts two arguments, num_nodes and max_degree, to bound the number of nodes in the graph and the maximum number of outgoing edges of every node.
We create a taskflow to traverse the graph using static tasks (see Static Tasking). Each task does nothing but mark visited true and decrement the dependents counter of each successor by one; both fields are used to validate the traversal afterwards. In practice, this computation may be replaced with a heavy function.
```cpp
tf::Taskflow taskflow;
tf::Executor executor;

const size_t num_nodes = 100000;
std::unique_ptr<Node[]> nodes = make_dag(num_nodes, 4);
std::vector<tf::Task> tasks;

// create the traversal task for each node
for(size_t i=0; i<num_nodes; ++i) {
  tf::Task task = taskflow.emplace([v=&(nodes[i])](){
    v->visited = true;
    for(size_t j=0; j<v->successors.size(); ++j) {
      v->successors[j]->dependents.fetch_sub(1);
    }
  }).name(nodes[i].name);
  tasks.push_back(task);
}

// create the dependencies between tasks on top of the graph structure
for(size_t i=0; i<num_nodes; ++i) {
  for(size_t j=0; j<nodes[i].successors.size(); ++j) {
    tasks[i].precede(tasks[nodes[i].successors[j]->idx]);
  }
}

executor.run(taskflow).wait();

// after the graph is traversed, all nodes must be visited with no dependents
for(size_t i=0; i<num_nodes; i++) {
  assert(nodes[i].visited);
  assert(nodes[i].dependents == 0);
}
```
The code above constructs the parallel graph traversal in two parts. First, it iterates over the nodes and creates a traversal task for each one. Second, it iterates over each node's outgoing edges and creates a dependency between the node's task and the task of the successor at the other end of each edge. The resulting taskflow is topologically equivalent to the given graph.
[Figure: the resulting taskflow of a randomly generated DAG; node labels in the original rendering are internal task addresses and are omitted here]
With task parallelism, computation flows naturally with the graph structure. The runtime autonomously distributes tasks across processor cores to obtain maximum task parallelism; you do not need to worry about scheduling details.
We can also traverse the graph dynamically using tf::Subflow (see Subflow Tasking). We start from the source nodes, which have zero incoming edges, and recursively spawn subflows whenever the last dependency of a node is met. Since we are creating tasks from the execution context of another task, we need to store the task callable in advance.
```cpp
tf::Taskflow taskflow;
tf::Executor executor;

// task callable of traversing a node using subflow
std::function<void(Node*, tf::Subflow&)> traverse;
traverse = [&] (Node* n, tf::Subflow& subflow) {
  assert(!n->visited);
  n->visited = true;
  for(size_t i=0; i<n->successors.size(); i++) {
    // the task that removes the last incoming edge spawns the successor
    if(n->successors[i]->dependents.fetch_sub(1) == 1) {
      subflow.emplace([s=n->successors[i], &traverse](tf::Subflow& subflow){
        traverse(s, subflow);
      }).name(n->successors[i]->name);
    }
  }
};

// create a graph
const size_t num_nodes = 100000;
std::unique_ptr<Node[]> nodes = make_dag(num_nodes, 4);

// find the source nodes (no incoming edges)
std::vector<Node*> src;
for(size_t i=0; i<num_nodes; i++) {
  if(nodes[i].dependents == 0) {
    src.emplace_back(&(nodes[i]));
  }
}

// create tasks only for the source nodes
for(size_t i=0; i<src.size(); i++) {
  taskflow.emplace([s=src[i], &traverse](tf::Subflow& subflow){
    traverse(s, subflow);
  }).name(src[i]->name);
}

executor.run(taskflow).wait();

// after the graph is traversed, all nodes must be visited with no dependents
for(size_t i=0; i<num_nodes; i++) {
  assert(nodes[i].visited);
  assert(nodes[i].dependents == 0);
}
```
A partial graph is shown as follows:
[Figure: a partial taskflow showing subflows recursively spawned during the traversal; node labels in the original rendering are internal task addresses and are omitted here]
In general, the dynamic version of graph traversal is slower than the static version due to the overhead of spawning subflows. However, it is useful when the graph structure is not known beforehand but is discovered incrementally during the traversal.