From 34e5a5321ab5e5d02e22932f115be982b3b5b304 Mon Sep 17 00:00:00 2001 From: Joe N Date: Tue, 27 Sep 2016 17:32:39 -0400 Subject: [PATCH] NIFI-2826 Adding enqueue time to GetAzureEventHub processor Signed-off-by: jpercivall --- .../nifi/processor/util/StandardValidators.java | 15 ++++++++++++++- .../processor/util/TestStandardValidators.java | 14 ++++++++++++++ .../azure/eventhub/GetAzureEventHub.java | 17 ++++++++++++++++- .../azure/eventhub/GetAzureEventHubTest.java | 4 +++- 4 files changed, 47 insertions(+), 3 deletions(-) diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index fdd341dba8..de1e57f591 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -22,6 +22,7 @@ import java.net.URI; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.UnsupportedCharsetException; +import java.time.Instant; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -201,6 +202,19 @@ public class StandardValidators { } }; + public static final Validator ISO8061_INSTANT_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + + try { + Instant.parse(input); + return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid ISO8061 Instant Date").valid(true).build(); + } catch (final Exception e) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid ISO8061 Instant Date, please enter in UTC time").valid(false).build(); + } + } + }; + public static final Validator NON_NEGATIVE_INTEGER_VALIDATOR = new Validator() { @Override public ValidationResult validate(final String subject, final String value, final ValidationContext context) { @@ -774,5 +788,4 @@ public class StandardValidators { return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); } } - } diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java index 0f627ef67c..0c2ccdb96f 100644 --- a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java +++ b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java @@ -218,4 +218,18 @@ public class TestStandardValidators { assertFalse(vr.isValid()); } + + @Test + public void testiso8061InstantValidator() { + Validator val = StandardValidators.ISO8061_INSTANT_VALIDATOR; + ValidationContext vc = mock(ValidationContext.class); + ValidationResult vr = val.validate("foo", "", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "2016-01-01T01:01:01.000-0100", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "2016-01-01T01:01:01.000Z", vc); + assertTrue(vr.isValid()); + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index 97f1c92c3d..0455fe92ee 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -113,6 +113,14 @@ public class GetAzureEventHub extends AbstractProcessor { .required(true) .build(); + static final PropertyDescriptor ENQUEUE_TIME = new PropertyDescriptor.Builder() + .name("Event Hub Message Enqueue Time") + .description("A timestamp (ISO-8061 Instant) formatted as YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages " + + "should have been enqueued in the EventHub to start reading from") + .addValidator(StandardValidators.ISO8061_INSTANT_VALIDATOR) + .expressionLanguageSupported(false) + .required(false) + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -121,6 +129,7 @@ public class GetAzureEventHub extends AbstractProcessor { private final ConcurrentMap partitionToReceiverMap = new ConcurrentHashMap<>(); private volatile BlockingQueue partitionNames = new LinkedBlockingQueue<>(); + private volatile Instant configuredEnqueueTime; private EventHubClient eventHubClient; private final static List propertyDescriptors; @@ -138,6 +147,7 @@ public class GetAzureEventHub extends AbstractProcessor { _propertyDescriptors.add(POLICY_PRIMARY_KEY); _propertyDescriptors.add(NUM_PARTITIONS); _propertyDescriptors.add(CONSUMER_GROUP); + _propertyDescriptors.add(ENQUEUE_TIME); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); Set _relationships = new HashSet<>(); @@ -189,7 +199,7 @@ public class GetAzureEventHub extends AbstractProcessor { final PartitionReceiver receiver = eventHubClient.createReceiver( consumerGroupName, partitionId, - Instant.now()).get(); + configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime).get(); partitionToReceiverMap.put(partitionId, receiver); return receiver; @@ -249,6 +259,11 @@ public class GetAzureEventHub extends AbstractProcessor { final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + if(context.getProperty(ENQUEUE_TIME).isSet()) { + configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString()); + } else { + configuredEnqueueTime = null; + } final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString(); setupReceiver(connectionString); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java index 81bbf92d6a..a63458fd5b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -41,7 +41,6 @@ public class GetAzureEventHubTest { private static final String sasKeyName = "bogus-policy"; private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!"; - private TestRunner testRunner; private MockGetAzureEventHub processor; @@ -63,7 +62,10 @@ public class GetAzureEventHubTest { testRunner.assertNotValid(); testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4"); testRunner.assertValid(); + testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z"); + testRunner.assertValid(); } + @Test public void verifyRelationships(){