From 786bc1d61260e2d8558747ca206d360aebdd1994 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 3 Jul 2015 09:04:52 -0400 Subject: [PATCH] NIFI-733: Make use of Client Name, Zookeeper Timeout, Kafka Timeout properties and add new Group ID property --- .../nifi/processors/kafka/GetKafka.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 1b63a465cf..26590dfbad 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -129,6 +129,7 @@ public class GetKafka extends AbstractProcessor { .expressionLanguageSupported(false) .defaultValue("\\n") .build(); + public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() .name("Client Name") .description("Client Name to use when communicating with Kafka") @@ -136,6 +137,13 @@ public class GetKafka extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); + public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() + .name("Group ID") + .description("A Group ID is used to identify consumers that are within the same consumer group") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -152,9 +160,13 @@ public class GetKafka extends AbstractProcessor { @Override protected List getSupportedPropertyDescriptors() { final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(CLIENT_NAME) - .defaultValue("NiFi-" + getIdentifier()) - .build(); + .fromPropertyDescriptor(CLIENT_NAME) + .defaultValue("NiFi-" + getIdentifier()) + .build(); + final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(GROUP_ID) + .defaultValue(getIdentifier()) + .build(); final List props = new ArrayList<>(); props.add(ZOOKEEPER_CONNECTION_STRING); @@ -163,6 +175,7 @@ public class GetKafka extends AbstractProcessor { props.add(BATCH_SIZE); props.add(MESSAGE_DEMARCATOR); props.add(clientNameWithDefault); + props.add(groupIdWithDefault); props.add(KAFKA_TIMEOUT); props.add(ZOOKEEPER_TIMEOUT); return props; @@ -184,10 +197,13 @@ public class GetKafka extends AbstractProcessor { final Properties props = new Properties(); props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); - props.setProperty("group.id", getIdentifier()); + props.setProperty("group.id", context.getProperty(GROUP_ID).getValue()); + props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); props.setProperty("auto.commit.enable", "true"); // just be explicit props.setProperty("auto.offset.reset", "smallest"); + props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); + props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); final ConsumerConfig consumerConfig = new ConsumerConfig(props); consumer = Consumer.createJavaConsumerConnector(consumerConfig); @@ -236,7 +252,7 @@ public class GetKafka extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - ConsumerIterator iterator = getStreamIterator(); + final ConsumerIterator iterator = getStreamIterator(); if (iterator == null) { return; } @@ -293,7 +309,7 @@ public class GetKafka extends AbstractProcessor { } // add the message to the FlowFile's contents - final boolean firstMessage = (msgCount == 0); + final boolean firstMessage = msgCount == 0; flowFile = session.append(flowFile, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException {