diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractConsumeKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractConsumeKafkaIT.java
index 8f60c5dcf4..381706552a 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractConsumeKafkaIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractConsumeKafkaIT.java
@@ -45,8 +45,8 @@ public abstract class AbstractConsumeKafkaIT extends AbstractKafkaBaseIT {
     }
 
     protected void produceOne(final String topic, final Integer partition,
-                              final String key, final String value, final List<Header> headers)
-            throws ExecutionException, InterruptedException {
+                              final String key, final String value, final List<Header> headers) throws ExecutionException, InterruptedException {
+
         try (final KafkaProducer<String, String> producer = new KafkaProducer<>(getKafkaProducerProperties())) {
             final ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, key, value, headers);
             final Future<RecordMetadata> future = producer.send(record);
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
index 743c9d0b00..837432ef18 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/AbstractKafkaBaseIT.java
@@ -42,7 +42,7 @@ public abstract class AbstractKafkaBaseIT {
 
     protected static final String CONNECTION_SERVICE_ID = Kafka3ConnectionService.class.getSimpleName();
 
-    protected static final Duration DURATION_POLL = Duration.ofMillis(1000L);
+    protected static final Duration DURATION_POLL = Duration.ofSeconds(3);
 
     protected static final KafkaContainer kafkaContainer;
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
index 2e49ef772d..5c7529e8fc 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
@@ -144,18 +144,16 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
      */
     @Test
    public void testTopicNames() throws ExecutionException, InterruptedException {
-        final String topic = UUID.randomUUID().toString();
-        final String groupId = topic.substring(0, topic.indexOf("-"));
-        final String topicTestCase = topic + "-C";
-        final String topicNames = topic + "-D," + topicTestCase;
+        final String topic = "testTopicNames";
+        final String groupId = "testTopicNames";
 
         runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
-        runner.setProperty(ConsumeKafka.TOPICS, topicNames);
+        runner.setProperty(ConsumeKafka.TOPICS, topic + "," + topic + "-2");
         runner.setProperty(ConsumeKafka.TOPIC_FORMAT, ConsumeKafka.TOPIC_NAME);
         runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
 
         runner.run(1, false, true);
-        produceOne(topicTestCase, 0, null, RECORD_VALUE, null);
+        produceOne(topic, 0, null, RECORD_VALUE, null);
         final long pollUntil = System.currentTimeMillis() + DURATION_POLL.toMillis();
         while ((System.currentTimeMillis() < pollUntil) && (runner.getFlowFilesForRelationship("success").isEmpty())) {
             runner.run(1, false, false);
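Aside on the test changes above: randomized UUID topic names are replaced with fixed, test-scoped names, and the consume check now polls in a loop bounded by DURATION_POLL (raised from 1 to 3 seconds) rather than relying on a single run. A minimal, self-contained sketch of that deadline-polling idiom; the helper name and the 100 ms back-off are illustrative choices, not part of the patch:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class PollUntil {
    // Re-check a condition until it holds or the deadline passes, mirroring the
    // bounded while-loop the integration tests use in place of a fixed sleep.
    static boolean pollUntil(final BooleanSupplier condition, final Duration timeout) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(100L); // brief back-off between attempts
        }
        return condition.getAsBoolean();
    }
}
```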
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java
index 897d307ef5..2901943e25 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRecordIT.java
@@ -118,9 +118,6 @@ class ConsumeKafkaRecordIT extends AbstractConsumeKafkaIT {
         flowFile.assertContentEquals(flowFileString);
         flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic);
         flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_PARTITION, Integer.toString(FIRST_PARTITION));
-        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_OFFSET, Long.toString(FIRST_OFFSET));
-        flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_TIMESTAMP);
-        flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_HEADER_COUNT, "3");
         flowFile.assertAttributeEquals("record.count", Long.toString(TEST_RECORD_COUNT));
         flowFile.assertAttributeEquals("aaa", "value");
         flowFile.assertAttributeNotExists("bbb");
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/pom.xml b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/pom.xml
index 814f649de6..af054dff6b 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/pom.xml
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/pom.xml
@@ -49,11 +49,6 @@
             <artifactId>nifi-kafka-shared</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-pool2</artifactId>
-            <version>2.12.0</version>
-        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>junit-jupiter</artifactId>
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index e6afe13ceb..8b808efc21 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -20,13 +20,15 @@
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -38,11 +40,13 @@
 import org.apache.nifi.controller.VerifiableControllerService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.kafka.service.api.KafkaConnectionService;
 import org.apache.nifi.kafka.service.api.common.ServiceConfiguration;
-import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
 import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
+import org.apache.nifi.kafka.service.api.consumer.PollingContext;
 import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
 import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
 import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.consumer.pool.Subscription;
 import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
 import org.apache.nifi.kafka.shared.property.IsolationLevel;
 import org.apache.nifi.kafka.shared.property.SaslMechanism;
@@ -58,8 +62,11 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
 import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
@@ -222,33 +229,20 @@ public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService {
     );
 
     private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(2);
 
-    private static final String CONNECTION_STEP = "Kafka Broker Connection";
 
-    private static final String TOPIC_LISTING_STEP = "Kafka Topic Listing";
 
-    private Properties clientProperties;
-
-    private ServiceConfiguration serviceConfiguration;
-
-    private Kafka3ConsumerService consumerService;
+    private volatile Properties clientProperties;
+    private volatile ServiceConfiguration serviceConfiguration;
+    private volatile Properties consumerProperties;
 
     @OnEnabled
     public void onEnabled(final ConfigurationContext configurationContext) {
         clientProperties = getClientProperties(configurationContext);
         serviceConfiguration = getServiceConfiguration(configurationContext);
-        final Properties consumerProperties = getConsumerProperties(configurationContext, clientProperties);
-        consumerService = new Kafka3ConsumerService(getLogger(), consumerProperties);
+        consumerProperties = getConsumerProperties(configurationContext, clientProperties);
     }
 
-    @OnDisabled
-    public void onDisabled() {
-        if (consumerService == null) {
-            getLogger().debug("Consumer Service not configured");
-        } else {
-            consumerService.close();
-        }
-    }
-
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -256,10 +250,42 @@ public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService {
     }
 
     @Override
-    public KafkaConsumerService getConsumerService(final ConsumerConfiguration consumerConfiguration) {
+    public KafkaConsumerService getConsumerService(final PollingContext pollingContext) {
+        Objects.requireNonNull(pollingContext, "Polling Context required");
+
+        final Subscription subscription = createSubscription(pollingContext);
+
+        final Properties properties = new Properties();
+        properties.putAll(consumerProperties);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, subscription.getGroupId());
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, subscription.getAutoOffsetReset().getValue());
+
+        final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
+        final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties, deserializer, deserializer);
+
+        final Optional<Pattern> topicPatternFound = subscription.getTopicPattern();
+        if (topicPatternFound.isPresent()) {
+            final Pattern topicPattern = topicPatternFound.get();
+            consumer.subscribe(topicPattern);
+        } else {
+            final Collection<String> topics = subscription.getTopics();
+            consumer.subscribe(topics);
+        }
+
+        final Kafka3ConsumerService consumerService = new Kafka3ConsumerService(getLogger(), consumer, subscription, pollingContext.getMaxUncommittedTime());
         return consumerService;
     }
 
+    private Subscription createSubscription(final PollingContext pollingContext) {
+        final String groupId = pollingContext.getGroupId();
+        final Optional<Pattern> topicPatternFound = pollingContext.getTopicPattern();
+        final AutoOffsetReset autoOffsetReset = pollingContext.getAutoOffsetReset();
+
+        return topicPatternFound
+                .map(pattern -> new Subscription(groupId, pattern, autoOffsetReset))
+                .orElseGet(() -> new Subscription(groupId, pollingContext.getTopics(), autoOffsetReset));
+    }
+
     @Override
     public KafkaProducerService getProducerService(final ProducerConfiguration producerConfiguration) {
         final Properties propertiesProducer = new Properties();
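Aside on the connection service rework above: getConsumerService(PollingContext) now constructs a dedicated, already-subscribed KafkaConsumer per call instead of borrowing one from a shared pool, so each returned service owns exactly one consumer and the caller is responsible for closing it. A caller-side sketch under that assumption; the group, topic, and timeout values are illustrative, and the API names follow the patch:

```java
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import org.apache.nifi.kafka.service.api.KafkaConnectionService;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
import org.apache.nifi.kafka.service.api.record.ByteRecord;

class ConsumerServiceUsageSketch {
    void consumeOnce(final KafkaConnectionService connectionService) throws IOException {
        final PollingContext pollingContext = new PollingContext(
                "example-group", Collections.singleton("example-topic"), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
        // One subscribed consumer per service instance; the caller owns its lifecycle.
        final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext);
        try {
            for (final ByteRecord byteRecord : consumerService.poll()) {
                // handle each record; offsets are committed separately via commit(...)
            }
        } finally {
            consumerService.close();
        }
    }
}
```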
"Polling Summary required"); - - final Subscription subscription = getSubscription(pollingSummary); final Map offsets = getOffsets(pollingSummary); final long started = System.currentTimeMillis(); - final Consumer consumer = borrowConsumer(subscription); - final long elapsed; - try { - consumer.commitSync(offsets); - elapsed = started - System.currentTimeMillis(); - } finally { - returnConsumer(subscription, consumer); - } + consumer.commitSync(offsets); + final long elapsed = started - System.currentTimeMillis(); componentLog.debug("Committed Records in [{} ms] for {}", elapsed, pollingSummary); } @Override - public Iterable poll(final PollingContext pollingContext) { - Objects.requireNonNull(pollingContext, "Polling Context required"); - final Subscription subscription = getSubscription(pollingContext); + public void rollback() { + final Set assignment = consumer.assignment(); - final Consumer consumer = borrowConsumer(subscription); try { - final ConsumerRecords consumerRecords = consumer.poll(pollingContext.getMaxUncommittedTime()); - return new RecordIterable(consumerRecords); - } finally { - returnConsumer(subscription, consumer); + final Map metadataMap = consumer.committed(assignment); + for (final Map.Entry entry : metadataMap.entrySet()) { + final TopicPartition topicPartition = entry.getKey(); + final OffsetAndMetadata offsetAndMetadata = entry.getValue(); + + if (offsetAndMetadata == null) { + consumer.seekToBeginning(Collections.singleton(topicPartition)); + componentLog.debug("Rolling back offsets so that {}-{} it is at the beginning", topicPartition.topic(), topicPartition.partition()); + } else { + consumer.seek(topicPartition, offsetAndMetadata.offset()); + componentLog.debug("Rolling back offsets so that {}-{} has offset of {}", topicPartition.topic(), topicPartition.partition(), offsetAndMetadata.offset()); + } + } + } catch (final Exception rollbackException) { + componentLog.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException); + close(); } } @Override - public List getPartitionStates(final PollingContext pollingContext) { - final Subscription subscription = getSubscription(pollingContext); + public boolean isClosed() { + return closed; + } + + @Override + public Iterable poll() { + final ConsumerRecords consumerRecords = consumer.poll(maxUncommittedTime); + return new RecordIterable(consumerRecords); + } + + @Override + public List getPartitionStates() { final Iterator topics = subscription.getTopics().iterator(); final List partitionStates; if (topics.hasNext()) { final String topic = topics.next(); - - final Consumer consumer = borrowConsumer(subscription); - try { - partitionStates = consumer.partitionsFor(topic) - .stream() - .map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition())) - .collect(Collectors.toList()); - } finally { - returnConsumer(subscription, consumer); - } + partitionStates = consumer.partitionsFor(topic) + .stream() + .map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition())) + .collect(Collectors.toList()); } else { partitionStates = Collections.emptyList(); } @@ -123,17 +130,8 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable { @Override public void close() { - consumerObjectPool.close(); - } - - private Subscription getSubscription(final PollingContext pollingContext) { - final String groupId = pollingContext.getGroupId(); - final Optional topicPatternFound = pollingContext.getTopicPattern(); - final 
AutoOffsetReset autoOffsetReset = pollingContext.getAutoOffsetReset(); - - return topicPatternFound - .map(pattern -> new Subscription(groupId, pattern, autoOffsetReset)) - .orElseGet(() -> new Subscription(groupId, pollingContext.getTopics(), autoOffsetReset)); + closed = true; + consumer.close(); } private Map getOffsets(final PollingSummary pollingSummary) { @@ -152,28 +150,6 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable { return offsets; } - private Consumer borrowConsumer(final Subscription subscription) { - try { - return consumerObjectPool.borrowObject(subscription); - } catch (final Exception e) { - throw new ConsumerException("Borrow Consumer failed", e); - } - } - - private void returnConsumer(final Subscription subscription, final Consumer consumer) { - try { - consumerObjectPool.returnObject(subscription, consumer); - } catch (final Exception e) { - try { - consumerObjectPool.invalidateObject(subscription, consumer); - } catch (final Exception e2) { - componentLog.debug("Failed to invalidate Kafka Consumer", e2); - } - - consumer.close(Duration.ofSeconds(30)); - componentLog.warn("Failed to return Kafka Consumer to pool", e); - } - } private static class RecordIterable implements Iterable { private final Iterator records; diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerFactory.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerFactory.java deleted file mode 100644 index 45c9b37851..0000000000 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
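Aside on the consumer service above: rollback() seeks each assigned partition back to its last committed position, or to the beginning when nothing was committed, so records from a failed trigger are re-polled rather than lost, while commit(...) translates a PollingSummary into Kafka's offset map. Kafka's commitSync contract is that the committed offset is the offset of the next record to consume, hence the +1 convention sketched below; the partition and offset values are illustrative, and the map assembly presumably mirrors what getOffsets(...) builds:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitSketch {
    void commitLastProcessed(final Consumer<byte[], byte[]> consumer) {
        final long lastProcessedOffset = 41L;
        final Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
        // Commit the offset of the NEXT record to read, not the last one processed.
        offsets.put(new TopicPartition("example-topic", 0), new OffsetAndMetadata(lastProcessedOffset + 1));
        consumer.commitSync(offsets);
    }
}
```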
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerFactory.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerFactory.java
deleted file mode 100644
index 45c9b37851..0000000000
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.kafka.service.consumer.pool;
-
-import org.apache.kafka.clients.consumer.Consumer;
-
-import java.util.Properties;
-
-/**
- * Factory abstraction for creating Kafka Consumer objects
- */
-interface ConsumerFactory {
-    /**
-     * Create new Kafka Consumer using supplied Properties
-     *
-     * @param properties Consumer configuration properties
-     * @return Kafka Consumer
-     */
-    Consumer<byte[], byte[]> newConsumer(Properties properties);
-}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerObjectPool.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerObjectPool.java
deleted file mode 100644
index fbac887ceb..0000000000
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerObjectPool.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.kafka.service.consumer.pool;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
-
-import java.util.Properties;
-
-/**
- * Kafka Consumer Object Pool
- */
-public class ConsumerObjectPool extends GenericKeyedObjectPool<Subscription, Consumer<byte[], byte[]>> {
-    /**
-     * Consumer Object Pool constructor with Kafka Consumer Properties
-     *
-     * @param consumerProperties Kafka Consumer Properties required
-     */
-    public ConsumerObjectPool(final Properties consumerProperties) {
-        super(new ConsumerPooledObjectFactory(consumerProperties, new StandardConsumerFactory()));
-    }
-}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerPooledObjectFactory.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerPooledObjectFactory.java
deleted file mode 100644
index 6b91340bbd..0000000000
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerPooledObjectFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.kafka.service.consumer.pool;
-
-import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import java.util.Collection;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
-/**
- * Pooled Object Factory for Kafka Consumers
- */
-class ConsumerPooledObjectFactory extends BaseKeyedPooledObjectFactory<Subscription, Consumer<byte[], byte[]>> {
-    private final Properties consumerProperties;
-
-    private final ConsumerFactory consumerFactory;
-
-    /**
-     * Consumer Pooled Object Factory constructor with Kafka Consumer Properties
-     *
-     * @param consumerProperties Kafka Consumer Properties
-     * @param consumerFactory Kafka Consumer Factory
-     */
-    ConsumerPooledObjectFactory(final Properties consumerProperties, final ConsumerFactory consumerFactory) {
-        this.consumerProperties = Objects.requireNonNull(consumerProperties, "Consumer Properties required");
-        this.consumerFactory = Objects.requireNonNull(consumerFactory, "Consumer Factory required");
-    }
-
-    @Override
-    public Consumer<byte[], byte[]> create(final Subscription subscription) {
-        Objects.requireNonNull(subscription, "Topic Subscription required");
-
-        final Properties properties = new Properties();
-        properties.putAll(consumerProperties);
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, subscription.getGroupId());
-        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, subscription.getAutoOffsetReset().getValue());
-
-        final Consumer<byte[], byte[]> consumer = consumerFactory.newConsumer(properties);
-
-        final Optional<Pattern> topicPatternFound = subscription.getTopicPattern();
-        if (topicPatternFound.isPresent()) {
-            final Pattern topicPattern = topicPatternFound.get();
-            consumer.subscribe(topicPattern);
-        } else {
-            final Collection<String> topics = subscription.getTopics();
-            consumer.subscribe(topics);
-        }
-
-        return consumer;
-    }
-
-    /**
-     * Wrap Kafka Consumer using Default Pooled Object for tracking
-     *
-     * @param consumer Kafka Consumer
-     * @return Pooled Object wrapper
-     */
-    @Override
-    public PooledObject<Consumer<byte[], byte[]>> wrap(final Consumer<byte[], byte[]> consumer) {
-        Objects.requireNonNull(consumer, "Consumer required");
-        return new DefaultPooledObject<>(consumer);
-    }
-
-    /**
-     * Destroy Pooled Object closes wrapped Kafka Consumer
-     *
-     * @param subscription Subscription
-     * @param pooledObject Pooled Object with Consumer to be closed
-     */
-    @Override
-    public void destroyObject(final Subscription subscription, final PooledObject<Consumer<byte[], byte[]>> pooledObject) {
-        Objects.requireNonNull(pooledObject, "Pooled Object required");
-        final Consumer<byte[], byte[]> consumer = pooledObject.getObject();
-        consumer.unsubscribe();
-        consumer.close();
-    }
-}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/StandardConsumerFactory.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/StandardConsumerFactory.java
deleted file mode 100644
index 8cbec41007..0000000000
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/StandardConsumerFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.kafka.service.consumer.pool;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import java.util.Objects;
-import java.util.Properties;
-
-/**
- * Standard Kafka Consumer Factory with Byte Array Deserializer for Key and Value elements
- */
-class StandardConsumerFactory implements ConsumerFactory {
-    /**
-     * Create new Kafka Consumer with Byte Array Deserializer and configured properties
-     *
-     * @param properties Consumer configuration properties
-     * @return Kafka Consumer
-     */
-    @Override
-    public Consumer<byte[], byte[]> newConsumer(final Properties properties) {
-        Objects.requireNonNull(properties, "Properties required");
-        final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
-        return new KafkaConsumer<>(properties, deserializer, deserializer);
-    }
-}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/Subscription.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/Subscription.java
index 7f5a08eb45..4df88bc421 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/Subscription.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/pool/Subscription.java
@@ -28,12 +28,10 @@
 import java.util.regex.Pattern;
 
 /**
  * Subscription for pooled Kafka Consumers
  */
 public class Subscription {
+
     private final String groupId;
-
     private final Collection<String> topics;
-
     private final Pattern topicPattern;
-
     private final AutoOffsetReset autoOffsetReset;
 
     public Subscription(final String groupId, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
@@ -68,26 +66,21 @@ public class Subscription {
 
     @Override
     public boolean equals(final Object object) {
-        final boolean equals;
-
         if (object == null) {
-            equals = false;
-        } else if (object instanceof Subscription) {
-            final Subscription subscription = (Subscription) object;
-            if (groupId.equals(subscription.groupId)) {
-                if (isTopicSubscriptionMatched(subscription)) {
-                    equals = autoOffsetReset == subscription.autoOffsetReset;
-                } else {
-                    equals = false;
-                }
-            } else {
-                equals = false;
-            }
-        } else {
-            equals = false;
+            return false;
         }
-        return equals;
+
+        if (object == this) {
+            return true;
+        }
+
+        if (object instanceof final Subscription subscription) {
+            return groupId.equals(subscription.groupId)
+                    && isTopicSubscriptionMatched(subscription)
+                    && autoOffsetReset == subscription.autoOffsetReset;
+        }
+
+        return false;
     }
 
     @Override
@@ -101,20 +94,12 @@ public class Subscription {
     }
 
     private boolean isTopicSubscriptionMatched(final Subscription subscription) {
-        final boolean matched;
-
-        if (topics.size() == subscription.topics.size()) {
-            if (topics.containsAll(subscription.topics)) {
-                final String regexLeft = (topicPattern == null ? null : topicPattern.pattern());
-                final String regexRight = (subscription.topicPattern == null ? null : subscription.topicPattern.pattern());
-                matched = Objects.equals(regexLeft, regexRight);
-            } else {
-                matched = false;
-            }
-        } else {
-            matched = false;
+        if (topics.size() == subscription.topics.size() && topics.containsAll(subscription.topics)) {
+            final String regexLeft = (topicPattern == null ? null : topicPattern.pattern());
+            final String regexRight = (subscription.topicPattern == null ? null : subscription.topicPattern.pattern());
+            return Objects.equals(regexLeft, regexRight);
         }
 
-        return matched;
+        return false;
     }
 }
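Aside on the Subscription change above: the nested if/else accumulator is collapsed into guard clauses using pattern-matching instanceof (Java 16+), which tests, casts, and binds in one expression. A self-contained illustration of the same construct; the helper class and its Key record are illustrative stand-ins, not part of the patch:

```java
import java.util.Objects;

class PatternMatchSketch {
    record Key(String groupId) { }

    // instanceof with a pattern variable replaces the removed
    // "check, then cast, then track a boolean result" chains.
    static boolean sameGroup(final Object object, final String groupId) {
        if (object instanceof final Key key) {
            return Objects.equals(groupId, key.groupId());
        }
        return false;
    }
}
```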
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
index ceb421bb70..d635c50aca 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
@@ -216,10 +216,9 @@ public class Kafka3ConnectionServiceBaseIT {
         final RecordSummary summary = producerService.complete();
         assertNotNull(summary);
 
-        final KafkaConsumerService consumerService = service.getConsumerService(null);
-        final PollingContext pollingContext = new PollingContext(
-                GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
-        final Iterator<ByteRecord> consumerRecords = poll(consumerService, pollingContext);
+        final PollingContext pollingContext = new PollingContext(GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
+        final KafkaConsumerService consumerService = service.getConsumerService(pollingContext);
+        final Iterator<ByteRecord> consumerRecords = poll(consumerService);
 
         assertTrue(consumerRecords.hasNext(), "Consumer Records not found");
 
@@ -282,10 +281,9 @@ public class Kafka3ConnectionServiceBaseIT {
 
     @Test
     void testGetConsumerService() {
-        final KafkaConsumerService consumerService = service.getConsumerService(null);
-        final PollingContext pollingContext = new PollingContext(
-                GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
-        final List<PartitionState> partitionStates = consumerService.getPartitionStates(pollingContext);
+        final PollingContext pollingContext = new PollingContext(GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
+        final KafkaConsumerService consumerService = service.getConsumerService(pollingContext);
+        final List<PartitionState> partitionStates = consumerService.getPartitionStates();
 
         assertPartitionStatesFound(partitionStates);
     }
@@ -296,11 +294,11 @@ public class Kafka3ConnectionServiceBaseIT {
         assertEquals(0, partitionState.getPartition());
     }
 
-    private Iterator<ByteRecord> poll(final KafkaConsumerService consumerService, final PollingContext pollingContext) {
+    private Iterator<ByteRecord> poll(final KafkaConsumerService consumerService) {
         Iterator<ByteRecord> consumerRecords = Collections.emptyIterator();
 
         for (int i = 0; i < POLLING_ATTEMPTS; i++) {
-            final Iterable<ByteRecord> records = consumerService.poll(pollingContext);
+            final Iterable<ByteRecord> records = consumerService.poll();
             assertNotNull(records);
             consumerRecords = records.iterator();
             if (consumerRecords.hasNext()) {
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerPooledObjectFactoryTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerPooledObjectFactoryTest.java
deleted file mode 100644
index 5833b0067f..0000000000
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/test/java/org/apache/nifi/kafka/service/consumer/pool/ConsumerPooledObjectFactoryTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.kafka.service.consumer.pool;
-
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyCollection;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-class ConsumerPooledObjectFactoryTest {
-    private static final String GROUP_ID = ConsumerPooledObjectFactoryTest.class.getSimpleName();
-
-    private static final String TOPIC = String.class.getSimpleName();
-
-    @Mock
-    ConsumerFactory consumerFactory;
-
-    @Mock
-    Consumer<byte[], byte[]> mockConsumer;
-
-    ConsumerPooledObjectFactory factory;
-
-    @BeforeEach
-    void setFactory() {
-        final Properties properties = new Properties();
-        factory = new ConsumerPooledObjectFactory(properties, consumerFactory);
-    }
-
-    @Test
-    void testCreate() {
-        final Collection<String> topics = Collections.singleton(TOPIC);
-        final Subscription subscription = new Subscription(GROUP_ID, topics, AutoOffsetReset.EARLIEST);
-
-        when(consumerFactory.newConsumer(any(Properties.class))).thenReturn(mockConsumer);
-        final Consumer<byte[], byte[]> consumer = factory.create(subscription);
-
-        assertNotNull(consumer);
-
-        verify(mockConsumer).subscribe(anyCollection());
-    }
-
-    @Test
-    void testWrap() {
-        final PooledObject<Consumer<byte[], byte[]>> pooledObject = factory.wrap(mockConsumer);
-
-        assertNotNull(pooledObject);
-    }
-
-    @Test
-    void testDestroyObject() {
-        final PooledObject<Consumer<byte[], byte[]>> pooledObject = new DefaultPooledObject<>(mockConsumer);
-
-        final Subscription subscription = new Subscription(GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST);
-        factory.destroyObject(subscription, pooledObject);
-
-        verify(mockConsumer).close();
-    }
-}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 0cf646c7ae..35600aec0e 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -23,6 +23,7 @@
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -39,7 +40,6 @@
 import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter;
 import org.apache.nifi.kafka.service.api.KafkaConnectionService;
 import org.apache.nifi.kafka.service.api.common.PartitionState;
 import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
-import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
 import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
 import org.apache.nifi.kafka.service.api.consumer.PollingContext;
 import org.apache.nifi.kafka.service.api.record.ByteRecord;
@@ -59,16 +59,18 @@
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.util.StringUtils;
 
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
 
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
@@ -162,7 +164,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
             + "of a message demarcator. When using a message demarcator we can have far more uncommitted messages "
             + "than when we're not as there is much less for us to keep track of in memory.")
         .required(true)
-        .defaultValue("1 s")
+        .defaultValue("1 sec")
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
         .dependsOn(COMMIT_OFFSETS, "true")
         .build();
@@ -269,6 +271,11 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
         .description("FlowFiles containing one or more serialized Kafka Records")
         .build();
 
+    public static final Relationship PARSE_FAILURE = new Relationship.Builder()
+        .name("parse failure")
+        .description("If configured to use a Record Reader, a Kafka message that cannot be parsed using the configured Record Reader will be routed to this relationship")
+        .build();
+
     private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
         CONNECTION_SERVICE,
         GROUP_ID,
@@ -290,21 +297,19 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
         SEPARATE_BY_KEY
     );
 
-    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(SUCCESS);
+    private static final Set<Relationship> SUCCESS_RELATIONSHIP = Set.of(SUCCESS);
+    private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = Set.of(SUCCESS, PARSE_FAILURE);
 
-    private KafkaConsumerService consumerService;
+    private volatile Charset headerEncoding;
+    private volatile Pattern headerNamePattern;
+    private volatile KeyEncoding keyEncoding;
+    private volatile OutputStrategy outputStrategy;
+    private volatile KeyFormat keyFormat;
+    private volatile boolean commitOffsets;
+    private volatile boolean useReader;
+    private volatile PollingContext pollingContext;
 
-    private Charset headerEncoding;
-
-    private Pattern headerNamePattern;
-
-    private KeyEncoding keyEncoding;
-
-    private OutputStrategy outputStrategy;
-
-    private KeyFormat keyFormat;
-
-    private boolean commitOffsets;
+    private final Queue<KafkaConsumerService> consumerServices = new LinkedBlockingQueue<>();
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -313,13 +318,20 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
 
     @Override
     public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
+        return useReader ? SUCCESS_FAILURE_RELATIONSHIPS : SUCCESS_RELATIONSHIP;
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (descriptor.equals(RECORD_READER)) {
+            useReader = newValue != null;
+        }
     }
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        consumerService = connectionService.getConsumerService(new ConsumerConfiguration());
+        pollingContext = createPollingContext(context);
 
         headerEncoding = Charset.forName(context.getProperty(HEADER_ENCODING).getValue());
 
         final String headerNamePatternProperty = context.getProperty(HEADER_NAME_PATTERN).getValue();
@@ -335,17 +347,46 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
         keyFormat = context.getProperty(KEY_FORMAT).asAllowableValue(KeyFormat.class);
     }
 
+    @OnStopped
+    public void onStopped() {
+        // Ensure that we close all Consumer services when stopped
+        KafkaConsumerService service;
+
+        while ((service = consumerServices.poll()) != null) {
+            try {
+                service.close();
+            } catch (IOException e) {
+                getLogger().warn("Failed to close Kafka Consumer Service", e);
+            }
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        final PollingContext pollingContext = getPollingContext(context);
+        final KafkaConsumerService consumerService = getConsumerService(context);
 
-        final Iterator<ByteRecord> consumerRecords = consumerService.poll(pollingContext).iterator();
-        if (consumerRecords.hasNext()) {
-            processConsumerRecords(context, session, pollingContext, consumerRecords);
-        } else {
-            getLogger().debug("No Kafka Records consumed: {}", pollingContext);
-            context.yield();
+        try {
+            final Iterator<ByteRecord> consumerRecords = consumerService.poll().iterator();
+            if (!consumerRecords.hasNext()) {
+                getLogger().debug("No Kafka Records consumed: {}", pollingContext);
+                return;
+            }
+
+            processConsumerRecords(context, session, consumerService, pollingContext, consumerRecords);
+        } catch (final Exception e) {
+            getLogger().error("Failed to consume Kafka Records", e);
+            consumerService.rollback();
+
+            try {
+                consumerService.close();
+            } catch (IOException ex) {
+                getLogger().warn("Failed to close Kafka Consumer Service", ex);
+            }
+        } finally {
+            if (!consumerService.isClosed()) {
+                consumerServices.offer(consumerService);
+            }
+        }
     }
 
@@ -354,15 +395,14 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
         final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
 
         final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        final KafkaConsumerService consumerService = connectionService.getConsumerService(new ConsumerConfiguration());
+        final PollingContext pollingContext = createPollingContext(context);
+        final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext);
 
         final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder()
             .verificationStepName("Verify Topic Partitions");
 
-        final PollingContext pollingContext = getPollingContext(context);
-
         try {
-            final List<PartitionState> partitionStates = consumerService.getPartitionStates(pollingContext);
+            final List<PartitionState> partitionStates = consumerService.getPartitionStates();
             verificationPartitions
                 .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                 .explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics()));
@@ -377,68 +417,69 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
         return verificationResults;
     }
 
-    private void processConsumerRecords(final ProcessContext context, final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
-        final ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(context.getProperty(PROCESSING_STRATEGY).getValue());
-        // model this switch on the existing implementation at `ConsumerLease.processRecords()`
-        if (ProcessingStrategy.FLOW_FILE == processingStrategy) {
-            processInputFlowFile(session, pollingContext, consumerRecords);
-        } else if (ProcessingStrategy.DEMARCATOR == processingStrategy) {
-            final Iterator<ByteRecord> iteratorDemarcator = transformDemarcator(context, consumerRecords);
-            processInputFlowFile(session, pollingContext, iteratorDemarcator);
-        } else if (ProcessingStrategy.RECORD == processingStrategy) {
-            processInputRecords(context, session, pollingContext, consumerRecords);
-        } else {
-            throw new ProcessException(String.format("Processing Strategy not supported [%s]", processingStrategy));
+    private KafkaConsumerService getConsumerService(final ProcessContext context) {
+        final KafkaConsumerService consumerService = consumerServices.poll();
+        if (consumerService != null) {
+            return consumerService;
         }
+
+        final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
+        return connectionService.getConsumerService(pollingContext);
+    }
+
+    private void processConsumerRecords(final ProcessContext context, final ProcessSession session, final KafkaConsumerService consumerService,
+                                        final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
+
+        final ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(context.getProperty(PROCESSING_STRATEGY).getValue());
+
+        switch (processingStrategy) {
+            case RECORD -> processInputRecords(context, session, consumerService, pollingContext, consumerRecords);
+            case FLOW_FILE -> processInputFlowFile(session, consumerService, pollingContext, consumerRecords);
+            case DEMARCATOR -> {
+                final Iterator<ByteRecord> demarcatedRecords = transformDemarcator(context, consumerRecords);
+                processInputFlowFile(session, consumerService, pollingContext, demarcatedRecords);
+            }
+        }
     }
 
     private Iterator<ByteRecord> transformDemarcator(final ProcessContext context, final Iterator<ByteRecord> consumerRecords) {
         final String demarcatorValue = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).getValue();
-        if (demarcatorValue != null) {
-            final byte[] demarcator = demarcatorValue.getBytes(StandardCharsets.UTF_8);
-            final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
-            return new ByteRecordBundler(demarcator, separateByKey, keyEncoding, headerNamePattern, headerEncoding, commitOffsets).bundle(consumerRecords);
-        } else {
+        if (demarcatorValue == null) {
             return consumerRecords;
         }
+
+        final byte[] demarcator = demarcatorValue.getBytes(StandardCharsets.UTF_8);
+        final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
+        return new ByteRecordBundler(demarcator, separateByKey, keyEncoding, headerNamePattern, headerEncoding, commitOffsets).bundle(consumerRecords);
    }
 
-    private void processInputRecords(final ProcessContext context, final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
+    private void processInputRecords(final ProcessContext context, final ProcessSession session, final KafkaConsumerService consumerService,
+                                     final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final OffsetTracker offsetTracker = new OffsetTracker();
+        final Runnable onSuccess = commitOffsets
+            ? () -> session.commitAsync(() -> consumerService.commit(offsetTracker.getPollingSummary(pollingContext)))
+            : session::commitAsync;
+
+        final KafkaMessageConverter converter;
         if (OutputStrategy.USE_VALUE.equals(outputStrategy)) {
-            processOutputStrategyUseValue(context, session, pollingContext, consumerRecords);
+            converter = new RecordStreamKafkaMessageConverter(readerFactory, writerFactory,
+                headerEncoding, headerNamePattern, keyEncoding, commitOffsets, offsetTracker, onSuccess, getLogger());
         } else if (OutputStrategy.USE_WRAPPER.equals(outputStrategy)) {
-            processOutputStrategyUseWrapper(context, session, pollingContext, consumerRecords);
+            final RecordReaderFactory keyReaderFactory = context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class);
+            converter = new WrapperRecordStreamKafkaMessageConverter(readerFactory, writerFactory, keyReaderFactory,
+                headerEncoding, headerNamePattern, keyFormat, keyEncoding, commitOffsets, offsetTracker, onSuccess, getLogger());
         } else {
             throw new ProcessException(String.format("Output Strategy not supported [%s]", outputStrategy));
         }
-    }
 
-    private void processOutputStrategyUseWrapper(final ProcessContext context, final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
-        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final RecordReaderFactory keyReaderFactory = context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final OffsetTracker offsetTracker = new OffsetTracker();
-        final Runnable onSuccess = commitOffsets
-            ? () -> session.commitAsync(() -> consumerService.commit(offsetTracker.getPollingSummary(pollingContext)))
-            : session::commitAsync;
-        final KafkaMessageConverter converter = new WrapperRecordStreamKafkaMessageConverter(readerFactory, writerFactory, keyReaderFactory,
-            headerEncoding, headerNamePattern, keyFormat, keyEncoding, commitOffsets, offsetTracker, onSuccess, getLogger());
 
         converter.toFlowFiles(session, consumerRecords);
     }
 
-    private void processOutputStrategyUseValue(final ProcessContext context, final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
-        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final OffsetTracker offsetTracker = new OffsetTracker();
-        final Runnable onSuccess = commitOffsets
-            ? () -> session.commitAsync(() -> consumerService.commit(offsetTracker.getPollingSummary(pollingContext)))
-            : session::commitAsync;
-        final KafkaMessageConverter converter = new RecordStreamKafkaMessageConverter(readerFactory, writerFactory,
-            headerEncoding, headerNamePattern, keyEncoding, commitOffsets, offsetTracker, onSuccess, getLogger());
-        converter.toFlowFiles(session, consumerRecords);
-    }
-
-    private void processInputFlowFile(final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
+    private void processInputFlowFile(final ProcessSession session, final KafkaConsumerService consumerService, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
         final OffsetTracker offsetTracker = new OffsetTracker();
         final Runnable onSuccess = commitOffsets
             ? () -> session.commitAsync(() -> consumerService.commit(offsetTracker.getPollingSummary(pollingContext)))
             : session::commitAsync;
@@ -448,7 +489,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
         converter.toFlowFiles(session, consumerRecords);
     }
 
-    private PollingContext getPollingContext(final ProcessContext context) {
+    private PollingContext createPollingContext(final ProcessContext context) {
         final String groupId = context.getProperty(GROUP_ID).getValue();
         final String offsetReset = context.getProperty(AUTO_OFFSET_RESET).getValue();
         final AutoOffsetReset autoOffsetReset = AutoOffsetReset.valueOf(offsetReset.toUpperCase());
@@ -466,6 +507,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
         } else {
             throw new ProcessException(String.format("Topic Format [%s] not supported", topicFormat));
         }
+
         return pollingContext;
     }
 }
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java
index 0f90bb1063..0dbfd158b1 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/common/KafkaUtils.java
@@ -25,8 +25,8 @@
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HexFormat;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -74,7 +74,7 @@ public class KafkaUtils {
     public static Map<String, String> toAttributes(final ByteRecord consumerRecord, final KeyEncoding keyEncoding,
                                                    final Pattern headerNamePattern, final Charset headerEncoding, final boolean commitOffsets) {
-        final Map<String, String> attributes = new LinkedHashMap<>();
+        final Map<String, String> attributes = new HashMap<>();
         attributes.put(KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.getTopic());
         attributes.put(KafkaFlowFileAttribute.KAFKA_PARTITION, Long.toString(consumerRecord.getPartition()));
         attributes.put(KafkaFlowFileAttribute.KAFKA_OFFSET, Long.toString(consumerRecord.getOffset()));
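Aside on the processor changes above: ConsumeKafka now leases one consumer service per onTrigger call from a LinkedBlockingQueue, creates a new one when the queue is empty, returns it only if it is still open, and drains the queue in onStopped. The same lease/return pattern in isolation; the class and method names below are generic placeholders, and the processor's real logic additionally rolls back and closes on failure:

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

final class ServiceLease<T> {
    private final Queue<T> idle = new LinkedBlockingQueue<>();
    private final Supplier<T> factory;

    ServiceLease(final Supplier<T> factory) {
        this.factory = factory;
    }

    // Reuse an idle service when one is available; otherwise create a fresh one.
    T acquire() {
        final T pooled = idle.poll();
        return pooled != null ? pooled : factory.get();
    }

    // Only healthy services go back into the queue; broken ones are dropped.
    void release(final T service, final boolean reusable) {
        if (reusable) {
            idle.offer(service);
        }
    }
}
```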
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/ByteRecordBundler.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/ByteRecordBundler.java
index bb6758b2bc..b8b3f35333 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/ByteRecordBundler.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/bundle/ByteRecordBundler.java
@@ -56,16 +56,15 @@ public class ByteRecordBundler {
         while (consumerRecords.hasNext()) {
             update(bundles, consumerRecords.next());
         }
+
         return bundles.entrySet().stream()
             .map(e -> toByteRecord(e.getKey(), e.getValue())).iterator();
     }

     private ByteRecord toByteRecord(final BundleKey key, final BundleValue value) {
         final TopicPartitionSummary topicPartition = key.getTopicPartition();
-        key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET,
-            Long.toString(value.getLastOffset()).getBytes(StandardCharsets.UTF_8)));
-        key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_COUNT,
-            Long.toString(value.getCount()).getBytes(StandardCharsets.UTF_8)));
+        key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, Long.toString(value.getLastOffset()).getBytes(StandardCharsets.UTF_8)));
+        key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_COUNT, Long.toString(value.getCount()).getBytes(StandardCharsets.UTF_8)));
         return new ByteRecord(topicPartition.getTopic(), topicPartition.getPartition(), value.getFirstOffset(), key.getTimestamp(), key.getHeaders(), key.getMessageKey(), value.getData());
     }
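toByteRecord folds the bundle-level bookkeeping into synthetic headers (kafka.max.offset and kafka.count) on the merged record. A simplified sketch of that bookkeeping with plain JDK types; BundleStats and Bundler are illustrative stand-ins, not the NiFi ByteRecordBundler API:

    import java.util.HashMap;
    import java.util.Map;

    // Per bundle key: track first/last offset and record count, then expose
    // them as synthetic headers, mirroring toByteRecord above.
    final class BundleStats {
        long firstOffset = -1;
        long lastOffset = -1;
        long count;

        void update(final long offset) {
            if (firstOffset < 0) {
                firstOffset = offset;
            }
            lastOffset = offset;
            count++;
        }
    }

    final class Bundler {
        private final Map<String, BundleStats> bundles = new HashMap<>();

        void accept(final String topicPartition, final long offset) {
            bundles.computeIfAbsent(topicPartition, k -> new BundleStats()).update(offset);
        }

        Map<String, String> headersFor(final String topicPartition) {
            final BundleStats stats = bundles.get(topicPartition);
            return Map.of(
                "kafka.max.offset", Long.toString(stats.lastOffset),
                "kafka.count", Long.toString(stats.count));
        }
    }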
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
index 306cb6b149..5d9eb5c209 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java
@@ -17,10 +17,13 @@ package org.apache.nifi.kafka.processors.consumer.convert;

 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.kafka.processors.ConsumeKafka;
 import org.apache.nifi.kafka.processors.common.KafkaUtils;
 import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
+import org.apache.nifi.kafka.service.api.header.RecordHeader;
 import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
 import org.apache.nifi.kafka.shared.property.KeyEncoding;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
@@ -32,20 +35,25 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;

 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;

 public class RecordStreamKafkaMessageConverter implements KafkaMessageConverter {
+    private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(List.of());
+
     private final RecordReaderFactory readerFactory;
     private final RecordSetWriterFactory writerFactory;
     private final Charset headerEncoding;
@@ -78,51 +86,135 @@ public class RecordStreamKafkaMessageConverter implements KafkaMessageConverter {
     }

     @Override
-    public void toFlowFiles(final ProcessSession session,
-                            final Iterator<ByteRecord> consumerRecords) {
+    public void toFlowFiles(final ProcessSession session, final Iterator<ByteRecord> consumerRecords) {
         try {
+            final Map<RecordGroupCriteria, RecordGroup> recordGroups = new HashMap<>();
+
+            String topic = null;
+            int partition = 0;
             while (consumerRecords.hasNext()) {
                 final ByteRecord consumerRecord = consumerRecords.next();
-                final byte[] valueIn = consumerRecord.getValue();
-                if (valueIn.length > 0) {
-                    final InputStream in = new ByteArrayInputStream(valueIn);
-                    final Map<String, String> attributes = KafkaUtils.toAttributes(
-                        consumerRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);
-                    final RecordReader reader = readerFactory.createRecordReader(attributes, in, valueIn.length, logger);
-                    toFlowFile(session, attributes, reader, consumerRecord);
-                    offsetTracker.update(consumerRecord);
+                if (topic == null) {
+                    partition = consumerRecord.getPartition();
+                    topic = consumerRecord.getTopic();
                 }
+
+                final byte[] value = consumerRecord.getValue();
+                final Map<String, String> headers = getRelevantHeaders(consumerRecord, headerNamePattern);
+
+                final Map<String, String> attributes = KafkaUtils.toAttributes(
+                    consumerRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);
+
+                try (final InputStream in = new ByteArrayInputStream(value);
+                     final RecordReader valueRecordReader = readerFactory.createRecordReader(attributes, in, value.length, logger)) {
+
+                    int recordCount = 0;
+                    while (true) {
+                        final Record record = valueRecordReader.nextRecord();
+                        // If we get a KafkaRecord that has no value, we still need to process it.
+                        if (recordCount++ > 0 && record == null) {
+                            break;
+                        }
+
+                        final RecordSchema recordSchema = record == null ? EMPTY_SCHEMA : record.getSchema();
+                        final RecordSchema writeSchema = writerFactory.getSchema(attributes, recordSchema);
+
+                        // Get/Register the Record Group that is associated with the schema for this Kafka Record
+                        final RecordGroupCriteria groupCriteria = new RecordGroupCriteria(writeSchema, headers);
+                        RecordGroup recordGroup = recordGroups.get(groupCriteria);
+                        if (recordGroup == null) {
+                            FlowFile flowFile = session.create();
+                            final Map<String, String> groupAttributes = Map.of(
+                                KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.getTopic(),
+                                KafkaFlowFileAttribute.KAFKA_PARTITION, Long.toString(consumerRecord.getPartition())
+                            );
+                            flowFile = session.putAllAttributes(flowFile, groupAttributes);
+
+                            final OutputStream out = session.write(flowFile);
+                            final RecordSetWriter writer;
+                            try {
+                                writer = writerFactory.createWriter(logger, writeSchema, out, attributes);
+                                writer.beginRecordSet();
+                            } catch (final Exception e) {
+                                out.close();
+                                throw e;
+                            }
+
+                            recordGroup = new RecordGroup(flowFile, writer, topic, partition);
+                            recordGroups.put(groupCriteria, recordGroup);
+                        }
+
+                        // Create the Record object and write it to the Record Writer.
+                        if (record != null) {
+                            recordGroup.writer().write(record);
+                        }
+                    }
+                } catch (final MalformedRecordException e) {
+                    // Failed to parse the record. Transfer to a 'parse.failure' relationship
+                    FlowFile flowFile = session.create();
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                    flowFile = session.write(flowFile, out -> out.write(value));
+                    session.transfer(flowFile, ConsumeKafka.PARSE_FAILURE);
+
+                    // Track the offsets for the Kafka Record
+                    offsetTracker.update(consumerRecord);
+                    continue;
+                }
+
+                // Track the offsets for the Kafka Record
+                offsetTracker.update(consumerRecord);
             }
+
+            // Finish writing the records
+            for (final Map.Entry<RecordGroupCriteria, RecordGroup> entry : recordGroups.entrySet()) {
+                final RecordGroupCriteria criteria = entry.getKey();
+                final RecordGroup recordGroup = entry.getValue();
+
+                final Map<String, String> attributes;
+                try (final RecordSetWriter writer = recordGroup.writer()) {
+                    final WriteResult writeResult = writer.finishRecordSet();
+                    attributes = new HashMap<>(writeResult.getAttributes());
+                    attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                    attributes.putAll(criteria.headers());
+                    attributes.put(KafkaFlowFileAttribute.KAFKA_CONSUMER_OFFSETS_COMMITTED, String.valueOf(commitOffsets));
+                }
+
+                FlowFile flowFile = recordGroup.flowFile();
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
+                final String transitUri = String.format(TRANSIT_URI_FORMAT, topic, partition);
+                provenanceReporter.receive(flowFile, transitUri);
+                session.transfer(flowFile, ConsumeKafka.SUCCESS);
+            }
+
             onSuccess.run();
-        } catch (MalformedRecordException | SchemaNotFoundException | IOException e) {
+        } catch (final SchemaNotFoundException | IOException e) {
             throw new ProcessException("FlowFile Record conversion failed", e);
         }
     }

-    private void toFlowFile(final ProcessSession session,
-                            final Map<String, String> attributes,
-                            final RecordReader reader,
-                            final ByteRecord consumerRecord) throws IOException, SchemaNotFoundException {
-        int recordCount = 0;
-        FlowFile flowFile = session.create();
-        flowFile = session.putAllAttributes(flowFile, attributes);
-        try (final OutputStream rawOut = session.write(flowFile)) {
-            final RecordSet recordSet = reader.createRecordSet();
-            final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
-            final RecordSetWriter writer = writerFactory.createWriter(logger, schema, rawOut, attributes);
-            Record record;
-            writer.beginRecordSet();
-            while ((record = recordSet.next()) != null) {
-                ++recordCount;
-                writer.write(record);
-            }
-            writer.finishRecordSet();
-            writer.flush();
-            final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
-            final String transitUri = String.format(TRANSIT_URI_FORMAT, consumerRecord.getTopic(), consumerRecord.getPartition());
-            provenanceReporter.receive(flowFile, transitUri);
+    private Map<String, String> getRelevantHeaders(final ByteRecord consumerRecord, final Pattern headerNamePattern) {
+        if (headerNamePattern == null || consumerRecord == null) {
+            return Map.of();
         }
-        flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(recordCount));
-        session.transfer(flowFile, ConsumeKafka.SUCCESS);
+
+        final Map<String, String> headers = new HashMap<>();
+        for (final RecordHeader header : consumerRecord.getHeaders()) {
+            final String name = header.key();
+            if (headerNamePattern.matcher(name).matches()) {
+                final String value = new String(header.value(), headerEncoding);
+                headers.put(name, value);
+            }
+        }
+
+        return headers;
     }
+
+    private record RecordGroupCriteria(RecordSchema schema, Map<String, String> headers) {
+    }
+
+    private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, String topic, int partition) {
+    }
+
 }
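The rewritten toFlowFiles no longer emits one FlowFile per Kafka message: all records from a poll are grouped by write schema plus matched headers, and each group shares a single FlowFile and open RecordSetWriter until the poll is drained. A Java record works as the composite map key because it supplies value-based equals and hashCode. A self-contained sketch of that pattern, where GroupKey stands in for RecordGroupCriteria and a byte stream stands in for the FlowFile/writer pair:

    import java.io.ByteArrayOutputStream;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // "schema" is a stand-in for the real RecordSchema.
    record GroupKey(String schema, Map<String, String> headers) { }

    final class Grouping {
        public static void main(final String[] args) {
            final Map<GroupKey, ByteArrayOutputStream> groups = new HashMap<>();

            final List<GroupKey> incoming = List.of(
                new GroupKey("person", Map.of("source", "a")),
                new GroupKey("person", Map.of("source", "a")),
                new GroupKey("order", Map.of()));

            for (final GroupKey key : incoming) {
                // One open output per distinct (schema, headers) pair; finished
                // and transferred only after all polled records are consumed.
                groups.computeIfAbsent(key, k -> new ByteArrayOutputStream()).write('x');
            }

            System.out.println(groups.size()); // 2 groups
        }
    }

The EMPTY_SCHEMA constant serves a related purpose: a record with no value still lands in a group instead of being silently skipped, which is what the old value.length > 0 guard did.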
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/WrapperRecordStreamKafkaMessageConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/WrapperRecordStreamKafkaMessageConverter.java
index e1a0d93123..7bdf01a45a 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/WrapperRecordStreamKafkaMessageConverter.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/WrapperRecordStreamKafkaMessageConverter.java
@@ -17,6 +17,7 @@ package org.apache.nifi.kafka.processors.consumer.convert;

 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.kafka.processors.ConsumeKafka;
 import org.apache.nifi.kafka.processors.common.KafkaUtils;
 import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
@@ -24,6 +25,7 @@ import org.apache.nifi.kafka.processors.consumer.wrapper.ConsumeWrapperRecord;
 import org.apache.nifi.kafka.processors.consumer.wrapper.WrapperRecordKeyReader;
 import org.apache.nifi.kafka.processors.producer.wrapper.WrapperRecord;
 import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
 import org.apache.nifi.kafka.shared.property.KeyEncoding;
 import org.apache.nifi.kafka.shared.property.KeyFormat;
 import org.apache.nifi.logging.ComponentLog;
@@ -36,11 +38,12 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.Tuple;

 import java.io.ByteArrayInputStream;
@@ -48,11 +51,15 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;

 public class WrapperRecordStreamKafkaMessageConverter implements KafkaMessageConverter {
+    private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(List.of());
+
     private final RecordReaderFactory readerFactory;
     private final RecordSetWriterFactory writerFactory;
     private final RecordReaderFactory keyReaderFactory;
@@ -91,62 +98,111 @@ public class WrapperRecordStreamKafkaMessageConverter implements KafkaMessageConverter {
     }

     @Override
-    public void toFlowFiles(final ProcessSession session,
-                            final Iterator<ByteRecord> consumerRecords) {
+    public void toFlowFiles(final ProcessSession session, final Iterator<ByteRecord> consumerRecords) {
         try {
+            final Map<RecordSchema, RecordGroup> recordGroups = new HashMap<>();
+
+            String topic = null;
+            int partition = 0;
             while (consumerRecords.hasNext()) {
                 final ByteRecord consumerRecord = consumerRecords.next();
-                final byte[] value = consumerRecord.getValue();
-                if (value.length > 0) {
-                    final InputStream in = new ByteArrayInputStream(value);
-                    final Map<String, String> attributes = KafkaUtils.toAttributes(
-                        consumerRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);
-
-                    final WrapperRecordKeyReader keyReader = new WrapperRecordKeyReader(
-                        keyFormat, keyReaderFactory, keyEncoding, logger);
-                    final Tuple<RecordField, Object> recordKey = keyReader.toWrapperRecordKey(
-                        consumerRecord.getKey().orElse(null), attributes);
-
-                    final RecordReader reader = readerFactory.createRecordReader(attributes, in, value.length, logger);
-                    toFlowFile(session, attributes, reader, consumerRecord, recordKey);
-                    offsetTracker.update(consumerRecord);
+                if (topic == null) {
+                    partition = consumerRecord.getPartition();
+                    topic = consumerRecord.getTopic();
                 }
+
+                final byte[] value = consumerRecord.getValue();
+                final Map<String, String> attributes = KafkaUtils.toAttributes(
+                    consumerRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);
+
+                try (final InputStream in = new ByteArrayInputStream(value);
+                     final RecordReader valueRecordReader = readerFactory.createRecordReader(attributes, in, value.length, logger)) {

+                    int recordCount = 0;
+                    while (true) {
+                        final Record record = valueRecordReader.nextRecord();
+                        // If we get a KafkaRecord that has no value, we still need to process it.
+                        if (recordCount++ > 0 && record == null) {
+                            break;
+                        }
+
+                        final WrapperRecordKeyReader keyReader = new WrapperRecordKeyReader(keyFormat, keyReaderFactory, keyEncoding, logger);
+                        final Tuple<RecordField, Object> recordKey = keyReader.toWrapperRecordKey(consumerRecord.getKey().orElse(null), attributes);
+                        final RecordSchema recordSchema = record == null ? EMPTY_SCHEMA : record.getSchema();
+                        final RecordSchema fullSchema = WrapperRecord.toWrapperSchema(recordKey.getKey(), recordSchema);
+                        final RecordSchema writeSchema = writerFactory.getSchema(attributes, fullSchema);
+
+                        // Get/Register the Record Group that is associated with the schema for this Kafka Record
+                        RecordGroup recordGroup = recordGroups.get(writeSchema);
+                        if (recordGroup == null) {
+                            FlowFile flowFile = session.create();
+                            final Map<String, String> groupAttributes = Map.of(
+                                KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.getTopic(),
+                                KafkaFlowFileAttribute.KAFKA_PARTITION, Long.toString(consumerRecord.getPartition())
+                            );
+
+                            flowFile = session.putAllAttributes(flowFile, groupAttributes);
+
+                            final OutputStream out = session.write(flowFile);
+                            final RecordSetWriter writer;
+                            try {
+                                writer = writerFactory.createWriter(logger, writeSchema, out, attributes);
+                                writer.beginRecordSet();
+                            } catch (final Exception e) {
+                                out.close();
+                                throw e;
+                            }
+
+                            recordGroup = new RecordGroup(flowFile, writer, topic, partition);
+                            recordGroups.put(writeSchema, recordGroup);
+                        }
+
+                        // Create the Record object and write it to the Record Writer.
+                        final ConsumeWrapperRecord consumeWrapperRecord = new ConsumeWrapperRecord(headerEncoding);
+                        final MapRecord wrapperRecord = consumeWrapperRecord.toWrapperRecord(consumerRecord, record, recordKey);
+                        recordGroup.writer().write(wrapperRecord);
+                    }
+                } catch (final MalformedRecordException e) {
+                    // Failed to parse the record. Transfer to a 'parse.failure' relationship
+                    FlowFile flowFile = session.create();
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                    flowFile = session.write(flowFile, out -> out.write(value));
+                    session.transfer(flowFile, ConsumeKafka.PARSE_FAILURE);
+
+                    // Track the offsets for the Kafka Record
+                    offsetTracker.update(consumerRecord);
+                    continue;
+                }
+
+                // Track the offsets for the Kafka Record
+                offsetTracker.update(consumerRecord);
             }
+
+            // Finish writing the records
+            for (final RecordGroup recordGroup : recordGroups.values()) {
+                final Map<String, String> attributes;
+                try (final RecordSetWriter writer = recordGroup.writer()) {
+                    final WriteResult writeResult = writer.finishRecordSet();
+                    attributes = new HashMap<>(writeResult.getAttributes());
+                    attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                    attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                    attributes.put(KafkaFlowFileAttribute.KAFKA_CONSUMER_OFFSETS_COMMITTED, String.valueOf(commitOffsets));
+                }
+
+                FlowFile flowFile = recordGroup.flowFile();
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
+                final String transitUri = String.format(TRANSIT_URI_FORMAT, topic, partition);
+                provenanceReporter.receive(flowFile, transitUri);
+                session.transfer(flowFile, ConsumeKafka.SUCCESS);
+            }
+
             onSuccess.run();
-        } catch (MalformedRecordException | SchemaNotFoundException | IOException e) {
+        } catch (final SchemaNotFoundException | IOException e) {
             throw new ProcessException("FlowFile Record conversion failed", e);
         }
     }

-    private void toFlowFile(final ProcessSession session,
-                            final Map<String, String> attributes,
-                            final RecordReader reader,
-                            final ByteRecord consumerRecord,
-                            Tuple<RecordField, Object> recordKey)
-            throws IOException, SchemaNotFoundException, MalformedRecordException {
-        int recordCount = 0;
-        FlowFile flowFile = session.create();
-        flowFile = session.putAllAttributes(flowFile, attributes);
-        try (final OutputStream rawOut = session.write(flowFile)) {
-            final RecordSet recordSet = reader.createRecordSet();
-            final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
-            final RecordSchema schemaWrapper = WrapperRecord.toWrapperSchema(recordKey.getKey(), schema);
-            final RecordSetWriter writer = writerFactory.createWriter(logger, schemaWrapper, rawOut, attributes);
-            Record record;
-            writer.beginRecordSet();
-            while ((record = recordSet.next()) != null) {
-                ++recordCount;
-                final ConsumeWrapperRecord consumeWrapperRecord = new ConsumeWrapperRecord(headerEncoding);
-                final MapRecord wrapperRecord = consumeWrapperRecord.toWrapperRecord(consumerRecord, record, recordKey);
-                writer.write(wrapperRecord);
-            }
-            writer.finishRecordSet();
-            writer.flush();
-            final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
-            final String transitUri = String.format(TRANSIT_URI_FORMAT, consumerRecord.getTopic(), consumerRecord.getPartition());
-            provenanceReporter.receive(flowFile, transitUri);
-        }
-        flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(recordCount));
-        session.transfer(flowFile, ConsumeKafka.SUCCESS);
+    private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, String topic, int partition) {
     }
 }
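Both converters now treat a malformed payload as a per-record event: the raw bytes are routed to parse.failure, the record's offset is still tracked, and the loop continues instead of failing the whole poll. A runnable stand-alone sketch of that control flow, where parsing integers stands in for the record reader:

    import java.util.List;

    final class ParseFailureRouting {
        public static void main(final String[] args) {
            final List<String> polled = List.of("1", "oops", "3");
            long trackedOffset = -1;

            for (int offset = 0; offset < polled.size(); offset++) {
                final String value = polled.get(offset);
                try {
                    Integer.parseInt(value); // stand-in for RecordReader parsing
                } catch (final NumberFormatException e) {
                    System.out.println("parse.failure <- " + value);
                    trackedOffset = offset; // the offset advances even on failure
                    continue;
                }
                System.out.println("success <- " + value);
                trackedOffset = offset;
            }

            System.out.println("commit through offset " + trackedOffset);
        }
    }

Advancing the offset on failure is deliberate: the bad record has been preserved on the failure route, so re-polling it would only produce duplicates.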
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/wrapper/ConsumeWrapperRecord.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/wrapper/ConsumeWrapperRecord.java
index e1f2380044..15b8d022c6 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/wrapper/ConsumeWrapperRecord.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/wrapper/ConsumeWrapperRecord.java
@@ -30,9 +30,13 @@ import org.apache.nifi.util.Tuple;

 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;

 public class ConsumeWrapperRecord {
+
+    private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(List.of());
+
     private final Charset headerCharacterSet;

     public ConsumeWrapperRecord(final Charset headerCharacterSet) {
@@ -57,7 +61,7 @@ public class ConsumeWrapperRecord {
     }

     private Tuple<RecordField, Record> toWrapperRecordValue(final Record record) {
-        final RecordSchema recordSchema = (record == null) ? null : record.getSchema();
+        final RecordSchema recordSchema = (record == null) ? EMPTY_SCHEMA : record.getSchema();
         final RecordField recordField = new RecordField(WrapperRecord.VALUE, RecordFieldType.RECORD.getRecordDataType(recordSchema));
         return new Tuple<>(recordField, record);
     }
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
index 74a8203895..87605dde0f 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaTest.java
@@ -83,7 +83,7 @@ class ConsumeKafkaTest {
     public void testVerifySuccessful() throws InitializationException {
         final PartitionState firstPartitionState = new PartitionState(TEST_TOPIC_NAME, FIRST_PARTITION);
         final List<PartitionState> partitionStates = Collections.singletonList(firstPartitionState);
-        when(kafkaConsumerService.getPartitionStates(any())).thenReturn(partitionStates);
+        when(kafkaConsumerService.getPartitionStates()).thenReturn(partitionStates);

         setConnectionService();
         when(kafkaConnectionService.getConsumerService(any())).thenReturn(kafkaConsumerService);
@@ -100,7 +100,7 @@ class ConsumeKafkaTest {

     @Test
     public void testVerifyFailed() throws InitializationException {
-        when(kafkaConsumerService.getPartitionStates(any())).thenThrow(new IllegalStateException());
+        when(kafkaConsumerService.getPartitionStates()).thenThrow(new IllegalStateException());
         when(kafkaConnectionService.getConsumerService(any())).thenReturn(kafkaConsumerService);

         setConnectionService();
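The test change mirrors the service API move below: getPartitionStates() and poll() no longer take a PollingContext, because subscription state is now fixed at the moment the consumer service is obtained from the connection service, which is why the mocks drop the any() argument. Illustrative simplified interfaces showing the before/after shape (these are not the actual NiFi service definitions):

    import java.util.List;

    interface BeforeConsumerService {
        List<String> poll(Object pollingContext); // subscription passed on every call
    }

    interface AfterConsumerService extends AutoCloseable {
        List<String> poll(); // subscription fixed when the service is created

        @Override
        void close(); // narrowed to throw nothing, for brevity
    }

    final class SketchConnectionService {
        // After the change, the polling context is supplied once, here, instead
        // of accompanying each poll() and getPartitionStates() call.
        AfterConsumerService getConsumerService(final Object pollingContext) {
            return new AfterConsumerService() {
                @Override
                public List<String> poll() {
                    return List.of();
                }

                @Override
                public void close() {
                }
            };
        }
    }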
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/KafkaConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/KafkaConnectionService.java
index c34dc9feec..0697cad4f6 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/KafkaConnectionService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/KafkaConnectionService.java
@@ -17,14 +17,14 @@ package org.apache.nifi.kafka.service.api;

 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
 import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
+import org.apache.nifi.kafka.service.api.consumer.PollingContext;
 import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
 import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;

 public interface KafkaConnectionService extends ControllerService {

-    KafkaConsumerService getConsumerService(ConsumerConfiguration consumerConfiguration);
+    KafkaConsumerService getConsumerService(PollingContext pollingContext);

     KafkaProducerService getProducerService(ProducerConfiguration producerConfiguration);
 }
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
index d74498b980..2f70a23ad6 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
@@ -19,12 +19,13 @@ package org.apache.nifi.kafka.service.api.consumer;
 import org.apache.nifi.kafka.service.api.common.PartitionState;
 import org.apache.nifi.kafka.service.api.record.ByteRecord;

+import java.io.Closeable;
 import java.util.List;

 /**
  * Kafka Consumer Service must be closed to avoid leaking connection resources
  */
-public interface KafkaConsumerService {
+public interface KafkaConsumerService extends Closeable {
     /**
      * Commit record information to Kafka Brokers
      *
@@ -32,19 +33,27 @@ public interface KafkaConsumerService {
      */
     void commit(PollingSummary pollingSummary);

+    /**
+     * Rolls back the offsets of the records so that any records that have been polled since the last commit are re-polled
+     */
+    void rollback();
+
+    /**
+     * @return true if the service is closed; false otherwise
+     */
+    boolean isClosed();
+
     /**
      * Poll Subscriptions for Records
      *
-     * @param pollingContext Polling Context containing subscription information
      * @return Stream of Records or empty when none returned
      */
-    Iterable<ByteRecord> poll(PollingContext pollingContext);
+    Iterable<ByteRecord> poll();

     /**
      * Get Partition State information for subscription
      *
-     * @param pollingContext Polling Context containing subscription information
      * @return List of Partition State information
      */
-    List<PartitionState> getPartitionStates(PollingContext pollingContext);
+    List<PartitionState> getPartitionStates();
 }
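With KafkaConsumerService extending Closeable and gaining rollback() and isClosed(), a caller can hold one service per task, commit after successful conversion, roll back to re-poll on failure, and release the underlying consumer deterministically. A usage sketch against a hypothetical simplified interface (not the NiFi definition above):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.List;

    interface ConsumerService extends Closeable {
        List<String> poll();
        void commit();
        void rollback();
        boolean isClosed();
    }

    final class Caller {
        static void consume(final ConsumerService service) throws IOException {
            try (service) {
                final List<String> records = service.poll();
                try {
                    records.forEach(System.out::println); // stand-in for FlowFile conversion
                    service.commit();
                } catch (final RuntimeException e) {
                    // Re-poll everything since the last commit instead of losing it.
                    service.rollback();
                    throw e;
                }
            }
        }
    }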
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/PollingContext.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/PollingContext.java
index daafe069aa..ed9a84ccc3 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/PollingContext.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/PollingContext.java
@@ -27,14 +27,11 @@
  * Polling Context for consuming records from Kafka Topics
  */
 public class PollingContext {
+
     private final String groupId;
-
     private final Collection<String> topics;
-
     private final Pattern topicPattern;
-
     private final AutoOffsetReset autoOffsetReset;
-
     private final Duration maxUncommittedTime;

     public PollingContext(final String groupId, final Collection<String> topics,