November 24, 2010

hadoop - wrong key class exception

This usually happens because of a mismatch between the Map or Reduce class signatures and the job's configuration settings.

But also be careful about the combiner! Check whether you are using the same class as both reducer and combiner. If the reducer's input key-value types are not the same as its output key-value types, it cannot be used as a combiner - because the combiner's output becomes the reducer's input!

here is an example:
reducer input key-val : <IntWritable, IntWritable>
reducer output key-val: <Text, Text>

if this reducer is used as the combiner, the combiner will output <Text, Text> and the reducer will receive <Text, Text> as input - and boom - wrong key class exception!
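
To make this concrete, here is a hedged driver sketch of the misconfiguration (class names such as MyMapper and MyReducer are made up for illustration, not from any real job):

// Illustrative job setup. MyReducer is a Reducer<IntWritable, IntWritable, Text, Text>.
// Registering it as the combiner makes the combiner emit <Text, Text>, which the
// reducer then receives instead of <IntWritable, IntWritable> -> wrong key class exception.
Job job = new Job(conf, "combiner-mismatch-example");
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// job.setCombinerClass(MyReducer.class);  // WRONG here: only safe when the reducer's
                                           // input and output types are identical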

November 23, 2010

java.lang.InstantiationException hadoop

java.lang.InstantiationException definition:
Thrown when an application tries to create an instance of a class using the newInstance method in class Class, but the specified class object cannot be instantiated because it is an interface or is an abstract class.

I got this exception after setting the input format to FileInputFormat - which is an abstract class!
job.setInputFormatClass(FileInputFormat.class);

The default is TextInputFormat, and it can be used instead:
job.setInputFormatClass(TextInputFormat.class);

exception:
Exception in thread "main" java.lang.RuntimeException: java.lang.InstantiationException
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:123)
        at org.apache.hadoop.mapreduce.lib.input.MultipleInputs.getInputFormatMap(MultipleInputs.java:109)
        at org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:58)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:401)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:418)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:338)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:960)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:976)
        at nyu.cs.webgraph.LinkGraphUrlIdReplacement.phase1(LinkGraphUrlIdReplacement.java:326)
        at nyu.cs.webgraph.LinkGraphUrlIdReplacement.main(LinkGraphUrlIdReplacement.java:351)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:616)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:192)
Caused by: java.lang.InstantiationException
        at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:532)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:121)
        ... 14 more
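
As a hedged reference sketch (paths and mapper names below are placeholders, not from the original job), the fix is to always hand Hadoop a concrete, instantiable InputFormat - both in the plain case and when inputs are registered through MultipleInputs, which appears in the trace above:

// plain case: use a concrete subclass such as TextInputFormat
job.setInputFormatClass(TextInputFormat.class);

// MultipleInputs case: the same rule applies to every registered input
MultipleInputs.addInputPath(job, new Path("/input/a"), TextInputFormat.class, MapperA.class);
MultipleInputs.addInputPath(job, new Path("/input/b"), KeyValueTextInputFormat.class, MapperB.class);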

November 20, 2010

Finding a needle in Haystack: Facebook's photo storage

A summary of Haystack, the system Facebook built for storing photos.

Facebook's total image workload:
  • 260 billion images (~20 petabytes) of data
  • every week, 1 billion (~60 terabytes of) new photos are uploaded

Main characteristics of Facebook images:
  • read often
  • written once
  • no modification
  • rarely deleted

Traditional file systems are not fast enough for this workload (too many disk accesses per read), and external CDNs will not be enough in the near future as the workload keeps growing - especially for the long tail. As a solution, Haystack is designed to provide:
  1. High throughput low latency:
    • keeps metadata in main memory -at most one disk access per read
  2. Fault tolerance
    • replicas are in different geographical regions
  3. Cost effective and simple
    • compared to an NFS-based NAS appliance
    • each usable terabyte costs ~28% less
    • ~4% more reads per sec

Design Previous to Haystack



What was learned from the NFS-based design
  • more than 10 disk operations to read an image
  • if directory sizes are reduced, 3 disk operations to fetch an image
  • caching file handles for likely next requests - a new kernel function, open_by_file_handle
Takeaways from the previous design
  • Focusing only on caching has limited impact on reducing disk operations for the long tail
  • CDNs are not effective for the long tail
  • Would a GoogleFS-like system be useful?
  • Lack of the right RAM-to-disk ratio in the current system
Haystack Solution:
  • use XFS (an extent-based file system)
    • reduce metadata size per picture so all metadata can fit into RAM
    • store multiple photos per file
    • this gives a very good price/performance point - better than buying more NAS appliances
    • holding regular per-file metadata for every photo in RAM would be far too expensive
  • design your own CDN (Haystack Cache)
    • uses a distributed hash table
    • if a requested photo cannot be found in the cache, it is fetched from the Haystack Store
    • store multiple photos per file
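
To make the "all metadata in RAM, one disk access per read" idea concrete, here is a toy sketch of the core mechanism (my own illustration, not Facebook's code): an in-memory index maps each photo id to an offset and size inside one large store file, so a read costs one map lookup plus one positioned read.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;

public class ToyPhotoStore {

    // Per-photo metadata kept in memory; deliberately tiny so a huge number
    // of entries can fit in RAM (the real system packs this even tighter).
    private static class Needle {
        final long offset;   // where the photo's bytes start in the store file
        final int size;      // photo size in bytes
        Needle(long offset, int size) { this.offset = offset; this.size = size; }
    }

    private final Map<Long, Needle> index = new HashMap<Long, Needle>();
    private final RandomAccessFile store;

    public ToyPhotoStore(String storePath) throws IOException {
        this.store = new RandomAccessFile(storePath, "rw");
    }

    // Append the photo to the big file and remember only offset/size in RAM.
    public synchronized void write(long photoId, byte[] data) throws IOException {
        long offset = store.length();
        store.seek(offset);
        store.write(data);
        index.put(photoId, new Needle(offset, data.length));
    }

    // One in-memory lookup, then a single disk access.
    public synchronized byte[] read(long photoId) throws IOException {
        Needle needle = index.get(photoId);
        if (needle == null) return null;
        store.seek(needle.offset);
        byte[] data = new byte[needle.size];
        store.readFully(data);
        return data;
    }
}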


DESIGN DETAILS
needs to be updated ..

D. Beaver, S. Kumar, H. C. Li, J. Sobel, and P. Vajgel. Finding a needle in Haystack: Facebook’s photo storage. In OSDI ’10

November 03, 2010

Using Distributed Cache in Hadoop

The distributed cache allows static data to be shared among all nodes. In order to use this functionality, the data location should be set before the MapReduce job starts.
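
For example, on the driver side the file can be registered before the job is submitted. A minimal sketch, assuming the 0.20-era DistributedCache API and a made-up HDFS path:

// Driver-side sketch (the path and job name are illustrative):
// register the shared file before the job starts so every node can use it.
Configuration conf = new Configuration();
Job job = new Job(conf, "url-id-replacement");
DistributedCache.addCacheFile(new Path("/user/yasemin/id-url-mapping.txt").toUri(),
        job.getConfiguration());
// ... set mapper, reducer, input/output paths as usual, then submit the job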

Here is an example usage of the distributed cache. While working on a web-graph problem, I replace URLs with unique ids. If I have the url-id mapping in memory, I can easily replace URLs with their corresponding ids. So here is the sample usage:

public static class ReplacementMapper extends Mapper<Text, Text, Text, Text> {

    private HashMap<String, String> idmap;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        // load the shared mapping once, before any map() call on this task
        loadIdUrlMapping(context);
    }

    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        ....
    }

The id-url mapping is loaded at the beginning of each map task. The example below simply reads the file from HDFS and stores the data in a HashMap for quick access. Here is the function:
private void loadIdUrlMapping(Context context) {

    FSDataInputStream in = null;
    BufferedReader br = null;
    try {
        // open the mapping file directly from HDFS
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path path = new Path(cacheFileLocation);
        in = fs.open(path);
        br = new BufferedReader(new InputStreamReader(in));
    } catch (FileNotFoundException e1) {
        e1.printStackTrace();
        System.out.println("read from distributed cache: file not found!");
        return;
    } catch (IOException e1) {
        e1.printStackTrace();
        System.out.println("read from distributed cache: IO exception!");
        return;
    }
    try {
        this.idmap = new HashMap<String, String>();
        String line;
        // each line is a tab-separated pair; keep it in the map for quick lookups
        while ((line = br.readLine()) != null) {
            String[] arr = line.split("\t");
            if (arr.length == 2)
                idmap.put(arr[1], arr[0]);
        }
        br.close();
    } catch (IOException e1) {
        e1.printStackTrace();
        System.out.println("read from distributed cache: error while reading the file!");
    }
}
}

This is one way of accessing shared data among Hadoop nodes. Another way is to access it through the local file system. Here is a great article about how to automatically distribute cached data to the nodes and access it there.
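
As a hedged sketch of that second approach (assuming the file was registered with DistributedCache.addCacheFile in the driver, as in the earlier snippet), a task can open the node-local copy inside setup():

// Inside setup(): read the node-local copy that Hadoop shipped to this node.
// These calls throw IOException, so declare or handle it in the enclosing method.
Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
if (cached != null && cached.length > 0) {
    BufferedReader reader = new BufferedReader(new FileReader(cached[0].toString()));
    // ... parse each line into the HashMap as above, then reader.close();
}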

November 02, 2010

How to manipulate (copy/move/rename etc..) most recent n files under a directory

In order to list the most recent n files under the current directory:

ls --sort=time -r | tail -n

n represents the number of files, so it should be replaced with an actual number, e.g. 5.

In order to move, copy, delete or do something else with the result, this line can be fed into the "cp", "mv" or "rm" commands. However, the quoting is important: the line should be wrapped in backquotes for command substitution - not the single quote next to Enter on your keyboard ( ' ), but the backtick under the Esc key ( ` ).
So here is the command line for moving the most recent n files from one directory to another:

mv `ls --sort=time -r | tail -n` /home/yasemin/hebelek/