Apache Spark – A Closer Look

Undoubtedly one of the most famous big data processing libraries out there is Apache Spark (http://spark.apache.org). The Apache Spark project started at UC Berkeley, and its first major release came in 2014. Apache Spark claims to be much faster than Hadoop (http://hadoop.apache.org) MapReduce, both when processing in memory and when working off disk. It provides an easy-to-use API for Java, Scala, and Python.

The cornerstone of Apache Spark is the RDD (Resilient Distributed Dataset). The salient feature of RDDs is that the data is distributed across nodes in a fault-tolerant fashion. All the big data operations (maps, reduces, joins, aggregations, folds, and so on) are performed on these RDDs. The Spark ecosystem consists of:

  • Spark Core
  • Spark SQL
  • Spark Streaming
  • Spark MLlib (machine learning)
  • Spark GraphX (graph processing)

In this blog we will discuss the most common use cases you may encounter with Spark Core and Spark SQL. We will cover the rest of the modules in a dedicated blog post later.

Adding Spark to your project:

I generally use Maven as the build tool. If you are using Maven, all you will need is:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.5.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.5.1</version>
</dependency>
<dependency>
  <groupId>org.mongodb.mongo-hadoop</groupId>
  <artifactId>mongo-hadoop-core</artifactId>
  <version>1.5.1</version>
</dependency>

The third dependency is not strictly needed; it is just a connector that links Spark to MongoDB (https://www.mongodb.com/). I am going to use it to demonstrate how easily Spark connects to a data source. Similar connectors exist for other data sources.

Usage

The first and foremost thing you will need to do is initialize the Spark context. I am running Spark alongside a Spring app, so I am going to create a bean for it:

@Bean
public JavaSparkContext sparkContext() {
    return new JavaSparkContext("local[2]", "Aggregate Spark Jobs");
}

Note: If you want to run more than one Spark job in your project, you cannot instantiate multiple Spark contexts. You will have to reuse the existing one, as sketched below.
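A minimal sketch of what that reuse can look like in a Spring app (the AggregationService class and its countWords method are hypothetical; only the JavaSparkContext bean comes from above):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AggregationService {

    // the single JavaSparkContext bean defined above, injected by Spring and shared by every job
    private final JavaSparkContext sparkContext;

    @Autowired
    public AggregationService(JavaSparkContext sparkContext) {
        this.sparkContext = sparkContext;
    }

    public long countWords(List<String> lines) {
        // reuse the shared context instead of creating a new one per job
        return sparkContext.parallelize(lines)
                .flatMap(line -> Arrays.asList(line.split(" ")))
                .count();
    }
}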

Once you add this, you should see logs like these on startup:

2016-08-29 16:13:35.950 INFO 86076 --- [ main] org.apache.spark.SparkContext : Running Spark version 1.5.1
2016-08-29 16:13:36.095 WARN 86076 --- [ main] org.apache.hadoop.util.NativeCodeLoader : Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2016-08-29 16:13:36.199 INFO 86076 --- [ main] org.apache.spark.SecurityManager : Changing view acls to: chattod
2016-08-29 16:13:36.200 INFO 86076 --- [ main] org.apache.spark.SecurityManager : Changing modify acls to: chattod
2016-08-29 16:13:36.201 INFO 86076 --- [ main] org.apache.spark.SecurityManager : SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(chattod); users with modify permissions: Set(chattod)
2016-08-29 16:13:36.699 INFO 86076 --- [lt-dispatcher-2] akka.event.slf4j.Slf4jLogger : Slf4jLogger started
2016-08-29 16:13:36.732 INFO 86076 --- [lt-dispatcher-2] Remoting : Starting remoting
2016-08-29 16:13:36.871 INFO 86076 --- [lt-dispatcher-3] Remoting : Remoting started; listening on addresses :[akka.tcp://sparkDriver@139.126.184.245:51465]
2016-08-29 16:13:36.877 INFO 86076 --- [ main] org.apache.spark.util.Utils : Successfully started service 'sparkDriver' on port 51465.
2016-08-29 16:13:36.897 INFO 86076 --- [ main] org.apache.spark.SparkEnv : Registering MapOutputTracker
2016-08-29 16:13:36.911 INFO 86076 --- [ main] org.apache.spark.SparkEnv : Registering BlockManagerMaster
2016-08-29 16:13:36.933 INFO 86076 --- [ main] o.apache.spark.storage.DiskBlockManager : Created local directory at /private/var/folders/18/hbm7trx518z_f6dhsndc2h00521fhz/T/blockmgr-2f38c5c0-d4e7-4892-bb12-eeeb5aecd059
2016-08-29 16:13:36.947 INFO 86076 --- [ main] org.apache.spark.storage.MemoryStore : MemoryStore started with capacity 1966.1 MB
2016-08-29 16:13:36.989 INFO 86076 --- [ main] org.apache.spark.HttpFileServer : HTTP File server directory is /private/var/folders/18/hbm7trx518z_f6dhsndc2h00521fhz/T/spark-0477e6a6-c3c2-43b4-8e32-20a2fe9c31c6/httpd-93aa240c-4c54-486a-99e3-f174f9ca81b3
2016-08-29 16:13:36.991 INFO 86076 --- [ main] org.apache.spark.HttpServer : Starting HTTP Server
2016-08-29 16:13:37.044 INFO 86076 --- [ main] org.spark-project.jetty.server.Server : jetty-8.y.z-SNAPSHOT
2016-08-29 16:13:37.058 INFO 86076 --- [ main] o.s.jetty.server.AbstractConnector : Started SocketConnector@0.0.0.0:51466
2016-08-29 16:13:37.059 INFO 86076 --- [ main] org.apache.spark.util.Utils : Successfully started service 'HTTP file server' on port 51466.
2016-08-29 16:13:37.071 INFO 86076 --- [ main] org.apache.spark.SparkEnv : Registering OutputCommitCoordinator
2016-08-29 16:13:37.153 INFO 86076 --- [ main] org.spark-project.jetty.server.Server : jetty-8.y.z-SNAPSHOT
2016-08-29 16:13:37.160 INFO 86076 --- [ main] o.s.jetty.server.AbstractConnector : Started SelectChannelConnector@0.0.0.0:4040
2016-08-29 16:13:37.160 INFO 86076 --- [ main] org.apache.spark.util.Utils : Successfully started service 'SparkUI' on port 4040.
2016-08-29 16:13:37.162 INFO 86076 --- [ main] org.apache.spark.ui.SparkUI : Started SparkUI at http://139.126.184.245:4040
2016-08-29 16:13:37.220 WARN 86076 --- [ main] org.apache.spark.metrics.MetricsSystem : Using default name DAGScheduler for source because spark.app.id is not set.
2016-08-29 16:13:37.223 INFO 86076 --- [ main] org.apache.spark.executor.Executor : Starting executor ID driver on host localhost
2016-08-29 16:13:37.410 INFO 86076 --- [ main] org.apache.spark.util.Utils : Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51468.
2016-08-29 16:13:37.410 INFO 86076 --- [ main] o.a.s.n.netty.NettyBlockTransferService : Server created on 51468
2016-08-29 16:13:37.411 INFO 86076 --- [ main] o.a.spark.storage.BlockManagerMaster : Trying to register BlockManager
2016-08-29 16:13:37.414 INFO 86076 --- [lt-dispatcher-3] o.a.s.s.BlockManagerMasterEndpoint : Registering block manager localhost:51468 with 1966.1 MB RAM, BlockManagerId(driver, localhost, 51468)
2016-08-29 16:13:37.415 INFO 86076 --- [ main] o.a.spark.storage.BlockManagerMaster : Registered BlockManager
2016-08-29 16:13:37.515 INFO 86076 --- [ main] c.c.d.i.e.I.DaySupplyAggregatorITTest : Started DaySupplyAggregatorITTest in 1.915 seconds (JVM running for 6.965)

Now that you have Spark up and running, let's dive into how we can get data into our app using Spark.

Connecting to the source:

I am using the mongo-hadoop connector (https://github.com/mongodb/mongo-hadoop) to connect to my MongoDB instance. If you remember, we added it as a dependency to our project earlier.

All you have to do to get the data is:

JavaPairRDD<Object, BSONObject> sparkRDD = sparkContext.newAPIHadoopRDD(mongodbConfig, MongoInputFormat.class, Object.class, BSONObject.class);
where mongodbConfig is:
org.apache.hadoop.conf.Configuration mongodbConfig = new org.apache.hadoop.conf.Configuration();
mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", "mongodb://" + mongodbHost + "/" + database + "." + aggregateSalesCollection);
mongodbConfig.set("mongo.input.query", getMongoQuery());

This will fetch data from MongoDB using whatever query we pass to it. As you can see, the result is now stored in an RDD, which may be distributed across multiple nodes. Similarly, you can read a log file just by specifying its location, connect to an Oracle database through Spark's JDBC support, and so on.
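For example, reading a plain text log file into an RDD is a one-liner (the path here is just a placeholder):

// each element of the resulting RDD is one line of the file
JavaRDD<String> logLines = sparkContext.textFile("/var/log/myapp/app.log");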

Spark Operations on RDDs

There are two types of RDD operations:

  • Transformations: new datasets are created by performing some operation on an existing dataset.
  • Actions: a result is returned to the Spark driver (or written to storage) after the computation runs on the data.

Spark evaluates transformations lazily. What I mean is that if you do an aggregation or a group-by on a Spark RDD, the transformation does not take place immediately; instead, Spark remembers the operations it has to perform, and only when an action is called does it execute all the pending transformations and return the result.
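A tiny sketch of that laziness (the file path is a placeholder):

JavaRDD<String> lines = sparkContext.textFile("/var/log/myapp/app.log"); // nothing is read yet
JavaRDD<String> errors = lines.filter(line -> line.contains("ERROR"));   // still nothing executed
long errorCount = errors.count(); // count() is an action, so only here does Spark read and filter the file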

Most used transformation operations are:

  • Map
  • Group-By
  • Join
  • Filter
  • Sort
  • Distinct

Most commonly used Action operations are:

  • Reduce
  • Collect
  • For-each
  • Save

Now that we know what operations can be performed, let's look at some more code.

Suppose we are storing documents in this format:

{
  "firstName" : "dipayan",
  "lastName" : "chattopadhyay",
  "city" : "Seattle",
  "phone" : "12398760",
  "jobtitle" : "developer"
}
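The code below assumes these documents have already been mapped onto a simple Person bean, roughly like this sketch (field and getter names are my assumption based on the document above):

public class Person implements java.io.Serializable { // must be Serializable so Spark can ship it to executors
    private String firstName;
    private String lastName;
    private String city;
    private String phone;
    private String jobtitle;

    public String getFirstName() { return firstName; }
    public String getCity() { return city; }
    public String getPhone() { return phone; }
    // remaining getters and setters omitted
}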

Now suppose we want to find the phone numbers of people with the same first name who reside in Seattle, and we want to perform this operation with Spark. You can do something like:

// sparkRDD is a JavaPairRDD and we are interested in its values; assume the raw BSONObject
// values have already been mapped to Person objects, so we can do
JavaRDD<Person> seattlePeople = sparkRDD.values().filter(e -> e.getCity().equals("Seattle"));
// We are interested in grouping by first name and phone number now; groupBy returns a pair RDD
// keyed by the grouping key, and values() gives us one Iterable<Person> per group
JavaRDD<Iterable<Person>> groupedResult = seattlePeople.groupBy(e -> e.getFirstName().toLowerCase() + e.getPhone()).values();

Both of the operations we just performed are transformations, so nothing has been executed yet. Things get executed only when an action is invoked. Suppose we now want to collect the data from all the partitions of the RDD, distributed across multiple nodes, back to the driver. We can simply do:

List<Iterable<Person>> result = groupedResult.collect();

A few things here are pretty handy. In the code above we converted an RDD into a plain Java List. Similarly, if you ever want to convert a Java List (say, a List<Person> named people) into a Spark RDD, you can do:

JavaRDD<Person> personRDD = sparkContext.parallelize(people);

Now that we have this processed data, you can do whatever you want with it: save it to a file, save it to a database, write it to a queue, and so on.
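For instance, writing the RDD back out as text files is a one-liner (the output path is a placeholder; Spark writes one part-file per partition):

personRDD.saveAsTextFile("/tmp/seattle-people");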

Spark SQL

Spark SQL is an Apache Spark module that provides data abstractions on which we can run SQL-like queries. Earlier I showed how RDDs hold data coming from different sources. Similar to RDDs, Spark SQL has the concepts of Datasets and DataFrames.

Dataset: Datasets, added in Spark 1.6, are built on top of RDDs. They provide all the benefits of RDDs and support the same transformations, with the added benefit of Spark SQL's optimized execution engine.

DataFrame: a DataFrame is a Dataset organized into named columns. Conceptually it is very similar to a table in a relational database.

 Usage:

In order to use the SQL-like features, you will have to initialize Spark's SQL context. That can be done with:

SQLContext sqlContext = new SQLContext(sparkContext);
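As a side note, if you already have an RDD of beans (like the personRDD from earlier), you can turn it into a DataFrame directly; a sketch, assuming Person is a plain serializable bean:

// infers the schema from the Person bean's getters
DataFrame peopleFromRDD = sqlContext.createDataFrame(personRDD, Person.class);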

Once you have the sqlContext, you just need to connect to the source. Let's see how that can be done. I have come to love MongoDB, so I am going to use another Mongo connector that provides an easy API for connecting to it.

Connecting to the source:

I am using the Stratio spark-mongodb connector for this purpose; you can easily add it with:

<dependency>
  <groupId>com.stratio.datasource</groupId>
  <artifactId>spark-mongodb_2.10</artifactId>
  <version>0.10.0</version>
</dependency>

Now that we have this dependency in place, we can connect using:

 DataFrame peopleDF = sqlContext.read().format("com.stratio.datasource.mongodb").options(options).load();

where options just holds the MongoDB connection configuration:

Map<String, String> options = new HashMap<>();
options.put("host", mongodbHost);
options.put("database", database);
options.put("collection", personCollection);

Now that we have a DataFrame consisting of all the records, we can run SQL-like queries. To filter out the people living in Seattle:

 DataFrame seattlePeople = peopleDF.filter("city = 'Seattle'");

After filtering, we now have to group our records, which is simple and intuitive:

seattlePeople.registerTempTable("seattlites");
DataFrame groupedResult = sqlContext.sql("SELECT firstName, phone, COUNT(*) FROM seattlites GROUP BY firstName, phone");

Now that we have our grouped result, we can easily inspect it using:

groupedResult.show();

In case you want to see only firstName, you can do:

groupedResult.select("firstName").show();

In case you are just interested in seeing the resultant schema, you can do:

groupedResult.printSchema();
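And if you want the grouped rows back in your Java code rather than just printed, the DataFrame can be collected to the driver as well (Row here is org.apache.spark.sql.Row):

// brings the grouped rows back to the driver as a plain Java list
List<Row> rows = groupedResult.collectAsList();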

I hope I have not overwhelmed you with all of this information. This was a very quick Spark 101. Let me know if you have any doubts, questions, or suggestions.
