How to consume object messages from a Kafka topic

Last updated: February 19, 2023

Kafka consumers read messages from Kafka topics. The article "2 ways to publish an Object to a Kafka topic" explains how to publish an object to a topic. This article describes how to consume that object.

Serializable Kafka consumer object

Here is the object that my consumer consumes in this example.

import java.io.Serializable;
import java.math.BigDecimal;
public class OrderDetails implements Serializable {
    String OrderNo;
    BigDecimal total;
    public String getOrderNo() {
        return OrderNo;
    }
    public void setOrderNo(String orderNo) {
        OrderNo = orderNo;
    }
    public BigDecimal getTotal() {
        return total;
    }
    public void setTotal(BigDecimal total) {
        this.total = total;
    }
}

Kafka object de-serializer

To consume an object, the consumer needs a deserializer for it. The deserializer below implements Kafka's Deserializer interface and uses Java serialization to turn the message bytes back into an OrderDetails instance.

import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;

public class OrderDeSerializer implements Deserializer<OrderDetails> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration is needed for this deserializer.
    }

    @Override
    public OrderDetails deserialize(String topic, byte[] bytes) {
        // Kafka passes null for records that have no value.
        if (bytes == null) {
            return null;
        }
        // try-with-resources closes the stream even if reading fails.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (OrderDetails) in.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Reached only when the bytes could not be read as an OrderDetails.
        return null;
    }
}
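To see what the deserializer does without running a broker, here is a minimal sketch of the same round trip using only java.io. It assumes the producer wrote the object with an ObjectOutputStream. The RoundTripDemo class and its toBytes and fromBytes helpers are hypothetical names for this illustration, and the OrderDetails class is a trimmed stand-in so the sketch compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.math.BigDecimal;

// Trimmed stand-in for the article's OrderDetails (fields only).
class OrderDetails implements Serializable {
    String OrderNo;
    BigDecimal total;
}

// Hypothetical demo class, not part of Kafka or of the article's project.
class RoundTripDemo {
    // Assumed producer side: Java serialization of the object into a byte array.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Consumer side: the same steps the deserializer performs on the message bytes.
    static OrderDetails fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (OrderDetails) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not read OrderDetails", e);
        }
    }

    public static void main(String[] args) {
        OrderDetails order = new OrderDetails();
        order.OrderNo = "A-100";
        order.total = new BigDecimal("19.99");

        OrderDetails copy = fromBytes(toBytes(order));
        System.out.println(copy.OrderNo + " " + copy.total); // prints "A-100 19.99"
    }
}
```

Java serialization only works when both sides load a compatible version of the class, so the producer and the consumer should share the same OrderDetails definition.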

Kafka consumer that consumes objects

Note that the value.deserializer property holds the fully qualified class name of my deserializer:

properties.put("value.deserializer", "com.core.json_objects.delete_later.OrderDeSerializer");
That's how the consumer knows how to turn each message value back into an object.
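A hard-coded class name is easy to mistype, and a typo only shows up at runtime. One possible way to avoid that, sketched here with plain java.util.Properties, is to derive the name from the class itself. ConsumerSettings, its build method, and PlaceholderDeserializer are hypothetical names for this illustration; in the article's project you would pass OrderDeSerializer.class instead of the placeholder.

```java
import java.util.Properties;

// Placeholder so the sketch compiles without Kafka on the classpath.
// In the article's project this role is played by OrderDeSerializer.
class PlaceholderDeserializer {
}

// Hypothetical helper, not part of the Kafka client library.
class ConsumerSettings {
    static Properties build(String bootstrapServers, String groupId, Class<?> valueDeserializer) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Class.getName() returns the fully qualified name, so a rename or a
        // package move is caught by the compiler instead of at startup.
        properties.setProperty("value.deserializer", valueDeserializer.getName());
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("localhost:9092", "test-group", PlaceholderDeserializer.class);
        System.out.println(properties.getProperty("value.deserializer"));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor in the same way as the hand-written one in the consumer below.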

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "com.core.json_objects.delete_later.OrderDeSerializer");
        properties.put("group.id", "test-group");

        // Keys are Strings (StringDeserializer); values are OrderDetails (OrderDeSerializer).
        KafkaConsumer<String, OrderDetails> consumer = new KafkaConsumer<>(properties);
        List<String> topics = new ArrayList<>();
        topics.add("TEST-TOPIC");
        consumer.subscribe(topics);
        try {
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                final ConsumerRecords<String, OrderDetails> consumerRecords =
                        consumer.poll(Duration.ofMillis(1000));
                consumerRecords.forEach(record -> {
                    OrderDetails order = record.value();
                    // The value is null when the deserializer could not read the message.
                    if (order != null) {
                        System.out.println(order.getOrderNo() + " " + order.getTotal());
                    }
                });
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            consumer.close();
        }
    }
}
By: L Raney
Lance is a software engineer with over 15 years of experience in full-stack software development.