BAEL-6552: Kafka consumer's min/max fetch size
parent 11417d3c0d
commit 6c736c0c88
@ -0,0 +1,41 @@
package com.baeldung.kafka.consumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
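/**
 * A {@link Runnable} listener that subscribes to a single topic and, for every
 * non-empty poll, logs the consumer group id, the poll number, the number of
 * records fetched, and the offset range of the batch. Running several instances
 * with different fetch-size properties makes the effect of those settings
 * visible in the log output.
 */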
public class VariableFetchSizeKafkaListener implements Runnable {

    private static final Logger log = LoggerFactory.getLogger(VariableFetchSizeKafkaListener.class);

    private final String topic;
    private final KafkaConsumer<String, String> consumer;

    public VariableFetchSizeKafkaListener(String topic, Properties consumerProperties) {
        this.topic = topic;
        this.consumer = new KafkaConsumer<>(consumerProperties);
    }
    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList(topic));

        int pollCount = 1;
        while (true) {
            // drain the current poll into a list so we can report the batch size and offset range
            List<ConsumerRecord<String, String>> records = new ArrayList<>();
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(1_000))) {
                records.add(record);
            }

            if (!records.isEmpty()) {
                String batchOffsets = String.format("%s -> %s", records.get(0).offset(), records.get(records.size() - 1).offset());
                String groupId = consumer.groupMetadata().groupId();
                log.info("groupId: {}, poll: #{}, fetched: #{} records, offsets: {}", groupId, pollCount++, records.size(), batchOffsets);
            }
        }
    }
}
@ -0,0 +1,117 @@
package com.baeldung.kafka.consumer;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
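/**
 * Live test that starts a Kafka broker via Testcontainers and runs
 * {@link VariableFetchSizeKafkaListener} instances side by side with different
 * fetch-size settings, so their batch sizes and polling cadence can be compared
 * in the log output. Requires a running Docker daemon.
 */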
@Testcontainers
class VariableFetchSizeKafkaListenerLiveTest {

    @Container
    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
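    // max.partition.fetch.bytes caps how much data the broker returns per partition in a
    // single fetch, so the 500 B consumer below should receive smaller, more frequent
    // batches than the 5 KB one.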
    @Test
    void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
        publishSensorData(300, "engine.sensors.temperature");

        // max.partition.fetch.bytes = 500 bytes
        Properties fetchSize_500B = kafkaConsumerProperties();
        fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
        fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 500 + "");
        CompletableFuture.runAsync(
            new VariableFetchSizeKafkaListener("engine.sensors.temperature", fetchSize_500B)
        );

        // max.partition.fetch.bytes = 5,000 bytes
        Properties fetchSize_5KB = kafkaConsumerProperties();
        fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
        fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5_000 + "");
        CompletableFuture.runAsync(
            new VariableFetchSizeKafkaListener("engine.sensors.temperature", fetchSize_5KB)
        );

        Thread.sleep(10_000L);
    }
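    // fetch.min.bytes sets the minimum amount of data the broker should return for a fetch
    // request; with slowly arriving records (100 ms apart), a larger minimum makes the
    // broker wait (up to fetch.max.wait.ms) until enough data accumulates, trading latency
    // for bigger batches.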
    @Test
    void whenChangingMinFetchBytesProperty_thenAdjustWaitTimeWhilePolling() throws Exception {
        publishSensorData(300, "engine.sensors.temperature", 100L);

        // fetch.min.bytes = 1 byte (default)
        Properties minFetchSize_1B = kafkaConsumerProperties();
        minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
        minFetchSize_1B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1 + "");
        CompletableFuture.runAsync(
            new VariableFetchSizeKafkaListener("engine.sensors.temperature", minFetchSize_1B)
        );

        // fetch.min.bytes = 500 bytes
        Properties minFetchSize_500B = kafkaConsumerProperties();
        minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_500B");
        minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 500 + "");
        CompletableFuture.runAsync(
            new VariableFetchSizeKafkaListener("engine.sensors.temperature", minFetchSize_500B)
        );

        Thread.sleep(10_000L);
    }
    private static Properties kafkaConsumerProperties() {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }
    private void publishSensorData(int measurementsCount, String topic) {
        publishSensorData(measurementsCount, topic, 0L);
    }

    private void publishSensorData(int measurementsCount, String topic, long delayInMillis) {
        List<ProducerRecord<String, String>> records = IntStream.range(0, measurementsCount)
            .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
            .collect(Collectors.toList());

        // publish asynchronously, optionally pausing between records to simulate a slow producer
        CompletableFuture.runAsync(() -> {
            try (KafkaProducer<String, String> producer = testKafkaProducer()) {
                for (ProducerRecord<String, String> rec : records) {
                    producer.send(rec);
                    sleep(delayInMillis);
                }
            }
        });
    }
    private static KafkaProducer<String, String> testKafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
    private static void sleep(long delayInMillis) {
        try {
            Thread.sleep(delayInMillis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}