NIFI-2826 Adding enqueue time to GetAzureEventHub processor

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Joe N 2016-09-27 17:32:39 -04:00 committed by jpercivall
parent bec7dbbad6
commit 34e5a5321a
4 changed files with 47 additions and 3 deletions

View File

@ -22,6 +22,7 @@ import java.net.URI;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@ -201,6 +202,19 @@ public class StandardValidators {
}
};
public static final Validator ISO8061_INSTANT_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
try {
Instant.parse(input);
return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid ISO8061 Instant Date").valid(true).build();
} catch (final Exception e) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid ISO8061 Instant Date, please enter in UTC time").valid(false).build();
}
}
};
public static final Validator NON_NEGATIVE_INTEGER_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
@ -774,5 +788,4 @@ public class StandardValidators {
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
}
}
}

View File

@ -218,4 +218,18 @@ public class TestStandardValidators {
assertFalse(vr.isValid());
}
@Test
public void testiso8061InstantValidator() {
Validator val = StandardValidators.ISO8061_INSTANT_VALIDATOR;
ValidationContext vc = mock(ValidationContext.class);
ValidationResult vr = val.validate("foo", "", vc);
assertFalse(vr.isValid());
vr = val.validate("foo", "2016-01-01T01:01:01.000-0100", vc);
assertFalse(vr.isValid());
vr = val.validate("foo", "2016-01-01T01:01:01.000Z", vc);
assertTrue(vr.isValid());
}
}

View File

@ -113,6 +113,14 @@ public class GetAzureEventHub extends AbstractProcessor {
.required(true)
.build();
static final PropertyDescriptor ENQUEUE_TIME = new PropertyDescriptor.Builder()
.name("Event Hub Message Enqueue Time")
.description("A timestamp (ISO-8061 Instant) formatted as YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages "
+ "should have been enqueued in the EventHub to start reading from")
.addValidator(StandardValidators.ISO8061_INSTANT_VALIDATOR)
.expressionLanguageSupported(false)
.required(false)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -121,6 +129,7 @@ public class GetAzureEventHub extends AbstractProcessor {
private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
private volatile Instant configuredEnqueueTime;
private EventHubClient eventHubClient;
private final static List<PropertyDescriptor> propertyDescriptors;
@ -138,6 +147,7 @@ public class GetAzureEventHub extends AbstractProcessor {
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
_propertyDescriptors.add(NUM_PARTITIONS);
_propertyDescriptors.add(CONSUMER_GROUP);
_propertyDescriptors.add(ENQUEUE_TIME);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
@ -189,7 +199,7 @@ public class GetAzureEventHub extends AbstractProcessor {
final PartitionReceiver receiver = eventHubClient.createReceiver(
consumerGroupName,
partitionId,
Instant.now()).get();
configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime).get();
partitionToReceiverMap.put(partitionId, receiver);
return receiver;
@ -249,6 +259,11 @@ public class GetAzureEventHub extends AbstractProcessor {
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
if(context.getProperty(ENQUEUE_TIME).isSet()) {
configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
} else {
configuredEnqueueTime = null;
}
final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString();
setupReceiver(connectionString);

View File

@ -41,7 +41,6 @@ public class GetAzureEventHubTest {
private static final String sasKeyName = "bogus-policy";
private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!";
private TestRunner testRunner;
private MockGetAzureEventHub processor;
@ -63,7 +62,10 @@ public class GetAzureEventHubTest {
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4");
testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z");
testRunner.assertValid();
}
@Test
public void verifyRelationships(){