NIFI-8335 Remove the nifi-storm-spout module that is no longer maintained

This closes #4912

Signed-off-by: Joey Frazee <jfrazee@apache.org>
This commit is contained in:
Nathan Gough 2021-03-17 12:42:42 -04:00 committed by Joey Frazee
parent 105a76b7b7
commit 61c4261bb7
11 changed files with 1 additions and 933 deletions

View File

@ -1594,7 +1594,7 @@ To create an Output Port for Site-to-Site in a child Process Group, enter the na
image::add-output-port-S2S.png["Add Output Port for Site-to-Site"]
In addition to other instances of NiFi, some other applications may use a Site-to-Site client in order to push data to or receive data from a NiFi instance. For example, NiFi provides an Apache Storm spout and an Apache Spark Receiver that are able to pull data from NiFi's Output Ports for Site-to-Site connections.
In addition to other instances of NiFi, some other applications may use a Site-to-Site client in order to push data to or receive data from a NiFi instance.
NOTE: For information on how to enable and configure Site-to-Site on a NiFi instance, see the
link:administration-guide.html#site_to_site_properties[Site-to-Site Properties] section of the

View File

@ -1,43 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-external</artifactId>
<version>1.14.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-storm-spout</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId>
<version>1.14.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -1,196 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
* through put scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
* until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
* immediately in a single transaction.
*/
public class NiFiBolt extends BaseRichBolt {
private static final long serialVersionUID = 3067274587595578836L;
public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
private final SiteToSiteClientConfig clientConfig;
private final NiFiDataPacketBuilder builder;
private final int tickFrequencySeconds;
private SiteToSiteClient client;
private OutputCollector collector;
private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
private int batchSize = 10;
private int batchIntervalInSec = 10;
private long lastBatchProcessTimeSeconds = 0;
public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
Validate.notNull(clientConfig);
Validate.notNull(builder);
Validate.isTrue(tickFrequencySeconds > 0);
this.clientConfig = clientConfig;
this.builder = builder;
this.tickFrequencySeconds = tickFrequencySeconds;
}
public NiFiBolt withBatchSize(int batchSize) {
Validate.isTrue(batchSize > 0);
this.batchSize = batchSize;
return this;
}
public NiFiBolt withBatchInterval(int batchIntervalInSec) {
Validate.isTrue(batchIntervalInSec > 0);
this.batchIntervalInSec = batchIntervalInSec;
return this;
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.client = createSiteToSiteClient();
this.collector = outputCollector;
this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
LOGGER.info("Bolt is prepared with Batch Size " + batchSize
+ ", Batch Interval " + batchIntervalInSec
+ ", Tick Frequency is " + tickFrequencySeconds);
}
protected SiteToSiteClient createSiteToSiteClient() {
return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
}
@Override
public void execute(Tuple tuple) {
if (TupleUtils.isTick(tuple)) {
// if we have a tick tuple then lets see if enough time has passed since our last batch was processed
if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
finishBatch();
} else {
LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
}
} else {
// for a regular tuple we add it to the queue and then see if our queue size exceeds batch size
this.queue.add(tuple);
int queueSize = this.queue.size();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize);
}
if (queueSize >= batchSize) {
LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch");
finishBatch();
}
}
}
private void finishBatch() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Finishing batch of size " + queue.size());
}
lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
final List<Tuple> tuples = new ArrayList<>();
queue.drainTo(tuples);
if (tuples.size() == 0) {
LOGGER.debug("Finishing batch, but no tuples so returning...");
return;
}
try {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
if (transaction == null) {
throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
}
// convert each tuple to a NiFiDataPacket and send it as part of the transaction
for (Tuple tuple : tuples) {
final NiFiDataPacket dataPacket = builder.createNiFiDataPacket(tuple);
transaction.send(dataPacket.getContent(), dataPacket.getAttributes());
}
transaction.confirm();
transaction.complete();
// ack the tuples after successfully completing the transaction
for (Tuple tuple : tuples) {
collector.ack(tuple);
}
} catch(Exception e){
LOGGER.warn("Unable to process tuples due to: " + e.getMessage(), e);
for (Tuple tuple : tuples) {
collector.fail(tuple);
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void cleanup() {
super.cleanup();
if (client != null) {
try {
client.close();
} catch (final IOException ioe) {
LOGGER.error("Failed to close client", ioe);
}
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencySeconds);
return conf;
}
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import java.util.Map;
/**
* <p>
* The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
* a FlowFile's content and its attributes so that they can be processed by
* Storm
* </p>
*/
public interface NiFiDataPacket {
/**
* @return the contents of a NiFi FlowFile
*/
byte[] getContent();
/**
* @return a Map of attributes that are associated with the NiFi FlowFile
*/
Map<String, String> getAttributes();
}

View File

@ -1,28 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import org.apache.storm.tuple.Tuple;
/**
* Converts a Tuple into a NiFiDataPacket.
*/
public interface NiFiDataPacketBuilder {
NiFiDataPacket createNiFiDataPacket(Tuple tuple);
}

View File

@ -1,256 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
/**
* <p>
* The <code>NiFiSpout</code> provides a way to pull data from Apache NiFi so
* that it can be processed by Apache Storm. The NiFi Spout connects to a NiFi
* instance provided in the config and requests data from the OutputPort that
* is named. In NiFi, when an OutputPort is added to the root process group,
* it acts as a queue of data for remote clients. This spout is then able to
* pull that data from NiFi reliably.
* </p>
*
* <p>
* It is important to note that if pulling data from a NiFi cluster, the URL
* that should be used is that of the NiFi Cluster Manager. The Receiver will
* automatically handle determining the nodes in that cluster and pull from
* those nodes as appropriate.
* </p>
*
* <p>
* In order to use the NiFiSpout, you will need to first build a
* {@link SiteToSiteClientConfig} to provide to the constructor. This can be
* achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
* snippet of driver code to pull data from NiFi that is running on
* localhost:8080. This example assumes that NiFi exposes an OutputPort on the
* root group named "Data For Storm". Additionally, it assumes that the data
* that it will receive from this OutputPort is text data, as it will map the
* byte array received from NiFi to a UTF-8 Encoded string.
* </p>
*
* <code>
* <pre>
* {@code
*
* // Build a Site-To-Site client config
* SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
* .url("http://localhost:8080/nifi")
* .portName("Data for Storm")
* .buildConfig();
*
* // Build a topology starting with a NiFiSpout
* TopologyBuilder builder = new TopologyBuilder();
* builder.setSpout("nifi", new NiFiSpout(clientConfig));
*
* // Add a bolt that prints the attributes and content
* builder.setBolt("print", new BaseBasicBolt() {
* @Override
* public void execute(Tuple tuple, BasicOutputCollector collector) {
* NiFiDataPacket dp = (NiFiDataPacket) tuple.getValueByField("nifiDataPacket");
* System.out.println("Attributes: " + dp.getAttributes());
* System.out.println("Content: " + new String(dp.getContent()));
* }
*
* @Override
* public void declareOutputFields(OutputFieldsDeclarer declarer) {}
*
* }).shuffleGrouping("nifi");
*
* // Submit the topology running in local mode
* Config conf = new Config();
* LocalCluster cluster = new LocalCluster();
* cluster.submitTopology("test", conf, builder.createTopology());
*
* Utils.sleep(90000);
* cluster.shutdown();
* }
* </pre>
* </code>
*/
public class NiFiSpout extends BaseRichSpout {
private static final long serialVersionUID = 3067274587595578836L;
public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
public static final String NIFI_DATA_PACKET = "nifiDataPacket";
private NiFiSpoutReceiver spoutReceiver;
private LinkedBlockingQueue<NiFiDataPacket> queue;
private SpoutOutputCollector spoutOutputCollector;
private final SiteToSiteClientConfig clientConfig;
private final List<String> attributeNames;
/**
* @param clientConfig
* configuration used to build the SiteToSiteClient
*/
public NiFiSpout(SiteToSiteClientConfig clientConfig) {
this(clientConfig, null);
}
/**
*
* @param clientConfig
* configuration used to build the SiteToSiteClient
* @param attributeNames
* names of FlowFile attributes to be added as values to each tuple, in addition
* to the nifiDataPacket value on all tuples
*
*/
public NiFiSpout(SiteToSiteClientConfig clientConfig, List<String> attributeNames) {
this.clientConfig = clientConfig;
this.attributeNames = (attributeNames == null ? new ArrayList<String>() : attributeNames);
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
this.queue = new LinkedBlockingQueue<>(1000);
this.spoutReceiver = new NiFiSpoutReceiver();
this.spoutReceiver.setDaemon(true);
this.spoutReceiver.setName("NiFi Spout Receiver");
this.spoutReceiver.start();
}
@Override
public void nextTuple() {
NiFiDataPacket data = queue.poll();
if (data == null) {
Utils.sleep(50);
} else {
// always start with the data packet
Values values = new Values(data);
// add additional values based on the specified attribute names
for (String attributeName : attributeNames) {
if (data.getAttributes().containsKey(attributeName)) {
values.add(data.getAttributes().get(attributeName));
}
}
spoutOutputCollector.emit(values);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
final List<String> fieldNames = new ArrayList<>();
fieldNames.add(NIFI_DATA_PACKET);
fieldNames.addAll(attributeNames);
outputFieldsDeclarer.declare(new Fields(fieldNames));
}
@Override
public void close() {
super.close();
spoutReceiver.shutdown();
}
class NiFiSpoutReceiver extends Thread {
private boolean shutdown = false;
public synchronized void shutdown() {
this.shutdown = true;
}
@Override
public void run() {
try {
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
try {
while (!shutdown) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
DataPacket dataPacket = transaction.receive();
if (dataPacket == null) {
transaction.confirm();
transaction.complete();
// no data available. Wait a bit and try again
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
continue;
}
final List<NiFiDataPacket> dataPackets = new ArrayList<>();
do {
// Read the data into a byte array and wrap it along with the attributes
// into a NiFiDataPacket.
final InputStream inStream = dataPacket.getData();
final byte[] data = new byte[(int) dataPacket.getSize()];
StreamUtils.fillBuffer(inStream, data);
final Map<String, String> attributes = dataPacket.getAttributes();
final NiFiDataPacket niFiDataPacket = new StandardNiFiDataPacket(data, attributes);
dataPackets.add(niFiDataPacket);
dataPacket = transaction.receive();
} while (dataPacket != null);
// Confirm transaction to verify the data
transaction.confirm();
for (NiFiDataPacket dp : dataPackets) {
queue.offer(dp);
}
transaction.complete();
}
} finally {
try {
client.close();
} catch (final IOException ioe) {
LOGGER.error("Failed to close client", ioe);
}
}
} catch (final IOException ioe) {
LOGGER.error("Failed to receive data from NiFi", ioe);
}
}
}
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import java.io.Serializable;
import java.util.Map;
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
private static final long serialVersionUID = 6364005260220243322L;
private final byte[] content;
private final Map<String, String> attributes;
public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
this.content = content;
this.attributes = attributes;
}
@Override
public byte[] getContent() {
return content;
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
}

View File

@ -1,39 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;
import org.mockito.Mockito;
public final class MockTupleHelpers {
private MockTupleHelpers() {
}
public static Tuple mockTickTuple() {
return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
}
public static Tuple mockTuple(String componentId, String streamId) {
Tuple tuple = Mockito.mock(Tuple.class);
Mockito.when(tuple.getSourceComponent()).thenReturn(componentId);
Mockito.when(tuple.getSourceStreamId()).thenReturn(streamId);
return tuple;
}
}

View File

@ -1,80 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
import java.io.Serializable;
/**
* Example topology that pulls data from a NiFi Output Port named 'Data for Storm' and writes the same
* data back to a NiFi Input Port named 'Data from Storm'.
*/
public class NiFiStormTopology {
public static void main( String[] args ) {
// Build a Site-To-Site client config for pulling data
final SiteToSiteClientConfig inputConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data for Storm")
.buildConfig();
// Build a Site-To-Site client config for pushing data
final SiteToSiteClientConfig outputConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Storm")
.buildConfig();
final int tickFrequencySeconds = 5;
final NiFiDataPacketBuilder niFiDataPacketBuilder = new SimpleNiFiDataPacketBuilder();
final NiFiBolt niFiBolt = new NiFiBolt(outputConfig, niFiDataPacketBuilder, tickFrequencySeconds)
//.withBatchSize(1)
;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("nifiInput", new NiFiSpout(inputConfig));
builder.setBolt("nifiOutput", niFiBolt).shuffleGrouping("nifiInput");
// Submit the topology running in local mode
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(90000);
cluster.shutdown();
}
/**
* Simple builder that returns the incoming data packet.
*/
static class SimpleNiFiDataPacketBuilder implements NiFiDataPacketBuilder, Serializable {
private static final long serialVersionUID = 3067274587595578836L;
@Override
public NiFiDataPacket createNiFiDataPacket(Tuple tuple) {
return (NiFiDataPacket) tuple.getValueByField(NiFiSpout.NIFI_DATA_PACKET);
}
}
}

View File

@ -1,207 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class TestNiFiBolt {
private int tickFrequency;
private SiteToSiteClientConfig siteToSiteClientConfig;
private NiFiDataPacketBuilder niFiDataPacketBuilder;
@Before
public void setup() {
tickFrequency = 30;
siteToSiteClientConfig = mock(SiteToSiteClientConfig.class);
niFiDataPacketBuilder = mock(NiFiDataPacketBuilder.class);
// setup the builder to return empty data packets for testing
when(niFiDataPacketBuilder.createNiFiDataPacket(any(Tuple.class))).thenReturn(new NiFiDataPacket() {
@Override
public byte[] getContent() {
return new byte[0];
}
@Override
public Map<String, String> getAttributes() {
return new HashMap<>();
}
});
}
@Test
public void testTickTupleWhenNotExceedingBatchInterval() {
final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency);
// prepare the bolt
Map conf = mock(Map.class);
TopologyContext context = mock(TopologyContext.class);
OutputCollector collector = mock(OutputCollector.class);
bolt.prepare(conf, context, collector);
// process a regular tuple
Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple);
// process a tick tuple
Tuple tickTuple = MockTupleHelpers.mockTickTuple();
bolt.execute(tickTuple);
// should not have produced any NiFiDataPackets
verifyZeroInteractions(niFiDataPacketBuilder);
}
@Test
public void testTickTupleWhenExceedingBatchInterval() throws InterruptedException {
final int batchInterval = 1;
final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
.withBatchInterval(batchInterval);
// prepare the bolt
Map conf = mock(Map.class);
TopologyContext context = mock(TopologyContext.class);
OutputCollector collector = mock(OutputCollector.class);
bolt.prepare(conf, context, collector);
// process a regular tuple
Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple);
// sleep so we pass the batch interval
Thread.sleep(batchInterval + 1000);
// process a tick tuple
Tuple tickTuple = MockTupleHelpers.mockTickTuple();
bolt.execute(tickTuple);
// should have produced one data packet and acked it
verify(niFiDataPacketBuilder, times(1)).createNiFiDataPacket(eq(dataTuple));
verify(collector, times(1)).ack(eq(dataTuple));
}
@Test
public void testBatchSize() {
final int batchSize = 3;
final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
.withBatchSize(batchSize);
// prepare the bolt
Map conf = mock(Map.class);
TopologyContext context = mock(TopologyContext.class);
OutputCollector collector = mock(OutputCollector.class);
bolt.prepare(conf, context, collector);
// process a regular tuple, haven't hit batch size yet
Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple1);
verifyZeroInteractions(niFiDataPacketBuilder);
// process a regular tuple, haven't hit batch size yet
Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple2);
verifyZeroInteractions(niFiDataPacketBuilder);
// process a regular tuple, triggers batch size
Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple3);
verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class));
verify(collector, times(batchSize)).ack(any(Tuple.class));
}
@Test
public void testFailure() throws IOException {
final int batchSize = 3;
final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
.withBatchSize(batchSize);
when(((TestableNiFiBolt)bolt).transaction.complete())
.thenThrow(new RuntimeException("Could not complete transaction"));
// prepare the bolt
Map conf = mock(Map.class);
TopologyContext context = mock(TopologyContext.class);
OutputCollector collector = mock(OutputCollector.class);
bolt.prepare(conf, context, collector);
// process a regular tuple, haven't hit batch size yet
Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple1);
verifyZeroInteractions(niFiDataPacketBuilder);
// process a regular tuple, haven't hit batch size yet
Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple2);
verifyZeroInteractions(niFiDataPacketBuilder);
// process a regular tuple, triggers batch size
Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple3);
verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class));
verify(collector, times(batchSize)).fail(any(Tuple.class));
}
/**
* Extend NiFiBolt to provide a mock SiteToSiteClient.
*/
private static final class TestableNiFiBolt extends NiFiBolt {
SiteToSiteClient mockSiteToSiteClient;
Transaction transaction;
public TestableNiFiBolt(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder builder, int tickFrequencySeconds) {
super(clientConfig, builder, tickFrequencySeconds);
mockSiteToSiteClient = mock(SiteToSiteClient.class);
transaction = mock(Transaction.class);
}
@Override
protected SiteToSiteClient createSiteToSiteClient() {
try {
when(mockSiteToSiteClient.createTransaction(eq(TransferDirection.SEND)))
.thenReturn(transaction);
} catch (IOException e) {
throw new RuntimeException(e);
}
return mockSiteToSiteClient;
}
}
}

View File

@ -25,7 +25,6 @@
<packaging>pom</packaging>
<modules>
<module>nifi-spark-receiver</module>
<module>nifi-storm-spout</module>
<module>nifi-example-bundle</module>
<module>nifi-kafka-connect</module>
</modules>