NIFI-13787 Fixed error and record handling in ConsumeKafka

Eliminated unnecessary connection pooling at the service layer so that a single Consumer is made available to the processor. This allows the processor to roll back offsets and provides a simpler API.

This closes #9298

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Mark Payne 2024-09-21 13:31:10 -04:00 committed by exceptionfactory
parent dee455a802
commit d04d38fc44
24 changed files with 505 additions and 629 deletions
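
As context for the diffs that follow, the reshaped service API can be exercised roughly as sketched below. This is a minimal usage sketch inferred from the signatures in this commit (KafkaConnectionService.getConsumerService(PollingContext), KafkaConsumerService.poll(), rollback(), and commit(PollingSummary)); the wrapper class, method name, and the group/topic values are illustrative assumptions, and the PollingSummary bookkeeping is omitted.

import java.time.Duration;
import java.util.Collections;

import org.apache.nifi.kafka.service.api.KafkaConnectionService;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
import org.apache.nifi.kafka.service.api.record.ByteRecord;

// Illustrative caller only: shows the single-consumer lifecycle introduced by this commit.
class ConsumerLifecycleSketch {

    void consumeOnce(final KafkaConnectionService connectionService) {
        // The connection service now returns a dedicated, already-subscribed consumer per
        // PollingContext instead of borrowing one from a shared pool.
        final PollingContext pollingContext = new PollingContext(
                "example-group", Collections.singleton("example-topic"),
                AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
        final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext);
        try {
            for (final ByteRecord consumerRecord : consumerService.poll()) {
                // hand each record to the ProcessSession / record writer here
            }
            // after the session commits successfully, offsets would be committed via
            // consumerService.commit(pollingSummary) built from the records handled above
        } catch (final Exception e) {
            // on any failure the same consumer can now seek back to its last committed offsets
            consumerService.rollback();
        }
    }
}

Because the consumer is no longer shared, the processor keeps these instances in its own queue (see the ConsumeKafka changes below) and closes them when stopped.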

View File

@ -45,8 +45,8 @@ public abstract class AbstractConsumeKafkaIT extends AbstractKafkaBaseIT {
}
protected void produceOne(final String topic, final Integer partition,
final String key, final String value, final List<Header> headers)
throws ExecutionException, InterruptedException {
final String key, final String value, final List<Header> headers) throws ExecutionException, InterruptedException {
try (final KafkaProducer<String, String> producer = new KafkaProducer<>(getKafkaProducerProperties())) {
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, key, value, headers);
final Future<RecordMetadata> future = producer.send(record);

View File

@ -42,7 +42,7 @@ public abstract class AbstractKafkaBaseIT {
protected static final String CONNECTION_SERVICE_ID = Kafka3ConnectionService.class.getSimpleName();
protected static final Duration DURATION_POLL = Duration.ofMillis(1000L);
protected static final Duration DURATION_POLL = Duration.ofSeconds(3);
protected static final KafkaContainer kafkaContainer;

View File

@ -144,18 +144,16 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
*/
@Test
public void testTopicNames() throws ExecutionException, InterruptedException {
final String topic = UUID.randomUUID().toString();
final String groupId = topic.substring(0, topic.indexOf("-"));
final String topicTestCase = topic + "-C";
final String topicNames = topic + "-D," + topicTestCase;
final String topic = "testTopicNames";
final String groupId = "testTopicNames";
runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
runner.setProperty(ConsumeKafka.TOPICS, topicNames);
runner.setProperty(ConsumeKafka.TOPICS, topic + "," + topic + "-2");
runner.setProperty(ConsumeKafka.TOPIC_FORMAT, ConsumeKafka.TOPIC_NAME);
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
runner.run(1, false, true);
produceOne(topicTestCase, 0, null, RECORD_VALUE, null);
produceOne(topic, 0, null, RECORD_VALUE, null);
final long pollUntil = System.currentTimeMillis() + DURATION_POLL.toMillis();
while ((System.currentTimeMillis() < pollUntil) && (runner.getFlowFilesForRelationship("success").isEmpty())) {
runner.run(1, false, false);

View File

@ -118,9 +118,6 @@ class ConsumeKafkaRecordIT extends AbstractConsumeKafkaIT {
flowFile.assertContentEquals(flowFileString);
flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_TOPIC, topic);
flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_PARTITION, Integer.toString(FIRST_PARTITION));
flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_OFFSET, Long.toString(FIRST_OFFSET));
flowFile.assertAttributeExists(KafkaFlowFileAttribute.KAFKA_TIMESTAMP);
flowFile.assertAttributeEquals(KafkaFlowFileAttribute.KAFKA_HEADER_COUNT, "3");
flowFile.assertAttributeEquals("record.count", Long.toString(TEST_RECORD_COUNT));
flowFile.assertAttributeEquals("aaa", "value");
flowFile.assertAttributeNotExists("bbb");

View File

@ -49,11 +49,6 @@
<artifactId>nifi-kafka-shared</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>

View File

@ -20,13 +20,15 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
@ -38,11 +40,13 @@ import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kafka.service.api.KafkaConnectionService;
import org.apache.nifi.kafka.service.api.common.ServiceConfiguration;
import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
import org.apache.nifi.kafka.service.consumer.pool.Subscription;
import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
import org.apache.nifi.kafka.shared.property.IsolationLevel;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
@ -58,8 +62,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
@ -222,33 +229,20 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
);
private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(2);
private static final String CONNECTION_STEP = "Kafka Broker Connection";
private static final String TOPIC_LISTING_STEP = "Kafka Topic Listing";
private Properties clientProperties;
private ServiceConfiguration serviceConfiguration;
private Kafka3ConsumerService consumerService;
private volatile Properties clientProperties;
private volatile ServiceConfiguration serviceConfiguration;
private volatile Properties consumerProperties;
@OnEnabled
public void onEnabled(final ConfigurationContext configurationContext) {
clientProperties = getClientProperties(configurationContext);
serviceConfiguration = getServiceConfiguration(configurationContext);
final Properties consumerProperties = getConsumerProperties(configurationContext, clientProperties);
consumerService = new Kafka3ConsumerService(getLogger(), consumerProperties);
consumerProperties = getConsumerProperties(configurationContext, clientProperties);
}
@OnDisabled
public void onDisabled() {
if (consumerService == null) {
getLogger().debug("Consumer Service not configured");
} else {
consumerService.close();
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -256,10 +250,42 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
}
@Override
public KafkaConsumerService getConsumerService(final ConsumerConfiguration consumerConfiguration) {
public KafkaConsumerService getConsumerService(final PollingContext pollingContext) {
Objects.requireNonNull(pollingContext, "Polling Context required");
final Subscription subscription = createSubscription(pollingContext);
final Properties properties = new Properties();
properties.putAll(consumerProperties);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, subscription.getGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, subscription.getAutoOffsetReset().getValue());
final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties, deserializer, deserializer);
final Optional<Pattern> topicPatternFound = subscription.getTopicPattern();
if (topicPatternFound.isPresent()) {
final Pattern topicPattern = topicPatternFound.get();
consumer.subscribe(topicPattern);
} else {
final Collection<String> topics = subscription.getTopics();
consumer.subscribe(topics);
}
final Kafka3ConsumerService consumerService = new Kafka3ConsumerService(getLogger(), consumer, subscription, pollingContext.getMaxUncommittedTime());
return consumerService;
}
private Subscription createSubscription(final PollingContext pollingContext) {
final String groupId = pollingContext.getGroupId();
final Optional<Pattern> topicPatternFound = pollingContext.getTopicPattern();
final AutoOffsetReset autoOffsetReset = pollingContext.getAutoOffsetReset();
return topicPatternFound
.map(pattern -> new Subscription(groupId, pattern, autoOffsetReset))
.orElseGet(() -> new Subscription(groupId, pollingContext.getTopics(), autoOffsetReset));
}
@Override
public KafkaProducerService getProducerService(final ProducerConfiguration producerConfiguration) {
final Properties propertiesProducer = new Properties();

View File

@ -24,13 +24,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.kafka.service.api.common.OffsetSummary;
import org.apache.nifi.kafka.service.api.common.PartitionState;
import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
import org.apache.nifi.kafka.service.api.consumer.PollingSummary;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.service.consumer.pool.ConsumerObjectPool;
import org.apache.nifi.kafka.service.consumer.pool.Subscription;
import org.apache.nifi.logging.ComponentLog;
@ -43,77 +40,87 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Kafka 3 Consumer Service implementation with Object Pooling for subscribed Kafka Consumers
*/
public class Kafka3ConsumerService implements KafkaConsumerService, Closeable {
private final ComponentLog componentLog;
private final Consumer<byte[], byte[]> consumer;
private final Subscription subscription;
private final Duration maxUncommittedTime;
private volatile boolean closed = false;
private final ConsumerObjectPool consumerObjectPool;
public Kafka3ConsumerService(final ComponentLog componentLog, final Consumer<byte[], byte[]> consumer, final Subscription subscription,
final Duration maxUncommittedTime) {
public Kafka3ConsumerService(final ComponentLog componentLog, final Properties properties) {
this.componentLog = Objects.requireNonNull(componentLog, "Component Log required");
this.consumerObjectPool = new ConsumerObjectPool(properties);
this.consumer = consumer;
this.subscription = subscription;
this.maxUncommittedTime = maxUncommittedTime;
}
@Override
public void commit(final PollingSummary pollingSummary) {
Objects.requireNonNull(pollingSummary, "Polling Summary required");
final Subscription subscription = getSubscription(pollingSummary);
final Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(pollingSummary);
final long started = System.currentTimeMillis();
final Consumer<byte[], byte[]> consumer = borrowConsumer(subscription);
final long elapsed;
try {
consumer.commitSync(offsets);
elapsed = started - System.currentTimeMillis();
} finally {
returnConsumer(subscription, consumer);
}
final long elapsed = System.currentTimeMillis() - started;
componentLog.debug("Committed Records in [{} ms] for {}", elapsed, pollingSummary);
}
@Override
public Iterable<ByteRecord> poll(final PollingContext pollingContext) {
Objects.requireNonNull(pollingContext, "Polling Context required");
final Subscription subscription = getSubscription(pollingContext);
public void rollback() {
final Set<TopicPartition> assignment = consumer.assignment();
final Consumer<byte[], byte[]> consumer = borrowConsumer(subscription);
try {
final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(pollingContext.getMaxUncommittedTime());
return new RecordIterable(consumerRecords);
} finally {
returnConsumer(subscription, consumer);
final Map<TopicPartition, OffsetAndMetadata> metadataMap = consumer.committed(assignment);
for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : metadataMap.entrySet()) {
final TopicPartition topicPartition = entry.getKey();
final OffsetAndMetadata offsetAndMetadata = entry.getValue();
if (offsetAndMetadata == null) {
consumer.seekToBeginning(Collections.singleton(topicPartition));
componentLog.debug("Rolling back offsets so that {}-{} it is at the beginning", topicPartition.topic(), topicPartition.partition());
} else {
consumer.seek(topicPartition, offsetAndMetadata.offset());
componentLog.debug("Rolling back offsets so that {}-{} has offset of {}", topicPartition.topic(), topicPartition.partition(), offsetAndMetadata.offset());
}
}
} catch (final Exception rollbackException) {
componentLog.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
close();
}
}
@Override
public List<PartitionState> getPartitionStates(final PollingContext pollingContext) {
final Subscription subscription = getSubscription(pollingContext);
public boolean isClosed() {
return closed;
}
@Override
public Iterable<ByteRecord> poll() {
final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(maxUncommittedTime);
return new RecordIterable(consumerRecords);
}
@Override
public List<PartitionState> getPartitionStates() {
final Iterator<String> topics = subscription.getTopics().iterator();
final List<PartitionState> partitionStates;
if (topics.hasNext()) {
final String topic = topics.next();
final Consumer<byte[], byte[]> consumer = borrowConsumer(subscription);
try {
partitionStates = consumer.partitionsFor(topic)
.stream()
.map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
} finally {
returnConsumer(subscription, consumer);
}
} else {
partitionStates = Collections.emptyList();
}
@ -123,17 +130,8 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable {
@Override
public void close() {
consumerObjectPool.close();
}
private Subscription getSubscription(final PollingContext pollingContext) {
final String groupId = pollingContext.getGroupId();
final Optional<Pattern> topicPatternFound = pollingContext.getTopicPattern();
final AutoOffsetReset autoOffsetReset = pollingContext.getAutoOffsetReset();
return topicPatternFound
.map(pattern -> new Subscription(groupId, pattern, autoOffsetReset))
.orElseGet(() -> new Subscription(groupId, pollingContext.getTopics(), autoOffsetReset));
closed = true;
consumer.close();
}
private Map<TopicPartition, OffsetAndMetadata> getOffsets(final PollingSummary pollingSummary) {
@ -152,28 +150,6 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable {
return offsets;
}
private Consumer<byte[], byte[]> borrowConsumer(final Subscription subscription) {
try {
return consumerObjectPool.borrowObject(subscription);
} catch (final Exception e) {
throw new ConsumerException("Borrow Consumer failed", e);
}
}
private void returnConsumer(final Subscription subscription, final Consumer<byte[], byte[]> consumer) {
try {
consumerObjectPool.returnObject(subscription, consumer);
} catch (final Exception e) {
try {
consumerObjectPool.invalidateObject(subscription, consumer);
} catch (final Exception e2) {
componentLog.debug("Failed to invalidate Kafka Consumer", e2);
}
consumer.close(Duration.ofSeconds(30));
componentLog.warn("Failed to return Kafka Consumer to pool", e);
}
}
private static class RecordIterable implements Iterable<ByteRecord> {
private final Iterator<ByteRecord> records;

View File

@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.service.consumer.pool;
import org.apache.kafka.clients.consumer.Consumer;
import java.util.Properties;
/**
* Factory abstraction for creating Kafka Consumer objects
*/
interface ConsumerFactory {
/**
* Create new Kafka Consumer using supplied Properties
*
* @param properties Consumer configuration properties
* @return Kafka Consumer
*/
Consumer<byte[], byte[]> newConsumer(Properties properties);
}

View File

@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.service.consumer.pool;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import java.util.Properties;
/**
* Kafka Consumer Object Pool
*/
public class ConsumerObjectPool extends GenericKeyedObjectPool<Subscription, Consumer<byte[], byte[]>> {
/**
* Consumer Object Pool constructor with Kafka Consumer Properties
*
* @param consumerProperties Kafka Consumer Properties required
*/
public ConsumerObjectPool(final Properties consumerProperties) {
super(new ConsumerPooledObjectFactory(consumerProperties, new StandardConsumerFactory()));
}
}

View File

@ -1,98 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.service.consumer.pool;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
/**
* Pooled Object Factory for Kafka Consumers
*/
class ConsumerPooledObjectFactory extends BaseKeyedPooledObjectFactory<Subscription, Consumer<byte[], byte[]>> {
private final Properties consumerProperties;
private final ConsumerFactory consumerFactory;
/**
* Consumer Pooled Object Factory constructor with Kafka Consumer Properties
*
* @param consumerProperties Kafka Consumer Properties
* @param consumerFactory Kafka Consumer Factory
*/
ConsumerPooledObjectFactory(final Properties consumerProperties, final ConsumerFactory consumerFactory) {
this.consumerProperties = Objects.requireNonNull(consumerProperties, "Consumer Properties required");
this.consumerFactory = Objects.requireNonNull(consumerFactory, "Consumer Factory required");
}
@Override
public Consumer<byte[], byte[]> create(final Subscription subscription) {
Objects.requireNonNull(subscription, "Topic Subscription required");
final Properties properties = new Properties();
properties.putAll(consumerProperties);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, subscription.getGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, subscription.getAutoOffsetReset().getValue());
final Consumer<byte[], byte[]> consumer = consumerFactory.newConsumer(properties);
final Optional<Pattern> topicPatternFound = subscription.getTopicPattern();
if (topicPatternFound.isPresent()) {
final Pattern topicPattern = topicPatternFound.get();
consumer.subscribe(topicPattern);
} else {
final Collection<String> topics = subscription.getTopics();
consumer.subscribe(topics);
}
return consumer;
}
/**
* Wrap Kafka Consumer using Default Pooled Object for tracking
*
* @param consumer Kafka Consumer
* @return Pooled Object wrapper
*/
@Override
public PooledObject<Consumer<byte[], byte[]>> wrap(final Consumer<byte[], byte[]> consumer) {
Objects.requireNonNull(consumer, "Consumer required");
return new DefaultPooledObject<>(consumer);
}
/**
* Destroy Pooled Object closes wrapped Kafka Consumer
*
* @param subscription Subscription
* @param pooledObject Pooled Object with Consumer to be closed
*/
@Override
public void destroyObject(final Subscription subscription, final PooledObject<Consumer<byte[], byte[]>> pooledObject) {
Objects.requireNonNull(pooledObject, "Pooled Object required");
final Consumer<byte[], byte[]> consumer = pooledObject.getObject();
consumer.unsubscribe();
consumer.close();
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.service.consumer.pool;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.util.Objects;
import java.util.Properties;
/**
* Standard Kafka Consumer Factory with Byte Array Deserializer for Key and Value elements
*/
class StandardConsumerFactory implements ConsumerFactory {
/**
* Create new Kafka Consumer with Byte Array Deserializer and configured properties
*
* @param properties Consumer configuration properties
* @return Kafka Consumer
*/
@Override
public Consumer<byte[], byte[]> newConsumer(final Properties properties) {
Objects.requireNonNull(properties, "Properties required");
final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
return new KafkaConsumer<>(properties, deserializer, deserializer);
}
}

View File

@ -28,12 +28,10 @@ import java.util.regex.Pattern;
* Subscription for pooled Kafka Consumers
*/
public class Subscription {
private final String groupId;
private final Collection<String> topics;
private final Pattern topicPattern;
private final AutoOffsetReset autoOffsetReset;
public Subscription(final String groupId, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
@ -68,26 +66,21 @@ public class Subscription {
@Override
public boolean equals(final Object object) {
final boolean equals;
if (object == null) {
equals = false;
} else if (object instanceof Subscription) {
final Subscription subscription = (Subscription) object;
if (groupId.equals(subscription.groupId)) {
if (isTopicSubscriptionMatched(subscription)) {
equals = autoOffsetReset == subscription.autoOffsetReset;
} else {
equals = false;
}
} else {
equals = false;
}
} else {
equals = false;
return false;
}
return equals;
if (object == this) {
return true;
}
if (object instanceof final Subscription subscription) {
return groupId.equals(subscription.groupId)
&& isTopicSubscriptionMatched(subscription)
&& autoOffsetReset == subscription.autoOffsetReset;
}
return false;
}
@Override
@ -101,20 +94,12 @@ public class Subscription {
}
private boolean isTopicSubscriptionMatched(final Subscription subscription) {
final boolean matched;
if (topics.size() == subscription.topics.size()) {
if (topics.containsAll(subscription.topics)) {
if (topics.size() == subscription.topics.size() && topics.containsAll(subscription.topics)) {
final String regexLeft = (topicPattern == null ? null : topicPattern.pattern());
final String regexRight = (subscription.topicPattern == null ? null : subscription.topicPattern.pattern());
matched = Objects.equals(regexLeft, regexRight);
} else {
matched = false;
}
} else {
matched = false;
return Objects.equals(regexLeft, regexRight);
}
return matched;
return false;
}
}

View File

@ -216,10 +216,9 @@ public class Kafka3ConnectionServiceBaseIT {
final RecordSummary summary = producerService.complete();
assertNotNull(summary);
final KafkaConsumerService consumerService = service.getConsumerService(null);
final PollingContext pollingContext = new PollingContext(
GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
final Iterator<ByteRecord> consumerRecords = poll(consumerService, pollingContext);
final PollingContext pollingContext = new PollingContext(GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
final KafkaConsumerService consumerService = service.getConsumerService(pollingContext);
final Iterator<ByteRecord> consumerRecords = poll(consumerService);
assertTrue(consumerRecords.hasNext(), "Consumer Records not found");
@ -282,10 +281,9 @@ public class Kafka3ConnectionServiceBaseIT {
@Test
void testGetConsumerService() {
final KafkaConsumerService consumerService = service.getConsumerService(null);
final PollingContext pollingContext = new PollingContext(
GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
final List<PartitionState> partitionStates = consumerService.getPartitionStates(pollingContext);
final PollingContext pollingContext = new PollingContext(GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
final KafkaConsumerService consumerService = service.getConsumerService(pollingContext);
final List<PartitionState> partitionStates = consumerService.getPartitionStates();
assertPartitionStatesFound(partitionStates);
}
@ -296,11 +294,11 @@ public class Kafka3ConnectionServiceBaseIT {
assertEquals(0, partitionState.getPartition());
}
private Iterator<ByteRecord> poll(final KafkaConsumerService consumerService, final PollingContext pollingContext) {
private Iterator<ByteRecord> poll(final KafkaConsumerService consumerService) {
Iterator<ByteRecord> consumerRecords = Collections.emptyIterator();
for (int i = 0; i < POLLING_ATTEMPTS; i++) {
final Iterable<ByteRecord> records = consumerService.poll(pollingContext);
final Iterable<ByteRecord> records = consumerService.poll();
assertNotNull(records);
consumerRecords = records.iterator();
if (consumerRecords.hasNext()) {

View File

@ -1,88 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.service.consumer.pool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class ConsumerPooledObjectFactoryTest {
private static final String GROUP_ID = ConsumerPooledObjectFactoryTest.class.getSimpleName();
private static final String TOPIC = String.class.getSimpleName();
@Mock
ConsumerFactory consumerFactory;
@Mock
Consumer<byte[], byte[]> mockConsumer;
ConsumerPooledObjectFactory factory;
@BeforeEach
void setFactory() {
final Properties properties = new Properties();
factory = new ConsumerPooledObjectFactory(properties, consumerFactory);
}
@Test
void testCreate() {
final Collection<String> topics = Collections.singleton(TOPIC);
final Subscription subscription = new Subscription(GROUP_ID, topics, AutoOffsetReset.EARLIEST);
when(consumerFactory.newConsumer(any(Properties.class))).thenReturn(mockConsumer);
final Consumer<byte[], byte[]> consumer = factory.create(subscription);
assertNotNull(consumer);
verify(mockConsumer).subscribe(anyCollection());
}
@Test
void testWrap() {
final PooledObject<Consumer<byte[], byte[]>> pooledObject = factory.wrap(mockConsumer);
assertNotNull(pooledObject);
}
@Test
void testDestroyObject() {
final PooledObject<Consumer<byte[], byte[]>> pooledObject = new DefaultPooledObject<>(mockConsumer);
final Subscription subscription = new Subscription(GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST);
factory.destroyObject(subscription, pooledObject);
verify(mockConsumer).close();
}
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
@ -39,7 +40,6 @@ import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafk
import org.apache.nifi.kafka.service.api.KafkaConnectionService;
import org.apache.nifi.kafka.service.api.common.PartitionState;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
@ -59,16 +59,18 @@ import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
@ -162,7 +164,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
+ "of a message demarcator. When using a message demarcator we can have far more uncommitted messages "
+ "than when we're not as there is much less for us to keep track of in memory.")
.required(true)
.defaultValue("1 s")
.defaultValue("1 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.dependsOn(COMMIT_OFFSETS, "true")
.build();
@ -269,6 +271,11 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
.description("FlowFiles containing one or more serialized Kafka Records")
.build();
public static final Relationship PARSE_FAILURE = new Relationship.Builder()
.name("parse failure")
.description("If configured to use a Record Reader, a Kafka message that cannot be parsed using the configured Record Reader will be routed to this relationship")
.build();
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
CONNECTION_SERVICE,
GROUP_ID,
@ -290,21 +297,19 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
SEPARATE_BY_KEY
);
private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(SUCCESS);
private static final Set<Relationship> SUCCESS_RELATIONSHIP = Set.of(SUCCESS);
private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = Set.of(SUCCESS, PARSE_FAILURE);
private KafkaConsumerService consumerService;
private volatile Charset headerEncoding;
private volatile Pattern headerNamePattern;
private volatile KeyEncoding keyEncoding;
private volatile OutputStrategy outputStrategy;
private volatile KeyFormat keyFormat;
private volatile boolean commitOffsets;
private volatile boolean useReader;
private volatile PollingContext pollingContext;
private Charset headerEncoding;
private Pattern headerNamePattern;
private KeyEncoding keyEncoding;
private OutputStrategy outputStrategy;
private KeyFormat keyFormat;
private boolean commitOffsets;
private final Queue<KafkaConsumerService> consumerServices = new LinkedBlockingQueue<>();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -313,13 +318,20 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
return useReader ? SUCCESS_FAILURE_RELATIONSHIPS : SUCCESS_RELATIONSHIP;
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (descriptor.equals(RECORD_READER)) {
useReader = newValue != null;
}
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
consumerService = connectionService.getConsumerService(new ConsumerConfiguration());
pollingContext = createPollingContext(context);
headerEncoding = Charset.forName(context.getProperty(HEADER_ENCODING).getValue());
final String headerNamePatternProperty = context.getProperty(HEADER_NAME_PATTERN).getValue();
@ -335,17 +347,46 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
keyFormat = context.getProperty(KEY_FORMAT).asAllowableValue(KeyFormat.class);
}
@OnStopped
public void onStopped() {
// Ensure that we close all Consumer services when stopped
KafkaConsumerService service;
while ((service = consumerServices.poll()) != null) {
try {
service.close();
} catch (IOException e) {
getLogger().warn("Failed to close Kafka Consumer Service", e);
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final PollingContext pollingContext = getPollingContext(context);
final KafkaConsumerService consumerService = getConsumerService(context);
final Iterator<ByteRecord> consumerRecords = consumerService.poll(pollingContext).iterator();
if (consumerRecords.hasNext()) {
processConsumerRecords(context, session, pollingContext, consumerRecords);
} else {
try {
final Iterator<ByteRecord> consumerRecords = consumerService.poll().iterator();
if (!consumerRecords.hasNext()) {
getLogger().debug("No Kafka Records consumed: {}", pollingContext);
context.yield();
return;
}
processConsumerRecords(context, session, consumerService, pollingContext, consumerRecords);
} catch (final Exception e) {
getLogger().error("Failed to consume Kafka Records", e);
consumerService.rollback();
try {
consumerService.close();
} catch (IOException ex) {
getLogger().warn("Failed to close Kafka Consumer Service", ex);
}
} finally {
if (!consumerService.isClosed()) {
consumerServices.offer(consumerService);
}
}
}
@ -354,15 +395,14 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
final KafkaConsumerService consumerService = connectionService.getConsumerService(new ConsumerConfiguration());
final PollingContext pollingContext = createPollingContext(context);
final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext);
final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder()
.verificationStepName("Verify Topic Partitions");
final PollingContext pollingContext = getPollingContext(context);
try {
final List<PartitionState> partitionStates = consumerService.getPartitionStates(pollingContext);
final List<PartitionState> partitionStates = consumerService.getPartitionStates();
verificationPartitions
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
.explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics()));
@ -377,68 +417,69 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
return verificationResults;
}
private void processConsumerRecords(final ProcessContext context, final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
final ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(context.getProperty(PROCESSING_STRATEGY).getValue());
// model this switch on the existing implementation at `ConsumerLease.processRecords()`
if (ProcessingStrategy.FLOW_FILE == processingStrategy) {
processInputFlowFile(session, pollingContext, consumerRecords);
} else if (ProcessingStrategy.DEMARCATOR == processingStrategy) {
final Iterator<ByteRecord> iteratorDemarcator = transformDemarcator(context, consumerRecords);
processInputFlowFile(session, pollingContext, iteratorDemarcator);
} else if (ProcessingStrategy.RECORD == processingStrategy) {
processInputRecords(context, session, pollingContext, consumerRecords);
} else {
throw new ProcessException(String.format("Processing Strategy not supported [%s]", processingStrategy));
private KafkaConsumerService getConsumerService(final ProcessContext context) {
final KafkaConsumerService consumerService = consumerServices.poll();
if (consumerService != null) {
return consumerService;
}
final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
return connectionService.getConsumerService(pollingContext);
}
private void processConsumerRecords(final ProcessContext context, final ProcessSession session, final KafkaConsumerService consumerService,
final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
final ProcessingStrategy processingStrategy = ProcessingStrategy.valueOf(context.getProperty(PROCESSING_STRATEGY).getValue());
switch (processingStrategy) {
case RECORD -> processInputRecords(context, session, consumerService, pollingContext, consumerRecords);
case FLOW_FILE -> processInputFlowFile(session, consumerService, pollingContext, consumerRecords);
case DEMARCATOR -> {
final Iterator<ByteRecord> demarcatedRecords = transformDemarcator(context, consumerRecords);
processInputFlowFile(session, consumerService, pollingContext, demarcatedRecords);
}
}
}
private Iterator<ByteRecord> transformDemarcator(final ProcessContext context, final Iterator<ByteRecord> consumerRecords) {
final String demarcatorValue = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).getValue();
if (demarcatorValue != null) {
if (demarcatorValue == null) {
return consumerRecords;
}
final byte[] demarcator = demarcatorValue.getBytes(StandardCharsets.UTF_8);
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
return new ByteRecordBundler(demarcator, separateByKey, keyEncoding, headerNamePattern, headerEncoding, commitOffsets).bundle(consumerRecords);
} else {
return consumerRecords;
}
}
private void processInputRecords(final ProcessContext context, final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
private void processInputRecords(final ProcessContext context, final ProcessSession session, final KafkaConsumerService consumerService,
final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final OffsetTracker offsetTracker = new OffsetTracker();
final Runnable onSuccess = commitOffsets
? () -> session.commitAsync(() -> consumerService.commit(offsetTracker.getPollingSummary(pollingContext)))
: session::commitAsync;
final KafkaMessageConverter converter;
if (OutputStrategy.USE_VALUE.equals(outputStrategy)) {
processOutputStrategyUseValue(context, session, pollingContext, consumerRecords);
converter = new RecordStreamKafkaMessageConverter(readerFactory, writerFactory,
headerEncoding, headerNamePattern, keyEncoding, commitOffsets, offsetTracker, onSuccess, getLogger());
} else if (OutputStrategy.USE_WRAPPER.equals(outputStrategy)) {
processOutputStrategyUseWrapper(context, session, pollingContext, consumerRecords);
final RecordReaderFactory keyReaderFactory = context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class);
converter = new WrapperRecordStreamKafkaMessageConverter(readerFactory, writerFactory, keyReaderFactory,
headerEncoding, headerNamePattern, keyFormat, keyEncoding, commitOffsets, offsetTracker, onSuccess, getLogger());
} else {
throw new ProcessException(String.format("Output Strategy not supported [%s]", outputStrategy));
}
}
private void processOutputStrategyUseWrapper(final ProcessContext context, final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordReaderFactory keyReaderFactory = context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class);
final OffsetTracker offsetTracker = new OffsetTracker();
final Runnable onSuccess = commitOffsets
? () -> session.commitAsync(() -> consumerService.commit(offsetTracker.getPollingSummary(pollingContext)))
: session::commitAsync;
final KafkaMessageConverter converter = new WrapperRecordStreamKafkaMessageConverter(readerFactory, writerFactory, keyReaderFactory,
headerEncoding, headerNamePattern, keyFormat, keyEncoding, commitOffsets, offsetTracker, onSuccess, getLogger());
converter.toFlowFiles(session, consumerRecords);
}
private void processOutputStrategyUseValue(final ProcessContext context, final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final OffsetTracker offsetTracker = new OffsetTracker();
final Runnable onSuccess = commitOffsets
? () -> session.commitAsync(() -> consumerService.commit(offsetTracker.getPollingSummary(pollingContext)))
: session::commitAsync;
final KafkaMessageConverter converter = new RecordStreamKafkaMessageConverter(readerFactory, writerFactory,
headerEncoding, headerNamePattern, keyEncoding, commitOffsets, offsetTracker, onSuccess, getLogger());
converter.toFlowFiles(session, consumerRecords);
}
private void processInputFlowFile(final ProcessSession session, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
private void processInputFlowFile(final ProcessSession session, final KafkaConsumerService consumerService, final PollingContext pollingContext, final Iterator<ByteRecord> consumerRecords) {
final OffsetTracker offsetTracker = new OffsetTracker();
final Runnable onSuccess = commitOffsets
? () -> session.commitAsync(() -> consumerService.commit(offsetTracker.getPollingSummary(pollingContext)))
@ -448,7 +489,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
converter.toFlowFiles(session, consumerRecords);
}
private PollingContext getPollingContext(final ProcessContext context) {
private PollingContext createPollingContext(final ProcessContext context) {
final String groupId = context.getProperty(GROUP_ID).getValue();
final String offsetReset = context.getProperty(AUTO_OFFSET_RESET).getValue();
final AutoOffsetReset autoOffsetReset = AutoOffsetReset.valueOf(offsetReset.toUpperCase());
@ -466,6 +507,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
} else {
throw new ProcessException(String.format("Topic Format [%s] not supported", topicFormat));
}
return pollingContext;
}
}

View File

@ -25,8 +25,8 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -74,7 +74,7 @@ public class KafkaUtils {
public static Map<String, String> toAttributes(final ByteRecord consumerRecord, final KeyEncoding keyEncoding,
final Pattern headerNamePattern, final Charset headerEncoding,
final boolean commitOffsets) {
final Map<String, String> attributes = new LinkedHashMap<>();
final Map<String, String> attributes = new HashMap<>();
attributes.put(KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.getTopic());
attributes.put(KafkaFlowFileAttribute.KAFKA_PARTITION, Long.toString(consumerRecord.getPartition()));
attributes.put(KafkaFlowFileAttribute.KAFKA_OFFSET, Long.toString(consumerRecord.getOffset()));

View File

@ -56,16 +56,15 @@ public class ByteRecordBundler {
while (consumerRecords.hasNext()) {
update(bundles, consumerRecords.next());
}
return bundles.entrySet().stream()
.map(e -> toByteRecord(e.getKey(), e.getValue())).iterator();
}
private ByteRecord toByteRecord(final BundleKey key, final BundleValue value) {
final TopicPartitionSummary topicPartition = key.getTopicPartition();
key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET,
Long.toString(value.getLastOffset()).getBytes(StandardCharsets.UTF_8)));
key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_COUNT,
Long.toString(value.getCount()).getBytes(StandardCharsets.UTF_8)));
key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_MAX_OFFSET, Long.toString(value.getLastOffset()).getBytes(StandardCharsets.UTF_8)));
key.headers.add(new RecordHeader(KafkaFlowFileAttribute.KAFKA_COUNT, Long.toString(value.getCount()).getBytes(StandardCharsets.UTF_8)));
return new ByteRecord(topicPartition.getTopic(), topicPartition.getPartition(),
value.getFirstOffset(), key.getTimestamp(), key.getHeaders(), key.getMessageKey(), value.getData());
}

View File

@ -17,10 +17,13 @@
package org.apache.nifi.kafka.processors.consumer.convert;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.processors.ConsumeKafka;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.service.api.header.RecordHeader;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
@ -32,20 +35,25 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public class RecordStreamKafkaMessageConverter implements KafkaMessageConverter {
private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(List.of());
private final RecordReaderFactory readerFactory;
private final RecordSetWriterFactory writerFactory;
private final Charset headerEncoding;
@ -78,51 +86,135 @@ public class RecordStreamKafkaMessageConverter implements KafkaMessageConverter
}
@Override
public void toFlowFiles(final ProcessSession session,
final Iterator<ByteRecord> consumerRecords) {
public void toFlowFiles(final ProcessSession session, final Iterator<ByteRecord> consumerRecords) {
try {
final Map<RecordGroupCriteria, RecordGroup> recordGroups = new HashMap<>();
String topic = null;
int partition = 0;
while (consumerRecords.hasNext()) {
final ByteRecord consumerRecord = consumerRecords.next();
final byte[] valueIn = consumerRecord.getValue();
if (valueIn.length > 0) {
final InputStream in = new ByteArrayInputStream(valueIn);
if (topic == null) {
partition = consumerRecord.getPartition();
topic = consumerRecord.getTopic();
}
final byte[] value = consumerRecord.getValue();
final Map<String, String> headers = getRelevantHeaders(consumerRecord, headerNamePattern);
final Map<String, String> attributes = KafkaUtils.toAttributes(
consumerRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);
final RecordReader reader = readerFactory.createRecordReader(attributes, in, valueIn.length, logger);
toFlowFile(session, attributes, reader, consumerRecord);
try (final InputStream in = new ByteArrayInputStream(value);
final RecordReader valueRecordReader = readerFactory.createRecordReader(attributes, in, value.length, logger)) {
int recordCount = 0;
while (true) {
final Record record = valueRecordReader.nextRecord();
// If we get a KafkaRecord that has no value, we still need to process it.
if (recordCount++ > 0 && record == null) {
break;
}
final RecordSchema recordSchema = record == null ? EMPTY_SCHEMA : record.getSchema();
final RecordSchema writeSchema = writerFactory.getSchema(attributes, recordSchema);
// Get/Register the Record Group that is associated with the schema for this Kafka Record
final RecordGroupCriteria groupCriteria = new RecordGroupCriteria(writeSchema, headers);
RecordGroup recordGroup = recordGroups.get(groupCriteria);
if (recordGroup == null) {
FlowFile flowFile = session.create();
final Map<String, String> groupAttributes = Map.of(
KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.getTopic(),
KafkaFlowFileAttribute.KAFKA_PARTITION, Long.toString(consumerRecord.getPartition())
);
flowFile = session.putAllAttributes(flowFile, groupAttributes);
final OutputStream out = session.write(flowFile);
final RecordSetWriter writer;
try {
writer = writerFactory.createWriter(logger, writeSchema, out, attributes);
writer.beginRecordSet();
} catch (final Exception e) {
out.close();
throw e;
}
recordGroup = new RecordGroup(flowFile, writer, topic, partition);
recordGroups.put(groupCriteria, recordGroup);
}
// Create the Record object and write it to the Record Writer.
if (record != null) {
recordGroup.writer().write(record);
}
}
} catch (final MalformedRecordException e) {
// Failed to parse the record. Transfer to the 'parse failure' relationship
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, out -> out.write(value));
session.transfer(flowFile, ConsumeKafka.PARSE_FAILURE);
// Track the offsets for the Kafka Record
offsetTracker.update(consumerRecord);
continue;
}
// Track the offsets for the Kafka Record
offsetTracker.update(consumerRecord);
}
// Finish writing the records
for (final Map.Entry<RecordGroupCriteria, RecordGroup> entry : recordGroups.entrySet()) {
final RecordGroupCriteria criteria = entry.getKey();
final RecordGroup recordGroup = entry.getValue();
final Map<String, String> attributes;
try (final RecordSetWriter writer = recordGroup.writer()) {
final WriteResult writeResult = writer.finishRecordSet();
attributes = new HashMap<>(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(criteria.headers());
attributes.put(KafkaFlowFileAttribute.KAFKA_CONSUMER_OFFSETS_COMMITTED, String.valueOf(commitOffsets));
}
FlowFile flowFile = recordGroup.flowFile();
flowFile = session.putAllAttributes(flowFile, attributes);
final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
final String transitUri = String.format(TRANSIT_URI_FORMAT, topic, partition);
provenanceReporter.receive(flowFile, transitUri);
session.transfer(flowFile, ConsumeKafka.SUCCESS);
}
onSuccess.run();
} catch (MalformedRecordException | SchemaNotFoundException | IOException e) {
} catch (final SchemaNotFoundException | IOException e) {
throw new ProcessException("FlowFile Record conversion failed", e);
}
}
private void toFlowFile(final ProcessSession session,
        final Map<String, String> attributes,
        final RecordReader reader,
        final ByteRecord consumerRecord) throws IOException, SchemaNotFoundException {
    int recordCount = 0;
    FlowFile flowFile = session.create();
    flowFile = session.putAllAttributes(flowFile, attributes);
    try (final OutputStream rawOut = session.write(flowFile)) {
        final RecordSet recordSet = reader.createRecordSet();
        final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
        final RecordSetWriter writer = writerFactory.createWriter(logger, schema, rawOut, attributes);
        Record record;
        writer.beginRecordSet();
        while ((record = recordSet.next()) != null) {
            ++recordCount;
            writer.write(record);
        }
        writer.finishRecordSet();
        writer.flush();
        final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
        final String transitUri = String.format(TRANSIT_URI_FORMAT, consumerRecord.getTopic(), consumerRecord.getPartition());
        provenanceReporter.receive(flowFile, transitUri);
    }
    flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(recordCount));
    session.transfer(flowFile, ConsumeKafka.SUCCESS);
}
private Map<String, String> getRelevantHeaders(final ByteRecord consumerRecord, final Pattern headerNamePattern) {
    if (headerNamePattern == null || consumerRecord == null) {
        return Map.of();
    }
    final Map<String, String> headers = new HashMap<>();
    for (final RecordHeader header : consumerRecord.getHeaders()) {
        final String name = header.key();
        if (headerNamePattern.matcher(name).matches()) {
            final String value = new String(header.value(), headerEncoding);
            headers.put(name, value);
        }
    }
    return headers;
}
private record RecordGroupCriteria(RecordSchema schema, Map<String, String> headers) {
}
private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, String topic, int partition) {
}
}
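The converter above keeps one open FlowFile and RecordSetWriter per combination of write schema and matched headers (the RecordGroupCriteria key), then finishes every writer in a single pass before transferring to success. The following is a minimal plain-Java sketch of that grouping pattern; the class and field names are illustrative stand-ins, not the NiFi Record API.

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the grouping strategy above: records are bucketed by a
// (schema, headers) key so each distinct combination gets exactly one writer and
// one output, and every writer is finished in a single pass at the end.
public class RecordGroupingSketch {

    record GroupCriteria(String schemaName, Map<String, String> headers) { }

    record Group(ByteArrayOutputStream out, PrintWriter writer) { }

    public static void main(final String[] args) {
        final List<Map.Entry<GroupCriteria, String>> consumed = List.of(
                Map.entry(new GroupCriteria("schemaA", Map.of("tenant", "x")), "record-1"),
                Map.entry(new GroupCriteria("schemaA", Map.of("tenant", "x")), "record-2"),
                Map.entry(new GroupCriteria("schemaB", Map.of()), "record-3")
        );

        final Map<GroupCriteria, Group> recordGroups = new HashMap<>();
        for (final Map.Entry<GroupCriteria, String> entry : consumed) {
            // Get or register the group for this record's criteria
            final Group group = recordGroups.computeIfAbsent(entry.getKey(), criteria -> {
                final ByteArrayOutputStream out = new ByteArrayOutputStream();
                return new Group(out, new PrintWriter(out));
            });
            group.writer().println(entry.getValue());
        }

        // Finish each group once, after all records have been routed to a writer
        for (final Map.Entry<GroupCriteria, Group> entry : recordGroups.entrySet()) {
            entry.getValue().writer().flush();
            System.out.println(entry.getKey() + " -> " + entry.getValue().out());
        }
    }
}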

View File

@ -17,6 +17,7 @@
package org.apache.nifi.kafka.processors.consumer.convert;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.processors.ConsumeKafka;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
@ -24,6 +25,7 @@ import org.apache.nifi.kafka.processors.consumer.wrapper.ConsumeWrapperRecord;
import org.apache.nifi.kafka.processors.consumer.wrapper.WrapperRecordKeyReader;
import org.apache.nifi.kafka.processors.producer.wrapper.WrapperRecord;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.kafka.shared.property.KeyFormat;
import org.apache.nifi.logging.ComponentLog;
@ -36,11 +38,12 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.Tuple;
import java.io.ByteArrayInputStream;
@ -48,11 +51,15 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public class WrapperRecordStreamKafkaMessageConverter implements KafkaMessageConverter {
private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(List.of());
private final RecordReaderFactory readerFactory;
private final RecordSetWriterFactory writerFactory;
private final RecordReaderFactory keyReaderFactory;
@ -91,62 +98,111 @@ public class WrapperRecordStreamKafkaMessageConverter implements KafkaMessageConverter {
}
@Override
public void toFlowFiles(final ProcessSession session,
final Iterator<ByteRecord> consumerRecords) {
public void toFlowFiles(final ProcessSession session, final Iterator<ByteRecord> consumerRecords) {
try {
final Map<RecordSchema, RecordGroup> recordGroups = new HashMap<>();
String topic = null;
int partition = 0;
while (consumerRecords.hasNext()) {
final ByteRecord consumerRecord = consumerRecords.next();
if (topic == null) {
partition = consumerRecord.getPartition();
topic = consumerRecord.getTopic();
}
final byte[] value = consumerRecord.getValue();
if (value.length > 0) {
final InputStream in = new ByteArrayInputStream(value);
final Map<String, String> attributes = KafkaUtils.toAttributes(
consumerRecord, keyEncoding, headerNamePattern, headerEncoding, commitOffsets);
final WrapperRecordKeyReader keyReader = new WrapperRecordKeyReader(
keyFormat, keyReaderFactory, keyEncoding, logger);
final Tuple<RecordField, Object> recordKey = keyReader.toWrapperRecordKey(
consumerRecord.getKey().orElse(null), attributes);
try (final InputStream in = new ByteArrayInputStream(value);
final RecordReader valueRecordReader = readerFactory.createRecordReader(attributes, in, value.length, logger)) {
final RecordReader reader = readerFactory.createRecordReader(attributes, in, value.length, logger);
toFlowFile(session, attributes, reader, consumerRecord, recordKey);
int recordCount = 0;
while (true) {
final Record record = valueRecordReader.nextRecord();
// If we get a KafkaRecord that has no value, we still need to process it.
if (recordCount++ > 0 && record == null) {
break;
}
final WrapperRecordKeyReader keyReader = new WrapperRecordKeyReader(keyFormat, keyReaderFactory, keyEncoding, logger);
final Tuple<RecordField, Object> recordKey = keyReader.toWrapperRecordKey(consumerRecord.getKey().orElse(null), attributes);
final RecordSchema recordSchema = record == null ? EMPTY_SCHEMA : record.getSchema();
final RecordSchema fullSchema = WrapperRecord.toWrapperSchema(recordKey.getKey(), recordSchema);
final RecordSchema writeSchema = writerFactory.getSchema(attributes, fullSchema);
// Get/Register the Record Group that is associated with the schema for this Kafka Record
RecordGroup recordGroup = recordGroups.get(writeSchema);
if (recordGroup == null) {
FlowFile flowFile = session.create();
final Map<String, String> groupAttributes = Map.of(
KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.getTopic(),
KafkaFlowFileAttribute.KAFKA_PARTITION, Long.toString(consumerRecord.getPartition())
);
flowFile = session.putAllAttributes(flowFile, groupAttributes);
final OutputStream out = session.write(flowFile);
final RecordSetWriter writer;
try {
writer = writerFactory.createWriter(logger, writeSchema, out, attributes);
writer.beginRecordSet();
} catch (final Exception e) {
out.close();
throw e;
}
recordGroup = new RecordGroup(flowFile, writer, topic, partition);
recordGroups.put(writeSchema, recordGroup);
}
// Create the Record object and write it to the Record Writer.
final ConsumeWrapperRecord consumeWrapperRecord = new ConsumeWrapperRecord(headerEncoding);
final MapRecord wrapperRecord = consumeWrapperRecord.toWrapperRecord(consumerRecord, record, recordKey);
recordGroup.writer().write(wrapperRecord);
}
} catch (final MalformedRecordException e) {
// Failed to parse the record. Transfer to a 'parse.failure' relationship
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, out -> out.write(value));
session.transfer(flowFile, ConsumeKafka.PARSE_FAILURE);
// Track the offsets for the Kafka Record
offsetTracker.update(consumerRecord);
continue;
}
// Track the offsets for the Kafka Record
offsetTracker.update(consumerRecord);
}
// Finish writing the records
for (final RecordGroup recordGroup : recordGroups.values()) {
final Map<String, String> attributes;
try (final RecordSetWriter writer = recordGroup.writer()) {
final WriteResult writeResult = writer.finishRecordSet();
attributes = new HashMap<>(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.put(KafkaFlowFileAttribute.KAFKA_CONSUMER_OFFSETS_COMMITTED, String.valueOf(commitOffsets));
}
FlowFile flowFile = recordGroup.flowFile();
flowFile = session.putAllAttributes(flowFile, attributes);
final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
final String transitUri = String.format(TRANSIT_URI_FORMAT, topic, partition);
provenanceReporter.receive(flowFile, transitUri);
session.transfer(flowFile, ConsumeKafka.SUCCESS);
}
onSuccess.run();
} catch (MalformedRecordException | SchemaNotFoundException | IOException e) {
} catch (final SchemaNotFoundException | IOException e) {
throw new ProcessException("FlowFile Record conversion failed", e);
}
}
private void toFlowFile(final ProcessSession session,
final Map<String, String> attributes,
final RecordReader reader,
final ByteRecord consumerRecord,
Tuple<RecordField, Object> recordKey)
throws IOException, SchemaNotFoundException, MalformedRecordException {
int recordCount = 0;
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
try (final OutputStream rawOut = session.write(flowFile)) {
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
final RecordSchema schemaWrapper = WrapperRecord.toWrapperSchema(recordKey.getKey(), schema);
final RecordSetWriter writer = writerFactory.createWriter(logger, schemaWrapper, rawOut, attributes);
Record record;
writer.beginRecordSet();
while ((record = recordSet.next()) != null) {
++recordCount;
final ConsumeWrapperRecord consumeWrapperRecord = new ConsumeWrapperRecord(headerEncoding);
final MapRecord wrapperRecord = consumeWrapperRecord.toWrapperRecord(consumerRecord, record, recordKey);
writer.write(wrapperRecord);
}
writer.finishRecordSet();
writer.flush();
final ProvenanceReporter provenanceReporter = session.getProvenanceReporter();
final String transitUri = String.format(TRANSIT_URI_FORMAT, consumerRecord.getTopic(), consumerRecord.getPartition());
provenanceReporter.receive(flowFile, transitUri);
}
flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(recordCount));
session.transfer(flowFile, ConsumeKafka.SUCCESS);
}
private record RecordGroup(FlowFile flowFile, RecordSetWriter writer, String topic, int partition) {
}
}
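In the wrapper strategy above, each consumed value is nested inside an envelope record together with the key, decoded headers, and topic/partition metadata, and the envelope's write schema is what keys the record groups. A rough stand-in for that layout follows; the field names are illustrative and do not reflect the actual WrapperRecord schema.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the "wrapper" layout: the consumed value is nested under one
// field alongside the key, the decoded headers, and the topic/partition/offset metadata.
public class WrapperSketch {

    public static Map<String, Object> wrap(final byte[] key, final Map<String, ?> value,
            final Map<String, byte[]> headers, final String topic, final int partition, final long offset) {
        final Map<String, String> decodedHeaders = new HashMap<>();
        headers.forEach((name, bytes) -> decodedHeaders.put(name, new String(bytes, StandardCharsets.UTF_8)));

        final Map<String, Object> wrapper = new HashMap<>();
        wrapper.put("key", key == null ? null : new String(key, StandardCharsets.UTF_8));
        wrapper.put("value", value);          // may be null for a tombstone record
        wrapper.put("headers", decodedHeaders);
        wrapper.put("metadata", Map.of("topic", topic, "partition", partition, "offset", offset));
        return wrapper;
    }

    public static void main(final String[] args) {
        System.out.println(wrap("k1".getBytes(StandardCharsets.UTF_8), Map.of("field", "value"),
                Map.of("h1", "v1".getBytes(StandardCharsets.UTF_8)), "events", 0, 42L));
    }
}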

View File

@ -30,9 +30,13 @@ import org.apache.nifi.util.Tuple;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConsumeWrapperRecord {
private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(List.of());
private final Charset headerCharacterSet;
public ConsumeWrapperRecord(final Charset headerCharacterSet) {
@ -57,7 +61,7 @@ public class ConsumeWrapperRecord {
}
private Tuple<RecordField, Object> toWrapperRecordValue(final Record record) {
final RecordSchema recordSchema = (record == null) ? null : record.getSchema();
final RecordSchema recordSchema = (record == null) ? EMPTY_SCHEMA : record.getSchema();
final RecordField recordField = new RecordField(WrapperRecord.VALUE, RecordFieldType.RECORD.getRecordDataType(recordSchema));
return new Tuple<>(recordField, record);
}
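The one-line change above substitutes an empty schema for a null one when the consumed record has no value (a tombstone), so the wrapper's value field always carries a usable RECORD type. A tiny stand-in outside the NiFi API showing the same null-coalescing idea:

import java.util.List;

// Illustrative stand-in: tombstone records get a placeholder schema with no fields
// instead of null, so callers can always inspect the value field's schema.
public class EmptySchemaSketch {

    record Schema(List<String> fieldNames) { }

    private static final Schema EMPTY_SCHEMA = new Schema(List.of());

    static Schema schemaOf(final Schema recordSchema) {
        // Mirrors: (record == null) ? EMPTY_SCHEMA : record.getSchema()
        return recordSchema == null ? EMPTY_SCHEMA : recordSchema;
    }

    public static void main(final String[] args) {
        System.out.println(schemaOf(null).fieldNames());                               // []
        System.out.println(schemaOf(new Schema(List.of("name", "age"))).fieldNames()); // [name, age]
    }
}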

View File

@ -83,7 +83,7 @@ class ConsumeKafkaTest {
public void testVerifySuccessful() throws InitializationException {
final PartitionState firstPartitionState = new PartitionState(TEST_TOPIC_NAME, FIRST_PARTITION);
final List<PartitionState> partitionStates = Collections.singletonList(firstPartitionState);
when(kafkaConsumerService.getPartitionStates(any())).thenReturn(partitionStates);
when(kafkaConsumerService.getPartitionStates()).thenReturn(partitionStates);
setConnectionService();
when(kafkaConnectionService.getConsumerService(any())).thenReturn(kafkaConsumerService);
@ -100,7 +100,7 @@ class ConsumeKafkaTest {
@Test
public void testVerifyFailed() throws InitializationException {
when(kafkaConsumerService.getPartitionStates(any())).thenThrow(new IllegalStateException());
when(kafkaConsumerService.getPartitionStates()).thenThrow(new IllegalStateException());
when(kafkaConnectionService.getConsumerService(any())).thenReturn(kafkaConsumerService);
setConnectionService();

View File

@ -17,14 +17,14 @@
package org.apache.nifi.kafka.service.api;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
public interface KafkaConnectionService extends ControllerService {
KafkaConsumerService getConsumerService(ConsumerConfiguration consumerConfiguration);
KafkaConsumerService getConsumerService(PollingContext pollingContext);
KafkaProducerService getProducerService(ProducerConfiguration producerConfiguration);
}
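With pooling removed at the service layer, getConsumerService is expected to hand back a dedicated, stateful consumer service that the calling processor owns for its lifetime. The sketch below illustrates that factory shape with reduced placeholder interfaces; it is not the NiFi API, and the parameter names are assumptions for illustration only.

import java.io.Closeable;
import java.util.List;

// Reduced placeholders: the connection service creates a new, dedicated consumer per
// request instead of lending one out of a shared pool.
public class ConnectionServiceSketch {

    interface ConsumerService extends Closeable {
        List<String> poll();
        void close();
    }

    interface ConnectionService {
        ConsumerService getConsumerService(String groupId, List<String> topics);
    }

    static final ConnectionService CONNECTION_SERVICE = (groupId, topics) -> new ConsumerService() {
        @Override
        public List<String> poll() {
            return List.of();   // a real consumer would subscribe using groupId and topics
        }

        @Override
        public void close() {
            // releases the underlying connection owned by this single consumer
        }
    };

    public static void main(final String[] args) {
        try (final ConsumerService consumer = CONNECTION_SERVICE.getConsumerService("group-1", List.of("topic-1"))) {
            System.out.println(consumer.poll());
        }
    }
}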

View File

@ -19,12 +19,13 @@ package org.apache.nifi.kafka.service.api.consumer;
import org.apache.nifi.kafka.service.api.common.PartitionState;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import java.io.Closeable;
import java.util.List;
/**
* Kafka Consumer Service must be closed to avoid leaking connection resources
*/
public interface KafkaConsumerService {
public interface KafkaConsumerService extends Closeable {
/**
* Commit record information to Kafka Brokers
*
@ -32,19 +33,27 @@ public interface KafkaConsumerService {
*/
void commit(PollingSummary pollingSummary);
/**
* Rolls back the offsets of the records so that any records that have been polled since the last commit are re-polled
*/
void rollback();
/**
* @return <code>true</code> if the service is closed; <code>false</code> otherwise
*/
boolean isClosed();
/**
* Poll Subscriptions for Records
*
* @param pollingContext Polling Context containing subscription information
* @return Stream of Records or empty when none returned
*/
Iterable<ByteRecord> poll(PollingContext pollingContext);
Iterable<ByteRecord> poll();
/**
* Get Partition State information for subscription
*
* @param pollingContext Polling Context containing subscription information
* @return List of Partition State information
*/
List<PartitionState> getPartitionStates(PollingContext pollingContext);
List<PartitionState> getPartitionStates();
}
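The revised interface makes the consumer itself stateful: poll() and getPartitionStates() operate on the subscription the service was created with, rollback() rewinds to the last committed offsets, and close() releases the connection. The sketch below shows how a caller might drive that poll / commit-or-rollback / close lifecycle, using a reduced placeholder class rather than the NiFi interface.

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Reduced placeholder for the lifecycle implied by KafkaConsumerService; the buffered
// strings stand in for records polled since the last commit.
public class ConsumerLifecycleSketch {

    static class StatefulConsumer implements Closeable {
        private final List<String> uncommitted = new ArrayList<>();
        private boolean closed;

        List<String> poll() {
            // Returns one synthetic record per call and remembers it as uncommitted work
            final List<String> polled = List.of("record-" + (uncommitted.size() + 1));
            uncommitted.addAll(polled);
            return polled;
        }

        void commit() {
            // A real consumer would commit offsets for everything polled so far
            uncommitted.clear();
        }

        void rollback() {
            // A real consumer would seek back so these records are re-polled by the next poll()
            uncommitted.clear();
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(final String[] args) {
        try (final StatefulConsumer consumer = new StatefulConsumer()) {
            final List<String> records = consumer.poll();
            try {
                System.out.println("processed " + records);
                consumer.commit();
            } catch (final RuntimeException e) {
                consumer.rollback();
            }
        }
    }
}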

View File

@ -27,14 +27,11 @@ import java.util.regex.Pattern;
* Polling Context for consuming records from Kafka Topics
*/
public class PollingContext {
private final String groupId;
private final Collection<String> topics;
private final Pattern topicPattern;
private final AutoOffsetReset autoOffsetReset;
private final Duration maxUncommittedTime;
public PollingContext(final String groupId, final Collection<String> topics,