diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java index 8b2a49b2b5..04ffdd3336 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/MultiPartitionProducer.java @@ -10,8 +10,8 @@ public class MultiPartitionProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer"); KafkaProducer producer = new KafkaProducer<>(props); for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) { diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java index 2c885b7caa..5dc917e308 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/ProducerConfigurations.java @@ -9,11 +9,11 @@ import java.util.Properties; public class ProducerConfigurations { public static void main(String[] args) { Properties props = new Properties(); - props.put("bootstrap.servers", "localhost:9092"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("max.in.flight.requests.per.connection", "1"); - props.put("batch.size", "16384"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); props.put(ProducerConfig.LINGER_MS_CONFIG, "5"); KafkaProducer producer = new KafkaProducer<>(props); diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java index cb0b77e4c0..2def07f987 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java @@ -4,6 +4,10 @@ import org.apache.kafka.common.serialization.Deserializer; import java.util.Map; +/** + * Configured via {@link org.apache.kafka.clients.consumer.ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + */ +@SuppressWarnings("unused") public class JacksonDeserializer implements Deserializer { private final ObjectMapper objectMapper = new ObjectMapper(); private Class type; diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java index fa9d25dd85..2d7432cc7b 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java @@ -3,6 +3,10 @@ package com.baeldung.kafka.message.ordering.serialization; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.serialization.Serializer; +/** + * Configured via {@link org.apache.kafka.clients.producer.ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG} + */ +@SuppressWarnings("unused") public class JacksonSerializer implements Serializer { private final ObjectMapper objectMapper = new ObjectMapper();