Sunday, March 31, 2019

ReadProvisionedThroughputExceeded using Kinesis

ReadProvisionedThroughputExceeded

When using Kinesis it is possible you'll get a ReadProvisionedThroughputExceeded exception when you don't carefully design how messages are consumed from the shards.

That exception indicates "The number of GetRecords calls throttled for the stream over the specified time period".

A cause for this can be that too many consumer applications issue GetRecords calls to the same shard at the same time.

One recommended solution is to increase the number of shards, but that has a price-tag attached to it. Another solution is to process (batches of) records in parallel by spinning off threads within the processRecords() method in the KCL, but that will require careful orchestration of threads to keep checkpointing correctly. An interesting analysis of using the KCL, and especially version 2, can be found here.

For a more detailed analysis, see the longer blogpost here.

ProvisionedThroughputExceededException using KCL (Kinesis Client Library)

ProvisionedThroughputExceededException

When using the Amazon Kinesis Client Library (KCL), one would not expect the DynamoDB table that KCL itself creates to keep track of its streams, shards and checkpoints to ever be under-provisioned.
But it can be! Then you get the ProvisionedThroughputExceededException.

Normally it will recover by retrying itself. But if it can't, other solutions exist:
  1. Change that table's read/write capacity mode from provisioned to 'on-demand' (see the sketch after this list). 
  2. Make sure your hash keys are evenly distributed, as mentioned here: not-evenly distributed hash-keys.
  3. Modify how many consumers (applications) are reading from the same shard(s) at about the same time, either by changing that design or by increasing the number of shards.
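For option 1, a minimal sketch of switching the KCL lease table to on-demand capacity could look like this, assuming the AWS SDK for Java v1 (a version recent enough to support on-demand billing); the table name is a placeholder, since KCL names its lease table after your application name:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest;

public class SwitchLeaseTableToOnDemand {
   public static void main(String[] args) {
      AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();
      // Placeholder table name: the KCL names its lease table after the application name.
      dynamoDb.updateTable(new UpdateTableRequest()
            .withTableName("my-kcl-application")
            .withBillingMode(BillingMode.PAY_PER_REQUEST)); // on-demand read/write capacity
   }
}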
For a more detailed analysis, see the longer blogpost here.

Wednesday, March 20, 2019

Lessons learned using AWS Kinesis to process business events and commands (messages)

Introduction

When using AWS Kinesis as the means of communicating business events, several challenges arise. For the basic key concepts of Kinesis see here.

Of course the "normal" use case for Kinesis is described here, in the section "When should I use Amazon Kinesis Data Streams, and when should I use Amazon SQS?". Another comparison between Kinesis and SQS can be found here.

Kinesis is built especially for Kafka-like stream-processing of millions of events, where usually the producers (think IoT devices) are much faster than the consumer applications.
Business events are usually not generated by the millions per minute or second, and events from IoT devices are usually not considered business events.
Next time I would not recommend using Kinesis as a transport mechanism for business events; the main reason for me is that its intended use-case does not match that type of use, plus the many technical challenges I had to solve, for most of which any "regular" messaging tool like RabbitMQ would suffice.
Additionally, one usually wants business events to arrive almost instantly, or at least as fast as possible. Kinesis is fast, but e.g. RabbitMQ is often sufficient as transport for business events.
And yes, RabbitMQ is now also available as a managed solution, hosted on AWS but not managed by AWS itself, for example with CloudAMQP.

In the discussion below a message can be an event or a command (though most often you'd want commands to be synchronous, since they should only be delivered to one service anyway).

High level Kinesis architecture overview

Below is a high level overview of Kinesis' architecture: 


There can be multiple streams, and within each stream there are one or more shards. Messages in a shard are guaranteed to be delivered to the consumer in the order they were published onto the shard.
The partition key used while publishing determines into which shard the message is published. Messages with the same partition key are always published into the same shard.
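For example, when publishing events about an order, one would typically use the order id as partition key, so that all messages for that order land on the same shard and thus stay in order. A minimal sketch, assuming the plain AWS SDK for Java v1 and placeholder stream and method names:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

public class OrderEventPublisher {

   private final AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

   public void publishOrderEvent(String orderId, String eventJson) {
      // The order id is the partition key: all events for this order go to the same shard, in order.
      kinesis.putRecord("business-events-stream",                          // placeholder stream name
            ByteBuffer.wrap(eventJson.getBytes(StandardCharsets.UTF_8)),   // the message payload
            orderId);                                                      // partition key
   }
}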

A design decision one has to make is: do I make one stream where all services publish their messages, or do I want a stream per XYZ, where XYZ could for example be a Bounded Context?
No good reason was found to split into multiple streams, so I decided to go for one Kinesis stream that all services publish on.

Lessons learned

When using Kinesis to handle business events between micro-services, several challenges had to be overcome. Below the lessons learned are described.

More than five different applications processing the same shard at the same time

The Kinesis limits documentation states that each shard within a stream supports up to five read transactions per second, and if you need more than that, it is recommended to increase the number of shards.
If you don't comply with that limit, you'll see ReadProvisionedThroughputExceeded or ProvisionedThroughputExceededException exceptions appear in your logs. A similar limit can also easily be reached if you have large documents (e.g. 1MB) in your DynamoDB; with the AWS Console you then can't even view such a large document in your browser!
Increasing the number of shards will spread the published messages across more shards and thus reduce the number of read transactions per second per shard.
BUT: in a microservices architecture you can easily have 10 or more services. And all these services need to process all messages on all shards.
So with for example 10 services, each of those services (container or lambda) will need to poll each shard regularly in some form. And the more services there are, the higher the chance that more than 5 of them poll any given shard within the same second... causing the above ReadProvisionedThroughputExceeded exception.
Thus: increasing the number of shards won't help, because each service will still need to poll each shard.

One workaround for this could be:
  • have each service (container, lambda) publish to one central stream. Note that multiple streams will in the end also not help you, since you'll reach that limit of 5 reads per second as soon as the number of services increases to 5 or higher
  • have a stream per service that needs to consume messages
  • have a smart lambda that reads from the central stream and re-publishes each message on each of the per-service streams, using the same partition key the publisher used (a sketch follows below this list)
  • in case of an error in that smart lambda: the lambda should store the failed events somewhere, e.g. DynamoDB, for later investigation. But when such an error occurs, what the consumer receives is no longer in the same order in which the publisher published the messages (the consumer misses a message, and might get it later when the investigation decides to republish it on the central stream)
  • note that this introduces at least one second of delay, because the smart lambda is only allowed to poll once a second. And if the consuming service is also a lambda, another second of delay is introduced.
The above solution makes sure each service-stream has only one type of application consuming it, and any read limits can be fixed by increasing the number of shards (because only one service will be reading from it anyway).
Note that Enhanced Fan-Out had not been released yet at the time; it seems to also solve the described issue.
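A minimal sketch of such a smart fan-out lambda, assuming the aws-lambda-java-events library and the AWS SDK for Java v1; the per-service stream names are placeholders, and the storing of failed events in DynamoDB described above is left out:

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class FanOutLambda implements RequestHandler<KinesisEvent, Void> {

   // Placeholder per-service stream names; in practice these would come from configuration.
   private static final List<String> SERVICE_STREAMS =
         Arrays.asList("orders-service-stream", "billing-service-stream");

   private final AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

   @Override
   public Void handleRequest(KinesisEvent event, Context context) {
      for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
         // Re-use the original partition key so per-entity ordering is kept within each target stream.
         String partitionKey = record.getKinesis().getPartitionKey();
         ByteBuffer data = record.getKinesis().getData();
         for (String stream : SERVICE_STREAMS) {
            // duplicate() so each putRecord call reads the payload from the start
            kinesis.putRecord(stream, data.duplicate(), partitionKey);
         }
      }
      return null;
   }
}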

Options tried by changing the configurations: 
  1. KCL: increasing withIdleTimeBetweenReadsInMillis and withIdleTimeBetweenCallsInMillis: this only helps in a limited way, plus it reduces throughput (see the sketch after this list).
  2. KCL: the same for withMaxRecords(). Even if there are no records, all services will still have to poll the shard from time to time...
  3. Increasing the AWS provisioned maximum could of course help, but what's the final maximum there? It will probably need to be increased for every X new services.
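For reference, this is roughly what that kind of polling tuning looks like in code, assuming KCL 1.x and its KinesisClientLibConfiguration; the concrete values are just examples:

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;

public class PollingTuning {

   // Fewer GetRecords calls per shard per second, at the cost of extra latency.
   public static KinesisClientLibConfiguration tune(KinesisClientLibConfiguration config) {
      return config
            .withIdleTimeBetweenReadsInMillis(2000) // poll each shard less frequently
            .withMaxRecords(1000);                  // fetch more records per GetRecords call
   }
}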

Increasing throughput

When the consumer needs to be really fast in processing messages from a Kinesis stream shard, one can spin off a new thread per record in the batch (a sketch follows after this list). But several things have to be taken into account when doing that:
  • checkpointing is still at record (or batch) level. So you still have to, in some way, only checkpoint once you know all messages before that point have been processed fine too. So you'll need some thread-orchestration to determine up to which record to checkpoint
  • if you care about ordering, then a thread per record causes processing-order issues; in that case you'll have to first group together all records that apply to the same entity, and then start a thread to process each group as a whole 
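A minimal sketch of that approach, assuming KCL 1.x and a hypothetical process() method holding the domain logic: records are grouped per partition key (i.e. per entity), each group is processed in order on its own thread, and checkpointing only happens once the whole batch has succeeded:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.model.Record;

public class ParallelBatchProcessor {

   private final ExecutorService executor = Executors.newFixedThreadPool(8);

   public void processBatchInParallel(List<Record> records,
                                      IRecordProcessorCheckpointer checkpointer) throws Exception {
      // Group the records per partition key, so ordering per entity is preserved.
      Map<String, List<Record>> perEntity = new LinkedHashMap<>();
      for (Record record : records) {
         perEntity.computeIfAbsent(record.getPartitionKey(), key -> new ArrayList<>()).add(record);
      }
      // One task per entity; within a group the records are processed sequentially, in order.
      List<Future<?>> results = new ArrayList<>();
      for (List<Record> group : perEntity.values()) {
         results.add(executor.submit(() -> group.forEach(this::process)));
      }
      for (Future<?> result : results) {
         result.get(); // propagates any processing failure before we checkpoint
      }
      // Only checkpoint when every record in the batch has been processed successfully.
      checkpointer.checkpoint();
   }

   private void process(Record record) {
      // Hypothetical placeholder for the domain-specific handling of one record.
   }
}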

No direct Kinesis consumer

A totally different approach is to not have your container services be bothered with Kinesis at all, and put lambdas in front of them which invoke the services via regular REST calls (a sketch follows below). This relieves the consumers completely from processing Kinesis messages.
An example framework that provides this is Zalando's Nakadi. Though implemented for Kafka, it is still a viable option, and this architecture is definitely something to consider.
A more advanced solution could be to have the lambdas put the messages in an SQS queue. This provides even more decoupling, and if your service is not available, the lambdas can still deliver the messages into SQS; care has to be taken with how to support multiple consumers reading from the same SQS queue. Of course, at that point you almost have to start wondering why you are using Kinesis at all when you are putting a regular queue behind it...
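Coming back to the first variant (a lambda invoking the service via REST), a minimal sketch could look like the following; the service endpoint is a placeholder, and in the SQS variant the HTTP call would be replaced by sending the payload to the queue:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class KinesisToRestForwarder implements RequestHandler<KinesisEvent, Void> {

   // Placeholder endpoint of the consuming service.
   private static final String SERVICE_ENDPOINT = "https://my-service.example.com/events";

   @Override
   public Void handleRequest(KinesisEvent event, Context context) {
      for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
         // Copy the record payload out of the ByteBuffer.
         ByteBuffer data = record.getKinesis().getData();
         byte[] payload = new byte[data.remaining()];
         data.get(payload);
         try {
            HttpURLConnection connection =
                  (HttpURLConnection) new URL(SERVICE_ENDPOINT).openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setDoOutput(true);
            try (OutputStream out = connection.getOutputStream()) {
               out.write(payload);
            }
            if (connection.getResponseCode() >= 300) {
               // Throwing makes Lambda retry the batch; alternatively park the message on an SQS queue.
               throw new RuntimeException("Service returned " + connection.getResponseCode());
            }
         } catch (Exception e) {
            throw new RuntimeException("Failed to forward record to " + SERVICE_ENDPOINT, e);
         }
      }
      return null;
   }
}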

Kinesis Client Library (KCL) challenges

For the Java consumers the option recommended by AWS is to use the Kinesis Client Library, KCL for short. A short high level introduction can be found here.
This library takes away a lot of the standard challenges of a distributed solution. See the KPL link below about implementing efficient producers for an explanation.
But still quite a few challenges exist. E.g. how does it handle multiple instances running a Worker? Should you checkpoint per batch or per record?
Are batches provided to the worker one at a time, sequentially, or in parallel? How should failed messages be handled?
Should workers have a unique ID?

Here's a summary of lessons learned:
  • a shard will be consumed by a record processor thread (not necessarily always the same thread!) of only one single worker at any given moment. Thus batches will never be handed over to processRecords() in parallel; first batch 1, then batch 2, etc. consumeShard() is protected via synchronized.
  • starting multiple workers on the same host won't improve throughput; one worker per host seems most effective, otherwise workers may also start stealing leases from each other too often
  • give each worker a unique ID per host, so you can see which host has which lease
  • it seems best to pass in your own Executor to the KCL when using Spring, so Spring knows about that thread pool (see the sketch below)
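A sketch of how those last two points could look when building the Worker, assuming KCL 1.x; the application and stream names are placeholders, and the ExecutorService would typically be a Spring-managed bean:

import java.net.InetAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

public class WorkerFactory {

   public static Worker createWorker(IRecordProcessorFactory recordProcessorFactory,
                                     ExecutorService executor) throws Exception {
      // A unique worker id per host, so the KCL lease table shows which host owns which shard lease.
      String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

      KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
            "my-consumer-application",   // placeholder application name (also the name of the lease table)
            "business-events-stream",    // placeholder stream name
            new DefaultAWSCredentialsProviderChain(),
            workerId);

      return new Worker.Builder()
            .recordProcessorFactory(recordProcessorFactory)
            .config(config)
            .execService(executor)       // the (e.g. Spring-managed) thread pool the KCL should use
            .build();
   }
}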
Note that checkpointing too much (or querying your own DynamoDB database while processing each message) can easily cause a ProvisionedThroughputExceededException.
Therefore you also don't want to do that too often. In the end this is the high level algorithm (pseudo code) that one can use to cover all the above concerns:

processRecords(records) {
   lastSuccessfulRecord = null;
   for (each record in records) {
      try {
         parse record into domain object;
         process(domain object);
         lastSuccessfulRecord = record;
      } catch (ex) {
         // A record failed: checkpoint only up to and including the last successfully processed
         // record, so that after a restart processing resumes at the failed record.
         checkpointUpToIncluding(lastSuccessfulRecord);
         return;
      }
   }
   checkpointBatch(); // All records in the batch were processed successfully
}
Another question for a next implementation will be: should the service really be doing smart checkpointing (i.e. only checkpoint when a record is processed successfully; if not successful, keep processing, but at the next restart of the service start from the last checkpoint)? Because the "normal" messaging approach of putting a failed message on a Dead Letter Queue might be a better solution. Though with that solution you'll have to think about edge scenarios, like when a message A to modify entity E is put on the DLQ, but a following message B related to entity E is processed successfully: should message A be replayed (published) on the main stream again? Or should, as soon as one message for E is put onto the DLQ, all subsequent messages for E also be put on the DLQ?

Kinesis Producer Library (KPL) challenges

For the Java producers the option recommended by AWS is to use the Kinesis Producer Library, KPL for short. For key concepts see here.
See here for a thorough explanation of its advantages with example code.

The only challenge there was: how large should the batch size be?  It seems to depend mostly on how well your consumers can handle batches.
In the end I set it to values between 10 and 100.
Note that the standard AWS Kinesis SDK does not implement handling of these batches. That means that if you use the KPL with a batch size of 10, then when these batches of records are received by e.g. JavaScript lambdas using the standard SDK, they by default can't process them correctly because batches are not supported, unless you write code to unwrap the batch.
As a temporary workaround the batch size could of course be set to 1, which makes sure each consumer gets to process one message at a time and not in a batch.
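A sketch of how such producer settings might be configured with the KPL's KinesisProducerConfiguration; note that mapping the "batch size" above to the aggregation count, as done here, is my assumption, and the values are just examples:

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class ProducerFactory {

   public static KinesisProducer createProducer() {
      KinesisProducerConfiguration config = new KinesisProducerConfiguration()
            .setAggregationMaxCount(10)      // at most 10 user records aggregated into one Kinesis record
            .setRecordMaxBufferedTime(100);  // flush buffered records at least every 100 ms
      // For the "batch size 1" workaround one could instead disable aggregation completely:
      // config.setAggregationEnabled(false);
      return new KinesisProducer(config);
   }
}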

Hope these lessons learned help somebody while implementing Kinesis.