Message Ordering - First Draft.
This commit is contained in:
parent
daaefb0f26
commit
307d0f5e40
|
@ -57,6 +57,11 @@
|
|||
<version>${lombok.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>2.15.2</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
package com.baeldung.kafka.message.ordering;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
public class ConsumerConfigurations {
|
||||
public static void main(String[] args) {
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put("max.poll.records", "500");
|
||||
props.put("fetch.min.bytes", "1");
|
||||
props.put("fetch.max.wait.ms", "500");
|
||||
Consumer<String, String> consumer = new KafkaConsumer<>(props);
|
||||
consumer.subscribe(Collections.singletonList("multi_partition_topic"));
|
||||
|
||||
while (true) {
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
|
||||
records.forEach(record -> {
|
||||
System.out.println("Partition: " + record.partition() + ", Message: " + record.value());
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package com.baeldung.kafka.message.ordering;
|
||||
|
||||
import com.baeldung.kafka.message.ordering.payload.Message;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
|
||||
public class ExtSeqWithTimeWindowConsumer {
|
||||
private static final long BUFFER_PERIOD_MS = 5000;
|
||||
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100);
|
||||
|
||||
public static void main(String[] args) {
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer");
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
props.put("value.deserializer.serializedClass", Message.class);
|
||||
Consumer<String, Message> consumer = new KafkaConsumer<>(props);
|
||||
consumer.subscribe(Collections.singletonList("multi_partition_topic"));
|
||||
List<Message> buffer = new ArrayList<>();
|
||||
long lastProcessedTime = System.nanoTime();
|
||||
while (true) {
|
||||
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
|
||||
records.forEach(record -> {
|
||||
if (record != null && record.value() != null) {
|
||||
buffer.add(record.value());
|
||||
}
|
||||
});
|
||||
if (System.nanoTime() - lastProcessedTime > BUFFER_PERIOD_MS) {
|
||||
processBuffer(buffer);
|
||||
lastProcessedTime = System.nanoTime();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void processBuffer(List<Message> buffer) {
|
||||
Collections.sort(buffer);
|
||||
buffer.forEach(message -> {
|
||||
System.out.println("Processing message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
|
||||
});
|
||||
buffer.clear();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.baeldung.kafka.message.ordering;
|
||||
|
||||
import com.baeldung.kafka.message.ordering.payload.Message;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class ExtSeqWithTimeWindowProducer {
|
||||
public static void main(String[] args) {
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer");
|
||||
|
||||
KafkaProducer<String, Message> producer = new KafkaProducer<>(props);
|
||||
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
|
||||
long messageId = Message.getRandomMessageId();
|
||||
String key = "Key-" + insertPosition;
|
||||
Message message = new Message(insertPosition, messageId);
|
||||
producer.send(new ProducerRecord<>("multi_partition_topic", key, message));
|
||||
System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
|
||||
}
|
||||
producer.close();
|
||||
System.out.println("ExternalSequencingProducer Completed.");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package com.baeldung.kafka.message.ordering;
|
||||
|
||||
import com.baeldung.kafka.message.ordering.payload.Message;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
public class MultiPartitionConsumer {
|
||||
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100);
|
||||
public static void main(String[] args) {
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer");
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
props.put("value.deserializer.serializedClass", Message.class);
|
||||
Consumer<String, Message> consumer = new KafkaConsumer<>(props);
|
||||
consumer.subscribe(Collections.singletonList("multi_partition_topic"));
|
||||
while (true) {
|
||||
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
|
||||
records.forEach(record -> {
|
||||
Message message = record.value();
|
||||
System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
package com.baeldung.kafka.message.ordering;
|
||||
|
||||
import com.baeldung.kafka.message.ordering.payload.Message;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public class MultiPartitionProducer {
|
||||
public static void main(String[] args) {
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer");
|
||||
|
||||
KafkaProducer<String, Message> producer = new KafkaProducer<>(props);
|
||||
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
|
||||
long messageId = Message.getRandomMessageId();
|
||||
String key = "Key-" + insertPosition;
|
||||
Message message = new Message(insertPosition, messageId);
|
||||
producer.send(new ProducerRecord<>("multi_partition_topic", key, message));
|
||||
System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
|
||||
}
|
||||
producer.close();
|
||||
System.out.println("SinglePartitionProducer Completed.");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package com.baeldung.kafka.message.ordering;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public class ProducerConfigurations {
|
||||
public static void main(String[] args) {
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put("max.in.flight.requests.per.connection", "1");
|
||||
props.put("batch.size", "16384");
|
||||
props.put("linger.ms", "5");
|
||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
String key = "Key-" + (i % 3); // Assuming 3 partitions
|
||||
producer.send(new ProducerRecord<>("multi_partition_topic", key, "Message-" + i));
|
||||
}
|
||||
|
||||
producer.close();
|
||||
System.out.println("MultiPartitionProducer Completed.");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package com.baeldung.kafka.message.ordering;
|
||||
|
||||
import com.baeldung.kafka.message.ordering.payload.Message;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
public class SinglePartitionConsumer {
|
||||
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofMillis(100);
|
||||
|
||||
public static void main(String[] args) {
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer");
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
props.put("value.deserializer.serializedClass", Message.class);
|
||||
Consumer<String, Message> consumer = new KafkaConsumer<>(props);
|
||||
consumer.subscribe(Collections.singletonList("single_partition_topic"));
|
||||
while (true) {
|
||||
ConsumerRecords<String, Message> records = consumer.poll(TIMEOUT_WAIT_FOR_MESSAGES);
|
||||
records.forEach(record -> {
|
||||
Message message = record.value();
|
||||
System.out.println("Process message with Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
package com.baeldung.kafka.message.ordering;
|
||||
|
||||
import com.baeldung.kafka.message.ordering.payload.Message;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
|
||||
public class SinglePartitionProducer {
|
||||
public static void main(String[] args) {
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
props.put("value.serializer", "com.baeldung.kafka.message.ordering.serialization.JacksonSerializer");
|
||||
|
||||
KafkaProducer<String, Message> producer = new KafkaProducer<>(props);
|
||||
for (long insertPosition = 1; insertPosition <= 10 ; insertPosition++) {
|
||||
long messageId = Message.getRandomMessageId();
|
||||
String key = "Key-" + insertPosition;
|
||||
Message message = new Message(insertPosition, messageId);
|
||||
producer.send(new ProducerRecord<>("single_partition_topic", key, message));
|
||||
System.out.println("Insert Position: " + message.getInsertPosition() + ", Message Id: " + message.getMessageId());
|
||||
}
|
||||
producer.close();
|
||||
System.out.println("SinglePartitionProducer Completed.");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package com.baeldung.kafka.message.ordering.payload;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
public class Message implements Comparable<Message> {
|
||||
private long insertPosition;
|
||||
private long messageId;
|
||||
|
||||
public Message(){
|
||||
|
||||
}
|
||||
|
||||
public Message(long insertPosition, long messageId) {
|
||||
this.insertPosition = insertPosition;
|
||||
this.messageId = messageId;
|
||||
}
|
||||
|
||||
public long getInsertPosition() {
|
||||
return insertPosition;
|
||||
}
|
||||
|
||||
public long getMessageId() {
|
||||
return messageId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Message other) {
|
||||
return Long.compare(this.messageId, other.messageId);
|
||||
}
|
||||
|
||||
public static long getRandomMessageId() {
|
||||
Random rand = new Random();
|
||||
return rand.nextInt(1000);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
package com.baeldung.kafka.message.ordering.serialization;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class JacksonDeserializer<T> implements Deserializer<T> {
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private Class<T> tClass;
|
||||
|
||||
public JacksonDeserializer(Class<T> tClass) {
|
||||
this.tClass = tClass;
|
||||
}
|
||||
|
||||
public JacksonDeserializer() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
this.tClass = (Class<T>) configs.get("value.deserializer.serializedClass");
|
||||
}
|
||||
|
||||
@Override
|
||||
public T deserialize(String topic, byte[] bytes) {
|
||||
if (bytes == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return objectMapper.readValue(bytes, tClass);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error deserializing value", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
package com.baeldung.kafka.message.ordering.serialization;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
||||
public class JacksonSerializer<T> implements Serializer<T> {
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@Override
|
||||
public byte[] serialize(String topic, T data) {
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
return objectMapper.writeValueAsBytes(data);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error serializing value", e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue