From 9653770ac4a050f4439c07f7ae6e34658f08e5a0 Mon Sep 17 00:00:00 2001 From: Brian Ghigiarelli Date: Thu, 14 May 2015 08:55:04 -0400 Subject: [PATCH 01/18] [NIFI-413] Adding properties to PutKafka to support asynchronous production with configurable batching. Also added user-defined control over compression codec and compressed topics. Producer type remains synchronous by default. --- .../nifi/processors/kafka/PutKafka.java | 81 +++++++++++++++- .../nifi/processors/kafka/TestPutKafka.java | 93 ++++++++++++++++++- 2 files changed, 171 insertions(+), 3 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 44b65849af..5bd0d2b981 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -136,6 +136,68 @@ public class PutKafka extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); + public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder() + .name("Producer Type") + .description("This parameter specifies whether the messages are sent asynchronously in a background thread." + + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send." + + " By setting the producer to async we allow batching together of requests (which is great for throughput)" + + " but open the possibility of a failure of the client machine dropping unsent data.") + .required(true) + .allowableValues("sync", "async") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("sync") + .build(); + public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() + .name("Async Message Batch Size (batch.num.messages)") + .description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode." + + " The producer will wait until either this number of messages are ready" + + " to send or queue.buffer.max.ms is reached.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("200").build(); + public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder() + .name("Queue Buffering Max Time (queue.buffering.max.ms)") + .description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example a setting of 100" + + " will try to batch together 100ms of messages to send at once. This will improve" + + " throughput but adds message delivery latency due to the buffering.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("5000").build(); + public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() + .name("Queue Buffer Max Count (queue.buffering.max.messages)") + .description("Used only if Producer Type is set to \"async\". 
The maximum number of unsent messages that can be queued up the producer when" + + " using async mode before either the producer must be blocked or data must be dropped.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000").build(); + public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder() + .name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)") + .description("Used only if Producer Type is set to \"async\". The amount of time to block before dropping messages when running in async mode" + + " and the buffer has reached queue.buffering.max.messages. If set to 0 events will" + + " be enqueued immediately or dropped if the queue is full (the producer send call will" + + " never block). If set to -1 the producer will block indefinitely and never willingly" + + " drop a send.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("-1").build(); + public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("Compression Codec (compression.codec)") + .description("This parameter allows you to specify the compression codec for all" + + " data generated by this producer. Valid values are \"none\", \"gzip\" and \"snappy\".") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy") + .defaultValue("none").build(); + public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() + .name("Compressed Topics (compressed.topics)") + .description("This parameter allows you to set whether compression should be turned on" + + " for particular topics. If the compression codec is anything other than" + + " NoCompressionCodec, enable compression only for specified topics if any." + + " If the list of compressed topics is empty, then enable the specified" + + " compression codec for all topics. 
If the compression codec is NoCompressionCodec," + + " compression is disabled for all topics") + .required(false).build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -163,6 +225,13 @@ public class PutKafka extends AbstractProcessor { props.add(MESSAGE_DELIMITER); props.add(MAX_BUFFER_SIZE); props.add(TIMEOUT); + props.add(PRODUCER_TYPE); + props.add(BATCH_NUM_MESSAGES); + props.add(QUEUE_BUFFERING_MAX_MESSAGES); + props.add(QUEUE_BUFFERING_MAX_MS); + props.add(QUEUE_ENQUEUE_TIMEOUT_MS); + props.add(COMPRESSION_CODEC); + props.add(COMPRESSED_TOPICS); props.add(clientName); return props; } @@ -194,7 +263,17 @@ public class PutKafka extends AbstractProcessor { properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue())); properties.setProperty("message.send.max.retries", "1"); - properties.setProperty("producer.type", "sync"); + properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue()); + properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue()); + properties.setProperty("queue.buffering.max.ms", context.getProperty(QUEUE_BUFFERING_MAX_MS).getValue()); + properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue()); + properties.setProperty("queue.enqueue.timeout.ms", context.getProperty(QUEUE_ENQUEUE_TIMEOUT_MS).getValue()); + properties.setProperty("compression.codec", context.getProperty(COMPRESSION_CODEC).getValue()); + + String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); + if(compressedTopics != null) { + properties.setProperty("compressed.topics", compressedTopics); + } return new ProducerConfig(properties); } diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 9500e29399..dd6b309803 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -29,11 +29,11 @@ import java.util.concurrent.atomic.AtomicLong; import kafka.common.FailedToSendMessageException; import kafka.javaapi.producer.Producer; +import kafka.message.CompressionCodec; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.nifi.annotation.lifecycle.OnScheduled; - import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.util.MockFlowFile; @@ -49,6 +49,8 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; +import scala.collection.Seq; + public class TestPutKafka { @Test @@ -217,7 +219,25 @@ public class TestPutKafka { runner.setProperty(PutKafka.TIMEOUT, "3 secs"); runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); - final Map attributes = new HashMap<>(); + keyValuePutExecute(runner); + } + + @Test + @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...") + public void testKeyValuePutAsync() { + final TestRunner runner = TestRunners.newTestRunner(PutKafka.class); + 
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092"); + runner.setProperty(PutKafka.TOPIC, "${kafka.topic}"); + runner.setProperty(PutKafka.KEY, "${kafka.key}"); + runner.setProperty(PutKafka.TIMEOUT, "3 secs"); + runner.setProperty(PutKafka.PRODUCER_TYPE, "async"); + runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); + + keyValuePutExecute(runner); + } + + private void keyValuePutExecute(final TestRunner runner) { + final Map attributes = new HashMap<>(); attributes.put("kafka.topic", "test"); attributes.put("kafka.key", "key3"); @@ -234,6 +254,68 @@ public class TestPutKafka { final MockFlowFile mff = mffs.get(0); assertTrue(Arrays.equals(data, mff.toByteArray())); + } + + @Test + public void testProducerConfigDefault() { + + final TestableProcessor processor = new TestableProcessor(); + TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + + ProcessContext context = runner.getProcessContext(); + ProducerConfig config = processor.createConfig(context); + + // Check the codec + CompressionCodec codec = config.compressionCodec(); + assertTrue(codec instanceof kafka.message.NoCompressionCodec$); + + // Check compressed topics + Seq compressedTopics = config.compressedTopics(); + assertEquals(0, compressedTopics.size()); + + // Check the producer type + String actualProducerType = config.producerType(); + assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType); + + } + + @Test + public void testProducerConfigAsyncWithCompression() { + + final TestableProcessor processor = new TestableProcessor(); + TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.PRODUCER_TYPE, "async"); + runner.setProperty(PutKafka.COMPRESSION_CODEC, "snappy"); + runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03"); + + ProcessContext context = runner.getProcessContext(); + ProducerConfig config = processor.createConfig(context); + + // Check that the codec is snappy + CompressionCodec codec = config.compressionCodec(); + assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$); + + // Check compressed topics + Seq compressedTopics = config.compressedTopics(); + assertEquals(3, compressedTopics.size()); + assertTrue(compressedTopics.contains("topic01")); + assertTrue(compressedTopics.contains("topic02")); + assertTrue(compressedTopics.contains("topic03")); + + // Check the producer type + String actualProducerType = config.producerType(); + assertEquals("async", actualProducerType); + } private static class TestableProcessor extends PutKafka { @@ -262,6 +344,13 @@ public class TestPutKafka { public MockProducer getProducer() { return producer; } + + /** + * Exposed for test verification + */ + public ProducerConfig createConfig(final ProcessContext context) { + return super.createConfig(context); + } } private static class MockProducer extends Producer { From 8af84f3f73bd7890c44c1f3b7597330023e34d16 Mon Sep 17 00:00:00 2001 From: Brian Ghigiarelli Date: Fri, 19 Jun 2015 18:49:21 -0400 Subject: [PATCH 02/18] [NIFI-413] Updating PutKafka properties to follow NiFi 
standards. Added validation for asynchronous properties. --- .../nifi/processors/kafka/PutKafka.java | 129 +++++++++++++----- .../nifi/processors/kafka/TestPutKafka.java | 78 ++++++++++- 2 files changed, 167 insertions(+), 40 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 5bd0d2b981..e572622a54 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Properties; @@ -39,6 +40,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -74,6 +77,32 @@ public class PutKafka extends AbstractProcessor { + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + " in data loss."); + /** + * AllowableValue for a Producer Type that synchronously sends messages to Kafka + */ + public static final AllowableValue PRODUCTER_TYPE_SYNCHRONOUS = new AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately."); + + /** + * AllowableValue for a Producer Type that asynchronously sends messages to Kafka + */ + public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka." + + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data."); + + /** + * AllowableValue for sending messages to Kafka without compression + */ + public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", "Compression will not be used for any topic."); + + /** + * AllowableValue for sending messages to Kafka with GZIP compression + */ + public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", "Compress messages using GZIP"); + + /** + * AllowableValue for sending messages to Kafka with Snappy compression + */ + public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); + public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() .name("Known Brokers") .description("A comma-separated list of known Kafka Brokers in the format :") @@ -138,64 +167,66 @@ public class PutKafka extends AbstractProcessor { .build(); public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder() .name("Producer Type") - .description("This parameter specifies whether the messages are sent asynchronously in a background thread." 
- + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send." - + " By setting the producer to async we allow batching together of requests (which is great for throughput)" - + " but open the possibility of a failure of the client machine dropping unsent data.") + .description("This parameter specifies whether the messages are sent asynchronously in a background thread.") .required(true) - .allowableValues("sync", "async") + .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) - .defaultValue("sync") + .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue()) .build(); public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() - .name("Async Message Batch Size (batch.num.messages)") - .description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode." + .name("Async Batch Size") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode." + " The producer will wait until either this number of messages are ready" - + " to send or queue.buffer.max.ms is reached.") + + " to send or \"Queue Buffering Max Time\" is reached.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("200").build(); - public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder() - .name("Queue Buffering Max Time (queue.buffering.max.ms)") - .description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example a setting of 100" + public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() + .name("Queue Buffering Max Time") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms" + " will try to batch together 100ms of messages to send at once. This will improve" + " throughput but adds message delivery latency due to the buffering.") .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("5000").build(); + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 secs").build(); public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() - .name("Queue Buffer Max Count (queue.buffering.max.messages)") - .description("Used only if Producer Type is set to \"async\". The maximum number of unsent messages that can be queued up the producer when" - + " using async mode before either the producer must be blocked or data must be dropped.") + .name("Queue Buffer Max Count") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." 
+ + " The maximum number of unsent messages that can be queued up in the producer when" + + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("10000").build(); - public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder() - .name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)") - .description("Used only if Producer Type is set to \"async\". The amount of time to block before dropping messages when running in async mode" - + " and the buffer has reached queue.buffering.max.messages. If set to 0 events will" + public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder() + .name("Queue Enqueue Timeout") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The amount of time to block before dropping messages when running in " + + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode" + + " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will" + " be enqueued immediately or dropped if the queue is full (the producer send call will" - + " never block). If set to -1 the producer will block indefinitely and never willingly" + + " never block). If not set, the producer will block indefinitely and never willingly" + " drop a send.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("-1").build(); + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name("Compression Codec (compression.codec)") + .name("Compression Codec") .description("This parameter allows you to specify the compression codec for all" - + " data generated by this producer. Valid values are \"none\", \"gzip\" and \"snappy\".") + + " data generated by this producer.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues("none", "gzip", "snappy") - .defaultValue("none").build(); + .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) + .defaultValue(COMPRESSION_CODEC_NONE.getValue()).build(); public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() - .name("Compressed Topics (compressed.topics)") + .name("Compressed Topics") .description("This parameter allows you to set whether compression should be turned on" + " for particular topics. If the compression codec is anything other than" - + " NoCompressionCodec, enable compression only for specified topics if any." + + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any." + " If the list of compressed topics is empty, then enable the specified" - + " compression codec for all topics. If the compression codec is NoCompressionCodec," + + " compression codec for all topics. 
If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + "," + " compression is disabled for all topics") .required(false).build(); @@ -228,14 +259,28 @@ public class PutKafka extends AbstractProcessor { props.add(PRODUCER_TYPE); props.add(BATCH_NUM_MESSAGES); props.add(QUEUE_BUFFERING_MAX_MESSAGES); - props.add(QUEUE_BUFFERING_MAX_MS); - props.add(QUEUE_ENQUEUE_TIMEOUT_MS); + props.add(QUEUE_BUFFERING_MAX); + props.add(QUEUE_ENQUEUE_TIMEOUT); props.add(COMPRESSION_CODEC); props.add(COMPRESSED_TOPICS); props.add(clientName); return props; } + @Override + public Collection customValidate(final ValidationContext context) { + final List errors = new ArrayList<>(super.customValidate(context)); + + final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger(); + final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger(); + + if (batchMessages > bufferMaxMessages) { + errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false).explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build()); + } + + return errors; + } + @Override public Set getRelationships() { final Set relationships = new HashSet<>(1); @@ -265,10 +310,20 @@ public class PutKafka extends AbstractProcessor { properties.setProperty("message.send.max.retries", "1"); properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue()); properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue()); - properties.setProperty("queue.buffering.max.ms", context.getProperty(QUEUE_BUFFERING_MAX_MS).getValue()); + + Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); + if(queueBufferingMillis != null) { + properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis)); + } properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue()); - properties.setProperty("queue.enqueue.timeout.ms", context.getProperty(QUEUE_ENQUEUE_TIMEOUT_MS).getValue()); - properties.setProperty("compression.codec", context.getProperty(COMPRESSION_CODEC).getValue()); + + Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + if(queueEnqueueTimeoutMillis != null) { + properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis)); + } + + String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); + properties.setProperty("compression.codec", compressionCodec); String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); if(compressedTopics != null) { diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index ded0afa54a..5d1eacf022 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -205,7 +205,7 @@ public class TestPutKafka { runner.setProperty(PutKafka.TOPIC, "${kafka.topic}"); runner.setProperty(PutKafka.KEY, "${kafka.key}"); 
runner.setProperty(PutKafka.TIMEOUT, "3 secs"); - runner.setProperty(PutKafka.PRODUCER_TYPE, "async"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); keyValuePutExecute(runner); @@ -269,8 +269,8 @@ public class TestPutKafka { runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, "async"); - runner.setProperty(PutKafka.COMPRESSION_CODEC, "snappy"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); + runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue()); runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03"); ProcessContext context = runner.getProcessContext(); @@ -292,6 +292,78 @@ public class TestPutKafka { assertEquals("async", actualProducerType); } + + @Test + public void testProducerConfigAsyncQueueThresholds() { + + final TestableProcessor processor = new TestableProcessor(); + TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); + runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs"); + runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535"); + runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms"); + + ProcessContext context = runner.getProcessContext(); + ProducerConfig config = processor.createConfig(context); + + // Check that the queue thresholds were properly translated + assertEquals(7000, config.queueBufferingMaxMs()); + assertEquals(535, config.queueBufferingMaxMessages()); + assertEquals(200, config.queueEnqueueTimeoutMs()); + + // Check the producer type + String actualProducerType = config.producerType(); + assertEquals("async", actualProducerType); + + } + + @Test + public void testProducerConfigInvalidBatchSize() { + + final TestableProcessor processor = new TestableProcessor(); + TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); + runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200"); + runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100"); + + runner.assertNotValid(); + + } + + @Test + public void testProducerConfigAsyncDefaultEnqueueTimeout() { + + final TestableProcessor processor = new TestableProcessor(); + TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); + // Do not set QUEUE_ENQUEUE_TIMEOUT + + ProcessContext context = runner.getProcessContext(); + ProducerConfig config = processor.createConfig(context); + + // Check 
that the enqueue timeout defaults to -1 + assertEquals(-1, config.queueEnqueueTimeoutMs()); + + // Check the producer type + String actualProducerType = config.producerType(); + assertEquals("async", actualProducerType); + + } private static class TestableProcessor extends PutKafka { From f2f90560557de70ef4404672ce53c4593995a5f1 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 22 Jun 2015 12:05:02 -0400 Subject: [PATCH 03/18] NIFI-413: Formatted code to fix checkstyle failures --- .../nifi/processors/kafka/PutKafka.java | 319 +++++++++--------- .../nifi/processors/kafka/TestPutKafka.java | 79 ++--- 2 files changed, 203 insertions(+), 195 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index e572622a54..d83c7bfdbb 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -61,7 +61,7 @@ import org.apache.nifi.util.LongHolder; import scala.actors.threadpool.Arrays; @SupportsBatching -@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka") public class PutKafka extends AbstractProcessor { @@ -69,13 +69,13 @@ public class PutKafka extends AbstractProcessor { private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to" - + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); + + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed" - + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" - + " but can result in data loss if a Kafka node crashes"); + + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" + + " but can result in data loss if a Kafka node crashes"); public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after" - + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" - + " in data loss."); + + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + + " in data loss."); /** * AllowableValue for a Producer Type that synchronously sends messages to Kafka @@ -86,7 +86,7 @@ public class PutKafka extends AbstractProcessor { * AllowableValue for a Producer Type that asynchronously sends messages to Kafka */ public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka." 
- + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data."); + + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data."); /** * AllowableValue for sending messages to Kafka without compression @@ -103,150 +103,156 @@ public class PutKafka extends AbstractProcessor { */ public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); + public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() - .name("Known Brokers") - .description("A comma-separated list of known Kafka Brokers in the format :") - .required(true) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) - .expressionLanguageSupported(false) - .build(); + .name("Known Brokers") + .description("A comma-separated list of known Kafka Brokers in the format :") + .required(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() - .name("Topic Name") - .description("The Kafka Topic of interest") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Topic Name") + .description("The Kafka Topic of interest") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() - .name("Kafka Key") - .description("The Key to use for the Message") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Kafka Key") + .description("The Key to use for the Message") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() - .name("Delivery Guarantee") - .description("Specifies the requirement for guaranteeing that a message is sent to Kafka") - .required(true) - .expressionLanguageSupported(false) - .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) - .defaultValue(DELIVERY_BEST_EFFORT.getValue()) - .build(); + .name("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " - + "If not specified, the entire content of the FlowFile will be used as a single message. 
" - + "If specified, the contents of the FlowFile will be split on this delimiter and each section " - + "sent as a separate Kafka message.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Message Delimiter") + .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + + "If not specified, the entire content of the FlowFile will be used as a single message. " + + "If specified, the contents of the FlowFile will be split on this delimiter and each section " + + "sent as a separate Kafka message.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Buffer Size") - .description("The maximum amount of data to buffer in memory before sending to Kafka") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("1 MB") - .build(); + .name("Max Buffer Size") + .description("The maximum amount of data to buffer in memory before sending to Kafka") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1 MB") + .build(); public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("30 secs") - .build(); + .name("Communications Timeout") + .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .build(); public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() - .name("Client Name") - .description("Client Name to use when communicating with Kafka") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("Client Name") + .description("Client Name to use when communicating with Kafka") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder() - .name("Producer Type") - .description("This parameter specifies whether the messages are sent asynchronously in a background thread.") - .required(true) - .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue()) - .build(); - public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() - .name("Async Batch Size") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode." 
- + " The producer will wait until either this number of messages are ready" - + " to send or \"Queue Buffering Max Time\" is reached.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("200").build(); - public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() - .name("Queue Buffering Max Time") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms" - + " will try to batch together 100ms of messages to send at once. This will improve" - + " throughput but adds message delivery latency due to the buffering.") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("5 secs").build(); - public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() - .name("Queue Buffer Max Count") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The maximum number of unsent messages that can be queued up in the producer when" - + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10000").build(); - public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder() - .name("Queue Enqueue Timeout") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The amount of time to block before dropping messages when running in " - + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode" - + " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will" - + " be enqueued immediately or dropped if the queue is full (the producer send call will" - + " never block). If not set, the producer will block indefinitely and never willingly" - + " drop a send.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name("Compression Codec") - .description("This parameter allows you to specify the compression codec for all" - + " data generated by this producer.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) - .defaultValue(COMPRESSION_CODEC_NONE.getValue()).build(); - public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() - .name("Compressed Topics") - .description("This parameter allows you to set whether compression should be turned on" - + " for particular topics. If the compression codec is anything other than" - + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any." - + " If the list of compressed topics is empty, then enable the specified" - + " compression codec for all topics. 
If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + "," - + " compression is disabled for all topics") - .required(false).build(); + .name("Producer Type") + .description("This parameter specifies whether the messages are sent asynchronously in a background thread.") + .required(true) + .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue()) + .build(); + public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() + .name("Async Batch Size") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode." + + " The producer will wait until either this number of messages are ready" + + " to send or \"Queue Buffering Max Time\" is reached.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("200") + .build(); + public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() + .name("Queue Buffering Max Time") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms" + + " will try to batch together 100ms of messages to send at once. This will improve" + + " throughput but adds message delivery latency due to the buffering.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 secs") + .build(); + public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() + .name("Queue Buffer Max Count") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The maximum number of unsent messages that can be queued up in the producer when" + + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .build(); + public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder() + .name("Queue Enqueue Timeout") + .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." + + " The amount of time to block before dropping messages when running in " + + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode" + + " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will" + + " be enqueued immediately or dropped if the queue is full (the producer send call will" + + " never block). 
If not set, the producer will block indefinitely and never willingly" + + " drop a send.") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("Compression Codec") + .description("This parameter allows you to specify the compression codec for all" + + " data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) + .defaultValue(COMPRESSION_CODEC_NONE.getValue()) + .build(); + public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() + .name("Compressed Topics") + .description("This parameter allows you to set whether compression should be turned on" + + " for particular topics. If the compression codec is anything other than" + + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any." + + " If the list of compressed topics is empty, then enable the specified" + + " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + "," + + " compression is disabled for all topics") + .required(false) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") - .build(); + .name("success") + .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") - .build(); + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); private final BlockingQueue> producers = new LinkedBlockingQueue<>(); @Override protected List getSupportedPropertyDescriptors() { final PropertyDescriptor clientName = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(CLIENT_NAME) - .defaultValue("NiFi-" + getIdentifier()) - .build(); + .fromPropertyDescriptor(CLIENT_NAME) + .defaultValue("NiFi-" + getIdentifier()) + .build(); final List props = new ArrayList<>(); props.add(SEED_BROKERS); @@ -269,13 +275,14 @@ public class PutKafka extends AbstractProcessor { @Override public Collection customValidate(final ValidationContext context) { - final List errors = new ArrayList<>(super.customValidate(context)); + final List errors = new ArrayList<>(super.customValidate(context)); final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger(); final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger(); if (batchMessages > bufferMaxMessages) { - errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false).explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build()); + errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false) + .explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build()); } return errors; @@ -311,23 +318,23 @@ public class PutKafka extends AbstractProcessor { 
properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue()); properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue()); - Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); - if(queueBufferingMillis != null) { - properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis)); + final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); + if (queueBufferingMillis != null) { + properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis)); } properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue()); - Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); - if(queueEnqueueTimeoutMillis != null) { - properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis)); + final Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + if (queueEnqueueTimeoutMillis != null) { + properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis)); } - String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); + final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); properties.setProperty("compression.codec", compressionCodec); - - String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); - if(compressedTopics != null) { - properties.setProperty("compressed.topics", compressedTopics); + + final String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); + if (compressedTopics != null) { + properties.setProperty("compressed.topics", compressedTopics); } return new ProducerConfig(properties); @@ -338,7 +345,7 @@ public class PutKafka extends AbstractProcessor { } private Producer borrowProducer(final ProcessContext context) { - Producer producer = producers.poll(); + final Producer producer = producers.poll(); return producer == null ? createProducer(context) : producer; } @@ -348,7 +355,7 @@ public class PutKafka extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); + final FlowFile flowFile = session.get(); if (flowFile == null) { return; } @@ -356,7 +363,7 @@ public class PutKafka extends AbstractProcessor { final long start = System.nanoTime(); final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8); + final byte[] keyBytes = key == null ? 
null : key.getBytes(StandardCharsets.UTF_8); String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); if (delimiter != null) { delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); @@ -389,9 +396,9 @@ public class PutKafka extends AbstractProcessor { session.getProvenanceReporter().send(flowFile, "kafka://" + topic); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[]{flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); } catch (final Exception e) { - getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[]{flowFile, e}); + getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e }); session.transfer(flowFile, REL_FAILURE); error = true; } finally { @@ -426,7 +433,7 @@ public class PutKafka extends AbstractProcessor { int nextByte; try (final InputStream bufferedIn = new BufferedInputStream(rawIn); - final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { + final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { // read until we're out of data. while (!streamFinished) { @@ -514,7 +521,7 @@ public class PutKafka extends AbstractProcessor { final long nanos = System.nanoTime() - start; session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages"); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[]{messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] { messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); } catch (final ProcessException pe) { error = true; @@ -524,7 +531,7 @@ public class PutKafka extends AbstractProcessor { final long offset = lastMessageOffset.get(); if (offset == 0L) { // all of the messages failed to send. Route FlowFile to failure - getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[]{flowFile, pe.getCause()}); + getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() }); session.transfer(flowFile, REL_FAILURE); } else { // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. @@ -532,8 +539,8 @@ public class PutKafka extends AbstractProcessor { final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset); getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into" - + " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[]{ - messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause()}); + + " two: {} routed to 'success', {} routed to 'failure'. 
Failure was due to {}", new Object[] { + messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() }); session.transfer(successfulMessages, REL_SUCCESS); session.transfer(failedMessages, REL_FAILURE); diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 5d1eacf022..750d40691e 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -212,7 +212,7 @@ public class TestPutKafka { } private void keyValuePutExecute(final TestRunner runner) { - final Map attributes = new HashMap<>(); + final Map attributes = new HashMap<>(); attributes.put("kafka.topic", "test"); attributes.put("kafka.key", "key3"); @@ -229,32 +229,32 @@ public class TestPutKafka { final MockFlowFile mff = mffs.get(0); assertTrue(Arrays.equals(data, mff.toByteArray())); - } + } @Test public void testProducerConfigDefault() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - ProcessContext context = runner.getProcessContext(); - ProducerConfig config = processor.createConfig(context); + final ProcessContext context = runner.getProcessContext(); + final ProducerConfig config = processor.createConfig(context); // Check the codec - CompressionCodec codec = config.compressionCodec(); + final CompressionCodec codec = config.compressionCodec(); assertTrue(codec instanceof kafka.message.NoCompressionCodec$); // Check compressed topics - Seq compressedTopics = config.compressedTopics(); + final Seq compressedTopics = config.compressedTopics(); assertEquals(0, compressedTopics.size()); // Check the producer type - String actualProducerType = config.producerType(); + final String actualProducerType = config.producerType(); assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType); } @@ -262,10 +262,10 @@ public class TestPutKafka { @Test public void testProducerConfigAsyncWithCompression() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); @@ -273,33 +273,33 @@ public class TestPutKafka { runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue()); runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03"); - ProcessContext context = runner.getProcessContext(); - ProducerConfig config = processor.createConfig(context); + 
final ProcessContext context = runner.getProcessContext(); + final ProducerConfig config = processor.createConfig(context); // Check that the codec is snappy - CompressionCodec codec = config.compressionCodec(); + final CompressionCodec codec = config.compressionCodec(); assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$); // Check compressed topics - Seq compressedTopics = config.compressedTopics(); + final Seq compressedTopics = config.compressedTopics(); assertEquals(3, compressedTopics.size()); assertTrue(compressedTopics.contains("topic01")); assertTrue(compressedTopics.contains("topic02")); assertTrue(compressedTopics.contains("topic03")); // Check the producer type - String actualProducerType = config.producerType(); + final String actualProducerType = config.producerType(); assertEquals("async", actualProducerType); } - + @Test public void testProducerConfigAsyncQueueThresholds() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); @@ -308,27 +308,27 @@ public class TestPutKafka { runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535"); runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms"); - ProcessContext context = runner.getProcessContext(); - ProducerConfig config = processor.createConfig(context); + final ProcessContext context = runner.getProcessContext(); + final ProducerConfig config = processor.createConfig(context); // Check that the queue thresholds were properly translated assertEquals(7000, config.queueBufferingMaxMs()); assertEquals(535, config.queueBufferingMaxMessages()); assertEquals(200, config.queueEnqueueTimeoutMs()); - + // Check the producer type - String actualProducerType = config.producerType(); + final String actualProducerType = config.producerType(); assertEquals("async", actualProducerType); } - + @Test public void testProducerConfigInvalidBatchSize() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); @@ -339,28 +339,28 @@ public class TestPutKafka { runner.assertNotValid(); } - + @Test public void testProducerConfigAsyncDefaultEnqueueTimeout() { - final TestableProcessor processor = new TestableProcessor(); - TestRunner runner = TestRunners.newTestRunner(processor); + final TestableProcessor processor = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.setProperty(PutKafka.PRODUCER_TYPE, 
PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); // Do not set QUEUE_ENQUEUE_TIMEOUT - ProcessContext context = runner.getProcessContext(); - ProducerConfig config = processor.createConfig(context); + final ProcessContext context = runner.getProcessContext(); + final ProducerConfig config = processor.createConfig(context); // Check that the enqueue timeout defaults to -1 assertEquals(-1, config.queueEnqueueTimeoutMs()); // Check the producer type - String actualProducerType = config.producerType(); + final String actualProducerType = config.producerType(); assertEquals("async", actualProducerType); } @@ -391,12 +391,13 @@ public class TestPutKafka { public MockProducer getProducer() { return producer; } - + /** * Exposed for test verification */ + @Override public ProducerConfig createConfig(final ProcessContext context) { - return super.createConfig(context); + return super.createConfig(context); } } From 5d8bfa7c806549f5ede6eec5047dc66696bc95d8 Mon Sep 17 00:00:00 2001 From: danbress Date: Sat, 20 Jun 2015 19:53:13 -0400 Subject: [PATCH 04/18] NIFI-704 StandardProcessorTestRunner should allow you to wait before calling OnUnScheduled methods Signed-off-by: Mark Payne --- .../util/StandardProcessorTestRunner.java | 10 +++ .../java/org/apache/nifi/util/TestRunner.java | 45 ++++++++++++ ...urrentTestStandardProcessorTestRunner.java | 71 +++++++++++++++++++ 3 files changed, 126 insertions(+) create mode 100644 nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 655f2df5d9..89385477d7 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -39,6 +39,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -182,6 +183,11 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) { + run(iterations, stopOnFinish, initialize, 5000); + } + + @Override + public void run(final int iterations, final boolean stopOnFinish, final boolean initialize, final long runWait) { if (iterations < 1) { throw new IllegalArgumentException(); } @@ -207,6 +213,10 @@ public class StandardProcessorTestRunner implements TestRunner { } executorService.shutdown(); + try { + executorService.awaitTermination(runWait, TimeUnit.MILLISECONDS); + } catch (InterruptedException e1) { + } int finishedCount = 0; boolean unscheduledRun = false; diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index a599e5bbdd..fb9fc78600 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -125,6 +125,51 @@ public interface TestRunner { */ void run(int iterations, boolean stopOnFinish, final boolean initialize); + /** + * This method runs the {@link Processor} iterations times, + * using the sequence of steps below: + *
+ * <ul>
+ * <li> If {@code initialize} is true, run all methods on the Processor that are + * annotated with the + * {@link nifi.processor.annotation.OnScheduled @OnScheduled} annotation. If + * any of these methods throws an Exception, the Unit Test will fail. + * </li>
+ * <li> Schedule the + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger} + * method to be invoked iterations times. The number of threads + * used to run these iterations is determined by the ThreadCount of this + * TestRunner. By default, the value is set to 1, but it can be + * modified by calling the {@link #setThreadCount(int)} method. + * </li>
+ * <li> As soon as the first thread finishes its execution of + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger}, + * all methods on the Processor that are annotated with the + * {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation + * are invoked. If any of these methods throws an Exception, the Unit Test + * will fail. + * </li>
+ * <li> Waits for all threads to finish execution. + * </li>
+ * <li> If and only if the value of shutdown is true: Call all + * methods on the Processor that is annotated with the + * {@link nifi.processor.annotation.OnStopped @OnStopped} annotation. + * </li>
+ * </ul>
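+ * <p> For example (a minimal usage sketch, not part of this patch; MyProcessor stands in for whatever Processor is under test): </p>
+ * <pre> + * final TestRunner runner = TestRunners.newTestRunner(new MyProcessor()); + * // three iterations, stop on finish, initialize, and wait up to 1000 ms for all threads to stop + * runner.run(3, true, true, 1000L); + * </pre>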
+ * + * @param iterations number of iterations + * @param stopOnFinish whether or not to run the Processor methods that are + * annotated with {@link nifi.processor.annotation.OnStopped @OnStopped} + * @param initialize true if must initialize + * @param runWait indicates the amount of time in milliseconds that the framework should wait for + * processors to stop running before calling the {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation + */ + void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait); + /** * Invokes all methods on the Processor that are annotated with the * {@link nifi.processor.annotation.OnShutdown @OnShutdown} annotation. If diff --git a/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java new file mode 100644 index 0000000000..6b403af25c --- /dev/null +++ b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.Assert; +import org.junit.Test; + +public class CurrentTestStandardProcessorTestRunner { + + /** + * This test will verify that all iterations of the run are finished before unscheduled is called + */ + @Test + public void testOnScheduledCalledAfterRunFinished() { + SlowRunProcessor processor = new SlowRunProcessor(); + StandardProcessorTestRunner runner = new StandardProcessorTestRunner(processor); + final int iterations = 5; + runner.run(iterations); + // if the counter is not equal to iterations, the the processor must have been unscheduled + // before all the run calls were made, that would be bad. 
+ Assert.assertEquals(iterations, processor.getCounter()); + } + + /** + * This processor simulates a "slow" processor that checks whether it is scheduled before doing something + * + * + */ + private static class SlowRunProcessor extends AbstractProcessor { + + private int counter = 0; + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + try { + // be slow + Thread.sleep(50); + // make sure we are still scheduled + if (isScheduled()) { + // increment counter + ++counter; + } + } catch (InterruptedException e) { + } + + } + + public int getCounter() { + return counter; + } + } +} From ddad70ba009a875e088020ae1ea8f29caeb529f9 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 22 Jun 2015 12:42:22 -0400 Subject: [PATCH 05/18] NIFI-378: updated documentation to explain contract of MergeContent's Defragment strategy a bit more clearly --- .../processors/standard/MergeContent.java | 57 ++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 2883a758e2..65f4124d55 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -82,31 +82,34 @@ import org.apache.nifi.util.ObjectHolder; + "created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate.") @ReadsAttributes({ @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the property is set to Defragment. " - + "All FlowFiles with the same value for this attribute will be bundled together"), - @ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the property is set to Defragment. This " - + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute and must be a unique integer " - + "between 0 and the value of the fragment.count attribute. This attribute indicates the order in which the fragments should be assembled"), + + "All FlowFiles with the same value for this attribute will be bundled together."), + @ReadsAttribute(attribute = "fragment.index", description = "Applicable only if the property is set to Defragment. " + + "This attribute indicates the order in which the fragments should be assembled. This " + + "attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all " + + "FlowFiles that have the same value for the \"fragment.identifier\" attribute) integer " + + "between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the " + + "\"fragment.identifier\" attribute and the same value for the \"fragment.index\" attribute, the behavior of this Processor is undefined."), @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the property is set to Defragment. This " - + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " - + "bundle must have the same value for this attribute. 
The value of this attribute indicates how many FlowFiles should be expected " - + "in the given bundle"), + + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " + + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected " + + "in the given bundle."), @ReadsAttribute(attribute = "segment.original.filename", description = "Applicable only if the property is set to Defragment. " - + "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " - + "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged " - + "FlowFile"), + + "This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " + + "bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged " + + "FlowFile."), @ReadsAttribute(attribute = "tar.permissions", description = "Applicable only if the property is set to TAR. The value of this " - + "attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should " - + "be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used")}) + + "attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should " + + "be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used") }) @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "When more than 1 file is merged, the filename comes from the segment.original.filename " - + "attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching " - + "system time. Then a filename extension may be applied:" - + "if Merge Format is TAR, then the filename will be appended with .tar, " - + "if Merge Format is ZIP, then the filename will be appended with .zip, " - + "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"), + + "attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the number of nanoseconds matching " + + "system time. Then a filename extension may be applied:" + + "if Merge Format is TAR, then the filename will be appended with .tar, " + + "if Merge Format is ZIP, then the filename will be appended with .zip, " + + "if Merge Format is FlowFileStream, then the filename will be appended with .pkg"), @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"), @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. 
Effectively " - + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output")}) + + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") }) @SeeAlso(SegmentContent.class) public class MergeContent extends BinFiles { @@ -131,7 +134,9 @@ public class MergeContent extends BinFiles { "Defragment", "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must " + "have the attributes , , and or alternatively (for backward compatibility " - + "purposes) , , and "); + + "purposes) , , and . All FlowFiles with the same value for \"fragment.identifier\" " + + "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. All FlowFiles " + + "in this group must have a unique value for the \"fragment.index\" attribute between 0 and the value of the \"fragment.count\" attribute."); public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new AllowableValue( "Filename", "Filename", "The values of Header, Footer, and Demarcator will be retrieved from the contents of a file"); @@ -307,7 +312,7 @@ public class MergeContent extends BinFiles { @Override protected Collection additionalCustomValidation(ValidationContext context) { - Collection results = new ArrayList<>(); + final Collection results = new ArrayList<>(); final String delimiterStrategy = context.getProperty(DELIMITER_STRATEGY).getValue(); if(DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategy)) { @@ -353,7 +358,7 @@ public class MergeContent extends BinFiles { @Override protected String getGroupId(final ProcessContext context, final FlowFile flowFile) { final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue(); - String groupId = (correlationAttributeName == null) ? null : flowFile.getAttribute(correlationAttributeName); + String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName); // when MERGE_STRATEGY is Defragment and correlationAttributeName is null then bin by fragment.identifier if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) { @@ -442,7 +447,7 @@ public class MergeContent extends BinFiles { bundle = session.putAllAttributes(bundle, bundleAttributes); - final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles"; + final String inputDescription = binCopy.size() < 10 ? binCopy.toString() : binCopy.size() + " FlowFiles"; getLogger().info("Merged {} into {}", new Object[]{inputDescription, bundle}); session.transfer(bundle, REL_MERGED); @@ -640,18 +645,18 @@ public class MergeContent extends BinFiles { } if (".".equals(path.getName(0).toString())) { - path = (path.getNameCount() == 1) ? null : path.subpath(1, path.getNameCount()); + path = path.getNameCount() == 1 ? null : path.subpath(1, path.getNameCount()); } - return (path == null) ? "" : path.toString() + "/"; + return path == null ? 
"" : path.toString() + "/"; } private String createFilename(final List wrappers) { if (wrappers.size() == 1) { return wrappers.get(0).getFlowFile().getAttribute(CoreAttributes.FILENAME.key()); } else { - FlowFile ff = wrappers.get(0).getFlowFile(); - String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME); + final FlowFile ff = wrappers.get(0).getFlowFile(); + final String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME); if (origFilename != null) { return origFilename; } else { From 77a0561bc059b7503867d583245b025d08244dc8 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 22 Jun 2015 12:50:17 -0400 Subject: [PATCH 06/18] NIFI-672: Fixed typo in Admin Guide --- nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc index 039e5b9e8d..5535c35544 100644 --- a/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -210,7 +210,7 @@ Here is an example entry using the name John Smith: ---- - + From b22a1261c3f29c8714d043174b7beff36ebe5549 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 22 Jun 2015 11:31:55 -0400 Subject: [PATCH 07/18] NIFI-711: Do not check status of FlowFile when emitting provenance CLONE event during session commit/checkpoint --- .../controller/repository/StandardProcessSession.java | 2 +- .../controller/repository/StandardProvenanceReporter.java | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 4ee8c06e2c..04e819e5a2 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -260,7 +260,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final FlowFileRecord clone = builder.build(); final StandardRepositoryRecord newRecord = new StandardRepositoryRecord(destination.getFlowFileQueue()); - getProvenanceReporter().clone(currRec, clone); + provenanceReporter.clone(currRec, clone, false); final ContentClaim claim = clone.getContentClaim(); if (claim != null) { diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index 5194fef3b8..8852f424f4 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -327,7 +327,13 @@ public class StandardProvenanceReporter implements 
ProvenanceReporter { @Override public void clone(final FlowFile parent, final FlowFile child) { - verifyFlowFileKnown(child); + clone(parent, child, true); + } + + void clone(final FlowFile parent, final FlowFile child, final boolean verifyFlowFile) { + if (verifyFlowFile) { + verifyFlowFileKnown(child); + } try { final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE); From 979671ca9719aca6789f213529991fc99d86979d Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 22 Jun 2015 09:21:44 -0400 Subject: [PATCH 08/18] NIFI-545: Code cleanup --- .../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java index c0395a42a1..335c0ef5a4 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java @@ -188,7 +188,7 @@ public class DataFlowDaoImpl implements DataFlowDao { return; } - if ((primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null)) { + if (primaryEntry == null && restoreEntry != null || primaryEntry != null && restoreEntry == null) { throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); } @@ -352,7 +352,7 @@ public class DataFlowDaoImpl implements DataFlowDao { final File[] files = dir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { - return (name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT)); + return name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) || name.endsWith(UNKNOWN_EXT); } }); @@ -515,19 +515,10 @@ public class DataFlowDaoImpl implements DataFlowDao { final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes); dataFlow.setAutoStartProcessors(autoStart); - return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes); + return new ClusterDataFlow(dataFlow, clusterMetadata == null ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes); } private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException { - - // get the data flow - DataFlow dataFlow = clusterDataFlow.getDataFlow(); - - // if no dataflow, then write a new dataflow - if (dataFlow == null) { - dataFlow = new StandardDataFlow(new byte[0], new byte[0], new byte[0]); - } - // setup the cluster metadata final ClusterMetadata clusterMetadata = new ClusterMetadata(); clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId()); From 25146a5828bc5dd1d04c5252843bef93ea87afac Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Sat, 20 Jun 2015 08:52:14 -0400 Subject: [PATCH 09/18] NIFI-80: Truncate attribute values that exceed some threshold. 
Expose threshold as properties in nifi.properties file --- nifi/nifi-assembly/pom.xml | 1 + .../src/main/resources/conf/nifi.properties | 3 + .../nifi/provenance/IndexConfiguration.java | 2 +- .../PersistentProvenanceRepository.java | 90 ++++++++++++++++--- .../provenance/RepositoryConfiguration.java | 18 ++++ .../nifi/provenance/StandardRecordReader.java | 15 ++-- .../provenance/lucene/DeleteIndexAction.java | 6 +- .../nifi/provenance/lucene/DocsReader.java | 9 +- .../nifi/provenance/lucene/IndexSearch.java | 7 +- .../nifi/provenance/lucene/LineageQuery.java | 6 +- .../serialization/RecordReaders.java | 17 +++- .../TestPersistentProvenanceRepository.java | 39 +++++++- .../TestStandardRecordReaderWriter.java | 10 +-- 13 files changed, 180 insertions(+), 43 deletions(-) diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index 02a16f9384..9b17617b2a 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -276,6 +276,7 @@ language governing permissions and limitations under the License. --> 500 MB false 16 + 65536 100000 diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 90b3cdd07f..4043076774 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -82,6 +82,9 @@ nifi.provenance.repository.indexed.attributes=${nifi.provenance.repository.index # Large values for the shard size will result in more Java heap usage when searching the Provenance Repository # but should provide better performance nifi.provenance.repository.index.shard.size=${nifi.provenance.repository.index.shard.size} +# Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from +# the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved. 
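+# For example, an installation that wanted to cap attribute values at 4 KB could set this property to 4096 (an illustrative value, not a
+# project default; the assembly default shown above is 65536, and values under 36 are raised to 36 so the 36-character uuid attribute stays intact).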
+nifi.provenance.repository.max.attribute.length=${nifi.provenance.repository.max.attribute.length} # Volatile Provenance Respository Properties nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size} diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java index 9ea793daf9..4e808111d5 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java @@ -85,7 +85,7 @@ public class IndexConfiguration { } private Long getFirstEntryTime(final File provenanceLogFile) { - try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null)) { + try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) { final StandardProvenanceEventRecord firstRecord = reader.nextRecord(); if (firstRecord == null) { return provenanceLogFile.lastModified(); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 5da5d6fe42..81d883a74e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -134,6 +134,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final IndexManager indexManager; private final boolean alwaysSync; private final int rolloverCheckMillis; + private final int maxAttributeChars; private final ScheduledExecutorService scheduledExecService; private final ScheduledExecutorService rolloverExecutor; @@ -167,6 +168,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } this.configuration = configuration; + this.maxAttributeChars = configuration.getMaxAttributeChars(); for (final File file : configuration.getStorageDirectories()) { final Path storageDirectory = file.toPath(); @@ -289,6 +291,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false")); + final int defaultMaxAttrChars = 65536; + final String maxAttrLength = properties.getProperty("nifi.provenance.repository.max.attribute.length", String.valueOf(defaultMaxAttrChars)); + int maxAttrChars; + try { + maxAttrChars = Integer.parseInt(maxAttrLength); + // must be at least 36 characters because that's the length of the uuid attribute, + // which must be kept intact + if (maxAttrChars < 36) { + maxAttrChars = 36; + logger.warn("Found max attribute length property set to " + maxAttrLength + " but 
minimum length is 36; using 36 instead"); + } + } catch (final Exception e) { + maxAttrChars = defaultMaxAttrChars; + } + final List searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true); final List searchableAttributes = SearchableFieldParser.extractSearchableFields(indexedAttrString, false); @@ -310,6 +327,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository config.setMaxStorageCapacity(maxStorageBytes); config.setQueryThreadPoolSize(queryThreads); config.setJournalCount(journalCount); + config.setMaxAttributeChars(maxAttrChars); if (shardSize != null) { config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue()); @@ -337,6 +355,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return writers; } + /** + * @return the maximum number of characters that any Event attribute should contain. If the event contains + * more characters than this, the attribute may be truncated on retrieval + */ + public int getMaxAttributeCharacters() { + return maxAttributeChars; + } + @Override public StandardProvenanceEventRecord.Builder eventBuilder() { return new StandardProvenanceEventRecord.Builder(); @@ -362,7 +388,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } for (final Path path : paths) { - try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles())) { + try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles(), maxAttributeChars)) { // if this is the first record, try to find out the block index and jump directly to // the block index. This avoids having to read through a lot of data that we don't care about // just to get to the first record that we want. @@ -377,7 +403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } StandardProvenanceEventRecord record; - while (records.size() < maxRecords && ((record = reader.nextRecord()) != null)) { + while (records.size() < maxRecords && (record = reader.nextRecord()) != null) { if (record.getEventId() >= firstRecordId) { records.add(record); } @@ -507,7 +533,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository if (maxIdFile != null) { // Determine the max ID in the last file. - try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) { + try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles(), maxAttributeChars)) { final long eventId = reader.getMaxEventId(); if (eventId > maxId) { maxId = eventId; @@ -571,7 +597,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // Read the records in the last file to find its max id if (greatestMinIdFile != null) { - try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.emptyList())) { + try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections. emptyList(), maxAttributeChars)) { maxId = recordReader.getMaxEventId(); } } @@ -1224,7 +1250,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try { for (final File journalFile : journalFiles) { try { - readers.add(RecordReaders.newRecordReader(journalFile, null)); + // Use MAX_VALUE for number of chars because we don't want to truncate the value as we write it + // out. 
This allows us to later decide that we want more characters and still be able to retrieve + // the entire event. + readers.add(RecordReaders.newRecordReader(journalFile, null, Integer.MAX_VALUE)); } catch (final EOFException eof) { // there's nothing here. Skip over it. } catch (final IOException ioe) { @@ -1314,7 +1343,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository indexingAction.index(record, indexWriter, blockIndex); maxId = record.getEventId(); - latestRecords.add(record); + latestRecords.add(truncateAttributes(record)); records++; // Remove this entry from the map @@ -1383,6 +1412,39 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return writerFile; } + private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) { + boolean requireTruncation = false; + + for (final Map.Entry entry : original.getAttributes().entrySet()) { + if (entry.getValue().length() > maxAttributeChars) { + requireTruncation = true; + break; + } + } + + if (!requireTruncation) { + return original; + } + + final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder().fromEvent(original); + builder.setAttributes(truncateAttributes(original.getPreviousAttributes()), truncateAttributes(original.getUpdatedAttributes())); + final StandardProvenanceEventRecord truncated = builder.build(); + truncated.setEventId(original.getEventId()); + return truncated; + } + + private Map truncateAttributes(final Map original) { + final Map truncatedAttrs = new HashMap<>(); + for (final Map.Entry entry : original.entrySet()) { + if (entry.getValue().length() > maxAttributeChars) { + truncatedAttrs.put(entry.getKey(), entry.getValue().substring(0, maxAttributeChars)); + } else { + truncatedAttrs.put(entry.getKey(), entry.getValue()); + } + } + return truncatedAttrs; + } + @Override public List getSearchableFields() { final List searchableFields = new ArrayList<>(configuration.getSearchableFields()); @@ -1612,7 +1674,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository for (final File file : potentialFiles) { try { - reader = RecordReaders.newRecordReader(file, allLogFiles); + reader = RecordReaders.newRecordReader(file, allLogFiles, maxAttributeChars); } catch (final IOException ioe) { continue; } @@ -1788,7 +1850,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return true; } - if (repoDirty.get() || (writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis)) { + if (repoDirty.get() || writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis) { return true; } @@ -1797,7 +1859,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository public Collection getAllLogFiles() { final SortedMap map = idToPathMap.get(); - return (map == null) ? new ArrayList() : map.values(); + return map == null ? 
new ArrayList() : map.values(); } private static class PathMapComparator implements Comparator { @@ -1885,7 +1947,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository @Override public void run() { try { - final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager); + final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars); final StandardQueryResult queryResult = search.search(query, retrievalCount); submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount()); if (queryResult.isFinished()) { @@ -1926,7 +1988,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } try { - final Set matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, indexManager, indexDir, null, flowFileUuids); + final Set matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this, + indexManager, indexDir, null, flowFileUuids, maxAttributeChars); + final StandardLineageResult result = submission.getResult(); result.update(matchingRecords); @@ -1959,7 +2023,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final Map.Entry entry = queryIterator.next(); final StandardQueryResult result = entry.getValue().getResult(); - if (entry.getValue().isCanceled() || (result.isFinished() && result.getExpiration().before(now))) { + if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { queryIterator.remove(); } } @@ -1969,7 +2033,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final Map.Entry entry = lineageIterator.next(); final StandardLineageResult result = entry.getValue().getResult(); - if (entry.getValue().isCanceled() || (result.isFinished() && result.getExpiration().before(now))) { + if (entry.getValue().isCanceled() || result.isFinished() && result.getExpiration().before(now)) { lineageIterator.remove(); } } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java index d0d147c332..381d778595 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java @@ -34,6 +34,7 @@ public class RepositoryConfiguration { private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB private int journalCount = 16; private int compressionBlockBytes = 1024 * 1024; + private int maxAttributeChars = 65536; private List searchableFields = new ArrayList<>(); private List searchableAttributes = new ArrayList<>(); @@ -278,4 +279,21 @@ public class RepositoryConfiguration { public void setAlwaysSync(boolean alwaysSync) { this.alwaysSync = alwaysSync; } + + /** + * @return the maximum number of characters to include in any attribute. If an attribute in a Provenance + * Event has more than this number of characters, it will be truncated when the event is retrieved. 
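+ * For example, with the default of 65536, a 100,000-character attribute value is returned as only its first 65,536 characters.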
+ */ + public int getMaxAttributeChars() { + return maxAttributeChars; + } + + /** + * Sets the maximum number of characters to include in any attribute. If an attribute in a Provenance + * Event has more than this number of characters, it will be truncated when it is retrieved. + */ + public void setMaxAttributeChars(int maxAttributeChars) { + this.maxAttributeChars = maxAttributeChars; + } + } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java index ca0d5edd26..09391072c6 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java @@ -47,18 +47,20 @@ public class StandardRecordReader implements RecordReader { private final boolean compressed; private final TocReader tocReader; private final int headerLength; + private final int maxAttributeChars; private DataInputStream dis; private ByteCountingInputStream byteCountingIn; - public StandardRecordReader(final InputStream in, final String filename) throws IOException { - this(in, filename, null); + public StandardRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException { + this(in, filename, null, maxAttributeChars); } - public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException { + public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException { logger.trace("Creating RecordReader for {}", filename); rawInputStream = new ByteCountingInputStream(in); + this.maxAttributeChars = maxAttributeChars; final InputStream limitedStream; if ( tocReader == null ) { @@ -367,7 +369,8 @@ public class StandardRecordReader implements RecordReader { for (int i = 0; i < numAttributes; i++) { final String key = readLongString(dis); final String value = valueNullable ? readLongNullableString(dis) : readLongString(dis); - attrs.put(key, value); + final String truncatedValue = value.length() > maxAttributeChars ? value.substring(0, maxAttributeChars) : value; + attrs.put(key, truncatedValue); } return attrs; @@ -429,7 +432,7 @@ public class StandardRecordReader implements RecordReader { byteCountingIn.reset(); } - return (nextByte >= 0); + return nextByte >= 0; } @Override @@ -451,7 +454,7 @@ public class StandardRecordReader implements RecordReader { // committed, so we can just process the FlowFile again. } - return (lastRecord == null) ? -1L : lastRecord.getEventId(); + return lastRecord == null ? 
-1L : lastRecord.getEventId(); } @Override diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java index 70bf36e108..7707352064 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java @@ -46,9 +46,9 @@ public class DeleteIndexAction implements ExpirationAction { @Override public File execute(final File expiredFile) throws IOException { // count the number of records and determine the max event id that we are deleting. - long numDeleted = 0; + final long numDeleted = 0; long maxEventId = -1L; - try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) { + try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles(), Integer.MAX_VALUE)) { maxEventId = reader.getMaxEventId(); } catch (final IOException ioe) { logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath()); @@ -65,7 +65,7 @@ public class DeleteIndexAction implements ExpirationAction { writer.deleteDocuments(term); writer.commit(); final int docsLeft = writer.numDocs(); - deleteDir = (docsLeft <= 0); + deleteDir = docsLeft <= 0; logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory); } finally { indexManager.returnIndexWriter(indexingDirectory, writer); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index 02fd5c3e28..eef46281a2 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -51,7 +51,7 @@ public class DocsReader { } public Set read(final TopDocs topDocs, final IndexReader indexReader, final Collection allProvenanceLogFiles, - final AtomicInteger retrievalCount, final int maxResults) throws IOException { + final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { if (retrievalCount.get() >= maxResults) { return Collections.emptySet(); } @@ -68,7 +68,7 @@ public class DocsReader { final long readDocuments = System.nanoTime() - start; logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments)); - return read(docs, allProvenanceLogFiles, retrievalCount, maxResults); + return read(docs, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars); } @@ -108,7 +108,8 @@ public class DocsReader { } - public Set read(final List docs, final Collection allProvenanceLogFiles, final AtomicInteger retrievalCount, 
final int maxResults) throws IOException { + public Set read(final List docs, final Collection allProvenanceLogFiles, + final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException { if (retrievalCount.get() >= maxResults) { return Collections.emptySet(); } @@ -161,7 +162,7 @@ public class DocsReader { for (final File file : potentialFiles) { try { - reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles); + reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars); matchingRecords.add(getRecord(d, reader)); if ( retrievalCount.incrementAndGet() >= maxResults ) { diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java index 53869f4563..c9bb238bed 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java @@ -39,11 +39,13 @@ public class IndexSearch { private final PersistentProvenanceRepository repository; private final File indexDirectory; private final IndexManager indexManager; + private final int maxAttributeChars; - public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager) { + public IndexSearch(final PersistentProvenanceRepository repo, final File indexDirectory, final IndexManager indexManager, final int maxAttributeChars) { this.repository = repo; this.indexDirectory = indexDirectory; this.indexManager = indexManager; + this.maxAttributeChars = maxAttributeChars; } public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount) throws IOException { @@ -82,7 +84,8 @@ public class IndexSearch { } final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories()); - matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults()); + matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, + provenanceQuery.getMaxResults(), maxAttributeChars); final long readRecordsNanos = System.nanoTime() - finishSearch; logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java index 502068bea5..e9e6e63462 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java @@ -46,7 +46,7 @@ public 
class LineageQuery { private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class); public static Set computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final IndexManager indexManager, final File indexDirectory, - final String lineageIdentifier, final Collection flowFileUuids) throws IOException { + final String lineageIdentifier, final Collection flowFileUuids, final int maxAttributeChars) throws IOException { if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) { throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size())); } @@ -94,7 +94,9 @@ public class LineageQuery { final long searchEnd = System.nanoTime(); final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories()); - final Set recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE); + final Set recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(), + new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars); + final long readDocsEnd = System.nanoTime(); logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis", indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd)); diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java index cab5e6f250..7889cd6952 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java @@ -32,7 +32,18 @@ import org.apache.nifi.provenance.toc.TocUtil; public class RecordReaders { - public static RecordReader newRecordReader(File file, final Collection provenanceLogFiles) throws IOException { + /** + * Creates a new Record Reader that is capable of reading Provenance Event Journals + * + * @param file the Provenance Event Journal to read data from + * @param provenanceLogFiles collection of all provenance journal files + * @param maxAttributeChars the maximum number of characters to retrieve for any one attribute. 
This allows us to avoid + * issues where a FlowFile has an extremely large attribute and reading events + * for that FlowFile results in loading that attribute into memory many times, exhausting the Java Heap + * @return a Record Reader capable of reading Provenance Event Journals + * @throws IOException if unable to create a Record Reader for the given file + */ + public static RecordReader newRecordReader(File file, final Collection provenanceLogFiles, final int maxAttributeChars) throws IOException { final File originalFile = file; InputStream fis = null; @@ -92,9 +103,9 @@ public class RecordReaders { final File tocFile = TocUtil.getTocFile(file); if ( tocFile.exists() ) { final TocReader tocReader = new StandardTocReader(tocFile); - return new StandardRecordReader(fis, filename, tocReader); + return new StandardRecordReader(fis, filename, tocReader, maxAttributeChars); } else { - return new StandardRecordReader(fis, filename); + return new StandardRecordReader(fis, filename, maxAttributeChars); } } catch (final IOException ioe) { if ( fis != null ) { diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 7d97bcdd5d..16f0312839 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -252,7 +252,7 @@ public class TestPersistentProvenanceRepository { assertEquals(10, recoveredRecords.size()); for (int i = 0; i < 10; i++) { final ProvenanceEventRecord recovered = recoveredRecords.get(i); - assertEquals((long) i, recovered.getEventId()); + assertEquals(i, recovered.getEventId()); assertEquals("nifi://unit-test", recovered.getTransitUri()); assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType()); assertEquals(attributes, recovered.getAttributes()); @@ -283,7 +283,7 @@ public class TestPersistentProvenanceRepository { builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); builder.setComponentId("1234"); builder.setComponentType("dummy processor"); - ProvenanceEventRecord record = builder.build(); + final ProvenanceEventRecord record = builder.build(); for (int i = 0; i < 10; i++) { repo.registerEvent(record); @@ -1106,7 +1106,7 @@ public class TestPersistentProvenanceRepository { final Query q = new Query(""); q.setMaxResults(1000); - TopDocs topDocs = searcher.search(luceneQuery, 1000); + final TopDocs topDocs = searcher.search(luceneQuery, 1000); final List docs = new ArrayList<>(); for (int i = 0; i < topDocs.scoreDocs.length; i++) { @@ -1157,7 +1157,7 @@ public class TestPersistentProvenanceRepository { for (final File file : storageDir.listFiles()) { if (file.isFile()) { - try (RecordReader reader = RecordReaders.newRecordReader(file, null)) { + try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) { ProvenanceEventRecord r = null; while ((r = reader.nextRecord()) != null) { @@ -1169,4 +1169,35 @@ public class TestPersistentProvenanceRepository { assertEquals(10000, counter); } + + @Test + public void testTruncateAttributes() 
throws IOException, InterruptedException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxAttributeChars(50); + config.setMaxEventFileLife(3, TimeUnit.SECONDS); + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); + repo.initialize(getEventReporter()); + + final Map attributes = new HashMap<>(); + attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345"); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", "12345678-0000-0000-0000-012345678912"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + final ProvenanceEventRecord record = builder.build(); + repo.registerEvent(record); + repo.waitForRollover(); + + final ProvenanceEventRecord retrieved = repo.getEvent(0L); + assertNotNull(retrieved); + assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid")); + assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars")); + } + } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java index f242642fbe..d9e64e55d3 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -74,7 +74,7 @@ public class TestStandardRecordReaderWriter { final TocReader tocReader = new StandardTocReader(tocFile); try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) { assertEquals(0, reader.getBlockIndex()); reader.skipToBlock(0); final StandardProvenanceEventRecord recovered = reader.nextRecord(); @@ -102,7 +102,7 @@ public class TestStandardRecordReaderWriter { final TocReader tocReader = new StandardTocReader(tocFile); try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) { assertEquals(0, reader.getBlockIndex()); reader.skipToBlock(0); final StandardProvenanceEventRecord recovered = reader.nextRecord(); @@ -133,7 +133,7 @@ public class TestStandardRecordReaderWriter { final TocReader tocReader = new StandardTocReader(tocFile); try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), 
tocReader, 2048)) { for (int i=0; i < 10; i++) { assertEquals(0, reader.getBlockIndex()); @@ -172,12 +172,12 @@ public class TestStandardRecordReaderWriter { final TocReader tocReader = new StandardTocReader(tocFile); try (final FileInputStream fis = new FileInputStream(journalFile); - final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) { + final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader, 2048)) { for (int i=0; i < 10; i++) { final StandardProvenanceEventRecord recovered = reader.nextRecord(); System.out.println(recovered); assertNotNull(recovered); - assertEquals((long) i, recovered.getEventId()); + assertEquals(i, recovered.getEventId()); assertEquals("nifi://unit-test", recovered.getTransitUri()); } From 908ec18b59030f0b43637ca3c90654040566c99e Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Mon, 22 Jun 2015 16:39:18 -0400 Subject: [PATCH 10/18] NIFI-578: - Deprecating unused methods. --- .../java/org/apache/nifi/provenance/lineage/LineageNode.java | 1 + .../main/java/org/apache/nifi/provenance/lineage/EventNode.java | 2 ++ .../java/org/apache/nifi/provenance/lineage/FlowFileNode.java | 1 + 3 files changed, 4 insertions(+) diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNode.java b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNode.java index c50cdf5840..56e865f1f9 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNode.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNode.java @@ -22,6 +22,7 @@ public interface LineageNode { * @return the identifier of the Clustered NiFi Node that generated the * event */ + @Deprecated String getClusterNodeIdentifier(); /** diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java index 12d9a4ff5e..1f8d1dcd46 100644 --- a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java +++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java @@ -36,11 +36,13 @@ public class EventNode implements ProvenanceEventLineageNode { return String.valueOf(getEventIdentifier()); } + @Deprecated @Override public String getClusterNodeIdentifier() { return clusterNodeIdentifier; } + @Deprecated public void setClusterNodeIdentifier(final String nodeIdentifier) { this.clusterNodeIdentifier = nodeIdentifier; } diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java index fdc7470d99..94e7661aed 100644 --- a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java +++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java @@ -39,6 +39,7 @@ public class FlowFileNode implements LineageNode { return creationTime; } + @Deprecated @Override public String getClusterNodeIdentifier() { return clusterNodeIdentifier; From b38d74a65f9f58e8a628c648ab48ee3c7cace0db Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Tue, 23 Jun 2015 07:26:43 -0400 Subject: [PATCH 11/18] NIFI-373: - Including a clearer 
message when the UI and cluster security properties are not inline. --- .../org/apache/nifi/util/NiFiProperties.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index 43f02cab77..e25f5d6129 100644 --- a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -233,8 +233,7 @@ public class NiFiProperties extends Properties { * obtained. * * @return the NiFiProperties object to use - * @throws RuntimeException - * if unable to load properties file + * @throws RuntimeException if unable to load properties file */ public static synchronized NiFiProperties getInstance() { if (null == instance) { @@ -794,7 +793,7 @@ public class NiFiProperties extends Properties { final String scheme = (rawScheme == null) ? "http" : rawScheme; final String host; - final int port; + final Integer port; if ("http".equalsIgnoreCase(scheme)) { // get host if (StringUtils.isBlank(getProperty(WEB_HTTP_HOST))) { @@ -804,6 +803,10 @@ public class NiFiProperties extends Properties { } // get port port = getPort(); + + if (port == null) { + throw new RuntimeException(String.format("The %s must be specified if running in a cluster with %s set to false.", WEB_HTTP_PORT, CLUSTER_PROTOCOL_IS_SECURE)); + } } else { // get host if (StringUtils.isBlank(getProperty(WEB_HTTPS_HOST))) { @@ -813,6 +816,10 @@ public class NiFiProperties extends Properties { } // get port port = getSslPort(); + + if (port == null) { + throw new RuntimeException(String.format("The %s must be specified if running in a cluster with %s set to true.", WEB_HTTPS_PORT, CLUSTER_PROTOCOL_IS_SECURE)); + } } return InetSocketAddress.createUnresolved(host, port); @@ -824,8 +831,7 @@ public class NiFiProperties extends Properties { * configured. No directories will be created as a result of this operation. * * @return database repository path - * @throws InvalidPathException - * If the configured path is invalid + * @throws InvalidPathException If the configured path is invalid */ public Path getDatabaseRepositoryPath() { return Paths.get(getProperty(REPOSITORY_DATABASE_DIRECTORY)); @@ -836,8 +842,7 @@ public class NiFiProperties extends Properties { * configured. No directories will be created as a result of this operation. * * @return database repository path - * @throws InvalidPathException - * If the configured path is invalid + * @throws InvalidPathException If the configured path is invalid */ public Path getFlowFileRepositoryPath() { return Paths.get(getProperty(FLOWFILE_REPOSITORY_DIRECTORY)); @@ -850,8 +855,7 @@ public class NiFiProperties extends Properties { * operation. 
* * @return file repositories paths - * @throws InvalidPathException - * If any of the configured paths are invalid + * @throws InvalidPathException If any of the configured paths are invalid */ public Map getContentRepositoryPaths() { final Map contentRepositoryPaths = new HashMap<>(); From 1faca8c93ea617a80bbeaa7ebd318b477f4ebd07 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 23 Jun 2015 09:53:40 -0400 Subject: [PATCH 12/18] NIFI-718: Add links to /etc/rc2.d when installing nifi as a linux service --- .../nifi-resources/src/main/resources/bin/nifi.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh index 6d3191d9a7..8caf55e84d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh @@ -140,6 +140,10 @@ install() { cp $0 $SVC_FILE sed -i s:NIFI_HOME=.*:NIFI_HOME="$NIFI_HOME": $SVC_FILE sed -i s:PROGNAME=.*:PROGNAME=$(basename "$0"): $SVC_FILE + rm -f /etc/rc2.d/S65${SVC_NAME} + ln -s /etc/init.d/$SVC_NAME /etc/rc2.d/S65${SVC_NAME} + rm -f /etc/rc2.d/K65${SVC_NAME} + ln -s /etc/init.d/$SVC_NAME /etc/rc2.d/K65${SVC_NAME} echo Service $SVC_NAME installed } From c45dcf0b52b6a856bc238a54adf45addc78651e8 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 23 Jun 2015 10:24:58 -0400 Subject: [PATCH 13/18] This closes #60 From a896510d7f0c60184ac37159f39c2f2fbc11208f Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Tue, 23 Jun 2015 14:09:45 -0400 Subject: [PATCH 14/18] NIFI-565: - Ensuring tooltips are properly disposed. Without proper cleanup the bulletins were not being initialized correctly, when new bulletins would arise. - Increasing the max width of tooltips. 
--- .../nifi-web/nifi-web-ui/src/main/webapp/css/main.css | 1 + .../nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js | 2 +- .../src/main/webapp/js/nf/canvas/nf-controller-service.js | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/main.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/main.css index e161efc14c..70916c6c8f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/main.css +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/main.css @@ -115,6 +115,7 @@ div.nifi-tooltip { border: 1px solid #454545; background-color: #FFFFA3; color: #454545; + max-width: 500px; } .ellipsis { diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js index ca45a3dbe9..59f38a5ec3 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js @@ -811,7 +811,7 @@ nf.Canvas = (function () { // bulletins for this processor are now gone if (bulletins.length === 0) { if (bulletinIcon.data('qtip')) { - bulletinIcon.removeClass('has-bulletins').qtip('destroy'); + bulletinIcon.removeClass('has-bulletins').qtip('api').destroy(true); } // hide the icon diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js index e8a111d419..65c6dfa1fd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js @@ -264,7 +264,7 @@ nf.ControllerService = (function () { }, nf.CanvasUtils.config.systemTooltipConfig)); } } else if (icon.data('qtip')) { - icon.qtip('destroy'); + icon.qtip('api').destroy(true); } return state; }); @@ -294,7 +294,7 @@ nf.ControllerService = (function () { }, nf.CanvasUtils.config.systemTooltipConfig)); } } else if (icon.data('qtip')) { - icon.qtip('destroy'); + icon.qtip('api').destroy(true); } return state; }); From 3a05a7da721d70767ab3ad9a6827a10e2d318725 Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Tue, 23 Jun 2015 16:15:33 -0400 Subject: [PATCH 15/18] NIFI-683: - Add explicit handling of errors that occur during the initial page loading. 
--- .../src/main/webapp/js/nf/nf-common.js | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js index 2a5273ddfc..fa43b290bd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js @@ -184,6 +184,23 @@ nf.Common = { * @argument {string} error The error */ handleAjaxError: function (xhr, status, error) { + // if an error occurs while the splash screen is visible close the canvas show the error message + if ($('#splash').is(':visible')) { + $('#message-title').text('An unexcepted error has occurred'); + if ($.trim(xhr.responseText) === '') { + $('#message-content').text('Please check the logs.'); + } else { + $('#message-content').text(xhr.responseText); + } + + // show the error pane + $('#message-pane').show(); + + // close the canvas + nf.Common.closeCanvas(); + return; + } + // status code 400, 404, and 409 are expected response codes for common errors. if (xhr.status === 400 || xhr.status === 404 || xhr.status === 409) { nf.Dialog.showOkDialog({ From 0c5b78f03f9fa2a91a52347114a073a8c6ee59e6 Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Tue, 23 Jun 2015 19:59:00 -0400 Subject: [PATCH 16/18] NIFI-721: Correcting typos in capability description and dynamic property of ExtractText. --- .../java/org/apache/nifi/processors/standard/ExtractText.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java index 1bcd3bfea0..5a386a628b 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractText.java @@ -60,7 +60,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; + "Regular Expressions are entered by adding user-defined properties; " + "the name of the property maps to the Attribute Name into which the result will be placed. " + "The first capture group, if any found, will be placed into that attribute name." - + "But all catpure groups, including the matching string sequence itself will also be " + + "But all capture groups, including the matching string sequence itself will also be " + "provided at that attribute name with an index value provided." + "The value of the property must be a valid Regular Expressions with one or more capturing groups. " + "If the Regular Expression matches more than once, only the first match will be used. " @@ -69,7 +69,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; + "and no attributes will be applied to the FlowFile.") @DynamicProperty(name = "A FlowFile attribute", value = "A Regular Expression with one or more capturing group", description = "The first capture group, if any found, will be placed into that attribute name." 
- + "But all catpure groups, including the matching string sequence itself will also be " + + "But all capture groups, including the matching string sequence itself will also be " + "provided at that attribute name with an index value provided.") public class ExtractText extends AbstractProcessor { From 0df351dcb47432dd0cacf31a2ed5e5765aa96f03 Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Wed, 24 Jun 2015 09:13:29 -0400 Subject: [PATCH 17/18] NIFI-683: - Fixing typo. --- .../nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js index fa43b290bd..110d383057 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js @@ -186,7 +186,7 @@ nf.Common = { handleAjaxError: function (xhr, status, error) { // if an error occurs while the splash screen is visible close the canvas show the error message if ($('#splash').is(':visible')) { - $('#message-title').text('An unexcepted error has occurred'); + $('#message-title').text('An unexpected error has occurred'); if ($.trim(xhr.responseText) === '') { $('#message-content').text('Please check the logs.'); } else { From f58972e566448886cdf984e6c8a34c83674b0705 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 25 Jun 2015 09:46:16 -0400 Subject: [PATCH 18/18] NIFI-728: Allow Mock Framework to use property descriptors from subclasses that are created for unit testing --- .../nifi/util/MockConfigurationContext.java | 18 +++++++++++++++++- .../nifi/util/StandardProcessorTestRunner.java | 4 ++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java index 61af49da42..742f03be9c 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java @@ -22,14 +22,21 @@ import java.util.Map; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; public class MockConfigurationContext implements ConfigurationContext { private final Map properties; private final ControllerServiceLookup serviceLookup; + private final ControllerService service; public MockConfigurationContext(final Map properties, final ControllerServiceLookup serviceLookup) { + this(null, properties, serviceLookup); + } + + public MockConfigurationContext(final ControllerService service, final Map properties, final ControllerServiceLookup serviceLookup) { + this.service = service; this.properties = properties; this.serviceLookup = serviceLookup; } @@ -38,7 +45,7 @@ public class MockConfigurationContext implements ConfigurationContext { public PropertyValue getProperty(final PropertyDescriptor property) { String value = properties.get(property); if (value == null) { - value = property.getDefaultValue(); + value = 
getActualDescriptor(property).getDefaultValue(); } return new MockPropertyValue(value, serviceLookup); } @@ -47,4 +54,13 @@ public class MockConfigurationContext implements ConfigurationContext { public Map getProperties() { return new HashMap<>(this.properties); } + + private PropertyDescriptor getActualDescriptor(final PropertyDescriptor property) { + if (service == null) { + return property; + } + + final PropertyDescriptor resolved = service.getPropertyDescriptor(property.getName()); + return resolved == null ? property : resolved; + } } diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 89385477d7..048e2b92c3 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -215,7 +215,7 @@ public class StandardProcessorTestRunner implements TestRunner { executorService.shutdown(); try { executorService.awaitTermination(runWait, TimeUnit.MILLISECONDS); - } catch (InterruptedException e1) { + } catch (final InterruptedException e1) { } int finishedCount = 0; @@ -609,7 +609,7 @@ public class StandardProcessorTestRunner implements TestRunner { } try { - final ConfigurationContext configContext = new MockConfigurationContext(configuration.getProperties(), context); + final ConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), context); ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext); } catch (final InvocationTargetException ite) { ite.getCause().printStackTrace();
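For illustration only, below is a minimal sketch (not part of any patch above; the class and property names are hypothetical) of the testing pattern NIFI-728 enables: a subclass created for unit testing re-declares a property descriptor under the same name, and because MockConfigurationContext now resolves defaults through ControllerService.getPropertyDescriptor(name), the subclass's default value is the one seen during @OnEnabled.

// Hypothetical service with a production default for "Cache Size".
import java.util.Collections;
import java.util.List;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.util.StandardValidators;

class MyCacheService extends AbstractControllerService {

    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
            .name("Cache Size")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("10000") // production default
            .build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.singletonList(CACHE_SIZE);
    }
}

// Subclass used only by unit tests: same property name, test-friendly default.
class TestableMyCacheService extends MyCacheService {

    static final PropertyDescriptor TEST_CACHE_SIZE = new PropertyDescriptor.Builder()
            .name("Cache Size")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("10") // smaller default for tests
            .build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // getPropertyDescriptor("Cache Size") now resolves to TEST_CACHE_SIZE, so a
        // MockConfigurationContext built with this service instance falls back to "10"
        // when the property is not explicitly set, rather than the base default.
        return Collections.singletonList(TEST_CACHE_SIZE);
    }
}

With the StandardProcessorTestRunner change above, enabling a TestableMyCacheService in a test passes the service instance into MockConfigurationContext, which is what allows this descriptor override to take effect.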