spark · scala · pyspark · big-data · production · performance

Scaling Spark to 100TB: Production Patterns

2024-08-19 · 11 min read

Most Spark tutorials work fine at 10GB. They quietly fall apart somewhere between 5TB and 20TB, and by the time you hit 100TB you've rewritten half your pipeline from scratch. This is a collection of patterns we learned the hard way scaling a financial data platform from 800GB nightly batches to 100TB+ daily processing — patterns that held up under real production load.

The Starting Point

Our data platform ingested raw transaction records, market feeds, and customer activity logs. At 800GB, everything ran fine with default configs. At 8TB, jobs started failing intermittently. At 25TB, a full rearchitecture was on the table. By the time we reached 100TB, we had rebuilt almost every assumption we started with.

Three areas caused 90% of our problems: cluster configuration, shuffle behavior, and data skew. Everything else was noise.


Pattern 1: Cluster Sizing That Actually Scales

Default Spark executor configurations are tuned for demonstration workloads, not production scale. At 100TB, the wrong executor size is the difference between a 2-hour job and an overnight timeout.

The 4-Core Executor Rule

from pyspark.sql import SparkSession

# BAD: Too few executors, too large. GC pressure kills you
spark = SparkSession.builder \
    .config("spark.executor.cores", "32") \
    .config("spark.executor.memory", "200g") \
    .getOrCreate()
 
# GOOD: 4-core executors, parallelism through count not size
spark = SparkSession.builder \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.driver.memory", "32g") \
    .config("spark.driver.maxResultSize", "8g") \
    .getOrCreate()

Four cores per executor was the sweet spot for us. It gives enough concurrent tasks per executor to keep the CPUs busy, with a heap small enough that GC pauses stay manageable. Past roughly 8 cores per executor, the heap needed to feed those tasks grows with the core count, and GC pause time climbs steeply.

def recommended_executor_config(total_nodes, cores_per_node, memory_per_node_gb):
    executor_cores = 4
    executors_per_node = (cores_per_node - 1) // executor_cores  # Reserve 1 core for OS
    total_executors = total_nodes * executors_per_node
 
    # Reserve 1GB per executor for overhead
    executor_memory = (memory_per_node_gb // executors_per_node) - 1
 
    parallelism = total_executors * executor_cores * 2
 
    print(f"Executors:        {total_executors}")
    print(f"Executor memory:  {executor_memory}g")
    print(f"Parallelism:      {parallelism}")
    return total_executors, executor_memory, parallelism
 
# Example: 20-node cluster, 32 cores, 256GB RAM per node
recommended_executor_config(20, 32, 256)
# Executors:        140
# Executor memory:  35g
# Parallelism:      1120

Impact: Moving from 8-core to 4-core executors on our 100TB job reduced GC pause time from 34% to 4% of total runtime.
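
One check worth running on any sizing is whether the executor containers actually fit on a node. The sketch below assumes a resource manager that sizes each container as heap + memoryOverhead + off-heap (YARN and Kubernetes do this in Spark 3.x). The function name and the 8GB reserved for the OS are illustrative assumptions, not measured values.

```python
def executors_fit(node_memory_gb, executors_per_node, heap_gb,
                  overhead_gb, offheap_gb=0, os_reserved_gb=8):
    """Return (fits, container_gb, headroom_gb) for a single node.

    container_gb is what one executor requests from the resource manager.
    os_reserved_gb is a hypothetical allowance for the OS and node daemons.
    """
    container_gb = heap_gb + overhead_gb + offheap_gb
    needed_gb = executors_per_node * container_gb
    headroom_gb = node_memory_gb - os_reserved_gb - needed_gb
    return headroom_gb >= 0, container_gb, headroom_gb


# 7 executors per 256GB node, 16g heap + 4g overhead each
fits, container_gb, headroom_gb = executors_fit(256, 7, heap_gb=16, overhead_gb=4)
print(fits, container_gb, headroom_gb)
```

A negative headroom means the node is oversubscribed, and executors risk being killed by the resource manager once they all fill their heaps.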


Pattern 2: Shuffle Tuning at Scale

Shuffle is the single most expensive operation in large-scale Spark. At 100TB, a poorly configured shuffle doesn't just slow you down — it crashes your job.

Partition Count is Everything

# With the default 200 shuffle partitions, a stage that shuffles the full
# 100TB puts ~500GB in each partition. That's a guaranteed OOM
 
# Rule: target 128MB–512MB per shuffle partition
# (size this from the bytes a stage actually shuffles; the full 100TB is the worst case)
data_size_bytes = 100 * 1024 ** 4           # 100TB
target_partition_size = 256 * 1024 ** 2     # 256MB
recommended_partitions = data_size_bytes // target_partition_size
 
spark.conf.set("spark.sql.shuffle.partitions", str(recommended_partitions))
# Result: 409,600 partitions for 100TB at 256MB target

# Enable AQE to let Spark tune partition count dynamically at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "256mb")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64mb")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256mb")
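
Most stages shuffle far less than the raw input, so the partition count is better derived from the shuffle size of the stage in question. A minimal sketch, assuming you have read that size from the Spark UI. Rounding up to a whole number of task waves is a common heuristic rather than anything Spark requires, and the 14TB and 640-core figures are made-up inputs.

```python
import math


def shuffle_partitions(shuffle_bytes, total_cores, target_mb=256):
    """Partition count for a target partition size, rounded up to a whole
    number of task waves so the final wave does not leave cores idle."""
    target_bytes = target_mb * 1024 ** 2
    raw = math.ceil(shuffle_bytes / target_bytes)
    waves = max(1, math.ceil(raw / total_cores))
    return waves * total_cores


# Hypothetical stage: 14TB shuffled on 640 cores (20 nodes x 32 cores)
print(shuffle_partitions(14 * 1024 ** 4, total_cores=640))
```

With AQE coalescing enabled, erring on the high side is usually safe, since small partitions are merged back toward the advisory size at runtime.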

Optimize Shuffle I/O

from pyspark.sql import SparkSession

# Shuffle settings are read when the application starts, so set them on the builder
# (or via spark-submit --conf); spark.conf.set() on a live session rejects or ignores them
spark = (
    SparkSession.builder
    # Keep the external shuffle service on (Databricks / EMR)
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.shuffle.compress", "true")
    .config("spark.shuffle.spill.compress", "true")
    .config("spark.io.compression.codec", "lz4")     # Spark's default; faster than snappy at scale
    # Increase shuffle fetch and write buffers
    .config("spark.reducer.maxSizeInFlight", "96m")
    .config("spark.shuffle.file.buffer", "1m")
    .config("spark.shuffle.unsafe.file.output.buffer", "5m")
    .getOrCreate()
)

Impact: Shuffle read time dropped by 41% after switching from snappy to lz4 and tuning fetch buffer on our 100TB aggregation job.


Pattern 3: Memory Management Under Pressure

At 100TB, memory is always the constraint. Spark's default memory fractions are balanced for mid-scale workloads — they need explicit tuning for large-scale batch processing.

Memory Fraction Tuning

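# Defaults: spark.memory.fraction=0.6, spark.memory.storageFraction=0.5 (unified memory model)
# fraction = share of (heap - 300MB) for execution + storage; storageFraction = part of that shielded from eviction
# Both are fixed at launch: set them on the builder or via spark-submit --conf
# For batch jobs with heavy shuffles and no caching:
spark = SparkSession.builder \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.2") \
    .getOrCreate()
 
# For iterative jobs with heavy caching (more room for cached data):
spark = SparkSession.builder \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()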
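
To see what those fractions mean in megabytes, the arithmetic can be sketched as below. It assumes the unified memory model's fixed 300MB reservation and ignores off-heap memory. The helper name is hypothetical and the 16g heap is just an example value.

```python
RESERVED_MB = 300  # fixed slice of heap Spark sets aside before applying fractions


def unified_memory_mb(heap_gb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap into the regions the two fractions control."""
    usable_mb = heap_gb * 1024 - RESERVED_MB
    unified_mb = usable_mb * memory_fraction             # execution + storage
    protected_storage_mb = unified_mb * storage_fraction  # cache immune to eviction
    return {
        "unified_mb": round(unified_mb),
        "protected_storage_mb": round(protected_storage_mb),
        "user_mb": round(usable_mb - unified_mb),         # user data structures, UDF objects
    }


print(unified_memory_mb(16))             # Spark defaults
print(unified_memory_mb(16, 0.8, 0.2))   # shuffle-heavy batch profile
```

Raising spark.memory.fraction shrinks the user region, so jobs with large UDF-side objects may need to leave it closer to the default.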

Spill Detection and Prevention

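import json
from urllib.request import urlopen
from pyspark.sql import SparkSession

# Always monitor spill: it's silent and devastating at scale.
# statusTracker() only exposes task counts, so read spill from the driver's REST API
# (assumes the Spark UI is enabled and reachable from where this runs)
def check_spill_metrics(spark):
    sc = spark.sparkContext
    url = f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}/stages?status=active"
    with urlopen(url) as resp:
        stages = json.load(resp)
 
    for stage in stages:
        print(f"Stage {stage['stageId']}:")
        print(f"  Memory spill: {stage['memoryBytesSpilled'] / 1e9:.2f} GB")
        print(f"  Disk spill:   {stage['diskBytesSpilled'] / 1e9:.2f} GB")

# If you're spilling, give each task more memory first (more shuffle partitions or a
# bigger heap). If executors are also being killed for exceeding container memory,
# raise overhead and off-heap. These are launch-time settings:
spark = SparkSession.builder \
    .config("spark.executor.memoryOverhead", "6g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "8g") \
    .getOrCreate()
 
# Reduce records per batch for wide schemas
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "5000")  # Default 10000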
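
Spark's monitoring REST API reports `memoryBytesSpilled` and `diskBytesSpilled` per stage under `/api/v1/applications/<app-id>/stages`. Once that JSON is parsed, ranking the offenders is plain Python. A minimal sketch; the function name, the 1GB threshold, and the sample payload are all illustrative.

```python
def stages_with_spill(stages, min_disk_gb=1.0):
    """Filter parsed /stages REST output down to stages that spilled to disk."""
    flagged = []
    for stage in stages:
        disk_gb = stage.get("diskBytesSpilled", 0) / 1e9
        if disk_gb >= min_disk_gb:
            flagged.append((stage["stageId"], stage.get("name", ""), round(disk_gb, 2)))
    # Worst offenders first
    return sorted(flagged, key=lambda row: row[2], reverse=True)


# Hand-written sample payload, not real job output
sample = [
    {"stageId": 12, "name": "join", "diskBytesSpilled": 42_000_000_000},
    {"stageId": 13, "name": "write", "diskBytesSpilled": 0},
    {"stageId": 14, "name": "agg", "diskBytesSpilled": 310_000_000_000},
]
print(stages_with_spill(sample))
```

Running a check like this at the end of every job makes spill regressions visible before they turn into failures.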

Cache Selectively, Unpersist Aggressively

from pyspark import StorageLevel
from pyspark.sql.functions import col, count, sum as spark_sum
 
# At 100TB, serialized memory-and-disk storage is your friend, not MEMORY_ONLY
df_base = spark.read.parquet("s3://lake/transactions/") \
    .filter(col("date") >= "2024-01-01") \
    .select("account_id", "amount", "category", "ts")
 
# Use serialized storage to cut memory footprint by 3–5x.
# PySpark 3.x dropped the *_SER constants: StorageLevel.MEMORY_AND_DISK there is
# already the serialized level (MEMORY_AND_DISK_SER is the Scala/Java name)
df_base.persist(StorageLevel.MEMORY_AND_DISK)
 
# Force materialization before branching
df_base.count()
 
result_a = df_base.groupBy("account_id").agg(spark_sum("amount").alias("total"))
result_b = df_base.groupBy("category").agg(count("*").alias("events"))
result_c = df_base.filter(col("amount") > 10000)
 
# These three are lazy: run their writes/actions here so they read from the cache,
# then unpersist as soon as the last consumer is done. Don't wait for job end
df_base.unpersist()

Impact: Switching from MEMORY_AND_DISK to MEMORY_AND_DISK_SER freed 68GB of executor memory per node on our widest table (220 columns), eliminating all spill-to-disk on that stage.


Pattern 4: Partition Strategy for 100TB Tables

At this scale, how you partition your data on storage is as important as how Spark processes it. Wrong partitioning means full table scans. Right partitioning means reading 2% of your data for 90% of your queries.

Hierarchical Partitioning

# BAD: Single-level partitioning creates too many small files over time
df.write.partitionBy("date").parquet("s3://lake/transactions/")
 
# GOOD: Hierarchical partitioning — prune by year/month first, then date
df.withColumn("year",  year(col("ts"))) \
  .withColumn("month", month(col("ts"))) \
  .write \
  .partitionBy("year", "month", "date") \
  .mode("overwrite") \
  .option("compression", "snappy") \
  .parquet("s3://lake/transactions/")

# Optimal partition file size: 512MB–1GB for batch reads
# Calculate how many files to write per partition
 
def files_per_partition(partition_size_gb, target_file_size_mb=512):
    return max(1, int((partition_size_gb * 1024) / target_file_size_mb))
 
# For a 50GB daily partition targeting 512MB files:
files_per_partition(50)  # → 100 files per day partition

Dynamic Partition Pruning

# Enable DPP — critical for star-schema joins at scale
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.useStats", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio", "0.5")
 
# Structure your joins so the dimension table filters the fact table's partitions
dim_accounts = spark.read.parquet("s3://lake/dim/accounts/") \
    .filter(col("region") == "EMEA")
 
fact_transactions = spark.read.parquet("s3://lake/transactions/")
 
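# DPP prunes fact partitions only when the join key is a partition column of the
# fact table. For this join on account_id to prune, the fact table must be
# partitioned by account_id; otherwise the fact table is still scanned in full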
result = fact_transactions.join(dim_accounts, "account_id")

Impact: Dynamic partition pruning reduced data scanned from 100TB to 14TB on our most common report query — a filter on region and product category via dimension joins.


Pattern 5: Handling Skew at Scale

Data skew at 100TB doesn't just slow one task. It can stall an entire 6-hour job on a single partition while every other executor sits idle. Every skew mitigation technique that works at 10GB works at 100TB, but the tolerances are tighter.

Detect Skew Before It Hits

# Alias max/min so they don't shadow (or get confused with) the Python builtins
from pyspark.sql.functions import (
    col, count, mean, stddev, spark_partition_id,
    max as spark_max, min as spark_min,
)
 
def diagnose_skew(df, sample_fraction=0.01):
    partition_counts = df.sample(sample_fraction) \
        .groupBy(spark_partition_id().alias("partition_id")) \
        .agg(count("*").alias("row_count")) \
        .orderBy(col("row_count").desc())
 
    stats = partition_counts.select(
        mean("row_count").alias("mean"),
        stddev("row_count").alias("stddev"),
        spark_max("row_count").alias("max"),
        spark_min("row_count").alias("min")
    ).collect()[0]
 
    skew_ratio = stats["max"] / stats["mean"]
    print(f"Skew ratio (max/mean): {skew_ratio:.1f}x")
    if skew_ratio > 5:
        print("WARNING: Significant skew detected. Consider salting.")
 
    return partition_counts
 
diagnose_skew(transactions_df)

Salting for Skewed Joins

from pyspark.sql.functions import rand, concat, lit, floor
 
SALT_BUCKETS = 50  # Tune based on skew severity
 
# Salt the large skewed table
fact_salted = fact_transactions \
    .withColumn("salt", (rand() * SALT_BUCKETS).cast("int")) \
    .withColumn("account_id_salted", concat(col("account_id"), lit("_"), col("salt")))
 
# Explode the small dimension table to match all salt values
from pyspark.sql.functions import explode, array
 
dim_salted = dim_accounts \
    .withColumn("salt_range", array([lit(i) for i in range(SALT_BUCKETS)])) \
    .withColumn("salt", explode(col("salt_range"))) \
    .withColumn("account_id_salted", concat(col("account_id"), lit("_"), col("salt"))) \
    .drop("salt_range", "salt")
 
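# Join on the salted key so the hot key is spread across SALT_BUCKETS tasks.
# Including account_id in the join keys keeps a single account_id column in the
# result instead of two ambiguous ones
result = fact_salted.join(dim_salted, ["account_id_salted", "account_id"]) \
    .drop("account_id_salted", "salt")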
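
Why the salted join still returns the right rows is easier to see without Spark. The toy below reproduces the same two steps in plain Python: random salt on the fact side, one copy per salt value on the dimension side. The account names, row counts, and bucket count are invented for illustration.

```python
import random
from collections import Counter

SALT_BUCKETS = 4
random.seed(7)  # fixed seed so the sketch is repeatable

# Toy fact table: one hot account and two small ones
fact = [("acct_hot", i) for i in range(1000)] + [("acct_a", 1), ("acct_b", 2)]
dim = {"acct_hot": "EMEA", "acct_a": "APAC", "acct_b": "EMEA"}

# Salt the fact side: each row gets a random bucket in [0, SALT_BUCKETS)
fact_salted = [(f"{acct}_{random.randrange(SALT_BUCKETS)}", acct, amt)
               for acct, amt in fact]

# Explode the dimension side: one copy of every row per possible salt value
dim_salted = {f"{acct}_{s}": region
              for acct, region in dim.items()
              for s in range(SALT_BUCKETS)}

# Join on the salted key: every fact row still finds exactly one dimension row
joined = [(acct, amt, dim_salted[key]) for key, acct, amt in fact_salted]

# The hot account is now split across several join keys instead of one
rows_per_key = Counter(key for key, _, _ in fact_salted if key.startswith("acct_hot"))
print(len(joined), len(rows_per_key), max(rows_per_key.values()))
```

The cost is on the dimension side, which grows by a factor of the bucket count, so this only pays off when the dimension table is small relative to the fact table.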

AQE Skew Handling (Automatic)

# Let AQE split skewed partitions automatically — no salting needed for moderate skew
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3")  # Default 5
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "128mb")

Rule: Use AQE skew handling first — it's zero-code. Fall back to manual salting only when skew ratio exceeds 20x or AQE cannot split the partition further.
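
Whether AQE will split a partition depends on both settings at once: per the Spark docs, a partition counts as skewed only if it is larger than the factor times the median partition size and also larger than the byte threshold. A minimal sketch of that rule, with made-up partition sizes and a hypothetical helper name:

```python
from statistics import median

MB = 1024 ** 2


def aqe_skewed_partitions(sizes_bytes, factor=5, threshold_bytes=256 * MB):
    """Indices of partitions that satisfy both skew conditions:
    size > factor * median size AND size > threshold."""
    med = median(sizes_bytes)
    return [i for i, size in enumerate(sizes_bytes)
            if size > factor * med and size > threshold_bytes]


sizes = [200 * MB, 220 * MB, 210 * MB, 900 * MB, 190 * MB]  # made-up sizes

print(aqe_skewed_partitions(sizes))                                       # defaults
print(aqe_skewed_partitions(sizes, factor=3, threshold_bytes=128 * MB))   # tuned
```

In this example the 900MB partition is about 4.3x the median, so the default factor of 5 leaves it alone while a factor of 3 flags it.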

Impact: Salting a single skewed merchant_id join (top merchant had 340M of 2.1B rows) reduced that stage from 4.2 hours to 18 minutes.


Pattern 6: Writing 100TB Efficiently

Writing is as expensive as reading at this scale. The wrong write strategy creates small files that poison every future read.

# BAD: Write with default parallelism, which produces thousands of tiny files
df.write.partitionBy("date").parquet("s3://lake/output/")
 
# GOOD: Repartition before writing to control file count and size
from pyspark.sql.functions import col
 
# Calculate target file count per date partition
# Assuming ~50GB per day partition, targeting 512MB files:
files_per_day = 100
 
# Hashing on date alone sends each day to a single task (one file per day), so
# include account_id to spread every day across files_per_day tasks.
# Parentheses instead of backslashes, so the inline comments are valid Python
(df.repartition(files_per_day, col("date"), col("account_id"))
   # date first, so the partitioned writer keeps this order instead of re-sorting
   .sortWithinPartitions("date", "account_id", "ts")   # Improves read-time predicate pushdown
   .write
   .partitionBy("date")
   .mode("overwrite")
   .option("compression", "snappy")
   .option("maxRecordsPerFile", 5_000_000)              # Hard cap per file
   .parquet("s3://lake/output/"))

# For Delta Lake output, use replaceWhere for partition-safe overwrites
(df.repartition(files_per_day, col("date"), col("account_id"))
   .sortWithinPartitions("date", "account_id", "ts")
   .write
   .format("delta")
   .option("replaceWhere", "date = '2024-08-19'")      # Overwrite only that day's partition
   .mode("overwrite")
   .save("s3://lake/delta/transactions/"))

Impact: Controlling output file count eliminated the small-file problem on downstream reads — subsequent pipeline stages ran 3.4x faster without needing OPTIMIZE compaction first.
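
A `maxRecordsPerFile` cap only makes sense relative to row size, so it helps to derive it from a target file size. A rough sketch, assuming you have measured the average compressed bytes per row from existing output files. The 107-byte figure is invented for the example.

```python
def max_records_per_file(target_file_mb, avg_row_bytes):
    """Rows that fit in a file of the target size, given the average
    on-disk (compressed) row size measured from existing files."""
    return (target_file_mb * 1024 ** 2) // avg_row_bytes


# Hypothetical: ~107 compressed bytes per row, 512MB target files
print(max_records_per_file(512, 107))
```

Wide tables with much larger rows need a proportionally lower cap to stay near the same file size.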


The 100TB Config Baseline

The full Spark config we run in production for 100TB batch jobs on a 20-node cluster (32 cores, 256GB RAM each):

conf = {
    # Executor sizing
    "spark.executor.cores":                                     "4",
    "spark.executor.memory":                                    "28g",
    "spark.executor.memoryOverhead":                            "6g",
    "spark.driver.memory":                                      "32g",
    "spark.driver.maxResultSize":                               "8g",
 
    # Parallelism
    "spark.sql.shuffle.partitions":                             "20000",
    "spark.default.parallelism":                                "1280",
 
    # AQE
    "spark.sql.adaptive.enabled":                               "true",
    "spark.sql.adaptive.coalescePartitions.enabled":            "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes":          "256mb",
    "spark.sql.adaptive.skewJoin.enabled":                      "true",
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor":        "3",
    "spark.sql.optimizer.dynamicPartitionPruning.enabled":      "true",
 
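    # Memory (each executor container = 28g heap + 6g overhead + 8g off-heap = 42g;
    # check executors per node x 42g against node RAM)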
    "spark.memory.fraction":                                    "0.8",
    "spark.memory.storageFraction":                             "0.2",
    "spark.memory.offHeap.enabled":                             "true",
    "spark.memory.offHeap.size":                                "8g",
 
    # Shuffle I/O
    "spark.shuffle.compress":                                   "true",
    "spark.shuffle.spill.compress":                             "true",
    "spark.io.compression.codec":                               "lz4",
    "spark.reducer.maxSizeInFlight":                            "96m",
    "spark.shuffle.file.buffer":                                "1m",
 
    # Serialization
    "spark.serializer":                                         "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max":                          "1g",
}
 
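# Most of these are launch-time settings that spark.conf.set() rejects on a live
# session, so apply the dict while building the session (or pass each as --conf)
from pyspark.sql import SparkSession

builder = SparkSession.builder
for k, v in conf.items():
    builder = builder.config(k, v)
spark = builder.getOrCreate()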

Real-World Impact

After applying all six patterns on our 100TB financial data platform:

  • Runtime: 14 hours → 2.5 hours (82% reduction)
  • Cluster cost: $4,800 → $1,100 per run (77% savings)
  • GC pause time: 34% → 4% of total runtime
  • Spill to disk: 2.1TB → 0GB
  • Data scanned: 100TB → 14TB (via DPP and partition pruning)
  • Annual savings: ~$1.4M
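
The percentages and the annual figure in the list above follow from the per-run numbers. A quick arithmetic check, assuming one run per day for 365 days (the run frequency is inferred from the daily processing described at the start, not stated separately):

```python
def pct_reduction(before, after):
    """Percentage drop from before to after, rounded to a whole percent."""
    return round(100 * (before - after) / before)


runtime_cut = pct_reduction(14, 2.5)      # hours per run
cost_cut = pct_reduction(4800, 1100)      # dollars per run
annual_savings = (4800 - 1100) * 365      # assumes one run per day, every day

print(runtime_cut, cost_cut, annual_savings)
```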

Key Takeaways

  1. Fix cluster sizing first — wrong executor size makes every other optimization irrelevant
  2. Set shuffle partitions explicitly; never trust the default 200 at multi-TB scale
  3. Enable AQE — it handles partition coalescing and skew join splitting automatically
  4. Diagnose skew before every large join; a single hot key can stall an entire stage
  5. Control write parallelism — small files on write become slow reads forever
  6. Dynamic partition pruning is the highest-leverage config change for star-schema workloads

At 100TB, the bottleneck is almost never compute. It's almost always I/O, shuffle, or a single skewed partition. Measure first, tune second.


Next Steps: