1. Initialization & Configuration
- SparkContext : provides a connection to Spark with the ability to create RDDs
- SQLContext : provides a connection to Spark with the ability to run SQL queries on data
- SparkSession : all-encompassing context that covers SparkContext, SQLContext and HiveContext
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
# create a SparkSession instance with the name 'appname'
spark = SparkSession.builder.appName("appname").getOrCreate()
# get (or create) the SparkContext, which gives the Spark application access to the cluster
sc = SparkContext.getOrCreate()
# create a SQLContext instance to access the SQL query engine built on top of Spark
# (SQLContext expects a SparkContext; it is deprecated since Spark 3.0 in favor of SparkSession)
sqlContext = SQLContext(sc)
2. Dataframe
Explore Dataframe
# Prints out the schema in the tree format.
df.printSchema()
# To get the number of rows
df.count()
Modifying Dataframe
# Create a column
from pyspark.sql import functions as F
df = df.withColumn('column_new', F.lit('abc')) # with the default value 'abc'
df = df.withColumn('column_new', 2*F.col('existing_column')) # using an existing column
# Change Column Name (returns a new DataFrame, so assign the result)
df = df.withColumnRenamed("column_ori", "column_new")
# add index column (ids are unique and increasing, but not consecutive)
from pyspark.sql.functions import monotonically_increasing_id
df_index = df.withColumn("id", monotonically_increasing_id())
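# filter (=where)
df = df.filter(F.col('column_new') == 'abc') # df.where(...) is an alias of df.filter(...)
# remove column
df = df.drop('column_new')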
Join
- Inner join : keeps only the rows that match in both tables, i.e. it returns the data from both sides for which the join condition (expression) is true
- Outer join : also includes in the result the rows of one table for which no matching rows are found in the other table
- Left join : all rows of the left table are kept, regardless of whether there is a match in the right table or not. When a match is found in the right table, its values are returned; otherwise they are null
- Right join : performs the same task as the left join, but for the right table.
df1.join(df2, on='column', how='inner')
Read and Write
# create a SparkSession instance with the name 'appname'
spark = SparkSession.builder.appName("appname").getOrCreate()
# read (csv, json, text, parquet)
df = spark.read.csv('PATH_CSV')
# write (csv, json, text, parquet); 'PATH_OUT' is a placeholder output directory
df.write.csv('PATH_OUT')
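GroupBy
df.groupBy() returns a GroupedData object.
groupBy lets you group rows together based on the values of one or more columns.
In other words, groupBy takes column names as input and first groups together the rows that hold the same value in those columns. The columns can be freely transformed as needed before being passed in, and the resulting groups can likewise be transformed freely to produce the desired output.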
.count() # Returns the count of rows for each group.
.mean() # Returns the mean of values for each group.
.max() # Returns the maximum of values for each group.
.min() # Returns the minimum of values for each group.
.sum() # Returns the sum of values for each group.
.avg() # Returns the average of values for each group (alias of mean()).
.agg() # Computes one or more aggregates in a single call.
.pivot() # Pivots a column so that an aggregation is computed per distinct value of that column.
3. SQL functions
from pyspark.sql import functions as F
# Aggregate function: returns a set of objects with duplicate elements eliminated.
F.collect_set('column')
# Bucketize rows into one or more time windows given a timestamp specifying column.
F.window(timeColumn='timestamp', windowDuration='20 minute')
# Creates a Column of literal value.
F.lit('abc')
# Replace all substrings of the specified string value that match regexp with rep.
F.regexp_replace('column', 'regexp', 'rep')
User Defined Function
The input arguments of a udf must be column names or Column objects, as in the withColumn call below. Passing a plain Python value such as 2 raises:
TypeError: Invalid argument, not a string or column: 2 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function
# User Defined Functions
from pyspark.sql.types import ArrayType, StringType
def func(x):
    return x
# wrap func as a udf that returns an array of strings
test_udf = F.udf(func, ArrayType(StringType()))
df_udf = df.withColumn('udf', test_udf('input_column_name'))
4. Others
In Scala, $"column_name" is converted to Column by SQLContext.implicits$.StringToColumn
# zeppelin plot
def show(p):
    import io
    img = io.StringIO()
    p.savefig(img, format='svg')
    print("%html <div style='width:400px'>" + img.getvalue() + "</div>")