NIFI-1684 This closes #302. fixed random partitioner initialization

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Oleg Zhurakousky 2016-03-26 10:47:13 -04:00 committed by joewitt
parent 1292581ec8
commit 9912f18de5
1 changed files with 1 additions and 3 deletions

View File

@ -55,8 +55,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import kafka.producer.DefaultPartitioner;
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a " @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
@ -454,7 +452,7 @@ public class PutKafka extends AbstractProcessor {
if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
partitionerClass = Partitioners.RoundRobinPartitioner.class.getName(); partitionerClass = Partitioners.RoundRobinPartitioner.class.getName();
} else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
partitionerClass = DefaultPartitioner.class.getName(); partitionerClass = Partitioners.RandomPartitioner.class.getName();
} }
properties.setProperty("partitioner.class", partitionerClass); properties.setProperty("partitioner.class", partitionerClass);