PySpark

Spark Case Study – optimise executor memory and cores per executor

I was recently working on a task where I had to read more than a terabyte of data spread across multiple parquet files, with some filters applied to get the required result set. I did a small test where I ran the same Spark read command with the filter condition multiple times; the only parameters that changed between runs were executor memory and cores per executor. In most cases you will want to keep dynamic allocation (spark.dynamicAllocation.enabled) turned on unless you know your data very well, so I kept it enabled for this test as well.
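
For reference, dynamic allocation is switched on through the spark.dynamicAllocation.* properties; a minimal sketch of enabling it when building the session could look like the following (the min/max executor counts are illustrative placeholders, not the values used in this test):

from pyspark.sql import SparkSession

# Sketch only: enable dynamic allocation when creating the session.
# On Spark 2.x the external shuffle service must also be enabled for
# dynamic allocation to work; the min/max values below are placeholders.
spark = SparkSession.builder \
    .appName("dynamic_allocation_example") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "50") \
    .getOrCreate()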

The 2 parameters of interest are:

  • spark.executor.memory
  • spark.executor.cores
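
A quick way to verify which values actually took effect in a given run is to read them back from the SparkConf once the session is up (a small sketch, assuming a SparkSession named spark already exists):

# Sketch: read back the effective executor settings from the running session.
conf = spark.sparkContext.getConf()
print(conf.get("spark.executor.memory"))   # e.g. '5g'
print(conf.get("spark.executor.cores"))    # e.g. '4'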

Details of Spark Environment:

I am using Spark 2.4.7 on nodes that come with 4 vCPUs and 32 GB of memory each.

Details of Input Data:

  • Total Objects: 14841
  • Total Size: 1.3 TiB
  • Total Rows : 57,137,782,485
  • Total Columns : 21

So you can see the test data was reasonably large: more than a TB of data, with more than 57 billion rows spread across roughly 14.8K snappy-compressed parquet file-parts.
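
As an aside, the object count and total size above could be gathered by listing the S3 prefix; a sketch using boto3 is below (the bucket and prefix are the placeholders from the code further down, and this is just one way to get the numbers, not necessarily how they were produced for this post):

import boto3

# Sketch: count objects and total bytes under an S3 prefix with boto3.
# Bucket/prefix names are placeholders; the row count would come from
# df.count() on the DataFrame instead.
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
total_objects, total_bytes = 0, 0
for page in paginator.paginate(Bucket="s3bucket", Prefix="prefix/transaction/"):
    for obj in page.get("Contents", []):
        total_objects += 1
        total_bytes += obj["Size"]
print(total_objects, "objects,", round(total_bytes / 1024 ** 4, 2), "TiB")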

Details of Test Code:

df_transaction = spark.read.parquet("s3://s3bucket/prefix/transaction/")
df_transaction.filter("transaction_id in (1,2,3,4,5)").show(truncate=False)
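
The filter keeps only the rows whose transaction_id is one of the five listed values. Since it is a simple predicate on a parquet source, it should be pushed down to the scan; one way to confirm that (a sketch, reusing the same df_transaction) is to look at the physical plan:

# Sketch: print the physical plan and look for the pushed-down parquet filter,
# e.g. something like "PushedFilters: [In(transaction_id, ...)]".
df_transaction.filter("transaction_id in (1,2,3,4,5)").explain()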

I ran the code in PySpark with different values for the two configurations mentioned above and noted the timings.

Details of Code used to set configuration and run the test:

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from datetime import datetime

# start time
x1 = datetime.now()
# set the value of the 2 parameters
conf = SparkConf().setAll([('spark.executor.memory', '5g'), ('spark.executor.cores', '4')])
# create a spark session
spark = SparkSession.builder.config(conf=conf).appName("5GB_4Cores").getOrCreate()
# read the data and apply the filter
df_transaction = spark.read.parquet("s3://s3bucket/prefix/transaction/")
df_transaction.filter("transaction_id in (1,2,3,4,5)").show(truncate=False)
# end time
x2 = datetime.now()
print("Time Taken for 5GB-4cores/exec is : " + str(x2 - x1))
# stop the spark session
spark.stop()

Details of Test Cases:

  • executor memory : 5 GB , cores per executor : 4
  • executor memory : 5 GB , cores per executor : 3
  • executor memory : 5 GB , cores per executor : 2
  • executor memory : 5 GB , cores per executor : 1
  • executor memory : 10 GB , cores per executor : 4
  • executor memory : 10 GB , cores per executor : 3
  • executor memory : 10 GB , cores per executor : 2
  • executor memory : 10 GB , cores per executor : 1
  • executor memory : 15 GB , cores per executor : 4
  • executor memory : 15 GB , cores per executor : 3
  • executor memory : 15 GB , cores per executor : 2
  • executor memory : 15 GB , cores per executor : 1
  • executor memory : 20 GB , cores per executor : 4
  • executor memory : 20 GB , cores per executor : 3
  • executor memory : 20 GB , cores per executor : 2
  • executor memory : 20 GB , cores per executor : 1
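
The sixteen combinations above could also be scripted as a single loop around the same read-and-filter code; a minimal sketch is below (in practice, submitting each combination as a separate job is safer, since executor settings are fixed when the context is created):

from datetime import datetime
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Sketch: run every memory/cores combination in sequence and print the timing
# for each. Each iteration creates a fresh session and stops it afterwards.
for mem in ("5g", "10g", "15g", "20g"):
    for cores in ("4", "3", "2", "1"):
        x1 = datetime.now()
        conf = SparkConf().setAll([("spark.executor.memory", mem),
                                   ("spark.executor.cores", cores)])
        spark = SparkSession.builder.config(conf=conf) \
            .appName(mem + "_" + cores + "cores").getOrCreate()
        df = spark.read.parquet("s3://s3bucket/prefix/transaction/")
        df.filter("transaction_id in (1,2,3,4,5)").show(truncate=False)
        print("Time taken for " + mem + "-" + cores + " cores/exec: " + str(datetime.now() - x1))
        spark.stop()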

Details of the Timings & Result:

Executor Memory (GB) | 4 cores        | 3 cores        | 2 cores        | 1 core
5                    | 0:09:08.560633 | 0:08:45.955584 | 0:08:26.130291 | 0:08:19.960625
10                   | 0:08:29.504125 | 0:08:00.799622 | 0:08:03.351138 | 0:10:36.386021
15                   | 0:07:17.523632 | 0:07:52.740675 | 0:10:05.890819 | 0:18:46.811154
20                   | 0:07:07.169893 | 0:07:45.439492 | 0:09:45.570046 | 0:18:33.362918

The best timing is for executor memory of 20 GB with 4 cores per executor.

* The cluster was set to auto-scale and was still scaling up while the first few iterations were running. That is why 5 GB with 1 core looks better than 5 GB with 4 cores in the table above; the effect disappears when the node count is kept constant, which I verified after the first round of runs.
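
For reference, the gain from each memory increment on the 4-core runs can be computed directly from the values in the table (a small sketch, using only the numbers above):

from datetime import timedelta

# Sketch: 4-core timings copied from the table above, and the time saved
# by each memory bump.
timings_4_cores = {
    5:  timedelta(minutes=9, seconds=8.560633),
    10: timedelta(minutes=8, seconds=29.504125),
    15: timedelta(minutes=7, seconds=17.523632),
    20: timedelta(minutes=7, seconds=7.169893),
}
prev_mem = None
for mem, t in timings_4_cores.items():
    if prev_mem is not None:
        saved = (timings_4_cores[prev_mem] - t).total_seconds()
        print(str(prev_mem) + " GB -> " + str(mem) + " GB: saved " + format(saved, ".1f") + " s")
    prev_mem = mem

That works out to roughly 39 s going from 5 GB to 10 GB, 72 s from 10 GB to 15 GB, and only about 10 s from 15 GB to 20 GB, which is what drives observation 3 below.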

Observations:

  1. Increasing executor memory definitely improved the performance of this task.
  2. Single-core performance was significantly worse than 3 or 4 cores with the same executor memory.
  3. I did not go beyond 20 GB of executor memory because the improvement from 15 GB to 20 GB was small, roughly 10 seconds or less with 3 or 4 cores per executor.
  4. For this and other similar tasks, 20 GB with 4 cores per executor is the optimal combination.
  5. You don't want to set executor memory to a very high value, or it may reduce overall parallelism and the number of concurrent jobs that can run at the cluster level; some memory may also go under-utilised (see the rough sizing sketch below).
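
As a rough illustration of the last point, a back-of-the-envelope sketch (assuming YARN-style packing with the default spark.executor.memoryOverhead of max(384 MB, 10% of executor memory) and ignoring memory reserved for the OS and daemons) shows how quickly larger executors stop fitting more than one per 32 GB node:

# Rough sketch: memory-bound packing of executors onto one 32 GB / 4 vCPU node,
# assuming the default spark.executor.memoryOverhead of max(384 MB, 10%).
# Cores per executor constrain packing further (a 4-core executor already
# takes all of the node's CPUs).
NODE_MEM_GB = 32

for exec_mem_gb in (5, 10, 15, 20):
    overhead_gb = max(0.384, 0.10 * exec_mem_gb)
    per_executor_gb = exec_mem_gb + overhead_gb
    fit = int(NODE_MEM_GB // per_executor_gb)
    print(str(exec_mem_gb) + " GB executor -> ~" + format(per_executor_gb, ".1f")
          + " GB with overhead, at most " + str(fit) + " per node (memory-bound)")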

Let me know if you can derive any more conclusions from this experiment.
