Spark #4 | Write


Introduction

  • Contents
    • 4.1 Options
    • 4.2 Partitioning
    • 4.3 Bucketing


4.1 Options

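  • .option("maxRecordsPerFile", 100) : caps the number of records written to each output file (zero or a negative value means no limit, which is the default)
    • When a task produces more rows than the cap, Spark rolls over to a new file. The trailing counter in the file name (c000, c001, c002, ...) numbers those files.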
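To see the file counter in action, here is a minimal sketch assuming a local test session (`local[2]`) and a throwaway temp directory; the object and method names are hypothetical. It forces all rows through a single task with `coalesce(1)` so that every file comes from the same writer, then lists the Parquet files Spark produced.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object MaxRecordsPerFileDemo {
  // Writes `rows` rows from a single task with the given per-file cap and
  // returns the names of the Parquet data files Spark produced, sorted.
  def writtenFiles(rows: Long, cap: Long): Seq[String] = {
    val spark = SparkSession.builder()
      .master("local[2]")                 // assumption: a local test session
      .appName("maxRecordsPerFile-demo")
      .getOrCreate()
    try {
      // Hypothetical scratch location; "out" must not exist before the write
      val out = Files.createTempDirectory("mrpf").resolve("out").toString
      spark.range(rows)
        .coalesce(1)                      // one task, so every file comes from the same writer
        .write
        .option("maxRecordsPerFile", cap)
        .parquet(out)
      new File(out).listFiles().toSeq
        .map(_.getName)
        .filter(n => n.startsWith("part-") && n.endsWith(".parquet"))
        .sorted
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit =
    // 250 rows with a cap of 100: three files whose names end in -c000, -c001 and -c002
    writtenFiles(250, 100).foreach(println)
}
```

The three files hold 100, 100 and 50 rows. With more input partitions, each task applies the cap independently, so the counter restarts at c000 for every task.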


4.2 Partitioning

  • Partition : In HDFS, a file is physically stored as blocks (the default block size is 128 MB). When reading a file, Spark by default creates roughly one partition per block.

  • Partitioning : dividing a dataset into multiple parts distributed across the cluster.

    • To improve job performance, you can split the input dataset into smaller chunks and process them in parallel on Spark executors.
  • Cheat sheet

    • df.repartition() :
      • in-memory partitioning
      • increases or decreases the number of partitions in a dataset
      • involves a full shuffle of data across the cluster
      • can be slow because of the network I/O during the shuffle
      • the partitions in the resulting dataset are roughly equal in size (when no columns are given)
    • df.coalesce() :
      • in-memory partitioning
      • can only decrease the number of partitions; asking for more leaves the count unchanged
      • merges existing partitions without a full shuffle, so it is usually faster than repartition
      • the resulting partitions can be of unequal size
    • df.repartitionByRange() :
      • in-memory partitioning
      • creates N partitions and distributes rows by value ranges of the given column(s); the range boundaries are estimated by sampling, so each partition holds a contiguous range of values
    • df.write.partitionBy() :
      • on-disk partitioning (a method of DataFrameWriter, not of DataFrame)
      • creates one sub-directory per unique value of the given column(s), e.g. TOPIC=music/, and writes each row into its matching directory
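// Usage example
import org.apache.spark.sql.functions.col

val repartitioned = df.repartition(100)                              // exactly 100 partitions (full shuffle)
val merged        = df.coalesce(1)                                   // merge down to 1 partition (no full shuffle)
val byRange       = df.repartitionByRange(300, col("VIDEO_LENGTH"))  // 300 partitions split by VIDEO_LENGTH ranges
df.write.partitionBy("TOPIC", "COUNTRY").parquet("/path/to/output")  // one directory per TOPIC/COUNTRY pair (placeholder path)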
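The cheat sheet can be checked on a small local dataset. This is a sketch under a few assumptions: a `local[4]` session, adaptive query execution turned off (`spark.sql.adaptive.enabled=false`) so the partition counts are exactly the ones requested, and hypothetical sample rows and temp directories. It shows that `coalesce` cannot raise the partition count, that `repartitionByRange` yields non-overlapping ascending ranges, and that `write.partitionBy` produces one directory per value.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PartitioningDemo {
  def session(): SparkSession = SparkSession.builder()
    .master("local[4]")                              // assumption: a local test session
    .appName("partitioning-demo")
    .config("spark.sql.adaptive.enabled", "false")   // keep partition counts exactly as requested
    .getOrCreate()

  // Partition counts after repartition(8), then coalesce(2) and coalesce(16) on top of it.
  def inMemoryCounts(spark: SparkSession): (Int, Int, Int) = {
    val r8 = spark.range(1000).toDF("id").repartition(8) // full shuffle into 8 partitions
    (r8.rdd.getNumPartitions,
     r8.coalesce(2).rdd.getNumPartitions,                // merged down without a shuffle
     r8.coalesce(16).rdd.getNumPartitions)               // coalesce cannot increase: stays at 8
  }

  // (min, max) of "id" in each non-empty partition after repartitionByRange.
  def rangeBounds(spark: SparkSession): Seq[(Long, Long)] = {
    import spark.implicits._
    spark.range(1000).toDF("id")
      .repartitionByRange(4, col("id"))
      .as[Long]
      .mapPartitions { it =>
        val xs = it.toVector
        if (xs.isEmpty) Iterator.empty else Iterator((xs.min, xs.max))
      }
      .collect()
      .toSeq
  }

  // Sub-directories created by the on-disk df.write.partitionBy("TOPIC").
  def partitionDirs(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    val out = Files.createTempDirectory("pby").resolve("out").toString // hypothetical output path
    Seq(("music", "KR", 210), ("music", "US", 95), ("news", "KR", 40))  // hypothetical rows
      .toDF("TOPIC", "COUNTRY", "VIDEO_LENGTH")
      .write.partitionBy("TOPIC").parquet(out)
    new File(out).listFiles().toSeq.map(_.getName).filter(_.startsWith("TOPIC=")).sorted
  }

  def main(args: Array[String]): Unit = {
    val spark = session()
    try {
      println(inMemoryCounts(spark))          // (8,2,8)
      rangeBounds(spark).foreach(println)     // ascending, non-overlapping (min, max) pairs
      partitionDirs(spark).foreach(println)   // TOPIC=music, then TOPIC=news
    } finally {
      spark.stop()
    }
  }
}
```

The exact range boundaries depend on Spark's sampling, so only their ordering is stable; the first range starts at 0 and the last ends at 999.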


4.3 Bucketing

  • Bucketing : on-disk pre-shuffle. Rows are hash-partitioned by one or more columns into a fixed number of files (buckets) at write time, so subsequent joins/aggregations on those columns can skip the shuffle stage entirely.
  • vs. Partitioning
    • Partitioning creates a separate directory per column value → unbounded cardinality is dangerous (small-files problem). Best for low-cardinality columns and pruning filters.
    • Bucketing creates a bounded number of files (e.g. 256) regardless of value cardinality. Best for high-cardinality join/group keys.
    • The two compose: partition by date, then bucket by user_id.
  • Requires a catalog table : bucketBy only works through saveAsTable, because the catalog (e.g. the Hive metastore) must record the bucketing metadata. A path-based write such as df.write.bucketBy(...).parquet(path) is rejected with an AnalysisException.
  • Shuffle-free bucketed joins apply only when
    • both sides of the join are bucketed on the same columns, with the same number of buckets and the same hash function (Spark’s default), and
    • the join keys are exactly the bucketing columns.
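  • sortBy(...) alongside bucketBy(...) sorts rows within each bucket, which can let sort-merge bucket (SMB) joins skip the sort phase. Whether Spark actually skips the sort depends on the Spark version and on each bucket being a single file.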
  • Caveat : Spark’s bucketing format is not Hive-compatible by default (different hash). Tools outside Spark may not understand the buckets.
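Why matching bucket specs let a join skip the shuffle can be shown without Spark at all. The following is a dependency-free sketch: `bucketOf`, `bucketize` and `bucketJoin` are hypothetical names, and `String.hashCode` stands in for Spark's Murmur3-based bucket function. Because equal keys always land in the same bucket number when both sides share the hash and bucket count, bucket i only ever needs to be joined with bucket i. Spark itself does not return wrong results when the specs differ; it falls back to a shuffle, and the mismatched case below shows why it has to.

```scala
object BucketingSketch {
  // Stand-in bucket function. Spark computes pmod(murmur3Hash(key), numBuckets);
  // String.hashCode is used here only to keep the sketch dependency-free.
  def bucketOf(key: String, numBuckets: Int): Int =
    Math.floorMod(key.hashCode, numBuckets)

  // What a bucketed write does conceptually: route every row to a bucket by its key.
  def bucketize[V](rows: Seq[(String, V)], numBuckets: Int): Map[Int, Seq[(String, V)]] =
    rows.groupBy { case (k, _) => bucketOf(k, numBuckets) }

  // Pair bucket i on the left only with bucket i on the right, with no repartitioning.
  // This finds every match only if both sides used the same hash and bucket count.
  def bucketJoin[A, B](left: Map[Int, Seq[(String, A)]],
                       right: Map[Int, Seq[(String, B)]]): Seq[(String, A, B)] =
    for {
      (b, ls)  <- left.toSeq
      rs        = right.getOrElse(b, Seq.empty[(String, B)])
      (k, a)   <- ls
      (k2, bv) <- rs
      if k2 == k
    } yield (k, a, bv)

  def main(args: Array[String]): Unit = {
    val events = Seq("u1" -> "click", "u2" -> "view", "u1" -> "buy", "u3" -> "view")
    val users  = Seq("u1" -> "KR", "u2" -> "US", "u3" -> "JP")

    // Same bucket count on both sides: all 4 matches are found bucket by bucket
    bucketJoin(bucketize(events, 4), bucketize(users, 4)).foreach(println)

    // Mismatched counts (4 vs 8): equal keys land in different bucket numbers,
    // so the bucket-by-bucket pairing finds nothing here
    println(bucketJoin(bucketize(events, 4), bucketize(users, 8)).size) // 0

    // High-cardinality key: partitionBy would create one directory per value,
    // while bucketing always stays within a fixed number of buckets
    val ids = (1 to 10000).map(i => s"user_$i")
    println(ids.distinct.size)                            // 10000 distinct values
    println(ids.map(bucketOf(_, 16)).distinct.size <= 16) // true
  }
}
```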
// 1. Write a bucketed managed table
//    16 buckets hashed on user_id, sorted by event_ts within each bucket
df.write
  .mode("overwrite")
  .bucketBy(16, "user_id")
  .sortBy("event_ts")
  .saveAsTable("warehouse.events_bucketed")

// 2. Combine with partitioning: partition by date, bucket by user_id
df.write
  .mode("overwrite")
  .partitionBy("dt")
  .bucketBy(16, "user_id")
  .sortBy("event_ts")
  .saveAsTable("warehouse.events_p_b")

// 3. Bucket-aware join: Spark can skip the shuffle when both sides are bucketed compatibly
val events = spark.table("warehouse.events_bucketed")     // 16 buckets on user_id
val users  = spark.table("warehouse.users_bucketed")      // 16 buckets on user_id
events.join(users, "user_id").explain()                   // check: no shuffle Exchange feeding the join
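The "no exchange" claim can also be checked programmatically. This sketch assumes a local session with a temporary warehouse directory, hypothetical table names (`events_b`, `users_b` in the default database) and hypothetical data. It disables adaptive execution so the static plan is inspected, and sets `spark.sql.autoBroadcastJoinThreshold` to -1 so the tiny tables are not broadcast. It then counts `ShuffleExchangeExec` nodes in the physical plan.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

object BucketJoinCheck {
  // Number of shuffle exchanges in the plan of a join between two tables
  // bucketed on user_id with `leftBuckets` and `rightBuckets` buckets.
  def shuffleCount(leftBuckets: Int, rightBuckets: Int): Int = {
    val warehouse = Files.createTempDirectory("wh").toString // hypothetical warehouse location
    val spark = SparkSession.builder()
      .master("local[2]")                                    // assumption: a local test session
      .appName("bucket-join-check")
      .config("spark.sql.warehouse.dir", warehouse)
      .config("spark.sql.adaptive.enabled", "false")         // inspect the static physical plan
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")  // prevent a broadcast join
      .getOrCreate()
    import spark.implicits._
    try {
      // Hypothetical data; user_id has the same type (Long) on both sides
      val events = (1L to 1000L).map(i => (i % 100, s"event_$i")).toDF("user_id", "event")
      val users  = (0L until 100L).map(i => (i, s"name_$i")).toDF("user_id", "name")

      events.write.mode("overwrite").bucketBy(leftBuckets, "user_id").saveAsTable("events_b")
      users.write.mode("overwrite").bucketBy(rightBuckets, "user_id").saveAsTable("users_b")

      val joined = spark.table("events_b").join(spark.table("users_b"), "user_id")
      joined.queryExecution.executedPlan.collect { case e: ShuffleExchangeExec => e }.size
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    println(shuffleCount(16, 16)) // matching bucket specs: no shuffle exchange
    println(shuffleCount(16, 8))  // mismatched bucket counts: at least one side is shuffled
  }
}
```

The second call shows the fallback: with different bucket counts, Spark inserts an exchange rather than joining bucket by bucket.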
