Large data set analysis and processing on cluster machines


The MapReduce framework is widely used for analyzing large sets of data. One important type of data analysis done with MapReduce is log processing. MapReduce typically runs on a distributed file storage system such as the Hadoop Distributed File System (HDFS) from Apache Hadoop. The MapReduce framework runs on each HDFS node, performs parallel aggregation of the log data, and then produces the result.

A Hadoop job analyzes the log data stored in HDFS. The basic procedure to achieve this objective is:

  • Collect the necessary log files from the cluster machines that produce the actual logs.
  • Batch the logs into files of at least 64 MB, the default HDFS block size.
  • Compress the batched log files.
  • Send the batched log files to HDFS.
  • Run the Hadoop jobs to analyze the batched log files using the MapReduce framework (a minimal driver sketch follows this list).
  • Store the results in HBase for future use.
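
A minimal Hadoop job driver for the last two steps might look like the sketch below. The class names (LogAnalysisDriver, LogMapper, LogCombiner, LogReducer) and the input/output paths are placeholders, and for simplicity the result is written back to HDFS here rather than HBase:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative job driver: reads the compressed batch logs from HDFS,
// runs the Map/Reduce analysis, and writes the results back to HDFS.
public class LogAnalysisDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "log-analysis");   // Job.getInstance(conf, ...) on newer Hadoop
        job.setJarByClass(LogAnalysisDriver.class);

        job.setMapperClass(LogMapper.class);       // placeholder classes
        job.setCombinerClass(LogCombiner.class);
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Gzip-compressed batch files are decompressed transparently by the default TextInputFormat.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}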

The challenge involved in developing such a system is: ‘How are we going to collect the log data from the cluster machines?’

The generic way to overcome this challenge is to write the logs from all the cluster machines directly into a message queue. A utility running on top of each HDFS node then reads the messages from the queue and converts them into compressed batch log files. However, this is not an optimal way of processing the log files.
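
As a rough illustration of that queue-based approach, the sketch below drains log lines from a shared in-memory queue and rolls them into gzip-compressed batch files of roughly one HDFS block (64 MB); the class and file names are made up for the example:

import java.io.*;
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPOutputStream;

// Hypothetical queue-draining utility: log lines arrive on a shared queue and are
// rolled into ~64 MB gzip-compressed batch files before being pushed to HDFS.
public class QueueBatcher implements Runnable {
    private static final long BATCH_BYTES = 64L * 1024 * 1024; // default HDFS block size
    private final BlockingQueue<String> queue;

    public QueueBatcher(BlockingQueue<String> queue) { this.queue = queue; }

    @Override
    public void run() {
        try {
            while (true) {
                File batch = new File("batch-" + System.currentTimeMillis() + ".log.gz");
                long written = 0;
                try (Writer out = new OutputStreamWriter(
                        new GZIPOutputStream(new FileOutputStream(batch)), "UTF-8")) {
                    while (written < BATCH_BYTES) {
                        String line = queue.take();      // block until a log line arrives
                        out.write(line);
                        out.write('\n');
                        written += line.length() + 1;    // rough size accounting
                    }
                }
                // copy the finished batch file to HDFS here (e.g. with "hadoop fs -put")
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}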

Recently, Facebook open sourced its internal logging system, Scribe. Scribe is a server for aggregating streaming log data. It is designed to scale to a very large number of nodes and to be fault tolerant. Facebook describes its usage of Scribe by saying, “[Scribe] runs on thousands of machines and reliably delivers tens of billions of messages a day.”

The basic components for this Scribe Logging system are:

  • Thrift Client Interface
  • Distribution System

A Scribe log entry consists of two parts: a category and a message. The category is a high-level description of the message’s intended destination, and the message contains the details of the log data.
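
In Java, logging through the Thrift client interface looks roughly like the sketch below. LogEntry and scribe.Client are assumed to be the classes generated from scribe.thrift, the category name and port are only examples, and the exact generated signatures can differ between Thrift versions:

import java.util.Collections;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;

// Hypothetical Scribe client sketch: LogEntry and scribe.Client are assumed to be
// the Java classes generated from scribe.thrift.
public class ScribeLogger {
    public static void main(String[] args) throws Exception {
        TFramedTransport transport = new TFramedTransport(new TSocket("localhost", 1463));
        TBinaryProtocol protocol = new TBinaryProtocol(transport);
        scribe.Client client = new scribe.Client(protocol);

        transport.open();
        // An entry is just a category plus a message (category name is illustrative).
        LogEntry entry = new LogEntry("cloudbuddy_transactions", "51841,1,1048756,2009-12-27");
        client.Log(Collections.singletonList(entry));
        transport.close();
    }
}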

The types of stores currently available are:

  • file – writes to a file, either local or NFS.
  • network – sends messages to another scribe server.
  • buffer – contains a primary and a secondary store. Messages are sent to the primary store if possible, and otherwise to the secondary. When the primary store becomes available again, the messages are read back from the secondary store and sent to the primary (see the failover sketch after this list).
  • bucket – contains a large number of other stores, and decides which messages to send to which stores based on a hash.
  • null – discards all messages.
  • thriftfile – similar to a file store but writes messages into a Thrift TFileTransport file.
  • multi – a store that forwards messages to multiple stores.
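
The buffer store’s failover behaviour can be sketched conceptually as follows. This is plain Java with a made-up Store interface, purely to illustrate the primary/secondary handover described above, not Scribe’s actual implementation (which is C++):

// Conceptual illustration of the buffer store: try the primary, fall back to the
// secondary, and replay buffered messages once the primary is reachable again.
public class BufferStoreSketch {

    interface Store {
        boolean send(String message);   // false if the store is currently unavailable
        String poll();                  // read back one buffered message, or null if none
    }

    private final Store primary;
    private final Store secondary;

    public BufferStoreSketch(Store primary, Store secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    public void log(String message) {
        if (primary.send(message)) {
            // Primary is available again: drain whatever was buffered in the secondary.
            for (String buffered; (buffered = secondary.poll()) != null; ) {
                primary.send(buffered);
            }
        } else {
            secondary.send(message);
        }
    }
}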

Although Scribe can stream the logs to the HDFS nodes directly, Hadoop and HDFS cannot solve real-time problems: Hadoop jobs have a start-up cost of at least a few seconds, and HDFS reads and writes have too much latency for anything real-time. For auditing and analyzing megabytes of data quickly in batch, however, it is good to have this system in place.


Hadoop Map/Reduce Use Case


Of course, anyone can set up a Hadoop cluster; the real question is how to use the Hadoop framework effectively for our requirements. The Map/Reduce software framework is developed in Java. Typically, the mapper reads files from HDFS and the reducer writes its output back to the Hadoop file system. The Java New I/O (NIO) package plays a prominent role in this framework because, as in any parallel program, every thread accesses shared data, and NIO provides semaphore-like features, write locks, and so on.
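
For example, the java.nio.channels API exposes file-level write locking; the small sketch below takes an exclusive lock before writing a shared file (the file name is arbitrary):

import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

// Small illustration of the NIO write-lock feature mentioned above: an exclusive
// FileLock prevents other processes from writing the same file concurrently.
public class NioWriteLockExample {
    public static void main(String[] args) throws Exception {
        RandomAccessFile file = new RandomAccessFile("shared-output.log", "rw");
        FileChannel channel = file.getChannel();

        FileLock lock = channel.lock();          // exclusive lock on the whole file
        try {
            // safe to write to the shared file while the lock is held
        } finally {
            lock.release();
            channel.close();
            file.close();
        }
    }
}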

Map/Reduce Tips

  • Even though the Map/Reduce engine core is developed in Java, the Mapper and Reducer can be written not only in Java but also in other languages such as Scala, Python, Jython, and Ruby; each has its own strengths and weaknesses.
  • Small files are a problem. The default HDFS block size is 64 MB, and every file, directory, and block is represented as an object in the NameNode’s memory, each occupying roughly 150 bytes. With 10 million small files, each taking a file object and a block object, that is about 20 million objects, or roughly 3 GB of NameNode memory.
  • Reading through many small files also requires a large number of seeks.

Our Case Study

We are cloudy guys! We have the capability of a cloud-based solution provider. One of our tools, CloudBuddy Enterprise (which currently supports Amazon S3), emits logs for every user transaction, such as uploads to and downloads from Amazon S3. The CloudBuddy Enterprise server uploads the logs to S3 periodically. The Hadoop cluster machines download the logs through a Java scheduler: the scheduler triggers the download, fetches the log files from S3, and stores them in HDFS. The Map/Reduce engine fetches the input logs from the Hadoop file system and processes them. The final output is stored in HBase, the Hadoop database, and the result in HBase is updated with the newly processed data every time the Map/Reduce engine runs.
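
The download-and-load step could be sketched as below. downloadLogsFromS3() stands in for whichever S3 client the deployment actually uses, and the paths and schedule are illustrative; only the HDFS copy uses the standard Hadoop FileSystem API:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical scheduler sketch: periodically pull batched logs from S3 and load them into HDFS.
public class LogDownloadScheduler {

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Path local = downloadLogsFromS3();                     // hypothetical helper
                    FileSystem hdfs = FileSystem.get(new Configuration());
                    hdfs.copyFromLocalFile(local, new Path("/cloudbuddy/logs/"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 0, 1, TimeUnit.HOURS);                                          // period is illustrative
    }

    private static Path downloadLogsFromS3() {
        // Fetch the latest batched log files from Amazon S3 to local disk (implementation omitted).
        return new Path("/tmp/cloudbuddy-logs.gz");
    }
}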

Data-Set

For any Map/Reduce task, we need a data set to process as input; here, it is the transactional logs. The Mapper, written in Java, maps each transaction (upload/download/delete) by user ID.

The log structure is comma-separated values in the following format:

First Column : User-ID

Second Column : [[UPLOAD – 1], [DOWNLOAD – 2], [DELETE – 3]]

Third Column : Bytes

Fourth Column : Time Stamp

Sample Logs:

51841,1,1048756,2009-12-27
51234,1,105768,2009-12-26
51841,2,28476,2009-12-12
51234,1,105768,2009-12-26
51232,3,518,2009-12-16
51841,1,1048,2009-12-12
51234,3,38756,2009-11-12
51841,5,104556,2009-12-07
51234,2,102768,2009-12-26
51841,3,48756,2009-12-19
51234,1,0768,2009-12-26

For every user, the application allocates a disk quota. To compute over the logs using the Map/Reduce framework, the input data must already be stored in the Hadoop file system. The Map/Reduce framework splits the records by user ID and combines the bytes by transaction type. The Mapper class uses the following pattern to split the columns on the delimiter:

Pattern.compile("^(\\d+),(\\d+),(\\d+),\\d{4}-\\d{2}-\\d{2}$")
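
A Mapper along these lines might look like the following sketch; the class name and the "type,bytes" value encoding are illustrative choices, not necessarily the original implementation:

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative Mapper: emits <User-ID, "transactionType,bytes"> for every valid log line.
public class TransactionMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final Pattern LOG_LINE =
            Pattern.compile("^(\\d+),(\\d+),(\\d+),\\d{4}-\\d{2}-\\d{2}$");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Matcher m = LOG_LINE.matcher(value.toString());
        if (m.matches()) {
            String userId = m.group(1);
            String type = m.group(2);   // 1 = upload, 2 = download, 3 = delete
            String bytes = m.group(3);
            context.write(new Text(userId), new Text(type + "," + bytes));
        }
    }
}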

By overriding the map method, one map output is produced for each record in the input file. The Mapper’s output looks as follows:

<51232, <<1,564556>,<2,342304>,<3,0>,<3,343553>,<2,566422>,<2,786543>>>

<51234, <<1,23423>,<2,745645>,<3,4353>,<1,3453453>,<2,45345>>>

<51841, <<1,10234>,<2,383932>,<1,4353443>,<3,54644>,<2,456456>,<2,124853>>>

Next, the Combiner adds up the transactions of the same type for each User-ID (a sketch of such a Combiner follows these values):

<51232, <<1,564556>,<2,1695269>,<3,343553>>>

<51234, <<1,3476876>,<2,790990>,<3,4353>>>

<51841, <<1,4363677>,<2,965241>,<3,54644>>>
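
Such a Combiner could be sketched as follows, reusing the "type,bytes" encoding assumed in the Mapper sketch above:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative Combiner: sums the bytes per transaction type for each User-ID,
// keeping the same "type,bytes" encoding so the Reducer sees the combined values.
public class TransactionCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text userId, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Map<Integer, Long> totals = new HashMap<Integer, Long>();
        for (Text value : values) {
            String[] parts = value.toString().split(",");    // "type,bytes"
            int type = Integer.parseInt(parts[0]);
            long bytes = Long.parseLong(parts[1]);
            Long current = totals.get(type);
            totals.put(type, (current == null ? 0L : current) + bytes);
        }
        for (Map.Entry<Integer, Long> entry : totals.entrySet()) {
            context.write(userId, new Text(entry.getKey() + "," + entry.getValue()));
        }
    }
}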

Finally, the Reducer calculates the total transactions for every user:

<User-ID, (Upload + Download) - Delete> -> <User-ID, Total_Transactions>
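
A Reducer implementing this calculation might look like the sketch below, again assuming the "type,bytes" encoding used in the earlier sketches:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative Reducer: adds uploads and downloads and subtracts deletes per user.
public class TransactionReducer extends Reducer<Text, Text, Text, LongWritable> {

    @Override
    protected void reduce(Text userId, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long total = 0;
        for (Text value : values) {
            String[] parts = value.toString().split(",");   // "type,bytes"
            int type = Integer.parseInt(parts[0]);
            long bytes = Long.parseLong(parts[1]);
            if (type == 1 || type == 2) {        // upload or download
                total += bytes;
            } else if (type == 3) {              // delete
                total -= bytes;
            }
        }
        context.write(userId, new LongWritable(total));
    }
}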

Reducer Output

<51232, 1916272>

<51234, 4263513>

<51841, 5274274>

The Reducer also writes the output to the HBase database. The CloudBuddy Enterprise server uses a web service to fetch the total transactions for every user from HBase and reports them to the CloudBuddy Enterprise user.
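
Writing the per-user total into HBase could be done with the classic HTable/Put client API, roughly as sketched below; the table name, column family, and qualifier are invented for the example, and the exact API depends on the HBase version in use:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Hedged sketch of storing a per-user total in HBase (table and column names are illustrative).
public class TransactionStore {
    public static void storeTotal(String userId, long totalBytes) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "user_transactions");    // hypothetical table name
        Put put = new Put(Bytes.toBytes(userId));                // row key = User-ID
        put.add(Bytes.toBytes("stats"),                          // hypothetical column family
                Bytes.toBytes("total_bytes"),
                Bytes.toBytes(totalBytes));
        table.put(put);
        table.close();
    }
}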