Cheat Sheet | Spark

1. Initialization & Configuration

  • SparkContext : provides the connection to Spark and the ability to create RDDs

  • SQLContext : provides the connection to Spark and the ability to run SQL queries on data

  • SparkSession : all-encompassing entry point that covers the functionality of SparkContext, SQLContext and HiveContext

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# create a SparkSession instance with the name 'appname' 
spark = SparkSession.builder.appName("appname").getOrCreate()

# create a SparkContext instance, which gives the Spark application access to the cluster and to the RDD API
sc = SparkContext.getOrCreate()

# create a SQLContext instance to access the SQL query engine built on top of Spark
sqlContext = SQLContext(sc)
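
A minimal sketch that exercises each entry point (the sample data and the temp-view name my_table are made up for illustration):

# SparkSession: create a DataFrame directly
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])

# SparkContext: create an RDD
rdd = sc.parallelize([1, 2, 3])

# SQLContext: run a SQL query against a registered temp view
df.createOrReplaceTempView('my_table')
sqlContext.sql('SELECT id, value FROM my_table').show()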




2. Dataframe

Explore Dataframe

# Prints out the schema in the tree format.
df.printSchema()
# To get the number of rows
df.count()

Modifying Dataframe

# functions module is needed for lit() / col()
from pyspark.sql import functions as F

# Create a column
df = df.withColumn('column_new', F.lit('abc'))                 # with the default value 'abc'
df = df.withColumn('column_new', 2 * F.col('existing_column')) # derived from an existing column

# Change Column Name
df.withColumnRenamed("column_ori", "column_new")

# add index column
from pyspark.sql.functions import monotonically_increasing_id 
df_index = df.select("*").withColumn("id", monotonically_increasing_id())

# filter (=where)
df.filter(df.column_float.between(7.5,8.2))

# remove column
df.drop("column_drop")


Join

  • Inner join : keeps only what is common to both tables; it returns every row from both sides for which the join condition (the on expression) is true
  • Outer join : also includes rows of one table for which no matching row is found in the other table
  • Left join : all rows of the left table are kept, regardless of whether there is a match in the right table; when an id match is found in the right table its values are returned, otherwise null
  • Right join : performs the same task as the left join, but for the right table
df1.join(df2, on='column', how='inner')
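
A worked sketch of the four join types on two small DataFrames (the column names and data are made up for illustration):

left = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'l_val'])
right = spark.createDataFrame([(2, 'x'), (3, 'y')], ['id', 'r_val'])

left.join(right, on='id', how='inner').show()  # only id=2
left.join(right, on='id', how='outer').show()  # ids 1, 2, 3, with nulls where there is no match
left.join(right, on='id', how='left').show()   # ids 1, 2; r_val is null for id=1
left.join(right, on='id', how='right').show()  # ids 2, 3; l_val is null for id=3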

Read and Write

# create a SparkSession instance with the name 'appname' 
spark = SparkSession.builder.appName("appname").getOrCreate()

# read (csv, json, text, parquet)
df = spark.read.csv('PATH_CSV')

# write
df.coalesce(1).write.csv("sample.csv")
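
A hedged sketch of commonly used read/write options (the paths are placeholders):

# read a CSV that has a header row, letting Spark infer column types
df = spark.read.csv('PATH_CSV', header=True, inferSchema=True)

# other supported formats
df_json = spark.read.json('PATH_JSON')
df_parquet = spark.read.parquet('PATH_PARQUET')

# write a single CSV part file, overwriting any existing output directory
df.coalesce(1).write.mode('overwrite').csv('PATH_OUT', header=True)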

GroupedData

  • df.groupBy() returns GroupedData

  • GroupBy allows you to group rows together based on some column value

  • In other words, groupBy takes a column name as input and first groups together the rows that share the same value in that column; the column can be transformed as needed before grouping, and the resulting groups can likewise be transformed to produce the desired output (see the sketch after the list below)

grouped = df.groupBy("column_name")  # returns a GroupedData object
grouped.count() # Returns the count of rows for each group.
grouped.mean()  # Returns the mean of values for each group.
grouped.max()   # Returns the maximum of values for each group.
grouped.min()   # Returns the minimum of values for each group.
grouped.sum()   # Returns the sum of values for each group.
grouped.avg()   # Returns the average of values for each group.
grouped.agg()   # Using agg(), more than one aggregate can be calculated at a time.
grouped.pivot() # Pivots a column of the DataFrame and performs a subsequent aggregation.
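
A hedged sketch of agg() and pivot() together (the columns dept, year, and salary are made up for illustration):

from pyspark.sql import functions as F

df_emp = spark.createDataFrame(
    [('IT', 2020, 100), ('IT', 2021, 120), ('HR', 2020, 90)],
    ['dept', 'year', 'salary'])

# several aggregates at once with agg()
df_emp.groupBy('dept').agg(F.count('salary').alias('n'), F.avg('salary').alias('avg_salary')).show()

# pivot: one column per distinct year, filled with the summed salary
df_emp.groupBy('dept').pivot('year').sum('salary').show()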




3. SQL functions

from pyspark.sql import functions as F

# Aggregate function: returns a set of objects with duplicate elements eliminated.
F.collect_set()

# Bucketize rows into one or more time windows given a timestamp specifying column. 
F.window(timeColumn='timestamp', windowDuration='20 minutes')

# Creates a Column of literal value.
F.lit()

# Replace all substrings of the specified string value that match regexp with rep.
F.regexp_replace()
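
A short sketch of these functions used together (the columns event_time, user, and text are assumed for illustration):

# distinct users per 20-minute window, plus a constant column and a regex cleanup
df_out = (df
    .withColumn('source', F.lit('web'))
    .withColumn('text_clean', F.regexp_replace('text', r'\s+', ' '))
    .groupBy(F.window(timeColumn='event_time', windowDuration='20 minutes'))
    .agg(F.collect_set('user').alias('users')))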

User Defined Function

  • the input argument to a udf call should be a column name, as in the call test_udf('input_column_name') below; otherwise you may get a TypeError: Invalid argument, not a string or column: 2 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function

    # User Defined Functions
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, StringType

    def func(x): return x
    test_udf = F.udf(lambda x: func(x), ArrayType(StringType()))
    df_udf = df.withColumn('udf', test_udf('input_column_name'))
    




4. Others

  • $"colname" is converted to Column by SQLContext.implicits$.StringToColumn
# zeppelin plot: render a matplotlib figure inline in a Zeppelin note as embedded SVG
def show(p):
    import io
    img = io.StringIO()
    p.savefig(img, format='svg')
    img.seek(0)
    print("%html <div style='width:400px'>" + img.getvalue() + "</div>")



