From 317918a9bbe40bf201a02daa1b147e8b624f7242 Mon Sep 17 00:00:00 2001 From: Andrew Psaltis Date: Sun, 10 Jul 2016 13:00:56 -0400 Subject: [PATCH] NIFI-2214 updated both of the Azure Event Hub processors to use the latest Azure Event Hub libraries. Also added test cases for each of the processors --- .../nifi-azure-bundle/nifi-azure-nar/pom.xml | 1 - .../nifi-azure-processors/pom.xml | 17 +- .../azure/eventhub/GetAzureEventHub.java | 317 ++++++++++-------- .../azure/eventhub/PutAzureEventHub.java | 195 ++++++----- .../azure/eventhub/GetAzureEventHubTest.java | 176 ++++++++++ .../azure/eventhub/PutAzureEventHubTest.java | 134 ++++++++ nifi-nar-bundles/nifi-azure-bundle/pom.xml | 2 - 7 files changed, 616 insertions(+), 226 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml index da513f1d7a..3d1b131e73 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml @@ -23,7 +23,6 @@ nifi-azure-nar - 1.0.0-SNAPSHOT nar true diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 85625155ae..8abf52ff91 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -20,7 +20,9 @@ nifi-azure-processors jar - + + 1.6.5 + org.apache.nifi @@ -35,11 +37,10 @@ javax.jms-api - com.microsoft.eventhubs.client - eventhubs-client - 0.9.1 + com.microsoft.azure + azure-eventhubs + 0.7.1 - org.apache.nifi nifi-mock @@ -55,5 +56,11 @@ junit test + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index ffd0bab63a..97f1c92c3d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -16,20 +16,11 @@ */ package org.apache.nifi.processors.azure.eventhub; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.ServiceBusException; import org.apache.nifi.annotation.behavior.InputRequirement; import
org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -45,110 +36,136 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; -import com.microsoft.eventhubs.client.ConnectionStringBuilder; -import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; -import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.EventHubMessage; -import com.microsoft.eventhubs.client.IEventHubFilter; -import com.microsoft.eventhubs.client.ResilientEventHubReceiver; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; -@Tags({ "azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams" }) +@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"}) @CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile") @InputRequirement(Requirement.INPUT_FORBIDDEN) @WritesAttributes({ - @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"), - @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), - @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"), - @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"), - @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled") + @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"), + @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), + @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"), + @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"), + @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled") }) public class GetAzureEventHub extends AbstractProcessor { static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() - .name("Event Hub Name") - .description("The name of the Azure Event Hub to pull messages from") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(true) - .build(); + .name("Event Hub Name") + .description("The name 
of the Azure Event Hub to pull messages from") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() - .name("Event Hub Namespace") - .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to -ns") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .required(true) - .build(); + .name("Event Hub Namespace") + .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to -ns") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() - .name("Shared Access Policy Name") - .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .required(true) - .build(); + .name("Shared Access Policy Name") + .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() - .name("Shared Access Policy Primary Key") - .description("The primary key of the Event Hub Shared Access Policy") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .sensitive(true) - .required(true) - .build(); + .name("Shared Access Policy Primary Key") + .description("The primary key of the Event Hub Shared Access Policy") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .sensitive(true) + .required(true) + .build(); static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder() - .name("Number of Event Hub Partitions") - .description("The number of partitions that the Event Hub has. Only this number of partitions will be used, " - + "so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(false) - .required(true) - .build(); + .name("Number of Event Hub Partitions") + .description("The number of partitions that the Event Hub has. Only this number of partitions will be used, " + + "so it is important to ensure that if the number of partitions changes that this value be updated. 
Otherwise, some messages may not be consumed.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder() - .name("Event Hub Consumer Group") - .description("The name of the Event Hub Consumer Group to use when pulling events") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("$Default") - .required(true) - .build(); - static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The number of FlowFiles to pull in a single JMS session") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("10") - .required(true) - .build(); + .name("Event Hub Consumer Group") + .description("The name of the Event Hub Consumer Group to use when pulling events") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("$Default") + .required(true) + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.") - .build(); + .name("success") + .description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.") + .build(); - private final ConcurrentMap partitionToReceiverMap = new ConcurrentHashMap<>(); + private final ConcurrentMap partitionToReceiverMap = new ConcurrentHashMap<>(); private volatile BlockingQueue partitionNames = new LinkedBlockingQueue<>(); + private EventHubClient eventHubClient; - @Override - protected List getSupportedPropertyDescriptors() { - final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.add(EVENT_HUB_NAME); - properties.add(NAMESPACE); - properties.add(ACCESS_POLICY); - properties.add(POLICY_PRIMARY_KEY); - properties.add(NUM_PARTITIONS); - properties.add(CONSUMER_GROUP); - return properties; + private final static List propertyDescriptors; + private final static Set relationships; + + /* + * Will ensure that the list of property descriptors is build only once. 
+ * Will also create a Set of relationships + */ + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(EVENT_HUB_NAME); + _propertyDescriptors.add(NAMESPACE); + _propertyDescriptors.add(ACCESS_POLICY); + _propertyDescriptors.add(POLICY_PRIMARY_KEY); + _propertyDescriptors.add(NUM_PARTITIONS); + _propertyDescriptors.add(CONSUMER_GROUP); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); } @Override public Set getRelationships() { - return Collections.singleton(REL_SUCCESS); + return relationships; } - private ResilientEventHubReceiver getReceiver(final ProcessContext context, final String partitionId) throws EventHubException { - ResilientEventHubReceiver existingReceiver = partitionToReceiverMap.get(partitionId); + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + + protected void setupReceiver(final String connectionString) throws ProcessException { + try { + eventHubClient = EventHubClient.createFromConnectionString(connectionString).get(); + } catch (InterruptedException | ExecutionException | IOException | ServiceBusException e) { + throw new ProcessException(e); + } + } + + PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, ServiceBusException, ExecutionException, InterruptedException { + PartitionReceiver existingReceiver = partitionToReceiverMap.get(partitionId); if (existingReceiver != null) { return existingReceiver; } @@ -167,39 +184,74 @@ public class GetAzureEventHub extends AbstractProcessor { return existingReceiver; } - final String policyName = context.getProperty(ACCESS_POLICY).getValue(); - final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); - final String namespace = context.getProperty(NAMESPACE).getValue(); - final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue(); - final String connectionString = new ConnectionStringBuilder(policyName, policyKey, namespace).getConnectionString(); - final IEventHubFilter filter = new EventHubEnqueueTimeFilter(System.currentTimeMillis()); - final ResilientEventHubReceiver receiver = new ResilientEventHubReceiver(connectionString, eventHubName, partitionId, consumerGroupName, -1, filter); - receiver.initialize(); + final PartitionReceiver receiver = eventHubClient.createReceiver( + consumerGroupName, + partitionId, + Instant.now()).get(); partitionToReceiverMap.put(partitionId, receiver); return receiver; + } } + /** + * This method is here to try to isolate the Azure-related code, as the PartitionReceiver cannot be mocked + * with PowerMock due to it being final. Unfortunately it extends a base class and does not implement an interface, + * so even if we create a MockPartitionReceiver, it will not work because the two classes are unrelated. + * + * @param context - The ProcessContext for this processor + * @param partitionId - The partition ID to retrieve a receiver for. + * @return - Returns the events received from the Event Hub. + * @throws ProcessException -- If any exception is encountered while receiving events, it is wrapped in a ProcessException + * and that exception is then thrown.
+ */ + protected Iterable receiveEvents(final ProcessContext context, final String partitionId) throws ProcessException { + final PartitionReceiver receiver; + try { + receiver = getReceiver(context, partitionId); + return receiver.receive(100).get(); + } catch (final IOException | ServiceBusException | ExecutionException | InterruptedException e) { + throw new ProcessException(e); + } + } @OnStopped - public void tearDown() { - for (final ResilientEventHubReceiver receiver : partitionToReceiverMap.values()) { - receiver.close(); + public void tearDown() throws ProcessException { + for (final PartitionReceiver receiver : partitionToReceiverMap.values()) { + if (null != receiver) { + receiver.close(); + } } partitionToReceiverMap.clear(); + try { + if (null != eventHubClient) { + eventHubClient.closeSync(); + } + } catch (final ServiceBusException e) { + throw new ProcessException(e); + } } @OnScheduled - public void setupPartitions(final ProcessContext context) { + public void onScheduled(final ProcessContext context) throws ProcessException { final BlockingQueue partitionNames = new LinkedBlockingQueue<>(); for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) { partitionNames.add(String.valueOf(i)); } this.partitionNames = partitionNames; + + final String policyName = context.getProperty(ACCESS_POLICY).getValue(); + final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + + + final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString(); + setupReceiver(connectionString); } @@ -214,41 +266,44 @@ public class GetAzureEventHub extends AbstractProcessor { final StopWatch stopWatch = new StopWatch(true); try { - final ResilientEventHubReceiver receiver; - try { - receiver = getReceiver(context, partitionId); - } catch (final EventHubException e) { - throw new ProcessException(e); - } - final EventHubMessage message = EventHubMessage.parseAmqpMessage(receiver.receive(100L)); - if (message == null) { + final Iterable receivedEvents = receiveEvents(context, partitionId); + if (receivedEvents == null) { return; } - final Map attributes = new HashMap<>(); - attributes.put("eventhub.enqueued.timestamp", String.valueOf(message.getEnqueuedTimestamp())); - attributes.put("eventhub.offset", message.getOffset()); - attributes.put("eventhub.sequence", String.valueOf(message.getSequence())); - attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue()); - attributes.put("eventhub.partition", partitionId); + for (final EventData eventData : receivedEvents) { + if (null != eventData) { - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, attributes); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(message.getData()); + final Map attributes = new HashMap<>(); + FlowFile flowFile = session.create(); + EventData.SystemProperties systemProperties = eventData.getSystemProperties(); + + if (null != systemProperties) { + attributes.put("eventhub.enqueued.timestamp", String.valueOf(eventData.getSystemProperties().getEnqueuedTime())); + attributes.put("eventhub.offset", eventData.getSystemProperties().getOffset()); + attributes.put("eventhub.sequence", 
String.valueOf(eventData.getSystemProperties().getSequenceNumber())); + } + + attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue()); + attributes.put("eventhub.partition", partitionId); + + + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, out -> { + out.write(eventData.getBody()); + }); + + + session.transfer(flowFile, REL_SUCCESS); + + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue(); + final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; + session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); } - }); - - session.transfer(flowFile, REL_SUCCESS); - - final String namespace = context.getProperty(NAMESPACE).getValue(); - final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); - final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue(); - final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; - session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + } } finally { partitionIds.offer(partitionId); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java index 3bf4ddcd3a..f41f21489b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -16,18 +16,11 @@ */ package org.apache.nifi.processors.azure.eventhub; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import javax.jms.DeliveryMode; - +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.IllegalConnectionStringFormatException; +import com.microsoft.azure.servicebus.ServiceBusException; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -35,7 +28,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; @@ -43,110 +35,119 @@ import 
org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; -import com.microsoft.eventhubs.client.EventHubClient; -import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.EventHubSender; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; @SupportsBatching -@Tags({ "microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming" }) +@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, " - + "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.") + + "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.") public class PutAzureEventHub extends AbstractProcessor { - - static final AllowableValue DELIVERY_MODE_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.PERSISTENT), "Persistent", "This mode indicates that the Event Hub " - + "server must persist the message to a reliable storage mechanism before the FlowFile is routed to 'success', in order to ensure that the data is not lost."); - - static final AllowableValue DELIVERY_MODE_NON_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.NON_PERSISTENT), "Non-Persistent", - "This mode indicates that the Event Hub server does not have to persist the message to a reliable storage mechanism before the FlowFile is routed to 'success'. " - + "This delivery mode may offer higher throughput but may result in message loss if the server crashes or is restarted."); - static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() - .name("Event Hub Name") - .description("The name of the Azure Event Hub to send to") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(true) - .build(); + .name("Event Hub Name") + .description("The name of the Azure Event Hub to send to") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() - .name("Event Hub Namespace") - .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to -ns") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .required(true) - .build(); + .name("Event Hub Namespace") + .description("The Azure Namespace that the Event Hub is assigned to. 
This is generally equal to -ns") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() - .name("Shared Access Policy Name") - .description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .required(true) - .build(); + .name("Shared Access Policy Name") + .description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() - .name("Shared Access Policy Primary Key") - .description("The primary key of the Event Hub Shared Access Policy") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .sensitive(true) - .required(true) - .build(); + .name("Shared Access Policy Primary Key") + .description("The primary key of the Event Hub Shared Access Policy") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .sensitive(true) + .required(true) + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.") - .build(); + .name("success") + .description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.") + .build(); static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.") - .build(); + .name("failure") + .description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.") + .build(); - private volatile BlockingQueue senderQueue = new LinkedBlockingQueue<>(); + private volatile BlockingQueue senderQueue = new LinkedBlockingQueue<>(); + private final static List propertyDescriptors; + private final static Set relationships; + /* + * Will ensure that the list of property descriptors is build only once. 
+ * Will also create a Set of relationships + */ + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(EVENT_HUB_NAME); + _propertyDescriptors.add(NAMESPACE); + _propertyDescriptors.add(ACCESS_POLICY); + _propertyDescriptors.add(POLICY_PRIMARY_KEY); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } @Override public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); return relationships; } @Override - protected List getSupportedPropertyDescriptors() { - final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); - properties.add(EVENT_HUB_NAME); - properties.add(NAMESPACE); - properties.add(ACCESS_POLICY); - properties.add(POLICY_PRIMARY_KEY); - return properties; + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; } - @OnScheduled - public final void setupClient(final ProcessContext context) throws EventHubException { + public final void setupClient(final ProcessContext context) throws ProcessException{ final String policyName = context.getProperty(ACCESS_POLICY).getValue(); final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); - final EventHubClient client = EventHubClient.create(policyName, policyKey, namespace, eventHubName); final int numThreads = context.getMaxConcurrentTasks(); senderQueue = new LinkedBlockingQueue<>(numThreads); for (int i = 0; i < numThreads; i++) { - final EventHubSender sender = client.createPartitionSender(null); - senderQueue.offer(sender); + final EventHubClient client = createEventHubClient(namespace, eventHubName, policyName, policyKey); + if(null != client) { + senderQueue.offer(client); + } } } @OnStopped public void tearDown() { - EventHubSender sender; + EventHubClient sender; while ((sender = senderQueue.poll()) != null) { sender.close(); } @@ -160,20 +161,13 @@ public class PutAzureEventHub extends AbstractProcessor { } final StopWatch stopWatch = new StopWatch(true); - final EventHubSender sender = senderQueue.poll(); - try { final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer); - } - }); + session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer)); try { - sender.send(buffer); - } catch (final EventHubException ehe) { - getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[] { flowFile, ehe }, ehe); + sendMessage(buffer); + } catch (final ProcessException processException) { + getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException); session.transfer(session.penalize(flowFile), REL_FAILURE); return; } @@ -182,8 +176,35 @@ public class PutAzureEventHub extends AbstractProcessor { final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); 
session.transfer(flowFile, REL_SUCCESS); - } finally { - senderQueue.offer(sender); + + } + + protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException{ + + try { + return EventHubClient.createFromConnectionString(getConnectionString(namespace, eventHubName, policyName, policyKey)).get(); + } catch (InterruptedException | ExecutionException | IOException | ServiceBusException | IllegalConnectionStringFormatException e) { + getLogger().error("Failed to create EventHubClient due to {}", e); + throw new ProcessException(e); } } + protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){ + return new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString(); + } + protected void sendMessage(final byte[] buffer) throws ProcessException { + + final EventHubClient sender = senderQueue.poll(); + if(null != sender) { + try { + sender.sendSync(new EventData(buffer)); + } catch (final ServiceBusException sbe) { + throw new ProcessException("Caught exception trying to send message to eventbus", sbe); + } finally { + senderQueue.offer(sender); + } + }else{ + throw new ProcessException("No EventHubClients are configured for sending"); + } + + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java new file mode 100644 index 0000000000..81bbf92d6a --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.eventhub; + +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.servicebus.ServiceBusException; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.powermock.reflect.Whitebox; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.LinkedList; +import java.util.concurrent.ExecutionException; + + +public class GetAzureEventHubTest { + + private static final String namespaceName = "nifi-azure-hub"; + private static final String eventHubName = "get-test"; + private static final String sasKeyName = "bogus-policy"; + private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!"; + + + private TestRunner testRunner; + private MockGetAzureEventHub processor; + + @Before + public void setUp() throws Exception { + processor = new MockGetAzureEventHub(); + testRunner = TestRunners.newTestRunner(processor); + } + + @Test + public void testProcessorConfigValidity() { + testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.assertNotValid(); + testRunner.setProperty(GetAzureEventHub.NAMESPACE,namespaceName); + testRunner.assertNotValid(); + testRunner.setProperty(GetAzureEventHub.ACCESS_POLICY,sasKeyName); + testRunner.assertNotValid(); + testRunner.setProperty(GetAzureEventHub.POLICY_PRIMARY_KEY,sasKey); + testRunner.assertNotValid(); + testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4"); + testRunner.assertValid(); + } + @Test + public void verifyRelationships(){ + + assert(1 == processor.getRelationships().size()); + + } + @Test + public void testNoPartitions(){ + MockGetAzureEventHubNoPartitions mockProcessor = new MockGetAzureEventHubNoPartitions(); + testRunner = TestRunners.newTestRunner(mockProcessor); + setUpStandardTestConfig(); + testRunner.run(1, true); + testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0); + testRunner.clearTransferState(); + } + @Test + public void testNullRecieve(){ + setUpStandardTestConfig(); + processor.nullReceive = true; + testRunner.run(1, true); + testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0); + testRunner.clearTransferState(); + } + @Test(expected = AssertionError.class) + public void testThrowGetReceiver(){ + setUpStandardTestConfig(); + processor.getReceiverThrow = true; + testRunner.run(1, true); + testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0); + testRunner.clearTransferState(); + } + @Test + public void testNormalFlow() throws Exception { + + setUpStandardTestConfig(); + testRunner.run(1, true); + testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10); + testRunner.clearTransferState(); + } + + /** + * Provides a stubbed processor instance for testing + */ + public static class MockGetAzureEventHub extends GetAzureEventHub{ + + boolean nullReceive = false; + boolean getReceiverThrow = false; + + @Override + protected void setupReceiver(final String connectionString) throws ProcessException{ + //do nothing + } + @Override + protected PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, ServiceBusException, ExecutionException, InterruptedException { + 
if(getReceiverThrow){ + throw new IOException("Could not create receiver"); + } + return null; + } + + @Override + protected Iterable receiveEvents(final ProcessContext context, final String partitionId) throws ProcessException{ + if(nullReceive){ + return null; + } + if(getReceiverThrow){ + throw new ProcessException("Could not create receiver"); + } + final LinkedList receivedEvents = new LinkedList<>(); + for(int i = 0; i < 10; i++){ + final EventData eventData = new EventData(String.format("test event number: %d",i).getBytes()); + Whitebox.setInternalState(eventData,"isReceivedEvent",true); + Whitebox.setInternalState(eventData, "partitionKey","0"); + Whitebox.setInternalState(eventData, "offset", "100"); + Whitebox.setInternalState(eventData, "sequenceNumber",13L); + Whitebox.setInternalState(eventData, "enqueuedTime",Instant.now().minus(100L, ChronoUnit.SECONDS)); + receivedEvents.add(eventData); + } + + return receivedEvents; + + } + } + + public static class MockGetAzureEventHubNoPartitions extends GetAzureEventHub{ + + + @Override + protected void setupReceiver(final String connectionString) throws ProcessException{ + //do nothing + } + + @Override + public void onScheduled(final ProcessContext context) throws ProcessException { + + } + } + + + + private void setUpStandardTestConfig() { + testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.setProperty(GetAzureEventHub.NAMESPACE,namespaceName); + testRunner.setProperty(GetAzureEventHub.ACCESS_POLICY,sasKeyName); + testRunner.setProperty(GetAzureEventHub.POLICY_PRIMARY_KEY,sasKey); + testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4"); + testRunner.assertValid(); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java new file mode 100644 index 0000000000..75686192ba --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.eventhub; + +import com.microsoft.azure.eventhubs.EventHubClient; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class PutAzureEventHubTest { + private static final String namespaceName = "nifi-azure-hub"; + private static final String eventHubName = "get-test"; + private static final String sasKeyName = "bogus-policy"; + private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!"; + + + private TestRunner testRunner; + private PutAzureEventHubTest.MockPutAzureEventHub processor; + + @Before + public void setUp() throws Exception { + processor = new PutAzureEventHubTest.MockPutAzureEventHub(); + testRunner = TestRunners.newTestRunner(processor); + } + @Test + public void testProcessorConfigValidity() { + testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.assertNotValid(); + testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName); + testRunner.assertNotValid(); + testRunner.setProperty(PutAzureEventHub.ACCESS_POLICY,sasKeyName); + testRunner.assertNotValid(); + testRunner.setProperty(PutAzureEventHub.POLICY_PRIMARY_KEY,sasKey); + testRunner.assertValid(); + } + @Test + public void verifyRelationships(){ + + assert(2 == processor.getRelationships().size()); + + } + @Test + public void testNoFlow() { + + setUpStandardTestConfig(); + testRunner.run(1, true); + testRunner.assertAllFlowFilesTransferred(PutAzureEventHub.REL_SUCCESS, 0); + testRunner.clearTransferState(); + } + @Test + public void testNormalFlow(){ + + setUpStandardTestConfig(); + String flowFileContents = "TEST MESSAGE"; + testRunner.enqueue(flowFileContents); + testRunner.run(1, true); + assert(flowFileContents.contentEquals(new String(processor.getReceivedBuffer()))); + testRunner.assertAllFlowFilesTransferred(PutAzureEventHub.REL_SUCCESS, 1); + testRunner.clearTransferState(); + } + @Test + public void testSendMessageThrows() { + + PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub throwingProcessor = new PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub(); + testRunner = TestRunners.newTestRunner(throwingProcessor); + setUpStandardTestConfig(); + String flowFileContents = "TEST MESSAGE"; + testRunner.enqueue(flowFileContents); + testRunner.run(1, true); + testRunner.assertAllFlowFilesTransferred(PutAzureEventHub.REL_FAILURE); + testRunner.clearTransferState(); + } + + @Test(expected = AssertionError.class) + public void testBadConnectionString() { + + PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub badConnectionStringProcessor = new PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub(); + testRunner = TestRunners.newTestRunner(badConnectionStringProcessor); + setUpStandardTestConfig(); + testRunner.run(1, true); + } + + private static class MockPutAzureEventHub extends PutAzureEventHub{ + + byte[] receivedBuffer = null; + byte[] getReceivedBuffer(){return receivedBuffer;} + + @Override + protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException { + return null; + } + + @Override + protected void sendMessage(final byte[] buffer) throws ProcessException { + receivedBuffer = buffer; + } + } + private static class OnSendThrowingMockPutAzureEventHub extends PutAzureEventHub{ + @Override + protected EventHubClient 
createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException { + return null; + } + } + private static class BogusConnectionStringMockPutAzureEventHub extends PutAzureEventHub{ + + @Override + protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){ + return "Bogus Connection String"; + } + } + private void setUpStandardTestConfig() { + testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName); + testRunner.setProperty(PutAzureEventHub.ACCESS_POLICY,sasKeyName); + testRunner.setProperty(PutAzureEventHub.POLICY_PRIMARY_KEY,sasKey); + testRunner.assertValid(); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml index 5d2b55a00e..1b7b95c361 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml @@ -22,9 +22,7 @@ 1.0.0-SNAPSHOT - org.apache.nifi nifi-azure-bundle - 1.0.0-SNAPSHOT pom
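For reviewers unfamiliar with the new client library: this patch replaces com.microsoft.eventhubs.client with com.microsoft.azure:azure-eventhubs 0.7.1. The following is a minimal, self-contained sketch of the 0.7.1 calls the updated processors rely on (ConnectionStringBuilder, createFromConnectionString, createReceiver, receive, sendSync); the namespace, hub, policy, key, and partition values are placeholders, not values from this patch.

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;

import java.time.Instant;

public class EventHubUsageSketch {
    public static void main(String[] args) throws Exception {
        // Build the connection string the same way the processors now do (placeholder values).
        final String connectionString = new ConnectionStringBuilder(
                "my-namespace", "my-event-hub", "my-policy", "my-policy-key").toString();

        // createFromConnectionString(...) returns a future; the processors block on get().
        final EventHubClient client = EventHubClient.createFromConnectionString(connectionString).get();

        // Send path, as in PutAzureEventHub: wrap the payload in EventData and send synchronously.
        client.sendSync(new EventData("hello event hub".getBytes()));

        // Receive path, as in GetAzureEventHub: one receiver per partition, reading from "now".
        final PartitionReceiver receiver = client.createReceiver("$Default", "0", Instant.now()).get();
        final Iterable<EventData> events = receiver.receive(100).get();
        if (events != null) {
            for (final EventData event : events) {
                System.out.println(new String(event.getBody()));
            }
        }

        // Clean up, mirroring the processors' @OnStopped handling.
        receiver.close();
        client.closeSync();
    }
}

This is only a sketch; the processors themselves keep one EventHubClient per concurrent task (PutAzureEventHub) and one PartitionReceiver per partition (GetAzureEventHub), and wrap every checked exception in a ProcessException.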