Some Tips for Spark Code Optimization

Alexandre Dall Alba
4 min readMay 15, 2021

--

Photo by Jamie Street on Unsplash

Hi everybody, today I’m going to talk a little bit about code optimization in spark environment. I don’t intend to go into depth here, but give some tips on problems I’ve faced in projects.

As the saying goes, “life is an eternal learning process” and like so many data engineers, every day we learn something new that we bring to our clients. Thinking this way, I don’t consider myself an expert on the subject, but someone who seeks to improve every day by reading and exchanging ideas.

Why optimization is important

It is very important to understand that a Spark environment runs in a cluster, i.e. several distributed machines and it is in this distributed environment that your applications will run, so, if you are programming in Python for example, you must know Pyspark to use these features.

Furthermore,understanding this will guide you how you will develop your code. The idea is always to create faster code that consumes fewer resources. This directly impacts your client’s time and financial costs.

Since every application is different and has different behaviors, some best practices will help your application make better use of Spark’s resources.

There are a number of important issues to address, in this post I will focus on some of them:

Caching

Whenever you are going to use the same dataframe multiple times, it is ideal to cache it. This way, with the component in memory, its execution will be much faster, decreasing the processing time, consequently, optimizing the cluster resources.

There are basically two ways to put your dataframe in memory:

1. Cache(). Will always store the dataframe in memory

2. Persist(). Keeps the dataframe in memory or on disk, depending on the need, the object size and the memory x available storage ratio.

Tip: Don’t forget to unpersist in order to release memory.

Filtering

Always try to filter your dataframe as much as possible before using operations like groupBy, join, sort, etc. Start with the most selective joins. Also, move joins that increase the number of rows after aggregations if possible.

Partitioning

By default Spark creates 200 partitions during a shuffle operation. Depending on the volume of data and the relationship/operation of some dataframes (join, groupBy, etc)this process can be very time consuming.

I will not treat the shuffle concept in this post, the important thing here is to understand that this process can consume or even exhaust the cluster execution memory. Always avoid wide dependency between parent and child tables, this improves data traffic between nodes, increasing the speed of your process.

One way to deal with this issue is to properly partition your dataframe.

Always partition datasets to use narrow dependency, i.e., make parent and child processes clear and on a 1:1 format. When the dataset is not partitioned or when you use groupbykey, sort and other such functions, you have no idea how the dataset is partitioned, parents may have too many child processes and vice versa, requiring too much network traffic between nodes and this is bad for spark in terms of performance.

There are two strategies for performing dataframe partitioning:

1. Repartition: The repartition does reshuffling, so I can put in a larger number of shares if I want to. i.e., if a file has 100 lines, if you do a repartition 10, it will generate 10 partitions (files) with 10 lines in each file. if you put repartition 1000, it will generate 1000 partitions but only 100 will be populated, the rest will be empty.

2. Coalesce: It does not reshuffle, there is no point in putting a number greater than the number of lines of the dataframe. i.e., if a file has 100 lines, and is set 10, each partition will have 10 files (10x10 = 100), if the coalesce is 4, it will create files (partition) in each containing 25 lines within each file, so the maximum number of coalesce can be 100, where each partition would have 1 file (partition) corresponding to 1 line.

Broadcasting

When you need to perform joins between datasets, one solution to avoid data traffic is to store the smaller tables locally. When you have one dataset which is smaller than other dataset, broadcast joining is highly recommended.

The last important tip is to always use the parquet format to store and process your data.

According to the official page, “Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language…” “Parquet is built to support very efficient compression and encoding schemes…”

https://parquet.apache.org/

So, why use the parquet format ?

· To reduces IO operations

· To fetch specific columns that you need to access.

· To consume less space.

· To support type-specific encoding.

It is true that some of these tips are automatically implemented by the optimizer, but as any relational database, relying on the optimizer alone does not always bring the expected result. The more control you have over your code, more control you’ll have over its behavior.

See you!!!!

Don’t forget to give your 👏 !

--

--

Alexandre Dall Alba
Alexandre Dall Alba

Written by Alexandre Dall Alba

Data Engineer Specialist and data lover

No responses yet