A Newbie’s Chronicles on Distributed Data Processing — S01E03: Grand Finale

Athitya Kumar
Published in Analytics Vidhya
6 min read · Sep 6, 2020


Disclaimer: If you haven’t read the previous episodes of this season, I’d suggest you go ahead and read S01E01 & S01E02 first to get more context and better reading flow.

Recap: In the previous episodes, we worked on the POC for our requirement, scaled it to 1 TB and made a couple of optimizations. But we still weren’t able to proceed further due to an unknown roadblock.

Roadblock: The DAG-ger that cut deep

The application logs by themselves weren’t very useful to debug as we could just see tasks failing — but not the exact conditions in which they were failing. For example, we saw that our code was still failing due to a memory issue at the JOIN step. Which was weird — as memory shouldn’t have been an issue after the optimization changes we had made in the previous episode.

To figure that out, we turned to our secret arsenal: the Spark Web UI!

Sample Spark Web UI, credits: https://spark.apache.org/docs/3.0.0-preview/web-ui.html

Basically, this is a dashboard UI provided by Spark that surfaces a lot of useful hints (stages, tasks, time taken, node status, memory metrics, etc.) step by step for a job, live while it’s running.

It came in clutch like a Swiss Army knife for debugging, to say the least! As we looked at the current application’s run, we saw that for some reason, one of our earlier pre-processing steps (reading the incremental data) was being executed again during the JOIN operation, instead of re-using the output it had already produced in the count() stages.

The mysterious case of re-reading incremental data

Thankfully, Sanket was familiar with this issue, as he had faced it himself before, and he explained it to us in detail.

Basically, Spark tracks how each DataFrame is computed as a DAG (Directed Acyclic Graph), derived from Catalyst’s Logical Plan. The computed values themselves are kept only as long as memory allows: depending on the memory required for the next operation, the earliest stored values keep getting purged one by one till there’s enough room for that operation, and anything purged has to be recomputed by replaying the DAG.

Which means, one of the variants of our incremental DataFrame must have gotten purged from the DAG to accommodate the join operations, and it was trying to read them all again and start processing from the beginning. This is why it had hit a memory issue — and the job started failing after a maximum tolerance of 4 retries for the failing task.

And here I was, thinking that if the DataFrame was in memory while successfully executing the count(), it’d still be in memory for the subsequent JOIN steps. The very fact that the DataFrame could get purged from the DAG in between these 2 lines of code felt like being betrayed by my own code. 😢
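To make the re-read behaviour concrete, here is a toy pure-Python model of it. This is only a sketch and not Spark's API: `LazyFrame`, `read_incremental_json` and the sample rows are all made up for illustration. Each action replays the whole recipe unless the rows were explicitly kept around.

```python
# Toy model of lazy evaluation with lineage replay (NOT Spark's API).
# LazyFrame, read_incremental_json and the sample rows are hypothetical.

reads = {"count": 0}  # how many times the "expensive" source was read


def read_incremental_json():
    """Stand-in for the expensive JSON read plus pre-processing."""
    reads["count"] += 1
    return [{"id": i, "value": i * 10} for i in range(5)]


class LazyFrame:
    def __init__(self, compute):
        self._compute = compute  # the recipe ("lineage") to rebuild the rows
        self._keep = False
        self._rows = None

    def persist(self):
        """Ask for the rows to be kept after the first computation."""
        self._keep = True
        return self

    def _materialize(self):
        if self._rows is not None:
            return self._rows
        rows = self._compute()  # replay the recipe from the source
        if self._keep:
            self._rows = rows
        return rows

    def count(self):
        """An 'action': forces the computation."""
        return len(self._materialize())

    def join(self, other, key):
        """Another 'action': inner join on `key`."""
        lookup = {row[key]: row for row in other._materialize()}
        return [
            {**row, **lookup[row[key]]}
            for row in self._materialize()
            if row[key] in lookup
        ]


history = LazyFrame(lambda: [{"id": 1, "name": "a"}, {"id": 3, "name": "b"}])

# Without persisting: count() and join() each re-read the source.
incremental = LazyFrame(read_incremental_json)
incremental.count()
joined = incremental.join(history, "id")
reads_without_persist = reads["count"]

# With persisting: the source is read once and re-used.
reads["count"] = 0
kept = LazyFrame(read_incremental_json).persist()
kept.count()
kept.join(history, "id")
reads_with_persist = reads["count"]
```

Note that the toy `persist()` never drops its rows, which makes it closer to writing the pre-processed data out and reading it back than to a cache that competes for memory with later operations.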

Trust issues: “Hello there, long time no see.”

Arnab Goswami, explaining the betrayal I felt

Scaling up, up, and above!

Now that we had Sherlocked the DAG issue, the solution was pretty obvious. This was the overhead: reading 2 TB of JSON data (again) is pretty inefficient, as raw JSON has no compression associated with it and the nested schema needs to be pre-processed again. So instead, if we store the pre-processed DataFrame in a compressed Parquet file or somehow persist it in memory, we would be able to proceed with the actual join() without being sent back to read the JSON data again.

Quite intuitively, it wasn’t small enough to be broadcast directly across the cluster, so we had to go with Parquet. We made that change, and voila: we jumped to 2 TB, and even processing 4 TB of data worked like a charm! 🎉

At the strike of 8TB came one final roadblock, which was relatively easier to both debug and solve IMO. Yet another memory issue, but this time it failed while saving the final materialized data.

Of course, we were dealing with a humongous amount of data in which I/O would also need to be taken care of. Hence, we had to partition our data, based on the scale of incremental data being processed. We added that logic too:

Dynamic partitioning for the win!
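A minimal sketch of size-based partitioning, not necessarily the logic used in this project: derive the number of output partitions from the size of the incremental data. The function name and the 128 MiB target per partition are assumptions made for illustration.

```python
# Hypothetical helper: choose an output partition count from the input size.
# The 128 MiB target per partition is an assumed default, not a measured value.

MIB = 1024 ** 2
TIB = 1024 ** 4


def num_output_partitions(input_bytes, target_partition_bytes=128 * MIB,
                          min_partitions=1):
    """Return how many partitions keep each one near the target size."""
    if target_partition_bytes <= 0:
        raise ValueError("target_partition_bytes must be positive")
    # Integer ceiling division, so a partial last partition still counts.
    needed = -(-input_bytes // target_partition_bytes)
    return max(min_partitions, needed)


partitions_for_8_tib = num_output_partitions(8 * TIB)
partitions_for_24_tib = num_output_partitions(24 * TIB)
```

In PySpark, a number like this would typically be passed to `DataFrame.repartition(n)` before the write, so that no single task has to hold an oversized chunk of the output.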

After all these accumulated changes, our code now consistently and reliably ran for 8TB, 16TB and even 24TB! 🎉

Real footage of our code’s capability scaling up from 100GB to 24TB!

Permutations & Combinations

Now that our code was pretty much scale-tested, we had to finalize the optimal combination of infra parameters before wrapping up. We took the list of all instance types and filtered it by minimum memory requirement, minimum vCPU requirement, and maximum cost per hour.

We had the likes of good-but-costly instances like c5.9xlarge, m5.8xlarge etc as well as inferior-but-cheap instances like r5.2xlarge, m4.4xlarge etc. However, with our use-case requiring a balanced trade-off between I/O (read-write) as well as computation (for operations like JOIN), r5.2xlarge and r5.4xlarge instances stood out as the clear co-winners! 🏅

That is, either x machines of r5.4xlarge, or 2x machines of r5.2xlarge — both performed equally well.
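The shortlisting step and the "x versus 2x" comparison can be sketched as below. Treat everything here as illustrative: the vCPU and memory figures follow AWS's published specs (worth re-checking against current docs), while the hourly costs, the thresholds and the machine count of 10 are made-up placeholders rather than the values we used.

```python
# Sketch of instance shortlisting. Costs and thresholds are placeholders.
from dataclasses import dataclass


@dataclass(frozen=True)
class InstanceType:
    name: str
    vcpus: int
    memory_gib: int
    hourly_cost_usd: float  # placeholder, NOT a quoted AWS price


CANDIDATES = [
    InstanceType("c5.9xlarge", 36, 72, 1.5),
    InstanceType("m5.8xlarge", 32, 128, 1.5),
    InstanceType("r5.2xlarge", 8, 64, 0.5),
    InstanceType("r5.4xlarge", 16, 128, 1.0),
    InstanceType("m4.4xlarge", 16, 64, 0.8),
]


def shortlist(candidates, min_memory_gib, min_vcpus, max_hourly_cost_usd):
    """Keep instance types that satisfy all three constraints."""
    return [
        inst for inst in candidates
        if inst.memory_gib >= min_memory_gib
        and inst.vcpus >= min_vcpus
        and inst.hourly_cost_usd <= max_hourly_cost_usd
    ]


def cluster_totals(instance, machine_count):
    """Total (vCPUs, memory in GiB) for a cluster of identical machines."""
    return (instance.vcpus * machine_count,
            instance.memory_gib * machine_count)


shortlisted = shortlist(CANDIDATES, min_memory_gib=64, min_vcpus=8,
                        max_hourly_cost_usd=1.0)

by_name = {inst.name: inst for inst in CANDIDATES}
x = 10  # hypothetical machine count
totals_4xlarge = cluster_totals(by_name["r5.4xlarge"], x)
totals_2xlarge = cluster_totals(by_name["r5.2xlarge"], 2 * x)
```

Since an r5.4xlarge has exactly twice the vCPUs and memory of an r5.2xlarge, x of the former and 2x of the latter give the same cluster totals, which is consistent with the two setups performing equally well. A filter like this only produces the shortlist; picking the winners still needs actual benchmark runs.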

And, scene!

To wrap it up, I would say this has been a great hands-on learning experience! Naturally, most of the roadblocks we faced were down to the sheer scale of data on which we were operating. In the first place, it’s great to have the luxury of such a huge amount of data to work on, and building a capability on top of it has been the icing on the cake! It was a roller-coaster ride for sure while building it (thanks for the trust issues, Spark DAG), but definitely an awesome experience in retrospect.

A couple of learnings for folks who’re newly getting into this Spark business:

  • The code you write is “lazy”, and won’t get actually executed until an “action” kicks in
  • Pay good attention to all spark configs/variables, their default values and their limitations
  • Make the most out of the Spark Web UI: it’s often easier to solve a problem than to debug and identify it in the first place

Finally, a huge shout-out to folks without whom this capability wouldn’t have been possible:

  • Sanket Sahu — The Spark pro. Not just for helping me out with the debugging, but also for teaching me how to do it myself. And for lowkey tolerating all my doubts & queries.

“Give a man a fish and you feed him for a day; Teach a man to fish and you feed him for a lifetime”

  • Pravin Agrawal, Sarang Zargar, Gautam Gupta & co — The customers. For demanding this capability in the first place. And also for being very involved via good productive discussions, without which such a seamless collaboration wouldn’t have been possible!
  • Ashwini Kumar — The binding link. For handling all aspects of the requirement holistically and planning out the timeline so well.
  • Sourabh De — The product manager. For confidently picking up this challenging task, with a comfortably planned timeline.
  • Kiran A — The manager/director. For putting this interesting task on my platter!

FIN.

Would you like to see more of the Newbie Chronicles series? Ping me, or comment down below — just let me know!
