Using Cassandra for Real-time Analytics: Part 2

In part 1 of this series, we talked about the speed-consistency-volume trade-offs that come with the implementation choices you make in data analytics, and why Cassandra is a great choice for real-time analytics. In this post, we're going to dive a little deeper into the basics of the Cassandra data model, illustrate it with MarkedUp's own model, and follow up with a short discussion of our read and write strategies.

Once again, let's start off with the presentation deck from our LA Cassandra Users Group meetup on SlideShare. Slides 12-18 are relevant for this post.

Cassandra Data Model

The Cassandra data model consists of a keyspace (analogous to a database), column families (analogous to tables in the relational model), keys and columns. Here’s what the basic Cassandra table (also known as a column family) structure looks like:

Cassandra Column Family structure

Figure 1. Structure of a column family in Cassandra

Cassandra's API also refers to this structure as a map within a map, where the outer map key is the row key and the inner map key is the column key. In practice, a typical Cassandra keyspace for, say, an analytics platform might also contain what's known as a super column family.


Cassandra Super column family structure

Figure 2. Structure of a super column family in Cassandra
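To make the "map within a map" analogy concrete, here's a rough sketch of both structures as plain Python dictionaries (the row, column, and super column names are purely illustrative):

```python
# A standard column family: an outer map keyed by row key, whose values
# are inner maps of column key -> column value.
column_family = {
    "row-key-1": {"column-a": "value", "column-b": "value"},
    "row-key-2": {"column-a": "value"},
}

# A super column family adds one more level of nesting:
# row key -> super column key -> column key -> value.
super_column_family = {
    "row-key-1": {
        "super-column-1": {"column-a": "value", "column-b": "value"},
        "super-column-2": {"column-a": "value"},
    },
}
```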


Evan Weaver's blog post has a good illustration of the Twitter keyspace as a real-world example.

MarkedUp's keyspace has column families such as DailyAppLogs (which counts the number of times a particular log or event was triggered per app) and Logs (which captures information about each log entry). Both appear again in Figure 3 below.

The DataStax post about modeling a time series with Cassandra is particularly helpful when deciding on a schema design. We index our columns by date.

Note that since we use the RandomPartitioner, where rows are ordered by the MD5 hash of their keys, using dates as column keys lets us store data points in sorted order within each row. Other analytics applications might prefer indexing by hour or even minute if, for example, they need to measure and report the exact time of day when app activity peaks. The only drawback is more data points and more columns per row. With a limit of roughly 2 billion columns per row in Cassandra, though, it's almost impossible to hit that ceiling, so the fact that Cassandra offers really wide rows leaves us with plenty of leg room.
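As a rough illustration of what date-keyed counter columns look like from a client, here's a sketch using the Python pycassa Thrift client (our production code is C#, and the 'Analytics' keyspace, column family name, and ISO-8601 string column keys are assumptions made for this example, not our exact schema):

```python
from datetime import date

from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily

# Hypothetical keyspace and column family names, for illustration only.
pool = ConnectionPool('Analytics', ['127.0.0.1:9160'])
daily_app_logs = ColumnFamily(pool, 'DailyAppLogs')  # counter column family

# Row key = app id, column key = date (ISO-8601 strings sort
# chronologically under a UTF8 comparator), value = counter.
today = date(2013, 1, 1).isoformat()   # '2013-01-01'
daily_app_logs.add('App1', today, 1)   # bump the Jan 1 counter for App1
daily_app_logs.add('App1', '2013-01-02', 1)
```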

The row key in a Cassandra column family is also the "shard" key, which means the columns for a particular row key are always stored contiguously on the same node. If you're worried that some of your shards will grow faster than others, creating "hotspot" nodes that store those shards, you can shard your rows further by using composite keys. For example, (App1, 1) and (App1, 2) can be two shards for App1.
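A minimal sketch of that kind of manual sub-sharding (the shard count and key format are arbitrary choices for illustration, not what we ship):

```python
NUM_SHARDS = 4  # arbitrary; size this to how hot you expect the row to get

def shard_row_key(app_id, log_id):
    """Spread one logical per-app row across NUM_SHARDS physical rows."""
    sub_shard = hash(log_id) % NUM_SHARDS
    return '%s:%d' % (app_id, sub_shard)

# e.g. 'App1:0', 'App1:3' -- reads then fan out over all sub-shards
# for an app and merge the results client-side.
```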

The counters for all events of a particular type coming from apps using MarkedUp are recorded in the same shard. ("What about hotspots, then?", you might wonder. Well, Cassandra offers semi-automatic load balancing, so we rebalance if a node starts becoming a hotspot. Refer to the Cassandra wiki for more on load balancing.)

MarkedUp’s Read/Write Strategy

Now that we have a better understanding of the Cassandra data model, let's look at how we handle writes in MarkedUp. Logs from the Windows 8 apps that use MarkedUp arrive continuously, at unpredictable rates, throughout the day. For incoming logs, we leverage the batch_mutate method.

As you've probably guessed, a batch_mutate operation groups mutations on several keys into a single call. Each incoming log therefore triggers updates or inserts in multiple column families, as shown in Figure 3. For example, a RuntimeException in AppX on Jan 1, 2013 will update the DailyAppLogs CF under key AppX by incrementing the counter stored at the column key for Jan 1, 2013, and will also update the Logs CF by inserting a new row keyed by LogId.
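Here's a rough pycassa sketch of that fan-out; in production our C# collection servers group these mutations into a single batch_mutate call, but the shape of the writes is the same (all keyspace, column family, and column names here are illustrative):

```python
import uuid
from datetime import datetime

from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily

pool = ConnectionPool('Analytics', ['127.0.0.1:9160'])
logs = ColumnFamily(pool, 'Logs')                    # one row per log entry
daily_app_logs = ColumnFamily(pool, 'DailyAppLogs')  # per-app daily counters

def record_log(app_id, level, message, when=None):
    when = when or datetime.utcnow()
    log_id = str(uuid.uuid1())  # time-based UUID for the new Logs row

    # 1. Insert the raw log entry into the Logs CF.
    logs.insert(log_id, {
        'AppId': app_id,
        'Level': level,
        'Message': message,
        'Timestamp': when.isoformat(),
    })

    # 2. Increment the per-app, per-day counter in DailyAppLogs.
    daily_app_logs.add(app_id, when.date().isoformat(), 1)

record_log('AppX', 'Error', 'RuntimeException: something broke')
```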

MarkedUp write strategy

Figure 3. MarkedUp’s write strategy


MarkedUp's read strategy leverages Cassandra's get_slice query, which allows you to read a wide range of data focused on the intended query, reducing waste (a "slice" is a range of columns within a row). A query that counts across a wide range of columns can be performed with minimal disk I/O. Setting up a get_slice query is as simple as specifying which keyspace and column family you want to use and then setting up the slice predicate, which defines the columns you need within the row.

The slice predicate itself can be set up in two ways: you can either specify exactly which columns you need, or you can specify a range of contiguous columns using a slice range. Using column keys that sort meaningfully is therefore critical.
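In pycassa terms, both flavors of slice predicate map onto the same get call (again, the names and date-string column keys are assumptions carried over from the earlier sketches):

```python
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily

pool = ConnectionPool('Analytics', ['127.0.0.1:9160'])
daily_app_logs = ColumnFamily(pool, 'DailyAppLogs')

# Named columns: ask for exactly these column keys.
counts = daily_app_logs.get('App1', columns=['2013-01-01', '2013-01-15'])

# Slice range: ask for every column between two keys, returned in sorted
# order -- this reads the whole date range as one contiguous block.
january = daily_app_logs.get('App1',
                             column_start='2013-01-01',
                             column_finish='2013-01-31',
                             column_count=1000)
```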

Figure 4 below illustrates the query "Get all Crash and Error logs for App1 between Date1 and DateN". The get_slice query can read the counters as one contiguous block from the AppLogsByLevel CF because the columns in that CF are sorted by date.

MarkedUp read strategy

Figure 4. MarkedUp’s read strategy


If you've read our previous blog post closely, you might be wondering whether the returned information is even correct, given that Cassandra compromises on consistency in favor of speed and volume (remember the SCV triangle?). Cassandra guarantees what is known as eventual consistency: for a brief window after a write (typically milliseconds), some replicas may still hold the stale value, but all replicas will eventually converge on the updated value.

Luckily, Cassandra offers tunable consistency levels for queries. So, depending on your appetite for consistent output versus speed, you can configure the desired consistency level by choosing different levels of "quorum". MarkedUp uses ONE for writes and TWO for reads, to keep the web front-end as fluid as possible.
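As a sketch of what that tuning looks like from a client (pycassa again; our production code sets the equivalent options through its C# driver):

```python
from pycassa import ConsistencyLevel
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily

pool = ConnectionPool('Analytics', ['127.0.0.1:9160'])

# Writes return once a single replica acknowledges them; reads wait for
# two replicas, which catches most stale values without paying for QUORUM.
daily_app_logs = ColumnFamily(pool, 'DailyAppLogs',
                              write_consistency_level=ConsistencyLevel.ONE,
                              read_consistency_level=ConsistencyLevel.TWO)
```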

In part 3 of this series, we'll talk about some best practices for working with Cassandra and choosing a schema that fits your needs. Stay tuned for more!

Using Cassandra for Real-time Analytics: Part 1

In a previous blog post titled “Cassandra, Hive, and Hadoop: How We Picked Our Analytics Stack” we talked about our process for selecting Cassandra as our data system of choice for supporting our real-time analytics platform.

We've been live with Cassandra in production for a couple of months now, and we shared some of the lessons and best practices for implementing it at the Los Angeles Cassandra Users Group on March 12, 2013. The slides are available on SlideShare.

We wanted to expand on the presentation itself and share some of our applied knowledge on how to put Cassandra to work in the field of real-time analytics.

Let’s start by helping you understand how an analytics system needs to be built.

Real-time Analytics and the CAP Theorem

For those of you who aren’t familiar with Brewer’s CAP theorem, it stipulates that it is impossible for any distributed computer system to simultaneously provide all three of the following guarantees:

  1. Consistency;

  2. Availability; and

  3. Partition tolerance.

In the real world, all distributed systems fall somewhere on a gradient across these three guarantees, but the kernel of truth is that there are trade-offs. A system with high partition tolerance and availability (like Cassandra) will sacrifice some consistency in order to do it.

When it comes to analytics, there’s a transitive application of the CAP theorem to analytic systems – we call it SCV:

analytics cap theorem

  1. Speed is how quickly you can return an appropriate analytic result from the moment the underlying data is first observed – a "real-time" system will have an updated analytic result within a relatively short time of an observed event, whereas a non-real-time system might take hours or even days to process all of the observations into an analytic result.

  2. Consistency is how accurate or precise (two different things) the analytic outcome is. A totally consistent result accounts for 100% of observed data with complete accuracy and some tunable degree of precision. A less consistent system might use statistical sampling or approximations to produce a reasonably precise but less accurate result.

  3. Data Volume refers to the total amount of observed events and data that need to be analyzed. This starts to become a factor once the data exceeds what can fit into memory on a single machine; massive or rapidly growing data sets have to be analyzed by distributed systems.

If your working data set is never going to grow beyond 40-50GB over the course of its lifetime, then you can use an RDBMS like SQL Server or MySQL and have 100% consistent analytic results delivered to you in real-time – because your entire working set can fit into memory on a single machine and doesn’t need to be distributed.

Or if you’re building an application like MarkedUp Analytics, which has a rapidly growing data set and unpredictable burst loads, you’re going to need a system that sacrifices some speed or consistency in order to be distributed so it can handle the large volume of raw data.

Think about this trade off carefully before you go about building a real-time analytics system.

What Data Needs to be Real-time?

“Egads, ALL DATA SHOULD BE ALWAYS REPORTED IN REAL-TIME!” shouted every software developer ever.

Hold your horses! Real-time analytics forces a trade-off against other important factors like accuracy / precision and data size. Therefore, real-time analytics isn't inherently superior for every conceivable use case.

Real-time analysis is important for operational metrics and anything else you or your users need to respond to in real-time:

  • Error rates or health monitoring;

  • Dynamic prices, like stock prices or ticket fares;

  • On-the-spot personalizations and recommendations, like the product recommendations you might see when browsing Netflix or Ebay.

In these scenarios, the exact price or the exact error rate isn't as important as the rate of change or the confidence interval, both of which can be computed in real-time.

Retrospective or batch analysis is important for product / behavior analysis – these are metrics that tell you how you should or shouldn’t do something, and they are data that you can’t or shouldn’t respond to in real-time.

You don’t want to redesign your product based on fluctuations during day-to-day use – you want to redesign it based on long-term trends over all of your cohorts, and it naturally takes a long time for that data to accrue and be analyzed / studied.

In this type of analysis it’s more important for the data to be comprehensive (large) and accounted consistently.

Analytic speed is always a trade-off against data volume and consistency – you have to be conscious of that when you design your system.

The one property you’re never going to want to intentionally sacrifice is data volume – data is a business asset. Data has inherent value. You want to design your analytic systems to consume and retain as much of it as possible.

At MarkedUp we use a blend of both real-time and retrospective analytics:

markedup analytics blend

In this post and the next we’re going to focus on how we use Cassandra for real-time analytics – we use Hive and Hadoop for our retrospective analysis.

Why Cassandra for Real-time Analytics?

Cassandra is an excellent choice for real-time analytic workloads, if you value speed and data volume over consistency (which we do for many of our metrics.)

So what makes Cassandra attractive?

    • Cassandra is highly available and distributed; it has high tolerance to individual node failures and makes it possible to add multi-data center support easily if data affinity or sovereignty is an issue. On top of that it’s easy to expand a Cassandra cluster with new nodes if necessary (although this shouldn’t be done frivolously since there is a high cost to rebalancing a cluster.)
    • It has amazing write performance; we've clocked Cassandra writes at around 200µs on average, and that's doubly impressive considering that most of our writes are big, heavily denormalized batch mutations.
    • Batch mutations give us the ability to denormalize data heavily and update lots of counters at once – in Cassandra it’s generally a good idea to write your data to make it easy to read back out, even if that means writing it multiple times. Batch mutations make this really easy and inexpensive for our front-end data collection servers.
    • Distributed counters were added to Cassandra at Twitter's insistence, and they're a major boon to anyone trying to build real-time analytic systems. Most of MarkedUp's real-time analytics are implemented using counters – they provide a simple, inexpensive, and remarkably consistent mechanism for updating metrics and statistics at write time. There are some trade-offs (namely the loss of idempotency), but counters make up for it in simplicity and speed.
    • Physically sorted columns are one of the Cassandra implementation details worth learning, because with them you can create easily predictable, pre-sorted slices of data. This makes for really efficient storage of time-series data and other common types of analytic output. When you combine physically sorted columns with dynamic columns and slice predicates, you can create lookup systems that retrieve large data sets in constant time.
    • Dynamic columns are a Cassandra feature that takes getting used to, but they are enormously powerful for analytic workloads when coupled with sorted columns – they allow you to create flexible, predictable data structures that are easy to read and extend.

We’re going to publish a series of posts about working with Cassandra for real-time analytics. Make sure you read part 2 where we go into detail on our read / write strategy with Cassandra for analytics!

Cassandra, Hive, and Hadoop: How We Picked Our Analytics Stack

When we first made MarkedUp Analytics available on an invite-only basis back in September, we had no idea how quickly the service would be adopted. By the time we completely opened MarkedUp to the public in December, our business was going gangbusters.

But we ran into a massive problem by the end of November: it was clear that RavenDB, our chosen database while we were prototyping our service, wasn’t going to be able to keep growing with us.

So we had to find an alternative database and data analysis system, quickly!

The Nature of Analytic Data

The first place we started was by thinking about our data, now that we were moving out of the “validation” and into the “scaling” phase of our business.

Analytics is a weird business when it comes to read / write characteristics and data access patterns.

In most CRUD applications, mobile apps, and e-commerce software you tend to see read / write characteristics like this:

Read and Write characteristics in a traditional application

This isn’t a controversial opinion – it’s just a fact of how most networked applications work. Data is read far more often than it’s written.

That’s why all relational databases and most document databases are optimized to cache frequently read items into memory – because that’s how the data is used in the vast majority of use cases.

In analytics though, the relationship is inverted:

Read and Write characteristics in an analytics application

By the time a MarkedUp customer views a report on our dashboard, that data has been written to anywhere from 1,000 to 10,000,000 times since they viewed their report last. In analytics, data is written multiple orders of magnitude more frequently than it’s read.

So what implications does this have for our choice of database?

Database Criteria

Looking back to what went wrong with RavenDB, we determined that it was fundamentally flawed in the following ways:

  • Raven’s indexing system is very expensive on disk, which makes it difficult to scale vertically – even on SSDs Raven’s indexing system would keep indexes stale by as much as three or four days;
  • Raven’s map/reduce system requires re-aggregation once it’s written by our data collection API, which works great at low volumes but scales at an inverted ratio to data growth – the more people using us, the worse the performance gets for everyone;
  • Raven’s sharding system is really more of a hack at the client level which marries your network topology to your data, which is a really bad design choice – it literally appends the ID of your server to all document identifiers;
  • Raven’s sharding system actually makes read performance on indices orders of magnitude worse (has to hit every server in the cluster on every request to an index) and doesn’t alleviate any issues with writing to indexes – no benefit there;
  • Raven’s map/reduce pipeline was too simplistic, which stopped us from being able to do some more in-depth queries that we wanted; and
  • We had to figure out everything related to RavenDB on our own – we even had to write our own backup software and our own indexing-building tool for RavenDB; there’s very little in the way of a RavenDB ecosystem.

So based on all of this, we decided that our next database system needed to be capable of:

  1. Integrating with Hadoop and the Hadoop ecosystem, so we could get more powerful map/reduce capabilities;
  2. “Linear” hardware scale – make it easy for us to increase our service’s capacity with better / more hardware;
  3. Aggregate-on-write – eliminate the need to constantly iterate over our data set;
  4. Better I/O utilization – it's difficult to get RavenDB to move any of its I/O into memory, which is why it's so hard on disk;
  5. Fast setup time – need to be able to move quickly;
  6. Great ecosystem support – we don’t want to be the biggest company using whatever database we pick next.

The Candidates

Based on all of the above criteria, we narrowed down the field of contenders to the following:

  1. MongoDB
  2. Riak
  3. HBase
  4. Cassandra

Evaluation Process

The biggest factor to consider in our migration was time to deployment – how quickly could we move off of Raven and restore a high quality of service for our customers?

We tested this in two phases:

  1. Learning curve of the database – how long would it take us to set up an actual cluster and a basic test schema?
  2. Acceptance test – how quickly could we recreate a median-difficulty query on any of these systems?

So we did this in phases, as a team – first up was HBase.

HBase

HBase was highly recommended to us by some of our friends on the analytics team at Hulu, so this was first on our list. HBase has a lot of attractive features and satisfied most of our technical requirements, save the most important one – time to deployment.

The fundamental problem with HBase is that cluster setup is difficult, particularly if you don't have much JVM experience (we didn't). It also has a single point of failure (edit: it turns out this hasn't been an issue since 0.9x), is a memory hog, and has a lot of moving parts.

That being said, HBase is a workhorse – it’s capable of handling immensely large workloads. Ultimately we decided that it was overkill for us at this stage in our company and the setup overhead was too expensive. We’ll likely revisit HBase at some point in the future though.

Riak

One of our advisors is a heavy Riak user, so we decided it was worth exploring. Riak, on the surface, is a very impressive database – it's heinously easy to set up a cluster, and the HTTP REST API made it possible for us to test it using only curl.

After getting an initial 4-node cluster setup and writing a couple of “hello world” applications, we decided that it was time to move onto phase 2: see how long it would take to port a real portion of our analytics engine over to Riak.

I decided to use Node.JS for this since there are great Node drivers for both Raven and Riak, and it was frankly a lot less work than C#. I should point out that CorrugatedIron is a decent C# driver for Riak, though.

So, it took me about 6 hours to write the script to migrate a decent-sized data set into Riak – just enough to simulate a real query for a single MarkedUp app.

Once we had the data stuffed into our Riak cluster I wrote a simple map/reduce query using JavaScript and ran it – took 90 seconds to run a basic count query. Yeesh. And this map/reduce query even used key filtering and all of the other m/r best practices for Riak.

It turns out that map/reduce performance with the JavaScript VM in Riak is atrocious, and this is a well-known issue.

So, I tried a query using the embedded Erlang console using only standard modules – 50 seconds.

Given the poor map/reduce performance and the fact that we’d all have to learn Erlang, Riak was out. Riak is a pretty impressive technology and it’s easy to set up, but not good for our use case as is.

MongoDB

I've used MongoDB in production before and had good experiences with it. Mongo's collections / documents system is nearly identical to RavenDB's, which gave it a massive leg up in terms of migration speed.

On top of that, Mongo has well-supported integration with Hadoop and its own aggregation framework.

Things were looking good for Mongo – I was able to use Node.JS to replicate the same query I used to test Riak and used the aggregation framework to get identical results within 3 hours of starting.

However, the issue with MongoDB was that it required us to re-aggregate all of our data regularly and introduced a lot of operational complexity for us. At small scale, it worked great, but under a live load it would be very difficult to manage Mongo’s performance, especially when adding new features to our analytics engine.

We didn’t write Mongo off, but we decided to take a look at Cassandra first before we made our decision.

Cassandra

We started studying Cassandra more closely when we were trying to determine whether Basho had any future plans for Riak that included support for distributed counters.

Cassandra really impressed us from the get-go – it would require a lot more schema / data modeling than Riak or MongoDB, but its support for dynamic columns and distributed counters solved a major problem for us: being able to aggregate most statistics as they’re written, rather than aggregating them with map/reduce afterwards.

On top of that, Cassandra’s slice predicate system gave us a constant-time lookup speed for reading time-series data back into all of our charts.

But Cassandra didn’t have all of the answers – we still needed map/reduce for some queries (ones that can’t or shouldn’t be done with counters) and we also needed the ability to traverse the entire data set.

Enter DataStax Enterprise Edition – a professional Cassandra distribution which includes Hive, Hadoop, Solr, and OpsCenter for managing backups and cluster health. It eliminated a ton of setup overhead and complexity for us and dramatically shortened our timeline to going live.

Evaluating Long-Term Performance

Cassandra had MongoDB edged out on features, but we still needed to get a feel for Cassandra’s performance. eBay uses Cassandra for managing time-series data that is similar to ours (mobile device diagnostics) to the tune of 500 million events a day, so we were feeling optimistic.

Our performance assessment was a little unorthodox – after we had designed our schema for Cassandra, we wrote a small C# driver using FluentCassandra and replayed a 100GB slice of our production data set (restored from backup onto a new RavenDB XL4 EC2 machine with 16 cores, 64GB of RAM, and SSD storage) into the Cassandra cluster; this simulated four months' worth of production data written to Cassandra in… a little under 24 hours.

We used DataStax OpsCenter to graph the CPU, Memory, I/O, and latency over all four of our writeable nodes over the entire migration. We set our write consistency to 1, which is what we use in production.

Here are some interesting benchmarks – all of our Cassandra servers are EC2 Large Ubuntu 12.04 LTS machines:

  1. During peak load, our cluster completed 422 write requests per second – all of these operations were large batch mutations touching hundreds of rows / columns at once. We weren't bottlenecked by Cassandra, though – we were bottlenecked by our read speed pulling data out of RavenDB.
  2. Cassandra achieved a max CPU utilization of 5%, with an average utilization of less than 1%.
  3. The amount of RAM consumed remained pretty much constant regardless of load, which tells me that our memory requirements never exceeded the pre-allocated buffer on any individual node (although we’ve spiked it since during large Hive jobs.)
  4. Cassandra replicated the contents of our 100GB RavenDB data set 3 times (a replication factor of 3 is standard) and our schema denormalized it heavily – despite both of those factors (which should inflate data size), Cassandra actually compressed our data set down to a slim 30GB, roughly a 10x reduction relative to the raw, replicated footprint. This is largely because RavenDB stores its data as tokenized JSON documents, whereas Cassandra stores everything as byte arrays (in layman's terms).
  5. Maximum write latency for Cassandra was 70,731µs per operation, with an average write latency of 731µs. Under normal loads the average write latency is around 200µs.

Our performance testing tools ran out of gas long before Cassandra did. Based on our ongoing monitoring, our cluster is operating at less than 2% capacity under production load. We'll see how that changes once we start increasing the number of Hive queries we run on any given day.

We never bothered running this test with MongoDB – Cassandra already had a leg up feature-set wise and the performance improvements were so remarkably good that we just decided to move forward with a full migration shortly after reviewing the results.

Hive and Hadoop

The last major piece of our stack is our map/reduce engine, which is powered by Hive and Hadoop.

Hadoop is notoriously slow, but that’s ok. We don’t serve live queries with it – we batch data periodically and use Hive to re-insert it back into Cassandra.

Hive is our tool of choice for most queries, because it’s an abstraction that feels intuitive to our entire team (lots of SQL experience) and is easy to extend and test on the fly. We’ve found it easy to tune and it integrates well with the rest of DataStax Enterprise Edition.

Conclusion

It’s important to think carefully about your data and your technology choices, and sometimes it can be difficult to do that in a data vacuum. Cassandra, Hive, and Hadoop ended up being the right tools for us at this stage, but we only arrived at that conclusion after actually doing live acceptance tests and performance tests.

Your mileage may vary, but feel free to ask us questions in the comments!