NIFI-413: Formatted code to fix checkstyle failures

This commit is contained in:
Mark Payne 2015-06-22 12:05:02 -04:00
parent f5226ad3c6
commit f2f9056055
2 changed files with 203 additions and 195 deletions

View File

@ -61,7 +61,7 @@ import org.apache.nifi.util.LongHolder;
import scala.actors.threadpool.Arrays; import scala.actors.threadpool.Arrays;
@SupportsBatching @SupportsBatching
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka") @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
public class PutKafka extends AbstractProcessor { public class PutKafka extends AbstractProcessor {
@ -69,13 +69,13 @@ public class PutKafka extends AbstractProcessor {
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to" public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to"
+ " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed" public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed"
+ " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
+ " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes"); + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after" public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after"
+ " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
+ " in data loss."); + " in data loss.");
/** /**
* AllowableValue for a Producer Type that synchronously sends messages to Kafka * AllowableValue for a Producer Type that synchronously sends messages to Kafka
@ -86,7 +86,7 @@ public class PutKafka extends AbstractProcessor {
* AllowableValue for a Producer Type that asynchronously sends messages to Kafka * AllowableValue for a Producer Type that asynchronously sends messages to Kafka
*/ */
public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka." public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka."
+ " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data."); + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data.");
/** /**
* AllowableValue for sending messages to Kafka without compression * AllowableValue for sending messages to Kafka without compression
@ -103,150 +103,156 @@ public class PutKafka extends AbstractProcessor {
*/ */
public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy");
public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
.name("Known Brokers") .name("Known Brokers")
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true) .required(true)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.build(); .build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("Topic Name") .name("Topic Name")
.description("The Kafka Topic of interest") .description("The Kafka Topic of interest")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
.name("Kafka Key") .name("Kafka Key")
.description("The Key to use for the Message") .description("The Key to use for the Message")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
.name("Delivery Guarantee") .name("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka") .description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
.required(true) .required(true)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue()) .defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build(); .build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter") .name("Message Delimiter")
.description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+ "If not specified, the entire content of the FlowFile will be used as a single message. " + "If not specified, the entire content of the FlowFile will be used as a single message. "
+ "If specified, the contents of the FlowFile will be split on this delimiter and each section " + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+ "sent as a separate Kafka message.") + "sent as a separate Kafka message.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Buffer Size") .name("Max Buffer Size")
.description("The maximum amount of data to buffer in memory before sending to Kafka") .description("The maximum amount of data to buffer in memory before sending to Kafka")
.required(true) .required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.defaultValue("1 MB") .defaultValue("1 MB")
.build(); .build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout") .name("Communications Timeout")
.description("The amount of time to wait for a response from Kafka before determining that there is a communications error") .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
.required(true) .required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.defaultValue("30 secs") .defaultValue("30 secs")
.build(); .build();
public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
.name("Client Name") .name("Client Name")
.description("Client Name to use when communicating with Kafka") .description("Client Name to use when communicating with Kafka")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.build(); .build();
public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder() public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
.name("Producer Type") .name("Producer Type")
.description("This parameter specifies whether the messages are sent asynchronously in a background thread.") .description("This parameter specifies whether the messages are sent asynchronously in a background thread.")
.required(true) .required(true)
.allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS) .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue()) .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue())
.build(); .build();
public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
.name("Async Batch Size") .name("Async Batch Size")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode." + " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode."
+ " The producer will wait until either this number of messages are ready" + " The producer will wait until either this number of messages are ready"
+ " to send or \"Queue Buffering Max Time\" is reached.") + " to send or \"Queue Buffering Max Time\" is reached.")
.required(true) .required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("200").build(); .defaultValue("200")
public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() .build();
.name("Queue Buffering Max Time") public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder()
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." .name("Queue Buffering Max Time")
+ " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms" .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " will try to batch together 100ms of messages to send at once. This will improve" + " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms"
+ " throughput but adds message delivery latency due to the buffering.") + " will try to batch together 100ms of messages to send at once. This will improve"
.required(true) + " throughput but adds message delivery latency due to the buffering.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .required(true)
.defaultValue("5 secs").build(); .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() .defaultValue("5 secs")
.name("Queue Buffer Max Count") .build();
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
+ " The maximum number of unsent messages that can be queued up in the producer when" .name("Queue Buffer Max Count")
+ " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.") .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
.required(true) + " The maximum number of unsent messages that can be queued up in the producer when"
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.")
.defaultValue("10000").build(); .required(true)
public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder() .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.name("Queue Enqueue Timeout") .defaultValue("10000")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." .build();
+ " The amount of time to block before dropping messages when running in " public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder()
+ PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode" .name("Queue Enqueue Timeout")
+ " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will" .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " be enqueued immediately or dropped if the queue is full (the producer send call will" + " The amount of time to block before dropping messages when running in "
+ " never block). If not set, the producer will block indefinitely and never willingly" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode"
+ " drop a send.") + " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will"
.required(false) + " be enqueued immediately or dropped if the queue is full (the producer send call will"
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + " never block). If not set, the producer will block indefinitely and never willingly"
.build(); + " drop a send.")
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() .required(false)
.name("Compression Codec") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.description("This parameter allows you to specify the compression codec for all" .build();
+ " data generated by this producer.") public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.required(true) .name("Compression Codec")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .description("This parameter allows you to specify the compression codec for all"
.allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) + " data generated by this producer.")
.defaultValue(COMPRESSION_CODEC_NONE.getValue()).build(); .required(true)
public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.name("Compressed Topics") .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY)
.description("This parameter allows you to set whether compression should be turned on" .defaultValue(COMPRESSION_CODEC_NONE.getValue())
+ " for particular topics. If the compression codec is anything other than" .build();
+ " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any." public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder()
+ " If the list of compressed topics is empty, then enable the specified" .name("Compressed Topics")
+ " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + "," .description("This parameter allows you to set whether compression should be turned on"
+ " compression is disabled for all topics") + " for particular topics. If the compression codec is anything other than"
.required(false).build(); + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any."
+ " If the list of compressed topics is empty, then enable the specified"
+ " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + ","
+ " compression is disabled for all topics")
.required(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
.build(); .build();
public static final Relationship REL_FAILURE = new Relationship.Builder() public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure") .name("failure")
.description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
.build(); .build();
private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>(); private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor clientName = new PropertyDescriptor.Builder() final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(CLIENT_NAME) .fromPropertyDescriptor(CLIENT_NAME)
.defaultValue("NiFi-" + getIdentifier()) .defaultValue("NiFi-" + getIdentifier())
.build(); .build();
final List<PropertyDescriptor> props = new ArrayList<>(); final List<PropertyDescriptor> props = new ArrayList<>();
props.add(SEED_BROKERS); props.add(SEED_BROKERS);
@ -269,13 +275,14 @@ public class PutKafka extends AbstractProcessor {
@Override @Override
public Collection<ValidationResult> customValidate(final ValidationContext context) { public Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context)); final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger(); final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger();
final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger(); final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger();
if (batchMessages > bufferMaxMessages) { if (batchMessages > bufferMaxMessages) {
errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false).explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build()); errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false)
.explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build());
} }
return errors; return errors;
@ -311,23 +318,23 @@ public class PutKafka extends AbstractProcessor {
properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue()); properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue());
properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue()); properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue());
Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
if(queueBufferingMillis != null) { if (queueBufferingMillis != null) {
properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis)); properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis));
} }
properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue()); properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); final Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
if(queueEnqueueTimeoutMillis != null) { if (queueEnqueueTimeoutMillis != null) {
properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis)); properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis));
} }
String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
properties.setProperty("compression.codec", compressionCodec); properties.setProperty("compression.codec", compressionCodec);
String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); final String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue();
if(compressedTopics != null) { if (compressedTopics != null) {
properties.setProperty("compressed.topics", compressedTopics); properties.setProperty("compressed.topics", compressedTopics);
} }
return new ProducerConfig(properties); return new ProducerConfig(properties);
@ -338,7 +345,7 @@ public class PutKafka extends AbstractProcessor {
} }
private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) { private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
Producer<byte[], byte[]> producer = producers.poll(); final Producer<byte[], byte[]> producer = producers.poll();
return producer == null ? createProducer(context) : producer; return producer == null ? createProducer(context) : producer;
} }
@ -348,7 +355,7 @@ public class PutKafka extends AbstractProcessor {
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get(); final FlowFile flowFile = session.get();
if (flowFile == null) { if (flowFile == null) {
return; return;
} }
@ -356,7 +363,7 @@ public class PutKafka extends AbstractProcessor {
final long start = System.nanoTime(); final long start = System.nanoTime();
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8); final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
if (delimiter != null) { if (delimiter != null) {
delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
@ -389,9 +396,9 @@ public class PutKafka extends AbstractProcessor {
session.getProvenanceReporter().send(flowFile, "kafka://" + topic); session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[]{flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
} catch (final Exception e) { } catch (final Exception e) {
getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[]{flowFile, e}); getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e });
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
error = true; error = true;
} finally { } finally {
@ -426,7 +433,7 @@ public class PutKafka extends AbstractProcessor {
int nextByte; int nextByte;
try (final InputStream bufferedIn = new BufferedInputStream(rawIn); try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
// read until we're out of data. // read until we're out of data.
while (!streamFinished) { while (!streamFinished) {
@ -514,7 +521,7 @@ public class PutKafka extends AbstractProcessor {
final long nanos = System.nanoTime() - start; final long nanos = System.nanoTime() - start;
session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages"); session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[]{messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] { messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
} catch (final ProcessException pe) { } catch (final ProcessException pe) {
error = true; error = true;
@ -524,7 +531,7 @@ public class PutKafka extends AbstractProcessor {
final long offset = lastMessageOffset.get(); final long offset = lastMessageOffset.get();
if (offset == 0L) { if (offset == 0L) {
// all of the messages failed to send. Route FlowFile to failure // all of the messages failed to send. Route FlowFile to failure
getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[]{flowFile, pe.getCause()}); getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() });
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} else { } else {
// Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
@ -532,8 +539,8 @@ public class PutKafka extends AbstractProcessor {
final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset); final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into" getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into"
+ " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[]{ + " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause()}); messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
session.transfer(successfulMessages, REL_SUCCESS); session.transfer(successfulMessages, REL_SUCCESS);
session.transfer(failedMessages, REL_FAILURE); session.transfer(failedMessages, REL_FAILURE);

View File

@ -212,7 +212,7 @@ public class TestPutKafka {
} }
private void keyValuePutExecute(final TestRunner runner) { private void keyValuePutExecute(final TestRunner runner) {
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put("kafka.topic", "test"); attributes.put("kafka.topic", "test");
attributes.put("kafka.key", "key3"); attributes.put("kafka.key", "key3");
@ -229,32 +229,32 @@ public class TestPutKafka {
final MockFlowFile mff = mffs.get(0); final MockFlowFile mff = mffs.get(0);
assertTrue(Arrays.equals(data, mff.toByteArray())); assertTrue(Arrays.equals(data, mff.toByteArray()));
} }
@Test @Test
public void testProducerConfigDefault() { public void testProducerConfigDefault() {
final TestableProcessor processor = new TestableProcessor(); final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
ProcessContext context = runner.getProcessContext(); final ProcessContext context = runner.getProcessContext();
ProducerConfig config = processor.createConfig(context); final ProducerConfig config = processor.createConfig(context);
// Check the codec // Check the codec
CompressionCodec codec = config.compressionCodec(); final CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.NoCompressionCodec$); assertTrue(codec instanceof kafka.message.NoCompressionCodec$);
// Check compressed topics // Check compressed topics
Seq<String> compressedTopics = config.compressedTopics(); final Seq<String> compressedTopics = config.compressedTopics();
assertEquals(0, compressedTopics.size()); assertEquals(0, compressedTopics.size());
// Check the producer type // Check the producer type
String actualProducerType = config.producerType(); final String actualProducerType = config.producerType();
assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType); assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType);
} }
@ -262,10 +262,10 @@ public class TestPutKafka {
@Test @Test
public void testProducerConfigAsyncWithCompression() { public void testProducerConfigAsyncWithCompression() {
final TestableProcessor processor = new TestableProcessor(); final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
@ -273,33 +273,33 @@ public class TestPutKafka {
runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue()); runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue());
runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03"); runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03");
ProcessContext context = runner.getProcessContext(); final ProcessContext context = runner.getProcessContext();
ProducerConfig config = processor.createConfig(context); final ProducerConfig config = processor.createConfig(context);
// Check that the codec is snappy // Check that the codec is snappy
CompressionCodec codec = config.compressionCodec(); final CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$); assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$);
// Check compressed topics // Check compressed topics
Seq<String> compressedTopics = config.compressedTopics(); final Seq<String> compressedTopics = config.compressedTopics();
assertEquals(3, compressedTopics.size()); assertEquals(3, compressedTopics.size());
assertTrue(compressedTopics.contains("topic01")); assertTrue(compressedTopics.contains("topic01"));
assertTrue(compressedTopics.contains("topic02")); assertTrue(compressedTopics.contains("topic02"));
assertTrue(compressedTopics.contains("topic03")); assertTrue(compressedTopics.contains("topic03"));
// Check the producer type // Check the producer type
String actualProducerType = config.producerType(); final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType); assertEquals("async", actualProducerType);
} }
@Test @Test
public void testProducerConfigAsyncQueueThresholds() { public void testProducerConfigAsyncQueueThresholds() {
final TestableProcessor processor = new TestableProcessor(); final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
@ -308,27 +308,27 @@ public class TestPutKafka {
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535"); runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535");
runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms"); runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms");
ProcessContext context = runner.getProcessContext(); final ProcessContext context = runner.getProcessContext();
ProducerConfig config = processor.createConfig(context); final ProducerConfig config = processor.createConfig(context);
// Check that the queue thresholds were properly translated // Check that the queue thresholds were properly translated
assertEquals(7000, config.queueBufferingMaxMs()); assertEquals(7000, config.queueBufferingMaxMs());
assertEquals(535, config.queueBufferingMaxMessages()); assertEquals(535, config.queueBufferingMaxMessages());
assertEquals(200, config.queueEnqueueTimeoutMs()); assertEquals(200, config.queueEnqueueTimeoutMs());
// Check the producer type // Check the producer type
String actualProducerType = config.producerType(); final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType); assertEquals("async", actualProducerType);
} }
@Test @Test
public void testProducerConfigInvalidBatchSize() { public void testProducerConfigInvalidBatchSize() {
final TestableProcessor processor = new TestableProcessor(); final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
@ -339,28 +339,28 @@ public class TestPutKafka {
runner.assertNotValid(); runner.assertNotValid();
} }
@Test @Test
public void testProducerConfigAsyncDefaultEnqueueTimeout() { public void testProducerConfigAsyncDefaultEnqueueTimeout() {
final TestableProcessor processor = new TestableProcessor(); final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
// Do not set QUEUE_ENQUEUE_TIMEOUT // Do not set QUEUE_ENQUEUE_TIMEOUT
ProcessContext context = runner.getProcessContext(); final ProcessContext context = runner.getProcessContext();
ProducerConfig config = processor.createConfig(context); final ProducerConfig config = processor.createConfig(context);
// Check that the enqueue timeout defaults to -1 // Check that the enqueue timeout defaults to -1
assertEquals(-1, config.queueEnqueueTimeoutMs()); assertEquals(-1, config.queueEnqueueTimeoutMs());
// Check the producer type // Check the producer type
String actualProducerType = config.producerType(); final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType); assertEquals("async", actualProducerType);
} }
@ -391,12 +391,13 @@ public class TestPutKafka {
public MockProducer getProducer() { public MockProducer getProducer() {
return producer; return producer;
} }
/** /**
* Exposed for test verification * Exposed for test verification
*/ */
@Override
public ProducerConfig createConfig(final ProcessContext context) { public ProducerConfig createConfig(final ProcessContext context) {
return super.createConfig(context); return super.createConfig(context);
} }
} }