Merge branch 'kafka/message-ordering-BAEL-6796' of https://github.com/aamolgote/tutorials into kafka/message-ordering-BAEL-6796
This commit is contained in:
commit
c4a467b60a
|
@ -10,8 +10,8 @@ public class MultiPartitionProducer {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.put("bootstrap.servers", "localhost:9092");
|
props.put("bootstrap.servers", "localhost:9092");
|
||||||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||||
props.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer");
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer");
|
||||||
|
|
||||||
KafkaProducer<String, Message> producer = new KafkaProducer<>(props);
|
KafkaProducer<String, Message> producer = new KafkaProducer<>(props);
|
||||||
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
|
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
|
||||||
|
|
|
@ -9,11 +9,11 @@ import java.util.Properties;
|
||||||
public class ProducerConfigurations {
|
public class ProducerConfigurations {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.put("bootstrap.servers", "localhost:9092");
|
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||||
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
||||||
props.put("max.in.flight.requests.per.connection", "1");
|
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
|
||||||
props.put("batch.size", "16384");
|
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
|
||||||
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
|
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
|
||||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,10 @@ import org.apache.kafka.common.serialization.Deserializer;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configured via {@link org.apache.kafka.clients.consumer.ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unused")
|
||||||
public class JacksonDeserializer<T> implements Deserializer<T> {
|
public class JacksonDeserializer<T> implements Deserializer<T> {
|
||||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
private Class<T> type;
|
private Class<T> type;
|
||||||
|
|
|
@ -3,6 +3,10 @@ package com.baeldung.kafka.message.ordering.serialization;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import org.apache.kafka.common.serialization.Serializer;
|
import org.apache.kafka.common.serialization.Serializer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configured via {@link org.apache.kafka.clients.producer.ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG}
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unused")
|
||||||
public class JacksonSerializer<T> implements Serializer<T> {
|
public class JacksonSerializer<T> implements Serializer<T> {
|
||||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue