NIFI-6149: Azure EventHub Managed identities support patch

review changes
additional review changes
NIFI-6149: typo fixes

This closes #4226.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
sjyang18 2020-04-22 18:42:35 +00:00 committed by Peter Turcsanyi
parent 7d494011ea
commit aa1e272052
8 changed files with 245 additions and 53 deletions

View File

@ -20,8 +20,8 @@
<artifactId>nifi-azure-processors</artifactId>
<packaging>jar</packaging>
<properties>
<azure-eventhubs.version>2.3.2</azure-eventhubs.version>
<azure-eventhubs-eph.version>2.5.2</azure-eventhubs-eph.version>
<azure-eventhubs.version>3.1.1</azure-eventhubs.version>
<azure-eventhubs-eph.version>3.1.1</azure-eventhubs-eph.version>
</properties>
<dependencies>
<dependency>
@ -87,6 +87,12 @@
<version>1.12.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>

View File

@ -23,7 +23,6 @@ import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
@ -76,6 +75,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.util.StringUtils.isEmpty;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from Azure Event Hubs, writing the contents of the message to the content of the FlowFile.")
@ -115,17 +115,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.description("The name of the shared access policy. This policy must have Listen claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.required(false)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AzureEventHubUtils.POLICY_PRIMARY_KEY)
.name("event-hub-shared-access-policy-primary-key")
.displayName("Shared Access Policy Primary Key")
.description("The primary key of the shared access policy.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.sensitive(true)
.required(true)
.build();
static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
.name("event-hub-consumer-group")
.displayName("Consumer Group")
@ -261,7 +257,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static {
PROPERTIES = Collections.unmodifiableList(Arrays.asList(
NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
RECORD_READER, RECORD_WRITER,
INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT,
STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME
@ -335,6 +331,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.valid(false)
.build());
}
results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext));
return results;
}
@ -347,9 +344,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
}
}
public class EventProcessorFactory implements IEventProcessorFactory {
public class EventProcessorFactory implements IEventProcessorFactory<EventProcessor> {
@Override
public IEventProcessor createEventProcessor(PartitionContext context) throws Exception {
public EventProcessor createEventProcessor(PartitionContext context) throws Exception {
final EventProcessor eventProcessor = new EventProcessor();
return eventProcessor;
}
@ -581,12 +578,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(EVENT_HUB_NAME, eventHubName);
final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
@ -627,9 +618,22 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey);
final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder().setNamespaceName(namespaceName).setEventHubName( eventHubName).setSasKeyName(sasName).setSasKey(sasKey);
eventProcessorHost = new EventProcessorHost(consumerHostname, eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString, containerName);
final String connectionString;
final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
if(useManagedIdentity) {
connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName, eventHubName);
} else {
final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
connectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, eventHubName, sasName, sasKey);
}
eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder
.newBuilder(consumerHostname, consumerGroupName)
.useAzureStorageCheckpointLeaseManager(storageConnectionString, containerName, null)
.useEventHubConnectionString(connectionString, eventHubName)
.build();
options.setExceptionNotification(e -> {
getLogger().error("An error occurred while receiving messages from Azure Event Hub {}" +

View File

@ -22,6 +22,7 @@ import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -54,6 +55,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
@ -63,6 +66,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs, writing the contents of the Azure message to the content of the FlowFile. "
@ -103,16 +107,10 @@ public class GetAzureEventHub extends AbstractProcessor {
.description("The name of the shared access policy. This policy must have Listen claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.description("The primary key of the shared access policy")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.sensitive(true)
.required(true)
.required(false)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
.name("Number of Event Hub Partitions")
@ -163,6 +161,7 @@ public class GetAzureEventHub extends AbstractProcessor {
.description("Any FlowFile that is successfully received from the event hub will be transferred to this Relationship.")
.build();
private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
private volatile Instant configuredEnqueueTime;
@ -184,6 +183,7 @@ public class GetAzureEventHub extends AbstractProcessor {
_propertyDescriptors.add(NAMESPACE);
_propertyDescriptors.add(ACCESS_POLICY);
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
_propertyDescriptors.add(USE_MANAGED_IDENTITY);
_propertyDescriptors.add(NUM_PARTITIONS);
_propertyDescriptors.add(CONSUMER_GROUP);
_propertyDescriptors.add(ENQUEUE_TIME);
@ -207,10 +207,16 @@ public class GetAzureEventHub extends AbstractProcessor {
return propertyDescriptors;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
return retVal;
}
protected void setupReceiver(final String connectionString, final ScheduledExecutorService executor) throws ProcessException {
try {
EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
eventHubClient = EventHubClient.createSync(connectionString, executor);
EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
eventHubClient = EventHubClient.createFromConnectionStringSync(connectionString, executor);
} catch (IOException | EventHubException e) {
throw new ProcessException(e);
}
@ -301,11 +307,23 @@ public class GetAzureEventHub extends AbstractProcessor {
}
this.partitionNames = partitionNames;
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
final String connectionString;
if(useManagedIdentity){
connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace,eventHubName);
} else {
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
connectionString = new ConnectionStringBuilder()
.setEndpoint(new URI("amqps://"+namespace+serviceBusEndpoint))
.setEventHubName(eventHubName)
.setSasKeyName(policyName)
.setSasKey(policyKey).toString();
}
if(context.getProperty(ENQUEUE_TIME).isSet()) {
configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
@ -324,8 +342,6 @@ public class GetAzureEventHub extends AbstractProcessor {
}
executor = Executors.newScheduledThreadPool(4);
final String connectionString = new ConnectionStringBuilder().setEndpoint(
new URI("amqps://"+namespace+serviceBusEndpoint)).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
setupReceiver(connectionString, executor);
}

View File

@ -16,8 +16,10 @@
*/
package org.apache.nifi.processors.azure.eventhub;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -34,7 +36,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
@ -53,6 +54,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
@ -91,16 +94,11 @@ public class PutAzureEventHub extends AbstractProcessor {
.description("The name of the shared access policy. This policy must have Send claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.description("The primary key of the shared access policy")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.sensitive(true)
.required(true)
.required(false)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("partitioning-key-attribute-name")
.displayName("Partitioning Key Attribute Name")
@ -144,6 +142,7 @@ public class PutAzureEventHub extends AbstractProcessor {
_propertyDescriptors.add(NAMESPACE);
_propertyDescriptors.add(ACCESS_POLICY);
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
_propertyDescriptors.add(USE_MANAGED_IDENTITY);
_propertyDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME);
_propertyDescriptors.add(MAX_BATCH_SIZE);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@ -177,6 +176,13 @@ public class PutAzureEventHub extends AbstractProcessor {
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
return retVal;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
try {
@ -314,8 +320,15 @@ public class PutAzureEventHub extends AbstractProcessor {
final int numThreads = context.getMaxConcurrentTasks();
senderQueue = new LinkedBlockingQueue<>(numThreads);
executor = Executors.newScheduledThreadPool(4);
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final boolean useManagedIdentiy = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
final String policyName, policyKey;
if(useManagedIdentiy) {
policyName = AzureEventHubUtils.MANAGED_IDENTITY_POLICY;
policyKey =null;
} else {
policyName = context.getProperty(ACCESS_POLICY).getValue();
policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
}
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
for (int i = 0; i < numThreads; i++) {
@ -345,8 +358,14 @@ public class PutAzureEventHub extends AbstractProcessor {
throws ProcessException{
try {
EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
return EventHubClient.createSync(getConnectionString(namespace, eventHubName, policyName, policyKey), executor);
EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
final String connectionString;
if(policyName == AzureEventHubUtils.MANAGED_IDENTITY_POLICY) {
connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, eventHubName);
} else{
connectionString = getConnectionString(namespace, eventHubName, policyName, policyKey);
}
return EventHubClient.createFromConnectionStringSync(connectionString, executor);
} catch (IOException | EventHubException | IllegalConnectionStringFormatException e) {
getLogger().error("Failed to create EventHubClient due to {}", new Object[]{e.getMessage()}, e);
throw new ProcessException(e);
@ -354,7 +373,7 @@ public class PutAzureEventHub extends AbstractProcessor {
}
protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){
return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
return AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace, eventHubName, policyName, policyKey);
}
/**

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.utils;
import java.util.ArrayList;
import java.util.List;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
public final class AzureEventHubUtils {
public static final String MANAGED_IDENTITY_POLICY = ConnectionStringBuilder.MANAGED_IDENTITY_AUTHENTICATION;
public static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.description("The primary key of the shared access policy")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.sensitive(true)
.required(false)
.build();
public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder()
.name("use-managed-identity")
.displayName("Use Azure Managed Identity")
.description("Choose whether or not to use the managed identity of Azure VM/VMSS")
.required(false).defaultValue("false").allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
public static List<ValidationResult> customValidate(PropertyDescriptor accessPolicyDescriptor,
PropertyDescriptor policyKeyDescriptor,
ValidationContext context) {
List<ValidationResult> retVal = new ArrayList<>();
boolean accessPolicyIsSet = context.getProperty(accessPolicyDescriptor).isSet();
boolean policyKeyIsSet = context.getProperty(policyKeyDescriptor).isSet();
boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
if (useManagedIdentity && (accessPolicyIsSet || policyKeyIsSet) ) {
final String msg = String.format(
"('%s') and ('%s' with '%s') fields cannot be set at the same time.",
USE_MANAGED_IDENTITY.getDisplayName(),
accessPolicyDescriptor.getDisplayName(),
POLICY_PRIMARY_KEY.getDisplayName()
);
retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
} else if (!useManagedIdentity && (!accessPolicyIsSet || !policyKeyIsSet)) {
final String msg = String.format(
"either('%s') or (%s with '%s') must be set",
USE_MANAGED_IDENTITY.getDisplayName(),
accessPolicyDescriptor.getDisplayName(),
POLICY_PRIMARY_KEY.getDisplayName()
);
retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
}
return retVal;
}
public static String getManagedIdentityConnectionString(final String namespace, final String eventHubName){
return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName)
.setAuthentication(MANAGED_IDENTITY_POLICY).toString();
}
public static String getSharedAccessSignatureConnectionString(final String namespace, final String eventHubName, final String sasName, final String sasKey) {
return new ConnectionStringBuilder()
.setNamespaceName(namespace)
.setEventHubName(eventHubName)
.setSasKeyName(sasName)
.setSasKey(sasKey).toString();
}
}

View File

@ -78,7 +78,23 @@ public class GetAzureEventHubTest {
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
testRunner.assertValid();
}
@Test
public void testProcessorConfigValidityWithManagedIdentityFlag() {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.USE_MANAGED_IDENTITY,"true");
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4");
testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z");
testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_SIZE, "5");
testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
testRunner.assertValid();
}
@Test
public void verifyRelationships(){
assert(1 == processor.getRelationships().size());

View File

@ -89,6 +89,15 @@ public class PutAzureEventHubTest {
testRunner.assertValid();
}
@Test
public void testProcessorConfigValidityWithManagedIdentityFlag() {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.USE_MANAGED_IDENTITY,"true");
testRunner.assertValid();
}
@Test
public void verifyRelationships(){
assert(2 == processor.getRelationships().size());
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@ -39,6 +40,8 @@ import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -64,8 +67,15 @@ import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
public class TestConsumeAzureEventHub {
private static final String namespaceName = "nifi-azure-hub";
private static final String eventHubName = "get-test";
private static final String storageAccountName = "test-sa";
private static final String storageAccountKey = "test-sa-key";
private ConsumeAzureEventHub.EventProcessor eventProcessor;
private MockProcessSession processSession;
private SharedSessionState sharedState;
@ -97,7 +107,29 @@ public class TestConsumeAzureEventHub {
when(partitionContext.getPartitionId()).thenReturn("partition-id");
when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group");
}
@Test
public void testProcessorConfigValidityWithManagedIdentityFlag() throws InitializationException {
TestRunner testRunner = TestRunners.newTestRunner(processor);
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName);
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName);
testRunner.assertNotValid();
final MockRecordParser reader = new MockRecordParser();
final MockRecordWriter writer = new MockRecordWriter();
testRunner.addControllerService("writer", writer);
testRunner.enableControllerService(writer);
testRunner.addControllerService("reader", reader);
testRunner.enableControllerService(reader);
testRunner.setProperty(ConsumeAzureEventHub.RECORD_WRITER, "writer");
testRunner.setProperty(ConsumeAzureEventHub.RECORD_READER, "reader");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey);
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true");
testRunner.assertValid();
}
@Test
public void testReceiveOne() throws Exception {
final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));