diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index e10977b1f0..e644064f5e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -65,13 +65,18 @@ import kafka.message.MessageAndMetadata;
 @CapabilityDescription("Fetches messages from Apache Kafka")
 @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
 @WritesAttributes({
-    @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"),
-    @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If"
-        + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
-    @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
-    @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")})
+        @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"),
+        @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If"
+                + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
+        @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
+        @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1"),
+        @WritesAttribute(attribute = "auto.offset.reset", description = "If this is set to largest, the consumer may lose some messages when the number of partitions, "
+                + "for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest")})
 public class GetKafka extends AbstractProcessor {
 
+    public static final String SMALLEST = "smallest";
+    public static final String LARGEST = "largest";
+
     public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
         .name("ZooKeeper Connection String")
         .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port>"
@@ -141,12 +146,21 @@ public class GetKafka extends AbstractProcessor {
         .expressionLanguageSupported(false)
         .build();
     public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
-        .name("Group ID")
-        .description("A Group ID is used to identify consumers that are within the same consumer group")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .build();
+            .name("Group ID")
+            .description("A Group ID is used to identify consumers that are within the same consumer group")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
+            .name("Auto Offset Reset")
+            .description("Auto Offset Reset indicator")
+            .required(true)
+            .allowableValues(SMALLEST, LARGEST)
+            .defaultValue(LARGEST)
+            .expressionLanguageSupported(false)
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -163,13 +176,13 @@ public class GetKafka extends AbstractProcessor {
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(CLIENT_NAME)
-            .defaultValue("NiFi-" + getIdentifier())
-            .build();
+                .fromPropertyDescriptor(CLIENT_NAME)
+                .defaultValue("NiFi-" + getIdentifier())
+                .build();
         final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(GROUP_ID)
-            .defaultValue(getIdentifier())
-            .build();
+                .fromPropertyDescriptor(GROUP_ID)
+                .defaultValue(getIdentifier())
+                .build();
 
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(ZOOKEEPER_CONNECTION_STRING);
@@ -181,6 +194,7 @@ public class GetKafka extends AbstractProcessor {
         props.add(groupIdWithDefault);
         props.add(KAFKA_TIMEOUT);
         props.add(ZOOKEEPER_TIMEOUT);
+        props.add(AUTO_OFFSET_RESET);
         return props;
     }
 
@@ -204,7 +218,7 @@ public class GetKafka extends AbstractProcessor {
         props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
         props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
         props.setProperty("auto.commit.enable", "true"); // just be explicit
-        props.setProperty("auto.offset.reset", "smallest");
+        props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
         props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
         props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
 
@@ -356,4 +370,4 @@ public class GetKafka extends AbstractProcessor {
         }
     }
 
-}
+}