NIFI-2214 updated both of the Azure Event Hub processors to use the latest Azure Event Hub libraries. Also added test cases for each of the processors.

This commit is contained in:
Andrew Psaltis 2016-07-10 13:00:56 -04:00 committed by Mark Payne
parent 1e1630cc69
commit 317918a9bb
7 changed files with 616 additions and 226 deletions

View File

@ -23,7 +23,6 @@
</parent> </parent>
<artifactId>nifi-azure-nar</artifactId> <artifactId>nifi-azure-nar</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>nar</packaging> <packaging>nar</packaging>
<properties> <properties>
<maven.javadoc.skip>true</maven.javadoc.skip> <maven.javadoc.skip>true</maven.javadoc.skip>

View File

@ -20,7 +20,9 @@
<artifactId>nifi-azure-processors</artifactId> <artifactId>nifi-azure-processors</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties>
<powermock.version>1.6.5</powermock.version>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
@ -35,11 +37,10 @@
<artifactId>javax.jms-api</artifactId> <artifactId>javax.jms-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.microsoft.eventhubs.client</groupId> <groupId>com.microsoft.azure</groupId>
<artifactId>eventhubs-client</artifactId> <artifactId>azure-eventhubs</artifactId>
<version>0.9.1</version> <version>0.7.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId> <artifactId>nifi-mock</artifactId>
@ -55,5 +56,11 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -16,20 +16,11 @@
*/ */
package org.apache.nifi.processors.azure.eventhub; package org.apache.nifi.processors.azure.eventhub;
import java.io.IOException; import com.microsoft.azure.eventhubs.EventData;
import java.io.OutputStream; import com.microsoft.azure.eventhubs.EventHubClient;
import java.util.ArrayList; import com.microsoft.azure.eventhubs.PartitionReceiver;
import java.util.Collections; import com.microsoft.azure.servicebus.ConnectionStringBuilder;
import java.util.HashMap; import com.microsoft.azure.servicebus.ServiceBusException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -45,110 +36,136 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import com.microsoft.eventhubs.client.ConnectionStringBuilder; import java.io.IOException;
import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; import java.time.Instant;
import com.microsoft.eventhubs.client.EventHubException; import java.util.ArrayList;
import com.microsoft.eventhubs.client.EventHubMessage; import java.util.Collections;
import com.microsoft.eventhubs.client.IEventHubFilter; import java.util.HashMap;
import com.microsoft.eventhubs.client.ResilientEventHubReceiver; import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@Tags({ "azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams" }) @Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile") @CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile")
@InputRequirement(Requirement.INPUT_FORBIDDEN) @InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"), @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"),
@WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
@WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"), @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"),
@WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"), @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"),
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled") @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")
}) })
public class GetAzureEventHub extends AbstractProcessor { public class GetAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("Event Hub Name") .name("Event Hub Name")
.description("The name of the Azure Event Hub to pull messages from") .description("The name of the Azure Event Hub to pull messages from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("Event Hub Namespace") .name("Event Hub Namespace")
.description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns") .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name") .name("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.") .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key") .name("Shared Access Policy Primary Key")
.description("The primary key of the Event Hub Shared Access Policy") .description("The primary key of the Event Hub Shared Access Policy")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.sensitive(true) .sensitive(true)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder() static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
.name("Number of Event Hub Partitions") .name("Number of Event Hub Partitions")
.description("The number of partitions that the Event Hub has. Only this number of partitions will be used, " .description("The number of partitions that the Event Hub has. Only this number of partitions will be used, "
+ "so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.") + "so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder() static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
.name("Event Hub Consumer Group") .name("Event Hub Consumer Group")
.description("The name of the Event Hub Consumer Group to use when pulling events") .description("The name of the Event Hub Consumer Group to use when pulling events")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.defaultValue("$Default") .defaultValue("$Default")
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The number of FlowFiles to pull in a single JMS session")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("10")
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder() static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.") .description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.")
.build(); .build();
private final ConcurrentMap<String, ResilientEventHubReceiver> partitionToReceiverMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>(); private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
private EventHubClient eventHubClient;
@Override private final static List<PropertyDescriptor> propertyDescriptors;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { private final static Set<Relationship> relationships;
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(EVENT_HUB_NAME); /*
properties.add(NAMESPACE); * Will ensure that the list of property descriptors is built only once.
properties.add(ACCESS_POLICY); * Will also create a Set of relationships
properties.add(POLICY_PRIMARY_KEY); */
properties.add(NUM_PARTITIONS); static {
properties.add(CONSUMER_GROUP); List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
return properties; _propertyDescriptors.add(EVENT_HUB_NAME);
_propertyDescriptors.add(NAMESPACE);
_propertyDescriptors.add(ACCESS_POLICY);
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
_propertyDescriptors.add(NUM_PARTITIONS);
_propertyDescriptors.add(CONSUMER_GROUP);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(_relationships);
} }
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS); return relationships;
} }
private ResilientEventHubReceiver getReceiver(final ProcessContext context, final String partitionId) throws EventHubException { @Override
ResilientEventHubReceiver existingReceiver = partitionToReceiverMap.get(partitionId); public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
protected void setupReceiver(final String connectionString) throws ProcessException {
try {
eventHubClient = EventHubClient.createFromConnectionString(connectionString).get();
} catch (InterruptedException | ExecutionException | IOException | ServiceBusException e) {
throw new ProcessException(e);
}
}
PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, ServiceBusException, ExecutionException, InterruptedException {
PartitionReceiver existingReceiver = partitionToReceiverMap.get(partitionId);
if (existingReceiver != null) { if (existingReceiver != null) {
return existingReceiver; return existingReceiver;
} }
@ -167,39 +184,74 @@ public class GetAzureEventHub extends AbstractProcessor {
return existingReceiver; return existingReceiver;
} }
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue(); final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue();
final String connectionString = new ConnectionStringBuilder(policyName, policyKey, namespace).getConnectionString(); final PartitionReceiver receiver = eventHubClient.createReceiver(
final IEventHubFilter filter = new EventHubEnqueueTimeFilter(System.currentTimeMillis()); consumerGroupName,
final ResilientEventHubReceiver receiver = new ResilientEventHubReceiver(connectionString, eventHubName, partitionId, consumerGroupName, -1, filter); partitionId,
receiver.initialize(); Instant.now()).get();
partitionToReceiverMap.put(partitionId, receiver); partitionToReceiverMap.put(partitionId, receiver);
return receiver; return receiver;
} }
} }
/**
* This method is here to try and isolate the Azure related code, as the PartitionReceiver cannot be mocked
* with PowerMock due to it being final. Unfortunately it extends a base class and does not implement an interface,
* so even if we create a MockPartitionReceiver, it will not work as the two classes are orthogonal.
*
* @param context - The ProcessContext for this processor
* @param partitionId - The partition ID to retrieve a receiver by.
* @return - Returns the events received from the Event Hub.
* @throws ProcessException -- If any exception is encountered while receiving events, it is wrapped in a ProcessException
* and then that exception is thrown.
*/
protected Iterable<EventData> receiveEvents(final ProcessContext context, final String partitionId) throws ProcessException {
final PartitionReceiver receiver;
try {
receiver = getReceiver(context, partitionId);
return receiver.receive(100).get();
} catch (final IOException | ServiceBusException | ExecutionException | InterruptedException e) {
throw new ProcessException(e);
}
}
@OnStopped @OnStopped
public void tearDown() { public void tearDown() throws ProcessException {
for (final ResilientEventHubReceiver receiver : partitionToReceiverMap.values()) { for (final PartitionReceiver receiver : partitionToReceiverMap.values()) {
receiver.close(); if (null != receiver) {
receiver.close();
}
} }
partitionToReceiverMap.clear(); partitionToReceiverMap.clear();
try {
if (null != eventHubClient) {
eventHubClient.closeSync();
}
} catch (final ServiceBusException e) {
throw new ProcessException(e);
}
} }
@OnScheduled @OnScheduled
public void setupPartitions(final ProcessContext context) { public void onScheduled(final ProcessContext context) throws ProcessException {
final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>(); final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) { for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) {
partitionNames.add(String.valueOf(i)); partitionNames.add(String.valueOf(i));
} }
this.partitionNames = partitionNames; this.partitionNames = partitionNames;
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString();
setupReceiver(connectionString);
} }
@ -214,41 +266,44 @@ public class GetAzureEventHub extends AbstractProcessor {
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
try { try {
final ResilientEventHubReceiver receiver;
try {
receiver = getReceiver(context, partitionId);
} catch (final EventHubException e) {
throw new ProcessException(e);
}
final EventHubMessage message = EventHubMessage.parseAmqpMessage(receiver.receive(100L)); final Iterable<EventData> receivedEvents = receiveEvents(context, partitionId);
if (message == null) { if (receivedEvents == null) {
return; return;
} }
final Map<String, String> attributes = new HashMap<>(); for (final EventData eventData : receivedEvents) {
attributes.put("eventhub.enqueued.timestamp", String.valueOf(message.getEnqueuedTimestamp())); if (null != eventData) {
attributes.put("eventhub.offset", message.getOffset());
attributes.put("eventhub.sequence", String.valueOf(message.getSequence()));
attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
attributes.put("eventhub.partition", partitionId);
FlowFile flowFile = session.create(); final Map<String, String> attributes = new HashMap<>();
flowFile = session.putAllAttributes(flowFile, attributes); FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() { EventData.SystemProperties systemProperties = eventData.getSystemProperties();
@Override
public void process(final OutputStream out) throws IOException { if (null != systemProperties) {
out.write(message.getData()); attributes.put("eventhub.enqueued.timestamp", String.valueOf(eventData.getSystemProperties().getEnqueuedTime()));
attributes.put("eventhub.offset", eventData.getSystemProperties().getOffset());
attributes.put("eventhub.sequence", String.valueOf(eventData.getSystemProperties().getSequenceNumber()));
}
attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
attributes.put("eventhub.partition", partitionId);
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, out -> {
out.write(eventData.getBody());
});
session.transfer(flowFile, REL_SUCCESS);
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue();
final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} }
}); }
session.transfer(flowFile, REL_SUCCESS);
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue();
final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} finally { } finally {
partitionIds.offer(partitionId); partitionIds.offer(partitionId);
} }

