From 2b41b074899045fee0fe9c1c9fbb05d5d531ab86 Mon Sep 17 00:00:00 2001 From: mkalavala <mkalavala@cloudera.com> Date: Thu, 14 Sep 2023 16:40:50 -0400 Subject: [PATCH] NIFI-6240 Added Proxy Support for WebSocket Transport in Azure EventHubs This closes #7740 Signed-off-by: David Handermann <exceptionfactory@apache.org> --- .../azure/eventhub/ConsumeAzureEventHub.java | 5 +- .../azure/eventhub/GetAzureEventHub.java | 36 +++--- .../azure/eventhub/PutAzureEventHub.java | 24 ++-- .../eventhub/utils/AzureEventHubUtils.java | 113 +++++++++++++----- .../eventhubs/AzureEventHubComponent.java | 8 ++ .../azure/eventhub/GetAzureEventHubTest.java | 22 +++- .../azure/eventhub/PutAzureEventHubTest.java | 23 +++- .../eventhub/TestConsumeAzureEventHub.java | 31 ++++- 8 files changed, 196 insertions(+), 66 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index 9919af3b86..8739172876 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -301,7 +301,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN, - STORAGE_CONTAINER_NAME + STORAGE_CONTAINER_NAME, + PROXY_CONFIGURATION_SERVICE )); Set<Relationship> relationships = new HashSet<>(); @@ -469,6 +470,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem eventProcessorClientBuilder.initialPartitionEventPosition(legacyPartitionEventPosition); } + AzureEventHubUtils.getProxyOptions(context).ifPresent(eventProcessorClientBuilder::proxyOptions); + return eventProcessorClientBuilder.buildEventProcessorClient(); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index 6c94a4f983..5f813bd47d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -27,20 +27,6 @@ import com.azure.messaging.eventhubs.EventHubConsumerClient; import com.azure.messaging.eventhubs.models.EventPosition; import com.azure.messaging.eventhubs.models.PartitionContext; import com.azure.messaging.eventhubs.models.PartitionEvent; -import java.time.Duration; -import java.time.Instant; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -70,6 +56,21 @@ import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.util.StopWatch; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + @Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"}) @CapabilityDescription("Receives messages from Microsoft Azure Event Hubs without reliable checkpoint tracking. " + "In clustered environment, GetAzureEventHub processor instances work independently and all cluster nodes process all messages " @@ -173,7 +174,8 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub CONSUMER_GROUP, ENQUEUE_TIME, RECEIVER_FETCH_SIZE, - RECEIVER_FETCH_TIMEOUT + RECEIVER_FETCH_TIMEOUT, + PROXY_CONFIGURATION_SERVICE ); relationships = Collections.singleton(REL_SUCCESS); } @@ -388,6 +390,8 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub clientOptions.setIdentifier(clientIdentifier); eventHubClientBuilder.clientOptions(clientOptions); + AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions); + return eventHubClientBuilder; } @@ -434,7 +438,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub attributes.put("eventhub.name", partitionContext.getEventHubName()); attributes.put("eventhub.partition", partitionContext.getPartitionId()); - final Map<String,String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData.getProperties()); + final Map<String, String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData.getProperties()); attributes.putAll(applicationProperties); return attributes; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java index 1f6fc3bb42..09c01a3701 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -24,17 +24,6 @@ import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventHubClientBuilder; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.models.SendOptions; -import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -56,11 +45,21 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + @SupportsBatching @Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"}) @InputRequirement(Requirement.INPUT_REQUIRED) @@ -135,6 +134,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub configuredDescriptors.add(USE_MANAGED_IDENTITY); configuredDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME); configuredDescriptors.add(MAX_BATCH_SIZE); + configuredDescriptors.add(PROXY_CONFIGURATION_SERVICE); propertyDescriptors = Collections.unmodifiableList(configuredDescriptors); final Set<Relationship> configuredRelationships = new HashSet<>(); @@ -214,7 +214,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey); eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential); } - + AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions); return eventHubClientBuilder.buildProducerClient(); } catch (final Exception e) { throw new ProcessException("EventHubClient creation failed", e); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java index 3cb4e78909..795f797269 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java @@ -16,41 +16,49 @@ */ package org.apache.nifi.processors.azure.eventhub.utils; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.HashMap; - +import com.azure.core.amqp.ProxyAuthenticationType; +import com.azure.core.amqp.ProxyOptions; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; + +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; public final class AzureEventHubUtils { - public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for general use"); + public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net", "Azure", "Servicebus endpoint for general use"); public static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus endpoint for China"); public static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint for Germany"); public static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "Servicebus endpoint for US Government"); public static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() - .name("Shared Access Policy Primary Key") - .displayName("Shared Access Policy Key") - .description("The key of the shared access policy. Either the primary or the secondary key can be used.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .sensitive(true) - .required(false) - .build(); + .name("Shared Access Policy Primary Key") + .displayName("Shared Access Policy Key") + .description("The key of the shared access policy. Either the primary or the secondary key can be used.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .sensitive(true) + .required(false) + .build(); public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder() - .name("use-managed-identity") - .displayName("Use Azure Managed Identity") - .description("Choose whether or not to use the managed identity of Azure VM/VMSS") - .required(false).defaultValue("false").allowableValues("true", "false") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build(); + .name("use-managed-identity") + .displayName("Use Azure Managed Identity") + .description("Choose whether or not to use the managed identity of Azure VM/VMSS") + .required(false).defaultValue("false").allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build(); public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder() .name("Service Bus Endpoint") @@ -64,35 +72,36 @@ public final class AzureEventHubUtils { .build(); public static List<ValidationResult> customValidate(PropertyDescriptor accessPolicyDescriptor, - PropertyDescriptor policyKeyDescriptor, - ValidationContext context) { + PropertyDescriptor policyKeyDescriptor, + ValidationContext context) { List<ValidationResult> retVal = new ArrayList<>(); - boolean accessPolicyIsSet = context.getProperty(accessPolicyDescriptor).isSet(); - boolean policyKeyIsSet = context.getProperty(policyKeyDescriptor).isSet(); + boolean accessPolicyIsSet = context.getProperty(accessPolicyDescriptor).isSet(); + boolean policyKeyIsSet = context.getProperty(policyKeyDescriptor).isSet(); boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); - if (useManagedIdentity && (accessPolicyIsSet || policyKeyIsSet) ) { + if (useManagedIdentity && (accessPolicyIsSet || policyKeyIsSet)) { final String msg = String.format( - "('%s') and ('%s' with '%s') fields cannot be set at the same time.", - USE_MANAGED_IDENTITY.getDisplayName(), - accessPolicyDescriptor.getDisplayName(), - POLICY_PRIMARY_KEY.getDisplayName() + "('%s') and ('%s' with '%s') fields cannot be set at the same time.", + USE_MANAGED_IDENTITY.getDisplayName(), + accessPolicyDescriptor.getDisplayName(), + POLICY_PRIMARY_KEY.getDisplayName() ); retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); } else if (!useManagedIdentity && (!accessPolicyIsSet || !policyKeyIsSet)) { final String msg = String.format( - "either('%s') or (%s with '%s') must be set", - USE_MANAGED_IDENTITY.getDisplayName(), - accessPolicyDescriptor.getDisplayName(), - POLICY_PRIMARY_KEY.getDisplayName() + "either('%s') or (%s with '%s') must be set", + USE_MANAGED_IDENTITY.getDisplayName(), + accessPolicyDescriptor.getDisplayName(), + POLICY_PRIMARY_KEY.getDisplayName() ); retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); } + ProxyConfiguration.validateProxySpec(context, retVal, AzureEventHubComponent.PROXY_SPECS); return retVal; } - public static Map<String, String> getApplicationProperties(final Map<String,Object> eventProperties) { + public static Map<String, String> getApplicationProperties(final Map<String, Object> eventProperties) { final Map<String, String> properties = new HashMap<>(); if (eventProperties != null) { @@ -103,4 +112,42 @@ public final class AzureEventHubUtils { return properties; } + + /** + * Creates the {@link ProxyOptions proxy options}. + * + * @param propertyContext to supply Proxy configurations + * @return {@link ProxyOptions proxy options}, null if Proxy is not set + */ + public static Optional<ProxyOptions> getProxyOptions(final PropertyContext propertyContext) { + final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(propertyContext); + final ProxyOptions proxyOptions; + if (proxyConfiguration != ProxyConfiguration.DIRECT_CONFIGURATION) { + final Proxy proxy = getProxy(proxyConfiguration); + if (proxyConfiguration.hasCredential()) { + proxyOptions = new ProxyOptions( + ProxyAuthenticationType.BASIC, + proxy, + proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword()); + } else { + proxyOptions = new ProxyOptions( + ProxyAuthenticationType.NONE, + proxy, null, null); + } + } else { + proxyOptions = null; + } + + return Optional.ofNullable(proxyOptions); + } + + private static Proxy getProxy(ProxyConfiguration proxyConfiguration) { + final Proxy.Type type; + if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) { + type = Proxy.Type.HTTP; + } else { + throw new IllegalArgumentException("Unsupported proxy type: " + proxyConfiguration.getProxyType()); + } + return new Proxy(type, new InetSocketAddress(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort())); + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java index c905c2106b..11a51b7783 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java @@ -19,6 +19,8 @@ package org.apache.nifi.shared.azure.eventhubs; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxySpec; /** * Azure Event Hub Component interface with shared properties @@ -34,4 +36,10 @@ public interface AzureEventHubComponent { .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); + ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.HTTP_AUTH}; + PropertyDescriptor PROXY_CONFIGURATION_SERVICE + = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS)) + .dependsOn(TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()) + .build(); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java index 03069e142e..cb0e2331fa 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -22,6 +22,9 @@ import com.azure.messaging.eventhubs.models.PartitionContext; import com.azure.messaging.eventhubs.models.PartitionEvent; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxyConfigurationService; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.util.MockFlowFile; @@ -30,6 +33,7 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.net.Proxy; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -37,6 +41,8 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -64,7 +70,7 @@ public class GetAzureEventHubTest { } @Test - public void testProperties() { + public void testProperties() throws InitializationException { testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); testRunner.assertNotValid(); testRunner.setProperty(GetAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); @@ -81,6 +87,20 @@ public class GetAzureEventHubTest { testRunner.assertValid(); testRunner.setProperty(GetAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()); testRunner.assertValid(); + configureProxyControllerService(); + testRunner.assertValid(); + } + + private void configureProxyControllerService() throws InitializationException { + final String serviceId = "proxyConfigurationService"; + final ProxyConfiguration proxyConfiguration = mock(ProxyConfiguration.class); + when(proxyConfiguration.getProxyType()).thenReturn(Proxy.Type.HTTP); + final ProxyConfigurationService service = mock(ProxyConfigurationService.class); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getConfiguration()).thenReturn(proxyConfiguration); + testRunner.addControllerService(serviceId, service); + testRunner.enableControllerService(service); + testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId); } @Test diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java index ce591b1acc..cb3c18ef65 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java @@ -19,6 +19,9 @@ package org.apache.nifi.processors.azure.eventhub; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.models.SendOptions; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxyConfigurationService; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -30,14 +33,18 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.net.Proxy; import java.util.Collections; import java.util.Map; +import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyIterable; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class PutAzureEventHubTest { @@ -63,7 +70,7 @@ public class PutAzureEventHubTest { } @Test - public void testProperties() { + public void testProperties() throws InitializationException { testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); testRunner.assertNotValid(); testRunner.setProperty(PutAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); @@ -74,6 +81,20 @@ public class PutAzureEventHubTest { testRunner.assertValid(); testRunner.setProperty(PutAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()); testRunner.assertValid(); + configureProxyControllerService(); + testRunner.assertValid(); + } + + private void configureProxyControllerService() throws InitializationException { + final String serviceId = "proxyConfigurationService"; + final ProxyConfiguration proxyConfiguration = mock(ProxyConfiguration.class); + when(proxyConfiguration.getProxyType()).thenReturn(Proxy.Type.HTTP); + final ProxyConfigurationService service = mock(ProxyConfigurationService.class); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getConfiguration()).thenReturn(proxyConfiguration); + testRunner.addControllerService(serviceId, service); + testRunner.enableControllerService(service); + testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId); } @Test diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index 4b09f227ae..af88d19d58 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -26,6 +26,8 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; @@ -53,6 +55,7 @@ import reactor.core.publisher.Mono; import java.io.IOException; import java.io.OutputStream; +import java.net.Proxy; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; @@ -64,10 +67,12 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -148,6 +153,10 @@ public class TestConsumeAzureEventHub { testRunner.assertNotValid(); testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true"); testRunner.assertValid(); + testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()); + testRunner.assertValid(); + configureProxyControllerService(); + testRunner.assertValid(); } @Test @@ -179,7 +188,7 @@ public class TestConsumeAzureEventHub { } @Test - public void testProcessorConfigValidityWithTokenSet() { + public void testProcessorConfigValidityWithTokenSet() throws InitializationException { testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); testRunner.assertNotValid(); testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); @@ -192,10 +201,12 @@ public class TestConsumeAzureEventHub { testRunner.assertValid(); testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()); testRunner.assertValid(); + configureProxyControllerService(); + testRunner.assertValid(); } @Test - public void testProcessorConfigValidityWithStorageKeySet() { + public void testProcessorConfigValidityWithStorageKeySet() throws InitializationException { testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); testRunner.assertNotValid(); testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); @@ -206,6 +217,10 @@ public class TestConsumeAzureEventHub { testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_NAME); testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, STORAGE_ACCOUNT_KEY); testRunner.assertValid(); + testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue()); + testRunner.assertValid(); + configureProxyControllerService(); + testRunner.assertValid(); } @Test @@ -496,6 +511,18 @@ public class TestConsumeAzureEventHub { .collect(Collectors.toList()); } + private void configureProxyControllerService() throws InitializationException { + final String serviceId = "proxyConfigurationService"; + final ProxyConfiguration proxyConfiguration = mock(ProxyConfiguration.class); + when(proxyConfiguration.getProxyType()).thenReturn(Proxy.Type.HTTP); + final ProxyConfigurationService service = mock(ProxyConfigurationService.class); + when(service.getIdentifier()).thenReturn(serviceId); + when(service.getConfiguration()).thenReturn(proxyConfiguration); + testRunner.addControllerService(serviceId, service); + testRunner.enableControllerService(service); + testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId); + } + private class MockConsumeAzureEventHub extends ConsumeAzureEventHub { @Override