Introduction
- Contents
- 4.1 Options
- 4.2 Partitioning
- 4.3 Bucketing
4.1 Options
.option("maxRecordsPerFile", 100): sets the maximum number of records written to a single file.
- When a write task reaches the limit it starts a new file, and the counter at the end of the file name increases: c000, c001, c002.
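As a rough illustration of that rollover, the sketch below models a single write task in plain Scala, with no Spark dependency. `MaxRecordsModel` and `fileSuffixes` are hypothetical names, and the `cNNN` suffix format is inferred from the file names quoted above; this is a model of the behaviour, not a call into Spark's writer.

```scala
// Plain-Scala model (no Spark needed) of the file counter of ONE write task.
// Assumption: the counter is rendered as "c" plus three digits, as in the
// file names quoted above (c000, c001, c002).
object MaxRecordsModel {

  /** Suffixes of the files a single task would create for `recordCount` rows. */
  def fileSuffixes(recordCount: Int, maxRecordsPerFile: Int): Seq[String] = {
    require(recordCount > 0, "model only covers tasks that write at least one row")
    require(maxRecordsPerFile > 0, "maxRecordsPerFile must be positive")
    // Ceiling division: a new file is started every `maxRecordsPerFile` rows.
    val fileCount = (recordCount + maxRecordsPerFile - 1) / maxRecordsPerFile
    (0 until fileCount).map(i => f"c$i%03d")
  }
}
```

With these assumptions, `MaxRecordsModel.fileSuffixes(250, 100)` gives three suffixes, c000 through c002. The limit applies to each write task separately, so the total number of files also depends on how many partitions are being written.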
4.2 Partitioning
- Partition: in HDFS, the physical partitions of a file are called blocks (default block size: 128 MB). By default, Spark creates roughly one partition for every block of a file it reads.
- Partitioning: dividing the dataset into multiple parts across the cluster.
- To improve job performance, you can distribute the input dataset into smaller chunks and process them in parallel via Spark executors.
- CheatSheets
df.repartition():
- in-memory partitioning
- increases or decreases the number of partitions in a dataset
- involves a full shuffle of data across the cluster
- slow because of the high network I/O during the shuffle
- the partitions in the resulting dataset are roughly equal in size
df.coalesce():
- in-memory partitioning
- can only decrease the number of partitions (or keep it unchanged)
- does not involve a full shuffle, so it is faster than repartition
- the resulting partitions can be unequal in size
df.repartitionByRange():
- in-memory partitioning
- creates N partitions based on value ranges of a given column and distributes the rows among them
df.write.partitionBy():
- on-disk partitioning
- creates a folder for each unique value of a given column and writes the matching rows into it
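// Usage example (assumes an existing DataFrame named df)
import org.apache.spark.sql.functions.col
df.repartition(100)                              // full shuffle into 100 partitions
df.coalesce(1)                                   // merge down to a single partition
df.repartitionByRange(300, col("VIDEO_LENGTH"))  // 300 partitions by value range
df.write.partitionBy("TOPIC", "COUNTRY")         // on-disk layout; still needs a save call such as .parquet(path)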
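To make the size difference between repartition and coalesce concrete, here is a toy model in plain Scala that treats a dataset as a sequence of in-memory partitions. `PartitionModel` and its two functions are hypothetical names chosen for this sketch; Spark's real implementations distribute and group rows differently, so only the qualitative behaviour (even sizes after a full shuffle, uneven sizes after merging) should be read from it.

```scala
// Toy model: a dataset is a Seq of partitions, each partition a Seq of rows.
// This is NOT Spark's algorithm, only an illustration of the trade-off.
object PartitionModel {

  /** Full reshuffle: every row may move, rows are dealt out evenly. */
  def repartition[A](parts: Seq[Seq[A]], n: Int): Seq[Seq[A]] = {
    require(n > 0, "n must be positive")
    val indexed = parts.flatten.zipWithIndex
    (0 until n).map(p => indexed.collect { case (row, i) if i % n == p => row })
  }

  /** Merge whole neighbouring partitions; rows never move individually. */
  def coalesce[A](parts: Seq[Seq[A]], n: Int): Seq[Seq[A]] = {
    require(n > 0, "n must be positive")
    if (n >= parts.size) parts // cannot increase the partition count
    else
      parts.zipWithIndex
        .groupBy { case (_, i) => i * n / parts.size } // target partition index
        .toSeq
        .sortBy(_._1)
        .map { case (_, group) => group.flatMap(_._1) }
  }
}
```

Starting from four partitions of sizes 5, 1, 1 and 1, `repartition(parts, 2)` in this model yields two partitions of 4 rows each, while `coalesce(parts, 2)` yields partitions of 6 and 2 rows because it only glues existing partitions together.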
4.3 Bucketing
- Bucketing : on-disk pre-shuffle. Rows are hash-partitioned by one or more columns into a fixed number of files (buckets) at write time, so subsequent joins/aggregations on those columns can skip the shuffle stage entirely.
- vs. Partitioning
- Partitioning creates a separate directory per column value → unbounded cardinality is dangerous (small-files problem). Best for low-cardinality columns and pruning filters.
- Bucketing creates a bounded number of files (e.g. 256) regardless of value cardinality. Best for high-cardinality join/group keys.
- The two compose: partition by date, then bucket by user_id.
- Requires a metastore table (typically a managed one): bucketBy only works through saveAsTable, because the metastore needs to record the bucketing metadata. A plain df.write.parquet(path) cannot be bucketed; Spark rejects bucketBy on path-based writes.
- The shuffle-free join applies only when
- both sides of the join are bucketed on the same columns, with the same number of buckets, by the same hash function (Spark's default), and
- the join keys are exactly the bucketing columns.
- sortBy(...) alongside bucketBy(...) sorts rows within each bucket. Sorting on the join key is what can allow sort-merge bucket (SMB) joins to skip the sort phase.
- Caveat: Spark's bucketing format is not Hive-compatible by default (different hash). Tools outside Spark may not understand the buckets.
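The contrast between the two layouts can be sketched in plain Scala. `LayoutModel`, `partitionPath` and `bucketId` are hypothetical names for this illustration, and `String.hashCode` is only a stand-in: Spark computes bucket ids with its own Murmur3-based hash, so the actual bucket numbers will differ from what this model produces.

```scala
// Where does a row end up on disk? A simplified model, not Spark's code.
object LayoutModel {

  /** partitionBy: one "column=value" directory level per partition column,
    * so the number of directories grows with the number of distinct values. */
  def partitionPath(values: Seq[(String, String)]): String =
    values.map { case (column, value) => s"$column=$value" }.mkString("/")

  /** bucketBy: hash of the key modulo a fixed bucket count, so the number of
    * buckets stays bounded no matter how many distinct keys exist.
    * Assumption: String.hashCode stands in for Spark's Murmur3-based hash. */
  def bucketId(key: String, numBuckets: Int): Int = {
    require(numBuckets > 0, "numBuckets must be positive")
    java.lang.Math.floorMod(key.hashCode, numBuckets)
  }
}
```

In this model, `partitionPath(Seq("TOPIC" -> "cats", "COUNTRY" -> "US"))` produces `TOPIC=cats/COUNTRY=US`, and a thousand distinct user ids passed to `bucketId(_, 16)` still map to at most 16 buckets.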
// 1. Write a bucketed managed table
// 16 buckets hashed on user_id, sorted by event_ts within each bucket
df.write
  .mode("overwrite")
  .bucketBy(16, "user_id")
  .sortBy("event_ts")
  .saveAsTable("warehouse.events_bucketed")
// 2. Combine with partitioning: partition by date, bucket by user_id
df.write
  .mode("overwrite")
  .partitionBy("dt")
  .bucketBy(16, "user_id")
  .sortBy("event_ts")
  .saveAsTable("warehouse.events_p_b")
// 3. Bucket-aware join: Spark skips the shuffle if both sides match
val events = spark.table("warehouse.events_bucketed") // 16 buckets on user_id
val users = spark.table("warehouse.users_bucketed") // 16 buckets on user_id
events.join(users, "user_id") // no exchange in the physical plan
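Why matching buckets remove the shuffle can be pictured with a small plain-Scala model: if both sides were split with the same function and the same bucket count, every key can only find its matches in the bucket with the same number, so each pair of buckets can be joined locally. `BucketJoinModel` and its functions are hypothetical names, integer keys are used for brevity, and the modulo "hash" is a stand-in for Spark's own hash function.

```scala
// Toy model of a bucket-aware join; rows are (joinKey, payload) pairs.
object BucketJoinModel {

  /** Stand-in bucket function (assumption: Spark uses a Murmur3-based hash). */
  def bucketOf(key: Int, numBuckets: Int): Int =
    java.lang.Math.floorMod(key, numBuckets)

  /** "Write time": group the rows of one table into buckets. */
  def bucketize[V](rows: Seq[(Int, V)], numBuckets: Int): Map[Int, Seq[(Int, V)]] =
    rows.groupBy { case (key, _) => bucketOf(key, numBuckets) }

  /** "Join time": bucket i of the left side is only compared with
    * bucket i of the right side; no row crosses bucket boundaries. */
  def bucketedJoin[A, B](
      left: Map[Int, Seq[(Int, A)]],
      right: Map[Int, Seq[(Int, B)]]
  ): Seq[(Int, A, B)] =
    left.toSeq.flatMap { case (bucket, leftRows) =>
      val rightRows = right.getOrElse(bucket, Seq.empty)
      for {
        (leftKey, a) <- leftRows
        (rightKey, b) <- rightRows
        if leftKey == rightKey
      } yield (leftKey, a, b)
    }
}
```

Bucketizing both inputs with the same bucket count and calling `bucketedJoin` returns the same rows as a naive nested-loop join over the full inputs. If the two sides used different bucket counts or different hash functions, equal keys could sit in differently numbered buckets, which is why the conditions listed earlier in this section matter.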