NIFI-12194 Added Yield on Exceptions in Kafka Processors

- Catching KafkaException and yielding for publisher lease requests improves behavior when the Processor is unable to connect to Kafka Brokers

This closes #7955

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Commit author: Paul Grey, 2023-10-30 15:15:52 -04:00; committed by exceptionfactory
parent 2c0ff6f624
commit 75c661bbbe
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 32 additions and 6 deletions

View File

@ -540,9 +540,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+ "Will roll back session and discard any partially received data.", lease);
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
context.yield();
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
context.yield();
} finally {
activeLeases.remove(lease);
}

View File

@ -483,9 +483,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+ "Will roll back session and discard any partially received data.", lease);
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
context.yield();
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
context.yield();
} finally {
activeLeases.remove(lease);
}

View File

@ -18,6 +18,7 @@
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
@ -505,7 +506,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
}
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
try (final PublisherLease lease = obtainPublisher(context, pool)) {
try {
if (useTransactions) {
lease.beginTransaction();
@ -588,6 +589,16 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
}
}
// Obtains a PublisherLease from the pool, yielding the Processor when the Kafka
// client cannot be created (e.g. brokers unreachable) so the framework backs off
// instead of retrying immediately. The KafkaException is rethrown after logging
// and yielding so the caller's error handling still sees the failure.
private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) {
try {
return pool.obtainPublisher();
} catch (final KafkaException e) {
// Log with the exception as the last argument so the stack trace is recorded.
getLogger().error("Failed to obtain Kafka Producer", e);
// Yield before rethrowing: signals the framework to delay the next onTrigger invocation.
context.yield();
throw e;
}
}
private Function<Record, Integer> getPartitioner(final ProcessContext context, final FlowFile flowFile) {
final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {

View File

@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
@ -439,7 +440,7 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
try (final PublisherLease lease = obtainPublisher(context, pool)) {
try {
if (useTransactions) {
lease.beginTransaction();
@ -512,6 +513,16 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
}
}
// Obtains a PublisherLease from the pool, yielding the Processor when the Kafka
// client cannot be created (e.g. brokers unreachable) so the framework backs off
// instead of retrying immediately. The KafkaException is rethrown after logging
// and yielding so the caller's error handling still sees the failure.
private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) {
try {
return pool.obtainPublisher();
} catch (final KafkaException e) {
// Log with the exception as the last argument so the stack trace is recorded.
getLogger().error("Failed to obtain Kafka Producer", e);
// Yield before rethrowing: signals the framework to delay the next onTrigger invocation.
context.yield();
throw e;
}
}
private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {