[NIFI-413] Adding properties to PutKafka to support asynchronous production with configurable batching. Also added user-defined control over compression codec and compressed topics. Producer type remains synchronous by default.

This commit is contained in:
Brian Ghigiarelli 2015-05-14 08:55:04 -04:00
parent 421ad8fb13
commit 9653770ac4
2 changed files with 171 additions and 3 deletions

PutKafka.java

@@ -136,6 +136,68 @@ public class PutKafka extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
.name("Producer Type")
.description("This parameter specifies whether the messages are sent asynchronously in a background thread."
+ " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
+ " By setting the producer to async we allow batching together of requests (which is great for throughput)"
+ " but open the possibility of a failure of the client machine dropping unsent data.")
.required(true)
.allowableValues("sync", "async")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("sync")
.build();
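
For context, the sync/async choice maps directly onto the underlying Kafka 0.8 producer API. A minimal sketch of a raw async producer, not part of this commit; the broker address and topic name are placeholder assumptions:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Sketch only: broker address and topic name are assumptions.
final Properties props = new Properties();
props.setProperty("metadata.broker.list", "localhost:9092");
props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
props.setProperty("producer.type", "async"); // messages are sent from a background thread

final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(props));
producer.send(new KeyedMessage<byte[], byte[]>("demo-topic", "hello".getBytes()));
producer.close();
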
public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
.name("Async Message Batch Size (batch.num.messages)")
.description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode."
+ " The producer will wait until either this number of messages are ready"
+ " to send or queue.buffer.max.ms is reached.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("200").build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder()
.name("Queue Buffering Max Time (queue.buffering.max.ms)")
.description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example a setting of 100"
+ " will try to batch together 100ms of messages to send at once. This will improve"
+ " throughput but adds message delivery latency due to the buffering.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5000").build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
.name("Queue Buffer Max Count (queue.buffering.max.messages)")
.description("Used only if Producer Type is set to \"async\". The maximum number of unsent messages that can be queued up the producer when"
+ " using async mode before either the producer must be blocked or data must be dropped.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10000").build();
public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder()
.name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)")
.description("Used only if Producer Type is set to \"async\". The amount of time to block before dropping messages when running in async mode"
+ " and the buffer has reached queue.buffering.max.messages. If set to 0 events will"
+ " be enqueued immediately or dropped if the queue is full (the producer send call will"
+ " never block). If set to -1 the producer will block indefinitely and never willingly"
+ " drop a send.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("-1").build();
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("Compression Codec (compression.codec)")
.description("This parameter allows you to specify the compression codec for all"
+ " data generated by this producer. Valid values are \"none\", \"gzip\" and \"snappy\".")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues("none", "gzip", "snappy")
.defaultValue("none").build();
public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder()
.name("Compressed Topics (compressed.topics)")
.description("This parameter allows you to set whether compression should be turned on"
+ " for particular topics. If the compression codec is anything other than"
+ " NoCompressionCodec, enable compression only for specified topics if any."
+ " If the list of compressed topics is empty, then enable the specified"
+ " compression codec for all topics. If the compression codec is NoCompressionCodec,"
+ " compression is disabled for all topics")
.required(false).build();
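
The compression rules above condense to: with codec "none" nothing is compressed; otherwise an empty list compresses every topic and a non-empty list compresses only the listed ones. A hypothetical helper, not part of this commit, illustrating that rule:

import java.util.List;

// Hypothetical illustration of the compressed.topics semantics described above.
static boolean isTopicCompressed(final String codec, final List<String> compressedTopics, final String topic) {
    if ("none".equals(codec)) {
        return false; // codec "none": compression disabled everywhere
    }
    if (compressedTopics.isEmpty()) {
        return true; // empty list: compress all topics
    }
    return compressedTopics.contains(topic); // non-empty list: compress only listed topics
}
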
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -163,6 +225,13 @@ public class PutKafka extends AbstractProcessor {
props.add(MESSAGE_DELIMITER);
props.add(MAX_BUFFER_SIZE);
props.add(TIMEOUT);
props.add(PRODUCER_TYPE);
props.add(BATCH_NUM_MESSAGES);
props.add(QUEUE_BUFFERING_MAX_MESSAGES);
props.add(QUEUE_BUFFERING_MAX_MS);
props.add(QUEUE_ENQUEUE_TIMEOUT_MS);
props.add(COMPRESSION_CODEC);
props.add(COMPRESSED_TOPICS);
props.add(clientName);
return props;
}
@@ -194,7 +263,17 @@ public class PutKafka extends AbstractProcessor {
properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
properties.setProperty("message.send.max.retries", "1");
properties.setProperty("producer.type", "sync");
properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue());
properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue());
properties.setProperty("queue.buffering.max.ms", context.getProperty(QUEUE_BUFFERING_MAX_MS).getValue());
properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
properties.setProperty("queue.enqueue.timeout.ms", context.getProperty(QUEUE_ENQUEUE_TIMEOUT_MS).getValue());
properties.setProperty("compression.codec", context.getProperty(COMPRESSION_CODEC).getValue());
String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue();
if (compressedTopics != null) {
properties.setProperty("compressed.topics", compressedTopics);
}
return new ProducerConfig(properties);
}

TestPutKafka.java

@@ -29,11 +29,11 @@ import java.util.concurrent.atomic.AtomicLong;
import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.message.CompressionCodec;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.util.MockFlowFile;
@@ -49,6 +49,8 @@ import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import scala.collection.Seq;
public class TestPutKafka {
@Test
@@ -217,7 +219,25 @@ public class TestPutKafka {
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
final Map<String, String> attributes = new HashMap<>();
keyValuePutExecute(runner);
}
@Test
@Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
public void testKeyValuePutAsync() {
final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
runner.setProperty(PutKafka.KEY, "${kafka.key}");
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
keyValuePutExecute(runner);
}
private void keyValuePutExecute(final TestRunner runner) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("kafka.topic", "test");
attributes.put("kafka.key", "key3");
@@ -234,6 +254,68 @@ public class TestPutKafka {
final MockFlowFile mff = mffs.get(0);
assertTrue(Arrays.equals(data, mff.toByteArray()));
}
@Test
public void testProducerConfigDefault() {
final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
ProcessContext context = runner.getProcessContext();
ProducerConfig config = processor.createConfig(context);
// Check the codec (NoCompressionCodec$ is the Scala companion object backing the default "none" codec)
CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.NoCompressionCodec$);
// Check compressed topics
Seq<String> compressedTopics = config.compressedTopics();
assertEquals(0, compressedTopics.size());
// Check the producer type
String actualProducerType = config.producerType();
assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType);
}
@Test
public void testProducerConfigAsyncWithCompression() {
final TestableProcessor processor = new TestableProcessor();
TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, "async");
runner.setProperty(PutKafka.COMPRESSION_CODEC, "snappy");
runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03");
ProcessContext context = runner.getProcessContext();
ProducerConfig config = processor.createConfig(context);
// Check that the codec is snappy
CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$);
// Check compressed topics
Seq<String> compressedTopics = config.compressedTopics();
assertEquals(3, compressedTopics.size());
assertTrue(compressedTopics.contains("topic01"));
assertTrue(compressedTopics.contains("topic02"));
assertTrue(compressedTopics.contains("topic03"));
// Check the producer type
String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
}
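
The same pattern would cover the remaining codec. A hypothetical extension of the test above, not part of this commit, for gzip:

// Hypothetical: "gzip" should map onto the GZIP codec's Scala companion object.
runner.setProperty(PutKafka.COMPRESSION_CODEC, "gzip");
final ProducerConfig gzipConfig = processor.createConfig(runner.getProcessContext());
assertTrue(gzipConfig.compressionCodec() instanceof kafka.message.GZIPCompressionCodec$);
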
private static class TestableProcessor extends PutKafka {
@@ -262,6 +344,13 @@ public class TestPutKafka {
public MockProducer getProducer() {
return producer;
}
/**
* Exposed for test verification
*/
public ProducerConfig createConfig(final ProcessContext context) {
return super.createConfig(context);
}
}
private static class MockProducer extends Producer<byte[], byte[]> {