A Newbie’s Chronicles on Distributed Data Processing — S01E02: Optimizations Galore!

Athitya Kumar
Published in Analytics Vidhya
5 min read · Sep 6, 2020

Disclaimer: If you haven’t read the first episode (S01E01) of this season, please feel free to go ahead and read that first to get more context and better reading flow.

Previously in this series, we introduced our required capability and some general fundamentals to keep in mind when working with Spark. Alright, let’s continue from where we left off in the previous episode.

Starting with a POC

Now that we know the required capability and the basic things to keep in mind in the distributed data domain, we’re ready to get started with a simple POC that does the merging properly.

Of course, the exact work involved a couple of extra steps like serializing/de-serializing from our actual data sources, and pre-processing it — however, as they’re a bit too specific, I’ll skip them and cut right to the chase. To give the crux of it, let’s work from the Avengers example we’re very familiar with:

The basic POC of materializing all incremental events ft. Avengers
Flowchart for the same POC: A second chance, for those who skipped past the code
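To make the flow concrete, here is a minimal sketch of the merge logic in plain Python rather than the actual Spark/Scala job. The row shape (primary key, event type, payload, sequence number), the `materialize` helper and the Avengers sample values are all assumptions made up for illustration; the real POC works on DataFrames and may differ in its details.

```python
from typing import Dict, List, Tuple

# Hypothetical row shape: (primary_key, event_type, payload, sequence_number).
Event = Tuple[str, str, str, int]


def materialize(existing: Dict[str, str], events: List[Event]) -> Dict[str, str]:
    """Apply INSERT/UPDATE/DELETE events to a snapshot keyed by primary key."""
    # 1. Keep only the latest event per primary key.
    latest: Dict[str, Event] = {}
    for event in events:
        key, _, _, seq = event
        if key not in latest or seq > latest[key][3]:
            latest[key] = event

    # 2. Drop every existing row that any incremental event touches.
    result = {k: v for k, v in existing.items() if k not in latest}

    # 3. Add back the latest version of rows that were inserted or updated.
    for key, (_, event_type, payload, _) in latest.items():
        if event_type in ("INSERT", "UPDATE"):
            result[key] = payload
    return result


# Made-up sample data, purely for illustration.
existing = {"iron_man": "Mark III", "hulk": "angry", "thor": "Mjolnir"}
events = [
    ("iron_man", "UPDATE", "Mark L", 1),
    ("hulk", "DELETE", "", 2),
    ("spider_man", "INSERT", "Iron Spider", 3),
    ("iron_man", "UPDATE", "Mark LXXXV", 4),
]
snapshot = materialize(existing, events)
print(snapshot)
```

In this sketch, step 2 corresponds to removing the touched rows from the existing data, and step 3 to adding the latest upserts back in.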

Works? Great! 🎉

I was able to get this simple POC working on 100 GB of incremental data, with a 2-node cluster, and it took around 55 minutes to complete. Yay! ⭐️

Load Testing

Of course, we didn’t leverage Spark & Scala only to run it on a 2-node cluster and process just 100 GB of incremental data. We had to keep scaling up, try out different configurations, and see which one would yield the best trade-off between resource utilization, time taken to complete the job and, ultimately, cost.

While load-testing, there were different parameters we had to keep tracking in each run to ensure we could come up with an optimal set of parameter values.

  1. Cluster configuration
    • Cluster Size
    • Driver Memory
    • Driver Cores
    • Executor Memory
    • Executor Cores
    • Number of executors
    • Instance types
  2. Related to data (skew?)
    • Number of unique events
    • Distribution of events by event type (INSERT/UPDATE/DELETE)
    • Total amount of incremental data processed
  3. Output Metrics
    • Time taken to complete job
    • Cost (depends on the cluster configurations)
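One way to keep these parameters comparable across runs is a small record per run. The sketch below is an assumption about how such tracking could look, not the tooling we actually used: the `LoadTestRun` class, its field names and the `price_per_node_hour` input are hypothetical. Only the first data point (2 nodes, 100 GB, 55 minutes) comes from the POC run above, and everything else is left unset.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class LoadTestRun:
    # Values that every run must record.
    cluster_nodes: int
    incremental_data_gb: float
    minutes_taken: float
    # Cluster configuration (left as None when not recorded).
    instance_type: Optional[str] = None
    driver_memory_gb: Optional[int] = None
    driver_cores: Optional[int] = None
    executor_memory_gb: Optional[int] = None
    executor_cores: Optional[int] = None
    num_executors: Optional[int] = None
    # Data characteristics.
    unique_events: Optional[int] = None
    insert_update_delete_counts: Optional[Tuple[int, int, int]] = None

    @property
    def node_minutes(self) -> float:
        """Cluster time consumed, a rough proxy for cost."""
        return self.cluster_nodes * self.minutes_taken

    @property
    def gb_per_node_minute(self) -> float:
        """Throughput normalized by cluster size, for comparing runs."""
        return self.incremental_data_gb / self.node_minutes

    def estimated_cost(self, price_per_node_hour: float) -> float:
        """Cost estimate; the hourly price is a hypothetical input."""
        return self.node_minutes / 60 * price_per_node_hour


poc_run = LoadTestRun(cluster_nodes=2, incremental_data_gb=100, minutes_taken=55)
print(poc_run.node_minutes, round(poc_run.gb_per_node_minute, 3))
```

Normalizing by node-minutes makes it easier to see whether a bigger cluster is actually buying proportionally more throughput.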

We started with processing 100 GB of incremental data with 2 cluster nodes, and slowly started increasing them proportionally to 200 GB, 500 GB, 1 TB.

The load testing went well till 1 TB of incremental data, with a 60-node cluster. However, we couldn’t proceed to the next level — 2 TB. After multiple trials of tweaking the parameters, it was clear that the issue of scalability must lie in the code rather than the config.

Optimizations, Optimizations & Optimizations

It’s usually tough to spot opportunities for optimization in Spark-esque code, especially if you’re new to it. The trick is to pay attention to the important aspects: memory optimizations and minimizing data shuffling.

So, we called upon one of the experts in our team to suggest areas for improvement. Enter Sanket Sahu. Within an hour of setting the context and running him through the POC code, he had a couple of suggestions that apply to Spark jobs in general:

  • Use of better memory configs:

The memory configs usually follow a bell curve. If you provide fewer executors with more memory per executor, data shuffling is minimal, but garbage collection becomes an overhead and parallelism drops. On the other hand, if you provide more executors with less memory each, you gain parallelism, but there is a lot more data shuffling because each executor has less memory.

It’s about finding a balance/trade-off between these 2 cases!
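To see the two extremes side by side, the sketch below enumerates how one worker node could be carved into equal executors. It is a rough heuristic, not the configuration we settled on: the `executor_layouts` helper, the 16-core / 64 GB node and the "reserve 1 core and 1 GB per node" rule are assumptions. The 10% headroom mirrors Spark's default executor memory overhead, ignoring its 384 MiB minimum. The output keys map loosely to `spark.executor.cores`, `spark.executor.instances` and `spark.executor.memory`.

```python
def executor_layouts(node_cores, node_memory_gb, nodes):
    """Enumerate ways to split each worker node into equal executors."""
    usable_cores = node_cores - 1          # assumption: 1 core left for OS/daemons
    usable_memory_gb = node_memory_gb - 1  # assumption: 1 GB left for OS/daemons
    layouts = []
    for cores_per_executor in range(1, usable_cores + 1):
        executors_per_node = usable_cores // cores_per_executor
        container_gb = usable_memory_gb / executors_per_node
        # Leave roughly 10% of the heap size as off-heap overhead.
        heap_gb = container_gb / 1.10
        layouts.append({
            "executor_cores": cores_per_executor,
            "num_executors": executors_per_node * nodes,
            "executor_memory_gb": round(heap_gb, 1),
        })
    return layouts


# Hypothetical cluster: 2 worker nodes with 16 cores and 64 GB each.
layouts = executor_layouts(node_cores=16, node_memory_gb=64, nodes=2)
for layout in layouts:
    print(layout)
```

The first rows are the "many small executors" end of the curve and the last rows are the "few fat executors" end; the sweet spot usually sits somewhere in between and has to be confirmed by load testing.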

  • Minimize data shuffling:

Almost all of the heavy lifting takes place in the executors, and only minimal auxiliary steps run on the driver. Due to this distributed nature, there’s a lot of scope for data shuffling: i.e., chunks of data being sent to and fro within the cluster.

Wait, hold up. I got lost. Driver, Executor & Shuffling?

Oukay. Sorry, I got ahead of myself. In a nutshell:

Driver: Process that mainly handles reading our code, splitting it into stages/tasks, splitting and delegating them onto executors, etc. Somewhat similar to a dedicated Scrum Master.

Executor: As the name suggests, executors are processes that run in the Data/Worker nodes — they work on the tasks delegated to them by the Driver. Which is probably analogous to the folks picking up tickets in a sprint.

The Driver-Executor Architecture ft. Cluster Manager
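As a toy analogy for this split of responsibilities (plain Python threads, not Spark), the main thread below plays the driver and a thread pool plays the executors. The `split_into_partitions` and `task` helpers, and the choice of 4 partitions, are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, num_partitions):
    """Driver-side step: carve the input into roughly equal chunks."""
    return [rows[i::num_partitions] for i in range(num_partitions)]


def task(partition):
    """Executor-side step: work on one partition independently."""
    return sum(x * x for x in partition)


rows = list(range(1, 101))
partitions = split_into_partitions(rows, num_partitions=4)

# The pool stands in for the executors; this main thread stands in for the driver.
with ThreadPoolExecutor(max_workers=4) as executors:
    partial_results = list(executors.map(task, partitions))

# The driver only combines the small per-partition results.
total = sum(partial_results)
print(total)
```

The point of the analogy is that the driver never touches the bulk of the data: it plans, delegates and collects small results, while the executors do the heavy lifting.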

Data shuffling: Now we know that chunks of DataFrames are spread across the cluster. So, any time we use a transformation like df1.join(df2), the chunks of df1 and df2 that share join keys have to be brought together and joined first, and then all these resultant chunks have to be logically merged to give the final result. To facilitate this, data needs to flow across the cluster.
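A tiny simulation can make that data flow visible. This is a simplified model, not Spark's actual shuffle implementation: partitions are plain lists, "hash partitioning" is reduced to key modulo partition count, and the `partition_for` and `shuffle` helpers as well as the sample rows are hypothetical.

```python
def partition_for(key, num_partitions):
    """Hash partitioning, simplified to key modulo partition count."""
    return key % num_partitions


def shuffle(partitions, num_partitions):
    """Send each (key, value) row to the partition owning its key; count moves."""
    shuffled = [[] for _ in range(num_partitions)]
    moved = 0
    for source, rows in enumerate(partitions):
        for key, value in rows:
            target = partition_for(key, num_partitions)
            shuffled[target].append((key, value))
            if target != source:
                moved += 1  # this row has to travel over the network
    return shuffled, moved


# Two DataFrames, each initially split into 2 chunks in arbitrary order.
df1 = [[(1, "a"), (2, "b")], [(3, "c"), (4, "d")]]
df2 = [[(1, "x"), (3, "y")], [(2, "z")]]

df1_shuffled, df1_moved = shuffle(df1, 2)
df2_shuffled, df2_moved = shuffle(df2, 2)

# After the shuffle, matching keys sit in the same partition and join locally.
joined = []
for left_rows, right_rows in zip(df1_shuffled, df2_shuffled):
    right_by_key = dict(right_rows)
    for key, left_value in left_rows:
        if key in right_by_key:
            joined.append((key, left_value, right_by_key[key]))

print(sorted(joined), df1_moved + df2_moved)
```

Even in this toy case, 5 of the 7 rows have to change partitions before a single match can be produced, which is the cost the next section is about.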

Sooooo why is Data Shuffling bad again?

Hmm, with the usual scale of data we have to process with Spark jobs, unnecessary data shuffling results in unnecessary network calls within the cluster to make the data flow. As this may well end up affecting performance, it’s usually desirable to keep data shuffling to a minimum.

For example, let’s consider the JOIN operations we were executing:

The JOIN operations from our POC

To remove existing DELETE & UPSERT entries from our existing data, there’s no need to join entire DataFrames. It’s enough to join on just the column(s) that make up the primary key(s), and then retain the rows of our existing data whose primary keys don’t match.

Along similar lines, the optimized changes would look something like this:

Sending only primary keys for “left_anti” join
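The idea can be sketched in plain Python as follows. This is an illustration under assumptions, not the original job: the `left_anti_join` helper, the `id` column and the sample rows are hypothetical, and JSON length is only a crude stand-in for the bytes that would be shuffled. In Spark/Scala terms it would correspond roughly to `existing.join(incremental.select("id").distinct(), Seq("id"), "left_anti")`, again assuming a primary key column named `id`.

```python
import json


def left_anti_join(existing_rows, keys_to_drop):
    """Keep existing rows whose primary key is NOT among keys_to_drop."""
    return [row for row in existing_rows if row["id"] not in keys_to_drop]


existing_rows = [
    {"id": 1, "name": "Iron Man", "suit": "Mark III"},
    {"id": 2, "name": "Hulk", "suit": "none"},
    {"id": 3, "name": "Thor", "suit": "armor"},
]
incremental = [
    {"id": 1, "event_type": "UPDATE", "name": "Iron Man", "suit": "Mark LXXXV"},
    {"id": 2, "event_type": "DELETE", "name": "Hulk", "suit": "none"},
]

# Project the incremental events down to distinct primary keys before joining.
touched_keys = {event["id"] for event in incremental}
retained = left_anti_join(existing_rows, touched_keys)

# Crude proxy for how much data each variant would ship for the join.
full_payload = len(json.dumps(incremental))
key_payload = len(json.dumps(sorted(touched_keys)))
print(retained, full_payload, key_payload)
```

The retained rows are identical either way, since only the key decides whether a row survives, but the key-only side of the join is a fraction of the size.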

However, even after implementing all these suggestions, our job was still failing at 2 TB of incremental data. We had hit a roadblock, and we weren’t exactly sure why these optimizations weren’t enough. We had to dig deeper and see where/why tasks were failing.

To understand how we debugged this unknown roadblock and solved it, let’s proceed to the season finale: S01E03!
