Using Cassandra for Real-time Analytics: Part 2

In part 1 of this series, we talked about the speed-consistency-volume trade-offs that come with the implementation choices you make in data analytics, and why Cassandra is a great choice for real-time analytics. In this post, we’re going to dive a little deeper into the basics of the Cassandra data model, illustrate it with MarkedUp’s own model, and finish with a short discussion of our read and write strategies.

Once again, let’s start with the presentation deck from our LA Cassandra Users Group meetup, available on SlideShare. Slides 12-18 are relevant for this post.

Cassandra Data Model

The Cassandra data model consists of a keyspace (analogous to a database), column families (analogous to tables in the relational model), keys and columns. Here’s what the basic Cassandra table (also known as a column family) structure looks like:

Cassandra Column Family structure

  Figure 1. Structure of a column family in Cassandra

Cassandra’s API also refers to this structure as a map within a map, where the outer map key is the row key and the inner map key is the column key. In practice, a typical Cassandra keyspace for, say, an analytics platform might also contain what’s known as a super column family, which adds one more level of nesting.
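To make the “map within a map” idea concrete, here is a minimal in-memory sketch in Python. It is not Cassandra code: the row keys, dates, and counts are made up for illustration, and the helper names are hypothetical.

```python
# Hypothetical in-memory model of a column family:
# outer map: row key -> inner map: column key -> value.
daily_app_logs = {
    "App1": {"2013-01-02": 7, "2013-01-01": 12},
    "App2": {"2013-01-01": 3},
}


def get_column(column_family, row_key, column_key):
    """Look up one value: first by row key, then by column key."""
    return column_family[row_key][column_key]


def sorted_columns(column_family, row_key):
    """Return a row's (column key, value) pairs ordered by column key,
    mimicking how Cassandra keeps the columns of a row sorted."""
    return sorted(column_family[row_key].items())
```

A super column family would simply add one more level: row key, then super column key, then column key.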

 

Cassandra Super column family structure

  Figure 2. Structure of a super column family in Cassandra

 

Evan Weaver’s blog post has a good illustration of the Twitter keyspace as a real-world example.

MarkedUp’s keyspace has column families such as DailyAppLogs (which counts the number of times a particular log or event was triggered per app) and Logs (which captures information about each log entry). These are also illustrated in Figure 3 below.

The DataStax post about modeling a time series with Cassandra is particularly helpful when deciding on a schema design. We index our columns by date.

Note that since we use the RandomPartitioner, where rows are ordered by the MD5 hash of their keys, using dates as column keys lets us store data points in sorted order within each row. Other analytics applications might prefer indexing by hours or even minutes if, for example, the exact time of day when app activity peaks needs to be measured and reported. The only drawback would be more data points and more columns in each row. With a limit of about 2 billion columns per row in Cassandra, though, it’s almost impossible to exceed the limit. The fact that Cassandra supports such wide rows leaves us with plenty of headroom.
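A quick back-of-the-envelope calculation shows how much headroom that is. The sketch below assumes one column per time bucket in a single row and uses the approximate figure of 2 billion columns per row; the granularity names are purely illustrative.

```python
# Approximate per-row column limit (assumption: "about 2 billion").
MAX_COLUMNS_PER_ROW = 2_000_000_000

# Columns written per day for each hypothetical indexing granularity.
COLUMNS_PER_DAY = {
    "daily": 1,
    "hourly": 24,
    "per_minute": 24 * 60,
}


def years_until_limit(granularity):
    """Years of data one row can hold before reaching the column limit."""
    columns_per_year = COLUMNS_PER_DAY[granularity] * 365
    return MAX_COLUMNS_PER_ROW / columns_per_year


for name in COLUMNS_PER_DAY:
    print(name, round(years_until_limit(name)))
```

Even at one column per minute, a single row would take thousands of years to reach the limit.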

The row key in a Cassandra column family is also the “shard” key, which implies that columns for a particular row key are always stored contiguously and on the same node. If you are worried that some of your shards will keep growing at a faster rate than others, resulting in “hotspot” nodes that store those shards, you can further shard your rows by means of composite keys. For example, (App1, 1) and (App1, 2) can be two shards for App1.
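One way to derive such composite keys is to hash some attribute of each event into a fixed number of buckets. The sketch below is only an illustration of the idea, not how MarkedUp does it: the choice of CRC32, the `event_id` parameter, and the default of two shards are all assumptions.

```python
import zlib


def sharded_row_key(app_id, event_id, num_shards=2):
    """Return a composite (app_id, shard) row key.

    The shard number is derived deterministically from event_id, so the
    same event always maps to the same shard. Shards are numbered from 1.
    """
    shard = zlib.crc32(event_id.encode("utf-8")) % num_shards + 1
    return (app_id, shard)


def all_shards(app_id, num_shards=2):
    """List every composite key for an app; a reader must query all of them."""
    return [(app_id, shard) for shard in range(1, num_shards + 1)]
```

The trade-off is on the read side: a query for App1 now has to fetch every shard returned by `all_shards("App1")` and merge the results.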

The counters for all events of a particular type coming from apps using MarkedUp are recorded in the same shard. (“What about hotspots then?”, you might wonder. Cassandra offers semi-automatic load balancing, so we rebalance if a node starts becoming a hotspot. Refer to the Cassandra wiki for more on load balancing.)

MarkedUp’s Read/Write Strategy

Now that we have a better understanding of the Cassandra data model, let’s look at how we handle writes in MarkedUp. Logs from the Windows 8 apps that use MarkedUp arrive at unpredictable times throughout the day. For incoming logs, we leverage the batch_mutate method.

As you might have guessed, a batch_mutate operation groups updates to several keys into a single call. Each incoming log therefore triggers updates or inserts in multiple column families, as shown in Figure 3. For example, a RuntimeException in AppX on Jan 1, 2013 will update the DailyAppLogs CF under key AppX by incrementing the counter stored in the column corresponding to Jan 1, 2013, and will update the Logs CF by inserting a new row with key LogId.

MarkedUp write strategy

Figure 3. MarkedUp’s write strategy
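The write path can be sketched without a live cluster. The Python below builds a nested map of row key, then column family, then a list of changes, which is the general shape a batch mutation takes, and applies it to an in-memory store. It is a simplified model rather than client code: the tuple format, the `level` and `message` column names, and the helper functions are assumptions made for illustration.

```python
from collections import defaultdict


def build_mutations(app_id, log_id, level, date, message):
    """Collect every change caused by one incoming log into a single map:
    row key -> column family -> list of (operation, column, value)."""
    mutations = defaultdict(lambda: defaultdict(list))
    # Bump the per-day counter for this app.
    mutations[app_id]["DailyAppLogs"].append(("increment", date, 1))
    # Store the details of the log entry under its own row key.
    mutations[log_id]["Logs"].append(("insert", "level", level))
    mutations[log_id]["Logs"].append(("insert", "message", message))
    return mutations


def apply_mutations(store, mutations):
    """Apply a mutation map to a store shaped
    {column family: {row key: {column: value}}}."""
    for row_key, by_column_family in mutations.items():
        for column_family, changes in by_column_family.items():
            row = store.setdefault(column_family, {}).setdefault(row_key, {})
            for operation, column, value in changes:
                if operation == "increment":
                    row[column] = row.get(column, 0) + value
                else:
                    row[column] = value
    return store


store = {}
batch = build_mutations("AppX", "log-1", "RuntimeException", "2013-01-01", "boom")
apply_mutations(store, batch)
```

The point of the structure is that one incoming log produces one batch, even though it touches two column families and two different row keys.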

 

MarkedUp’s read strategy leverages Cassandra’s get_slice query, which lets you read just the range of data a query needs, reducing waste. (A ‘slice’ is a range of columns within a row.) A query that counts across a wide range of columns can be served with minimal disk I/O. Setting up a get_slice query is as simple as specifying the keyspace, column family, and row key you want to use, and then setting up the slice predicate that defines which columns within the row you need.

The slice predicate itself can be set up in two ways. You can either specify exactly which columns you need, or you can specify a range of ‘contiguous’ columns using a slice range. Using column keys that can be sorted meaningfully is thus critical.
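The two styles of slice predicate can be modeled on a sorted list of columns. This is a plain-Python sketch of the idea, not the Cassandra client API: the function names and sample data are hypothetical, and the range is treated as inclusive at both ends.

```python
from bisect import bisect_left, bisect_right

# One row of a hypothetical counter column family, sorted by column key.
row = [
    ("2013-01-01", 4),
    ("2013-01-02", 9),
    ("2013-01-03", 2),
    ("2013-01-04", 5),
]


def slice_by_names(sorted_row, names):
    """Predicate style 1: ask for an explicit list of column keys."""
    lookup = dict(sorted_row)
    return [(name, lookup[name]) for name in names if name in lookup]


def slice_by_range(sorted_row, start, finish):
    """Predicate style 2: ask for a contiguous range of column keys.

    Because the columns are sorted, the result is one contiguous block
    found with two binary searches; both ends are inclusive.
    """
    keys = [key for key, _ in sorted_row]
    low = bisect_left(keys, start)
    high = bisect_right(keys, finish)
    return sorted_row[low:high]
```

ISO-formatted dates sort the same way as strings and as dates, which is why they work well as column keys for range slices.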

Figure 4 below illustrates the query “Get all Crash and Error logs for App1 between Date1 and DateN”. A get_slice query with a slice range can easily read the counters as one contiguous block from the AppLogsByLevel CF because the columns within each row are sorted by date.

MarkedUp read strategy

Figure 4. MarkedUp’s read strategy

 

If you’ve read our previous blog post closely, you might be wondering if the returned information is even correct, given that Cassandra compromises on consistency in favor of speed and volume (remember the SCV triangle?). Cassandra guarantees what is known as eventual consistency, which means that at any given moment (say, milliseconds after a write is triggered) some nodes may still hold the stale value, although every replica will eventually be updated.

Luckily, Cassandra offers tunable consistency levels for queries. Depending on your appetite for consistent output versus speed, you can configure the desired consistency level per request, from a single replica (ONE) up to a quorum or all replicas. MarkedUp uses ONE for writes and TWO for reads, to keep the web front-end as fluid as possible.
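A common rule of thumb is that a read is guaranteed to see the latest acknowledged write only when the number of replicas read plus the number of replicas written exceeds the replication factor. The sketch below encodes that rule; the replication factors used in the examples are illustrative assumptions, not MarkedUp’s actual configuration.

```python
def replicas_required(level, replication_factor):
    """Number of replicas that must respond at a given consistency level."""
    required = {
        "ONE": 1,
        "TWO": 2,
        "THREE": 3,
        "QUORUM": replication_factor // 2 + 1,
        "ALL": replication_factor,
    }
    return required[level]


def read_overlaps_write(write_level, read_level, replication_factor):
    """True when every read set must share at least one replica with
    every write set, i.e. when R + W > replication factor."""
    written = replicas_required(write_level, replication_factor)
    read = replicas_required(read_level, replication_factor)
    return read + written > replication_factor


# Illustrative replication factors only.
for rf in (2, 3):
    print(rf, read_overlaps_write("ONE", "TWO", rf))
```

With a replication factor of 3, writing at ONE and reading at TWO favors speed and leaves a small window for stale reads; writing and reading at QUORUM closes that window at the cost of latency.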

In part 3 of this series, we’ll talk about some best practices for working with Cassandra and choosing a schema that fits your needs. Stay tuned for more!

Using Cassandra for Real-time Analytics: Part 1

In a previous blog post titled “Cassandra, Hive, and Hadoop: How We Picked Our Analytics Stack” we talked about our process for selecting Cassandra as our data system of choice for supporting our real-time analytics platform.

We’ve been live with Cassandra in production for a couple of months now and shared some of the lessons and best practices for implementing it at the Los Angeles Cassandra Users Group on March 12, 2013. You can see the presentation on slideshare if you’d like to view our slides.

We wanted to expand on what we shared in the presentation itself and share some of our applied knowledge on how to put Cassandra to work in the field of real-time analytics.

Let’s start by helping you understand how an analytics system needs to be built.

Real-time Analytics and the CAP Theorem

For those of you who aren’t familiar with Brewer’s CAP theorem, it stipulates that it is impossible for any distributed computer system to simultaneously provide all three of the following guarantees:

  1. Consistency;

  2. Availability; and

  3. Partition tolerance.

In the real world, all distributed systems fall on a gradient with each of these three guarantees, but the kernel of truth is that there are trade-offs. A system with high partition tolerance and availability (like Cassandra) will sacrifice some consistency in order to do it.

When it comes to analytics, there’s a transitive application of the CAP theorem to analytic systems – we call it SCV:

analytics cap theorem

  1. Speed is how quickly you can return an appropriate analytic result from the time it was first observed – a “real-time” system will have an updated analytic result within a relatively short time of an observed event, whereas a non-real-time system might take hours or even days to process all of the observations into an analytic result.

  2. Consistency is how accurate or precise (two different things) the analytic outcome is. A totally consistent result accounts for 100% of observed data with complete accuracy and some tunable degree of precision. A less consistent system might use statistical sampling or approximations to produce a reasonably precise but less accurate result.

  3. Data Volume refers to the total amount of observed events and data that need to be analyzed. It starts to become a factor once the data exceeds what can fit into memory. Massive or rapidly growing data sets have to be analyzed by distributed systems.

If your working data set is never going to grow beyond 40-50GB over the course of its lifetime, then you can use an RDBMS like SQL Server or MySQL and have 100% consistent analytic results delivered to you in real-time – because your entire working set can fit into memory on a single machine and doesn’t need to be distributed.

Or if you’re building an application like MarkedUp Analytics, which has a rapidly growing data set and unpredictable burst loads, you’re going to need a system that sacrifices some speed or consistency in order to be distributed so it can handle the large volume of raw data.

Think about this trade-off carefully before you go about building a real-time analytics system.

What Data Needs to be Real-time?

“Egads, ALL DATA SHOULD BE ALWAYS REPORTED IN REAL-TIME!” shouted every software developer ever.

Hold your horses! Real-time analytics forces a trade-off against other important factors like accuracy / precision and data size. Therefore, real-time analytics isn’t inherently superior for every conceivable use case.

Real-time analysis is important for operational metrics and anything else you or your users need to respond to in real-time:

  • Error rates or health monitoring;

  • Dynamic prices, like stock prices or ticket fares;

  • On-the-spot personalizations and recommendations, like the product recommendations you might see when browsing Netflix or eBay.

In these scenarios, the exact price or the exact error rate isn’t as important as the rate of change or confidence interval, which can be computed in real-time.

Retrospective or batch analysis is important for product / behavior analysis – these are metrics that tell you how you should or shouldn’t do something, and they are data that you can’t or shouldn’t respond to in real-time.

You don’t want to redesign your product based on fluctuations during day-to-day use – you want to redesign it based on long-term trends over all of your cohorts, and it naturally takes a long time for that data to accrue and be analyzed / studied.

In this type of analysis it’s more important for the data to be comprehensive (large) and accounted for consistently.

Analytic speed is always a trade-off between data volume and consistency – you have to be conscious of that when you design your system.

The one property you’re never going to want to intentionally sacrifice is data volume – data is a business asset. Data has inherent value. You want to design your analytic systems to consume and retain as much of it as possible.

At MarkedUp we use a blend of both real-time and retrospective analytics:

markedup analytics blend

In this post and the next we’re going to focus on how we use Cassandra for real-time analytics – we use Hive and Hadoop for our retrospective analysis.

Why Cassandra for Real-time Analytics?

Cassandra is an excellent choice for real-time analytic workloads if you value speed and data volume over consistency (which we do for many of our metrics).

So what makes Cassandra attractive?

    • Cassandra is highly available and distributed; it has high tolerance to individual node failures and makes it possible to add multi-data center support easily if data affinity or sovereignty is an issue. On top of that it’s easy to expand a Cassandra cluster with new nodes if necessary (although this shouldn’t be done frivolously, since there is a high cost to rebalancing a cluster).
    • It has amazing write performance; we’ve clocked Cassandra writes taking about 200µs on average for us, and that’s doubly impressive considering that most of our writes are big, heavily denormalized batch mutations.
    • Batch mutations give us the ability to denormalize data heavily and update lots of counters at once – in Cassandra it’s generally a good idea to write your data to make it easy to read back out, even if that means writing it multiple times. Batch mutations make this really easy and inexpensive for our front-end data collection servers.
    • Distributed counters were added to Cassandra at Twitter’s insistence, and they’re a major boon to anyone trying to build real-time analytic systems. Most of MarkedUp’s real-time analytics are implemented using counters – they provide a simple, inexpensive, and remarkably consistent mechanism to update metrics and statistics at write time. There are some trade-offs (namely the loss of idempotency) but they make up for it in simplicity and speed.
    • Physically sorted columns are one of the Cassandra database implementation details worth learning, because with them you can create easily predictable and pre-sorted slices of data. This makes for really efficient storage of time-series data and other common types of analytic output. When you combine physically sorted columns with dynamic columns and slice predicates you can create lookup systems which retrieve large data sets in constant time.
    • Dynamic columns are a Cassandra feature that takes getting used to, but they are enormously powerful for analytic workloads when coupled with sorted columns – they allow you to create flexible, predictable data structures that are easy to read and extend.
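
The loss of idempotency mentioned in the counters bullet is easy to demonstrate with a toy model. The sketch below is plain Python, not Cassandra code, and the retry helper is a hypothetical stand-in for a client that times out after a write has already landed and then sends it again.

```python
def insert(row, column, value):
    """A regular write: applying it twice leaves the same result."""
    row[column] = value


def increment(row, column, delta=1):
    """A counter write: every application changes the result."""
    row[column] = row.get(column, 0) + delta


def with_retry(operation, *args):
    """Simulate a write that succeeded, looked like a timeout to the
    client, and was therefore sent a second time."""
    operation(*args)
    operation(*args)


log_row = {}
with_retry(insert, log_row, "level", "Error")

counter_row = {}
with_retry(increment, counter_row, "2013-01-01")

print(log_row)      # the retried insert is harmless
print(counter_row)  # the retried increment has counted the event twice
```

This is why a retried counter update can overcount, and why counters suit metrics where a small error is acceptable.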

We’re going to publish a series of posts about working with Cassandra for real-time analytics. Make sure you read part 2 where we go into detail on our read / write strategy with Cassandra for analytics!