NIFI-11387 Added Transport Type property to Azure Event Hub Components

- Transport Type defaults to AMQP and also supports AMQP with WebSockets

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #7126.
This commit is contained in:
exceptionfactory 2023-04-05 16:29:53 -05:00 committed by Pierre Villard
parent 9a002d9a43
commit d382b378a8
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
10 changed files with 130 additions and 7 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.azure.eventhub;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
@ -62,6 +63,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
@ -101,7 +103,7 @@ import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled"),
@WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")
})
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implements AzureEventHubComponent {
private static final Pattern SAS_TOKEN_PATTERN = Pattern.compile("^\\?.*$");
private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.%s";
@ -284,6 +286,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
NAMESPACE,
EVENT_HUB_NAME,
SERVICE_BUS_ENDPOINT,
TRANSPORT_TYPE,
ACCESS_POLICY_NAME,
POLICY_PRIMARY_KEY,
USE_MANAGED_IDENTITY,
@ -425,8 +428,10 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
final Long receiveTimeout = context.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final Duration maxWaitTime = Duration.ofMillis(receiveTimeout);
final Integer maxBatchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final AmqpTransportType transportType = AmqpTransportType.fromString(context.getProperty(TRANSPORT_TYPE).getValue());
final EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
.transportType(transportType)
.consumerGroup(consumerGroup)
.trackLastEnqueuedEventProperties(true)
.checkpointStore(checkpointStore)

View File

@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.azure.core.amqp.AmqpClientOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
@ -68,6 +69,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@ -87,7 +89,7 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")
})
@SeeAlso(ConsumeAzureEventHub.class)
public class GetAzureEventHub extends AbstractProcessor {
public class GetAzureEventHub extends AbstractProcessor implements AzureEventHubComponent {
private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s/%s/ConsumerGroups/%s/Partitions/%s";
private static final Duration DEFAULT_FETCH_TIMEOUT = Duration.ofSeconds(60);
private static final int DEFAULT_FETCH_SIZE = 100;
@ -175,6 +177,7 @@ public class GetAzureEventHub extends AbstractProcessor {
NAMESPACE,
EVENT_HUB_NAME,
SERVICE_BUS_ENDPOINT,
TRANSPORT_TYPE,
ACCESS_POLICY,
POLICY_PRIMARY_KEY,
USE_MANAGED_IDENTITY,
@ -372,8 +375,10 @@ public class GetAzureEventHub extends AbstractProcessor {
final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
final AmqpTransportType transportType = AmqpTransportType.fromString(context.getProperty(TRANSPORT_TYPE).getValue());
final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
eventHubClientBuilder.transportType(transportType);
final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue();
eventHubClientBuilder.consumerGroup(consumerGroup);

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.azure.eventhub;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
@ -56,6 +57,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
@ -64,7 +66,7 @@ import org.apache.nifi.util.StopWatch;
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Send FlowFile contents to Azure Event Hubs")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The Processor buffers FlowFile contents in memory before sending")
public class PutAzureEventHub extends AbstractProcessor {
public class PutAzureEventHub extends AbstractProcessor implements AzureEventHubComponent {
private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s%s/%s";
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
@ -127,6 +129,7 @@ public class PutAzureEventHub extends AbstractProcessor {
configuredDescriptors.add(NAMESPACE);
configuredDescriptors.add(EVENT_HUB_NAME);
configuredDescriptors.add(SERVICE_BUS_ENDPOINT);
configuredDescriptors.add(TRANSPORT_TYPE);
configuredDescriptors.add(ACCESS_POLICY);
configuredDescriptors.add(POLICY_PRIMARY_KEY);
configuredDescriptors.add(USE_MANAGED_IDENTITY);
@ -194,9 +197,11 @@ public class PutAzureEventHub extends AbstractProcessor {
final String namespace = context.getProperty(NAMESPACE).getValue();
final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final AmqpTransportType transportType = AmqpTransportType.fromString(context.getProperty(TRANSPORT_TYPE).getValue());
try {
final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
eventHubClientBuilder.transportType(transportType);
final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
if (useManagedIdentity) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.services.azure.eventhub;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
@ -44,6 +45,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -55,7 +57,7 @@ import java.util.Map;
@Tags({"azure", "record", "sink"})
@CapabilityDescription("Format and send Records to Azure Event Hubs")
public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService {
public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService, AzureEventHubComponent {
static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Default Service Bus Endpoint");
@ -133,6 +135,7 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement
SERVICE_BUS_ENDPOINT,
EVENT_HUB_NAMESPACE,
EVENT_HUB_NAME,
TRANSPORT_TYPE,
RECORD_WRITER_FACTORY,
AUTHENTICATION_STRATEGY,
SHARED_ACCESS_POLICY,
@ -155,10 +158,12 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement
final String eventHubName,
final String policyName,
final String policyKey,
final AzureAuthenticationStrategy authenticationStrategy
final AzureAuthenticationStrategy authenticationStrategy,
final AmqpTransportType transportType
) {
final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
eventHubClientBuilder.transportType(transportType);
if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) {
final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
@ -181,7 +186,8 @@ public class AzureEventHubRecordSink extends AbstractControllerService implement
final String policyKey = context.getProperty(SHARED_ACCESS_POLICY_KEY).getValue();
final String authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).getValue();
final AzureAuthenticationStrategy azureAuthenticationStrategy = AzureAuthenticationStrategy.valueOf(authenticationStrategy);
client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy);
final AmqpTransportType transportType = AmqpTransportType.fromString(context.getProperty(TRANSPORT_TYPE).getValue());
client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy, transportType);
}
@OnDisabled

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.shared.azure.eventhubs;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
/**
* Azure Event Hub Component interface with shared properties
*/
public interface AzureEventHubComponent {
PropertyDescriptor TRANSPORT_TYPE = new PropertyDescriptor.Builder()
.name("Transport Type")
.displayName("Transport Type")
.description("Advanced Message Queuing Protocol Transport Type for communication with Azure Event Hubs")
.allowableValues(AzureEventHubTransportType.class)
.defaultValue(AzureEventHubTransportType.AMQP.getValue())
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.shared.azure.eventhubs;
import org.apache.nifi.components.DescribedValue;
/**
* Azure Event Hubs Transport Type allowable values based on AmqpTransportType values from the Azure SDK
*/
public enum AzureEventHubTransportType implements DescribedValue {
AMQP("Amqp", "AMQP over TCP on ports 5671 and 5672"),
AMQP_WEB_SOCKETS("AmqpWebSockets", "AMQP over HTTPS with WebSockets on port 443");
private final String value;
private final String description;
AzureEventHubTransportType(final String value, final String description) {
this.value = value;
this.description = description;
}
@Override
public String getValue() {
return value;
}
@Override
public String getDisplayName() {
return name();
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -23,6 +23,7 @@ import com.azure.messaging.eventhubs.models.PartitionEvent;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -78,6 +79,8 @@ public class GetAzureEventHubTest {
testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT, "10000");
testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue());
testRunner.assertValid();
}
@Test

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.azure.eventhub;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.SendOptions;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
@ -71,6 +72,8 @@ public class PutAzureEventHubTest {
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.POLICY_PRIMARY_KEY, POLICY_KEY);
testRunner.assertValid();
testRunner.setProperty(PutAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue());
testRunner.assertValid();
}
@Test

View File

@ -40,6 +40,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -189,6 +190,8 @@ public class TestConsumeAzureEventHub {
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_NAME);
testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, STORAGE_TOKEN);
testRunner.assertValid();
testRunner.setProperty(ConsumeAzureEventHub.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue());
testRunner.assertValid();
}
@Test

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.services.azure.eventhub;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerClient;
@ -30,6 +31,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -90,6 +92,7 @@ public class TestAzureEventHubRecordSink {
runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAMESPACE, EVENT_HUB_NAMESPACE);
runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.SHARED_ACCESS_POLICY_KEY, POLICY_KEY);
runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER);
runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.TRANSPORT_TYPE, AzureEventHubTransportType.AMQP_WEB_SOCKETS.getValue());
runner.enableControllerService(azureEventHubRecordSink);
}
@ -149,7 +152,8 @@ public class TestAzureEventHubRecordSink {
final String eventHubName,
final String policyName,
final String policyKey,
final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException {
final AzureAuthenticationStrategy authenticationStrategy,
final AmqpTransportType transportType) throws ProcessException {
return client;
}
}