
Using Dask for Distributed Systems

As part of our Analytics practice, we repeatedly encounter use cases where complex calculations over large datasets are required. In this series, we will compare two distributed computing systems – Dask and Spark – that have become very popular in recent times.

To evaluate these, we will select a use case that comes up in many industries – calculating cross correlations across large datasets. This has several applications in healthcare (correlational study of outcomes, gene expression), insurance (risk correlations) and other industries.

The Business Use Case

Asset correlation is a measure of how investments move in relation to one another. When assets move in the same direction at the same time, they are considered to be highly correlated. When one asset tends to move up when the other goes down, the two assets are considered to be negatively correlated. If two assets are non-correlated, the price movement of one asset tells you nothing about the price movement of the other.
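The three cases above can be sketched with NumPy's Pearson correlation. The series below are made-up per-period price changes chosen only to make each case obvious; they are not market data, and the helper name `correlation` is ours.

```python
import numpy as np

def correlation(a, b):
    """Pearson correlation coefficient between two equal-length series."""
    return float(np.corrcoef(a, b)[0, 1])

# Made-up per-period price changes, for illustration only.
asset_a = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
same_direction = 2.0 * asset_a + 1.0                 # moves with asset_a
opposite = -asset_a                                  # moves against asset_a
unrelated = np.array([1.0, -1.0, 0.0, -1.0, 1.0])    # no linear relation to asset_a

print(correlation(asset_a, same_direction))  # close to +1
print(correlation(asset_a, opposite))        # close to -1
print(correlation(asset_a, unrelated))       # close to 0
```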

You can reduce the overall risk in an investment portfolio, and even boost your overall returns, by investing in asset combinations that are not correlated. This means they don't tend to move in the same way at the same time. With negative correlation, one asset tends to go up when the other is down, and vice versa; with zero correlation, their moves are unrelated. By owning a bit of both, you can do reasonably well in most markets, without the steep climbs and deep dips of just one asset type.
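To make the risk reduction concrete, here is a minimal sketch using the standard two-asset portfolio variance formula. The weights and volatilities are illustrative assumptions, not figures from our data, and `portfolio_volatility` is a hypothetical helper.

```python
import math

def portfolio_volatility(w_a, vol_a, vol_b, rho):
    """Volatility of a two-asset portfolio where asset B gets weight 1 - w_a.

    rho is the correlation between the two assets' returns.
    """
    w_b = 1.0 - w_a
    variance = (
        (w_a * vol_a) ** 2
        + (w_b * vol_b) ** 2
        + 2.0 * w_a * w_b * rho * vol_a * vol_b
    )
    # Guard against tiny negative values caused by floating-point rounding.
    return math.sqrt(max(variance, 0.0))

# Assumed inputs: a 50/50 split between two assets with 20% volatility each.
for rho in (1.0, 0.0, -1.0):
    print(rho, round(portfolio_volatility(0.5, 0.2, 0.2, rho), 4))
```

With these assumed inputs, the portfolio is as volatile as either asset when the correlation is +1, and the volatility falls as the correlation drops toward -1.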

If you take the 5-minute price history of an asset (like a stock) for the last 5 years, you would cover roughly 100,000 data points. Comparing one stock with 100,000 other stocks for that period would use about 10 billion data points.
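The estimate can be checked with a quick back-of-the-envelope calculation. The 6.5-hour trading day and 252 trading days per year are our assumptions for this sketch; other markets will give somewhat different counts.

```python
# Assumptions (not stated in the article): 6.5-hour trading day, 252 trading days per year.
BAR_MINUTES = 5
TRADING_MINUTES_PER_DAY = 390      # 6.5 hours
TRADING_DAYS_PER_YEAR = 252
YEARS = 5
STOCKS_TO_COMPARE = 100_000

bars_per_day = TRADING_MINUTES_PER_DAY // BAR_MINUTES
points_per_stock = bars_per_day * TRADING_DAYS_PER_YEAR * YEARS
total_points = points_per_stock * STOCKS_TO_COMPARE

print(points_per_stock)  # 98280, roughly 100,000
print(total_points)      # 9828000000, roughly 10 billion
```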

We may also need to reevaluate this over various date ranges to gain additional insight into how these assets correlate at specific points in history. To do this dynamically and at scale, we need an efficient distributed architecture for ad hoc computation.

Dask for Analysis

For data analysis, the go-to language for most data scientists has been Python. The extensive set of rich libraries such as NumPy, pandas and scikit-learn makes it the Swiss Army knife in our toolkits. However, these libraries are inherently single-machine and cannot scale to run on distributed systems. Dask solves this problem. It supports pandas dataframes and NumPy arrays, and lets you run your code locally or scale it up. This means that you can write your regular Pythonic code and validate it on smaller datasets on your laptop, then push the same code up to a cluster and execute it on a large dataset.
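As a small illustration of that workflow, the sketch below wraps a NumPy array in a Dask array and computes column means chunk by chunk. The function name and chunk size are our own choices for the example. Run as-is, it uses Dask's local scheduler; if a `dask.distributed.Client` connected to a cluster had been created first, the same `.compute()` call would run there instead.

```python
import numpy as np
import dask.array as da

def column_means(values, chunk_rows):
    """Compute per-column means with Dask, splitting the rows into chunks."""
    x = da.from_array(values, chunks=(chunk_rows, values.shape[1]))
    # Nothing is computed until .compute() is called; Dask only builds a task graph.
    return x.mean(axis=0).compute()

# Tiny made-up dataset standing in for a much larger one.
prices = np.arange(12, dtype=float).reshape(6, 2)
print(column_means(prices, chunk_rows=2))
```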

Our perspective on Dask is this: having used NumPy, pandas and scikit-learn extensively, we found Dask easy to learn and understand, and very quick for prototyping solutions. In many ways, the learning curve seemed quite a bit shorter than with Spark.

As we started using Dask, we noticed that the concept of 'pure task graphs' was limiting us in some use cases. We started looking into other options and found Actors. Actors allow us to hold on to state, and reduce overhead on the scheduler.
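A minimal sketch of a Dask actor follows, assuming `dask.distributed` is installed. The `RunningTotal` class is a hypothetical example, not part of our system, and `processes=False` is used only so the demo runs inside a single local process.

```python
from dask.distributed import Client

class RunningTotal:
    """Hypothetical stateful object; once submitted as an actor it lives on one worker."""

    def __init__(self):
        self.total = 0

    def add(self, value):
        self.total += value
        return self.total

def actor_demo():
    # Local in-process cluster for the demo; a real deployment would connect to a scheduler.
    with Client(processes=False, n_workers=1, threads_per_worker=1,
                dashboard_address=None) as client:
        future = client.submit(RunningTotal, actor=True)  # create the actor on a worker
        total = future.result()                           # handle to the remote object
        total.add(5).result()                             # state is kept between calls
        return total.add(7).result()

if __name__ == "__main__":
    print(actor_demo())
```

Each method call on the handle returns a future-like object, so `.result()` is needed to wait for the value. The second call sees the state left behind by the first, which is the behaviour a pure task graph does not offer.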

Dask for Distributed Systems Architecture

Design Details

  • The design is based on the Actor architecture. Actors enable stateful computations in Dask, and are ideal when you need additional performance and are willing to sacrifice resilience. Here, the Dask Driver Client gets a handle to an object on each remote worker. This handle is called an Actor, and is a pointer to a user-defined object living on the worker. This enables us to call methods on that remote object.

  • Master Node houses the Flask Web Application, Driver, and Dask Scheduler.

  • The computation is split up and parallelized amongst the Dask Workers running on Worker Nodes. Each Dask Worker performs computation on a 2D NumPy array: 2K dates by 2K stocks.

  • Dask Workers are single threaded. Numpy vector operations are fast on a single thread. Numpy arrays can be memory mapped to disk, and this allows the system to load only the subset of data needed for the computation directly from disk.

  • Each Dask Worker has access to all of the NumPy arrays but performs computation on only one of them. This is useful when it needs information about a stock that is not part of the array it computes on, such as when calculating correlations.

  • Correlation calculation: The user picks multiple date ranges and one stock symbol in the Web Browser. This request is sent to the Flask Web Application, which translates it into a unique list of starting and ending date ids (a 2D array) and sends that as the request to the Driver. The Driver then sends it to all the Dask Workers.

  • Driver assembles the response from the workers before returning it to the Web Browser.

  • The Dask scheduler can coordinate about 4000 tasks per second. However, operations on Dask actors do not inform the central scheduler, and so do not count against that 4000 tasks/second limit. They also avoid an extra network hop and so have lower latencies.

  • We also tried another approach: chunk-based parallelization on multiple cores using Dask Array. This turned out to be about 10x slower than our current design.
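The per-worker calculation and the driver's assembly step described above can be sketched in plain NumPy. This is not our production code: `CorrelationShard` and `driver` are hypothetical names, the shards are ordinary in-process objects standing in for the objects the actors would hold, and for simplicity the driver passes the target stock's series to each shard rather than having the shard look it up in another array.

```python
import numpy as np

class CorrelationShard:
    """Stand-in for the object held by one worker: a (dates x stocks) block."""

    def __init__(self, returns):
        self.returns = returns  # shape (n_dates, n_stocks_in_shard)

    def correlate(self, target, date_ranges):
        """Correlate `target` with every stock in this shard for each [start, end) range."""
        out = np.empty((len(date_ranges), self.returns.shape[1]))
        for i, (start, end) in enumerate(date_ranges):
            block = self.returns[start:end]
            t = target[start:end]
            block_c = block - block.mean(axis=0)   # centre each stock's series
            t_c = t - t.mean()                     # centre the target series
            cov = t_c @ block_c                    # one dot product per stock
            norm = np.sqrt((t_c @ t_c) * (block_c ** 2).sum(axis=0))
            out[i] = cov / norm
        return out

def driver(shards, target, requested_ranges):
    """Deduplicate the date ranges, fan out to every shard, and stitch the results."""
    date_ranges = sorted(set(requested_ranges))
    parts = [shard.correlate(target, date_ranges) for shard in shards]
    return date_ranges, np.concatenate(parts, axis=1)

# Made-up data: 200 dates, 6 stocks split across two shards.
rng = np.random.default_rng(0)
all_returns = rng.normal(size=(200, 6))
shards = [CorrelationShard(all_returns[:, :3]), CorrelationShard(all_returns[:, 3:])]
ranges, result = driver(shards, all_returns[:, 0], [(50, 200), (0, 100), (0, 100)])
print(ranges, result.shape)
```

In the actor-based design, each `correlate` call would instead be a method call on an actor handle, and the driver would wait on the returned futures before concatenating.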

Dask for Distributed Systems Results

This architecture provides extreme performance with good stability. We are running the application 10×5 with very good availability and only the occasional node restart. Almost all requests return within 100 ms, with most (90th percentile) coming back within 50 ms.

If the web application is also Python-based, NumPy arrays would be a better choice than JSON for the data returned from the Flask application, because JSON serialization carries more overhead. It is good to have both options, though, as other systems may need JSON. JSON serialization adds an overhead of 100 ms or more, depending on how much data is returned and from how many actors.
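The difference in payload size can be seen in a small sketch that encodes the same array both ways. The helper names are ours, the data is random, and NumPy's `.npy` format is one possible binary encoding we picked for the example; timings will vary by machine, so only sizes are compared here.

```python
import io
import json
import numpy as np

def to_json_bytes(arr):
    """Encode an array as UTF-8 JSON (nested lists of numbers)."""
    return json.dumps(arr.tolist()).encode("utf-8")

def to_npy_bytes(arr):
    """Encode an array in NumPy's binary .npy format."""
    buf = io.BytesIO()
    np.save(buf, arr, allow_pickle=False)
    return buf.getvalue()

def from_npy_bytes(payload):
    """Decode bytes produced by to_npy_bytes back into an array."""
    return np.load(io.BytesIO(payload), allow_pickle=False)

# Made-up correlation results: 5 date ranges by 2,000 stocks.
rng = np.random.default_rng(0)
correlations = rng.uniform(-1.0, 1.0, size=(5, 2000))

print(len(to_json_bytes(correlations)), "bytes as JSON")
print(len(to_npy_bytes(correlations)), "bytes as .npy")
```

A Python client can rebuild the array with `from_npy_bytes` without parsing text, while non-Python consumers can still be served the JSON form.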

Next, we are going to try the same use case in the other popular distributed computing system, Spark. Stay tuned for Part 2.

For more information, please contact us.
