Before going through this tutorial, take a look at the overview section to understand how the tutorial is structured.

K-Means is a powerful and easily understood clustering algorithm. The aim of the algorithm is to divide a given set of points into `K` partitions, where `K` is specified by the user. To understand K-Means, you first need to understand the following concepts.

• `Centroids`: A centroid is the center of a cluster. If we perform clustering with K=3, we will have 3 centroids. To perform K-Means clustering, the user needs to provide an initial set of centroids.
• `Distance`: To decide whether data points are close together or far apart, we need to define a distance between two data points. In K-Means clustering, distance is normally calculated as the Euclidean distance between two data points.
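As a small illustration of the distance concept, the sketch below computes the Euclidean distance between two points stored as `double` arrays. The class and method names are hypothetical and are not part of Harp. K-Means implementations often compare squared distances instead, since skipping the square root does not change which centroid is nearest.

```java
// Illustrative helper only; the names here are not part of the Harp API.
final class DistanceSketch {

    /** Squared Euclidean distance between two points of equal dimension. */
    static double squaredDistance(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("dimension mismatch");
        }
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return sum;
    }

    /** Euclidean distance: the square root of the squared distance. */
    static double distance(double[] a, double[] b) {
        return Math.sqrt(squaredDistance(a, b));
    }

    public static void main(String[] args) {
        double[] p = {0.0, 0.0};
        double[] q = {3.0, 4.0};
        System.out.println(squaredDistance(p, q)); // prints 25.0
        System.out.println(distance(p, q));        // prints 5.0
    }
}
```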

The K-Means algorithm simply repeats the following set of steps until there is no change in the partition assignments. At that point, it is settled which data point belongs to which partition.

1. Choose K points as the initial set of centroids.

2. Assign each data point in the dataset to the closest centroid (this is done by calculating the distance between the data point and each centroid).

3. Calculate the new centroids based on the clusters that were generated in step 2. Normally this is done by calculating the mean of each cluster.

4. Repeat steps 2 and 3 until no data point changes its cluster assignment, which means that the centroids are settled.
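The four steps can be sketched on a single machine in plain Java, with no Harp involved. This is a minimal illustration under stated assumptions: the class name, the `maxIterations` safety limit, and the rule that an empty cluster keeps its old centroid are choices made for this example, not something the tutorial prescribes.

```java
import java.util.Arrays;

// Single-machine sketch of the four steps; names are hypothetical.
final class SequentialKMeans {

    /** Runs steps 2 to 4 starting from the given centroids (step 1); updates centroids in place. */
    static int[] cluster(double[][] points, double[][] centroids, int maxIterations) {
        int dim = points[0].length;
        int[] assignment = new int[points.length];
        Arrays.fill(assignment, -1);
        for (int iter = 0; iter < maxIterations; iter++) {
            boolean changed = false;
            // Step 2: assign each point to its closest centroid.
            for (int p = 0; p < points.length; p++) {
                int nearest = 0;
                double best = Double.MAX_VALUE;
                for (int c = 0; c < centroids.length; c++) {
                    double d = 0.0;
                    for (int j = 0; j < dim; j++) {
                        double diff = points[p][j] - centroids[c][j];
                        d += diff * diff;
                    }
                    if (d < best) { best = d; nearest = c; }
                }
                if (assignment[p] != nearest) { assignment[p] = nearest; changed = true; }
            }
            // Step 4: stop once no point changed its cluster.
            if (!changed) break;
            // Step 3: recompute each centroid as the mean of its cluster.
            double[][] sums = new double[centroids.length][dim];
            int[] counts = new int[centroids.length];
            for (int p = 0; p < points.length; p++) {
                counts[assignment[p]]++;
                for (int j = 0; j < dim; j++) sums[assignment[p]][j] += points[p][j];
            }
            for (int c = 0; c < centroids.length; c++) {
                if (counts[c] == 0) continue; // assumption: an empty cluster keeps its old centroid
                for (int j = 0; j < dim; j++) centroids[c][j] = sums[c][j] / counts[c];
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        double[][] points = {{1, 1}, {1, 2}, {9, 9}, {10, 9}};
        double[][] centroids = {{0, 0}, {10, 10}}; // step 1: initial centroids
        int[] assignment = cluster(points, centroids, 100);
        System.out.println(Arrays.toString(assignment));    // prints [0, 0, 1, 1]
        System.out.println(Arrays.deepToString(centroids)); // prints [[1.0, 1.5], [9.5, 9.0]]
    }
}
```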

## PARALLEL DESIGN

• What are the models? What kind of data structure is applicable?

In vanilla K-Means, the centroids of the clusters are the model. Each centroid is a vector, stored as a double array.
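The code later in this tutorial gives each centroid array one extra trailing element that holds the number of points assigned to it. The sketch below illustrates that layout with plain arrays. The class and method names are hypothetical, and Harp's `DoubleArray` and `Table` types are deliberately left out.

```java
import java.util.Arrays;

// Illustrates a centroid stored as D coordinate sums plus one trailing count.
// Names are hypothetical; this is not Harp code.
final class CentroidAccumulator {

    /** Adds one point to the accumulator: sums the coordinates and increments the count. */
    static void add(double[] acc, double[] point) {
        int dim = point.length;
        for (int j = 0; j < dim; j++) {
            acc[j] += point[j];
        }
        acc[dim] += 1.0;
    }

    /** Turns the sums into a mean in place and resets the count to 0. */
    static void finish(double[] acc) {
        int dim = acc.length - 1;
        if (acc[dim] == 0.0) {
            return; // no points were added, nothing to average
        }
        for (int j = 0; j < dim; j++) {
            acc[j] /= acc[dim];
        }
        acc[dim] = 0.0;
    }

    public static void main(String[] args) {
        double[] acc = new double[3]; // 2 dimensions + 1 count slot
        add(acc, new double[]{2.0, 4.0});
        add(acc, new double[]{4.0, 8.0});
        finish(acc);
        System.out.println(Arrays.toString(acc)); // prints [3.0, 6.0, 0.0]
    }
}
```

Keeping the count next to the sums means partial results from different workers can be combined by simple element-wise addition before the division takes place.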

• What are the characteristics of the data dependency in model update computation, can updates run concurrently?

In the core model update computation, each data point needs to access the whole model: it computes its distance to every centroid, finds the nearest one, and contributes a partial update to that centroid. The true update only happens after all data points have finished their search.

• Which kind of parallelism scheme is suitable, data parallelism or model parallelism?

Data parallelism can be used, i.e., calculating different data points in parallel.

For model parallelism, there are two different solutions.

1). Without model parallelism, each node gets one replica of the whole model, updates it locally in parallel, and then synchronizes with the other nodes when all local computation finishes.

2). With model parallelism, each node gets one partition of the model, updates it in parallel, and then rotates it to the neighbor node once the computation for all local data points finishes. This repeats until each partition returns to its original node; then the final model update is done.

• Which collective communication operation is suitable to synchronize the model?

For the solution without model parallelism, synchronize the replicas of the model with `allreduce`, then calculate the new centroids and go to the next iteration. For vanilla K-Means, the combinations `reduce/broadcast`, `push/pull`, and `regroup/allgather` achieve the same effect as `allreduce`.
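To make the `allreduce` step concrete, here is a single-process simulation in plain Java. It is only a sketch of the semantics (every worker ends up with the element-wise sum of all local tables) and does not use or reproduce Harp's implementation. The class and method names are hypothetical, and each row is assumed to hold coordinate sums followed by a point count.

```java
import java.util.Arrays;

// Simulates allreduce over per-worker tables inside one process. Not Harp code.
final class AllreduceSketch {

    /** Element-wise sum of all workers' tables; every worker receives its own copy. */
    static double[][][] allreduce(double[][][] localTables) {
        int workers = localTables.length;
        int k = localTables[0].length;
        int width = localTables[0][0].length;
        double[][] total = new double[k][width];
        for (double[][] table : localTables) {
            for (int c = 0; c < k; c++) {
                for (int j = 0; j < width; j++) {
                    total[c][j] += table[c][j];
                }
            }
        }
        double[][][] result = new double[workers][k][];
        for (int w = 0; w < workers; w++) {
            for (int c = 0; c < k; c++) {
                result[w][c] = total[c].clone();
            }
        }
        return result;
    }

    /** Divides each row's coordinate sums by its trailing count, then resets the count. */
    static void toCentroids(double[][] table) {
        for (double[] row : table) {
            int dim = row.length - 1;
            if (row[dim] == 0.0) continue; // no points were assigned to this centroid
            for (int j = 0; j < dim; j++) row[j] /= row[dim];
            row[dim] = 0.0;
        }
    }

    public static void main(String[] args) {
        // Two workers, one centroid, 2-D points; the last column is the point count.
        double[][][] local = {
            {{2.0, 4.0, 2.0}}, // worker 0: two points whose coordinates sum to (2, 4)
            {{4.0, 2.0, 1.0}}  // worker 1: one point at (4, 2)
        };
        double[][][] synced = allreduce(local);
        for (double[][] table : synced) {
            toCentroids(table); // each worker computes the same new centroid locally
            System.out.println(Arrays.deepToString(table)); // prints [[2.0, 2.0, 0.0]]
        }
    }
}
```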

For the solution with model parallelism, no replicas exist, but the movement of the model partitions is itself a synchronized collective operation, which Harp supports through the `rotate` abstraction.
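The visiting schedule behind rotation can be sketched in one process as follows. This is only an illustration under stated assumptions: centroid `c` is assumed to live in model partition `c % workers`, and at step `s` worker `w` is assumed to hold partition `(w + s) % workers`. The names are hypothetical, Harp's `rotate` operation is not called, and only the nearest-centroid search is simulated, not the model update.

```java
import java.util.Arrays;

// Simulates the rotation schedule: each worker sees one model partition per step.
// Not Harp code; the partition layout is an assumption made for this example.
final class RotateSketch {

    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return sum;
    }

    /** Returns, for each worker and each of its points, the ID of the nearest centroid. */
    static int[][] assignByRotation(double[][][] pointsPerWorker, double[][] centroids) {
        int workers = pointsPerWorker.length;
        int[][] nearest = new int[workers][];
        double[][] best = new double[workers][];
        for (int w = 0; w < workers; w++) {
            nearest[w] = new int[pointsPerWorker[w].length];
            best[w] = new double[pointsPerWorker[w].length];
            Arrays.fill(nearest[w], -1);
            Arrays.fill(best[w], Double.MAX_VALUE);
        }
        // After 'workers' steps every partition has visited every worker once.
        for (int step = 0; step < workers; step++) {
            for (int w = 0; w < workers; w++) {
                int held = (w + step) % workers; // partition currently held by worker w
                for (int c = held; c < centroids.length; c += workers) {
                    for (int p = 0; p < pointsPerWorker[w].length; p++) {
                        double d = squaredDistance(pointsPerWorker[w][p], centroids[c]);
                        if (d < best[w][p]) { best[w][p] = d; nearest[w][p] = c; }
                    }
                }
            }
            // In a distributed run, each partition would now move to the neighbor worker.
        }
        return nearest;
    }

    public static void main(String[] args) {
        double[][][] points = {
            {{1, 1}, {9, 9}},  // worker 0
            {{0, 2}, {10, 8}}  // worker 1
        };
        double[][] centroids = {{0, 0}, {10, 10}};
        System.out.println(Arrays.deepToString(assignByRotation(points, centroids)));
        // prints [[0, 1], [0, 1]]
    }
}
```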

## Step 1 — The Main Method

The tasks of the main class are to configure and run the job iteratively.

```
generate N data points (D dimensions), write to HDFS
generate M centroids, write to HDFS
for iterations {
    configure a job
    launch the job
}
```

## Step 2 — The mapCollective function

This is the definition of the map-collective task. It reads its input from the reader and the job configuration from the context, and then calls the `runKmeans` function, which actually runs the K-Means mapper task.

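```java
protected void mapCollective(KeyValReader reader, Context context) throws IOException, InterruptedException {
    LOG.info("Start collective mapper.");
    long startTime = System.currentTimeMillis();
    List<String> pointFiles = new ArrayList<String>();
    // Collect the input file names. The loop header and the key/value reads are
    // restored here on the assumption that KeyValReader exposes nextKeyValue(),
    // getCurrentKey() and getCurrentValue().
    while (reader.nextKeyValue()) {
        String key = reader.getCurrentKey();
        String value = reader.getCurrentValue();
        LOG.info("Key: " + key + ", Value: " + value);
        pointFiles.add(value);
    }
    Configuration conf = context.getConfiguration();
    runKmeans(pointFiles, conf, context);
    LOG.info("Total time in master view (ms): " + (System.currentTimeMillis() - startTime));
}
```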

## Step 3 — The runKmeans function

Harp provides several collective communication operations. The examples below show how to apply each of them to K-Means.

#### Use AllReduce collective communication to do synchronization

```java
private void runKmeans(List<String> fileNames, Configuration conf, Context context) throws IOException {
    // -----------------------------------------------------
    // For every partition in the centroid table, the last element stores the number of points
    // that are clustered to that partition ID.
    Table<DoubleArray> cenTable = new Table<>(0, new DoubleArrPlus());
    if (this.isMaster()) {
        // The master loads the initial centroids into cenTable here (call not shown in this listing).
    }
    printTable(cenTable);
    // The centroids are broadcast from the master to every worker here (call not shown in this listing).
    System.out.println("After broadcasting centroids");
    printTable(cenTable);
    ArrayList<DoubleArray> dataPoints = loadData(fileNames, vectorSize, conf);
    Table<DoubleArray> previousCenTable = null;
    // Iterations
    for (int iter = 0; iter < iteration; iter++) {
        previousCenTable = cenTable;
        cenTable = new Table<>(0, new DoubleArrPlus());
        System.out.println("Iteration No." + iter);
        // Compute the new partial centroid table from previousCenTable and the data points.
        computation(cenTable, previousCenTable, dataPoints);
        /*************************************/
        // AllReduce: sum the partial tables of all workers; every worker gets the result.
        allreduce("main", "allreduce_" + iter, cenTable);
        // Every worker can now calculate the new centroids.
        calculateCentroids(cenTable);
        /*************************************/
        printTable(cenTable);
    }
    // Output results
    if (this.isMaster()) {
        outputCentroids(cenTable, conf, context);
    }
}
```

#### Use broadcast and reduce collective communication to do synchronization

The steps below describe how this collective communication pattern works for K-Means. The data is split into partitions, one per worker, and the K centroids are broadcast to all workers. Each worker computes local (partial) centroids from its own data, and these are then grouped together and sent to the master node.

Once the local centroids from all partitions are collected in the global centroid table on the master (root) node, the master calculates the updated centroids and broadcasts the table again. This step repeats until convergence is reached.
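The reduce-then-broadcast pattern just described can be simulated in one process as a rough sketch. The names below are hypothetical and nothing here calls Harp. Each partial row is assumed to hold coordinate sums followed by a point count.

```java
import java.util.Arrays;

// Simulates reduce (to the master) followed by broadcast (to all workers). Not Harp code.
final class ReduceBroadcastSketch {

    /** reduce: sums the partial rows of all workers into one row held by the master. */
    static double[] reduce(double[][] partialRows) {
        double[] total = new double[partialRows[0].length];
        for (double[] row : partialRows) {
            for (int j = 0; j < row.length; j++) {
                total[j] += row[j];
            }
        }
        return total;
    }

    /** broadcast: gives every worker its own copy of the master's row. */
    static double[][] broadcast(double[] masterRow, int workers) {
        double[][] copies = new double[workers][];
        for (int w = 0; w < workers; w++) {
            copies[w] = masterRow.clone();
        }
        return copies;
    }

    public static void main(String[] args) {
        // Two workers, one centroid, 2-D points; the last element is the point count.
        double[][] partial = {{2.0, 4.0, 2.0}, {4.0, 2.0, 1.0}};
        double[] master = reduce(partial); // only the master holds the sum {6, 6, 3}
        int dim = master.length - 1;
        for (int j = 0; j < dim; j++) {
            master[j] /= master[dim];      // the master alone computes the new centroid
        }
        master[dim] = 0.0;
        double[][] replicas = broadcast(master, partial.length);
        System.out.println(Arrays.deepToString(replicas));
        // prints [[2.0, 2.0, 0.0], [2.0, 2.0, 0.0]]
    }
}
```

Compared with `allreduce`, the division happens once on the master instead of on every worker, at the cost of a second communication step.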

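```java
private void runKmeans(List<String> fileNames, Configuration conf, Context context) throws IOException {
    // -----------------------------------------------------
    // For every partition in the centroid table, the last element stores the number of points
    // that are clustered to that partition ID.
    Table<DoubleArray> cenTable = new Table<>(0, new DoubleArrPlus());
    if (this.isMaster()) {
        // The master loads the initial centroids into cenTable here (call not shown in this listing).
    }
    printTable(cenTable);
    // The centroids are broadcast from the master to every worker here (call not shown in this listing).
    System.out.println("After broadcasting centroids");
    printTable(cenTable);
    ArrayList<DoubleArray> dataPoints = loadData(fileNames, vectorSize, conf);
    Table<DoubleArray> previousCenTable = null;
    // Iterations
    for (int iter = 0; iter < iteration; iter++) {
        previousCenTable = cenTable;
        cenTable = new Table<>(0, new DoubleArrPlus());
        System.out.println("Iteration No." + iter);
        // Compute the new partial centroid table from previousCenTable and the data points.
        computation(cenTable, previousCenTable, dataPoints);
        /*************************************/
        // Reduce: collect and sum the partial tables on the master.
        reduce("main", "reduce" + iter, cenTable, this.getMasterID());
        if (this.isMaster()) {
            calculateCentroids(cenTable);
        }
        // Broadcast the updated centroids back to all workers, as the text above describes.
        // This call is missing from the listing; the signature
        // broadcast(context, operation, table, rootWorkerID, useMstBcast) is an assumption.
        broadcast("main", "bcast" + iter, cenTable, this.getMasterID(), false);
        /*************************************/
        printTable(cenTable);
    }
    // Output results
    if (this.isMaster()) {
        outputCentroids(cenTable, conf, context);
    }
}
```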

#### Use push and pull collective communication to do synchronization

```java
private void runKmeans(List<String> fileNames, Configuration conf, Context context) throws IOException {
    // -----------------------------------------------------
    // For every partition in the centroid table, the last element stores the number of points
    // that are clustered to that partition ID.
    Table<DoubleArray> cenTable = new Table<>(0, new DoubleArrPlus());
    if (this.isMaster()) {
        // The master loads the initial centroids into cenTable here (call not shown in this listing).
    }
    printTable(cenTable);
    // The centroids are broadcast from the master to every worker here (call not shown in this listing).
    System.out.println("After broadcasting centroids");
    printTable(cenTable);
    ArrayList<DoubleArray> dataPoints = loadData(fileNames, vectorSize, conf);
    Table<DoubleArray> globalTable = new Table<DoubleArray>(0, new DoubleArrPlus());
    Table<DoubleArray> previousCenTable = null;
    // Iterations
    for (int iter = 0; iter < iteration; iter++) {
        // Clean the contents of the global table.
        globalTable.release();
        previousCenTable = cenTable;
        cenTable = new Table<>(0, new DoubleArrPlus());
        System.out.println("Iteration No." + iter);
        // Compute the new partial centroid table from previousCenTable and the data points.
        computation(cenTable, previousCenTable, dataPoints);
        /*************************************/
        // Push: send the local partial centroids to the global table.
        push("main", "push" + iter, cenTable, globalTable, new Partitioner(this.getNumWorkers()));
        // The new centroids can now be calculated on the global table.
        calculateCentroids(globalTable);
        // Pull: fetch the updated centroids from the global table into the local table.
        pull("main", "pull" + iter, cenTable, globalTable, true);
        /*************************************/
        printTable(cenTable);
    }
    // Output results
    if (this.isMaster()) {
        outputCentroids(cenTable, conf, context);
    }
}
```

#### Use regroup and allgather collective communication to do synchronization

```java
private void runKmeans(List<String> fileNames, Configuration conf, Context context) throws IOException {
    // -----------------------------------------------------
    // For every partition in the centroid table, the last element stores the number of points
    // that are clustered to that partition ID.
    Table<DoubleArray> cenTable = new Table<>(0, new DoubleArrPlus());
    if (this.isMaster()) {
        // The master loads the initial centroids into cenTable here (call not shown in this listing).
    }
    printTable(cenTable);
    // The centroids are broadcast from the master to every worker here (call not shown in this listing).
    System.out.println("After broadcasting centroids");
    printTable(cenTable);
    ArrayList<DoubleArray> dataPoints = loadData(fileNames, vectorSize, conf);
    Table<DoubleArray> previousCenTable = null;
    // Iterations
    for (int iter = 0; iter < iteration; iter++) {
        previousCenTable = cenTable;
        cenTable = new Table<>(0, new DoubleArrPlus());
        System.out.println("Iteration No." + iter);
        // Compute the new partial centroid table from previousCenTable and the data points.
        computation(cenTable, previousCenTable, dataPoints);
        /*************************************/
        // Regroup and allgather to synchronize the centroids.
        // Regroup: redistribute the partitions so that each one is combined on a single worker.
        regroup("main", "regroup" + iter, cenTable, null);
        // Each worker can now calculate the new centroids for the partitions it holds.
        calculateCentroids(cenTable);
        // Allgather: every worker collects the partitions held by all other workers.
        allgather("main", "allgather" + iter, cenTable);
        /*************************************/
        printTable(cenTable);
    }
    // Output results
    if (this.isMaster()) {
        outputCentroids(cenTable, conf, context);
    }
}
```

## Step 4 — Compute local centroids

```java
private void computation(Table<DoubleArray> cenTable, Table<DoubleArray> previousCenTable, ArrayList<DoubleArray> dataPoints) {
    double err = 0;
    for (DoubleArray aPoint : dataPoints) {
        // For each data point, find the nearest centroid.
        double minDist = -1;
        double tempDist = 0;
        int nearestPartitionID = -1;
        for (Partition<DoubleArray> ap : previousCenTable.getPartitions()) {
            DoubleArray aCentroid = ap.get();
            tempDist = calcEucDistSquare(aPoint, aCentroid, vectorSize);
            if (minDist == -1 || tempDist < minDist) {
                minDist = tempDist;
                nearestPartitionID = ap.id();
            }
        }
        err += minDist;

        // The nearest centroid for this data point has been found.
        // Add the point to the new cenTable; the extra last element counts the points.
        double[] partial = new double[vectorSize + 1];
        for (int j = 0; j < vectorSize; j++) {
            partial[j] = aPoint.get()[j];
        }
        partial[vectorSize] = 1;

        if (cenTable.getPartition(nearestPartitionID) == null) {
            Partition<DoubleArray> tmpAp = new Partition<DoubleArray>(nearestPartitionID, new DoubleArray(partial, 0, vectorSize + 1));
            // Without this call the new partition would be discarded and the point lost.
            cenTable.addPartition(tmpAp);
        } else {
            Partition<DoubleArray> apInCenTable = cenTable.getPartition(nearestPartitionID);
            for (int i = 0; i < vectorSize + 1; i++) {
                apInCenTable.get().get()[i] += partial[i];
            }
        }
    }
    System.out.println("Errors: " + err);
}
```

## Step 5 — Calculate new centroids

```java
private void calculateCentroids(Table<DoubleArray> cenTable) {
    for (Partition<DoubleArray> partialCenTable : cenTable.getPartitions()) {
        double[] doubles = partialCenTable.get().get();
        // Divide each coordinate sum by the point count stored in the last element.
        for (int h = 0; h < vectorSize; h++) {
            doubles[h] /= doubles[vectorSize];
        }
        // Reset the count for the next iteration.
        doubles[vectorSize] = 0;
    }
    System.out.println("after calculate new centroids");
    printTable(cenTable);
}
```

## COMPILE

```
cd $HARP_ROOT_DIR
```

```
cd $HARP_ROOT_DIR/contrib/target
```

## USAGE

Run Harp K-Means:

```
hadoop jar contrib-0.1.0.jar edu.iu.kmeans.common.KmeansMapCollective <numOfDataPoints> <num of Centroids> <size of vector> <number of map tasks> <number of iteration> <workDir> <localDir> <communication operation>

<numOfDataPoints>: the number of data points to generate randomly
<num of Centroids>: the number of centroids (clusters) to group the data into
<size of vector>: the number of dimensions of the data
<number of map tasks>: the number of map tasks to launch
<number of iteration>: the number of iterations to run
<workDir>: the root directory for this run in HDFS
<localDir>: Harp K-Means first generates the files containing the data points in a local directory; this argument sets that directory
<communication operation> includes:
    [allreduce]: use the allreduce operation to synchronize centroids
    [regroup-allgather]: use the regroup and allgather operations to synchronize centroids
```

For example:

```
hadoop jar contrib-0.1.0.jar edu.iu.kmeans.common.KmeansMapCollective 1000 10 10 2 10 /kmeans /tmp/kmeans allreduce
```

Then list the HDFS root directory, where the work directory (`/kmeans` in this example) appears:

```
hdfs dfs -ls /
```