The Distributed Systems Challenge: Part 1 – Dask

As part of our Analytics practice, we repeatedly encounter use cases where complex calculations over large datasets are required. In this series, we will compare two distributed computing systems – Dask and Spark – that have become very popular in recent times.

To evaluate these, we will select a use case that comes up in many industries – calculating cross-correlations across large datasets. This has several applications in healthcare (correlational studies of outcomes, gene expression), insurance (risk correlations) and other industries. To illustrate this, we have selected the following use case from financial services:

The Business Use Case

Asset correlation is a measure of how investments move in relation to one another, and when. When assets move in the same direction at the same time, they are considered highly correlated. When one asset tends to move up while the other goes down, the two assets are considered negatively correlated. If two assets are non-correlated, the price movement of one asset has no effect on the price movement of the other.

You can reduce the overall risk in an investment portfolio, and even boost your overall returns, by investing in asset combinations that are not correlated. This means they don't tend to move in the same way at the same time. With negative or zero correlation, one asset may go up while the other is down, and vice versa. By owning a bit of both, you do pretty well in any market, without the steep climbs and deep dips of just one asset type.

If you take the 5-minute history of an asset (like a stock) for the last 5 years, you would cover roughly 100,000 data points. Comparing one stock with 100,000 other stocks for that period would involve 10 billion data points.

Also, we may need to re-evaluate this over various date ranges to gain additional insight into how these assets correlate at specific points in history. To do this dynamically and at scale, we need an efficient distributed ad hoc computing architecture.

And in the Red corner … introducing … Dask!!!

For data analysis, the go-to tool for most data scientists has been Python. Its extensive set of rich libraries such as NumPy, pandas and scikit-learn makes it the Swiss Army knife in our toolkits. Dask supports pandas DataFrames and NumPy arrays, and lets you run the same code locally or scale it out. This means you can write regular Pythonic code, validate it on smaller datasets on your laptop, then push the same code up to a cluster and execute it on a large dataset.

Our perspective on Dask is this: having used NumPy, pandas and scikit-learn extensively, we found Dask really easy to learn and understand, and very quick to prototype solutions with. In many ways, the learning curve seemed quite a bit shorter than with Spark. https://dask.org/

As we started using Dask, we noticed that the concept of 'pure task graphs' was limiting us in some use cases. Hence, we started looking into other options – and came across Dask Actors:
http://distributed.dask.org/en/latest/actors.html
Actors allow us to hold on to state, and reduce overhead on the scheduler.
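
For reference, here is a minimal sketch of the actor pattern in dask.distributed. The Counter class is the illustrative example from the Dask documentation, not part of our application; Client() starts a local cluster, so on a real cluster you would pass your scheduler address instead.

from dask.distributed import Client

class Counter:
    """Tiny stateful object that lives on one worker."""
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

if __name__ == "__main__":
    client = Client()                                       # local cluster for testing; use your scheduler address in production
    counter = client.submit(Counter, actor=True).result()   # instantiate Counter on a worker, get back an Actor handle
    future = counter.increment()                            # the method call executes on the worker; state stays there
    print(future.result())                                  # -> 1

Because the scheduler is not involved in each method call, the state stays put and the per-call overhead is low.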

Dask Design and Implementation

A few design points:

  • The design is based on an actor architecture. Actors enable stateful computations in Dask, and are ideal when you need additional performance and are willing to sacrifice resilience. Here, the Dask driver client gets a handle to the remote workers. This handle is called an Actor and is a pointer to a user-defined object living on a worker, which lets us call methods on that remote object.
  • Master Node houses the Flask Web Application, Driver, and Dask Scheduler
  • The computation is split up and parallelized amongst the Dask Workers running on Worker Nodes. Each Dask Worker performs computation on a 2D NumPy array of roughly 2,000 dates by 2,000 stocks.
  • Dask Workers are single-threaded; NumPy vector operations are fast on a single thread. NumPy arrays can be memory-mapped to disk, which allows the system to load only the subset of data needed for a computation directly from disk.
  • Each Dask Worker has access to all of the NumPy arrays but performs computation on only one of them. This is useful when it needs information about a stock that is not part of the array it computes on, such as when calculating correlations.
  • Correlation calculation: the user picks multiple date ranges and one stock symbol in the web browser. This request is sent to the Flask web application, which translates it to a unique list of starting and ending date IDs (a 2D array) and sends it as the request to the Driver, which forwards it to all the Dask Workers (see the sketch after this list).
  • Driver assembles the response from the workers before returning it to the Web Browser.
  • The Dask scheduler can coordinate about 4,000 tasks per second. However, operations on Dask actors do not inform the central scheduler, and so do not contribute to that 4,000 task/second limit. They also avoid an extra network hop and so have lower latencies.
  • Another approach we tried was chunk-based parallelization across multiple cores using Dask Array. This turned out to be about 10x slower than our current design.
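
To make the per-worker step concrete, here is a minimal sketch of what a worker-side actor could look like, assuming a memory-mapped 2D array of returns (rows = date IDs, columns = stocks). The file path, shape, dtype and class/method names are our illustrative assumptions, not the production code.

import numpy as np

class CorrelationWorker:
    """Per-worker state: one memory-mapped block of the returns matrix (dates x stocks)."""

    def __init__(self, path, n_dates=2000, n_stocks=2000):
        # np.memmap pulls only the pages a computation touches from disk
        self.block = np.memmap(path, dtype=np.float32, mode="r",
                               shape=(n_dates, n_stocks))

    def correlate(self, target_returns, start, end):
        # Pearson correlation of one target stock against every column of this block,
        # restricted to the [start, end) date-id range requested by the driver
        window = self.block[start:end, :]
        t = target_returns[start:end] - target_returns[start:end].mean()
        w = window - window.mean(axis=0)
        return (w.T @ t) / np.sqrt((w * w).sum(axis=0) * (t * t).sum())

The driver would hold one such actor handle per worker and fan the date-range request out to all of them.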

Results:

This architecture provides very high performance with good stability. We run the application 10×5 with very good availability and only the occasional node restart. Almost all requests return within 100 ms, and the 90th percentile comes back within 50 ms.

If the web application is also Python based, NumPy arrays are a better choice than JSON as the return format from the Flask application, since JSON serialization adds overhead. It is good to have both options, though, as other systems may need JSON. JSON serialization can add an overhead of 100 ms or more, depending on how much data is returned and from how many actors.
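
As an illustration of the difference, here is a minimal Flask sketch that returns raw NumPy bytes instead of JSON. The route, array shape and the random placeholder result are illustrative assumptions, not our actual endpoint.

import numpy as np
from flask import Flask, Response

app = Flask(__name__)

@app.route("/correlations/<symbol>")
def correlations(symbol):
    # Placeholder result: in the real application this would come from the Dask driver/actors
    result = np.random.rand(3, 100000).astype(np.float32)
    # Raw bytes instead of JSON: far cheaper to serialize and parse
    return Response(result.tobytes(), mimetype="application/octet-stream")

On a Python client, np.frombuffer(response.content, dtype=np.float32).reshape(3, 100000) recovers the array with no JSON parsing overhead.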

Next, we are going to try the same use case in the other distributed computing system that is all the rage these days – Spark. Stay tuned for Part 2!

Securing Cassandra CQLSH

At Data-Aces, we provide managed services running Cassandra for our customers. With more and more attention being given to data security these days, end-to-end encryption of ALL communication is becoming a mandatory requirement. This includes traffic between Cassandra nodes as well as traffic from clients to the Cassandra cluster. The default config file (cassandra.yaml) is configured for normal, unencrypted communication between clients and Cassandra on port 9042.

It is fairly easy to set up client-to-cluster encryption by creating certificates and adding them to a Java keystore/truststore. This process is documented here: https://docs.datastax.com/en/cassandra/3.0/cassandra/configuration/secureSSLClientToNode.html

As described in the link above, this involves changing 'enabled' to 'true' in the following section of the cassandra.yaml file:
client_encryption_options:
    enabled: true
    # If enabled and optional is set to true encrypted and unencrypted connections are handled.
    optional: false
 
This will ensure that ALL client connections are encrypted, even though they still use port 9042. Note that after the above changes, you will no longer be able to connect to the database using the CQL command-line tool, cqlsh!
The Cassandra documentation provides basic information on how to set up cqlsh to use SSL: https://docs.datastax.com/en/cassandra/3.0/cassandra/configuration/secureCqlshSSL.html
 

However, following those steps as-is did not work in our cluster. The documentation refers to a certfile without giving an important caveat: cqlsh is a Python application and does not use the Java keystore/truststore set up for normal Cassandra and Java clients.

Instead, the certificate in 'keystore.jks' needs to be converted to PKCS12 format. Luckily, keytool has a built-in facility to do this conversion.

keytool -importkeystore -srckeystore keystore -destkeystore pkcs12ks -deststoretype PKCS12 -srcstorepass keystorepassword -deststorepass keystorepassword

This will create a keystore file named ‘pkcs12ks’ in the current directory.

The next step is to create a PEM file from this keystore as follows:

openssl pkcs12 -in pkcs12ks -nokeys -out cqlsh.pem -passin pass:keystorepassword 

Setting up the .cqlshrc file

Now that we have our PEM file, we can point to it in the .cqlshrc file:

[connection]
factory = cqlshlib.ssl.ssl_transport_factory

[ssl]
certfile = /home/user/cqlsh.pem
validate = false

Note that setting ‘validate=true’ implies that the certificate needs to be validated at connection time.

Now, you can run cqlsh as follows

$ cqlsh <host_ip> --ssl
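
If you need the same encrypted connection from application code rather than cqlsh, a minimal sketch with the DataStax Python driver (cassandra-driver) could look like the following. The contact point and PEM path are placeholders, and we simply reuse the cqlsh.pem generated above as the trusted certificate.

import ssl
from cassandra.cluster import Cluster

# Placeholders: substitute your node IP and PEM path
ssl_options = {
    "ca_certs": "/home/user/cqlsh.pem",   # the PEM generated above
    "ssl_version": ssl.PROTOCOL_TLSv1_2,
    "cert_reqs": ssl.CERT_REQUIRED,       # validate the server certificate against the PEM
}

cluster = Cluster(["<host_ip>"], port=9042, ssl_options=ssl_options)
session = cluster.connect()
print(session.execute("SELECT release_version FROM system.local").one())
cluster.shutdown()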

TimescaleDB for Time-series Data

Are you an analyst working on time-series data and wondering which database to use? Traditional databases can also store and retrieve time-series data, so why is there a need for a specialized time-series database?

We examine these questions in this blog and answer them based on our understanding of our use cases. Along the way, we compare two fledgling time-series databases, OpenTSDB and TimescaleDB, on their key characteristics, technical features and architecture.

Time-series Data:

The basic definition of time-series data points to two key characteristics:

  • A discrete data sequence taken at successive, equally spaced points in time, indexed by time.
  • Data that collectively represents how a system, process, or behavior changes over time.

Components of Time-series Data:

  • Seasonal effect (Seasonal Variation or Seasonal Fluctuations)
  • Other Cyclic Changes (Cyclical Variation or Cyclic Fluctuations)
  • Trend (Secular Trend or Long Term Variation)
  • Other Irregular Variation (Irregular Fluctuations)

Weather monitoring and stock market analysis data are examples of time-series data that are easy to relate to in our daily life. At the back end, much of this data is generated by IoT platforms or applications, so the speed at which the data is generated becomes a key source of complexity. The database into which the data is ingested should therefore be able to record and retrieve data coming from thousands of IoT devices every microsecond to ensure timely monitoring.

In this digital environment, a great deal of data is gathered by various devices and applications – for example, current location, browsing data and personal fitness/metrics trackers. In such scenarios, it is important to store the data for the overall population in an effective time-series database for future predictions and forecasts.

Fig 1: Example of time-series data points

TimescaleDB Overview:

TimescaleDB is the first time-series database specifically designed for scale, ease of use, and complex queries. As an extension of PostgreSQL, it provides the following:

  • Automatic partitioning across time and space (partitioning key)
  • Full SQL support
  • Easy to use; like a relational database
  • Fast and parallel ingestion

Source: https://blog.timescale.com/when-boring-is-awesome-building-a-scalable-time-series-database-on-postgresql-2900ea453ee2/

Fig 2: PostgreSQL and TimescaleDB – a comparison of insert rates

As we can see in fig 2, the insert rates in PostgreSQL go down as the dataset size increases, while TimescaleDB maintains a steady insertion rate irrespective of dataset size. This greatly improves the performance of the application that sits on top of TimescaleDB.

TimescaleDB executes queries against a hypertable, which looks like a regular table but is composed of many chunks partitioned by time and space.
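
As a small illustration (not from our benchmark), a regular PostgreSQL table can be turned into a hypertable with TimescaleDB's create_hypertable function. The schema, connection string and psycopg2 usage below are illustrative assumptions.

import psycopg2

# Illustrative connection string and schema
conn = psycopg2.connect("dbname=tsdb user=postgres host=localhost")
cur = conn.cursor()

cur.execute("""
    CREATE TABLE IF NOT EXISTS conditions (
        time        TIMESTAMPTZ      NOT NULL,
        device_id   TEXT             NOT NULL,
        temperature DOUBLE PRECISION
    );
""")
# Turn the plain table into a hypertable chunked on the time column
cur.execute("SELECT create_hypertable('conditions', 'time');")
conn.commit()

# A time-filtered query only touches the chunks covering the last day
cur.execute("""
    SELECT device_id, avg(temperature)
    FROM conditions
    WHERE time > now() - interval '1 day'
    GROUP BY device_id;
""")
print(cur.fetchall())
cur.close()
conn.close()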

Time-series data is largely immutable. Writes primarily occur as new appends to recent time intervals, not as updates to existing rows. Both reads and writes have a natural partitioning across both time and space.

– TimescaleDB developers

Data Handling in TimescaleDB:

Hypertable abstraction: The table is abstracted as a hypertable composed of many right-sized chunks partitioned by time and space.

Optimized query execution: During query execution, aggressive constraint exclusion ensures that only the chunks needed to satisfy the query are used to retrieve data.

Data model: TimescaleDB follows the wide-table model, which makes it easier to measure and compare related metrics stored together in the same row.

Benchmarking TimescaleDB:

Our use case required running complex aggregation queries while also supporting simultaneous ingestion of incoming time-series data. To ensure that the chosen platform can handle this load, we did some benchmarking.

An ingestion application kept pumping data into the database while we ran three types of queries that access a varying number of rows. Every query was run several times to ensure stable results. Execution time varied only slightly between runs, because each query hits only the chunks that satisfy its filtering conditions. The results are shown in the table below; we will describe the individual queries in a future blog post.

Results of TimescaleDB benchmark:

Records Retrieved    Query 1 Execution Time (s)    Query 2 Execution Time (s)    Query 3 Execution Time (s)
1,098,212            19.35                         20.60                         18.29
1,431,323            27.21                         25.55                         27.42
1,790,467            34.49                         35.19                         34.32
2,152,595            41.94                         43.10                         41.52
2,498,503            46.94                         48.06                         46.54

TimescaleDB (v0.4.2) vs OpenTSDB (v2.3.0)

Conclusion:

Based on our use case of handling and storing time-series data for an IoT implementation in a Big Data environment, TimescaleDB compares better on features such as partitioning, data retention, access methods, and compatibility with scripting for automation. Moreover, its ease of access and simple data retrieval for downstream applications make it more convenient than the other time-series databases we have used.

Setting up GPU support for Tensorflow on Windows 10

TensorFlow supports running computations on a variety of devices, including CPUs and GPUs. Originally designed for display functions, GPUs have evolved to scale up parallel computations using thousands of cores. This happens to match very well with how we train machine learning algorithms in deep learning models. In Python, the base 'tensorflow' package is CPU-only; a separate package, tensorflow-gpu, is used when we plan to run on GPUs.

So why did we write this blog? When we started running TensorFlow, our developers wanted to test on smaller datasets locally on their own machines before pushing code to dev/test clusters with many GPUs. Most developer laptops/desktops these days come with NVIDIA Quadro or GeForce discrete GPUs. While a Google search turns up a lot of posts on this topic, we encountered some issues that we did not see addressed in those posts. In this blog, we outline the issues we ran into while installing tensorflow-gpu on Windows 10, and the solution to each of them.

Pre-requisites for TensorFlow – https://www.tensorflow.org/install/gpu  

For using Tensorflow-gpu, we need to install the following software, in this order:
  1. Visual Studio Express or Full (Visual Studio is a pre-requisite for CUDA)
  2. CUDA Toolkit for Windows 10
  3. cuDNN – https://developer.nvidia.com/cudnn (membership required)
  4. Tensorflow-gpu (python package)

We followed this post as a guide: https://towardsdatascience.com/installing-tensorflow-with-cuda-cudnn-and-gpu-support-on-windows-10-60693e46e781

Step 1: Drivers and installation

After Visual Studio, we installed the CUDA Toolkit for Windows – cuda_10.0.130_411.31_win10.exe. Next, we installed cuDNN – cudnn-10.0-windows10-x64-v7.4.2.24.zip. We used this code to test that all the components were installed successfully: https://github.com/NarenData/TensorflowWindows/blob/master/TensorflowValidate.py

We first got an error about cuDNN not being available:
cuDNN failed to initialize tensorflow.
It turns out the instructions were not very explicit: we needed to add the cuDNN '\bin' folder to the path in the "Environment Variables" list in Control Panel.

Once the cuDNN install folder's \bin directory was added to the path, the cuDNN DLL error went away.

Step 2: Resolve DLL load error

However, we kept getting the error message below about Tensorflow. We tried installing the older version, cudnn-10.0-windows10-x64-v7.3.0.29.zip, but the same error message persisted.

from tensorflow.python.pywrap_tensorflow_internal import *

ImportError: DLL load failed: The specified module could not be found.
ImportError: No module named ‘_pywrap_tensorflow_internal’

We first spent some time checking whether the CUDA Toolkit and cuDNN installs were correct and whether the DLLs were loading properly. To verify this, we used Process Monitor as suggested here: https://stackoverflow.com/questions/43553149/on-windows-running-import-tensorflow-generates-no-module-named-pywrap-tenso

This made it clear that the DLLs were loading properly and the problem was elsewhere.

Since the DLLs were loading properly, suspicion fell on the tensorflow-gpu package in PyCharm. We were running 1.12.0, but a new release candidate, 1.13.0rc0, was available. Upgrading from 1.12.0 to 1.13.0rc0 resolved the issue.

After the update, we finally got success from TensorflowValidate.py.

After this, we were able to run a couple of sample TensorFlow programs and confirm that they ran successfully on the GPU.
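
As a quick sanity check of our own (separate from the TensorflowValidate.py script linked above), a TensorFlow 1.x session with device placement logging shows whether ops land on the GPU:

import tensorflow as tf

# log_device_placement prints which device (CPU or GPU) each op is assigned to
with tf.Session(config=tf.ConfigProto(log_device_placement=True)) as sess:
    a = tf.constant([1.0, 2.0, 3.0], name="a")
    b = tf.constant([4.0, 5.0, 6.0], name="b")
    print(sess.run(tf.reduce_sum(a * b)))   # the log should show GPU:0 for these ops if the GPU is visible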

Step 3: Multiple GPU test

Many machines come with multiple GPUs, and TensorFlow can also be run across multiple GPUs. We tested for this with https://github.com/NarenData/TensorflowWindows/blob/master/multigputest.py

The following errors and warnings came up while testing with multigputest.py.

Some good news and bad news here. Our CPU is a newer version that supports AVX2, which this TensorFlow build doesn't use yet; it is still compiled for AVX. On the plus side, TensorFlow finds the Quadro GPU and its specs – which is great.

We can also see that, despite the GPU driving two 32-inch 4K monitors, 3.3 GB out of 4 GB of VRAM is free for use by TensorFlow.
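
For reference, the kind of multi-GPU check involved looks roughly like the classic TensorFlow 1.x example below. This is a sketch rather than the linked multigputest.py; allow_soft_placement lets TensorFlow fall back to the CPU or first GPU when a second CUDA device is not available.

import tensorflow as tf

partial = []
for device in ["/device:GPU:0", "/device:GPU:1"]:
    with tf.device(device):
        a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
        b = tf.constant([[1.0, 1.0], [0.0, 1.0]])
        partial.append(tf.matmul(a, b))       # one matrix multiply pinned to each GPU

with tf.device("/cpu:0"):
    total = tf.add_n(partial)                 # gather the partial results on the CPU

config = tf.ConfigProto(log_device_placement=True, allow_soft_placement=True)
with tf.Session(config=config) as sess:
    print(sess.run(total))

With only one CUDA-capable GPU present, the ops pinned to GPU:1 are simply placed elsewhere, which matches what we saw on this machine.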

Step 4: Solve CUDA Driver error

These two links suggest that we need a newer driver:
https://github.com/tensorflow/tensorflow/issues/21832

https://docs.nvidia.com/cuda/cuda-toolkit-release-notes/index.html

For Windows, a driver version of 411.31 or newer is suggested, but the machine had the April 2018 driver installed by default.

The strange thing is that the toolkit (installed in Step 1) is supposed to install the updated driver, but this did not seem to work properly.

Once the new driver was installed, we saw the right output. The code doesn't work fully – it is not able to access the other GPU (the integrated Intel 630 graphics) – but we don't really need that for running our TensorFlow code. It would probably have worked properly if the machine had two NVIDIA GPUs.