From e12a79ea929a222a93fd64bfc63382441e31060f Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Mon, 18 Apr 2016 14:00:40 -0400 Subject: [PATCH] NIFI-1778 Adding NiFiBolt to write back to NiFi from Storm - Adding example topology that creates a full loop between NiFi and Storm. - Bumping Storm to 0.10.0 NIFI-1778 Addressing code review comments This closes #361 --- nifi-external/nifi-storm-spout/pom.xml | 6 +- .../java/org/apache/nifi/storm/NiFiBolt.java | 195 +++++++++++++++++ .../nifi/storm/NiFiDataPacketBuilder.java | 28 +++ .../java/org/apache/nifi/storm/NiFiSpout.java | 12 +- .../nifi/storm/StandardNiFiDataPacket.java | 43 ++++ .../apache/nifi/storm/MockTupleHelpers.java | 39 ++++ .../apache/nifi/storm/NiFiStormTopology.java | 80 +++++++ .../org/apache/nifi/storm/TestNiFiBolt.java | 207 ++++++++++++++++++ 8 files changed, 598 insertions(+), 12 deletions(-) create mode 100644 nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java create mode 100644 nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java create mode 100644 nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java create mode 100644 nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java create mode 100644 nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java create mode 100644 nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java diff --git a/nifi-external/nifi-storm-spout/pom.xml b/nifi-external/nifi-storm-spout/pom.xml index 65fcdab9f0..301b41f38f 100644 --- a/nifi-external/nifi-storm-spout/pom.xml +++ b/nifi-external/nifi-storm-spout/pom.xml @@ -27,9 +27,13 @@ org.apache.storm storm-core - 0.9.5 + 0.10.0 provided + + org.apache.commons + commons-lang3 + org.apache.nifi nifi-site-to-site-client diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java new file mode 100644 index 0000000000..64cd0de950 --- /dev/null +++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import backtype.storm.Config; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.TupleUtils; +import org.apache.commons.lang3.Validate; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher + * through put scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or + * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple + * immediately in a single transaction. + */ +public class NiFiBolt extends BaseRichBolt { + + private static final long serialVersionUID = 3067274587595578836L; + public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class); + + private final SiteToSiteClientConfig clientConfig; + private final NiFiDataPacketBuilder builder; + private final int tickFrequencySeconds; + + private SiteToSiteClient client; + private OutputCollector collector; + private BlockingQueue queue = new LinkedBlockingQueue<>(); + + private int batchSize = 10; + private int batchIntervalInSec = 10; + private long lastBatchProcessTimeSeconds = 0; + + public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) { + Validate.notNull(clientConfig); + Validate.notNull(builder); + Validate.isTrue(tickFrequencySeconds > 0); + this.clientConfig = clientConfig; + this.builder = builder; + this.tickFrequencySeconds = tickFrequencySeconds; + } + + public NiFiBolt withBatchSize(int batchSize) { + Validate.isTrue(batchSize > 0); + this.batchSize = batchSize; + return this; + } + + public NiFiBolt withBatchInterval(int batchIntervalInSec) { + Validate.isTrue(batchIntervalInSec > 0); + this.batchIntervalInSec = batchIntervalInSec; + return this; + } + + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { + this.client = createSiteToSiteClient(); + this.collector = outputCollector; + this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000; + + LOGGER.info("Bolt is prepared with Batch Size " + batchSize + + ", Batch Interval " + batchIntervalInSec + + ", Tick Frequency is " + tickFrequencySeconds); + } + + protected SiteToSiteClient createSiteToSiteClient() { + return new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); + } + + @Override + public void execute(Tuple tuple) { + if (TupleUtils.isTick(tuple)) { + // if we have a tick tuple then lets see if enough time has passed since our last batch was processed + if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) { + LOGGER.debug("Received tick tuple and reached batch interval, executing batch"); + finishBatch(); + } else { + LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do"); + } + } else { + // for a regular tuple we add it to the queue and then see if our queue size exceeds batch size + this.queue.add(tuple); + + int queueSize = this.queue.size(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize); + } + + if (queueSize >= batchSize) { + LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch"); + finishBatch(); + } + } + } + + private void finishBatch() { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Finishing batch of size " + queue.size()); + } + + lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000; + + final List tuples = new ArrayList<>(); + queue.drainTo(tuples); + + if (tuples.size() == 0) { + LOGGER.debug("Finishing batch, but no tuples so returning..."); + return; + } + + try { + final Transaction transaction = client.createTransaction(TransferDirection.SEND); + if (transaction == null) { + throw new IllegalStateException("Unable to create a NiFi Transaction to send data"); + } + + // convert each tuple to a NiFiDataPacket and send it as part of the transaction + for (Tuple tuple : tuples) { + final NiFiDataPacket dataPacket = builder.createNiFiDataPacket(tuple); + transaction.send(dataPacket.getContent(), dataPacket.getAttributes()); + } + + transaction.confirm(); + transaction.complete(); + + // ack the tuples after successfully completing the transaction + for (Tuple tuple : tuples) { + collector.ack(tuple); + } + + } catch(Exception e){ + LOGGER.warn("Unable to process tuples due to: " + e.getMessage(), e); + for (Tuple tuple : tuples) { + collector.fail(tuple); + } + } + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + + } + + @Override + public void cleanup() { + super.cleanup(); + if (client != null) { + try { + client.close(); + } catch (final IOException ioe) { + LOGGER.error("Failed to close client", ioe); + } + } + } + + @Override + public Map getComponentConfiguration() { + Map conf = new HashMap<>(); + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencySeconds); + return conf; + } + +} diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java new file mode 100644 index 0000000000..fa2e20e376 --- /dev/null +++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiDataPacketBuilder.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import backtype.storm.tuple.Tuple; + +/** + * Converts a Tuple into a NiFiDataPacket. + */ +public interface NiFiDataPacketBuilder { + + NiFiDataPacket createNiFiDataPacket(Tuple tuple); + +} diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java index 64dac6f0a2..2cb0b6626a 100644 --- a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java +++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiSpout.java @@ -226,17 +226,7 @@ public class NiFiSpout extends BaseRichSpout { StreamUtils.fillBuffer(inStream, data); final Map attributes = dataPacket.getAttributes(); - final NiFiDataPacket niFiDataPacket = new NiFiDataPacket() { - @Override - public byte[] getContent() { - return data; - } - - @Override - public Map getAttributes() { - return attributes; - } - }; + final NiFiDataPacket niFiDataPacket = new StandardNiFiDataPacket(data, attributes); dataPackets.add(niFiDataPacket); dataPacket = transaction.receive(); diff --git a/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java new file mode 100644 index 0000000000..b3a9da6cc2 --- /dev/null +++ b/nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/StandardNiFiDataPacket.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import java.io.Serializable; +import java.util.Map; + +public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable { + private static final long serialVersionUID = 6364005260220243322L; + + private final byte[] content; + private final Map attributes; + + public StandardNiFiDataPacket(final byte[] content, final Map attributes) { + this.content = content; + this.attributes = attributes; + } + + @Override + public byte[] getContent() { + return content; + } + + @Override + public Map getAttributes() { + return attributes; + } + +} diff --git a/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java new file mode 100644 index 0000000000..c63bb3378c --- /dev/null +++ b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/MockTupleHelpers.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import backtype.storm.Constants; +import backtype.storm.tuple.Tuple; +import org.mockito.Mockito; + +public final class MockTupleHelpers { + + private MockTupleHelpers() { + } + + public static Tuple mockTickTuple() { + return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID); + } + + public static Tuple mockTuple(String componentId, String streamId) { + Tuple tuple = Mockito.mock(Tuple.class); + Mockito.when(tuple.getSourceComponent()).thenReturn(componentId); + Mockito.when(tuple.getSourceStreamId()).thenReturn(streamId); + return tuple; + } +} \ No newline at end of file diff --git a/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java new file mode 100644 index 0000000000..7592471e9e --- /dev/null +++ b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/NiFiStormTopology.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.Utils; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; + +import java.io.Serializable; + +/** + * Example topology that pulls data from a NiFi Output Port named 'Data for Storm' and writes the same + * data back to a NiFi Input Port named 'Data from Storm'. + */ +public class NiFiStormTopology { + + public static void main( String[] args ) { + // Build a Site-To-Site client config for pulling data + final SiteToSiteClientConfig inputConfig = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("Data for Storm") + .buildConfig(); + + // Build a Site-To-Site client config for pushing data + final SiteToSiteClientConfig outputConfig = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("Data from Storm") + .buildConfig(); + + final int tickFrequencySeconds = 5; + final NiFiDataPacketBuilder niFiDataPacketBuilder = new SimpleNiFiDataPacketBuilder(); + final NiFiBolt niFiBolt = new NiFiBolt(outputConfig, niFiDataPacketBuilder, tickFrequencySeconds) + //.withBatchSize(1) + ; + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("nifiInput", new NiFiSpout(inputConfig)); + builder.setBolt("nifiOutput", niFiBolt).shuffleGrouping("nifiInput"); + + // Submit the topology running in local mode + Config conf = new Config(); + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", conf, builder.createTopology()); + + Utils.sleep(90000); + cluster.shutdown(); + } + + /** + * Simple builder that returns the incoming data packet. + */ + static class SimpleNiFiDataPacketBuilder implements NiFiDataPacketBuilder, Serializable { + + private static final long serialVersionUID = 3067274587595578836L; + + @Override + public NiFiDataPacket createNiFiDataPacket(Tuple tuple) { + return (NiFiDataPacket) tuple.getValueByField(NiFiSpout.NIFI_DATA_PACKET); + } + } + +} diff --git a/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java new file mode 100644 index 0000000000..29d16841f5 --- /dev/null +++ b/nifi-external/nifi-storm-spout/src/test/java/org/apache/nifi/storm/TestNiFiBolt.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.storm; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class TestNiFiBolt { + + private int tickFrequency; + private SiteToSiteClientConfig siteToSiteClientConfig; + private NiFiDataPacketBuilder niFiDataPacketBuilder; + + @Before + public void setup() { + tickFrequency = 30; + siteToSiteClientConfig = mock(SiteToSiteClientConfig.class); + niFiDataPacketBuilder = mock(NiFiDataPacketBuilder.class); + + // setup the builder to return empty data packets for testing + when(niFiDataPacketBuilder.createNiFiDataPacket(any(Tuple.class))).thenReturn(new NiFiDataPacket() { + @Override + public byte[] getContent() { + return new byte[0]; + } + + @Override + public Map getAttributes() { + return new HashMap<>(); + } + }); + } + + @Test + public void testTickTupleWhenNotExceedingBatchInterval() { + final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency); + + // prepare the bolt + Map conf = mock(Map.class); + TopologyContext context = mock(TopologyContext.class); + OutputCollector collector = mock(OutputCollector.class); + bolt.prepare(conf, context, collector); + + // process a regular tuple + Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi"); + bolt.execute(dataTuple); + + // process a tick tuple + Tuple tickTuple = MockTupleHelpers.mockTickTuple(); + bolt.execute(tickTuple); + + // should not have produced any NiFiDataPackets + verifyZeroInteractions(niFiDataPacketBuilder); + } + + @Test + public void testTickTupleWhenExceedingBatchInterval() throws InterruptedException { + final int batchInterval = 1; + final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency) + .withBatchInterval(batchInterval); + + // prepare the bolt + Map conf = mock(Map.class); + TopologyContext context = mock(TopologyContext.class); + OutputCollector collector = mock(OutputCollector.class); + bolt.prepare(conf, context, collector); + + // process a regular tuple + Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi"); + bolt.execute(dataTuple); + + // sleep so we pass the batch interval + Thread.sleep(batchInterval + 1000); + + // process a tick tuple + Tuple tickTuple = MockTupleHelpers.mockTickTuple(); + bolt.execute(tickTuple); + + // should have produced one data packet and acked it + verify(niFiDataPacketBuilder, times(1)).createNiFiDataPacket(eq(dataTuple)); + verify(collector, times(1)).ack(eq(dataTuple)); + } + + @Test + public void testBatchSize() { + final int batchSize = 3; + final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency) + .withBatchSize(batchSize); + + // prepare the bolt + Map conf = mock(Map.class); + TopologyContext context = mock(TopologyContext.class); + OutputCollector collector = mock(OutputCollector.class); + bolt.prepare(conf, context, collector); + + // process a regular tuple, haven't hit batch size yet + Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi"); + bolt.execute(dataTuple1); + verifyZeroInteractions(niFiDataPacketBuilder); + + // process a regular tuple, haven't hit batch size yet + Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi"); + bolt.execute(dataTuple2); + verifyZeroInteractions(niFiDataPacketBuilder); + + // process a regular tuple, triggers batch size + Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi"); + bolt.execute(dataTuple3); + verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class)); + verify(collector, times(batchSize)).ack(any(Tuple.class)); + } + + @Test + public void testFailure() throws IOException { + final int batchSize = 3; + final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency) + .withBatchSize(batchSize); + + when(((TestableNiFiBolt)bolt).transaction.complete()) + .thenThrow(new RuntimeException("Could not complete transaction")); + + // prepare the bolt + Map conf = mock(Map.class); + TopologyContext context = mock(TopologyContext.class); + OutputCollector collector = mock(OutputCollector.class); + bolt.prepare(conf, context, collector); + + // process a regular tuple, haven't hit batch size yet + Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi"); + bolt.execute(dataTuple1); + verifyZeroInteractions(niFiDataPacketBuilder); + + // process a regular tuple, haven't hit batch size yet + Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi"); + bolt.execute(dataTuple2); + verifyZeroInteractions(niFiDataPacketBuilder); + + // process a regular tuple, triggers batch size + Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi"); + bolt.execute(dataTuple3); + verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class)); + + verify(collector, times(batchSize)).fail(any(Tuple.class)); + } + + /** + * Extend NiFiBolt to provide a mock SiteToSiteClient. + */ + private static final class TestableNiFiBolt extends NiFiBolt { + + SiteToSiteClient mockSiteToSiteClient; + Transaction transaction; + + public TestableNiFiBolt(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder builder, int tickFrequencySeconds) { + super(clientConfig, builder, tickFrequencySeconds); + + mockSiteToSiteClient = mock(SiteToSiteClient.class); + transaction = mock(Transaction.class); + } + + @Override + protected SiteToSiteClient createSiteToSiteClient() { + try { + when(mockSiteToSiteClient.createTransaction(eq(TransferDirection.SEND))) + .thenReturn(transaction); + } catch (IOException e) { + throw new RuntimeException(e); + } + return mockSiteToSiteClient; + } + + } + +}