diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index 12ea1bae67..69d558640b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -40,6 +40,8 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -81,6 +83,15 @@ public class GetAzureEventHub extends AbstractProcessor { .expressionLanguageSupported(false) .required(true) .build(); + static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder() + .name("Service Bus Endpoint") + .description("To support Namespaces in non-standard Host URIs ( not .servicebus.windows.net, ie .servicebus.chinacloudapi.cn) select from the drop down acceptable options ") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .allowableValues(".servicebus.windows.net",".servicebus.chinacloudapi.cn") + .defaultValue(".servicebus.windows.net") + .required(true) + .build(); static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.") @@ -159,6 +170,7 @@ public class GetAzureEventHub extends AbstractProcessor { static { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.add(EVENT_HUB_NAME); + _propertyDescriptors.add(SERVICE_BUS_ENDPOINT); _propertyDescriptors.add(NAMESPACE); _propertyDescriptors.add(ACCESS_POLICY); _propertyDescriptors.add(POLICY_PRIMARY_KEY); @@ -268,7 +280,7 @@ public class GetAzureEventHub extends AbstractProcessor { } @OnScheduled - public void onScheduled(final ProcessContext context) throws ProcessException { + public void onScheduled(final ProcessContext context) throws ProcessException, URISyntaxException { final BlockingQueue partitionNames = new LinkedBlockingQueue<>(); for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) { partitionNames.add(String.valueOf(i)); @@ -279,6 +291,9 @@ public class GetAzureEventHub extends AbstractProcessor { final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); + + if(context.getProperty(ENQUEUE_TIME).isSet()) { configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString()); @@ -296,7 +311,7 @@ public class GetAzureEventHub extends AbstractProcessor { receiverFetchTimeout = null; } - final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString(); + final String connectionString = new ConnectionStringBuilder(new URI("amqps://"+namespace+serviceBusEndpoint), eventHubName, policyName, policyKey).toString(); setupReceiver(connectionString); } @@ -346,7 +361,8 @@ public class GetAzureEventHub extends AbstractProcessor { final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue(); - final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; + final String serviceBusEndPoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); + final String transitUri = "amqps://" + namespace + serviceBusEndPoint + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } }