From 56ad22aea6db7d401a72857b1356dfba85f274b6 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 11 Nov 2015 23:06:04 -0500 Subject: [PATCH] NIFI-900: Created Processors for interacting with Microsoft Azure EventHubs Reviewed (with amendments needed for clean merge, whitespace and NOTICEs) by Tony Kurc (tkurc@apache.org) --- nifi-assembly/NOTICE | 7 + nifi-assembly/pom.xml | 5 + .../nifi-azure-bundle/nifi-azure-nar/pom.xml | 37 +++ .../src/main/resources/META-INF/NOTICE | 34 +++ .../nifi-azure-processors/pom.xml | 60 +++++ .../azure/eventhub/GetAzureEventHub.java | 254 ++++++++++++++++++ .../azure/eventhub/PutAzureEventHub.java | 186 +++++++++++++ .../org.apache.nifi.processor.Processor | 16 ++ nifi-nar-bundles/nifi-azure-bundle/pom.xml | 35 +++ nifi-nar-bundles/pom.xml | 7 +- pom.xml | 6 + 11 files changed, 644 insertions(+), 3 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-azure-bundle/pom.xml diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 02c9d7f661..5e180351cf 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -765,6 +765,13 @@ The following binary components are provided under the Apache Software License v is a distributed tracing system that is Apache 2.0 Licensed. Copyright 2012 Twitter, Inc. + (ASLv2) Apache Qpid AMQP 1.0 Client + The following NOTICE information applies: + Copyright 2006-2015 The Apache Software Foundation + + (ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/) + + ************************ Common Development and Distribution License 1.1 ************************ diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 8e4a175c93..961349f1ff 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -242,6 +242,11 @@ language governing permissions and limitations under the License. --> nifi-hbase_1_1_2-client-service-nar nar + + org.apache.nifi + nifi-azure-nar + nar + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml new file mode 100644 index 0000000000..26d79d20c2 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-azure-bundle + 0.4.0-SNAPSHOT + + + nifi-azure-nar + 0.4.0-SNAPSHOT + nar + + + + org.apache.nifi + nifi-azure-processors + 0.4.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..1d5a375977 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,34 @@ +nifi-azure-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Qpid AMQP 1.0 Client + The following NOTICE information applies: + Copyright 2006-2015 The Apache Software Foundation + + (ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/) + + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 - http://java.net/projects/jms-spec/pages/Home) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml new file mode 100644 index 0000000000..c34161c820 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -0,0 +1,60 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-azure-bundle + 0.4.0-SNAPSHOT + + + nifi-azure-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + javax.jms + javax.jms-api + + + com.microsoft.eventhubs.client + eventhubs-client + 0.9.1 + + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java new file mode 100644 index 0000000000..2c62df61bb --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StopWatch; + +import com.microsoft.eventhubs.client.ConnectionStringBuilder; +import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; +import com.microsoft.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.EventHubMessage; +import com.microsoft.eventhubs.client.IEventHubFilter; +import com.microsoft.eventhubs.client.ResilientEventHubReceiver; + +@Tags({ "azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams" }) +@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile") +@WritesAttributes({ + @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"), + @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"), + @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"), + @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"), + @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled") +}) +public class GetAzureEventHub extends AbstractProcessor { + + static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() + .name("Event Hub Name") + .description("The name of the Azure Event Hub to pull messages from") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() + .name("Event Hub Namespace") + .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to -ns") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Name") + .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Primary Key") + .description("The primary key of the Event Hub Shared Access Policy") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .sensitive(true) + .required(true) + .build(); + + static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder() + .name("Number of Event Hub Partitions") + .description("The number of partitions that the Event Hub has. Only this number of partitions will be used, " + + "so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder() + .name("Event Hub Consumer Group") + .description("The name of the Event Hub Consumer Group to use when pulling events") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("$Default") + .required(true) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The number of FlowFiles to pull in a single JMS session") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("10") + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.") + .build(); + + private final ConcurrentMap partitionToReceiverMap = new ConcurrentHashMap<>(); + private volatile BlockingQueue partitionNames = new LinkedBlockingQueue<>(); + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(EVENT_HUB_NAME); + properties.add(NAMESPACE); + properties.add(ACCESS_POLICY); + properties.add(POLICY_PRIMARY_KEY); + properties.add(NUM_PARTITIONS); + properties.add(CONSUMER_GROUP); + return properties; + } + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + private ResilientEventHubReceiver getReceiver(final ProcessContext context, final String partitionId) throws EventHubException { + ResilientEventHubReceiver existingReceiver = partitionToReceiverMap.get(partitionId); + if (existingReceiver != null) { + return existingReceiver; + } + + // we want to avoid allowing multiple threads to create Receivers simultaneously because that could result in + // having multiple Receivers for the same partition. So if the map does not contain a receiver for this partition, + // we will enter a synchronized block and check again (because once we enter the synchronized block, we know that no + // other thread is creating a client). If within the synchronized block, we still do not have an entry in the map, + // it is up to use to create the receiver, initialize it, and then put it into the map. + // We do not use the putIfAbsent method in order to do a CAS operation here because we want to also initialize the + // receiver if and only if it is not present in the map. As a result, we need to initialize the receiver and add it + // to the map atomically. Hence, the synchronized block. + synchronized (this) { + existingReceiver = partitionToReceiverMap.get(partitionId); + if (existingReceiver != null) { + return existingReceiver; + } + + final String policyName = context.getProperty(ACCESS_POLICY).getValue(); + final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue(); + + final String connectionString = new ConnectionStringBuilder(policyName, policyKey, namespace).getConnectionString(); + final IEventHubFilter filter = new EventHubEnqueueTimeFilter(System.currentTimeMillis()); + final ResilientEventHubReceiver receiver = new ResilientEventHubReceiver(connectionString, eventHubName, partitionId, consumerGroupName, -1, filter); + receiver.initialize(); + + partitionToReceiverMap.put(partitionId, receiver); + return receiver; + } + } + + + @OnStopped + public void tearDown() { + for (final ResilientEventHubReceiver receiver : partitionToReceiverMap.values()) { + receiver.close(); + } + + partitionToReceiverMap.clear(); + } + + @OnScheduled + public void setupPartitions(final ProcessContext context) { + final BlockingQueue partitionNames = new LinkedBlockingQueue<>(); + for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) { + partitionNames.add(String.valueOf(i)); + } + this.partitionNames = partitionNames; + } + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final BlockingQueue partitionIds = this.partitionNames; + final String partitionId = partitionIds.poll(); + if (partitionId == null) { + getLogger().debug("No partitions available"); + return; + } + + final StopWatch stopWatch = new StopWatch(true); + try { + final ResilientEventHubReceiver receiver; + try { + receiver = getReceiver(context, partitionId); + } catch (final EventHubException e) { + throw new ProcessException(e); + } + + final EventHubMessage message = EventHubMessage.parseAmqpMessage(receiver.receive(100L)); + if (message == null) { + return; + } + + final Map attributes = new HashMap<>(); + attributes.put("eventhub.enqueued.timestamp", String.valueOf(message.getEnqueuedTimestamp())); + attributes.put("eventhub.offset", message.getOffset()); + attributes.put("eventhub.sequence", String.valueOf(message.getSequence())); + attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue()); + attributes.put("eventhub.partition", partitionId); + + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(message.getData()); + } + }); + + session.transfer(flowFile, REL_SUCCESS); + + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue(); + final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId; + session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + } finally { + partitionIds.offer(partitionId); + } + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java new file mode 100644 index 0000000000..82fff2752f --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +import com.microsoft.eventhubs.client.EventHubClient; +import com.microsoft.eventhubs.client.EventHubException; +import com.microsoft.eventhubs.client.EventHubSender; + + +@SupportsBatching +@Tags({ "microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming" }) +@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, " + + "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.") +public class PutAzureEventHub extends AbstractProcessor { + + static final AllowableValue DELIVERY_MODE_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.PERSISTENT), "Persistent", "This mode indicates that the Event Hub " + + "server must persist the message to a reliable storage mechanism before the FlowFile is routed to 'success', in order to ensure that the data is not lost."); + + static final AllowableValue DELIVERY_MODE_NON_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.NON_PERSISTENT), "Non-Persistent", + "This mode indicates that the Event Hub server does not have to persist the message to a reliable storage mechanism before the FlowFile is routed to 'success'. " + + "This delivery mode may offer higher throughput but may result in message loss if the server crashes or is restarted."); + + static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() + .name("Event Hub Name") + .description("The name of the Azure Event Hub to send to") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() + .name("Event Hub Namespace") + .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to -ns") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Name") + .description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .build(); + static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder() + .name("Shared Access Policy Primary Key") + .description("The primary key of the Event Hub Shared Access Policy") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .sensitive(true) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.") + .build(); + + private volatile BlockingQueue senderQueue = new LinkedBlockingQueue<>(); + + + @Override + public Set getRelationships() { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(EVENT_HUB_NAME); + properties.add(NAMESPACE); + properties.add(ACCESS_POLICY); + properties.add(POLICY_PRIMARY_KEY); + return properties; + } + + + @OnScheduled + public final void setupClient(final ProcessContext context) throws EventHubException { + final String policyName = context.getProperty(ACCESS_POLICY).getValue(); + final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + + final EventHubClient client = EventHubClient.create(policyName, policyKey, namespace, eventHubName); + + final int numThreads = context.getMaxConcurrentTasks(); + senderQueue = new LinkedBlockingQueue<>(numThreads); + for (int i = 0; i < numThreads; i++) { + final EventHubSender sender = client.createPartitionSender(null); + senderQueue.offer(sender); + } + } + + @OnStopped + public void tearDown() { + EventHubSender sender; + while ((sender = senderQueue.poll()) != null) { + sender.close(); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final StopWatch stopWatch = new StopWatch(true); + final EventHubSender sender = senderQueue.poll(); + try { + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + try { + sender.send(buffer); + } catch (final EventHubException ehe) { + getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[] { flowFile, ehe }, ehe); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } finally { + senderQueue.offer(sender); + } + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..178e52c984 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.azure.eventhub.PutAzureEventHub +org.apache.nifi.processors.azure.eventhub.GetAzureEventHub \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml new file mode 100644 index 0000000000..3978748773 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 0.4.0-SNAPSHOT + + + org.apache.nifi + nifi-azure-bundle + 0.4.0-SNAPSHOT + pom + + + nifi-azure-processors + nifi-azure-nar + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 510935dca8..4c0925f957 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -42,12 +42,13 @@ nifi-language-translation-bundle nifi-mongodb-bundle nifi-flume-bundle - nifi-hbase-bundle + nifi-hbase-bundle nifi-ambari-bundle nifi-image-bundle nifi-avro-bundle nifi-couchbase-bundle - + nifi-azure-bundle + @@ -132,4 +133,4 @@ - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index cf6f018cda..8a8cdb09a7 100644 --- a/pom.xml +++ b/pom.xml @@ -931,6 +931,12 @@ 0.4.0-SNAPSHOT nar + + org.apache.nifi + nifi-azure-nar + 0.4.0-SNAPSHOT + nar + org.apache.nifi nifi-properties