2 ways to publish an Object to a Kafka topic

Last updated : February 11, 2023

Before publishing an object to a Kafka topic, I must serialize it. Kafka stores and transmits record keys and values as byte arrays, so the producer needs a way to turn the object into bytes.

I will show you two different ways to serialize an object before publishing it to a topic.

  1. Serialize the object using Apache Kafka's Serializer interface
  2. Serialize the object myself by converting it to a byte array

When I publish something to a Kafka topic, I must explicitly specify the serializer types for the key and the value. In this case, the value is an object. Therefore, I must provide a proper serializer for the object I publish.

In this example, I publish a simple OrderDetails object to the Kafka topic orders-topic.

Using Apache Kafka's Serializer interface.

Below is the class that my order details are based on. Note that it implements Serializable.

import java.io.Serializable;
import java.math.BigDecimal;

// Simple value object; it implements Serializable so Java serialization can handle it.
public class OrderDetails implements Serializable {
    private String orderNo;
    private BigDecimal total;

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }

    public BigDecimal getTotal() {
        return total;
    }

    public void setTotal(BigDecimal total) {
        this.total = total;
    }
}
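
One detail the class above leaves implicit: Java serialization identifies a class version by its serialVersionUID. If none is declared, the runtime derives one from the class structure, so a consumer running a changed version of the class may fail to read older messages with an InvalidClassException. The sketch below is my own assumption rather than part of the original example; VersionedOrder and SerialVersionDemo are hypothetical names, and the code only shows how to declare the field and read it back.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.math.BigDecimal;

// Hypothetical stand-in for OrderDetails with an explicit version id.
class VersionedOrder implements Serializable {
    // Declared explicitly so the id stays stable when the class changes
    // in a compatible way (for example, when a method is added).
    private static final long serialVersionUID = 1L;

    String orderNo;
    BigDecimal total;
}

class SerialVersionDemo {
    // Returns the serialVersionUID that Java serialization records for a Serializable class.
    static long versionOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println(versionOf(VersionedOrder.class)); // prints 1
    }
}
```

Whether to pin the id is a design choice: pinning it keeps old messages readable after compatible changes, but it also means I have to bump it myself when a change is not compatible.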

Here I use Kafka's Serializer interface to serialize my object.

import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.SerializationUtils;
import java.io.Serializable;
import java.util.Map;

// Generic Kafka serializer for any Serializable type.
public class OrderSerializer<T extends Serializable> implements Serializer<T> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure.
    }

    @Override
    public byte[] serialize(String topic, T data) {
        // Spring's helper converts the object to bytes using Java serialization.
        return SerializationUtils.serialize(data);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
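
SerializationUtils.serialize relies on standard Java serialization, so the same conversion can be written with the JDK alone if I do not want the Spring dependency. The following is a minimal sketch under that assumption; JdkSerialization and toBytes are hypothetical names, and the null check reflects my understanding that Kafka may pass a null value to a serializer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical JDK-only helper; not part of Kafka or Spring.
class JdkSerialization {
    // Converts a Serializable object to bytes, passing null through unchanged.
    static byte[] toBytes(Serializable data) {
        if (data == null) {
            return null;
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources closes (and flushes) the object stream before the bytes are read.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(data);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not serialize " + data.getClass().getName(), e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes("100200300");
        System.out.println(bytes.length + " bytes");
    }
}
```

Inside OrderSerializer, the body of serialize could then call a helper like this instead of SerializationUtils.serialize(data).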

Now I can create a Kafka producer to publish my OrderDetails objects with OrderSerializer as the value.serializer.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.math.BigDecimal;
import java.util.Properties;

public class Producer {
    public static void main(String[] args) throws Exception {
        String topic = "orders-topic";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Fully qualified class name of the custom serializer.
        props.put("value.serializer", "com.serializers.OrderSerializer");

        // The Kafka interface is fully qualified because this class is also named Producer.
        org.apache.kafka.clients.producer.Producer<String, OrderDetails> producer = new KafkaProducer<>(props);

        OrderDetails od = new OrderDetails();
        od.setOrderNo("100200300");
        od.setTotal(BigDecimal.valueOf(500));

        // No key is given, so the record is sent with a null key.
        producer.send(new ProducerRecord<>(topic, od));
        // close() waits for previously sent records to complete.
        producer.close();
    }
}

Converting the object to a byte array and publishing to a topic.

Here I use Java's ByteArrayOutputStream and ObjectOutputStream to serialize my OrderDetails object. Note that my value serializer is now org.apache.kafka.common.serialization.ByteArraySerializer, because the producer receives the value as a byte[] that is already serialized.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.util.Properties;

public class Producer {
    public static void main(String[] args) {

        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(config);

        OrderDetails od = new OrderDetails();
        od.setOrderNo("100200300");
        od.setTotal(BigDecimal.valueOf(500));
        // Serialize the object myself; Kafka only passes the bytes through.
        byte[] serializedOd = serializeObject(od);

        ProducerRecord<String, byte[]> message = new ProducerRecord<>("orders-topic", serializedOd);
        producer.send(message);
        producer.close();
    }

    private static byte[] serializeObject(OrderDetails obj) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        // try-with-resources closes the object stream before the bytes are read.
        try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            // Fail instead of publishing an empty or partial payload.
            throw new UncheckedIOException("Could not serialize the order", e);
        }
        return stream.toByteArray();
    }
}
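
On the consuming side, the byte array has to be turned back into an object. This article does not cover the consumer, so the sketch below only illustrates the reverse step with ObjectInputStream; OrderRoundTrip, Order, toBytes and fromBytes are hypothetical names, and no Kafka client is involved. Java deserialization should only be applied to bytes that come from a trusted source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;

class OrderRoundTrip {
    // Hypothetical stand-in for OrderDetails so the sketch is self-contained.
    static class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        final String orderNo;
        final BigDecimal total;

        Order(String orderNo, BigDecimal total) {
            this.orderNo = orderNo;
            this.total = total;
        }
    }

    // Same idea as serializeObject above: object -> byte[].
    static byte[] toBytes(Order order) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(order);
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize the order", e);
        }
        return buffer.toByteArray();
    }

    // Reverse step a consumer would perform on the record value: byte[] -> object.
    static Order fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Order) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize the order", e);
        }
    }

    public static void main(String[] args) {
        Order copy = fromBytes(toBytes(new Order("100200300", BigDecimal.valueOf(500))));
        System.out.println(copy.orderNo + " " + copy.total); // prints 100200300 500
    }
}
```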
By: L Raney
Lance is a software engineer with over 15 years of experience in full-stack software development.