Using Kafka with Go
In this article I want to give a short introduction to using Kafka with Go. It does not explain how Kafka itself works; it is a high-level intro for beginners who want to get an idea of how to use it.
Setting up Kafka with Docker
For this example the Docker image from Confluent is used. How to run it is described here: https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html. The only thing that is needed to run the example is the broker.
$ docker-compose up -d broker
If everything works as expected, docker ps should show something like this:
... confluentinc/cp-server:5.4.1      "/etc/confluent/dock…"   ...   0.0.0.0:9092->9092/tcp                       broker
... confluentinc/cp-zookeeper:5.4.1   "/etc/confluent/dock…"   ...   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp   zookeeper
Create a Topic
Now we have the infrastructure part done. Next we need a library for talking to Kafka. After some research and a look at the clients recommended on the Apache Kafka site, I decided to use sarama from Shopify (https://shopify.github.io/sarama/).
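Assuming a project that uses Go modules, the library can be added like this:

$ go get github.com/Shopify/sarama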
To manipulate our Kafka instance we need to create a ClusterAdmin. This is the administrative client for Kafka; in this case we only use it to manage topics. At the end we have to call Close() to avoid leaks.
// Point the client at the broker started via docker-compose.
brokerAddrs := []string{"localhost:9092"}
config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0
admin, err := sarama.NewClusterAdmin(brokerAddrs, config)
if err != nil {
	log.Fatal("Error while creating cluster admin: ", err.Error())
}
// Close the admin client at the end to avoid leaks.
defer admin.Close()
Now we want to add a topic to our Kafka instance.
err = admin.CreateTopic("test", &sarama.TopicDetail{
	NumPartitions:     1,
	ReplicationFactor: 1,
}, false)
if err != nil {
	log.Fatal("Error while creating topic: ", err.Error())
}
We do this by calling CreateTopic(...) on the ClusterAdmin we just created. The topic is called "test" and gets one partition. The replication factor is set to 1 because we only need a single copy of the partition for this project.
After the topic is created, I want to list all topics in the Kafka instance. This can be done with admin.ListTopics(), as sketched below.
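ListTopics() returns a map from topic name to topic details, so a small sketch for logging the names could look like this (the strings import is only needed for joining them):

topics, err := admin.ListTopics()
if err != nil {
	log.Fatal("Error while listing topics: ", err.Error())
}
names := make([]string, 0, len(topics))
for name := range topics {
	names = append(names, name)
}
log.Printf("Topics: %s", strings.Join(names, ", "))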
Et voilà, our log should show something like this:
Topics: _confluent-license, _confluent-metrics, __confluent.support.metrics, test
Because we are using the Confluent image, some meta topics have already been created by Confluent.
Produce messages
Now we are getting to the interesting part. If we want to produce messages, we have to create a producer, and for that we need a config. As partitioner I chose the random partitioner, which picks a random partition each time we send a message. Sarama offers other partitioners as well, for example a hash partitioner and a round-robin partitioner (see the sketch after the next code block), but for this example the random one does the job. For the required acknowledgements we keep the default, WaitForLocal, which only waits for the local broker to acknowledge the message. After that we create an instance of a producer.
config := sarama.NewConfig()
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Return.Successes = true // required by the sync producer

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Fatal("Error while creating producer: ", err.Error())
}
defer producer.Close()
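As mentioned above, other partitioners can be swapped in. If messages carry a key and per-key ordering matters, the hash partitioner sends every message with the same key to the same partition; a minimal sketch (the key is a made-up example):

config.Producer.Partitioner = sarama.NewHashPartitioner
// With this partitioner, messages should carry a Key,
// e.g. Key: sarama.StringEncoder("user-42") on the ProducerMessage.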
In the next step a message has to be created. For this, the ProducerMessage struct is used. With the following code the message is produced on the topic "test". As value, an Encoder implementation has to be used; sarama brings its own implementations, and StringEncoder is used in the example code.
msg := &sarama.ProducerMessage{
	Topic: "test",
	Value: sarama.StringEncoder("blub"),
}
To publish the message, the method SendMessage(...) has to be called. After the message is successfully sent to the broker, the method returns the partition and the offset, which can be used for logging or other things.
partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Fatal("Error while sending message: ", err.Error())
}
log.Printf("Sent to partition %v and the offset is %v", partition, offset)
Consuming messages
After some messages have been produced, we want to consume them. To do this it is necessary to know all partitions of the topic. The function Partitions(...) returns a sorted list of all partition IDs that belong to the topic.
// Error handling omitted here for brevity.
consumer, _ := sarama.NewConsumer([]string{"localhost:9092"}, nil)
partitionList, _ := consumer.Partitions("test")
The messages are consumed via goroutines: one goroutine is created for every partition. In this use case the offset is set to the oldest message, because all messages should be processed.
for _, partition := range partitionList {
	// Consume this partition from the oldest available offset.
	pc, err := consumer.ConsumePartition("test", partition, sarama.OffsetOldest)
	if err != nil {
		log.Fatal("Error while consuming partition: ", err.Error())
	}
	go func(pc sarama.PartitionConsumer) {
		for message := range pc.Messages() {
			log.Printf("received message %v\n", string(message.Value))
		}
	}(pc)
}
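One thing to watch out for: because the consuming happens in goroutines, the main goroutine has to be kept alive, otherwise the program exits before any message is printed. A minimal sketch, assuming the code above runs directly in main, is to block until Ctrl-C (this needs the os and os/signal imports):

// Block until an interrupt signal (Ctrl-C) arrives,
// so that the consumer goroutines keep running.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
<-sigs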