Facebook Engineering

The high quotient of transparency leads to socialization, which got affirmed by the popularity of Facebook supporting social media networking through a single window. The members of Facebook upload millions of photos, videos and also paint its wall with their colourful thoughts. My first interface with it not started with a real desire to participate but motivated by the professional experimentation. The art of scaling a social site is really challengeable; they did it (Kudos to Facebook engineering team). Also, they have released their engineering tools as open source @ https://github.com/facebook.

The Facebook has to challenge with: (as per July 2010)

  • 570 billion page views per month
  • More than 3 billion photo uploads every month
  • Number of photos exceed than all other photo sites combined (including sites like Flickr)
  • Serving 1.2 million photos per second
  • More than 25 billion pieces of content of status, updates, comments, etc., are shared every month          

      Facebook Engineering tools          


Cassandra is a distributed storage system with no single point of failure. Internally it’s using Distributed Hash Table (DHT), which is elaborated in my earlier article. Cassandra is using Amazon Dynamo’s Infrastructure design and Google BigTable’s ColumnFamily-based data model.

We had a good time in experimenting with Cassandra implemented along with Apache Lucene, which now caters our scalable search engine. This architecture is now gradually getting its place in our various tools, which are using Apache Solr/Lucene.

Large data set analyzing/processing on cluster machines

MapReduce framework is widely using for analyzing the large set of data. One important type of data analysis is done with MapReduce is log processing. MapReduce is basically runs on any distributed file storage system such as Hadoop File System (HDFS) from Apache Hadoop. This MapReduce Framework which runs on each HDFS Node and do the parallel aggregation of the log data then produce the result.

A Hadoop job is to analyze the log data from HDFS. The basic procedures to be followed to achieve the objective are:

  • Collect the necessary log files from the cluster machines, which are producing the actual logs.
  • Batch the logs into a file which is atleast equivalent to 64MB, which is the default HDFS block level storage.
  • Compress the batched log files.
  • Send the batched log files to HDFS.
  • Run the Hadoop jobs to analyze the batch log files using the MapReduce framework.
  • Store the results into HBase for future purpose

The challenge involved in developing such system is, ‘How are we going to collect the log data from the cluster machines?’

The generic way to overcome the above stated challenge is to write the logs directly into a message queue from all the cluster machines. The utility which is running on top of each HDFS node will read the messages from the queue and convert into a compressed batch log files. But this is not an optimal way of processing the log files.

Recently, Facebook open sourced their internal logging system called Scribe. Scribe is a server for aggregating streaming of log data. It is designed to scale to a large number of nodes and also fault tolerant. Facebook describes their usage of Scribe by saying, “[Scribe] runs on thousands of machines and reliably delivers tens of billions of messages a day.”

The basic components for this Scribe Logging system are:

  • Thrift Client Interface
  • Distribution System

The Scribe logging system consists of two entries, a category and a message. A category is a high description and a message consists of the details about the log data.

The types of stores currently available are:

  • file – writes to a file, either local or nfs.
  • network – sends messages to another scribe server.
  • buffer – contains a primary and a secondary store. Messages are sent to the primary store if possible, and otherwise the secondary. When the primary store becomes available the messages are read from the secondary store and sent to the primary.
  • bucket – contains a large number of other stores, and decides which messages to send to which stores based on a hash.
  • null – discards all messages.
  • thriftfile – similar to a file store but writes messages into a Thrift TFileTransport file.
  • multi – a store that forwards messages to multiple stores.

Though Scribe serves to stream the logs to the HDFS nodes directly, Hadoop and HDFS definitely cannot solve any real-time problems. Hadoop jobs needs a start up cost of at least a few seconds. HDFS reads and writes have inadequate latency for anything real-time. Auditing/ analyzing MBs of data in very fast manner, good have this system in place.

Hashing Techniques

