
MapReduce on Hadoop - Big Data in Action

In the previous post we discovered the secret behind Hadoop's ability to store hundreds of TB. Based on a simple master-slave architecture, Hadoop is a system that can store and manipulate large amounts of data very easily.
Hadoop contains two types of nodes for storing data. The NameNode plays the role of master. It knows the name and location of each file that Hadoop stores, and it is the only node that can identify the location of a file based on the file name. Around this node we can have 1 to n nodes that store the file contents. Nodes of this kind are called DataNodes.
Data processing
Hadoop stores big data without any problems. But it became known as the system that can process big data in a simple, fast and stable way. It can process and extract the information that we want from hundreds of TB of data. This is why Hadoop is the king of big data. In this post we will discover the secret of data processing: how Hadoop manages to do this.
The secret that gives us the ability to process data is called MapReduce. This paradigm was not invented by Hadoop, but Hadoop managed to implement it very well. The first encounter with MapReduce will be hard, and the paradigm can seem pretty complicated. Anyone who wants to use MapReduce first needs to understand the paradigm.
Without understanding MapReduce we will not be able to know if Hadoop is the solution for our problem and what kind of data we should expect from Hadoop.
MapReduce and Tuples
Don't expect Hadoop to be a system that stores data in tables. This system doesn't have the concept of tables. It only works with tuples that are formed by a key and a value. This is the only thing that Hadoop uses to extract data. Each task that is executed in this system accepts these tuples as input. The output of a task is also formed by (key, values) pairs, where each pair can contain one or more values.
Even if this tuple seems to be trivial, we will see that this is the only thing that we need if we want to process data.
The MapReduce process is formed from two different steps: Map and Reduce. Map is the process that converts the input data into a new set of data. The data obtained after this step is only intermediate data that will be used in the next step. We have the option to persist this data, but generally this information is not relevant for the end user.
The Map action is not executed on only one node. It is executed on 1 to m nodes of DataNode type. Each DataNode on which this action is executed holds part of the input data, so on each node we execute Map over a part of the input. The result of this action is usually smaller than the input data, so it can be processed more easily. At this step the result is buffered in memory. It is not written to HDFS, although Hadoop can spill it to the node's local disk when the buffer fills up.
We can imagine that the output of this step is like a summary of our data. Depending on the input and on how we want to map it, we will obtain different results. The output data doesn't need to have the same format as the input data. The result is partitioned by a function applied to the key of the tuple. In general a hash function is used, but we can define any kind of partitioning mechanism.
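The partitioning idea can be sketched in a few lines of Python. This is only an illustration, not Hadoop's own partitioner: the function names `partition_for` and `partition` are made up for this example, and CRC32 stands in for whatever hash function a real job would use.

```python
import zlib


def partition_for(key: str, num_reducers: int) -> int:
    """Return the index of the reducer that should receive this key."""
    # crc32 gives the same value on every run, unlike Python's built-in
    # hash() for strings, which is randomized per process.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def partition(pairs, num_reducers):
    """Split (key, value) pairs into one bucket per reducer."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        buckets[partition_for(key, num_reducers)].append((key, value))
    return buckets


# Hypothetical map output, used only to exercise the functions above.
map_output = [("London", 120), ("Berlin", 300), ("Roma", 110), ("London", 100)]
buckets = partition(map_output, num_reducers=2)
```

The important property is that every pair with the same key lands in the same bucket, so a single reducer sees all the values for that key.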
The intermediate result can be used by Hadoop for different operations. At this step we can execute actions like sorting or shuffling. These small steps prepare the data for the next step. These operations can be, and are, also executed in the Reduce phase.
From the parallelization point of view, on each node where Map is executed we can have from 10 to 100-150 operations running at the same time. The number of concurrent operations is dictated by the hardware performance and the complexity of the Map action.
Once we have the intermediate results, we can start the next step of processing: Reduce. Unlike Map, the Reduce operation is not executed on every Hadoop node. It is executed on only a small part of the nodes, because the amount of data that needs to be processed has already been reduced. The data is partitioned so that each Reducer receives its own part.
While Map is formed by only one step, Reduce contains 3 main steps:

  • Shuffle
  • Sort
  • Reduce

When the Shuffle step is executed, each DataNode that was involved in the Map operation starts to send its results to the nodes that will run the Reduce operation. The data is sent over an HTTP connection. Because Hadoop normally runs in a private network, this is generally not treated as a security problem.
All the key-value pairs are sent and sorted by key. This needs to be done because the same key can arrive from different nodes. In general, this step is done in parallel with the shuffle process.
Once the shuffle step ends, Hadoop performs another sort. At this point we can control how the data is sorted and how the result is grouped. This sort step gives us the possibility to sort items not only by key, but also by other parameters. This operation is executed both on disk and in memory.
The last step that needs to be executed is Reduce. When this operation is executed, the final results are written to disk. At this step, each tuple is formed from a key and a collection of values. From this tuple, the Reduce operation produces a key and only one value, which represents the final value.
Even if the Reduce step is very important, there are cases when it is not necessary. In these cases the intermediate data is the final result for the end user.
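To make the three steps more concrete, here is a rough in-memory sketch in Python. It assumes a single reducer and ignores the network and the disk entirely. The function names (`shuffle`, `sort_and_group`, `reduce_all`) and the sample pairs are invented for this illustration and are not part of any Hadoop API.

```python
from itertools import groupby
from operator import itemgetter


def shuffle(map_outputs):
    """Collect the pairs produced on every map node into one list."""
    return [pair for node_output in map_outputs for pair in node_output]


def sort_and_group(pairs):
    """Sort by key, then yield (key, [values]) for each distinct key."""
    ordered = sorted(pairs, key=itemgetter(0))
    for key, group in groupby(ordered, key=itemgetter(0)):
        yield key, [value for _, value in group]


def reduce_all(grouped, reducer):
    """Apply the reducer to every (key, values) tuple: one value per key."""
    return [(key, reducer(values)) for key, values in grouped]


# Hypothetical output of two map nodes; "London" appears on both.
node_a = [("London", 120), ("Berlin", 300)]
node_b = [("Roma", 210), ("London", 100)]

result = reduce_all(sort_and_group(shuffle([node_a, node_b])), max)
```

Sorting before grouping is what brings the two "London" pairs together, even though they came from different nodes. The reducer here is simply `max`, but any function that turns a collection of values into one value would fit.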
JobTracker, TaskTracker
The MapReduce operation requires two types of services: JobTracker and TaskTracker. These two types of services are in a master-slave relationship that is very similar to the one we saw earlier for data storage, between NameNode and DataNode.
The main purpose of the JobTracker is to schedule and monitor each action that is executed. If one of the operations fails, the JobTracker is able to rerun it.
The JobTracker communicates with the NameNode and schedules the actions so that each job is executed on the DataNode that holds the input data. In this way no input data is sent over the wire.
A TaskTracker is a node that accepts Map, Reduce and Shuffle operations. Usually this is the DataNode where the input data can be found, but there can be exceptions to this rule. Each TaskTracker has a limited number of jobs that it can execute at once, called slots. Because of this, the JobTracker will try to execute jobs on a TaskTracker that has free slots.
An interesting part of the execution model is the way jobs are executed. Each job is executed in a separate JVM process. Because of this, if something goes wrong (an exception is thrown), only that job is affected. The rest of the jobs will run without problems.
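The scheduling rule described here (prefer the node that holds the data, but only if it has a free slot) can be illustrated with a toy model in Python. This is a simplification under my own assumptions, not the real JobTracker logic: the `TaskTracker` class, the `assign` function, and the node and block names are all hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class TaskTracker:
    name: str
    slots: int                                    # max concurrent tasks
    local_blocks: set = field(default_factory=set)
    running: list = field(default_factory=list)

    def has_free_slot(self) -> bool:
        return len(self.running) < self.slots


def assign(task_block: str, trackers: list):
    """Pick a tracker for the task that reads `task_block`.

    Prefer a tracker that stores the block locally; otherwise fall back
    to any tracker with a free slot. Return None if every slot is busy.
    """
    free = [t for t in trackers if t.has_free_slot()]
    if not free:
        return None
    local = [t for t in free if task_block in t.local_blocks]
    chosen = (local or free)[0]
    chosen.running.append(task_block)
    return chosen


trackers = [
    TaskTracker("node-1", slots=1, local_blocks={"block-a"}),
    TaskTracker("node-2", slots=2, local_blocks={"block-b"}),
]
first = assign("block-a", trackers)    # node-1 holds block-a locally
second = assign("block-a", trackers)   # node-1 is full, so fall back
```

The second call shows the exception to the locality rule: when the node that holds the data has no free slot, the task runs elsewhere and the input has to travel over the network.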
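The isolation effect can be imitated with operating system processes in Python. This sketch is only an analogy: it starts Python interpreters instead of JVMs, runs them one after another rather than in parallel, and the task names and task bodies are invented for the example.

```python
import subprocess
import sys


def run_task(source: str) -> bool:
    """Run one task in its own Python process; return True on success."""
    completed = subprocess.run(
        [sys.executable, "-c", source],
        capture_output=True,
        text=True,
    )
    return completed.returncode == 0


tasks = {
    "task-1": "print(sum(range(10)))",
    "task-2": "raise RuntimeError('bad input record')",  # fails on purpose
    "task-3": "print(max([120, 300, 110]))",
}

# The exception in task-2 ends only its own process.
status = {name: run_task(source) for name, source in tasks.items()}
```

Because each task lives in its own process, the failure of `task-2` is visible only as a non-zero exit code. A scheduler could use that signal to rerun just that task.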
So far we have discovered how MapReduce works. Theory is very good, but we also need practice. I propose a small example that will help us understand, in a simple way, how MapReduce does its magic.
We will start from the following problem. We have hundreds of files that contain the number of accidents that happened every month in each city of Europe. Because the EU is formed of different countries that have different systems, we end up with a lot of files. Some files contain information for all the cities of a country, others contain information for only one city, and so on. Let’s assume that we have the following file format:
London, 2013 January, 120
Berlin, 2013 January, 300
Roma, 2013 February, 110
Berlin, 2013 March, 200

Based on the input data we need to calculate the maximum number of accidents that took place in each city during a month. This simple problem can become a pretty complicated one when we have 10 TB of input data. In this case Hadoop is the perfect solution for us.
The first operation of the MapReduce process is Map. At this point each file is processed and a collection of key-value pairs is obtained. In our case the key is the name of the city and the value is the number of accidents. From each file we extract the maximum number of accidents for each city during a month. This represents the Map operation, and the output would be something like this:
(London, 120), (Berlin, 300), (Roma, 110), (London, 100), (Roma, 210), …
This intermediate result has no value for us (yet). We need to extract the maximum number of accidents for each city. The Reduce operation will be applied now. The final output would be:
(London, 120)
(Berlin, 300)
(Roma, 210)
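The same example can be written as a short Python sketch. It runs on one machine and in memory, so it only mirrors the logic, not the distribution. The first file reuses the sample lines from above. The second file is hypothetical, made up so that the intermediate pairs match the ones listed in the article. The function names `map_file` and `reduce_max` are also my own.

```python
def map_file(lines):
    """Map step: emit (city, max accidents in a month) for one file."""
    best = {}
    for line in lines:
        city, _month, accidents = [part.strip() for part in line.split(",")]
        best[city] = max(best.get(city, 0), int(accidents))
    return list(best.items())


def reduce_max(pairs):
    """Reduce step: keep the largest value seen for each city."""
    result = {}
    for city, accidents in pairs:
        result[city] = max(result.get(city, 0), accidents)
    return result


file_1 = [
    "London, 2013 January, 120",
    "Berlin, 2013 January, 300",
    "Roma, 2013 February, 110",
    "Berlin, 2013 March, 200",
]
# Hypothetical second file, invented for this illustration.
file_2 = [
    "London, 2013 February, 100",
    "Roma, 2013 March, 210",
]

intermediate = map_file(file_1) + map_file(file_2)
final = reduce_max(intermediate)
```

Each call to `map_file` could run on a different node, because it needs only its own file. Only the much smaller list of intermediate pairs has to be brought together for `reduce_max`.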
A similar mechanism is used by Hadoop. The power of this mechanism is its simplicity. Because the mechanism is simple, it can be duplicated and controlled very easily over the network.
In this article we found out how Hadoop processes data using MapReduce. We discovered that the core of the mechanism is very simple, but is duplicated on all the available nodes. On each node we can have more than one job running in parallel. In conclusion, we could say that all the tasks executed over the data are parallelized as much as possible. Hadoop tries to use all the resources that are available.
As a remark, don’t forget that the native language for Hadoop is Java, but it has support for other languages like Python, C# or PHP.

