November 28, 2011

How to get android app store apps on kindle fire

I was not happy to realize that the Kindle Fire does not allow access to the Android Market. This is probably a well-thought-out marketing strategy by Amazon, but if you're tired of being redirected to a "can not find app" message and don't want to root your Kindle, keep reading :)

The default Kindle Fire browser (Silk) redirects all Android Market links to the Amazon Appstore. After a little googling, getting another browser and using it to access the Android Market seemed like a smart approach. I tried Dolphin, but it failed too: it still redirected to the Amazon Appstore (by the way, give Dolphin a try if you haven't yet, it is a pretty cool browser). So I was left with the last option: sideloading apps from third-party sites such as freewarelovers.com and getjar.com. Just search for the app online, download the APK file, and tap it to install. Before installing, make sure the "allow applications from unknown sources" option is enabled under Settings->Device.

November 08, 2011

Scalable Manipulation of Archival Web Graphs

I was working on archival web graphs till last June. Actually, most of the Hadoop related posts on this blog are things I learned while working on this project.

We looked at the problem of processing large-scale archival web graphs and generating a simple representation of the raw input graph data. This representation should allow the end user to analyze and query the graph efficiently. It should also be flexible enough to be loaded into a database or processed with distributed computing frameworks, e.g. Hadoop.
To achieve these goals, we developed a workflow for archival graph processing within Hadoop. The project is still ongoing, and its current status was presented at the LSDS-IR '11 workshop at the CIKM conference last month. I'd like to share the paper [acm] [pdf] for those who are interested in further details. The abstract follows:
In this paper, we study efficient ways to construct, represent and analyze large-scale archival web graphs. We first discuss details of the distributed graph construction algorithm implemented in MapReduce and the design of a space-efficient layered graph representation. While designing this representation, we consider both offline and online algorithms for the graph analysis. The offline algorithms, such as PageRank, can use MapReduce and similar large-scale, distributed frameworks for computation. On the other side, online algorithms can be implemented by tapping into a scalable repository (similar to DEC’s Connectivity Server or Scalable Hyperlink Store by Najork), in order to perform the computations. Moreover, we also consider updating the graph representation with the most recent information available and propose an efficient way to perform updates using MapReduce. We survey various storage options and outline essential API calls for the archival web graph specific real-time access repository. Finally, we conclude with a discussion of ideas for interesting archival web graph analysis that can lead us to discover novel patterns for designing state-of-art compression techniques.
Also, the source code for the graph construction algorithm is open sourced on GitHub.

October 09, 2011

Setting up EC2 Command Line Tools (API Tools)

Download the latest version of the EC2 API tools from the AWS website and unzip it. All the tools are under the bin directory. Set $EC2_HOME to the unzipped directory and add its bin directory to your $PATH. Here is how:
export EC2_HOME=/Users/yasemin/My-Apps/ec2-api-tools
export PATH=$PATH:$EC2_HOME/bin
Next, set up the credentials for the AWS account. Create and download a private key and the corresponding certificate from the account's "security credentials" page and point the EC2 CLI to these files. Here is how:
export EC2_PRIVATE_KEY=/Users/yasemin/My-Apps/ec2-api-tools/credentilas/pk-HKZYKTAIG2ECMXYIBH3HXV4ZBZQ55CLO.pem 
export EC2_CERT=/Users/yasemin/My-Apps/ec2-api-tools/credentilas/cert-HKZYKTAIG2ECMXYIBH3HXV4ZBZQ55CLO.pem  
This is all - you should be good to go!
Notes:
- If the export commands are appended to the ~/.bashrc file, they will be executed automatically with every new bash session, which is nice to have.
- For detailed setup instructions, please see the official docs.
- Make sure $JAVA_HOME is also set. Use '$ which java' to figure out the current path.

September 08, 2011

S3 Access

Two ways to access your S3 buckets: JetS3t and s3cmd. JetS3t provides a UI and s3cmd is command-line only.
Here is how to get a file using s3cmd. First configure the account:
s3cmd --configure
Then, you can use s3cmd to access your buckets. eg: downloading a folder from s3:
s3cmd get --recursive s3://bucket_name/object_name to_local_file

Mysql command line basics

show databases;
use db_name; 
show tables; 
describe table_name;
These will get you enough info to run your real SQL command.
The other way is to use a UI app, e.g. MySQL Query Browser.

September 02, 2011

bash alias and colorful ls

Everything below should go under ~/.bashrc
alias la='ls -la'
alias gopen='gnome-open'
alias gterminal='gnome-terminal'
alias ls='ls --color'

PS1="\`if [ \$? = 0 ]; then echo \e[33\;40m\\\^\\\_\\\^\e[0m; else echo \e[36\;40m\\\-\e[0m\\\_\e[36\;40m\\\-\e[0m; fi\` \w> "
NOTE: Make sure the aliases come after PS1. PS1 is for the custom prompt.

June 20, 2011

How to pass job specific parameters in Hadoop

Say there is a parameter your mapper or reducer needs, and it is desirable to get this parameter from the user at the beginning of the job submission. Here is how to use "Configuration" to let the user set the parameter:

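import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class GenericReplace {

   public static final String IS_KEY_FIRST = "IsKeyFirstInMapFile";

   public static class GenerateLinks extends Mapper<Text, Text, Text, Text> {

      @Override
      public void map(Text key, Text value, Context context)  {
         // read the flag back from the job configuration (defaults to true)
         if (context.getConfiguration().getBoolean(IS_KEY_FIRST, true)) {
              //do this .. 
         }
         else{
              //do that .. 
         }
      }
   }

   public static void main(String[] args) throws Exception {

   Configuration conf = new Configuration();
   GenericReplace.graphPath = args[0];
   GenericReplace.outputPath = args[1];
   // parseBoolean parses the argument text; Boolean.getBoolean would look up a system property instead
   conf.setBoolean(IS_KEY_FIRST , Boolean.parseBoolean(args[3]));
   Job job = Job.getInstance(new Cluster(conf), conf);
   ...
   }
}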
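
One pitfall worth spelling out when turning a command-line argument into a boolean for conf.setBoolean: Boolean.getBoolean(String) looks up a JVM system property with that name, while Boolean.parseBoolean(String) parses the text itself. Here is a minimal JDK-only sketch of the difference; the class name and the "hypothetical.flag" property are made up for illustration and are not part of the job above.

```java
class BooleanArgSketch {
    // Parses the argument text itself: "true" (any case) gives true, anything else false.
    static boolean fromArgument(String arg) {
        return Boolean.parseBoolean(arg);
    }

    // Looks up a JVM system property NAMED by the argument; the text itself is not parsed.
    static boolean fromSystemProperty(String propertyName) {
        return Boolean.getBoolean(propertyName);
    }

    public static void main(String[] args) {
        // The text "true" parses to true.
        System.out.println(fromArgument("true"));
        // No system property called "true" exists by default, so this is false.
        System.out.println(fromSystemProperty("true"));
        // Only after setting a property with that name does the lookup succeed.
        System.setProperty("hypothetical.flag", "true");
        System.out.println(fromSystemProperty("hypothetical.flag"));
    }
}
```

So for a value that comes from args, parseBoolean is the call that does what the user expects.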

Ways to write & read HDFS files

- Output Stream
FSDataOutputStream dos = fs.create(new Path("/user/tmp"), true); 
dos.writeInt(counter); 
dos.close();

- Buffered Writer/Reader
//Writer
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(new Path("/user/tmp"), true)));
bw.write(counter.toString());
bw.close();

//Reader
DataInputStream d = new DataInputStream(fs.open(new Path(inFile)));
BufferedReader reader = new BufferedReader(new InputStreamReader(d));
String line;
while ((line = reader.readLine()) != null){
...
}
reader.close();
  

- SequenceFile Reader and Writer (I think this is the preferable way for Hadoop jobs):
//writer
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(pathForCounters, context.getTaskAttemptID().toString()), Text.class, Text.class);
writer.append(new Text(firtUrl.toString()+"__"+ context.getTaskAttemptID().getTaskID().toString()), new Text(counter+""));
writer.close();

//reader
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(makeUUrlFileOffsetsPathName(FileInputFormat.getInputPaths(context)[0].toString())),  conf);
// key and val are reused for every record (Text, matching the writer above)
Text key = new Text();
Text val = new Text();
while (reader.next(key, val)){
    offsets.put(key.toString(), Integer.parseInt(val.toString()));
}
reader.close();

June 14, 2011

How to ensure each key in the Reducer has a sorted iterator?

There are three properties that control how values are partitioned and sorted for the reducer's consumption. As mentioned at riccomini's blog, Owen O'Malley explains this with a very simple and nice example. By default, intermediate pairs are partitioned using the key. To change this behavior, a custom Partitioner can be defined.

Once we ensure that pairs belonging to the same partition are sent to the same reducer, two functions take care of ordering and grouping the keys in each partition/reducer.
setOutputKeyComparatorClass defines the sort order of the keys, and setOutputValueGroupingComparator defines the groups, i.e. which pairs are grouped together and processed in a single reduce call (in the newer org.apache.hadoop.mapreduce API, the corresponding Job methods are setSortComparatorClass and setGroupingComparatorClass). The order of the values in the reducer's iterator can be set using a combination of these two.

public static class RemoveIdentifierAndPartition extends Partitioner< Text, Writable > {

  @Override
  public int getPartition(Text key, Writable value, int numReduceTasks) {
   return (removeKeyIdentifier(key.toString()).hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
 }

 public static final class SortReducerByValuesValueGroupingComparator implements RawComparator< Text >  {
     private static Text.Comparator NODE_COMPARATOR = new Text.Comparator();

     @Override
     public int compare(Text e1, Text e2) {
         return e1.compareTo(e2);
     }

     @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

         // skip the last 2 bytes ( space 1 / space 2) so only the node part is compared
         int skip = 2;

         // compare the byte array of Node first
         return NODE_COMPARATOR.compare(b1, s1 , l1-skip, b2, s2 , l2-skip);
     }
 }
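
To see why the partitioner masks the hash with Integer.MAX_VALUE, note that hashCode() can be negative and Java's % keeps the sign of the left operand (for example, -7 % 4 is -3), which would give an invalid partition index. The sketch below is JDK-only and illustrative: removeKeyIdentifier here is a hypothetical stand-in that strips a "#suffix" from the key, since the real helper is not shown in this post.

```java
class PartitionSketch {
    // Hypothetical stand-in for removeKeyIdentifier: drops everything from the last '#'.
    static String removeKeyIdentifier(String key) {
        int idx = key.lastIndexOf('#');
        return idx < 0 ? key : key.substring(0, idx);
    }

    // Same arithmetic as getPartition above, on plain Strings instead of Text.
    static int partitionFor(String key, int numReduceTasks) {
        int hash = removeKeyIdentifier(key).hashCode();
        // Clearing the sign bit keeps the result in [0, numReduceTasks).
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        // Keys that differ only in their identifier land in the same partition.
        System.out.println(partitionFor("example.org#1", reducers)
                == partitionFor("example.org#2", reducers)); // true
        // Without the mask, a negative hash would produce a negative index.
        System.out.println(-7 % reducers); // -3
    }
}
```

Because the identifier is removed before hashing, all records of the same natural key reach the same reducer, and the comparators then decide their order and grouping there.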

How to set number of Maps with Hadoop

Setting the number of map tasks is not as simple as setting the number of reduce tasks. The user cannot explicitly give a fixed number; FileInputFormat decides how to split the input files using several parameters.

The first one is isSplitable, which determines whether a file is splittable or not.
The next three, mapred.min.split.size, mapred.max.split.size and dfs.block.size, determine the actual split size used if the input is splittable. By default, the min split size is 0, the max split size is Long.MAX_VALUE and the block size is 64MB. The actual split size is the block size, capped from above by the max split size and from below by the min split size. Here is the function to calculate it:
max(minsplitsize, min(maxsplitsize, blocksize))
Note: compressed input files (e.g. gzip) are not splittable, although patches are available.
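
The formula is easy to try out with a few numbers. Below is a small JDK-only sketch; the class and method names are my own, and the split count is only a rough ceiling-division estimate (Hadoop's real split logic also allows the last split to be slightly larger).

```java
class SplitSizeSketch {
    // max(minsplitsize, min(maxsplitsize, blocksize)), as in the formula above.
    static long splitSize(long minSize, long maxSize, long blockSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Rough estimate of the number of splits (and so map tasks) for one splittable file.
    static long estimateSplits(long fileLength, long splitSize) {
        return (fileLength + splitSize - 1) / splitSize;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long block = 64 * mb;

        // Defaults: the split size equals the block size.
        System.out.println(splitSize(0, Long.MAX_VALUE, block) / mb);        // 64
        // Raising the min split size above the block size gives fewer, bigger splits.
        System.out.println(splitSize(128 * mb, Long.MAX_VALUE, block) / mb); // 128
        // Lowering the max split size below the block size gives more, smaller splits.
        System.out.println(splitSize(0, 32 * mb, block) / mb);               // 32
        // A 1 GB file with 64 MB splits needs about 16 map tasks.
        System.out.println(estimateSplits(1024 * mb, block));                // 16
    }
}
```

So the practical knobs are the min split size (to get fewer maps) and the max split size (to get more maps).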

April 24, 2011

Java BitSet Size

Java BitSet can be described as a bit array. It holds a certain number of bits.
We are used to a constructor with a size argument returning an object of exactly that size. However, the BitSet(int nbits) constructor does not work this way. Here is the description from the JavaDocs:
Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range 0 through nbits-1.
Indeed, size() of the object is equal to or bigger than the specified value.
BitSet set = new BitSet(1);
System.out.println(set.size()); //64
BitSet set = new BitSet(10);
System.out.println(set.size()); //64
BitSet set = new BitSet(65);
System.out.println(set.size()); //128

It seems the BitSet constructor rounds the size up to the next multiple of 64, since the bits are stored in 64-bit words (so the smallest non-zero size is 64).
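
A quick check with a value beyond 128 suggests the size grows in steps of 64 rather than in powers of two. The sketch below only uses java.util.BitSet; the printed sizes are what I expect from the standard OpenJDK implementation, and the JavaDocs note that size() is implementation dependent.

```java
import java.util.BitSet;

class BitSetSizeSketch {
    // size() reports the number of bits of space actually allocated.
    static int sizeFor(int nbits) {
        return new BitSet(nbits).size();
    }

    public static void main(String[] args) {
        for (int nbits : new int[] {1, 10, 64, 65, 129}) {
            // OpenJDK prints 64, 64, 64, 128 and 192 for these inputs.
            System.out.println(nbits + " -> " + sizeFor(nbits));
        }

        BitSet set = new BitSet(10);
        set.set(3);
        System.out.println(set.length());      // 4: index of the highest set bit + 1
        System.out.println(set.cardinality()); // 1: number of bits set to true
    }
}
```

If the logical size matters, length() or cardinality() is usually the number to look at, not size().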

April 21, 2011

Hadoop - Incompatible namespaceIDs Error

After formatting the namenode, restarting Hadoop fails. More specifically, the datanodes do not start and log an Incompatible namespaceIDs error.
bin/hadoop namenode -format
..
bin/start-dfs.sh
..
ERROR org.apache.hadoop.hdfs.server.datanode.DataNode:
  java.io.IOException: Incompatible namespaceIDs in /hadoop21/hdfs/datadir: 
  namenode namespaceID = 515704843; datanode namespaceID = 572408927
  ..
Why? - After formatting, the namenode gets a new namespaceID, but the datanodes still hold the old one.
Solution? - Edit the <hdfs-data-path>/datadir/current/VERSION file on the datanode and replace its namespaceID with the namenode's new one (which is 515704843 in this example). Make sure to change it on every datanode in the cluster.

WARNING: most probably you will lose the data in HDFS. Even though it is not deleted, it is not accessible with the new namespaceID.

To avoid such a boring case, be careful before formatting. Take a look at this

April 19, 2011

Hadoop MultipleOutputs

Say you want to generate various types of output files. For example, I have a huge link graph which includes the timestamp and the outlink information, and I want to put these two pieces of information into separate files. Here is how to use MultipleOutputs for this purpose:
public static class FinalLayersReducer extends Reducer<IntWritable, Text, WritableComparable,Writable> 
{
     private MultipleOutputs mos;

     public void setup(Context context) 
     {
          mos = new MultipleOutputs(context);
     }
  
     public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
          for ( Text val : values) {
          // some sort of a computation ..
          }
          mos.write("outlink", key, outlink_text);
          mos.write("timestamp", key, timestamp_text);
     }

     protected void cleanup(Context context) throws IOException, InterruptedException {
          // without this close() the named outputs can end up empty or truncated
          mos.close();
     }
}

public static void main(String[] args) throws Exception {

     Job job = new Job(conf, "prepare final layer files");
     
     // other job settings ..

     MultipleOutputs.addNamedOutput(job, "outlink", TextOutputFormat.class , IntWritable.class, Text.class);
     MultipleOutputs.addNamedOutput(job, "timestamp", TextOutputFormat.class , IntWritable.class, Text.class);
}

If you face zero-sized output files, lines in the two separate outputs that do not match when they are supposed to, or output files that cannot be unzipped, these are signs that you forgot to close() the MultipleOutputs object at the end, in the cleanup() function.

April 18, 2011

Hadoop Intermediate Data Compression

To enable intermediate data compression, set the corresponding properties in mapred-site.xml.
<!-- mapred-site.xml -->   
<property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

Setting up LZO compression is a bit tricky. First of all, the LZO package should be installed on all nodes. I built this package and followed the instructions here.

If you have difficulty while building, e.g. "BUILD FAILED make sure $JAVA_HOME set correctly.", then take a look here.

In the end, this is what my config files look like:
<!-- mapred-site.xml -->
<property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

<!-- core-site.xml -->
<property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
<property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

To compress the final output data, the Job object should be configured to output compressed data before it is executed.

Compressing Hadoop Output using Gzip and Lzo

In most cases, writing output files in a compressed format is faster, since less data is written. For the overall computation to be faster, the compression algorithm must perform well, so that time is saved despite the extra compression overhead.

To compress regular output formats with Gzip, use:
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setCompressOutput(job, true);
TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
...

For Lzo output compression, download this package by @kevinweil. Then the following should work:
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setCompressOutput(job, true);
TextOutputFormat.setOutputCompressorClass(job, LzoCodec.class);
...

In terms of space efficiency, Gzip compresses better. However, in terms of time, Lzo is much faster. Also, it is possible to split Lzo files, whereas splittable Gzip is not available.
Keep in mind that these two techniques only compress the final outputs of a Hadoop job. To compress intermediate data, the parameters in mapred-site.xml should be configured.

April 11, 2011

LZO build problem

While I was trying to build the lzo library for Hadoop, the build failed with a "make sure $JAVA_HOME set correctly" message. Here is the full error log:
....     
   [exec] checking jni.h usability... no
   [exec] checking jni.h presence... no
   [exec] checking for jni.h... no
   [exec] configure: error: Native java headers not found. 
   Is $JAVA_HOME set correctly?
BUILD FAILED make sure $JAVA_HOME set correctly. 

This means the build is using an incorrect Java installation. To find the Java installation that actually ships jni.h and point JAVA_HOME to it, use apt-file search, which searches the file lists of the packages in your configured repositories.
apt-file search jni.h
And then set JAVA_HOME accordingly.

Common Hadoop HDFS exceptions with large files

Big data in HDFS means many disk problems. First of all, make sure there is at least ~20-30% free space on each node. There are two other problems I faced recently:

all datanodes are bad
This error can be caused by too many open files. The limit is 1024 by default. To increase it, use
ulimit -n newsize
For more information click!

error in shuffle in fetcher#k 
This is another problem. Here is the full error log:
2011-04-11 05:59:45,744 WARN org.apache.hadoop.mapred.Child: 
Exception running child : org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: 
error in shuffle in fetcher#2
 at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:124)
 at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:362)
 at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:416)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
 at org.apache.hadoop.mapred.Child.main(Child.java:211)
Caused by: java.lang.OutOfMemoryError: Java heap space
 at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:58)
 at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:45)
 at org.apache.hadoop.mapreduce.task.reduce.MapOutput.<init>(MapOutput.java:104)
 at org.apache.hadoop.mapreduce.task.reduce.MergeManager.unconditionalReserve(MergeManager.java:267)
 at org.apache.hadoop.mapreduce.task.reduce.MergeManager.reserve(MergeManager.java:257)
 at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:305)
 at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:251)
 at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:149)

One way to get around this problem is to make sure there are not too many map tasks for small input files. If possible, you can cat the input files manually to create bigger chunks, or push Hadoop to combine multiple tiny input files for a single mapper. For more details, take a look here.

Also, in the Hadoop discussion groups, it is mentioned that the default value of the dfs.datanode.max.xcievers parameter, the upper bound on the number of files an HDFS DataNode can serve, is too low and causes the ShuffleError. In hdfs-site.xml, I set this value to 2048 and it worked in my case.
<property>
        <name>dfs.datanode.max.xcievers</name>
        <value>2048</value>
  </property>

Update: Default value for dfs.datanode.max.xcievers is updated with this JIRA.

April 04, 2011

how to remove unnecessary scrollbar from syntaxhighlighter code

I've been looking for a way to remove the annoying scroll bars in syntax-highlighted code. They were visible in Chrome but not in FF. I came across the solution here. Just add the following code snippet at the end of the <head> section.

<style type="text/css">
.syntaxhighlighter { overflow-y: hidden !important; }
</style>  

April 03, 2011

java.io.EOFException with Hadoop

My code runs smoothly with a smaller dataset; however, whenever I run it with a larger one, it fails with java.io.EOFException. I've been trying to figure out the problem.

11/03/31 01:13:55 INFO mapreduce.Job: 
  Task Id: attempt_201103301621_0025_m_000634_0, Status : FAILED
java.io.EOFException
 at java.io.DataInputStream.readFully(DataInputStream.java:197)
 at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:68)
 at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:106)
 at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1999)
 at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2131)
 ...
 ...
 ...
 at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:465)
 at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
 at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:90)
 at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
 at org.apache.hadoop.mapreduce.lib.input.DelegatingMapper.run
  (Delegatin

So, EOFException means something is wrong with your input files. If files are not written and closed correctly, this exception is thrown: the file system thinks there is more to read, but the number of bytes left is actually less than expected.
To solve the problem, dig into the input files and make sure they are created carefully without any corruption. Also, if MultipleOutputs is used to prepare the input files, make sure it is closed at the end!

March 29, 2011

WARNING : There are about 1 missing blocks. Please check the log or run fsck.

hadoop fsck /
hadoop fsck -delete / 
hadoop fsck -move / 
The -move option moves corrupted files under /lost+found
The -delete option deletes all corrupted files
For more options: http://developer.yahoo.com/hadoop/tutorial/module2.html

March 26, 2011

Usage of CAP Theorem in Today's Distributed Storage Systems

The CAP Theorem (Eric Brewer) states that a distributed system can provide at most two of three properties: Consistency, Availability, and Partition Tolerance.

Partition Tolerance is a must in real-world systems since machines fail all the time. Therefore, a distributed system has to pick either Availability or Consistency as the second property. Consistency means "always return the correct value", e.g. the latest written one. Availability means "always accept requests", e.g. reads and writes.
Picking one of these does not always mean losing the other totally. If the application benefits from availability, e.g. a shopping cart, it is better to prioritize availability and resolve consistency issues later (eventually consistent). On the other hand, if the application requires consistency, e.g. checkout or backend systems which don't require an instant response to the end user, it is better to prioritize consistency and give up availability.

Here are some examples:
- Amazon's Dynamo, LinkedIn's Project Voldemort and Facebook's Cassandra provide high availability, but eventual consistency.
- On the other side, Google's Bigtable provides strong consistency and gives up high availability. Hypertable and HBase use the Bigtable approach.

Non-Relational Databases

From a data design point of view, there are broadly two design approaches:
- Google's Bigtable-like designs, which are column based, e.g. Hypertable and HBase
- A simpler key/value storage using distributed hash tables (DHTs), e.g. Project Voldemort, MongoDB, CouchDB

Common specs of both designs:

- no predefined schema, only domains are set (broadly, these are like RDBMS tables)
- entries in domains have a key, and keys have a set of attributes
- no predefined rules for these attributes and no explicit definition of domains
- scalable: new nodes can be added and removed easily, able to handle heavy workloads

Bigtable paper by Google summarized here.

March 25, 2011

hg - detecting renamed files

hg addremove directory/file --similarity 90
If you only want to detect renamed files with no changes, use 100. The number is the required similarity as a percentage.

March 24, 2011

Hadoop - out of disk space

Weird errors like
Exception running child : org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/spill0.out

simply mean that some of the nodes ran out of disk space. To check the HDFS status and the available storage on each node: http://master-urls:50070/dfsnodelist.jsp?whatNodes=LIVE

March 09, 2011

Grep for zipped file

zgrep pattern filename

Hadoop - MapReduce without reducer to avoid sorting

An MR job can be defined with no reducer. In this case, all the mappers write their outputs directly under the specified job output directory, so there is no sorting and no partitioning.
Just set the number of reduce tasks to 0.
job.setNumReduceTasks(0);

March 07, 2011

many small input files

If the MR input directory consists of many small files (a couple of MBs each), then there will be a separate map task for each, and these map tasks will probably last only 1-2 seconds. This kills performance: so much scheduling and/or initialization overhead.

As advised here, it is better to combine input files, so there will be fewer map tasks, each with a larger piece of data to process.

Here is how to combine input files.
(1) Pick a regular record reader class, like LineRecordReader.
(2) Define your own record reader class for multi-file inputs using an instance of (1).
(3) Define your own input format class which extends CombineFileInputFormat and returns (2).

The trick is that a regular record reader uses a FileSplit instance, whereas a record reader used with CombineFileInputFormat should use a CombineFileSplit. Here is the code:

The regular record reader class I use in this example is RawWebGraphRecordReader. Its basic idea is similar to LineRecordReader.
Below is the code for the multi-file record reader, MultiFileRawWebGraphRecordReader, and the input format, RawGraphInputFormat:
public class MultiFileRawWebGraphRecordReader extends 
                                       RecordReader < Text, Text > {
 private static final Log LOG = LogFactory.getLog(MultiFileRawWebGraphRecordReader.class);

 private CombineFileSplit split;
 private TaskAttemptContext context;
 private int index;
 private RecordReader< Text, Text > rr;

 public MultiFileRawWebGraphRecordReader(CombineFileSplit split,
                                         TaskAttemptContext context, 
                                         Integer index) throws IOException {
  this.split = split;
  this.context = context;
  this.index = index;
  rr = new RawWebGraphRecordReader();
 }
 
 
 public void initialize(InputSplit genericSplit, TaskAttemptContext context)
 throws IOException, InterruptedException {

  this.split = (CombineFileSplit) genericSplit;
  this.context = context;

  if (null == rr) {
   rr = new RawWebGraphRecordReader();
  }

  FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
                                      this.split.getOffset(index), 
                                      this.split.getLength(index), 
                                      this.split.getLocations());
  this.rr.initialize(fileSplit, this.context);
 }
 
 public boolean nextKeyValue() throws IOException, InterruptedException {
  return rr.nextKeyValue();
 }

 @Override
 public Text getCurrentKey() throws IOException, InterruptedException {
  return rr.getCurrentKey();
 }

 @Override
 public Text getCurrentValue() throws IOException, InterruptedException {
  return rr.getCurrentValue();
 }

 /**
  * Get the progress within the split
  * @throws InterruptedException 
  * @throws IOException 
  */
 @Override
 public float getProgress() throws IOException, InterruptedException {
  return rr.getProgress();
 }

 public synchronized void close() throws IOException {
  if (rr != null) {
   rr.close();
   rr = null;
  }
 }
}

public static class RawGraphInputFormat extends 
                                 CombineFileInputFormat< Text, Text > {

  @Override
  public RecordReader< Text, Text > 
                     createRecordReader(InputSplit split, 
                                        TaskAttemptContext context)throws IOException {
   return new CombineFileRecordReader< Text, Text >( 
                                 (CombineFileSplit) split, 
                                  context, 
                                  MultiFileRawWebGraphRecordReader.class);
  }
  
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
   CompressionCodec codec = new CompressionCodecFactory(    
                                context.getConfiguration()).getCodec(file);
   return codec == null;
  }

 }
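
To get a feel for how much combining helps, here is a rough JDK-only sketch that greedily packs file sizes into splits up to a maximum size. It is only an illustration with made-up names and numbers, not CombineFileInputFormat's actual algorithm (which also takes node and rack locality into account).

```java
import java.util.ArrayList;
import java.util.List;

class SmallFilePackingSketch {
    // Greedily groups files, in order, into splits of at most maxSplitBytes each.
    static List<List<Long>> pack(long[] fileSizes, long maxSplitBytes) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            // Start a new split when the next file would overflow the current one.
            if (!current.isEmpty() && currentBytes + size > maxSplitBytes) {
                splits.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long[] files = new long[1000];
        java.util.Arrays.fill(files, 2 * mb); // 1000 hypothetical files of 2 MB each

        // One map task per file: 1000 tasks. Packed into 64 MB splits: 32 tasks.
        System.out.println(files.length);                 // 1000
        System.out.println(pack(files, 64 * mb).size());  // 32
    }
}
```

With these hypothetical numbers the job goes from 1000 short-lived map tasks to 32 longer ones, which is where the scheduling overhead is saved.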

Hadoop - Java Heap Space Error

"Error: Java Heap space" means I'm trying to allocate more memory than is available to the task's JVM.
How to get around it? (1) better configuration (2) look for unnecessarily allocated objects
Configuration

mapred.map.child.java.opts : heap size for map tasks
mapred.reduce.child.java.opts: heap size for reduce tasks

mapred.tasktracker.map.tasks.maximum: max number of map tasks that can run simultaneously per node
mapred.tasktracker.reduce.tasks.maximum: max number of reduce tasks that can run simultaneously per node

Make sure ((num_of_maps * map_heap_size) + (num_of_reducers * reduce_heap_size)) is not larger than the memory available in the system. The max number of mappers and reducers can also be tuned by looking at the available system resources.
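
The check above is simple arithmetic, sketched here with JDK-only code. The slot counts, heap sizes and node memory are hypothetical numbers for illustration; a real node also needs room for the DataNode, the TaskTracker and the OS.

```java
class HeapBudgetSketch {
    // (num_of_maps * map_heap_size) + (num_of_reducers * reduce_heap_size), in MB.
    static long requiredMb(int mapSlots, int mapHeapMb, int reduceSlots, int reduceHeapMb) {
        return (long) mapSlots * mapHeapMb + (long) reduceSlots * reduceHeapMb;
    }

    // True if all task JVMs at full heap fit into the memory set aside for tasks.
    static boolean fits(int mapSlots, int mapHeapMb, int reduceSlots, int reduceHeapMb,
                        long availableMb) {
        return requiredMb(mapSlots, mapHeapMb, reduceSlots, reduceHeapMb) <= availableMb;
    }

    public static void main(String[] args) {
        // Hypothetical node: 8 map slots at 1024 MB and 4 reduce slots at 2048 MB.
        System.out.println(requiredMb(8, 1024, 4, 2048));      // 16384
        // Does not fit if only 16000 MB can be given to tasks.
        System.out.println(fits(8, 1024, 4, 2048, 16000));     // false
        // Dropping to 6 map slots brings it down to 14336 MB, which fits.
        System.out.println(fits(6, 1024, 4, 2048, 16000));     // true
    }
}
```

If the budget does not fit, either lower the per-task heap sizes or reduce the max number of simultaneous tasks per node.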

io.sort.factor: max number of streams to merge at once while sorting. Used on both the map and reduce sides.

io.sort.mb: map-side memory buffer size used while sorting
mapred.job.shuffle.input.buffer.percent: reduce-side buffer setting; the percentage of the maximum heap size to allocate for storing map outputs during the shuffle

NOTE: Using fs.inmemory.size.mb is a very bad idea!
Unnecessary memory allocation

Simply look for the new keyword and make sure there is no unnecessary allocation. A very common tip is to use the set() method of Writable objects rather than allocating a new object in every map or reduce call.
Here is a simple count example to show the trick:

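public static class UrlReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  // reused across reduce() calls instead of allocating a new IntWritable each time
  IntWritable sumw = new IntWritable();
  int sum;

  @Override
  public void reduce(Text key, Iterable<IntWritable> vals, Context context)
      throws IOException, InterruptedException {
    sum=0;
    for (IntWritable val : vals) {
      sum += val.get();
    }
    sumw.set(sum);
    context.write(key, sumw);
  }
}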

note: There are a couple more tips here for resolving common errors in Hadoop.

March 01, 2011

hg - untracking without deleting

-A is the shortcut for --after  -->  records the removal of files that are already deleted from the working directory
-f is the shortcut for --force, like rm -f     --> forces the delete
-Af, surprisingly, untracks files without deleting them from the working directory.
forget does the same as rm -Af
hg rm -Af
hg forget

February 24, 2011

Java Profiler

HProf is a simple CPU/heap profiling tool. Hadoop's profiling also uses it. Here is the link.

February 15, 2011

JVM Garbage Collector (Tuning)

java.lang.OutOfMemoryError: GC overhead limit exceeded means the garbage collector is taking too much time ( > 98% ) while freeing too little memory ( < 2% of the heap ). I got this exception while working on a large dataset and holding ~1G of data in memory. One way to get past this exception is to use a specific JVM garbage collector called the Concurrent Collector. The concurrent collector does not let the executing program pause for a long time because of GC. It also takes advantage of multiple CPUs available in the environment. It can be enabled via -XX:+UseConcMarkSweepGC

Tip: For RMI apps, unnecessary RMI garbage collection can be avoided by tuning its execution frequency. By default it runs every 60,000 msec.

-Dsun.rmi.dgc.client.gcInterval=3600000
-Dsun.rmi.dgc.server.gcInterval=3600000

References
- official reference: [1]
- nice summary:  [2]