We take Association Rule algorithm as an example to illustrate the programming steps for batch mode algorithms in Harp-DAAL. The codebase consists of three Java files

A Harp-DAAL Application Launcher Class

A ARDaalLauncher.java file is the entrance of Harp-DAAL execution, which loads command line arguments, setting Harp environments and launch the application.

public class ARDaalLauncher extends Configured
  implements Tool {

   public static void main(String[] argv)
    throws Exception {
    int res =
      ToolRunner.run(new Configuration(),
        new ARDaalLauncher(), argv);

  public int run(String[] args) throws Exception {

      // create the Initialize Module
      Configuration conf = this.getConf();
      Initialize init = new Initialize(conf, args);

      // load native kernels to distributed cache

      // load system-wide args

      //load application-wide args
      conf.setInt(HarpDAALConstants.FILE_DIM, Integer.parseInt(args[init.getSysArgNum()]));
      conf.setDouble(Constants.MIN_SUPPORT, Double.parseDouble(args[init.getSysArgNum()+1]));
      conf.setDouble(Constants.MIN_CONFIDENCE, Double.parseDouble(args[init.getSysArgNum()+2]));

      // launch job, specifiy the launcher class name and mapper class name
      Job arbatchJob = init.createJob("arbatchJob", ARDaalLauncher.class, ARDaalCollectiveMapper.class);

      // finish job
      boolean jobSuccess = arbatchJob.waitForCompletion(true);
      if (!jobSuccess) {
              System.out.println("ArBatchJob Job failed");

    return 0;

Application-Wide Configuration Constants (Optional)

A Constants.java file is to store application-wide command line arguments, which would be used by the mapper class

public class Constants {

  public static final String MIN_SUPPORT =
  public static final String MIN_CONFIDENCE =

A Mapper class (Main Body)

A ARDaalCollectiveMapper.java file is the main body to implement the algorithm itself for each Harp Mapper process.

public class ARDaalCollectiveMapper
    CollectiveMapper<String, String, Object, Object> {
        //cmd args
        private int num_mappers;
        private int numThreads;
        private int harpThreads;
        private int fileDim;
        private double minSupport;      
        private double minConfidence;   
        private List<String> inputFiles;
        private Configuration conf;
        private static HarpDAALDataSource datasource;
        private static DaalContext daal_Context = new DaalContext();
        private NumericTable input;

        protected void setup(Context context)
        throws IOException, InterruptedException 
            this.conf =context.getConfiguration();
            this.num_mappers = this.conf.getInt(HarpDAALConstants.NUM_MAPPERS, 10);
            this.numThreads = this.conf.getInt(HarpDAALConstants.NUM_THREADS, 10);
            this.harpThreads = Runtime.getRuntime().availableProcessors();
            this.fileDim = this.conf.getInt(HarpDAALConstants.FILE_DIM, 10);
            this.minSupport = this.conf.getDouble(Constants.MIN_SUPPORT, 0.001);
            this.minConfidence = this.conf.getDouble(Constants.MIN_CONFIDENCE, 0.7);

            //set thread number used in DAAL
            LOG.info("The default value of thread numbers in DAAL: " + Environment.getNumberOfThreads());
            LOG.info("The current value of thread numbers in DAAL: " + Environment.getNumberOfThreads());

        protected void mapCollective(KeyValReader reader, Context context) throws IOException, InterruptedException 
            // read data file names from HDFS
            this.inputFiles = new LinkedList<String>();
            while (reader.nextKeyValue()) {
                String key = reader.getCurrentKey();
                String value = reader.getCurrentValue();
                LOG.info("Key: " + key + ", Value: "
                        + value);
                LOG.info("file name: " + value);

            // set up data source module
            this.datasource = new HarpDAALDataSource(harpThreads, conf);

            // ----------------------- start the execution -----------------------

        private void runAR(Context context) throws IOException
                //  load data 
                this.input = this.datasource.createDenseNumericTableInput(this.inputFiles, this.fileDim, ",",  daal_Context);
                // Create an algorithm to mine association rules using the Apriori method 
                Batch alg = new Batch(daal_Context, Double.class, Method.apriori);
                // Set an input object for the algorithm 
                alg.input.set(InputId.data, input);
                // Set Apriori algorithm parameters 
                // Find large item sets and construct association rules 
                Result res = alg.compute();
                HomogenNumericTable largeItemsets = (HomogenNumericTable) res.get(ResultId.largeItemsets);
                HomogenNumericTable largeItemsetsSupport = (HomogenNumericTable) res.get(ResultId.largeItemsetsSupport);
                // Print the large item sets 
                Service.printAprioriItemsets(largeItemsets, largeItemsetsSupport);
                HomogenNumericTable antecedentItemsets = (HomogenNumericTable) res.get(ResultId.antecedentItemsets);
                HomogenNumericTable consequentItemsets = (HomogenNumericTable) res.get(ResultId.consequentItemsets);
                HomogenNumericTable confidence = (HomogenNumericTable) res.get(ResultId.confidence);
                // Print the association rules 
                Service.printAprioriRules(antecedentItemsets, consequentItemsets, confidence);