NIFI-3640 uri eventhub changes

This closes #1617
This commit is contained in:
Joseph Niemiec 2017-03-23 11:19:56 -04:00 committed by Jeff Storck
parent 4d0667380a
commit 44fdc0e4ef
1 changed files with 19 additions and 3 deletions

View File

@ -40,6 +40,8 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@ -81,6 +83,15 @@ public class GetAzureEventHub extends AbstractProcessor {
.expressionLanguageSupported(false)
.required(true)
.build();
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
.name("Service Bus Endpoint")
.description("To support Namespaces in non-standard Host URIs ( not .servicebus.windows.net, ie .servicebus.chinacloudapi.cn) select from the drop down acceptable options ")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.allowableValues(".servicebus.windows.net",".servicebus.chinacloudapi.cn")
.defaultValue(".servicebus.windows.net")
.required(true)
.build();
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.")
@ -159,6 +170,7 @@ public class GetAzureEventHub extends AbstractProcessor {
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(EVENT_HUB_NAME);
_propertyDescriptors.add(SERVICE_BUS_ENDPOINT);
_propertyDescriptors.add(NAMESPACE);
_propertyDescriptors.add(ACCESS_POLICY);
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
@ -268,7 +280,7 @@ public class GetAzureEventHub extends AbstractProcessor {
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws ProcessException {
public void onScheduled(final ProcessContext context) throws ProcessException, URISyntaxException {
final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) {
partitionNames.add(String.valueOf(i));
@ -279,6 +291,9 @@ public class GetAzureEventHub extends AbstractProcessor {
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
if(context.getProperty(ENQUEUE_TIME).isSet()) {
configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
@ -296,7 +311,7 @@ public class GetAzureEventHub extends AbstractProcessor {
receiverFetchTimeout = null;
}
final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString();
final String connectionString = new ConnectionStringBuilder(new URI("amqps://"+namespace+serviceBusEndpoint), eventHubName, policyName, policyKey).toString();
setupReceiver(connectionString);
}
@ -346,7 +361,8 @@ public class GetAzureEventHub extends AbstractProcessor {
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue();
final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
final String serviceBusEndPoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final String transitUri = "amqps://" + namespace + serviceBusEndPoint + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
}