NIFI-2714 This closes #1549. Added regex support to ConsumeKafka_0_10

Enabled the ability to specify wildcard topics as a regular expression
as supported in the Kafka client library.

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Jack Pickett 2017-03-01 09:43:32 -05:00 committed by joewitt
parent 778ba3957e
commit 4bfb905f37
4 changed files with 95 additions and 11 deletions

View File

@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
@ -78,6 +79,10 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group"); static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder() static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
.name("topic") .name("topic")
.displayName("Topic Name(s)") .displayName("Topic Name(s)")
@ -87,6 +92,15 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
.name("topic_type")
.displayName("Topic Name Format")
.description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression")
.required(true)
.allowableValues(TOPIC_NAME, TOPIC_PATTERN)
.defaultValue(TOPIC_NAME.getValue())
.build();
static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
.name(ConsumerConfig.GROUP_ID_CONFIG) .name(ConsumerConfig.GROUP_ID_CONFIG)
.displayName("Group ID") .displayName("Group ID")
@ -166,6 +180,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
List<PropertyDescriptor> descriptors = new ArrayList<>(); List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors()); descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
descriptors.add(TOPICS); descriptors.add(TOPICS);
descriptors.add(TOPIC_TYPE);
descriptors.add(GROUP_ID); descriptors.add(GROUP_ID);
descriptors.add(AUTO_OFFSET_RESET); descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(KEY_ATTRIBUTE_ENCODING); descriptors.add(KEY_ATTRIBUTE_ENCODING);
@ -229,18 +244,26 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
final String topicListing = context.getProperty(ConsumeKafka_0_10.TOPICS).evaluateAttributeExpressions().getValue(); final String topicListing = context.getProperty(ConsumeKafka_0_10.TOPICS).evaluateAttributeExpressions().getValue();
final String topicType = context.getProperty(ConsumeKafka_0_10.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>(); final List<String> topics = new ArrayList<>();
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
if (!trimmedName.isEmpty()) {
topics.add(trimmedName);
}
}
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(); final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
if (topicType.equals(TOPIC_NAME.getValue())) {
return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log); for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
if (!trimmedName.isEmpty()) {
topics.add(trimmedName);
}
}
return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;
}
} }
@OnUnscheduled @OnUnscheduled

View File

@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -40,6 +41,7 @@ public class ConsumerPool implements Closeable {
private final BlockingQueue<SimpleConsumerLease> pooledLeases; private final BlockingQueue<SimpleConsumerLease> pooledLeases;
private final List<String> topics; private final List<String> topics;
private final Pattern topicPattern;
private final Map<String, Object> kafkaProperties; private final Map<String, Object> kafkaProperties;
private final long maxWaitMillis; private final long maxWaitMillis;
private final ComponentLog logger; private final ComponentLog logger;
@ -74,7 +76,7 @@ public class ConsumerPool implements Closeable {
public ConsumerPool( public ConsumerPool(
final int maxConcurrentLeases, final int maxConcurrentLeases,
final byte[] demarcator, final byte[] demarcator,
final Map<String, Object> kafkaProperties, final Map<String, Object> kafkaProperties,
final List<String> topics, final List<String> topics,
final long maxWaitMillis, final long maxWaitMillis,
final String keyEncoding, final String keyEncoding,
@ -90,6 +92,29 @@ public class ConsumerPool implements Closeable {
this.bootstrapServers = bootstrapServers; this.bootstrapServers = bootstrapServers;
this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
this.topics = Collections.unmodifiableList(topics); this.topics = Collections.unmodifiableList(topics);
this.topicPattern = null;
}
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
final Map<String, Object> kafkaProperties,
final Pattern topics,
final long maxWaitMillis,
final String keyEncoding,
final String securityProtocol,
final String bootstrapServers,
final ComponentLog logger) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = demarcator;
this.keyEncoding = keyEncoding;
this.securityProtocol = securityProtocol;
this.bootstrapServers = bootstrapServers;
this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
this.topics = null;
this.topicPattern = topics;
} }
/** /**
@ -119,7 +144,11 @@ public class ConsumerPool implements Closeable {
* This subscription tightly couples the lease to the given * This subscription tightly couples the lease to the given
* consumer. They cannot be separated from then on. * consumer. They cannot be separated from then on.
*/ */
consumer.subscribe(topics, lease); if (topics != null) {
consumer.subscribe(topics, lease);
} else {
consumer.subscribe(topicPattern, lease);
}
} }
lease.setProcessSession(session); lease.setProcessSession(session);
leasesObtainedCountRef.incrementAndGet(); leasesObtainedCountRef.incrementAndGet();

View File

@ -133,6 +133,38 @@ public class ConsumeKafkaTest {
verifyNoMoreInteractions(mockLease); verifyNoMoreInteractions(mockLease);
} }
@Test
public void validateGetAllMessagesPattern() throws Exception {
String groupName = "validateGetAllMessagesPattern";
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
runner.run(1, false);
verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
verify(mockLease, times(3)).continuePolling();
verify(mockLease, times(2)).poll();
verify(mockLease, times(1)).commit();
verify(mockLease, times(1)).close();
verifyNoMoreInteractions(mockConsumerPool);
verifyNoMoreInteractions(mockLease);
}
@Test @Test
public void validateGetErrorMessages() throws Exception { public void validateGetErrorMessages() throws Exception {
String groupName = "validateGetErrorMessages"; String groupName = "validateGetErrorMessages";

View File

@ -54,5 +54,5 @@
<version>1.2.0-SNAPSHOT</version> <version>1.2.0-SNAPSHOT</version>
</dependency> </dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
</project> </project>