From 75c661bbbe56a7951974a701921af9da74dd0d68 Mon Sep 17 00:00:00 2001
From: Paul Grey
Date: Mon, 30 Oct 2023 15:15:52 -0400
Subject: [PATCH] NIFI-12194 Added Yield on Exceptions in Kafka Processors

- Catching KafkaException and yielding for publisher lease requests
  improves behavior when the Processor is unable to connect to Kafka Brokers

This closes #7955

Signed-off-by: David Handermann
---
 .../kafka/pubsub/ConsumeKafkaRecord_2_6.java       |  6 ++++--
 .../processors/kafka/pubsub/ConsumeKafka_2_6.java  |  6 ++++--
 .../kafka/pubsub/PublishKafkaRecord_2_6.java       | 13 ++++++++++++-
 .../processors/kafka/pubsub/PublishKafka_2_6.java  | 13 ++++++++++++-
 4 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 525f621e1f..50fece3b35 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -540,9 +540,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
                 getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 4421ae92f8..a5c6b15891 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -483,9 +483,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
                 getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index af61faeb95..34053d6a3b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -505,7 +506,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
         }
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -588,6 +589,16 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
         }
     }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private Function<Record, Integer> getPartitioner(final ProcessContext context, final FlowFile flowFile) {
         final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
         if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index b2721a7199..b6b84ce1e0 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -439,7 +440,7 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
         final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -512,6 +513,16 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
         }
     }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
         final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
         if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {
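
For reference, the sketch below condenses the pattern this patch applies in one place: catch KafkaException, log it once with the stack trace, call ProcessContext.yield() so the framework backs off before rescheduling the Processor, and rethrow so the session is rolled back. The class name and the publishFlowFiles() method are hypothetical placeholders, not part of this change; the real processors wrap PublisherPool.obtainPublisher() and the consumer lease loop as shown in the hunks above.

import org.apache.kafka.common.KafkaException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

// Illustrative only: class name and publishFlowFiles() are hypothetical.
public class YieldOnKafkaExceptionExample extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        try {
            publishFlowFiles(session);
        } catch (final KafkaException e) {
            // Log once with the cause, then yield so the framework waits before
            // scheduling this Processor again instead of spinning against an
            // unreachable broker.
            getLogger().error("Failed to interact with Kafka", e);
            context.yield();
            // Rethrow so the framework rolls back the session, mirroring the
            // obtainPublisher() helper added in this patch.
            throw e;
        }
    }

    private void publishFlowFiles(final ProcessSession session) {
        // Placeholder for the real publish/consume logic.
    }
}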