Cassandra Consistency and Failover – Part 2

In Part 1 of this article, we saw how to configure Apache Cassandra across multiple datacenters (DCs), as well as the clients, for Cassandra solution availability. This ensures that if enough nodes in one DC become unavailable (or the entire DC goes down), clients seamlessly fail over to the other DC. When the failed DC comes back up, Cassandra streams back the missed writes and everything returns to normal.

In this Part 2 of the article, we widen our scope from the database to the whole solution, to understand how failures affect the availability of the complete Cassandra-backed application.

Cassandra-backed application

Cassandra is a highly scalable and fast database typically used to store high-volume data such as that generated by IoT sensors, mobile games, or web applications with a large user base. The DataStax Enterprise (DSE) version, which bundles Spark and Solr, enables many other interesting use cases as well. For the purposes of this article, however, we will keep our focus on a simple web application, whose architecture might look like the diagram below:


Web app Cassandra architecture

Users from across the country access a set of web servers through load balancers (not shown). Each request is processed by an application server tier, which accesses the backing Cassandra database to fetch and store data. Scalability is built into this architecture: as our app gains new users, we can seamlessly scale all tiers to handle the load.

Failover and Availability

As with most applications, let us assume our entire solution is deployed on cloud infrastructure like AWS. How do we ensure that our application is highly available even if an entire Availability Zone (AZ) goes down? What if an entire region (say US-West) is not accessible due to a network glitch? 

In Part 1 of this article, I showed you how to set up and configure Cassandra to survive a DC failure. Our architecture using multiple DCs for Cassandra could then look like the image below.

Cassandra Solution Availability

The Cassandra database is replicated across DCs so that if one goes down, the application can fail over to the other DC automatically. This assumes that the application itself is highly available. That assumption is likely flawed: if a DC goes down, all tiers deployed in that DC will most likely go down with it.

But what good is it if Cassandra can fail over to a different DC but the web and app tiers cannot?

Cassandra Solution Availability 

We need to ensure Cassandra solution availability, i.e. the entire application is highly available, while users from a particular region access only the resources within that region for maximum performance. The correct solution, then, is to spread the web and app server layers across the DCs. This is shown in the diagram below. Note that this still assumes the load on US-West is lower, so the Cassandra US-West DC has a smaller number of nodes.



Cassandra Solution Availability

Each DC’s stack accesses only the Cassandra nodes within its own DC, using LOCAL_QUORUM. If a DC fails, DNS switches its users to the web servers in the other DC. These users will see longer latencies, but the application remains available along with all their data. Note that Cassandra still replicates data across the DCs, ensuring consistency, but each app server only ever accesses its local DC.
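The DNS-level failover described above can be sketched as a toy routing function (pure Python; the DC names and health map are illustrative assumptions, not part of the actual stack):

```python
def route_user(user_region, dc_health):
    """Return the DC that should serve a user: the local DC if it is
    healthy, otherwise fail over to any remaining healthy DC."""
    if dc_health.get(user_region):
        return user_region                    # normal case: stay local
    for dc, healthy in dc_health.items():     # local DC down: DNS fails over
        if healthy:
            return dc
    raise RuntimeError("no healthy DC available")

# Normal operation: users stay in their own DC.
print(route_user("us-east", {"us-east": True, "us-west": True}))   # us-east

# US-East goes down: its users are redirected to US-West (higher latency,
# but the application and all their data remain available).
print(route_user("us-east", {"us-east": False, "us-west": True}))  # us-west
```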


For true high availability, it is not sufficient to look at one piece of the solution in isolation. The architecture must address the availability of the entire Cassandra solution.


Cassandra Failover and Consistency

Apache Cassandra is an always-on NoSQL database that is highly scalable and available. That sounds magical and is in fact true – IF you understand how to configure it correctly! This article describes an issue we ran into when setting up a multi-DC configuration for Cassandra failover, and how it was resolved.

Cassandra Configuration

Single Region, Dual AZ

The diagram below shows the initial system configuration for a cluster deployed across two availability zones (AZ) in the US-East region of AWS.

Multi-AZ Cassandra

We configured Cassandra to use multiple datacenters, with each AZ forming one DC. The replication factor was set to 3. With the replicas split across the two DCs, Cassandra ensures that at least one replica of each partition resides in each datacenter.

Thus the dual AZs provide some protection against failures –  if one AZ went down, the database would still be up. 

But what happens if the entire US-East region becomes unavailable? An always-on database accessed from anywhere in the country should be able to survive a full region failure (or, more likely, a network failure isolating the region).

Cassandra Failover using Multi-Region, Multi-AZ

We modified the above configuration to protect against region failure, without adding any additional nodes. In the future, we can enhance it with additional nodes as required. The beauty of Cassandra is that you do not have to over-provision infrastructure – nodes can be added dynamically without stopping operations (albeit carefully!).

Shown below is the multi-region configuration. The definition of the DCs was changed: we still use 2 DCs, but now all 4 nodes in US-East are in a single DC while the 2 nodes in US-West are in the second DC.
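With that topology, a keyspace's replication is defined per DC using NetworkTopologyStrategy. A sketch of what this looks like in CQL (the keyspace name, DC names, and per-DC replica counts are illustrative, not the actual values from our cluster):

```sql
-- Hypothetical keyspace: replica counts are set per datacenter
CREATE KEYSPACE app_data WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'us-east': 3,   -- replicas in the 4-node US-East DC
    'us-west': 2    -- replicas in the 2-node US-West DC
};
```

The DC each node belongs to comes from the snitch configuration, so the keyspace definition and the node topology must agree.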

Client Configuration

Contact Points

In a multi-DC configuration, it is important to ensure that clients connect to the “local” DC by default. You do this by using a load balancing policy that is both token-aware and DC-aware. For example, if a web application is running in US-East, you want it to access the Cassandra nodes in the US-East DC, so that it doesn’t incur cross-country latency in its default mode. You can also specify the “contact points” for the client to connect to – it is a good idea to specify at least one node in each DC. An example of doing this programmatically (with the Java driver 3.x) is shown below:

Cluster cluster = Cluster.builder()
    // contact points: at least one node in each DC (addresses elided)
    .addContactPointsWithPorts(new InetSocketAddress("", 9042),
                               new InetSocketAddress("", 9042))
    .withLoadBalancingPolicy(new TokenAwarePolicy(
        DCAwareRoundRobinPolicy.builder()
            .withLocalDc(localDC)
            .withUsedHostsPerRemoteDc(usedHostsPerRemoteDC)
            .allowRemoteDCsForLocalConsistencyLevel()
            .build()))
    .build();

These settings can also be configured in the application.conf file. See the Java Driver documentation for details.

Consistency Level

The default consistency level in Cassandra is ONE, i.e. when a query reads or writes a partition, it is sufficient for one of the replicas to acknowledge the query. This level is insufficient for most non-trivial applications, and especially for applications deployed across multiple datacenters that require high availability.

In general, a consistency level of QUORUM applied to both reads and writes ensures that a Cassandra database is consistent. In our example, this implies that at least 2 replicas need to acknowledge completion of the transaction.
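The quorum arithmetic is simple and worth spelling out (a quick illustration in Python, not part of the original setup):

```python
def quorum(replication_factor):
    # A quorum is a strict majority of replicas: floor(RF / 2) + 1.
    return replication_factor // 2 + 1

print(quorum(3))  # 2 -> with RF=3, two replicas must acknowledge
print(quorum(5))  # 3
```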

When a cluster is deployed across multiple regions, QUORUM can be problematic: it requires the coordinator to constantly send requests across regions, which can cause significant latency. To avoid these performance issues, it is recommended to use LOCAL_QUORUM, i.e. a quorum of nodes within the “local” DC maintains consistency. We therefore configured the consistency level to be LOCAL_QUORUM.

Cassandra Failover Test

Now that the cluster and clients were configured, the next step was a Cassandra failover test. That is, we wanted to ensure that if all the US-East nodes failed, Cassandra would automatically fail over to the US-West DC and clients would be blissfully unaware!

All the nodes in US-East were brought down; running a query after this failed with the following error:

Not enough replicas available for query at consistency LOCAL_QUORUM.

The LOCAL_QUORUM consistency level, while performant, does not allow the query to switch DCs. If the requirement is transparent failover while maintaining consistency across 2 replicas, a better strategy is to use a consistency level of TWO. With TWO, if the client cannot contact any of the local hosts, it will automatically switch to the remote DC.
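The difference can be modelled with a toy availability check (pure Python; a simplified model of replica counting, not driver code). With RF=3 in the local DC, LOCAL_QUORUM needs 2 local replicas, while TWO accepts any 2 replicas cluster-wide:

```python
def satisfiable(level, live_replica_dcs, local_dc, local_rf=3):
    """Can a query at this consistency level succeed, given the DCs of
    the currently reachable replicas?"""
    local_up = sum(1 for dc in live_replica_dcs if dc == local_dc)
    if level == "LOCAL_QUORUM":
        return local_up >= local_rf // 2 + 1   # majority in the local DC only
    if level == "TWO":
        return len(live_replica_dcs) >= 2      # any two replicas, any DC
    raise ValueError(level)

# All US-East replicas down, two US-West replicas still up:
live = ["us-west", "us-west"]
print(satisfiable("LOCAL_QUORUM", live, "us-east"))  # False -> query fails
print(satisfiable("TWO", live, "us-east"))           # True  -> failover works
```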


Cassandra is an always-on database that can provide tremendous scalability. However, ensuring that it remains always-on requires that it be configured correctly while also maintaining its high performance.


Data Aces has experience developing and managing Apache Cassandra and DataStax Enterprise deployments. Please contact us for more information.

Naren Gokul

Using Dask for Distributed Systems

As part of our Analytics practice, we repeatedly encounter use cases where complex calculations over large datasets are required. In this series, we will compare two distributed computing systems – Dask and Spark – that have become very popular in recent times.

To evaluate these, we will select a use case that comes up in many industries – calculating cross correlations across large datasets. This has several applications in healthcare (correlational study of outcomes, gene expression), insurance (risk correlations) and other industries. 

The Business Use Case

Asset correlation is a measure of how investments move in relation to one another, and when. When assets move in the same direction at the same time, they are highly correlated. When one asset tends to move up when another goes down, the two assets are negatively correlated. If two assets are non-correlated, the price movement of one has no effect on the price movement of the other.

Asset Correlation

You can reduce the overall risk in an investment portfolio, and even boost your overall returns, by investing in asset combinations that are not correlated – they don’t tend to move in the same way at the same time. If the correlation is negative, one asset will tend to go up when the other is down, and vice versa. By owning a bit of both, you do pretty well in any market, without the steep climbs and deep dips of just one asset type.

If you take the 5-minute price history of an asset (like a stock) for the last 5 years, you would cover roughly 100,000 data points. Comparing one stock with 100,000 other stocks over that period would involve 10 billion data points.
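The back-of-the-envelope numbers check out (assuming ~78 five-minute bars per 6.5-hour trading day and ~252 trading days per year):

```python
bars_per_day = 78            # 6.5 trading hours x 12 five-minute bars
trading_days_per_year = 252
years = 5

points_per_stock = bars_per_day * trading_days_per_year * years
print(points_per_stock)      # 98280, i.e. roughly 100,000

# Comparing one stock against 100,000 others over the same window:
total_points = points_per_stock * 100_000
print(total_points)          # about 10 billion data points
```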

Also, we may need to re-evaluate this over various date ranges to gain additional insight into how these assets correlate at specific points in history. To do this dynamically and at scale, we need an efficient distributed ad hoc computing architecture.

Dask for Analysis

For data analysis, the go-to language for most data scientists has been Python. Its extensive set of rich libraries such as NumPy, pandas and scikit-learn makes it the Swiss Army knife in our toolkits. However, these libraries are inherently single-machine and cannot scale across distributed systems. Dask solves this problem. It supports pandas dataframes and NumPy arrays, but lets you run the same code locally or scale it out. This means that you can validate your regular pythonic code on smaller datasets on your laptop, then push the same code up to a cluster and execute it on a large dataset.


Our perspective on Dask is this – having used Numpy, Pandas and Scikit-Learn extensively, Dask is really easy to learn and understand and very quick to prototype solutions. In many ways, the learning curve seemed quite a bit shorter than with Spark. 

As we started using Dask, we noticed that the concept of ‘pure task graphs’ was limiting us in some use cases. We started looking into other options – and found Actors. Actors allow us to hold on to state and reduce overhead on the scheduler.
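To illustrate the pattern with only the standard library (this is not Dask’s actual API – there, an actor is created with client.submit(Cls, ..., actor=True)): an actor is a stateful object that lives with one worker, so large state is loaded once and each method call returns a future:

```python
from concurrent.futures import ThreadPoolExecutor

class SeriesActor:
    """Toy actor: holds a dataset in memory between calls."""
    def __init__(self, series):
        self.series = series              # state kept with the "worker"

    def mean(self, symbol):
        xs = self.series[symbol]
        return sum(xs) / len(xs)

executor = ThreadPoolExecutor(max_workers=1)     # the worker owning the actor
actor = SeriesActor({"XYZ": [1.0, 2.0, 3.0]})    # hypothetical symbol and data
future = executor.submit(actor.mean, "XYZ")      # method call -> future
print(future.result())                           # 2.0
```

Because the state stays with the actor, repeated calls avoid re-shipping the data through the scheduler on every task.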

Dask for Distributed Systems Architecture 


Design Details

  • The design is based on an Actor architecture. Actors enable stateful computations in Dask and are ideal when you need additional performance and are willing to sacrifice resilience. Here, the Dask driver client gets a handle to the remote workers. This handle is called an Actor and is a pointer to a user-defined object living on a worker, which lets us call methods on that remote object.
  • The Master Node houses the Flask web application, the Driver, and the Dask Scheduler.
  • The computation is split up and parallelized amongst the Dask Workers running on Worker Nodes. Each Dask Worker performs computation on a 2D NumPy array: 2K dates by 2K stocks.
  • Dask Workers are single-threaded; NumPy vector operations are fast on a single thread. NumPy arrays can be memory-mapped to disk, which allows the system to load only the subset of data needed for a computation directly from disk.
  • Each Dask Worker has access to all of the NumPy arrays but performs computation on only one of them. This is useful when it needs information about a stock that is not part of its own array, such as when calculating correlations.
  • Correlation calculation: the user picks multiple date ranges and one stock symbol in the web browser. This request is sent to the Flask web application, which translates it to a unique list of starting and ending date ids (a 2D array) and sends it as the request to the Driver, which forwards it to all the Dask Workers.
  • The Driver assembles the responses from the workers before returning the result to the web browser.
  • The Dask scheduler can coordinate about 4,000 tasks per second. Operations on Dask actors, however, do not inform the central scheduler, so they do not count against that 4,000 tasks/second limit. They also avoid an extra network hop and so have lower latencies.
  • Another approach was tried: chunk-based parallelization on multiple cores using Dask Array. This turned out to be about 10x slower than our current design.
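The per-pair computation each worker runs can be sketched in plain Python (a toy scalar version of the vectorized NumPy implementation; the real system operates on whole 2D arrays at once):

```python
import math

def pearson(x, y):
    """Pearson correlation coefficient of two equal-length series."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sx = math.sqrt(sum((a - mx) ** 2 for a in x))
    sy = math.sqrt(sum((b - my) ** 2 for b in y))
    return cov / (sx * sy)

print(pearson([1, 2, 3], [2, 4, 6]))   # 1.0  (perfectly correlated)
print(pearson([1, 2, 3], [6, 4, 2]))   # -1.0 (perfectly negatively correlated)
```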

Dask for Distributed Systems Results

This architecture provides extreme performance with good stability. We run the application 10×5 with very good availability and only the occasional node restart. Almost all requests return within 100ms, with most (90th percentile) coming back in 50ms.

If the web application is also Python-based, NumPy arrays are a better choice than JSON for the Flask application to return, since JSON serialization incurs more overhead. It is good to have both options, though, as other systems may need JSON. JSON serialization adds an overhead of 100ms or more, depending on how much data is returned and from how many actors.

Next, we are going to try the same use case in the other popular distributed computing system – Spark. Stay tuned for Part 2!

Naren Gokul

Cqlsh with SSL: Securing Cassandra Cqlsh

At Data Aces, we provide managed services running Cassandra for our customers. With more and more attention being given to data security these days, end-to-end encryption of ALL communication is becoming a mandatory requirement. This includes traffic between the Cassandra nodes as well as from clients to the Cassandra cluster – which means cqlsh with SSL is also a requirement. The default config file (cassandra.yaml) is configured for normal, unencrypted communication between clients and Cassandra on port 9042.

It is fairly easy to setup client to cluster encryption by creating certificates and adding them to Java keystore/truststore. This process is documented here:

As described in the link above, this involves changing ‘enabled’ to ‘true’ in the client_encryption_options section of the cassandra.yaml file:

client_encryption_options:
    enabled: true
    # If enabled and optional is set to true, both encrypted and unencrypted connections are handled.
    optional: false

This ensures that ALL client connections are encrypted, even though they still use port 9042. Note that after the above change, you will no longer be able to connect to the database using the CQL command-line tool cqlsh!

The Cassandra documentation provides basic information on how to set up cqlsh with SSL.

However, following these steps, I couldn’t get cqlsh with SSL to work in our cluster. The documentation refers to a certfile without an important caveat: cqlsh is a Python application and does not use the Java keystore/truststore set up for Cassandra and Java clients.

Instead, I had to convert the certificate in ‘keystore.jks’ to PKCS12 format. Luckily, keytool has a built-in facility to convert to PKCS12 format.

keytool -importkeystore -srckeystore keystore -destkeystore pkcs12ks -deststoretype PKCS12 -srcstorepass keystorepassword -deststorepass keystorepassword

This will create a keystore file named ‘pkcs12ks’ in the current directory.

The next step is to create a PEM file from this keystore as follows:

openssl pkcs12 -in pkcs12ks -nokeys -out cqlsh.pem -passin pass:keystorepassword 

Setting up .cqlshrc file

Now that we have our PEM file, we can point to it in the cqlshrc file (typically ~/.cqlshrc):

[connection]
factory = cqlshlib.ssl.ssl_transport_factory

[ssl]
certfile = /home/user/cqlsh.pem
validate = false

Note that setting ‘validate=true’ implies that the certificate needs to be validated at connection time.

We are now all set to run cqlsh with SSL using the option –ssl as below:

$ cqlsh <host_ip> --ssl