CheatSheet | pyspark


1. Introduction

  • 1.1 Terminologies

    • Job : A piece of code that reads some input (HDFS or local), performs some computation and writes output.

    • Stages : Jobs are divided into stages. Stages are classified as Map or Reduce stages. Stages are split at computational boundaries; since all of the computation cannot be done in a single stage, a job is carried out over many stages.

    • Tasks : Each stage has some tasks. One task is executed on one partition of data on one executor (machine).

    • DAG : DAG stands for Directed Acyclic Graph; in the present context it is a DAG of operators.

    • Executor : The process responsible for executing a task.

    • Master : The machine on which the Driver program runs.

    • Slave : The machine on which the Executor program runs.

  • 1.2 Spark Components

    • Spark Driver

      • separate process to execute user applications

      • creates SparkContext to schedule jobs execution and negotiate with cluster manager

    • Executors

      • run tasks scheduled by driver

      • store computation results in memory, on disk or off-heap

      • interact with storage systems

    • Cluster Manager

      • Apache Mesos : a general cluster manager that can also run Hadoop MapReduce and service applications.

      • Hadoop YARN : the resource manager in Hadoop.

      • Spark Standalone : a simple cluster manager included with Spark that makes it easy to set up a cluster

  • 1.3 How Spark Works?

    • Spark has a small code base, and the system is divided into various layers.

    • Each layer has its own responsibilities, and the layers are independent of each other.

      • Interpreter : The first layer is the interpreter. Spark uses a Scala interpreter with some modifications. As you enter your code in the Spark console (creating RDDs and applying operators), Spark creates an operator graph.

      • DAG Scheduler : When the user runs an action (like collect), the graph is submitted to the DAG Scheduler, which divides the operator graph into (map and reduce) stages. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together to optimize the graph; for example, many map operators can be scheduled in a single stage. This optimization is key to Spark's performance. The final result of the DAG scheduler is a set of stages.

      • Task Scheduler : The stages are passed on to the Task Scheduler, which launches tasks via the cluster manager (Spark Standalone/YARN/Mesos). The task scheduler doesn't know about dependencies among stages.
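
    • For example (a minimal sketch, assuming an existing SparkContext sc and a local file input.txt): the snippet below only builds the operator graph until an action runs; flatMap and map are pipelined into one stage, reduceByKey introduces a shuffle and therefore a stage boundary, and collect() is the action that makes the DAG scheduler submit the stages.

    rdd = sc.textFile("input.txt")                   # no job yet, only an operator graph
    words = rdd.flatMap(lambda line: line.split())   # narrow transformation
    pairs = words.map(lambda w: (w, 1))              # pipelined into the same stage as flatMap
    counts = pairs.reduceByKey(lambda a, b: a + b)   # shuffle -> new stage
    counts.collect()                                 # action: stages are submitted to the task scheduler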



2. Install Spark Locally

  • 2.1 Prerequisite

    • Check java version first

    • Then, just install miniconda3 and create a virtual environment based on Python 3.6

    java -version
    # The command above might show something like below
    >> openjdk version "1.8.0_262"
    >> OpenJDK Runtime Environment (build 1.8.0_262-b10)
    >> OpenJDK 64-Bit Server VM (build 25.262-b10, mixed mode)
    
  • 2.2 Download Spark

    # Download spark
    wget https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
    # unzip 
    tar -xvzf spark-3.0.1-bin-hadoop2.7.tgz
    # move to home and rename
    mv spark-3.0.1-bin-hadoop2.7 ~/spark
    
  • 2.3 Install pyspark

    pip install pyspark
    
  • 2.4 Change the execution path for pyspark

    export SPARK_HOME="/your_home_directory/spark/"
    export PATH="$SPARK_HOME/bin:$PATH"
    
  • 2.5 Test

    $ pyspark
    
  • 2.6 PySpark in Jupyter

    # add below lines in .bashrc 
    export PYSPARK_DRIVER_PYTHON=jupyter 
    export PYSPARK_DRIVER_PYTHON_OPTS='notebook' 
    # Then run: pyspark   -> it will print a URL for the Jupyter notebook
    


3. Cheat Sheet

3.1 Initialization & Configuration

  • SparkContext : provides connection to Spark with the ability to create RDDs

  • SQLContext : provides connection to Spark with the ability to run SQL queries on data

  • SparkSession : all encompassing context which includes coverage for SparkContext, SQLContext and HiveContext

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# create a SparkSession instance with the name 'appname' 
spark = SparkSession.builder.appName("appname").getOrCreate()
# create a SparkContext instance, which gives the application access to the Spark cluster
sc = SparkContext.getOrCreate()
# create a SQLContext instance to access the SQL query engine built on top of Spark
sqlContext = SQLContext(sc)
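
The builder can also carry explicit configuration; below is a minimal sketch in which the master URL and the memory/partition values are placeholders rather than recommended settings.

# build a session with explicit configuration (values are only examples)
spark = (SparkSession.builder
         .appName("appname")
         .master("local[*]")                            # or "yarn" on a cluster
         .config("spark.executor.memory", "2g")
         .config("spark.sql.shuffle.partitions", "200")
         .getOrCreate())
# the SparkContext behind the session
sc = spark.sparkContext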

3.2 Dataframe

  • Explore Dataframe
# Prints out the schema in the tree format.
df.printSchema()
# To get the number of rows
df.count()
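A few more handy inspection calls (a quick sketch; they only describe or preview the data):
df.show(5)            # print the first 5 rows
df.columns            # list of column names
df.dtypes             # list of (column name, type) pairs
df.describe().show()  # basic statistics for the numeric columns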
  • Modifying Dataframe
# Create a column
from pyspark.sql import functions as F
df = df.withColumn('column_new', F.lit('abc'))                 # with the default value 'abc'
df = df.withColumn('column_new', 2 * F.col('existing_column')) # derived from an existing column

# Change Column Name
df.withColumnRenamed("column_ori", "column_new")

# add index column
from pyspark.sql.functions import monotonically_increasing_id 
df_index = df.select("*").withColumn("id", monotonically_increasing_id())

# filter (=where)
df.filter(df.column_float.between(7.5,8.2))

# remove column
df.drop("column_drop")

# grouped data : groupBy lets you group rows together based on a column value and
# then apply an aggregation to each group
grouped = df.groupBy("column_name")
grouped.count()   # Returns the count of rows for each group.
grouped.mean()    # Returns the mean of values for each group.
grouped.max()     # Returns the maximum of values for each group.
grouped.min()     # Returns the minimum of values for each group.
grouped.sum()     # Returns the sum of values for each group.
grouped.avg()     # Returns the average of values for each group.
grouped.agg()     # agg() can calculate more than one aggregate at a time.
grouped.pivot()   # Pivots a column of the grouped DataFrame.
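As an example of agg(), several aggregates can be computed in one pass (a sketch; the column names department and salary are hypothetical):
df.groupBy("department").agg(
    F.count("*").alias("n_rows"),
    F.avg("salary").alias("avg_salary"),
    F.max("salary").alias("max_salary"),
).show()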
  • Read and Write
# create a SparkSession instance with the name 'appname' 
spark = SparkSession.builder.appName("appname").getOrCreate()
# read (csv, json, text, parquet)
df = spark.read.csv('PATH_CSV')
# write
df.coalesce(1).write.csv("sample.csv")
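In practice reading a CSV usually needs a couple of options, and Parquet preserves the schema; a short sketch with placeholder paths:
# read a CSV that has a header row and let Spark infer the column types
df = spark.read.csv('PATH_CSV', header=True, inferSchema=True)
# write and re-read Parquet (keeps the schema; written as multiple part files)
df.write.mode('overwrite').parquet('PATH_PARQUET')
df2 = spark.read.parquet('PATH_PARQUET')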

3.3 Join

  • Inner join : keeps only what is common to both tables; it returns every row that satisfies the join condition (the on expression is true) on both sides.
  • Outer join : also includes rows of one table for which no matching row is found in the other table.
  • Left join : all rows of the left table are kept, whether or not there is a match in the right table; when an id match is found in the right table its values are returned, otherwise null.
  • Right join : performs the same task as the left join, but for the right table.
# If the schemas of the DataFrames are identical and you only want to stack rows, use union instead of join.
df1.join(df2, on='column', how='inner')
# Drop duplicate rows using dropDuplicates(), otherwise df_new will also have duplicate rows
df2 = df2.dropDuplicates(["KEY", "UID"]) # rows with the same KEY but a different UID will remain

# update df1 with df2 using an outer join: take the value from df2 ("b") when present, otherwise keep df1's ("a")
cols = ["DOC_TITLE", "CHANNEL_NAME", "CHANNEL_PC_URL", "VIDEO_THUMBNAIL", "MODIFY_DATE", "INFER_DATE"]
df_new = (df1.alias("a")
          .join(df2.alias("b"), ["KEY", "UID"], how="outer")
          .select("KEY", "UID",
                  *(F.coalesce("b." + col, "a." + col).alias(col) for col in cols)))
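
A toy example of the left-join behaviour described above (a sketch; the two tiny DataFrames are made up for illustration):
# every row of the left table survives; unmatched rows get null in the right-hand columns
left = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "l_val"])
right = spark.createDataFrame([(1, "x")], ["id", "r_val"])
left.join(right, on="id", how="left").show()   # row (2, "b") appears with r_val = null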

3.4 SQL functions

from pyspark.sql import functions as F

# Aggregate function: returns a set of objects with duplicate elements eliminated.
F.collect_set()
# Bucketize rows into one or more time windows given a timestamp specifying column. 
F.window(timeColumn='timestamp', windowDuration='20 minute')
# Creates a Column of literal value.
F.lit()
# Replace all substrings of the specified string value that match regexp with rep.
F.regexp_replace()
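
A short sketch of how these compose (the column names text and ts are hypothetical):
# strip digits from a string column, then collect the distinct cleaned values per 20-minute window
df_windowed = (df
               .withColumn('clean', F.regexp_replace('text', r'\d+', ''))
               .groupBy(F.window('ts', '20 minutes'))
               .agg(F.collect_set('clean').alias('distinct_clean')))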

3.5 User Defined Function

  • input arguments for a udf must be column names or Column objects, as in the withColumn call below; otherwise you get a TypeError: Invalid argument, not a string or column: 2 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function

    # User Defined Functions
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, StringType
    def func(x): return [x]  # the return value must match the declared ArrayType(StringType())
    test_udf = F.udf(lambda x: func(x), ArrayType(StringType()))
    df_udf = df.withColumn('udf', test_udf('input_column_name'))
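
  • To pass a constant to the udf, wrap it with F.lit (a minimal sketch; passing a bare Python value is what raises the TypeError quoted above):

    # df.withColumn('udf', test_udf(2))                    # raises the TypeError above
    df_udf = df.withColumn('udf', test_udf(F.lit('abc')))  # wrap the literal in a Column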
    

3.6 Others

  • In Scala, $"colname" is converted to a Column by SQLContext.implicits$.StringToColumn
# zeppelin plot
def show(p):
    import io
    img = io.StringIO()
    p.savefig(img, format='svg')
    img.seek(0)
    print("%html <div style='width:400px'>" + img.getvalue() + "</div>")
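
Usage is just passing any matplotlib object that has a savefig method; a sketch assuming matplotlib is installed and df has the column_float column used earlier:
import matplotlib.pyplot as plt
pdf = df.select('column_float').limit(1000).toPandas()  # bring a small sample to the driver
pdf['column_float'].hist()                              # plot it on the current matplotlib figure
show(plt)                                               # render the figure inline in the Zeppelin note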


4. Submit Pyspark

PYSPARK_PYTHON=python3 spark-submit \
    --master local[*] \
    --executor-memory 20g \
    --driver-java-options "-XX:-OmitStackTraceInFastThrow" \
    --conf spark.driver.memory=20g \
    --conf spark.driver.maxResultSize=5g \
    --conf spark.executor.memoryOverhead=4096 \
    --conf spark.rpc.timeout=6000s \
    --conf spark.eventLog.enabled=true \
    /opt/dump/src/download_channel.py

# submit to a YARN cluster (swap --master yarn for --master local[*] to run on the local
# machine; keep --executor-memory and spark.driver.memory below 20g)
PYSPARK_PYTHON=python3 spark-submit \
    --name ${JOB_NAME} \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 200 \
    --executor-cores 1 \
    --executor-memory 2g \
    --driver-java-options "-XX:-OmitStackTraceInFastThrow" \
    --conf spark.driver.memory=4g \
    --conf spark.eventLog.enabled=true \
    --conf spark.eventLog.dir=hdfs://user/${USERNAME}/spark2-history/ \
    --conf spark.executor.memoryOverhead=4096 \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${DOCKER_IMAGE} \
    --conf spark.executorEnv.JAVA_HOME=/usr/jdk64/jdk1.8.0_112 \
    --conf spark.yarn.queue=batch \
    --conf spark.yarn.maxAppAttempts=1 \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${DOCKER_IMAGE} \
    --conf spark.yarn.appMasterEnv.JAVA_HOME=/usr/jdk64/jdk1.8.0_112 \
    --conf spark.yarn.keytab=${KEYTAB} \
    --conf spark.yarn.principal=${C3S_PRINCIPAL} \
    pyspark_entrypoint.py


5. Logging

  • Writing logs : see the snippet below

  • Check Log in Spark History Server

    • App ID > Executors > Executor ID > stderr
# set logger (via the JVM log4j logger exposed on the SparkSession)
Logger = spark._jvm.org.apache.log4j.Logger
logger = Logger.getLogger(__name__)
# leave log
logger.error(f"{len_videos}, hw_error")
logger.info(f"{len_videos}, hw_info")
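
To reduce console noise, the driver's log level can also be changed at runtime with the standard SparkContext API (a one-line sketch):
# accepted levels include ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE
spark.sparkContext.setLogLevel("WARN")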
