diff --git a/nifi-external/nifi-storm-spout/pom.xml b/nifi-external/nifi-storm-spout/pom.xml
index 65fcdab9f0..301b41f38f 100644
--- a/nifi-external/nifi-storm-spout/pom.xml
+++ b/nifi-external/nifi-storm-spout/pom.xml
@@ -27,9 +27,13 @@
org.apache.storm
storm-core
- 0.9.5
+ 0.10.0
provided
+
+ org.apache.commons
+ commons-lang3
+
org.apache.nifi
nifi-site-to-site-client
diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java
new file mode 100644
index 0000000000..64cd0de950
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
+ * through put scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
+ * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
+ * immediately in a single transaction.
+ */
+public class NiFiBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = 3067274587595578836L;
+ public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
+
+ private final SiteToSiteClientConfig clientConfig;
+ private final NiFiDataPacketBuilder builder;
+ private final int tickFrequencySeconds;
+
+ private SiteToSiteClient client;
+ private OutputCollector collector;
+ private BlockingQueue queue = new LinkedBlockingQueue<>();
+
+ private int batchSize = 10;
+ private int batchIntervalInSec = 10;
+ private long lastBatchProcessTimeSeconds = 0;
+
+ public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
+ Validate.notNull(clientConfig);
+ Validate.notNull(builder);
+ Validate.isTrue(tickFrequencySeconds > 0);
+ this.clientConfig = clientConfig;
+ this.builder = builder;
+ this.tickFrequencySeconds = tickFrequencySeconds;
+ }
+
+ public NiFiBolt withBatchSize(int batchSize) {
+ Validate.isTrue(batchSize > 0);
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public NiFiBolt withBatchInterval(int batchIntervalInSec) {
+ Validate.isTrue(batchIntervalInSec > 0);
+ this.batchIntervalInSec = batchIntervalInSec;
+ return this;
+ }
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+ this.client = createSiteToSiteClient();
+ this.collector = outputCollector;
+ this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
+
+ LOGGER.info("Bolt is prepared with Batch Size " + batchSize
+ + ", Batch Interval " + batchIntervalInSec
+ + ", Tick Frequency is " + tickFrequencySeconds);
+ }
+
+ protected SiteToSiteClient createSiteToSiteClient() {
+ return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ if (TupleUtils.isTick(tuple)) {
+ // if we have a tick tuple then lets see if enough time has passed since our last batch was processed
+ if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
+ LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
+ finishBatch();
+ } else {
+ LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
+ }
+ } else {
+ // for a regular tuple we add it to the queue and then see if our queue size exceeds batch size
+ this.queue.add(tuple);
+
+ int queueSize = this.queue.size();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize);
+ }
+
+ if (queueSize >= batchSize) {
+ LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch");
+ finishBatch();
+ }
+ }
+ }
+
+ private void finishBatch() {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Finishing batch of size " + queue.size());
+ }
+
+ lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
+
+ final List tuples = new ArrayList<>();
+ queue.drainTo(tuples);
+
+ if (tuples.size() == 0) {
+ LOGGER.debug("Finishing batch, but no tuples so returning...");
+ return;
+ }
+
+ try {
+ final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+ if (transaction == null) {
+ throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
+ }
+
+ // convert each tuple to a NiFiDataPacket and send it as part of the transaction
+ for (Tuple tuple : tuples) {
+ final NiFiDataPacket dataPacket = builder.createNiFiDataPacket(tuple);
+ transaction.send(dataPacket.getContent(), dataPacket.getAttributes());
+ }
+
+ transaction.confirm();
+ transaction.complete();
+
+ // ack the tuples after successfully completing the transaction
+ for (Tuple tuple : tuples) {
+ collector.ack(tuple);
+ }
+
+ } catch(Exception e){
+ LOGGER.warn("Unable to process tuples due to: " + e.getMessage(), e);
+ for (Tuple tuple : tuples) {
+ collector.fail(tuple);
+ }
+ }
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+ }
+
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ if (client != null) {
+ try {
+ client.close();
+ } catch (final IOException ioe) {
+ LOGGER.error("Failed to close client", ioe);
+ }
+ }
+ }
+
+ @Override
+ public Map getComponentConfiguration() {
+ Map conf = new HashMap<>();
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencySeconds);
+ return conf;
+ }
+
+}
diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java
new file mode 100644
index 0000000000..fa2e20e376
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Converts a Tuple into a NiFiDataPacket.
+ */
+public interface NiFiDataPacketBuilder {
+
+ NiFiDataPacket createNiFiDataPacket(Tuple tuple);
+
+}
diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
index 64dac6f0a2..2cb0b6626a 100644
--- a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
+++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java
@@ -226,17 +226,7 @@ public class NiFiSpout extends BaseRichSpout {
StreamUtils.fillBuffer(inStream, data);
final Map attributes = dataPacket.getAttributes();
- final NiFiDataPacket niFiDataPacket = new NiFiDataPacket() {
- @Override
- public byte[] getContent() {
- return data;
- }
-
- @Override
- public Map getAttributes() {
- return attributes;
- }
- };
+ final NiFiDataPacket niFiDataPacket = new StandardNiFiDataPacket(data, attributes);
dataPackets.add(niFiDataPacket);
dataPacket = transaction.receive();
diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java
new file mode 100644
index 0000000000..b3a9da6cc2
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
+ private static final long serialVersionUID = 6364005260220243322L;
+
+ private final byte[] content;
+ private final Map attributes;
+
+ public StandardNiFiDataPacket(final byte[] content, final Map attributes) {
+ this.content = content;
+ this.attributes = attributes;
+ }
+
+ @Override
+ public byte[] getContent() {
+ return content;
+ }
+
+ @Override
+ public Map getAttributes() {
+ return attributes;
+ }
+
+}
diff --git a/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java
new file mode 100644
index 0000000000..c63bb3378c
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.Constants;
+import backtype.storm.tuple.Tuple;
+import org.mockito.Mockito;
+
+public final class MockTupleHelpers {
+
+ private MockTupleHelpers() {
+ }
+
+ public static Tuple mockTickTuple() {
+ return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ }
+
+ public static Tuple mockTuple(String componentId, String streamId) {
+ Tuple tuple = Mockito.mock(Tuple.class);
+ Mockito.when(tuple.getSourceComponent()).thenReturn(componentId);
+ Mockito.when(tuple.getSourceStreamId()).thenReturn(streamId);
+ return tuple;
+ }
+}
\ No newline at end of file
diff --git a/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java
new file mode 100644
index 0000000000..7592471e9e
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Utils;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+
+import java.io.Serializable;
+
+/**
+ * Example topology that pulls data from a NiFi Output Port named 'Data for Storm' and writes the same
+ * data back to a NiFi Input Port named 'Data from Storm'.
+ */
+public class NiFiStormTopology {
+
+ public static void main( String[] args ) {
+ // Build a Site-To-Site client config for pulling data
+ final SiteToSiteClientConfig inputConfig = new SiteToSiteClient.Builder()
+ .url("http://localhost:8080/nifi")
+ .portName("Data for Storm")
+ .buildConfig();
+
+ // Build a Site-To-Site client config for pushing data
+ final SiteToSiteClientConfig outputConfig = new SiteToSiteClient.Builder()
+ .url("http://localhost:8080/nifi")
+ .portName("Data from Storm")
+ .buildConfig();
+
+ final int tickFrequencySeconds = 5;
+ final NiFiDataPacketBuilder niFiDataPacketBuilder = new SimpleNiFiDataPacketBuilder();
+ final NiFiBolt niFiBolt = new NiFiBolt(outputConfig, niFiDataPacketBuilder, tickFrequencySeconds)
+ //.withBatchSize(1)
+ ;
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("nifiInput", new NiFiSpout(inputConfig));
+ builder.setBolt("nifiOutput", niFiBolt).shuffleGrouping("nifiInput");
+
+ // Submit the topology running in local mode
+ Config conf = new Config();
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", conf, builder.createTopology());
+
+ Utils.sleep(90000);
+ cluster.shutdown();
+ }
+
+ /**
+ * Simple builder that returns the incoming data packet.
+ */
+ static class SimpleNiFiDataPacketBuilder implements NiFiDataPacketBuilder, Serializable {
+
+ private static final long serialVersionUID = 3067274587595578836L;
+
+ @Override
+ public NiFiDataPacket createNiFiDataPacket(Tuple tuple) {
+ return (NiFiDataPacket) tuple.getValueByField(NiFiSpout.NIFI_DATA_PACKET);
+ }
+ }
+
+}
diff --git a/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java
new file mode 100644
index 0000000000..29d16841f5
--- /dev/null
+++ b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestNiFiBolt {
+
+ private int tickFrequency;
+ private SiteToSiteClientConfig siteToSiteClientConfig;
+ private NiFiDataPacketBuilder niFiDataPacketBuilder;
+
+ @Before
+ public void setup() {
+ tickFrequency = 30;
+ siteToSiteClientConfig = mock(SiteToSiteClientConfig.class);
+ niFiDataPacketBuilder = mock(NiFiDataPacketBuilder.class);
+
+ // setup the builder to return empty data packets for testing
+ when(niFiDataPacketBuilder.createNiFiDataPacket(any(Tuple.class))).thenReturn(new NiFiDataPacket() {
+ @Override
+ public byte[] getContent() {
+ return new byte[0];
+ }
+
+ @Override
+ public Map getAttributes() {
+ return new HashMap<>();
+ }
+ });
+ }
+
+ @Test
+ public void testTickTupleWhenNotExceedingBatchInterval() {
+ final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency);
+
+ // prepare the bolt
+ Map conf = mock(Map.class);
+ TopologyContext context = mock(TopologyContext.class);
+ OutputCollector collector = mock(OutputCollector.class);
+ bolt.prepare(conf, context, collector);
+
+ // process a regular tuple
+ Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi");
+ bolt.execute(dataTuple);
+
+ // process a tick tuple
+ Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+ bolt.execute(tickTuple);
+
+ // should not have produced any NiFiDataPackets
+ verifyZeroInteractions(niFiDataPacketBuilder);
+ }
+
+ @Test
+ public void testTickTupleWhenExceedingBatchInterval() throws InterruptedException {
+ final int batchInterval = 1;
+ final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
+ .withBatchInterval(batchInterval);
+
+ // prepare the bolt
+ Map conf = mock(Map.class);
+ TopologyContext context = mock(TopologyContext.class);
+ OutputCollector collector = mock(OutputCollector.class);
+ bolt.prepare(conf, context, collector);
+
+ // process a regular tuple
+ Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi");
+ bolt.execute(dataTuple);
+
+ // sleep so we pass the batch interval
+ Thread.sleep(batchInterval + 1000);
+
+ // process a tick tuple
+ Tuple tickTuple = MockTupleHelpers.mockTickTuple();
+ bolt.execute(tickTuple);
+
+ // should have produced one data packet and acked it
+ verify(niFiDataPacketBuilder, times(1)).createNiFiDataPacket(eq(dataTuple));
+ verify(collector, times(1)).ack(eq(dataTuple));
+ }
+
+ @Test
+ public void testBatchSize() {
+ final int batchSize = 3;
+ final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
+ .withBatchSize(batchSize);
+
+ // prepare the bolt
+ Map conf = mock(Map.class);
+ TopologyContext context = mock(TopologyContext.class);
+ OutputCollector collector = mock(OutputCollector.class);
+ bolt.prepare(conf, context, collector);
+
+ // process a regular tuple, haven't hit batch size yet
+ Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi");
+ bolt.execute(dataTuple1);
+ verifyZeroInteractions(niFiDataPacketBuilder);
+
+ // process a regular tuple, haven't hit batch size yet
+ Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi");
+ bolt.execute(dataTuple2);
+ verifyZeroInteractions(niFiDataPacketBuilder);
+
+ // process a regular tuple, triggers batch size
+ Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi");
+ bolt.execute(dataTuple3);
+ verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class));
+ verify(collector, times(batchSize)).ack(any(Tuple.class));
+ }
+
+ @Test
+ public void testFailure() throws IOException {
+ final int batchSize = 3;
+ final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
+ .withBatchSize(batchSize);
+
+ when(((TestableNiFiBolt)bolt).transaction.complete())
+ .thenThrow(new RuntimeException("Could not complete transaction"));
+
+ // prepare the bolt
+ Map conf = mock(Map.class);
+ TopologyContext context = mock(TopologyContext.class);
+ OutputCollector collector = mock(OutputCollector.class);
+ bolt.prepare(conf, context, collector);
+
+ // process a regular tuple, haven't hit batch size yet
+ Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi");
+ bolt.execute(dataTuple1);
+ verifyZeroInteractions(niFiDataPacketBuilder);
+
+ // process a regular tuple, haven't hit batch size yet
+ Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi");
+ bolt.execute(dataTuple2);
+ verifyZeroInteractions(niFiDataPacketBuilder);
+
+ // process a regular tuple, triggers batch size
+ Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi");
+ bolt.execute(dataTuple3);
+ verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class));
+
+ verify(collector, times(batchSize)).fail(any(Tuple.class));
+ }
+
+ /**
+ * Extend NiFiBolt to provide a mock SiteToSiteClient.
+ */
+ private static final class TestableNiFiBolt extends NiFiBolt {
+
+ SiteToSiteClient mockSiteToSiteClient;
+ Transaction transaction;
+
+ public TestableNiFiBolt(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder builder, int tickFrequencySeconds) {
+ super(clientConfig, builder, tickFrequencySeconds);
+
+ mockSiteToSiteClient = mock(SiteToSiteClient.class);
+ transaction = mock(Transaction.class);
+ }
+
+ @Override
+ protected SiteToSiteClient createSiteToSiteClient() {
+ try {
+ when(mockSiteToSiteClient.createTransaction(eq(TransferDirection.SEND)))
+ .thenReturn(transaction);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return mockSiteToSiteClient;
+ }
+
+ }
+
+}