1. Initialization & Configuration
- SparkContext : provides a connection to Spark with the ability to create RDDs
- SQLContext : provides a connection to Spark with the ability to run SQL queries on data
- SparkSession : all-encompassing context which includes coverage for SparkContext, SQLContext and HiveContext
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
# create a SparkSession instance with the name 'appname'
spark = SparkSession.builder.appName("appname").getOrCreate()
# create a SparkContext instance which allows the Spark application to access the cluster
sc = SparkContext.getOrCreate()
# create a SQLContext instance to access the SQL query engine built on top of Spark
sqlContext = SQLContext(sc)
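As a quick check that the session works, here is a minimal sketch that builds a small DataFrame from the SparkSession created above (the data and column names are hypothetical); the later sections can reuse it as df:
# hypothetical toy data with columns 'id' and 'letter'
df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['id', 'letter'])
df.show()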
2. Dataframe
Explore Dataframe
# Prints out the schema in the tree format.
df.printSchema()
# To get the number of rows
df.count()
Modifying Dataframe
from pyspark.sql import functions as F
# Create a column
df = df.withColumn('column_new', F.lit('abc')) # with the default value 'abc'
df = df.withColumn('column_new', 2 * F.col('column_existing')) # using an existing column
# Change Column Name
df.withColumnRenamed("column_ori", "column_new")
# add index column
from pyspark.sql.functions import monotonically_increasing_id
df_index = df.select("*").withColumn("id", monotonically_increasing_id())
# filter (=where)
df.filter(df.column_float.between(7.5,8.2))
# remove column
df.drop("column_drop")
Join
- Inner join : returns only the rows that have a match under the join condition (the on expression is true) on both sides; anything not common to both tables is dropped
- Outer join : includes rows of one table for which there are no matching rows in the other table
- Left join : all rows of the left table are kept, regardless of whether there is a match in the right table. When an id match is found in the right table its values are returned, otherwise null
- Right join : performs the same task as the left join, but for the right table
df1.join(df2, on='column', how='inner')
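A minimal sketch of the four join types, assuming df1 and df2 share an 'id' column and df2 also has a 'user_id' column (hypothetical names):
df_inner = df1.join(df2, on='id', how='inner')
df_outer = df1.join(df2, on='id', how='outer')
df_left  = df1.join(df2, on='id', how='left')
df_right = df1.join(df2, on='id', how='right')
# joining on differently named columns uses a column expression instead of a name
df1.join(df2, on=df1.id == df2.user_id, how='left')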
Read and Write
# create a SparkSession instance with the name 'appname'
spark = SparkSession.builder.appName("appname").getOrCreate()
# read (csv, json, text, parquet)
df = spark.read.csv('PATH_CSV')
# write
df.coalesce(1).write.csv("sample.csv")
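A sketch of some common read/write options (the paths are placeholders):
df = spark.read.csv('PATH_CSV', header=True, inferSchema=True)
df = spark.read.json('PATH_JSON')
df = spark.read.parquet('PATH_PARQUET')
# overwrite existing output and keep the header when writing a single csv file
df.coalesce(1).write.mode('overwrite').option('header', True).csv('OUT_CSV')
df.write.mode('overwrite').parquet('OUT_PARQUET')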
GroupedData
- df.groupBy() returns GroupedData
- GroupBy allows you to group rows together based off some column value
- In other words, groupBy takes a column name as input and first groups together the rows that share the same value in that column; the column can be transformed as needed before grouping, and the resulting groups can likewise be transformed freely to get the output you want
grouped = df.groupBy("column_name")
grouped.count()  # Returns the count of rows for each group.
grouped.mean()   # Returns the mean of values for each group.
grouped.max()    # Returns the maximum of values for each group.
grouped.min()    # Returns the minimum of values for each group.
grouped.sum()    # Returns the total of values for each group.
grouped.avg()    # Returns the average of values for each group.
grouped.agg()    # Using agg(), we can calculate more than one aggregate at a time.
grouped.pivot()  # Pivots a column of the DataFrame into separate columns.
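For example, a sketch of agg() and pivot(), assuming hypothetical 'department', 'year' and 'salary' columns:
from pyspark.sql import functions as F
# several aggregates at once
df.groupBy('department').agg(
    F.count('*').alias('n'),
    F.avg('salary').alias('avg_salary'),
    F.max('salary').alias('max_salary'))
# pivot: one result column per distinct 'year', filled with the average salary
df.groupBy('department').pivot('year').avg('salary')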
3. SQL functions
from pyspark.sql import functions as F
# Aggregate function: returns a set of objects with duplicate elements eliminated.
F.collect_set()
# Bucketize rows into one or more time windows given a timestamp specifying column.
F.window(timeColumn='timestamp', windowDuration='20 minute')
# Creates a Column of literal value.
F.lit()
# Replace all substrings of the specified string value that match regexp with rep.
F.regexp_replace()
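A sketch that combines the functions above, assuming df has hypothetical 'timestamp', 'user' and 'text' columns:
from pyspark.sql import functions as F
df_clean = (df.withColumn('source', F.lit('web'))                          # constant column
              .withColumn('text', F.regexp_replace('text', r'\s+', ' ')))  # collapse whitespace
# distinct users seen in each 20-minute window
df_windowed = df_clean.groupBy(F.window('timestamp', '20 minutes')) \
                      .agg(F.collect_set('user').alias('users'))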
User Defined Function
- The input argument to a udf should be a column name (or Column), as in the last line below, or you might get: TypeError: Invalid argument, not a string or column: 2 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function
# User Defined Functions
from pyspark.sql.types import ArrayType, StringType

def func(x):
    return [x]  # return value must match the declared return type ArrayType(StringType())

test_udf = F.udf(lambda x: func(x), ArrayType(StringType()))
df_udf = df.withColumn('udf', test_udf('input_column_name'))
4. Others
$"colname"
is converted to Column by SQLContext.implicits$.StringToColumn
# zeppelin plot: render a matplotlib figure inline by printing it as SVG
def show(p):
    import io
    img = io.StringIO()
    p.savefig(img, format='svg')  # SVG is text, so StringIO works here
    img.seek(0)
    print("%html <div style='width:400px'>" + img.getvalue() + "</div>")