Thursday, November 4, 2010

Idempotent Updates with Cassandra

One of the things we don’t get with NoSQL databases is ACID transactions. We get Durability, which means data is persistently stored, but we don’t get:
  • Atomicity: If we update multiple records in a single transaction, there’s no guarantee that all or none of them will be applied when a failure occurs.
  • Consistency: NoSQL databases don’t use locking or data integrity rules to guarantee that every transaction moves the database from one consistent state to another.
  • Isolation: There’s no guarantee that the records updated on one transaction won’t be read–or even updated–by another transaction before the first transaction is complete.
Some NoSQL databases offer some A, C, or I features in a limited context such as for “single partition” updates. We could emulate ACID features by supplementing the database with a synchronization framework such as ZooKeeper, but this reduces performance and scalability. If we’re using a NoSQL database, it’s no doubt largely because we want its high throughput. So, we have to design our updates to be compatible with its eventually consistent model.

With Cassandra, the closest we get to a transaction is a “batch mutate”, which allows us to submit multiple updates in one request. But while those updates are being applied, they can be immediately accessed by other applications. If our application (or Cassandra) crashes, there’s no guarantee about which updates in the batch will succeed. Without ACID transactions, how do we prevent conflicts between applications? How do we recover if a crash occurs?
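
To make that concrete, here is a minimal sketch of a batch mutate from a client, using the pycassa Python client. The keyspace name (“Logs”) and the column family (“LogRecords”) are illustrative placeholders, borrowed from the example later in this post.

    # A minimal batch-mutate sketch using the pycassa client.
    # The keyspace 'Logs' and column family 'LogRecords' are placeholder names.
    import pycassa

    pool = pycassa.ConnectionPool('Logs', ['localhost:9160'])
    log_records = pycassa.ColumnFamily(pool, 'LogRecords')

    # Queue several row updates and submit them in one batch_mutate request.
    batch = log_records.batch()
    batch.insert('1265040242124', {'User': 'Bob', 'Request': '1'})
    batch.insert('1265040242678', {'User': 'Mary', 'Request': '1'})
    batch.send()

If the client crashes before send() completes, some of these updates may be applied and others not, which is exactly why the design discussed below matters.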

One strategy is to design updates to be idempotent, which means that applying the same update twice produces the same result as applying it once. (For example, “set column C to value V” is idempotent; “add 1 to column C” is not.) In other words, if an update is successfully processed once, applying the same update again is essentially a no-op. As we’ll see in a moment, this strategy not only simplifies restarts after a crash, it can also prevent conflicts between multiple updaters. But given our long reliance on ACID transactions, it might not be obvious how to do this.

To start, we must shed some baggage from SQL programming. First, consider that in Cassandra there is no distinction between inserting a new row and modifying an existing row; we merely update a row. Second, each row can have an arbitrary number of columns, and we can add new columns to an existing row at any time. Here’s an example: for a specific table (which Cassandra calls a ColumnFamily), a typical update looks something like this:

    “Update the row with key k with columns: C1=V1, C2=V2, …”

Cassandra interprets this update as follows:
  • If no row exists in the specified table with the key k, a new row is created with the specified column names and values.
  • If a row already exists with key k, its columns are updated as follows:
            o If a given column Cn does not exist, it is added to the row with value Vn.
            o If column Cn already exists, its value is replaced with value Vn.

So, the first time we say “update the row with key k…”, the row is created. But if we process the same update again, since the row already exists and no columns are actually changed, nothing happens. (What happens under the hood is not so simple, but this is effectively what an application sees.) Batch mutates can also delete columns and remove rows and, as we’d expect, deleting a column or row that is already deleted has no effect.
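
Here is a quick sketch (pycassa again, with the same placeholder names) showing that re-applying an update is harmless from the application’s point of view; running it twice leaves the row in the same state as running it once.

    # Sketch: applying the same update twice yields the same row.
    import pycassa

    pool = pycassa.ConnectionPool('Logs', ['localhost:9160'])
    log_records = pycassa.ColumnFamily(pool, 'LogRecords')

    update = {'User': 'Bob', 'Request': '1'}
    log_records.insert('1265040242124', update)   # creates the row
    log_records.insert('1265040242124', update)   # reprocessing is a no-op

    # The row holds the same columns no matter how many times the update ran.
    print(log_records.get('1265040242124'))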

If every batch merely created a single row, it’s pretty obvious that if our application (or Cassandra itself) crashed, we could just figure out where we left off and reprocess the most recent batches. But of course, not all batches are this simple. What happens, for example, if batches from separate applications need to update the same row?

Let’s take a simple example. Suppose we have a database that captures log records and maintains an index of values found within specific log fields. Say we want to easily find all log records with a given value for “User” or “Request” fields. A reasonable approach is to create one table to store log records and a second table as a field-specific index into the log records. For example:


ColumnFamily: LogRecords

Key (timestamp)    Columns
1265040242124      User=Bob, Request=1
1265040242678      User=Mary, Request=1
1265043781000      User=Bob, Request=5



ColumnFamily: Fields (problematic!)

Key (field)    Columns
User           Bob=[1265040242124, 1265043781000], Mary=[1265040242678]
Request        1=[1265040242124, 1265040242678], 5=[1265043781000]


In this approach, each log record is stored as a row in LogRecords, using its timestamp (in milliseconds) as the key, with its fields stored as individual columns. For each field we want to index, we create or update a row within Fields whose key is the indexed field name. For each value found for that field, we create a column whose name is the field value and whose value is a list of log record keys that contained that value (perhaps stored as a long[]). Hence, to find all the log records where “Bob” is mentioned in the “User” field, we fetch the Fields row with key=User, find the column whose name=Bob, and the column value leads us to the appropriate log records. Likewise, we can quickly find all log records where the Request value is “1”. (This approach is a little naïve for a scalable application, but it will help make our point.)
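
A sketch of the update logic for this first design might look like the following (pycassa, placeholder names as before; the list of log record keys is stored as a comma-separated string rather than a long[] to keep the example short). Note that it has to read the current column value before writing the new one.

    # Sketch: store a log record and update the naive Fields index.
    # Requires a read-modify-write of the per-value key list.
    import pycassa

    pool = pycassa.ConnectionPool('Logs', ['localhost:9160'])
    log_records = pycassa.ColumnFamily(pool, 'LogRecords')
    fields = pycassa.ColumnFamily(pool, 'Fields')

    def add_log_record(timestamp, record):
        log_records.insert(timestamp, record)
        for field_name, field_value in record.items():
            # Fetch the existing key list (if any), append, and write back.
            # Two writers doing this concurrently can overwrite each other.
            try:
                current = fields.get(field_name, columns=[field_value])[field_value]
                keys = current.split(',')
            except pycassa.NotFoundException:
                keys = []
            if timestamp not in keys:
                keys.append(timestamp)
            fields.insert(field_name, {field_value: ','.join(keys)})

    add_log_record('1265040242124', {'User': 'Bob', 'Request': '1'})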

Although this approach basically works, it is problematic when we consider how we might update these tables. In a concurrent environment, suppose two application instances simultaneously attempt to update the Fields row with key=User, and both attempt to set the column whose name is “Bob”. Even if they first read the column named “Bob” to see whether it already exists (as in the sketch above), they may both believe they are the first. One will set “Bob” to one value; the other will set “Bob” to another value. When such conflicts occur, because there is no locking, Cassandra effectively sets the “Bob” column to the last update, thereby wiping out the first. If the updates arrive in the reverse order, we get a different result.

Notice that multiple applications can add columns to the same row without conflicting–they only conflict if they try to update a column with the same name. Hence, we can resolve the conflict by “moving” the column name in the Fields table to the row key, creating a compound key: <field name>/<field value>. (For this example, we’re using a slash ‘/’ to separate the two parts of the key.) This will yield one row per field name/field value pair. As a result, our improved Fields table looks like this:


ColumnFamily: Fields (improved)

Key (field/value)    Columns
User/Bob             1265040242124=null, 1265043781000=null
User/Mary            1265040242678=null
Request/1            1265040242124=null, 1265040242678=null
Request/5            1265043781000=null

If two applications update the Fields row for the “User” field corresponding to “Bob”, there’s no conflict at the row level: it doesn’t matter in which order the updates are applied, since a single “User/Bob” row is created regardless. At the column level, each application sets only the column whose name matches the timestamp of the log record it is processing. (In this example, we don’t use the column value, so we just set it to null.) It also doesn’t matter in which order the column updates occur, since they will all be added to the same row. The only rule we have to follow is that each log record is processed by a single update application.
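
A sketch of the improved, idempotent update (pycassa, same placeholder names) looks like this. There is no read before the write, and replaying the update, or running it concurrently from several writers, converges to the same rows and columns. Since a Cassandra column must hold some value, an empty string stands in for the “null” shown above.

    # Sketch: idempotent index update using compound row keys (field/value).
    import pycassa

    pool = pycassa.ConnectionPool('Logs', ['localhost:9160'])
    log_records = pycassa.ColumnFamily(pool, 'LogRecords')
    fields = pycassa.ColumnFamily(pool, 'Fields')

    def add_log_record(timestamp, record):
        log_records.insert(timestamp, record)
        for field_name, field_value in record.items():
            # One row per field/value pair; the column name is the record key.
            fields.insert('%s/%s' % (field_name, field_value), {timestamp: ''})

    add_log_record('1265043781000', {'User': 'Bob', 'Request': '5'})
    add_log_record('1265043781000', {'User': 'Bob', 'Request': '5'})  # harmless replay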

A side benefit of this approach is that a potentially large set of columns is now distributed over many rows: one per user. If we configure our database schema with an “order preserving partitioner”, our rows will be sorted, and we can perform range queries. For example, we can ask for all Fields rows corresponding to users whose names start with “A” by searching for keys in the range “User/A” (inclusive) to “User/B” (exclusive). Similarly, we can ask for all references to a specific range of “Request” values. (This technique is similar to the one used by the Lucene/Cassandra hybrid called Lucandra.)
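
Assuming the cluster is configured with the order-preserving partitioner, such a range query might look like this sketch (pycassa; note that get_range treats the finish key as inclusive, so we skip an exact “User/B” match to get the exclusive bound described above):

    # Sketch: range query over compound keys (requires OrderPreservingPartitioner).
    import pycassa

    pool = pycassa.ConnectionPool('Logs', ['localhost:9160'])
    fields = pycassa.ColumnFamily(pool, 'Fields')

    # All index rows for users whose names start with "A".
    for key, columns in fields.get_range(start='User/A', finish='User/B'):
        if key == 'User/B':
            continue  # make the upper bound exclusive
        # The column names are the timestamps of matching log records.
        print('%s -> %s' % (key, ', '.join(sorted(columns.keys()))))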

There are more complex scenarios in which multiple applications may need to update the same row in the same “transaction”. One example is statistical aggregates, such as maintaining a sum or average of some data value. Even within the confines of the eventually consistent model used by NoSQL databases such as Cassandra, there are ways to accommodate these needs. The key is to use an idempotent update strategy that prevents update conflicts and facilitates application restarts.

2 comments:

  1. This comment has been removed by the author.

  2. A problem with this approach is that inserts cannot be balanced across replica nodes.

    A better approach is to use a structure similar to LogRecords (key = timestamp), but instead of a User column and a Request column, have a column for each user, with the request as the value in that column.

    So we will have this table:
    keystamp (1265040242124), Bob(1)
    keystamp (1265040242678), Mary (1)
    keystamp (1265043781000), Bob (5)

    The column names are keystamp, Bob, and Mary, and the values are in brackets.

    In this way we get better load balancing of requests, and we can also use MapReduce to search through all the values in the Bob column.

    Another thing is that Cassandra is a column-oriented DB, so fetching all the values from the Bob column will be very fast.

    In your example that will not be the case, because every keystamp column in your second table just creates an empty column. I think we should use the fact that Cassandra has column-oriented storage, but in your example you are not using the benefits of that feature.

    The important thing is that we need to stop thinking in terms of an RDBMS, where the schema is static, and use the full potential of a dynamic schema.

    Srdjan Mitrovic
