A New Dawn

Hey fellows! I know that quite a decent amount of time has passed since my last post here. I have, however, a really good excuse for it ;)

No, no, I didn’t quit with Big Data, neither stopped with Embedded Systems (to be sincere, I’ve been working a lot with the second in the last past month). Actually, I spent all this time doing researches and publishing a brand new Big Data project! I used it as my graduation thesis and is all about using MapReduce to process CT images to detect lung nodules.

Yes! How about the doctors use the Big Data strengths to help in the diagnostics of the deadliest Cancer in US?
It’s all about my work :)
I’ll describe it in more details later. If you want to check it, the document (in Portuguese – but with an English abstract) is available here.

The project itself is going to be available in my GitHub soon, and when I get I’ll post the links here too!

See you guys in a bit!

Update:

The gitHub links are here. Soon I will write a post getting into more details about how it works.
MatLung: A matlab script that shows the image processing features of the distributed software.
LungProcessor: A single machine program that is fully-code-compatible with the distributed hadoop application. Used for Hadoop code debugging.
LungPostProcessor: The Image metadata post-processor. Used to detect the lung nodules and remove false positives.
LungPostProcessor: The Hadoop image processing application

Graphs and Tradicional Graph Processing

Graphs

“How cute! So many arrows and lines!”
– Anyone who never had to traverse a large graph.

Finally, I will be able to quote myself, yey!

Graphs, in computer science, are an abstract data type that implements the graph and hypergraph concepts from mathematics. This data structure has a finite – and possible mutable – set of ordered pairs consisting of edges (or arcs), and nodes (or vertices). An edge can be described mathematically as edge(x,y) which is said to go from x to y. There may be some data structures associated with edges, called edge value. This structure can represent numeric attributes (for example costs) or just a symbolic label. Also there is the important definition of adjacent(G, x, y), that is a function that tests for a given graph data structure G if the edge(x,y) exists.”

As mentioned by (Lotz, 2013)

There are many flavours of graphs, each one of them take into account some detail of the graph structure. In example, an undirected graph is a graph in which the edges do not take into account the orientation, as opposed to the direct graphs in which they take.

Here’s a quick example of a directed graph:

Directed Graph Example

Example of a directed graph (Wolfram Alpha)

Large Scale Graph Processing Problem


“Houston, we have a problem”
– The same person, after having to compute a large Graph.

In the big data analysis, graphs are usually known as hard to compute. (Malewicz & all, 2010)(not my quote this time) points that the main factors that contribute to these difficulties are:

  1. Poor locality of memory access.
  2. Very little work per vertex.
  3. Changing degree of parallelism.
  4. Running over many machines makes the problem worse (i.e. how will I make a machine communicate with another without generating I/O bottleneck?)

Large Scale Graph Pre-Pregel Solutions

Before the introduction of Pregel (Malewicz & all, 2010) – yep, the same guy that I quoted above… that’s because he knows all in this field –  the state-of-art solutions to this problem were:

  • The implementation of a customised framework: that demands lots of engineering effort. It’s like: “Oh, I have a large graph problem that will demand me a Big Data approach. Since I have nothing else to do, I will implement my own framework for the problem. It’s ok if it only takes 10 years and I be really problem specific.”
  • Implementation of a graph analysis algorithm using the MapReduce platform: this proves to be inefficient, since it has to store too much data after each MapReduce job and requires too much communication between the stages (which may cause excessive I/O traffic). Aside from that, it is quite hard to implement graph algorithms, since there is no option to make calculations node/vertex-oriented on the framework.

In order to completely solve the I/O problem, one may use a single computer graph solution – you said a super computer? – This would, however, require not commodity hardware and would not be scalable. The existing distributed graph systems that existed before the Pregel introduction were not fault tolerant, which may cause the loss of huge amount of work in the case of any failure.

and that’s why my next post will be about Pregel, a Large Graph processing framework developed by Google.

MapReduce 2.0: Hadoop with YARN

Context

After the release of Hadoop version 0.20.203 the complete framework was restructured. Actually, almost a complete new framework was created. This framework is called MapReduce 2.0 (MRv2) or YARN.

The short YARN stands for “Yet Another Resources Negotiator” (but if you are GNU mate and you are into recursive anonymous, call it: YARN Application Resource Negotiator). This modification has as a main goal to make the MapReduce component less dependent of Hadoop itself. Furthermore, this will improve the possibility of using programming interfaces other than MapReduce over it and increase the scalability of the framework.

Originally YARN was expected to be in production by the end of 2012. Guest what: that didn’t happen. Sound like the world cup building in Brazil huh? Actually, it went from alpha to beta only on 25th of August of 2013 (with version 2.1.0-beta). On 7th of October  of 2013 the first stable YARN version was released, with Hadoop 2.2.0. The alpha and beta versions were however  available on most of the third parties distributions, like Cloudera CDH4. Cloudera included warnings to remind consumers from the instability of the build by that time.

Why?

This is a solution to a scalability bottleneck that was happening with MapReduce 1.0. For very large clusters (and with that I mean clusters larger than 4000 nodes) Hadoop was not performing as it should be. Then Yahoo though “How about we make some modifications in the framework in order to solve these problems?” and that how it appeared. You can read more about it in the following link:
The Next Generation of Apache MapReduce

 

What are the differences?

One of the main characteristics of YARN is that the original JobTracker was split into two daemons: the ResourceManager and the ApplicationMaster. In a first though, this removes the single point of failure from the JobTracker and puts it in the ResourceManager. I will get into more detail on it later.

Here’s a small diagram of how it works, as depicted in the Apache Hadoop website:

Hadoop Yarn Diagram

ResourceManager

