PySpark script example and how to run a PySpark script

In the previous post we saw how to create and run a very basic pyspark script in a Hadoop environment. In this post, we will walk through a pyspark script template in detail. We will see the different options available while creating a pyspark script and also how to run a pyspark script with multiple configurations.

Below is a basic PySpark template which I generally use to write PySpark scripts. Let's look at the template first and then walk through each section in it.

PySpark Template

"""
# Title : PySpark Script Template
# Description : This template can be used to create pyspark script
# Author : sqlandhadoop.com
# Date : 30-June-2021
# Version : 1.0 (Initial Draft)
# Usage : spark-submit --executor-memory 4G --executor-cores 4 PySpark_Script_Template.py > ./PySpark_Script_Template.log 2>&1 &
"""

# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import sys,logging
from datetime import datetime

# Logging configuration
formatter = logging.Formatter('[%(asctime)s] %(levelname)s @ line %(lineno)d: %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# current time variable to be used for logging purpose
dt_string = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
# change it to your app name
AppName = "MyPySparkApp"


# adding dummy function. change or remove it.
def some_function1():
    logger.info("Inside some_function 1")

# adding dummy function. change or remove it.
def some_function2():
    logger.info("Inside some_function 2")

def main():
    # start spark code
    spark = SparkSession.builder.appName(AppName+"_"+str(dt_string)).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    logger.info("Starting spark application")

    #calling function 1
    some_function1()

    #calling function 2
    some_function2()

    #do something here
    logger.info("Reading CSV File")
    df_category = spark.read.option("delimiter","|").csv("hdfs:///var/data/category_pipe.txt")
    logger.info("Previewing CSV File Data")
    df_category.show(truncate=False)

    logger.info("Ending spark application")
    # end spark code
    spark.stop()
    return None

# Starting point for PySpark
if __name__ == '__main__':
    main()
    sys.exit()

Save the file as "PySpark_Script_Template.py".

Let us look at each section in the pyspark script template.

How to Create a PySpark Script?

Section 1: PySpark Script : Comments/Description

The first section, at the very start of the script, is typically a comment block in which I describe the pyspark script: why I am creating it and what tasks it is supposed to complete. I also include author, date and version information here. The script usage, i.e. the command to execute the pyspark script, can be added in this section as well.

"""
# Title : PySpark Script Template
# Description : This template can be used to create pyspark script
# Author : sqlandhadoop.com
# Date : 30-June-2021
# Version : 1.0 (Initial Draft)
# Usage : spark-submit --executor-memory 4G --executor-cores 4 PySpark_Script_Template.py > ./PySpark_Script_Template.log 2>&1 &
"""

The comment section is very important yet often the most ignored part of a pyspark script. With a proper comment section you can make sure that anyone else can understand and run the pyspark script easily without any help.

Section 2: PySpark script : Import modules/libraries

Right after the comments section comes the second section, in which I import all the modules and libraries required for the pyspark script to execute. A few common modules which you will require for running pyspark scripts are mentioned below. You can add, modify or remove them as per your requirement.

# import modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import sys,logging
from datetime import datetime

It is good practice to keep all imports together at the start. As you write more pyspark code you may require more modules, and you can add them in this section.

Section 3 : PySpark script : Logging information

Logging is a very important section and a must-have for any pyspark script. When you run a pyspark script, it becomes necessary to create a log file for each run. Most of the time you don't want to dig through yarn logs to understand the execution status, so it is really important to have a dedicated log file for each run containing the custom output required by the developer and user of the script.

# Logging configuration
formatter = logging.Formatter('[%(asctime)s] %(levelname)s @ line %(lineno)d: %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)

I generally use the logging settings shown above in pyspark scripts. The output comes in a very readable format, which is very useful when debugging issues in pyspark.

Once you have created the logger, just use it to display messages on screen or write them into a log file.

logger.info("Starting spark application")

With the logging configuration above, the output of the pyspark script shown earlier will look something like this:

[2021-05-28 05:06:06,312] INFO @ line 42: Starting spark application
[2021-05-28 05:06:06,312] INFO @ line 30: Inside some_function 1
[2021-05-28 05:06:06,312] INFO @ line 34: Inside some_function 2
[2021-05-28 05:06:06,312] INFO @ line 51: Reading CSV File
[2021-05-28 05:06:15,583] INFO @ line 53: Previewing CSV File Data
+---+--------+---------+------------------------------------------+
|_c0|_c1     |_c2      |_c3                                       |
+---+--------+---------+------------------------------------------+
|1  |Sports  |MLB      |Major League Baseball                     |
|2  |Sports  |NHL      |National Hockey League                    |
|3  |Sports  |NFL      |National Football League                  |
|4  |Sports  |NBA      |National Basketball Association           |
|5  |Sports  |MLS      |Major League Soccer                       |
|6  |Shows   |Musicals |Musical theatre                           |
|7  |Shows   |Plays    |All non-musical theatre                   |
|8  |Shows   |Opera    |All opera and light opera                 |
|9  |Concerts|Pop      |All rock and pop music concerts           |
|10 |Concerts|Jazz     |All jazz singers and bands                |
|11 |Concerts|Classical|All symphony, concerto, and choir concerts|
+---+--------+---------+------------------------------------------+
[2021-05-28 05:06:22,549] INFO @ line 56: Ending spark application

Each line prints the TIMESTAMP, then the LOG LEVEL (INFO here), then the LINE number at which the command is running in the pyspark script, and finally the actual message.
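The configuration above sends log output to stdout, which is why the spark-submit commands in this post redirect stdout into a log file. If you prefer the script to write its own log file directly, a FileHandler variant can be used. This is just a sketch; the file name pattern is my assumption and not part of the template:

```python
import logging
from datetime import datetime

dt_string = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")

formatter = logging.Formatter('[%(asctime)s] %(levelname)s @ line %(lineno)d: %(message)s')
# write to a run-specific file instead of stdout; the name is an example
file_handler = logging.FileHandler("MyPySparkApp_%s.log" % dt_string)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)

logger = logging.getLogger("MyPySparkApp")
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)

logger.info("Starting spark application")
```

With this variant you no longer need the `> ./PySpark_Script_Template.log 2>&1` redirection, since each run produces its own timestamped log file.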

Section 4 : PySpark script : Variable declaration and initialisation

Use this section to define the parameters or variables to be used in the pyspark script. You can define them wherever you wish in python, but I prefer to define them at the start, in this section, because that makes it simple to manage and modify any existing parameter value.

# current time variable to be used for logging purpose
dt_string = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
# change it to your app name
AppName = "MyPySparkApp"

You can add, modify or remove the variables as per your requirement.
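This section is also a natural place to pick up run-time parameters instead of hard-coding them. A minimal sketch, where the sys.argv handling and the default path are my assumptions rather than part of the template:

```python
import sys
from datetime import datetime

# current time variable to be used for logging purpose
dt_string = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
AppName = "MyPySparkApp"

# optional command-line override of the input path, e.g.
#   spark-submit PySpark_Script_Template.py hdfs:///some/other/file.txt
input_path = sys.argv[1] if len(sys.argv) > 1 else "hdfs:///var/data/category_pipe.txt"
print("Input path:", input_path)
```

This way the same script can be pointed at a different file from the spark-submit command line without editing the code.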

Section 5: PySpark script : custom defined functions

It is very good practice to break your code into multiple small chunks. For this, I prefer to create a separate python function for each piece of functionality, and I define all these user defined functions in this section.

# adding dummy function. change or remove it.
def some_function1():
    logger.info("Inside some_function 1")

# adding dummy function. change or remove it.
def some_function2():
    logger.info("Inside some_function 2")
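The dummy functions above take no arguments; in real scripts I find functions that accept inputs and return values easier to reuse and test. A small illustrative sketch, where the function name and message are hypothetical:

```python
import logging
import sys

# same logging style as the template
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format='[%(asctime)s] %(levelname)s @ line %(lineno)d: %(message)s')
logger = logging.getLogger()

def row_count_message(table_name, count):
    # log which table is being summarised, then return a summary string
    logger.info("Summarising %s", table_name)
    return "%s has %d rows" % (table_name, count)

print(row_count_message("category", 11))
```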

Section 6: PySpark script : main function

This is the main function, in which I keep most of the important tasks. In this function I call the other functions to complete the required processing. This is also where I define the SparkSession and perform the spark dataframe related activities.

def main():
    # start spark code
    spark = SparkSession.builder.appName(AppName+"_"+str(dt_string)).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    logger.info("Starting spark application")

    #calling function 1
    some_function1()

    #calling function 2
    some_function2()

    #do something here
    logger.info("Reading CSV File")
    df_category = spark.read.option("delimiter","|").csv("hdfs:///var/data/category_pipe.txt")
    logger.info("Previewing CSV File Data")
    df_category.show(truncate=False)

    logger.info("Ending spark application")
    # end spark code
    spark.stop()
    return None

How to create a Spark session in pyspark?

You can call SparkSession.builder to create a new SparkSession. In the command below we have also assigned a name to it.

spark = SparkSession.builder.appName(AppName+"_"+str(dt_string)).getOrCreate()

How to set the log level in pyspark?

You can use sparkContext to set the log level in pyspark. In the example below we have set it to ERROR. Other possible values include INFO, WARN and DEBUG.

spark.sparkContext.setLogLevel("ERROR")

How to stop a spark application in pyspark?

Once you have completed all the spark tasks, you must stop the spark session using the command below:

spark.stop()

Section 7 : Calling the python main module

Just like SparkSession is the starting point for any spark application, the main module is the starting point of any Python script. You can define it using the standard guard shown below:

# Starting point for PySpark
if __name__ == '__main__':
    main()
    sys.exit()

I generally write the main module at the end of the python script and call the main() function in it. Once it completes, I call sys.exit() to exit the pyspark script.

Now that we know how to create a pyspark script, let us see how to run it as well.

How to run a pyspark script?

You can run a pyspark script by invoking the spark-submit utility. While calling the pyspark script you can also pass multiple options, and we will see a few important ones in this post.

Run PySpark script with spark-submit

Run the script using the command below:

spark-submit PySpark_Script_Template.py > ./PySpark_Script_Template.log 2>&1 &

The above command will run the pyspark script and also create a log file, in which you can easily check the output of the logger.

PySpark script : set executor-memory and executor-cores

You can easily pass executor memory and executor cores to the spark-submit command for your application. I generally don't pass num-executors because I tend to set "spark.dynamicAllocation.enabled" to true.

spark-submit --executor-memory 6G --executor-cores 4 PySpark_Script_Template.py > ./PySpark_Script_Template.log 2>&1 &

Also, for most of my applications I keep executor memory (in GB) at roughly 1.2 to 1.5 times the number of executor cores. Based on the ETL tasks I have done, this ratio works well for my use cases. In exceptional cases I increase executor memory to deal with memory related failures. This also helps in utilising the cluster to the maximum and leaves minimal free memory in yarn.
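That rule of thumb can be expressed as a tiny helper. This is purely illustrative; the function and default factor are my assumptions, not a Spark API:

```python
# rough sizing rule of thumb from above:
# executor memory in GB ~ 1.2 to 1.5 times the executor cores
def executor_memory_gb(executor_cores, factor=1.5):
    return int(round(executor_cores * factor))

# 4 cores -> 6G, matching the spark-submit examples in this post
print("%dG" % executor_memory_gb(4))
```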

PySpark script : set spark configurations

You can also pass multiple configurations in the spark-submit command. If you are passing more than one configuration, make sure to pass each with a separate --conf option.

spark-submit --executor-memory 6G --executor-cores 4 --conf spark.sql.parquet.mergeSchema=true --conf spark.sql.parquet.filterPushdown=true --conf spark.sql.parquet.writeLegacyFormat=false PySpark_Script_Template.py > ./PySpark_Script_Template.log 2>&1 &

PySpark script : set master

You can run a pyspark script on yarn or on the local machine. For this you can use the option below:

--master yarn/local/local[*]

spark-submit --master yarn --executor-memory 6G --executor-cores 4 --conf spark.sql.parquet.mergeSchema=true --conf spark.sql.parquet.filterPushdown=true --conf spark.sql.parquet.writeLegacyFormat=false PySpark_Script_Template.py > ./PySpark_Script_Template.log 2>&1 &

I generally use yarn to leverage the distributed environment. You can also use local to run on the local machine with a single thread, or local[*] to run on the local machine with the maximum possible threads.

PySpark script : mode cluster or client

This configuration decides where the driver runs: in client mode the driver runs on the machine from which you submit the application, while in cluster mode the driver is launched on one of the worker nodes. I generally run in client mode when I have a bigger and better master node than the worker nodes, which means the master node becomes the driver for my spark applications. If all the machines are frugal and don't have much memory, I go for cluster mode; otherwise client mode works best for me.

--deploy-mode client/cluster

spark-submit --master yarn --deploy-mode client --executor-memory 6G --executor-cores 4 --conf spark.sql.parquet.mergeSchema=true --conf spark.sql.parquet.filterPushdown=true --conf spark.sql.parquet.writeLegacyFormat=false PySpark_Script_Template.py > ./PySpark_Script_Template.log 2>&1 &

There are other options too which you may want to add as per requirement, such as --jars when calling an external jar in the pyspark script.

That is all I wished to cover in this post. Now you know how to write a good pyspark script and also how to run it in a Hadoop environment like EMR.
