NIFI-3363: PutKafka NPE with User-Defined partition

- Marked PutKafka Partition Strategy property as deprecated, as Kafka 0.8 client doesn't use 'partitioner.class' as producer property, we don't have to specify it.
- Changed Partition Strategy property from a required one to a dynamic property, so that existing processor config can stay in valid state.
- Fixed partition property to work.
- Route a flow file if it failed to be published due to invalid partition.

This closes #1425
This commit is contained in:
Koji Kawamura 2017-01-18 16:48:09 +09:00 committed by Oleg Zhurakousky
parent f8f66fa22b
commit 63c763885c
2 changed files with 93 additions and 41 deletions

View File

@ -20,7 +20,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -40,8 +39,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@ -97,11 +94,20 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy",
"Compress messages using Snappy");
/**
* @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
*/
static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin",
"Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+ "the next Partition to Partition 2, and so on, wrapping as necessary.");
/**
* @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
*/
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random",
"Messages will be assigned to random partitions.");
/**
* @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. To specify partition, simply configure the 'partition' property.
*/
static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined",
"The <Partition> property will be used to determine the partition. All messages within the same FlowFile will be "
+ "assigned to the same partition.");
@ -120,19 +126,22 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
/**
* @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
* This property is still valid as a dynamic property, so that existing processor configuration can stay valid.
*/
static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
.name("Partition Strategy")
.description("Specifies how messages should be partitioned when sent to Kafka")
.description("Deprecated. Used to specify how messages should be partitioned when sent to Kafka, but it's no longer used.")
.allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING)
.defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
.required(true)
.dynamic(true)
.build();
public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
.name("Partition")
.description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages "
+ "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, "
+ "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ "then the FlowFile will be routed to failure relationship.")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
@ -247,7 +256,6 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(SEED_BROKERS);
_propertyDescriptors.add(TOPIC);
_propertyDescriptors.add(PARTITION_STRATEGY);
_propertyDescriptors.add(PARTITION);
_propertyDescriptors.add(KEY);
_propertyDescriptors.add(DELIVERY_GUARANTEE);
@ -310,7 +318,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
@Override
public void process(InputStream contentStream) throws IOException {
PublishingContext publishingContext = PutKafka.this.buildPublishingContext(flowFile, context, contentStream);
KafkaPublisherResult result = PutKafka.this.kafkaResource.publish(publishingContext);
KafkaPublisherResult result = null;
try {
result = PutKafka.this.kafkaResource.publish(publishingContext);
} catch (final IllegalArgumentException e) {
getLogger().error("Failed to publish {}, due to {}", new Object[]{flowFile, e}, e);
result = new KafkaPublisherResult(0, -1);
}
publishResultRef.set(result);
}
});
@ -399,26 +414,16 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
if (PARTITION_STRATEGY.getName().equals(propertyDescriptorName)) {
return PARTITION_STRATEGY;
}
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
&& !validationContext.getProperty(PARTITION).isSet()) {
results.add(new ValidationResult.Builder().subject("Partition").valid(false)
.explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy")
.build());
}
return results;
}
/**
*
*/
@ -439,15 +444,11 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
*
*/
private Integer determinePartition(ProcessContext context, FlowFile flowFile) {
String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
Integer partitionValue = null;
if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
if (pv != null){
partitionValue = Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue());
return Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue());
}
}
return partitionValue;
return null;
}
/**
@ -493,19 +494,13 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
properties.setProperty("timeout.ms", timeout);
properties.setProperty("metadata.fetch.timeout.ms", timeout);
String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
String partitionerClass = null;
if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
partitionerClass = Partitioners.RoundRobinPartitioner.class.getName();
} else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
partitionerClass = Partitioners.RandomPartitioner.class.getName();
}
properties.setProperty("partitioner.class", partitionerClass);
// Set Dynamic Properties
for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
if (PARTITION_STRATEGY.equals(descriptor)) {
continue;
}
if (properties.containsKey(descriptor.getName())) {
this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
+ properties.getProperty(descriptor.getName()) + "' with dynamically set value '"

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
@ -210,6 +211,62 @@ public class PutKafkaTest {
runner.shutdown();
}
@Test
public void validateDeprecatedPartitionStrategy() {
String topicName = "validateDeprecatedPartitionStrategy";
PutKafka putKafka = new PutKafka();
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PutKafka.TOPIC, topicName);
runner.setProperty(PutKafka.CLIENT_NAME, "foo");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
// Old configuration using deprecated property still work.
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
runner.setProperty(PutKafka.PARTITION, "${partition}");
runner.assertValid();
final Map<String, String> attributes = new HashMap<>();
attributes.put("partition", "0");
runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes);
runner.run(1, false);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
runner.shutdown();
}
@Test
public void validatePartitionOutOfBounds() {
String topicName = "validatePartitionOutOfBounds";
PutKafka putKafka = new PutKafka();
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PutKafka.TOPIC, topicName);
runner.setProperty(PutKafka.CLIENT_NAME, "foo");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
runner.setProperty(PutKafka.PARTITION, "${partition}");
runner.assertValid();
final Map<String, String> attributes = new HashMap<>();
attributes.put("partition", "123");
runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes);
runner.run(1, false);
assertTrue("Error message should be logged", runner.getLogger().getErrorMessages().size() > 0);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 0);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
runner.shutdown();
}
private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort());