From 9515b7460713ba985a6d7c8ad033fe2c1ac98e3d Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 30 Oct 2015 14:25:27 -0400 Subject: [PATCH] NIFI-1088: Ensure that FlowFile is penalized before routing to failure --- .../java/org/apache/nifi/processors/kafka/PutKafka.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index cff285cbef..09025a4c96 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -401,7 +401,7 @@ public class PutKafka extends AbstractProcessor { getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); } catch (final Exception e) { getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e }); - session.transfer(flowFile, REL_FAILURE); + session.transfer(session.penalize(flowFile), REL_FAILURE); error = true; } finally { if (error) { @@ -534,7 +534,7 @@ public class PutKafka extends AbstractProcessor { if (offset == 0L) { // all of the messages failed to send. Route FlowFile to failure getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() }); - session.transfer(flowFile, REL_FAILURE); + session.transfer(session.penalize(flowFile), REL_FAILURE); } else { // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. final FlowFile successfulMessages = session.clone(flowFile, 0L, offset); @@ -545,7 +545,7 @@ public class PutKafka extends AbstractProcessor { messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() }); session.transfer(successfulMessages, REL_SUCCESS); - session.transfer(failedMessages, REL_FAILURE); + session.transfer(session.penalize(failedMessages), REL_FAILURE); session.remove(flowFile); session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic); }