Skip to main content

Apache Kafka - Producer ExampleAnd Group Example

Simple Producer Example
Let us create an application for publishing and consuming messages using a Java client. Kafka producer client consists of the following API’s.

KafkaProducer API
Let us understand the most important set of Kafka producer API in this section. The central part of the KafkaProducer API is KafkaProducer class. The KafkaProducer class provides an option to connect a Kafka broker in its constructor with the following methods.

KafkaProducer class provides send method to send messages asynchronously to a topic. The signature of send() is as follows

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
ProducerRecord − The producer manages a buffer of records waiting to be sent.

Callback − A user-supplied callback to execute when the record has been acknowl-edged by the server (null indicates no callback).

KafkaProducer class provides a flush method to ensure all previously sent messages have been actually completed. Syntax of the flush method is as follows −

public void flush()
KafkaProducer class provides partitionFor method, which helps in getting the partition metadata for a given topic. This can be used for custom partitioning. The signature of this method is as follows −

public Map metrics()
It returns the map of internal metrics maintained by the producer.

public void close() − KafkaProducer class provides close method blocks until all previously sent requests are completed.

Producer API
The central part of the Producer API is Producer class. Producer class provides an option to connect Kafka broker in its constructor by the following methods.

The Producer Class
The producer class provides send method to send messages to either single or multiple topics using the following signatures.

public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,”async”) ProducerConfig config = new ProducerConfig(prop);
There are two types of producers – Sync and Async.

The same API configuration applies to Sync producer as well. The difference between them is a sync producer sends messages directly, but sends messages in background. Async producer is preferred when you want a higher throughput. In the previous releases like 0.8, an async producer does not have a callback for send() to register error handlers. This is available only in the current release of 0.9.

public void close()

Producer class provides close method to close the producer pool connections to all Kafka bro-kers.

Configuration Settings
The Producer API’s main configuration settings are listed in the following table for better under-standing −

S.NoConfiguration Settings and Description1
client.id
identifies producer application
2
producer.type
either sync or async
3
acks
The acks config controls the criteria under producer requests are con-sidered complete.
4
retries
If producer request fails, then automatically retry with specific value.
5
bootstrap.servers
bootstrapping list of brokers.
6
linger.ms
if you want to reduce the number of requests you can set linger.ms to something greater than some value.
7
key.serializer
Key for the serializer interface.
8
value.serializer
value for the serializer interface.
9
batch.size
Buffer size.
10
buffer.memory
controls the total amount of memory available to the producer for buff-ering.
ProducerRecord API

ProducerRecord is a key/value pair that is sent to Kafka cluster.ProducerRecord class constructor for creating a record with partition, key and value pairs using the following signature.

public ProducerRecord (string topic, int partition, k key, v value)
Topic − user defined topic name that will appended to record.

Partition − partition count

Key − The key that will be included in the record.

Value − Record contents

public ProducerRecord (string topic, k key, v value)
ProducerRecord class constructor is used to create a record with key, value pairs and without partition.

Topic − Create a topic to assign record.

Key − key for the record.

Value − record contents.

public ProducerRecord (string topic, v value)
ProducerRecord class creates a record without partition and key.

Topic − create a topic.

Value − record contents.

The ProducerRecord class methods are listed in the following table −

S.NoClass Methods and Description1
public string topic()
Topic will append to the record.
2
public K key()
Key that will be included in the record. If no such key, null will be re-turned here.
3
public V value()
Record contents.
4
partition()
Partition count for the record

Simple Consumer Example

As of now we have created a producer to send messages to Kafka cluster. Now let us create a consumer to consume messages form the Kafka cluster. KafkaConsumer API is used to consume messages from the Kafka cluster. KafkaConsumer class constructor is defined below.

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs − Return a map of consumer configs.

KafkaConsumer class has the following significant methods that are listed in the table below.

S.NoMethod and Description1
public java.util.Set<TopicPar-tition> assignment()
Get the set of partitions currently assigned by the con-sumer.
2
public string subscription()
Subscribe to the given list of topics to get dynamically as-signed partitions.
3
public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener)
Subscribe to the given list of topics to get dynamically as-signed partitions.
4
public void unsubscribe()
Unsubscribe the topics from the given list of partitions.
5
public void sub-scribe(java.util.List<java.lang.String> topics)
Subscribe to the given list of topics to get dynamically as-signed partitions. If the given list of topics is empty, it is treated the same as unsubscribe().
6
public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)
The argument pattern refers to the subscribing pattern in the format of regular expression and the listener argument gets notifications from the subscribing pattern.
7
public void as-sign(java.util.List<TopicParti-tion> partitions)
Manually assign a list of partitions to the customer.
8
poll()
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. This will return error, if the topics are not subscribed before the polling for data.
9
public void commitSync()
Commit offsets returned on the last poll() for all the sub-scribed list of topics and partitions. The same operation is applied to commitAsyn().
10
public void seek(TopicPartition partition, long offset)
Fetch the current offset value that consumer will use on the next poll() method.
11
public void resume()
Resume the paused partitions.
12
public void wakeup()
Wakeup the consumer.
ConsumerRecord API

