Cassandra Consistency and Failover – Part 2

In Part 1 of this article, we saw how to configure Apache Cassandra across multiple datacenters (DCs), and how to configure the clients, to keep the Cassandra solution available. This ensures that if enough nodes in one DC become unavailable (or the entire DC goes down), clients seamlessly fail over to the other DC. When the failed DC comes back up, Cassandra streams the missed writes back to it and everything returns to normal.

In this Part 2, we widen our view to the whole solution – not just the database – to understand how failures affect the availability of a Cassandra-backed application.

Cassandra-backed application

Cassandra is a highly scalable and fast database, typically used for storing high-volume data such as that generated by IoT sensors, mobile games and apps, or web applications with a large user base. With the DataStax Enterprise (DSE) version that bundles Spark and Solr, there are many other interesting use cases as well. For the purposes of this article, however, we will keep our focus on a simple web application, whose architecture might look like the diagram below:

 

Web app Cassandra architecture

Users from across the country access a set of web servers through load balancers (not shown). Requests are processed by an application server tier, which reads from and writes to the backend Cassandra database. Scalability is built into this architecture – as our app gains new users, we can seamlessly scale every tier to handle the load.

Failover and Availability

As with most applications, let us assume our entire solution is deployed on cloud infrastructure like AWS. How do we ensure that our application is highly available even if an entire Availability Zone (AZ) goes down? What if an entire region (say US-West) is not accessible due to a network glitch? 

In Part 1 of this article, I showed you how to set up and configure Cassandra so it can survive a DC failure. Our architecture using multiple DCs for Cassandra could then look like the image below.

Cassandra Solution Availability

The Cassandra database is replicated across DCs so that if one goes down, the application can fail over to the other DC automatically. This assumes that the application itself is highly available – but that assumption is likely flawed: if a DC goes down, all the tiers deployed in that DC will most likely go down with it.

But what good is it if Cassandra can fail over to a different DC but the web and app tiers cannot?

Cassandra Solution Availability 

We need to ensure Cassandra solution availability, i.e. the entire application is highly available, while also ensuring that users from a particular region access only the resources within that region for maximum performance. The correct solution, then, is to spread the web and app server layers across the DCs, as shown in the diagram below. Note that this still assumes that the load on US-West is lower, so the Cassandra US-West DC has a smaller number of nodes.

 

 

Cassandra Solution Availability

Each DC’s stack accesses only the Cassandra nodes within its own DC, using the LOCAL_QUORUM consistency level. If a DC fails, DNS switches users to the web servers in the other DC. Those users will see longer latencies, but the application remains available along with all their data. Note that Cassandra still replicates data across the DCs, ensuring consistency, even though each app server only talks to its local DC.
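
For readers who want to see what this looks like in client code, here is a minimal sketch using the open-source Python driver (the DC name, node addresses and keyspace are placeholders, not our actual deployment):

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

# Pin this app tier to its own DC and use LOCAL_QUORUM for reads and writes.
profile = ExecutionProfile(
    load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc="us-east")),
    consistency_level=ConsistencyLevel.LOCAL_QUORUM,
)
cluster = Cluster(["10.0.0.11", "10.0.0.12"],
                  execution_profiles={EXEC_PROFILE_DEFAULT: profile})
session = cluster.connect("my_keyspace")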

Conclusion

For true high availability, it is not sufficient to look at one piece of the solution in isolation. The architecture must account for the availability of the entire Cassandra solution.

Snowflake – Loading JSON to table in 3 steps

We frequently come across situations where we get a data source in JSON that we need to load into Snowflake. While Snowflake supports JSON data sources, there are some nuances of how the load process works that can take a while to understand and work around. The result is that we ended up spending more time than we initially planned on the task. Hopefully this post helps you get there faster!

Recently, I was tasked with loading some JSON from the Open FDA API (https://open.fda.gov/data/downloads/) into a single table. The data was broken into 9 compressed JSON files…each of which was 600+ MB unzipped. All in all, there were 23,158,988 lines of JSON that needed to be loaded into the table…that’s a lot of data. I didn’t anticipate it at the time, but it’s no wonder that the size of these files would end up being the first issue I encountered.

I had never handled loading JSON into a Snowflake table before, but after only a quick search within the Snowflake docs, I had found my solution. A script, specifically for loading JSON data into relational tables. Bingo. This solution involved the use of the PARSE_JSON function. PARSE_JSON interprets an input string as a JSON document, allowing you to access key/value pairs using dot notation, and load the values directly into the table.
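
As a quick, hedged illustration (the key names, sample JSON and connection parameters below are invented, not the actual Open FDA fields I loaded), this is what PARSE_JSON and dot notation look like when driven from the Python connector:

import snowflake.connector

# Hypothetical connection parameters – replace with your own account details.
conn = snowflake.connector.connect(account="my_account", user="my_user", password="...",
                                    warehouse="my_wh", database="my_db", schema="public")
cur = conn.cursor()
# PARSE_JSON turns the string into a VARIANT; keys are then reachable with dot notation.
cur.execute("""
    SELECT PARSE_JSON(t.v):openfda.brand_name::STRING
    FROM (SELECT '{"openfda": {"brand_name": "ASPIRIN"}}' AS v) t
""")
print(cur.fetchone())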

So, I created my staging environment, staged my files and prepared them to be loaded into the table. Only there was a problem…the file size was too large to be accepted, and I was met with an error on load. Further research led me to believe that compressed file sizes between 100 and 250 MB were perfectly suitable for loading. So why wasn’t this working? The compressed size of my JSON was less than 150 MB.

No big deal. As we all know, googling and reading docs is part of the job. So, I dug a little deeper into the Snowflake docs. On further investigation, it seemed that the problem was in the staging of my JSON. JSON is stored in the staging area as a string, and in one single column. Perhaps storing all of that data in a single column was the problem. I needed another method.

After another go at digging through documentation, I found the FLATTEN function. Flattening a column produces a lateral view of the data. Because JSON is staged in a single column, I thought this would be a great solution: it would allow me to load each value directly into the corresponding table column, one row at a time. My syntax and logic were sound, and this approach would work just fine under normal circumstances. But again, the command failed as I received my second load error due to excessive file size.
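
Again as a rough sketch – reusing the cursor from the previous snippet, with an invented JSON shape – LATERAL FLATTEN turns each array element into its own row:

# FLATTEN produces one output row per array element; f.value is the element itself.
cur.execute("""
    SELECT f.value:brand_name::STRING
    FROM (SELECT PARSE_JSON('{"results": [{"brand_name": "A"}, {"brand_name": "B"}]}') AS v) t,
         LATERAL FLATTEN(input => t.v:results) f
""")
print(cur.fetchall())   # [('A',), ('B',)]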

I proceeded to spend the next several hours scouring the Snowflake docs and community, looking for a solution. I tried combining the two previous methods, a method that involved transforming the JSON on load, and countless other functions. I even tried bypassing the CLI and loading the file directly through the Snowflake GUI. Yet here I was, getting the same file size error over and over.

What I ended up learning in the long run is that although a single file of that size can be staged for upload, it cannot actually be copied into the table. It turns out that the maximum file size for copying JSON into a table is capped at just over 16.5 MB.

Given the amount of data I had to upload, I was looking at having to break the files up into ~ 375 different files in order to meet the 16.5 MB limit. I now had a different problem on my hands.

I opened the first of the 9 JSON files, which took several seconds to load in my IDE due to its sheer size. I scrolled to the bottom of the file…only 1.65 million lines, no big deal. If I started now, I could manually break this thing down into appropriately sized chunks within a year or two, right? Clearly, I was going to need another way to accomplish this.

I began looking for a JSON splitting tool online and came across a couple of different options. Some were browser-based applications, while others were open-source projects. I trialed a few of them, but none appeared to respect JSON structure. They would all split the file, yes, but without any concern for where the split happened. The integrity of my JSON was compromised and the output was invalid JSON.

Finally, I stumbled across a Python-based JSON splitter, which ended up being the key that unlocked this entire process for me. And with that, let’s get to the stuff you came here to read.

Step 1: Splitting Your Files

For this task, I was fortunate enough to stumble on JSON Splitter, an open-source tool for breaking large JSON files into smaller files of a user-specified size. The program is written in Python, which I didn’t have installed.

First, if you don’t already have it, download Python from https://www.python.org/. Once installed, you can verify that the installation was successful by running the command py --version in your terminal.

JSON Splitter can be downloaded at https://github.com/jhsu98/json-splitter. Once downloaded, navigate to the directory that the splitter was placed in. For ease, I would suggest storing the target file(s) that need to be split in the same directory, or in a new folder within that directory. Note: your JSON file must be an array of objects, or a multi-dimensional array, for the splitter to work!

Next, ensure you’re in the same directory in which the program is installed, and run the command py splitter.py.

If successful, you will be prompted to select the target JSON file that needs to be split. Again, the file must be an array of objects or a multi-dimensional array. The JSON data that I used was sampled from an API endpoint, which required me to manually alter it to fit the splitter’s criteria. Once the program accepts your file, you will be prompted for the size you want the file split into. I chose 15 MB, just to ensure that I wouldn’t exceed the Snowflake file size limit during the upload process. If successful, the new files will be written to the same directory as the splitter application.
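
If you would rather not depend on an external tool, a rough equivalent is easy to script yourself. The sketch below is my own simplification (not the json-splitter code); it assumes the input file is a single JSON array of objects, uses a hypothetical file name, and writes chunks of roughly 15 MB:

import json

CHUNK_BYTES = 15 * 1024 * 1024            # stay safely under the copy size limit

with open("drug-event.json") as f:        # hypothetical input file
    records = json.load(f)                # must be a JSON array of objects

chunk, size, part = [], 0, 0
for rec in records:
    chunk.append(rec)
    size += len(json.dumps(rec))          # rough running size of the current chunk
    if size >= CHUNK_BYTES:
        with open(f"my_split_file_{part}.json", "w") as out:
            json.dump(chunk, out)         # each chunk is itself a JSON array
        chunk, size, part = [], 0, part + 1

if chunk:                                  # write the final partial chunk
    with open(f"my_split_file_{part}.json", "w") as out:
        json.dump(chunk, out)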

Step 2: Stage Your Files

Once you’ve got all of your files split, it is time to stage them to be loaded into your Snowflake table. To do this, you will need to log into your Snowflake environment using the SnowSQL CLI.

First, we must create our file format. Note that I am naming my format ‘json_file_format’ in this example.

CREATE FILE FORMAT json_file_format
    TYPE = 'JSON'
    ALLOW_DUPLICATE = FALSE
    STRIP_NULL_VALUES = FALSE
    STRIP_OUTER_ARRAY = TRUE
    IGNORE_UTF8_ERRORS = FALSE;

Next, we need to create our staging environment using the file format that we just set up. I am naming my staging environment ‘json_stage’, and referencing the previously created file format.

CREATE STAGE IF NOT EXISTS json_stage FILE_FORMAT = json_file_format;

Now we have our staging environment set up to receive our split files. Depending on the size of your original JSON file, it is likely that you now have more than a handful of files that need to be staged. So what’s the best way to do this?

Simple. All we have to do is specify that we want to load ALL of the files. The syntax of the PUT command that uploads files into your staging area differs slightly depending on whether you’re a Mac or PC user.

Linux/Mac

PUT file:///directory/filename @json_stage;

Windows

PUT file://C:\directory\filename @json_stage;

Again, we need to specify that we want to upload ALL files within the directory. This is the perfect scenario for the * wildcard character, which lets us load all the files at once.

PUT file://C:\directory\my_split_file_* @json_stage;

This tells SnowSQL to upload every file whose name starts with ‘my_split_file_’, whatever follows, into our staging environment ‘@json_stage’.

If this works correctly, you should see a display of all the uploaded files. To check which files were staged, you can run the command LIST @json_stage; to get a complete list of files currently in the stage.

Step 3: Load Your Data

Finally, it is time to load our data into the table. Again, this assumes that you already have a table set up and ready to go. I used fields of the VARCHAR data type in my example.

COPY INTO my_table (col1, col2, col3)
FROM (SELECT
    PARSE_JSON($1):json_key_1,
    PARSE_JSON($1):nested.json_key_1,
    PARSE_JSON($1):json_key_3[0]
  FROM @json_stage t)
ON_ERROR = 'continue';

Let’s go over what is happening here.

First, we are using the ‘copy into’ clause to tell the program what fields from our table we want to load our data into. Next, we use a sub-query to get our data. The JSON must be parsed in order to be read, so we will use the ‘parse_json’ function. The staging area stores our JSON in one column, which we reference using the ‘$1’ syntax. We can now reference the values we want by using dot notation. Lastly, we reference where this JSON data is coming from…our JSON staging area.

That’s it! Our data can be queried using normal SQL syntax.
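
If you need to repeat this load regularly, the same stage-and-copy sequence can be driven from Python with the Snowflake connector. This is only a sketch under the same assumptions as above (table, stage, path and connection details are placeholders):

import snowflake.connector

conn = snowflake.connector.connect(account="my_account", user="my_user", password="...",
                                    warehouse="my_wh", database="my_db", schema="public")
cur = conn.cursor()
# Stage every split file, then copy them all into the target table in one shot.
cur.execute("PUT file://C:\\data\\my_split_file_* @json_stage")
cur.execute("""
    COPY INTO my_table (col1, col2, col3)
    FROM (SELECT
            PARSE_JSON($1):json_key_1,
            PARSE_JSON($1):nested.json_key_1,
            PARSE_JSON($1):json_key_3[0]
          FROM @json_stage t)
    ON_ERROR = 'continue'
""")
print(cur.fetchall())   # one status row per loaded file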

Time Travel on Snowflake: UNDROP to the Rescue

I really like the Time Travel feature in Snowflake. It is very handy and absurdly simple. If you have incorrectly updated one of your Snowflake tables and if you know that your table was in the correct state at 9 am on Sun, Oct 13, 2019, then you can simply run the query:

SELECT * FROM TABLE_NAME AT(TIMESTAMP => 'Sun, 13 Oct 2019 09:00:00 -0800'::TIMESTAMP);

where -0800 refers to your time zone in relation to UTC.

However, this query will not work if you dropped or replaced the table altogether; it only works if you modified or deleted the rows in the table.

Time Travel for Dropped Tables

For dropped or replaced tables, Time Travel provides another way: the UNDROP command. But, hang on, even this command will not work directly, as a recent experience with our ETL Pipeline illustrates.

Say your table is called LOCATIONS, and you ran a CREATE OR REPLACE TABLE LOCATIONS query that dropped the correct table and replaced it with an empty one. This happened to us because an incorrect incoming file in the ETL pipeline slipped through a validation step. Say, moreover, that the process ran twice before you realized that your LOCATIONS table was empty!

The way the UNDROP command works, it first checks the list of tables that currently exist in the schema. If it does not find a table with that name, it looks for the most recently dropped or replaced table with the same name in History.

Since you already have a LOCATIONS table in your schema, the query UNDROP TABLE LOCATIONS will return an error:

Time travel data is not available for table LOCATIONS.

To allow the UNDROP command to find the replaced LOCATIONS in History, you will need to rename the current table:

ALTER TABLE LOCATIONS RENAME TO LOCATIONS_EMPTY;

Now run the UNDROP TABLE LOCATIONS query, which will retrieve the most recently replaced table named LOCATIONS from History. Since you ran the incorrect query twice, even this table will turn out to be empty – oops!!

Travel Back … Again

That’s right. Rinse and Repeat. Once again, change the name of this table:

ALTER TABLE LOCATIONS RENAME TO LOCATIONS_EMPTY_2;

Run UNDROP TABLE LOCATIONS a second time. You will now get the correct table: the two REPLACE commands had to be undone with two UNDROP commands. Simple, right? This is explained with more examples in the Snowflake documentation.

You can now relax and tell your boss that everything’s fine!

For another example of the brilliance of Time Travel, see how we recovered an entire database.

Snowflake Data Retention Time

Remember that Time Travel can only retrieve your data if it is within the retention time configured for your Snowflake account or for the particular database object. Time Travel comes with several more commands and options, such as CLONE and the STATEMENT parameter, which you can check out in the documentation.

Multi-AZ Cassandra

Cassandra Failover and Consistency

Apache Cassandra is the always-on NoSQL database that is highly scalable and available. That sounds magical, and it is in fact true – IF you understand how to configure it correctly! This article describes an issue we ran into when setting up a multi-DC configuration for Cassandra failover, and how it was resolved.

Cassandra Configuration

Single Region, Dual AZ

The diagram below shows the initial system configuration for a cluster deployed across two availability zones (AZ) in the US-East region of AWS.

Multi-AZ Cassandra

We configured Cassandra to use multiple datacenters, with each AZ mapped to its own DC. The replication factor was set to 3, and Cassandra ensures that replicas of each partition reside in both data centers.

Thus the dual AZs provide some protection against failures – if one AZ goes down, the database is still up.

But what happens if the entire US-East region becomes unavailable? An always-on database that is accessed from anywhere in the country should be able to survive a full region failure (or, more likely, a network failure that cuts off the region).

Cassandra Failover using Multi-Region, Multi-AZ

We modified the above configuration for protection against region failure, but without adding any additional nodes. In the future, we can enhance it with additional nodes as required. The beauty of Cassandra is that you do not have to over-provision infrastructure – nodes can be added dynamically without stopping operations (albeit carefully!).

Shown below is the multi-region configuration. The definition of the DCs was changed: we still use 2 DCs, but now all 4 nodes in US-East are in a single DC while the 2 nodes in US-West form the second DC.
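
How many replicas live in each DC is controlled by the keyspace’s replication settings. As a hedged illustration only (the keyspace name, node address, DC names and per-DC counts below are placeholders, not our exact production values), the mapping looks like this with the Python driver:

from cassandra.cluster import Cluster

session = Cluster(["10.0.1.10"]).connect()   # any reachable node will do
# NetworkTopologyStrategy takes a replica count per datacenter name.
session.execute("""
    ALTER KEYSPACE my_keyspace
    WITH replication = {'class': 'NetworkTopologyStrategy', 'us-east': 3, 'us-west': 2}
""")

After changing replication for a keyspace, a repair (nodetool repair) is normally required so that data streams to the newly responsible replicas.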

Client Configuration

Contact Points

In a multi-DC configuration, it is important to ensure that clients connect to the “local” DC by default. You do this by using a load balancing policy that is both token-aware and DC-aware. For example, if a web application is running in US-East, you want it to access the Cassandra nodes in the US-East DC, so that it doesn’t suffer the latency of going across the country in its default mode. You can also specify the “contact points” for the client to connect to – it is a good idea to specify at least one node in each DC. An example of how to do this programmatically is shown below:

Cluster cluster = Cluster.builder()
    .addContactPoint(new InetSocketAddress("1.2.3.4", 9042))
    .addContactPoint(new InetSocketAddress("5.6.7.8", 9042))
    .withLoadBalancingPolicy(new TokenAwarePolicy(
        new DCAwareRoundRobinPolicy(localDC,
            usedHostsPerRemoteDC, allowRemoteDCsForLocalConsistencyLevel)))
    .build();

These settings can also be configured in the application.conf file. See the Java Driver documentation for details.

Consistency Level

The default consistency level in Cassandra is ONE, i.e. when a query reads or writes a partition, it is sufficient for one of the replicas to acknowledge the query. This level is insufficient for most non-trivial applications, especially those deployed across multiple data centers that require high availability.

In general, a consistency level of QUORUM applied to both reads and writes ensures that a Cassandra database is consistent. In our example, this implies that at least 2 replicas need to acknowledge completion of the transaction.

When a cluster is deployed across multiple regions, QUORUM can be problematic: it requires the coordinator to constantly send requests across regions, which can cause significant latencies. To avoid these performance issues, it is recommended to use LOCAL_QUORUM, i.e. a quorum of replicas within the “local” DC. Therefore, we configured the consistency level to be LOCAL_QUORUM.

Cassandra Failover Test

Now that the cluster and clients were configured, the next step was a Cassandra failover test. That is, we wanted to ensure that if all the US-East nodes failed, Cassandra would automatically fail over to the US-West DC and clients would be blissfully unaware!

All the nodes in US-East were brought down; running a query after this failed with the following error:

Not enough replicas available for query at consistency LOCAL_QUORUM.

The LOCAL_QUORUM consistency level, while performant, does not allow a query to switch DCs. If the requirement is transparent failover while still maintaining consistency across 2 replicas, a better strategy is to use a consistency level of TWO. With TWO, if the client cannot contact any of the local hosts, it will automatically switch to the remote DC – provided the load balancing policy allows remote hosts to be used.
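
As a sketch only (shown with the Python driver for brevity; the Java driver exposes equivalent options, and the addresses, keyspace and DC name below are placeholders), this is roughly what that configuration looks like:

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

# Allow remote-DC hosts in the query plan so a consistency level of TWO
# can still be satisfied from US-West when all US-East nodes are down.
policy = TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc="us-east",
                                                  used_hosts_per_remote_dc=2))
profile = ExecutionProfile(load_balancing_policy=policy,
                           consistency_level=ConsistencyLevel.TWO)
cluster = Cluster(["10.0.1.10", "10.1.1.10"],
                  execution_profiles={EXEC_PROFILE_DEFAULT: profile})
session = cluster.connect("my_keyspace")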

Conclusion

Cassandra is an always-on database that can provide tremendous scalability. However, ensuring that it remains always-on requires that it be configured correctly while also maintaining its high performance.

 

Data Aces has experience developing and managing Apache Cassandra and DataStax Enterprise deployments. Please contact us for more information.

Snowflake Database Recovery

If you google the word ‘snowflake‘ for the first time, you will likely get references to the insult, a Taiwanese dessert, a crystal of snow, and so on. Once you find that Snowflake is the hot new data warehouse in the cloud, your subsequent searches are likely to be more productive in terms of digging up documentation. However, complex topics such as “Snowflake database recovery” have likely not made it onto Stack Overflow or your favorite internet group yet.

In this article, I will tell you a story about a day I sweated and thought I would lose my job and how Snowflake saved it!

Cloud Big Data Architecture

The project I am working on involved building a data warehouse from scratch on AWS. It is quite complex, with myriad data sources, ETL pipelines, integrations with multiple systems, etc. A high-level architecture is shown in the diagram below:


Figure 1: Cloud Big Data Architecture

Cloud Data Warehouse

We chose Snowflake to serve as the Cloud Data Warehouse. It is a scalable, performant and resilient data warehouse explicitly built for the cloud. It is available on AWS and Azure with GCP in the works.

Setting up and using Snowflake is a breeze; I won’t go into details as you can read about it in their documentation.

Cloning in Snowflake

One of the cool features of Snowflake is the ability to clone an object. The object can be a table, an entire schema or even an entire database! The best part is that this is a super fast operation as no physical copying of data is involved (until either database starts to change).

As part of our regular operations, we maintain multiple databases and schemas – for different aspects of development, testing, etc. At some specific points in time, we clone objects from one database or schema to another. 

The Sandbox Clone

As users were on-boarded, we created a sandbox database as a playground for them to start playing with the data. As part of my duties, I re-clone this database at the start of every week. This way, users have a fairly stable environment to work with while also periodically getting the latest data. 

Creating this clone database is super simple using a SQL statement like this:

CREATE OR REPLACE DATABASE sandbox CLONE devdb;

This statement will delete the old database named sandbox and create a new version of it as a clone of the current state of the devdb database. How simple is that? 

Well, too simple to cause mistakes … as I found out!

One Monday morning, this is what I typed into the Worksheet and without thinking, hit RUN:

CREATE OR REPLACE DATABASE devdb CLONE sandbox;

After each clone, I spot-check the sandbox to make sure that newly created tables show up in it. This time, not only did the new tables not show up, but worse – they had disappeared from devdb!

When I realized my mistake, I went into panic mode. We were still in development and so had no backups yet – the traditional way to recover from such errors.   

Snowflake Time Travel

I needn’t have panicked. Snowflake has this very cool feature (definitely even cooler than cloning) called Time Travel. (If you ever were a Dr. Who fan, you will be thrilled!)

Figure 2: Time Travel

Think of Time Travel as a way to access the state of an object at a particular point in time in the past … up to 90 days back for the Enterprise Edition. For example, if you want to check the status of an order from a day ago:

SELECT status FROM orders AT(offset => -24*60*60);

It is as though you have a live backup of all the data at your fingertips … sweet!

Note that the default data retention period is one (1) day. You must explicitly enable a larger retention period at table creation time (or change the default setting for your database) to avail of the full 90 days. 
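
For example, a minimal sketch of raising a single table to the full 90-day retention (Enterprise Edition; the connection parameters are placeholders) through the Python connector:

import snowflake.connector

conn = snowflake.connector.connect(account="my_account", user="my_user", password="...")
# DATA_RETENTION_TIME_IN_DAYS controls how far back Time Travel can reach for this table.
conn.cursor().execute("ALTER TABLE orders SET DATA_RETENTION_TIME_IN_DAYS = 90")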

The UNDROP command

I bet every database admin has wished for this command more than once in their career. And yes – Snowflake has an UNDROP command. If you drop a table, schema or even an entire database – you can undrop it. 

UNDROP works in conjunction with Time Travel to help restore the state of an object as it was at a particular time. 

Recovering the Snowflake Database

Back to my story … In our Snowflake system, we had not changed the default retention period, so it remained at the default value of 1 day. However, since I detected my mistake within the hour, it was still possible to recover.

Here are the steps that helped with Snowflake database recovery of the original devdb database:

  1. Since the devdb is now garbage, move it away. DO NOT  delete it – that comes later:
    • ALTER DATABASE devdb RENAME TO devdb_old;
  2. The original devdb was dropped when I did the clone. Running the UNDROP command will restore the original:
    • UNDROP DATABASE devdb;
  3. We no longer need the devdb_old database, so we can drop it:
    • DROP DATABASE devdb_old;

That’s it! The database was restored in no time and I could now re-clone it. However, I learned my lesson: instead of typing the clone command into a Snowflake worksheet, I created a script to execute it.
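
That script is not reproduced here, but a minimal sketch of the idea – fix the direction of the clone in code so that a slip in a worksheet cannot reverse it (connection details are placeholders) – might look like this:

import snowflake.connector

SOURCE, TARGET = "devdb", "sandbox"   # the clone only ever flows dev -> sandbox

conn = snowflake.connector.connect(account="my_account", user="my_user", password="...")
cur = conn.cursor()
# Refuse to run if someone edits the constants the wrong way round.
assert TARGET != "devdb", "refusing to overwrite devdb"
cur.execute(f"CREATE OR REPLACE DATABASE {TARGET} CLONE {SOURCE}")
print(cur.fetchone())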

Conclusion

Snowflake is a very user-friendly data warehouse, allowing developers to do a lot of the functions that typically require a database administrator. However, it is still possible to make mistakes as this article on “Snowflake Database Recovery” shows.

Naren Gokul

Using Dask for Distributed Systems

As part of our Analytics practice, we repeatedly encounter use cases where complex calculations over large datasets are required. In this series, we will compare two distributed computing systems – Dask and Spark – that have become very popular in recent times.

To evaluate these, we will select a use case that comes up in many industries – calculating cross correlations across large datasets. This has several applications in healthcare (correlational study of outcomes, gene expression), insurance (risk correlations) and other industries. 

The Business Use Case

Asset correlation is a measure of how investments move in relation to one another, and when. When assets move in the same direction at the same time, they are considered to be highly correlated. When one asset tends to move up when another goes down, the two assets are considered to be negatively correlated. If two assets are non-correlated, the price movement of one has no effect on the price movement of the other.

Asset Correlation

You can reduce the overall risk in an investment portfolio, and even boost your overall returns, by investing in asset combinations that are not correlated – that is, they don’t tend to move in the same way at the same time. If two assets are negatively correlated or non-correlated, one will go up when the other is down, and vice versa. By owning a bit of both, you do pretty well in any market, without the steep climbs and deep dips of just one asset type.

If you take the 5-minute price history of an asset (like a stock) for the last 5 years, you would cover roughly 100,000 data points. Comparing one stock with 100,000 other stocks over that period would involve 10 billion data points.

Also, we may need to re-evaluate this over various date ranges to gain additional insight into how these assets correlate at specific points in history. To do this dynamically and at scale, we need an efficient, distributed, ad hoc computing architecture.

Dask for Analysis

For data analysis, the go-to language for most data scientists has been Python. The extensive set of rich libraries such as numpy, pandas and scikit-learn makes it the Swiss Army knife in our toolkits. However, these libraries are inherently single-machine and cannot scale out to distributed systems. Dask solves this problem. It supports pandas dataframes and numpy arrays, but lets you run the same code locally or scale it up. This means you can write regular pythonic code, validate it on smaller datasets on your laptop, then push the same code to a cluster and execute it on a large dataset.

 

Our perspective on Dask is this – having used numpy, pandas and scikit-learn extensively, we found Dask really easy to learn and understand, and very quick for prototyping solutions. In many ways, the learning curve seemed quite a bit shorter than with Spark.

As we started using Dask, we noticed that the concept of ‘pure task graphs’ was limiting us in some use cases. We started looking into other options – and found Actors.  Actors allow us to hold on to state, and reduce overhead on the scheduler.

Dask for Distributed Systems Architecture 

Dask for Distributed Systems Architecture

Design Details

  • The design is based on the Actor architecture. Actors enable stateful computations in Dask, and are ideal when you need additional performance and are willing to sacrifice resilience. Here, the Dask driver client gets a handle to the remote workers. This handle is called an Actor and is a pointer to a user-defined object living on a worker, which lets us call methods on that remote object (see the sketch after this list).
  • Master Node houses the Flask Web Application, Driver, and Dask Scheduler.
  • The computation is split up and parallelized amongst the Dask Workers running on Worker Nodes. Each Dask Worker performs computation on a 2D Numpy array: 2K dates by 2k stocks.
  • Dask Workers are single threaded. Numpy vector operations are fast on a single thread.  Numpy arrays can be memory mapped to disk, and this allows the system to load only the subset of data needed for the computation directly from disk.  
  • Each Dask Worker has access to all of the Numpy Arrays but performs computation on only one of them. This is useful when it needs information about a Stock that is not part of the array that it does computation on, such as in calculating correlations
  • Correlation calculation: User picks multiple date ranges in the Web Browser and one stock symbol. This request is sent to the Flask Web Application that translates this to a unique list of starting and ending date ids (2d array); sends this as the request to the Driver, which sends it to all the Dask Workers
  • Driver assembles the response from the workers before returning it to the Web Browser.
  • Dask scheduler can coordinate about 4000 tasks per second. However operations on Dask actors do not inform the central scheduler, and so do not contribute to the 4000 task/second overhead. They also avoid an extra network hop and so have lower latencies
  • Another approach was tried: We used chunk based parallelization on multiple cores using Dask Array. This turned out to be about 10x slower than our current design
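
Below is a stripped-down sketch of the Actor pattern described in the first bullet above; the scheduler address, array sizes and correlation math are illustrative stand-ins rather than our production code:

import numpy as np
from dask.distributed import Client

class CorrelationWorker:
    """Stateful object that lives on one Dask worker and holds a chunk of prices."""
    def __init__(self, prices):
        self.prices = prices                      # 2D array: dates x stocks

    def correlate(self, target, start, end):
        window = self.prices[start:end]
        t = target[start:end]
        # Pearson correlation of the target series against every stock in this chunk
        return np.array([np.corrcoef(t, window[:, j])[0, 1]
                         for j in range(window.shape[1])])

client = Client("tcp://scheduler-host:8786")      # hypothetical scheduler address
chunk = np.random.rand(2000, 2000)                # stand-in for 2K dates x 2K stocks
actor = client.submit(CorrelationWorker, chunk, actor=True).result()
target = np.random.rand(2000)
correlations = actor.correlate(target, 0, 500).result()   # ActorFuture -> values

Because the actor keeps its array in worker memory, repeated date-range requests reuse the same state instead of re-sending data through the scheduler.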

Dask for Distributed Systems Results

This architecture provides extreme performance with good stability. We are running the application 10×5 with very good availability, with only the occasional node restart. Almost all requests return within 100 ms, and most (90th percentile) come back in 50 ms.

If the web application is also Python-based, returning numpy arrays from the Flask application is a better choice than JSON, since JSON serialization adds overhead. It is good to have both options, though, as other systems may need JSON. JSON serialization can add 100 ms or more, depending on how much data is returned and from how many actors.

Next, we are going to try the same use case in the other popular distributed computing system – Spark. Stay tuned for Part 2.

Naren Gokul

Cqlsh with SSL: Securing Cassandra Cqlsh

At Data-Aces, we provide managed services running Cassandra for our customers. With more and more attention being given to data security these days, end-to-end encryption of ALL communication is becoming a mandatory requirement. This includes traffic between the Cassandra nodes as well as from clients to the Cassandra cluster, which means cqlsh with SSL is also a requirement. The default config file (cassandra.yaml) is configured for normal, unencrypted communication between clients and Cassandra on port 9042.

It is fairly easy to set up client-to-cluster encryption by creating certificates and adding them to the Java keystore/truststore. This process is documented here: https://docs.datastax.com/en/cassandra/3.0/cassandra/configuration/secureSSLClientToNode.html

As described in the link above, this involves changing ‘enabled’ to ‘true’ in the following section of the cassandra.yaml file:

client_encryption_options:
    enabled: true
    # If enabled and optional is set to true, encrypted and unencrypted connections are handled.
    optional: false
 
This ensures that ALL client connections are encrypted, even though they still use port 9042. Note that after the above changes, you will no longer be able to connect to the database using the CQL command-line tool cqlsh!
 
The Cassandra documentation provides basic information on how to set up cqlsh with SSL: https://docs.datastax.com/en/cassandra/3.0/cassandra/configuration/secureCqlshSSL.html
 

However, following these steps, I couldn’t get cqlsh with SSL to work in our cluster. This is because it refers to a certfile without an important caveat: cqlsh is a Python application and does not use the Java keystore/truststore set up for Cassandra itself and for Java clients.

Instead, I had to convert the certificate in ‘keystore.jks’ to PKCS12 format. Luckily, keytool has an inbuilt facility to convert to PKCS12 format.

keytool -importkeystore -srckeystore keystore -destkeystore pkcs12ks -deststoretype PKCS12 -srcstorepass keystorepassword -deststorepass keystorepassword

This will create a keystore file named ‘pkcs12ks’ in the current directory.

The next step is to create a PEM file from this keystore as follows:

openssl pkcs12 -in pkcs12ks -nokeys -out cqlsh.pem -passin pass:keystorepassword 

Setting up .cqlshrc file

Now that we have our PEM file, we can point to it in the cqlshrc file

[connection]
factory = cqlshlib.ssl.ssl_transport_factory

[ssl]
certfile = /home/user/cqlsh.pem
validate = false

Note that setting ‘validate=true’ implies that the certificate needs to be validated at connection time.

We are now all set to run cqlsh with SSL using the --ssl option, as below:

$ cqlsh <host_ip> --ssl

Setting up GPU support for Tensorflow on Windows 10

TensorFlow supports running computations on a variety of devices, including CPUs and GPUs. Originally built for display functions, GPUs have evolved to scale up parallel computations using thousands of cores. This happens to match very well with how we train Machine Learning algorithms in Deep Learning models. In Python, the base ‘tensorflow’ package is CPU-only; a separate package, tensorflow-gpu, is used when we plan to run on GPUs.
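
A quick way to confirm which devices TensorFlow actually sees is the minimal check below, using the TF 1.x API that the rest of this post is based on:

import tensorflow as tf
from tensorflow.python.client import device_lib

print(tf.test.is_gpu_available())                           # True if tensorflow-gpu found a usable GPU
print([d.name for d in device_lib.list_local_devices()])    # e.g. /device:CPU:0, /device:GPU:0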

So why did we write this blog? When we started running TensorFlow, our developers wanted to test on smaller datasets locally on their own machines before pushing code to dev/test clusters with many GPUs. Most developer laptops and desktops these days come with NVIDIA Quadro or GeForce discrete GPUs. While a Google search turns up a lot of posts, we encountered some issues that we did not see addressed in them. In this blog, we outline the issues we ran into while installing tensorflow-gpu on Windows 10, and the solution to each of them.

Pre-requisites for TensorFlow – https://www.tensorflow.org/install/gpu  

For using Tensorflow-gpu, we need to install the following software, in this order:
  1. Visual Studio Express or Full (Visual Studio is a pre-requisite for CUDA)
  2. CUDA Toolkit for Windows 10
  3. cuDNN – https://developer.nvidia.com/cudnn (membership required)
  4. Tensorflow-gpu (python package)

We followed this post as a guide: https://towardsdatascience.com/installing-tensorflow-with-cuda-cudnn-and-gpu-support-on-windows-10-60693e46e781

Step 1: Drivers and installation

After Visual Studio, we first installed the CUDA Toolkit for Windows – cuda_10.0.130_411.31_win10.exe. Next we needed cuDNN, so we installed cudnn-10.0-windows10-x64-v7.4.2.24.zip. We used this code to test that all the components were installed successfully: https://github.com/NarenData/TensorflowWindows/blob/master/TensorflowValidate.py

We first got an error about cuDNN not being available:

cuDNN failed to initialize tensorflow.

It turns out the instructions were not very explicit: we needed to add the cuDNN ‘\bin’ folder to the PATH in the “Environment Variables” list in Control Panel.

Once the install folder \bin was added to the path, the cuDNN DLL error went away.

Step 2: Resolve DLL load error

However, we kept getting the error message below about Tensorflow. We tried installing the older version, cudnn-10.0-windows10-x64-v7.3.0.29.zip, but the same error message persisted.

from tensorflow.python.pywrap_tensorflow_internal import *

ImportError: DLL load failed: The specified module could not be found.
ImportError: No module named ‘_pywrap_tensorflow_internal’

We first spent some time checking whether the CUDA Toolkit and cuDNN installs were correct, and whether the DLLs were loading properly. To verify this, we used Process Monitor as suggested here: https://stackoverflow.com/questions/43553149/on-windows-running-import-tensorflow-generates-no-module-named-pywrap-tenso

This confirmed that the DLLs were loading properly and that the problem was elsewhere.

Since the DLLs were loading properly, suspicion fell on the tensorflow-gpu package in PyCharm. We were running 1.12.0, but a new release candidate, 1.13.0rc0, was available. Upgrading from 1.12.0 to 1.13.0rc0 resolved the issue.

After the update, we finally got a successful run of TensorflowValidate.py.

After this, we were able to run a couple of sample TensorFlow programs and make sure that it was running successfully on the GPU.   
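
As a minimal example of that kind of sanity check (not the exact programs we ran), the following TF 1.x snippet pins a matrix multiplication to the GPU and prints where each op was placed:

import tensorflow as tf

with tf.device('/gpu:0'):                      # place the matmul explicitly on the GPU
    a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
    b = tf.constant([[1.0, 1.0], [0.0, 1.0]])
    c = tf.matmul(a, b)

# log_device_placement prints which device each op actually runs on.
with tf.Session(config=tf.ConfigProto(log_device_placement=True)) as sess:
    print(sess.run(c))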

Step 3: Multiple GPU test

Many machines come with multiple GPUs, and TensorFlow can also be run on multiple GPUs. We test for this with  https://github.com/NarenData/TensorflowWindows/blob/master/multigputest.py

The following errors can come up while testing with multigputest.py.

Some good news and bad news here. Our CPU is newer and supports AVX2 instructions, which the prebuilt TensorFlow binary is not compiled to use (it only uses AVX). On the plus side, TensorFlow finds the Quadro GPU and its specs – which is great.

We can also see that, despite running the display with two 32-inch 4K monitors, 3.3 GB out of 4 GB of VRAM is free for TensorFlow to use.

Step 4: Solve CUDA Driver error

These two links suggest that we need a newer driver:
https://github.com/tensorflow/tensorflow/issues/21832

https://docs.nvidia.com/cuda/cuda-toolkit-release-notes/index.html

For Windows, a driver version >= 411.31 is suggested, but the machine had the April 2018 version installed by default.

The strange thing is that the toolkit (installed in Step 1) is supposed to install the updated driver, but this didn’t seem to work properly.

Once the new driver was installed, we saw the right output. The code doesn’t work fully – it is not able to access the other GPU (the integrated Intel 630 graphics) – but we don’t really need that for running our TensorFlow code. It probably would have worked properly if the machine had two NVIDIA GPUs.