Critical Concepts:
- Sources
- Sinks
- Schema
- Streaming Query
- Triggers
- Output Modes
- Checkpoint
- Fault Tolerance
- Exactly Once Processing
Intro to Stream Processing
Stream processing is a superset of batch processing techniques; many batch processing problems are also addressed by stream processing solutions. For example, a stream processing job can take care of the scheduling requirements of batch jobs. It can handle job failures and restart from the same point where it failed. A streaming job can maintain intermediate results and combine the results of previous batches with the current batch. It can also track whether the previous batch is still running and wait for its completion before a new batch starts. Beyond this, stream processing offers a bunch of facilities for solving uniquely streaming problems. Additionally, Spark's stream processing is an extension of the Spark DataFrame APIs.
DStream vs Structured Streaming
The notion of stream processing starts with the idea of reducing the duration between batches. One natural approach to achieve stream processing is to reduce the batch size to a few seconds. This is not an easy goal, because smaller batches exaggerate a bunch of existing problems. Beyond those problems, a stream processing requirement needs some additional facilities as well.
Spark introduces the idea of micro-batch stream processing. In this approach, we collect data for a short duration and then process it as a micro-batch. Then we loop again and process the next batch.
To simplify our life ^_^, the Spark Streaming API takes care of the following:
- Automatic Looping between micro-batches.
- Batch start and end position management.
- Intermediate state management.
- Combining results with the previous batch results.
- Fault tolerance and Restart management.
The older Spark Streaming (DStream) API, built on RDDs, used to handle all of this. Spark has since moved to the DataFrame APIs, and the RDD-based DStream API is no longer recommended.
Structured Streaming API Advantages:
- Data-frame based streaming API.
- SQL engine Optimization.
- Supports event time semantics.
- Expected further enhancements and new features.
Create a stream processing application
Outline:
- Creating a Spark Streaming Project
- Configuring and using application logs
- Typical structure of Spark Streaming application
A word count streaming application will be created in this section. The example may not be of much practical use, but it gives a first sense of how stream processing works in Spark.
First, we will start a Netcat listener on port 9999. Netcat opens a TCP/IP socket and listens on the port for incoming connections. Once a connection is established, we can type some text into the Netcat terminal to generate a stream of text on the TCP/IP port. On the other side, we will create a Spark Structured Streaming application that connects to TCP/IP port 9999 and starts reading the incoming text. All of this happens in real time.
Note: on Windows, the Nmap tool's ncat will be used for this purpose, because the classic Netcat is only available on Linux. Download the tool from https://nmap.org/download.html and install it.
After installation, we can use the following command to listen on port 9999:

ncat -lk 9999

To create a new project, we need to configure Spark's log4j for this project, whose default configuration is located at C:\spark\spark3\conf\log4j.properties (or we can simply keep the default configuration). Copy and paste this file into the project directory and add two lines to set the application log level:
# add application log level
log4j.logger.guru.learningjournal.spark.examples=INFO, console
log4j.additivity.guru.learningjournal.spark.examples=false
However, the IDE is not going to pick up this log4j configuration automatically; we need to point Spark at the configuration file we modified above.
In C:\spark\spark3\conf\spark-defaults.conf add:
spark.driver.extraJavaOptions -Dlog4j.configuration=file:log4j.properties
If we want to use log4j for logging, we first create a logger class. In this class, we pull the logger instance from the JVM. Once we have this class, we can create a logger and use it in our main program.
class Log4j:
    def __init__(self, spark):
        log4j = spark._jvm.org.apache.log4j
        root_class = "guru.learningjournal.spark.examples"
        conf = spark.sparkContext.getConf()
        app_name = conf.get("spark.app.name")
        self.logger = log4j.LogManager.getLogger(root_class + "." + app_name)

    def warn(self, message):
        self.logger.warn(message)

    def info(self, message):
        self.logger.info(message)

    def error(self, message):
        self.logger.error(message)

    def debug(self, message):
        self.logger.debug(message)
We can then create a logger like this:
from lib.logger import Log4j

logger = Log4j(spark)
Now we can start creating our program. A Spark Structured Streaming application follows a three-step process:
- Read a Stream Source – Input Data-frame
- Transform – Output Data-frame
- Write the output – Sink
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

from lib.logger import Log4j

if __name__ == "__main__":
    # Create a Spark session to read and process the data
    spark = SparkSession \
        .builder \
        .appName("Streaming Word Count") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 3) \
        .getOrCreate()

    # Initialize the logger
    logger = Log4j(spark)

    # 1. Read a stream source - input DataFrame
    lines_df = spark.readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", "9999") \
        .load()

    # lines_df.printSchema()
    # root
    #  |-- value: string (nullable = true)

    # 2. Transform - output DataFrame
    # STEP 1: split the value column by whitespace to separate the words
    words_df = lines_df.select(expr("explode(split(value, ' ')) as word"))
    # STEP 2: group the words and apply the count aggregate
    counts_df = words_df.groupBy("word").count()

    # 3. Write the output - sink
    word_count_query = counts_df.writeStream \
        .format("console") \
        .outputMode("complete") \
        .option("checkpointLocation", "chk-point-dir") \
        .start()

    # Run forever until terminated
    logger.info("Listening to localhost:9999")
    word_count_query.awaitTermination()
After running the program, we can use ncat to send some text, as shown below.

Notice the checkpoint directory: it contains information about the progress of the current batch and some information about previous batches. If we want to rerun the program from scratch, we have to delete the checkpoint directory first!
Stream Processing model in Spark
Again, creating a stream processing application in Spark is a three-step process:

In this part, we will dig a little bit into this loop.

Spark takes everything from the read stream to the write stream and submits it to the Spark SQL engine. The SQL engine analyzes the code, optimizes it, and compiles it to generate an execution plan. All of this happens at runtime, and the generated execution plan is similar to a DataFrame execution plan, made up of stages and tasks.
Once it has an execution plan, Spark starts a background thread to execute it.

The background thread will trigger one spark job to do the following:
- Read data
- Process the input DataFrame with the predefined logic to generate the output DataFrame.
- Write the output Data Frame to the given sink.
This is one micro-batch, and it ends here. After this, the background thread again looks for new input from the given data source. If no new input has arrived yet, it waits for data. When new data is received, the thread discards the already-processed input, loads the input DataFrame with the newly arrived data, and that becomes the input for the next micro-batch. The thread waits for the current micro-batch to complete before triggering the next one.
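Conceptually, the loop run by the background thread looks like the sketch below. This is only an illustration of the model just described, not Spark's actual implementation; source, transform, and sink are hypothetical placeholders.

# A conceptual sketch of the micro-batch loop, not Spark's real code.
# 'source', 'transform', and 'sink' are hypothetical placeholders.
def micro_batch_loop(source, transform, sink):
    while True:
        input_df = source.wait_for_new_records()  # blocks until data arrives
        output_df = transform(input_df)           # apply the predefined logic
        sink.write(output_df)                     # one micro-batch ends here
        # the processed input is discarded; the next iteration loads the
        # input DataFrame with newly arrived data only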
We can monitor the whole process described above in the Spark UI at http://localhost:4040/jobs/

Clicking on a job, we can see two stages: the first stage covers the work before the groupBy, and the second stage covers the transformations after the groupBy.

Now, a new problem: as soon as new data was received, the loop was triggered. How much data do we need to trigger a new micro-batch? Should it be triggered for each record, or is it based on time? Can we have some control here?

Actually, we have four trigger configurations:
- Unspecified (no trigger configuration; this is what we are using)
- Time Interval
- One time
- Continuous
Notice that if we use a time interval trigger, for example 5 minutes per micro-batch, the first trigger starts immediately and the second trigger starts once 5 minutes have elapsed from the start of the first one.
So if the first micro-batch finishes in less than five minutes, the second micro-batch waits out the remaining time and triggers only when the 5 minutes are over, counted from the start of the first micro-batch. If the first micro-batch takes more than 5 minutes to complete, the second micro-batch starts immediately as soon as the first one ends, because the trigger time has already passed.
In this way, time-based triggers give us the option to collect some input data and process it together rather than processing each record individually.
The one-time trigger is like a batch processing job. It creates one and only one micro-batch. This option is useful when we want to spin up a Spark cluster in the cloud, process everything that has arrived since the last run, and then shut down the cluster.
The continuous trigger is a new, experimental feature for achieving millisecond latencies.
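As a quick reference, here is a minimal sketch of how each of these triggers is configured through DataStreamWriter.trigger, reusing counts_df from the word count example (a query takes only one trigger setting):

# Time interval: start a new micro-batch at most once every 5 minutes
query = counts_df.writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime="5 minutes") \
    .start()

# One time: create a single micro-batch, then stop
#   .trigger(once=True)

# Continuous (experimental): millisecond-latency processing
#   .trigger(continuous="1 second")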
Working with files and directories
In this part, I will show Spark's built-in streaming sources and create a program that reads streaming data from a file source.
Spark Streaming source types:
- Socket source
- Rate source
- File source
- Kafka source
In the previous program, I already used the socket source to form a TCP/IP socket connection. The rate source is a dummy data source that generates a configurable number of key-value pairs per second. The file source and the Kafka source are the most commonly used streaming data sources.
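For illustration, here is a minimal sketch of the rate source, assuming an existing SparkSession named spark; each generated row carries a timestamp and a monotonically increasing long value:

# The rate source emits rowsPerSecond rows of (timestamp, value)
rate_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", "10") \
    .load()

rate_df.printSchema()
# root
#  |-- timestamp: timestamp (nullable = true)
#  |-- value: long (nullable = true)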
Architecture – coping with JSON files

In this case of processing JSON files, I will also handle failure scenarios and achieve exactly-once processing, which is a complex requirement.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("File Streaming Demo") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.streaming.schemaInference", "true") \
        .getOrCreate()

    logger = Log4j(spark)

    # Read one new file per trigger from the input directory and
    # delete each source file once it is processed
    raw_df = spark.readStream \
        .format("json") \
        .option("path", "input") \
        .option("maxFilesPerTrigger", "1") \
        .option("cleanSource", "delete") \
        .load()

    # Explode the array of line items into one row per item
    explode_df = raw_df.selectExpr("InvoiceNumber", "CreatedTime", "StoreID",
                                   "PosID", "CustomerType", "PaymentMethod",
                                   "DeliveryType", "DeliveryAddress.City",
                                   "DeliveryAddress.State",
                                   "DeliveryAddress.PinCode",
                                   "explode(InvoiceLineItems) as LineItem")

    # Flatten the LineItem struct into top-level columns
    flattened_df = explode_df \
        .withColumn("ItemCode", expr("LineItem.ItemCode")) \
        .withColumn("ItemDescription", expr("LineItem.ItemDescription")) \
        .withColumn("ItemPrice", expr("LineItem.ItemPrice")) \
        .withColumn("ItemQty", expr("LineItem.ItemQty")) \
        .withColumn("TotalValue", expr("LineItem.TotalValue")) \
        .drop("LineItem")

    # Write the flattened records as JSON files, one micro-batch per minute
    invoiceWriterQuery = flattened_df.writeStream \
        .format("json") \
        .queryName("Flattened Invoice Writer") \
        .outputMode("append") \
        .option("path", "output") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="1 minute") \
        .start()

    logger.info("Flattened Invoice Writer started")
    invoiceWriterQuery.awaitTermination()
There are several features to focus on in this program:
1. Time alignment

From the Spark UI, we notice that the time between the first micro-batch and the second micro-batch is less than 1 minute, while the time between the second and third micro-batches is exactly 1 minute. This difference happens because Spark tries to align the trigger times to round clock times.
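As an arithmetic illustration of that round-off (assuming, as the UI suggests, that trigger times are aligned to clock multiples of the interval; the exact scheduling is internal to Spark):

# Hypothetical numbers: with a 1-minute trigger, the next trigger fires
# at the next clock time that is a multiple of the interval, so the
# first gap can be shorter than one minute.
interval_ms = 60_000
batch_started_ms = 1_000_000_012_345                # hypothetical clock time
next_trigger_ms = ((batch_started_ms // interval_ms) + 1) * interval_ms
print(next_trigger_ms - batch_started_ms)           # 7655 ms, under 1 minute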
2. Output Modes
DataStreamWriter allows three output modes:
- Append (insert-only scenario)
- Update (implements an upsert-like operation)
- Complete (overwrites the previous results with the new results)
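The mode itself is just a string passed to the writer. Here is a minimal sketch reusing counts_df from the word count example; switch the string to compare the outputs shown below:

# Assumes counts_df from the earlier word count example.
# Note: for a streaming aggregation without a watermark, Spark
# restricts "append" mode, since aggregated rows keep changing.
query = counts_df.writeStream \
    .format("console") \
    .outputMode("update") \
    .option("checkpointLocation", "chk-point-dir") \
    .start()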
Take the previous word count program as an example to demonstrate the differences between these three output modes.
The input words are:
Hello Spark Streaming
Hello Spark Programming
Append Mode:

Complete Mode:

Update Mode:

Fault Tolerance & Exactly Once Processing
Running an application forever without interruption is an impossible scenario. An application will stop for at least two reasons:
- Failure
- Maintenance
So the application must be able to handle stops and restarts gracefully. This simply means maintaining the exactly-once feature, and exactly-once means two things:
- Do not miss any input records
- Do not create duplicate output records
To achieve exactly-once, this information is maintained using checkpoint and write-ahead-log techniques, which provide durability and atomicity. The read position represents the start and end of the data range being processed by the current micro-batch. Once the micro-batch finishes, Spark creates a commit to indicate that the data range was processed successfully. The state information is the intermediate data of the micro-batch. In short, the application maintains all the information necessary to restart an unfinished micro-batch.

However, the ability to restart a failed micro-batch alone cannot guarantee exactly-once processing. We have some other requirements as well, sketched in the example after this list:
- Restart with the same checkpoint
- Use a replayable source
- Use Deterministic computation
- Use an idempotent sink
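A minimal sketch of the first requirement, reusing the word count query: rerunning the same application with the same checkpointLocation makes Spark skip the committed micro-batches and retry only the unfinished one.

# Rerun the same writer with the SAME checkpoint directory.
# Exactly-once additionally needs a replayable source (files or Kafka
# can re-serve the retried range; a socket cannot), deterministic
# transformations, and an idempotent sink that tolerates a rewrite
# of the retried micro-batch.
word_count_query = counts_df.writeStream \
    .format("console") \
    .outputMode("complete") \
    .option("checkpointLocation", "chk-point-dir") \
    .start()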
Sometimes the stream job may fail with an exception, possibly caused by a bug. Restarting it from the same point is possible using the checkpoint, which holds the runtime information. However, Spark can only detect code changes to a limited degree, essentially by checking the running aggregate of the previous batch against the new code, so changing the aggregate columns is not allowed. Be careful with code changes in a production application! In a development environment, the checkpoint should be deleted once an incompatible change is made.
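As an illustration of what such a change looks like, based on Spark's documented recovery semantics (the exact rules are version-dependent, so treat this as a sketch reusing words_df from the word count example):

# Usually safe across a restart with the same checkpoint:
# tightening a filter upstream of the aggregation.
counts_df = words_df.where("word != ''").groupBy("word").count()

# Usually NOT safe: changing the grouping or aggregate columns (the
# stateful part), e.g. grouping by an extra column; the saved state
# no longer matches, so a fresh checkpoint directory is required.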