Giraph configuration details

Heeeey!

Today I had to deploy a single-node Apache Giraph installation on my machine. Giraph has a really good quick start guide (here). A few corrections are needed to keep it up to date with the latest available version (1.1.0), but hopefully I will be able to make them this week.

There are, however, a few glitches that can happen during the installation process. Here I will present quick solutions to them.

OpenJDK management.properties problem

I must confess that I would rather use Oracle Java than OpenJDK. As most of you may know, in order to run Hadoop you need at least Java 1.6 installed. In my current configuration, I was trying to run Hadoop with OpenJDK Java 7 on Ubuntu 14.04.2.

With all my $JAVA_HOME variables correctly set, I was getting the following error:

hduser@prometheus-UX32A:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/management$ hadoop namenode -format
Error: Config file not found: /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/management/management.properties

This error also appeared for other Hadoop scripts like start-dfs.sh and start-mapred.sh, and of course start-all.sh.

Taking a quick look at the installation, I realized that management.properties was just a broken symbolic link. It also looks like it is directly related to the Java RMI capabilities (Remote Method Invocation), which is probably how Hadoop starts all the daemons across the nodes. I must confess that I installed all the OpenJDK packages available in the Ubuntu repository and none of them solved my problem.

This can be solved with the following workaround:

  1. Download the equivalent Oracle Java version HERE. Keep in mind to download the same version as your OpenJDK (i.e. download Java 6, 7 or 8, depending on your JDK).
  2. Extract the Oracle Java archive and copy the contents of its jre/lib/management folder (in particular management.properties) into the equivalent OpenJDK folder (a quick sketch of this step follows the list).
  3. Run the failing command again.
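
A minimal sketch of step 2, assuming an Oracle JDK 7 tarball and the OpenJDK path from the error message above (the tarball and folder names are just examples, use the ones matching what you actually downloaded):

# remove the broken management.properties symlink reported in the error
sudo rm /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/management/management.properties
# extract the Oracle JDK tarball (example file name)
tar -xzf jdk-7u79-linux-x64.tar.gz -C /tmp
# copy Oracle's management configuration into the OpenJDK tree
sudo cp /tmp/jdk1.7.0_79/jre/lib/management/management.properties \
        /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/management/
# try the failing command again
hadoop namenode -format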

Hopefully, this will solve that problem.

Giraph Mapper Allocation problem

Let’s suppose you installed Giraph correctly and the Hadoop MapReduce examples are running as you expect on your machine.

Giraph, however, greets you with this very interesting line when you submit a job:

15/06/04 16:58:03 INFO job.GiraphJob: Waiting for resources... Job will start only when it gets all 2 mappers

If you want to make sure that we are talking about the same thing, here is a complete error output:

hduser@prometheus-UX32A:~$ hadoop jar /opt/apache/giraph/giraph-examples/target/giraph-examples-1.2.0-SNAPSHOT-for-hadoop-1.2.1-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsComputation -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/hduser/input/tiny_graph.txt -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/hduser/output/shortestpaths -w 1
15/06/04 16:57:58 INFO utils.ConfigurationUtils: No edge input format specified. Ensure your InputFormat does not require one.
15/06/04 16:57:58 INFO utils.ConfigurationUtils: No edge output format specified. Ensure your OutputFormat does not require one.
15/06/04 16:57:59 INFO job.GiraphJob: run: Since checkpointing is disabled (default), do not allow any task retries (setting mapred.map.max.attempts = 1, old value = 4)
15/06/04 16:58:03 INFO job.GiraphJob: Tracking URL: http://hdhost:50030/jobdetails.jsp?jobid=job_201506041652_0003
15/06/04 16:58:03 INFO job.GiraphJob: Waiting for resources... Job will start only when it gets all 2 mappers
15/06/04 16:58:31 INFO mapred.JobClient: Running job: job_201506041652_0003
15/06/04 16:58:31 INFO mapred.JobClient: Job complete: job_201506041652_0003
15/06/04 16:58:31 INFO mapred.JobClient: Counters: 5
15/06/04 16:58:31 INFO mapred.JobClient:   Job Counters
15/06/04 16:58:31 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=31987
15/06/04 16:58:31 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/06/04 16:58:31 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/06/04 16:58:31 INFO mapred.JobClient:     Launched map tasks=2
15/06/04 16:58:31 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0

Good news: it is not your fault (unless you think that giving bad hostnames to your machine is your fault). According to this JIRA there is a problem parsing hostnames: basically, Giraph does not handle hostnames that mix uppercase and lowercase letters. This can be solved by setting a new hostname with only lowercase letters. To do this, run as sudo:

 sudo hostname NEW_HOST_NAME 
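
A minimal sketch of the whole procedure on Ubuntu (the hostname below is just a placeholder, and the Hadoop start/stop scripts are assumed to be on your PATH):

# set the new, all-lowercase hostname for the current session
sudo hostname prometheusmobile
# make it persistent across reboots (Ubuntu keeps it in /etc/hostname)
echo "prometheusmobile" | sudo tee /etc/hostname
# update /etc/hosts so the old entry points to the new name, e.g.:
#   127.0.1.1   prometheusmobile
sudo nano /etc/hosts
# restart the Hadoop 1.x daemons and check that they are all up
stop-all.sh && start-all.sh
jps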

Also, do not forget to update your /etc/hosts table with the new hostname, as in the sketch above. Keep in mind that this may affect other software that relies on your hostname to run. Once you change it, restart the machine and the Hadoop daemons. Hopefully you will then get the correct output, which in my case is:

hduser@prometheusmobile:/opt/apache/giraph/giraph-examples$ hadoop jar target/giraph-examples-1.2.0-SNAPSHOT-for-hadoop-1.2.1-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsComputation -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/hduser/input/tiny_graph.txt -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/hduser/output/shortestpaths -w 1
15/06/04 17:06:58 INFO utils.ConfigurationUtils: No edge input format specified. Ensure your InputFormat does not require one.
15/06/04 17:06:58 INFO utils.ConfigurationUtils: No edge output format specified. Ensure your OutputFormat does not require one.
15/06/04 17:06:58 INFO job.GiraphJob: run: Since checkpointing is disabled (default), do not allow any task retries (setting mapred.map.max.attempts = 1, old value = 4)
15/06/04 17:07:02 INFO job.GiraphJob: Tracking URL: http://hdhost:50030/jobdetails.jsp?jobid=job_201506041705_0002
15/06/04 17:07:02 INFO job.GiraphJob: Waiting for resources... Job will start only when it gets all 2 mappers
15/06/04 17:07:31 INFO job.HaltApplicationUtils$DefaultHaltInstructionsWriter: writeHaltInstructions: To halt after next superstep execute: 'bin/halt-application --zkServer prometheusmobile.ironnetwork:22181 --zkNode /_hadoopBsp/job_201506041705_0002/_haltComputation'
15/06/04 17:07:31 INFO mapred.JobClient: Running job: job_201506041705_0002
15/06/04 17:07:32 INFO mapred.JobClient:  map 100% reduce 0%
15/06/04 17:07:43 INFO mapred.JobClient: Job complete: job_201506041705_0002
15/06/04 17:07:43 INFO mapred.JobClient: Counters: 37
15/06/04 17:07:43 INFO mapred.JobClient:   Zookeeper halt node
15/06/04 17:07:43 INFO mapred.JobClient:     /_hadoopBsp/job_201506041705_0002/_haltComputation=0
15/06/04 17:07:43 INFO mapred.JobClient:   Zookeeper base path
15/06/04 17:07:43 INFO mapred.JobClient:     /_hadoopBsp/job_201506041705_0002=0
15/06/04 17:07:43 INFO mapred.JobClient:   Job Counters
15/06/04 17:07:43 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=40730
15/06/04 17:07:43 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/06/04 17:07:43 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/06/04 17:07:43 INFO mapred.JobClient:     Launched map tasks=2
15/06/04 17:07:43 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
15/06/04 17:07:43 INFO mapred.JobClient:   Giraph Timers
15/06/04 17:07:43 INFO mapred.JobClient:     Input superstep (ms)=188
15/06/04 17:07:43 INFO mapred.JobClient:     Total (ms)=9260
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep 2 SimpleShortestPathsComputation (ms)=62
15/06/04 17:07:43 INFO mapred.JobClient:     Shutdown (ms)=8795
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep 0 SimpleShortestPathsComputation (ms)=75
15/06/04 17:07:43 INFO mapred.JobClient:     Initialize (ms)=3254
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep 3 SimpleShortestPathsComputation (ms)=44
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep 1 SimpleShortestPathsComputation (ms)=61
15/06/04 17:07:43 INFO mapred.JobClient:     Setup (ms)=32
15/06/04 17:07:43 INFO mapred.JobClient:   Zookeeper server:port
15/06/04 17:07:43 INFO mapred.JobClient:     prometheusmobile.ironnetwork:22181=0
15/06/04 17:07:43 INFO mapred.JobClient:   Giraph Stats
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate edges=12
15/06/04 17:07:43 INFO mapred.JobClient:     Sent message bytes=0
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep=4
15/06/04 17:07:43 INFO mapred.JobClient:     Last checkpointed superstep=0
15/06/04 17:07:43 INFO mapred.JobClient:     Current workers=1
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate sent messages=12
15/06/04 17:07:43 INFO mapred.JobClient:     Current master task partition=0
15/06/04 17:07:43 INFO mapred.JobClient:     Sent messages=0
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate finished vertices=5
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate sent message message bytes=267
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate vertices=5
15/06/04 17:07:43 INFO mapred.JobClient:   File Output Format Counters
15/06/04 17:07:43 INFO mapred.JobClient:     Bytes Written=0
15/06/04 17:07:43 INFO mapred.JobClient:   FileSystemCounters
15/06/04 17:07:43 INFO mapred.JobClient:     HDFS_BYTES_READ=200
15/06/04 17:07:43 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=43376
15/06/04 17:07:43 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30
15/06/04 17:07:43 INFO mapred.JobClient:   File Input Format Counters
15/06/04 17:07:43 INFO mapred.JobClient:     Bytes Read=0
15/06/04 17:07:43 INFO mapred.JobClient:   Map-Reduce Framework
15/06/04 17:07:43 INFO mapred.JobClient:     Map input records=2
15/06/04 17:07:43 INFO mapred.JobClient:     Spilled Records=0
15/06/04 17:07:43 INFO mapred.JobClient:     Map output records=0
15/06/04 17:07:43 INFO mapred.JobClient:     SPLIT_RAW_BYTES=88

A New Dawn

Hey fellows! I know that quite a decent amount of time has passed since my last post here. I have, however, a really good excuse for it ;)

No, no, I didn’t quit Big Data, nor did I stop working with Embedded Systems (to be honest, I’ve been working a lot with the latter in the past month). Actually, I spent all this time doing research and publishing a brand new Big Data project! I used it as my graduation thesis, and it is all about using MapReduce to process CT images in order to detect lung nodules.

Yes! How about doctors using the strengths of Big Data to help diagnose the deadliest cancer in the US?
That is what my work is all about :)
I’ll describe it in more detail later. If you want to check it out, the document (in Portuguese, but with an English abstract) is available here.

The project itself is going to be available on my GitHub soon, and when it is I’ll post the links here too!

See you guys in a bit!

Update:

The GitHub links are below. Soon I will write a post going into more detail about how everything works.
  • MatLung: a MATLAB script that shows the image processing features of the distributed software.
  • LungProcessor: a single-machine program that is fully code-compatible with the distributed Hadoop application. Used for Hadoop code debugging.
  • LungPostProcessor: the image metadata post-processor. Used to detect the lung nodules and remove false positives.
  • LungPostProcessor: the Hadoop image processing application.

Streaming Interface for Hadoop

What is the Streaming Interface?

Let’s suppose you are not friends with Javanese. Let’s suppose that you don’t like Java at all. Whether it is due to its slower Virtual Machine performance (although there are Java examples that run quicker than optimized C in some special scenarios) or just because it is too high level for you, that is fine. Hadoop understands you and for this reason implements the so-called Streaming Interface.

The Streaming Interface literally allows any application to use Hadoop for MapReduce. Thus, you can write your mappers in Python or Jython, or just supply binaries (even shell binaries!).
The syntax for this kind of behavior is the following:

$hadoop jar $HADOOP_HOME/hadoop-streaming.jar -input [input dir] -output [output dir] -mapper [map application] -reducer [reducer application]

One needs to keep in mind that supplying a mapper is mandatory, since the default mapper of Hadoop won’t work here. This is due to the fact that the default input format is TextInputFormat.
TextInputFormat generates LongWritable keys and Text values. By convention, the Streaming keys and values are both of type Text. Since there is no conversion from LongWritable to Text, the job will tragically fail.

Yeah, yeah. But isn’t the LongWritable input key really important for TextInputFormat? Not really. In TextInputFormat, the LongWritable key is just the byte offset of the line within the file, and the Text value is the line itself. You probably won’t use the key (and if you are going to use it, maybe it’s better to code in Java and create your own InputFormat).

Now, here’s a catchy part: although Hadoop keeps the default TextInputFormat for reading the file, it will only pass the value (and thus the Text part) to the mapper. The key will be ignored.
(Hm, but I want to use another input format; how can I make it ignore the key there too? Just set stream.map.input.ignoreKey to true.)
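
As a quick sketch of how that property is passed (the input format below is only an illustrative choice, and the paths are placeholders):

hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D stream.map.input.ignoreKey=true \
-inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat \
-input [input dir] \
-output [output dir] \
-mapper /bin/cat

Note that -D is a generic option, so it has to come before the streaming-specific ones.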

Complete Syntax

Ok, ok. What are all the parameters for the streaming process?

% hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input [your input file] \
-output [your output file] \
-inputformat org.apache.hadoop.mapred.TextInputFormat [or another that you wish] \
-mapper [your defined mapper] \
-partitioner org.apache.hadoop.mapred.lib.HashPartitioner [or another that you wish] \
-numReduceTasks [How many Reduce Tasks do you want] \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer [or the Reducer application that you want] \
-outputformat org.apache.hadoop.mapred.TextOutputFormat [or another one that you fancy] \
-combiner [the combiner that you want - optional]

What about the key-value separator?

By default the separator is a tab character ('\t'). But worry not, one can change it by setting the following attributes:

stream.map.input.field.separator
stream.map.output.field.separator
stream.reduce.input.field.separator
stream.reduce.output.field.separator

Also, the key emitted by the mapper/reducer may be composed of more than one field.
Let’s suppose the following record, and assume that the field separator is ';':

cat;dog;1

one should be able to select cat;dog as the key and define 1 as the value.
This can be done by modifying the following attributes:

stream.num.map.output.key.fields
stream.num.reduce.output.key.fields

The value will always be the remaining fields.
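
For instance, to make cat;dog the key and 1 the value in the record above, one could pass something along these lines (just a sketch; the input and output paths are placeholders, and /bin/cat stands in for your real mapper):

hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D stream.map.output.field.separator=';' \
-D stream.num.map.output.key.fields=2 \
-input [your text input] \
-output [your output] \
-mapper /bin/cat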

Example:

Nothing is true in Hadoop if it cannot be proved by a simple word counter or a plain cat example. So, how about using the classic Unix binaries to build a cat example?
I’ll let you define your own input. After that, one just needs to run the following:

$hadoop jar hadoop-streaming.jar -input [your text input] -output [your output] -mapper /bin/cat

It should print the exact content of each input file, splitting the result into several files at the end (due to the partitioning scheme, which I will explain in a future post).
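
If you want to peek at the result, the usual HDFS commands do the trick (the output path is whatever you passed to -output, and the part file name may differ on your run):

hadoop fs -ls [your output]
hadoop fs -cat [your output]/part-00000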

If you want to know how many lines, words and characters are in each partition, use the Unix wc tool as a reducer. Just run the following command:

$hadoop jar hadoop-streaming.jar -input [your text input] -output [your output] -mapper /bin/cat -reducer /usr/bin/wc

And that’s it. Now you can use your own application for MapReduce without diving too deep into the Java side of it. Best of luck ;)

Introduction to Hadoop and MapReduce 1.0

One of the most popular implementations of MapReduce is Hadoop (really! there are others!), although people nowadays usually treat Hadoop and MapReduce as synonyms. But you are not one of them! Because I know you’ve read my previous post and know exactly what MapReduce is ;)

What is Hadoop?

Hadoop is an open-source framework developed in Java and maintained by the Apache Foundation (yeah, yeah, the same foundation that got really famous for Subversion and the HTTP Server; most of the nice open-source Big Data software is now developed under its umbrella). Originally, it was developed by Doug Cutting and Mike Cafarella as a tool for Apache Nutch, an open-source web-search engine. They quickly found out that they would not be able to scale Nutch up to the one billion pages they expected. Luckily enough, Google’s GFS paper (available here) was released, making the storage node administration tasks less time demanding. They were going to use the file system to store the large files generated by the web crawler and the indexing process. A year after the paper was published, in 2004, an open version of the file system was released. It was called the Nutch Distributed File System. YES! It is the predecessor of HDFS ;)

In 2004, Google released the MapReduce paper (aye! the one that we talked about in the previous post). Shortly after, in 2005, the Nutch developers had an implementation of MapReduce running on top of Nutch. In February 2006 they realised that MapReduce was useful for more than just search, so they moved it out of Nutch and created a project named Hadoop. Shortly after, Doug Cutting was hired by Yahoo! to develop Hadoop even further.

People say that Hadoop stands for High-Availability Distributed Object-Oriented Platform, but this is only partly true. Actually, the Hadoop name comes from the name of Doug’s son’s toy elephant, which explains the logo:

 

Who uses Hadoop?

Hadoop follows many of the premises presented in Google’s MapReduce paper and focuses on commodity hardware for the cluster.
Among the companies that use Hadoop, one can list:

  • Adobe: uses Hadoop and HBase for structured data storage and for internal processing.
  • Facebook: uses it to store copies of internal logs and as a source for reporting/analytics and machine learning.
  • Spotify: uses it for data aggregation, content generation, analysis and reporting.

This is only a small slice of all the large companies that use it right now. One can find more companies in the following link: PoweredBy

 

How is Hadoop composed?

 

In order to implement the framework as a top-level project, and thus independent of Nutch, a completely new file system had to be created: HDFS, which stands for Hadoop Distributed File System.

Hadoop Distributed File System (HDFS)

The DataNodes:

The file system is composed of a Namenode, a SecondaryNamenode and several Datanodes, which implement the master/slave relationship described in the MapReduce paper. A Datanode contains blocks of data. By default, in HDFS all the data is split into 64 MByte blocks, and these blocks are replicated among the Datanodes. The standard replication factor is 3, which means that one should find exactly the same data block in three different Datanodes. The machines that host Datanodes usually also perform the MapReduce computations.
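
If you are curious about how a file is actually split and replicated, the Hadoop 1.x command line lets you inspect and tweak this (the path below is just an example taken from the Giraph run above):

# show how the file is split into blocks and where each replica lives
hadoop fsck /user/hduser/input/tiny_graph.txt -files -blocks -locations
# change the replication factor of an existing path (-w waits until it is done)
hadoop fs -setrep -w 3 /user/hduser/input/tiny_graph.txt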

The NameNode and the SecondaryNameNode:

To keep track of all the replicated data blocks, the Namenode stores all the metadata information and is a single point of failure in the system. All its stored information relates to which Datanode holds which blocks of the file system. The user can define checkpoints, where all the information existing in the Namenode is copied to the SecondaryNamenode. In the case of a Namenode failure, one can then return the system to the last checkpoint available in the SecondaryNamenode; all the computations between the failure and the checkpoint are lost. The Namenode and the SecondaryNamenode do not store blocks of data nor perform computations, since they should dedicate all their resources to preserving the framework’s integrity.

MapReduce Implementation

JobTracker and TaskTrackers:

Usually on the same machine that runs the Namenode, a daemon called the JobTracker is executed. The JobTracker acts as the master described in the MapReduce paper, coordinating the TaskTrackers (the slave daemons that perform the MapReduce computations). The JobTracker receives heartbeats from every single TaskTracker in order to verify that all mappers and reducers are working correctly. In the case of a TaskTracker failure, the JobTracker analyses the cause of the failure and determines whether a new TaskTracker should be allocated or the job should be declared failed.

Error Management

There are four types of errors that may happen in the framework. They are:

  • JobTracker/Namenode failure
  • Data integrity error
  • Task Failure
  • TaskTracker Failure

JobTracker/Namenode failure

There are basically two single points of failure in the whole Hadoop framework. The first one is the NameNode. In the case of its failure, all computation results and data modifications in the cluster between the failure and the last checkpoint will be lost. The system may be restored to the state stored in the SecondaryNameNode, but this is not transparent and must be performed manually, since there is going to be data loss.

The second single point of failure is the JobTracker. In the case of a failure, all computations that are currently being performed will be lost. This kind of failure is not transparent either.

Keep in mind that in MapReduce 2.0 this kind of vulnerability was addressed, but this will be covered later.

Data Integrity Error

This happens when a checksum verification fails and is usually solved transparently. The checksum is verified by the DataNode before writing and after reading a block. If the values do not match, a corrupted block is reported to the NameNode and the following procedure takes place (a quick way to inspect this from the command line is sketched after the list):

  1. The block with the incorrect checksum will be tagged as corrupt.
  2. The corrupt block cannot be read by clients anymore.
  3. The client that requested to read that block will be redirected to another DataNode, where the same block is already replicated.
  4. A new replica will be scheduled from a DataNode that holds a copy with a correct checksum, keeping the replication factor. The replica will be copied to another DataNode, avoiding the one with the corrupted block.
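
A simple sketch of how to check block health by hand, assuming the Hadoop 1.x fsck flags:

# report missing and corrupt blocks for the whole file system
hadoop fsck /
# move files with corrupt blocks to /lost+found, or delete them outright
hadoop fsck / -move
hadoop fsck / -delete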

Task Failure

Keep in mind that a task failure is different from a job failure. A job failure implies the failure of all of its tasks (the Map/Reduce/Combine code that would be executed on all the designated TaskTrackers). A task failure is simply the failure of a single task submitted to the Hadoop framework. Among the possible causes of this error, one can point out:

  • Incorrect Java version, e.g. the job required Java 7 and only found Java 6.
  • An exception has been thrown by the task code.

The system will try to transparently solve these errors by taking the following actions:

  1. The error is reported back to the TaskTracker, which will tag that task attempt as failed.
  2. Any task in the queue of that TaskTracker will also be tagged as failed.
  3. The JobTracker will then try to reschedule the task on a different TaskTracker.
  4. If the number of task failures surpasses a fixed value, the complete job will be declared failed. One can set that number (see the example below), but by default it is 4.
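
That limit is exposed through the mapred.map.max.attempts and mapred.reduce.max.attempts properties (the same mapred.map.max.attempts that showed up in the Giraph log earlier). As a sketch, for a streaming job it can be tuned per submission with generic options; the paths and the mapper are placeholders:

hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D mapred.map.max.attempts=2 \
-D mapred.reduce.max.attempts=2 \
-input [your text input] \
-output [your output] \
-mapper /bin/cat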

TaskTracker Failure

It is caused by hardware or communication failures. If the TaskTracker machine stops working or its network stops answering, this kind of error will happen. It is detected by the JobTracker when it stops receiving heartbeats from that TaskTracker. The following actions are taken:

  1. The JobTracker removes the TaskTracker from the pool of available trackers.
  2. Any incomplete task that was being performed by that TaskTracker is reassigned to another live TaskTracker.
  3. Completed maps will also be reassigned to other TaskTrackers (yeah! just like in the MapReduce article, remember?). This is because any output the map function generated is stored on the local disk of the failed machine, not in HDFS, and thus becomes unreachable.