
k-means Clustering

We study parallel k-means clustering, a fundamental unsupervised learning algorithm, using Taskflow's parallel-for and conditional tasking to build an iterative task graph that converges to stable cluster centroids.

Problem Formulation

The k-means algorithm partitions a set of N data points into K clusters by iterating two steps until convergence:

  1. Assignment: for each point, find the nearest centroid (by L2 distance) and assign the point to that cluster.
  2. Update: recompute each centroid as the mean of all points assigned to it.

These two steps repeat for M iterations or until the centroids stop moving.
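The "until the centroids stop moving" criterion can be made concrete by comparing the centroids before and after an update. The following is a minimal sketch, not part of the implementations on this page: the helper name centroids_converged and the tolerance eps are assumptions introduced for illustration.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Hypothetical helper (not from the original code): returns true when no
// centroid moved farther than eps (Euclidean distance) during the last update.
bool centroids_converged(
  const std::vector<float>& old_x, const std::vector<float>& old_y,
  const std::vector<float>& new_x, const std::vector<float>& new_y,
  float eps
) {
  assert(old_x.size() == new_x.size() && old_y.size() == new_y.size());
  assert(old_x.size() == old_y.size());
  for(std::size_t k = 0; k < old_x.size(); k++) {
    float dx = new_x[k] - old_x[k];
    float dy = new_y[k] - old_y[k];
    if(std::sqrt(dx * dx + dy * dy) > eps) {
      return false;  // centroid k is still moving
    }
  }
  return true;
}
```

One way to use it would be to copy the centroid vectors before the update step and leave the iteration loop early once the helper returns true.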

A sequential implementation of the full algorithm:

#include <algorithm>   // std::copy_n, std::fill_n, std::max
#include <iostream>    // std::cout
#include <limits>      // std::numeric_limits
#include <vector>

// N: number of 2D points, K: number of clusters, M: max iterations
// px/py: input point coordinates
// L2(x1, y1, x2, y2) is a point-to-point distance helper that is not shown
// in this section; it is assumed to be defined elsewhere.
void kmeans_seq(
  int N, int K, int M,
  const std::vector<float>& px, const std::vector<float>& py
) {
  std::vector<int> c(K);
  std::vector<float> sx(K), sy(K), mx(K), my(K);

  // initialise centroids to the first K points
  std::copy_n(px.begin(), K, mx.begin());
  std::copy_n(py.begin(), K, my.begin());

  for(int m = 0; m < M; m++) {
    // clear the per-centroid accumulators
    std::fill_n(sx.begin(), K, 0.0f);
    std::fill_n(sy.begin(), K, 0.0f);
    std::fill_n(c.begin(), K, 0);

    // assignment step
    for(int i = 0; i < N; i++) {
      float x = px[i], y = py[i];
      float best_d = std::numeric_limits<float>::max();
      int best_k = 0;
      for(int k = 0; k < K; k++) {
        float d = L2(x, y, mx[k], my[k]);
        if(d < best_d) { best_d = d; best_k = k; }
      }
      sx[best_k] += x;
      sy[best_k] += y;
      c [best_k] += 1;
    }

    // update step
    for(int k = 0; k < K; k++) {
      int count = std::max(1, c[k]);  // guard against empty clusters
      mx[k] = sx[k] / count;
      my[k] = sy[k] / count;
    }
  }

  for(int k = 0; k < K; k++) {
    std::cout << "centroid " << k << ": "
              << mx[k] << ' ' << my[k] << '\n';
  }
}

Parallel k-means on CPU

The assignment step, finding the nearest centroid for each point, is embarrassingly parallel across points. We parallelise it with tf::Taskflow::for_each_index and express the iterative structure with a condition task that loops back to clean_up, the start of each iteration, until M iterations have run.
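To see why the assignment step needs no synchronisation, the sketch below splits the index range into contiguous chunks and runs each chunk on its own std::thread, with every thread writing a disjoint slice of the output. This is a stand-in that uses only the standard library, not Taskflow's for_each_index. The names assign_parallel, nearest_centroid, and sq_dist, and the use of squared Euclidean distance in place of L2, are assumptions made for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <limits>
#include <thread>
#include <vector>

// Assumed distance helper: squared Euclidean distance. Squaring preserves
// the ordering of distances, so the nearest centroid is unchanged.
inline float sq_dist(float x1, float y1, float x2, float y2) {
  return (x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2);
}

// Index of the centroid nearest to (x, y).
int nearest_centroid(float x, float y,
                     const std::vector<float>& mx,
                     const std::vector<float>& my) {
  float best_d = std::numeric_limits<float>::max();
  int best_k = 0;
  for(int k = 0; k < static_cast<int>(mx.size()); k++) {
    float d = sq_dist(x, y, mx[k], my[k]);
    if(d < best_d) { best_d = d; best_k = k; }
  }
  return best_k;
}

// Hypothetical stand-in for the parallel-for: each thread handles one
// contiguous chunk of points and writes only its own slice of the result.
std::vector<int> assign_parallel(
  const std::vector<float>& px, const std::vector<float>& py,
  const std::vector<float>& mx, const std::vector<float>& my,
  int num_threads
) {
  assert(px.size() == py.size() && mx.size() == my.size() && num_threads > 0);
  const int N = static_cast<int>(px.size());
  std::vector<int> best_ks(N);
  std::vector<std::thread> workers;
  const int chunk = (N + num_threads - 1) / num_threads;  // ceiling division
  for(int begin = 0; begin < N; begin += chunk) {
    const int end = std::min(N, begin + chunk);
    workers.emplace_back([&, begin, end]() {
      for(int i = begin; i < end; i++) {
        best_ks[i] = nearest_centroid(px[i], py[i], mx, my);
      }
    });
  }
  for(auto& w : workers) { w.join(); }  // wait for every chunk to finish
  return best_ks;
}
```

The centroid vectors are only read during this step and each output slot is written by exactly one thread, which is why no locks or atomics are required. A task-parallel runtime can apply the same partitioning without creating new threads on every iteration.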

The full parallel implementation:

#include <algorithm>   // std::max
#include <limits>      // std::numeric_limits
#include <vector>
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/for_each.hpp>

void kmeans_par(
  int N, int K, int M,
  const std::vector<float>& px, const std::vector<float>& py
) {
  tf::Executor executor;
  tf::Taskflow taskflow("K-Means");

  std::vector<int> c(K), best_ks(N);
  std::vector<float> sx(K), sy(K), mx(K), my(K);

  // initialise centroids to the first K points
  tf::Task init = taskflow.emplace([&]() {
    for(int i = 0; i < K; i++) {
      mx[i] = px[i];
      my[i] = py[i];
    }
  }).name("init");

  // clear per-centroid accumulators before each iteration
  tf::Task clean_up = taskflow.emplace([&]() {
    for(int k = 0; k < K; k++) {
      sx[k] = 0.0f;
      sy[k] = 0.0f;
      c [k] = 0;
    }
  }).name("clean_up");

  // parallel assignment: find the nearest centroid for each point
  tf::Task pf = taskflow.for_each_index(0, N, 1, [&](int i) {
    float x = px[i], y = py[i];
    float best_d = std::numeric_limits<float>::max();
    int best_k = 0;
    for(int k = 0; k < K; k++) {
      float d = L2(x, y, mx[k], my[k]);
      if(d < best_d) { best_d = d; best_k = k; }
    }
    best_ks[i] = best_k;  // each index writes its own slot, so no race
  }).name("parallel-for");

  // sequential update: recompute each centroid
  tf::Task update_cluster = taskflow.emplace([&]() {
    for(int i = 0; i < N; i++) {
      sx[best_ks[i]] += px[i];
      sy[best_ks[i]] += py[i];
      c [best_ks[i]] += 1;
    }
    for(int k = 0; k < K; k++) {
      int count = std::max(1, c[k]);  // guard against empty clusters
      mx[k] = sx[k] / count;
      my[k] = sy[k] / count;
    }
  }).name("update_cluster");

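  // condition task: loop back until M iterations have run, then stop.
  // The loop body has already run once when this task first executes,
  // so the counter is incremented before it is compared with M.
  tf::Task condition = taskflow.emplace([m = 0, M]() mutable {
    return (++m < M) ? 0 : 1;  // 0 → loop back; 1 → exit
  }).name("converged?");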

  init.precede(clean_up);
  clean_up.precede(pf);
  pf.precede(update_cluster);
  condition.succeed(update_cluster)
           .precede(clean_up);  // successor 0: loop back

  executor.run(taskflow).wait();
}

The taskflow graph is illustrated below:

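[Figure: task graph. init → clean_up → parallel-for → update_cluster → converged?, with successor 0 of converged? looping back to clean_up]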

Execution starts at init, flows into clean_up, then into the parallel-for task that distributes the assignment step across all available cores. After each iteration, the condition task checks whether M iterations have been completed. If not, it returns 0 and the scheduler loops back to clean_up. When done, it returns 1 (no successor at index 1) and the taskflow ends.
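The control flow described above behaves like a do-while loop: the loop body runs once before the condition task is evaluated for the first time. The sketch below models this in plain C++ without Taskflow; the function name run_loop_model and its pre_increment flag are hypothetical. It also shows a consequence for the counter test: because of the initial pass, a pre-increment comparison (++m < M) yields exactly M passes, whereas a post-increment comparison (m++ < M) yields M + 1.

```cpp
#include <cassert>

// Hypothetical model of the task graph's control flow (no Taskflow involved):
// clean_up -> parallel-for -> update_cluster -> condition, where a return
// value of 0 loops back to clean_up and any other value exits.
// Returns how many times the loop body ran.
int run_loop_model(int M, bool pre_increment) {
  assert(M >= 1);
  int m = 0;
  int passes = 0;
  int branch = 0;
  do {
    passes++;  // stands in for clean_up, assignment, and update
    // condition task: choose the successor index
    if(pre_increment) {
      branch = (++m < M) ? 0 : 1;
    } else {
      branch = (m++ < M) ? 0 : 1;
    }
  } while(branch == 0);
  return passes;
}
```

For M = 3, the pre-increment form runs the body three times and the post-increment form runs it four times.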

Benchmarking

Runtime comparison on an Intel i7-8700 (6 cores, 12 hardware threads) at 3.2 GHz:

N        K    M        CPU sequential   CPU parallel
10       5    10       0.14 ms          77 ms
100      10   100      0.56 ms          86 ms
1000     10   1000     10 ms            98 ms
10000    10   10000    1006 ms          713 ms
100000   10   100000   102483 ms        49966 ms

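For N ≤ 1K the parallel version is slower than the sequential one, because task-creation and synchronisation overhead outweighs the small amount of work per iteration. The parallel version pulls ahead at N = 10K (1006 ms vs. 713 ms, about 1.4×), where the assignment step is large enough to amortise that overhead. For N = 100K, the parallel CPU implementation runs approximately 2× faster than the sequential version (102483 ms vs. 49966 ms).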
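The speed-up figures follow from dividing the sequential runtime by the parallel runtime in each row of the table. A small sketch of that arithmetic, where the function name speedup is introduced here for illustration:

```cpp
#include <cassert>

// Speed-up = sequential runtime / parallel runtime (both in milliseconds).
// Values above 1 mean the parallel version is faster.
double speedup(double sequential_ms, double parallel_ms) {
  assert(parallel_ms > 0.0);
  return sequential_ms / parallel_ms;
}
```

With the measurements above, speedup(1006, 713) is about 1.41 and speedup(102483, 49966) is about 2.05. For the smallest case, speedup(0.14, 77) is about 0.002, so the parallel version is several hundred times slower there.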

Note: For GPU-accelerated k-means that achieves over 12× speed-up at large problem sizes, see k-means Clustering with CUDA GPU.