Sunday, March 31, 2019

ReadProvisionedThroughputExceeded using Kinesis

ReadProvisionedThroughputExceeded

When using Kinesis it is possible you'll get a ReadProvisionedThroughputExceeded error when the way consumers read from the shards is not carefully designed.

That exception indicates "The number of GetRecords calls throttled for the stream over the specified time period".

A cause for this can be that too many consumer applications issue GetRecords calls to the same shard at the same time.

One recommended solution is to increase the number of shards, but that has a price tag attached to it. Another solution is to process (batches of) records in parallel by spinning off threads within the processRecords() method in the KCL, but that requires careful orchestration of the threads to keep checkpointing correctly. An interesting analysis of why to use the KCL, and especially version 2, can be found here.
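If you consume the stream directly with the AWS SDK (so not via the KCL), a simple mitigation is to back off and retry when the shard is throttled. Below is a minimal sketch using the AWS SDK for Java v1; the retry count and backoff values are arbitrary assumptions.

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;

public class ThrottledShardReader {

    private final AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

    // Reads one batch of records, backing off exponentially while the shard is throttled.
    public GetRecordsResult getRecordsWithBackoff(String shardIterator) throws InterruptedException {
        long backoffMillis = 200;
        for (int attempt = 0; attempt < 5; attempt++) {
            try {
                return kinesis.getRecords(new GetRecordsRequest()
                        .withShardIterator(shardIterator)
                        .withLimit(1000));
            } catch (ProvisionedThroughputExceededException e) {
                // Too many GetRecords calls against this shard: wait and try again.
                Thread.sleep(backoffMillis);
                backoffMillis *= 2;
            }
        }
        throw new IllegalStateException("Shard still throttled after retries");
    }
}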

For a more detailed analysis, see the longer blogpost here.

ProvisionedThroughputExceededException using KCL (Kinesis Client Library)

ProvisionedThroughputExceededException

When using the Amazon Kinesis Client Library (KCL), one would not expect the DynamoDB table that the KCL itself creates to keep track of its streams, shards and checkpoints to ever be under-provisioned.
But it can be! Then you get the ProvisionedThroughputExceededException.

Normally it will recover by retrying automatically. But if it doesn't, other solutions exist:
  1. Change the read/write provisioning for that table to 'on-demand' (a sketch of doing this via the SDK follows below this list).
  2. Make sure your hash keys are evenly distributed, as mentioned here: not-evenly distributed hash-keys.
  3. Modify how many consumers (applications) are reading from the same shard(s) at about the same time, either by changing that design or by increasing the number of shards.
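For option 1, switching the KCL lease table to on-demand capacity can also be done with the SDK. A minimal sketch with the AWS SDK for Java v1 (requires an SDK version that supports on-demand billing; the table name is an assumption — the KCL names the lease table after your application name):

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.UpdateTableRequest;

public class LeaseTableOnDemand {

    public static void main(String[] args) {
        AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

        // The KCL names its lease table after the application name (assumed here: "my-kcl-application").
        dynamoDb.updateTable(new UpdateTableRequest()
                .withTableName("my-kcl-application")
                .withBillingMode(BillingMode.PAY_PER_REQUEST)); // on-demand capacity
    }
}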
For a more detailed analysis, see the longer blogpost here.

Wednesday, March 20, 2019

Lessons learned using AWS Kinesis to process business events and commands (messages)

Introduction

When using AWS Kinesis as the means of communicating business events, several challenges arise. For the basic key concepts of Kinesis, see here.

Of course the "normal" use case to use Kinesis is described as here in the section "When should I use Amazon Kinesis Data Streams, and when should I use Amazon SQS?" Another comparison between Kinesis and SQS can be found here.

Kinesis is especially built for Kafka-like stream processing of millions of events, where usually the producers (think IoT devices) are much faster than the consumer applications.
Business events are usually not generated by the millions per minute or second, and events from IoT devices are usually not considered business events.
Next time I would not recommend using Kinesis as a transport mechanism for business events; the main reason for me is that its intended use case does not match that type of use, plus the many technical challenges I had to solve, for most of which any "regular" messaging tool like RabbitMQ would suffice.
Additionally, one usually wants business events to arrive almost instantly, or at least as fast as possible. Kinesis is fast, but e.g. RabbitMQ is often sufficient as transport for business events.
And yes, RabbitMQ is now also available as a managed solution, hosted on AWS but not managed by AWS itself, for example with CloudAMQP.

In the discussion below, a message can be an event or a command (though most often you'd want commands to be synchronous, since they should only be delivered to one service anyway).

High level Kinesis architecture overview

Below is a high level overview of Kinesis' architecture: 


There can be multiple streams, and within each stream are one or more shards. Messages in a shard are guaranteed to be delivered to the consumer in the order they were published onto the shard.
The partition key used while publishing determines into which shard the message is published. Messages with the same partition key are always published into the same shard.
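For illustration, a minimal sketch of publishing with an entity ID as partition key (AWS SDK for Java v1; the stream name "business-events" and the use of an order ID are assumptions):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;

public class EventPublisher {

    private final AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

    public void publishOrderEvent(String orderId, String eventJson) {
        // Using the entity ID as partition key keeps all events for that entity on one shard,
        // so consumers see them in publish order.
        kinesis.putRecord(new PutRecordRequest()
                .withStreamName("business-events")   // assumed stream name
                .withPartitionKey(orderId)
                .withData(ByteBuffer.wrap(eventJson.getBytes(StandardCharsets.UTF_8))));
    }
}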

A design decision one has to make is: do I create one stream where all services publish their messages, or do I want a stream per XYZ, where XYZ could for example be a Bounded Context?
No good reason was found to split into multiple streams, so I decided to go for one Kinesis stream that all services publish to.

Lessons learned

When using Kinesis to handle business events between microservices, several challenges had to be overcome. The lessons learned are described below.

More than five different applications processing the same shard at the same time

The Kinesis limits documentation states that each shard within a stream supports up to five read transactions per second, and if you need more than that, it is recommended to increase the number of shards.
If you don't comply with that limit, you'll see ReadProvisionedThroughputExceeded or ProvisionedThroughputExceededException exceptions appear in your logs. (The latter can also easily be triggered by having large documents (e.g. 1MB) in your DynamoDB; with the AWS Console you then can't even view such a large document in your browser!)
Increasing the number of shards will spread the published messages across more shards and thus reduce the number of read transactions per second per shard.
BUT: in a microservices architecture you can easily have 10 or more services, and all these services need to process all messages on all shards.
So with for example 10 services, each of those services (container or lambda) will need to poll each shard regularly in some form. And the more services, the bigger the chance that more than 5 services poll any given shard in the same second... causing the above ReadProvisionedThroughputExceeded exception.
Thus: increasing the number of shards won't help, because each service will still need to poll each shard.

One workaround for this could be:
  • have each service (container, lambda) publish to one central stream. Note that multiple streams will in the end also not help you, since you'll reach that 5-reads-per-second limit as soon as the number of services grows to 5 or more
  • have a stream per service that needs to consume messages
  • have a smart lambda that reads from the central stream and re-publishes each message on each of the per-service streams, using the same partition key the publisher used
  • in case of an error in that smart lambda: the lambda should store the failed events somewhere, e.g. DynamoDB, for later investigation. But when such an error occurs, what the consumer receives is no longer in the order the publisher published the messages (the consumer misses a message, and might get it later when the investigation decides to republish it on the central stream)
  • note that this introduces at least one second of delay, because the smart lambda is only allowed to poll once a second. And if the consuming service is also a lambda, another second of delay is introduced.
The above solution makes sure each service stream has only one type of application consuming it, and any read limits can be fixed by increasing the number of shards (because only one service will be reading from it anyway).
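For illustration, a rough sketch of such a "smart" fan-out lambda (Java, using aws-lambda-java-events and the Kinesis SDK; the per-service stream names are assumptions, and the store-failed-events handling is left out):

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class FanOutLambda implements RequestHandler<KinesisEvent, Void> {

    // Assumed names of the per-service streams this lambda re-publishes to.
    private static final List<String> SERVICE_STREAMS = Arrays.asList("orders-stream", "billing-stream");

    private final AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

    @Override
    public Void handleRequest(KinesisEvent event, Context context) {
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            String partitionKey = record.getKinesis().getPartitionKey();
            ByteBuffer data = record.getKinesis().getData();
            for (String stream : SERVICE_STREAMS) {
                // Re-publish with the original partition key so per-entity ordering is preserved.
                kinesis.putRecord(new PutRecordRequest()
                        .withStreamName(stream)
                        .withPartitionKey(partitionKey)
                        .withData(data.duplicate()));
            }
            // On failure you would store the record somewhere (e.g. DynamoDB) for later investigation.
        }
        return null;
    }
}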
Note that Enhanced Fan-Out was not released yet at the time, which also seems to solve the described issue.

Options tried by changing the configurations: 
  1. KCL: increasing withIdleTimeBetweenReadsInMillis and withIdleTimeBetweenCallsInMillis: only helps in a limited way, plus it reduces throughput (see the configuration sketch after this list).
  2. KCL: the same goes for withMaxRecords(). Even if there are no records, all services will still have to poll the shard from time to time...
  3. Increasing the AWS provisioned maximum could of course help, but what is the final maximum there? It will probably need to be increased for every X new services.
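For reference, a minimal sketch of where these knobs live on the KCL 1.x KinesisClientLibConfiguration (the application, stream and worker names plus the chosen values are illustrative assumptions, not recommendations):

import java.util.UUID;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;

public class ConsumerConfig {

    public static KinesisClientLibConfiguration create() {
        return new KinesisClientLibConfiguration(
                        "my-consumer-app",            // assumed application (and lease table) name
                        "business-events",            // assumed stream name
                        new DefaultAWSCredentialsProviderChain(),
                        UUID.randomUUID().toString()) // worker ID
                .withIdleTimeBetweenReadsInMillis(2000) // poll less often: fewer GetRecords calls per shard
                .withMaxRecords(1000);                  // batch size per GetRecords call
    }
}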

Increasing throughput

When the consumer needs to be really fast in processing messages from a Kinesis stream shard, one can spin off a new thread per record in the batch. But several things have to be taken into account when doing that:
  • checkpointing is still at record (or batch) level, so you should only checkpoint once you know all messages before that point have been processed fine too. You'll need some thread orchestration to determine up to which record to checkpoint
  • if you care about ordering, a thread per record causes processing-order issues; in that case you first have to group together all records that apply to the same entity, and then start a thread to process each group as a whole (a sketch of this grouping approach follows below this list)
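A minimal sketch of that grouping approach, assuming the partition key identifies the entity and using a fixed thread pool (both assumptions):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import com.amazonaws.services.kinesis.model.Record;

public class ParallelBatchProcessor {

    private final ExecutorService executor = Executors.newFixedThreadPool(8);

    public void processBatch(List<Record> records) throws Exception {
        // Group records per entity (here: per partition key) so ordering is kept within each entity.
        Map<String, List<Record>> perEntity = records.stream()
                .collect(Collectors.groupingBy(Record::getPartitionKey));

        // Process the groups in parallel; within a group the records stay in order.
        List<Future<?>> futures = perEntity.values().stream()
                .map(group -> executor.submit(() -> group.forEach(this::process)))
                .collect(Collectors.toList());

        // Only after every group has finished is it safe to checkpoint the whole batch.
        for (Future<?> future : futures) {
            future.get();
        }
    }

    private void process(Record record) {
        // domain-specific processing of a single record
    }
}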

No direct Kinesis consumer

A totally different approach is to not have your container services be bothered with Kinesis at all, and put lambdas in front of them which invoke the services via regular REST calls. This relieves the consumers completely from processing messages.
An example framework that provides this is Zalando's Nakadi. Though implemented for Kafka, it is still a viable option, and this architecture is definitely something to consider.
A more advanced solution could be to have the lambdas put the messages on an SQS queue. This provides even more decoupling, and if your service is not available, the lambdas can still deliver the messages into SQS; care has to be taken regarding how to support multiple consumers reading from the same SQS queue. Of course, at that point you almost have to wonder why you are using Kinesis at all when you are putting a regular queue behind it...

Kinesis Client Library (KCL) challenges

For the Java consumers the recommended option by AWS is to use the Kinesis Client Library, KCL for short. A short high-level introduction can be found here.
This library takes away a lot of the standard challenges of a distributed solution; see the KPL link below on implementing efficient producers for an explanation.
But still quite a few challenges exist. E.g. how does it handle multiple instances running a Worker? Should you checkpoint per batch or per record?
Are batches provided to the worker one at a time, sequentially or in parallel? How do you handle failed messages?
Should workers have a unique ID?

Here's a summary of lessons learned:
  • a shard will be consumed by a record processor thread (not necessarily always the same thread!) of one single worker only at any given moment. Thus batches will never be handed to processRecords() in parallel: first batch 1, then batch 2, etc. (consumeShard() is protected via synchronized)
  • starting multiple workers on the same host won't improve throughput; one worker per host seems most effective, otherwise workers may also start stealing leases from each other too often
  • give each worker a unique ID per host, so you can see which host holds which lease
  • it seems best to pass your own Executor to the KCL when using Spring, so Spring knows about that thread pool (see the sketch after this list)
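A sketch of wiring up such a worker (KCL 1.x Worker.Builder; the names and pool size are assumptions — in a Spring application the executor would typically be a Spring-managed bean instead):

import java.net.InetAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

public class WorkerFactory {

    public static Worker createWorker(IRecordProcessorFactory recordProcessorFactory) throws Exception {
        // One worker per host, with an ID that tells you which host owns which lease.
        String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                "my-consumer-app", "business-events",         // assumed application and stream names
                new DefaultAWSCredentialsProviderChain(), workerId);

        // Pass in your own executor so the application (or Spring) knows about the KCL threads.
        ExecutorService executor = Executors.newFixedThreadPool(8);

        return new Worker.Builder()
                .recordProcessorFactory(recordProcessorFactory)
                .config(config)
                .execService(executor)
                .build();
    }
}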
Note that checkpointing too often (or querying your own DynamoDB database while processing each message) can easily cause a ProvisionedThroughputExceededException.
Therefore you don't want to do that too often either. In the end this is the high-level algorithm (pseudo code) that one can use to cover all the above concerns:

In processRecords(records) {
   lastSuccessfulRecord = null;
   for (each record in records) {
      try {
         parse record into domain object;
         process(domain object);
         lastSuccessfulRecord = record;
      } catch (ex) {
         // checkpoint only up to and including the last successfully processed record,
         // then stop; the remaining records are picked up again after a worker restart
         checkpointUpToIncluding(lastSuccessfulRecord);
         return;
      }
   }
   checkpointBatch(); // all records in the batch were successfully processed
}
Another question for the next implementation will be: should the service really be doing smart checkpointing (i.e. only checkpoint when a record is processed successfully; if not, keep processing, but at the next restart of the service start from the last checkpoint)? The "normal" messaging solution of putting a failed message on a Dead Letter Queue might be better. Though with that solution you'll have to think about edge scenarios, like when a message A to modify entity E is put on the DLQ, but a following message B related to entity E is processed successfully: should message A be replayed (published) on the main stream again? Or should, as soon as one message for E is put onto the DLQ, all subsequent messages for E also be put on the DLQ?

Kinesis Producer Library (KPL) challenges

For the Java producers the recommended option by AWS is to use the Kinesis Producer Library, KPL for short. For key concepts see here.
See here for a thorough explanation of its advantages with example code.

The only challenge there was: how large should the batch size be?  It seems to depend mostly on how well your consumers can handle batches.
In the end I set it to values between 10 and 100.
Note that the standard AWS Kinesis SDK does not handle these batches (aggregated records) for you. That means if you use the KPL with batch size 10, and those batches of records are received by e.g. Javascript lambdas using the standard SDK, they by default can't process them correctly, unless you write code to unwrap (de-aggregate) the batch.
As a temporary workaround the batch size could of course be set to 1, which makes sure each consumer gets to process one message at a time instead of a batch.
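For reference, a minimal sketch of configuring the KPL (amazon-kinesis-producer); the region, stream name and values shown are illustrative assumptions, not recommendations:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class ProducerSetup {

    public static KinesisProducer create() {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setRegion("eu-west-1")               // assumed region
                .setRecordMaxBufferedTime(100)        // how long the KPL may buffer before sending (ms)
                .setAggregationMaxCount(10);          // at most 10 user records aggregated per Kinesis record
        // If your consumers cannot de-aggregate KPL batches at all, aggregation can be switched off:
        // config.setAggregationEnabled(false);
        return new KinesisProducer(config);
    }

    public static void publish(KinesisProducer producer, String partitionKey, String json) {
        producer.addUserRecord("business-events", partitionKey,
                ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));
    }
}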

Hope these lessons learned help somebody while implementing Kinesis.



Wednesday, April 4, 2018

gitlab-runner cache key bug: cache not being created anymore in gitlab steps

Introduction

Besides always auto-updating to the most recent version of the maven:3-jdk-8 Docker image, the Gitlab runners were also always updated to the most recent version.

Again not a best practice, and again I learned that the hard way: builds suddenly started to fail.


The issue and workarounds/solutions

Suddenly the Docker image build of the application started to fail with this error:

Step 9/12 : ADD service1/target/*.jar app.jar
ADD failed: no source files were specified
ERROR: Job failed: exit code 1

It turned out the service1/target directory was empty (or missing, I forgot which).
Investigating some more showed that the cache produced in previous steps in Gitlab was not there anymore. The version at which this started appearing was:

Running with gitlab-runner 10.1.0 (c1ecf97f)
on gitlab-runner-hosted (70e74c0e)
From older successful builds I saw that at the end of the previous step, when the cache is created, you see something like this:   

Creating cache developbranch:1...
WARNING: target/: no matching files
service1/target/: found 87 matching files
service2/target/: found 33 matching files
untracked: found 200 files
Created cache
Job succeeded

But those log lines were now suddenly missing! And I noticed that the gitlab-runner version had changed between the last successful build and this failing one.
So I had an area to focus on: the caching didn't work anymore.

The cache definition in the gitlab-ci.yml was:

cache:
  key: "$CI_COMMIT_REF_NAME"
  paths:
    - target/
    - service1/target/
    - service2/target/
  untracked: true

I suspected that maybe the environment variable in the key: field was empty or something like that.
But when I added logging in other script steps, the $CI_COMMIT_REF_NAME variable was filled with the value 'developbranch', so it was not empty.
Then I had an epiphany and prefixed the above environment variable with a string, making the cache key: definition look like this:

cache:
  key: prefix-"$CI_COMMIT_REF_NAME"
  paths:
    - target/
    - service1/target/
    - service2/target/
  untracked: true

In the above you can see I prefixed the key with the hardcoded string "prefix-". And indeed that did it: the creation of the cache worked again and looked like this:

Creating cache prefix-developbranch:1...
WARNING: target/: no matching files
service1/target/: found 87 matching files
service2/target/: found 33 matching files
untracked: found 200 files
Created cache
Job succeeded


So also here I learned (what I really already knew): don't auto-update, but do it in a controlled way, so you know when to expect potentially failing builds.

Friday, March 30, 2018

Gitlab maven:3-jdk-8 Cannot get the revision information from the scm repository, cannot run program "git" in directory error=2, No such file or directory

Introduction

In a Gitlab project setup the most recent Docker image for maven was always retrieved from the internet before each run by having specified image: maven:3-jdk-8 in the gitlab-ci.yml. The image details can be found here.

Of course this is not a best practice; your build can suddenly start failing at a certain point because an update to the image might change something internally, causing things to fail.
What you want is controlled updates. That way you can anticipate builds failing and plan the upgrades in your schedule.

The issue and workarounds/solutions

And indeed, on March 29, 2018 our builds suddenly started failing with this error:

[ERROR] Failed to execute goal org.codehaus.mojo:buildnumber-maven-plugin:1.4:create (useLastCommittedRevision) on project abc: Cannot get the revision information from the scm repository :
[ERROR] Exception while executing SCM command.: Error while executing command. Error while executing process. Cannot run program "git" (in directory "/builds/xyz"): error=2, No such file or directory

That message is quite unclear: is git missing? Or is the directory wrong? Or could the maven buildnumber plugin not find the SCM repository?
After lots of investigation it turned out the maven:3-jdk-8 image had indeed changed about 18 hours before.
And after running the maven command in a local version of that Docker image, indeed the same error occurred! Awesome, the error was reproducible.
And after installing git again in the image with:

- apt-get update
- apt-get install git -y


the error disappeared!  But a new one appeared:

[ERROR] The forked VM terminated without properly saying goodbye. VM crash or System.exit called?
This also hadn't happened before. After some searching it turned out it might be the surefire and failsafe plugins being outdated.
So I updated them to 2.21.0 and indeed the build succeeded.

Here's the issue reported in the Docker Maven github. UPDATE: it is caused by an openjdk issue (which maven:3-jdk-8 is based upon).

This issue made us realize we really need an internal Docker repository. And so we implemented that :)

One disadvantage of Docker images is that you can't specify a commit hash to use. Yes, you can specify a digest instead of a tag, but that is just an opaque hash; you can't see from it anymore which tag it relates to.

Saturday, February 3, 2018

How to prevent Chromium from rebooting a Raspberry Pi 3 model B Rev 1.2




I tried to use a Raspberry Pi 3 model B Rev 1.2 as a dashboard for monitoring a couple of systems using Chromium as browser.

Tip: use  this to have it never turn off the display:
sudo xset s off
sudo xset -dpms
sudo xset s noblank

I had only two tabs open all the time and was using the Revolver browser extension to rotate the tabs. One tab had the default Datadog page open, another a custom dashboard within Kibana that refreshed every 15 minutes.
Using all default settings, within a few hours the Pi would reboot on its own! So something caused it to do that.

It seemed the browser (tabs) or the Javascript in them were just leaking so much memory that the Pi ran out of memory.  I tried multiple times with the same default setup, but the behavior was the same each time.

So I tried a couple of other things:
  1. Have the Revolver plugin fully reload the page. Still a reboot of the Pi, though it took a bit longer

  2. Added --process-per-site to the startup shortcut of Chromium. This causes Chrome to create fewer processes, which should reduce memory usage a bit. But still a reboot of the Pi, though again it took a bit longer.
    Note that this also comes with its own weaknesses.

  3. Added --disable-gpu-program-cache to the startup shortcut of Chromium. Again still rebooted the Pi after a while.

  4. Tried other browsers like Midori and Firefox Iceweasel.  Midori does not have a Revolver-like plugin, so it didn't fit the requirements. Firefox's only add-on that should work gave some kind of "invalid format" error (don't remember exactly) when trying to install it. The other add-ons for Firefox were not compatible with Iceweasel.

So in the end I did not find a solution :(  I just built a cron-job that would restart the browser every 5 hours.
If you found a way to fix this problem, let the world know in the comments!


Friday, December 29, 2017

Upgrading Dell M4700 from 500G HDD to 1T Samsung Evo 850 SSD v-nand

Below is a list of things encountered when upgrading a Dell Precision M4700 laptop with Windows 7 with a 500G HDD to a 1 terabyte Samsung Evo v-nand 850 SSD.

  • Used HDClone 7 Free Edition to make a copy of the current harddisk. Copied onto a Samsung T5 external SSD.  HDClone very nicely copies over everything, even from a live (running) Windows 7 machine. It creates all partitions also on the external SSD; these are visible as separate drives when reconnecting the USB drive. It can be even made bootable, but I didn't need that.

  • Swapped in the SSD as shown here: https://www.youtube.com/watch?v=D6Cn3bONxEo
    Note how the SSD has to "click" into the notches of the bracket (metal frame).

  • Put in the Dell Windows 7 SP1 DVD. It installed. For quite a while it shows "Windows is loading files...", but in the end it got through, after about 5-10 minutes.

  • After logging in, the wireless device was not detected. So no internet. Also other drivers were not installed yet or failed:

  • So: "Ethernet controller", "Network controller", "PCI Simple Communications Controller", "SM Bus Controller", "Universal Serial Bus (USB) Controller", "Unknown device"

  • Then tried to use the Dell 'Resource Media' DVD to install the drivers. But the usage of the program that starts then is just plain impossible to understand. E.g see the screenshot below:


    The touchpad is marked as installed (I think that's what the checkbox indicates). When I then installed any other driver, no checkbox appeared at all to the left of any of them.
    Plus, what is the order in which to install the drivers? I found this post, but it seems like a lot of manual work. Plus you have to know which devices are in your machine to know which driver matches. E.g. this post shows what driver to look for for one specific error. You should be able to find that list on the bill from when you ordered your M4700.

    In the end managed to get the ethernet driver installed (filter on the word 'ethernet' on the Dell drivers page, you should find "Intel I2xx/825xx Gigabit Ethernet Network Controller Drivers")

  • Then I managed to get the internet connection working via a wired cable after installing the above ethernet driver. Then I used Dell's Analyze Detect Drivers option to update the correct drivers in the right order.
    All but the 'Unknown device' errors were gone in the Device Manager. I didn't dare to update the BIOS since that was working fine before.

  • After that there were about 180 Windows updates to install, and then all worked fine. The score of the machine is now 7.3 on a scale with a maximum of 7.9 (no idea what it was before I upgraded):



    But I do notice the difference in startup for example: complete Windows 7 startup from powered off state is about 10-15 seconds. Not bad :)

  • And then the tedious job of installing all non-OS software began...
Lessons learned for next time:
  • HDClone is very handy
  • Export all browsers' favorites before taking out the old disk. You can find the favorites back, but not in an easily importable format.
  • Impossible to understand the Dell Resource Media DVD.  
  • Keep the service tag of your M4700 ready.

Thursday, December 28, 2017

Logback DBAppender sometimes gives error on AWS Aurora: IllegalStateException: DBAppender cannot function if the JDBC driver does not support getGeneratedKeys method *and* without a specific SQL dialect

LOGBack DBAppender IllegalStateException


Sometimes when starting a Spring Boot application with Logback DBAppender configured for PostgreSQL or AWS Aurora in logback-spring.xml, it gives this error:

java.lang.IllegalStateException: Logback configuration error detected: ERROR in ch.qos.logback.core.joran.spi.Interpreter@22:16 - RuntimeException in Action for tag [appender] java.lang.IllegalStateException: DBAppender cannot function if the JDBC driver does not support getGeneratedKeys method *and* without a specific SQL dialect

The error can be quite confusing. The documentation says that Logback should be able to detect the dialect from the driver class.

But apparently it doesn't. Sometimes. After investigating, it turns out this error is also given when the driver can't connect to the database at all, because then it can't retrieve the metadata it uses to detect the dialect either. And thus you get this error in that case too!
A confusing error message indeed.

A suggestion in some post was to specify the <sqlDialect> tag, but that is not needed anymore in recent Logback versions. Indeed, it now gives these errors when putting it in logback-spring.xml file either below <password> or below <connectionSource>:

ERROR in ch.qos.logback.core.joran.spi.Interpreter@25:87 - no applicable action for [sqlDialect], current ElementPath  is [[configuration][appender][connectionSource][dataSource][sqlDialect]]
or
ERROR in ch.qos.logback.core.joran.spi.Interpreter@27:79 - no applicable action for [sqlDialect], current ElementPath  is [[configuration][appender][sqlDialect]]
To get a better error message, it's better to set up the Logback DBAppender in code instead of in logback-spring.xml. See here and here for examples.
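For illustration, a rough sketch of such a programmatic setup (Logback 1.2-era API; the JDBC URL and credentials are placeholders):

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.db.DBAppender;
import ch.qos.logback.core.db.DriverManagerConnectionSource;
import org.slf4j.LoggerFactory;

public class DbAppenderSetup {

    public static void configure() {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();

        DriverManagerConnectionSource connectionSource = new DriverManagerConnectionSource();
        connectionSource.setContext(context);
        connectionSource.setDriverClass("org.postgresql.Driver");
        connectionSource.setUrl("jdbc:postgresql://localhost:5432/logs"); // placeholder URL
        connectionSource.setUser("logger");
        connectionSource.setPassword("secret");
        connectionSource.start(); // fails fast here with a clearer error if the DB is unreachable

        DBAppender appender = new DBAppender();
        appender.setContext(context);
        appender.setConnectionSource(connectionSource);
        appender.start();

        Logger rootLogger = context.getLogger(Logger.ROOT_LOGGER_NAME);
        rootLogger.addAppender(appender);
    }
}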




Thursday, November 2, 2017

What's not so good about my new Dell XPS 15 laptop (+ a bunch of good things)

Recently I got a new laptop, again a Dell. I decided to go for a thin "ultrabook": the XPS 15, 16G RAM, 512G SSD. A good review can be found here. I purposely didn't take the 4K version, since some reviews say it struggles a bit with that. Plus some software, like Remote Desktop, just can't handle it, so you have to scale down anyway.



Here is an overview of the pros and cons I found while using it.

Pros

  • Sleek design

  • Thin

  • A lot lighter than the M4700

  • Smaller power-supply

  • Fast; no problem with 3-4 IntelliJ workspaces open, over 50 Chrome tabs, DBeaver, Firefox with about 10 tabs

Cons

  • Some backlight bleeding in the bottom right corner of the screen. Most notable when showing a black screen. Not really noticeable during daylight. Here's an example of a really bad case.

  • Crappy keyboard; the page up-down, home, end keys can only be used by pressing the Fn key. If you are a coder, it's really annoying, since you use those keys a lot. Hope next time they make separate keys again.

  • Corners get scratched easily if you put it in your bag without a protecting sleeve

  • Sometimes a flicker (screen turns completely black) on the externally connected screen. Not sure yet if it's the cable.

  • Screen can't fold back flat fully.

  • For some reason the default resolution is set to 125% right after using it for the first time.

  • The connector for the HDMI cable is on the side, and it sometimes sits a bit in the way when using the mouse.

  • The professional version of the XPS 15, named Precision 5520, should give you better quality components. But the warranty period has been reduced from 3 years to 1 year. Does that still make it worth buying the over 500 euros more expensive Precision 5520? Apparently they don't dare to give a longer warranty anymore for the better quality components...


Just got a sleeve for the XPS-15, a CushCase. Ordered via Amazon. Took about 3 weeks to arrive. Fits in a regular mailbox. Fits the XPS-15 nicely; no need to really push.

Wednesday, August 16, 2017

Lessons learned - Jackson, API design, Kafka

Introduction

This blogpost describes a bunch of lessons learned during a recent project I worked on.
They are just a bunch grouped together, too small to "deserve" their own separate post :)
Items discussed are: Jackson, JSON, API design, Kafka, Git.


Lessons learned

  • Pretty print (nicely format) JSON in a Linux shell prompt:

    cat file.json | jq .

    You might have to 'apt-get install jq' first.

  • OpenOffice/LibreOffice formulas:

    To increase the year and month by one from cell H1031: =DATE(YEAR(H1031)+1; MONTH(H1031)+1; DAY(H1031))

    Count how many times a range of cells A2:A4501 has the value 1: =COUNTIF(A2:A4501; 1)

  • For monitoring a system a split between a health-status page and performance details is handy. The first one is to show issues/metrics that would require immediate action. Performance is for informational purposes, and usually does not require immediate action.

  • API design: even if you have a simple method that just returns, for example, a date (string), always return JSON (and not just a String with that value). This is useful for backwards compatibility: more fields can easily be added later.

  • When upgrading Gitlab, it had renamed a repository named 'users' to 'users0'. It turns out 'users' is a reserved repository name in Gitlab since version 8.15.

    To change your local git settings to the new users0 perform these steps to update your remote origin:

    # check current setting
    $ git remote -v
    origin  https://gitlab.local/gitlab/backend/users (fetch)
    origin  https://gitlab.local/gitlab/backend/users (push)

    # change it to the new one
    $ git remote set-url origin https://gitlab.local/gitlab/backend/users0

    # see it got changed
    $ git remote -v
    origin  https://gitlab.local/gitlab/backend/users0 (fetch)
    origin  https://gitlab.local/gitlab/backend/users0 (push)

  • Jackson JSON generation (serialization): it is probably good practice not to use @JsonInclude(JsonInclude.Include.NON_EMPTY) or NON_NULL, since that means a key will simply be absent from the JSON when its value is empty or null. That can be confusing to the caller: sometimes the key is there, sometimes it isn't. So just leave it in, so it will be set to null. An exception could be a field that only makes sense in combination with another one, like amount and currency: if amount is null, currency (maybe) doesn't make sense either, so then it could be left out.

  • Java:

    Comparator<UserByPhoneNumber> userComparator = (o1, o2) -> o1.getCreated().compareTo(o2.getCreated());

    can now, with Java 8, be replaced by:

    Comparator<UserByPhoneNumber> userComparator = Comparator.comparing(UserByPhoneNumber::getCreated);

  • Kafka partitioning tips: http://blog.rocana.com/kafkas-defaultpartitioner-and-byte-arrays

  • Kafka vs RabbitMQ:

    - Kafka is optimized for producers producing lots of data (batch-oriented producers) and consumers that are usually slower than the producers.
    - Performance: RabbitMQ handles roughly 20K messages/s, Kafka up to 150K messages/s.
    - Unlike other messaging systems, Kafka brokers are stateless: the consumer has to keep track of how much it has consumed itself.