diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index ab0618b4a2..616c6f3acf 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -40,8 +39,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -97,11 +94,20 @@ public class PutKafka extends AbstractKafkaProcessor { public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); + /** + * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. + */ static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin", "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + "the next Partition to Partition 2, and so on, wrapping as necessary."); + /** + * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. + */ static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random", "Messages will be assigned to random partitions."); + /** + * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. To specify partition, simply configure the 'partition' property. + */ static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined", "The property will be used to determine the partition. All messages within the same FlowFile will be " + "assigned to the same partition."); @@ -120,19 +126,22 @@ public class PutKafka extends AbstractKafkaProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + /** + * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. + * This property is still valid as a dynamic property, so that existing processor configuration can stay valid. + */ static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder() .name("Partition Strategy") - .description("Specifies how messages should be partitioned when sent to Kafka") + .description("Deprecated. Used to specify how messages should be partitioned when sent to Kafka, but it's no longer used.") .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING) - .defaultValue(ROUND_ROBIN_PARTITIONING.getValue()) - .required(true) + .dynamic(true) .build(); public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() .name("Partition") .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages " + "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, " - + "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + + "then the FlowFile will be routed to failure relationship.") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .expressionLanguageSupported(true) .required(false) .build(); @@ -247,7 +256,6 @@ public class PutKafka extends AbstractKafkaProcessor { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.add(SEED_BROKERS); _propertyDescriptors.add(TOPIC); - _propertyDescriptors.add(PARTITION_STRATEGY); _propertyDescriptors.add(PARTITION); _propertyDescriptors.add(KEY); _propertyDescriptors.add(DELIVERY_GUARANTEE); @@ -310,7 +318,14 @@ public class PutKafka extends AbstractKafkaProcessor { @Override public void process(InputStream contentStream) throws IOException { PublishingContext publishingContext = PutKafka.this.buildPublishingContext(flowFile, context, contentStream); - KafkaPublisherResult result = PutKafka.this.kafkaResource.publish(publishingContext); + KafkaPublisherResult result = null; + try { + result = PutKafka.this.kafkaResource.publish(publishingContext); + } catch (final IllegalArgumentException e) { + getLogger().error("Failed to publish {}, due to {}", new Object[]{flowFile, e}, e); + result = new KafkaPublisherResult(0, -1); + + } publishResultRef.set(result); } }); @@ -399,26 +414,16 @@ public class PutKafka extends AbstractKafkaProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + if (PARTITION_STRATEGY.getName().equals(propertyDescriptorName)) { + return PARTITION_STRATEGY; + } + return new PropertyDescriptor.Builder() .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) .build(); } - @Override - protected Collection customValidate(final ValidationContext validationContext) { - final List results = new ArrayList<>(); - - final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue(); - if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) - && !validationContext.getProperty(PARTITION).isSet()) { - results.add(new ValidationResult.Builder().subject("Partition").valid(false) - .explanation("The property must be set when configured to use the User-Defined Partitioning Strategy") - .build()); - } - return results; - } - /** * */ @@ -439,15 +444,11 @@ public class PutKafka extends AbstractKafkaProcessor { * */ private Integer determinePartition(ProcessContext context, FlowFile flowFile) { - String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); - Integer partitionValue = null; - if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) { - String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); - if (pv != null){ - partitionValue = Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue()); - } + String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); + if (pv != null){ + return Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue()); } - return partitionValue; + return null; } /** @@ -493,19 +494,13 @@ public class PutKafka extends AbstractKafkaProcessor { properties.setProperty("timeout.ms", timeout); properties.setProperty("metadata.fetch.timeout.ms", timeout); - String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); - String partitionerClass = null; - if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { - partitionerClass = Partitioners.RoundRobinPartitioner.class.getName(); - } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { - partitionerClass = Partitioners.RandomPartitioner.class.getName(); - } - properties.setProperty("partitioner.class", partitionerClass); - // Set Dynamic Properties for (final Entry entry : context.getProperties().entrySet()) { PropertyDescriptor descriptor = entry.getKey(); if (descriptor.isDynamic()) { + if (PARTITION_STRATEGY.equals(descriptor)) { + continue; + } if (properties.containsKey(descriptor.getName())) { this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '" + properties.getProperty(descriptor.getName()) + "' with dynamically set value '" diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java index 8437b00b55..77b2bb919b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.kafka; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.nio.charset.StandardCharsets; @@ -210,6 +211,62 @@ public class PutKafkaTest { runner.shutdown(); } + @Test + public void validateDeprecatedPartitionStrategy() { + String topicName = "validateDeprecatedPartitionStrategy"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); + + // Old configuration using deprecated property still work. + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); + runner.setProperty(PutKafka.PARTITION, "${partition}"); + + runner.assertValid(); + + final Map attributes = new HashMap<>(); + attributes.put("partition", "0"); + runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes); + runner.run(1, false); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8)); + assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8)); + + runner.shutdown(); + } + + @Test + public void validatePartitionOutOfBounds() { + String topicName = "validatePartitionOutOfBounds"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); + runner.setProperty(PutKafka.PARTITION, "${partition}"); + + runner.assertValid(); + + final Map attributes = new HashMap<>(); + attributes.put("partition", "123"); + runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes); + runner.run(1, false); + + assertTrue("Error message should be logged", runner.getLogger().getErrorMessages().size() > 0); + runner.assertTransferCount(PutKafka.REL_SUCCESS, 0); + runner.assertTransferCount(PutKafka.REL_FAILURE, 1); + + runner.shutdown(); + } + private ConsumerIterator buildConsumer(String topic) { Properties props = new Properties(); props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort());