Processing Large Unsplittable Files: A precursor to Big Data Processing

We often find ourselves in situations where we have to process a file with a much larger memory footprint than the machine on which we are running the processing. In those scenarios it isn't feasible to load the entire data set into memory, as you would run into out-of-memory issues. One way to solve the problem is vertical scalability, i.e., throw more memory at it. But what if you don't have the luxury of more memory, yet you still have to process this huge file?

Most distributed processing frameworks like Apache Spark or Flink are great at processing data that can be split and distributed across the nodes in a cluster. However, if you have data that cannot be split, like a huge compressed file containing a single giant block of data, then you are out of luck.

The approach that works in this case is streaming. The idea is to read the file in as a stream and then process it in bits and pieces without loading the whole file into memory. Let's look at some samples of how we could accomplish this in Java or Scala; similar techniques are available for other programming languages.

The use case that we are going to deal with today is processing a giant array of JSON objects gzipped together in a single file. This is a fairly common scenario where someone dumps lots of data collected over time, like a clickstream, visitor log, or product catalog, into a file system for analytical needs.

The first step is to read this gzipped file and convert it into a stream:

public GZIPInputStream getGzipStream(String fileLocation){
    InputStream fileIs = null;
    BufferedInputStream bufferedIs = null;
    GZIPInputStream gzipIs =  null;
    try {
        fileIs = Files.newInputStream(Paths.get(fileLocation), new StandardOpenOption[]{StandardOpenOption.READ});
        // Even though GZIPInputStream has a buffer it reads individual bytes
        // when processing the header, better add a buffer in-between
        bufferedIs = new BufferedInputStream(fileIs, 65535);
        gzipIs = new GZIPInputStream(bufferedIs);
    } catch (IOException e) {
        closeSafely(gzipIs);
        closeSafely(bufferedIs);
        closeSafely(fileIs);
        throw new UncheckedIOException(e);
    }
    return gzipIs;
}

private static void closeSafely(Closeable closeable) {
    if (closeable != null) {
        try {
            closeable.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

As you can see above, we opened the gzipped file and are reading it in as a stream. For the next step, we know the stream contains an array of JSON objects, so let's use GSON to read that array in a streaming fashion and process the objects sequentially:

public boolean readGzipStreamAndWriteToFile(GZIPInputStream gzipInputStream){
    try (JsonReader reader = new JsonReader(new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8))) {
        Gson gson = new GsonBuilder().create();

        // Read the file in streaming mode
        reader.beginArray();
        int recordCount = 0;
        StringBuilder sb = null;
        String demarcator = "";
        while (reader.hasNext()) {
            // Bucketing policy: collect MAX_RECORD_COUNT_IN_BUCKET records, then flush the buffer
            if (recordCount % MAX_RECORD_COUNT_IN_BUCKET == 0) {
                if (sb != null && sb.length() != 0) {
                    FileSystemWriter.getInstance().writeToFile(String.valueOf(recordCount), sb.toString());
                }
                sb = new StringBuilder();
                demarcator = "";
            }

            JsonObject record = gson.fromJson(reader, JsonObject.class);
            sb.append(demarcator);
            demarcator = "\n";
            sb.append(record.toString());
            recordCount++;
        }
        reader.endArray();
        // Flush any remaining records that did not fill a complete bucket
        if (sb != null && sb.length() != 0) {
            FileSystemWriter.getInstance().writeToFile(String.valueOf(recordCount), sb.toString());
        }
    } catch (IOException ex) {
        ex.printStackTrace();
        return false;
    }
    return true;
}

In the above code fragment, we read the GZIPInputStream and use GSON's JsonReader to walk through the array of JSON objects while carrying out our simple processing. We used a bucketing strategy to split the huge JSON array into smaller buckets of processed data.

This code was tested on a 1GB gzipped file, which inflates to about 10GB of data. The file was processed on a machine with 2GB of heap space, and the processing time was less than 4 minutes. Most of the memory was consumed not by processing the giant file, but by maintaining the buffer before writing it out. If more memory could be provisioned, we could create larger buckets, resulting in fewer IOPS and faster processing.
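
For reference, here is a minimal sketch of how the two methods above could be wired together in a driver. The class name and the file path are placeholders, not part of the original code:

import java.util.zip.GZIPInputStream;

public class GzipJsonDriver {
    public static void main(String[] args) {
        // GzipJsonProcessor is a hypothetical class holding the two methods shown earlier
        GzipJsonProcessor processor = new GzipJsonProcessor();
        // Placeholder path to the gzipped JSON array
        GZIPInputStream gzipStream = processor.getGzipStream("/data/clickstream.json.gz");
        boolean success = processor.readGzipStreamAndWriteToFile(gzipStream);
        System.out.println("Processed successfully: " + success);
    }
}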

I hope this will be useful to someone dealing with similar problems with limited resources at their disposal. You can find the entire source code here.

Auto Error Retry, a.k.a. autoerroretry

One of the most commonly recurring patterns when developing data processing pipelines is the need to retry transient failures and to gracefully handle bad data. Since this is a recurring pattern, I recently authored an open-source library, autoerroretry ( https://github.com/dipayan90/autoerroretry ), that provides a neat solution to this need.

The solution provides a clean, intuitive interface for retrying business logic that is fully customizable, with a retry frequency that is user driven. Moreover, the solution is completely asynchronous, and the retry logic is handled by a background daemon.

Out of the box, the solution supports Amazon SQS, Apache Kafka, and Square Tape (a file-based queuing mechanism). It also provides simple interfaces that abstract out the core problem and let you implement custom solutions.

To start with, you can get this library, which is a Java jar, using Maven:

<dependency>
 <groupId>com.nordstrom.ds</groupId>
 <artifactId>autoerroretry</artifactId>
 <version>1.3</version>
 <type>pom</type>
</dependency>

You can also use Ivy or Gradle for the same.

Now that you have the library, how do you use it? Well, it's actually pretty simple.

All the objects are serialized and deserialized using a String serializer. The library ships with converters to help you with this: ObjectStringConverter converts any given Java object to a String (in the snippets below, converter is an instance of it).

Example:

Transaction transaction = new Transaction();
transaction.setDate("12/27/1990");
transaction.setTransactionId(1);
transaction.setValue(100);

String s = converter.toString(transaction);

List<String> messageList = Collections.singletonList(s);

Now that we have converted it to a String, any bad data that needs a second look can be moved to a dead-letter or error queue using:

LoaderHandlerClient client = new LoaderHandlerClient();
client.publishErrors(new PublishErrorMessageRequest.PublishErrorMessageRequestBuilder()
        .withMessages(messageList)
        .withMessageBroker(MessageBroker.SQS)
        .withSqsUrl(queueUrl)
        .build());

As you can see, the mandatory parameters are the messages themselves, an indication of which backend message broker you want to use, and, since we are using SQS in this case, the SQS queue URL.

Retries have a very similar interface to errors; all you have to do is call the publishRetries method instead of publishErrors.
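
For example, the SQS version of a retry publish looks just like the error publish above, only with publishRetries:

client.publishRetries(new PublishErrorMessageRequest.PublishErrorMessageRequestBuilder()
        .withMessages(messageList)
        .withMessageBroker(MessageBroker.SQS)
        .withSqsUrl(queueUrl)
        .build());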

For a Kafka-based backend you would use:

client.publishRetries(new PublishErrorMessageRequest.
                PublishErrorMessageRequestBuilder()
                .withMessages(messageList)
                .withMessageBroker(MessageBroker.KAFKA)
                .withKafkaRetries(0)
                .withKafkaServers(Collections.singletonList("localhost:9092"))
                .withKafkaTopic("retry")
                .withOrderGuarentee(true)
                .build());

Similarly, for a simple file-based backend you can use:

client.publishRetries(new PublishErrorMessageRequest.
                PublishErrorMessageRequestBuilder()
                .withMessages(messageList)
                .withMessageBroker(MessageBroker.TAPE)
                .withTapeFileName("tape.txt")
                .build());

Good use cases for retries are transient database unavailability, I/O errors due to network issues, failure of preceding steps in a chain of steps, and so on. The best way to decide what to retry is to catch exceptions in your processing functions and call this retry routine, or to get a list of failed messages back from your processing functions and add them to the queue. The process of sending retriable messages is non-blocking and is handled by a background thread, so it will not stop your chain of execution.
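
As a rough sketch of the catch-and-retry pattern (processTransaction is a placeholder for your own business logic; client, converter, queueUrl and transaction come from the earlier snippets):

try {
    // Placeholder for your own processing step, which may fail transiently
    processTransaction(transaction);
} catch (Exception e) {
    // Hand the failed message to the retry queue; the background daemon resends it later
    client.publishRetries(new PublishErrorMessageRequest.PublishErrorMessageRequestBuilder()
            .withMessages(Collections.singletonList(converter.toString(transaction)))
            .withMessageBroker(MessageBroker.SQS)
            .withSqsUrl(queueUrl)
            .build());
}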

Handling retriable objects

Now that we know what to retry and how to publish retriable messages, let's see how we handle them. It turns out that even that is quite simple: all you have to do is pass in the business logic that needs to be retried on the objects.

A simple example would be:

client.recieveRetires(new ReceiveErrorMessageRequest
                .ReceiveErrorMessageRequestBuilder()
                .withSqsUrl(queueUrl)
                .withPingInterval(5)
                .build(), strings -> {
            strings.forEach(e -> { System.out.println(converter.fromString(e)); });
            return null;
        });

Please note that the function passed to this method receives a list of Strings that first need to be converted back into the native objects. ObjectStringConverter to the rescue: it has a small utility method to convert Strings back into your object. Whatever business logic/function you pass as an argument to the method call will be executed on your native objects. In the example above we pass a function that just prints them, but you could equally save the objects to a database, send them to another stream-based system, and so on. Again, this is non-blocking and is processed by a background daemon. Please take a look at the README for more details on how the other message brokers, Kafka and Tape, are supported.
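
As a sketch of a more useful handler, assuming a hypothetical transactionRepository for persistence and that converter.fromString hands you back your object (check the README for the exact signature):

client.recieveRetires(new ReceiveErrorMessageRequest
        .ReceiveErrorMessageRequestBuilder()
        .withSqsUrl(queueUrl)
        .withPingInterval(5)
        .build(), strings -> {
    strings.forEach(s -> {
        // Convert the String back into the native object, then persist it
        Transaction transaction = (Transaction) converter.fromString(s);
        transactionRepository.save(transaction);   // hypothetical persistence call
    });
    return null;
});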

Please note that if the function execution fails on the retriable objects, you can either discard the message by sending it to the error queue or push it back to be retried again.

I hope this small, lightweight library helps solve some of your commonly recurring issues. Let me know your comments. Persistence is the key 🙂 .

Apache Spark – A Closer Look

Undoubtedly one of the most popular big data processing frameworks out there is Apache Spark (http://spark.apache.org). The Apache Spark project started at UC Berkeley, and the first major release happened in 2014. Apache Spark claims to be much faster than Hadoop (http://hadoop.apache.org), both when processing in memory and when running on disk, and it provides an easy-to-use API for Java, Scala and Python.

The cornerstone of Apache Spark is the RDD (Resilient Distributed Dataset). The salient feature of RDDs is that the data is distributed across nodes in a fault-tolerant fashion. All the big data operations (maps, reduces, joins, aggregations, folds, etc.) are performed on these RDDs. The Spark ecosystem consists of:

  • Spark Core
  • Spark SQL
  • Spark Streaming
  • Spark Machine learning
  • Spark GraphX

In this post we will discuss the most common use cases that you may encounter using Spark Core and Spark SQL. We will deal with the rest of the modules in a dedicated post later.

Adding Spark to your project:

I generally use Maven as the build tool. If you are using Maven, all you will need is:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.5.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.5.1</version>
</dependency>
<dependency>
  <groupId>org.mongodb.mongo-hadoop</groupId>
  <artifactId>mongo-hadoop-core</artifactId>
  <version>1.5.1</version>
</dependency>

The third dependency is not strictly needed; it is just a connector that connects Spark to MongoDB (https://www.mongodb.com/). I am going to use it to demonstrate how easily Spark connects to a data source. Similar connectors exist for other data sources.

Usage

The first and foremost thing that you will need is to initialize the Spark context. I am running Spark inside a Spring app, so I am going to create a bean for it:

@Bean
public JavaSparkContext sparkContext(){
    // "local[2]" runs Spark locally with two worker threads; point this at a cluster master URL in production
    return new JavaSparkContext("local[2]", "Aggregate Spark Jobs");
}

Note: if you want to run more than one Spark job in your project, you cannot instantiate multiple Spark contexts. You will have to reuse the existing one.
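
In a Spring app that simply means injecting the single JavaSparkContext bean wherever a job needs it instead of constructing a new one. A minimal sketch (the class name here is just illustrative):

import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AggregateJobRunner {

    // Reuse the one JavaSparkContext bean defined above rather than creating another context
    @Autowired
    private JavaSparkContext sparkContext;
}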

Just by adding this, you should see logs like the following:

2016-08-29 16:13:35.950 INFO 86076 --- [ main] org.apache.spark.SparkContext : Running Spark version 1.5.1
2016-08-29 16:13:36.095 WARN 86076 --- [ main] org.apache.hadoop.util.NativeCodeLoader : Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-08-29 16:13:36.199 INFO 86076 --- [ main] org.apache.spark.SecurityManager : Changing view acls to: chattod
2016-08-29 16:13:36.200 INFO 86076 --- [ main] org.apache.spark.SecurityManager : Changing modify acls to: chattod
2016-08-29 16:13:36.201 INFO 86076 --- [ main] org.apache.spark.SecurityManager : SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(chattod); users with modify permissions: Set(chattod)
2016-08-29 16:13:36.699 INFO 86076 --- [lt-dispatcher-2] akka.event.slf4j.Slf4jLogger : Slf4jLogger started
2016-08-29 16:13:36.732 INFO 86076 --- [lt-dispatcher-2] Remoting : Starting remoting
2016-08-29 16:13:36.871 INFO 86076 --- [lt-dispatcher-3] Remoting : Remoting started; listening on addresses :[akka.tcp://sparkDriver@139.126.184.245:51465]
2016-08-29 16:13:36.877 INFO 86076 --- [ main] org.apache.spark.util.Utils : Successfully started service 'sparkDriver' on port 51465.
2016-08-29 16:13:36.897 INFO 86076 --- [ main] org.apache.spark.SparkEnv : Registering MapOutputTracker
2016-08-29 16:13:36.911 INFO 86076 --- [ main] org.apache.spark.SparkEnv : Registering BlockManagerMaster
2016-08-29 16:13:36.933 INFO 86076 --- [ main] o.apache.spark.storage.DiskBlockManager : Created local directory at /private/var/folders/18/hbm7trx518z_f6dhsndc2h00521fhz/T/blockmgr-2f38c5c0-d4e7-4892-bb12-eeeb5aecd059
2016-08-29 16:13:36.947 INFO 86076 --- [ main] org.apache.spark.storage.MemoryStore : MemoryStore started with capacity 1966.1 MB
2016-08-29 16:13:36.989 INFO 86076 --- [ main] org.apache.spark.HttpFileServer : HTTP File server directory is /private/var/folders/18/hbm7trx518z_f6dhsndc2h00521fhz/T/spark-0477e6a6-c3c2-43b4-8e32-20a2fe9c31c6/httpd-93aa240c-4c54-486a-99e3-f174f9ca81b3
2016-08-29 16:13:36.991 INFO 86076 --- [ main] org.apache.spark.HttpServer : Starting HTTP Server
2016-08-29 16:13:37.044 INFO 86076 --- [ main] org.spark-project.jetty.server.Server : jetty-8.y.z-SNAPSHOT
2016-08-29 16:13:37.058 INFO 86076 --- [ main] o.s.jetty.server.AbstractConnector : Started SocketConnector@0.0.0.0:51466
2016-08-29 16:13:37.059 INFO 86076 --- [ main] org.apache.spark.util.Utils : Successfully started service 'HTTP file server' on port 51466.
2016-08-29 16:13:37.071 INFO 86076 --- [ main] org.apache.spark.SparkEnv : Registering OutputCommitCoordinator
2016-08-29 16:13:37.153 INFO 86076 --- [ main] org.spark-project.jetty.server.Server : jetty-8.y.z-SNAPSHOT
2016-08-29 16:13:37.160 INFO 86076 --- [ main] o.s.jetty.server.AbstractConnector : Started SelectChannelConnector@0.0.0.0:4040
2016-08-29 16:13:37.160 INFO 86076 --- [ main] org.apache.spark.util.Utils : Successfully started service 'SparkUI' on port 4040.
2016-08-29 16:13:37.162 INFO 86076 --- [ main] org.apache.spark.ui.SparkUI : Started SparkUI at http://139.126.184.245:4040
2016-08-29 16:13:37.220 WARN 86076 --- [ main] org.apache.spark.metrics.MetricsSystem : Using default name DAGScheduler for source because spark.app.id is not set.
2016-08-29 16:13:37.223 INFO 86076 --- [ main] org.apache.spark.executor.Executor : Starting executor ID driver on host localhost
2016-08-29 16:13:37.410 INFO 86076 --- [ main] org.apache.spark.util.Utils : Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51468.
2016-08-29 16:13:37.410 INFO 86076 --- [ main] o.a.s.n.netty.NettyBlockTransferService : Server created on 51468
2016-08-29 16:13:37.411 INFO 86076 --- [ main] o.a.spark.storage.BlockManagerMaster : Trying to register BlockManager
2016-08-29 16:13:37.414 INFO 86076 --- [lt-dispatcher-3] o.a.s.s.BlockManagerMasterEndpoint : Registering block manager localhost:51468 with 1966.1 MB RAM, BlockManagerId(driver, localhost, 51468)
2016-08-29 16:13:37.415 INFO 86076 --- [ main] o.a.spark.storage.BlockManagerMaster : Registered BlockManager
2016-08-29 16:13:37.515 INFO 86076 --- [ main] c.c.d.i.e.I.DaySupplyAggregatorITTest : Started DaySupplyAggregatorITTest in 1.915 seconds (JVM running for 6.965)

Now that you have Spark up and running, let's dive into how we can get data into our app using Spark.

Connecting to a source:

I am using the mongo-hadoop (https://github.com/mongodb/mongo-hadoop) connector to connect to my MongoDB instance. If you remember, we added it as a dependency to our project.

All you have to do to get the data is:

JavaPairRDD sparkRDD = sparkContext.newAPIHadoopRDD(mongodbConfig, MongoInputFormat.class, Object.class, BSONObject.class);

where mongodbConfig is:

org.apache.hadoop.conf.Configuration mongodbConfig = new org.apache.hadoop.conf.Configuration();
mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://" + mongodbHost + "/" + database + "." + aggregateSalesCollection);
mongodbConfig.set("mongo.input.query", getMongoQuery());

This will pull data from MongoDB using whatever query we pass to it. As you can see, the Mongo result is now stored in an RDD that may be distributed across multiple nodes. Similarly, you can read a log file just by specifying a location, connect to Oracle databases using a Spark JDBC driver, and so on.
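
For instance, reading a plain log file into an RDD of lines is a one-liner (the path below is just a placeholder; it could also be an HDFS or S3 URI):

// Each element of the resulting RDD is one line of the file
JavaRDD<String> logLines = sparkContext.textFile("/var/log/app/access.log");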

Spark Operations on RDDs

There are two types of RDD operations:

  • Transformations – new datasets are created by performing operations on an existing dataset.
  • Actions – the Spark driver returns data after the transformations have been performed on the data.

Spark evaluates transformations lazily. What I mean by that is: suppose you do an aggregation or a group-by on a Spark RDD; the transformation doesn't run immediately. Instead, Spark remembers which operations it has to perform, and only when an action is called does it execute all the transformations and return the result.
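
A quick way to see this in action, as a small sketch with an in-memory list:

// Declaring transformations does not run anything yet
JavaRDD<Integer> numbers = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaRDD<Integer> doubled = numbers.map(n -> n * 2);

// Only the action below triggers the actual computation
List<Integer> result = doubled.collect();   // [2, 4, 6, 8, 10]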

Most used transformation operations are:

  • Map
  • Group-By
  • Join
  • Filter
  • Sort
  • Distinct

Most commonly used Action operations are:

  • Reduce
  • Collect
  • For-each
  • Save

Now that we know what operations can be performed, let's look at some more code.

Suppose we are storing documents in this format:

{
  "firstName" : "dipayan",
  "lastName" : "chattopadhyay",
  "city" : "Seattle",
  "phone" : "12398760",
  "jobtitle" : "developer"
}

Now suppose we want to find the phone numbers of people with the same first name who live in Seattle, and we want to perform this operation with Spark. We can do something like:

// sparkRDD is a JavaPairRDD, but we are interested in the values, so we can do
JavaRDD<Person> seattlePeople = sparkRDD.values().filter(e -> e.getCity().equals("Seattle"));
// We are interested in grouping by first name and phone number now; each group is an Iterable of matching people
JavaRDD<Iterable<Person>> groupedResult = seattlePeople.groupBy(e -> e.getFirstName().toLowerCase() + e.getPhone()).values();

Both of these operations are transformations, so nothing has been executed yet; things only get executed when an action is called. Suppose we now want to collect the data from all the partitions distributed across the nodes. We can simply do:

List<Iterable<Person>> result = groupedResult.collect();

A few things here are pretty handy. In the above code we converted an RDD into a Java List. Similarly, if you ever want to convert a Java List back into a Spark RDD, you can do:

JavaRDD<Iterable<Person>> personGroupsRDD = sparkContext.parallelize(result);

Now that we have this processed data, you can do whatever you want with it: save it to a file, save it to a DB, write it to a queue, etc.
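
For example, writing the RDD out as a directory of part files takes a single call (the output path below is a placeholder):

// Spark writes one part file per partition under the given directory
groupedResult.saveAsTextFile("/tmp/seattle-people-groups");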

Spark SQL

Spark SQL is an Apache Spark library that provides data abstractions on which we can run SQL-like queries. Previously I introduced how RDDs store data from different sources. Similar to RDDs, Spark SQL has the concepts of Datasets and DataFrames.

Dataset: Datasets, added in Spark 1.6, are built on top of RDDs and provide all the benefits of RDDs and all the transformations that can be performed on them, with the added benefit of Spark SQL's optimized execution engine.

DataFrame: a DataFrame is a Dataset organized into named columns. Conceptually it is very similar to a table in a relational database.

 Usage:

In order to use the SQL-like features, you will have to initialize Spark's SQL context. That can be done with:

SQLContext sqlContext = new SQLContext(sparkContext);

Once you have the sqlContext, you just need to connect to a source. Let's see how that can be done. I have come to love MongoDB, so I am going to use another Mongo connector that provides an easy API to connect to it.

Connecting to the source:

I am using another MongoDB connector for this purpose; you can easily add it with:

<dependency>
 <groupId>com.stratio.datasource</groupId>
 <artifactId>spark-mongodb_2.10</artifactId>
 <version>0.10.0</version>
</dependency>

Now that we have this dependency in place we can connect using:

 DataFrame peopleDF = sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();

where options is just the MongoDB connection configuration:

Map<String, String> options = new HashMap<>();
options.put("host", mongodbHost);
options.put("database", database);
options.put("collection", personCollection);

Now that we have a DataFrame containing all the records, we can run SQL-like queries, for example to filter for people living in Seattle:

 DataFrame seattlePeople = peopleDF.filter("city = 'Seattle'");

After filtering, we now have to group our records; that's very simple and intuitive:

seattlePeople.registerTempTable("seattlites");
DataFrame groupedResult = sqlContext.sql("SELECT firstName, phone, COUNT(*) FROM seattlites GROUP BY firstName, phone");

Now that we have our grouped result, we can easily see the result using:

groupedResult.show();

In case you want to see only the firstName column, you can do:

groupedResult.select("firstName").show();

In case you are just interested in seeing the resultant schema, you can do:

groupedResult.printSchema();

I hope I have not overwhelmed you with all of this information; this was a very quick Spark 101. Let me know if you have any doubts, questions or suggestions.

Evaluation of Big Data Processing Frameworks – A Series

Big data and related technologies have developed quite rapidly in recent years. With this fast-paced development, tons of big data processing libraries have mushroomed; some have gained traction while others have been left behind. In this series I would like to share my experience with a few of these libraries and discuss our real-world use case.

Before we begin, I would like to clarify what I refer to as big data: it is data that has the three V attributes.

  • Volume
  • Variety
  • Velocity

Large quantities of data, which may or may not have a fixed schema, generated at very high speed: that is big data. This data can be website traffic logs, data from connected devices in the Internet of Things, or data generated by airplanes in flight that is continuously relayed to ground stations.

Enough introduction to what big data is. Now let's take a high-level look at what a big data processing framework needs to have. In a nutshell, there are three very simple things:

  • Source
  • Processor
  • Sink

Source

A good big data processing framework should be able to connect to an array of different sources. Sources can be databases, whether SQL or NoSQL based, flat files, message brokers, or maybe even a simple HTTP REST endpoint where data is continuously POSTed.

To be successful, a framework needs drivers/connectors for most of the leading databases like Oracle, MySQL, Postgres, MongoDB, Cassandra, HBase, etc. Not all of these connectors are available out of the box with the library, so community support is very important for developing connectors for some of these sources.

Processor

One of the big features that most of these big data processing frameworks bring to the table is distribution. Not only is the data distributed across multiple nodes, but a single job can be broken down into several tasks, each of which can be processed independently by different threads or machines in a cluster. After the data has been processed across several threads or nodes, depending on whether it is a single-node or a cluster configuration, the processed data is collected and presented as the result. The most common processing pattern is a map (alter the structure of the data into a simpler format, or create a list of values from some input data) followed by a reduce (perform some aggregation on the mapped list) to produce a result. Many frameworks also allow you to join data from multiple different sources into temporary data stores; for instance, Apache Spark allows you to join data from a queue with data coming from a database. There are many other powerful operations, like folds, that most of the frameworks provide.
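
To make the map-then-reduce idea concrete, here is a tiny word-count sketch in Spark's Java API, using an in-memory list so it stays self-contained (sparkContext is a JavaSparkContext as in the earlier post; Tuple2 comes from the scala package Spark ships with):

// Map phase: split each line into words and pair every word with a count of 1
JavaRDD<String> lines = sparkContext.parallelize(Arrays.asList("big data", "big pipelines"));
JavaPairRDD<String, Integer> pairs = lines
        .flatMap(line -> Arrays.asList(line.split(" ")))   // on Spark 1.x flatMap returns an Iterable; 2.x expects an Iterator
        .mapToPair(word -> new Tuple2<>(word, 1));

// Reduce phase: aggregate the counts per word and print them
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
counts.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));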

Sink

The sink has requirements more or less like the source: the processed data needs to be written out, so the framework should support writing to different data stores and message brokers.

The journey of data through all these steps is often referred to as a pipeline. A pipeline generally involves one or more sources and sinks, and may have one or several processing steps depending on the use case and the complexity of the task.

Big data processing pipeline

Now that we have a general idea of what any big data processing framework is expected to do, let's dive in and take a closer look at some of the most prominent big data processing libraries available and some of the concepts they promote. In the next post I will provide a brief introduction to some basic concepts of Apache Spark. Stay tuned.