Spark is a distributed data processing engine that relies heavily on the memory available for computation. If you have worked with Spark, you have probably faced job, task, or stage failures caused by memory issues. That makes memory management one of the key skills for running an efficient Spark environment. In this post, we will see how the Spark UI can help you understand your jobs better, turning performance tuning into a process rather than ad hoc trial and error.
I know many new Spark developers who completely ignore the Spark UI and think they can understand everything from what the job prints to the console. My first recommendation is to start using the Spark UI and keep it open all the time, even when you are not facing any memory issues. As a Spark developer, you should be comfortable with the Spark UI, because it gives you a lot of information about your job that you may not know or may never have bothered to look for.
Let’s quickly look at some of the tabs in the Spark UI and the information each one gives developers.
1. Jobs
This is the first tab, and it gives a lot of summarised information about the jobs executed in the environment. Here you can see the number of jobs completed, the duration of each job, and how many stages and tasks each job consisted of. If you need information on executors, click on “Event Timeline” to see a graphical view of executor additions and removals. If you plan to use dynamic allocation for executors, this view can tell you a lot about how many executors the system allocated for your job.
In the above screenshot, we can see that 2 executors (Executors 3 and 4) were created for this job. The job took 46 seconds to complete, with 6 stages and 623 tasks.
2. Stages
This page gives you a lot of information about each stage that was created for the job. Let’s look at the screenshot below:
You can get the following information from this page:
1) Number of completed stages. In this case it is 6.
2) Duration of each stage. You can also see that 3 stages ran in parallel, as they started at the same time.
3) Table scan volume. In this job, Spark read data from an S3 location. For S3, the default block size is 64 MB or 128 MB, depending on the Spark environment; in this case it is 64 MB. If you divide the file size by 64 MB, you can see that the number of logical partitions matches the number of tasks created in the table scan stage. Example: 547.8 / 64 ≈ 8.6, which rounds up to 9 blocks. Hence 9 tasks were created in this stage.
4) Shuffle read/write. You can also check how much data was read or written during the redistribution of data, in other words the shuffle phase. This is very important information, as it lets you identify a suitable number of shuffle partitions for the job. The default value of spark.sql.shuffle.partitions is 200, which is clearly far too many for the job in this example. You can change the number of shuffle partitions at any point in the job.
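To make the arithmetic in points 3 and 4 concrete, here is a minimal Python sketch. The function names are my own, the 128 MB target partition size is an assumption chosen for illustration (it is not a Spark default), and the 1 GB shuffle volume is a made-up number, not a figure from the job above.

```python
import math


def scan_tasks(file_size_mb, block_size_mb=64):
    """Estimate table scan tasks: one task per block, rounded up."""
    return math.ceil(file_size_mb / block_size_mb)


def shuffle_partitions(shuffle_mb, target_partition_mb=128):
    """Estimate shuffle partitions so each one holds about target_partition_mb.

    The 128 MB target is an assumed rule of thumb, not a Spark setting.
    Always returns at least 1 partition.
    """
    return max(1, math.ceil(shuffle_mb / target_partition_mb))


# File size and block size taken from the example in this post.
print(scan_tasks(547.8))         # 9

# Hypothetical shuffle volume of 1 GB.
print(shuffle_partitions(1024))  # 8
```

If the estimate looks reasonable for your job, one way to apply it on an existing session (assumed here to be called spark) is spark.conf.set("spark.sql.shuffle.partitions", n).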
3. Environment
Check here for all the default and custom values of the configuration parameters that your job used for execution. This tab gives a lot of information, and you should be familiar with a few key parameters related to executors, the driver, memory management, shuffle partitions, and so on.
4. SQL
It gives you the detailed DAG (Directed Acyclic Graph) for the query. This corresponds directly to the execution plan the optimizer prepared to complete the job. At every step you can see how many rows were processed and which operation or transformation was applied to the data. Let’s look at the screenshot below:
You can broadly classify SQL tuning activities into 3 categories: efficient file/table scans, optimised joins, and well-planned other operations such as aggregations and filters.
Efficient file/table scan
- Try using compression to reduce the size.
- Convert to an optimised format such as Parquet.
- Split a single file into multiple parts, but avoid small file parts.
- Increase the block size to create fewer tasks.
Optimising Joins
- Broadcast join is the recommended option whenever possible. Reducing the scan size directly helps more tables qualify for broadcasting. The default value of spark.sql.autoBroadcastJoinThreshold is 10 MB, which can be increased to include bigger tables. However, avoid broadcasting big tables, as it may result in errors. Sometimes a sort-merge join is the better and more reliable choice.
- Most of the tuning techniques that apply to RDBMSs also hold in Spark, such as partition pruning, bucketing, and avoiding operations on join columns.
- Shuffle strategy: we saw earlier in the post that the default value of 200 partitions was far too many for the shuffle data volume. Keeping an optimised value for shuffle partitions can make one of the most significant improvements in query performance. A single partition should not exceed 2 GB, otherwise it can result in a fatal error.
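As a rough illustration of the broadcast threshold, the sketch below checks whether a table of a given size would fall under spark.sql.autoBroadcastJoinThreshold. This is a simplified model: Spark decides based on its own size estimates for the table, the function name is my own, and the table sizes and the raised 64 MB threshold are hypothetical.

```python
# Default of spark.sql.autoBroadcastJoinThreshold: 10 MB, expressed in bytes.
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024


def can_auto_broadcast(table_bytes, threshold_bytes=DEFAULT_BROADCAST_THRESHOLD):
    """Return True if a table of this estimated size is under the threshold.

    A threshold of -1 disables automatic broadcasting in Spark,
    so any negative threshold returns False here.
    """
    if threshold_bytes < 0:
        return False
    return table_bytes <= threshold_bytes


MB = 1024 * 1024

# Hypothetical table sizes for illustration.
print(can_auto_broadcast(8 * MB))            # small table, default threshold
print(can_auto_broadcast(50 * MB))           # too big for the default
print(can_auto_broadcast(50 * MB, 64 * MB))  # fits after raising the threshold
```

On an existing session (assumed here to be called spark), the threshold can be raised with spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 64 * 1024 * 1024). Treat any raised value as something to test, since broadcasting big tables can cause errors.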
Number of Cores & Executors
Setting the right number of cores and executors is a never-ending discussion. You can find a lot of posts on the web that walk through the maths to arrive at the right number of executors and amount of memory. In practice, however, this may not help much. Sometimes you may need a “fat” executor, while most of the time a “thin” executor works perfectly fine. In short, one configuration does not fit all situations, so you may have to do some trial and error here. I prefer to keep dynamic allocation enabled, set cores per executor to 4, and then experiment with executor memory for the job.
Let’s look at the screenshot below, which shows the change in the DAG after making 2 changes: converting the file type from CSV to Parquet and increasing the broadcast join threshold.
I hope this post helps you in your Spark tuning activity. Feel free to share with us any other tuning exercises you generally do in Spark.
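The starting point described above could be expressed as spark-submit settings, as in this minimal sketch. The helper name, the my_job.py script, and the 8g memory value are all hypothetical; executor memory is the knob you would vary between runs. Depending on your cluster manager, dynamic allocation may also need additional settings such as an external shuffle service.

```python
from typing import List


def spark_submit_args(app, executor_memory="8g", executor_cores=4):
    # type: (str, str, int) -> List[str]
    """Build a spark-submit command line for the suggested starting point."""
    return [
        "spark-submit",
        # Let Spark add and remove executors as the workload changes.
        "--conf", "spark.dynamicAllocation.enabled=true",
        # Fixed number of cores per executor.
        "--conf", "spark.executor.cores={}".format(executor_cores),
        # Memory per executor: the value to experiment with.
        "--conf", "spark.executor.memory={}".format(executor_memory),
        app,
    ]


# Hypothetical application script and memory value.
print(" ".join(spark_submit_args("my_job.py", executor_memory="8g")))
```

Keeping the command in a small helper like this makes it easy to rerun the same job with different memory values and compare the results in the Spark UI.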