From 2601f722b3230a640e86c83320a75b86288c0e60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20=C5=A0imek?= Date: Thu, 9 Jan 2020 17:32:50 +0100 Subject: [PATCH] NIFI-6998 This closes #3972. Batch & Partitioning key + Asynchronous sending in limited batches + Property to determine attribute name carrying partitioning key + Maximum batch size property + Carrier object - Unit test fakery NIFI-6998 Attributes to User Defined properties NIFI-6998 Unit tests NIFI-6998 Review corrections + Interruption propagation (& test) + Final carrier members + Unnecessary generic modifiers removed from generic container NIFI-6998 checkstyle corrections + Tabs to spaces, trailnig spaces + Absolute Imports + Braces locations NIFI-6998 imports & license Signed-off-by: Joe Witt --- .../azure/eventhub/PutAzureEventHub.java | 229 ++++++++++++--- .../storage/utils/FlowFileResultCarrier.java | 51 ++++ .../azure/eventhub/PutAzureEventHubTest.java | 260 +++++++++++++++++- 3 files changed, 494 insertions(+), 46 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/FlowFileResultCarrier.java diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java index fb584e8c80..2c9a8aea37 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -19,10 +19,16 @@ package org.apache.nifi.processors.azure.eventhub; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -35,6 +41,8 @@ import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.IllegalConnectionStringFormatException; import com.microsoft.azure.eventhubs.impl.EventHubClientImpl; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -53,6 +61,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; @@ -92,6 +101,24 @@ public class PutAzureEventHub extends AbstractProcessor { .sensitive(true) .required(true) .build(); + static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("partitioning-key-attribute-name") + .displayName("Partitioning Key Attribute Name") + .description("If specified, the value from argument named by this field will be used as a partitioning key to be used by event hub.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .defaultValue(null) + .build(); + static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("max-batch-size") + .displayName("Maximum batch size") + .description("Maximum count of flow files being processed in one batch.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NUMBER_VALIDATOR) + .defaultValue("100") + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -117,6 +144,8 @@ public class PutAzureEventHub extends AbstractProcessor { _propertyDescriptors.add(NAMESPACE); _propertyDescriptors.add(ACCESS_POLICY); _propertyDescriptors.add(POLICY_PRIMARY_KEY); + _propertyDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME); + _propertyDescriptors.add(MAX_BATCH_SIZE); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); Set _relationships = new HashSet<>(); @@ -150,6 +179,132 @@ public class PutAzureEventHub extends AbstractProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + populateSenderQueue(context); + + final StopWatch stopWatch = new StopWatch(true); + + final String partitioningKeyAttributeName = context.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue(); + + // Get N flow files + final int maxBatchSize = NumberUtils.toInt(context.getProperty(MAX_BATCH_SIZE).getValue(), 100); + final List flowFileList = session.get(maxBatchSize); + + // Convert and send each flow file + final BlockingQueue>> futureQueue = new LinkedBlockingQueue<>(); + for (FlowFile flowFile : flowFileList) { + if (flowFile == null) { + continue; + } + + futureQueue.offer(handleFlowFile(flowFile, partitioningKeyAttributeName, session)); + } + + waitForAllFutures(context, session, stopWatch, futureQueue); + } + + /** + * Joins all the futures so it can determine which flow files from given batch were sent successfully and which were not. + * + * @param context of this instance of the processor + * @param session that handles all flow files sent within the future queue + * @param stopWatch for time measurements + * @param futureQueue a list of futures of messages that had been sent within above context and session before this method was called. + */ + protected void waitForAllFutures( + final ProcessContext context, + final ProcessSession session, + final StopWatch stopWatch, + final BlockingQueue>> futureQueue){ + + try { + for (CompletableFuture> completableFuture : futureQueue) { + completableFuture.join(); + + final FlowFileResultCarrier flowFileResult = completableFuture.get(); + if(flowFileResult == null) { + continue; + } + + final FlowFile flowFile = flowFileResult.getFlowFile(); + + if(flowFileResult.getResult() == REL_SUCCESS) { + final String namespace = context.getProperty(NAMESPACE).getValue(); + final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); + session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + + } else { + final Throwable processException = flowFileResult.getException(); + getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } + } + } catch (InterruptedException | ExecutionException | CancellationException | CompletionException e) { + getLogger().error("Batch processing failed", e); + session.rollback(); + + if(e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + + throw new ProcessException("Batch processing failed", e); + } + } + + /** + * Convert flow file to eventhub message entities (and send)!!! + * + * @param flowFile to be converted to a message and sent to Eventhub (Body = content, User Properties = attributes, partitioning key = value configured attribute) + * @param partitioningKeyAttributeName where the partitioning is saved within each flow file + * @param session under which is this flow file being managed + * + * @return Completable future carrying the context of flowfile used as a base for message being send. Never Null. + * */ + protected CompletableFuture> handleFlowFile(FlowFile flowFile, final String partitioningKeyAttributeName, final ProcessSession session) { + + // Read message body + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer)); + + // Lift partitioning key + final String partitioningKey; + if (StringUtils.isNotBlank(partitioningKeyAttributeName)) { + partitioningKey = flowFile.getAttribute(partitioningKeyAttributeName); + } else { + partitioningKey = null; + } + + // Prepare user properties + final Map userProperties; + Map attributes = flowFile.getAttributes(); + if(attributes == null) { + userProperties = Collections.emptyMap(); + }else { + userProperties = new HashMap<>(attributes); + } + + // Send the message + try { + return sendMessage(buffer, partitioningKey, userProperties) + .thenApplyAsync(param -> { + return new FlowFileResultCarrier(flowFile, REL_SUCCESS); + }) + .exceptionally(processException -> { + return new FlowFileResultCarrier(flowFile, REL_FAILURE, processException); + }); + + } catch (final ProcessException processException) { + return CompletableFuture.completedFuture(new FlowFileResultCarrier(flowFile, REL_FAILURE, processException)); + } + } + + + /** + * Prepare at least one Event hub sender based on this instance of processor. + * + * @param context of this processor instance from which all connectivity information properties are taken. + */ + protected void populateSenderQueue(ProcessContext context) { if(senderQueue.size() == 0){ final int numThreads = context.getMaxConcurrentTasks(); senderQueue = new LinkedBlockingQueue<>(numThreads); @@ -165,31 +320,17 @@ public class PutAzureEventHub extends AbstractProcessor { } } } - - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final StopWatch stopWatch = new StopWatch(true); - final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer)); - - try { - sendMessage(buffer); - } catch (final ProcessException processException) { - getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException); - session.transfer(session.penalize(flowFile), REL_FAILURE); - return; - } - - final String namespace = context.getProperty(NAMESPACE).getValue(); - final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); - session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); - } + /** + * @param namespace name of the Eventhub namespace (part of the domain name) + * @param eventHubName name of the eventhub, a message broker entity. Like topic. + * @param policyName technically it is username bound to eventhub namespace or hub and privileges. + * @param policyKey password belonging to the above policy + * @param executor thread executor to perform the client connection. + * @return An initialized eventhub client based on supplied parameters. + * @throws ProcessException when creation of event hub fails due to formatting of conection string. Authorization or even network connectivity. + */ protected EventHubClient createEventHubClient( final String namespace, final String eventHubName, @@ -206,22 +347,44 @@ public class PutAzureEventHub extends AbstractProcessor { throw new ProcessException(e); } } + protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){ return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString(); } - protected void sendMessage(final byte[] buffer) throws ProcessException { + + /** + * @param buffer Block of data to be sent as a message body. Entire array is used. See Event hub limits for body size. + * @param partitioningKey A hint for Eventhub message broker how to distribute messages consistently amongst multiple partitions. + * @param userProperties A key value set of customary information that is attached in User defined properties part of the message. + * @return future object for referencing a success/failure of this message sending. + * @throws ProcessException + * + * @see Event Hubs Quotas + */ + protected CompletableFuture sendMessage(final byte[] buffer, String partitioningKey, Map userProperties) throws ProcessException { final EventHubClient sender = senderQueue.poll(); - if(null != sender) { - try { - sender.sendSync(EventData.create(buffer)); - } catch (final EventHubException sbe) { - throw new ProcessException("Caught exception trying to send message to eventbus", sbe); - } finally { - senderQueue.offer(sender); - } - }else{ + if(sender == null) { throw new ProcessException("No EventHubClients are configured for sending"); } + + // Create message with properties + final EventData eventData = EventData.create(buffer); + final Map properties = eventData.getProperties(); + if(userProperties != null && properties != null) { + properties.putAll(userProperties); + } + + // Send with optional partition key + final CompletableFuture eventFuture; + if(StringUtils.isNotBlank(partitioningKey)) { + eventFuture = sender.send(eventData, partitioningKey); + }else { + eventFuture = sender.send(eventData); + } + + senderQueue.offer(sender); + + return eventFuture; } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/FlowFileResultCarrier.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/FlowFileResultCarrier.java new file mode 100644 index 0000000000..7c49b99051 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/FlowFileResultCarrier.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.utils; + +import org.apache.nifi.flowfile.FlowFile; + +public class FlowFileResultCarrier { + + final private FlowFile flowFile; + final private T result; + final private Throwable exception; + + public FlowFileResultCarrier(FlowFile flowFile, T result) { + this.flowFile = flowFile; + this.result = result; + this.exception = null; + } + + public FlowFileResultCarrier(FlowFile flowFile, T result, Throwable exception) { + this.flowFile = flowFile; + this.result = result; + this.exception = exception; + } + + public FlowFile getFlowFile() { + return flowFile; + } + + public T getResult() { + return result; + } + + public Throwable getException() { + return exception; + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java index ab556c5d98..83ed649188 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java @@ -16,20 +16,57 @@ */ package org.apache.nifi.processors.azure.eventhub; -import com.microsoft.azure.eventhubs.EventHubClient; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier; +import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunners; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; -import java.util.concurrent.ScheduledExecutorService; +import com.google.common.collect.ImmutableMap; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; public class PutAzureEventHubTest { private static final String namespaceName = "nifi-azure-hub"; private static final String eventHubName = "get-test"; private static final String sasKeyName = "bogus-policy"; private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!"; + private static final String TEST_PARTITIONING_KEY_ATTRIBUTE_NAME = "x-opt-partition-key"; + private static final String TEST_PARTITIONING_KEY = "some-partitioning-key"; private TestRunner testRunner; @@ -92,35 +129,213 @@ public class PutAzureEventHubTest { testRunner.run(1, true); } + @Test + public void testMessageIsSentWithPartitioningKeyIfSpecifiedAndPopulated() { + MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub(); + MockitoAnnotations.initMocks(processor); + + EventHubClient eventHubClient = processor.getEventHubClient(); + when(eventHubClient.send(any(EventData.class), anyString())) + .thenReturn(CompletableFuture.completedFuture(null)); + + when(eventHubClient.send(any(EventData.class))) + .thenThrow(new RuntimeException("Partition-key-less method called despite key is defined and required.")); + + testRunner = TestRunners.newTestRunner(processor); + setUpStandardTestConfig(); + testRunner.setProperty(PutAzureEventHub.PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY_ATTRIBUTE_NAME); + + MockFlowFile flowFile = new MockFlowFile(1234); + flowFile.putAttributes(ImmutableMap.of(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY)); + testRunner.enqueue(flowFile); + testRunner.run(1, true); + + Mockito.verify(eventHubClient).send(any(EventData.class), eq(TEST_PARTITIONING_KEY)); + } + + @Test + public void testMessageIsSentWithoutPartitioningKeyIfNotSpecifiedOrNotPopulated() { + MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub(); + MockitoAnnotations.initMocks(processor); + + EventHubClient eventHubClient = processor.getEventHubClient(); + when(eventHubClient.send(any(EventData.class), anyString())) + .thenThrow(new RuntimeException("Partition-key-full method called despite key is Not required or not populated.")); + + when(eventHubClient.send(any(EventData.class))) + .thenReturn(CompletableFuture.completedFuture(null)); + + testRunner = TestRunners.newTestRunner(processor); + setUpStandardTestConfig(); + + MockFlowFile flowFile = new MockFlowFile(1234); + flowFile.putAttributes(ImmutableMap.of(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY)); + + // Key not specified + testRunner.enqueue(flowFile); + testRunner.run(1, true); + + Mockito.verify(eventHubClient, never()).send(any(EventData.class), eq(TEST_PARTITIONING_KEY)); + Mockito.verify(eventHubClient).send(any(EventData.class)); + + // Key wanted but not available + testRunner.setProperty(PutAzureEventHub.PARTITIONING_KEY_ATTRIBUTE_NAME, "Non-existing-attribute"); + + testRunner.enqueue(flowFile); + testRunner.run(1, true); + + Mockito.verify(eventHubClient, never()).send(any(EventData.class), eq(TEST_PARTITIONING_KEY)); + Mockito.verify(eventHubClient, times(2)).send(any(EventData.class)); + } + + @Test + public void testAllAttributesAreLiftedToProperties() { + MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub(); + MockitoAnnotations.initMocks(processor); + + EventHubClient eventHubClient = processor.getEventHubClient(); + when(eventHubClient.send(any(EventData.class))) + .thenReturn(CompletableFuture.completedFuture(null)); + + testRunner = TestRunners.newTestRunner(processor); + setUpStandardTestConfig(); + + MockFlowFile flowFile = new MockFlowFile(1234); + ImmutableMap demoAttributes = ImmutableMap.of("A", "a", "B", "b", "D", "d", "C", "c"); + flowFile.putAttributes(demoAttributes); + + testRunner.enqueue(flowFile); + testRunner.run(1, true); + ArgumentCaptor eventDataCaptor = ArgumentCaptor.forClass(EventData.class); + + Mockito.verify(eventHubClient).send(eventDataCaptor.capture()); + + EventData event = eventDataCaptor.getValue(); + assertTrue(event.getProperties().entrySet().containsAll(demoAttributes.entrySet())); + } + + @Test + public void testBatchProcessesUptoMaximum() { + MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub(); + MockitoAnnotations.initMocks(processor); + + EventHubClient eventHubClient = processor.getEventHubClient(); + + CompletableFuture failedFuture = new CompletableFuture(); + failedFuture.completeExceptionally(new IllegalArgumentException()); + + when(eventHubClient.send(any(EventData.class))) + .thenReturn(failedFuture) + .thenReturn(CompletableFuture.completedFuture(null)); + + testRunner = TestRunners.newTestRunner(processor); + setUpStandardTestConfig(); + + List flowFiles = Arrays.asList(new MockFlowFile(1), new MockFlowFile(2), new MockFlowFile(3), + new MockFlowFile(4), new MockFlowFile(5), new MockFlowFile(6)); + + flowFiles.stream().forEachOrdered(ff -> testRunner.enqueue(ff)); + + testRunner.setProperty(PutAzureEventHub.MAX_BATCH_SIZE, "4"); + testRunner.run(1, true); + + Mockito.verify(eventHubClient, times(4)).send(any(EventData.class)); + testRunner.assertTransferCount(PutAzureEventHub.REL_SUCCESS, 3); + testRunner.assertTransferCount(PutAzureEventHub.REL_FAILURE, 1); + } + + @Test + public void testFailedBatchProcessesRollsBackTransactions() throws InterruptedException, ExecutionException { + MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub(); + MockitoAnnotations.initMocks(processor); + + final BlockingQueue>> futureQueue = new LinkedBlockingQueue>>(); + + @SuppressWarnings("unchecked") + CompletableFuture> throwingFuture = (CompletableFuture>)mock(CompletableFuture.class); + + when(throwingFuture.get()).thenThrow(new ExecutionException(new IllegalArgumentException())); + + MockFlowFile flowFile1 = new MockFlowFile(1); + MockFlowFile flowFile2 = new MockFlowFile(2); + + futureQueue.offer(CompletableFuture.completedFuture(null)); + futureQueue.offer(CompletableFuture.completedFuture(new FlowFileResultCarrier(flowFile1, PutAzureEventHub.REL_SUCCESS))); + futureQueue.offer(CompletableFuture.completedFuture(new FlowFileResultCarrier(flowFile2, PutAzureEventHub.REL_FAILURE, new IllegalArgumentException()))); + futureQueue.offer(throwingFuture); + + testRunner = TestRunners.newTestRunner(processor); + setUpStandardTestConfig(); + testRunner.enqueue(flowFile1); + testRunner.enqueue(flowFile2); + + + final ProcessContext context = testRunner.getProcessContext(); + final ProcessSession session = spy(testRunner.getProcessSessionFactory().createSession()); + doNothing().when(session).transfer(any(FlowFile.class), any()); + doReturn(flowFile2).when(session).penalize(any()); + + try { + processor.waitForAllFutures(context, session, new StopWatch(true), futureQueue); + assertFalse(true); + }catch(ProcessException pe) { + assertTrue(true); + assertFalse(Thread.currentThread().isInterrupted()); + } + + verify(session).transfer(flowFile1, PutAzureEventHub.REL_SUCCESS); + verify(session).transfer(flowFile2, PutAzureEventHub.REL_FAILURE); + verify(session).rollback(); + + //Second run to test interrupted exception + Mockito.reset(throwingFuture, session); + when(throwingFuture.get()).thenThrow(new InterruptedException()); + doNothing().when(session).transfer(any(FlowFile.class), any()); + doReturn(flowFile2).when(session).penalize(any()); + + try { + processor.waitForAllFutures(context, session, new StopWatch(true), futureQueue); + assertFalse(true); + }catch(ProcessException pe) { + assertTrue(true); + assertTrue(Thread.currentThread().isInterrupted()); + } + + } + private static class MockPutAzureEventHub extends PutAzureEventHub{ byte[] receivedBuffer = null; + byte[] getReceivedBuffer(){ return receivedBuffer; } + @Override protected EventHubClient createEventHubClient( - final String namespace, - final String eventHubName, - final String policyName, - final String policyKey, - final ScheduledExecutorService executor) throws ProcessException { + final String namespace, + final String eventHubName, + final String policyName, + final String policyKey, + final ScheduledExecutorService executor) throws ProcessException { return null; } @Override - protected void sendMessage(final byte[] buffer) throws ProcessException { + protected CompletableFuture sendMessage(final byte[] buffer, String partitioningKey, Map userProperties) throws ProcessException { receivedBuffer = buffer; + + return CompletableFuture.completedFuture(null); } } private static class OnSendThrowingMockPutAzureEventHub extends PutAzureEventHub{ @Override protected EventHubClient createEventHubClient( - final String namespace, - final String eventHubName, - final String policyName, - final String policyKey, - final ScheduledExecutorService executor) throws ProcessException { + final String namespace, + final String eventHubName, + final String policyName, + final String policyKey, + final ScheduledExecutorService executor) throws ProcessException { return null; } } @@ -130,6 +345,25 @@ public class PutAzureEventHubTest { return "Bogus Connection String"; } } + private static class MockedEventhubClientMockPutAzureEventHub extends PutAzureEventHub{ + + @Mock + private EventHubClient client; + + public EventHubClient getEventHubClient() { + return client; + } + + @Override + protected EventHubClient createEventHubClient( + final String namespace, + final String eventHubName, + final String policyName, + final String policyKey, + final ScheduledExecutorService executor) throws ProcessException { + return client; + } + } private void setUpStandardTestConfig() { testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName); testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);