NIFI-1088: Ensure that FlowFile is penalized before routing to failure

This commit is contained in:
Mark Payne 2015-10-30 14:25:27 -04:00
parent dc4004de64
commit 9515b74607
1 changed files with 3 additions and 3 deletions

View File

@ -401,7 +401,7 @@ public class PutKafka extends AbstractProcessor {
getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
} catch (final Exception e) { } catch (final Exception e) {
getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e }); getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e });
session.transfer(flowFile, REL_FAILURE); session.transfer(session.penalize(flowFile), REL_FAILURE);
error = true; error = true;
} finally { } finally {
if (error) { if (error) {
@ -534,7 +534,7 @@ public class PutKafka extends AbstractProcessor {
if (offset == 0L) { if (offset == 0L) {
// all of the messages failed to send. Route FlowFile to failure // all of the messages failed to send. Route FlowFile to failure
getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() }); getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() });
session.transfer(flowFile, REL_FAILURE); session.transfer(session.penalize(flowFile), REL_FAILURE);
} else { } else {
// Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
final FlowFile successfulMessages = session.clone(flowFile, 0L, offset); final FlowFile successfulMessages = session.clone(flowFile, 0L, offset);
@ -545,7 +545,7 @@ public class PutKafka extends AbstractProcessor {
messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() }); messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
session.transfer(successfulMessages, REL_SUCCESS); session.transfer(successfulMessages, REL_SUCCESS);
session.transfer(failedMessages, REL_FAILURE); session.transfer(session.penalize(failedMessages), REL_FAILURE);
session.remove(flowFile); session.remove(flowFile);
session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic); session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic);
} }