Spark Streaming

10 / 14
Apache Spark - Streaming - Wordcount Hands-On

[Spark Streaming - Hands-on]

Let's take a quick look at what a Spark Streaming program looks like and do a hands-on. Let's say we want to count the number of words continuously in the text data received from a server listening on a host and a port

How do we write such a program? Let's understand it.

There is a data server which runs on a port and produces data. This data becomes input data stream to Spark streaming and then spark engine count the words in the batches.

To simulate the above scenario, on one console, we run a server which generates data and on the second console we run Spark streaming code which listens to this server and count the words.

We've provided the code on CloudxLab GitHub repository. Let's look at the code.

First, we import the Spark Streaming libraries.

Then we create a local StreamingContext with a batch interval of 10 seconds. Batch interval of 10 seconds means spark streaming creates batches with 10 seconds of data from input stream. Since we will run this code in spark-shell, spark context will be available as "sc" variable. Streaming context is the main entry point for all streaming functionality.

Using this context, we create a DStream that represents streaming data from a server, which runs on localhost and port 9999. "lines" DStream represents the stream of data that will be received from the server. It represents batches of data with each batch having 10 seconds of data. Each record in line DStream is a line of text

Next, we split each line in each batch into words. Please note that since we have applied high-level DStream operation flatMap to "lines" DStream, "words" will also be a DStream.

Next, we map the "word" DStream to a DStream of (word, 1) pairs and reduce it to get the frequency of the word in each batch of data.

Now we print words and their count to the console, which are calculated every 10 seconds.

Up to this point, we have just defined the computation steps. No real processing has started yet. To start the computation we define, ssc.start(). To stop the computation we define ssc.awaitTermination()

Let's run the code. Clone the repository into CloudxLab directory under your home directory in the web console. If you've already cloned the repository, then just update it using git pull origin master command. Since I've already cloned the repository, I will just update it.

Go to the code directory by typing cd ~/cloudxlab/spark/examples/streaming/word_count. Where tilde is a quick way to represent your home directory in Linux.

Open word_count.scala and copy the code. Now launch spark shell by typing the command spark-shell and paste the code. As you can see, spark streaming code has started. It has given error as there is no server running on localhost and port 9999. Let's run a server on port 9999. netcat is a good way to quickly create a server which listens on a specified port. Login to the web console in a new window and type nc -lk 9999. Now whatever you type here would be passed to a process connecting at 9999 port. Let's type in some data.Type

The quick brown fox jumps over the lazy dog

Now switch back to the console, where Spark Streaming code is running. It may take some time to print the word count as spark-streaming code starts processing only when 10 seconds batch is available.

As you can see word count is printed on the screen. Let's type in more data. Type

my first Spark Streaming code

And wait for to get it processed. As you can see word count is printed on the screen

[Spark Streaming - Word Count - SBT]

Spark-shell is good for quickly testing the code. In the production environment, we submit the job using spark-submit command.Let's run the above word count code using spark-submit. For spark submit we need to create a JAR package containing the code and other dependencies. We have the word count scala project in CloudxLab GitHub repository. Go to word_count_sbt directory and open build.sbt file. As you can see we have specified two library dependencies here, spark-core and spark-streaming. Here 1.5.2 represents the spark version. You can change the spark version if you want to run your code on the different available versions of Spark. Source code for the word count is located inside src/main/scala directory.

Let's package the JAR. Run "sbt package" from the root of your project which is word_count_sbt. Wait for JAR to get created. Now type

spark-submit --class "WordCount" --master "local[2]" target/scala-2.10/word-count_2.10-1.0.jar to run the word count code. As you can see spark streaming code has started. Now type in some data in the second console and you can see the word count is printed on the screen.

[Spark Streaming - Word Count - Python]

We've also provided the Python code for word count problem in the You can run the Python code using spark-submit command. Type spark-submit --master "local[2]" and as you can see the spark streaming code has started. Now type in some data in the second console and you can see the word count is printed on the screen.

[Slide - Spark Streaming - Adding Dependencies]

As you have seen in the word count example, we have to add spark streaming dependency to SBT or Maven project. For ingesting data from other sources like Kafka, Flume we add the corresponding artifact to the dependencies.

Please refer to maven repository for the full list of supported sources and artifacts.

[Spark Streaming - A Quick Recap]

Let's do a quick recap of the word count program using Spark Streaming. We first initialize the StreamingContext. It is initialized in ssc variable in our code. Then we define the input sources by creating input DStreams. It is defined in lines variable in our code. Then we define the streaming computations by applying transformations to DStreams. It is defined in words, pairs and wordCounts variables in our code

Then we start receiving data and processing it using streamingContext.start(). Then we add a code to wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination(). Please note that the processing can be stopped manually using streamingContext.stop()