We study a fundamental clustering problem in unsupervised learning, k-means clustering. We will begin by discussing the problem formulation and then learn how to write a parallel k-means algorithm.
k-means clustering uses k centroids, randomly initialized points in the data, and assigns every data point to the nearest centroid. After every point has been assigned, each centroid is moved to the average of all the points assigned to it. The k-means algorithm proceeds in the following steps:

1. Randomly initialize k centroids from the data points.
2. Assign every data point to its nearest centroid.
3. Move every centroid to the average of the points assigned to it.
4. Repeat steps 2 and 3 for a fixed number of iterations, or until the centroids converge.
A sequential implementation of k-means is described as follows:
```cpp
#include <algorithm>
#include <iomanip>
#include <iostream>
#include <limits>
#include <vector>

// sequential implementation of k-means on a CPU
// N: number of points
// K: number of clusters
// M: number of iterations
// px/py: 2D point vectors
void kmeans_seq(
  int N, int K, int M, const std::vector<float>& px, const std::vector<float>& py
) {
  std::vector<int> c(K);
  std::vector<float> sx(K), sy(K), mx(K), my(K);

  // initial centroids
  std::copy_n(px.begin(), K, mx.begin());
  std::copy_n(py.begin(), K, my.begin());

  // k-means iteration
  for(int m=0; m<M; m++) {

    // clear the storage
    std::fill_n(sx.begin(), K, 0.0f);
    std::fill_n(sy.begin(), K, 0.0f);
    std::fill_n(c.begin(), K, 0);

    // find the best k (cluster id) for each point
    for(int i=0; i<N; ++i) {
      float x = px[i];
      float y = py[i];
      float best_d = std::numeric_limits<float>::max();
      int best_k = 0;
      for(int k=0; k<K; ++k) {
        const float d = L2(x, y, mx[k], my[k]);
        if(d < best_d) {
          best_d = d;
          best_k = k;
        }
      }
      sx[best_k] += x;
      sy[best_k] += y;
      c [best_k] += 1;
    }

    // update the centroid
    for(int k=0; k<K; k++) {
      const int count = std::max(1, c[k]);  // turn 0/0 to 0/1
      mx[k] = sx[k] / count;
      my[k] = sy[k] / count;
    }
  }

  // print the k centroids found
  for(int k=0; k<K; ++k) {
    std::cout << "centroid " << k << ": " << std::setw(10) << mx[k] << ' '
              << std::setw(10) << my[k] << '\n';
  }
}
```
The second step of k-means algorithm, assigning every point to the nearest centroid, is highly parallelizable across individual points. We can create a parallel-for task to run parallel iterations.
```cpp
std::vector<int> best_ks(N);  // nearest centroid of each point

unsigned P = 12;  // 12 partitioned tasks

// update cluster
taskflow.for_each_index(0, N, 1, [&](int i){
  float x = px[i];
  float y = py[i];
  float best_d = std::numeric_limits<float>::max();
  int best_k = 0;
  for(int k=0; k<K; ++k) {
    const float d = L2(x, y, mx[k], my[k]);
    if(d < best_d) {
      best_d = d;
      best_k = k;
    }
  }
  best_ks[i] = best_k;
});
```
The third step of moving every centroid to the average of points is also parallelizable across individual centroids. However, since k is typically not large, one task of doing this update is sufficient.
```cpp
taskflow.emplace([&](){
  // sum of points
  for(int i=0; i<N; i++) {
    sx[best_ks[i]] += px[i];
    sy[best_ks[i]] += py[i];
    c [best_ks[i]] += 1;
  }
  // average of points
  for(int k=0; k<K; ++k) {
    auto count = std::max(1, c[k]);  // turn 0/0 to 0/1
    mx[k] = sx[k] / count;
    my[k] = sy[k] / count;
  }
});
```
To describe M iterations, we create a condition task that loops back to the second step of the algorithm M times. A return value of zero directs execution to the first successor, which we will later connect to the task of the second step; otherwise, k-means completes.
```cpp
taskflow.emplace([m=0, M]() mutable {
  return (m++ < M) ? 0 : 1;
});
```
The entire code of the CPU-parallel k-means is shown below. Here we use additional storage, best_ks, to record the nearest centroid of each point at an iteration.
```cpp
// parallel implementation of k-means on a CPU
// N: number of points
// K: number of clusters
// M: number of iterations
// px/py: 2D point vectors
void kmeans_par(
  int N, int K, int M, const std::vector<float>& px, const std::vector<float>& py
) {
  unsigned P = 12;  // 12 partitions of the parallel-for graph

  tf::Executor executor;
  tf::Taskflow taskflow("K-Means");

  std::vector<int> c(K), best_ks(N);
  std::vector<float> sx(K), sy(K), mx(K), my(K);

  // initial centroids
  tf::Task init = taskflow.emplace([&](){
    for(int i=0; i<K; ++i) {
      mx[i] = px[i];
      my[i] = py[i];
    }
  }).name("init");

  // clear the storage
  tf::Task clean_up = taskflow.emplace([&](){
    for(int k=0; k<K; ++k) {
      sx[k] = 0.0f;
      sy[k] = 0.0f;
      c [k] = 0;
    }
  }).name("clean_up");

  // update cluster
  tf::Task pf = taskflow.for_each_index(0, N, 1, [&](int i){
    float x = px[i];
    float y = py[i];
    float best_d = std::numeric_limits<float>::max();
    int best_k = 0;
    for(int k=0; k<K; ++k) {
      const float d = L2(x, y, mx[k], my[k]);
      if(d < best_d) {
        best_d = d;
        best_k = k;
      }
    }
    best_ks[i] = best_k;
  }).name("parallel-for");

  tf::Task update_cluster = taskflow.emplace([&](){
    for(int i=0; i<N; i++) {
      sx[best_ks[i]] += px[i];
      sy[best_ks[i]] += py[i];
      c [best_ks[i]] += 1;
    }
    for(int k=0; k<K; ++k) {
      auto count = std::max(1, c[k]);  // turn 0/0 to 0/1
      mx[k] = sx[k] / count;
      my[k] = sy[k] / count;
    }
  }).name("update_cluster");

  // convergence check
  tf::Task condition = taskflow.emplace([m=0, M]() mutable {
    return (m++ < M) ? 0 : 1;
  }).name("converged?");

  init.precede(clean_up);
  clean_up.precede(pf);
  pf.precede(update_cluster);
  condition.precede(clean_up)
           .succeed(update_cluster);

  executor.run(taskflow).wait();
}
```
The taskflow consists of two parts, a clean_up task and a parallel-for graph. The former clears the storage sx, sy, and c that is used to average points for new centroids, and the latter parallelizes the search for the nearest centroids across individual points using 12 tasks (this number may vary depending on the machine). If the iteration count is smaller than M, the condition task returns 0 to direct the execution path back to clean_up. Otherwise, it returns 1 to stop (i.e., there is no successor task at index 1). The taskflow graph is illustrated below:
[Taskflow graph: init → clean_up → parallel-for (a subflow of worker tasks pfg_0 through pfg_12) → update_cluster → converged?, with a conditional edge from converged? back to clean_up.]
The scheduler starts with init, moves on to clean_up, and then enters the parallel-for task, which spawns a subflow of worker tasks to perform the parallel iterations. When parallel-for completes, update_cluster recomputes the cluster centroids, and the condition task checks whether the iteration count has reached M. If not, the condition task directs the scheduler back to clean_up and then parallel-for; otherwise, it returns an index with no successor to stop the scheduler.
Based on the discussion above, we compare the runtime of computing various k-means problem sizes between the sequential CPU implementation and the parallel CPU implementation on a machine with an Intel i7-8700 CPU (12 logical cores) at 3.2 GHz.
| N | K | M | CPU Sequential | CPU Parallel |
|---|---|---|---|---|
| 10 | 5 | 10 | 0.14 ms | 77 ms |
| 100 | 10 | 100 | 0.56 ms | 86 ms |
| 1000 | 10 | 1000 | 10 ms | 98 ms |
| 10000 | 10 | 10000 | 1006 ms | 713 ms |
| 100000 | 10 | 100000 | 102483 ms | 49966 ms |
When the number of points exceeds 10K, the parallel CPU implementation starts to outperform the sequential one; below that, the scheduling overhead of the task graph dominates the runtime.