As part of our Analytics practice, we repeatedly encounter use cases where complex calculations over large datasets are required. In this series, we will compare two distributed computing systems – Dask and Spark – that have become very popular in recent times.
To evaluate these, we will select a use case that comes up in many industries – calculating cross correlations across large datasets. This has several applications in healthcare (correlational study of outcomes, gene expression), insurance (risk correlations) and other industries. To illustrate this, we have selected the following use case from financial services.
The Business Use Case
Asset correlation is a measure of how investments move in relation to one another, and when. When assets move in the same direction at the same time, they are considered highly correlated. When one asset tends to move up as the other goes down, the two assets are considered negatively correlated. If two assets are non-correlated, the price movement of one asset has no effect on the price movement of the other.
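These three cases are easy to see with a quick numpy sketch (the tiny price series below are made up purely for illustration):

```python
import numpy as np

a = np.array([1.0, 2.0, 3.0, 4.0, 5.0])   # a steadily rising asset
b = 2 * a + 1                              # moves with a: highly correlated
c = -a                                     # moves against a: negatively correlated
d = np.array([1.0, 2.0, 3.0, 2.0, 1.0])   # symmetric around a's trend: zero correlation

# np.corrcoef returns the Pearson correlation matrix; [0, 1] is the pairwise value
r_ab = np.corrcoef(a, b)[0, 1]   # +1.0
r_ac = np.corrcoef(a, c)[0, 1]   # -1.0
r_ad = np.corrcoef(a, d)[0, 1]   #  0.0
```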
You can reduce the overall risk in an investment portfolio, and even boost your overall returns, by investing in asset combinations that are not correlated, meaning they don’t tend to move in the same way at the same time. With zero or negative correlation, one asset tends to go up when the other is down, and vice versa. By owning a bit of both, you do pretty well in any market, without the steep climbs and deep dips of just one asset type.
If you take the 5-minute history of an asset (such as a stock) for the last 5 years, you would cover roughly 100,000 data points. Comparing one stock with 100,000 other stocks for that period would involve 10 billion data points.
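As a back-of-envelope check (assuming roughly 252 trading days per year and a 6.5-hour session, which are assumptions on our part, not figures from the data):

```python
# rough scale estimate for 5-minute bars over 5 years
bars_per_day = 6.5 * 60 / 5                  # 78 five-minute bars per session
points_per_stock = 5 * 252 * bars_per_day    # ~98,000 data points per stock
total_points = 100_000 * points_per_stock    # ~1e10 when comparing 100,000 stocks
```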
Also, we may need to re-evaluate this over various date ranges to gain additional insight into how these assets correlate at specific points in history. To do this dynamically and at scale, we need an efficient distributed ad hoc computing architecture.
And in the Red corner … introducing … Dask!!!
For data analysis, the go-to tool for most data scientists has been Python. The extensive set of rich libraries such as numpy, pandas and scikit-learn makes it the Swiss Army knife in our toolkits. Dask supports pandas dataframes and numpy arrays, and lets you run the same code locally or scale it out. This means you can write your regular pythonic code, validate it on smaller datasets on your laptop, then push the same code up to a cluster and execute it on a large dataset.
Our perspective on Dask is this – having used numpy, pandas and scikit-learn extensively, we found Dask really easy to learn and understand, and very quick for prototyping solutions. In many ways, the learning curve seemed quite a bit shorter than with Spark. https://dask.org/
As we started using Dask, we noticed that the concept of ‘pure task graphs’ was limiting us in some use cases. Hence, we started looking into other options – and came up with this:
Actors allow us to hold on to state, and reduce overhead on the scheduler.
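A minimal sketch of the Actor pattern, assuming a Dask `distributed` client (the `StockStore` class and its method names are illustrative, not from our codebase):

```python
import numpy as np
from dask.distributed import Client

class StockStore:
    """Stateful object that lives on one worker and keeps data in memory."""
    def __init__(self, prices):
        self.prices = prices   # state survives between calls

    def correlate(self, i, j):
        return float(np.corrcoef(self.prices[i], self.prices[j])[0, 1])

client = Client(processes=False)        # in-process cluster, for illustration only
prices = np.random.rand(10, 500)        # 10 stocks x 500 observations

# actor=True returns a handle (an Actor) to the remote object, so method
# calls go straight to the worker holding the state, bypassing the scheduler
store = client.submit(StockStore, prices, actor=True).result()
r = store.correlate(0, 1).result()
client.close()
```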
Dask Design and Implementation
A few design points:
- The design is based on the Actor architecture. Actors enable stateful computations in Dask, and are ideal when you need additional performance and are willing to sacrifice resilience. Here, the Dask client gets a handle to the remote workers. This handle is called an Actor, and is a pointer to a user-defined object living on the workers. This enables us to call methods on that remote object
- The master node houses the web application (Flask), the Dask Client and the Scheduler
- The computation is split up and parallelized. Each Dask Worker performs computation on a 2D numpy array: 2K dates by 2K stocks
- Dask Workers are single-threaded. Numpy vector operations are optimized for a single thread
- Each Dask Worker has access to all of the numpy arrays but performs computation on only one of them. This is useful when it needs information about a stock that is not part of the array it computes on, such as when calculating correlations
- Correlation calculation: the user picks multiple date ranges in the UI. The Dask Client translates this to a unique list of starting and ending date ids (a 2D array); sends this as the request to the Flask HTTP server, which sends it to all the Dask Workers
- Dask Client assembles the response from the workers
- We also tried another approach: parallel algorithms using Dask Array and XArray, which use multiple cores via chunks. This turned out to be about 8x slower than our current design. The Dask Actor architecture uses low-latency direct connections to Dask workers and trades resilience for performance
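The per-worker computation described above can be sketched in plain numpy (a simplified illustration; the function name and shapes are our assumptions, not production code). Each worker can see all stock histories but computes correlations only for its own chunk of stocks against everything:

```python
import numpy as np

def chunk_correlations(all_prices, chunk):
    """Correlate this worker's chunk of stocks against every stock.

    all_prices: (n_stocks, n_dates) array visible to every worker
    chunk:      slice selecting the stocks this worker owns
    Returns a (chunk_size, n_stocks) matrix of Pearson correlations.
    """
    # standardize each stock's series (zero mean, unit variance)
    z = all_prices - all_prices.mean(axis=1, keepdims=True)
    z /= z.std(axis=1, keepdims=True)
    n = all_prices.shape[1]
    # Pearson correlation = dot product of standardized series / n
    return z[chunk] @ z.T / n

prices = np.random.rand(8, 200)                  # 8 stocks x 200 dates
corr = chunk_correlations(prices, slice(0, 4))   # this worker owns stocks 0-3
```

Because the worker holds the full `all_prices` array in memory, it can correlate its chunk against stocks it does not own, which is exactly the access pattern the design point above calls for.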
This architecture provides extreme performance with good stability. We are running the application 10×5 with very good availability, with only the occasional node restart. Almost all requests return within 100ms, with most (90th percentile) coming back in 50ms.
We tried both JSON data and numpy arrays in memory-mapped mode. The numpy arrays are 3 times faster than JSON: JSON took around 320 ms, whereas numpy responds in 100 ms for the largest dataset in our system.
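A small sketch of the memory-mapped approach, assuming the arrays are stored as `.npy` files (the file name here is illustrative): `np.load(..., mmap_mode='r')` maps the file rather than reading it whole, so only the slices actually touched get paged in, and there is no per-request parse cost like with JSON.

```python
import os
import tempfile
import numpy as np

# write an array to disk once, in the .npy binary format
arr = np.random.rand(1_000, 100)
path = os.path.join(tempfile.mkdtemp(), "prices.npy")
np.save(path, arr)

# memory-map it: data is paged in lazily as slices are accessed
mapped = np.load(path, mmap_mode="r")
row = np.array(mapped[42])   # only this row's pages are read from disk
```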
Next, we are going to try the same use case in the other distributed computing system that is all the rage these days – Spark. Stay tuned for Part 2.