K-Means is a powerful and easily understood clustering algorithm. It divides a given set of points into K partitions (clusters), where K is specified by the user. Before looking at the algorithm itself, you need to understand the following concepts.

  • Centroids: A centroid is the center of a cluster. If we perform clustering with k=3, we have 3 centroids. To perform K-Means clustering, the user needs to provide an initial set of centroids.

  • Distance: To decide which data points are close together and which are far apart, we need to define a distance between two data points. In K-Means clustering, distance is normally calculated as the Euclidean distance between two data points.
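
For two D-dimensional points x and y, the Euclidean distance is sqrt((x_1 - y_1)^2 + ... + (x_D - y_D)^2). When we only need to know which centroid is closest, the square root can be skipped because it does not change the ordering of distances; the Step 4 code calls a helper named calcEucDistSquare for this purpose. Its body is not shown in this tutorial, so the Java below is a hypothetical stand-in that works on plain double arrays rather than Harp's DoubleArray.

```java
// Hypothetical plain-array stand-in for the calcEucDistSquare helper used in
// Step 4; the tutorial does not show its body, so this is an illustration only.
class EuclideanDistance {

    // Squared Euclidean distance over the first vectorSize coordinates.
    // Any trailing elements (such as a point-count slot) are ignored.
    static double calcEucDistSquare(double[] a, double[] b, int vectorSize) {
        double sum = 0.0;
        for (int i = 0; i < vectorSize; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return sum;
    }

    // The actual Euclidean distance: the square root of the squared distance.
    static double calcEucDist(double[] a, double[] b, int vectorSize) {
        return Math.sqrt(calcEucDistSquare(a, b, vectorSize));
    }
}
```

Passing vectorSize explicitly lets the same helper compare a point with a centroid array that carries one extra element at the end.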

The K-Means algorithm simply repeats the following steps until the partition assignments no longer change. At that point, every data point has been assigned to its final partition.

  1. Choose K points as the initial set of centroids.

  2. Assign each data point in the data set to the closest centroid (this is done by calculating the distance between the data point and each centroid).

  3. Calculate the new centroids based on the clusters that were generated in step 2. Normally this is done by calculating the mean of each cluster.

  4. Repeat steps 2 and 3 until no data point changes its cluster assignment, which means the centroids have stopped changing.
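
The four steps can be sketched on a single machine in plain Java. This is a minimal sketch, not Harp code: the class name SequentialKMeans and the maxIter safety cap are illustrative assumptions, the caller supplies the initial centroids (step 1), and a cluster that loses all of its points simply keeps its previous centroid.

```java
import java.util.Arrays;

// Minimal single-machine sketch of the K-Means steps (Lloyd's algorithm).
class SequentialKMeans {

    static double distSquare(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // centroids holds the initial centroids (step 1) and is updated in place.
    // Returns the cluster index assigned to each point.
    static int[] run(double[][] points, double[][] centroids, int maxIter) {
        int k = centroids.length;
        int dim = centroids[0].length;
        int[] assignment = new int[points.length];
        Arrays.fill(assignment, -1);
        for (int iter = 0; iter < maxIter; iter++) {
            // Step 2: assign each point to its nearest centroid.
            boolean changed = false;
            for (int p = 0; p < points.length; p++) {
                int best = 0;
                double bestDist = distSquare(points[p], centroids[0]);
                for (int c = 1; c < k; c++) {
                    double d = distSquare(points[p], centroids[c]);
                    if (d < bestDist) {
                        bestDist = d;
                        best = c;
                    }
                }
                if (assignment[p] != best) {
                    assignment[p] = best;
                    changed = true;
                }
            }
            // Step 4: stop once no point changed its cluster.
            if (!changed) {
                break;
            }
            // Step 3: recompute each centroid as the mean of its points.
            double[][] sums = new double[k][dim];
            int[] counts = new int[k];
            for (int p = 0; p < points.length; p++) {
                counts[assignment[p]]++;
                for (int j = 0; j < dim; j++) {
                    sums[assignment[p]][j] += points[p][j];
                }
            }
            for (int c = 0; c < k; c++) {
                if (counts[c] > 0) { // an empty cluster keeps its old centroid
                    for (int j = 0; j < dim; j++) {
                        centroids[c][j] = sums[c][j] / counts[c];
                    }
                }
            }
        }
        return assignment;
    }
}
```

For example, with points (0,0), (0,1), (10,10), (10,11) and initial centroids (0,0) and (0,1), the loop settles on centroids (0, 0.5) and (10, 10.5) after three passes.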

PARALLEL DESIGN

  • What is the model? What kind of data structure holds it?

    In vanilla K-Means, the model is the set of cluster centroids. Each centroid is a vector stored as a double array.

  • What are the characteristics of the data dependency in the model update computation? Can updates run concurrently?

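    In the core model update computation, each data point must read the whole model, compute its distance to each centroid, find the nearest one, and partially update the model. The actual update only happens after all data points have finished their search, so the searches for different data points can run concurrently, but the final update must wait for all of them.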

  • Which kind of parallelism scheme is suitable: data parallelism or model parallelism?

    Data parallelism can be used, i.e., different data points are processed in parallel.

    Regarding model parallelism, there are two different solutions:

    1) Without model parallelism: each node gets a replica of the whole model, updates it locally in parallel, and then synchronizes once all local computation has finished.

    2) With model parallelism: each node gets one partition of the model, updates it in parallel, and then passes (rotates) it to the neighboring node once the computation for all local data points has finished. This repeats until every partition has returned to its original node, after which the final model update is performed.

  • Which collective communication operations are suitable for synchronizing the model?

    For the solution without model parallelism, synchronize the model replicas with allreduce, then calculate the new centroids and move on to the next iteration. For vanilla K-Means, the combinations reduce/broadcast, push/pull, and regroup/allgather achieve a similar effect to allreduce.

    For the solution with model parallelism, no replicas exist; instead, moving the model partitions between nodes is itself a synchronized collective operation, which Harp supports through its rotate abstraction.
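
The rotation scheme of solution 2) can be illustrated with a single-process simulation. This is not Harp's rotate API; the class and method names are hypothetical, the sketch assumes exactly one centroid partition per worker, and it models only the nearest-centroid search, not the final model update. Worker w starts with partition w, and after every step each partition moves to worker w + 1, so at step s worker w holds partition (w - s) mod W.

```java
import java.util.Arrays;

// Single-process simulation of rotation-based model parallelism.
// Hypothetical illustration only; it does not call Harp's rotate abstraction.
class RotationSearch {

    // points[w]: local data points of worker w.
    // parts[q]:  centroids in model partition q (one partition per worker).
    // ids[q][i]: global centroid id of parts[q][i].
    // Returns, per worker, the global id of each local point's nearest centroid.
    static int[][] nearestByRotation(double[][][] points, double[][][] parts, int[][] ids) {
        int numWorkers = points.length;
        int[][] best = new int[numWorkers][];
        double[][] bestDist = new double[numWorkers][];
        for (int w = 0; w < numWorkers; w++) {
            best[w] = new int[points[w].length];
            bestDist[w] = new double[points[w].length];
            Arrays.fill(bestDist[w], Double.MAX_VALUE);
        }
        // After numWorkers steps every partition has visited every worker
        // and is back on its original node.
        for (int step = 0; step < numWorkers; step++) {
            for (int w = 0; w < numWorkers; w++) { // in Harp these would run in parallel
                int q = Math.floorMod(w - step, numWorkers); // partition now on worker w
                for (int p = 0; p < points[w].length; p++) {
                    for (int i = 0; i < parts[q].length; i++) {
                        double d = 0.0;
                        for (int j = 0; j < points[w][p].length; j++) {
                            double diff = points[w][p][j] - parts[q][i][j];
                            d += diff * diff;
                        }
                        if (d < bestDist[w][p]) {
                            bestDist[w][p] = d;
                            best[w][p] = ids[q][i];
                        }
                    }
                }
            }
            // Rotation: the partition on worker w moves to worker (w + 1) % numWorkers,
            // which the (w - step) index above encodes.
        }
        return best;
    }
}
```

Each worker only ever holds one slice of the model at a time, yet every local point is compared against every centroid by the time the partitions return home.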

DATAFLOW


Step 1 — The Main Method

The task of the main class is to generate the input data and centroids, then configure and run the job iteratively.

generate N data points (D dimensions), write to HDFS
generate M centroids, write to HDFS
for iterations{
    configure a job
    launch the job
}
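
The generation step can be pictured with a small Java sketch. It assumes a plain-text format with one point per line and space-separated coordinates, plus a hypothetical value range of [0, 1000); the file format actually written by the Harp tutorial generator is not shown here, and copying the files to HDFS is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of "generate N data points (D dimensions)".
// The text format (one point per line, space-separated) is an assumption.
class PointGenerator {

    // Generates numPoints random points of dimension dim, one text line each.
    static List<String> generate(int numPoints, int dim, long seed) {
        Random random = new Random(seed);
        List<String> lines = new ArrayList<>(numPoints);
        for (int i = 0; i < numPoints; i++) {
            StringBuilder line = new StringBuilder();
            for (int j = 0; j < dim; j++) {
                if (j > 0) {
                    line.append(' ');
                }
                line.append(random.nextDouble() * 1000); // assumed range [0, 1000)
            }
            lines.add(line.toString());
        }
        return lines;
    }

    // Parses one line back into a point, as a data loader might.
    static double[] parse(String line) {
        String[] fields = line.trim().split("\\s+");
        double[] point = new double[fields.length];
        for (int j = 0; j < fields.length; j++) {
            point[j] = Double.parseDouble(fields[j]);
        }
        return point;
    }
}
```

Using a fixed seed makes the generated input reproducible across runs, which helps when comparing the different communication operations on the same data.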

Step 2 — The mapCollective function

This is the definition of the map-collective task. It reads the data file names from the key-value reader and then calls the runKmeans function, which runs K-Means in this map task.

protected void mapCollective(KeyValReader reader, Context context) throws IOException, InterruptedException {
    LOG.info("Start collective mapper.");
    long startTime = System.currentTimeMillis();
    // Each value read here is the name of a file holding data points.
    List<String> pointFiles = new ArrayList<String>();
    while (reader.nextKeyValue()) {
        String key = reader.getCurrentKey();
        String value = reader.getCurrentValue();
        LOG.info("Key: " + key + ", Value: " + value);
        pointFiles.add(value);
    }
    Configuration conf = context.getConfiguration();
    runKmeans(pointFiles, conf, context);
    LOG.info("Total execution time in master view (ms): " + (System.currentTimeMillis() - startTime));
}

Step 3 — The runKmeans function

Harp provides several collective communication operations. The following examples show how each of them can be applied to synchronize the centroids in K-Means.

Use AllReduce collective communication to do synchronization

private void runKmeans(List<String> fileNames, Configuration conf, Context context) throws IOException {
    // -----------------------------------------------------
    // Load centroids.
    // For every partition in the centroid table, the last element stores the
    // number of points assigned to that partition (centroid).
    Table<DoubleArray> cenTable = new Table<>(0, new DoubleArrPlus());
    if (this.isMaster()) {
        loadCentroids(cenTable, vectorSize, conf.get(KMeansConstants.CFILE), conf);
    }
    System.out.println("After loading centroids");
    printTable(cenTable);
    // Broadcast the centroids from the master to all workers.
    broadcastCentroids(cenTable);
    System.out.println("After broadcasting centroids");
    printTable(cenTable);
    // Load the local data points.
    ArrayList<DoubleArray> dataPoints = loadData(fileNames, vectorSize, conf);
    Table<DoubleArray> previousCenTable = null;
    // Iterations
    for (int iter = 0; iter < iteration; iter++) {
        previousCenTable = cenTable;
        cenTable = new Table<>(0, new DoubleArrPlus());
        System.out.println("Iteration No." + iter);
        // Compute a new partial centroid table from previousCenTable and the local data points.
        computation(cenTable, previousCenTable, dataPoints);
        /*************************************/
        // AllReduce: every worker receives the partial tables of all workers,
        // with partitions of the same ID combined by DoubleArrPlus.
        allreduce("main", "allreduce_" + iter, cenTable);
        // Every worker can now calculate the new centroids.
        calculateCentroids(cenTable);
        /*************************************/
        printTable(cenTable);
    }
    // Output results.
    if (this.isMaster()) {
        outputCentroids(cenTable, conf, context);
    }
}
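
On a single machine, the same pattern (a replica of the centroids per worker, a local partial table with the point count in the last slot, then an element-wise sum) can be mimicked with Java threads. This is a hypothetical shared-memory sketch, not Harp code: the class name is made up, threads stand in for map tasks, and the final summation loop stands in for allreduce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Shared-memory analogue of the allreduce version: each worker thread gets its
// own replica of the centroids and a slice of the data. Hypothetical sketch.
class ThreadedKMeansStep {

    // Local computation: coordinate sums plus a point count in slot [dim].
    static double[][] localPartial(double[][] points, int from, int to, double[][] replica) {
        int k = replica.length;
        int dim = replica[0].length;
        double[][] partial = new double[k][dim + 1];
        for (int p = from; p < to; p++) {
            int nearest = 0;
            double minDist = Double.MAX_VALUE;
            for (int c = 0; c < k; c++) {
                double d = 0.0;
                for (int j = 0; j < dim; j++) {
                    double diff = points[p][j] - replica[c][j];
                    d += diff * diff;
                }
                if (d < minDist) {
                    minDist = d;
                    nearest = c;
                }
            }
            for (int j = 0; j < dim; j++) {
                partial[nearest][j] += points[p][j];
            }
            partial[nearest][dim] += 1;
        }
        return partial;
    }

    // One iteration: returns the new centroids.
    static double[][] step(double[][] points, double[][] centroids, int numWorkers) {
        int k = centroids.length;
        int dim = centroids[0].length;
        int chunk = (points.length + numWorkers - 1) / numWorkers;
        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        try {
            List<Future<double[][]>> futures = new ArrayList<>();
            for (int w = 0; w < numWorkers; w++) {
                int from = Math.min(points.length, w * chunk);
                int to = Math.min(points.length, from + chunk);
                double[][] replica = new double[k][];
                for (int c = 0; c < k; c++) {
                    replica[c] = centroids[c].clone();
                }
                futures.add(pool.submit(() -> localPartial(points, from, to, replica)));
            }
            // Stand-in for allreduce: element-wise sum of every partial table.
            double[][] global = new double[k][dim + 1];
            for (Future<double[][]> future : futures) {
                double[][] partial = future.get();
                for (int c = 0; c < k; c++) {
                    for (int j = 0; j <= dim; j++) {
                        global[c][j] += partial[c][j];
                    }
                }
            }
            // calculateCentroids equivalent; an empty cluster keeps its old centroid.
            double[][] next = new double[k][dim];
            for (int c = 0; c < k; c++) {
                for (int j = 0; j < dim; j++) {
                    next[c][j] = global[c][dim] > 0 ? global[c][j] / global[c][dim] : centroids[c][j];
                }
            }
            return next;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("worker failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because every thread reads only its own replica and writes only its own partial table, no locking is needed until the summation step, which mirrors why the distributed version only synchronizes once per iteration.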
 

Use broadcast and reduce collective communication to do synchronization

The data points are split across the workers (map tasks), and every worker starts with the same K centroids, broadcast from the master. Each worker computes a partial centroid table from its own data points, and these partial tables are reduced (summed) onto the master node.

Once the master has combined all the partial tables, it calculates the new centroids and broadcasts the updated table to every worker. This repeats in every iteration, for the configured number of iterations.

private void runKmeans(List<String> fileNames, Configuration conf, Context context) throws IOException {
    // -----------------------------------------------------
    // Load centroids.
    // For every partition in the centroid table, the last element stores the
    // number of points assigned to that partition (centroid).
    Table<DoubleArray> cenTable = new Table<>(0, new DoubleArrPlus());
    if (this.isMaster()) {
        loadCentroids(cenTable, vectorSize, conf.get(KMeansConstants.CFILE), conf);
    }
    System.out.println("After loading centroids");
    printTable(cenTable);
    // Broadcast the centroids from the master to all workers.
    broadcastCentroids(cenTable);
    System.out.println("After broadcasting centroids");
    printTable(cenTable);
    // Load the local data points.
    ArrayList<DoubleArray> dataPoints = loadData(fileNames, vectorSize, conf);
    Table<DoubleArray> previousCenTable = null;
    // Iterations
    for (int iter = 0; iter < iteration; iter++) {
        previousCenTable = cenTable;
        cenTable = new Table<>(0, new DoubleArrPlus());
        System.out.println("Iteration No." + iter);
        // Compute a new partial centroid table from previousCenTable and the local data points.
        computation(cenTable, previousCenTable, dataPoints);
        /*************************************/
        // Reduce all partial tables to the master, which calculates the new centroids.
        reduce("main", "reduce" + iter, cenTable, this.getMasterID());
        if (this.isMaster()) {
            calculateCentroids(cenTable);
        }
        // Broadcast the new centroids from the master to all workers.
        broadcast("main", "bcast" + iter, cenTable, this.getMasterID(), false);
        /*************************************/
        printTable(cenTable);
    }
    // Output results.
    if (this.isMaster()) {
        outputCentroids(cenTable, conf, context);
    }
}
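
Why reduce + broadcast ends up equivalent to allreduce for vanilla K-Means can be checked with a single-process simulation. This is not Harp code and the class name is hypothetical; each worker's partial table is a double[k][D + 1] array with the point count in the last slot, worker 0 plays the master, and every centroid is assumed to have a nonzero total count. Both patterns leave the same centroids on every worker; they differ only in where the division runs (on every worker after allreduce, once on the master before the broadcast).

```java
// Single-process simulation (not Harp code) of two synchronization patterns.
// partials[w] is worker w's partial table: rows of {sum_0..sum_{D-1}, count}.
class SyncPatterns {

    // Element-wise sum of every worker's partial table.
    static double[][] sum(double[][][] partials) {
        int k = partials[0].length;
        int width = partials[0][0].length;
        double[][] total = new double[k][width];
        for (double[][] table : partials) {
            for (int c = 0; c < k; c++) {
                for (int j = 0; j < width; j++) {
                    total[c][j] += table[c][j];
                }
            }
        }
        return total;
    }

    // Divide the coordinate sums by the count stored in the last slot.
    static double[][] toCentroids(double[][] totals) {
        int dim = totals[0].length - 1;
        double[][] centroids = new double[totals.length][dim];
        for (int c = 0; c < totals.length; c++) {
            for (int j = 0; j < dim; j++) {
                centroids[c][j] = totals[c][j] / totals[c][dim];
            }
        }
        return centroids;
    }

    // allreduce: every worker receives the summed table and divides it itself.
    static double[][][] allreduce(double[][][] partials) {
        double[][][] perWorker = new double[partials.length][][];
        for (int w = 0; w < partials.length; w++) {
            perWorker[w] = toCentroids(sum(partials));
        }
        return perWorker;
    }

    // reduce + broadcast: only the master sums and divides, then its result is copied out.
    static double[][][] reduceThenBroadcast(double[][][] partials) {
        double[][] atMaster = toCentroids(sum(partials));
        double[][][] perWorker = new double[partials.length][][];
        for (int w = 0; w < partials.length; w++) {
            perWorker[w] = new double[atMaster.length][];
            for (int c = 0; c < atMaster.length; c++) {
                perWorker[w][c] = atMaster[c].clone();
            }
        }
        return perWorker;
    }
}
```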
     

Use push and pull collective communication to do synchronization

private void runKmeans(List<String> fileNames, Configuration conf, Context context) throws IOException {
    // -----------------------------------------------------
    // Load centroids.
    // For every partition in the centroid table, the last element stores the
    // number of points assigned to that partition (centroid).
    Table<DoubleArray> cenTable = new Table<>(0, new DoubleArrPlus());
    if (this.isMaster()) {
        loadCentroids(cenTable, vectorSize, conf.get(KMeansConstants.CFILE), conf);
    }
    System.out.println("After loading centroids");
    printTable(cenTable);
    // Broadcast the centroids from the master to all workers.
    broadcastCentroids(cenTable);
    System.out.println("After broadcasting centroids");
    printTable(cenTable);
    // Load the local data points.
    ArrayList<DoubleArray> dataPoints = loadData(fileNames, vectorSize, conf);
    Table<DoubleArray> globalTable = new Table<DoubleArray>(0, new DoubleArrPlus());
    Table<DoubleArray> previousCenTable = null;
    // Iterations
    for (int iter = 0; iter < iteration; iter++) {
        // Clear the contents of the global table from the previous iteration.
        globalTable.release();
        previousCenTable = cenTable;
        cenTable = new Table<>(0, new DoubleArrPlus());
        System.out.println("Iteration No." + iter);
        // Compute a new partial centroid table from previousCenTable and the local data points.
        computation(cenTable, previousCenTable, dataPoints);
        /**********************************/
        // Push the local partial tables into the global table, partitioned across workers.
        push("main", "push" + iter, cenTable, globalTable, new Partitioner(this.getNumWorkers()));
        // Calculate the new centroids for the global partitions held by this worker.
        calculateCentroids(globalTable);
        // Pull the updated centroids from the global table back into the local table.
        pull("main", "pull" + iter, cenTable, globalTable, true);
        /**********************************/
        printTable(cenTable);
    }
    // Output results.
    if (this.isMaster()) {
        outputCentroids(cenTable, conf, context);
    }
}

Use regroup and allgather collective communication to do synchronization

private void runKmeans(List<String> fileNames, Configuration conf, Context context) throws IOException {
    // -----------------------------------------------------
    // Load centroids.
    // For every partition in the centroid table, the last element stores the
    // number of points assigned to that partition (centroid).
    Table<DoubleArray> cenTable = new Table<>(0, new DoubleArrPlus());
    if (this.isMaster()) {
        loadCentroids(cenTable, vectorSize, conf.get(KMeansConstants.CFILE), conf);
    }
    System.out.println("After loading centroids");
    printTable(cenTable);
    // Broadcast the centroids from the master to all workers.
    broadcastCentroids(cenTable);
    System.out.println("After broadcasting centroids");
    printTable(cenTable);
    // Load the local data points.
    ArrayList<DoubleArray> dataPoints = loadData(fileNames, vectorSize, conf);
    Table<DoubleArray> previousCenTable = null;
    // Iterations
    for (int iter = 0; iter < iteration; iter++) {
        previousCenTable = cenTable;
        cenTable = new Table<>(0, new DoubleArrPlus());
        System.out.println("Iteration No." + iter);
        // Compute a new partial centroid table from previousCenTable and the local data points.
        computation(cenTable, previousCenTable, dataPoints);
        /**********************************/
        // Regroup: partitions with the same ID are gathered and combined on one worker.
        regroup("main", "regroup" + iter, cenTable, null);
        // Calculate the new centroids for the partitions this worker now holds.
        calculateCentroids(cenTable);
        // Allgather: every worker collects all updated partitions.
        allgather("main", "allgather" + iter, cenTable);
        /*************************************/
        printTable(cenTable);
    }
    // Output results.
    if (this.isMaster()) {
        outputCentroids(cenTable, conf, context);
    }
}
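
The regroup + allgather flow can be simulated in one process to see what each worker holds at each stage. This is not Harp code: the class name is hypothetical, and sending partition id to worker id % numWorkers is an assumed assignment rule chosen only for illustration (the tutorial passes a null partitioner and does not state which worker receives which partition). Each local table maps a centroid id to {sum_0, ..., sum_{D-1}, count}, and, as in Step 4, a partition exists only if it received at least one point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process simulation (not Harp code) of regroup + allgather.
class RegroupAllgather {

    static Map<Integer, double[]> run(List<Map<Integer, double[]>> localTables, int dim) {
        int numWorkers = localTables.size();
        // Regroup: worker (id % numWorkers) receives and sums every copy of partition id.
        // (Assumed assignment rule, for illustration only.)
        List<Map<Integer, double[]>> owned = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            owned.add(new TreeMap<>());
        }
        for (Map<Integer, double[]> table : localTables) {
            for (Map.Entry<Integer, double[]> e : table.entrySet()) {
                Map<Integer, double[]> dest = owned.get(e.getKey() % numWorkers);
                double[] acc = dest.computeIfAbsent(e.getKey(), id -> new double[dim + 1]);
                for (int j = 0; j <= dim; j++) {
                    acc[j] += e.getValue()[j];
                }
            }
        }
        // Each owner divides its summed partitions (like calculateCentroids) and resets the count.
        for (Map<Integer, double[]> mine : owned) {
            for (double[] row : mine.values()) {
                for (int j = 0; j < dim; j++) {
                    row[j] /= row[dim];
                }
                row[dim] = 0;
            }
        }
        // Allgather: every worker ends up with all owners' partitions.
        Map<Integer, double[]> gathered = new TreeMap<>();
        for (Map<Integer, double[]> mine : owned) {
            gathered.putAll(mine);
        }
        return gathered;
    }
}
```

Compared with allreduce, each centroid is divided on exactly one worker before the results are shared, which is the same division of labor the push/pull version gets from its Partitioner.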

Step 4 — Compute local centroids

private void computation(Table<DoubleArray> cenTable, Table<DoubleArray> previousCenTable, ArrayList<DoubleArray> dataPoints) {
    double err = 0; // sum of squared distances from each point to its nearest centroid
    for (DoubleArray aPoint : dataPoints) {
        // For each data point, find the nearest centroid.
        double minDist = -1;
        double tempDist = 0;
        int nearestPartitionID = -1;
        for (Partition<DoubleArray> ap : previousCenTable.getPartitions()) {
            DoubleArray aCentroid = ap.get();
            tempDist = calcEucDistSquare(aPoint, aCentroid, vectorSize);
            if (minDist == -1 || tempDist < minDist) {
                minDist = tempDist;
                nearestPartitionID = ap.id();
            }
        }
        err += minDist;

        // Add the data point to the nearest centroid's partition in the new cenTable.
        // The extra last element counts this point.
        double[] partial = new double[vectorSize + 1];
        for (int j = 0; j < vectorSize; j++) {
            partial[j] = aPoint.get()[j];
        }
        partial[vectorSize] = 1;

        Partition<DoubleArray> apInCenTable = cenTable.getPartition(nearestPartitionID);
        if (apInCenTable == null) {
            cenTable.addPartition(new Partition<DoubleArray>(nearestPartitionID, new DoubleArray(partial, 0, vectorSize + 1)));
        } else {
            double[] sums = apInCenTable.get().get();
            for (int i = 0; i < vectorSize + 1; i++) {
                sums[i] += partial[i];
            }
        }
    }
    System.out.println("Sum of squared errors: " + err);
}
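
To make the table layout concrete, the same computation can be written on plain arrays together with the division performed in Step 5. This is a hypothetical sketch: the table is a double[k][vectorSize + 1] array instead of a Harp Table, so every centroid has a row even when it receives no points, and such empty rows are skipped rather than divided by zero.

```java
// Plain-array sketch of Step 4 (computation) and Step 5 (calculateCentroids).
// Row c holds the coordinate sums for centroid c in [0, vectorSize) and the
// number of points assigned to it in slot [vectorSize].
class PartialTableSketch {

    // Step 4: add every point to the row of its nearest centroid.
    // Returns the sum of squared distances (the error value the Harp code prints).
    static double computation(double[][] table, double[][] previous, double[][] dataPoints, int vectorSize) {
        double err = 0.0;
        for (double[] point : dataPoints) {
            int nearest = -1;
            double minDist = Double.MAX_VALUE;
            for (int c = 0; c < previous.length; c++) {
                double d = 0.0;
                for (int j = 0; j < vectorSize; j++) {
                    double diff = point[j] - previous[c][j];
                    d += diff * diff;
                }
                if (d < minDist) {
                    minDist = d;
                    nearest = c;
                }
            }
            err += minDist;
            for (int j = 0; j < vectorSize; j++) {
                table[nearest][j] += point[j];
            }
            table[nearest][vectorSize] += 1;
        }
        return err;
    }

    // Step 5: turn sums into means and reset the count slot.
    static void calculateCentroids(double[][] table, int vectorSize) {
        for (double[] row : table) {
            if (row[vectorSize] == 0) {
                continue; // no points assigned: nothing to average
            }
            for (int h = 0; h < vectorSize; h++) {
                row[h] /= row[vectorSize];
            }
            row[vectorSize] = 0;
        }
    }
}
```

With centroids (0,0) and (10,10) and points (1,1), (3,3), (9,9), the partial table is {4, 4, 2} and {9, 9, 1}, which becomes centroids (2,2) and (9,9).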

Step 5 — Calculate new centroids

private void calculateCentroids(Table<DoubleArray> cenTable) {
    for (Partition<DoubleArray> partialCenTable : cenTable.getPartitions()) {
        double[] doubles = partialCenTable.get().get();
        // Divide each coordinate sum by the point count stored in the last element.
        for (int h = 0; h < vectorSize; h++) {
            doubles[h] /= doubles[vectorSize];
        }
        // Reset the count slot for the next iteration.
        doubles[vectorSize] = 0;
    }
    System.out.println("After calculating new centroids");
    printTable(cenTable);
}

COMPILE

cd $HARP_ROOT_DIR
mvn clean package
cd $HARP_ROOT_DIR/harp-tutorial-app
cp target/harp-tutorial-app-1.0-SNAPSHOT.jar $HADOOP_HOME
cd $HADOOP_HOME

USAGE

Run Harp K-Means:

hadoop jar harp-tutorial-app-1.0-SNAPSHOT.jar edu.iu.kmeans.common.KmeansMapCollective <numOfDataPoints> <numOfCentroids> <sizeOfVector> <numOfMapTasks> <numOfIterations> <workDir> <localDir> <communicationOperation>

   <numOfDataPoints>: the number of data points to generate randomly
   <numOfCentroids>: the number of centroids (clusters) to group the data into
   <sizeOfVector>: the number of dimensions of each data point
   <numOfMapTasks>: the number of map tasks
   <numOfIterations>: the number of iterations to run
   <workDir>: the root directory for this run in HDFS
   <localDir>: Harp K-Means first generates the files containing the data points in a local directory; this argument sets that directory
   <communicationOperation>: one of
		[allreduce]: use the allreduce operation to synchronize centroids
		[regroup-allgather]: use the regroup and allgather operations to synchronize centroids
		[broadcast-reduce]: use the broadcast and reduce operations to synchronize centroids
		[push-pull]: use the push and pull operations to synchronize centroids

For example:

hadoop jar harp-tutorial-app-1.0-SNAPSHOT.jar edu.iu.kmeans.common.KmeansMapCollective 1000 10 10 2 10 /kmeans /tmp/kmeans allreduce

Fetch the results:

hdfs dfs -ls /
hdfs dfs -cat /kmeans/centroids/*