NIFI-13784 Fixed Kafka Publishing Behavior creating multiple Producers (#9297)

Fixed error handling in several situations. Cleaned up a leaky abstraction. Performed some code cleanup and changed default values for time-based properties to use 'sec' instead of 's' to adhere to typical conventions.

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Mark Payne 2024-09-21 17:45:36 -04:00 committed by GitHub
parent a43135ce0c
commit 6ddae78b6f
13 changed files with 297 additions and 189 deletions
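The core of the fix is that PublishKafka now reuses producer services instead of creating a new one for every trigger. A minimal sketch of that reuse pattern, with a stand-in ProducerService interface in place of the commit's KafkaProducerService:

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Sketch of the reuse pattern introduced in PublishKafka: poll an existing service,
// create one only when none is available, and return it after use unless it was
// closed because of a failure. ProducerService stands in for KafkaProducerService.
class ProducerPoolSketch {

    interface ProducerService {
        boolean isClosed();
        void close();
    }

    private final Queue<ProducerService> producerServices = new LinkedBlockingQueue<>();

    ProducerService borrow(final Supplier<ProducerService> factory) {
        final ProducerService existing = producerServices.poll();
        return existing != null ? existing : factory.get();
    }

    void release(final ProducerService service) {
        // A service closed during error handling is dropped; the next borrow creates a fresh one.
        if (!service.isClosed()) {
            producerServices.offer(service);
        }
    }

    void shutdown() {
        // Mirrors the new @OnStopped handling: drain and close everything that was pooled.
        ProducerService service;
        while ((service = producerServices.poll()) != null) {
            service.close();
        }
    }
}
```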

View File

@@ -45,8 +45,7 @@ public class PublishKafkaFailureStrategyIT extends AbstractPublishKafkaIT {
// attempt to send a non-json FlowFile to Kafka using record strategy;
// this will fail on the record parsing step prior to send; triggering the failure strategy logic
final Map<String, String> attributes = new HashMap<>();
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_RESOURCE)));
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(getClass().getClassLoader().getResource(TEST_RESOURCE)));
runner.enqueue(bytesFlowFile, attributes);
runner.run(1);
runner.assertAllFlowFilesTransferred(PublishKafka.REL_FAILURE, 1);
@@ -65,10 +64,10 @@ public class PublishKafkaFailureStrategyIT extends AbstractPublishKafkaIT {
// attempt to send a non-json FlowFile to Kafka using record strategy;
// this will fail on the record parsing step prior to send; triggering the failure strategy logic
final Map<String, String> attributes = new HashMap<>();
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_RESOURCE)));
final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(getClass().getClassLoader().getResource(TEST_RESOURCE)));
runner.enqueue(bytesFlowFile, attributes);
runner.run(1);
// on rollback, FlowFile is returned to source queue
runner.assertTransferCount(PublishKafka.REL_SUCCESS, 0);
runner.assertTransferCount(PublishKafka.REL_FAILURE, 0);

View File

@@ -44,8 +44,8 @@ import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.IsolationLevel;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
@@ -175,7 +175,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
.name("default.api.timeout.ms")
.displayName("Client Timeout")
.description("Default timeout for Kafka client operations. Mapped to Kafka default.api.timeout.ms. The Kafka request.timeout.ms property is derived from half of the configured timeout")
.defaultValue("60 s")
.defaultValue("60 sec")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -191,7 +191,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.defaultValue("5 s")
.defaultValue("5 sec")
.build();
public static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
@@ -204,7 +204,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.defaultValue("5 s")
.defaultValue("5 sec")
.build();
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
@@ -244,7 +244,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
@OnDisabled
public void onDisabled() {
if (consumerService == null) {
getLogger().warn("Consumer Service not configured");
getLogger().debug("Consumer Service not configured");
} else {
consumerService.close();
}
@@ -278,6 +278,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
if (partitionClass != null && partitionClass.startsWith("org.apache.kafka")) {
propertiesProducer.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionClass);
}
return new Kafka3ProducerService(propertiesProducer, serviceConfiguration, producerConfiguration);
}
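For the default-value changes above, NiFi's time period validator accepts both 's' and 'sec'; the switch only aligns the defaults with the convention used elsewhere. An illustrative descriptor (the property name here is a placeholder, not the commit's):

```java
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;

class TimeoutPropertySketch {

    // Illustrative descriptor: the time period validator parses "60 s" and "60 sec" alike,
    // but "sec" matches the convention used by other NiFi components.
    static final PropertyDescriptor EXAMPLE_CLIENT_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Example Client Timeout") // hypothetical property name for illustration
            .description("Default timeout for Kafka client operations")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .defaultValue("60 sec")
            .build();
}
```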

View File

@@ -34,6 +34,8 @@ import org.apache.nifi.kafka.service.consumer.pool.ConsumerObjectPool;
import org.apache.nifi.kafka.service.consumer.pool.Subscription;
import org.apache.nifi.logging.ComponentLog;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -43,14 +45,13 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Kafka 3 Consumer Service implementation with Object Pooling for subscribed Kafka Consumers
*/
public class Kafka3ConsumerService implements KafkaConsumerService {
public class Kafka3ConsumerService implements KafkaConsumerService, Closeable {
private final ComponentLog componentLog;
private final ConsumerObjectPool consumerObjectPool;
@@ -68,10 +69,14 @@ public class Kafka3ConsumerService implements KafkaConsumerService {
final Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(pollingSummary);
final long started = System.currentTimeMillis();
final long elapsed = runConsumerFunction(subscription, (consumer) -> {
final Consumer<byte[], byte[]> consumer = borrowConsumer(subscription);
final long elapsed;
try {
consumer.commitSync(offsets);
return started - System.currentTimeMillis();
});
elapsed = started - System.currentTimeMillis();
} finally {
returnConsumer(subscription, consumer);
}
componentLog.debug("Committed Records in [{} ms] for {}", elapsed, pollingSummary);
}
@@ -81,10 +86,13 @@ public class Kafka3ConsumerService implements KafkaConsumerService {
Objects.requireNonNull(pollingContext, "Polling Context required");
final Subscription subscription = getSubscription(pollingContext);
return runConsumerFunction(subscription, (consumer) -> {
final Consumer<byte[], byte[]> consumer = borrowConsumer(subscription);
try {
final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(pollingContext.getMaxUncommittedTime());
return new RecordIterable(consumerRecords);
});
} finally {
returnConsumer(subscription, consumer);
}
}
@Override
@@ -96,12 +104,16 @@ public class Kafka3ConsumerService implements KafkaConsumerService {
if (topics.hasNext()) {
final String topic = topics.next();
partitionStates = runConsumerFunction(subscription, (consumer) ->
consumer.partitionsFor(topic)
.stream()
.map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList())
);
final Consumer<byte[], byte[]> consumer = borrowConsumer(subscription);
try {
partitionStates = consumer.partitionsFor(topic)
.stream()
.map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
} finally {
returnConsumer(subscription, consumer);
}
} else {
partitionStates = Collections.emptyList();
}
@@ -140,21 +152,26 @@ public class Kafka3ConsumerService implements KafkaConsumerService {
return offsets;
}
private <T> T runConsumerFunction(final Subscription subscription, final Function<Consumer<byte[], byte[]>, T> consumerFunction) {
Consumer<byte[], byte[]> consumer = null;
private Consumer<byte[], byte[]> borrowConsumer(final Subscription subscription) {
try {
consumer = consumerObjectPool.borrowObject(subscription);
return consumerFunction.apply(consumer);
return consumerObjectPool.borrowObject(subscription);
} catch (final Exception e) {
throw new ConsumerException("Borrow Consumer failed", e);
} finally {
if (consumer != null) {
try {
consumerObjectPool.returnObject(subscription, consumer);
} catch (final Exception e) {
componentLog.warn("Return Consumer failed", e);
}
}
}
private void returnConsumer(final Subscription subscription, final Consumer<byte[], byte[]> consumer) {
try {
consumerObjectPool.returnObject(subscription, consumer);
} catch (final Exception e) {
try {
consumerObjectPool.invalidateObject(subscription, consumer);
} catch (final Exception e2) {
componentLog.debug("Failed to invalidate Kafka Consumer", e2);
}
consumer.close(Duration.ofSeconds(30));
componentLog.warn("Failed to return Kafka Consumer to pool", e);
}
}
@@ -191,6 +208,7 @@ public class Kafka3ConsumerService implements KafkaConsumerService {
final RecordHeader recordHeader = new RecordHeader(header.key(), header.value());
recordHeaders.add(recordHeader);
});
return new ByteRecord(
consumerRecord.topic(),
consumerRecord.partition(),
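The consumer service now borrows and returns pooled consumers explicitly instead of passing a function into runConsumerFunction. A sketch of the return-or-invalidate handling, assuming a commons-pool2 KeyedObjectPool behind the commit's ConsumerObjectPool (an assumption for illustration) and using a String key as a stand-in for the Subscription type:

```java
import java.time.Duration;

import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.kafka.clients.consumer.Consumer;

// Borrow in a try block, return in finally; if the return itself fails, drop the
// instance from the pool and close it so the broker connection does not leak.
class PooledConsumerSketch {

    private final KeyedObjectPool<String, Consumer<byte[], byte[]>> pool;

    PooledConsumerSketch(final KeyedObjectPool<String, Consumer<byte[], byte[]>> pool) {
        this.pool = pool;
    }

    Consumer<byte[], byte[]> borrowConsumer(final String subscription) {
        try {
            return pool.borrowObject(subscription);
        } catch (final Exception e) {
            throw new IllegalStateException("Borrow Consumer failed", e);
        }
    }

    void returnConsumer(final String subscription, final Consumer<byte[], byte[]> consumer) {
        try {
            pool.returnObject(subscription, consumer);
        } catch (final Exception e) {
            try {
                pool.invalidateObject(subscription, consumer);
            } catch (final Exception ignored) {
                // the pool no longer tracks this instance; closing below is sufficient
            }
            consumer.close(Duration.ofSeconds(30));
        }
    }
}
```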

View File

@@ -33,22 +33,22 @@ import org.apache.nifi.kafka.service.producer.transaction.KafkaProducerWrapper;
import org.apache.nifi.kafka.service.producer.transaction.KafkaTransactionalProducerWrapper;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
public class Kafka3ProducerService implements KafkaProducerService {
private final Producer<byte[], byte[]> producer;
private final List<ProducerCallback> callbacks;
private final ServiceConfiguration serviceConfiguration;
private final KafkaProducerWrapper wrapper;
private volatile boolean closed = false;
public Kafka3ProducerService(final Properties properties,
final ServiceConfiguration serviceConfiguration,
final ProducerConfiguration producerConfiguration) {
@@ -65,50 +65,69 @@ public class Kafka3ProducerService implements KafkaProducerService {
@Override
public void close() {
producer.close();
closed = true;
producer.close(Duration.ofSeconds(30));
}
@Override
public void init() {
wrapper.init();
public boolean isClosed() {
return closed;
}
@Override
public void send(final Iterator<KafkaRecord> kafkaRecords, final PublishContext publishContext) {
final ProducerCallback callback = new ProducerCallback(publishContext.getFlowFile());
callbacks.add(callback);
Optional.ofNullable(publishContext.getException()).ifPresent(e -> callback.getExceptions().add(e));
if (callback.getExceptions().isEmpty()) {
final List<Exception> callbackExceptions = callback.getExceptions();
final Exception publishException = publishContext.getException();
if (publishException != null) {
callbackExceptions.add(publishException);
}
if (callbackExceptions.isEmpty()) {
try {
wrapper.send(kafkaRecords, publishContext, callback);
} catch (final UncheckedIOException e) {
callback.getExceptions().add(e);
// We don't throw the Exception because we will later deal with this by
// checking if there are any Exceptions.
callbackExceptions.add(e);
} catch (final Exception e) {
// We re-throw the Exception in this case because it is an unexpected Exception
callbackExceptions.add(e);
throw e;
}
}
}
@Override
public RecordSummary complete() {
final boolean shouldCommit = callbacks.stream().noneMatch(ProducerCallback::isFailure);
if (shouldCommit) {
producer.flush(); // finish Kafka processing of in-flight data
wrapper.commit(); // commit Kafka transaction (when transactions configured)
} else {
// rollback on transactions + exception
wrapper.abort();
}
final RecordSummary recordSummary = new RecordSummary(); // scrape the Kafka callbacks for disposition of in-flight data
final List<FlowFileResult> flowFileResults = recordSummary.getFlowFileResults();
for (final ProducerCallback callback : callbacks) {
// short-circuit the handling of the flowfile results here
if (callback.isFailure()) {
flowFileResults.add(callback.toFailureResult());
try {
final boolean shouldCommit = callbacks.stream().noneMatch(ProducerCallback::isFailure);
if (shouldCommit) {
producer.flush(); // finish Kafka processing of in-flight data
wrapper.commit(); // commit Kafka transaction (when transactions configured)
} else {
flowFileResults.add(callback.waitComplete(serviceConfiguration.getMaxAckWait().toMillis()));
// rollback on transactions + exception
wrapper.abort();
}
final RecordSummary recordSummary = new RecordSummary(); // scrape the Kafka callbacks for disposition of in-flight data
final List<FlowFileResult> flowFileResults = recordSummary.getFlowFileResults();
for (final ProducerCallback callback : callbacks) {
// short-circuit the handling of the flowfile results here
if (callback.isFailure()) {
flowFileResults.add(callback.toFailureResult());
} else {
flowFileResults.add(callback.waitComplete(serviceConfiguration.getMaxAckWait().toMillis()));
}
}
return recordSummary;
} finally {
callbacks.clear();
}
return recordSummary;
}
@Override
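From the caller's perspective, the reworked producer service is driven as send, then complete, then route on the RecordSummary; a service whose completion fails is closed so it is not reused. A condensed sketch using the API types from this commit (the helper class and boolean return are illustrative):

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
import org.apache.nifi.kafka.service.api.producer.PublishContext;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;

// Condensed caller-side view of the reworked lifecycle: send() records conversion
// failures on the per-FlowFile callback, complete() flushes/commits (or aborts) and
// reports results, and a service that fails to complete is closed rather than reused.
class ProducerLifecycleSketch {

    boolean publish(final KafkaProducerService producerService, final String topic, final byte[] value) {
        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, value, Collections.emptyList());
        final Iterator<KafkaRecord> records = List.of(kafkaRecord).iterator();
        producerService.send(records, new PublishContext(topic, null, null, null));

        try {
            final var summary = producerService.complete(); // commit or abort, then collect per-FlowFile results
            return !summary.isFailure();
        } catch (final Exception e) {
            producerService.close(); // a broken producer must not go back into the pool
            return false;
        }
    }
}
```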

View File

@@ -24,10 +24,6 @@ public class KafkaNonTransactionalProducerWrapper extends KafkaProducerWrapper {
super(producer);
}
@Override
public void init() {
}
@Override
public void commit() {
}

View File

@@ -43,11 +43,6 @@ public abstract class KafkaProducerWrapper {
this.producer = producer;
}
/**
* Transaction-enabled publish to Kafka involves the use of special Kafka client library APIs.
*/
public abstract void init();
public void send(final Iterator<KafkaRecord> kafkaRecords, final PublishContext publishContext, final ProducerCallback callback) {
while (kafkaRecords.hasNext()) {
final KafkaRecord kafkaRecord = kafkaRecords.next();

View File

@@ -17,31 +17,73 @@
package org.apache.nifi.kafka.service.producer.transaction;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.nifi.kafka.service.api.producer.PublishContext;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.kafka.service.producer.ProducerCallback;
import java.util.Iterator;
public class KafkaTransactionalProducerWrapper extends KafkaProducerWrapper {
private volatile boolean inTransaction = false;
public KafkaTransactionalProducerWrapper(final Producer<byte[], byte[]> producer) {
super(producer);
producer.initTransactions();
}
@Override
public void init() {
producer.initTransactions();
producer.beginTransaction();
public void send(final Iterator<KafkaRecord> kafkaRecords, final PublishContext publishContext, final ProducerCallback callback) {
if (!inTransaction) {
producer.beginTransaction();
inTransaction = true;
}
super.send(kafkaRecords, publishContext, callback);
}
@Override
public void commit() {
try {
producer.commitTransaction();
// Commit the transaction. If a TimeoutException is thrown, retry up to 3 times.
// The producer will throw an Exception if we attempt to abort a transaction
// after a commit times out.
boolean failure = false;
for (int i = 0; i < 3; i++) {
try {
producer.commitTransaction();
// If we logged any warning that we timed out and will retry, we should log a notification
// that we were successful this time. Otherwise, don't spam the logs.
if (failure) {
logger.info("Successfully commited producer transaction after {} retries", i);
}
break;
} catch (final TimeoutException te) {
failure = true;
if (i == 2) {
logger.warn("Failed to commit producer transaction after 3 attempts, each timing out. Aborting transaction.");
throw te;
}
logger.warn("Timed out while committing producer transaction. Retrying...");
}
}
inTransaction = false;
} catch (final Exception e) {
logger.debug("Failure during producer transaction commit", e);
logger.error("Failed to commit producer transaction", e);
abort();
}
}
@Override
public void abort() {
if (!inTransaction) {
return;
}
inTransaction = false;
producer.abortTransaction();
}
}
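The retry logic above exists because commitTransaction can time out and, as the commit's own comment notes, the producer throws if a transaction is aborted after its commit timed out. A plain kafka-clients sketch of the same lifecycle and bounded retry (broker address, transactional id, and topic are placeholders):

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// initTransactions() once per producer, beginTransaction() before the first send,
// then commit with a bounded number of retries on TimeoutException.
class TransactionalCommitSketch {

    static void publishTransactionally(final String bootstrapServers, final String transactionalId, final byte[] value) {
        final Properties properties = new Properties();
        properties.putAll(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()));

        try (Producer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", value)); // topic name is a placeholder

            for (int attempt = 1; attempt <= 3; attempt++) {
                try {
                    producer.commitTransaction();
                    break;
                } catch (final TimeoutException e) {
                    // A timed-out commit may still complete on the broker, so retry the commit
                    // rather than aborting; aborting at this point would make the producer throw.
                    if (attempt == 3) {
                        throw e;
                    }
                }
            }
        }
    }
}
```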

View File

@@ -188,7 +188,6 @@ public class Kafka3ConnectionServiceBaseIT {
final KafkaProducerService producerService = service.getProducerService(producerConfiguration);
final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, RECORD_VALUE, Collections.emptyList());
final List<KafkaRecord> kafkaRecords = Collections.singletonList(kafkaRecord);
producerService.init();
producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC + "-produce", null, null, null));
final RecordSummary summary = producerService.complete();
assertNotNull(summary);
@@ -200,46 +199,43 @@ public class Kafka3ConnectionServiceBaseIT {
final KafkaProducerService producerService = service.getProducerService(producerConfiguration);
final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, RECORD_VALUE, Collections.emptyList());
final List<KafkaRecord> kafkaRecords = Collections.singletonList(kafkaRecord);
producerService.init();
producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC + "-produce", null, null, null));
final RecordSummary summary = producerService.complete();
assertNotNull(summary);
}
@Test
void testProduceConsumeRecord() throws Exception {
void testProduceConsumeRecord() {
final ProducerConfiguration producerConfiguration = new ProducerConfiguration(false, null, null, null, null);
final KafkaProducerService producerService = service.getProducerService(producerConfiguration);
final long timestamp = System.currentTimeMillis();
final KafkaRecord kafkaRecord = new KafkaRecord(null, null, timestamp, RECORD_KEY, RECORD_VALUE, Collections.emptyList());
final List<KafkaRecord> kafkaRecords = Collections.singletonList(kafkaRecord);
producerService.init();
producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC, null, null, null));
final RecordSummary summary = producerService.complete();
assertNotNull(summary);
try (KafkaConsumerService consumerService = service.getConsumerService(null)) {
final PollingContext pollingContext = new PollingContext(
GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
final Iterator<ByteRecord> consumerRecords = poll(consumerService, pollingContext);
final KafkaConsumerService consumerService = service.getConsumerService(null);
final PollingContext pollingContext = new PollingContext(
GROUP_ID, Collections.singleton(TOPIC), AutoOffsetReset.EARLIEST, Duration.ofSeconds(1));
final Iterator<ByteRecord> consumerRecords = poll(consumerService, pollingContext);
assertTrue(consumerRecords.hasNext(), "Consumer Records not found");
assertTrue(consumerRecords.hasNext(), "Consumer Records not found");
final ByteRecord consumerRecord = consumerRecords.next();
assertEquals(TOPIC, consumerRecord.getTopic());
assertEquals(0, consumerRecord.getOffset());
assertEquals(0, consumerRecord.getPartition());
assertEquals(timestamp, consumerRecord.getTimestamp());
final ByteRecord consumerRecord = consumerRecords.next();
assertEquals(TOPIC, consumerRecord.getTopic());
assertEquals(0, consumerRecord.getOffset());
assertEquals(0, consumerRecord.getPartition());
assertEquals(timestamp, consumerRecord.getTimestamp());
final Optional<byte[]> keyFound = consumerRecord.getKey();
assertTrue(keyFound.isPresent());
final Optional<byte[]> keyFound = consumerRecord.getKey();
assertTrue(keyFound.isPresent());
assertArrayEquals(RECORD_KEY, keyFound.get());
assertArrayEquals(RECORD_VALUE, consumerRecord.getValue());
assertArrayEquals(RECORD_KEY, keyFound.get());
assertArrayEquals(RECORD_VALUE, consumerRecord.getValue());
assertFalse(consumerRecords.hasNext());
}
assertFalse(consumerRecords.hasNext());
}
@Test

View File

@@ -23,11 +23,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
@@ -330,17 +328,13 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
} else {
headerNamePattern = null;
}
keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).asAllowableValue(KeyEncoding.class);
commitOffsets = context.getProperty(COMMIT_OFFSETS).asBoolean();
outputStrategy = context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
keyFormat = context.getProperty(KEY_FORMAT).asAllowableValue(KeyFormat.class);
}
@OnStopped
public void onStopped() {
// discard reference; leave controller service state intact
consumerService = null;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
@@ -399,9 +393,9 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
}
private Iterator<ByteRecord> transformDemarcator(final ProcessContext context, final Iterator<ByteRecord> consumerRecords) {
final PropertyValue propertyValueDemarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR);
if (propertyValueDemarcator.isSet()) {
final byte[] demarcator = propertyValueDemarcator.getValue().getBytes(StandardCharsets.UTF_8);
final String demarcatorValue = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).getValue();
if (demarcatorValue != null) {
final byte[] demarcator = demarcatorValue.getBytes(StandardCharsets.UTF_8);
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
return new ByteRecordBundler(demarcator, separateByKey, keyEncoding, headerNamePattern, headerEncoding, commitOffsets).bundle(consumerRecords);
} else {

View File

@@ -22,11 +22,13 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.processors.producer.PartitionStrategy;
import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil;
import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee;
@@ -78,9 +80,12 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub"})
@CapabilityDescription("Sends the contents of a FlowFile as either a message or as individual records to Apache Kafka using the Kafka Producer API. "
@@ -211,6 +216,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
.description("The format used to publish the incoming FlowFile record to Kafka.")
.required(true)
.defaultValue(PublishStrategy.USE_VALUE)
.dependsOn(RECORD_READER)
.allowableValues(PublishStrategy.class)
.build();
@@ -319,6 +325,9 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
private final Queue<KafkaProducerService> producerServices = new LinkedBlockingQueue<>();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
@@ -352,6 +361,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions(attributes).getValue();
try {
final List<PartitionState> partitionStates = producerService.getPartitionStates(topicName);
verificationPartitions
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
.explanation(String.format("Partitions [%d] found for Topic [%s]", partitionStates.size(), topicName));
@@ -366,6 +376,17 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
return verificationResults;
}
@OnStopped
public void onStopped() {
// Ensure that we close all Producer services when stopped
KafkaProducerService service;
while ((service = producerServices.poll()) != null) {
service.close();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List<FlowFile> flowFiles = PublishKafkaUtil.pollFlowFiles(session);
@@ -373,6 +394,33 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
return;
}
final KafkaProducerService producerService = getProducerService(context);
try {
publishFlowFiles(context, session, flowFiles, producerService);
} catch (final Exception e) {
final String uuids = flowFiles.stream()
.map(ff -> ff.getAttribute(CoreAttributes.UUID.key()))
.collect(Collectors.joining(", "));
getLogger().error("Failed to publish {} FlowFiles to Kafka: uuids={}", flowFiles.size(), uuids, e);
producerService.close();
} finally {
if (!producerService.isClosed()) {
producerServices.offer(producerService);
}
}
}
private KafkaProducerService getProducerService(final ProcessContext context) {
final KafkaProducerService producerService = producerServices.poll();
if (producerService != null) {
return producerService;
}
return createProducerService(context);
}
private KafkaProducerService createProducerService(final ProcessContext context) {
final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
final boolean transactionsEnabled = context.getProperty(TRANSACTIONS_ENABLED).asBoolean();
@@ -381,27 +429,36 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
final ProducerConfiguration producerConfiguration = new ProducerConfiguration(
transactionsEnabled, transactionalIdPrefix, deliveryGuarantee, compressionCodec, partitionClass);
transactionsEnabled, transactionalIdPrefix, deliveryGuarantee, compressionCodec, partitionClass);
try (final KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration)) {
publishFlowFiles(context, session, flowFiles, producerService);
} catch (final Throwable e) {
getLogger().error("Publishing FlowFiles failed", e);
context.yield();
}
return connectionService.getProducerService(producerConfiguration);
}
private void publishFlowFiles(final ProcessContext context, final ProcessSession session,
final List<FlowFile> flowFiles, final KafkaProducerService producerService) {
producerService.init();
for (final FlowFile flowFile : flowFiles) {
publishFlowFile(context, session, flowFile, producerService);
}
final RecordSummary recordSummary = producerService.complete();
if (recordSummary.isFailure()) {
routeFailureStrategy(context, session, flowFiles);
} else {
routeResults(session, recordSummary.getFlowFileResults());
// Publish all FlowFiles and ensure that we call complete() on the producer and route flowfiles as appropriate, regardless
// of the outcome. If there are failures, the complete() method will abort the transaction (if transactions are enabled).
// Otherwise, it will commit the transaction (if transactions are enabled). We then route the FlowFiles based on the results.
try {
for (final FlowFile flowFile : flowFiles) {
publishFlowFile(context, session, flowFile, producerService);
}
} finally {
RecordSummary recordSummary = null;
try {
recordSummary = producerService.complete();
} catch (final Exception e) {
getLogger().warn("Failed to complete transaction with Kafka", e);
producerService.close();
}
if (recordSummary == null || recordSummary.isFailure()) {
routeFailureStrategy(context, session, flowFiles);
} else {
routeResults(session, recordSummary.getFlowFileResults());
}
}
}
@@ -420,6 +477,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
final long msgCount = flowFileResult.getSentCount();
final FlowFile flowFile = session.putAttribute(flowFileResult.getFlowFile(), MSG_COUNT, String.valueOf(msgCount));
session.adjustCounter("Messages Sent", msgCount, true);
final Relationship relationship = flowFileResult.getExceptions().isEmpty() ? REL_SUCCESS : REL_FAILURE;
session.transfer(flowFile, relationship);
}
@@ -427,26 +485,31 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
private void publishFlowFile(final ProcessContext context, final ProcessSession session,
final FlowFile flowFile, final KafkaProducerService producerService) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
final String topic = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions(flowFile.getAttributes()).getValue();
final Integer partition = getPartition(context, flowFile);
final PublishContext publishContext = new PublishContext(topic, partition, null, flowFile);
final PropertyValue propertyDemarcator = context.getProperty(MESSAGE_DEMARCATOR);
final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
final KafkaRecordConverter kafkaRecordConverter = getKafkaRecordConverter(context, flowFile);
final PublishCallback callback = new PublishCallback(producerService, publishContext, kafkaRecordConverter, flowFile.getAttributes(), flowFile.getSize());
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue());
final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue());
session.read(flowFile, callback);
}
final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue();
final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
final KeyFactory keyFactory = ((PublishStrategy.USE_VALUE == publishStrategy) && (messageKeyField != null))
? new MessageKeyFactory(flowFile, messageKeyField, keyWriterFactory, getLogger())
: new AttributeKeyFactory(kafkaKeyAttribute, keyAttributeEncoding);
private Integer getPartition(final ProcessContext context, final FlowFile flowFile) {
final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
if (PartitionStrategy.EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
return Objects.hashCode(partition);
}
return null;
}
private KafkaRecordConverter getKafkaRecordConverter(final ProcessContext context, final FlowFile flowFile) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final String attributeHeaderPatternProperty = context.getProperty(ATTRIBUTE_HEADER_PATTERN).getValue();
final Pattern attributeHeaderPattern = (attributeHeaderPatternProperty == null) ? null : Pattern.compile(attributeHeaderPatternProperty);
@@ -454,54 +517,37 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
final Charset headerEncodingCharacterSet = Charset.forName(headerEncoding);
final HeadersFactory headersFactory = new AttributesHeadersFactory(attributeHeaderPattern, headerEncodingCharacterSet);
final KafkaRecordConverter kafkaRecordConverter = getKafkaRecordConverter(
publishStrategy, metadataStrategy, readerFactory, writerFactory, keyWriterFactory,
keyFactory, headersFactory, propertyDemarcator, flowFile, maxMessageSize);
final PublishCallback callback = new PublishCallback(
producerService, publishContext, kafkaRecordConverter, flowFile.getAttributes(), flowFile.getSize());
session.read(flowFile, callback);
}
final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
private Integer getPartition(final ProcessContext context, final FlowFile flowFile) {
final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
if (PartitionStrategy.EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
return Objects.hashCode(partition);
}
return null;
}
if (readerFactory != null && writerFactory != null) {
final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue());
final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue());
final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue();
final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
final KeyFactory keyFactory = ((PublishStrategy.USE_VALUE == publishStrategy) && (messageKeyField != null))
? new MessageKeyFactory(flowFile, messageKeyField, keyWriterFactory, getLogger())
: new AttributeKeyFactory(kafkaKeyAttribute, keyAttributeEncoding);
private KafkaRecordConverter getKafkaRecordConverter(
final PublishStrategy publishStrategy,
final RecordMetadataStrategy metadataStrategy,
final RecordReaderFactory readerFactory,
final RecordSetWriterFactory writerFactory,
final RecordSetWriterFactory keyWriterFactory,
final KeyFactory keyFactory,
final HeadersFactory headersFactory,
final PropertyValue propertyValueDemarcator,
final FlowFile flowFile,
final int maxMessageSize
) {
final KafkaRecordConverter kafkaRecordConverter;
if ((readerFactory != null) && (writerFactory != null)) {
if (publishStrategy == PublishStrategy.USE_WRAPPER) {
kafkaRecordConverter = new RecordWrapperStreamKafkaRecordConverter(flowFile, metadataStrategy,
readerFactory, writerFactory, keyWriterFactory, maxMessageSize, getLogger());
return new RecordWrapperStreamKafkaRecordConverter(flowFile, metadataStrategy, readerFactory, writerFactory, keyWriterFactory, maxMessageSize, getLogger());
} else {
kafkaRecordConverter = new RecordStreamKafkaRecordConverter(
readerFactory, writerFactory, headersFactory, keyFactory, maxMessageSize, getLogger());
return new RecordStreamKafkaRecordConverter(readerFactory, writerFactory, headersFactory, keyFactory, maxMessageSize, getLogger());
}
} else if (propertyValueDemarcator.isSet()) {
final String demarcator = propertyValueDemarcator.evaluateAttributeExpressions(flowFile).getValue();
kafkaRecordConverter = new DelimitedStreamKafkaRecordConverter(
demarcator.getBytes(StandardCharsets.UTF_8), maxMessageSize, headersFactory);
} else {
kafkaRecordConverter = new FlowFileStreamKafkaRecordConverter(maxMessageSize, headersFactory);
}
return kafkaRecordConverter;
final PropertyValue demarcatorValue = context.getProperty(MESSAGE_DEMARCATOR);
if (demarcatorValue.isSet()) {
final String demarcator = demarcatorValue.evaluateAttributeExpressions(flowFile).getValue();
return new DelimitedStreamKafkaRecordConverter(demarcator.getBytes(StandardCharsets.UTF_8), maxMessageSize, headersFactory);
}
return new FlowFileStreamKafkaRecordConverter(maxMessageSize, headersFactory);
}
private static class PublishCallback implements InputStreamCallback {
private final KafkaProducerService producerService;
private final PublishContext publishContext;
@@ -515,6 +561,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo
final KafkaRecordConverter kafkaConverter,
final Map<String, String> attributes,
final long inputLength) {
this.producerService = producerService;
this.publishContext = publishContext;
this.kafkaConverter = kafkaConverter;
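The refactored getKafkaRecordConverter reduces converter selection to three cases: record reader plus writer, message demarcator, or whole FlowFile. A condensed sketch with stand-in converter types (the real implementations take the collaborators shown in the diff):

```java
import java.nio.charset.StandardCharsets;

// Stand-in converter types; the commit's KafkaRecordConverter implementations take
// the reader/writer/key/header collaborators shown in the diff above.
class ConverterSelectionSketch {

    interface Converter { }
    record RecordConverter(boolean useWrapperStrategy) implements Converter { }
    record DelimitedConverter(byte[] demarcator) implements Converter { }
    record FlowFileConverter() implements Converter { }

    // Mirrors the selection order: record reader + writer win, then a configured
    // demarcator, otherwise the whole FlowFile becomes a single message.
    static Converter select(final boolean readerAndWriterConfigured, final boolean useWrapperStrategy, final String demarcator) {
        if (readerAndWriterConfigured) {
            return new RecordConverter(useWrapperStrategy);
        }
        if (demarcator != null) {
            return new DelimitedConverter(demarcator.getBytes(StandardCharsets.UTF_8));
        }
        return new FlowFileConverter();
    }
}
```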

View File

@@ -19,13 +19,12 @@ package org.apache.nifi.kafka.processors.producer.convert;
import org.apache.nifi.kafka.processors.producer.common.ProducerUtils;
import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
@@ -42,13 +41,16 @@ public class FlowFileStreamKafkaRecordConverter implements KafkaRecordConverter
}
@Override
public Iterator<KafkaRecord> convert(
final Map<String, String> attributes, final InputStream in, final long inputLength) throws IOException {
public Iterator<KafkaRecord> convert(final Map<String, String> attributes, final InputStream in, final long inputLength) throws IOException {
ProducerUtils.checkMessageSize(maxMessageSize, inputLength);
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
StreamUtils.copy(in, bos);
final KafkaRecord kafkaRecord = new KafkaRecord(
null, null, null, null, bos.toByteArray(), headersFactory.getHeaders(attributes));
return Collections.singletonList(kafkaRecord).iterator();
final byte[] recordBytes;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
in.transferTo(baos);
recordBytes = baos.toByteArray();
}
final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, recordBytes, headersFactory.getHeaders(attributes));
return List.of(kafkaRecord).iterator();
}
}

View File

@@ -24,7 +24,7 @@ import java.util.List;
/**
* Kafka Consumer Service must be closed to avoid leaking connection resources
*/
public interface KafkaConsumerService extends AutoCloseable {
public interface KafkaConsumerService {
/**
* Commit record information to Kafka Brokers
*

View File

@@ -25,12 +25,6 @@ import java.util.List;
public interface KafkaProducerService extends Closeable {
/**
* Initialize the Kafka `Producer` for the publish API call sequence. This has significance in the case of
* transactional publish activity.
*/
void init();
/**
* Send the record(s) associated with a single FlowFile.
*
@ -51,6 +45,11 @@ public interface KafkaProducerService extends Closeable {
*/
void close();
/**
* @return true if the producer is closed, false otherwise
*/
boolean isClosed();
/**
* Fetch metadata associated with the Kafka partitions associated with the topic.
*/