NIFI-12194 Added Yield on Exceptions in Kafka Processors
- Catching KafkaException and yielding for publisher lease requests improves behavior when the Processor is unable to connect to Kafka Brokers

This closes #7955

Signed-off-by: David Handermann <exceptionfactory@apache.org>
(cherry picked from commit 75c661bbbe56a7951974a701921af9da74dd0d68)
parent 84becd0a63
commit 9a5a56e79e
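For context, here is a minimal sketch of the pattern this change applies: when a KafkaException indicates the Kafka Brokers cannot be reached, the processor logs the failure, calls ProcessContext.yield() so the framework pauses scheduling for the configured Yield Duration instead of retrying in a tight loop, and rolls back any partially processed data. The class and method names below are illustrative placeholders, not the actual NiFi processors.

import org.apache.kafka.common.KafkaException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

// Illustrative sketch only; ExampleKafkaProcessor and interactWithKafka are
// placeholder names, not part of the NiFi codebase.
public class ExampleKafkaProcessor extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        try {
            // Producing to or consuming from Kafka can fail immediately when the
            // configured Brokers are unreachable.
            interactWithKafka(session);
        } catch (final KafkaException e) {
            getLogger().error("Failed to communicate with Kafka", e);
            // Yielding asks the framework not to schedule this processor again until
            // the configured Yield Duration elapses, avoiding a hot retry loop.
            context.yield();
            // Discard any partially processed data for this invocation.
            session.rollback();
        }
    }

    private void interactWithKafka(final ProcessSession session) {
        // Placeholder for the actual Kafka interaction.
    }
}

In the publish processors, the same idea is applied when obtaining a PublisherLease, as shown in the diff below.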
@@ -546,9 +546,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
                 getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
@@ -489,9 +489,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
                 getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -511,7 +512,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
         }
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -594,6 +595,16 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
             }
         }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private Function<Record, Integer> getPartitioner(final ProcessContext context, final FlowFile flowFile) {
         final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
         if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -444,7 +445,7 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
         final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -517,6 +518,16 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
             }
         }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
         final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
         if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {