Taskflow: A General-purpose Task-parallel Programming System
k-means Clustering
We study parallel k-means clustering, a fundamental unsupervised learning algorithm, using Taskflow's parallel-for and conditional tasking to build an iterative task graph that refines the cluster centroids over a fixed number of iterations.
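The k-means algorithm partitions a set of N data points into K clusters by alternating two steps:
1. Assignment: assign each point to its nearest centroid.
2. Update: recompute each centroid as the mean of the points assigned to it.
These two steps repeat for M iterations or until the centroids stop moving. The implementations below use a fixed number of iterations M.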
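In symbols, writing point i as (x_i, y_i) = (px[i], py[i]) and centroid k as (\bar{x}_k, \bar{y}_k) = (mx[k], my[k]), one iteration computes the following. This assumes L2 is the squared Euclidean distance; the max(1, n_k) guard mirrors the code's std::max(1, c[k]), where n_k corresponds to c[k].

$$a_i = \arg\min_{0 \le k < K} \big[(x_i - \bar{x}_k)^2 + (y_i - \bar{y}_k)^2\big], \qquad 0 \le i < N$$

$$n_k = \big|\{\, i : a_i = k \,\}\big|, \qquad \bar{x}_k \leftarrow \frac{1}{\max(1, n_k)} \sum_{i : a_i = k} x_i, \qquad \bar{y}_k \leftarrow \frac{1}{\max(1, n_k)} \sum_{i : a_i = k} y_i$$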
A sequential implementation of the full algorithm:
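#include <algorithm>
#include <iostream>
#include <limits>
#include <vector>
// squared Euclidean distance between (x1, y1) and (x2, y2)
inline float L2(float x1, float y1, float x2, float y2) {
return (x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2);
}
// N: number of 2D points, K: number of clusters, M: max iterations
// px/py: input point coordinates (assumes N >= K)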
void kmeans_seq(
int N, int K, int M,
const std::vector<float>& px, const std::vector<float>& py
) {
std::vector<int> c(K);
std::vector<float> sx(K), sy(K), mx(K), my(K);
// initialise centroids to the first K points
std::copy_n(px.begin(), K, mx.begin());
std::copy_n(py.begin(), K, my.begin());
for(int m = 0; m < M; m++) {
std::fill_n(sx.begin(), K, 0.0f);
std::fill_n(sy.begin(), K, 0.0f);
std::fill_n(c.begin(), K, 0);
// assignment step
for(int i = 0; i < N; i++) {
float x = px[i], y = py[i];
float best_d = std::numeric_limits<float>::max();
int best_k = 0;
for(int k = 0; k < K; k++) {
float d = L2(x, y, mx[k], my[k]);
if(d < best_d) { best_d = d; best_k = k; }
}
sx[best_k] += x;
sy[best_k] += y;
c [best_k] += 1;
}
// update step
for(int k = 0; k < K; k++) {
int count = std::max(1, c[k]);
mx[k] = sx[k] / count;
my[k] = sy[k] / count;
}
}
for(int k = 0; k < K; k++) {
std::cout << "centroid " << k << ": "
<< mx[k] << ' ' << my[k] << '\n';
}
}
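To make a single iteration testable in isolation, the sketch below packages the assignment and update steps of kmeans_seq into a hypothetical helper, kmeans_step, which updates the centroids in place instead of printing them. It assumes the distance is squared Euclidean (written here as l2_sq), and, like kmeans_seq, it moves the centroid of an empty cluster to (0, 0).

```cpp
#include <algorithm>
#include <cstddef>
#include <limits>
#include <vector>

// Squared Euclidean distance (assumed to match the L2 helper).
inline float l2_sq(float x1, float y1, float x2, float y2) {
  return (x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2);
}

// Hypothetical helper: one k-means iteration (assignment + update) on 2D
// points. mx/my hold the K current centroids and are updated in place.
void kmeans_step(const std::vector<float>& px, const std::vector<float>& py,
                 std::vector<float>& mx, std::vector<float>& my) {
  const std::size_t N = px.size(), K = mx.size();
  std::vector<float> sx(K, 0.0f), sy(K, 0.0f);
  std::vector<int> c(K, 0);
  // assignment step: add each point to the accumulators of its nearest centroid
  for (std::size_t i = 0; i < N; ++i) {
    float best_d = std::numeric_limits<float>::max();
    std::size_t best_k = 0;
    for (std::size_t k = 0; k < K; ++k) {
      float d = l2_sq(px[i], py[i], mx[k], my[k]);
      if (d < best_d) { best_d = d; best_k = k; }
    }
    sx[best_k] += px[i];
    sy[best_k] += py[i];
    c[best_k] += 1;
  }
  // update step: move each centroid to the mean of its assigned points
  // (an empty cluster divides 0 by 1 and lands at the origin, as in kmeans_seq)
  for (std::size_t k = 0; k < K; ++k) {
    int count = std::max(1, c[k]);
    mx[k] = sx[k] / count;
    my[k] = sy[k] / count;
  }
}
```

For example, with points (0,0), (0,2), (10,0), (10,2) and initial centroids (0,0) and (10,0), one call moves the centroids to (0,1) and (10,1); a second call leaves them unchanged because the assignment no longer changes.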
The assignment step — finding the nearest centroid for each point — is embarrassingly parallel across points. We parallelise it with tf::Taskflow::for_each_index and express the iterative structure with a condition task that loops back to the assignment step for M iterations.
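Why the assignment step parallelises so easily can be seen without Taskflow. In the sketch below, plain std::thread stands in for tf::Taskflow::for_each_index (a swapped-in technique, not Taskflow's implementation), and the function names nearest and assign_parallel are hypothetical. Each thread handles a contiguous chunk of points, reads the shared centroids, and writes only best_ks[i] for its own indices, so no locking is needed. This is the same reason kmeans_par stores per-point results in best_ks and leaves the shared sums to a separate sequential task.

```cpp
#include <algorithm>
#include <cstddef>
#include <limits>
#include <thread>
#include <vector>

// Index of the centroid nearest to (x, y) by squared Euclidean distance.
inline int nearest(float x, float y,
                   const std::vector<float>& mx, const std::vector<float>& my) {
  float best_d = std::numeric_limits<float>::max();
  int best_k = 0;
  for (int k = 0; k < static_cast<int>(mx.size()); ++k) {
    float dx = x - mx[k], dy = y - my[k];
    float d = dx * dx + dy * dy;
    if (d < best_d) { best_d = d; best_k = k; }
  }
  return best_k;
}

// Parallel assignment step: each worker writes a disjoint range of best_ks.
std::vector<int> assign_parallel(const std::vector<float>& px,
                                 const std::vector<float>& py,
                                 const std::vector<float>& mx,
                                 const std::vector<float>& my,
                                 unsigned num_threads) {
  const std::size_t N = px.size();
  std::vector<int> best_ks(N);
  num_threads = std::max(1u, num_threads);
  const std::size_t chunk = (N + num_threads - 1) / num_threads;
  std::vector<std::thread> workers;
  for (unsigned t = 0; t < num_threads; ++t) {
    const std::size_t begin = t * chunk;
    const std::size_t end = std::min(N, begin + chunk);
    if (begin >= end) break;  // fewer points than threads
    workers.emplace_back([&, begin, end] {
      for (std::size_t i = begin; i < end; ++i) {
        best_ks[i] = nearest(px[i], py[i], mx, my);
      }
    });
  }
  for (auto& w : workers) w.join();  // all writes finish before returning
  return best_ks;
}
```

The result does not depend on the number of threads, which is what makes the step safe to distribute across cores.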
The full parallel implementation:
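#include <algorithm>
#include <limits>
#include <vector>
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/for_each.hpp>
// L2(x1, y1, x2, y2): squared Euclidean distance, the same helper kmeans_seq uses
void kmeans_par(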
int N, int K, int M,
const std::vector<float>& px, const std::vector<float>& py
) {
tf::Executor executor;
tf::Taskflow taskflow("K-Means");
std::vector<int> c(K), best_ks(N);
std::vector<float> sx(K), sy(K), mx(K), my(K);
// initialise centroids to the first K points
tf::Task init = taskflow.emplace([&]() {
for(int i = 0; i < K; i++) {
mx[i] = px[i];
my[i] = py[i];
}
}).name("init");
// clear per-centroid accumulators before each iteration
tf::Task clean_up = taskflow.emplace([&]() {
for(int k = 0; k < K; k++) {
sx[k] = 0.0f;
sy[k] = 0.0f;
c [k] = 0;
}
}).name("clean_up");
// parallel assignment: find the nearest centroid for each point
tf::Task pf = taskflow.for_each_index(0, N, 1, [&](int i) {
float x = px[i], y = py[i];
float best_d = std::numeric_limits<float>::max();
int best_k = 0;
for(int k = 0; k < K; k++) {
float d = L2(x, y, mx[k], my[k]);
if(d < best_d) { best_d = d; best_k = k; }
}
best_ks[i] = best_k;
}).name("parallel-for");
// sequential update: recompute each centroid
tf::Task update_cluster = taskflow.emplace([&]() {
for(int i = 0; i < N; i++) {
sx[best_ks[i]] += px[i];
sy[best_ks[i]] += py[i];
c [best_ks[i]] += 1;
}
for(int k = 0; k < K; k++) {
int count = std::max(1, c[k]);
mx[k] = sx[k] / count;
my[k] = sy[k] / count;
}
}).name("update_cluster");
// condition task: repeat for M iterations, then stop
tf::Task condition = taskflow.emplace([m = 0, M]() mutable {
// pre-increment so the loop body runs exactly M times, matching kmeans_seq
return (++m < M) ? 0 : 1; // 0 → loop back to clean_up; 1 → exit
}).name("converged?");
init.precede(clean_up);
clean_up.precede(pf);
pf.precede(update_cluster);
condition.succeed(update_cluster)
.precede(clean_up); // successor 0: loop back
executor.run(taskflow).wait();
}
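The taskflow graph is illustrated below:
[Figure: task graph of kmeans_par: init → clean_up → parallel-for → update_cluster → converged?, with conditional edge 0 from converged? back to clean_up]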
Execution starts at init and then flows through clean_up, the parallel-for task (which distributes the assignment step across all available worker threads), and update_cluster. After each iteration, the condition task converged? checks whether M iterations have been completed. If not, it returns 0 and the scheduler jumps to its successor at index 0, clean_up. Otherwise it returns 1; because there is no successor at index 1, nothing further is scheduled and the taskflow ends.
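The iteration count is decided entirely by the counter expression in converged?. The sketch below models that control flow in plain C++ (it is not Taskflow code; count_iterations and the two runs_* functions are hypothetical): the body runs, then the condition is called, and a return value of 0 selects successor 0 (loop back) while any other value ends the run. It shows that a post-increment test, `(m++ < M) ? 0 : 1`, lets the body run M + 1 times, whereas a pre-increment test, `(++m < M) ? 0 : 1`, runs it exactly M times for M ≥ 1, matching kmeans_seq.

```cpp
#include <functional>

// Model of the kmeans_par loop: run the body (clean_up, parallel-for,
// update_cluster), then ask the condition which successor to take.
int count_iterations(const std::function<int()>& condition) {
  int body_runs = 0;
  do {
    ++body_runs;
  } while (condition() == 0);  // 0 -> successor 0 (loop back); else stop
  return body_runs;
}

// Counter checked with post-increment: (m++ < M) ? 0 : 1
int runs_post_increment(int M) {
  int m = 0;
  return count_iterations([&] { return (m++ < M) ? 0 : 1; });
}

// Counter checked with pre-increment: (++m < M) ? 0 : 1
int runs_pre_increment(int M) {
  int m = 0;
  return count_iterations([&] { return (++m < M) ? 0 : 1; });
}
```

Because the condition task always runs after the first pass, even M = 0 executes the body once in this model; kmeans_par has the same property.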
Runtime comparison on an Intel i7-8700 (6 cores, 12 hardware threads) at 3.2 GHz:
| N | K | M | CPU sequential | CPU parallel |
|---|---|---|---|---|
| 10 | 5 | 10 | 0.14 ms | 77 ms |
| 100 | 10 | 100 | 0.56 ms | 86 ms |
| 1000 | 10 | 1000 | 10 ms | 98 ms |
| 10000 | 10 | 10000 | 1006 ms | 713 ms |
| 100000 | 10 | 100000 | 102483 ms | 49966 ms |
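For small inputs the parallel version is much slower than the sequential one, because its fixed overheads (task creation, scheduling, and synchronisation) outweigh the tiny assignment workload. Parallel speed-up appears once N ≥ 10K, where the assignment step is large enough to dominate that overhead; for N = 100K the parallel CPU implementation runs approximately 2× faster than the sequential version. Note that M grows with N in these runs, so each row also performs more iterations.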
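The speed-up S is the ratio of the two runtime columns in the table; for the smallest and the two largest configurations:

$$S = \frac{T_{\text{seq}}}{T_{\text{par}}}: \qquad S_{N=10} = \frac{0.14}{77} \approx 0.0018, \qquad S_{N=10^4} = \frac{1006}{713} \approx 1.41, \qquad S_{N=10^5} = \frac{102483}{49966} \approx 2.05$$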
Note: For GPU-accelerated k-means that achieves over 12× speed-up at large problem sizes, see k-means Clustering with CUDA GPU.