Hello! Hope you're doing well. In my last post, I explained the internals of Hadoop MapReduce. As promised in that post, we will write and execute a MapReduce program in Java for a simple wordcount example. Let's dive in!
Topics covered in this post
- Hadoop cluster setup on local machine and on Cloud
- Writing a MapReduce program on Eclipse
- Creating a JAR file for the MapReduce program and uploading it to HDFS
- Executing the MapReduce Program on the Hadoop Cluster
1. Prerequisites
- Admin access to the machine (local preferably)
- Hadoop cluster (single or multi node) on the local machine or on cloud
- JDK 1.8 or later installed on the local machine
- Eclipse IDE or any Java IDE installed on the local machine
2. Hadoop cluster setup on local machine and on Cloud
i. Single Node cluster setup
As we already discussed, the DataNodes store and process the data. We need at least a single node Hadoop cluster to run the MapReduce program and process the data.
Setting up a single node Hadoop cluster on a local machine is a somewhat lengthy process and can often lead to errors. Below are the guides that I used to set up the cluster on my local machine for testing, for both Windows and Linux.
- Windows- https://towardsdatascience.com/installing-hadoop-3-2-1-single-node-cluster-on-windows-10-ac258dd48aef
- Linux (Ubuntu)- https://phoenixnap.com/kb/install-hadoop-ubuntu
ii. Multi node Cluster setup
Alternatively, we can use a cloud-based Hadoop cluster like DataProc on Google Cloud Platform (GCP), which doesn't require any setup other than selecting the configuration of the NameNode and the DataNodes. For the GCP account setup, refer to the guide here. We'll see the cluster setup in the following steps.
Before going any further you should consider two important steps while operating in any cloud environment.
a. Setting up billing alerts to avoid any unexpected bills.
b. Turning off or deleting the resources soon after the work is done.
a. Sign up for Google Cloud and log in to your account
b. Search for "DataProc" and select the option with the same name in the results
c. Select the "Create Cluster" option
d. Provide the following details on the create cluster page, in the "Set up cluster" section:
i. Cluster name - **test-cluster**
ii. Cluster region and zone - **us-central1**, **us-central1-a**
iii. Cluster type - **Standard (1 master, N workers)**
iv. Autoscaling policy - **None**
v. Image type and version - **2.0-debian10 (default)**
vi. Select **Enable Component gateway**
e. Under "Configure nodes" select the following for Master node
i. Machine family - **General-Purpose (default)**
ii. Series - **N1 (default)**
iii. Machine type - **n1-standard-2 (2 vCPU, 7.5 GB memory)**
iv. Primary disk size (min 15 GB) - **100GB**
v. Primary disk type - **Standard Persistent Disk**
vi. Number of local SSDs - **0**
f. Select the following for "Worker Nodes"
i. Machine family - **General-Purpose (default)**
ii. Series - **N1 (default)**
iii. Machine type - **n1-standard-2 (2 vCPU, 7.5 GB memory)**
iv. Number of worker nodes - **2**
v. Primary disk size (min 15 GB) - **100GB**
vi. Primary disk type - **Standard Persistent Disk**
vii. Number of local SSDs - **0**
g. Leave the rest of the config as is and click "CREATE"
h. Click on the cluster name and select the "VM Instances" tab in the page
i. Click on "SSH" for the master node and you'll be presented with a new browser window connected to the master node of our Hadoop cluster. I've used a local terminal to connect to the master node for the rest of the post.
Note: In real world scenarios, we would connect to the Hadoop cluster via a gateway node or edge node. We would not connect through the NameNode, since it is kept busy managing the cluster.
3. Writing a MapReduce program on Eclipse
a. Create a new Java project called "wordcountmapreduce" in Eclipse IDE on your local machine. Here, I'm using a Linux (Ubuntu) machine to create the project. The rest of the steps should stay the same for a Windows machine as well.
b. Create a new class for Map by right clicking on the project and selecting "Class". Once you select it, enter the name of the Map class as "WordCountMapper" and hit Finish.
c. Once the WordCountMapper class is created, use the following link for the mapper, reducer and partitioner implementations for the wordcount example. Refer to the GitHub link for the code.
d. To remove the errors in the IDE, we must add the Hadoop libraries to the project build path. The following are the libraries (JAR files only) that should be added to the project:
<hadoop_dir> is the path where you saved the Hadoop distribution. Ex: /home/
Click on "Add External JARs" and navigate to the paths mentioned in the above list. After adding all the required JARs, click "Apply and Close".
e. After adding the JARs to the project build path, the errors in the IDE disappear. Use the code for the reducer (WordCountReducer.java), partitioner (WordCountPartitioner.java) and the driver (WordCount.java) classes from the GitHub link.
f. Once the project setup is done, we will have a look at the "WordCount.java" class. This is the driver class, which configures the Map, Combiner, Reduce and Partitioner classes to be executed on the cluster. This class includes configuration such as:
i. Job name - setJobName
ii. Driver class - setJarByClass
iii. Mapper class - setMapperClass
iv. Combiner class - setCombinerClass (same as the Reducer class for the wordcount example)
v. Reducer class - setReducerClass
vi. Number of reducers - setNumReduceTasks
vii. Output data types from each class - setOutputKeyClass, setOutputValueClass
viii. Input and output paths - addInputPath, setOutputPath respectively
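The note that the Combiner can be the same class as the Reducer for wordcount deserves a quick illustration. Summing counts gives the same answer whether we sum everything at once or first sum per mapper and then sum the partial results. The following is a minimal plain-Java sketch of that idea; it does not use the Hadoop API, and the class name `CombinerSketch`, its helper methods and the sample words are all hypothetical names made up for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Hadoop classes are used here.
class CombinerSketch {

    // Convenience helper to build a <word,count> pair.
    static Map.Entry<String, Integer> pair(String word, int count) {
        return new SimpleEntry<String, Integer>(word, count);
    }

    // Sums counts per word: the same logic serves as combiner and as reducer.
    static Map<String, Integer> sumByWord(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new HashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            totals.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return totals;
    }

    // Reducer applied directly to the raw output of every mapper.
    static Map<String, Integer> withoutCombiner(List<List<Map.Entry<String, Integer>>> mapperOutputs) {
        List<Map.Entry<String, Integer>> all = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> output : mapperOutputs) {
            all.addAll(output);
        }
        return sumByWord(all);
    }

    // Each mapper's output is pre-summed (combiner) before the final sum (reducer).
    static Map<String, Integer> withCombiner(List<List<Map.Entry<String, Integer>>> mapperOutputs) {
        List<Map.Entry<String, Integer>> partials = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> output : mapperOutputs) {
            partials.addAll(sumByWord(output).entrySet());
        }
        return sumByWord(partials);
    }

    public static void main(String[] args) {
        // Hypothetical <word,1> output of two mappers.
        List<List<Map.Entry<String, Integer>>> mapperOutputs = Arrays.asList(
            Arrays.asList(pair("hadoop", 1), pair("map", 1), pair("hadoop", 1)),
            Arrays.asList(pair("hadoop", 1), pair("reduce", 1)));

        System.out.println(withoutCombiner(mapperOutputs).equals(withCombiner(mapperOutputs)));
        System.out.println(withCombiner(mapperOutputs).get("hadoop"));
    }
}
```

Both paths produce identical totals, which is why reusing the reducer as a combiner is safe for a sum. An operation such as an average would not allow the same shortcut without extra bookkeeping.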
This is basically the end of the project and code setup required for the wordcount problem in MapReduce.
4. Creating a JAR file for the MapReduce program and uploading it to HDFS
Once the project and the MapReduce code setup is done, there are two ways we could execute the MapReduce Java program:
- Run the Java program within Eclipse. You can find the guide for the same here.
- Package the Java program as a JAR file with all the dependencies and execute it on the Hadoop cluster. We'll follow this method for this guide.
Steps to package the wordcount MapReduce Java program as a JAR file:
a. Right click on the project and select the "Export" option
b. Under Java, select the "JAR" option and click Next.
c. Select the path for saving the JAR file. Click Next until the final step
d. Select the Main class as "WordCount" using the Browse window.
e. Select Finish to create the JAR file
f. The jar file will be created as shown below. Once the jar file is created, we'll upload it to the GCP Hadoop cluster and run it.
g. Now, we'll upload this to the master node of the Hadoop cluster using SCP. You can configure SSH to connect to the cluster instance on GCP using this link. I've used Windows with Windows Terminal and followed the steps mentioned below. To copy the jar file(s) to the master node of the cluster, we use the following command:
scp -i "`<Path/to/SSH/key/ssh-key>`" Path/to/jar/file/wordcountmapperonly.jar username@`<master-ip>`:/path/on/server
h. Once the jar file is available on the master node instance, we can use the following commands to copy the jar file to HDFS. Please note that the master node's local filesystem and HDFS are different.
ssh -i "`<Path/to/SSH/key/>`" username@`<master-ip>`
hadoop fs -put -f Path/to/jar/file/wordcountmapperonly.jar `<hdfs_path>`
hadoop fs -ls `<hdfs_path>`
Here, we are copying the jar file wordcountmapreducepartitioner.jar and the input data folder HadoopInputFiles to the HDFS directory '/'. The input folder contains 3 text files.
5. Executing the MapReduce Program on the Hadoop Cluster
As we've seen already, the MapReduce driver class (WordCount.java) will be configured to execute Mapper, Combiner, Reducer and Partitioner. We'll run the MapReduce program with different configurations using the driver class
i. Only Mapper
ii. Mapper and Reducer
iii. Mapper, Reducer and Partitioner
i. Only Mapper
To run the Mapper only, we need to comment out the Combiner, Reducer and Partitioner classes configured in the driver class and package the jar file as shown in the previous section. The driver class should look like the picture below. The code for the same is here.
The input files are in "/HadoopInputFiles", which contains three files with the data shown below. You can find the input files here.
Now, run the jar file "wordcountmapperonly.jar" on the Hadoop cluster with the following command and the above input files. The steps to copy the jar file to the HDFS location are shown in the previous section.
hadoop jar `<hdfs_path>`/wordcountmapperonly.jar `<input_file_or_dir_path>` `<output_path>`
The following image shows how to run the MapReduce jars on the Hadoop cluster. The full output log of the run is here.
The output of the mapper-only phase contains all the words with count 1, as shown below.
Once we run the MapReduce job, we can see that the application is tracked under YARN, which is the resource manager for the cluster. Every run gets an entry here. The default YARN URL is <cluster-hostname>:8088. For a DataProc cluster though, we need to go to the cluster details in the GCP console, select the "Web Interfaces" tab and select "YARN ResourceManager" to get the YARN web interface.
If the output path in the hadoop jar command already exists, the MapReduce framework throws an "Output directory already exists" error as shown below. This is to avoid overwriting any existing output data.
The -D mapred.reduce.tasks property (named mapreduce.job.reduces in newer Hadoop releases) is set to 3 by default here, but we need only the map phase to run. We can force the reducer count to zero by setting this property to 0.
In the output path, we can see four different files:
- _SUCCESS - Indicates that the job completed successfully
- part-m-00000 to part-m-00002 - One output file for each input file. The 'm' in the output file name indicates the map phase. Since we don't have a reduce phase configured for this run, we get one output file per input file.
As we already know, each mapper produces the key-value pairs <word,1> for all the words in the input sentence, as shown in the output below.
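To make the mapper-only output concrete, here is a minimal plain-Java sketch of what a wordcount map step does to a single input line. It is not the mapper from the GitHub link and it does not use the Hadoop API. The class name `MapPhaseSketch`, the whitespace tokenization and the sample sentence are assumptions made for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Plain-Java illustration only; no Hadoop classes are used here.
class MapPhaseSketch {

    // Emits one <word,1> pair per whitespace-separated token, with no aggregation.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            Map.Entry<String, Integer> pair =
                new SimpleEntry<String, Integer>(tokens.nextToken(), 1);
            emitted.add(pair);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical input line; repeated words are emitted repeatedly.
        for (Map.Entry<String, Integer> pair : map("hello hadoop hello")) {
            System.out.println(pair.getKey() + "\t" + pair.getValue());
        }
    }
}
```

Note that "hello" is emitted twice with a count of 1 each time. Nothing is added up in the map phase, which is why every word in the mapper-only output carries the count 1.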
ii. Mapper and Reducer
Now, let's run 'wordcountmapreduce.jar' with the same input files and a different output path. This has both the map and reduce phases configured in the driver class. Logs for the run are here and the code for the same is here.
The output is generated after the reduce phase into a single output file, since we have only one reducer by default in the cluster.
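As a rough picture of what happens between the mappers and that single output file, the sketch below groups the <word,1> pairs by word (the shuffle), then sums each group (the reduce). It is plain Java without the Hadoop API, and it is not the reducer from the GitHub link. The class name `ReducePhaseSketch` and the sample pairs are hypothetical, and the use of a `TreeMap` to mimic the sorted key order a reducer sees is a simplification.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java illustration only; no Hadoop classes are used here.
class ReducePhaseSketch {

    // Shuffle step: collect all values for each key, keeping keys in sorted order.
    static SortedMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Reduce step: called once per word with all the counts for that word.
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int count : counts) {
            sum += count;
        }
        return sum;
    }

    // Single reducer: every word ends up in the same output, one line per word.
    static List<String> run(List<Map.Entry<String, Integer>> pairs) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(pairs).entrySet()) {
            lines.add(group.getKey() + "\t" + reduce(group.getKey(), group.getValue()));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hypothetical mapper output.
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<String, Integer>("hello", 1));
        pairs.add(new SimpleEntry<String, Integer>("hadoop", 1));
        pairs.add(new SimpleEntry<String, Integer>("hello", 1));

        for (String line : run(pairs)) {
            System.out.println(line);
        }
    }
}
```

With one reducer there is one group of output lines, which corresponds to the single output file described above.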
iii. Mapper, Reducer and Partitioner
Now, let's run 'wordcountmapreducepartitioner.jar' with the same input files and a different output path. This has the map, partition and reduce phases configured in the driver class. Logs for the run are here and the code for the same is here.
The output for the MapReduce run with the partitioner is as follows. As per the partitioner logic here, a different output file is created for each starting letter of a word. This means we are creating 26 partitions, each processed by its own reducer. Example: all the words starting with the letter 'a' will end up in the 'part-r-00001' file along with their counts.
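The idea of routing words by their first letter can be sketched in a few lines of plain Java. This is not the partitioner from the GitHub link, and it does not use the Hadoop API. The mapping of 'a' to partition 1 is inferred only from the 'part-r-00001' example above, and sending anything that does not start with a letter to partition 0 is purely an assumption. That assumption gives this sketch 27 slots rather than 26, so treat the exact offsets as illustrative. The class and method names are hypothetical.

```java
import java.util.Locale;

// Plain-Java illustration only; no Hadoop classes are used here.
class FirstLetterPartitionSketch {

    // Assumed mapping: 'a'..'z' -> 1..26 (case-insensitive), anything else -> 0.
    static int partitionFor(String word) {
        if (word.isEmpty()) {
            return 0;
        }
        char first = Character.toLowerCase(word.charAt(0));
        if (first >= 'a' && first <= 'z') {
            return first - 'a' + 1;
        }
        return 0;
    }

    // Reducer output files are numbered by partition, zero-padded to five digits.
    static String outputFileFor(int partition) {
        return String.format(Locale.ROOT, "part-r-%05d", partition);
    }

    public static void main(String[] args) {
        // Hypothetical sample words.
        for (String word : new String[] {"apple", "Avro", "hadoop", "42"}) {
            System.out.println(word + " -> " + outputFileFor(partitionFor(word)));
        }
    }
}
```

Whatever the exact mapping, the point is that all pairs with the same partition number go to the same reducer, so each reducer writes its own 'part-r-' file.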
We have seen a practical example of wordcount with MapReduce, as promised in my last post. This guide tries to cover the most common ways to create and execute MapReduce programs in Java.
MapReduce as a compute framework has lost its edge to newer frameworks like Spark. But did you know that we can use MapReduce to ingest data into HDFS from an RDBMS source, or write SQL-like queries that execute as MapReduce jobs? We will discuss those in detail in my next blog posts. Stay tuned!
For now though, I'll delete the cloud resources that I've spun up for the tutorial. If you did the same, please delete the resources you have created, or else you'll end up with something like this.
If my content helped you in any way and you'd like to contribute to my knowledge quest and sharing, you can contribute to me here.