It monitors the cluster status, indicating which resources are available to which nodes. This is only possible due to the concept of container (which I will explain later). The Resource Manager arbitrates the resources among all the applications in the system.

ApplicationMaster:

It manages the parallel execution of any job. In Yarn this is done separately for each individual job. It is a framework specific library and negotiates the resources from the ResourceManager and works with the NodeManager to execute and monitor tasks.

Oh! Almost forgot to say. The TaskTracker became the NodeManager! So let’s get into it:

NodeManager:

The TaskTracker was transformed in the NodeManager, that with the ResourceManager compose the data-computation framework. It is a machine individual framework agent that takes care of the containers by monitoring the resource usage (memory, disk, CPU, network) and reporting the results to the Scheduler that is inside the ResourceManager. This is a quite interesting modification, once with this other Apaches frameworks are not running over the Hadoop TaskTracker structure, but are actually running on the NodeManager structure that belong to YARN. This way, they can freely implement how they agent is going to behave and communicate with the ApplicationMaster.

 

Back to the ResourceManager:

The ResourceManager daemon has two main components: the Scheduler and the ApplicationsManager. The first allocates the resource for the various running applications and takes into considerations capacity and queues constraints. The second is a basic scheduler, without tracking status or monitoring an application. It only performs the scheduling taking into account the resource requirements of an application, by taking an abstract notion of the ResourceContainer.

ResourceContainer:

Also called container – it incorporates elements such memory, CPU, disk and network. In the early versions only memory is supported – and I guess that in the moment that I am writing this post, it is still like that.

Now, let’s get back to the ApplicationsManager. It does the job-submissions acceptance work. It makes the negotiation with the first container for executing the application specific ApplicationMaster and also provides the service for restarting the ApplicationMaster container on failure – therefore helping to solve one of the single points of failure that was present in the original Hadoop framework (yey! :D )

One of the major features of YARN is that it is backward compatible with Hadoop non-YARN. For this reason, applications that were developed for the previous versions do not need to be recompiled in order to run on a YARN cluster.

Streaming Interface for Hadoop

What is the Streaming Interface?

Let’s suppose you are not friends with javenese. Let’s suppose that you don’t like Java at all. Whether it is due to its slower Virtual Machine performance – although there are java examples that run quicker than optimized C in some special scenarios) or just because it is too high level for you, it is fine. Hadoop understands you and for this reason implements the called Streaming Interface.

The Streaming Interface literally allows any application to use Hadoop for MapReduce. Thus, you can write your mappers in python, jython, or just give the binaries for it (even shell binaries!).
The syntax for using this kind of behavior is the following:

$hadoop jar $HADOOP_HOME/hadoop-streaming.jar -input [input dir] -output [output dir] -mapper [map application] -reducer [reducer application]

One needs to keep in mind that supplying a mapper is mandatory, since the default mapper of Hadoop won’t work. This is due to the fact that the default mapper output is of the type TextInputFormat.
TextInputFormat generates LongWritable keys and Text values. By convention, the Streaming output keys and values are both of type Text. Since there is no conversion from LongWritable to Text, the job will tragically fail.

Yeah Yeah. But isn’t the LongWritable part of the input key really important for TextInputFormat? Not really. In the TextInputFormat, the LongWritable key is just the offset in the line where the given Text value (word) is. Probably you won’t use it. (and if you are going to use it, maybe it’s better to code in java and create your own InputFormat).

Now, here’s a catchy part: Although the Hadoop keeps the default TextInputFormat for reading the file, it will only pass the Value to the mapper (and thus the Text part). The Key will be ignored.
(Hm, but i want to use another input format, how can i make it ignore the key? Just set the stream.map.input.ignoreKey to true).

Complete Syntax

Ok, ok. What are all the parameters for the streaming process?

% hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input [your input file] \
-output [your output file] \
-inputformat org.apache.hadoop.mapred.TextInputFormat [or another that you wish] \
-mapper [your defined mapper] \
-partitioner org.apache.hadoop.mapred.lib.HashPartitioner [or another that you wish] \
-numReduceTasks [How many Reduce Tasks do you want] \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer [or the Reducer application that you want] \
-outputformat org.apache.hadoop.mapred.TextOutputFormat [or another one that you fancy]
-combiner [the combiner that you want - optional]

What about the key-value separator?

By default the separator is a tab character ( ‘\t’ ) . But worry not, one can change it. You can do it by setting the following attributes:

stream.map.input.field.separator
stream.map.output.field.separator
stream.reduce.input.field.separator
stream.reduce.output.field.separator

Also, the key from the mapper/reducer may be composed of more than one field.
Let’s suppose the following case and assume that the field separator is  ‘;’

cat;dog;1

one should be able to select cat;dog as a key, and define 1 has a value.
this can be done by modifying the  following attributes:

stream.num.map.output.key.fields
stream.num.reduce.output.key.fields

The value will always be the remaining fields.

Example:

Nothing is true in Hadoop if cannot be proved by a simple word counter or just cat example. So, how about if we use the classic Unix binaries in order to make cat example?
I’ll let you define your own input. After that, one just needs to do the following:

$hadoop jar hadoop-streaming.jar -input [your text input] -output [your output] -mapper /bin/cat

It should print the exact content of each of the file, splitting it into several files at the end (due to the partitions scheme, which I will explain in a future post).

If you want to know how many newlines, words and letters are in each partition, use the wc unix tool as a reducer. Just do the following command:

$hadoop jar hadoop-streaming.jar -input [your text input] -output [your output] -mapper /bin/cat -reducer /usr/bin/cat

And that’s it. Now you can use your own application for MapReduce, without having deep troubles into the Java aspect of it. Best of luck ;)