June 20, 2011

How to pass job specific parameters in Hadoop

Say there is a parameter your mapper or reducer needs, and you want to get it from the user at job submission time. Here is how to use "Configuration" to let the user set the parameter:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class GenericReplace {

   public static final String IS_KEY_FIRST = "IsKeyFirstInMapFile";

   static String graphPath;
   static String outputPath;

   public static class GenerateLinks extends Mapper<Text, Text, Text, Text> {

      @Override
      public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
         // read the job-wide parameter back from the Configuration
         if (context.getConfiguration().getBoolean(IS_KEY_FIRST, true)) {
              //do this .. 
         }
         else {
              //do that .. 
         }
      }
   }

   public static void main(String[] args) throws Exception {

      Configuration conf = new Configuration();
      GenericReplace.graphPath = args[0];
      GenericReplace.outputPath = args[1];
      // store the user-supplied flag in the job Configuration before submission
      conf.setBoolean(IS_KEY_FIRST, Boolean.parseBoolean(args[3]));
      Job job = Job.getInstance(new Cluster(conf), conf);
      ...
   }
}
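
As an alternative (a sketch of my own, not from the original code; the driver class name is made up), Hadoop's GenericOptionsParser lets the user pass the same parameter with -D on the command line, so main() does not have to parse positional arguments at all:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class GenericReplaceDriver {

   public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // consumes generic options such as -D IsKeyFirstInMapFile=true
      // and returns the remaining arguments (e.g. input/output paths)
      String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
      // conf already contains IsKeyFirstInMapFile at this point, so the
      // mapper's getBoolean(IS_KEY_FIRST, true) lookup works unchanged;
      // the job is then built and submitted exactly as above
   }
}

It would be invoked along the lines of: hadoop jar myjob.jar GenericReplaceDriver -D IsKeyFirstInMapFile=false /input /output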

Ways to write & read HDFS files
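
All of the snippets below assume an already opened FileSystem handle fs; a minimal way to get one (my addition, not part of the original notes) is:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);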

- Output Stream
FSDataOutputStream dos = fs.create(new Path("/user/tmp"), true); 
dos.writeInt(counter); 
dos.close();
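
For completeness, here is the matching read side, a small sketch of my own (assuming the same path and that an int was written as above):

FSDataInputStream dis = fs.open(new Path("/user/tmp"));
int counter = dis.readInt();
dis.close();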

- Buffered Writer/Reader
//Writer
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(new Path("/user/tmp"), true)));
bw.write(counter.toString());
bw.close();

//Reader
BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(inFile))));
String line;
while ((line = reader.readLine()) != null) {
...
}
reader.close();
  

- SequenceFile Reader and Writer (I think this is the preferable way for Hadoop jobs):
//writer
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(pathForCounters, context.getTaskAttemptID().toString()), Text.class, Text.class);
writer.append(new Text(firstUrl.toString() + "__" + context.getTaskAttemptID().getTaskID().toString()), new Text(counter + ""));
writer.close();

//reader
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(makeUUrlFileOffsetsPathName(FileInputFormat.getInputPaths(context)[0].toString())), conf);
Text key = new Text();
Text val = new Text();
while (reader.next(key, val)) {
   offsets.put(key.toString(), Integer.parseInt(val.toString()));
}
reader.close();

June 14, 2011

How to ensure each key in the Reducer has a sorted iterator?

There are three properties that control how values are partitioned and sorted for the reducer's consumption. As mentioned on riccomini's blog, Owen O'Malley explains this with a very simple and nice example. By default, intermediate pairs are partitioned using the key. To change this behavior, a custom Partitioner can be defined.

Once we ensure that pairs belonging to the same partition are sent to the same reducer, there are two functions that take care of the ordering and grouping of keys within each partition/reducer: setOutputKeyComparatorClass defines the sort order of the keys, and setOutputValueGroupingComparator defines the groups, i.e. which pairs will be grouped together and processed in one reduce call. The order of values in the reducer's iterator can be controlled using a combination of these two. A sample Partitioner and grouping comparator are below, followed by a sketch of how they might be wired into the job.

public static class RemoveIdentifierAndPartition extends Partitioner<Text, Writable> {

   @Override
   public int getPartition(Text key, Writable value, int numReduceTasks) {
      // partition on the key with its identifier stripped, so related keys land on the same reducer
      return (removeKeyIdentifier(key.toString()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
   }
}

public static final class SortReducerByValuesValueGroupingComparator implements RawComparator<Text> {

   private static final Text.Comparator NODE_COMPARATOR = new Text.Comparator();

   @Override
   public int compare(Text e1, Text e2) {
      return e1.compareTo(e2);
   }

   @Override
   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      // skip the last 2 bytes (space 1 / space 2)
      int skip = 2;

      // compare the byte array of the Node part first
      return NODE_COMPARATOR.compare(b1, s1, l1 - skip, b2, s2, l2 - skip);
   }
}
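
For these to take effect, they have to be registered on the job. Here is a minimal wiring sketch of my own (not from the original post); in the new mapreduce API the old setOutputKeyComparatorClass / setOutputValueGroupingComparator calls correspond to setSortComparatorClass / setGroupingComparatorClass:

// send pairs whose keys differ only in the identifier to the same reducer
job.setPartitionerClass(RemoveIdentifierAndPartition.class);
// decide which keys end up in the same reduce() call
// (old API: JobConf.setOutputValueGroupingComparator)
job.setGroupingComparatorClass(SortReducerByValuesValueGroupingComparator.class);
// a second RawComparator<Text> defining the sort order of the keys would be
// registered with job.setSortComparatorClass (old API: setOutputKeyComparatorClass)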

How to set number of Maps with Hadoop

Setting the number of map tasks is not as simple as setting the number of reduce tasks. The user cannot explicitly give a fixed number; FileInputFormat decides how to split the input files using various parameters.

The first one is isSplitable, which determines whether the file can be split at all.
The next three variables, mapred.min.split.size, mapred.max.split.size and dfs.block.size, determine the actual split size used if the input is splittable. By default, the min split size is 0, the max split size is Long.MAX_VALUE and the block size is 64MB. minSplitSize and blockSize set the lower bound, while blockSize and maxSplitSize together set the upper bound. Here is the formula:
max(minSplitSize, min(maxSplitSize, blockSize))
Note: compressed input files (e.g. gzip) are not splittable, although there are patches available.
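
As an illustration (my own sketch, not from the original notes), raising mapred.min.split.size above the block size is one way to get fewer, larger map tasks, following the formula above:

Configuration conf = new Configuration();
// with a 64MB block size, a 256MB minimum split size gives
// max(256MB, min(Long.MAX_VALUE, 64MB)) = 256MB per split,
// i.e. roughly 4x fewer map tasks than the default
conf.setLong("mapred.min.split.size", 256L * 1024 * 1024);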