Introduction
- Contents
  - 2.1 Read
  - 2.2 Read Streaming
2.1 Read
- Entry point: spark.read returns a DataFrameReader. The generic form is spark.read.format("fmt").option(...).load(path); each built-in format also has a shorthand method (parquet, csv, json, orc, text, jdbc, table).
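- Schema
  - Default: the schema is inferred. Parquet/ORC read it from the file footers; JSON infers it by scanning the input (samplingRatio, default 1.0, sets how much is sampled); CSV infers column types only when inferSchema = true, which costs an extra pass over the data, and otherwise reads every column as a string.
  - Production: always pass an explicit StructType via .schema(...). This skips the inference pass, keeps the schema identical from run to run regardless of which files happen to be present, and makes upstream type changes surface against a fixed contract (as nulls, malformed records, or read errors, depending on format and mode) instead of silently changing column types.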
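- Common options
  - CSV: header, delimiter, quote, escape, nullValue, mode (PERMISSIVE, the default, sets fields it cannot parse to null; DROPMALFORMED drops the whole row; FAILFAST throws on the first bad row).
  - JSON: multiLine for pretty-printed JSON where one record spans several lines; primitivesAsString reads every primitive value as a string to preserve numeric precision.
  - Parquet / ORC: mergeSchema to union schemas across files (slow; off by default).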
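- Path semantics
  - You can pass a file, a directory, a glob (s3a://bucket/yyyy=2024/mm=*/data-*.parquet), or several paths as separate arguments (load, parquet, csv, json, etc. take String*; a single comma-separated string is not split).
  - Hive-style partitioning (/year=2024/month=01/...) is auto-discovered and exposed as columns. When you read a glob or a subdirectory below the table root, set the basePath option to that root so the partition columns are still discovered.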
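- Other sources
  - spark.read.jdbc(url, table, props): relational databases. Use partitionColumn + lowerBound/upperBound/numPartitions to parallelize. partitionColumn must be numeric, date, or timestamp, and the bounds only set the partition stride: rows outside them are still read (into the first or last partition), not filtered out.
  - spark.read.table("db.t"): Hive/metastore-managed tables.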
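The bound semantics are easy to misread, so here is a simplified sketch in plain Scala (no Spark needed) of how a partitioned JDBC read becomes one WHERE clause per partition. It approximates Spark's behavior for an integral column; the helper name jdbcPartitionPredicates is hypothetical, and real Spark versions compute the stride slightly differently (and shrink numPartitions when the bound range is smaller), so exact boundaries may differ. What it does show is the shape: the first partition is open below and also picks up NULLs, the last is open above, so rows outside lowerBound/upperBound are still read.

```scala
// Hypothetical helper approximating how a partitioned JDBC read is split into per-partition
// WHERE clauses. Simplified: integral column, equal integer stride, no version-specific tweaks.
def jdbcPartitionPredicates(
    column: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int): Seq[String] = {
  require(numPartitions >= 1, "numPartitions must be positive")
  require(upperBound - lowerBound >= numPartitions, "bound range must cover every partition")
  val stride = (upperBound - lowerBound) / numPartitions
  (0 until numPartitions).map { i =>
    val lo = lowerBound + i * stride
    val hi = lo + stride
    val lower = if (i == 0) None else Some(s"$column >= $lo")                 // first: no lower limit
    val upper = if (i == numPartitions - 1) None else Some(s"$column < $hi") // last: no upper limit
    (lower, upper) match {
      case (None, Some(u))    => s"$u OR $column IS NULL" // first partition also takes NULLs
      case (Some(l), None)    => l
      case (Some(l), Some(u)) => s"$l AND $u"
      case (None, None)       => "1 = 1"                  // numPartitions = 1: whole table
    }
  }
}

jdbcPartitionPredicates("id", 1L, 10000000L, 4).foreach(println)
// id < 2500000 OR id IS NULL
// id >= 2500000 AND id < 4999999
// id >= 4999999 AND id < 7499998
// id >= 7499998
```

A row with id = 50000000 matches only the last predicate, and a row with id = -5 only the first, which is why narrow bounds cause skew rather than data loss.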
import org.apache.spark.sql.types._
// 1. shorthand by format
val df = spark.read.parquet("s3a://bucket/events/dt=2024-05-18/")
// 2. CSV with explicit schema and options
val schema = StructType(Seq(
  // file sources read every column as nullable, so nullable = false documents intent but is not enforced
  StructField("user_id", StringType, nullable = false),
  StructField("ts", LongType, nullable = false),
  StructField("payload", StringType, nullable = true)
))
val raw = spark.read
  .schema(schema)
  .option("header", "true")
  .option("mode", "DROPMALFORMED") // drop rows that don't fit the schema (default PERMISSIVE keeps them with nulls)
  .csv("/data/events/*.csv")
// 3. JDBC with parallel partitioning (the PostgreSQL JDBC driver must be on the classpath)
val jdbcDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host/db")
  .option("dbtable", "events")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")        // lowerBound/upperBound set the stride only;
  .option("upperBound", "10000000") // ids outside them are still read
  .option("numPartitions", "16")
  .load()
// 4. Hive table
val orders = spark.read.table("warehouse.orders")
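To see the path rules locally, the following spark-shell sketch (paste it with :paste) writes a small Hive-style partitioned Parquet table to a temp directory and reads it back three ways: from the root, through a glob, and as several explicit paths. The data, the local[*] master, and the temp-directory layout are assumptions made for illustration; the point is that year and month come back as columns in every case, with basePath telling partition discovery where the table root is when you read below it.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("partition-discovery-sketch").getOrCreate()
import spark.implicits._

// Hypothetical table root in a fresh temp directory, as a file: URI so it resolves locally.
val root = Files.createTempDirectory("events").resolve("tbl").toUri.toString

// Written Hive-style: <root>/year=2024/month=1/part-*.parquet, and so on.
Seq(("u1", 2024, 1), ("u2", 2024, 2), ("u3", 2023, 12))
  .toDF("user_id", "year", "month")
  .write
  .partitionBy("year", "month")
  .parquet(root)

// 1. Root directory: year and month are discovered from the directory names.
val all = spark.read.parquet(root)

// 2. Glob over part of the tree; basePath marks the table root so year/month stay columns.
val only2024 = spark.read.option("basePath", root).parquet(s"$root/year=2024/month=*")

// 3. Several explicit paths, passed as separate arguments (the readers take String*).
val twoMonths = spark.read
  .option("basePath", root)
  .parquet(s"$root/year=2024/month=1", s"$root/year=2023/month=12")

all.printSchema()
```

Dropping the basePath option from cases 2 and 3 is a quick way to see why it matters when reading below the root.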
2.2 Read Streaming
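- Structured Streaming: the same DataFrame API, executed as a series of micro-batches (or, experimentally, in continuous processing mode). The entry point is spark.readStream instead of spark.read. The returned DataFrame is unbounded; most transformations are the same as in batch, though a few (e.g. sorting a stream that has not been aggregated) are not supported.
- Schema is required for file sources: streaming does not infer it unless spark.sql.streaming.schemaInference is enabled.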
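- Common sources
  - File source (csv, json, parquet, orc, text): watches a directory and processes files as they appear. Options: maxFilesPerTrigger (cap on new files per micro-batch), latestFirst (process the newest files first when there is a backlog), fileNameOnly (decide whether a file is new by its name only, ignoring the full path).
  - Kafka (format("kafka")): the standard message-bus source; it needs the spark-sql-kafka-0-10 package on the classpath. Reads return columns (key, value, topic, partition, offset, timestamp, ...); value is a binary blob you typically cast("string") and then parse.
  - rate / socket: test sources only; never use them in production.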
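- Triggers (configured on the write side via .trigger(...)): ProcessingTime("30 seconds"), Once (one micro-batch, then exit; deprecated since Spark 3.4 in favor of AvailableNow), AvailableNow (process all data available at start, possibly over several micro-batches that respect limits such as maxFilesPerTrigger, then exit; Spark 3.3+), or Continuous (experimental). With no trigger set, a new micro-batch starts as soon as the previous one finishes.
- Watermarks: .withWatermark("event_time", "10 minutes") lets Spark drop records whose event time is more than 10 minutes behind the latest event time it has seen, which bounds the state kept for windowed aggregations. The event-time column must be of timestamp type.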
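A minimal spark-shell sketch (paste with :paste) of a trigger and a rate-limit option working together, assuming Spark 3.3+ for Trigger.AvailableNow and a local session. The landing directory, file contents, query name, and checkpoint path are made up for the demo, and the memory sink is used only so the result can be inspected; it is not a production sink. With three files and maxFilesPerTrigger set to 1, AvailableNow should drain the backlog over several micro-batches and then stop on its own, so awaitTermination() returns.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("available-now-sketch").getOrCreate()

// Hypothetical landing directory with three small JSON-lines files, two events each.
val landing = Files.createTempDirectory("landing")
for (i <- 0 until 3) {
  val lines = Seq(
    s"""{"user_id":"u$i","ts":${i * 10},"event":"click"}""",
    s"""{"user_id":"u$i","ts":${i * 10 + 1},"event":"view"}""")
  Files.write(landing.resolve(s"part-$i.json"), lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
}

val schema = new StructType()
  .add("user_id", StringType)
  .add("ts", LongType)
  .add("event", StringType)

// The trigger lives on the write side. AvailableNow processes everything present at start,
// in as many micro-batches as maxFilesPerTrigger requires, then stops the query.
val query = spark.readStream
  .schema(schema)                    // streaming file sources need an explicit schema by default
  .option("maxFilesPerTrigger", "1") // at most one new file per micro-batch
  .json(landing.toUri.toString)
  .writeStream
  .format("memory")                  // demo-only sink: results land in a temp view
  .queryName("landing_events")
  .option("checkpointLocation", Files.createTempDirectory("ckpt").toUri.toString)
  .trigger(Trigger.AvailableNow())
  .start()

query.awaitTermination()             // returns once the backlog is drained

val ingested = spark.table("landing_events")
val batchesWithData = query.recentProgress.count(_.numInputRows > 0)
println(s"rows=${ingested.count()} micro-batches with data=$batchesWithData")
```

Swapping in Trigger.ProcessingTime("30 seconds") would keep the query running and pick up new files every 30 seconds instead of exiting.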
import org.apache.spark.sql.functions.{col, from_json, timestamp_seconds}
import org.apache.spark.sql.types._
// 1. File source: incremental ingestion of new files
val schema = new StructType()
  .add("user_id", StringType)
  .add("ts", LongType)
  .add("event", StringType)
val fileStream = spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 10)
  .json("s3a://bucket/landing/")
// 2. Kafka source: JSON-encoded payload
val eventSchema = new StructType()
  .add("user_id", StringType)
  .add("ts", LongType)
  .add("event", StringType)
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "events.v1")
  .option("startingOffsets", "latest") // or "earliest", or per-partition JSON
  .load()
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json(col("json"), eventSchema).as("e"))
  .select("e.*")
  // withWatermark needs a timestamp column, not a long; this assumes ts holds epoch seconds
  .withColumn("event_time", timestamp_seconds(col("ts")))
  .withWatermark("event_time", "10 minutes") // bound state for downstream windows
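Because streaming and batch share the same transformations, one way to test Kafka parsing without a broker is to factor the parsing steps into a function and run it on a static DataFrame that has the same binary value column. The sketch below assumes Spark 3.1+ (for timestamp_seconds) and that ts holds epoch seconds; parseEvents and the sample records are hypothetical. It also derives an event_time timestamp column, since withWatermark needs a timestamp rather than a long.

```scala
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, timestamp_seconds}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("kafka-parse-sketch").getOrCreate()
import spark.implicits._

val eventSchema = new StructType()
  .add("user_id", StringType)
  .add("ts", LongType)
  .add("event", StringType)

// Hypothetical helper: the Kafka parsing steps, usable on a streaming or a batch DataFrame
// that has a binary `value` column. Assumes `ts` is epoch seconds.
def parseEvents(kafkaLike: DataFrame): DataFrame =
  kafkaLike
    .select(from_json(col("value").cast("string"), eventSchema).as("e"))
    .select("e.*")
    .withColumn("event_time", timestamp_seconds(col("ts"))) // TimestampType, usable by withWatermark

// Static stand-in for Kafka records: two valid JSON payloads and one that is not JSON.
val fakeKafka = Seq(
  """{"user_id":"u1","ts":1715990400,"event":"click"}""",
  """{"user_id":"u2","ts":1715990460,"event":"view"}""",
  "not json"
).map(_.getBytes(StandardCharsets.UTF_8)).toDF("value")

val parsed = parseEvents(fakeKafka)
parsed.show(false) // the malformed payload comes back as a row of nulls rather than an error
```

In the streaming job, the same function would wrap the Kafka reader, e.g. parseEvents(spark.readStream.format("kafka")...load()).withWatermark("event_time", "10 minutes"), so the batch test and the production path share one parsing definition.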