Cheat Sheet | PySpark


Introduction

  • Contents

    • (1) Initialization & Configuration

    • (2) Read data

    • (3) Data processing

    • (4) Write

    • (5) Visualization

    • (6) Logging



1. Initialization & Configuration

  • SparkContext: provides the connection to a Spark cluster, with the ability to create RDDs

  • SQLContext: provides a connection to Spark with the ability to run SQL queries on data (legacy API, kept for backward compatibility since SparkSession covers it)

  • SparkSession: the all-encompassing entry point (since Spark 2.0) that covers SparkContext, SQLContext and HiveContext

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# create (or reuse) a SparkSession instance with the name 'appname'
spark = SparkSession.builder.appName("appname").getOrCreate()
# get the SparkContext, which allows the Spark application to access the Spark cluster
# (returns the context the session already started, i.e. the same object as spark.sparkContext)
sc = SparkContext.getOrCreate()
# create a SQLContext instance (legacy API) to access the SQL query engine built on top of Spark;
# SQLContext expects a SparkContext, and spark.sql(...) offers the same functionality
sqlContext = SQLContext(sc)


2. Read data

  • Read dataframe
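# read (csv, json, text, parquet): spark.read.csv / .json / .text / .parquet
df = spark.read.csv('PATH_CSV', header=True, inferSchema=True)  # first line as column names, infer column types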
  • Data Exploration
# Prints out the schema in the tree format.
df.printSchema()
# To get the number of rows
df.count()


3. Data processing

  • Basic Manipulation
from pyspark.sql import functions as F

# Create a column
df = df.withColumn('column_new', F.lit('abc'))  # with the default value 'abc'
df = df.withColumn('column_new', 2 * F.col('existing_column'))  # derived from an existing column

# Change Column Name
df = df.withColumnRenamed("column_ori", "column_new")  # DataFrames are immutable: assign the result

# add index column (ids are unique and increasing, but not consecutive)
from pyspark.sql.functions import monotonically_increasing_id
df_index = df.withColumn("id", monotonically_increasing_id())

# remove column
df = df.drop("column_drop")
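The ids from monotonically_increasing_id() are unique and increasing but not consecutive. According to the PySpark API docs, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python model below (no Spark required; the helper name model_ids and the partition layout are hypothetical) shows why the ids jump at every partition boundary:

```python
# Plain-Python model of monotonically_increasing_id(); no Spark needed.
# id = (partition index << 33) + (row position inside that partition)


def model_ids(partitions):
    """partitions: one list of rows per (hypothetical) Spark partition."""
    ids = []
    for partition_index, rows in enumerate(partitions):
        for row_number, _row in enumerate(rows):
            ids.append((partition_index << 33) + row_number)
    return ids


# three rows spread over two partitions
print(model_ids([["a", "b"], ["c"]]))  # [0, 1, 8589934592]
```

If consecutive ids are required, row_number() over a Window ordered by some column is a common alternative, at the cost of moving all rows into a single partition when the window has no partitionBy.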
  • Filtering
# filter (alias: where); between() is inclusive on both bounds
df_filtered = df.filter(df.column_float.between(7.5, 8.2))
  • Grouping (groupBy)
# grouped data: groupBy groups rows together based on the values of some column(s);
# it returns GroupedData, which needs one aggregation to become a DataFrame again
df.groupBy("column_name").count()  # count of rows for each group
df.groupBy("column_name").mean()   # mean of each numeric column for each group
df.groupBy("column_name").max()    # maximum of each numeric column for each group
df.groupBy("column_name").min()    # minimum of each numeric column for each group
df.groupBy("column_name").sum()    # total of each numeric column for each group
df.groupBy("column_name").avg()    # average for each group (same as mean())
df.groupBy("column_name").agg(F.min("column_float"), F.max("column_float"))  # several aggregates at a time
df.groupBy("column_name").pivot("column_pivot").sum("column_float")  # pivot: each distinct value of column_pivot becomes a column
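Pivot is the least obvious of these. As a rough sketch of the result shape, here is a plain-Python model of groupBy(...).pivot(...).sum(...) (no Spark required; the function name and the sample rows are hypothetical, and the pivot values are sorted here only to get a stable column order):

```python
from collections import defaultdict


def model_pivot_sum(rows, group_col, pivot_col, value_col):
    """Plain-Python model of df.groupBy(group_col).pivot(pivot_col).sum(value_col).

    rows: list of dicts (hypothetical data).
    Returns {group value: {pivot value: sum}}; a pivot value that never occurs
    in a group maps to None, like the null Spark puts in that cell.
    """
    pivot_values = sorted({r[pivot_col] for r in rows})  # each distinct value becomes a column
    sums = defaultdict(dict)
    for r in rows:
        g, p = r[group_col], r[pivot_col]
        sums[g][p] = sums[g].get(p, 0) + r[value_col]
    return {g: {p: sums[g].get(p) for p in pivot_values} for g in sums}


rows = [
    {"column_name": "a", "column_pivot": "x", "column_float": 1.0},
    {"column_name": "a", "column_pivot": "y", "column_float": 2.0},
    {"column_name": "a", "column_pivot": "x", "column_float": 3.0},
    {"column_name": "b", "column_pivot": "y", "column_float": 5.0},
]
print(model_pivot_sum(rows, "column_name", "column_pivot", "column_float"))
# {'a': {'x': 4.0, 'y': 2.0}, 'b': {'x': None, 'y': 5.0}}
```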
  • Join
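# join combines the columns of two DataFrames on a key; to stack the rows of
# DataFrames with the same schema, use union instead (see Union below)
df1.join(df2, on='column', how='inner')  # how: 'inner', 'left', 'right', 'outer', ...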

# Deduplicate df2 on the join keys with dropDuplicates() first; otherwise every
# duplicate key in df2 produces an extra row in df_new
df2 = df2.dropDuplicates(["KEY", "ID"])  # rows with the same KEY but a different ID are kept

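# update df1 with df2 using an outer join: take the non-null value from df2 (b),
# otherwise keep the value from df1 (a); keys found in only one DataFrame are kept
df_new = (
    df1.alias("a")
    .join(df2.alias("b"), "KEY", how="outer")
    .select("KEY", F.coalesce("b.col1", "a.col1").alias("col1"))
)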


# same update for several columns and a composite key (KEY, ID)
cols = ["TITLE", "NAME", "URL", "THUMBNAIL", "DATE"]
df_new = (
    df1.alias("a")
    .join(df2.alias("b"), ["KEY", "ID"], how="outer")
    .select("KEY", "ID", *(F.coalesce("b." + col, "a." + col).alias(col) for col in cols))
)
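To make the coalesce-based update concrete, here is a plain-Python model of the single-key version (no Spark required; the helper names and rows are hypothetical, None stands in for null, and the output is sorted only for determinism since Spark does not guarantee row order):

```python
def coalesce(*values):
    """First non-None value, like F.coalesce (None plays the role of null)."""
    for v in values:
        if v is not None:
            return v
    return None


def model_outer_update(rows_a, rows_b, key, cols):
    """Plain-Python model of
    a.join(b, key, how="outer").select(key, *(F.coalesce("b." + c, "a." + c).alias(c) for c in cols))

    rows_a, rows_b: lists of dicts with unique `key` values on each side
    (the job of dropDuplicates before the join).
    """
    a = {r[key]: r for r in rows_a}
    b = {r[key]: r for r in rows_b}
    result = []
    # outer join: every key from either side; sorted only to make the output deterministic
    for k in sorted(a.keys() | b.keys()):
        ra, rb = a.get(k, {}), b.get(k, {})
        row = {key: k}
        for c in cols:
            row[c] = coalesce(rb.get(c), ra.get(c))  # df2 (b) wins, df1 (a) is the fallback
        result.append(row)
    return result


df1_rows = [{"KEY": 1, "col1": "old"}, {"KEY": 2, "col1": "keep"}]
df2_rows = [{"KEY": 1, "col1": "new"}, {"KEY": 2, "col1": None}, {"KEY": 3, "col1": "added"}]
for row in model_outer_update(df1_rows, df2_rows, "KEY", ["col1"]):
    print(row)
# {'KEY': 1, 'col1': 'new'}
# {'KEY': 2, 'col1': 'keep'}
# {'KEY': 3, 'col1': 'added'}
```

One consequence of using coalesce: a null in df2 never overwrites a value from df1, so this pattern cannot be used to clear a field.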
  • Union

    • In Spark, union() matches columns by position, not by name (column names are ignored), and it does not shuffle data.
    • Both DataFrames must therefore have the same number of columns.
    • Put the columns in the same order before the union, e.g. df1.union(df2.select(df1.columns)), or use df1.unionByName(df2) to match columns by name.
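The plain-Python model below (no Spark required; the helper names and data are hypothetical, and it ignores the type checks and coercion Spark also applies) shows what goes wrong when two DataFrames have the same columns in a different order:

```python
def union_by_position(cols1, rows1, cols2, rows2):
    """Model of df1.union(df2): df2's values are appended by position and
    the result keeps df1's column names."""
    if len(cols1) != len(cols2):
        raise ValueError("union needs the same number of columns")
    return cols1, rows1 + rows2


def union_by_name(cols1, rows1, cols2, rows2):
    """Model of df1.unionByName(df2): df2's values are reordered to df1's column order."""
    if sorted(cols1) != sorted(cols2):
        raise ValueError("unionByName needs the same column names")
    order = [cols2.index(c) for c in cols1]
    return cols1, rows1 + [tuple(row[i] for i in order) for row in rows2]


cols1, rows1 = ["id", "name"], [(1, "a")]
cols2, rows2 = ["name", "id"], [("b", 2)]  # same columns, different order
print(union_by_position(cols1, rows1, cols2, rows2))  # (['id', 'name'], [(1, 'a'), ('b', 2)]) misaligned
print(union_by_name(cols1, rows1, cols2, rows2))      # (['id', 'name'], [(1, 'a'), (2, 'b')])
```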
  • User Defined Function

    • Pass column names (or Column objects) as UDF arguments. Passing a plain Python value raises TypeError: Invalid argument, not a string or column: 2 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function. In other words, wrap constants with F.lit(...).
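from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# User Defined Functions: the declared return type must match what func returns
def func(x):
    # placeholder logic: split a comma-separated string into a list of strings
    return x.split(",") if x is not None else None

test_udf = F.udf(func, ArrayType(StringType()))  # no lambda wrapper needed
df_udf = df.withColumn('udf', test_udf('input_column_name'))  # pass the column name, not a Python value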


4. Write

  • todo


5. Visualization (Plotting)

  • data visualization
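# zeppelin plot: render a matplotlib figure as inline SVG in a Zeppelin paragraph
import io

def show(p):
    # p: matplotlib.pyplot or a Figure; both provide savefig()
    img = io.StringIO()  # SVG output is text, so a text buffer works
    p.savefig(img, format='svg')
    print("%html <div style='width:400px'>" + img.getvalue() + "</div>")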


6. Logging

  • Writing logs: see the example below

  • Checking logs in the Spark History Server

    • App ID > Executors > Executor ID > stderr
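# set logger: the JVM log4j Logger, reached through the internal spark._jvm gateway
Logger = spark._jvm.org.apache.log4j.Logger
logger = Logger.getLogger(__name__)
# leave log (len_videos is just an example value from the job)
len_videos = df.count()
logger.error(f"{len_videos}, hw_error")
logger.info(f"{len_videos}, hw_info")  # appears only if the configured log level allows INFO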

