Businesses often deal with data segments that require different processing. Take a recent client's use case: a TV ads measurement company divided its user data into two categories, users exposed to ads and users not exposed to ads. Specific transformations were applied only to the exposed group. After this segregated processing, the data was recombined for downstream analyses such as conversion analysis.
This business use case embodies a common pattern in Spark: use of the union operator. The pattern breaks a single data frame into multiple subsets (split), applies different transformations to each, and finally consolidates them into a unified dataset (union), as sketched below. However, this 'split-union' process can introduce inefficiencies stemming from redundant executions.
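To make the pattern concrete, here is a minimal sketch in Spark (Scala). The data frame, column, and path names are illustrative assumptions, not the client's actual schema:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder().appName("split-union-pattern").getOrCreate()

// Hypothetical source data; the path and column names are placeholders.
val userEvents = spark.read.parquet("s3://my-bucket/user-events/")

// Split: derive two subsets from the same source data frame.
val exposedDF   = userEvents.filter(col("exposed_to_ad") === true)
val unexposedDF = userEvents.filter(col("exposed_to_ad") === false)

// Transform: apply different logic to each subset.
val taggedExposedDF   = exposedDF.withColumn("segment", lit("exposed"))
val taggedUnexposedDF = unexposedDF.withColumn("segment", lit("unexposed"))

// Union: recombine the subsets for downstream analysis.
val combinedDF = taggedExposedDF.union(taggedUnexposedDF)
```

Note that both branches share the same upstream read of userEvents, which is exactly where the redundant execution discussed below comes from.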
It's important to note that ASCENDING recommends adopting Amazon EKS (managed Kubernetes) for Spark cluster management. Amazon EKS empowers the client's Data Engineer Team to focus on Spark optimizations. ASCENDING also runs JupyterHub on Amazon EKS, which greatly speeds up iterative development and debugging and plays a pivotal role in tuning Spark job performance. This blog delves into the strategies ASCENDING employed to address this common performance bottleneck.
The Union ‘Trap’ in Spark That Causes Long Job Execution
The union operator is one of Spark's set operators; it merges two input data frames into one. It is a convenient operation in Apache Spark for combining rows from data frames that share the same column order.
How to use the union operation in Spark is widely discussed. What is discussed far less is the performance caveat associated with the union operator: if we don't understand it, we can fall into a trap that doubles the execution time needed to get the result.
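As a quick refresher (a hypothetical example, not the client's code), union matches columns by position, so both inputs must have the same number and order of columns:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("union-basics").getOrCreate()
import spark.implicits._

// Two small data frames with the same column order.
val dfA = Seq((1, "exposed"), (2, "exposed")).toDF("user_id", "segment")
val dfB = Seq((3, "unexposed"), (4, "unexposed")).toDF("user_id", "segment")

// union keeps duplicate rows and matches columns by position;
// unionByName matches by column name instead when the order differs.
val combined = dfA.union(dfB)
combined.show()
```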
To illustrate, let's turn to a recent project undertaken by ASCENDING. In this project, our client tasked us with filtering and processing user data into two distinct categories: users exposed to ads (highlighted in green) and users not exposed to ads (highlighted in blue). The transformations applied to the unexposed and exposed user data are indicated by the circled regions.
During the execution of the 'UNION' operation, both union inputs are treated as distinct branches, and some operations (enclosed within both the green and blue squares) are executed twice. In our specific scenario, two joins were performed redundantly. These duplicated operations are why a union can sometimes take an unexpectedly long time.
Root Cause of Performance Bottleneck
Spark data frames leverage the Catalyst optimizer, which takes the data frame code you write and performs code analysis, logical optimization, physical planning, and code generation. Catalyst tries to create an optimal plan that executes your Spark job efficiently.
In recent years, a great deal of optimization work in Catalyst has focused on improving the performance of join operations. Joins are used in more scenarios than unions, so less effort has gone into optimizing the union operation.
Unless union is applied to entirely different data sources, it faces a potential performance bottleneck: Catalyst is not "smart" enough to identify shared data frames and reuse them.
In that case, Spark treats each data frame as a separate branch and re-executes everything from the root multiple times.
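You can observe this yourself by printing the physical plan of a unioned data frame. A sketch, continuing the illustrative names from the earlier example (explain("formatted") requires Spark 3.0+; explain(true) works on older versions):

```scala
// Both branches of the union were derived from the same source.
val combinedDF = exposedDF.union(unexposedDF)

// Without caching, the formatted plan shows a Union node whose two branches
// each repeat the same upstream scans (and, in the client's case, joins),
// meaning the shared work is executed once per branch.
combinedDF.explain("formatted")
```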
ASCENDING Approach
Here's a real-world example from one of our clients that vividly illustrates the challenge. Initially, there were no issues with counting the two separate datasets, filteredUnexposedDF and filteredExposedDF, before performing the union operation. The real trouble emerged when we attempted to count the resulting dataset, unexposedAndExposed, after merging the two: an unexpected Out-of-Memory (OOM) error occurred, for exactly the reason we discussed earlier.
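A hedged reconstruction of that scenario in code (the three data frame names come from the job; the sources, paths, filters, and join keys shown here are our own illustrative assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("union-oom-scenario").getOrCreate()

// Hypothetical sources; in the real job these were the client's user and ad datasets.
val userEvents  = spark.read.parquet("s3://my-bucket/user-events/")
val conversions = spark.read.parquet("s3://my-bucket/conversions/")

// Each branch performs its own expensive join before the union.
val filteredExposedDF   = userEvents.filter(col("exposed_to_ad") === true).join(conversions, "user_id")
val filteredUnexposedDF = userEvents.filter(col("exposed_to_ad") === false).join(conversions, "user_id")

// Counting each branch on its own completed without issue.
filteredUnexposedDF.count()
filteredExposedDF.count()

// Counting the union re-executes the upstream joins once per branch;
// in the client's job, this is where the Out-of-Memory error surfaced.
val unexposedAndExposed = filteredUnexposedDF.union(filteredExposedDF)
unexposedAndExposed.count()
```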
If you want to know how to troubleshoot complex Spark jobs interactively, we highly recommend reading our blog post Interactive Spark Mode.
So, how did we solve this issue?
Step 1: Cache Datasets before the Union
First, we harnessed the power of dataset reuse by caching or persisting the datasets in memory or on disk. It is straightforward: call the cache() or persist() method on each dataset before performing the union.
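Continuing the sketch above, Step 1 looks like this (the explicit storage level and the counts that materialize the cache are our choices, shown for illustration):

```scala
import org.apache.spark.storage.StorageLevel

// Persist each branch so the union does not recompute the upstream joins.
// On Datasets, cache() is equivalent to persist(StorageLevel.MEMORY_AND_DISK).
filteredUnexposedDF.persist(StorageLevel.MEMORY_AND_DISK)
filteredExposedDF.persist(StorageLevel.MEMORY_AND_DISK)

// An action on each branch forces the cache to be populated up front.
filteredUnexposedDF.count()
filteredExposedDF.count()

val unexposedAndExposed = filteredUnexposedDF.union(filteredExposedDF)
```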
Step 2: Break Lineage
Second, we provided a subtle hint to Catalyst, enabling it to leverage the pre-cached joined data frame. By doing so, Catalyst gains a shortcut to retrieve the data, bypassing the need to re-fetch it from the original source.
To accomplish this, we introduced a method known as breakLineage() to truncate the lengthy physical plan and compel Catalyst to use the cached data. Instead of regenerating the data from scratch, our filteredUnexposedDF now scans an existing RDD based on this significantly shorter physical plan. This approach not only improves performance but also makes better use of cluster resources.
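The client's exact breakLineage() implementation is not reproduced here, but a common way to write such a helper, consistent with the "scan an existing RDD" behavior described above, is to rebuild the data frame from its underlying RDD. The sketch below is our assumption of how it can be implemented:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Rebuilding the data frame from its RDD replaces the long physical plan
// with a short "Scan ExistingRDD" node. Because the branches were cached in
// Step 1, the RDD reads the cached data instead of replaying the full lineage.
def breakLineage(spark: SparkSession, df: DataFrame): DataFrame =
  spark.createDataFrame(df.rdd, df.schema)

// Usage, continuing the earlier sketch:
val shortPlanUnexposedDF = breakLineage(spark, filteredUnexposedDF)
val shortPlanExposedDF   = breakLineage(spark, filteredExposedDF)

val unexposedAndExposed = shortPlanUnexposedDF.union(shortPlanExposedDF)
unexposedAndExposed.explain("formatted")   // both branches now start from Scan ExistingRDD

// Note: df.checkpoint() is a built-in alternative that also truncates lineage,
// at the cost of writing the data to the configured checkpoint directory.
```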
The two aforementioned optimizations not only resolved the client's OOM (Out of Memory) issue but also delivered a significant 45% reduction in union operation time for this specific job. Given the extensive reliance on the union operator across all of the client's Spark jobs, these methods have proven effective in alleviating similar bottlenecks in numerous other jobs.
Conclusion
Put simply, optimizing the union operation alone can yield substantial benefits. For our client, it can translate into a 20% reduction in overall processing time and a 15% decrease in total project costs.
Celeste Shao
Data Engineer @ASCENDING
Ryo Hang
Solution Architect @ASCENDING