Hash function is the technique which will help us to index huge data into small chunk utilizing the numerical data. The outcome of a hash function is a hash code, checksums etc helps to easily look up the index for retrieving corresponding data from a large data set. In our continuous journey in exploring technology, we landed up in researching distributed hash tables which led us to analyze the various algorithms running behind a hash function.

A brief about Distributed Hash Table

In a distributed network environment, implementation of Distributed Hash Table (DHT) results in a set of numerical data for a given set of data, which shall be stored in any of the participating node (i.e., in underlying peer-to-peer systems).

The main characteristics of the DHT

  • Decentralization
  • Scalability
  • Fault-Tolerance

Decentralization is the mantra behind a scalability of application. The distributed hash keys are stored in any participating node and replicated with its neighbor too. There is no centralized control to perform the distributed hashing. The DHT leads to a reliable system, independent of nodes joining/leaving or failing from the network, without impacting the application functionality.

DHT meets the needs of many applications such as Membase, Memcahe, session data and user related data in the distributed environment. The hashing functionality caters the quick retrieval of data which is mapped to the hash bucket, by performing the major role of storing and retrieving the data using the hash key in the bucket. While applying the hash function, we need to consider the following factors:

S.No Factors Description
1 Minimal Hash Collision When two sets of data having the same hash value, checksum etc., (Diagram), it results in Hash Collision.
2 Performance The latency in retrieving data should be minimal

Is there any “Best Hashing algorithm”, would solve the collision problem and improve the performance?

The simplest answer is “No”. The above question shall be addressed in an alternative way by identifying algorithms leading to less collision instead of digging into a collision free hashing mechanism. Though we are able to identify an efficient i.e., collision free hashing mechanism, the complexity will grow along with data and it would lead to hash collision.

A better hashing shall be achieved using the following algorithms:

  • FNVHash
  • knuthHash
  • BobJenkins
  • SuperFastHash

In the above set of hashing algorithms, FNVHash and BobJenkins are well suited for large set of data. Sample FNVHash implementation in below.

Thanks to Paul Hsieh’s test program, here are some performance numbers for these different implementations (as benchmarked on my aging PowerBook G4):

FNVHash : 3.9300s
knuthHash : 2.9700s
BobJenkins : 2.4600s
SuperFastHash   : 2.2800s

The latest version of Bob Jenkin’s hash is called Lookup3 is outsmarting the SuperFastHash by providing better collision properties and also meeting the performance benchmark of it.

Not Only SQL

Globally maximum percentage of the applications utilize presentation layer, business layer and data layer in serving its users. But most of the enterprises are finding it difficult in scaling up an application with RDBMS data layer. For e.g., let us assume that the famous social websites like Facebook, Twitter, Digg etc are built in such a way that they make use of RDBMS to serve contents to millions of its users. The results of huge number of hits to the database server will impact the performance of application in providing data and it also results in chaos in network traffic too. But the expectation of the users of such social media application is the lighting performance in delivering and displaying data. Hence such social media application developers presume to develop the application as a scalable and highly available entity.No SQL

The other option hitting the neurons of the developer is the replication of database which comes with an add-on problem of data in-consistency i.e., it decreases the ACID feature of the RDBMS. Replication is not providing the guarantee that the slave(s) are always in sync with the master at any point of time. The other factors blocking the developer in opting for replications are the single point-of-failure, maintenance cost etc.

Differing from RDBMS, NoSQL development approach focus on the scalability of data store for large set of data. The data are replicated at many places, but it uses different approach like “memtable” and so on. NoSQL is an artifact which has no structured schema. It makes the developer to add columns on the fly. For scaling up the requests, one can dynamically add the nodes/machine without making any impact on the applications.

Categories of NoSQL stores
NoSQL are categorized in different ways in terms of data storing models.
No SQL Categories

Pros & Cons of NoSQL store
NoSQL are categorized in different ways in terms of data storing models.
Pros & Cons of NoSQL