The ConsumerRecord API is used to receive records from the Kafka cluster. This API consists of a topic name, partition number, from which the record is being received and an offset that points to the record in a Kafka partition. ConsumerRecord class is used to create a consumer record with specific topic name, partition count and <key, value> pairs. It has the following signature.

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
Topic − The topic name for consumer record received from the Kafka cluster.

Partition − Partition for the topic.

Key − The key of the record, if no key exists null will be returned.

Value − Record contents.

ConsumerRecords API

ConsumerRecords API acts as a container for ConsumerRecord. This API is used to keep the list of ConsumerRecord per partition for a particular topic. Its Constructor is defined below.

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records)
TopicPartition − Return a map of partition for a particular topic.

Records − Return list of ConsumerRecord.

ConsumerRecords class has the following methods defined.

S.NoMethods and Description1
public int count()
The number of records for all the topics.
2
public Set partitions()
The set of partitions with data in this record set (if no data was returned then the set is empty).
3
public Iterator iterator()
Iterator enables you to cycle through a collection, obtaining or re-moving elements.
4
public List records()
Get list of records for the given partition.
Configuration Settings

The configuration settings for the Consumer client API main configuration settings are listed below −

S.NoSettings and Description1
bootstrap.servers
Bootstrapping list of brokers.
2
group.id
Assigns an individual consumer to a group.
3
enable.auto.commit
Enable auto commit for offsets if the value is true, otherwise not committed.
4
auto.commit.interval.ms
Return how often updated consumed offsets are written to ZooKeeper.
5
session.timeout.ms
Indicates how many milliseconds Kafka will wait for the ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.

Consumer Group

  • Consumers can join a group by using the samegroup.id.
  • The maximum parallelism of a group is that the number of consumers in the group ← no of partitions.
  • Kafka assigns the partitions of a topic to the consumer in a group, so that each partition is consumed by exactly one consumer in the group.
  • Kafka guarantees that a message is only ever read by a single consumer in the group.
  • Consumers can see the message in the order they were stored in the log.

Re-balancing of a Consumer

Adding more processes/threads will cause Kafka to re-balance. If any consumer or broker fails to send heartbeat to ZooKeeper, then it can be re-configured via the Kafka cluster. During this re-balance, Kafka will assign available partitions to the available threads, possibly moving a partition to another process.
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerGroup {
   public static void main(String[] args) throws Exception {
      if(args.length < 2){
         System.out.println("Usage: consumer <topic> <groupname>");
         return;
      }
      
      String topic = args[0].toString();
      String group = args[1].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", group);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",          
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serializa-tion.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
      consumer.subscribe(Arrays.asList(topic));
      System.out.println("Subscribed to topic " + topic);
      int i = 0;
         
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
               System.out.printf("offset = %d, key = %s, value = %s\n", 
               record.offset(), record.key(), record.value());
      }     
   }  
}

Compilation

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java

Execution

>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
>>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. 
ConsumerGroup <topic-name> my-group
Here we have created a sample group name as my-group with two consumers. Similarly, you can create your group and number of consumers in the group.

Input

Open producer CLI and send some messages like −
Test consumer group 01
Test consumer group 02

Output of the First Process

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 01

Output of the Second Process

Subscribed to topic Hello-kafka
offset = 3, key = null, value = Test consumer group 02
Now hopefully you would have understood SimpleConsumer and ConsumeGroup by using the Java client demo. Now you have an idea about how to send and receive messages using a Java client. Let us continue Kafka integration with big data technologies in the next chapter.

Comments

Popular posts from this blog

JavaScript Array Methods

JavaScript Arrays JavaScript arrays are used to store multiple values in a single variable. Displaying Arrays In this tutorial we will use a script to display arrays inside a <p> element with id="demo": Example < p  id= "demo" > < /p > < script > var cars = ["Saab", "Volvo", "BMW"]; document.getElementById("demo").innerHTML = cars; < /script > The first line (in the script) creates an array named cars. The second line "finds" the element with id="demo", and "displays" the array in the "innerHTML" of it. Example var cars = ["Saab", "Volvo", "BMW"]; Spaces and line breaks are not important. A declaration can span multiple lines: Example var cars = [     "Saab",     "Volvo",     "BMW" ]; Never put a comma after the last element (like &