Introduction
- Contents
- 3.1 Basic Manipulation
- 3.2 Duplicates
- 3.3 Null
- 3.4 Filter
- 3.5 Group
- 3.6 Join
- 3.7 UDF
3.1 Basic Manipulation
- Schema / shape inspection
printSchema() : prints the column names, types, and nullability of the DataFrame.
dtypes : returns a sequence of (name, type) tuples without printing.
count() : total row count.
- Select
select(col1, col2, ...) : project a subset of columns.
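selectExpr("a", "b + 1 as c") : project with SQL expressions in one projection; each chained withColumn adds another projection to the plan, so a single select/selectExpr is usually the lighter choice when adding many columns.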
- Add / transform columns
withColumn("name", expr) : add a new column or replace one of the same name.
lit(value) : turn a Scala value into a Column literal (use inside withColumn).
col("x").cast("int") : type-cast a column.
- Rename / drop columns
withColumnRenamed("old", "new")
drop("col1", "col2")
- Order
orderBy(col("x").desc, col("y").asc) (alias sort(...)).
- Sample / preview
limit(n) : take the first n rows as a DataFrame (lazy).
show(n, truncate=false) : print to driver (action; triggers a job).
head(n) / take(n) : collect first n rows to the driver as Array[Row].
import org.apache.spark.sql.functions.{col, lit, expr}
// inspect
df.printSchema()
df.count()
// select & rename
df.select(col("user_id"), col("ts"))
df.selectExpr("user_id", "from_unixtime(ts) as event_time")
df.withColumnRenamed("ts", "event_ts")
// add / cast / drop
val df2 = df
.withColumn("source", lit("kafka")) // constant column
.withColumn("duration", col("end") - col("start"))
.withColumn("user_id", col("user_id").cast("string"))
.drop("raw_payload")
// order & preview
df2.orderBy(col("event_ts").desc).show(10, truncate = false)
val firstTen = df2.limit(10) // still a DataFrame, lazy
val sample = df2.take(5) // Array[Row], runs a job
3.2 Duplicates
dropDuplicates("COLUMN") (PySpark: drop_duplicates(subset=["COLUMN"])) :
- Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
distinct() :
- returns the distinct rows of DataFrame.
- takes no args and thus all columns are taken into account when dropping dups
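df.exceptAll(df.distinct())
- keeps only the surplus copies of duplicated rows: a row that occurs n times is returned n - 1 times, and rows that occur once are removed.
df.join(df.exceptAll(df.distinct()).distinct(), df.columns.toSeq, "left_semi")
- keeps every copy of the duplicated rows and removes the rows that occur once (rows containing nulls do not match in the join).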
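The three de-duplication variants above can be compared on a tiny DataFrame. This is a minimal sketch, assuming a throwaway local SparkSession; the `events` data and its column names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: ("a", 1) occurs three times, the other rows once.
val events = Seq(("a", 1), ("a", 1), ("a", 1), ("b", 2), ("a", 9)).toDF("user_id", "value")

// distinct(): compares all columns.
val uniqueRows = events.distinct()

// dropDuplicates on a subset: one row per user_id; which row survives is not guaranteed.
val onePerUser = events.dropDuplicates("user_id")

// exceptAll against distinct(): only the extra copies of duplicated rows remain.
val surplusOnly = events.exceptAll(events.distinct())
```

Because `dropDuplicates` on a subset keeps an arbitrary row per key, sort or aggregate explicitly when a specific row (for example the latest) must survive.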
3.3 Null
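df.na.drop("any", Seq("COLUMN1")) : drop rows that have a null in any of the listed columns; "all" drops a row only when every listed column is null, and df.na.drop(minNonNulls) keeps rows with at least that many non-null values. (PySpark: df.dropna(how="any", thresh=None, subset=["COLUMN1"]))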
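A short sketch of null handling through `DataFrameNaFunctions` (`df.na`) in Scala, assuming a local SparkSession; the `people` data and column names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("null-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: None becomes SQL NULL.
val people = Seq[(Option[String], Option[Int])](
  (Some("ann"), Some(31)),
  (None, Some(40)),
  (Some("bob"), None)
).toDF("name", "age")

// Drop rows with a null in any column.
val complete = people.na.drop()

// Drop rows only when "name" is null.
val named = people.na.drop("any", Seq("name"))

// Replace nulls in "age" with 0 instead of dropping the row.
val filled = people.na.fill(0L, Seq("age"))
```

Filling is often preferable to dropping when the remaining columns of the row still carry useful data.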
3.4 Filter
- Filtering
- Pushdown Filtering
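- General workflow : read all the data from the file system, then filter it in memory.
- Pushdown workflow : push the filter down to the data source so that only the necessary data is read from the file system, instead of filtering in memory afterwards.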
// single condition
df.filter(df("PRED") === "Entertainment")
df.filter(df("VIDEO_PLAY_TIME") > 20)
// multiple conditions
df.filter((df("PRED") === "Entertainment") && (df("VIDEO_PLAY_TIME") > 20))
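Whether a filter is pushed down can be checked in the physical plan. The sketch below assumes a local SparkSession and a Parquet source; the temporary path and the sample rows are hypothetical, while the column names follow the filter examples above.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("pushdown-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data written to a temporary Parquet directory.
val path = Files.createTempDirectory("videos").resolve("data.parquet").toString
Seq(("Entertainment", 35), ("News", 12), ("Entertainment", 8))
  .toDF("PRED", "VIDEO_PLAY_TIME")
  .write.parquet(path)

// The filter is declared on the reader's output; Spark decides whether to push it to the source.
val longPlays = spark.read.parquet(path).filter(col("VIDEO_PLAY_TIME") > 20)

// Print the physical plan; for Parquet the scan node lists accepted predicates under PushedFilters.
longPlays.explain()
```

Predicates that go through a UDF or that the source cannot evaluate stay in a separate Filter step and are applied after the data is read.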
3.5 Group
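groupBy : groups the DataFrame by the specified columns so that aggregations can be run on each group. Returns a RelationalGroupedDataset (GroupedData in PySpark).
.agg(collect_set(col("x"))) : collect the distinct values of x in each group into an array.
.agg(collect_list(col("x"))) : collect all values of x in each group into an array, duplicates included.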
// grouped data : groupBy groups rows that share a value in the given column(s)
val grouped = df.groupBy("column_name")
// each call below is a separate aggregation on the grouped data; they are not chained
grouped.count() // row count for each group
grouped.mean()  // mean of every numeric column for each group
grouped.max()   // maximum of every numeric column for each group
grouped.min()   // minimum of every numeric column for each group
grouped.sum()   // total of every numeric column for each group
grouped.avg()   // same as mean()
// grouped.agg(expr1, expr2, ...) : compute more than one aggregate at a time
// grouped.pivot("other_column") : turn the values of another column into columns; follow it with an aggregation
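To make `agg`, `collect_set`, `collect_list`, and `pivot` concrete, here is a minimal sketch assuming a local SparkSession; the `plays` data and its column names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, collect_set, count, sum}

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("group-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: one row per play.
val plays = Seq(
  ("u1", "News", 10),
  ("u1", "News", 30),
  ("u1", "Sports", 5),
  ("u2", "News", 20)
).toDF("user_id", "category", "seconds")

// Several aggregates in one pass over each group.
val perUser = plays.groupBy("user_id").agg(
  count("*").as("plays"),
  sum("seconds").as("total_seconds"),
  collect_set("category").as("categories"),  // distinct values, order not guaranteed
  collect_list("seconds").as("all_seconds")  // keeps duplicates, order not guaranteed
)

// Pivot: one output column per distinct category, holding the summed seconds.
val pivoted = plays.groupBy("user_id").pivot("category").sum("seconds")
```

Passing the expected values to `pivot` (for example `pivot("category", Seq("News", "Sports"))`) spares Spark an extra job to discover them.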
3.6 Join
- Inner join : removes anything that is not common to both tables. It returns only the rows from both sides that have a match under the join condition (the on expression is true).
- Outer join : includes in the result the rows of one table for which no matching rows are found in the other table.
- Left join : all rows of the left table are kept, regardless of whether there is a match in the right table. When an id match is found in the right table, its columns are returned; otherwise they are null.
- Right join : performs the same task as the left join, but keeps all rows of the right table.
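The four join types can be compared side by side on two small tables. This is a sketch assuming a local SparkSession; the `users` and `orders` data are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical tables: id 1 matches twice, ids 2 and 3 exist only on the left, id 4 only on the right.
val users  = Seq((1, "ann"), (2, "bob"), (3, "cy")).toDF("id", "name")
val orders = Seq((1, "book"), (1, "pen"), (4, "lamp")).toDF("id", "item")

// Joining on Seq("id") keeps a single id column in the output.
val inner = users.join(orders, Seq("id"), "inner") // matching ids only
val left  = users.join(orders, Seq("id"), "left")  // every user; item is null without a match
val right = users.join(orders, Seq("id"), "right") // every order; name is null without a match
val full  = users.join(orders, Seq("id"), "full")  // every row from both sides
```

Joining on a column-name sequence avoids the ambiguous duplicate `id` column that an expression condition such as `users("id") === orders("id")` would leave in the result.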
3.7 UDF
- User-Defined Function (UDF) : wrap a plain Scala function as a Column expression when no built-in covers the logic.
- Prefer built-ins. Catalyst can’t optimize through a UDF — it treats it as a black box, blocking predicate pushdown, constant folding, and codegen. Use
org.apache.spark.sql.functions first; only reach for a UDF when nothing fits.
- Define & apply via DataFrame API
udf((args) => result) returns a UserDefinedFunction you call like any built-in column function.
- Declare the types explicitly for stability (e.g. udf[String, String, Int](...), where the first type parameter is the return type and the remaining ones are the argument types).
- Register for Spark SQL
spark.udf.register("name", fn) lets you call it from SQL strings: spark.sql("select name(col) from t").
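- Null safety : a UDF receives SQL NULL as a Scala null for reference and boxed parameters (String, java.lang.Integer). With primitive parameters (Int, Double) the function cannot check for null, and Spark returns null for a null input without calling it. Guard inside the function, use boxed types, or wrap values in Option[T].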
- Language performance
- Scala UDFs run inside the JVM executor — fast.
- Python UDFs serialize each row to a Python worker (PySpark) — slow; prefer pandas UDFs (
@pandas_udf, Arrow-based vectorized batches) when you must stay in Python.
import org.apache.spark.sql.functions.{udf, col, lit}
import org.apache.spark.sql.expressions.UserDefinedFunction
// 1. define
val tld: UserDefinedFunction = udf { (url: String) =>
if (url == null) null
else url.split('.').lastOption.getOrElse("")
}
// 2. apply on a column
val df2 = df.withColumn("tld", tld(col("url")))
// 3. Option[T] for clean null handling
val safeLen = udf { (s: String) => Option(s).map(_.length) }
df.withColumn("len", safeLen(col("name")))
// 4. SQL registration
spark.udf.register("tld", tld)
spark.sql("select url, tld(url) as tld from urls")