NIFI-1146 Allow GetKafka to be configured with auto.offset.reset to largest or smallest

This commit is contained in:
Naveen Madhire 2015-11-16 11:59:52 -06:00
parent 64369f67f9
commit b954ca620e
1 changed file with 33 additions and 19 deletions

View File

@@ -65,13 +65,18 @@ import kafka.message.MessageAndMetadata;
@CapabilityDescription("Fetches messages from Apache Kafka") @CapabilityDescription("Fetches messages from Apache Kafka")
@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"}) @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"), @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"),
@WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If" @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If"
+ " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"), + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
@WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"), @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
@WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")}) @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1"),
@WritesAttribute(attribute = "auto.offset.reset", description = "If this is set to largest, the consumer may lose some messages when the number of partitions, " +
"for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest")})
public class GetKafka extends AbstractProcessor { public class GetKafka extends AbstractProcessor {
public static final String SMALLEST = "smallest";
public static final String LARGEST = "largest";
public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder() public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
.name("ZooKeeper Connection String") .name("ZooKeeper Connection String")
.description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port>" .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port>"
@@ -141,12 +146,20 @@ public class GetKafka extends AbstractProcessor {
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.build(); .build();
public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
.name("Group ID") .name("Group ID")
.description("A Group ID is used to identify consumers that are within the same consumer group") .description("A Group ID is used to identify consumers that are within the same consumer group")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.build(); .build();
public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
.name("Auto Offset Reset")
.description("Auto Offset Reset indicator")
.required(true)
.allowableValues(SMALLEST, LARGEST)
.defaultValue(LARGEST)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@@ -163,13 +176,13 @@ public class GetKafka extends AbstractProcessor {
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder() final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(CLIENT_NAME) .fromPropertyDescriptor(CLIENT_NAME)
.defaultValue("NiFi-" + getIdentifier()) .defaultValue("NiFi-" + getIdentifier())
.build(); .build();
final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder() final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(GROUP_ID) .fromPropertyDescriptor(GROUP_ID)
.defaultValue(getIdentifier()) .defaultValue(getIdentifier())
.build(); .build();
final List<PropertyDescriptor> props = new ArrayList<>(); final List<PropertyDescriptor> props = new ArrayList<>();
props.add(ZOOKEEPER_CONNECTION_STRING); props.add(ZOOKEEPER_CONNECTION_STRING);
@@ -181,6 +194,7 @@ public class GetKafka extends AbstractProcessor {
props.add(groupIdWithDefault); props.add(groupIdWithDefault);
props.add(KAFKA_TIMEOUT); props.add(KAFKA_TIMEOUT);
props.add(ZOOKEEPER_TIMEOUT); props.add(ZOOKEEPER_TIMEOUT);
props.add(AUTO_OFFSET_RESET);
return props; return props;
} }
@@ -204,7 +218,7 @@ public class GetKafka extends AbstractProcessor {
props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
props.setProperty("auto.commit.enable", "true"); // just be explicit props.setProperty("auto.commit.enable", "true"); // just be explicit
props.setProperty("auto.offset.reset", "smallest"); props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
@@ -356,4 +370,4 @@ public class GetKafka extends AbstractProcessor {
} }
} }
} }