A Newbie’s Chronicles on Distributed Data Processing — S01E01: Pilot

Athitya Kumar
Published in Analytics Vidhya
5 min read · Sep 6, 2020

As someone who has recently graduated from college and joined the Intuit Data Engineering Org, it’s been a privilege to get hands-on experience with Distributed Systems & relevant Tech Stacks. Though I had previously tried my hand at multiple domains and open-source projects, this domain was one I hadn’t ventured much into.

I recently got to work on a new capability, through which I got to learn a lot of theory as well as practical use-cases, accompanied by intense rants! I’ll try to mix my learnings and experiences in this series of articles — so that it’s hopefully a fun and engaging read.

A bit more context…

Distributed Data Processing is when you process data on a distributed cluster of connected nodes. Typically, the data is very large, and the distributed architecture lets us scale with the growing amount of data by improving the cluster: adding nodes, or giving them more memory, cores, etc.

Multi-Shadow Clone Jutsu: The Naruto version of the Distributed Architecture. The main Naruto (Name Node) controls the clones, and each clone (Data Node) does the delegated task!
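
As a rough illustration of that idea, here is a minimal sketch in plain Python. Threads stand in for cluster nodes, so this is not a Spark or Hadoop API; the names `partition`, `process_partition` and `NUM_NODES` are made up for this example. The data is split into partitions, each "node" works only on its own slice, and the driver combines the partial results.

```python
from concurrent.futures import ThreadPoolExecutor

NUM_NODES = 4  # hypothetical cluster size; threads stand in for worker nodes


def partition(data, num_parts):
    """Split data into num_parts roughly equal slices, one per 'node'."""
    return [data[i::num_parts] for i in range(num_parts)]


def process_partition(rows):
    """The delegated task: each 'node' only sees its own slice of the data."""
    return sum(rows)


data = list(range(1, 101))  # stand-in for a very large dataset
parts = partition(data, NUM_NODES)

with ThreadPoolExecutor(max_workers=NUM_NODES) as pool:
    partial_sums = list(pool.map(process_partition, parts))

total = sum(partial_sums)  # the driver combines the partial results
print(partial_sums, total)
```

Scaling up in this toy model means raising `NUM_NODES`: each slice gets smaller, while the combine step stays the same.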

To support such a use-case, we’d need a suitable tech stack and this is one of the well-known stacks:

  • Spark: the engine that runs our distributed jobs
  • Scala: the language we write our Spark queries in (this could be replaced by one of the other Spark-supported languages like Python, Java or R)
  • Hadoop: utilities for distributed processing, like the Hadoop Distributed File System (HDFS), resource allocation and management (YARN), etc.
  • Hive: to read, manipulate and store data in a distributed data warehouse

There are more components like Kafka, ZooKeeper, etc. that come together to complete the picture of a Distributed Data Architecture. However, covering them all at once might get a bit too confusing, so let’s start with the components above.

The Required Capability

Now, let me explain the capability we required and recently added support for. As the Data Engineering Org, we act as a bridge between the source teams that have ready-to-consume data and the analysts who want to run performant queries on data at this scale to derive insights.

In this particular case, we had to support around 16TB of incremental data on a daily basis from the source team, and make it available to the analysts by the next day. To give a better picture, let’s say our source team (say, Nick Fury) has Avengers data — that the analysts (say, SHIELD scientists) would like to derive insights from.

We have some existing data in one of our Hive tables, and we keep receiving incremental data every day that we need to merge into that table. The existing data might look something like this:

Historic Data — “The Original Avengers”. Phew, good times!

And the incremental events flowing in look something like this:

  • UPSERTs (i.e., Updates + Inserts):
Incremental Updates + Inserts, up to “Avengers: Endgame” :)
  • DELETEs:
Incremental Deletes — also due to “Avengers: Endgame” (sob)

The expected outcome? After merging the incremental data with our existing historic data, we need to store this materialized data back into the same Hive table:

Final Materialized Data after applying Inserts/Updates/Deletes, i.e., after the end of “Avengers: Endgame”. This also becomes the Historic Data for the next merge!

Of course, the exact row order does not matter, as Hive does not enforce primary keys or any row ordering, and data might indeed get shuffled during our processing. However, the contents of the final snapshot need to match row for row.
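
To make the expected behaviour concrete, here is a minimal sketch of the merge in plain Python, not the Spark job itself. It assumes each row carries a unique `id` column to match on and that upserts are applied before deletes; neither detail is stated above. The sample rows are made up and are not the ones from the figures.

```python
def merge(historic, upserts, deletes, key="id"):
    """Apply upserts, then deletes, to the historic rows; return the new snapshot."""
    merged = {row[key]: row for row in historic}
    for row in upserts:   # update rows with a known key, insert the rest
        merged[row[key]] = row
    for row in deletes:   # drop rows whose key was deleted
        merged.pop(row[key], None)
    return list(merged.values())


def same_snapshot(a, b):
    """Order-insensitive comparison: same rows, regardless of position."""
    def canon(rows):
        return sorted(tuple(sorted(r.items())) for r in rows)
    return canon(a) == canon(b)


# Hypothetical sample rows, keyed by an assumed `id` column.
historic = [
    {"id": 1, "name": "Iron Man", "status": "active"},
    {"id": 2, "name": "Captain America", "status": "active"},
    {"id": 3, "name": "Thor", "status": "active"},
]
upserts = [
    {"id": 2, "name": "Captain America", "status": "retired"},  # update
    {"id": 4, "name": "Captain Marvel", "status": "active"},    # insert
]
deletes = [{"id": 1}]

result = merge(historic, upserts, deletes)

expected = [  # deliberately in a different order
    {"id": 4, "name": "Captain Marvel", "status": "active"},
    {"id": 3, "name": "Thor", "status": "active"},
    {"id": 2, "name": "Captain America", "status": "retired"},
]
print(same_snapshot(result, expected))
```

`same_snapshot` mirrors the requirement above: two snapshots are equal when they hold the same rows, whatever order those rows come back in.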

The main constraint of this capability is not the merging itself, but rather making it scale for such a huge amount of data: ~16TB/day!
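
For a feel of what that number means, here is a back-of-the-envelope calculation. It assumes decimal terabytes and a load spread evenly over 24 hours, both simplifications; a real job would have a shorter window and so need a higher rate.

```python
TB = 10**12                     # decimal terabyte (assumption)
daily_bytes = 16 * TB
seconds_per_day = 24 * 60 * 60  # 86,400

# Sustained average rate needed just to keep up with the incoming data.
avg_throughput_mb_s = daily_bytes / seconds_per_day / 10**6
print(f"{avg_throughput_mb_s:.0f} MB/s")  # prints "185 MB/s"
```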

Partners in crime: Spark ft. Scala

Most of our work relies on Spark & Scala. There are three basic abstractions that people usually use in any Spark processing job:

  • RDD (Resilient Distributed Dataset): an immutable, distributed collection of records, with no schema attributed to it
  • DataFrame: an immutable, distributed collection of rows, but with a schema attributed to it via named columns
  • Dataset: again an immutable, distributed collection, but strongly typed (available in Scala and Java)
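
The difference between the three is easier to see side by side. The sketch below is only an analogy in plain Python, with hypothetical sample records: these are ordinary local collections, not Spark objects, and Python does not check the declared field types the way Scala's compiler would for a Dataset.

```python
from dataclasses import dataclass

# RDD-like: opaque records with no column names, so you index by position.
rdd_like = ((1, "Iron Man"), (2, "Thor"))
names_from_rdd = [record[1] for record in rdd_like]

# DataFrame-like: rows with named columns; a wrong column name fails at runtime.
df_like = ({"id": 1, "name": "Iron Man"}, {"id": 2, "name": "Thor"})
names_from_df = [row["name"] for row in df_like]


# Dataset-like: a declared record type, so the fields are known up front.
@dataclass(frozen=True)
class Avenger:
    id: int
    name: str


ds_like = (Avenger(1, "Iron Man"), Avenger(2, "Thor"))
names_from_ds = [avenger.name for avenger in ds_like]

print(names_from_rdd == names_from_df == names_from_ds)
```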

Coming from a centralized processing background and being familiar with Python3 Pandas’ DataFrame, I thought I understood how Spark DataFrames work — but I was sooooo wrong! 🙃

Spark DataFrames are very different in their core functionality, as they’re distributed across a cluster of nodes. As a result, there’s no concept of an “index”, and Spark transformations (such as joins, unions and filters) are “lazy” by default. That is, they don’t get executed until an “action” kicks in.

For example, let’s take a look at this:

101 ways to incorrectly debug Spark Jobs — welcome to my Ted talk!

If you’re from a synchronous programming background like me, you’d make the mistake of trying to interpret the logged timestamps and benchmark each step based on the difference in times. If this were Python 3 code with a Pandas DataFrame, you’d be right. But nope, it doesn’t work the same way here.

If you try running something similar in your spark-shell, you’d probably look at the differences between the timestamps and infer (incorrectly) that:

  • Reading existing data from Hive is almost instantaneous
  • Reading incremental data from JSON takes almost no time
  • The union is pretty fast as well! 😮
The first 3 steps executed pretty quickly, no?
  • But. But. Why. Why is the count() operation taking a lot of time? It’s supposed to take much less time if it’s using a cluster of nodes!
Re-evaluating life decisions, while the count() is still executing

Yup, that’s true — it’s just that we’ve not understood the mysterious workings of Spark yet. So what happens is, Spark prepares a logical execution plan based on our code. All the “lazy” parts keep stacking up from our code, and get executed on our cluster only on encountering an “action”.

In our example, the action is the count() operation. Behind the scenes, all the steps like reading Hive data, reading incremental data, and performing a join/union were lazy in nature and just kept stacking up. It’s only on encountering the count() operation that all these steps actually get executed, following the execution plan that Spark’s Catalyst optimizer has prepared.

That’s why we misinterpreted the results: the final timestamp difference covered all of the above steps being executed, not just the count() operation. If you’re a newbie, this is your cue to be mind-blown. 😉
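
The same trap can be reproduced without a cluster. The sketch below is a toy model in plain Python, not Spark: `LazyFrame`, `slow_load` and the 0.1-second delay are all made up for illustration. Transformations only record a step in a plan, and the `count()` action is what finally runs everything.

```python
import time


class LazyFrame:
    """Toy stand-in for a Spark DataFrame: records steps, runs them on an action."""

    def __init__(self, steps=()):
        self.steps = tuple(steps)  # the stacked-up "plan"

    def _then(self, name, fn):
        # Transformations return a new LazyFrame; nothing is computed here.
        return LazyFrame(self.steps + ((name, fn),))

    @staticmethod
    def read(name, loader):
        return LazyFrame()._then(f"read {name}", lambda _rows: loader())

    def union(self, other):
        return self._then("union", lambda rows: rows + other._run())

    def _run(self):
        rows = []
        for _name, fn in self.steps:
            rows = fn(rows)
        return rows

    def count(self):
        # The action: only now does the whole plan execute.
        return len(self._run())


def slow_load(n, delay=0.1):
    time.sleep(delay)  # pretend this is an expensive read
    return list(range(n))


timings = {}

start = time.perf_counter()
historic = LazyFrame.read("hive", lambda: slow_load(3))
timings["read hive"] = time.perf_counter() - start

start = time.perf_counter()
incremental = LazyFrame.read("json", lambda: slow_load(2))
timings["read json"] = time.perf_counter() - start

start = time.perf_counter()
merged = historic.union(incremental)
timings["union"] = time.perf_counter() - start

start = time.perf_counter()
total = merged.count()
timings["count"] = time.perf_counter() - start

for step, secs in timings.items():
    print(f"{step:<10} {secs:.3f}s")
```

The first three timings come out close to zero, while `count` absorbs both slow reads. To benchmark an individual step in a setup like this, you would have to force it with an action of its own.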

That moment of painfully-slow realization

With these minute details in mind, let’s proceed further to the next episode S01E02 to see how we built our POC and made optimizations.
