Saturday, August 16, 2014

Embedded Kafka and Zookeeper for unit testing

Recently I wanted to setup embedded Kafka cluster for my unit tests, and suprisingly it wasn't that trivial because most of examples I found around were made for some older versions of Kafka/Zookeeper or they didn't work for some other reasons, so it took me some time to find some proper version.

The project I took it from is Camus which is Kafka->Hadoop ETL project, and I just made some slight changes related to newer Zookeeper, as well as some changes configuration-wise.

My embedded Kafka & Zookeeper servers are available at this gist. All the code is tested against Kafka 0.8.1.1 and Zookeeper 3.4.6.

Here is simple example on how to setup Zookeeper at fixed port (2181), and 2 Kafka servers at random available ports:

     EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper(2181);  
     List<Integer> kafkaPorts = new ArrayList<Integer>();  
     // -1 for any available port  
     kafkaPorts.add(-1);  
     kafkaPorts.add(-1);  
     EmbeddedKafkaCluster embeddedKafkaCluster = new EmbeddedKafkaCluster(embeddedZookeeper.getConnection(), new Properties(), kafkaPorts);  
     embeddedZookeeper.startup();  
     System.out.println("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());  
     embeddedKafkaCluster.startup();  
     System.out.println("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());  
     Thread.sleep(10000);  
     embeddedKafkaCluster.shutdown();  
     embeddedZookeeper.shutdown();  


13 comments:

  1. I'm using your example to test reading from Kafka into a Storm topology. How would you setup a topic and populate it with some messages?

    Thanks

    ReplyDelete
  2. Well, you just use standard client APIs to produce a messages (producer API), same as you would do if there was remote broker in play here. Same thing for creating a topic.
    Although creating a topic is something mostly done via CLI command, but here you need to use programmatic way, thus look at Kafka's AdminClient class.

    Something like:
    ZkClient zkClient = new ZkClient(embeddedZookeeper.getConnection());
    AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());

    ReplyDelete
  3. Any advice on how you coordinate producing messages and then consuming them as part of a test? Particularly where the test logic isn't in the consumer but the test calls the producer and consumer to generate and fetch the data to validate. It is simplest to produce and consume concurrently, but that could be a bit tricky to coordinate. Not sure if one can coordinate it to producing test messages first then consuming after the production serially/sequentially to simplify testing. Your thoughts?

    ReplyDelete
  4. If I understood your needs, you need some kind of "promise" mechanism for fetching the result of async process (in this case consumer callback) in the original thread that produced the message. There are other ways, but if you're using java 8, the most obvious way is CompletableFuture. You just call future.compete(message) from your consumer code, and have producer waiting for it in original thread via future.get();

    ReplyDelete
  5. Hi,

    I just tried the code under 0.8.2 kafka version. I have an issue when I try to push something in the only one Kafka broker.

    [KafkaApi-1] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 321764; ClientId: producer-1; Topics: offers
    kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0

    From my understanding, it seems that no Kafka server is started ?

    ReplyDelete
  6. I solved it by using

    ZkClient zkClient = new ZkClient(embeddedZookeeper.getConnection(), 10000, 10000, ZKStringSerializer$.MODULE$);
    AdminUtils.createTopic(zkClient, topic, 10, 1, new Properties());

    ReplyDelete
  7. your information is really awesome as well as it is very excellent and i got more interesting information from your blog.
    PHP Training in Chennai

    ReplyDelete
  8. I wondered upon your blog and wanted to say that I have really enjoyed reading your blog posts. Any way I’ll be subscribing to your feed and I hope you post again soon.

    Android App Development Company

    ReplyDelete
  9. Pretty article! I found some useful information in your blog, it was awesome to read, thanks for sharing this great content to my vision, keep sharing..
    iOS App Development Company
    iOS App Development Company

    ReplyDelete
  10. This article is very much helpful and i hope this will be an useful information for the needed one. Keep on updating these kinds of informative things...
    Fitness SMS
    Fitness Text
    Salon SMS
    Salon Text
    Investor Relation SMS
    Investor Relation Text
    Mobile Marketing Services
    mobile marketing companies
    Sms API

    ReplyDelete
  11. great and nice blog thanks sharing..I just want to say that all the information you have given here is awesome...Thank you very much for this one.
    web design Company
    web development Company
    web design Company in chennai
    web development Company in chennai
    web design Company in India
    web development Company in India

    ReplyDelete
  12. This is excellent information. It is amazing and wonderful to visit your site.Thanks for sharing this information,this is useful to me...
    Mobile Marketing Service
    Mobile Marketing Companies
    Sms API
    Texting API
    sms marketing

    ReplyDelete