View File

@ -16,18 +16,11 @@
*/ */
package org.apache.nifi.processors.azure.eventhub; package org.apache.nifi.processors.azure.eventhub;
import java.io.IOException; import com.microsoft.azure.eventhubs.EventData;
import java.io.InputStream; import com.microsoft.azure.eventhubs.EventHubClient;
import java.util.ArrayList; import com.microsoft.azure.servicebus.ConnectionStringBuilder;
import java.util.HashSet; import com.microsoft.azure.servicebus.IllegalConnectionStringFormatException;
import java.util.List; import com.microsoft.azure.servicebus.ServiceBusException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -35,7 +28,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
@ -43,110 +35,119 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import com.microsoft.eventhubs.client.EventHubClient; import java.io.IOException;
import com.microsoft.eventhubs.client.EventHubException; import java.util.List;
import com.microsoft.eventhubs.client.EventHubSender; import java.util.Set;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@SupportsBatching @SupportsBatching
@Tags({ "microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming" }) @Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, " @CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, "
+ "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.") + "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.")
public class PutAzureEventHub extends AbstractProcessor { public class PutAzureEventHub extends AbstractProcessor {
static final AllowableValue DELIVERY_MODE_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.PERSISTENT), "Persistent", "This mode indicates that the Event Hub "
+ "server must persist the message to a reliable storage mechanism before the FlowFile is routed to 'success', in order to ensure that the data is not lost.");
static final AllowableValue DELIVERY_MODE_NON_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.NON_PERSISTENT), "Non-Persistent",
"This mode indicates that the Event Hub server does not have to persist the message to a reliable storage mechanism before the FlowFile is routed to 'success'. "
+ "This delivery mode may offer higher throughput but may result in message loss if the server crashes or is restarted.");
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("Event Hub Name") .name("Event Hub Name")
.description("The name of the Azure Event Hub to send to") .description("The name of the Azure Event Hub to send to")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("Event Hub Namespace") .name("Event Hub Namespace")
.description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns") .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name") .name("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.") .description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key") .name("Shared Access Policy Primary Key")
.description("The primary key of the Event Hub Shared Access Policy") .description("The primary key of the Event Hub Shared Access Policy")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.sensitive(true) .sensitive(true)
.required(true) .required(true)
.build(); .build();
static final Relationship REL_SUCCESS = new Relationship.Builder() static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.") .description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.")
.build(); .build();
static final Relationship REL_FAILURE = new Relationship.Builder() static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure") .name("failure")
.description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.") .description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.")
.build(); .build();
private volatile BlockingQueue<EventHubSender> senderQueue = new LinkedBlockingQueue<>(); private volatile BlockingQueue<EventHubClient> senderQueue = new LinkedBlockingQueue<>();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
/*
* Will ensure that the list of property descriptors is built only once.
* Will also create a Set of relationships
*/
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(EVENT_HUB_NAME);
_propertyDescriptors.add(NAMESPACE);
_propertyDescriptors.add(ACCESS_POLICY);
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}
@Override @Override
public Set<Relationship> getRelationships() { public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
return relationships; return relationships;
} }
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); return propertyDescriptors;
properties.add(EVENT_HUB_NAME);
properties.add(NAMESPACE);
properties.add(ACCESS_POLICY);
properties.add(POLICY_PRIMARY_KEY);
return properties;
} }
@OnScheduled @OnScheduled
public final void setupClient(final ProcessContext context) throws EventHubException { public final void setupClient(final ProcessContext context) throws ProcessException{
final String policyName = context.getProperty(ACCESS_POLICY).getValue(); final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue(); final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final EventHubClient client = EventHubClient.create(policyName, policyKey, namespace, eventHubName);
final int numThreads = context.getMaxConcurrentTasks(); final int numThreads = context.getMaxConcurrentTasks();
senderQueue = new LinkedBlockingQueue<>(numThreads); senderQueue = new LinkedBlockingQueue<>(numThreads);
for (int i = 0; i < numThreads; i++) { for (int i = 0; i < numThreads; i++) {
final EventHubSender sender = client.createPartitionSender(null); final EventHubClient client = createEventHubClient(namespace, eventHubName, policyName, policyKey);
senderQueue.offer(sender); if(null != client) {
senderQueue.offer(client);
}
} }
} }
@OnStopped @OnStopped
public void tearDown() { public void tearDown() {
EventHubSender sender; EventHubClient sender;
while ((sender = senderQueue.poll()) != null) { while ((sender = senderQueue.poll()) != null) {
sender.close(); sender.close();
} }
@ -160,20 +161,13 @@ public class PutAzureEventHub extends AbstractProcessor {
} }
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
final EventHubSender sender = senderQueue.poll();
try {
final byte[] buffer = new byte[(int) flowFile.getSize()]; final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() { session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
try { try {
sender.send(buffer); sendMessage(buffer);
} catch (final EventHubException ehe) { } catch (final ProcessException processException) {
getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[] { flowFile, ehe }, ehe); getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException);
session.transfer(session.penalize(flowFile), REL_FAILURE); session.transfer(session.penalize(flowFile), REL_FAILURE);
return; return;
} }
@ -182,8 +176,35 @@ public class PutAzureEventHub extends AbstractProcessor {
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} finally {
senderQueue.offer(sender); }
protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException{
try {
return EventHubClient.createFromConnectionString(getConnectionString(namespace, eventHubName, policyName, policyKey)).get();
} catch (InterruptedException | ExecutionException | IOException | ServiceBusException | IllegalConnectionStringFormatException e) {
getLogger().error("Failed to create EventHubClient due to {}", e);
throw new ProcessException(e);
} }
} }
/**
 * Renders the connection string for the given event hub coordinates by handing all four
 * values to a {@code ConnectionStringBuilder} and returning its {@code toString()} form.
 * Declared {@code protected} so subclasses can substitute a different string.
 */
protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey) {
    final ConnectionStringBuilder builder = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey);
    return builder.toString();
}
/**
 * Sends the given bytes as a single {@code EventData} using a client borrowed from
 * {@code senderQueue}; the client is offered back to the queue whether or not the send succeeds.
 *
 * @param buffer the message payload
 * @throws ProcessException if no client is available in the queue, or if the send
 *                          fails with a {@code ServiceBusException} (attached as the cause)
 */
protected void sendMessage(final byte[] buffer) throws ProcessException {
    final EventHubClient client = senderQueue.poll();
    if (client == null) {
        // poll() does not wait: an empty queue means there is nothing to send with.
        throw new ProcessException("No EventHubClients are configured for sending");
    }

    try {
        client.sendSync(new EventData(buffer));
    } catch (final ServiceBusException sbe) {
        throw new ProcessException("Caught exception trying to send message to eventbus", sbe);
    } finally {
        senderQueue.offer(client);
    }
}
} }

View File

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.servicebus.ServiceBusException;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
public class GetAzureEventHubTest {
private static final String namespaceName = "nifi-azure-hub";
private static final String eventHubName = "get-test";
private static final String sasKeyName = "bogus-policy";
private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!";
private TestRunner testRunner;
private MockGetAzureEventHub processor;
@Before
public void setUp() throws Exception {
processor = new MockGetAzureEventHub();
testRunner = TestRunners.newTestRunner(processor);
}
@Test
public void testProcessorConfigValidity() {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,eventHubName);
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.NAMESPACE,namespaceName);
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.ACCESS_POLICY,sasKeyName);
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.POLICY_PRIMARY_KEY,sasKey);
testRunner.assertNotValid();
testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4");
testRunner.assertValid();
}
@Test
public void verifyRelationships(){
assert(1 == processor.getRelationships().size());
}
@Test
public void testNoPartitions(){
MockGetAzureEventHubNoPartitions mockProcessor = new MockGetAzureEventHubNoPartitions();
testRunner = TestRunners.newTestRunner(mockProcessor);
setUpStandardTestConfig();
testRunner.run(1, true);
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
testRunner.clearTransferState();
}
@Test
public void testNullRecieve(){
setUpStandardTestConfig();
processor.nullReceive = true;
testRunner.run(1, true);
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
testRunner.clearTransferState();
}
@Test(expected = AssertionError.class)
public void testThrowGetReceiver(){
setUpStandardTestConfig();
processor.getReceiverThrow = true;
testRunner.run(1, true);
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
testRunner.clearTransferState();
}
@Test
public void testNormalFlow() throws Exception {
setUpStandardTestConfig();
testRunner.run(1, true);
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10);
testRunner.clearTransferState();
}
/**
* Provides a stubbed processor instance for testing
*/
public static class MockGetAzureEventHub extends GetAzureEventHub{
boolean nullReceive = false;
boolean getReceiverThrow = false;
@Override
protected void setupReceiver(final String connectionString) throws ProcessException{
//do nothing
}
@Override
protected PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, ServiceBusException, ExecutionException, InterruptedException {
if(getReceiverThrow){
throw new IOException("Could not create receiver");
}
return null;
}
@Override
protected Iterable<EventData> receiveEvents(final ProcessContext context, final String partitionId) throws ProcessException{
if(nullReceive){
return null;
}
if(getReceiverThrow){
throw new ProcessException("Could not create receiver");
}
final LinkedList<EventData> receivedEvents = new LinkedList<>();
for(int i = 0; i < 10; i++){
final EventData eventData = new EventData(String.format("test event number: %d",i).getBytes());
Whitebox.setInternalState(eventData,"isReceivedEvent",true);
Whitebox.setInternalState(eventData, "partitionKey","0");
Whitebox.setInternalState(eventData, "offset", "100");
Whitebox.setInternalState(eventData, "sequenceNumber",13L);
Whitebox.setInternalState(eventData, "enqueuedTime",Instant.now().minus(100L, ChronoUnit.SECONDS));
receivedEvents.add(eventData);
}
return receivedEvents;
}
}
public static class MockGetAzureEventHubNoPartitions extends GetAzureEventHub{
@Override
protected void setupReceiver(final String connectionString) throws ProcessException{
//do nothing
}
@Override
public void onScheduled(final ProcessContext context) throws ProcessException {
}
}
private void setUpStandardTestConfig() {
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,eventHubName);
testRunner.setProperty(GetAzureEventHub.NAMESPACE,namespaceName);
testRunner.setProperty(GetAzureEventHub.ACCESS_POLICY,sasKeyName);
testRunner.setProperty(GetAzureEventHub.POLICY_PRIMARY_KEY,sasKey);
testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4");
testRunner.assertValid();
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub;
import com.microsoft.azure.eventhubs.EventHubClient;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class PutAzureEventHubTest {
private static final String namespaceName = "nifi-azure-hub";
private static final String eventHubName = "get-test";
private static final String sasKeyName = "bogus-policy";
private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!";
private TestRunner testRunner;
private PutAzureEventHubTest.MockPutAzureEventHub processor;
@Before
public void setUp() throws Exception {
processor = new PutAzureEventHubTest.MockPutAzureEventHub();
testRunner = TestRunners.newTestRunner(processor);
}
@Test
public void testProcessorConfigValidity() {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.ACCESS_POLICY,sasKeyName);
testRunner.assertNotValid();
testRunner.setProperty(PutAzureEventHub.POLICY_PRIMARY_KEY,sasKey);
testRunner.assertValid();
}
@Test
public void verifyRelationships(){
assert(2 == processor.getRelationships().size());
}
@Test
public void testNoFlow() {
setUpStandardTestConfig();
testRunner.run(1, true);
testRunner.assertAllFlowFilesTransferred(PutAzureEventHub.REL_SUCCESS, 0);
testRunner.clearTransferState();
}
@Test
public void testNormalFlow(){
setUpStandardTestConfig();
String flowFileContents = "TEST MESSAGE";
testRunner.enqueue(flowFileContents);
testRunner.run(1, true);
assert(flowFileContents.contentEquals(new String(processor.getReceivedBuffer())));
testRunner.assertAllFlowFilesTransferred(PutAzureEventHub.REL_SUCCESS, 1);
testRunner.clearTransferState();
}
@Test
public void testSendMessageThrows() {
PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub throwingProcessor = new PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub();
testRunner = TestRunners.newTestRunner(throwingProcessor);
setUpStandardTestConfig();
String flowFileContents = "TEST MESSAGE";
testRunner.enqueue(flowFileContents);
testRunner.run(1, true);
testRunner.assertAllFlowFilesTransferred(PutAzureEventHub.REL_FAILURE);
testRunner.clearTransferState();
}
@Test(expected = AssertionError.class)
public void testBadConnectionString() {
PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub badConnectionStringProcessor = new PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub();
testRunner = TestRunners.newTestRunner(badConnectionStringProcessor);
setUpStandardTestConfig();
testRunner.run(1, true);
}
private static class MockPutAzureEventHub extends PutAzureEventHub{
byte[] receivedBuffer = null;
byte[] getReceivedBuffer(){return receivedBuffer;}
@Override
protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException {
return null;
}
@Override
protected void sendMessage(final byte[] buffer) throws ProcessException {
receivedBuffer = buffer;
}
}
private static class OnSendThrowingMockPutAzureEventHub extends PutAzureEventHub{
@Override
protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException {
return null;
}
}
private static class BogusConnectionStringMockPutAzureEventHub extends PutAzureEventHub{
@Override
protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){
return "Bogus Connection String";
}
}
private void setUpStandardTestConfig() {
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName);
testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);
testRunner.setProperty(PutAzureEventHub.ACCESS_POLICY,sasKeyName);
testRunner.setProperty(PutAzureEventHub.POLICY_PRIMARY_KEY,sasKey);
testRunner.assertValid();
}
}

View File

@ -22,9 +22,7 @@
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
</parent> </parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-bundle</artifactId> <artifactId>nifi-azure-bundle</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<modules> <modules>