mirror of https://github.com/apache/nifi.git
NIFI-6597: Upgrade Azure Event Hub version and code.
- Lazy instantiation for PutAzureEventHub. Also add explaination for thread pool needed for EventHubClient.
This commit is contained in:
parent
ccf85777c4
commit
5c69faf9bb
|
@ -20,7 +20,8 @@
|
|||
<artifactId>nifi-azure-processors</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<properties>
|
||||
<azure-eventhubs.version>0.14.4</azure-eventhubs.version>
|
||||
<azure-eventhubs.version>2.3.2</azure-eventhubs.version>
|
||||
<azure-eventhubs-eph.version>2.5.2</azure-eventhubs-eph.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
@ -57,7 +58,7 @@
|
|||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
<artifactId>azure-eventhubs-eph</artifactId>
|
||||
<version>${azure-eventhubs.version}</version>
|
||||
<version>${azure-eventhubs-eph.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.microsoft.azure</groupId>
|
||||
|
|
|
@ -23,8 +23,8 @@ import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
|
|||
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
|
||||
import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
|
||||
import com.microsoft.azure.eventprocessorhost.PartitionContext;
|
||||
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.servicebus.ReceiverDisconnectedException;
|
||||
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
|
@ -243,7 +243,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
|
||||
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("FlowFiles received from Event Hub.")
|
||||
|
@ -531,7 +530,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
|||
" consumerGroupName={}, exception={}",
|
||||
new Object[]{context.getEventHubPath(), context.getPartitionId(), context.getConsumerGroupName(), e}, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -606,9 +604,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
|||
final EventProcessorOptions options = new EventProcessorOptions();
|
||||
final String initialOffset = context.getProperty(INITIAL_OFFSET).getValue();
|
||||
if (INITIAL_OFFSET_START_OF_STREAM.getValue().equals(initialOffset)) {
|
||||
options.setInitialOffsetProvider(options.new StartOfStreamInitialOffsetProvider());
|
||||
options.setInitialPositionProvider(options.new StartOfStreamInitialPositionProvider());
|
||||
} else if (INITIAL_OFFSET_END_OF_STREAM.getValue().equals(initialOffset)){
|
||||
options.setInitialOffsetProvider(options.new EndOfStreamInitialOffsetProvider());
|
||||
options.setInitialPositionProvider(options.new EndOfStreamInitialPositionProvider());
|
||||
} else {
|
||||
throw new IllegalArgumentException("Initial offset " + initialOffset + " is not allowed.");
|
||||
}
|
||||
|
@ -629,7 +627,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
|||
|
||||
final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey);
|
||||
|
||||
final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder(namespaceName, eventHubName, sasName, sasKey);
|
||||
final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder().setNamespaceName(namespaceName).setEventHubName( eventHubName).setSasKeyName(sasName).setSasKey(sasKey);
|
||||
|
||||
eventProcessorHost = new EventProcessorHost(consumerHostname, eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString, containerName);
|
||||
|
||||
|
@ -639,7 +637,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
|||
new Object[]{eventHubName, consumerGroupName, e.getPartitionId(), e.getAction(), e.getHostname()}, e.getException());
|
||||
});
|
||||
|
||||
|
||||
eventProcessorHost.registerEventProcessorFactory(new EventProcessorFactory(), options).get();
|
||||
}
|
||||
|
||||
|
@ -652,5 +649,4 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
|
|||
throw new IllegalArgumentException(String.format("'%s' is required, but not specified.", property.getDisplayName()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,11 +16,35 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.azure.eventhub;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.eventhubs.EventData;
|
||||
import com.microsoft.azure.eventhubs.EventHubClient;
|
||||
import com.microsoft.azure.eventhubs.EventHubException;
|
||||
import com.microsoft.azure.eventhubs.EventPosition;
|
||||
import com.microsoft.azure.eventhubs.PartitionReceiver;
|
||||
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.servicebus.ServiceBusException;
|
||||
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
|
@ -40,27 +64,9 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
|
||||
@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile")
|
||||
@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile. "
|
||||
+ "Note: Please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.")
|
||||
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"),
|
||||
|
@ -70,7 +76,6 @@ import java.util.concurrent.TimeUnit;
|
|||
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")
|
||||
})
|
||||
public class GetAzureEventHub extends AbstractProcessor {
|
||||
|
||||
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Event Hub Name")
|
||||
.description("The name of the Azure Event Hub to pull messages from")
|
||||
|
@ -128,9 +133,9 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
|
||||
static final PropertyDescriptor ENQUEUE_TIME = new PropertyDescriptor.Builder()
|
||||
.name("Event Hub Message Enqueue Time")
|
||||
.description("A timestamp (ISO-8061 Instant) formatted as YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages "
|
||||
.description("A timestamp (ISO-8601 Instant) formatted as YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages "
|
||||
+ "should have been enqueued in the EventHub to start reading from")
|
||||
.addValidator(StandardValidators.ISO8061_INSTANT_VALIDATOR)
|
||||
.addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.required(false)
|
||||
.build();
|
||||
|
@ -198,16 +203,16 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
return propertyDescriptors;
|
||||
}
|
||||
|
||||
|
||||
protected void setupReceiver(final String connectionString) throws ProcessException {
|
||||
protected void setupReceiver(final String connectionString, final ScheduledExecutorService executor) throws ProcessException {
|
||||
try {
|
||||
eventHubClient = EventHubClient.createFromConnectionString(connectionString).get();
|
||||
} catch (InterruptedException | ExecutionException | IOException | ServiceBusException e) {
|
||||
EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
|
||||
eventHubClient = EventHubClient.createSync(connectionString, executor);
|
||||
} catch (IOException | EventHubException e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, ServiceBusException, ExecutionException, InterruptedException {
|
||||
PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, EventHubException, ExecutionException, InterruptedException {
|
||||
PartitionReceiver existingReceiver = partitionToReceiverMap.get(partitionId);
|
||||
if (existingReceiver != null) {
|
||||
return existingReceiver;
|
||||
|
@ -232,7 +237,8 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
final PartitionReceiver receiver = eventHubClient.createReceiver(
|
||||
consumerGroupName,
|
||||
partitionId,
|
||||
configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime).get();
|
||||
EventPosition.fromEnqueuedTime(
|
||||
configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime)).get();
|
||||
|
||||
receiver.setReceiveTimeout(receiverFetchTimeout == null ? Duration.ofMillis(60000) : receiverFetchTimeout);
|
||||
partitionToReceiverMap.put(partitionId, receiver);
|
||||
|
@ -257,7 +263,7 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
try {
|
||||
receiver = getReceiver(context, partitionId);
|
||||
return receiver.receive(receiverFetchSize).get();
|
||||
} catch (final IOException | ServiceBusException | ExecutionException | InterruptedException e) {
|
||||
} catch (final EventHubException | IOException | ExecutionException | InterruptedException e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
@ -275,11 +281,14 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
if (null != eventHubClient) {
|
||||
eventHubClient.closeSync();
|
||||
}
|
||||
} catch (final ServiceBusException e) {
|
||||
executor.shutdown();
|
||||
} catch (final EventHubException e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private ScheduledExecutorService executor;
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) throws ProcessException, URISyntaxException {
|
||||
final BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
|
||||
|
@ -294,8 +303,6 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
|
||||
final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
|
||||
|
||||
|
||||
|
||||
if(context.getProperty(ENQUEUE_TIME).isSet()) {
|
||||
configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
|
||||
} else {
|
||||
|
@ -312,11 +319,12 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
receiverFetchTimeout = null;
|
||||
}
|
||||
|
||||
final String connectionString = new ConnectionStringBuilder(new URI("amqps://"+namespace+serviceBusEndpoint), eventHubName, policyName, policyKey).toString();
|
||||
setupReceiver(connectionString);
|
||||
executor = Executors.newScheduledThreadPool(4);
|
||||
final String connectionString = new ConnectionStringBuilder().setEndpoint(
|
||||
new URI("amqps://"+namespace+serviceBusEndpoint)).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
|
||||
setupReceiver(connectionString, executor);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
final BlockingQueue<String> partitionIds = this.partitionNames;
|
||||
|
@ -370,5 +378,4 @@ public class GetAzureEventHub extends AbstractProcessor {
|
|||
partitionIds.offer(partitionId);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,16 +16,30 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.azure.eventhub;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.eventhubs.EventData;
|
||||
import com.microsoft.azure.eventhubs.EventHubClient;
|
||||
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
|
||||
import com.microsoft.azure.servicebus.IllegalConnectionStringFormatException;
|
||||
import com.microsoft.azure.servicebus.ServiceBusException;
|
||||
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
||||
import com.microsoft.azure.eventhubs.EventHubException;
|
||||
import com.microsoft.azure.eventhubs.IllegalConnectionStringFormatException;
|
||||
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.SystemResource;
|
||||
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
@ -42,23 +56,12 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
@SupportsBatching
|
||||
@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, "
|
||||
+ "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available.")
|
||||
+ "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available. "
|
||||
+ "Also please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.")
|
||||
@SystemResourceConsideration(resource = SystemResource.MEMORY)
|
||||
public class PutAzureEventHub extends AbstractProcessor {
|
||||
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
|
||||
|
@ -131,22 +134,10 @@ public class PutAzureEventHub extends AbstractProcessor {
|
|||
return propertyDescriptors;
|
||||
}
|
||||
|
||||
private ScheduledExecutorService executor;
|
||||
|
||||
@OnScheduled
|
||||
public final void setupClient(final ProcessContext context) throws ProcessException{
|
||||
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
|
||||
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
|
||||
final String namespace = context.getProperty(NAMESPACE).getValue();
|
||||
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
|
||||
|
||||
|
||||
final int numThreads = context.getMaxConcurrentTasks();
|
||||
senderQueue = new LinkedBlockingQueue<>(numThreads);
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
final EventHubClient client = createEventHubClient(namespace, eventHubName, policyName, policyKey);
|
||||
if(null != client) {
|
||||
senderQueue.offer(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
|
@ -159,6 +150,22 @@ public class PutAzureEventHub extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
if(senderQueue.size() == 0){
|
||||
final int numThreads = context.getMaxConcurrentTasks();
|
||||
senderQueue = new LinkedBlockingQueue<>(numThreads);
|
||||
executor = Executors.newScheduledThreadPool(4);
|
||||
final String policyName = context.getProperty(ACCESS_POLICY).getValue();
|
||||
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
|
||||
final String namespace = context.getProperty(NAMESPACE).getValue();
|
||||
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
final EventHubClient client = createEventHubClient(namespace, eventHubName, policyName, policyKey, executor);
|
||||
if(null != client) {
|
||||
senderQueue.offer(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
|
@ -183,25 +190,32 @@ public class PutAzureEventHub extends AbstractProcessor {
|
|||
|
||||
}
|
||||
|
||||
protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException{
|
||||
protected EventHubClient createEventHubClient(
|
||||
final String namespace,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final ScheduledExecutorService executor)
|
||||
throws ProcessException{
|
||||
|
||||
try {
|
||||
return EventHubClient.createFromConnectionString(getConnectionString(namespace, eventHubName, policyName, policyKey)).get();
|
||||
} catch (InterruptedException | ExecutionException | IOException | ServiceBusException | IllegalConnectionStringFormatException e) {
|
||||
EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
|
||||
return EventHubClient.createSync(getConnectionString(namespace, eventHubName, policyName, policyKey), executor);
|
||||
} catch (IOException | EventHubException | IllegalConnectionStringFormatException e) {
|
||||
getLogger().error("Failed to create EventHubClient due to {}", e);
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){
|
||||
return new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString();
|
||||
return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
|
||||
}
|
||||
protected void sendMessage(final byte[] buffer) throws ProcessException {
|
||||
|
||||
final EventHubClient sender = senderQueue.poll();
|
||||
if(null != sender) {
|
||||
try {
|
||||
sender.sendSync(new EventData(buffer));
|
||||
} catch (final ServiceBusException sbe) {
|
||||
sender.sendSync(EventData.create(buffer));
|
||||
} catch (final EventHubException sbe) {
|
||||
throw new ProcessException("Caught exception trying to send message to eventbus", sbe);
|
||||
} finally {
|
||||
senderQueue.offer(sender);
|
||||
|
@ -209,6 +223,5 @@ public class PutAzureEventHub extends AbstractProcessor {
|
|||
}else{
|
||||
throw new ProcessException("No EventHubClients are configured for sending");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,11 +19,10 @@ package org.apache.nifi.processors.azure.eventhub;
|
|||
import com.microsoft.azure.eventhubs.EventData;
|
||||
import com.microsoft.azure.eventhubs.EventData.SystemProperties;
|
||||
import com.microsoft.azure.eventhubs.PartitionReceiver;
|
||||
import com.microsoft.azure.servicebus.ServiceBusException;
|
||||
import com.microsoft.azure.servicebus.amqp.AmqpConstants;
|
||||
import com.microsoft.azure.eventhubs.EventHubException;
|
||||
import com.microsoft.azure.eventhubs.impl.AmqpConstants;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
|
@ -31,8 +30,8 @@ import java.util.Date;
|
|||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.commons.lang3.reflect.FieldUtils;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
|
@ -41,9 +40,7 @@ import org.apache.nifi.util.TestRunners;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class GetAzureEventHubTest {
|
||||
|
||||
private static final String namespaceName = "nifi-azure-hub";
|
||||
private static final String eventHubName = "get-test";
|
||||
private static final String sasKeyName = "bogus-policy";
|
||||
|
@ -158,11 +155,11 @@ public class GetAzureEventHubTest {
|
|||
boolean received = true;
|
||||
|
||||
@Override
|
||||
protected void setupReceiver(final String connectionString) throws ProcessException{
|
||||
protected void setupReceiver(final String connectionString, final ScheduledExecutorService executor) throws ProcessException{
|
||||
//do nothing
|
||||
}
|
||||
@Override
|
||||
protected PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, ServiceBusException, ExecutionException, InterruptedException {
|
||||
protected PartitionReceiver getReceiver(final ProcessContext context, final String partitionId) throws IOException, EventHubException, ExecutionException, InterruptedException {
|
||||
if(getReceiverThrow){
|
||||
throw new IOException("Could not create receiver");
|
||||
}
|
||||
|
@ -179,7 +176,7 @@ public class GetAzureEventHubTest {
|
|||
}
|
||||
final LinkedList<EventData> receivedEvents = new LinkedList<>();
|
||||
for(int i = 0; i < 10; i++){
|
||||
EventData eventData = new EventData(String.format("test event number: %d", i).getBytes());
|
||||
EventData eventData = EventData.create(String.format("test event number: %d", i).getBytes());
|
||||
if (received) {
|
||||
HashMap<String, Object> properties = new HashMap<>();
|
||||
properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, PARTITION_KEY_VALUE);
|
||||
|
@ -188,26 +185,18 @@ public class GetAzureEventHubTest {
|
|||
properties.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, ENQUEUED_TIME_VALUE);
|
||||
|
||||
SystemProperties systemProperties = new SystemProperties(properties);
|
||||
Field systemPropertiesField = FieldUtils.getDeclaredField(EventData.class, "systemProperties", true);
|
||||
try {
|
||||
systemPropertiesField.set(eventData, systemProperties);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new ProcessException("Could not set systemProperties on EventData", e);
|
||||
}
|
||||
eventData.setSystemProperties(systemProperties);
|
||||
}
|
||||
receivedEvents.add(eventData);
|
||||
}
|
||||
|
||||
return receivedEvents;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public static class MockGetAzureEventHubNoPartitions extends GetAzureEventHub{
|
||||
|
||||
|
||||
@Override
|
||||
protected void setupReceiver(final String connectionString) throws ProcessException{
|
||||
protected void setupReceiver(final String connectionString, final ScheduledExecutorService executor) throws ProcessException{
|
||||
//do nothing
|
||||
}
|
||||
|
||||
|
@ -215,10 +204,10 @@ public class GetAzureEventHubTest {
|
|||
public void onScheduled(final ProcessContext context) throws ProcessException {
|
||||
|
||||
}
|
||||
@Override
|
||||
public void tearDown() throws ProcessException {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private void setUpStandardTestConfig() {
|
||||
testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME,eventHubName);
|
||||
testRunner.setProperty(GetAzureEventHub.NAMESPACE,namespaceName);
|
||||
|
@ -227,5 +216,4 @@ public class GetAzureEventHubTest {
|
|||
testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4");
|
||||
testRunner.assertValid();
|
||||
}
|
||||
|
||||
}
|
|
@ -23,6 +23,8 @@ import org.apache.nifi.util.TestRunners;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
public class PutAzureEventHubTest {
|
||||
private static final String namespaceName = "nifi-azure-hub";
|
||||
private static final String eventHubName = "get-test";
|
||||
|
@ -51,13 +53,10 @@ public class PutAzureEventHubTest {
|
|||
}
|
||||
@Test
|
||||
public void verifyRelationships(){
|
||||
|
||||
assert(2 == processor.getRelationships().size());
|
||||
|
||||
}
|
||||
@Test
|
||||
public void testNoFlow() {
|
||||
|
||||
setUpStandardTestConfig();
|
||||
testRunner.run(1, true);
|
||||
testRunner.assertAllFlowFilesTransferred(PutAzureEventHub.REL_SUCCESS, 0);
|
||||
|
@ -65,7 +64,6 @@ public class PutAzureEventHubTest {
|
|||
}
|
||||
@Test
|
||||
public void testNormalFlow(){
|
||||
|
||||
setUpStandardTestConfig();
|
||||
String flowFileContents = "TEST MESSAGE";
|
||||
testRunner.enqueue(flowFileContents);
|
||||
|
@ -76,7 +74,6 @@ public class PutAzureEventHubTest {
|
|||
}
|
||||
@Test
|
||||
public void testSendMessageThrows() {
|
||||
|
||||
PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub throwingProcessor = new PutAzureEventHubTest.OnSendThrowingMockPutAzureEventHub();
|
||||
testRunner = TestRunners.newTestRunner(throwingProcessor);
|
||||
setUpStandardTestConfig();
|
||||
|
@ -89,7 +86,6 @@ public class PutAzureEventHubTest {
|
|||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testBadConnectionString() {
|
||||
|
||||
PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub badConnectionStringProcessor = new PutAzureEventHubTest.BogusConnectionStringMockPutAzureEventHub();
|
||||
testRunner = TestRunners.newTestRunner(badConnectionStringProcessor);
|
||||
setUpStandardTestConfig();
|
||||
|
@ -97,14 +93,18 @@ public class PutAzureEventHubTest {
|
|||
}
|
||||
|
||||
private static class MockPutAzureEventHub extends PutAzureEventHub{
|
||||
|
||||
byte[] receivedBuffer = null;
|
||||
byte[] getReceivedBuffer(){
|
||||
return receivedBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException {
|
||||
protected EventHubClient createEventHubClient(
|
||||
final String namespace,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final ScheduledExecutorService executor) throws ProcessException {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -115,12 +115,16 @@ public class PutAzureEventHubTest {
|
|||
}
|
||||
private static class OnSendThrowingMockPutAzureEventHub extends PutAzureEventHub{
|
||||
@Override
|
||||
protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException {
|
||||
protected EventHubClient createEventHubClient(
|
||||
final String namespace,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final ScheduledExecutorService executor) throws ProcessException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
private static class BogusConnectionStringMockPutAzureEventHub extends PutAzureEventHub{
|
||||
|
||||
@Override
|
||||
protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){
|
||||
return "Bogus Connection String";
|
||||
|
|
|
@ -66,7 +66,6 @@ import static org.mockito.Mockito.mock;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestConsumeAzureEventHub {
|
||||
|
||||
private ConsumeAzureEventHub.EventProcessor eventProcessor;
|
||||
private MockProcessSession processSession;
|
||||
private SharedSessionState sharedState;
|
||||
|
@ -101,8 +100,7 @@ public class TestConsumeAzureEventHub {
|
|||
|
||||
@Test
|
||||
public void testReceiveOne() throws Exception {
|
||||
|
||||
final Iterable<EventData> eventDataList = Arrays.asList(new EventData("one".getBytes(StandardCharsets.UTF_8)));
|
||||
final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));
|
||||
eventProcessor.onEvents(partitionContext, eventDataList);
|
||||
|
||||
processSession.assertCommitted();
|
||||
|
@ -121,13 +119,11 @@ public class TestConsumeAzureEventHub {
|
|||
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testReceiveTwo() throws Exception {
|
||||
|
||||
final Iterable<EventData> eventDataList = Arrays.asList(
|
||||
new EventData("one".getBytes(StandardCharsets.UTF_8)),
|
||||
new EventData("two".getBytes(StandardCharsets.UTF_8))
|
||||
EventData.create("one".getBytes(StandardCharsets.UTF_8)),
|
||||
EventData.create("two".getBytes(StandardCharsets.UTF_8))
|
||||
);
|
||||
eventProcessor.onEvents(partitionContext, eventDataList);
|
||||
|
||||
|
@ -145,10 +141,9 @@ public class TestConsumeAzureEventHub {
|
|||
|
||||
@Test
|
||||
public void testCheckpointFailure() throws Exception {
|
||||
|
||||
final Iterable<EventData> eventDataList = Arrays.asList(
|
||||
new EventData("one".getBytes(StandardCharsets.UTF_8)),
|
||||
new EventData("two".getBytes(StandardCharsets.UTF_8))
|
||||
EventData.create("one".getBytes(StandardCharsets.UTF_8)),
|
||||
EventData.create("two".getBytes(StandardCharsets.UTF_8))
|
||||
);
|
||||
doThrow(new RuntimeException("Failed to create a checkpoint.")).when(partitionContext).checkpoint();
|
||||
eventProcessor.onEvents(partitionContext, eventDataList);
|
||||
|
@ -239,15 +234,13 @@ public class TestConsumeAzureEventHub {
|
|||
.thenThrow(new MalformedRecordException("Simulating Record parse failure."))
|
||||
.thenReturn(records2[0], Arrays.copyOfRange(records2, 1, records2.length));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReceiveRecords() throws Exception {
|
||||
|
||||
final List<EventData> eventDataList = Arrays.asList(
|
||||
new EventData("one".getBytes(StandardCharsets.UTF_8)),
|
||||
new EventData("two".getBytes(StandardCharsets.UTF_8))
|
||||
EventData.create("one".getBytes(StandardCharsets.UTF_8)),
|
||||
EventData.create("two".getBytes(StandardCharsets.UTF_8))
|
||||
);
|
||||
|
||||
setupRecordReader(eventDataList);
|
||||
|
@ -274,12 +267,11 @@ public class TestConsumeAzureEventHub {
|
|||
|
||||
@Test
|
||||
public void testReceiveRecordReaderFailure() throws Exception {
|
||||
|
||||
final List<EventData> eventDataList = Arrays.asList(
|
||||
new EventData("one".getBytes(StandardCharsets.UTF_8)),
|
||||
new EventData("two".getBytes(StandardCharsets.UTF_8)),
|
||||
new EventData("three".getBytes(StandardCharsets.UTF_8)),
|
||||
new EventData("four".getBytes(StandardCharsets.UTF_8))
|
||||
EventData.create("one".getBytes(StandardCharsets.UTF_8)),
|
||||
EventData.create("two".getBytes(StandardCharsets.UTF_8)),
|
||||
EventData.create("three".getBytes(StandardCharsets.UTF_8)),
|
||||
EventData.create("four".getBytes(StandardCharsets.UTF_8))
|
||||
);
|
||||
|
||||
setupRecordReader(eventDataList, 2, null);
|
||||
|
@ -319,9 +311,8 @@ public class TestConsumeAzureEventHub {
|
|||
|
||||
@Test
|
||||
public void testReceiveAllRecordFailure() throws Exception {
|
||||
|
||||
final List<EventData> eventDataList = Collections.singletonList(
|
||||
new EventData("one".getBytes(StandardCharsets.UTF_8))
|
||||
EventData.create("one".getBytes(StandardCharsets.UTF_8))
|
||||
);
|
||||
|
||||
setupRecordReader(eventDataList, 0, null);
|
||||
|
@ -348,17 +339,15 @@ public class TestConsumeAzureEventHub {
|
|||
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
|
||||
assertEquals("amqps://namespace.servicebus.windows.net/" +
|
||||
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReceiveRecordWriterFailure() throws Exception {
|
||||
|
||||
final List<EventData> eventDataList = Arrays.asList(
|
||||
new EventData("one".getBytes(StandardCharsets.UTF_8)),
|
||||
new EventData("two".getBytes(StandardCharsets.UTF_8)),
|
||||
new EventData("three".getBytes(StandardCharsets.UTF_8)),
|
||||
new EventData("four".getBytes(StandardCharsets.UTF_8))
|
||||
EventData.create("one".getBytes(StandardCharsets.UTF_8)),
|
||||
EventData.create("two".getBytes(StandardCharsets.UTF_8)),
|
||||
EventData.create("three".getBytes(StandardCharsets.UTF_8)),
|
||||
EventData.create("four".getBytes(StandardCharsets.UTF_8))
|
||||
);
|
||||
|
||||
setupRecordReader(eventDataList, -1, "two");
|
||||
|
@ -395,5 +384,4 @@ public class TestConsumeAzureEventHub {
|
|||
assertEquals("amqps://namespace.servicebus.windows.net/" +
|
||||
"eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent2.getTransitUri());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue