Spark #2 | Read


Introduction

  • Contents
    • 2.1 Read
    • 2.2 Read Streaming


2.1 Read

  • Entry point : spark.read returns a DataFrameReader. Generic form is spark.read.format("fmt").option(...).load(path); each built-in format has a shorthand (parquet, csv, json, orc, text, jdbc, table).
  • Schema
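    • Default: parquet/orc take the schema from file metadata (the footer); json is inferred by scanning the data; csv reads every column as a string unless inferSchema = true, which costs an extra pass over the data.
    • Production: always pass an explicit StructType via .schema(...). This skips inference, keeps column types stable from run to run, and makes upstream type changes visible instead of silently changing the inferred schema.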
  • Common options
    • CSV: header, delimiter, quote, escape, nullValue, mode (PERMISSIVE / DROPMALFORMED / FAILFAST).
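    • JSON: multiLine for records that span several lines, such as pretty-printed JSON (the default expects one JSON object per line); primitivesAsString to read all primitive values as strings and avoid lossy numeric inference.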
    • Parquet / ORC: mergeSchema to union schemas across files (slow; off by default).
  • Path semantics
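    • You can pass a file, a directory, a glob (s3a://bucket/yyyy=2024/mm=*/data-*.parquet), or several paths as separate arguments (load(p1, p2)). A single comma-joined string is treated as one path.
    • Hive-style partitioning (/year=2024/month=01/...) is auto-discovered and exposed as columns. If you load a partition directory directly, set the basePath option to keep those columns.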
  • Other sources
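    • spark.read.jdbc(url, table, props) : for relational DBs. To parallelize, set partitionColumn (numeric, date, or timestamp) together with lowerBound, upperBound, and numPartitions. The bounds only set the partition stride; they do not filter rows.
    • spark.read.table("db.t") : Hive/metastore-managed tables.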
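
The mode option is easiest to see on a file with one bad row. The following is a minimal sketch meant for spark-shell, assuming a local session; the sample file, its contents, and the variable names are made up for illustration. It reads the same CSV once in PERMISSIVE mode with a corrupt-record column and once in DROPMALFORMED mode.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("csv-modes").getOrCreate()

// Hypothetical sample file: the second data row has a non-numeric ts.
val csvFile = Files.createTempDirectory("csv-modes").resolve("events.csv")
Files.write(csvFile, "user_id,ts\nu1,100\nu2,not-a-number\n".getBytes("UTF-8"))

val base = new StructType()
  .add("user_id", StringType)
  .add("ts", LongType)

// PERMISSIVE (the default) keeps bad rows: fields that fail to parse become null,
// and the raw line is stored in the corrupt-record column if the schema declares it.
val permissive = spark.read
  .schema(base.add("_corrupt_record", StringType))
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv(csvFile.toString)
  .cache() // Spark rejects queries that touch only the corrupt-record column unless the data is cached

val badRows = permissive.filter(col("_corrupt_record").isNotNull)

// DROPMALFORMED silently discards rows that do not fit the schema.
val dropped = spark.read
  .schema(base)
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .csv(csvFile.toString)

permissive.show(truncate = false)
dropped.show(truncate = false)
```

Keeping the bad rows in a side column is usually the safer choice for ingestion jobs, because dropped rows leave no trace. Note that with CSV column pruning, a bare count() on a DROPMALFORMED read may not detect malformed rows, since no columns are parsed; materialize the columns you care about before counting.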
import org.apache.spark.sql.types._

// 1. shorthand by format
val df = spark.read.parquet("s3a://bucket/events/dt=2024-05-18/")

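// 2. CSV with explicit schema and options
// Note: file sources treat every column as nullable on read,
// so nullable = false documents intent but is not enforced here.
val schema = StructType(Seq(
  StructField("user_id",  StringType,    nullable = false),
  StructField("ts",       LongType,      nullable = false),
  StructField("payload",  StringType,    nullable = true)
))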
val raw = spark.read
  .schema(schema)
  .option("header", "true")
  .option("mode",   "DROPMALFORMED")   // skip bad rows instead of failing
  .csv("/data/events/*.csv")

// 3. JDBC with parallel partitioning
val jdbcDf = spark.read
  .format("jdbc")
  .option("url",            "jdbc:postgresql://host/db")
  .option("dbtable",        "events")
  .option("partitionColumn","id")
  .option("lowerBound",     "1")
  .option("upperBound",     "10000000")
  .option("numPartitions",  "16")
  .load()

// 4. Hive table
val orders = spark.read.table("warehouse.orders")
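
To illustrate the path semantics described above, here is a small sketch for spark-shell that writes a tiny partitioned dataset to a temporary directory and reads it back in several ways. The dataset, directory, and variable names are hypothetical; only partitionBy, basePath, and the varargs form of parquet(...) are the points being shown.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("partition-discovery").getOrCreate()
import spark.implicits._

// Hypothetical dataset written with Hive-style partition directories (year=.../month=...).
val root = Files.createTempDirectory("events").resolve("data").toString
Seq(("u1", 2024, 1), ("u2", 2024, 2), ("u3", 2023, 12))
  .toDF("user_id", "year", "month")
  .write
  .partitionBy("year", "month")
  .parquet(root)

// Reading the root directory: year and month come back as columns.
val all = spark.read.parquet(root)

// Reading one partition directory directly: the partition columns are not in the result...
val jan = spark.read.parquet(s"$root/year=2024/month=1")

// ...unless basePath tells Spark where the partitioned layout starts.
val janWithCols = spark.read
  .option("basePath", root)
  .parquet(s"$root/year=2024/month=1")

// Several paths are passed as separate arguments (varargs), here expanded from a Seq.
val paths = Seq(s"$root/year=2024/month=1", s"$root/year=2024/month=2")
val y2024 = spark.read.option("basePath", root).parquet(paths: _*)

all.printSchema()
jan.printSchema()
```

Filtering on year or month after reading the root lets Spark prune directories, so reading the root plus a filter is often simpler than assembling paths by hand.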


2.2 Read Streaming

  • Structured Streaming : same DataFrame API, batched into micro-batches (or continuous). Entry point is spark.readStream instead of spark.read. The returned DataFrame is unbounded; transformations are the same as batch.
  • Schema is required for file sources: inference is off by default for streaming and has to be enabled explicitly with spark.sql.streaming.schemaInference = true.
  • Common sources
    • File source (csv, json, parquet, orc, text) : watches a directory for new files. Options: maxFilesPerTrigger (throughput cap), latestFirst, fileNameOnly.
    • Kafka (format("kafka")) : the standard message-bus source. Each record has the columns key, value, topic, partition, offset, timestamp, ... ; value is a binary blob that you typically cast("string") and then parse.
    • rate / socket : test sources only — never use in production.
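  • Triggers (configured on the write side via .trigger(...)) : Trigger.ProcessingTime("30 seconds"), Trigger.Once() (one micro-batch and exit; deprecated since Spark 3.4 in favor of AvailableNow), Trigger.AvailableNow() (process all currently available data, possibly in several batches, then exit), or Trigger.Continuous(...) (experimental).
  • Watermarks : .withWatermark("event_time", "10 minutes") lets Spark drop records that arrive more than 10 minutes behind the latest event time seen, which bounds state size for windowed aggregations. The column must be of timestamp type.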
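
How a watermark feeds a windowed aggregation can be sketched as follows. This is an illustrative spark-shell example, not a production pattern: it uses the rate test source as a stand-in input because it already provides a timestamp column, and the window size and variable names are assumptions. It only defines the query plan; nothing runs until a writeStream is started.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder().master("local[*]").appName("watermark-window").getOrCreate()

// The rate source emits rows of (timestamp: Timestamp, value: Long); used here only as test input.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()

// Accept events up to 10 minutes late, then count rows per 5-minute event-time window.
// State for a window can be discarded once the watermark passes the end of that window.
val counts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "5 minutes"))
  .count()

counts.printSchema()
```

The watermark has to be declared on the same column that the window is built from, and before the aggregation, for Spark to use it to clean up state.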
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// 1. File source — incremental ingestion of new files
val schema = new StructType()
  .add("user_id", StringType)
  .add("ts",      LongType)
  .add("event",   StringType)

val fileStream = spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 10)
  .json("s3a://bucket/landing/")

// 2. Kafka source with a JSON-encoded payload (needs the spark-sql-kafka-0-10 connector on the classpath)
val eventSchema = new StructType()
  .add("user_id", StringType)
  .add("ts",      LongType)
  .add("event",   StringType)

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe",               "events.v1")
  .option("startingOffsets",         "latest")   // or "earliest", or per-partition JSON
  .load()
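  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json(col("json"), eventSchema).as("e"))
  .select("e.*")
  // withWatermark needs a timestamp column; ts is a long, assumed here to hold epoch seconds
  .withColumn("event_time", col("ts").cast("timestamp"))
  .withWatermark("event_time", "10 minutes")     // bound state for downstream windows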
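
A stream defined with readStream does nothing until it is written somewhere. The sketch below, intended for spark-shell, runs a file stream end to end under a few assumptions: a local session, Spark 3.3 or later (for Trigger.AvailableNow), a temporary landing directory with one made-up JSON-lines file, and the memory sink, which is for testing only. The query name events_sketch is hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("stream-sketch").getOrCreate()

// Hypothetical landing directory with one JSON-lines file already present.
val landing = Files.createTempDirectory("landing")
val lines = Seq(
  """{"user_id":"u1","ts":100,"event":"click"}""",
  """{"user_id":"u2","ts":101,"event":"view"}"""
)
Files.write(landing.resolve("batch-1.json"), lines.mkString("\n").getBytes("UTF-8"))

// Streaming file sources need an explicit schema.
val schema = new StructType()
  .add("user_id", StringType)
  .add("ts", LongType)
  .add("event", StringType)

val stream = spark.readStream
  .schema(schema)
  .json(landing.toString)

// The trigger is set on the write side. AvailableNow processes what is there, then stops.
val query = stream.writeStream
  .format("memory")            // in-memory table, for testing only
  .queryName("events_sketch")  // hypothetical name of the in-memory table
  .outputMode("append")
  .trigger(Trigger.AvailableNow())
  .start()

query.awaitTermination()       // returns once the available files have been processed

val result = spark.sql("SELECT * FROM events_sketch")
result.show()
```

For a real job you would swap the memory sink for a durable one and set a checkpointLocation, so that a restart resumes from the files already processed.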