Sloan Ahrens is a co-founder of Qbox and is currently a freelance data consultant. In this series of guest posts, Sloan will be demonstrating how to set up a large-scale machine learning infrastructure using Apache Spark and Elasticsearch. This is part 2 of that series. Part 1, Building an Elasticsearch Index with Python on an Ubuntu VM, is here. -Mark Brandon
In this post we're going to continue setting up some basic tools for doing data science. The ultimate goal is to be able to run machine learning classification algorithms against large data sets using Apache Spark™ and Elasticsearch clusters in the cloud.
The major advantage of this approach is that the same basic techniques can scale up to data sets of arbitrary size, but we will start small.
In my last post, we set up a VirtualBox Ubuntu 14 virtual machine, installed Elasticsearch, and built a simple index using Python.
In this post we will continue where we left off, by installing Spark on our previously-prepared VM, then doing some simple operations that illustrate reading data from an Elasticsearch index, doing some transformations on it, and writing the results to another Elasticsearch index.
All the code for the posts in this series will be available in this GitHub repository.
We are going to be working locally, on our Ubuntu 14 VM. In the next post we will deploy our setup to the cloud.
Step 1: Preliminaries
The first thing you will need to do is make sure you have worked through the first post in the series, so we can be sure we are all on the same page. This reduces the possibility of package conflicts and similar issues.
I also like to make sure that my VM has access to several processors. This is not strictly necessary, but Spark can run on several processors, which can be useful for running locally to simulate multiple nodes. As shown below, I gave my VM four cores:
Another tool we will find useful is a Google Chrome extension called Sense. Sense is extremely useful for quickly interacting with an Elasticsearch index, and it can translate to and from cURL commands. Qbox has developed a modification of Sense that incorporates elements of GitHub Gist, available at http://sense.qbox.io/gist/, which allows the user to save Elasticsearch query code and provides a shareable URL. Just click the "Save Code" button in the upper-right-hand corner, and you will be redirected to a URL for that code snippet. Then you can share the URL with others if you want to show them your Elasticsearch code.
As a quick illustration of using Sense, let's run a simple search against the Elasticsearch index we built last time, as a sanity check. You can run the code here: http://sense.qbox.io/gist/4f8a5b8e4871e95ce403bdacb765d5b69454a53d, and below is a screenshot:
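If you would rather run the sanity check without Sense, a minimal sketch in Python does the same thing. This assumes Elasticsearch is listening on localhost:9200 and that the "titanic" index from the first post exists; the query in the gist may differ slightly:

# quick sanity check against the index from part 1
# (assumes Elasticsearch is running on localhost:9200)
import json
import urllib2

query = json.dumps({"query": {"match_all": {}}, "size": 1})
response = urllib2.urlopen('http://localhost:9200/titanic/passenger/_search', query)
print(json.dumps(json.loads(response.read()), indent=2))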
Step 2: Install Apache Spark™
Now that we have the preliminaries taken care of, let's get started installing Spark. I encourage you to read about Spark here. Spark can be used with Java, Scala, or Python. We're going to use Python.
First we will need to get Spark installed in our VM. We'll head over to the download page, select Spark release "1.1.0," package type "Pre-built for Hadoop 2.4," download type "Direct Download," and click the link next to "Download Spark" to download "spark-1.1.0-bin-hadoop2.4.tgz." Move the file wherever you would like it. I just moved it to my root directory.
This can also be accomplished (at least currently) by running:
# download spark
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop2.4.tgz
Next we'll extract the files, and clean up the mess:
# extract, clean up
tar -xf spark-1.1.0-bin-hadoop2.4.tgz
rm spark-1.1.0-bin-hadoop2.4.tgz
sudo mv spark-* spark
Now we can run the Spark Python interpreter with:
# run spark
cd ~/spark
./bin/pyspark
You should see something like:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc.
>>>
The interpreter will have already provided us with a Spark context object, which we can see by running:
>>> print(sc)
<pyspark.context.SparkContext object at 0x7f34b61c4e50>
Step 3: A Simple Word-Count of a Document
To quickly illustrate some of Spark's basic capabilities, we will run a simple word-count of a document. We will use a document already in the spark directory, "CHANGES.txt," and we will run the task across four processors (assuming you gave your VM that many; if not, just use the number you gave it).
So we'll start the Spark interpreter with the following command:
# run spark with 4 cores
./bin/pyspark --master local[4]
Now we'll load the text file into a Spark RDD:
# read in a text file
textFile = sc.textFile('CHANGES.txt')
Next we'll count the number of lines and the number of (non-newline) characters in the file:
# count lines
print('lines in file: %s' % textFile.count())

# add up lengths of each line
chars = textFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
print('number of non-newline characters in file: %s' % chars)
(Of course, if we were running in a truly distributed environment, we probably wouldn't simply be printing our results to stdout -- but one thing at a time.)
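For example, one simple alternative, sketched below, is to write the results to a directory of part files with saveAsTextFile, which works the same way whether Spark is running locally or on a cluster. The output path here is arbitrary, and Spark will refuse to overwrite it if it already exists:

# write each line length out to a directory of part files instead of printing
# ('line_lengths_output' is just an example path)
lengths = textFile.map(lambda s: len(s))
lengths.saveAsTextFile('line_lengths_output')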
I got:
lines in file: 14577
and:
number of non-newline characters in file: 505483
Now let's do a slightly more sophisticated operation. We'll count the number of occurrences of each word (split only on whitespace) in the document, sort the results, and print out the 10 most commonly occurring words. Here's the code:
# run a simple word count of the doc

# map each line to its words
wordCounts = textFile.flatMap(lambda line: line.split())

# emit value:1 for each key:word
wordCounts = wordCounts.map(lambda word: (word, 1))

# add up word counts by key:word
wordCounts = wordCounts.reduceByKey(lambda a, b: a+b)

# sort in descending order by word counts
wordCounts = wordCounts.sortBy(lambda item: -item[1])

# collect the results in an array
results = wordCounts.collect()

# print the first ten elements
print(results[:10])
and the output:
[(u'-0700', 2287), (u'Commit:', 2203), (u'from', 994), (u'pull', 896), (u'request', 889), (u'Merge', 888), (u'in', 720), (u'to', 673), (u'-0800', 648), (u'2013', 625)]
Note that although all the operations on wordCounts could have been chained into a single expression, I broke them up to make the code easier to read.
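For reference, the chained version would look something like this:

# the same word count, chained into a single expression
results = (textFile.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b)
                   .sortBy(lambda item: -item[1])
                   .collect())
print(results[:10])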
Okay, now let's move on to integrating Elasticsearch.
Step 4: Elasticsearch I/O with Spark
In order to use Elasticsearch with Spark, the first thing we're going to need is the elasticsearch-hadoop adapter. We're going to use the 2.1.0.Beta2 release, available here.
We will install the adapter in a subdirectory of our Spark directory as follows:
# get elasticsearch-hadoop adapter
cd ~/spark  # or equivalent
mkdir jars; cd jars
wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.1.0.Beta2/elasticsearch-hadoop-2.1.0.Beta2.jar
cd ..
Now we can run the Spark Python interpreter with the elasticsearch-hadoop jar:
# run spark with elasticsearch-hadoop jar
./bin/pyspark --master local[4] --jars jars/elasticsearch-hadoop-2.1.0.Beta2.jar
The Spark docs have an example of reading an Elasticsearch index with Python, here (under the Python tab). We'll use this technique to read the data from our index and print the first document:
# read in ES index/type "titanic/passenger"
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ "es.resource" : "titanic/passenger" })
print(es_rdd.first())
The output looks like:
(u'892', {u'fare': u'7.8292', u'name': u'Kelly, Mr. James', u'embarked': u'Q', u'age': u'34.5', u'parch': u'0', u'pclass': u'3', u'sex': u'male', u'ticket': u'330911', u'passengerid': u'892', u'sibsp': u'0', u'cabin': None})
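Each element of the RDD is a (document ID, field dictionary) tuple, so individual fields can be pulled out directly. For example:

# each element is a (doc_id, fields) tuple
doc_id, fields = es_rdd.first()
print(doc_id)           # u'892'
print(fields['name'])   # u'Kelly, Mr. James'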
Now let's do some simple analytics on the data and write the results back to the index (under a different type).
# extract values for the "sex" field, count occurrences of each value
value_counts = es_rdd.map(lambda item: item[1]["sex"])
value_counts = value_counts.map(lambda word: (word, 1))
value_counts = value_counts.reduceByKey(lambda a, b: a+b)

# put the results in the right format for the adapter
value_counts = value_counts.map(lambda item: ('key', {
    'field': 'sex',
    'val': item[0],
    'count': item[1]
}))

# write the results to "titanic/value_counts"
value_counts.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ "es.resource" : "titanic/value_counts" })
Now if we take a look with Sense (http://sense.qbox.io/gist/f1c30945ad5391d0a4bda233147f863ac8fe49ac), we will see our results in ES:
The code we've executed thus far is available in the repo, here.
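If you prefer to stay in the pyspark shell, you can also verify the write by reading the new type back with the same pattern we used above (a quick sketch; give Elasticsearch a second to refresh before the new documents show up):

# read back the results we just wrote, as a quick check
check_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ "es.resource" : "titanic/value_counts" })
print(check_rdd.collect())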
Note that if you do this more than once, you will probably want to rebuild the index, since we aren't providing Elasticsearch with IDs for our analytics results (so new ones get added each time we execute the above code). If you have the repo in ~/local_code (say), you can delete and re-create the index by running:
cd ~/local_code/qbox-blog-code/ch_1_local_ubuntu_es
python build_index.py
Step 5: Clean Up, Generalize
The last thing we're going to do is clean up our code a little, generalize it, and put it in a more portable form that will be easier to run in the cloud (in the next post). So far we've been running our code from the interpreter; now we'll put it in a code file and run it with spark-submit.
We'll also apply the analysis we ran on the "sex" field to every field in the documents, keep only the values that appear more than once, and write all the results to the index. Here is the code (also available in the GitHub repo):
# es_spark_test.py
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("ESTest")
    sc = SparkContext(conf=conf)

    es_read_conf = {
        "es.nodes" : "localhost",
        "es.port" : "9200",
        "es.resource" : "titanic/passenger"
    }

    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)

    es_write_conf = {
        "es.nodes" : "localhost",
        "es.port" : "9200",
        "es.resource" : "titanic/value_counts"
    }

    doc = es_rdd.first()[1]

    for field in doc:
        value_counts = es_rdd.map(lambda item: item[1][field])
        value_counts = value_counts.map(lambda word: (word, 1))
        value_counts = value_counts.reduceByKey(lambda a, b: a+b)
        value_counts = value_counts.filter(lambda item: item[1] > 1)
        value_counts = value_counts.map(lambda item: ('key', {
            'field': field,
            'val': item[0],
            'count': item[1]
        }))

        value_counts.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_write_conf)
We can run this file with the following command (change the file path appropriately):
./bin/spark-submit --master local[4] --jars jars/elasticsearch-hadoop-2.1.0.Beta2.jar ~/local_code/qbox-blog-code/ch_2_local_spark_es/es_spark_test.py
Now if we look at our index again (http://sense.qbox.io/gist/46c166c983d8fa40bbe5af4004e694eb674d3e04), we will see a lot more results:
So that about does it for this post.
To recap, we installed Apache Spark on our previously prepared Ubuntu VM, read data from an Elasticsearch index into Spark, did some analysis on it, and wrote the results back to Elasticsearch.
In the next post, we will take what we've done here and deploy it to the cloud. Stay tuned!