Streaming Interface for Hadoop

What is the Streaming Interface?

Let’s suppose you don’t speak Javanese. Let’s suppose that you don’t like Java at all. Whether it is due to its slower Virtual Machine performance (although there are Java examples that run quicker than optimized C in some special scenarios) or just because it is too high level for you, that is fine. Hadoop understands you, and for this reason implements the so-called Streaming Interface.

The Streaming Interface allows virtually any application to use Hadoop for MapReduce. Thus, you can write your mappers in Python or Jython, or just supply binaries (even shell binaries!).
The syntax for this kind of job is the following:

$hadoop jar $HADOOP_HOME/hadoop-streaming.jar -input [input dir] -output [output dir] -mapper [map application] -reducer [reducer application]
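To make this concrete, here is a minimal word-count mapper and reducer in Python that follow Streaming’s convention: read lines from stdin, write tab-separated key/value pairs to stdout. The file names mapper.py and reducer.py are just my own picks – treat this as a sketch, not the one true implementation:

```python
#!/usr/bin/env python
# mapper.py -- emits "word<TAB>1" for every word read from stdin
import sys

def run_mapper(stdin, stdout):
    for line in stdin:
        for word in line.split():
            stdout.write(word + "\t1\n")

# reducer.py -- sums the counts of each key; Streaming feeds the
# reducer sorted lines, so equal keys arrive contiguously
def run_reducer(stdin, stdout):
    current, total = None, 0
    for line in stdin:
        key, value = line.rstrip("\n").split("\t", 1)
        if key != current:
            if current is not None:
                stdout.write("%s\t%d\n" % (current, total))
            current, total = key, 0
        total += int(value)
    if current is not None:
        stdout.write("%s\t%d\n" % (current, total))

if __name__ == "__main__" and not sys.stdin.isatty():
    run_mapper(sys.stdin, sys.stdout)  # in reducer.py, call run_reducer instead
```

You would then pass them as -mapper mapper.py -reducer reducer.py, shipping the scripts to the cluster with the -file option.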

One needs to keep in mind that supplying a mapper is mandatory, since the default mapper of Hadoop won’t work here. This is due to the fact that the default input format is TextInputFormat, which generates LongWritable keys and Text values, and the default (identity) mapper passes them along unchanged. By convention, the Streaming output keys and values are both of type Text. Since there is no conversion from LongWritable to Text, the job will tragically fail.

Yeah, yeah. But isn’t the LongWritable input key really important for TextInputFormat? Not really. In the TextInputFormat, the LongWritable key is just the byte offset of the line (the Text value) within the file. You probably won’t use it (and if you are going to use it, maybe it’s better to code in Java and create your own InputFormat).

Now, here’s a catchy part: although Hadoop keeps the default TextInputFormat for reading the file, it will only pass the value (and thus the Text part) to the mapper. The key will be ignored.
(Hm, but I want to use another input format; how can I make it ignore the key? Just set stream.map.input.ignoreKey to true.)

Complete Syntax

Ok, ok. What are all the parameters for the streaming process?

% hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input [your input file] \
-output [your output file] \
-inputformat org.apache.hadoop.mapred.TextInputFormat [or another that you wish] \
-mapper [your defined mapper] \
-partitioner org.apache.hadoop.mapred.lib.HashPartitioner [or another that you wish] \
-numReduceTasks [How many Reduce Tasks do you want] \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer [or the Reducer application that you want] \
-outputformat org.apache.hadoop.mapred.TextOutputFormat [or another one that you fancy] \
-combiner [the combiner that you want - optional]

What about the key-value separator?

By default the separator is a tab character ( ‘\t’ ). But worry not, one can change it by setting the stream.map.output.field.separator and stream.reduce.output.field.separator attributes.

Also, the key from the mapper/reducer may be composed of more than one field.
Let’s suppose the following case and assume that the field separator is ‘;’, with records that look like cat;dog;1.

One should be able to select cat;dog as the key, and define 1 as the value.
This can be done by modifying the stream.map.output.field.separator and stream.num.map.output.key.fields attributes (setting them to ‘;’ and 2, in this case).

The value will always be the remaining fields.
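As a sketch, this is effectively what Streaming does when told that the first two ‘;’-separated fields form the key (the function name and defaults are mine, for illustration only):

```python
# Split a record into a composite key and a value, the way Streaming
# does when the number of key fields is 2 and the field separator is ';'.
def split_record(record, field_sep=";", num_key_fields=2):
    fields = record.split(field_sep)
    key = field_sep.join(fields[:num_key_fields])
    value = field_sep.join(fields[num_key_fields:])
    return key, value
```

Here split_record("cat;dog;1") yields the key cat;dog and the value 1.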


Nothing is true in Hadoop if it cannot be proved by a simple word counter or just a cat example. So, how about using the classic Unix binaries to build a cat example?
I’ll let you define your own input. After that, one just needs to do the following:

$hadoop jar hadoop-streaming.jar -input [your text input] -output [your output] -mapper /bin/cat

It should print the exact content of each of the files, splitting the result into several output files at the end (due to the partitioning scheme, which I will explain in a future post).

If you want to know how many newlines, words and letters are in each partition, use the wc Unix tool as a reducer. Just run the following command:

$hadoop jar hadoop-streaming.jar -input [your text input] -output [your output] -mapper /bin/cat -reducer /usr/bin/wc

And that’s it. Now you can use your own application for MapReduce, without getting into deep trouble with the Java side of it. Best of luck ;)

Introduction to Hadoop and MapReduce 1.0

One of the most popular implementations of MapReduce is Hadoop – really, there are others! – although people nowadays usually treat Hadoop and MapReduce as synonyms. But you are not one of them! Because I know you’ve read my previous post and know exactly what MapReduce is ;)

What is Hadoop?

Hadoop is an open-source framework developed in Java, maintained by the Apache foundation (yeah, yeah. The same foundation that got really famous for Subversion and the HTTP server. Most of the nice open-source Big Data software is now developed under it). Originally, it was developed by Doug Cutting and Mike Cafarella as a tool for Apache Nutch, an open-source web-search engine. They quickly found out that they would not be able to scale Nutch up to the one billion pages they expected. Luckily enough, Google’s GFS paper (available here) was released, making the administration of storage nodes less time demanding. They were going to use the file system to store the large files generated by the web crawler and the indexing process. A year after the paper’s publication – in 2004 – an open version of the file system was released, called the Nutch Distributed File System. YES! It is the prequel of HDFS ;)

In 2004, Google released the MapReduce paper – aye! The one that we talked about in the previous post. Shortly after, in 2005, the Nutch developers had an implementation of MapReduce running over Nutch. In February 2006 they realised that MapReduce was useful for more than just search, so they moved it out of Nutch and created a project named Hadoop. Shortly after, Doug Cutting was hired by Yahoo! to develop Hadoop even further.

People say that Hadoop stands for High-Availability Distributed Object-Oriented Platform, but this is only partly true. Actually, the name comes from Doug’s son’s toy elephant. Hence the logo:


Who uses Hadoop?

Hadoop follows many of the premises shown in Google’s MapReduce paper and focuses on commodity hardware for the cluster.
Among the companies that use Hadoop, one can list:

  • Adobe: uses Hadoop and HBase for structured data storage and to perform internal processing.
  • Facebook: uses it to store copies of internal logs, and as a source for reporting/analytics and machine learning.
  • Spotify: uses it for data aggregation, content generation, analysis and reports.

This is only a small slice of all the large companies that use it right now. One can find more in the following link: PoweredBy


How is Hadoop composed?


In order to implement the framework as a top-level project – and thus independent of Nutch – a completely new file system, named HDFS (which stands for Hadoop Distributed File System), had to be created.

Hadoop Distributed File System (HDFS)

The DataNodes:

The file system is composed of a NameNode, a SecondaryNameNode and several DataNodes, which implement the master/slave relationship described in the MapReduce paper. A DataNode contains blocks of data. By default, in HDFS, all the data is split into 64 MByte blocks and these blocks are replicated among the DataNodes. The standard replication factor is 3, which means that one should find exactly the same data block in three different DataNodes. Some DataNodes also perform the MapReduce jobs.

The NameNode and the SecondaryNameNode:

As an attempt to keep track of all the replicated data blocks, the NameNode stores all the metadata and is a single point of failure in the system. The stored information describes which DataNode holds which blocks of the file system. The user can define checkpoints, at which all the information existing in the NameNode is copied to the SecondaryNameNode. In case of a NameNode failure, one can then return the system to the last checkpoint available in the SecondaryNameNode. All the computations between the failure and the checkpoint are lost. The NameNode and the SecondaryNameNode do not store blocks of data nor perform computations, since all their capacity should go into preserving the framework’s integrity.

MapReduce Implementation

JobTracker and TaskTrackers:

Usually in the same machine that runs the Namenode, a daemon called JobTracker is executed. The JobTracker acts as the master described in the MapReduce paper, coordinating the TaskTrackers (that are the slave nodes that perform the mapreduce computations). The JobTracker receives heartbeats from every single TaskTracker in order to verify that all mappers and reducers are working correctly. In the case of a TaskTracker failure, the JobTracker analyses the cause of the failure and determines if a new TaskTracker should be allocated or if the job should be declared as failed.

Error Management

There are four types of errors that may happen in the framework. They are:

  • JobTracker/Namenode failure
  • Data integrity error
  • Task Failure
  • TaskTracker Failure

JobTracker/Namenode failure

There are basically two single points of failure in the whole Hadoop framework. The first one is the NameNode. In the case of its failure, all computation results and data modifications in the cluster between the failure and the last checkpoint will be lost. The system may be restored to the state stored in the SecondaryNameNode, but this is not transparent and must be performed manually, since there is going to be data loss.

The second single point of failure is the JobTracker. In the case of its failure, all computations that are currently being performed will be lost. This kind of failure is not transparent either.

Keep in mind that in MapReduce 2.0 this kind of vulnerability was addressed, but this will be covered later.

Data Integrity Error

This is handled through checksum verification and is usually solved transparently. The checksum is verified by the DataNode before writing and after reading a block. If the values are inconsistent, a corrupted block is reported to the NameNode and the following procedure happens:

  1. The block with the incorrect checksum is tagged as corrupt.
  2. The corrupt block cannot be read by clients anymore.
  3. The client that requested to read that block is redirected to another DataNode, where the same block is replicated.
  4. A replication of the block, from a DataNode with the correct checksum, is scheduled in order to keep the replication factor. The replica is copied to another DataNode, avoiding the one with the corrupted block.
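The gist of the check can be sketched in a few lines of Python, with zlib.crc32 standing in for HDFS’s real per-chunk checksums (the function names are mine, not Hadoop’s):

```python
import zlib

def store_block(data):
    # A DataNode stores each block alongside its checksum.
    return {"data": data, "checksum": zlib.crc32(data)}

def read_block(block):
    # On read, the checksum is recomputed; a mismatch means the
    # block is corrupt and must be fetched from another replica.
    if zlib.crc32(block["data"]) != block["checksum"]:
        raise IOError("corrupt block: checksum mismatch")
    return block["data"]
```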

Task Failure

Keep in mind that a task failure is different from a job failure. A job failure would imply the failure of all of its tasks (the Map/Reduce/Combine code executed in all the designated TaskTrackers). A task failure, on the other hand, is the failure of a single task submitted to the Hadoop framework. Among the possible causes of these errors, one can point out:

  • Incorrect Java version, e.g. the job required Java 7 and only found Java 6.
  • An exception has been thrown by the task code.

The system will try to transparently solve these errors by taking the following actions:

  1. Report the error back to the TaskTracker. The TaskTracker will tag that task as failed.
  2. Any task in the queue in that TaskTracker will also be tagged as failed.
  3. The JobTracker will then try to reschedule that task with a different TaskTracker.
  4. If the number of task failures surpasses a fixed value, the complete job is declared as failed. One can set this number, but by default it is 4.
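The steps above boil down to a simple retry policy, which can be sketched like this (a toy model: the real scheduling is done by the JobTracker; only the default limit of 4 attempts matches the one mentioned above):

```python
def run_task(task, trackers, max_attempts=4):
    # Try the task on successive TaskTrackers; declare the job
    # failed once the attempt limit is reached.
    for attempt, tracker in enumerate(trackers, start=1):
        if attempt > max_attempts:
            break
        try:
            return tracker(task)
        except Exception:
            continue  # tag as failed, reschedule on another tracker
    raise RuntimeError("job failed: task %r exceeded %d attempts"
                       % (task, max_attempts))
```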

TaskTracker Failure

It is caused by hardware or communication failures. If the TaskTracker machine stops working or its network stops answering, this kind of error will happen. It is detected by the JobTracker when it stops receiving heartbeats from that TaskTracker. The following actions are taken:

  1. The JobTracker removes the TaskTracker from the pool of available trackers.
  2. Any incomplete task that was being performed by that TaskTracker is reassigned to another alive TaskTracker.
  3. Completed maps are also reassigned to other TaskTrackers (yeah! Just like in the MapReduce article, remember?), since the output that a Map function generates is stored on the local disk of the failed machine, not in HDFS.


Of MapReduce and Men

First of all, why was MapReduce originally created?

Basically Google needed a solution for making large computation jobs easily parallelizable, allowing data to be distributed in a number of machines connected through a network. Aside from that, it had to handle the machine failure in a transparent way and manage load balancing issues.

What are MapReduce true strengths?

One may say that MapReduce’s magic is based on the Map and Reduce function applications. I must confess, mate, that I strongly disagree. The main feature that made MapReduce so popular is its capability of automatic parallelization and distribution, combined with a simple interface. These factors, summed with transparent failure handling for most errors, made this framework so popular.

A little more depth on the paper:

MapReduce was originally presented in a Google paper (Dean & Ghemawat, 2004 – link here) as a solution for performing computations on Big Data using a parallel approach and commodity-computer clusters. In contrast to Hadoop, which is written in Java, Google’s framework is written in C++. The document describes how a parallel framework would behave, using the Map and Reduce functions from functional programming, over large data sets.

In this solution there would be two main steps – called Map and Reduce –, with an optional step between the two – called Combine. The Map step runs first, performs computations on the input key-value pairs and generates new output key-value pairs. One must keep in mind that the format of the input key-value pairs does not necessarily need to match the output format. The Reduce step then assembles all values of the same key and performs other computations over them, outputting key-value pairs as a result. One of the most trivial applications of MapReduce is a word count.

The pseudo-code for this application is given below:

map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, “1”);

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
As one can notice, the map reads all the words in a record (in this case a record can be a line) and emits each word as a key and the number 1 as a value.
Later on, the reduce groups all values of the same key. Let’s give an example: imagine that the word ‘house’ appears three times in the input. The input of the reducer would be [house,[1,1,1]]. The reducer will then sum all the values for the key house and output the following key-value pair: [house,3].
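That flow can be simulated on a single machine in a few lines of Python – a toy sketch of the shuffle step between map and reduce, not how any real framework implements it:

```python
from collections import defaultdict

def map_fn(record):
    # Emit (word, 1) for each word in the record.
    return [(word, 1) for word in record.split()]

def reduce_fn(key, values):
    # Sum all the counts emitted for this key.
    return (key, sum(values))

def map_reduce(records):
    # "Shuffle": group every emitted value under its key, which is
    # exactly what the framework does between the two phases.
    groups = defaultdict(list)
    for record in records:
        for key, value in map_fn(record):
            groups[key].append(value)
    return dict(reduce_fn(k, vs) for k, vs in sorted(groups.items()))
```

For the input ["house cat house", "house"], the shuffle groups house under [1, 1, 1] and cat under [1], and the reduce sums them into house: 3 and cat: 1.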

Here’s an image of how this would look in a MapReduce framework:
Image from the Original MapReduce Google paper.

As a few other classical examples of MapReduce applications, one can cite:

  • Count of URL access frequency
  • Reverse Web-link Graph
  • Distributed Grep
  • Term Vector per host

In order to avoid too much network traffic, the paper describes how the framework should try to maintain data locality. This means it should always try to make sure that a machine running Map jobs has the data in its memory/local storage, avoiding fetching it over the network. Aiming to reduce the network traffic generated by a mapper, the optional Combine step, described before, is used. The Combiner performs computations on the output of the mappers in a given machine before sending it to the Reducers – which may be on another machine.

The document also describes how the elements of the framework should behave in case of faults. These elements are called worker and master in the paper; they are divided into more specific elements in the open-source implementations.
Since Google only described the approach in the paper and did not release its proprietary software, many open-source frameworks were created to implement the model. As examples one may cite Hadoop or the limited MapReduce feature in MongoDB.

The run-time should take care of the details that non-expert programmers would rather not handle, like partitioning the input data, scheduling the program execution across the large set of machines, handling machine failures (in a transparent way, of course) and managing the inter-machine communication. An experienced user may tune these parameters, such as how the input data is partitioned between workers.

Key Concepts:

  • Fault Tolerance: It must tolerate machine failure gracefully. In order to do so, the master pings the workers periodically. If the master does not receive responses from a given worker within a definite time lapse, the master marks the work on that worker as failed. In this case, all map tasks completed by the faulty worker are thrown away and given to another available worker. The same happens if the worker was still processing a map or reduce task. Note that if the worker had already completed its reduce part, that computation was already finished by the time it failed and does not need to be redone. As the primary point of failure, if the master fails, the whole job fails. For this reason, one may define periodical checkpoints for the master, in order to save its data structures. All computations that happen between the last checkpoint and the master failure are lost.
  • Locality: In order to avoid network traffic, the framework tries to make sure that all the input data is locally available to the machines that are going to perform computations on it. In the original description, it uses the Google File System (GFS), with the replication factor set to 3 and block sizes of 64 MB. This means that the same 64 MB block (one of the blocks that compose a file in the file system) will have identical copies on three different machines. The master knows where the blocks are and tries to schedule map jobs on those machines. If that fails, the master tries to allocate a machine near a replica of the task’s input data (i.e. a worker machine in the same rack as the data machine).
  • Task Granularity: Assuming that the map phase is divided into M pieces and the reduce phase into R pieces, the ideal is that M and R are a lot larger than the number of worker machines. This is due to the fact that a worker performing many different tasks improves dynamic load balancing. Aside from that, it increases the recovery speed in case a worker fails (since the many map tasks it has completed can be spread out across all the other machines).
  • Backup Tasks: Sometimes, a Map or Reduce worker may behave a lot more slowly than the others in the cluster. This may hold up the total processing time and make it equal to the processing time of that single slow machine. The original paper describes an alternative called Backup Tasks: when a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The MapReduce operation completes when either the primary or the backup execution finishes.
  • Counters: Sometimes one may desire to count the occurrences of events. For this reason, counters were created. The counter values from each worker are periodically propagated to the master. The master then aggregates (yep, looks like Pregel aggregators came from this place ;) ) the counter values of successful map and reduce tasks and returns them to the user code when the MapReduce operation is complete. The current counter values are also available in the master status, so a human watching the process can keep track of how it is behaving.

Well, I guess with all the concepts above, Hadoop will be a piece of cake for you. If you have any question about the original MapReduce article or anything related, please let me know.

My next post will be about Hadoop and MapReduce 1.0. See you soon!

Configure PXE Boot for CentOS 6.5 using FTP, HTTP and Kickstart

This is my first true post on the blog (yep, hello worlds do not count), so how about we start from the beginning (brilliant!). How about deploying a Linux OS on a cluster?

One can say: “Oh, but I know how! I just need to get a Linux ISO and install it on all the machines. All I need is a keyboard, a monitor, a USB stick/CD drive (if those still exist) and a mouse.” Yeah, sure. But this would be too much of a monkey job. All the time you spend doing this you could be doing more productive stuff ;) Having to configure all the details again and again for every machine is boring, unproductive and a waste of time.

In order to perform this installation, first we need to select a Linux distribution. How about CentOS 6.5? It’s the latest release (at the moment I am writing this post) and the procedure is quite straightforward for most distributions once you have a decent template.

Before you start the whole process, please check whether the client machines allow booting through the network. One can verify this in the BIOS. If there is an option (and a boot priority) for LAN boot, your machine most likely supports the PXE protocol.
Oh yeah, sure. And what is the PXE protocol?
It stands for Preboot eXecution Environment (just say “pixie” – like that Ace Combat pilot, remember?). Long story short, it is an environment that allows computers to boot through a network interface, ignoring all data storage devices and installed OSs.

The protocol itself is a combination of DHCP and TFTP:

  • DHCP is used to locate the appropriate boot server(s)
  • TFTP is used to download the initial bootstrap program (and some additional files)

Ouch Marco, what’s DHCP, TFTP and bootstrap program?

DHCP stands for Dynamic Host Configuration Protocol. It is a network protocol that works over IP (Internet Protocol, but that level of detail is already too much) to dynamically distribute network configuration parameters. Things like IP addresses for interfaces and other services are distributed with it. It sits in the Application Layer of the Internet Protocol Suite. Basically, this protocol removes the need for a network administrator to set these items manually.

TFTP? Trivial File Transfer Protocol. Known for its simplicity (triviality? Yeah, master of the obvious). Usually people use it to transfer boot files and configuration files between machines in a local environment. By trivial one can assume that it is really limited when compared to FTP, since there is no authentication (it runs on a local network – you should make sure that that network is secure).

Last but not least: the bootstrap program. In computer science, booting is basically the process of starting a computer. It has a lot of stages, and usually each stage loads a more complex program for the next one. The term itself was initially a metaphor for “the computer pulls itself up by its bootstraps”, but later became a technical term.

Ok, and how exactly does PXE communicate with the machine?

  1. The client computer (the one on which you are intending to install the OS) broadcasts (yep, broadcasts go to the entire network) a DHCPDISCOVER packet, extended with PXE-specific options, to port 67 using the UDP protocol. This packet tells the server that the client is PXE-capable, and it is ignored (dropped) by standard DHCP servers.
  2. The server (PXE redirection service) answers with a DHCPOFFER to the client’s 68/UDP port. This answer contains:
    1. PXE Discovery control field, used to define the best way to contact the PXE boot servers. It can be multicasting, broadcasting or unicasting.
    2. List of IP addresses available on the DHCP server.
    3. PXE boot menu, where each entry is a PXE Boot Server Type (in our case, this option will be the kind of installation of the OS)
    4. PXE Boot Prompt, that tells the user to press a defined key to see the boot menu.
    5. Time-out. It launches the first boot menu entry if it expires. In this first version of this document, this feature is not implemented, but it is really easily done and I will probably edit this post in the future to add it.
  3. Now the client needs an IP to be able to contact the PXE Boot server – since it has options to define, and the server must be able to find out which machine defined which options. The client sends a DHCPREQUEST with PXE-specific options to port 67/UDP. It can multicast or unicast, depending on the preferences set in the DHCPOFFER.
  4. Once the PXE Boot server receives the packet, it sends a DHCPACK, which includes:
    1. The complete file path to download the NBP (Network Bootstrap Program) via TFTP
    2. The multicast TFTP configuration
    3. The PXE Boot Server type and PXE Boot layer it answers – this allows multiple boot servers to run from one daemon, but that scope is beyond this document.
  5. After the client receives the DHCPACK, the NBP is downloaded into the client’s RAM and executed. It may or may not be verified, by checksum, before being executed (available from version 2.1 of PXE).

One can visualize the flow in the image below:


More details on this protocol can be found on the Intel Website

What does the network layout used in this example look like?

Here it is ;)


Ok, ok, let’s go to the Linux part:

Network interfaces configuration

You just need to follow the topology of the network in this step and remember to enable ip forwarding. So:

  • Eth0 is the outside network.
  • Eth1 is the local network, managed by the DHCP server.

To enable IP Forwarding do:

$ echo 1 > /proc/sys/net/ipv4/ip_forward

To test if IP forwarding is enabled (do it as a sudoer, plz):

$ /sbin/sysctl net.ipv4.ip_forward

If it returns 1, then IP forwarding is enabled. If it returns 0, do:

$ /sbin/sysctl -w net.ipv4.ip_forward=1

DHCP Service installation

Not rocket science, do:

$ yum install dhcp

Now one needs to configure /etc/dhcp/dhcpd.conf

No worries, I have an example. But it is of paramount importance that you set the correct IP address of the TFTP (boot) server (via the next-server directive).

So, put in the dhcpd.conf the following content:

# DHCP Server Configuration file.
# see /usr/share/doc/dhcp*/dhcpd.conf.sample
# see 'man 5 dhcpd.conf'
# Written by Marco Lotz

ddns-update-style interim;

# Added for PXE boot support
allow booting;
allow bootp;

option option-128 code 128 = string;
option option-129 code 129 = text;

filename "pxelinux.0";
ignore client-updates;

# Network address and subnet mask (example values - adjust
# them to reflect your own network)
subnet 10.0.0.0 netmask 255.255.255.0 {
option routers 10.0.0.1;
option subnet-mask 255.255.255.0;
option domain-name "cluster";
option domain-name-servers 10.0.0.1;

# IP address of the TFTP (boot) server
next-server 10.0.0.1;

# Range of lease IP addresses, should be based
# on the size of the network
range dynamic-bootp 10.0.0.100 10.0.0.200;

# Broadcast address
option broadcast-address 10.0.0.255;

# Default lease time
default-lease-time 864000000;
# Maximum lease time
#max-lease-time 43200;
}

Why such a huge lease time? Because I do not intend to have a node changing address, messing up all the /etc/hosts configuration and making me reconfigure my whole MapReduce cluster. If you don’t mind having small lease times, do as you wish.

With the range dynamic-bootp attribute, one defines the range of IPs to be distributed to DHCP clients in that subnet. The subnet in which it distributes IPs is the one configured on eth1.

Don’t forget that comments start with # and that this file should probably be modified by you in order to reflect your own network (o’rly?)

Once you have configured all this, restart the service to apply the changes and to verify that your configuration has a correct syntax (albeit it can still be illogical):

$ service dhcpd restart

Define the service to start automatically during the system boot:

$ chkconfig dhcpd on

TFTP time!

Get the tftp server:

$ yum install tftp-server

TFTP in CentOS runs under xinetd, an open-source super-server daemon which runs on many Unix-like systems and manages internet-based connectivity. By default, tftp comes disabled (not surprising, since it is by nature so insecure).

To enable it, one has to modify /etc/xinetd.d/tftp

There’s a field called disable. Set it to ‘no’. Aside from that, in order to make it easier to debug (or just check logs of) what happened once the clients communicate with it, one should make it really verbose by adding the -vvv option to the arguments section.

Here is an example of the content of this file:

# default: off
# description: The tftp server serves files using the trivial file transfer
# protocol. The tftp protocol is often used to boot diskless
# workstations, download configuration files to network-aware printers,
# and to start the installation process for some operating systems.

service tftp
{
        socket_type     = dgram
        protocol        = udp
        wait            = yes
        user            = root
        server          = /usr/sbin/in.tftpd
        server_args     = -vvv -s /var/lib/tftpboot
        disable         = no
        per_source      = 11
        cps             = 100 2
        flags           = IPv4
}

The /var/lib/tftpboot path defines where the bootstrap files needed to perform the network boot are going to live. You can change this directory if you wish. Most tftp configuration files probably follow this template (which I would recommend you keep) and have exactly the same content.

The -s is a secure option used to limit access to your files when other users connect to the tftp server. It makes tftpd run under chroot (aka a chroot jail).

 Install the Apache webserver (http service)


$ yum install httpd

Then edit the file /etc/httpd/conf/httpd.conf in order to set the ServerName to your server IP Address and Port 80. In my case, the content of that line is set to:


Start the Apache server and enable it to boot start by running the following commands:

$ service httpd start
$ chkconfig httpd on

To verify that the server is running smoothly, perform:

$ service httpd status


Note: this is a little bit tricky, because you may have set other rules in your firewall. Basically what you have to do is open UDP port 69 and TCP port 80 in the firewall. In order to do this, you can just add the following lines to the file /etc/sysconfig/iptables :

-A INPUT -p udp -m state --state NEW -m udp --dport 69 -j ACCEPT
-A INPUT -p tcp -m state --state NEW -m tcp --dport 80 -j ACCEPT

In the default file, I would suggest adding these lines right after the SSH port-opening line, which is:

-A INPUT -p tcp -m state --state NEW -m tcp --dport 22 -j ACCEPT

Note: keep in mind that this part may cause you some trouble. If the firewall is incorrectly configured, your client machine is going to find the DHCP server but will have problems finding the TFTP server.

Once you have performed all the configuration, restart the service and check that it is running smoothly:

$ service iptables restart
$ service iptables status

If you have spent lots of time trying to configure it but still have problems finding the server machine, you can try the following. (DANGER!)

$ service iptables stop

This will disable your iptables firewall and your computer will be vulnerable to outside connections. I totally do not recommend this as a permanent solution.

After the stop command, run the boot sequence again. If that solves your tftp problem, then the problem is in your iptables. Configure it correctly and turn the firewall on again.


Now that all the server daemons are correctly installed, it’s time to install Syslinux. (– Hmmm, ok, but what is this foul creature?) It’s a project that assembles a whole collection of bootloaders. This way, you can boot an OS from multiple sources, including CD and network. For the network – which is our case – we’re going to use PXELINUX. So, let’s start. The first part is (guess what!) installing Syslinux:

$ yum install syslinux

Copy Syslinux files to /tftpboot directory:

$ cp /usr/share/syslinux/pxelinux.0 /var/lib/tftpboot/
$ cp /usr/share/syslinux/menu.c32 /var/lib/tftpboot/
$ cp /usr/share/syslinux/memdisk /var/lib/tftpboot/
$ cp /usr/share/syslinux/mboot.c32 /var/lib/tftpboot/
$ cp /usr/share/syslinux/chain.c32 /var/lib/tftpboot/
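The five copies above can also be written as a loop, which makes it easier to spot a missing file. A small sketch (the paths are the stock CentOS locations used in this post; copy_bootfiles is a hypothetical helper):

```shell
# copy_bootfiles SRC DST: copy the Syslinux components we need into the
# TFTP root, warning about any component that is missing (hypothetical helper)
copy_bootfiles() {
    src=$1
    dst=$2
    for f in pxelinux.0 menu.c32 memdisk mboot.c32 chain.c32; do
        if [ -e "$src/$f" ]; then
            cp "$src/$f" "$dst/"
        else
            echo "missing: $src/$f" >&2
        fi
    done
}

copy_bootfiles /usr/share/syslinux /var/lib/tftpboot || true
```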

Make a new directory for the PXE basic menu files. (Yes Sir! These are the options that will (or will not) be displayed to the user once the machine boots.)

$ mkdir /var/lib/tftpboot/pxelinux.cfg

Inside the pxelinux.cfg directory (yes, it’s a directory, not a file extension, although it looks like one), create a file named default and insert the following content into it:

default menu.c32
prompt 0


LABEL MinimalInstallation
MENU LABEL centos-6.5-64-LiveDVD (Developer install)
KERNEL images/centos-6.5-64/vmlinuz
APPEND initrd=images/centos-6.5-64/initrd.img nomodeset ksdevice=bootif ip=dhcp ks=

LABEL Interactive
MENU LABEL centos-6.5-64-LiveDVD (interactive)
KERNEL images/centos-6.5-64/vmlinuz
APPEND initrd=images/centos-6.5-64/initrd.img nomodeset ksdevice=bootif ip=dhcp ks=

Needless to say, you must fill in your HTTP server address in the lines that start with ks=

In my personal case I didn’t implement the second option, that is, the Interactive one. You can implement it if desired ;)

Important Note: Keep in mind that the APPEND line is the final line of each label; the browser probably formatted that line to look like two lines, when it is actually only one.
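One way to double-check that you did not accidentally wrap an APPEND directive when pasting is to count the physical APPEND lines in your default file; with the two labels above there must be exactly two. A sketch (count_appends is a hypothetical helper):

```shell
# count_appends FILE: print how many physical lines start with APPEND
count_appends() {
    grep -c '^APPEND' "$1"
}

# with the two-label default file above, this should print 2
if [ -f /var/lib/tftpboot/pxelinux.cfg/default ]; then
    count_appends /var/lib/tftpboot/pxelinux.cfg/default
fi
```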

Image Selection

Once you have finished configuring the default file, you have to place the Linux kernel and the initial RAM image into the /tftpboot directory.

I strongly recommend creating a new directory for each Linux distribution for which you created an option in the default file. It is important that these paths match the KERNEL and APPEND directives.

Then run the following command on the server machine to create the directory:

$ mkdir -p /var/lib/tftpboot/images/centos-6.5-64

Here comes a tricky part: for each Linux distribution that you want to install, you need to get its vmlinuz (that is, the kernel) and initrd.img (initial RAM image). In my personal case, I downloaded the following version of CentOS: CentOS-6.5-x86_64-bin-DVD1.iso (you can get it at

Why am I not using the minimal ISO? Well, because I really wanted to perform a full developer workstation installation. This installation requires several packages that are not available in the minimal installation.

Again, why not a Live installation? Well, it looks like in the Live installations the RAM image is actually huge, since it is made to run the OS from RAM without installing it. Remember that we’re going to transfer these files through a TFTP server? So yeah, the TFTP server tends to be quite slow transferring huge files. Thus Live versions were not made to be used with PXE. Aside from that, their kernel is named vmlinuz0 instead of vmlinuz.

So get the bin versions and everything will be fine, with lots of sunshine! ;)
In order to get the files from the distribution you have chosen, you first need to mount the downloaded ISO and then copy the files from it. Let’s assume that your ISO is in the folder /linuximages/CentOS-6.5-x86_64-bin-DVD1.iso

So to mount it, perform:

$ mount /linuximages/CentOS-6.5-x86_64-bin-DVD1.iso /mnt -o loop

Don’t forget that you need sudoers privileges to mount! (yeah, it’s an sbin binary)
Now go to the tftp directory and copy the files, by doing:

$ cd /var/lib/tftpboot/images/centos-6.5-64
$ cp /mnt/isolinux/vmlinuz .
$ cp /mnt/isolinux/initrd.img .

To check that you created the correct file layout for the tftp server, run the following command:

$ tree /var/lib/tftpboot

The output should be in the format:

├── chain.c32
├── images
│   └── centos-6.5-64
│       ├── initrd.img
│       └── vmlinuz
├── mboot.c32
├── memdisk
├── menu.c32
├── pxelinux.0
└── pxelinux.cfg
    └── default

3 directories, 8 files

If you can’t run the tree command, nothing that the following command won’t solve:

$ yum install tree
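Beyond the directory layout, it is worth checking that vmlinuz and initrd.img were actually copied as non-empty files (an interrupted cp leaves you with a boot that fails in mysterious ways). A small sketch, with check_boot_images as a hypothetical helper:

```shell
# check_boot_images DIR: verify that the kernel and the initial RAM disk
# exist in DIR and are non-empty; report their sizes (hypothetical helper)
check_boot_images() {
    dir=$1
    status=0
    for f in vmlinuz initrd.img; do
        if [ -s "$dir/$f" ]; then
            echo "$f: $(wc -c < "$dir/$f") bytes"
        else
            echo "missing or empty: $dir/$f" >&2
            status=1
        fi
    done
    return $status
}

check_boot_images /var/lib/tftpboot/images/centos-6.5-64 || true
```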

Copying the image to the HTTP server

We’re almost there! Time for the HTTP server image configuration. You need to put all the content of the mounted image into the directory that you specified in the default file and in the kickstart file.

Remember that the absolute path for the webserver root is /var/www/html/, so you have to run the following commands to create that directory and move into it:

$ mkdir -p /var/www/html/mirrors/centos/6.5/
$ cd /var/www/html/mirrors/centos/6.5/

And copy all the files from the mounted image into that directory:

$ cp -r /mnt/* .

Don’t forget that you can name these directories as it pleases you. But they need to be configured in the default PXE file and in the kickstart file (which will be discussed later).

Running a sample tree command for the file structure, one can see:

├── CentOS_BuildTag
├── EFI
│   ├── BOOT
│   │   ├── BOOTX64.conf
│   │   ├── BOOTX64.efi
│   │   ├── splash.xpm.gz
│   │   └── TRANS.TBL
│   └── TRANS.TBL
├── EULA
├── GPL
├── images
│   ├── efiboot.img
│   ├── efidisk.img
│   ├── install.img
│   ├── pxeboot
│   │   ├── initrd.img
│   │   ├── TRANS.TBL
│   │   └── vmlinuz
│   └── TRANS.TBL
├── isolinux
│   ├── boot.msg

(yes! I am not going to post the whole image tree here. Too many lines, but you got how it works)

Kickstart file

Now it’s time for the kickstart file. This file is of paramount importance, since it defines the configuration of the machine without you having to interact with it. (No monkey work, more developing time, yey!)

You can use anaconda or another kickstart generator to create this file. In my personal case, I wrote the script myself.

[root@server 6.5]# cat ks.cfg
# Kickstart for CentOS 6.5 for Hadoop Cluster
# By Marco Lotz (
# Install system on the machine.

# Use http as the package repository
# It is where the image files are

url --url

# Use the text installation interface.
# Use UTF-8 encoded USA English as the Language
lang en_US.UTF-8

# Configure time zone.
timezone America/Sao_Paulo

# Use Br-abnt2 keyboard.
keyboard br-abnt2

# Set bootloader location.
bootloader --location=mbr --driveorder=sda --append="rhgb quiet"

# Set the root password
# Use crypt to create an encrypted one
rootpw --plaintext TellThePasswordYouWant

# Enable shadowing and define the hashing algorithm
authconfig --enableshadow --passalgo=sha512

# Disable SELinux by default
# This is a little tricky with Cloudera Manager
selinux --disabled

# Configure Network device

# Use DHCP and disable IPv6
# set hostname to hduser
network --onboot yes --device eth0 --bootproto dhcp --noipv6 --hostname=hduser

# Configure the firewall to allow only SSH
firewall --service=ssh

# Partition the hard disk

# Clear the master boot record on the hard drive.
zerombr

# Clear existing partitions
clearpart --all --initlabel

# Create RAID
# Note: Minimum partition size is 1.
part raid.11 --size 100000 --asprimary --ondrive=sda
part swap --size 8192 --asprimary --ondrive=sda
part raid.12 --size 100000 --asprimary --ondrive=sda
part raid.13 --grow --asprimary --ondrive=sda --size=1
part raid.21 --size 100000 --asprimary --ondrive=sdb
part swap --size 8192 --asprimary --ondrive=sdb
part raid.22 --size 100000 --asprimary --ondrive=sdb
part raid.23 --grow --asprimary --ondrive=sdb --size=1
part raid.31 --size 100000 --asprimary --ondrive=sdc
part swap --size 8192 --asprimary --ondrive=sdc
part raid.32 --size 100000 --asprimary --ondrive=sdc
part raid.33 --grow --asprimary --ondrive=sdc --size=1
raid / --fstype ext4 --level=1 --device=md0 raid.11 raid.21
raid /opt --fstype ext4 --level=0 --device=md1 raid.12 raid.31
raid /usr/local --fstype ext4 --level=0 --device=md2 raid.22 raid.32
raid /dump --fstype ext4 --level=0 --device=md3 raid.13 raid.23 raid.33

# Specify the packages that are going to be installed

#The commented lines were used for minimal installation
# Install core packages
# @Base
# Don't install OpenJDK
# Install wget.
# wget
# Install vim text editor
# vim
# Install the Emacs text editor
# emacs

# Install rsync
# rsync

# Make a Developer Workstation install

# Don't install OpenJDK
# Due to Hadoop compatibility


# Post installation configuration

# Enable post process logging.
%post --log=~/install-post.log

# Comment the Hadoop section if using a manager,
# as Cloudera or HortonWorks managers.
# Create hadoop user hduser with password hduser
#useradd -m -p hduser hduser

# Create group Hadoop
#groupadd hadoop

# Change user hduser's current group to hadoop
# usermod -g hadoop hduser

# Tell the nodes the hostname and IP address of the
# admin machine.
echo " server.cluster" >> /etc/hosts

# Configure administrative privileges for the hadoop group
# Configure the kernel settings

ulimit -u

# Startup services

service sshd start

chkconfig sshd on


# Reboot after installation
reboot

# Disable first boot configuration.
firstboot --disable

Ok, I guess the script above is quite self-explanatory (if you have a question, please do not hesitate to post it in the comments). Just remember to set the timezone to your correct timezone and the keyboard to your region’s layout.
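On the password note: the comment in the kickstart mentions crypt, and you probably do not want a plain-text root password sitting in a file served over HTTP. One way to generate a SHA-512 crypt hash for rootpw --iscrypted is with OpenSSL (a sketch; it assumes OpenSSL 1.1.1 or newer, which provides the -6 option):

```shell
# Generate a SHA-512 crypt hash of the root password; paste the result
# into the kickstart file as:  rootpw --iscrypted <hash>
HASH=$(openssl passwd -6 'TellThePasswordYouWant')
echo "$HASH"
```

The printed hash starts with $6$, which marks the SHA-512 crypt scheme.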

For more information about the attributes in this file, please refer to:

Another important thing is that you need to configure the hard disk partitioning to reflect your hardware configuration (really!?). In my scenario I was using three disks in RAID. Don’t forget that if you want to give all the unallocated space to a partition, aside from telling it to grow, you need to set the minimum size to 1 MB (standards, who are we to discuss with them?)
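Once a node is installed and rebooted, you can sanity-check that the md arrays described in the kickstart actually assembled. A sketch to run on the installed node (output will vary with your hardware; the mount points are the ones from the kickstart above):

```shell
# Show software-RAID status, if the md driver is active on this machine
if [ -r /proc/mdstat ]; then
    cat /proc/mdstat
fi

# Show the filesystem backing each mount point from the kickstart
for m in / /opt /usr/local /dump; do
    if [ -d "$m" ]; then
        df -h "$m" | tail -n 1
    fi
done
```

You should see md0 through md3 listed as active, and the df lines should report the /dev/mdX devices rather than raw disks.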

The sunshine, that joyful moment

Now that we have finally configured everything, you are good to go. Just restart your client computer, make sure that network boot is set as the primary option, and see the magic happen :)

In the current configuration, you will probably need to press Enter twice. The first press can be removed by setting a timeout in the PXE default file, making the system auto-select the first boot option. The second can be removed by modifying ks.cfg (the kickstart file), making sure that the network is auto-selected (by auto I mean that the script will select it). There’s also a really good reference guide that helps you perform roughly the same procedure with virtual machines. It is available at:
Please feel free to write comments and give me feedback about this post. If you find an error or a typo, please let me know.
See you soon!