NIFI-6240 Added Proxy Support for WebSocket Transport in Azure EventHubs

This closes #7740

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
mkalavala 2023-09-14 16:40:50 -04:00 committed by exceptionfactory
parent 66b68140e2
commit 2b41b07489
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
8 changed files with 196 additions and 66 deletions

View File

@ -301,7 +301,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
STORAGE_ACCOUNT_NAME,
STORAGE_ACCOUNT_KEY,
STORAGE_SAS_TOKEN,
STORAGE_CONTAINER_NAME
STORAGE_CONTAINER_NAME,
PROXY_CONFIGURATION_SERVICE
));
Set<Relationship> relationships = new HashSet<>();
@ -469,6 +470,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
eventProcessorClientBuilder.initialPartitionEventPosition(legacyPartitionEventPosition);
}
AzureEventHubUtils.getProxyOptions(context).ifPresent(eventProcessorClientBuilder::proxyOptions);
return eventProcessorClientBuilder.buildEventProcessorClient();
}

View File

@ -27,20 +27,6 @@ import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -70,6 +56,21 @@ import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.util.StopWatch;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs without reliable checkpoint tracking. "
+ "In clustered environment, GetAzureEventHub processor instances work independently and all cluster nodes process all messages "
@ -173,7 +174,8 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub
CONSUMER_GROUP,
ENQUEUE_TIME,
RECEIVER_FETCH_SIZE,
RECEIVER_FETCH_TIMEOUT
RECEIVER_FETCH_TIMEOUT,
PROXY_CONFIGURATION_SERVICE
);
relationships = Collections.singleton(REL_SUCCESS);
}
@ -388,6 +390,8 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub
clientOptions.setIdentifier(clientIdentifier);
eventHubClientBuilder.clientOptions(clientOptions);
AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions);
return eventHubClientBuilder;
}
@ -434,7 +438,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub
attributes.put("eventhub.name", partitionContext.getEventHubName());
attributes.put("eventhub.partition", partitionContext.getPartitionId());
final Map<String,String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData.getProperties());
final Map<String, String> applicationProperties = AzureEventHubUtils.getApplicationProperties(eventData.getProperties());
attributes.putAll(applicationProperties);
return attributes;

View File

