March 29, 2011

WARNING : There are about 1 missing blocks. Please check the log or run fsck.

hadoop fsck /
hadoop fsck -delete / 
hadoop fsck -move / 
The -move option moves corrupted files under /lost+found.
The -delete option deletes all corrupted files.
For more options: http://developer.yahoo.com/hadoop/tutorial/module2.html

March 26, 2011

Usage of CAP Theorem in Today's Distributed Storage Systems

The CAP Theorem (Eric Brewer) states that a distributed system can provide at most two of three properties: Consistency, Partition Tolerance, and Availability.

Partition tolerance is a must in real-world systems since machines fail all the time. Therefore, a distributed system has to pick either availability or consistency as the second property. Consistency means "always return the correct value", e.g. the latest written one. Availability means "always accept requests", both reads and writes.
Picking one of these does not always mean losing the other entirely. If the application benefits from availability, e.g. a shopping cart, it is better to prioritize availability and resolve consistency issues later (eventual consistency). On the other hand, if the application requires consistency, e.g. checkout or backend systems that don't need to respond instantly to the end user, it is better to prioritize consistency and give up availability.

Here are some examples:
- Amazon's Dynamo, LinkedIn's Project Voldemort and Facebook's Cassandra provide high availability but only eventual consistency.
- On the other hand, Google's Bigtable provides strong consistency and gives up high availability. Hypertable and HBase follow the Bigtable approach.

Non-Relational Databases

From a data design point of view, there are broadly two design approaches:
- Google's Bigtable-like designs, which are column oriented, e.g. Hypertable and HBase
- simpler key/value storage using distributed hash tables (DHTs), e.g. Project Voldemort, MongoDB, CouchDB

Common specs of both designs (sketched in the code below):

- no predefined schema; only domains are set (broadly, these are like RDBMS tables)
- entries in a domain have a key, and each key has a set of attributes
- no predefined rules for these attributes and no explicit definition of domains
- scalable: new nodes can be added and removed easily, and heavy workloads can be handled
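
As a rough, hypothetical sketch of this data model (the class and field names below are made up for illustration and not tied to any particular product), a domain can be thought of as a map from keys to free-form attribute sets:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a schema-less "domain": each entry key maps to a
// free-form set of attributes; nothing is declared in advance.
public class Domain {
 private final Map<String, Map<String, String>> entries = new HashMap<String, Map<String, String>>();

 // Attributes can differ from entry to entry; no schema is enforced.
 public void put(String key, String attribute, String value) {
  Map<String, String> attrs = entries.get(key);
  if (attrs == null) {
   attrs = new HashMap<String, String>();
   entries.put(key, attrs);
  }
  attrs.put(attribute, value);
 }

 public Map<String, String> get(String key) {
  return entries.get(key);
 }

 public static void main(String[] args) {
  Domain users = new Domain();          // roughly analogous to an RDBMS table
  users.put("user:1", "name", "Alice");
  users.put("user:1", "city", "Ankara");
  users.put("user:2", "name", "Bob");   // different attribute set, no schema error
  System.out.println(users.get("user:1"));
 }
}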

The Bigtable paper by Google is summarized here.

March 25, 2011

hg - detecting renamed files

hg addremove directory/file --similarity 90
The number is a similarity percentage; to detect only renames where the file content is unchanged, use 100.

March 24, 2011

Hadoop - out of disk space

Facing a weird error like
Exception running child : org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/spill0.out

simply means some of the nodes have run out of local disk space. To check HDFS status and the available storage on each node: http://master-urls:50070/dfsnodelist.jsp?whatNodes=LIVE

March 09, 2011

Grep for zipped file

zgrep 'pattern' file.gz

Hadoop - MapReduce without reducer to avoid sorting

An MR job can be defined with no reducer. In this case, the mappers write their outputs directly to the specified job output directory, so there is no sorting and no partitioning.
Just set the number of reducers to 0.
job.setNumReduceTasks(0);
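
For context, a minimal map-only driver might look like the sketch below (the identity Mapper is used only to keep the sketch self-contained; substitute your own mapper):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapOnlyJob {
 public static void main(String[] args) throws Exception {
  Job job = new Job(new Configuration(), "map-only example");
  job.setJarByClass(MapOnlyJob.class);

  // Identity mapper just to keep the example compilable; use your own mapper here.
  job.setMapperClass(Mapper.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(Text.class);

  // No reducer: map outputs go straight to the output directory,
  // skipping the sort and partition phases entirely.
  job.setNumReduceTasks(0);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}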

March 07, 2011

many small input files

If the MR input directory consists of many small files (a couple of MBs each), there will be a separate map task for each one, and these map tasks will probably last only 1-2 seconds. This kills performance: too much scheduling and initialization overhead.

As advised here, it is better to combine input files, so there are fewer map tasks, each with a larger piece of data to process.

Here is how to combine input files. 
(1) Pick a regular record reader class, like LineRecordReader.
(2) Define your own record reader class for multi-file inputs using an instance of (1)
(3) Define your own input format class which extends CombineFileInputFormat and returns (2)

The trick is that a regular record reader works on a FileSplit instance, whereas a record reader used with CombineFileInputFormat has to work with a CombineFileSplit. Here is the code:

The regular record reader class I use in this example is RawWebGraphRecordReader; its basic idea is similar to LineRecordReader.
Below is the code for the multi-file record reader, MultiFileRawWebGraphRecordReader, and the input format, RawGraphInputFormat.
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Wraps the regular record reader so that CombineFileRecordReader can drive it
// over one file of the combined split at a time.
public class MultiFileRawWebGraphRecordReader extends 
                                       RecordReader<Text, Text> {
 private static final Log LOG = LogFactory.getLog(MultiFileRawWebGraphRecordReader.class);

 private CombineFileSplit split;
 private TaskAttemptContext context;
 private int index;
 private RecordReader< Text, Text > rr;

 public MultiFileRawWebGraphRecordReader(CombineFileSplit split,
                                         TaskAttemptContext context, 
                                         Integer index) throws IOException {
  this.split = split;
  this.context = context;
  this.index = index;
  rr = new RawWebGraphRecordReader();
 }
 
 
 public void initialize(InputSplit genericSplit, TaskAttemptContext context)
 throws IOException, InterruptedException {

  this.split = (CombineFileSplit) genericSplit;
  this.context = context;

  if (null == rr) {
   rr = new RawWebGraphRecordReader();
  }

  FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
                                      this.split.getOffset(index), 
                                      this.split.getLength(index), 
                                      this.split.getLocations());
  this.rr.initialize(fileSplit, this.context);
 }
 
 public boolean nextKeyValue() throws IOException, InterruptedException {
  return rr.nextKeyValue();
 }

 @Override
 public Text getCurrentKey() throws IOException, InterruptedException {
  return rr.getCurrentKey();
 }

 @Override
 public Text getCurrentValue() throws IOException, InterruptedException {
  return rr.getCurrentValue();
 }

 /**
  * Get the progress within the split
  * @throws InterruptedException 
  * @throws IOException 
  */
 @Override
 public float getProgress() throws IOException, InterruptedException {
  return rr.getProgress();
 }

 public synchronized void close() throws IOException {
  if (rr != null) {
   rr.close();
   rr = null;
  }
 }

public static class RawGraphInputFormat extends 
                                 CombineFileInputFormat< Text, Text > {

  @Override
  public RecordReader< Text, Text > 
                     createRecordReader(InputSplit split, 
                                        TaskAttemptContext context)throws IOException {
   return new CombineFileRecordReader< Text, Text >( 
                                 (CombineFileSplit) split, 
                                  context, 
                                  MultiFileRawWebGraphRecordReader.class);
  }
  
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
   CompressionCodec codec = new CompressionCodecFactory(    
                                context.getConfiguration()).getCodec(file);
   return codec == null;
  }

 }
}
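
To wire this into a job, something along the lines of the sketch below should work. The max-split-size property name is an assumption based on my recollection of that era's CombineFileInputFormat; double-check it against your Hadoop version.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RawGraphDriver {
 public static void main(String[] args) throws Exception {
  Job job = new Job(new Configuration(), "raw web graph");
  job.setJarByClass(RawGraphDriver.class);

  // Use the combining input format defined above.
  job.setInputFormatClass(MultiFileRawWebGraphRecordReader.RawGraphInputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  // Assumption: cap each combined split (in bytes) so a single map task does
  // not swallow the whole input; the property name may differ across versions.
  job.getConfiguration().setLong("mapred.max.split.size", 128L * 1024 * 1024);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}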

Hadoop - Java Heap Space Error

"Error: Java Heap space" means I'm trying to allocate more memory then available in the system.
how to go around? (1) better configuration (2) look for unnecessarily allocated objects
Configuration

mapred.map.child.java.opts : heap size for map tasks
mapred.reduce.child.java.opts: heap size for reduce tasks

mapred.tasktracker.map.tasks.maximum: max number of map tasks that can run simultaneously per node
mapred.tasktracker.reduce.tasks.maximum: max number of reduce tasks that can run simultaneously per node

Make sure (num_of_maps * map_heap_size) + (num_of_reducers * reduce_heap_size) is not larger than the memory available on the node (for example, 8 map slots * 512 MB + 4 reduce slots * 1 GB = 8 GB). The maximum number of mappers and reducers can also be tuned based on available system resources.

io.sort.factor: max # of streams to merge at once for sorting. Used both in map and reduce.

io.sort.mb: map side memory buffer size used while sorting
mapred.job.shuffle.input.buffer.percent: Reduce side buffer related - The percentage of memory to be allocated from the maximum heap size for storing map outputs during the shuffle

NOTE: Using fs.inmemory.size.mb is a very bad idea!
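
As a rough sketch, these properties can be set as below. The values are arbitrary examples, and the tasktracker maximums are cluster-level settings that normally belong in each node's mapred-site.xml rather than in job code.

import org.apache.hadoop.conf.Configuration;

public class HeapTuningExample {
 public static void main(String[] args) {
  Configuration conf = new Configuration();
  // Per-task heap sizes (example values; keep maps * map_heap + reduces * reduce_heap under node memory).
  conf.set("mapred.map.child.java.opts", "-Xmx512m");
  conf.set("mapred.reduce.child.java.opts", "-Xmx1024m");
  // Slot counts per node (cluster-level settings; shown here only for illustration).
  conf.setInt("mapred.tasktracker.map.tasks.maximum", 8);
  conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 4);
  // Sort and shuffle buffers.
  conf.setInt("io.sort.factor", 25);
  conf.setInt("io.sort.mb", 250);
  conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
  System.out.println("map heap: " + conf.get("mapred.map.child.java.opts"));
 }
}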
Unnecessary memory allocation

Simply look for the new keyword and make sure there is no unnecessary allocation. A very common tip is to use the set() method of Writable objects rather than allocating a new object on every map or reduce call.
Here is a simple count example to show the trick:

public static class UrlReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  IntWritable sumw = new IntWritable(); // reused across reduce() calls instead of allocating a new IntWritable each time
  int sum;

  public void reduce(Text key, Iterable<IntWritable> vals, Context context)
      throws IOException, InterruptedException {
    sum = 0;
    for (IntWritable val : vals) {
      sum += val.get();
    }
    sumw.set(sum);
    context.write(key, sumw);
  }
}

Note: there are a couple more tips here for resolving common errors in Hadoop.

March 01, 2011

hg - untracking without deleting

-A (for hg rm) is short for --after  -->  records the removal of files already deleted from the working directory
-f is short for --force  -->  forces removal of added or modified files
-Af combined, surprisingly, untracks files without deleting them from the working directory.
forget is an alias for rm -Af
hg rm -Af
hg forget