diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaFailureStrategyIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaFailureStrategyIT.java index a136a31e02..fe44014cb7 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaFailureStrategyIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaFailureStrategyIT.java @@ -45,8 +45,7 @@ public class PublishKafkaFailureStrategyIT extends AbstractPublishKafkaIT { // attempt to send a non-json FlowFile to Kafka using record strategy; // this will fail on the record parsing step prior to send; triggering the failure strategy logic final Map attributes = new HashMap<>(); - final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull( - getClass().getClassLoader().getResource(TEST_RESOURCE))); + final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(getClass().getClassLoader().getResource(TEST_RESOURCE))); runner.enqueue(bytesFlowFile, attributes); runner.run(1); runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1); @@ -65,10 +64,10 @@ public class PublishKafkaFailureStrategyIT extends AbstractPublishKafkaIT { // attempt to send a non-json FlowFile to Kafka using record strategy; // this will fail on the record parsing step prior to send; triggering the failure strategy logic final Map attributes = new HashMap<>(); - final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull( - getClass().getClassLoader().getResource(TEST_RESOURCE))); + final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(getClass().getClassLoader().getResource(TEST_RESOURCE))); runner.enqueue(bytesFlowFile, attributes); runner.run(1); + // on rollback, FlowFile is returned to source queue runner.assertTransferCount(PublishKafka.REL_SUCCESS, 0); runner.assertTransferCount(PublishKafka.REL_FAILURE, 0); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java index dbc9dca2e3..e6afe13ceb 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java @@ -44,8 +44,8 @@ import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService; import org.apache.nifi.kafka.service.producer.Kafka3ProducerService; -import org.apache.nifi.kafka.shared.property.SaslMechanism; import org.apache.nifi.kafka.shared.property.IsolationLevel; +import org.apache.nifi.kafka.shared.property.SaslMechanism; import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider; import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider; import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; @@ -175,7 +175,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement .name("default.api.timeout.ms") .displayName("Client Timeout") .description("Default timeout for Kafka client operations. Mapped to Kafka default.api.timeout.ms. The Kafka request.timeout.ms property is derived from half of the configured timeout") - .defaultValue("60 s") + .defaultValue("60 sec") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) @@ -191,7 +191,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .defaultValue("5 s") + .defaultValue("5 sec") .build(); public static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder() @@ -204,7 +204,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) - .defaultValue("5 s") + .defaultValue("5 sec") .build(); private static final List PROPERTY_DESCRIPTORS = List.of( @@ -244,7 +244,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement @OnDisabled public void onDisabled() { if (consumerService == null) { - getLogger().warn("Consumer Service not configured"); + getLogger().debug("Consumer Service not configured"); } else { consumerService.close(); } @@ -278,6 +278,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement if (partitionClass != null && partitionClass.startsWith("org.apache.kafka")) { propertiesProducer.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionClass); } + return new Kafka3ProducerService(propertiesProducer, serviceConfiguration, producerConfiguration); } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java index 06864452a3..2db9bcfcd8 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java @@ -34,6 +34,8 @@ import org.apache.nifi.kafka.service.consumer.pool.ConsumerObjectPool; import org.apache.nifi.kafka.service.consumer.pool.Subscription; import org.apache.nifi.logging.ComponentLog; +import java.io.Closeable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -43,14 +45,13 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; -import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; /** * Kafka 3 Consumer Service implementation with Object Pooling for subscribed Kafka Consumers */ -public class Kafka3ConsumerService implements KafkaConsumerService { +public class Kafka3ConsumerService implements KafkaConsumerService, Closeable { private final ComponentLog componentLog; private final ConsumerObjectPool consumerObjectPool; @@ -68,10 +69,14 @@ public class Kafka3ConsumerService implements KafkaConsumerService { final Map offsets = getOffsets(pollingSummary); final long started = System.currentTimeMillis(); - final long elapsed = runConsumerFunction(subscription, (consumer) -> { + final Consumer consumer = borrowConsumer(subscription); + final long elapsed; + try { consumer.commitSync(offsets); - return started - System.currentTimeMillis(); - }); + elapsed = started - System.currentTimeMillis(); + } finally { + returnConsumer(subscription, consumer); + } componentLog.debug("Committed Records in [{} ms] for {}", elapsed, pollingSummary); } @@ -81,10 +86,13 @@ public class Kafka3ConsumerService implements KafkaConsumerService { Objects.requireNonNull(pollingContext, "Polling Context required"); final Subscription subscription = getSubscription(pollingContext); - return runConsumerFunction(subscription, (consumer) -> { + final Consumer consumer = borrowConsumer(subscription); + try { final ConsumerRecords consumerRecords = consumer.poll(pollingContext.getMaxUncommittedTime()); return new RecordIterable(consumerRecords); - }); + } finally { + returnConsumer(subscription, consumer); + } } @Override @@ -96,12 +104,16 @@ public class Kafka3ConsumerService implements KafkaConsumerService { if (topics.hasNext()) { final String topic = topics.next(); - partitionStates = runConsumerFunction(subscription, (consumer) -> - consumer.partitionsFor(topic) - .stream() - .map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition())) - .collect(Collectors.toList()) - ); + + final Consumer consumer = borrowConsumer(subscription); + try { + partitionStates = consumer.partitionsFor(topic) + .stream() + .map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition())) + .collect(Collectors.toList()); + } finally { + returnConsumer(subscription, consumer); + } } else { partitionStates = Collections.emptyList(); } @@ -140,21 +152,26 @@ public class Kafka3ConsumerService implements KafkaConsumerService { return offsets; } - private T runConsumerFunction(final Subscription subscription, final Function, T> consumerFunction) { - Consumer consumer = null; + private Consumer borrowConsumer(final Subscription subscription) { try { - consumer = consumerObjectPool.borrowObject(subscription); - return consumerFunction.apply(consumer); + return consumerObjectPool.borrowObject(subscription); } catch (final Exception e) { throw new ConsumerException("Borrow Consumer failed", e); - } finally { - if (consumer != null) { - try { - consumerObjectPool.returnObject(subscription, consumer); - } catch (final Exception e) { - componentLog.warn("Return Consumer failed", e); - } + } + } + + private void returnConsumer(final Subscription subscription, final Consumer consumer) { + try { + consumerObjectPool.returnObject(subscription, consumer); + } catch (final Exception e) { + try { + consumerObjectPool.invalidateObject(subscription, consumer); + } catch (final Exception e2) { + componentLog.debug("Failed to invalidate Kafka Consumer", e2); } + + consumer.close(Duration.ofSeconds(30)); + componentLog.warn("Failed to return Kafka Consumer to pool", e); } } @@ -191,6 +208,7 @@ public class Kafka3ConsumerService implements KafkaConsumerService { final RecordHeader recordHeader = new RecordHeader(header.key(), header.value()); recordHeaders.add(recordHeader); }); + return new ByteRecord( consumerRecord.topic(), consumerRecord.partition(), diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java index 6f97e94f5c..6ef2efe6e6 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java @@ -33,22 +33,22 @@ import org.apache.nifi.kafka.service.producer.transaction.KafkaProducerWrapper; import org.apache.nifi.kafka.service.producer.transaction.KafkaTransactionalProducerWrapper; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; public class Kafka3ProducerService implements KafkaProducerService { + private final Producer producer; - private final List callbacks; - private final ServiceConfiguration serviceConfiguration; - private final KafkaProducerWrapper wrapper; + private volatile boolean closed = false; + public Kafka3ProducerService(final Properties properties, final ServiceConfiguration serviceConfiguration, final ProducerConfiguration producerConfiguration) { @@ -65,50 +65,69 @@ public class Kafka3ProducerService implements KafkaProducerService { @Override public void close() { - producer.close(); + closed = true; + producer.close(Duration.ofSeconds(30)); } @Override - public void init() { - wrapper.init(); + public boolean isClosed() { + return closed; } @Override public void send(final Iterator kafkaRecords, final PublishContext publishContext) { final ProducerCallback callback = new ProducerCallback(publishContext.getFlowFile()); callbacks.add(callback); - Optional.ofNullable(publishContext.getException()).ifPresent(e -> callback.getExceptions().add(e)); - if (callback.getExceptions().isEmpty()) { + + final List callbackExceptions = callback.getExceptions(); + + final Exception publishException = publishContext.getException(); + if (publishException != null) { + callbackExceptions.add(publishException); + } + + if (callbackExceptions.isEmpty()) { try { wrapper.send(kafkaRecords, publishContext, callback); } catch (final UncheckedIOException e) { - callback.getExceptions().add(e); + // We don't throw the Exception because we will later deal with this by + // checking if there are any Exceptions. + callbackExceptions.add(e); + } catch (final Exception e) { + // We re-throw the Exception in this case because it is an unexpected Exception + callbackExceptions.add(e); + throw e; } } } @Override public RecordSummary complete() { - final boolean shouldCommit = callbacks.stream().noneMatch(ProducerCallback::isFailure); - if (shouldCommit) { - producer.flush(); // finish Kafka processing of in-flight data - wrapper.commit(); // commit Kafka transaction (when transactions configured) - } else { - // rollback on transactions + exception - wrapper.abort(); - } - - final RecordSummary recordSummary = new RecordSummary(); // scrape the Kafka callbacks for disposition of in-flight data - final List flowFileResults = recordSummary.getFlowFileResults(); - for (final ProducerCallback callback : callbacks) { - // short-circuit the handling of the flowfile results here - if (callback.isFailure()) { - flowFileResults.add(callback.toFailureResult()); + try { + final boolean shouldCommit = callbacks.stream().noneMatch(ProducerCallback::isFailure); + if (shouldCommit) { + producer.flush(); // finish Kafka processing of in-flight data + wrapper.commit(); // commit Kafka transaction (when transactions configured) } else { - flowFileResults.add(callback.waitComplete(serviceConfiguration.getMaxAckWait().toMillis())); + // rollback on transactions + exception + wrapper.abort(); } + + final RecordSummary recordSummary = new RecordSummary(); // scrape the Kafka callbacks for disposition of in-flight data + final List flowFileResults = recordSummary.getFlowFileResults(); + for (final ProducerCallback callback : callbacks) { + // short-circuit the handling of the flowfile results here + if (callback.isFailure()) { + flowFileResults.add(callback.toFailureResult()); + } else { + flowFileResults.add(callback.waitComplete(serviceConfiguration.getMaxAckWait().toMillis())); + } + } + + return recordSummary; + } finally { + callbacks.clear(); } - return recordSummary; } @Override diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaNonTransactionalProducerWrapper.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaNonTransactionalProducerWrapper.java index feee4218ad..8d67d84d41 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaNonTransactionalProducerWrapper.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaNonTransactionalProducerWrapper.java @@ -24,10 +24,6 @@ public class KafkaNonTransactionalProducerWrapper extends KafkaProducerWrapper { super(producer); } - @Override - public void init() { - } - @Override public void commit() { } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaProducerWrapper.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaProducerWrapper.java index 39f71b4ae7..0d82693d4d 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaProducerWrapper.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaProducerWrapper.java @@ -43,11 +43,6 @@ public abstract class KafkaProducerWrapper { this.producer = producer; } - /** - * Transaction-enabled publish to Kafka involves the use of special Kafka client library APIs. - */ - public abstract void init(); - public void send(final Iterator kafkaRecords, final PublishContext publishContext, final ProducerCallback callback) { while (kafkaRecords.hasNext()) { final KafkaRecord kafkaRecord = kafkaRecords.next(); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaTransactionalProducerWrapper.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaTransactionalProducerWrapper.java index ab027aabaf..1e3ded07da 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaTransactionalProducerWrapper.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/transaction/KafkaTransactionalProducerWrapper.java @@ -17,31 +17,73 @@ package org.apache.nifi.kafka.service.producer.transaction; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.service.producer.ProducerCallback; + +import java.util.Iterator; public class KafkaTransactionalProducerWrapper extends KafkaProducerWrapper { + private volatile boolean inTransaction = false; public KafkaTransactionalProducerWrapper(final Producer producer) { super(producer); + producer.initTransactions(); } @Override - public void init() { - producer.initTransactions(); - producer.beginTransaction(); + public void send(final Iterator kafkaRecords, final PublishContext publishContext, final ProducerCallback callback) { + if (!inTransaction) { + producer.beginTransaction(); + inTransaction = true; + } + + super.send(kafkaRecords, publishContext, callback); } @Override public void commit() { try { - producer.commitTransaction(); + // Commit the transaction. If a TimeoutException is thrown, retry up to 3 times. + // The producer will throw an Exception if we attempt to abort a transaction + // after a commit times out. + boolean failure = false; + for (int i = 0; i < 3; i++) { + try { + producer.commitTransaction(); + + // If we logged any warning that we timed out and will retry, we should log a notification + // that we were successful this time. Otherwise, don't spam the logs. + if (failure) { + logger.info("Successfully commited producer transaction after {} retries", i); + } + break; + } catch (final TimeoutException te) { + failure = true; + if (i == 2) { + logger.warn("Failed to commit producer transaction after 3 attempts, each timing out. Aborting transaction."); + throw te; + } + + logger.warn("Timed out while committing producer transaction. Retrying..."); + } + } + + inTransaction = false; } catch (final Exception e) { - logger.debug("Failure during producer transaction commit", e); + logger.error("Failed to commit producer transaction", e); abort(); } } @Override public void abort() { + if (!inTransaction) { + return; + } + + inTransaction = false; producer.abortTransaction(); } } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java index 5435e63b77..ceb421bb70 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java @@ -188,7 +188,6 @@ public class Kafka3ConnectionServiceBaseIT { final KafkaProducerService producerService = service.getProducerService(producerConfiguration); final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, RECORD_VALUE, Collections.emptyList()); final List kafkaRecords = Collections.singletonList(kafkaRecord); - producerService.init(); producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC + "-produce", null, null, null)); final RecordSummary summary = producerService.complete(); assertNotNull(summary); @@ -200,46 +199,43 @@ public class Kafka3ConnectionServiceBaseIT { final KafkaProducerService producerService = service.getProducerService(producerConfiguration); final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, RECORD_VALUE, Collections.emptyList()); final List kafkaRecords = Collections.singletonList(kafkaRecord); - producerService.init(); producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC + "-produce", null, null, null)); final RecordSummary summary = producerService.complete(); assertNotNull(summary); } @Test - void testProduceConsumeRecord() throws Exception { + void testProduceConsumeRecord() { final ProducerConfiguration producerConfiguration = new ProducerConfiguration(false, null, null, null, null); final KafkaProducerService producerService = service.getProducerService(producerConfiguration); final long timestamp = System.currentTimeMillis(); final KafkaRecord kafkaRecord = new KafkaRecord(null, null, timestamp, RECORD_KEY, RECORD_VALUE, Collections.emptyList()); final List kafkaRecords = Collections.singletonList(kafkaRecord); - producerService.init(); producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC, null, null, null)); final RecordSummary summary = producerService.complete(); assertNotNull(summary); - try (KafkaConsumerService consumerService = service.getConsumerService(null)) { - final PollingContext pollingContext = new PollingContext( - GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1)); - final Iterator consumerRecords = poll(consumerService, pollingContext); + final KafkaConsumerService consumerService = service.getConsumerService(null); + final PollingContext pollingContext = new PollingContext( + GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1)); + final Iterator consumerRecords = poll(consumerService, pollingContext); - assertTrue(consumerRecords.hasNext(), "Consumer Records not found"); + assertTrue(consumerRecords.hasNext(), "Consumer Records not found"); - final ByteRecord consumerRecord = consumerRecords.next(); - assertEquals(TOPIC, consumerRecord.getTopic()); - assertEquals(0, consumerRecord.getOffset()); - assertEquals(0, consumerRecord.getPartition()); - assertEquals(timestamp, consumerRecord.getTimestamp()); + final ByteRecord consumerRecord = consumerRecords.next(); + assertEquals(TOPIC, consumerRecord.getTopic()); + assertEquals(0, consumerRecord.getOffset()); + assertEquals(0, consumerRecord.getPartition()); + assertEquals(timestamp, consumerRecord.getTimestamp()); - final Optional keyFound = consumerRecord.getKey(); - assertTrue(keyFound.isPresent()); + final Optional keyFound = consumerRecord.getKey(); + assertTrue(keyFound.isPresent()); - assertArrayEquals(RECORD_KEY, keyFound.get()); - assertArrayEquals(RECORD_VALUE, consumerRecord.getValue()); + assertArrayEquals(RECORD_KEY, keyFound.get()); + assertArrayEquals(RECORD_VALUE, consumerRecord.getValue()); - assertFalse(consumerRecords.hasNext()); - } + assertFalse(consumerRecords.hasNext()); } @Test diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java index 10b7c86bba..0cf646c7ae 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java @@ -23,11 +23,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.Validator; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kafka.processors.common.KafkaUtils; @@ -330,17 +328,13 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess } else { headerNamePattern = null; } + keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).asAllowableValue(KeyEncoding.class); commitOffsets = context.getProperty(COMMIT_OFFSETS).asBoolean(); outputStrategy = context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class); keyFormat = context.getProperty(KEY_FORMAT).asAllowableValue(KeyFormat.class); } - @OnStopped - public void onStopped() { - // discard reference; leave controller service state intact - consumerService = null; - } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { @@ -399,9 +393,9 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess } private Iterator transformDemarcator(final ProcessContext context, final Iterator consumerRecords) { - final PropertyValue propertyValueDemarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR); - if (propertyValueDemarcator.isSet()) { - final byte[] demarcator = propertyValueDemarcator.getValue().getBytes(StandardCharsets.UTF_8); + final String demarcatorValue = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).getValue(); + if (demarcatorValue != null) { + final byte[] demarcator = demarcatorValue.getBytes(StandardCharsets.UTF_8); final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean(); return new ByteRecordBundler(demarcator, separateByKey, keyEncoding, headerNamePattern, headerEncoding, commitOffsets).bundle(consumerRecords); } else { diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java index e1ccd6c5c5..5f42369a6d 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java @@ -22,11 +22,13 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.kafka.processors.producer.PartitionStrategy; import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil; import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee; @@ -78,9 +80,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Supplier; import java.util.regex.Pattern; +import java.util.stream.Collectors; @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub"}) @CapabilityDescription("Sends the contents of a FlowFile as either a message or as individual records to Apache Kafka using the Kafka Producer API. " @@ -211,6 +216,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .description("The format used to publish the incoming FlowFile record to Kafka.") .required(true) .defaultValue(PublishStrategy.USE_VALUE) + .dependsOn(RECORD_READER) .allowableValues(PublishStrategy.class) .build(); @@ -319,6 +325,9 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); + private final Queue producerServices = new LinkedBlockingQueue<>(); + + @Override public List getSupportedPropertyDescriptors() { return DESCRIPTORS; @@ -352,6 +361,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions(attributes).getValue(); try { final List partitionStates = producerService.getPartitionStates(topicName); + verificationPartitions .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) .explanation(String.format("Partitions [%d] found for Topic [%s]", partitionStates.size(), topicName)); @@ -366,6 +376,17 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo return verificationResults; } + + @OnStopped + public void onStopped() { + // Ensure that we close all Producer services when stopped + KafkaProducerService service; + + while ((service = producerServices.poll()) != null) { + service.close(); + } + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final List flowFiles = PublishKafkaUtil.pollFlowFiles(session); @@ -373,6 +394,33 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo return; } + final KafkaProducerService producerService = getProducerService(context); + try { + publishFlowFiles(context, session, flowFiles, producerService); + } catch (final Exception e) { + final String uuids = flowFiles.stream() + .map(ff -> ff.getAttribute(CoreAttributes.UUID.key())) + .collect(Collectors.joining(", ")); + + getLogger().error("Failed to publish {} FlowFiles to Kafka: uuids={}", flowFiles.size(), uuids, e); + producerService.close(); + } finally { + if (!producerService.isClosed()) { + producerServices.offer(producerService); + } + } + } + + private KafkaProducerService getProducerService(final ProcessContext context) { + final KafkaProducerService producerService = producerServices.poll(); + if (producerService != null) { + return producerService; + } + + return createProducerService(context); + } + + private KafkaProducerService createProducerService(final ProcessContext context) { final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); final boolean transactionsEnabled = context.getProperty(TRANSACTIONS_ENABLED).asBoolean(); @@ -381,27 +429,36 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); final String partitionClass = context.getProperty(PARTITION_CLASS).getValue(); final ProducerConfiguration producerConfiguration = new ProducerConfiguration( - transactionsEnabled, transactionalIdPrefix, deliveryGuarantee, compressionCodec, partitionClass); + transactionsEnabled, transactionalIdPrefix, deliveryGuarantee, compressionCodec, partitionClass); - try (final KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration)) { - publishFlowFiles(context, session, flowFiles, producerService); - } catch (final Throwable e) { - getLogger().error("Publishing FlowFiles failed", e); - context.yield(); - } + return connectionService.getProducerService(producerConfiguration); } + private void publishFlowFiles(final ProcessContext context, final ProcessSession session, final List flowFiles, final KafkaProducerService producerService) { - producerService.init(); - for (final FlowFile flowFile : flowFiles) { - publishFlowFile(context, session, flowFile, producerService); - } - final RecordSummary recordSummary = producerService.complete(); - if (recordSummary.isFailure()) { - routeFailureStrategy(context, session, flowFiles); - } else { - routeResults(session, recordSummary.getFlowFileResults()); + + // Publish all FlowFiles and ensure that we call complete() on the producer and route flowfiles as appropriate, regardless + // of the outcome. If there are failures, the complete() method will abort the transaction (if transactions are enabled). + // Otherwise, it will commit the transaction (if transactions are enabled). We then route the FlowFiles based on the results. + try { + for (final FlowFile flowFile : flowFiles) { + publishFlowFile(context, session, flowFile, producerService); + } + } finally { + RecordSummary recordSummary = null; + try { + recordSummary = producerService.complete(); + } catch (final Exception e) { + getLogger().warn("Failed to complete transaction with Kafka", e); + producerService.close(); + } + + if (recordSummary == null || recordSummary.isFailure()) { + routeFailureStrategy(context, session, flowFiles); + } else { + routeResults(session, recordSummary.getFlowFileResults()); + } } } @@ -420,6 +477,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo final long msgCount = flowFileResult.getSentCount(); final FlowFile flowFile = session.putAttribute(flowFileResult.getFlowFile(), MSG_COUNT, String.valueOf(msgCount)); session.adjustCounter("Messages Sent", msgCount, true); + final Relationship relationship = flowFileResult.getExceptions().isEmpty() ? REL_SUCCESS : REL_FAILURE; session.transfer(flowFile, relationship); } @@ -427,26 +485,31 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo private void publishFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final KafkaProducerService producerService) { - final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); - final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class); final String topic = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions(flowFile.getAttributes()).getValue(); final Integer partition = getPartition(context, flowFile); final PublishContext publishContext = new PublishContext(topic, partition, null, flowFile); - final PropertyValue propertyDemarcator = context.getProperty(MESSAGE_DEMARCATOR); - final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue(); + final KafkaRecordConverter kafkaRecordConverter = getKafkaRecordConverter(context, flowFile); + final PublishCallback callback = new PublishCallback(producerService, publishContext, kafkaRecordConverter, flowFile.getAttributes(), flowFile.getSize()); - final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue()); - final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue()); + session.read(flowFile, callback); + } - final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue(); - final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); - final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue(); - final KeyFactory keyFactory = ((PublishStrategy.USE_VALUE == publishStrategy) && (messageKeyField != null)) - ? new MessageKeyFactory(flowFile, messageKeyField, keyWriterFactory, getLogger()) - : new AttributeKeyFactory(kafkaKeyAttribute, keyAttributeEncoding); + private Integer getPartition(final ProcessContext context, final FlowFile flowFile) { + final String partitionClass = context.getProperty(PARTITION_CLASS).getValue(); + + if (PartitionStrategy.EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) { + final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); + return Objects.hashCode(partition); + } + + return null; + } + + private KafkaRecordConverter getKafkaRecordConverter(final ProcessContext context, final FlowFile flowFile) { + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); final String attributeHeaderPatternProperty = context.getProperty(ATTRIBUTE_HEADER_PATTERN).getValue(); final Pattern attributeHeaderPattern = (attributeHeaderPatternProperty == null) ? null : Pattern.compile(attributeHeaderPatternProperty); @@ -454,54 +517,37 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo final Charset headerEncodingCharacterSet = Charset.forName(headerEncoding); final HeadersFactory headersFactory = new AttributesHeadersFactory(attributeHeaderPattern, headerEncodingCharacterSet); - final KafkaRecordConverter kafkaRecordConverter = getKafkaRecordConverter( - publishStrategy, metadataStrategy, readerFactory, writerFactory, keyWriterFactory, - keyFactory, headersFactory, propertyDemarcator, flowFile, maxMessageSize); - final PublishCallback callback = new PublishCallback( - producerService, publishContext, kafkaRecordConverter, flowFile.getAttributes(), flowFile.getSize()); - session.read(flowFile, callback); - } + final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue(); - private Integer getPartition(final ProcessContext context, final FlowFile flowFile) { - final String partitionClass = context.getProperty(PARTITION_CLASS).getValue(); - if (PartitionStrategy.EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) { - final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); - return Objects.hashCode(partition); - } - return null; - } + if (readerFactory != null && writerFactory != null) { + final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class); + final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue()); + final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue()); + + final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue(); + final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); + final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue(); + final KeyFactory keyFactory = ((PublishStrategy.USE_VALUE == publishStrategy) && (messageKeyField != null)) + ? new MessageKeyFactory(flowFile, messageKeyField, keyWriterFactory, getLogger()) + : new AttributeKeyFactory(kafkaKeyAttribute, keyAttributeEncoding); - private KafkaRecordConverter getKafkaRecordConverter( - final PublishStrategy publishStrategy, - final RecordMetadataStrategy metadataStrategy, - final RecordReaderFactory readerFactory, - final RecordSetWriterFactory writerFactory, - final RecordSetWriterFactory keyWriterFactory, - final KeyFactory keyFactory, - final HeadersFactory headersFactory, - final PropertyValue propertyValueDemarcator, - final FlowFile flowFile, - final int maxMessageSize - ) { - final KafkaRecordConverter kafkaRecordConverter; - if ((readerFactory != null) && (writerFactory != null)) { if (publishStrategy == PublishStrategy.USE_WRAPPER) { - kafkaRecordConverter = new RecordWrapperStreamKafkaRecordConverter(flowFile, metadataStrategy, - readerFactory, writerFactory, keyWriterFactory, maxMessageSize, getLogger()); + return new RecordWrapperStreamKafkaRecordConverter(flowFile, metadataStrategy, readerFactory, writerFactory, keyWriterFactory, maxMessageSize, getLogger()); } else { - kafkaRecordConverter = new RecordStreamKafkaRecordConverter( - readerFactory, writerFactory, headersFactory, keyFactory, maxMessageSize, getLogger()); + return new RecordStreamKafkaRecordConverter(readerFactory, writerFactory, headersFactory, keyFactory, maxMessageSize, getLogger()); } - } else if (propertyValueDemarcator.isSet()) { - final String demarcator = propertyValueDemarcator.evaluateAttributeExpressions(flowFile).getValue(); - kafkaRecordConverter = new DelimitedStreamKafkaRecordConverter( - demarcator.getBytes(StandardCharsets.UTF_8), maxMessageSize, headersFactory); - } else { - kafkaRecordConverter = new FlowFileStreamKafkaRecordConverter(maxMessageSize, headersFactory); } - return kafkaRecordConverter; + + final PropertyValue demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR); + if (demarcatorValue.isSet()) { + final String demarcator = demarcatorValue.evaluateAttributeExpressions(flowFile).getValue(); + return new DelimitedStreamKafkaRecordConverter(demarcator.getBytes(StandardCharsets.UTF_8), maxMessageSize, headersFactory); + } + + return new FlowFileStreamKafkaRecordConverter(maxMessageSize, headersFactory); } + private static class PublishCallback implements InputStreamCallback { private final KafkaProducerService producerService; private final PublishContext publishContext; @@ -515,6 +561,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo final KafkaRecordConverter kafkaConverter, final Map attributes, final long inputLength) { + this.producerService = producerService; this.publishContext = publishContext; this.kafkaConverter = kafkaConverter; diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/FlowFileStreamKafkaRecordConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/FlowFileStreamKafkaRecordConverter.java index 66a9bcc3dc..f6e2d35287 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/FlowFileStreamKafkaRecordConverter.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/FlowFileStreamKafkaRecordConverter.java @@ -19,13 +19,12 @@ package org.apache.nifi.kafka.processors.producer.convert; import org.apache.nifi.kafka.processors.producer.common.ProducerUtils; import org.apache.nifi.kafka.processors.producer.header.HeadersFactory; import org.apache.nifi.kafka.service.api.record.KafkaRecord; -import org.apache.nifi.stream.io.StreamUtils; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; /** @@ -42,13 +41,16 @@ public class FlowFileStreamKafkaRecordConverter implements KafkaRecordConverter } @Override - public Iterator convert( - final Map attributes, final InputStream in, final long inputLength) throws IOException { + public Iterator convert(final Map attributes, final InputStream in, final long inputLength) throws IOException { ProducerUtils.checkMessageSize(maxMessageSize, inputLength); - final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - StreamUtils.copy(in, bos); - final KafkaRecord kafkaRecord = new KafkaRecord( - null, null, null, null, bos.toByteArray(), headersFactory.getHeaders(attributes)); - return Collections.singletonList(kafkaRecord).iterator(); + + final byte[] recordBytes; + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + in.transferTo(baos); + recordBytes = baos.toByteArray(); + } + + final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, recordBytes, headersFactory.getHeaders(attributes)); + return List.of(kafkaRecord).iterator(); } } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java index 6fe304839b..d74498b980 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java @@ -24,7 +24,7 @@ import java.util.List; /** * Kafka Consumer Service must be closed to avoid leaking connection resources */ -public interface KafkaConsumerService extends AutoCloseable { +public interface KafkaConsumerService { /** * Commit record information to Kafka Brokers * diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java index e35932f50e..b4e33115f7 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java @@ -25,12 +25,6 @@ import java.util.List; public interface KafkaProducerService extends Closeable { - /** - * Initialize the Kafka `Producer` for the publish API call sequence. This has significance in the case of - * transactional publish activity. - */ - void init(); - /** * Send the record(s) associated with a single FlowFile. * @@ -51,6 +45,11 @@ public interface KafkaProducerService extends Closeable { */ void close(); + /** + * @return true if the producer is closed, false otherwise + */ + boolean isClosed(); + /** * Fetch metadata associated with the Kafka partitions associated with the topic. */