I was recently working on a task where I had to read more than a terabyte of data spread across multiple Parquet files, with some filters applied to get the required result set. I ran a small test where I executed the same Spark read command with the filter condition several times; the only parameters that changed between runs were executor memory and cores per executor. In most cases you will want to keep spark.dynamicAllocation.enabled set to true unless you know your data very well, so I kept it enabled for this test as well (the snippet after the parameter list below shows it being set explicitly).
The 2 parameters of interest are:
- spark.executor.memory
- spark.executor.cores
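Since dynamic allocation stays on throughout, it can also be set explicitly alongside the two parameters under test. A minimal sketch; the memory/cores values shown are just the first test case:

```python
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Dynamic allocation left enabled, as in all the runs below;
# the executor memory/cores values here are just the first test case
conf = SparkConf().setAll([
    ('spark.dynamicAllocation.enabled', 'true'),
    ('spark.executor.memory', '5g'),
    ('spark.executor.cores', '4'),
])
spark = SparkSession.builder.config(conf=conf).appName("executor_tuning").getOrCreate()
```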
Details of Spark Environment:
I am using Spark 2.4.7 on nodes that come with 4 vCPUs and 32 GB of memory each.
Details of Input Data:
- Total Objects: 14841
- Total Size: 1.3 TiB
- Total Rows: 57,137,782,485
- Total Columns: 21
So the test data was genuinely large: more than a terabyte, with over 57 billion rows spread across roughly 14.8K snappy-compressed Parquet file parts.
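If you want to reproduce these numbers on your own dataset, the row and column counts can be pulled with a couple of PySpark calls. A quick sketch, using the same placeholder bucket path as the test code below:

```python
# Assumes an existing SparkSession named `spark` and the placeholder S3 path
df = spark.read.parquet("s3://s3bucket/prefix/transaction/")

print("Total Columns:", len(df.columns))   # 21 in this dataset
print("Total Rows   :", df.count())        # ~57 Bn rows; note this triggers a full scan
```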
Details of Test Code:
```python
df_transaction = spark.read.parquet("s3://s3bucket/prefix/transaction/")
df_transaction.filter("transaction_id in (1,2,3,4,5)").show(truncate=False)
```
I ran this code in PySpark with different values for the two configurations mentioned above and noted the timings.
Details of the code used to set the configuration and run the test:
```python
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from datetime import datetime

# start time
x1 = datetime.now()

# set the value of the 2 parameters
conf = SparkConf().setAll([('spark.executor.memory', '5g'), ('spark.executor.cores', '4')])

# create a spark session
spark = SparkSession.builder.config(conf=conf).appName("5GB_4Cores").getOrCreate()

# read the data and apply the filter
df_transaction = spark.read.parquet("s3://s3bucket/prefix/transaction/")
df_transaction.filter("transaction_id in (1,2,3,4,5)").show(truncate=False)

# end time
x2 = datetime.now()
print("Time Taken for 5GB-4cores/exec is : " + str(x2 - x1))

# stop the spark session
spark.stop()
```
Details of Test Cases (a sketch that loops over all of them follows the list):
- executor memory: 5 GB, cores per executor: 4
- executor memory: 5 GB, cores per executor: 3
- executor memory: 5 GB, cores per executor: 2
- executor memory: 5 GB, cores per executor: 1
- executor memory: 10 GB, cores per executor: 4
- executor memory: 10 GB, cores per executor: 3
- executor memory: 10 GB, cores per executor: 2
- executor memory: 10 GB, cores per executor: 1
- executor memory: 15 GB, cores per executor: 4
- executor memory: 15 GB, cores per executor: 3
- executor memory: 15 GB, cores per executor: 2
- executor memory: 15 GB, cores per executor: 1
- executor memory: 20 GB, cores per executor: 4
- executor memory: 20 GB, cores per executor: 3
- executor memory: 20 GB, cores per executor: 2
- executor memory: 20 GB, cores per executor: 1
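In principle, this matrix of 16 runs can be driven by a small loop around the single-run snippet above. A rough sketch with the same placeholder path and filter; in practice each combination may be cleaner as its own submission so sessions don't share state:

```python
from datetime import datetime
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

memories = ["5g", "10g", "15g", "20g"]
cores = ["4", "3", "2", "1"]

for mem in memories:
    for core in cores:
        start = datetime.now()

        # Build a fresh session for each memory/cores combination
        conf = SparkConf().setAll([
            ("spark.executor.memory", mem),
            ("spark.executor.cores", core),
        ])
        spark = SparkSession.builder.config(conf=conf).appName(f"{mem}_{core}cores").getOrCreate()

        # Same read + filter as the single-run snippet above
        df_transaction = spark.read.parquet("s3://s3bucket/prefix/transaction/")
        df_transaction.filter("transaction_id in (1,2,3,4,5)").show(truncate=False)

        print(f"Time taken for {mem}-{core} cores/exec: {datetime.now() - start}")

        # Stop the session so the next iteration picks up the new settings
        spark.stop()
```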
Details of the Timings & Results (times shown as h:mm:ss):
| Executor Memory (GB) | 4 cores | 3 cores | 2 cores | 1 core |
|---|---|---|---|---|
| 5 | 0:09:08.560633 | 0:08:45.955584 | 0:08:26.130291 | 0:08:19.960625 |
| 10 | 0:08:29.504125 | 0:08:00.799622 | 0:08:03.351138 | 0:10:36.386021 |
| 15 | 0:07:17.523632 | 0:07:52.740675 | 0:10:05.890819 | 0:18:46.811154 |
| 20 | 0:07:07.169893 | 0:07:45.439492 | 0:09:45.570046 | 0:18:33.362918 |
The best timing is for executor memory 20 GB with 4 cores per executor.
* Note: the cluster was set to auto-scale, and it scaled up while the first few iterations were running. That is why the 5 GB / 1 core run looks better than 5 GB / 4 cores. This was not the case when I held the node count constant, which I verified after the first round of runs.
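One way to take auto-scaling out of the picture for a re-run is to disable dynamic allocation and pin the executor count. A hedged sketch; the instance count of 10 is purely illustrative, not the number used in the test:

```python
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Fix the number of executors so cluster scale-up does not skew timings;
# 10 instances is an illustrative value only
conf = SparkConf().setAll([
    ("spark.dynamicAllocation.enabled", "false"),
    ("spark.executor.instances", "10"),
    ("spark.executor.memory", "5g"),
    ("spark.executor.cores", "1"),
])
spark = SparkSession.builder.config(conf=conf).appName("fixed_size_rerun").getOrCreate()
```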
Observations:
- Increasing executor memory clearly improved the performance of the task.
- Single-core performance was significantly worse than 3 or 4 cores with the same executor memory.
- I did not go beyond 20 GB of executor memory because the improvement from 15 GB to 20 GB was small: less than 10 seconds with 3 or 4 cores per executor.
- For this and other similar tasks, 20 GB with 4 cores per executor is the optimum combination.
- You don't want to set executor memory to a very high number, else it can reduce overall parallelism and the number of concurrent jobs the cluster can run, and some memory may go under-utilised (see the quick calculation after this list).
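To see why very large executors hurt parallelism and waste memory on these nodes, a quick back-of-the-envelope calculation helps. A sketch assuming roughly 10% off-heap overhead per executor; the exact figure depends on spark.executor.memoryOverhead and the resource manager, and the 4-vCPU limit per node constrains cores as well:

```python
node_memory_gb = 32            # memory per node in this cluster
overhead_factor = 1.10         # assumed ~10% overhead per executor (illustrative)

for executor_memory_gb in (5, 10, 15, 20):
    # How many executors fit on one node by memory alone
    per_node = int(node_memory_gb // (executor_memory_gb * overhead_factor))
    wasted_gb = node_memory_gb - per_node * executor_memory_gb * overhead_factor
    print(f"{executor_memory_gb:>2} GB executors: {per_node} per node, ~{wasted_gb:.1f} GB left over")
```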
Let me know if you can derive any more conclusions from this experiment.