Spark is quickly becoming an attractive platform for developers because it addresses multiple data-processing use cases through different components: Spark SQL, Spark Streaming, MLlib for machine learning, GraphX for graph processing, and so on. While Spark provides the framework for cluster computing, it does not include its own distributed data persistence technology (that is, a database or a file system); it relies on technologies like Hadoop/HDFS for that purpose. In fact, Spark can work with any Hadoop-compatible data format.
You can use the MarkLogic Connector for Hadoop as an input source for Spark and take advantage of the Spark framework to develop your 'Big Data' applications on top of MarkLogic. Since the MarkLogic Connector for Hadoop already provides the interface for using MarkLogic as a MapReduce input source, I decided to use the same connector as the input source for the Spark application we walk through below.
MarkLogicWordCount is an example application designed to work with simple XML documents that contain name:value pairs — but it does much more than a simple word count. Here is an example XML document:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<complaint xmlns="http://data.gov.consumercomplaints">
  <Complaint_ID>1172370</Complaint_ID>
  <Product>Credit reporting</Product>
  <Issue>Improper use of my credit report</Issue>
  <Sub-issue>Report improperly shared by CRC</Sub-issue>
  <State>CA</State>
  <ZIP_code>94303</ZIP_code>
  <Submitted_via>Web</Submitted_via>
  <Date_received>12/28/2014</Date_received>
  <Date_sent_to_company>12/28/2014</Date_sent_to_company>
  <Company>TransUnion</Company>
  <Company_response>Closed with explanation</Company_response>
  <Timely_response_>Yes</Timely_response_>
  <Consumer_disputed_>Yes</Consumer_disputed_>
</complaint>
```
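Each child element of `<complaint>` supplies one name:value pair, for example `Product:Credit reporting`. The following is a minimal, JDK-only sketch of that extraction using the standard DOM parser, assuming the flat, one-level structure shown above. It is not the application's own extractor (which works on `MarkLogicNode` values inside Spark), and the class name `NameValueExtractor` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical helper illustrating name:value extraction; not MarkLogicWordCount code.
public class NameValueExtractor {

    // Returns one (element name, element content) pair per child element of the root.
    // Assumes flat documents like the complaint example: no nested elements.
    public static List<Map.Entry<String, String>> extract(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // so getLocalName() returns "Product", etc.
            Document doc = factory.newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));

            List<Map.Entry<String, String>> pairs = new ArrayList<>();
            NodeList children = doc.getDocumentElement().getChildNodes();
            for (int i = 0; i < children.getLength(); i++) {
                Node child = children.item(i);
                if (child.getNodeType() == Node.ELEMENT_NODE) { // skip whitespace text nodes
                    pairs.add(new SimpleEntry<>(child.getLocalName(), child.getTextContent().trim()));
                }
            }
            return pairs;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Could not parse XML document", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
            + "<complaint xmlns=\"http://data.gov.consumercomplaints\">"
            + "<Product>Credit reporting</Product>"
            + "<State>CA</State>"
            + "<Company>TransUnion</Company>"
            + "</complaint>";
        // Prints Product:Credit reporting, State:CA and Company:TransUnion, one per line.
        for (Map.Entry<String, String> pair : extract(xml)) {
            System.out.println(pair.getKey() + ":" + pair.getValue());
        }
    }
}
```

In this sketch an empty element such as `<State></State>` produces an empty value, which is one way a key like `State:` could end up in the statistics.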
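The complaint XML documents are stored in a MarkLogic database. The MarkLogicWordCount application loads all the documents from the database into a Spark RDD (Resilient Distributed Dataset) and performs the following operations:

1. Extracts every XML element of each document as a name:value pair, where the element content is the value.
2. Counts the number of distinct values for each element name.
3. Counts the number of occurrences of each distinct name:value pair.
4. Filters out statistically insignificant name:value pairs, then sorts the combined results and saves them to HDFS.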
The application produces output like the following:

```
(Product,11)
(Product:Bank account or service,44671)
(Product:Consumer loan,12683)
(Product:Credit card,48400)
(Product:Credit reporting,54768)
(Product:Debt collection,62662)
(Product:Money transfers,2119)
(Product:Mortgage,143231)
(Product:Other financial service,191)
(Product:Payday loan,2423)
(Product:Prepaid card,626)
(Product:Student loan,11489)
(State,63)
(State:,5360)
(State:AA,10)
(State:AE,141)
(State:AK,465)
...
```
As you can see in the output above, the first line indicates that 11 distinct product names were found in the data set (the result of step 2), and the following 11 lines give the number of times (equivalently, the number of documents in which) each of those product names was found (the result of step 3). The output contains this statistical profile for every element name and its associated name:value pairs in the data set.
The MarkLogic Connector for Hadoop is supported on the Hortonworks Data Platform (HDP) and the Cloudera Distribution of Hadoop (CDH). Recent releases of HDP and CDH come bundled with Apache Spark. To set up the Hadoop connector, refer to Getting Started with the MarkLogic Connector for Hadoop, as well as the setup instructions specific to MarkLogicWordCount.
Although Spark is developed using Scala, it also supports application development in Java and Python. MarkLogicWordCount is a Java application.
The first logical step in the application is to load documents from the MarkLogic database into a Spark RDD. The new RDD is created using the `newAPIHadoopRDD` method on the Spark context (a `JavaSparkContext` in Java). The MarkLogic-specific configuration properties are passed in a Hadoop `Configuration` object. These properties are loaded from the configuration XML file that is passed as an input argument to the MarkLogicWordCount application, and include the username, password, MarkLogic host, port, database name and so on. We use the `DocumentInputFormat` class, which reads documents from the MarkLogic instance as key-value pairs of `DocumentURI` and `MarkLogicNode` objects. The following code demonstrates how to create an RDD based on documents within a MarkLogic database.
```java
import java.io.FileInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.marklogic.mapreduce.DocumentInputFormat;
import com.marklogic.mapreduce.DocumentURI;
import com.marklogic.mapreduce.MarkLogicNode;

// Inside main(String[] args):

// First, create the Spark context.
SparkConf conf = new SparkConf().setAppName("com.marklogic.spark.examples").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);

// Create a Hadoop Configuration and load the MarkLogic-specific properties from the configuration file.
Configuration hdConf = new Configuration();
String configFilePath = args[0].trim();
FileInputStream ipStream = new FileInputStream(configFilePath);
hdConf.addResource(ipStream);

// Create an RDD from the documents in the MarkLogic database, loaded as (DocumentURI, MarkLogicNode) pairs.
JavaPairRDD<DocumentURI, MarkLogicNode> mlRDD = context.newAPIHadoopRDD(
    hdConf,                     // Configuration
    DocumentInputFormat.class,  // InputFormat
    DocumentURI.class,          // Key class
    MarkLogicNode.class         // Value class
);
```
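The configuration file passed as `args[0]` uses Hadoop's standard configuration format: a `<configuration>` root containing `<property>` elements, each with `<name>` and `<value>` children. The sketch below shows such a file and reads it back with the JDK's DOM parser, purely to illustrate the format; in the application itself, `Configuration.addResource` does the parsing. The property names are our assumption of the connector's input properties (verify them against the connector documentation), the values are placeholders, and the class `ConnectorConfigSketch` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical illustration of a Hadoop-style configuration file for the connector.
public class ConnectorConfigSketch {

    // Property names are assumed connector input properties; values are placeholders.
    public static final String SAMPLE_CONFIG =
        "<configuration>"
        + "<property><name>mapreduce.marklogic.input.host</name><value>localhost</value></property>"
        + "<property><name>mapreduce.marklogic.input.port</name><value>8000</value></property>"
        + "<property><name>mapreduce.marklogic.input.username</name><value>admin</value></property>"
        + "<property><name>mapreduce.marklogic.input.password</name><value>secret</value></property>"
        + "<property><name>mapreduce.marklogic.input.databasename</name><value>complaints</value></property>"
        + "</configuration>";

    // Reads every <property> into an insertion-ordered name -> value map.
    public static Map<String, String> readProperties(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> props = new LinkedHashMap<>();
            NodeList properties = doc.getElementsByTagName("property");
            for (int i = 0; i < properties.getLength(); i++) {
                Element property = (Element) properties.item(i);
                String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Malformed configuration XML", e);
        }
    }

    public static void main(String[] args) {
        readProperties(SAMPLE_CONFIG).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```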
Now that we have loaded the documents into a Spark RDD, let's apply the transformations that produce the intended output. Spark RDDs support two types of operations: transformations, which create a new RDD from an existing one, and actions, which return a value to the driver program or save the data to external storage. Most RDD operations take a function that is defined in the Spark application and executed on the Spark cluster. For example, `map` is a transformation that passes each element of the RDD through a user-defined function and returns a new RDD containing the results. Because Spark is developed in Scala, a language with strong support for functional programming, this style is very natural to Scala developers. Since we are developing MarkLogicWordCount in Java, we implement these functions using the function interfaces in the Spark Java API package `org.apache.spark.api.java.function`. We also use the Java-specific RDD classes in `org.apache.spark.api.java` (such as `JavaRDD` and `JavaPairRDD`), which provide the same methods as Scala RDDs but accept Java functions.
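To illustrate the pattern of passing a function object, the sketch below writes a distinct-value counter twice: once as an anonymous class, as is common in pre-Java 8 Spark code, and once as a lambda. It uses the JDK's `java.util.function.Function` so that it runs without Spark; Spark's own `org.apache.spark.api.java.function.Function` has the same shape, except that its method is `call` and is declared to throw `Exception`. The constant name echoes `DISTINCT_VALUE_COUNTER` from the MarkLogicWordCount code, but the body here is our guess, not the application's implementation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class FunctionObjectSketch {

    // Pre-Java 8 style: an anonymous class implementing the function interface.
    // Hypothetical body; not the MarkLogicWordCount implementation.
    public static final Function<Iterable<String>, Integer> DISTINCT_VALUE_COUNTER =
        new Function<Iterable<String>, Integer>() {
            @Override
            public Integer apply(Iterable<String> values) {
                Set<String> distinct = new HashSet<>();
                for (String value : values) {
                    distinct.add(value);
                }
                return distinct.size();
            }
        };

    // Java 8 style: the same logic written as a lambda.
    public static final Function<Iterable<String>, Integer> DISTINCT_VALUE_COUNTER_LAMBDA =
        values -> {
            Set<String> distinct = new HashSet<>();
            values.forEach(distinct::add);
            return distinct.size();
        };

    public static void main(String[] args) {
        List<String> products = Arrays.asList("Mortgage", "Credit card", "Mortgage");
        // Both print 2, because "Mortgage" is counted once.
        System.out.println(DISTINCT_VALUE_COUNTER.apply(products));
        System.out.println(DISTINCT_VALUE_COUNTER_LAMBDA.apply(products));
    }
}
```

With Java 8 or later, lambdas keep Spark code short; named constants like these are handy when the same function is reused across several transformations.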
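Within the MarkLogicWordCount example we apply the following steps to the RDD into which we loaded the documents from the MarkLogic database:

1. Extract the XML elements of each document as name:value pairs, where the element content is the value (`flatMapToPair`).
2. Group the values by element name (`groupByKey`).
3. Count the distinct values for each element name (`mapValues`).
4. Map each name:value pair to an occurrence count (`mapToPair`).
5. Aggregate the occurrence counts of each distinct name:value pair (`reduceByKey`).
6. Filter out name:value occurrences that are statistically insignificant (`filter`).
7. Combine the distinct value counts and the name:value occurrence counts into a single RDD (`union`).
8. Sort the combined RDD by key so that element names and name:value keys appear in order (`sortByKey`).
9. Save the result to the HDFS location given as an input argument (`saveAsTextFile`, an action rather than a transformation).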
The following code demonstrates how these steps are accomplished within the MarkLogicWordCount example.
```java
// Extract XML elements as name:value pairs, where the element content is the value.
JavaPairRDD<String, String> elementNameValuePairs =
    mlRDD.flatMapToPair(ELEMENT_NAME_VALUE_PAIR_EXTRACTOR);

// Group element values for the same element name.
JavaPairRDD<String, Iterable<String>> elementNameValueGroup =
    elementNameValuePairs.groupByKey();

// Count distinct values for each element name.
JavaPairRDD<String, Integer> elementNameDistinctValueCountMap =
    elementNameValueGroup.mapValues(DISTINCT_VALUE_COUNTER);

// Map the element name:value pairs to an occurrence count for each name:value pair.
JavaPairRDD<String, Integer> elementNameValueOccurrenceCountMap =
    elementNameValuePairs.mapToPair(ELEMENT_VALUE_OCCURRENCE_COUNT_MAPPER);

// Aggregate the occurrence count of each distinct name:value pair.
JavaPairRDD<String, Integer> elementNameValueOccurrenceAggregateCountMap =
    elementNameValueOccurrenceCountMap.reduceByKey(ELEMENT_VALUE_COUNT_REDUCER);

// Filter out the name:value occurrences that are statistically insignificant.
JavaPairRDD<String, Integer> relevantNameValueOccurrenceCounters =
    elementNameValueOccurrenceAggregateCountMap.filter(ELEMENT_VALUE_COUNT_FILTER);

// Combine the distinct value count for each element name and the occurrence count
// for each distinct name:value pair into a single RDD.
JavaPairRDD<String, Integer> valueDistribution =
    elementNameDistinctValueCountMap.union(relevantNameValueOccurrenceCounters);

// Sort by key (ascending, into a single partition) so that element-name and
// name:value keys appear in order in one output file.
JavaPairRDD<String, Integer> sortedValueDistribution =
    valueDistribution.sortByKey(true, 1);

// Save the output to the HDFS location passed as the second input argument.
sortedValueDistribution.saveAsTextFile(args[1]);
```
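To see what these steps compute without a cluster, here is a single-machine sketch of the same pipeline over an in-memory list of name:value pairs, using plain Java collections in place of RDDs. The comments map each block to the RDD operation it imitates. The threshold `MIN_OCCURRENCES` and the class and method names are hypothetical; the real `ELEMENT_VALUE_COUNT_FILTER` may apply a different rule.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ValueDistributionSketch {

    // Hypothetical significance threshold standing in for ELEMENT_VALUE_COUNT_FILTER.
    static final int MIN_OCCURRENCES = 2;

    public static Map.Entry<String, String> pair(String name, String value) {
        return new SimpleEntry<>(name, value);
    }

    public static List<String> profile(List<Map.Entry<String, String>> pairs) {
        // groupByKey: collect the set of values seen for each element name.
        Map<String, Set<String>> valuesByName = new HashMap<>();
        // mapToPair + reduceByKey: count occurrences of each "name:value" key.
        Map<String, Integer> occurrenceCounts = new HashMap<>();
        for (Map.Entry<String, String> p : pairs) {
            valuesByName.computeIfAbsent(p.getKey(), k -> new HashSet<>()).add(p.getValue());
            occurrenceCounts.merge(p.getKey() + ":" + p.getValue(), 1, Integer::sum);
        }

        // mapValues (distinct count) + union + sortByKey: a TreeMap keeps keys in order.
        // Assumes element names contain no ':', so they cannot collide with name:value keys.
        TreeMap<String, Integer> sorted = new TreeMap<>();
        valuesByName.forEach((name, values) -> sorted.put(name, values.size()));

        // filter: keep only name:value pairs occurring at least MIN_OCCURRENCES times.
        occurrenceCounts.forEach((key, count) -> {
            if (count >= MIN_OCCURRENCES) {
                sorted.put(key, count);
            }
        });

        // Format each entry the way a Scala Tuple2 prints: "(key,count)".
        List<String> lines = new ArrayList<>();
        sorted.forEach((key, count) -> lines.add("(" + key + "," + count + ")"));
        return lines;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs = Arrays.asList(
            pair("Product", "Mortgage"), pair("Product", "Credit card"), pair("Product", "Mortgage"),
            pair("State", "CA"), pair("State", "CA"), pair("State", ""));
        profile(pairs).forEach(System.out::println);
    }
}
```

For the sample in `main`, `Product` and `State` each have two distinct values, while `Product:Credit card` and `State:` occur only once and are filtered out, so the program prints `(Product,2)`, `(Product:Mortgage,2)`, `(State,2)` and `(State:CA,2)`, in that order.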
All RDD transformations in Spark are lazy: they are applied only when Spark encounters an action. In this example the only action performed on the RDD is `saveAsTextFile`. By default, each time you perform an action on a transformed RDD, that RDD may be recomputed. However, Spark lets you cache the RDD in memory using the `persist` or `cache` methods, which gives much faster access whenever you need to perform multiple actions on the same RDD.
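Java 8 streams are also lazy, so they offer a rough single-machine analogy. In the sketch below, a counter records how often the mapping function runs: defining the pipeline runs nothing, two actions on the uncached pipeline compute the data twice, and materializing the results once into a `List` (standing in for `cache`) lets later actions reuse them. This is only an analogy; Spark's `cache` keeps RDD partitions in executor memory. The class `LazinessSketch` is hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LazinessSketch {

    // Counts how many times the mapping ("transformation") function runs.
    static final AtomicInteger mapCalls = new AtomicInteger();

    // Builds a lazy pipeline, similar to an uncached RDD lineage:
    // map() only runs when a terminal operation (an "action") consumes the stream.
    static Stream<Integer> transformed() {
        return IntStream.rangeClosed(1, 5).boxed()
            .map(x -> {
                mapCalls.incrementAndGet();
                return x * x;
            });
    }

    // Returns the call counter after each stage, followed by the computed results.
    public static int[] demo() {
        mapCalls.set(0);

        Stream<Integer> pending = transformed();
        int afterDefine = mapCalls.get();                      // nothing has run yet

        int sum = pending.reduce(0, Integer::sum);             // action 1 computes all 5 squares
        int max = transformed().max(Integer::compare).get();   // action 2 recomputes them
        int afterUncached = mapCalls.get();

        List<Integer> cached = transformed().collect(Collectors.toList()); // "cache": compute once
        int cachedSum = cached.stream().reduce(0, Integer::sum);
        int cachedMax = cached.stream().max(Integer::compare).get();       // no recomputation
        int afterCached = mapCalls.get();

        return new int[] {afterDefine, afterUncached, afterCached, sum, max, cachedSum, cachedMax};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("map calls after defining the pipeline: " + r[0]);
        System.out.println("map calls after two uncached actions: " + r[1]);
        System.out.println("map calls after caching plus two more actions: " + r[2]);
        System.out.println("sum=" + r[3] + ", max=" + r[4]);
    }
}
```

Running it reports 0, 10 and 15 map calls: the two uncached actions each recompute all five squares, while the two actions on the cached list add no further calls. Both the sum (55) and the maximum (25) are the same either way.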
Note that many of the transformation steps in the code above take custom functions as parameters. To see how these functions are implemented, refer to the complete source code of MarkLogicWordCount. Feel free to download the source code, build it and try out the MarkLogicWordCount application yourself.
Hemant Puranik is a Technical Product Manager at MarkLogic, with a specific focus on data engineering tooling. He is responsible for creating and managing field-ready materials, including best-practices documentation, code and scripts, modules for partner products, whitepapers and demonstrations of joint solutions, all to accelerate MarkLogic projects that use ETL, data cleansing, structural transformation and other data engineering tools and technologies.
Prior to MarkLogic, Hemant worked for more than a decade at SAP and BusinessObjects as a Product Manager and Engineering Manager on a variety of products within the Enterprise Information Management product portfolio, which includes text analytics, data quality, data stewardship, data integration and master data management.