@ -24,17 +24,6 @@ import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.SendOptions;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -56,11 +45,21 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@SupportsBatching
@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -135,6 +134,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub
configuredDescriptors.add(USE_MANAGED_IDENTITY);
configuredDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME);
configuredDescriptors.add(MAX_BATCH_SIZE);
configuredDescriptors.add(PROXY_CONFIGURATION_SERVICE);
propertyDescriptors = Collections.unmodifiableList(configuredDescriptors);
final Set<Relationship> configuredRelationships = new HashSet<>();
@ -214,7 +214,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub
final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
}
AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions);
return eventHubClientBuilder.buildProducerClient();
} catch (final Exception e) {
throw new ProcessException("EventHubClient creation failed", e);

View File

@ -16,41 +16,49 @@
*/
package org.apache.nifi.processors.azure.eventhub.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public final class AzureEventHubUtils {
public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for general use");
public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net", "Azure", "Servicebus endpoint for general use");
public static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus endpoint for China");
public static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint for Germany");
public static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "Servicebus endpoint for US Government");
public static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.displayName("Shared Access Policy Key")
.description("The key of the shared access policy. Either the primary or the secondary key can be used.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.sensitive(true)
.required(false)
.build();
.name("Shared Access Policy Primary Key")
.displayName("Shared Access Policy Key")
.description("The key of the shared access policy. Either the primary or the secondary key can be used.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.sensitive(true)
.required(false)
.build();
public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder()
.name("use-managed-identity")
.displayName("Use Azure Managed Identity")
.description("Choose whether or not to use the managed identity of Azure VM/VMSS")
.required(false).defaultValue("false").allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
.name("use-managed-identity")
.displayName("Use Azure Managed Identity")
.description("Choose whether or not to use the managed identity of Azure VM/VMSS")
.required(false).defaultValue("false").allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
.name("Service Bus Endpoint")
@ -64,35 +72,36 @@ public final class AzureEventHubUtils {
.build();
public static List<ValidationResult> customValidate(PropertyDescriptor accessPolicyDescriptor,
PropertyDescriptor policyKeyDescriptor,
ValidationContext context) {
PropertyDescriptor policyKeyDescriptor,
ValidationContext context) {
List<ValidationResult> retVal = new ArrayList<>();
boolean accessPolicyIsSet = context.getProperty(accessPolicyDescriptor).isSet();
boolean policyKeyIsSet = context.getProperty(policyKeyDescriptor).isSet();
boolean accessPolicyIsSet = context.getProperty(accessPolicyDescriptor).isSet();
boolean policyKeyIsSet = context.getProperty(policyKeyDescriptor).isSet();
boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
if (useManagedIdentity && (accessPolicyIsSet || policyKeyIsSet) ) {
if (useManagedIdentity && (accessPolicyIsSet || policyKeyIsSet)) {
final String msg = String.format(
"('%s') and ('%s' with '%s') fields cannot be set at the same time.",
USE_MANAGED_IDENTITY.getDisplayName(),
accessPolicyDescriptor.getDisplayName(),
POLICY_PRIMARY_KEY.getDisplayName()
"('%s') and ('%s' with '%s') fields cannot be set at the same time.",
USE_MANAGED_IDENTITY.getDisplayName(),
accessPolicyDescriptor.getDisplayName(),
POLICY_PRIMARY_KEY.getDisplayName()
);
retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
} else if (!useManagedIdentity && (!accessPolicyIsSet || !policyKeyIsSet)) {
final String msg = String.format(
"either('%s') or (%s with '%s') must be set",
USE_MANAGED_IDENTITY.getDisplayName(),
accessPolicyDescriptor.getDisplayName(),
POLICY_PRIMARY_KEY.getDisplayName()
"either('%s') or (%s with '%s') must be set",
USE_MANAGED_IDENTITY.getDisplayName(),
accessPolicyDescriptor.getDisplayName(),
POLICY_PRIMARY_KEY.getDisplayName()
);
retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
}
ProxyConfiguration.validateProxySpec(context, retVal, AzureEventHubComponent.PROXY_SPECS);
return retVal;
}
public static Map<String, String> getApplicationProperties(final Map<String,Object> eventProperties) {
public static Map<String, String> getApplicationProperties(final Map<String, Object> eventProperties) {
final Map<String, String> properties = new HashMap<>();
if (eventProperties != null) {
@ -103,4 +112,42 @@ public final class AzureEventHubUtils {
return properties;
}
/**
* Creates the {@link ProxyOptions proxy options}.
*
* @param propertyContext to supply Proxy configurations
* @return {@link ProxyOptions proxy options}, null if Proxy is not set
*/
public static Optional<ProxyOptions> getProxyOptions(final PropertyContext propertyContext) {
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(propertyContext);
final ProxyOptions proxyOptions;
if (proxyConfiguration != ProxyConfiguration.DIRECT_CONFIGURATION) {
final Proxy proxy = getProxy(proxyConfiguration);
if (proxyConfiguration.hasCredential()) {
proxyOptions = new ProxyOptions(
ProxyAuthenticationType.BASIC,
proxy,
proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
} else {
proxyOptions = new ProxyOptions(
ProxyAuthenticationType.NONE,
proxy, null, null);
}
} else {
proxyOptions = null;
}
return Optional.ofNullable(proxyOptions);
}
private static Proxy getProxy(ProxyConfiguration proxyConfiguration) {
final Proxy.Type type;
if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) {
type = Proxy.Type.HTTP;
} else {
throw new IllegalArgumentException("Unsupported proxy type: " + proxyConfiguration.getProxyType());
}
return new Proxy(type, new InetSocketAddress(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort()));
}
}

View File

@ -19,6 +19,8 @@ package org.apache.nifi.shared.azure.eventhubs;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
/**
* Azure Event Hub Component interface with shared properties
@ -34,4 +36,10 @@ public interface AzureEventHubComponent {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.HTTP_AUTH};
PropertyDescriptor PROXY_CONFIGURATION_SERVICE
= new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(false, PROXY_SPECS))
.dependsOn(TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue())
.build();
}

View File

@ -22,6 +22,9 @@ import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.MockFlowFile;
@ -30,6 +33,7 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.net.Proxy;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@ -37,6 +41,8 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@ -64,7 +70,7 @@ public class GetAzureEventHubTest {
}
@Test
public void testProperties() {
public void testProperties() throws InitializationException {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME);
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE);
@ -81,6 +87,20 @@ public class GetAzureEventHubTest {
testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue());
testRunner.assertValid();
configureProxyControllerService();
testRunner.assertValid();
}
private void configureProxyControllerService() throws InitializationException {
final String serviceId = "proxyConfigurationService";
final ProxyConfiguration proxyConfiguration = mock(ProxyConfiguration.class);
when(proxyConfiguration.getProxyType()).thenReturn(Proxy.Type.HTTP);
final ProxyConfigurationService service = mock(ProxyConfigurationService.class);
when(service.getIdentifier()).thenReturn(serviceId);
when(service.getConfiguration()).thenReturn(proxyConfiguration);
testRunner.addControllerService(serviceId, service);
testRunner.enableControllerService(service);
testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
}
@Test

View File

@ -19,6 +19,9 @@ package org.apache.nifi.processors.azure.eventhub;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.SendOptions;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -30,14 +33,18 @@ import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.net.Proxy;
import java.util.Collections;
import java.util.Map;
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyIterable;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class PutAzureEventHubTest {
@ -63,7 +70,7 @@ public class PutAzureEventHubTest {
}
@Test
public void testProperties() {
public void testProperties() throws InitializationException {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE);
@ -74,6 +81,20 @@ public class PutAzureEventHubTest {
testRunner.assertValid();
testRunner.setProperty(PutAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue());
testRunner.assertValid();
configureProxyControllerService();
testRunner.assertValid();
}
private void configureProxyControllerService() throws InitializationException {
final String serviceId = "proxyConfigurationService";
final ProxyConfiguration proxyConfiguration = mock(ProxyConfiguration.class);
when(proxyConfiguration.getProxyType()).thenReturn(Proxy.Type.HTTP);
final ProxyConfigurationService service = mock(ProxyConfigurationService.class);
when(service.getIdentifier()).thenReturn(serviceId);
when(service.getConfiguration()).thenReturn(proxyConfiguration);
testRunner.addControllerService(serviceId, service);
testRunner.enableControllerService(service);
testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
}
@Test

View File

@ -26,6 +26,8 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@ -53,6 +55,7 @@ import reactor.core.publisher.Mono;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
@ -64,10 +67,12 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.nifi.proxy.ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@ -148,6 +153,10 @@ public class TestConsumeAzureEventHub {
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true");
testRunner.assertValid();
testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue());
testRunner.assertValid();
configureProxyControllerService();
testRunner.assertValid();
}
@Test
@ -179,7 +188,7 @@ public class TestConsumeAzureEventHub {
}
@Test
public void testProcessorConfigValidityWithTokenSet() {
public void testProcessorConfigValidityWithTokenSet() throws InitializationException {
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME);
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE);
@ -192,10 +201,12 @@ public class TestConsumeAzureEventHub {
testRunner.assertValid();
testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue());
testRunner.assertValid();
configureProxyControllerService();
testRunner.assertValid();
}
@Test
public void testProcessorConfigValidityWithStorageKeySet() {
public void testProcessorConfigValidityWithStorageKeySet() throws InitializationException {
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME);
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE);
@ -206,6 +217,10 @@ public class TestConsumeAzureEventHub {
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_NAME);
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, STORAGE_ACCOUNT_KEY);
testRunner.assertValid();
testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue());
testRunner.assertValid();
configureProxyControllerService();
testRunner.assertValid();
}
@Test
@ -496,6 +511,18 @@ public class TestConsumeAzureEventHub {
.collect(Collectors.toList());
}
private void configureProxyControllerService() throws InitializationException {
final String serviceId = "proxyConfigurationService";
final ProxyConfiguration proxyConfiguration = mock(ProxyConfiguration.class);
when(proxyConfiguration.getProxyType()).thenReturn(Proxy.Type.HTTP);
final ProxyConfigurationService service = mock(ProxyConfigurationService.class);
when(service.getIdentifier()).thenReturn(serviceId);
when(service.getConfiguration()).thenReturn(proxyConfiguration);
testRunner.addControllerService(serviceId, service);
testRunner.enableControllerService(service);
testRunner.setProperty(PROXY_CONFIGURATION_SERVICE, serviceId);
}
private class MockConsumeAzureEventHub extends ConsumeAzureEventHub {
@Override