Reading from Kafka, Socket, and File Stream in Spark
Apache Spark Structured Streaming enables real-time processing of data streams. It supports a wide range of data sources including Kafka topics, TCP sockets, and file directories. In this tutorial, we will learn how to read from each of these sources using PySpark with step-by-step examples.
What is Structured Streaming?
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express streaming computations just like batch computations on static data.
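The same DataFrame API covers both cases. As a minimal sketch (the path and schema are placeholders, and a SparkSession named spark is assumed to exist):
# Batch: read a static directory of CSV files once
batch_df = spark.read.schema("name STRING, age INT").csv("/path/to/data")
# Streaming: the same API treats the directory as an unbounded, growing table
stream_df = spark.readStream.schema("name STRING, age INT").csv("/path/to/data")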
Reading from a Socket Stream
A socket stream is useful for prototyping. You can simulate a data stream using the nc (netcat) command from a terminal, and Spark can read from it in real time.
Example: Read Stream from a Local Port
from pyspark.sql import SparkSession
# Create Spark session
spark = SparkSession.builder \
    .appName("SocketStreamExample") \
    .getOrCreate()
# Read stream from socket (localhost:9999)
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
# Word count logic
from pyspark.sql.functions import explode, split
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()
# Output to console
query = wordCounts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()
Running the Example
- Start a TCP server in a terminal before launching the Spark job:
nc -lk 9999
- Type some text into the terminal. Spark will process it live.
+------+-----+
|  word|count|
+------+-----+
| spark|    1|
|  data|    2|
|stream|    1|
+------+-----+
Question:
Why is this useful?
Answer:
It helps simulate real-time input for testing, like incoming messages or logs. Later, you can switch this to Kafka or other production sources.
Reading from a File Stream
Spark can monitor a directory for new files and treat those as a data stream. This is useful when data is continuously dropped into a folder from another system.
Example: Stream from Folder of CSV Files
# Read CSV files dropped in a folder
csv_stream = spark.readStream \
    .option("header", "true") \
    .schema("name STRING, age INT") \
    .csv("/path/to/input-folder")
# Show incoming data
query = csv_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()
Question:
How often does Spark check the folder?
Answer:
By default, Spark starts a new micro-batch as soon as the previous one has finished processing. You can control the interval explicitly with the trigger option on the writer, for example to poll the folder only every few seconds.
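For example, a fixed processing-time trigger can be set on the writer. A minimal sketch, reusing the csv_stream defined above:
# Check the folder for new files roughly every 10 seconds
query = csv_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()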
Reading from Kafka Stream
Kafka is a distributed messaging system. It's widely used in production to stream logs, user activity, or financial transactions.
Prerequisite
Kafka and Zookeeper must be running, and a topic must be created before reading data from it.
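The Kafka source is also not bundled with the core Spark distribution, so the connector package must be supplied when launching the job. The version and the script name below are only placeholders; match the package version to your Spark and Scala versions:
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  kafka_stream_example.py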
Example: Read Stream from Kafka Topic
# Read from Kafka
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test-topic") \
    .load()
# Kafka delivers keys and values as binary, so cast them to readable strings
messages = kafka_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Output messages to console
query = messages.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()
+----+------------+
|key |value       |
+----+------------+
|null|user clicked|
|null|new order   |
+----+------------+
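In practice the value column often carries JSON. A minimal sketch of turning it into columns with from_json; the field names here are made up for illustration:
from pyspark.sql.functions import from_json, col
# Hypothetical schema for the JSON payload carried in the value column
event_schema = "user STRING, action STRING"
events = messages.select(from_json(col("value"), event_schema).alias("event")).select("event.*")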
Question:
Why use Kafka over socket or file?
Answer:
Kafka is durable and scalable: messages are persisted and replicated so they are not lost, consumers can replay them from an earlier offset, and it works well in distributed environments. That makes it the natural choice for real-world production pipelines.
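Replaying is controlled on the reader. For example, the startingOffsets option tells Spark where to begin in the topic; a minimal variation of the reader shown above:
# Start from the beginning of the topic instead of only new messages
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test-topic") \
    .option("startingOffsets", "earliest") \
    .load()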
Summary
Structured Streaming in Spark supports various input sources, each suited for different scenarios. Start with socket for demos, use file stream for scheduled data drops, and move to Kafka for real-time distributed systems. Understanding these sources is essential for building robust data pipelines with Apache Spark.