Real-time data streaming is a crucial factor for maintaining competitiveness in today’s market. Organizations are under growing pressure to process large streams of data as they arrive, whether for spotting fraudulent transactions, delivering tailored customer experiences, or optimizing intricate supply chains to stay ahead of market shifts. Apache Spark’s Structured Streaming addresses these needs through its stateful processing features, which let applications track and update intermediate results across multiple data streams or time windows. The RocksDB state store provider, introduced in Apache Spark 3.2, is a more efficient alternative to the default HDFS-backed state store, which keeps state in JVM memory. It is particularly useful in stateful streaming scenarios that manage large volumes of state data, where it can improve performance by relieving Java Virtual Machine (JVM) memory pressure and reducing garbage collection (GC) overhead.
This article delves into the key features of RocksDB and provides an implementation guide using Spark on Amazon EMR and AWS Glue, arming you with the insights needed to scale your real-time data processing efforts.
Overview of RocksDB State Store
Spark Structured Streaming tasks can be categorized as:
- Stateful: These tasks require the tracking of intermediate results across micro-batches (e.g., during aggregations and de-duplication).
- Stateless: These tasks handle each batch independently.
Stateful applications need a state store to track intermediate query results. This is essential for computations that depend on a continuous sequence of events, update results as each batch arrives, or aggregate data over time, including late-arriving data. Spark’s default state store keeps state in JVM memory, which performs well for most streaming scenarios. However, if your application has many stateful operations (such as streaming aggregation, streaming dropDuplicates, and stream-stream joins), the default in-memory state store can run into out-of-memory (OOM) errors because of its large JVM memory footprint, or suffer frequent GC pauses that degrade performance.
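To make the distinction concrete, here is a minimal pure-Python sketch (no Spark involved). The function names, the sample batches, and the use of a Counter as a stand-in for the state store are all illustrative assumptions, not Spark APIs.

```python
from collections import Counter

def stateless_batch(batch):
    """Stateless: each micro-batch is transformed on its own."""
    return [word.upper() for word in batch]

def stateful_batch(state, batch):
    """Stateful: fold the new micro-batch into the running counts."""
    state.update(batch)
    return state

# Three hypothetical micro-batches of words
batches = [["apple", "kiwi"], ["apple"], ["kiwi", "kiwi"]]

state = Counter()  # stands in for the state store
for batch in batches:
    stateless_batch(batch)                # result depends only on this batch
    state = stateful_batch(state, batch)  # result depends on all batches so far

running_counts = dict(state)
print(running_counts)
```

The stateless function needs nothing beyond the current batch, while the stateful one must carry `state` from one batch to the next. That carried state is what a state store such as RocksDB holds and checkpoints.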
Benefits of RocksDB Over In-Memory State Store
RocksDB resolves the limitations of an in-memory state store with its off-heap memory management and efficient checkpointing.
- Off-heap memory management: RocksDB stores state data in OS-managed off-heap memory, which reduces GC pressure. Off-heap memory still consumes machine memory, but it doesn’t occupy JVM heap space. RocksDB’s core memory structures, such as the block cache and memTables, are allocated directly from the operating system and bypass the JVM heap, which makes it well suited to memory-intensive applications.
- Efficient checkpointing: RocksDB automatically saves state changes to checkpoint locations, such as Amazon S3 paths or local directories, to provide fault tolerance. With S3, RocksDB improves checkpointing efficiency in two ways: incremental updates and compaction reduce the amount of data transferred to S3 during checkpoints, and it persists fewer, larger state files than the many small files of the default state store, which reduces S3 API calls and latency.
Implementation Considerations
RocksDB operates as a native C++ library embedded within the Spark executor, utilizing off-heap memory. While it does not fall under JVM GC control, it still impacts overall executor memory usage from both the YARN and OS perspectives. The off-heap memory usage of RocksDB may exceed YARN container limits without triggering container termination, potentially leading to OOM issues. It’s advisable to manage Spark’s memory by:
- Adjusting the Spark executor memory size
- Increasing spark.executor.memoryOverhead or spark.executor.memoryOverheadFactor to allocate more space for off-heap usage.
The following example sets the memory overhead to 4 GB, half of spark.executor.memory (8 GB):
# Total executor memory = 8 GB (heap) + 4 GB (overhead) = 12 GB
# spark.executor.memory sets the JVM heap.
# spark.executor.memoryOverhead sets the off-heap allocation (RocksDB + other native memory).
spark-submit \
  . . . . . . . . \
  --conf spark.executor.memory=8g \
  --conf spark.executor.memoryOverhead=4g \
  . . . . . . . .
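The arithmetic behind this sizing can be sketched in a few lines of Python. The helper name is hypothetical. The default rule it encodes (overhead = heap × overhead factor, with a 384 MiB floor and a factor of 0.10 for JVM jobs) follows Spark's documented defaults, but other settings such as PySpark memory can add to the container request, so treat the result as an approximation.

```python
MIN_OVERHEAD_MIB = 384  # Spark's documented floor for executor memory overhead

def container_memory_mib(heap_mib, overhead_mib=None, overhead_factor=0.10):
    """Approximate per-executor container request: heap + overhead.

    If overhead_mib is not given, fall back to the default rule
    max(384 MiB, heap * overhead_factor).
    """
    if overhead_mib is None:
        overhead_mib = max(MIN_OVERHEAD_MIB, int(heap_mib * overhead_factor))
    return heap_mib + overhead_mib

heap = 8 * 1024  # spark.executor.memory=8g

default_total = container_memory_mib(heap)                       # default overhead
tuned_total = container_memory_mib(heap, overhead_mib=4 * 1024)  # memoryOverhead=4g

print(default_total, tuned_total)
```

With the defaults, an 8 GB heap leaves under 1 GB for everything off-heap, which is why a RocksDB-backed job usually needs the overhead raised explicitly.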
For Amazon EMR on Amazon EC2, activating YARN memory control with strict container memory enforcement helps prevent node-wide OOM failures:
yarn.nodemanager.resource.memory.enforced = false
yarn.nodemanager.elastic-memory-control.enabled = false
yarn.nodemanager.pmem-check-enabled = true
or
yarn.nodemanager.vmem-check-enabled = true
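On Amazon EMR, yarn-site properties like these are typically supplied as a configuration classification when the cluster is created. The sketch below builds that JSON in Python. It assumes the physical-memory check (pmem) variant of the two options above, and the variable names are illustrative.

```python
import json

# Configuration classification for yarn-site; property values are strings.
yarn_site = {
    "Classification": "yarn-site",
    "Properties": {
        "yarn.nodemanager.resource.memory.enforced": "false",
        "yarn.nodemanager.elastic-memory-control.enabled": "false",
        # Assumption: enforce on physical memory; swap in
        # yarn.nodemanager.vmem-check-enabled to check virtual memory instead.
        "yarn.nodemanager.pmem-check-enabled": "true",
    },
}

configurations_json = json.dumps([yarn_site], indent=2)
print(configurations_json)
```

The printed JSON can be pasted into the cluster's software settings or saved to a file and passed at cluster creation.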
Off-Heap Memory Control
Utilize RocksDB-specific settings to manage memory usage effectively. More information is available in the Best practices and considerations section.
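As a rough illustration, the following Python sketch collects a few RocksDB memory settings and renders them as spark-submit arguments. It assumes Spark 3.5 or later, where the bounded memory usage options are documented. The values shown are placeholders rather than recommendations, and the helper name is hypothetical. Verify the property names against the Spark version bundled with your EMR or AWS Glue release.

```python
ROCKSDB_PREFIX = "spark.sql.streaming.stateStore.rocksdb."

rocksdb_memory_conf = {
    # Cap total RocksDB memory across state store instances on an executor
    ROCKSDB_PREFIX + "boundedMemoryUsage": "true",
    ROCKSDB_PREFIX + "maxMemoryUsageMB": "1024",      # placeholder value
    # Share of the cap reserved for write buffers (memTables)
    ROCKSDB_PREFIX + "writeBufferCacheRatio": "0.5",
    # Share of the cap reserved for high-priority blocks (index/filter)
    ROCKSDB_PREFIX + "highPriorityPoolRatio": "0.1",
}

def to_submit_args(conf):
    """Render a config dict as a flat list of --conf key=value arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args

submit_args = to_submit_args(rocksdb_memory_conf)
print(" ".join(submit_args))
```

Whatever cap you choose should fit inside the memory overhead allocated to the executor, since RocksDB's memory is counted against the container, not the JVM heap.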
Getting Started with RocksDB on Amazon EMR and AWS Glue
To activate the RocksDB state store in Spark, configure your application with the following setting:
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
In the subsequent sections, we will create a sample Spark Structured Streaming job with RocksDB enabled on both Amazon EMR and AWS Glue.
RocksDB on Amazon EMR
Amazon EMR versions 6.6.0 and later support RocksDB across Amazon EMR on EC2, Amazon EMR Serverless, and Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). Here’s how to run a sample streaming job with RocksDB enabled.
- Upload the sample script to s3://<YOUR_S3_BUCKET>/script/sample_script.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, expr
import random

# List of words
words = ["apple", "banana", "orange", "grape", "melon",
         "peach", "berry", "mango", "kiwi", "lemon"]

# Create random strings from words
def generate_random_string():
    return " ".join(random.choices(words, k=5))

# Create Spark Session with the RocksDB state store provider enabled
spark = (
    SparkSession.builder
    .appName("StreamingWordCount")
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    .getOrCreate()
)

# Register UDF
spark.udf.register("random_string", generate_random_string)

# Create streaming data: one row per second, each with five random words
raw_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .withColumn("words", expr("random_string()"))
)

# Execute word counts (a stateful streaming aggregation)
wordCounts = (
    raw_stream
    .select(explode(split(raw_stream.words, " ")).alias("word"))
    .groupBy("word")
    .count()
)

# Output the results
query = (
    wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)
query.awaitTermination()
- On the AWS Management Console for Amazon EMR, select Create Cluster.
- For Name and applications – required, choose the latest Amazon EMR release.
- Under Steps, select Add. For Type, choose Spark application.
- For Name, enter…