Giraph configuration details

Heeeey!

Today I had to deploy a single-node Apache Giraph installation on my machine. Giraph has a really good quick start guide (here). A few corrections are needed to keep it up to date with the latest version available (1.1.0), but hopefully I will be able to get those done this week.

There are, however, a few glitches that can happen during the installation process. Here I will present quick solutions to them.

OpenJDK management.properties problem

I must confess that I would rather use Oracle Java than OpenJDK. As most of you may know, in order to run Hadoop you need at least Java 1.6 installed. In my current configuration, I was trying to run Hadoop with OpenJDK Java 7 on Ubuntu 14.04.2.

With all my $JAVA_HOME variables correctly set, I was getting the following error:

hduser@prometheus-UX32A:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/management$ hadoop namenode -format
Error: Config file not found: /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/management/management.properties

This error also appeared for other Hadoop scripts like start-dfs.sh and start-mapred.sh – and of course start-all.sh.

Taking a quick look at the installation, I realized that management.properties was just a broken symbolic link. It also looks like it is directly related to Java RMI (Remote Method Invocation) capabilities, which is probably how Hadoop starts all the daemons across the nodes. I must confess that I installed all the OpenJDK packages available in the Ubuntu repository and none of them solved my problem.
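If you want to confirm this on your own installation, a quick ls -l on that folder shows whether the link still points at an existing target:

hduser@prometheus-UX32A:~$ ls -l /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/management/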

This can be solved with the following workaround:

  1. Download the equivalent Oracle Java version HERE. Keep in mind to download the same version as your OpenJDK (i.e. download Java 6, 7 or 8 depending on your JDK).
  2. Extract the Oracle Java archive and copy the contents of its jre/lib/management folder (in particular management.properties) into the equivalent OpenJDK folder (see the sketch after this list).
  3. Run the Hadoop command again.
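As a rough sketch of step 2, assuming the Oracle JDK was extracted to ~/jdk1.7.0 (that extraction path is just an example), the copy would look something like this, removing the broken link first so cp does not try to follow it:

hduser@prometheus-UX32A:~$ sudo rm /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/management/management.properties
hduser@prometheus-UX32A:~$ sudo cp ~/jdk1.7.0/jre/lib/management/management.properties /usr/lib/jvm/java-7-openjdk-amd64/jre/lib/management/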

Hopefully, this will solve that problem.

Giraph Mapper Allocation problem

Let’s suppose you installed Giraph correctly and the Hadoop MapReduce examples are running as you expect on your machine.

Giraph, however, gives this very interesting line when you submit a job:

15/06/04 16:58:03 INFO job.GiraphJob: Waiting for resources... Job will start only when it gets all 2 mappers

If you want to make sure that we are talking about the same thing, here is the complete error output:

hduser@prometheus-UX32A:~$ hadoop jar /opt/apache/giraph/giraph-examples/target/giraph-examples-1.2.0-SNAPSHOT-for-hadoop-1.2.1-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsComputation -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/hduser/input/tiny_graph.txt -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/hduser/output/shortestpaths -w 1
15/06/04 16:57:58 INFO utils.ConfigurationUtils: No edge input format specified. Ensure your InputFormat does not require one.
15/06/04 16:57:58 INFO utils.ConfigurationUtils: No edge output format specified. Ensure your OutputFormat does not require one.
15/06/04 16:57:59 INFO job.GiraphJob: run: Since checkpointing is disabled (default), do not allow any task retries (setting mapred.map.max.attempts = 1, old value = 4)
15/06/04 16:58:03 INFO job.GiraphJob: Tracking URL: http://hdhost:50030/jobdetails.jsp?jobid=job_201506041652_0003
15/06/04 16:58:03 INFO job.GiraphJob: Waiting for resources... Job will start only when it gets all 2 mappers
15/06/04 16:58:31 INFO mapred.JobClient: Running job: job_201506041652_0003
15/06/04 16:58:31 INFO mapred.JobClient: Job complete: job_201506041652_0003
15/06/04 16:58:31 INFO mapred.JobClient: Counters: 5
15/06/04 16:58:31 INFO mapred.JobClient:   Job Counters
15/06/04 16:58:31 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=31987
15/06/04 16:58:31 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/06/04 16:58:31 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/06/04 16:58:31 INFO mapred.JobClient:     Launched map tasks=2
15/06/04 16:58:31 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0

Good news: it is not your fault (unless you think that giving a bad hostname to your machine is your fault). According to this JIRA, there is a problem parsing hostnames: basically, Giraph does not handle hostnames that mix uppercase and lowercase letters. This can be solved by setting a new hostname with only lowercase letters. To do this, run as sudo:

 sudo hostname NEW_HOST_NAME 
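Also, do not forget to update your /etc/hosts table with the new hostname. On a typical single-node Ubuntu setup the relevant entries would end up looking something like this (the hostname below is just a placeholder):

127.0.0.1       localhost
127.0.1.1       mynewhostname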

Keep in mind that changing the hostname may affect other software that relies on it to run. Once you change it, restart the machine and the Hadoop daemons. Hopefully you will then get the correct output, which in my case is:

hduser@prometheusmobile:/opt/apache/giraph/giraph-examples$ hadoop jar target/giraph-examples-1.2.0-SNAPSHOT-for-hadoop-1.2.1-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsComputation -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/hduser/input/tiny_graph.txt -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/hduser/output/shortestpaths -w 1
15/06/04 17:06:58 INFO utils.ConfigurationUtils: No edge input format specified. Ensure your InputFormat does not require one.
15/06/04 17:06:58 INFO utils.ConfigurationUtils: No edge output format specified. Ensure your OutputFormat does not require one.
15/06/04 17:06:58 INFO job.GiraphJob: run: Since checkpointing is disabled (default), do not allow any task retries (setting mapred.map.max.attempts = 1, old value = 4)
15/06/04 17:07:02 INFO job.GiraphJob: Tracking URL: http://hdhost:50030/jobdetails.jsp?jobid=job_201506041705_0002
15/06/04 17:07:02 INFO job.GiraphJob: Waiting for resources... Job will start only when it gets all 2 mappers
15/06/04 17:07:31 INFO job.HaltApplicationUtils$DefaultHaltInstructionsWriter: writeHaltInstructions: To halt after next superstep execute: 'bin/halt-application --zkServer prometheusmobile.ironnetwork:22181 --zkNode /_hadoopBsp/job_201506041705_0002/_haltComputation'
15/06/04 17:07:31 INFO mapred.JobClient: Running job: job_201506041705_0002
15/06/04 17:07:32 INFO mapred.JobClient:  map 100% reduce 0%
15/06/04 17:07:43 INFO mapred.JobClient: Job complete: job_201506041705_0002
15/06/04 17:07:43 INFO mapred.JobClient: Counters: 37
15/06/04 17:07:43 INFO mapred.JobClient:   Zookeeper halt node
15/06/04 17:07:43 INFO mapred.JobClient:     /_hadoopBsp/job_201506041705_0002/_haltComputation=0
15/06/04 17:07:43 INFO mapred.JobClient:   Zookeeper base path
15/06/04 17:07:43 INFO mapred.JobClient:     /_hadoopBsp/job_201506041705_0002=0
15/06/04 17:07:43 INFO mapred.JobClient:   Job Counters
15/06/04 17:07:43 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=40730
15/06/04 17:07:43 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
15/06/04 17:07:43 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
15/06/04 17:07:43 INFO mapred.JobClient:     Launched map tasks=2
15/06/04 17:07:43 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
15/06/04 17:07:43 INFO mapred.JobClient:   Giraph Timers
15/06/04 17:07:43 INFO mapred.JobClient:     Input superstep (ms)=188
15/06/04 17:07:43 INFO mapred.JobClient:     Total (ms)=9260
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep 2 SimpleShortestPathsComputation (ms)=62
15/06/04 17:07:43 INFO mapred.JobClient:     Shutdown (ms)=8795
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep 0 SimpleShortestPathsComputation (ms)=75
15/06/04 17:07:43 INFO mapred.JobClient:     Initialize (ms)=3254
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep 3 SimpleShortestPathsComputation (ms)=44
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep 1 SimpleShortestPathsComputation (ms)=61
15/06/04 17:07:43 INFO mapred.JobClient:     Setup (ms)=32
15/06/04 17:07:43 INFO mapred.JobClient:   Zookeeper server:port
15/06/04 17:07:43 INFO mapred.JobClient:     prometheusmobile.ironnetwork:22181=0
15/06/04 17:07:43 INFO mapred.JobClient:   Giraph Stats
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate edges=12
15/06/04 17:07:43 INFO mapred.JobClient:     Sent message bytes=0
15/06/04 17:07:43 INFO mapred.JobClient:     Superstep=4
15/06/04 17:07:43 INFO mapred.JobClient:     Last checkpointed superstep=0
15/06/04 17:07:43 INFO mapred.JobClient:     Current workers=1
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate sent messages=12
15/06/04 17:07:43 INFO mapred.JobClient:     Current master task partition=0
15/06/04 17:07:43 INFO mapred.JobClient:     Sent messages=0
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate finished vertices=5
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate sent message message bytes=267
15/06/04 17:07:43 INFO mapred.JobClient:     Aggregate vertices=5
15/06/04 17:07:43 INFO mapred.JobClient:   File Output Format Counters
15/06/04 17:07:43 INFO mapred.JobClient:     Bytes Written=0
15/06/04 17:07:43 INFO mapred.JobClient:   FileSystemCounters
15/06/04 17:07:43 INFO mapred.JobClient:     HDFS_BYTES_READ=200
15/06/04 17:07:43 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=43376
15/06/04 17:07:43 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30
15/06/04 17:07:43 INFO mapred.JobClient:   File Input Format Counters
15/06/04 17:07:43 INFO mapred.JobClient:     Bytes Read=0
15/06/04 17:07:43 INFO mapred.JobClient:   Map-Reduce Framework
15/06/04 17:07:43 INFO mapred.JobClient:     Map input records=2
15/06/04 17:07:43 INFO mapred.JobClient:     Spilled Records=0
15/06/04 17:07:43 INFO mapred.JobClient:     Map output records=0
15/06/04 17:07:43 INFO mapred.JobClient:     SPLIT_RAW_BYTES=88

Of MapReduce and Men

First of all, why was MapReduce originally created?

Basically, Google needed a solution for making large computation jobs easily parallelizable, allowing data to be distributed across a number of machines connected through a network. Aside from that, it had to handle machine failures in a transparent way and manage load balancing.

What are MapReduce's true strengths?

One may say that the MapReduce magic lies in the application of the Map and Reduce functions. I must confess, mate, that I strongly disagree. The main feature that made MapReduce so popular is its capability for automatic parallelization and distribution, combined with a simple interface. These factors, together with transparent handling of most failures, are what made this framework so popular.

A little more depth on the paper:

MapReduce was originally described in a Google paper (Dean & Ghemawat, 2004 – link here) as a solution for performing computations on Big Data using a parallel approach and commodity-computer clusters. In contrast to Hadoop, which is written in Java, Google's framework is written in C++. The paper describes how a parallel framework would behave using the Map and Reduce functions from functional programming over large data sets.

In this solution there would be two main steps – called Map and Reduce – with an optional step between them – called Combine. The Map step runs first, performs computations on each input key-value pair and generates a new output key-value pair. Keep in mind that the format of the input key-value pairs does not necessarily need to match the format of the output pairs. The Reduce step then gathers all values of the same key and performs further computations over them, again producing key-value pairs as output. One of the most trivial applications of MapReduce is word count.

The pseudo-code for this application is given below:

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

As one can notice, the map reads all the words in a record (in this case a record can be a line) and emits each word as a key with the number 1 as its value.
Later on, the reduce groups all values of the same key. Let's give an example: imagine that the word 'house' appears three times in the input. The input of the reducer for that key would be [house, [1, 1, 1]]. The reducer then sums all the values for the key 'house' and outputs the key-value pair [house, 3].
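To see how this maps onto real code, here is a minimal sketch of the classic word count written against Hadoop's org.apache.hadoop.mapreduce API (the class names, the tokenization and the job wiring are just illustrative, and error handling is omitted):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Map step: emit (word, 1) for every word in the input line.
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      for (String token : value.toString().split("\\s+")) {
        if (!token.isEmpty()) {
          word.set(token);
          context.write(word, ONE);   // equivalent to EmitIntermediate(w, "1")
        }
      }
    }
  }

  // Reduce step: sum all the 1s received for the same word.
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) {
        sum += v.get();
      }
      context.write(key, new IntWritable(sum));   // equivalent to Emit(AsString(result))
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");        // Hadoop 1.x style job creation
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));    // e.g. an HDFS input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path must not exist yet
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Submitting it is essentially the same kind of hadoop jar invocation shown earlier for the Giraph example, just pointing at the word-count jar and its input and output paths.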

Here is how this would look in a MapReduce framework:

[Figure: MapReduce execution overview. Image from the original MapReduce Google paper.]

A few other classical examples of MapReduce applications are:

  • Count of URL access frequency
  • Reverse Web-link Graph
  • Distributed Grep
  • Term Vector per host

In order to avoid too much network traffic, the paper describes how the framework should try to maintain data locality. This means it should always try to make sure that a machine running Map jobs has the data in its memory/local storage, avoiding fetching it over the network. To reduce the amount of data a mapper sends over the network, the optional Combine step described before is used: the Combiner performs computations on the output of the mappers on a given machine before sending it to the Reducers, which may be on other machines.
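In Hadoop, for instance, plugging a combiner in is usually a one-line change in the job setup, provided the reduce function is associative and commutative, as it is for the word count sketched earlier (the class name refers to that sketch):

    job.setCombinerClass(IntSumReducer.class);  // run the summing reducer locally on each mapper's output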

The document also describes how the elements of the framework should behave in case of faults. In the paper these elements are called worker and master; open-source implementations split them into more specific components.
Since Google only described the approach in the paper and did not release its proprietary software, many open-source frameworks were created to implement the model. Examples are Hadoop and the limited MapReduce feature in MongoDB.

The run-time should take care of the details non-expert programmers do not want to deal with, like partitioning the input data, scheduling the program execution across a large set of machines, handling machine failures (in a transparent way, of course) and managing the inter-machine communication. An experienced user may still tune these parameters, such as how the input data will be partitioned between workers.
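To make this concrete in Hadoop terms, and continuing from the job object in the word-count sketch above, tuning those knobs might look something like this (the values are just illustrative; HashPartitioner here is org.apache.hadoop.mapreduce.lib.partition.HashPartitioner):

    job.setNumReduceTasks(8);                        // the paper's R: how many reduce pieces
    job.setPartitionerClass(HashPartitioner.class);  // which reducer gets each key; default is hash(key) mod R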

Key Concepts:

  • Fault Tolerance: The framework must tolerate machine failures gracefully. To do this, the master pings the workers periodically. If the master does not receive a response from a given worker within a certain time, it marks that worker as failed. In that case, all map tasks completed by the faulty worker are thrown away and handed to another available worker; the same happens if the worker was still processing a map or a reduce task. Note that if the worker had already completed its reduce part, that computation was finished by the time it failed and does not need to be redone (the reduce output is already on the global file system, while completed map output only lives on the failed machine's local disk). The master is the single point of failure: if it fails, the whole job fails. For this reason, one may define periodic checkpoints for the master in order to save its data structures; all computation that happens between the last checkpoint and the master failure is lost.
  • Locality: In order to avoid network traffic, the framework tries to make sure that the input data is locally available to the machines that will perform computations on it. The original description uses the Google File System (GFS) with a replication factor of 3 and block sizes of 64 MB, meaning that each 64 MB block (the blocks that compose a file in the file system) has identical copies on three different machines. The master knows where the blocks are and tries to schedule map jobs on those machines. If that fails, the master tries to allocate a machine near a replica of the task's input data (i.e. a worker machine in the same rack as the machine holding the data).
  • Task Granularity: Assuming that the map phase is divided into M pieces and the reduce phase into R pieces, ideally M and R should be much larger than the number of worker machines. This is because having each worker perform many different tasks improves dynamic load balancing. It also speeds up recovery when a worker fails, since the many map tasks it has completed can be spread out across all the other machines.
  • Backup Tasks: Sometimes a map or reduce worker behaves a lot more slowly than the others in the cluster. This can hold up the total processing time and make it equal to the processing time of that single slow machine. The original paper describes a mechanism called backup tasks: when a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The operation then completes as soon as either the primary or the backup execution finishes.
  • Counters: Sometimes one may want to count the occurrences of certain events. For this reason, counters were created. The counter values on each worker are periodically propagated to the master. The master then aggregates (yep, looks like Pregel aggregators came from this place ;) ) the counter values of successful map and reduce tasks and returns them to the user code when the MapReduce operation is complete. The current counter values are also shown in the master status, so a human watching the process can keep track of how it is behaving. A small sketch of how counters look in Hadoop code is given right after this list.
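To make that last point concrete, here is a minimal sketch of how a user-defined counter is typically bumped inside a Hadoop mapper; the enum and the "malformed records" idea are made up purely for illustration, and the method would live inside a Mapper class like the one in the word-count sketch:

    // A hypothetical application-level counter.
    public enum MyCounters { MALFORMED_RECORDS }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      if (value.toString().trim().isEmpty()) {
        // Each worker bumps this locally; the framework periodically
        // propagates the values to the master (the JobTracker in Hadoop 1.x).
        context.getCounter(MyCounters.MALFORMED_RECORDS).increment(1);
        return;
      }
      // ... normal record processing would go here ...
    }

Counters defined this way show up in the same "Counters:" section of the job output you can see in the Giraph logs above, next to the built-in Map-Reduce Framework counters.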

Well, I guess that with all the concepts above, Hadoop will be a piece of cake for you. If you have any questions about the original MapReduce article or anything related, please let me know.

My next post will be about Hadoop and MapReduce 1.0. See you soon!

And it begins!

Hello World!

This is literally my first post on my new page, how exciting huh?

Soon, I will be posting thoughts and discussions concerning Big Data and Embedded Systems.
Here is the place to talk about MapReduce, Pregel, Spark, ARM programming, embedded OS, etc.

Feel more than welcome to comment on my posts with your personal insights!

Soon™ (hope Blizzard doesn’t give me a lawsuit for this) I’ll be back with an introduction to MapReduce.
See you all in a bit!