mirror of https://github.com/apache/nifi.git
NIFI-6998 This closes #3972. Batch & Partitioning key
+ Asynchronous sending in limited batches + Property to determine attribute name carrying partitioning key + Maximum batch size property + Carrier object - Unit test fakery NIFI-6998 Attributes to User Defined properties NIFI-6998 Unit tests NIFI-6998 Review corrections + Interruption propagation (& test) + Final carrier members + Unnecessary generic modifiers removed from generic container NIFI-6998 checkstyle corrections + Tabs to spaces, trailnig spaces + Absolute Imports + Braces locations NIFI-6998 imports & license Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
parent
66d5ab80eb
commit
2601f722b3
|
@ -19,10 +19,16 @@ package org.apache.nifi.processors.azure.eventhub;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -35,6 +41,8 @@ import com.microsoft.azure.eventhubs.EventHubException;
|
|||
import com.microsoft.azure.eventhubs.IllegalConnectionStringFormatException;
|
||||
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
|
@ -53,6 +61,7 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
|
||||
|
@ -92,6 +101,24 @@ public class PutAzureEventHub extends AbstractProcessor {
|
|||
.sensitive(true)
|
||||
.required(true)
|
||||
.build();
|
||||
static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
|
||||
.name("partitioning-key-attribute-name")
|
||||
.displayName("Partitioning Key Attribute Name")
|
||||
.description("If specified, the value from argument named by this field will be used as a partitioning key to be used by event hub.")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
|
||||
.defaultValue(null)
|
||||
.build();
|
||||
static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("max-batch-size")
|
||||
.displayName("Maximum batch size")
|
||||
.description("Maximum count of flow files being processed in one batch.")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.addValidator(StandardValidators.NUMBER_VALIDATOR)
|
||||
.defaultValue("100")
|
||||
.build();
|
||||
|
||||
static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
|
@ -117,6 +144,8 @@ public class PutAzureEventHub extends AbstractProcessor {
|
|||
_propertyDescriptors.add(NAMESPACE);
|
||||
_propertyDescriptors.add(ACCESS_POLICY);
|
||||
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
|
||||
_propertyDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME);
|
||||
_propertyDescriptors.add(MAX_BATCH_SIZE);
|
||||
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
|
||||
|
||||
Set<Relationship> _relationships = new HashSet<>();
|
||||
|
@ -150,6 +179,132 @@ public class PutAzureEventHub extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
populateSenderQueue(context);
|
||||
|
||||
final StopWatch stopWatch = new StopWatch(true);
|
||||
|
||||
final String partitioningKeyAttributeName = context.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
|
||||
|
||||
// Get N flow files
|
||||
final int maxBatchSize = NumberUtils.toInt(context.getProperty(MAX_BATCH_SIZE).getValue(), 100);
|
||||
final List<FlowFile> flowFileList = session.get(maxBatchSize);
|
||||
|
||||
// Convert and send each flow file
|
||||
final BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue = new LinkedBlockingQueue<>();
|
||||
for (FlowFile flowFile : flowFileList) {
|
||||
if (flowFile == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
futureQueue.offer(handleFlowFile(flowFile, partitioningKeyAttributeName, session));
|
||||
}
|
||||
|
||||
waitForAllFutures(context, session, stopWatch, futureQueue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Joins all the futures so it can determine which flow files from given batch were sent successfully and which were not.
|
||||
*
|
||||
* @param context of this instance of the processor
|
||||
* @param session that handles all flow files sent within the future queue
|
||||
* @param stopWatch for time measurements
|
||||
* @param futureQueue a list of futures of messages that had been sent within above context and session before this method was called.
|
||||
*/
|
||||
protected void waitForAllFutures(
|
||||
final ProcessContext context,
|
||||
final ProcessSession session,
|
||||
final StopWatch stopWatch,
|
||||
final BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue){
|
||||
|
||||
try {
|
||||
for (CompletableFuture<FlowFileResultCarrier<Relationship>> completableFuture : futureQueue) {
|
||||
completableFuture.join();
|
||||
|
||||
final FlowFileResultCarrier<Relationship> flowFileResult = completableFuture.get();
|
||||
if(flowFileResult == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final FlowFile flowFile = flowFileResult.getFlowFile();
|
||||
|
||||
if(flowFileResult.getResult() == REL_SUCCESS) {
|
||||
final String namespace = context.getProperty(NAMESPACE).getValue();
|
||||
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
|
||||
session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
} else {
|
||||
final Throwable processException = flowFileResult.getException();
|
||||
getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException);
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException | CancellationException | CompletionException e) {
|
||||
getLogger().error("Batch processing failed", e);
|
||||
session.rollback();
|
||||
|
||||
if(e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
throw new ProcessException("Batch processing failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert flow file to eventhub message entities (and send)!!!
|
||||
*
|
||||
* @param flowFile to be converted to a message and sent to Eventhub (Body = content, User Properties = attributes, partitioning key = value configured attribute)
|
||||
* @param partitioningKeyAttributeName where the partitioning is saved within each flow file
|
||||
* @param session under which is this flow file being managed
|
||||
*
|
||||
* @return Completable future carrying the context of flowfile used as a base for message being send. Never Null.
|
||||
* */
|
||||
protected CompletableFuture<FlowFileResultCarrier<Relationship>> handleFlowFile(FlowFile flowFile, final String partitioningKeyAttributeName, final ProcessSession session) {
|
||||
|
||||
// Read message body
|
||||
final byte[] buffer = new byte[(int) flowFile.getSize()];
|
||||
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
|
||||
|
||||
// Lift partitioning key
|
||||
final String partitioningKey;
|
||||
if (StringUtils.isNotBlank(partitioningKeyAttributeName)) {
|
||||
partitioningKey = flowFile.getAttribute(partitioningKeyAttributeName);
|
||||
} else {
|
||||
partitioningKey = null;
|
||||
}
|
||||
|
||||
// Prepare user properties
|
||||
final Map<String, Object> userProperties;
|
||||
Map<String, String> attributes = flowFile.getAttributes();
|
||||
if(attributes == null) {
|
||||
userProperties = Collections.emptyMap();
|
||||
}else {
|
||||
userProperties = new HashMap<>(attributes);
|
||||
}
|
||||
|
||||
// Send the message
|
||||
try {
|
||||
return sendMessage(buffer, partitioningKey, userProperties)
|
||||
.thenApplyAsync(param -> {
|
||||
return new FlowFileResultCarrier<Relationship>(flowFile, REL_SUCCESS);
|
||||
})
|
||||
.exceptionally(processException -> {
|
||||
return new FlowFileResultCarrier<Relationship>(flowFile, REL_FAILURE, processException);
|
||||
});
|
||||
|
||||
} catch (final ProcessException processException) {
|
||||
return CompletableFuture.completedFuture(new FlowFileResultCarrier<Relationship>(flowFile, REL_FAILURE, processException));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Prepare at least one Event hub sender based on this instance of processor.
|
||||
*
|
||||
* @param context of this processor instance from which all connectivity information properties are taken.
|
||||
*/
|
||||
protected void populateSenderQueue(ProcessContext context) {
|
||||
if(senderQueue.size() == 0){
|
||||
final int numThreads = context.getMaxConcurrentTasks();
|
||||
senderQueue = new LinkedBlockingQueue<>(numThreads);
|
||||
|
@ -165,31 +320,17 @@ public class PutAzureEventHub extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final StopWatch stopWatch = new StopWatch(true);
|
||||
final byte[] buffer = new byte[(int) flowFile.getSize()];
|
||||
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
|
||||
|
||||
try {
|
||||
sendMessage(buffer);
|
||||
} catch (final ProcessException processException) {
|
||||
getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException);
|
||||
session.transfer(session.penalize(flowFile), REL_FAILURE);
|
||||
return;
|
||||
}
|
||||
|
||||
final String namespace = context.getProperty(NAMESPACE).getValue();
|
||||
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
|
||||
session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @param namespace name of the Eventhub namespace (part of the domain name)
|
||||
* @param eventHubName name of the eventhub, a message broker entity. Like topic.
|
||||
* @param policyName technically it is username bound to eventhub namespace or hub and privileges.
|
||||
* @param policyKey password belonging to the above policy
|
||||
* @param executor thread executor to perform the client connection.
|
||||
* @return An initialized eventhub client based on supplied parameters.
|
||||
* @throws ProcessException when creation of event hub fails due to formatting of conection string. Authorization or even network connectivity.
|
||||
*/
|
||||
protected EventHubClient createEventHubClient(
|
||||
final String namespace,
|
||||
final String eventHubName,
|
||||
|
@ -206,22 +347,44 @@ public class PutAzureEventHub extends AbstractProcessor {
|
|||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){
|
||||
return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
|
||||
}
|
||||
protected void sendMessage(final byte[] buffer) throws ProcessException {
|
||||
|
||||
/**
|
||||
* @param buffer Block of data to be sent as a message body. Entire array is used. See Event hub limits for body size.
|
||||
* @param partitioningKey A hint for Eventhub message broker how to distribute messages consistently amongst multiple partitions.
|
||||
* @param userProperties A key value set of customary information that is attached in User defined properties part of the message.
|
||||
* @return future object for referencing a success/failure of this message sending.
|
||||
* @throws ProcessException
|
||||
*
|
||||
* @see <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas">Event Hubs Quotas</a>
|
||||
*/
|
||||
protected CompletableFuture<Void> sendMessage(final byte[] buffer, String partitioningKey, Map<String, Object> userProperties) throws ProcessException {
|
||||
|
||||
final EventHubClient sender = senderQueue.poll();
|
||||
if(null != sender) {
|
||||
try {
|
||||
sender.sendSync(EventData.create(buffer));
|
||||
} catch (final EventHubException sbe) {
|
||||
throw new ProcessException("Caught exception trying to send message to eventbus", sbe);
|
||||
} finally {
|
||||
senderQueue.offer(sender);
|
||||
}
|
||||
}else{
|
||||
if(sender == null) {
|
||||
throw new ProcessException("No EventHubClients are configured for sending");
|
||||
}
|
||||
|
||||
// Create message with properties
|
||||
final EventData eventData = EventData.create(buffer);
|
||||
final Map<String, Object> properties = eventData.getProperties();
|
||||
if(userProperties != null && properties != null) {
|
||||
properties.putAll(userProperties);
|
||||
}
|
||||
|
||||
// Send with optional partition key
|
||||
final CompletableFuture<Void> eventFuture;
|
||||
if(StringUtils.isNotBlank(partitioningKey)) {
|
||||
eventFuture = sender.send(eventData, partitioningKey);
|
||||
}else {
|
||||
eventFuture = sender.send(eventData);
|
||||
}
|
||||
|
||||
senderQueue.offer(sender);
|
||||
|
||||
return eventFuture;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.azure.storage.utils;
|
||||
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
|
||||
public class FlowFileResultCarrier<T> {
|
||||
|
||||
final private FlowFile flowFile;
|
||||
final private T result;
|
||||
final private Throwable exception;
|
||||
|
||||
public FlowFileResultCarrier(FlowFile flowFile, T result) {
|
||||
this.flowFile = flowFile;
|
||||
this.result = result;
|
||||
this.exception = null;
|
||||
}
|
||||
|
||||
public FlowFileResultCarrier(FlowFile flowFile, T result, Throwable exception) {
|
||||
this.flowFile = flowFile;
|
||||
this.result = result;
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
public FlowFile getFlowFile() {
|
||||
return flowFile;
|
||||
}
|
||||
|
||||
public T getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
public Throwable getException() {
|
||||
return exception;
|
||||
}
|
||||
|
||||
}
|
|
@ -16,20 +16,57 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.azure.eventhub;
|
||||
|
||||
import com.microsoft.azure.eventhubs.EventHubClient;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
|
||||
import org.apache.nifi.util.StopWatch;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.microsoft.azure.eventhubs.EventData;
|
||||
import com.microsoft.azure.eventhubs.EventHubClient;
|
||||
|
||||
public class PutAzureEventHubTest {
|
||||
private static final String namespaceName = "nifi-azure-hub";
|
||||
private static final String eventHubName = "get-test";
|
||||
private static final String sasKeyName = "bogus-policy";
|
||||
private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!";
|
||||
private static final String TEST_PARTITIONING_KEY_ATTRIBUTE_NAME = "x-opt-partition-key";
|
||||
private static final String TEST_PARTITIONING_KEY = "some-partitioning-key";
|
||||
|
||||
|
||||
private TestRunner testRunner;
|
||||
|
@ -92,35 +129,213 @@ public class PutAzureEventHubTest {
|
|||
testRunner.run(1, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageIsSentWithPartitioningKeyIfSpecifiedAndPopulated() {
|
||||
MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
|
||||
MockitoAnnotations.initMocks(processor);
|
||||
|
||||
EventHubClient eventHubClient = processor.getEventHubClient();
|
||||
when(eventHubClient.send(any(EventData.class), anyString()))
|
||||
.thenReturn(CompletableFuture.completedFuture(null));
|
||||
|
||||
when(eventHubClient.send(any(EventData.class)))
|
||||
.thenThrow(new RuntimeException("Partition-key-less method called despite key is defined and required."));
|
||||
|
||||
testRunner = TestRunners.newTestRunner(processor);
|
||||
setUpStandardTestConfig();
|
||||
testRunner.setProperty(PutAzureEventHub.PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY_ATTRIBUTE_NAME);
|
||||
|
||||
MockFlowFile flowFile = new MockFlowFile(1234);
|
||||
flowFile.putAttributes(ImmutableMap.of(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY));
|
||||
testRunner.enqueue(flowFile);
|
||||
testRunner.run(1, true);
|
||||
|
||||
Mockito.verify(eventHubClient).send(any(EventData.class), eq(TEST_PARTITIONING_KEY));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageIsSentWithoutPartitioningKeyIfNotSpecifiedOrNotPopulated() {
|
||||
MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
|
||||
MockitoAnnotations.initMocks(processor);
|
||||
|
||||
EventHubClient eventHubClient = processor.getEventHubClient();
|
||||
when(eventHubClient.send(any(EventData.class), anyString()))
|
||||
.thenThrow(new RuntimeException("Partition-key-full method called despite key is Not required or not populated."));
|
||||
|
||||
when(eventHubClient.send(any(EventData.class)))
|
||||
.thenReturn(CompletableFuture.completedFuture(null));
|
||||
|
||||
testRunner = TestRunners.newTestRunner(processor);
|
||||
setUpStandardTestConfig();
|
||||
|
||||
MockFlowFile flowFile = new MockFlowFile(1234);
|
||||
flowFile.putAttributes(ImmutableMap.of(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY));
|
||||
|
||||
// Key not specified
|
||||
testRunner.enqueue(flowFile);
|
||||
testRunner.run(1, true);
|
||||
|
||||
Mockito.verify(eventHubClient, never()).send(any(EventData.class), eq(TEST_PARTITIONING_KEY));
|
||||
Mockito.verify(eventHubClient).send(any(EventData.class));
|
||||
|
||||
// Key wanted but not available
|
||||
testRunner.setProperty(PutAzureEventHub.PARTITIONING_KEY_ATTRIBUTE_NAME, "Non-existing-attribute");
|
||||
|
||||
testRunner.enqueue(flowFile);
|
||||
testRunner.run(1, true);
|
||||
|
||||
Mockito.verify(eventHubClient, never()).send(any(EventData.class), eq(TEST_PARTITIONING_KEY));
|
||||
Mockito.verify(eventHubClient, times(2)).send(any(EventData.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllAttributesAreLiftedToProperties() {
|
||||
MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
|
||||
MockitoAnnotations.initMocks(processor);
|
||||
|
||||
EventHubClient eventHubClient = processor.getEventHubClient();
|
||||
when(eventHubClient.send(any(EventData.class)))
|
||||
.thenReturn(CompletableFuture.completedFuture(null));
|
||||
|
||||
testRunner = TestRunners.newTestRunner(processor);
|
||||
setUpStandardTestConfig();
|
||||
|
||||
MockFlowFile flowFile = new MockFlowFile(1234);
|
||||
ImmutableMap<String, String> demoAttributes = ImmutableMap.of("A", "a", "B", "b", "D", "d", "C", "c");
|
||||
flowFile.putAttributes(demoAttributes);
|
||||
|
||||
testRunner.enqueue(flowFile);
|
||||
testRunner.run(1, true);
|
||||
ArgumentCaptor<EventData> eventDataCaptor = ArgumentCaptor.forClass(EventData.class);
|
||||
|
||||
Mockito.verify(eventHubClient).send(eventDataCaptor.capture());
|
||||
|
||||
EventData event = eventDataCaptor.getValue();
|
||||
assertTrue(event.getProperties().entrySet().containsAll(demoAttributes.entrySet()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchProcessesUptoMaximum() {
|
||||
MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
|
||||
MockitoAnnotations.initMocks(processor);
|
||||
|
||||
EventHubClient eventHubClient = processor.getEventHubClient();
|
||||
|
||||
CompletableFuture<Void> failedFuture = new CompletableFuture<Void>();
|
||||
failedFuture.completeExceptionally(new IllegalArgumentException());
|
||||
|
||||
when(eventHubClient.send(any(EventData.class)))
|
||||
.thenReturn(failedFuture)
|
||||
.thenReturn(CompletableFuture.completedFuture(null));
|
||||
|
||||
testRunner = TestRunners.newTestRunner(processor);
|
||||
setUpStandardTestConfig();
|
||||
|
||||
List<MockFlowFile> flowFiles = Arrays.asList(new MockFlowFile(1), new MockFlowFile(2), new MockFlowFile(3),
|
||||
new MockFlowFile(4), new MockFlowFile(5), new MockFlowFile(6));
|
||||
|
||||
flowFiles.stream().forEachOrdered(ff -> testRunner.enqueue(ff));
|
||||
|
||||
testRunner.setProperty(PutAzureEventHub.MAX_BATCH_SIZE, "4");
|
||||
testRunner.run(1, true);
|
||||
|
||||
Mockito.verify(eventHubClient, times(4)).send(any(EventData.class));
|
||||
testRunner.assertTransferCount(PutAzureEventHub.REL_SUCCESS, 3);
|
||||
testRunner.assertTransferCount(PutAzureEventHub.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailedBatchProcessesRollsBackTransactions() throws InterruptedException, ExecutionException {
|
||||
MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
|
||||
MockitoAnnotations.initMocks(processor);
|
||||
|
||||
final BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue = new LinkedBlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>>();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
CompletableFuture<FlowFileResultCarrier<Relationship>> throwingFuture = (CompletableFuture<FlowFileResultCarrier<Relationship>>)mock(CompletableFuture.class);
|
||||
|
||||
when(throwingFuture.get()).thenThrow(new ExecutionException(new IllegalArgumentException()));
|
||||
|
||||
MockFlowFile flowFile1 = new MockFlowFile(1);
|
||||
MockFlowFile flowFile2 = new MockFlowFile(2);
|
||||
|
||||
futureQueue.offer(CompletableFuture.completedFuture(null));
|
||||
futureQueue.offer(CompletableFuture.completedFuture(new FlowFileResultCarrier<Relationship>(flowFile1, PutAzureEventHub.REL_SUCCESS)));
|
||||
futureQueue.offer(CompletableFuture.completedFuture(new FlowFileResultCarrier<Relationship>(flowFile2, PutAzureEventHub.REL_FAILURE, new IllegalArgumentException())));
|
||||
futureQueue.offer(throwingFuture);
|
||||
|
||||
testRunner = TestRunners.newTestRunner(processor);
|
||||
setUpStandardTestConfig();
|
||||
testRunner.enqueue(flowFile1);
|
||||
testRunner.enqueue(flowFile2);
|
||||
|
||||
|
||||
final ProcessContext context = testRunner.getProcessContext();
|
||||
final ProcessSession session = spy(testRunner.getProcessSessionFactory().createSession());
|
||||
doNothing().when(session).transfer(any(FlowFile.class), any());
|
||||
doReturn(flowFile2).when(session).penalize(any());
|
||||
|
||||
try {
|
||||
processor.waitForAllFutures(context, session, new StopWatch(true), futureQueue);
|
||||
assertFalse(true);
|
||||
}catch(ProcessException pe) {
|
||||
assertTrue(true);
|
||||
assertFalse(Thread.currentThread().isInterrupted());
|
||||
}
|
||||
|
||||
verify(session).transfer(flowFile1, PutAzureEventHub.REL_SUCCESS);
|
||||
verify(session).transfer(flowFile2, PutAzureEventHub.REL_FAILURE);
|
||||
verify(session).rollback();
|
||||
|
||||
//Second run to test interrupted exception
|
||||
Mockito.reset(throwingFuture, session);
|
||||
when(throwingFuture.get()).thenThrow(new InterruptedException());
|
||||
doNothing().when(session).transfer(any(FlowFile.class), any());
|
||||
doReturn(flowFile2).when(session).penalize(any());
|
||||
|
||||
try {
|
||||
processor.waitForAllFutures(context, session, new StopWatch(true), futureQueue);
|
||||
assertFalse(true);
|
||||
}catch(ProcessException pe) {
|
||||
assertTrue(true);
|
||||
assertTrue(Thread.currentThread().isInterrupted());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class MockPutAzureEventHub extends PutAzureEventHub{
|
||||
byte[] receivedBuffer = null;
|
||||
|
||||
byte[] getReceivedBuffer(){
|
||||
return receivedBuffer;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected EventHubClient createEventHubClient(
|
||||
final String namespace,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final ScheduledExecutorService executor) throws ProcessException {
|
||||
final String namespace,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final ScheduledExecutorService executor) throws ProcessException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendMessage(final byte[] buffer) throws ProcessException {
|
||||
protected CompletableFuture<Void> sendMessage(final byte[] buffer, String partitioningKey, Map<String, Object> userProperties) throws ProcessException {
|
||||
receivedBuffer = buffer;
|
||||
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
}
|
||||
private static class OnSendThrowingMockPutAzureEventHub extends PutAzureEventHub{
|
||||
@Override
|
||||
protected EventHubClient createEventHubClient(
|
||||
final String namespace,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final ScheduledExecutorService executor) throws ProcessException {
|
||||
final String namespace,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final ScheduledExecutorService executor) throws ProcessException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -130,6 +345,25 @@ public class PutAzureEventHubTest {
|
|||
return "Bogus Connection String";
|
||||
}
|
||||
}
|
||||
private static class MockedEventhubClientMockPutAzureEventHub extends PutAzureEventHub{
|
||||
|
||||
@Mock
|
||||
private EventHubClient client;
|
||||
|
||||
public EventHubClient getEventHubClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventHubClient createEventHubClient(
|
||||
final String namespace,
|
||||
final String eventHubName,
|
||||
final String policyName,
|
||||
final String policyKey,
|
||||
final ScheduledExecutorService executor) throws ProcessException {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
private void setUpStandardTestConfig() {
|
||||
testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName);
|
||||
testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);
|
||||
|
|
Loading…
Reference in New Issue