NIFI-220: Added error handling that was missing for one instance of calling producer.send, also indicated how many messages were sent per FlowFile in log message and provenance event

This commit is contained in:
Mark Payne 2015-01-13 19:27:07 -05:00
parent 95b22a0aee
commit a77fb50116
1 changed files with 16 additions and 12 deletions

View File

@ -98,15 +98,15 @@ public class PutKafka extends AbstractProcessor {
.defaultValue(DELIVERY_BEST_EFFORT.getValue()) .defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build(); .build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter") .name("Message Delimiter")
.description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+ "If not specified, the entire content of the FlowFile will be used as a single message. " + "If not specified, the entire content of the FlowFile will be used as a single message. "
+ "If specified, the contents of the FlowFile will be split on this delimiter and each section " + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+ "sent as a separate Kafka message.") + "sent as a separate Kafka message.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Buffer Size") .name("Max Buffer Size")
.description("The maximum amount of data to buffer in memory before sending to Kafka") .description("The maximum amount of data to buffer in memory before sending to Kafka")
@ -366,7 +366,11 @@ public class PutKafka extends AbstractProcessor {
// If there are messages left, send them // If there are messages left, send them
if ( !messages.isEmpty() ) { if ( !messages.isEmpty() ) {
producer.send(messages); try {
producer.send(messages);
} catch (final Exception e) {
throw new ProcessException("Failed to send messages to Kafka", e);
}
} }
} }
} }
@ -374,9 +378,9 @@ public class PutKafka extends AbstractProcessor {
final long nanos = System.nanoTime() - start; final long nanos = System.nanoTime() - start;
session.getProvenanceReporter().send(flowFile, "kafka://" + topic); session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
} catch (final ProcessException pe) { } catch (final ProcessException pe) {
error = true; error = true;