Brewer’s CAP Theorem depicts that any shared or distributed systems should possess the following characteristics

Strong Consistency
All clients see the same view, even in presence of updates
High Availability
All clients can find some replica of the data, even in the presence of failures
The system properties hold even when the system is partitioned

The CAP theorem states that you can always have only two of the above three CAP properties. The ACID system serves consistency. Hence Amazon Dynamo providing Availability and Partitioning properties, consistency is eventually achieved.

Hadoop Map/Reduce Use Case

Of course, everyone can implement the hadoop cluster setup, but the thing is, how we can effectively using the hadoop framework in our requirements. Map/Reduce software framework has been developed on Java. Its mostly mapper reads the files from the HDFS and reducer writes the content to the hadoop file system. New I/O package has a predominant role in this framework. Because, parallel programming, every thread access the shared data. NIO package provides, semaphore like feature, write lock etc.,

Map/Reduce Tips

  • Even though, the map/reduce engine core is developed in Java. We can write the Mapper and reducer not only in Java, some other script languages like Scala, Python, Jython, Ruby etc., But each has its own strengths and weaknesses
  • The problem behind the small chunks, each one by default 64 MB. So every file, directory and block represents as an object in the NameNode’s memory. Each of which occupies 150 bytes in NameNode’s memory. Imagine, if 10 million smaller files, obviously occupies 3GB memory.
  • Also reading through small files, requires lots of file position seek.

Our Case study

We are cloudy guys! We have the capability as Cloud based solution providers. One of our tool, CloudBuddy enterprise (currently supports Amazon S3) which emits the logs for every user’s transactions such as upload, download to and from respectively on the Amazon S3. CloudBuddy enterprise server upload the logs to S3 on periodic basis. The hadoop cluster machines, will download the logs by the java scheduler. Java scheduler initiate the download trigger and download the log files from S3 and store it into the HDFS. Map/Reduce engine, fetch the input logs from the hadoop file system and process the logs. The final output will store it in Hbase, Hadoop database. Every time result will be update into hbase with new processed data by the Map/Reduce engine.


For any Map/Reduce task, we need the data set to processing the transactional logs as an input.The Mapper which has been written on java to map the each transactions (Upload/Download) based on the user id.

The log structure is comma separated values and the format is

First Column : User-ID

Second Column : [[UPLOAD – 1], [DOWNLOAD – 2], [DELETE – 3]]

Third Column : Bytes

Fourth Column : Time Stamp

Sample Logs :












For every user, the application will allocate the disk size. To computing the logs by using the map/reduce framework, it requires the input data which is already stored in the hadoop file system. The Map/Reduce framework will split the records based on the user id and combine the bytes by transaction type. The Mapper Class, its using the patten to split the columns by the delimiter


By overriding the map method to create the N-number of maps based on number of records in the input file. Mapper, maps provides the output as follows.,

<51232, <<1,564556>,<2,342304>,<3,0>,<3,343553>,<2,566422>,<2,786543>>>

<51234, <<1,23423>,<2,745645>,<3,4353>,<1,3453453>,<2,45345>>>

<51841, <<1,10234>,<2,383932>,<1,4353443>,<3,54644>,<2,456456>,<2,124853>>>

Now, the Combiner add the similar transactions of the respective User-ID

<51232, <<1,564556>,<2,1695269>,<3,343553>>>

<51234, <<1,3476876>,<2,790990>,<3,4353>>>

<51841, <<1,4363677>,<2,965241>,<3,54644>>>

Eventually, reducer calculate the total transactions for every user

<User-ID ,(Upload+Download)-(Delete))> —> <User-ID ,Total_Transactions>

Reducer Output

<51232, 1916272>

<51234, 4263513>

<51841, 5274274>

The Reducer also update the output to the Hbase database. CloudBuddy Enterprise server, which uses the webservice to fetch the total transactions for every user from the Hbase and intimate to the CloudBuddy enterprise user.