NIFI-733: Make use of Client Name, Zookeeper Timeout, Kafka Timeout properties and add new Group ID property

This commit is contained in:
Mark Payne 2015-07-03 09:04:52 -04:00
parent 063afe2a0d
commit 786bc1d612
1 changed files with 22 additions and 6 deletions

View File

@ -129,6 +129,7 @@ public class GetKafka extends AbstractProcessor {
.expressionLanguageSupported(false)
.defaultValue("\\n")
.build();
public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
.name("Client Name")
.description("Client Name to use when communicating with Kafka")
@ -136,6 +137,13 @@ public class GetKafka extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
.name("Group ID")
.description("A Group ID is used to identify consumers that are within the same consumer group")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -152,9 +160,13 @@ public class GetKafka extends AbstractProcessor {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(CLIENT_NAME)
.defaultValue("NiFi-" + getIdentifier())
.build();
.fromPropertyDescriptor(CLIENT_NAME)
.defaultValue("NiFi-" + getIdentifier())
.build();
final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(GROUP_ID)
.defaultValue(getIdentifier())
.build();
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(ZOOKEEPER_CONNECTION_STRING);
@ -163,6 +175,7 @@ public class GetKafka extends AbstractProcessor {
props.add(BATCH_SIZE);
props.add(MESSAGE_DEMARCATOR);
props.add(clientNameWithDefault);
props.add(groupIdWithDefault);
props.add(KAFKA_TIMEOUT);
props.add(ZOOKEEPER_TIMEOUT);
return props;
@ -184,10 +197,13 @@ public class GetKafka extends AbstractProcessor {
final Properties props = new Properties();
props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
props.setProperty("group.id", getIdentifier());
props.setProperty("group.id", context.getProperty(GROUP_ID).getValue());
props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
props.setProperty("auto.commit.enable", "true"); // just be explicit
props.setProperty("auto.offset.reset", "smallest");
props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
consumer = Consumer.createJavaConsumerConnector(consumerConfig);
@ -236,7 +252,7 @@ public class GetKafka extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
final ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
if (iterator == null) {
return;
}
@ -293,7 +309,7 @@ public class GetKafka extends AbstractProcessor {
}
// add the message to the FlowFile's contents
final boolean firstMessage = (msgCount == 0);
final boolean firstMessage = msgCount == 0;
flowFile = session.append(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {