BAEL-6552: added test using the default configuration

commit 1ac666749b
parent 6c736c0c88
Author: emanueltrandafir1993
Date:   2023-12-27 22:33:40 +01:00

2 changed files with 32 additions and 14 deletions

VariableFetchSizeKafkaListener.java

@@ -16,18 +16,19 @@ public class VariableFetchSizeKafkaListener implements Runnable {
     private final String topic;
     private final KafkaConsumer<String, String> consumer;
 
-    public VariableFetchSizeKafkaListener(String topic, Properties consumerProperties) {
+    public VariableFetchSizeKafkaListener(String topic, KafkaConsumer<String, String> consumer) {
         this.topic = topic;
-        this.consumer = new KafkaConsumer<>(consumerProperties);
+        this.consumer = consumer;
     }
 
     @Override
     public void run() {
         consumer.subscribe(Collections.singletonList(topic));
         int pollCount = 1;
 
         while (true) {
             List<ConsumerRecord<String, String>> records = new ArrayList<>();
-            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(1_000))) {
+            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                 records.add(record);
             }
 
             if(!records.isEmpty()) {
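
Note on the change above: the listener no longer builds its own KafkaConsumer from a Properties object; the caller now constructs the consumer and injects it through the constructor, so each listener thread can run with different fetch settings. A minimal wiring sketch, assuming a broker reachable at localhost:9092 (the tests below use the Testcontainers bootstrap address instead):

    import java.util.Properties;
    import java.util.concurrent.CompletableFuture;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "sensors");                 // assumption: any group id
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // the consumer is created outside the listener and passed in
    CompletableFuture.runAsync(
        new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(props))
    );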

VariableFetchSizeKafkaListenerLiveTest.java

@@ -7,6 +7,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -24,24 +25,40 @@ class VariableFetchSizeKafkaListenerLiveTest {
     @Container
     private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
 
+    @Test
+    void whenUsingDefaultConfiguration_thenProcessInBatchesOf() throws Exception {
+        String topic = "engine.sensors.temperature";
+        publishSensorData(300, topic);
+
+        Properties props = commonConsumerProperties();
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "default_config");
+        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
+
+        CompletableFuture.runAsync(
+            new VariableFetchSizeKafkaListener(topic, kafkaConsumer)
+        );
+
+        Thread.sleep(10_000L);
+    }
+
     @Test
     void whenChangingMaxPartitionFetchBytesProperty_thenAdjustBatchSizesWhilePolling() throws Exception {
         publishSensorData(300, "engine.sensors.temperature");
 
         // max.partition.fetch.bytes = 500 Bytes
-        Properties fetchSize_500B = kafkaConsumerProperties();
+        Properties fetchSize_500B = commonConsumerProperties();
         fetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_500B");
         fetchSize_500B.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 500 + "");
         CompletableFuture.runAsync(
-            new VariableFetchSizeKafkaListener("engine.sensors.temperature", fetchSize_500B)
+            new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_500B))
         );
 
         // max.partition.fetch.bytes = 5.000 Bytes
-        Properties fetchSize_5KB = kafkaConsumerProperties();
+        Properties fetchSize_5KB = commonConsumerProperties();
         fetchSize_5KB.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "max_fetch_size_5KB");
         fetchSize_5KB.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5_000 + "");
         CompletableFuture.runAsync(
-            new VariableFetchSizeKafkaListener("engine.sensors.temperature", fetchSize_5KB)
+            new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(fetchSize_5KB))
         );
 
         Thread.sleep(10_000L);
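
For reference, the new whenUsingDefaultConfiguration test runs with Kafka's stock fetch settings: fetch.min.bytes = 1 byte, fetch.max.wait.ms = 500 ms, and max.partition.fetch.bytes = 1 MB. The test above then shrinks max.partition.fetch.bytes, which caps how much data the broker returns per partition in a single fetch: the 500 B consumer should log only a handful of records per poll, while the 5 KB consumer should receive roughly ten times as many per batch.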
@@ -52,26 +69,26 @@ class VariableFetchSizeKafkaListenerLiveTest {
         publishSensorData(300, "engine.sensors.temperature", 100L);
 
         // fetch.min.bytes = 1 byte (default)
-        Properties minFetchSize_1B = kafkaConsumerProperties();
+        Properties minFetchSize_1B = commonConsumerProperties();
         minFetchSize_1B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "min_fetch_size_1B");
         minFetchSize_1B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1 + "");
         CompletableFuture.runAsync(
-            new VariableFetchSizeKafkaListener("engine.sensors.temperature", minFetchSize_1B)
+            new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(minFetchSize_1B))
         );
 
         // fetch.min.bytes = 500 bytes
-        Properties minFetchSize_500B = kafkaConsumerProperties();
+        Properties minFetchSize_500B = commonConsumerProperties();
         minFetchSize_500B.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mim_fetch_size_500B");
         minFetchSize_500B.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 500 + "");
         CompletableFuture.runAsync(
-            new VariableFetchSizeKafkaListener("engine.sensors.temperature", minFetchSize_500B)
+            new VariableFetchSizeKafkaListener("engine.sensors.temperature", new KafkaConsumer<>(minFetchSize_500B))
         );
 
         Thread.sleep(10_000L);
     }
 
-    private static Properties kafkaConsumerProperties() {
+    private static Properties commonConsumerProperties() {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
         props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
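
This test publishes a record only every 100 ms (the 100L delay argument), so data trickles into the topic. With the default fetch.min.bytes of one byte the broker answers each fetch as soon as anything is available, yielding many small batches; with fetch.min.bytes = 500 it holds the response until roughly 500 bytes have accumulated or fetch.max.wait.ms (500 ms by default) expires, yielding fewer, larger batches.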
@@ -86,8 +103,8 @@ class VariableFetchSizeKafkaListenerLiveTest {
     private void publishSensorData(int measurementsCount, String topic, long delayInMillis) {
         List<ProducerRecord<String, String>> records = IntStream.range(0, measurementsCount)
             .mapToObj(__ -> new ProducerRecord<>(topic, "key1", "temperature=255F"))
             .collect(Collectors.toList());
 
         CompletableFuture.runAsync(() -> {
             try (KafkaProducer<String, String> producer = testKafkaProducer()) {
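
The testKafkaProducer() helper referenced above is not part of this diff. A minimal sketch of what such a helper can look like, assuming plain String serialization against the same Testcontainers broker (only the name and call site come from the diff; the body here is an assumption):

    // requires org.apache.kafka.common.serialization.StringSerializer
    private static KafkaProducer<String, String> testKafkaProducer() {
        Properties props = new Properties();
        // assumption: same container bootstrap address the consumer properties use
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }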