From d382b378a8d6bc09ec252ce1b37cc494973c358a Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Wed, 5 Apr 2023 16:29:53 -0500 Subject: [PATCH] NIFI-11387 Added Transport Type property to Azure Event Hub Components - Transport Type defaults to AMQP and also supports AMQP with WebSockets Signed-off-by: Pierre Villard This closes #7126. --- .../azure/eventhub/ConsumeAzureEventHub.java | 7 ++- .../azure/eventhub/GetAzureEventHub.java | 7 ++- .../azure/eventhub/PutAzureEventHub.java | 7 ++- .../eventhub/AzureEventHubRecordSink.java | 12 +++-- .../eventhubs/AzureEventHubComponent.java | 37 +++++++++++++ .../eventhubs/AzureEventHubTransportType.java | 52 +++++++++++++++++++ .../azure/eventhub/GetAzureEventHubTest.java | 3 ++ .../azure/eventhub/PutAzureEventHubTest.java | 3 ++ .../eventhub/TestConsumeAzureEventHub.java | 3 ++ .../eventhub/TestAzureEventHubRecordSink.java | 6 ++- 10 files changed, 130 insertions(+), 7 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubTransportType.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index 485928eb8b..2287838ae0 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.eventhub; +import com.azure.core.amqp.AmqpTransportType; import com.azure.core.credential.AzureNamedKeyCredential; import com.azure.identity.ManagedIdentityCredential; import com.azure.identity.ManagedIdentityCredentialBuilder; @@ -62,6 +63,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StringUtils; @@ -101,7 +103,7 @@ import static org.apache.commons.lang3.StringUtils.defaultIfBlank; @WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled"), @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'") }) -public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { +public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implements AzureEventHubComponent { private static final Pattern SAS_TOKEN_PATTERN = Pattern.compile("^\\?.*$"); private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.%s"; @@ -284,6 +286,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { NAMESPACE, EVENT_HUB_NAME, SERVICE_BUS_ENDPOINT, + TRANSPORT_TYPE, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, @@ -425,8 +428,10 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { final Long receiveTimeout = context.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); final Duration maxWaitTime = Duration.ofMillis(receiveTimeout); final Integer maxBatchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); + final AmqpTransportType transportType = AmqpTransportType.fromString(context.getProperty(TRANSPORT_TYPE).getValue()); final EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder() + .transportType(transportType) .consumerGroup(consumerGroup) .trackLastEnqueuedEventProperties(true) .checkpointStore(checkpointStore) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index 855b531d92..d6e9e17bdf 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import com.azure.core.amqp.AmqpClientOptions; +import com.azure.core.amqp.AmqpTransportType; import com.azure.core.credential.AzureNamedKeyCredential; import com.azure.identity.ManagedIdentityCredential; import com.azure.identity.ManagedIdentityCredentialBuilder; @@ -68,6 +69,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.util.StopWatch; import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; @@ -87,7 +89,7 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; @WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'") }) @SeeAlso(ConsumeAzureEventHub.class) -public class GetAzureEventHub extends AbstractProcessor { +public class GetAzureEventHub extends AbstractProcessor implements AzureEventHubComponent { private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s/%s/ConsumerGroups/%s/Partitions/%s"; private static final Duration DEFAULT_FETCH_TIMEOUT = Duration.ofSeconds(60); private static final int DEFAULT_FETCH_SIZE = 100; @@ -175,6 +177,7 @@ public class GetAzureEventHub extends AbstractProcessor { NAMESPACE, EVENT_HUB_NAME, SERVICE_BUS_ENDPOINT, + TRANSPORT_TYPE, ACCESS_POLICY, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, @@ -372,8 +375,10 @@ public class GetAzureEventHub extends AbstractProcessor { final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint); + final AmqpTransportType transportType = AmqpTransportType.fromString(context.getProperty(TRANSPORT_TYPE).getValue()); final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder(); + eventHubClientBuilder.transportType(transportType); final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue(); eventHubClientBuilder.consumerGroup(consumerGroup); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java index 389bb20cb9..1f6fc3bb42 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.eventhub; +import com.azure.core.amqp.AmqpTransportType; import com.azure.core.credential.AzureNamedKeyCredential; import com.azure.identity.ManagedIdentityCredential; import com.azure.identity.ManagedIdentityCredentialBuilder; @@ -56,6 +57,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier; +import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; @@ -64,7 +66,7 @@ import org.apache.nifi.util.StopWatch; @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Send FlowFile contents to Azure Event Hubs") @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The Processor buffers FlowFile contents in memory before sending") -public class PutAzureEventHub extends AbstractProcessor { +public class PutAzureEventHub extends AbstractProcessor implements AzureEventHubComponent { private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s%s/%s"; static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() @@ -127,6 +129,7 @@ public class PutAzureEventHub extends AbstractProcessor { configuredDescriptors.add(NAMESPACE); configuredDescriptors.add(EVENT_HUB_NAME); configuredDescriptors.add(SERVICE_BUS_ENDPOINT); + configuredDescriptors.add(TRANSPORT_TYPE); configuredDescriptors.add(ACCESS_POLICY); configuredDescriptors.add(POLICY_PRIMARY_KEY); configuredDescriptors.add(USE_MANAGED_IDENTITY); @@ -194,9 +197,11 @@ public class PutAzureEventHub extends AbstractProcessor { final String namespace = context.getProperty(NAMESPACE).getValue(); final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + final AmqpTransportType transportType = AmqpTransportType.fromString(context.getProperty(TRANSPORT_TYPE).getValue()); try { final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder(); + eventHubClientBuilder.transportType(transportType); final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint); if (useManagedIdentity) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java index 53f09d5061..9345a4692d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.services.azure.eventhub; +import com.azure.core.amqp.AmqpTransportType; import com.azure.core.credential.AzureNamedKeyCredential; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.DefaultAzureCredentialBuilder; @@ -44,6 +45,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -55,7 +57,7 @@ import java.util.Map; @Tags({"azure", "record", "sink"}) @CapabilityDescription("Format and send Records to Azure Event Hubs") -public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService { +public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService, AzureEventHubComponent { static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Default Service Bus Endpoint"); @@ -133,6 +135,7 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement SERVICE_BUS_ENDPOINT, EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, + TRANSPORT_TYPE, RECORD_WRITER_FACTORY, AUTHENTICATION_STRATEGY, SHARED_ACCESS_POLICY, @@ -155,10 +158,12 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement final String eventHubName, final String policyName, final String policyKey, - final AzureAuthenticationStrategy authenticationStrategy + final AzureAuthenticationStrategy authenticationStrategy, + final AmqpTransportType transportType ) { final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint); final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder(); + eventHubClientBuilder.transportType(transportType); if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) { final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey); eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential); @@ -181,7 +186,8 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement final String policyKey = context.getProperty(SHARED_ACCESS_POLICY_KEY).getValue(); final String authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).getValue(); final AzureAuthenticationStrategy azureAuthenticationStrategy = AzureAuthenticationStrategy.valueOf(authenticationStrategy); - client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy); + final AmqpTransportType transportType = AmqpTransportType.fromString(context.getProperty(TRANSPORT_TYPE).getValue()); + client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy, transportType); } @OnDisabled diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java new file mode 100644 index 0000000000..c905c2106b --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.shared.azure.eventhubs; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; + +/** + * Azure Event Hub Component interface with shared properties + */ +public interface AzureEventHubComponent { + PropertyDescriptor TRANSPORT_TYPE = new PropertyDescriptor.Builder() + .name("Transport Type") + .displayName("Transport Type") + .description("Advanced Message Queuing Protocol Transport Type for communication with Azure Event Hubs") + .allowableValues(AzureEventHubTransportType.class) + .defaultValue(AzureEventHubTransportType.AMQP.getValue()) + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubTransportType.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubTransportType.java new file mode 100644 index 0000000000..7d5ec13352 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubTransportType.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.shared.azure.eventhubs; + +import org.apache.nifi.components.DescribedValue; + +/** + * Azure Event Hubs Transport Type allowable values based on AmqpTransportType values from the Azure SDK + */ +public enum AzureEventHubTransportType implements DescribedValue { + AMQP("Amqp", "AMQP over TCP on ports 5671 and 5672"), + + AMQP_WEB_SOCKETS("AmqpWebSockets", "AMQP over HTTPS with WebSockets on port 443"); + + private final String value; + + private final String description; + + AzureEventHubTransportType(final String value, final String description) { + this.value = value; + this.description = description; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return name(); + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java index e40d714111..03069e142e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -23,6 +23,7 @@ import com.azure.messaging.eventhubs.models.PartitionEvent; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -78,6 +79,8 @@ public class GetAzureEventHubTest { testRunner.assertValid(); testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT, "10000"); testRunner.assertValid(); + testRunner.setProperty(GetAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()); + testRunner.assertValid(); } @Test diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java index 10fdb12e8d..ce591b1acc 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.azure.eventhub; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.models.SendOptions; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; @@ -71,6 +72,8 @@ public class PutAzureEventHubTest { testRunner.assertNotValid(); testRunner.setProperty(PutAzureEventHub.POLICY_PRIMARY_KEY, POLICY_KEY); testRunner.assertValid(); + testRunner.setProperty(PutAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()); + testRunner.assertValid(); } @Test diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index e44197c928..4b09f227ae 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -40,6 +40,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -189,6 +190,8 @@ public class TestConsumeAzureEventHub { testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_NAME); testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, STORAGE_TOKEN); testRunner.assertValid(); + testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()); + testRunner.assertValid(); } @Test diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java index 8ab5595a6b..de53ca2b6d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.services.azure.eventhub; +import com.azure.core.amqp.AmqpTransportType; import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventDataBatch; import com.azure.messaging.eventhubs.EventHubProducerClient; @@ -30,6 +31,7 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.util.NoOpProcessor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -90,6 +92,7 @@ public class TestAzureEventHubRecordSink { runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAMESPACE, EVENT_HUB_NAMESPACE); runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.SHARED_ACCESS_POLICY_KEY, POLICY_KEY); runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER); + runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()); runner.enableControllerService(azureEventHubRecordSink); } @@ -149,7 +152,8 @@ public class TestAzureEventHubRecordSink { final String eventHubName, final String policyName, final String policyKey, - final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException { + final AzureAuthenticationStrategy authenticationStrategy, + final AmqpTransportType transportType) throws ProcessException { return client; } }