NIFI-3854 This closes #1773. Expand expression language support for Kafka processors

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Tim Reardon 2017-05-09 12:59:35 -04:00 committed by joewitt
parent 9294a26139
commit 58ce52d5d6
8 changed files with 17 additions and 25 deletions

View File

@ -253,7 +253,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor {
final String topicType = context.getProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE).evaluateAttributeExpressions().getValue(); final String topicType = context.getProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>(); final List<String> topics = new ArrayList<>();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(); final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

View File

@ -248,7 +248,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
final List<String> topics = new ArrayList<>(); final List<String> topics = new ArrayList<>();
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(); final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
if (topicType.equals(TOPIC_NAME.getValue())) { if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) { for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim(); final String trimmedName = topic.trim();

View File

@ -114,7 +114,6 @@ public class ConsumeKafkaTest {
} }
}; };
final TestRunner runner = TestRunners.newTestRunner(proc); final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
@ -145,7 +144,6 @@ public class ConsumeKafkaTest {
} }
}; };
final TestRunner runner = TestRunners.newTestRunner(proc); final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)"); runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern"); runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
@ -177,7 +175,6 @@ public class ConsumeKafkaTest {
} }
}; };
final TestRunner runner = TestRunners.newTestRunner(proc); final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);

View File

@ -132,7 +132,6 @@ public class TestConsumeKafkaRecord_0_10 {
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE); when(mockLease.commit()).thenReturn(Boolean.TRUE);
runner.setValidateExpressionUsage(false);
runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName); runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
@ -155,7 +154,6 @@ public class TestConsumeKafkaRecord_0_10 {
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE); when(mockLease.commit()).thenReturn(Boolean.TRUE);
runner.setValidateExpressionUsage(false);
runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "(fo.*)|(ba)"); runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "(fo.*)|(ba)");
runner.setProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE, "pattern"); runner.setProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE, "pattern");
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName); runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
@ -179,7 +177,6 @@ public class TestConsumeKafkaRecord_0_10 {
when(mockLease.continuePolling()).thenReturn(true, false); when(mockLease.continuePolling()).thenReturn(true, false);
when(mockLease.commit()).thenReturn(Boolean.FALSE); when(mockLease.commit()).thenReturn(Boolean.FALSE);
runner.setValidateExpressionUsage(false);
runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName); runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);

View File

@ -93,14 +93,14 @@ public class GetKafka extends AbstractProcessor {
+ " combinations. For example, host1:2181,host2:2181,host3:2188") + " combinations. For example, host1:2181,host2:2181,host3:2188")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("Topic Name") .name("Topic Name")
.description("The Kafka Topic to pull messages from") .description("The Kafka Topic to pull messages from")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder() public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
.name("Zookeeper Commit Frequency") .name("Zookeeper Commit Frequency")
@ -160,7 +160,7 @@ public class GetKafka extends AbstractProcessor {
.description("A Group ID is used to identify consumers that are within the same consumer group") .description("A Group ID is used to identify consumers that are within the same consumer group")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
@ -218,11 +218,11 @@ public class GetKafka extends AbstractProcessor {
} }
public void createConsumers(final ProcessContext context) { public void createConsumers(final ProcessContext context) {
final String topic = context.getProperty(TOPIC).getValue(); final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
final Properties props = new Properties(); final Properties props = new Properties();
props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).evaluateAttributeExpressions().getValue());
props.setProperty("group.id", context.getProperty(GROUP_ID).getValue()); props.setProperty("group.id", context.getProperty(GROUP_ID).evaluateAttributeExpressions().getValue());
props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue()); props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
@ -257,7 +257,7 @@ public class GetKafka extends AbstractProcessor {
} }
int partitionCount = KafkaUtils.retrievePartitionCountForTopic( int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue()); context.getProperty(ZOOKEEPER_CONNECTION_STRING).evaluateAttributeExpressions().getValue(), context.getProperty(TOPIC).evaluateAttributeExpressions().getValue());
final ConsumerConfig consumerConfig = new ConsumerConfig(props); final ConsumerConfig consumerConfig = new ConsumerConfig(props);
consumer = Consumer.createJavaConsumerConnector(consumerConfig); consumer = Consumer.createJavaConsumerConnector(consumerConfig);
@ -267,12 +267,12 @@ public class GetKafka extends AbstractProcessor {
int concurrentTaskToUse = context.getMaxConcurrentTasks(); int concurrentTaskToUse = context.getMaxConcurrentTasks();
if (context.getMaxConcurrentTasks() < partitionCount){ if (context.getMaxConcurrentTasks() < partitionCount){
this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
+ "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).evaluateAttributeExpressions().getValue() + "'. "
+ "Consider making it equal to the amount of partition count for most efficient event consumption."); + "Consider making it equal to the amount of partition count for most efficient event consumption.");
} else if (context.getMaxConcurrentTasks() > partitionCount){ } else if (context.getMaxConcurrentTasks() > partitionCount){
concurrentTaskToUse = partitionCount; concurrentTaskToUse = partitionCount;
this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
+ "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).evaluateAttributeExpressions().getValue() + "'. "
+ "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to consume events"); + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to consume events");
} }
@ -400,7 +400,7 @@ public class GetKafka extends AbstractProcessor {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
final String topic = context.getProperty(TOPIC).getValue(); final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();

View File

@ -112,7 +112,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true) .required(true)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("Topic Name") .name("Topic Name")
@ -288,7 +288,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
flowFile = this.doRendezvousWithKafka(flowFile, context, session); flowFile = this.doRendezvousWithKafka(flowFile, context, session);
if (!this.isFailedFlowFile(flowFile)) { if (!this.isFailedFlowFile(flowFile)) {
session.getProvenanceReporter().send(flowFile, session.getProvenanceReporter().send(flowFile,
context.getProperty(SEED_BROKERS).getValue() + "/" context.getProperty(SEED_BROKERS).evaluateAttributeExpressions(flowFile).getValue() + "/"
+ context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue()); + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} else { } else {
@ -474,7 +474,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
private Properties buildKafkaConfigProperties(final ProcessContext context) { private Properties buildKafkaConfigProperties(final ProcessContext context) {
Properties properties = new Properties(); Properties properties = new Properties();
String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).getValue()); properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).evaluateAttributeExpressions().getValue());
properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue())); properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue()));
properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue()); properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue());

View File

@ -247,7 +247,7 @@ public class ConsumeKafka extends AbstractProcessor {
} }
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(); final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log); return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
} }

View File

@ -121,7 +121,6 @@ public class ConsumeKafkaTest {
} }
}; };
final TestRunner runner = TestRunners.newTestRunner(proc); final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName); runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
@ -152,7 +151,6 @@ public class ConsumeKafkaTest {
} }
}; };
final TestRunner runner = TestRunners.newTestRunner(proc); final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName); runner.setProperty(ConsumeKafka.GROUP_ID, groupName);