diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 02c9d7f661..5e180351cf 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -765,6 +765,13 @@ The following binary components are provided under the Apache Software License v
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
+ (ASLv2) Apache Qpid AMQP 1.0 Client
+ The following NOTICE information applies:
+ Copyright 2006-2015 The Apache Software Foundation
+
+ (ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/)
+
+
************************
Common Development and Distribution License 1.1
************************
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 8e4a175c93..961349f1ff 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -242,6 +242,11 @@ language governing permissions and limitations under the License. -->
nifi-hbase_1_1_2-client-service-nar
nar
+
+ org.apache.nifi
+ nifi-azure-nar
+ nar
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
new file mode 100644
index 0000000000..26d79d20c2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -0,0 +1,37 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-azure-bundle
+ 0.4.0-SNAPSHOT
+
+
+ nifi-azure-nar
+ 0.4.0-SNAPSHOT
+ nar
+
+
+
+ org.apache.nifi
+ nifi-azure-processors
+ 0.4.0-SNAPSHOT
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..1d5a375977
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,34 @@
+nifi-azure-nar
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2014 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+ (ASLv2) Apache Qpid AMQP 1.0 Client
+ The following NOTICE information applies:
+ Copyright 2006-2015 The Apache Software Foundation
+
+ (ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/)
+
+
+************************
+Common Development and Distribution License 1.1
+************************
+
+The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
+
+ (CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 - http://java.net/projects/jms-spec/pages/Home)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
new file mode 100644
index 0000000000..c34161c820
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -0,0 +1,60 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-azure-bundle
+ 0.4.0-SNAPSHOT
+
+
+ nifi-azure-processors
+ jar
+
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-processor-utils
+
+
+ javax.jms
+ javax.jms-api
+
+
+ com.microsoft.eventhubs.client
+ eventhubs-client
+ 0.9.1
+
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+ org.slf4j
+ slf4j-simple
+ test
+
+
+ junit
+ junit
+ test
+
+
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
new file mode 100644
index 0000000000..2c62df61bb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+import com.microsoft.eventhubs.client.ConnectionStringBuilder;
+import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
+import com.microsoft.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.EventHubMessage;
+import com.microsoft.eventhubs.client.IEventHubFilter;
+import com.microsoft.eventhubs.client.ResilientEventHubReceiver;
+
+@Tags({ "azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams" })
+@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile")
+@WritesAttributes({
+ @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"),
+ @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
+ @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"),
+ @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"),
+ @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")
+})
+public class GetAzureEventHub extends AbstractProcessor {
+
+ static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+ .name("Event Hub Name")
+ .description("The name of the Azure Event Hub to pull messages from")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .build();
+ static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+ .name("Event Hub Namespace")
+ .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to -ns")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .build();
+ static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
+ .name("Shared Access Policy Name")
+ .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .build();
+ static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
+ .name("Shared Access Policy Primary Key")
+ .description("The primary key of the Event Hub Shared Access Policy")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .sensitive(true)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
+ .name("Number of Event Hub Partitions")
+ .description("The number of partitions that the Event Hub has. Only this number of partitions will be used, "
+ + "so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .build();
+ static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
+ .name("Event Hub Consumer Group")
+ .description("The name of the Event Hub Consumer Group to use when pulling events")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("$Default")
+ .required(true)
+ .build();
+ static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The number of FlowFiles to pull in a single JMS session")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("10")
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.")
+ .build();
+
+ private final ConcurrentMap partitionToReceiverMap = new ConcurrentHashMap<>();
+ private volatile BlockingQueue partitionNames = new LinkedBlockingQueue<>();
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ properties.add(EVENT_HUB_NAME);
+ properties.add(NAMESPACE);
+ properties.add(ACCESS_POLICY);
+ properties.add(POLICY_PRIMARY_KEY);
+ properties.add(NUM_PARTITIONS);
+ properties.add(CONSUMER_GROUP);
+ return properties;
+ }
+
+ @Override
+ public Set getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ private ResilientEventHubReceiver getReceiver(final ProcessContext context, final String partitionId) throws EventHubException {
+ ResilientEventHubReceiver existingReceiver = partitionToReceiverMap.get(partitionId);
+ if (existingReceiver != null) {
+ return existingReceiver;
+ }
+
+ // we want to avoid allowing multiple threads to create Receivers simultaneously because that could result in
+ // having multiple Receivers for the same partition. So if the map does not contain a receiver for this partition,
+ // we will enter a synchronized block and check again (because once we enter the synchronized block, we know that no
+ // other thread is creating a client). If within the synchronized block, we still do not have an entry in the map,
+ // it is up to use to create the receiver, initialize it, and then put it into the map.
+ // We do not use the putIfAbsent method in order to do a CAS operation here because we want to also initialize the
+ // receiver if and only if it is not present in the map. As a result, we need to initialize the receiver and add it
+ // to the map atomically. Hence, the synchronized block.
+ synchronized (this) {
+ existingReceiver = partitionToReceiverMap.get(partitionId);
+ if (existingReceiver != null) {
+ return existingReceiver;
+ }
+
+ final String policyName = context.getProperty(ACCESS_POLICY).getValue();
+ final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
+ final String namespace = context.getProperty(NAMESPACE).getValue();
+ final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
+ final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue();
+
+ final String connectionString = new ConnectionStringBuilder(policyName, policyKey, namespace).getConnectionString();
+ final IEventHubFilter filter = new EventHubEnqueueTimeFilter(System.currentTimeMillis());
+ final ResilientEventHubReceiver receiver = new ResilientEventHubReceiver(connectionString, eventHubName, partitionId, consumerGroupName, -1, filter);
+ receiver.initialize();
+
+ partitionToReceiverMap.put(partitionId, receiver);
+ return receiver;
+ }
+ }
+
+
+ @OnStopped
+ public void tearDown() {
+ for (final ResilientEventHubReceiver receiver : partitionToReceiverMap.values()) {
+ receiver.close();
+ }
+
+ partitionToReceiverMap.clear();
+ }
+
+ @OnScheduled
+ public void setupPartitions(final ProcessContext context) {
+ final BlockingQueue partitionNames = new LinkedBlockingQueue<>();
+ for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) {
+ partitionNames.add(String.valueOf(i));
+ }
+ this.partitionNames = partitionNames;
+ }
+
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final BlockingQueue partitionIds = this.partitionNames;
+ final String partitionId = partitionIds.poll();
+ if (partitionId == null) {
+ getLogger().debug("No partitions available");
+ return;
+ }
+
+ final StopWatch stopWatch = new StopWatch(true);
+ try {
+ final ResilientEventHubReceiver receiver;
+ try {
+ receiver = getReceiver(context, partitionId);
+ } catch (final EventHubException e) {
+ throw new ProcessException(e);
+ }
+
+ final EventHubMessage message = EventHubMessage.parseAmqpMessage(receiver.receive(100L));
+ if (message == null) {
+ return;
+ }
+
+ final Map attributes = new HashMap<>();
+ attributes.put("eventhub.enqueued.timestamp", String.valueOf(message.getEnqueuedTimestamp()));
+ attributes.put("eventhub.offset", message.getOffset());
+ attributes.put("eventhub.sequence", String.valueOf(message.getSequence()));
+ attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
+ attributes.put("eventhub.partition", partitionId);
+
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ out.write(message.getData());
+ }
+ });
+
+ session.transfer(flowFile, REL_SUCCESS);
+
+ final String namespace = context.getProperty(NAMESPACE).getValue();
+ final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
+ final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue();
+ final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
+ session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ } finally {
+ partitionIds.offer(partitionId);
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
new file mode 100644
index 0000000000..82fff2752f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
+
+import com.microsoft.eventhubs.client.EventHubClient;
+import com.microsoft.eventhubs.client.EventHubException;
+import com.microsoft.eventhubs.client.EventHubSender;
+
+
+@SupportsBatching
+@Tags({ "microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming" })
+@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, "
+ + "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.")
+public class PutAzureEventHub extends AbstractProcessor {
+
+ static final AllowableValue DELIVERY_MODE_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.PERSISTENT), "Persistent", "This mode indicates that the Event Hub "
+ + "server must persist the message to a reliable storage mechanism before the FlowFile is routed to 'success', in order to ensure that the data is not lost.");
+
+ static final AllowableValue DELIVERY_MODE_NON_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.NON_PERSISTENT), "Non-Persistent",
+ "This mode indicates that the Event Hub server does not have to persist the message to a reliable storage mechanism before the FlowFile is routed to 'success'. "
+ + "This delivery mode may offer higher throughput but may result in message loss if the server crashes or is restarted.");
+
+ static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+ .name("Event Hub Name")
+ .description("The name of the Azure Event Hub to send to")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .build();
+ static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+ .name("Event Hub Namespace")
+ .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to -ns")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .build();
+ static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
+ .name("Shared Access Policy Name")
+ .description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .build();
+ static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
+ .name("Shared Access Policy Primary Key")
+ .description("The primary key of the Event Hub Shared Access Policy")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .sensitive(true)
+ .required(true)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.")
+ .build();
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.")
+ .build();
+
+ private volatile BlockingQueue senderQueue = new LinkedBlockingQueue<>();
+
+
+ @Override
+ public Set getRelationships() {
+ final Set relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ return relationships;
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ properties.add(EVENT_HUB_NAME);
+ properties.add(NAMESPACE);
+ properties.add(ACCESS_POLICY);
+ properties.add(POLICY_PRIMARY_KEY);
+ return properties;
+ }
+
+
+ @OnScheduled
+ public final void setupClient(final ProcessContext context) throws EventHubException {
+ final String policyName = context.getProperty(ACCESS_POLICY).getValue();
+ final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
+ final String namespace = context.getProperty(NAMESPACE).getValue();
+ final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
+
+ final EventHubClient client = EventHubClient.create(policyName, policyKey, namespace, eventHubName);
+
+ final int numThreads = context.getMaxConcurrentTasks();
+ senderQueue = new LinkedBlockingQueue<>(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ final EventHubSender sender = client.createPartitionSender(null);
+ senderQueue.offer(sender);
+ }
+ }
+
+ @OnStopped
+ public void tearDown() {
+ EventHubSender sender;
+ while ((sender = senderQueue.poll()) != null) {
+ sender.close();
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final EventHubSender sender = senderQueue.poll();
+ try {
+ final byte[] buffer = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, buffer);
+ }
+ });
+
+ try {
+ sender.send(buffer);
+ } catch (final EventHubException ehe) {
+ getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[] { flowFile, ehe }, ehe);
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
+ return;
+ }
+
+ final String namespace = context.getProperty(NAMESPACE).getValue();
+ final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
+ session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(flowFile, REL_SUCCESS);
+ } finally {
+ senderQueue.offer(sender);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..178e52c984
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
+org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
new file mode 100644
index 0000000000..3978748773
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -0,0 +1,35 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.nifi
+ nifi-nar-bundles
+ 0.4.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-azure-bundle
+ 0.4.0-SNAPSHOT
+ pom
+
+
+ nifi-azure-processors
+ nifi-azure-nar
+
+
+
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 510935dca8..4c0925f957 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -42,12 +42,13 @@
nifi-language-translation-bundle
nifi-mongodb-bundle
nifi-flume-bundle
- nifi-hbase-bundle
+ nifi-hbase-bundle
nifi-ambari-bundle
nifi-image-bundle
nifi-avro-bundle
nifi-couchbase-bundle
-
+ nifi-azure-bundle
+
@@ -132,4 +133,4 @@
-
\ No newline at end of file
+
diff --git a/pom.xml b/pom.xml
index cf6f018cda..8a8cdb09a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -931,6 +931,12 @@
0.4.0-SNAPSHOT
nar
+
+ org.apache.nifi
+ nifi-azure-nar
+ 0.4.0-SNAPSHOT
+ nar
+
org.apache.nifi
nifi-properties