BAEL-6552: small changes

This commit is contained in:
emanuel.trandafir 2023-12-28 15:10:10 +01:00
parent 1ac666749b
commit c5892eec6e

View File

@ -3,6 +3,7 @@ package com.baeldung.kafka.consumer;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@ -28,9 +29,13 @@ class VariableFetchSizeKafkaListenerLiveTest {
@Test @Test
void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception { void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {
String topic = "engine.sensors.temperature"; String topic = "engine.sensors.temperature";
publishSensorData(300, topic); publishTestData(300, topic);
Properties props = commonConsumerProperties(); Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config"); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
@ -38,27 +43,29 @@ class VariableFetchSizeKafkaListenerLiveTest {
new VariableFetchSizeKafkaListener(topic, kafkaConsumer) new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
); );
Thread.sleep(10_000L); Thread.sleep(5_000L);
} }
@Test @Test
void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception { void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
publishSensorData(300, "engine.sensors.temperature"); String topic = "engine.sensors.temperature";
publishTestData(300, topic);
Thread.sleep(1_000L);
// max.partition.fetch.bytes = 500 Bytes // max.partition.fetch.bytes = 500 Bytes
Properties fetchSize_500B = commonConsumerProperties(); Properties fetchSize_500B = commonConsumerProperties();
fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B"); fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 500 + ""); fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500");
CompletableFuture.runAsync( CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B)) new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_500B))
); );
// max.partition.fetch.bytes = 5.000 Bytes // max.partition.fetch.bytes = 5.000 Bytes
Properties fetchSize_5KB = commonConsumerProperties(); Properties fetchSize_5KB = commonConsumerProperties();
fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB"); fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5_000 + ""); fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5000");
CompletableFuture.runAsync( CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_5KB)) new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(fetchSize_5KB))
); );
Thread.sleep(10_000L); Thread.sleep(10_000L);
@ -66,22 +73,22 @@ class VariableFetchSizeKafkaListenerLiveTest {
@Test @Test
void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception { void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {
publishSensorData(300, "engine.sensors.temperature", 100L); String topic = "engine.sensors.temperature";
publishTestData(300, topic, 100L);
// fetch.min.bytes = 1 byte (default) // fetch.min.bytes = 1 byte (default)
Properties minFetchSize_1B = commonConsumerProperties(); Properties minFetchSize_1B = commonConsumerProperties();
minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B"); minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
minFetchSize_1B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1 + "");
CompletableFuture.runAsync( CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(minFetchSize_1B)) new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_1B))
); );
// fetch.min.bytes = 500 bytes // fetch.min.bytes = 500 bytes
Properties minFetchSize_500B = commonConsumerProperties(); Properties minFetchSize_500B = commonConsumerProperties();
minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mim_fetch_size_500B"); minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mim_fetch_size_500B");
minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 500 + ""); minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500");
CompletableFuture.runAsync( CompletableFuture.runAsync(
new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(minFetchSize_500B)) new VariableFetchSizeKafkaListener(topic, new KafkaConsumer<>(minFetchSize_500B))
); );
Thread.sleep(10_000L); Thread.sleep(10_000L);
@ -97,11 +104,11 @@ class VariableFetchSizeKafkaListenerLiveTest {
return props; return props;
} }
private void publishSensorData(int measurementsCount, String topic) { private void publishTestData(int measurementsCount, String topic) {
publishSensorData(measurementsCount, topic, 0L); publishTestData(measurementsCount, topic, 0L);
} }
private void publishSensorData(int measurementsCount, String topic, long delayInMillis) { private void publishTestData(int measurementsCount, String topic, long delayInMillis) {
List<ProducerRecord<String, String>> records = IntStream.range(0, measurementsCount) List<ProducerRecord<String, String>> records = IntStream.range(0, measurementsCount)
.mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F")) .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -109,13 +116,17 @@ class VariableFetchSizeKafkaListenerLiveTest {
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
try (KafkaProducer<String, String> producer = testKafkaProducer()) { try (KafkaProducer<String, String> producer = testKafkaProducer()) {
for (ProducerRecord<String, String> rec : records) { for (ProducerRecord<String, String> rec : records) {
producer.send(rec); producer.send(rec).get();
sleep(delayInMillis); sleep(delayInMillis);
} }
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
} }
}); });
} }
private static KafkaProducer<String, String> testKafkaProducer() { private static KafkaProducer<String, String> testKafkaProducer() {
Properties props = new Properties(); Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());