We take the Covariance algorithm as an example to illustrate the programming steps of a distributed-mode algorithm in Harp-DAAL. The codebase consists of three Java files:

A Harp-DAAL Application Launcher Class

As with the batch mode algorithms, the COVDaalLauncher.java file is the entry point of a Harp-DAAL execution: it loads the command-line arguments, sets up the Harp environment, and launches the application.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// Harp-DAAL helper class (package path as in the Harp source tree)
import edu.iu.data_aux.Initialize;

public class COVDaalLauncher extends Configured implements Tool
{
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new Configuration(), new COVDaalLauncher(), argv);
    System.exit(res);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Initialize init = new Initialize(conf, args);
    // load the native libraries required in distributed mode
    init.loadDistributedLibs();
    // load command-line arguments
    init.loadSysArgs();
    // create the job
    Job covJob = init.createJob("CovJob", COVDaalLauncher.class, COVDaalCollectiveMapper.class);
    // run the job and wait for completion
    boolean jobSuccess = covJob.waitForCompletion(true);

    if (!jobSuccess) {
      covJob.killJob();
      System.out.println("COV Job failed");
    }
    return 0;
  }
}
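Behind the scenes, init.createJob performs standard Hadoop job wiring. The snippet below is an illustrative sketch only, not the actual Initialize implementation; the real helper in the Harp source tree also records Harp-specific settings, such as the number of mappers, in the job configuration:

// Illustrative sketch of the Hadoop-side wiring done by a job helper;
// `conf` is the job Configuration obtained in run() above.
Job job = Job.getInstance(conf, "CovJob");
job.setJarByClass(COVDaalLauncher.class);
job.setMapperClass(COVDaalCollectiveMapper.class);
// Harp communicates on the mapper side, so no reduce phase is needed
job.setNumReduceTasks(0);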

A Mapper Class (Main Body)

The COVDaalCollectiveMapper.java file is the main body of the application; it implements the algorithm itself, which runs inside each Harp mapper process.

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.CollectiveMapper;

import com.intel.daal.algorithms.covariance.*;
import com.intel.daal.data_management.data.HomogenNumericTable;
import com.intel.daal.data_management.data.NumericTable;
import com.intel.daal.data_management.data.SerializableBase;
import com.intel.daal.services.DaalContext;
import com.intel.daal.services.Environment;

// Harp-DAAL helper classes (package paths as in the Harp source tree)
import edu.iu.data_aux.HarpDAALConstants;
import edu.iu.data_aux.Service;
import edu.iu.data_comm.HarpDAALComm;
import edu.iu.datasource.HarpDAALDataSource;

public class COVDaalCollectiveMapper
  extends CollectiveMapper<String, String, Object, Object>
{
  
  // number of mappers and number of threads used by DAAL within each mapper
  private int num_mappers;
  private int numThreads;
  // hardware threads used by Harp for data loading and conversion
  private int harpThreads;
  // input file names assigned to this mapper
  private List<String> inputFiles;
  private Configuration conf;

  // data loader that reads input files into DAAL NumericTables
  private static HarpDAALDataSource datasource;
  // Harp communicator for exchanging serialized DAAL objects between mappers
  private static HarpDAALComm harpcomm;
  private static DaalContext daal_Context = new DaalContext();

  // local partial result, gathered partial results, and the final result
  private PartialResult partialResult;
  private SerializableBase[] partialResult_comm;
  private Result result;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException
  {
    long startTime = System.currentTimeMillis();
    this.conf = context.getConfiguration();
    num_mappers = this.conf.getInt(HarpDAALConstants.NUM_MAPPERS, 10);
    numThreads = this.conf.getInt(HarpDAALConstants.NUM_THREADS, 10);
    //always use the maximum hardware threads to load in data and convert data
    harpThreads = Runtime.getRuntime().availableProcessors();
    //set thread number used in DAAL
    Environment.setNumberOfThreads(numThreads);
  }

  protected void mapCollective(KeyValReader reader, Context context) throws IOException, InterruptedException
  {
    long startTime = System.currentTimeMillis();
    this.inputFiles = new LinkedList<String>();
    // split the input files among the mappers
    while (reader.nextKeyValue()) {
      String key = reader.getCurrentKey();
      String value = reader.getCurrentValue();
      LOG.info("Key: " + key + ", Value: " + value);
      System.out.println("file name : " + value);
      this.inputFiles.add(value);
    }

    // initialize the data source
    this.datasource = new HarpDAALDataSource(harpThreads, conf);
    // create the communicator
    this.harpcomm = new HarpDAALComm(this.getSelfID(), this.getMasterID(), this.num_mappers, daal_Context, this);
    // run the application code
    runCOV(context);
    this.freeMemory();
    this.freeConn();
    System.gc();
  }

  private void runCOV(Context context) throws IOException
  {
    // read in the CSR input files
    NumericTable featureArray_daal = this.datasource.loadCSRNumericTable(this.inputFiles, ",", daal_Context);
    // compute partial results on the local nodes
    computeOnLocalNode(featureArray_daal);
    // finalize the computation on the master node
    if (this.isMaster()) {
      computeOnMasterNode();
      HomogenNumericTable covariance = (HomogenNumericTable) result.get(ResultId.covariance);
      HomogenNumericTable mean = (HomogenNumericTable) result.get(ResultId.mean);
      Service.printNumericTable("Covariance matrix:", covariance);
      Service.printNumericTable("Mean vector:", mean);
    }
    daal_Context.dispose();
  }

  private void computeOnLocalNode(NumericTable featureArray_daal) throws IOException
  {
    DistributedStep1Local algorithm = new DistributedStep1Local(daal_Context, Double.class, Method.fastCSR);
    // set the input objects for the algorithm
    algorithm.input.set(InputId.data, featureArray_daal);
    // compute partial estimates on the local node
    partialResult = algorithm.compute();
    // gather all mappers' partial results onto the master node
    this.partialResult_comm = this.harpcomm.harpdaal_gather(partialResult, "Covariance", "local-reduce");
  }

  private void computeOnMasterNode()
  {
    // create the step-2 algorithm instance on the master node
    DistributedStep2Master algorithm = new DistributedStep2Master(daal_Context, Double.class, Method.fastCSR);
    // add the gathered partial results as input
    for (int j = 0; j < this.num_mappers; j++)
      algorithm.input.add(DistributedStep2MasterInputId.partialResults, (PartialResult) (partialResult_comm[j]));
    // merge the partial results and finalize
    algorithm.compute();
    result = algorithm.finalizeCompute();
  }
}
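On the master node, the finalized Result exposes the covariance matrix and the mean vector as NumericTables. Besides printing them with Service.printNumericTable, the raw values can be copied out through DAAL's standard NumericTable interface. The helper below is an illustrative sketch, not part of the example code, built on DAAL's getBlockOfRows call:

import java.nio.DoubleBuffer;
import com.intel.daal.data_management.data.NumericTable;

// Illustrative helper: copy a DAAL NumericTable into a flat row-major array,
// e.g. the table obtained from result.get(ResultId.covariance).
static double[] toArray(NumericTable table) {
  int nRows = (int) table.getNumberOfRows();
  int nCols = (int) table.getNumberOfColumns();
  DoubleBuffer buf = DoubleBuffer.allocate(nRows * nCols);
  buf = table.getBlockOfRows(0, nRows, buf);
  double[] out = new double[nRows * nCols];
  for (int i = 0; i < out.length; i++)
    out[i] = buf.get(i);   // absolute reads, independent of buffer position
  return out;
}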