diff --git a/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/VariableFetchSizeKafkaListener.java b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/VariableFetchSizeKafkaListener.java index 199a9e7b5c..5de2c0e8cb 100644 --- a/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/VariableFetchSizeKafkaListener.java +++ b/apache-kafka-2/src/main/java/com/baeldung/kafka/consumer/VariableFetchSizeKafkaListener.java @@ -16,18 +16,19 @@ public class VariableFetchSizeKafkaListener implements Runnable { private final String topic; private final KafkaConsumer consumer; - public VariableFetchSizeKafkaListener(String topic, Properties consumerProperties) { + public VariableFetchSizeKafkaListener(String topic, KafkaConsumer consumer) { this.topic = topic; - this.consumer = new KafkaConsumer<>(consumerProperties); + this.consumer = consumer; } @Override public void run() { consumer.subscribe(Collections.singletonList(topic)); int pollCount = 1; + while (true) { List> records = new ArrayList<>(); - for (ConsumerRecord record : consumer.poll(Duration.ofMillis(1_000))) { + for (ConsumerRecord record : consumer.poll(Duration.ofMillis(500))) { records.add(record); } if(!records.isEmpty()) { diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/VariableFetchSizeKafkaListenerLiveTest.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/VariableFetchSizeKafkaListenerLiveTest.java index 2465139d34..1a794af48e 100644 --- a/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/VariableFetchSizeKafkaListenerLiveTest.java +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/consumer/VariableFetchSizeKafkaListenerLiveTest.java @@ -7,6 +7,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -24,24 +25,40 @@ class VariableFetchSizeKafkaListenerLiveTest { @Container private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + @Test + void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception { + String topic = "engine.sensors.temperature"; + publishSensorData(300, topic); + + Properties props = commonConsumerProperties(); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config"); + KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props); + + CompletableFuture.runAsync( + new VariableFetchSizeKafkaListener(topic, kafkaConsumer) + ); + + Thread.sleep(10_000L); + } + @Test void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception { publishSensorData(300, "engine.sensors.temperature"); // max.partition.fetch.bytes = 500 Bytes - Properties fetchSize_500B = kafkaConsumerProperties(); + Properties fetchSize_500B = commonConsumerProperties(); fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B"); fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 500 + ""); CompletableFuture.runAsync( - new VariableFetchSizeKafkaListener("engine.sensors.temperature", fetchSize_500B) + new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B)) ); // max.partition.fetch.bytes = 5.000 Bytes - Properties fetchSize_5KB = kafkaConsumerProperties(); + Properties fetchSize_5KB = commonConsumerProperties(); fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB"); fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5_000 + ""); CompletableFuture.runAsync( - new VariableFetchSizeKafkaListener("engine.sensors.temperature", fetchSize_5KB) + new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_5KB)) ); Thread.sleep(10_000L); @@ -52,26 +69,26 @@ class VariableFetchSizeKafkaListenerLiveTest { publishSensorData(300, "engine.sensors.temperature", 100L); // fetch.min.bytes = 1 byte (default) - Properties minFetchSize_1B = kafkaConsumerProperties(); + Properties minFetchSize_1B = commonConsumerProperties(); minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B"); minFetchSize_1B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1 + ""); CompletableFuture.runAsync( - new VariableFetchSizeKafkaListener("engine.sensors.temperature", minFetchSize_1B) + new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(minFetchSize_1B)) ); // fetch.min.bytes = 500 bytes - Properties minFetchSize_500B = kafkaConsumerProperties(); + Properties minFetchSize_500B = commonConsumerProperties(); minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mim_fetch_size_500B"); minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 500 + ""); CompletableFuture.runAsync( - new VariableFetchSizeKafkaListener("engine.sensors.temperature", minFetchSize_500B) + new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(minFetchSize_500B)) ); Thread.sleep(10_000L); } - private static Properties kafkaConsumerProperties() { + private static Properties commonConsumerProperties() { Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -86,8 +103,8 @@ class VariableFetchSizeKafkaListenerLiveTest { private void publishSensorData(int measurementsCount, String topic, long delayInMillis) { List> records = IntStream.range(0, measurementsCount) - .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F")) - .collect(Collectors.toList()); + .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F")) + .collect(Collectors.toList()); CompletableFuture.runAsync(() -> { try (KafkaProducer producer = testKafkaProducer()) {