Cassandra Failover

Apache Cassandra is an always-on NoSQL database that is highly scalable and highly available. That sounds magical and is in fact true – IF you understand how to configure it correctly! This article describes an issue we ran into when setting up a multi-DC configuration for Cassandra failover and how we resolved it.

Cassandra Configuration

Single Region, Dual AZ

The diagram below shows the initial system configuration for a cluster deployed across two availability zones (AZ) in the US-East region of AWS.

Multi-AZ Cassandra

Cassandra was configured in multi-DC mode, with each AZ treated as a separate DC. The replication factor was set to 3, and Cassandra ensures that at least one replica of each partition resides in each of the two data centers.
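
As a sketch of what such a keyspace definition could look like, shown here with the DataStax Python driver (cassandra-driver): the keyspace name, contact point, DC names and the 2 + 1 per-DC split (which adds up to the replication factor of 3) are illustrative assumptions, not the exact production settings.

from cassandra.cluster import Cluster

# Hypothetical contact point, keyspace and DC names; 2 + 1 replicas total RF 3
# and guarantee at least one replica in each AZ-based DC.
session = Cluster(["1.2.3.4"]).connect()
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS app_data
    WITH replication = {'class': 'NetworkTopologyStrategy',
                        'us-east-az1': 2, 'us-east-az2': 1}
""")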

Thus the dual AZs provide some protection against failures – since even if one AZ went down, the database would still be up. 

But what happens if the entire US-East region becomes unavailable? An always-on database that is accessed from anywhere in the country should be able to survive a full region failure (or, more likely, a network failure that isolates the region).

Cassandra Failover using Multi-Region, Multi-AZ

We modified the above configuration to protect against region failure without adding any additional nodes; in the future, it can be enhanced with more nodes. The beauty of Cassandra is that you do not have to over-provision infrastructure – nodes can be added dynamically without stopping operations (albeit carefully!).

Shown below is the multi-region configuration. The DC definitions were changed: we still use two DCs, but now all four nodes in US-East form a single DC while the two nodes in US-West form the second DC.
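
If the keyspace replication is updated to follow the new DC layout, the change might look like the following sketch (the keyspace name, contact point and per-DC counts are again illustrative assumptions):

from cassandra.cluster import Cluster

# Hypothetical re-definition: one DC per region instead of one per AZ.
session = Cluster(["1.2.3.4"]).connect()
session.execute("""
    ALTER KEYSPACE app_data
    WITH replication = {'class': 'NetworkTopologyStrategy',
                        'us-east': 3, 'us-west': 2}
""")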

Client Configuration

Contact Points

In a multi-DC configuration, it is important to ensure that clients connect to the “local” DC by default. You do this by using a load balancing policy that is both token-aware and DC-aware. For example, if a web application is running in US-East, you want it to access the Cassandra nodes in the US-East DC, so that it does not pay cross-country latency on its default path. You can also specify the “contact points” the client connects to – it is a good idea to specify at least one node in each DC. An example of doing this programmatically is shown below:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;

// One contact point in each DC; token-aware routing on top of a DC-aware policy.
Cluster cluster = Cluster.builder()
    .addContactPoint("1.2.3.4")                   // node in US-East (local DC)
    .addContactPoint("5.6.7.8")                   // node in US-West (remote DC)
    .withPort(9042)
    .withLoadBalancingPolicy(new TokenAwarePolicy(
        DCAwareRoundRobinPolicy.builder()
            .withLocalDc("us-east")               // the client's local DC
            .withUsedHostsPerRemoteDc(2)          // allow remote-DC hosts as fallback
            .build()))
    .build();

These settings can also be configured in the application.conf file. See the Java Driver documentation for details.

Consistency Level

The default consistency level in Cassandra is ONE, i.e. when a query reads or writes a partition, it is sufficient for a single replica to acknowledge the query. This level is insufficient for most non-trivial applications, and especially for applications that span multiple data centers and require high availability.

In general, a consistency level of QUORUM applied to both reads and writes ensures that a Cassandra database is consistent. In our example, this implies that at least 2 replicas need to acknowledge completion of the transaction.

When a cluster is deployed across multiple regions, QUORUM can be problematic because it requires the coordinator to constantly send requests across regions, which introduces significant latency. To avoid performance issues, the usual recommendation is LOCAL_QUORUM, i.e. a quorum of nodes within the “local” DC is used to maintain consistency. Therefore, we configured the consistency level to be LOCAL_QUORUM.
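
For readers working in Python, a minimal sketch of the same settings with the DataStax Python driver (cassandra-driver) is shown below; the addresses and DC name are placeholders.

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

# Default profile: token-aware routing within the local DC, LOCAL_QUORUM consistency.
profile = ExecutionProfile(
    load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc="us-east")),
    consistency_level=ConsistencyLevel.LOCAL_QUORUM)
cluster = Cluster(["1.2.3.4", "5.6.7.8"],
                  execution_profiles={EXEC_PROFILE_DEFAULT: profile})
session = cluster.connect()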

Cassandra Failover Test

Now that the cluster and clients were configured, the next step was a Cassandra failover test. That is, we wanted to ensure that if all the US-East nodes failed, Cassandra would automatically fail over to the US-West DC and clients would be blissfully unaware!

All the nodes in US-East were brought down; running a query after this failed with the following error:

Not enough replicas available for query at consistency LOCAL_QUORUM.

The LOCAL_QUORUM consistency level, while performant, does not allow the query to switch DCs. If the requirement is transparent failover while maintaining consistency across 2 replicas, a better strategy is the consistency level TWO. With TWO, if the client cannot contact any of the local hosts, it will automatically switch to the remote DC (provided the DC-aware load balancing policy allows some hosts in the remote DC, as in the builder example above).
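
A sketch of that combination, again using the Python driver for illustration (the addresses, DC name and the two-hosts-per-remote-DC setting are assumptions):

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

# Keep routing local-first, but allow remote-DC hosts as a fallback and
# require any TWO replicas so the query survives a full local-DC outage.
profile = ExecutionProfile(
    load_balancing_policy=TokenAwarePolicy(
        DCAwareRoundRobinPolicy(local_dc="us-east", used_hosts_per_remote_dc=2)),
    consistency_level=ConsistencyLevel.TWO)
cluster = Cluster(["1.2.3.4", "5.6.7.8"],
                  execution_profiles={EXEC_PROFILE_DEFAULT: profile})
session = cluster.connect()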

Conclusion

Cassandra is an always-on database that can provide tremendous scalability. However, ensuring that it remains always-on requires that it be configured correctly while also maintaining its high performance.

 

Data Aces has experience developing and managing Apache Cassandra and DataStax Enterprise deployments. Please contact us for more information.

The Distributed Systems Challenge: Part 1 – Dask

As part of our Analytics practice, we repeatedly encounter use cases where complex calculations over large datasets are required. In this series, we will compare two distributed computing systems – Dask and Spark – that have become very popular in recent times.

To evaluate these, we will select a use case that comes up in many industries – calculating cross correlations across large datasets. This has several applications in healthcare (correlational study of outcomes, gene expression), insurance (risk correlations) and other industries. To illustrate this, we have selected the following use case from financial services:

The Business Use Case

Asset correlation is a measure of how investments move in relation to one another, and when. When assets move in the same direction at the same time, they are considered highly correlated. When one asset tends to move up while the other goes down, the two assets are considered negatively correlated. If two assets are non-correlated, the price movement of one asset has no effect on the price movement of the other.

You can reduce the overall risk in an investment portfolio, and even boost your overall returns, by investing in asset combinations that are not correlated. This means they don’t tend to move in the same way at the same time. With negative or zero correlation, one asset does not necessarily fall when the other does. By owning a bit of both, you do pretty well in any market, without the steep climbs and deep dips of just one asset type.

If you take the 5-minute history of an asset (like a stock) for the last 5 years, you cover roughly 100,000 data points. Comparing one stock with 100,000 other stocks for that period would involve about 10 billion data points.

Also, we may need to re-evaluate this over various date ranges to gain additional insight into how these assets correlate at specific points in history. To do this dynamically and at scale, we need an efficient, distributed, ad hoc computing architecture.

And in the Red corner … introducing .. Dask!!!

For data analysis, the go-to tool for most data scientists has been Python. Its extensive set of rich libraries such as numpy, pandas and scikit-learn makes it the Swiss Army knife in our toolkits. Dask supports pandas dataframes and numpy arrays, and lets you run the same code locally or scale it out. This means you can write regular pythonic code, validate it on smaller datasets on your laptop, then push the same code up to a cluster and execute it on a large dataset.
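
As a quick sketch of that workflow (the dataset here is synthetic and the scheduler address is hypothetical): the same pandas-style code runs on an in-process scheduler and, unchanged, on a cluster by pointing the Client at a remote scheduler.

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

# In-process scheduler for local testing; swap in Client("scheduler-host:8786")
# to run the identical code on a cluster.
client = Client(processes=False)

pdf = pd.DataFrame({"stock_a": range(1000), "stock_b": range(1000, 0, -1)})
ddf = dd.from_pandas(pdf, npartitions=4)   # familiar pandas-style API, now parallel
print(ddf.corr().compute())                # pairwise correlations, computed lazily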

Our perspective on Dask is this – having used numpy, pandas and scikit-learn extensively, we found Dask really easy to learn and understand, and very quick for prototyping solutions. In many ways, the learning curve seemed quite a bit shorter than with Spark. https://dask.org/

As we started using Dask, we noticed that the concept of ‘pure task graphs’ was limiting us in some use cases. Hence, we started looking into other options and landed on Dask Actors:
http://distributed.dask.org/en/latest/actors.html
Actors allow us to hold on to state and reduce overhead on the scheduler.
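
The minimal counter example from the linked documentation captures the idea: a Python object is constructed on one worker, a handle (the Actor) comes back to the client, and method calls run against the worker's copy while keeping its state.

from dask.distributed import Client

class Counter:
    """Stateful object that lives on a single worker."""
    def __init__(self):
        self.n = 0
    def increment(self):
        self.n += 1
        return self.n

client = Client(processes=False)
future = client.submit(Counter, actor=True)   # construct the object on a worker
counter = future.result()                     # Actor handle (proxy) on the client
print(counter.increment().result())           # remote method call -> ActorFuture -> 1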

Dask Design and Implementation

A few design points:

  • The design is based on an actor architecture. Actors enable stateful computations in Dask, and are ideal when you need additional performance and are willing to sacrifice resilience. Here, the Dask Driver Client gets a handle to an object on each remote worker. This handle is called an Actor and is a pointer to a user-defined object living on the worker, enabling us to call methods on that remote object (see the sketch after this list).
  • Master Node houses the Flask Web Application, Driver, and Dask Scheduler
  • The computation is split up and parallelized among the Dask Workers running on Worker Nodes. Each Dask Worker performs computation on a 2D Numpy array: 2K dates by 2K stocks.
  • Dask Workers are single threaded; Numpy vector operations are fast on a single thread. Numpy arrays can be memory mapped to disk, which allows the system to load only the subset of data needed for the computation directly from disk.
  • Each Dask Worker has access to all of the Numpy arrays but performs computation on only one of them. This is useful when it needs information about a stock that is not part of the array it computes on, such as when calculating correlations.
  • Correlation calculation: the user picks multiple date ranges and one stock symbol in the Web Browser. This request is sent to the Flask Web Application, which translates it into a unique list of starting and ending date ids (a 2D array) and sends it as the request to the Driver, which sends it to all the Dask Workers.
  • Driver assembles the response from the workers before returning it to the Web Browser.
  • The Dask Scheduler can coordinate about 4,000 tasks per second. However, operations on Dask Actors do not go through the central scheduler, so they do not contribute to that 4,000 task/second limit. They also avoid an extra network hop and so have lower latencies.
  • Another approach was tried: chunk-based parallelization on multiple cores using Dask Array. This turned out to be about 10x slower than our current design.
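
To make the design above concrete, here is a hedged sketch of what such an actor-based worker and driver fan-out might look like; the class, file names, scheduler address and shapes are our illustrative assumptions rather than the production code.

import numpy as np
from dask.distributed import Client

class CorrelationWorker:
    """Hypothetical actor holding one memory-mapped (dates x stocks) array."""
    def __init__(self, path):
        self.prices = np.load(path, mmap_mode="r")   # only touched pages are read from disk

    def correlate(self, target, date_ranges):
        """Pearson correlation of a target series vs. every local stock, per date range."""
        rows = []
        for start, end in date_ranges:
            window = np.asarray(self.prices[start:end, :], dtype=float)
            tc = np.asarray(target[start:end], dtype=float)
            tc -= tc.mean()
            wc = window - window.mean(axis=0)        # single-threaded numpy vector math
            rows.append((tc @ wc) / (np.linalg.norm(tc) * np.linalg.norm(wc, axis=0)))
        return np.vstack(rows)

# Driver side: one actor per stored array; fan the request out, stitch the answers together.
client = Client("scheduler-host:8786")               # hypothetical scheduler address
target_series = np.load("target_stock.npy")          # hypothetical target stock series
actors = [client.submit(CorrelationWorker, f"prices_{i}.npy", actor=True).result()
          for i in range(4)]                         # file names are illustrative
futures = [a.correlate(target_series, [(0, 500), (1500, 2000)]) for a in actors]
answer = np.hstack([f.result() for f in futures])    # rows: date ranges, columns: all stocks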

Results:

This architecture provides extreme performance with good stability. We run the application 10×5 with very good availability and only the occasional node restart. Almost all requests return within 100ms, and most (90th percentile) come back within 50ms.

If the web application is also Python based, Numpy arrays are a better choice than JSON for returning results from the Flask application, because JSON serialization adds overhead. It is still good to have both options, though, as other systems may need JSON. JSON serialization can add 100ms or more, depending on how much data is returned and from how many Actors.
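
A minimal sketch of that tradeoff in Flask (the endpoint, query parameter and payload shape are made up for illustration):

import io
import numpy as np
from flask import Flask, Response, jsonify, request

app = Flask(__name__)

@app.route("/correlations")
def correlations():
    result = np.random.random((4, 2000))     # stand-in for the assembled actor results
    if request.args.get("format") == "json":
        return jsonify(values=result.tolist())   # convenient for non-Python callers, slower
    buf = io.BytesIO()
    np.save(buf, result)                     # compact binary payload for Python clients
    return Response(buf.getvalue(), mimetype="application/octet-stream")

# A Python caller can recover the array with np.load(io.BytesIO(response.content)).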

Next, we are going to try the same use case in the other distributed computing system that is all the rage these days – Spark. Stay tuned for Part 2.