Spark #3 | Processing


Introduction

  • Contents
    • 3.1 Basic Manipulation
    • 3.2 Duplicates
    • 3.3 Null
    • 3.4 Filter
    • 3.5 Group
    • 3.6 Join
    • 3.7 UDF


3.1 Basic Manipulation

  • Schema / shape inspection
    • printSchema() : prints the column names, types, and nullability of the DataFrame.
    • dtypes : returns a sequence of (name, type) tuples without printing.
    • count() : total row count.
  • Select
    • select(col1, col2, ...) : project a subset of columns.
    • selectExpr("a", "b + 1 as c") : project with SQL expressions; all derived columns go into one projection, whereas each chained withColumn adds its own projection to the plan.
  • Add / transform columns
    • withColumn("name", expr) : add a new column or replace one of the same name.
    • lit(value) : turn a Scala value into a Column literal (use inside withColumn).
    • col("x").cast("int") : type-cast a column.
  • Rename / drop columns
    • withColumnRenamed("old", "new")
    • drop("col1", "col2")
  • Order
    • orderBy(col("x").desc, col("y").asc) (alias sort(...)).
  • Sample / preview
    • limit(n) : take the first n rows as a DataFrame (lazy).
    • show(n, truncate=false) : print to driver (action; triggers a job).
    • head(n) / take(n) : collect first n rows to the driver as Array[Row].
import org.apache.spark.sql.functions.{col, lit, expr}

// inspect
df.printSchema()
df.count()

// select (each call returns a new DataFrame; df itself is unchanged)
df.select(col("user_id"), col("ts"))
df.selectExpr("user_id", "from_unixtime(ts) as event_time")

// rename / add / cast / drop
val df2 = df
  .withColumnRenamed("ts", "event_ts")              // event_ts is used by the orderBy below
  .withColumn("source", lit("kafka"))               // constant column
  .withColumn("duration", col("end") - col("start"))
  .withColumn("user_id", col("user_id").cast("string"))
  .drop("raw_payload")

// order & preview
df2.orderBy(col("event_ts").desc).show(10, truncate = false)
val firstTen = df2.limit(10)                        // still a DataFrame, lazy
val sample   = df2.take(5)                          // Array[Row], runs a job


3.2 Duplicates

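  • dropDuplicates(Seq("COLUMN")) (PySpark: drop_duplicates(subset=["COLUMN"])) :
    • returns a new Dataset with duplicate rows removed, comparing only the listed columns; which row of each duplicate group survives is not guaranteed.
  • distinct() :
    • returns the distinct rows of the DataFrame.
    • takes no arguments, so all columns are compared when dropping duplicates.
  • df.exceptAll(df.distinct()) :
    • the reverse: keeps only the surplus copies of duplicated rows (a row that occurs n times is returned n - 1 times) and removes rows that occur once.
    • to keep every occurrence of the duplicated rows, left-semi join df against this result on all columns.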
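
The duplicate-handling calls above can be compared side by side on a tiny DataFrame. This is a minimal sketch, assuming a local SparkSession and a made-up events table (the column names user_id and score are hypothetical); the left-semi join is one possible way to keep every occurrence of a duplicated row, not the only one.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
import spark.implicits._

// hypothetical sample data: ("a", 1) appears three times, the other rows once
val events = Seq(("a", 1), ("a", 1), ("a", 1), ("b", 2), ("a", 9)).toDF("user_id", "score")

// all columns compared: ("a", 1), ("b", 2), ("a", 9)
val distinctRows = events.distinct()

// only user_id compared: one row per user, which one is not guaranteed
val onePerUser = events.dropDuplicates(Seq("user_id"))

// surplus copies only: ("a", 1) twice, because it occurs three times
val surplus = events.exceptAll(events.distinct())

// every occurrence of rows that have at least one duplicate
// (a using-column join does not match nulls, so this assumes non-null keys)
val duplicated = events.join(surplus.distinct(), Seq("user_id", "score"), "left_semi")
```

Calling show() on each result makes the difference between "remove duplicates" and "keep only duplicates" easy to see.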


3.3 Null

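  • df.na.drop("any", Seq("COLUMN1")) (PySpark: df.dropna(how="any", thresh=None, subset=["COLUMN1"]))
    • drops rows that contain null or NaN in the listed columns (all columns if none are listed).
    • how = "any" drops a row if any checked column is null; how = "all" drops it only if every checked column is null.
    • thresh (minNonNulls in Scala) keeps only rows with at least that many non-null values.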
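
In the Scala API the same operation is exposed through df.na.drop. The following is a small sketch of its variants, assuming a local SparkSession and a hypothetical users table in which Option models the nullable columns.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("null-sketch").getOrCreate()
import spark.implicits._

// hypothetical sample data; None becomes null in the DataFrame
val users = Seq[(Option[String], Option[Int])](
  (Some("ann"), Some(31)),
  (None,        Some(25)),
  (Some("bob"), None),
  (None,        None)
).toDF("name", "age")

val noNulls     = users.na.drop("any")               // rows without any null
val notAllNull  = users.na.drop("all")               // drops only the fully null row
val namePresent = users.na.drop("any", Seq("name"))  // checks the name column only
val atLeastOne  = users.na.drop(1)                   // minNonNulls = 1, like thresh=1
```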


3.4 Filter

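  • Filtering : filter(condition) (alias where) keeps the rows for which the condition is true.
  • Pushdown filtering
    • Without pushdown : Spark reads all the data from the file system, then filters it in memory.
    • With pushdown : the predicate is passed to the data source (e.g. Parquet, ORC, JDBC), so only the necessary data is read.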
// single condition
df.filter(df("PRED") === "Entertainment")
df.filter(df("VIDEO_PLAY_TIME") > 20)
// multiple conditions
df.filter((df("PRED") === "Entertainment") && (df("VIDEO_PLAY_TIME") > 20))
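
One way to check whether a filter is pushed down is to read from a columnar source and look at the physical plan. The sketch below assumes a local SparkSession and writes a hypothetical three-row table to a temporary Parquet directory; for Parquet scans the plan normally lists the predicate under PushedFilters, though the exact plan text varies by Spark version.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("pushdown-sketch").getOrCreate()
import spark.implicits._

// hypothetical data written to a temporary Parquet directory
val path = Files.createTempDirectory("pushdown").resolve("videos").toString
Seq(("Entertainment", 35), ("News", 12), ("Entertainment", 8))
  .toDF("PRED", "VIDEO_PLAY_TIME")
  .write.parquet(path)

// the filter is declared after the read, but the optimizer can hand it to the Parquet reader
val longPlays = spark.read.parquet(path).filter(col("VIDEO_PLAY_TIME") > 20)

// prints the physical plan; look for the predicate in the scan node
longPlays.explain()
```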


3.5 Group

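  • groupBy : groups the DataFrame by the specified columns so that aggregations can be run on each group; returns a RelationalGroupedDataset (GroupedData in PySpark).
    • .agg(collect_set(col)) : collects the distinct values of col in each group into an array.
    • .agg(collect_list(col)) : collects all values of col in each group into an array, duplicates included.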
import org.apache.spark.sql.functions.{avg, max}

// groupBy groups rows by a column value and returns a RelationalGroupedDataset.
// The calls below are alternatives, not one chain: each returns a new DataFrame.
// "value" and "category" are placeholder column names.
val grouped = df.groupBy("column_name")
grouped.count()                          // row count per group
grouped.mean()                           // mean of every numeric column per group
grouped.max()                            // maximum of every numeric column per group
grouped.min()                            // minimum of every numeric column per group
grouped.sum()                            // total of every numeric column per group
grouped.avg()                            // same as mean()
grouped.agg(max("value"), avg("value"))  // several aggregates at once
grouped.pivot("category").sum("value")   // one output column per distinct category
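
To make the difference between collect_set and collect_list concrete, here is a minimal sketch that computes several aggregates in one agg call and then pivots. It assumes a local SparkSession; the plays table and its user_id column are hypothetical, while PRED and VIDEO_PLAY_TIME reuse the column names from the filter section.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, collect_list, collect_set, count, lit}

val spark = SparkSession.builder().master("local[*]").appName("group-sketch").getOrCreate()
import spark.implicits._

// hypothetical sample data
val plays = Seq(
  ("u1", "News", 10), ("u1", "News", 30), ("u1", "Sports", 20), ("u2", "News", 5)
).toDF("user_id", "PRED", "VIDEO_PLAY_TIME")

// several aggregates per group in a single pass
val perUser = plays.groupBy("user_id").agg(
  count(lit(1)).as("n"),                     // rows per user
  avg("VIDEO_PLAY_TIME").as("avg_time"),     // mean play time per user
  collect_set("PRED").as("categories"),      // distinct values, order not guaranteed
  collect_list("PRED").as("all_categories")  // every value, duplicates kept
)

// one column per distinct PRED value, holding the summed play time
val pivoted = plays.groupBy("user_id").pivot("PRED").sum("VIDEO_PLAY_TIME")
```

For u1 the set holds two categories while the list holds three entries, which is the whole difference between the two functions.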


3.6 Join

  • Inner join : keeps only the rows that have a match on both sides, i.e. rows for which the join condition (the on expression) is true; everything else is dropped.
  • Outer join (full outer) : keeps all rows from both tables; where a row has no match in the other table, that table's columns are null.
  • Left join : keeps every row of the left table, whether or not it has a match in the right table. When a match is found, the right table's columns are filled in; otherwise they are null.
  • Right join : the same as the left join, with the roles of the two tables swapped.
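
The four join types can be tried on two small tables that only partly overlap. This is a sketch under assumptions: a local SparkSession and hypothetical users and orders tables joined on a shared id column.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate()
import spark.implicits._

// hypothetical sample data: id 1 is on both sides, ids 2 and 3 only left, id 4 only right
val users  = Seq((1, "ann"), (2, "bob"), (3, "cy")).toDF("id", "name")
val orders = Seq((1, 9.5), (1, 3.0), (4, 7.25)).toDF("id", "amount")

val inner = users.join(orders, Seq("id"), "inner")  // matched rows only: 2 rows
val left  = users.join(orders, Seq("id"), "left")   // all users, amount null for ids 2 and 3: 4 rows
val right = users.join(orders, Seq("id"), "right")  // all orders, name null for id 4: 3 rows
val full  = users.join(orders, Seq("id"), "outer")  // everything from both sides: 5 rows
```

Passing the key as Seq("id") keeps a single id column in the output; joining on an expression such as users("id") === orders("id") would keep both.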


3.7 UDF

  • User-Defined Function (UDF) : wrap a plain Scala function as a Column expression when no built-in covers the logic.
  • Prefer built-ins. Catalyst can’t optimize through a UDF — it treats it as a black box, blocking predicate pushdown, constant folding, and codegen. Use org.apache.spark.sql.functions first; only reach for a UDF when nothing fits.
  • Define & apply via DataFrame API
    • udf((args) => result) returns a UserDefinedFunction you call like any built-in column function.
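    • Declare explicit types for stability, e.g. udf[String, String, Int](...): the type parameters are the return type first, then the argument types (here a (String, Int) => String function).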
  • Register for Spark SQL
    • spark.udf.register("name", fn) lets you call it from SQL strings: spark.sql("select name(col) from t").
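  • Null safety : a UDF receives null for reference and boxed parameter types (String, java.lang.Integer). With primitive parameters (Int, Double) the function cannot test for null, and Spark returns null for that row instead of calling it. Guard inside the function, or use boxed types or Option[T] to do the null handling yourself.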
  • Language performance
    • Scala UDFs run inside the JVM executor — fast.
    • Python UDFs serialize each row to a Python worker (PySpark) — slow; prefer pandas UDFs (@pandas_udf, Arrow-based vectorized batches) when you must stay in Python.
import org.apache.spark.sql.functions.{udf, col, lit}
import org.apache.spark.sql.expressions.UserDefinedFunction

// 1. define
val tld: UserDefinedFunction = udf { (url: String) =>
  if (url == null) null
  else url.split('.').lastOption.getOrElse("")
}

// 2. apply on a column
val df2 = df.withColumn("tld", tld(col("url")))

// 3. Option[T] for clean null handling
val safeLen = udf { (s: String) => Option(s).map(_.length) }
df.withColumn("len", safeLen(col("name")))

// 4. SQL registration (the query needs a table or view named urls)
df.createOrReplaceTempView("urls")
spark.udf.register("tld", tld)
spark.sql("select url, tld(url) as tld from urls")
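
Following the "prefer built-ins" advice, the tld UDF can often be replaced by a function from org.apache.spark.sql.functions. The sketch below uses substring_index as one possible substitute and runs both versions side by side; it assumes a local SparkSession and a hypothetical three-row urls table. The two agree on ordinary inputs but can differ on edge cases such as a trailing dot, so treat it as an illustration rather than a drop-in replacement.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, substring_index, udf}

val spark = SparkSession.builder().master("local[*]").appName("udf-sketch").getOrCreate()
import spark.implicits._

// hypothetical sample data, including a null url
val urls = Seq[(Int, String)](
  (1, "www.example.com"), (2, "localhost"), (3, null)
).toDF("id", "url")

// UDF version: opaque to the optimizer
val tldUdf = udf { (url: String) =>
  if (url == null) null else url.split('.').lastOption.getOrElse("")
}

val result = urls
  .withColumn("tld_udf", tldUdf(col("url")))
  // built-in version: everything after the last "." and null for null input
  .withColumn("tld_builtin", substring_index(col("url"), ".", -1))
```

Comparing result.explain() for the two columns shows the UDF as an opaque call, while the built-in stays a regular Catalyst expression.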