[NIFI-413] Updating PutKafka properties to follow NiFi standards. Added validation for asynchronous properties.

This commit is contained in:
Brian Ghigiarelli 2015-06-19 18:49:21 -04:00
parent 5b0648cf3f
commit 8af84f3f73
2 changed files with 167 additions and 40 deletions
nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src
main/java/org/apache/nifi/processors/kafka
test/java/org/apache/nifi/processors/kafka

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
@ -39,6 +40,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@ -74,6 +77,32 @@ public class PutKafka extends AbstractProcessor {
+ " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
+ " in data loss.");
/**
* AllowableValue for a Producer Type that synchronously sends messages to Kafka
*/
public static final AllowableValue PRODUCTER_TYPE_SYNCHRONOUS = new AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately.");
/**
* AllowableValue for a Producer Type that asynchronously sends messages to Kafka
*/
public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka."
+ " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data.");
/**
* AllowableValue for sending messages to Kafka without compression
*/
public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", "Compression will not be used for any topic.");
/**
* AllowableValue for sending messages to Kafka with GZIP compression
*/
public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", "Compress messages using GZIP");
/**
* AllowableValue for sending messages to Kafka with Snappy compression
*/
public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy");
public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
.name("Known Brokers")
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
@ -138,64 +167,66 @@ public class PutKafka extends AbstractProcessor {
.build();
public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
.name("Producer Type")
.description("This parameter specifies whether the messages are sent asynchronously in a background thread."
+ " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
+ " By setting the producer to async we allow batching together of requests (which is great for throughput)"
+ " but open the possibility of a failure of the client machine dropping unsent data.")
.description("This parameter specifies whether the messages are sent asynchronously in a background thread.")
.required(true)
.allowableValues("sync", "async")
.allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("sync")
.defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue())
.build();
public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
.name("Async Message Batch Size (batch.num.messages)")
.description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode."
.name("Async Batch Size")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode."
+ " The producer will wait until either this number of messages are ready"
+ " to send or queue.buffer.max.ms is reached.")
+ " to send or \"Queue Buffering Max Time\" is reached.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("200").build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder()
.name("Queue Buffering Max Time (queue.buffering.max.ms)")
.description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example a setting of 100"
public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder()
.name("Queue Buffering Max Time")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms"
+ " will try to batch together 100ms of messages to send at once. This will improve"
+ " throughput but adds message delivery latency due to the buffering.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5000").build();
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("5 secs").build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
.name("Queue Buffer Max Count (queue.buffering.max.messages)")
.description("Used only if Producer Type is set to \"async\". The maximum number of unsent messages that can be queued up the producer when"
+ " using async mode before either the producer must be blocked or data must be dropped.")
.name("Queue Buffer Max Count")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The maximum number of unsent messages that can be queued up in the producer when"
+ " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10000").build();
public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder()
.name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)")
.description("Used only if Producer Type is set to \"async\". The amount of time to block before dropping messages when running in async mode"
+ " and the buffer has reached queue.buffering.max.messages. If set to 0 events will"
public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder()
.name("Queue Enqueue Timeout")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The amount of time to block before dropping messages when running in "
+ PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode"
+ " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will"
+ " be enqueued immediately or dropped if the queue is full (the producer send call will"
+ " never block). If set to -1 the producer will block indefinitely and never willingly"
+ " never block). If not set, the producer will block indefinitely and never willingly"
+ " drop a send.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("-1").build();
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("Compression Codec (compression.codec)")
.name("Compression Codec")
.description("This parameter allows you to specify the compression codec for all"
+ " data generated by this producer. Valid values are \"none\", \"gzip\" and \"snappy\".")
+ " data generated by this producer.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues("none", "gzip", "snappy")
.defaultValue("none").build();
.allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY)
.defaultValue(COMPRESSION_CODEC_NONE.getValue()).build();
public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder()
.name("Compressed Topics (compressed.topics)")
.name("Compressed Topics")
.description("This parameter allows you to set whether compression should be turned on"
+ " for particular topics. If the compression codec is anything other than"
+ " NoCompressionCodec, enable compression only for specified topics if any."
+ " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any."
+ " If the list of compressed topics is empty, then enable the specified"
+ " compression codec for all topics. If the compression codec is NoCompressionCodec,"
+ " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + ","
+ " compression is disabled for all topics")
.required(false).build();
@ -228,14 +259,28 @@ public class PutKafka extends AbstractProcessor {
props.add(PRODUCER_TYPE);
props.add(BATCH_NUM_MESSAGES);
props.add(QUEUE_BUFFERING_MAX_MESSAGES);
props.add(QUEUE_BUFFERING_MAX_MS);
props.add(QUEUE_ENQUEUE_TIMEOUT_MS);
props.add(QUEUE_BUFFERING_MAX);
props.add(QUEUE_ENQUEUE_TIMEOUT);
props.add(COMPRESSION_CODEC);
props.add(COMPRESSED_TOPICS);
props.add(clientName);
return props;
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger();
final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger();
if (batchMessages > bufferMaxMessages) {
errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false).explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build());
}
return errors;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>(1);
@ -265,10 +310,20 @@ public class PutKafka extends AbstractProcessor {
properties.setProperty("message.send.max.retries", "1");
properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue());
properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue());
properties.setProperty("queue.buffering.max.ms", context.getProperty(QUEUE_BUFFERING_MAX_MS).getValue());
Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
if(queueBufferingMillis != null) {
properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis));
}
properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
properties.setProperty("queue.enqueue.timeout.ms", context.getProperty(QUEUE_ENQUEUE_TIMEOUT_MS).getValue());
properties.setProperty("compression.codec", context.getProperty(COMPRESSION_CODEC).getValue());
Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
if(queueEnqueueTimeoutMillis != null) {
properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis));
}
String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
properties.setProperty("compression.codec", compressionCodec);
String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue();
if(compressedTopics != null) {

View File

@ -205,7 +205,7 @@ public class TestPutKafka {
runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
runner.setProperty(PutKafka.KEY, "${kafka.key}");
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
keyValuePutExecute(runner);
@ -269,8 +269,8 @@ public class TestPutKafka {
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
runner.setProperty(PutKafka.COMPRESSION_CODEC, "snappy");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue());
runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03");
ProcessContext context = runner.getProcessContext();
@ -293,6 +293,78 @@ public class TestPutKafka {
}
@Test
public void testProducerConfigAsyncQueueThresholds() {
final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs");
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535");
runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms");
ProcessContext context = runner.getProcessContext();
ProducerConfig config = processor.createConfig(context);
// Check that the queue thresholds were properly translated
assertEquals(7000, config.queueBufferingMaxMs());
assertEquals(535, config.queueBufferingMaxMessages());
assertEquals(200, config.queueEnqueueTimeoutMs());
// Check the producer type
String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
}
@Test
public void testProducerConfigInvalidBatchSize() {
final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200");
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100");
runner.assertNotValid();
}
@Test
public void testProducerConfigAsyncDefaultEnqueueTimeout() {
final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
// Do not set QUEUE_ENQUEUE_TIMEOUT
ProcessContext context = runner.getProcessContext();
ProducerConfig config = processor.createConfig(context);
// Check that the enqueue timeout defaults to -1
assertEquals(-1, config.queueEnqueueTimeoutMs());
// Check the producer type
String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
}
private static class TestableProcessor extends PutKafka {
private MockProducer producer;