mirror of https://github.com/apache/nifi.git
NIFI-900: Created Processors for interacting with Microsoft Azure EventHubs
Reviewed (with amendments needed for clean merge, whitespace and NOTICEs) by Tony Kurc (tkurc@apache.org)
This commit is contained in:
@ -765,6 +765,13 @@ The following binary components are provided under the Apache Software License v
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
(ASLv2) Apache Qpid AMQP 1.0 Client
The following NOTICE information applies:
Copyright 2006-2015 The Apache Software Foundation
(ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/)
Common Development and Distribution License 1.1
@ -242,6 +242,11 @@ language governing permissions and limitations under the License. -->
@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@ -0,0 +1,34 @@
Copyright 2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Apache Software License v2
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Qpid AMQP 1.0 Client
The following NOTICE information applies:
Copyright 2006-2015 The Apache Software Foundation
(ASLv2) EventHubs Client (com.microsoft.eventhubs.client:eventhubs-client:0.9.1 - https://github.com/hdinsight/eventhubs-client/)
Common Development and Distribution License 1.1
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) Javax JMS Api (javax.jms:javax.jms-api:jar:2.0.1 - http://java.net/projects/jms-spec/pages/Home)
@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@ -0,0 +1,254 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.azure.eventhub;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import com.microsoft.eventhubs.client.ConnectionStringBuilder;
import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter;
import com.microsoft.eventhubs.client.EventHubException;
import com.microsoft.eventhubs.client.EventHubMessage;
import com.microsoft.eventhubs.client.IEventHubFilter;
import com.microsoft.eventhubs.client.ResilientEventHubReceiver;
@Tags({ "azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams" })
@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile")
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"),
@WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
@WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"),
@WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"),
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")
public class GetAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("The name of the Azure Event Hub to pull messages from")
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("Event Hub Namespace")
.description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns")
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.")
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.description("The primary key of the Event Hub Shared Access Policy")
static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
.name("Number of Event Hub Partitions")
.description("The number of partitions that the Event Hub has. Only this number of partitions will be used, "
+ "so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.")
static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
.name("Event Hub Consumer Group")
.description("The name of the Event Hub Consumer Group to use when pulling events")
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The number of FlowFiles to pull in a single JMS session")
static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.")
private final ConcurrentMap<String, ResilientEventHubReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
return properties;
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
private ResilientEventHubReceiver getReceiver(final ProcessContext context, final String partitionId) throws EventHubException {
ResilientEventHubReceiver existingReceiver = partitionToReceiverMap.get(partitionId);
if (existingReceiver != null) {
return existingReceiver;
// we want to avoid allowing multiple threads to create Receivers simultaneously because that could result in
// having multiple Receivers for the same partition. So if the map does not contain a receiver for this partition,
// we will enter a synchronized block and check again (because once we enter the synchronized block, we know that no
// other thread is creating a client). If within the synchronized block, we still do not have an entry in the map,
// it is up to use to create the receiver, initialize it, and then put it into the map.
// We do not use the putIfAbsent method in order to do a CAS operation here because we want to also initialize the
// receiver if and only if it is not present in the map. As a result, we need to initialize the receiver and add it
// to the map atomically. Hence, the synchronized block.
synchronized (this) {
existingReceiver = partitionToReceiverMap.get(partitionId);
if (existingReceiver != null) {
return existingReceiver;
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String consumerGroupName = context.getProperty(CONSUMER_GROUP).getValue();
final String connectionString = new ConnectionStringBuilder(policyName, policyKey, namespace).getConnectionString();
final IEventHubFilter filter = new EventHubEnqueueTimeFilter(System.currentTimeMillis());
final ResilientEventHubReceiver receiver = new ResilientEventHubReceiver(connectionString, eventHubName, partitionId, consumerGroupName, -1, filter);
partitionToReceiverMap.put(partitionId, receiver);
return receiver;
public void tearDown() {
for (final ResilientEventHubReceiver receiver : partitionToReceiverMap.values()) {
public void setupPartitions(final ProcessContext context) {
final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
for (int i = 0; i < context.getProperty(NUM_PARTITIONS).asInteger(); i++) {
this.partitionNames = partitionNames;
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final BlockingQueue<String> partitionIds = this.partitionNames;
final String partitionId = partitionIds.poll();
if (partitionId == null) {
getLogger().debug("No partitions available");
final StopWatch stopWatch = new StopWatch(true);
try {
final ResilientEventHubReceiver receiver;
try {
receiver = getReceiver(context, partitionId);
} catch (final EventHubException e) {
throw new ProcessException(e);
final EventHubMessage message = EventHubMessage.parseAmqpMessage(receiver.receive(100L));
if (message == null) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("eventhub.enqueued.timestamp", String.valueOf(message.getEnqueuedTimestamp()));
attributes.put("eventhub.offset", message.getOffset());
attributes.put("eventhub.sequence", String.valueOf(message.getSequence()));
attributes.put("eventhub.name", context.getProperty(EVENT_HUB_NAME).getValue());
attributes.put("eventhub.partition", partitionId);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
flowFile = session.write(flowFile, new OutputStreamCallback() {
public void process(final OutputStream out) throws IOException {
session.transfer(flowFile, REL_SUCCESS);
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String consumerGroup = context.getProperty(CONSUMER_GROUP).getValue();
final String transitUri = "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} finally {
@ -0,0 +1,186 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.azure.eventhub;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import com.microsoft.eventhubs.client.EventHubClient;
import com.microsoft.eventhubs.client.EventHubException;
import com.microsoft.eventhubs.client.EventHubSender;
@Tags({ "microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming" })
@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, "
+ "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.")
public class PutAzureEventHub extends AbstractProcessor {
static final AllowableValue DELIVERY_MODE_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.PERSISTENT), "Persistent", "This mode indicates that the Event Hub "
+ "server must persist the message to a reliable storage mechanism before the FlowFile is routed to 'success', in order to ensure that the data is not lost.");
static final AllowableValue DELIVERY_MODE_NON_PERSISTENT = new AllowableValue(String.valueOf(DeliveryMode.NON_PERSISTENT), "Non-Persistent",
"This mode indicates that the Event Hub server does not have to persist the message to a reliable storage mechanism before the FlowFile is routed to 'success'. "
+ "This delivery mode may offer higher throughput but may result in message loss if the server crashes or is restarted.");
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("The name of the Azure Event Hub to send to")
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("Event Hub Namespace")
.description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns")
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.")
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.description("The primary key of the Event Hub Shared Access Policy")
static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.")
static final Relationship REL_FAILURE = new Relationship.Builder()
.description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.")
private volatile BlockingQueue<EventHubSender> senderQueue = new LinkedBlockingQueue<>();
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
return relationships;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
return properties;
public final void setupClient(final ProcessContext context) throws EventHubException {
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final EventHubClient client = EventHubClient.create(policyName, policyKey, namespace, eventHubName);
final int numThreads = context.getMaxConcurrentTasks();
senderQueue = new LinkedBlockingQueue<>(numThreads);
for (int i = 0; i < numThreads; i++) {
final EventHubSender sender = client.createPartitionSender(null);
public void tearDown() {
EventHubSender sender;
while ((sender = senderQueue.poll()) != null) {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
final StopWatch stopWatch = new StopWatch(true);
final EventHubSender sender = senderQueue.poll();
try {
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
try {
} catch (final EventHubException ehe) {
getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[] { flowFile, ehe }, ehe);
session.transfer(session.penalize(flowFile), REL_FAILURE);
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} finally {
@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@ -42,12 +42,13 @@
@ -931,6 +931,12 @@
Reference in New Issue