From a77fb50116642c9692e18046ba42663ea8240087 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 13 Jan 2015 19:27:07 -0500 Subject: [PATCH] NIFI-220: Added error handling that was missing for one instance of calling producer.send, also indicated how many messages were sent per FlowFile in log message and provenance event --- .../nifi/processors/kafka/PutKafka.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 4b5a7423ad..51f9ef14a2 100644 --- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -98,15 +98,15 @@ public class PutKafka extends AbstractProcessor { .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " - + "If not specified, the entire content of the FlowFile will be used as a single message. " - + "If specified, the contents of the FlowFile will be split on this delimiter and each section " - + "sent as a separate Kafka message.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Message Delimiter") + .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + + "If not specified, the entire content of the FlowFile will be used as a single message. " + + "If specified, the contents of the FlowFile will be split on this delimiter and each section " + + "sent as a separate Kafka message.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() .name("Max Buffer Size") .description("The maximum amount of data to buffer in memory before sending to Kafka") @@ -366,7 +366,11 @@ public class PutKafka extends AbstractProcessor { // If there are messages left, send them if ( !messages.isEmpty() ) { - producer.send(messages); + try { + producer.send(messages); + } catch (final Exception e) { + throw new ProcessException("Failed to send messages to Kafka", e); + } } } } @@ -374,9 +378,9 @@ public class PutKafka extends AbstractProcessor { final long nanos = System.nanoTime() - start; - session.getProvenanceReporter().send(flowFile, "kafka://" + topic); + session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages"); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); } catch (final ProcessException pe) { error = true;