NIFI-1778 Adding NiFiBolt to write back to NiFi from Storm

- Adding example topology that creates a full loop between NiFi and Storm.
- Bumping Storm to 0.10.0

NIFI-1778 Addressing code review comments
This closes #361
Bryan Bende 2016-04-18 14:00:40 -04:00 committed by Oleg Zhurakousky
parent f54e5d4b1b
commit e12a79ea92
8 changed files with 598 additions and 12 deletions

View File

@@ -27,9 +27,13 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.5</version>
<version>0.10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId>

View File

@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TupleUtils;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
 * throughput scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
* until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
* immediately in a single transaction.
*/
public class NiFiBolt extends BaseRichBolt {
private static final long serialVersionUID = 3067274587595578836L;
public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
private final SiteToSiteClientConfig clientConfig;
private final NiFiDataPacketBuilder builder;
private final int tickFrequencySeconds;
private SiteToSiteClient client;
private OutputCollector collector;
private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
private int batchSize = 10;
private int batchIntervalInSec = 10;
private long lastBatchProcessTimeSeconds = 0;
public NiFiBolt(final SiteToSiteClientConfig clientConfig, final NiFiDataPacketBuilder builder, final int tickFrequencySeconds) {
Validate.notNull(clientConfig);
Validate.notNull(builder);
Validate.isTrue(tickFrequencySeconds > 0);
this.clientConfig = clientConfig;
this.builder = builder;
this.tickFrequencySeconds = tickFrequencySeconds;
}
public NiFiBolt withBatchSize(int batchSize) {
Validate.isTrue(batchSize > 0);
this.batchSize = batchSize;
return this;
}
public NiFiBolt withBatchInterval(int batchIntervalInSec) {
Validate.isTrue(batchIntervalInSec > 0);
this.batchIntervalInSec = batchIntervalInSec;
return this;
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.client = createSiteToSiteClient();
this.collector = outputCollector;
this.lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
LOGGER.info("Bolt is prepared with Batch Size " + batchSize
+ ", Batch Interval " + batchIntervalInSec
+ ", Tick Frequency " + tickFrequencySeconds);
}
protected SiteToSiteClient createSiteToSiteClient() {
return new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
}
@Override
public void execute(Tuple tuple) {
if (TupleUtils.isTick(tuple)) {
// if we have a tick tuple then let's see if enough time has passed since our last batch was processed
if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
LOGGER.debug("Received tick tuple and reached batch interval, executing batch");
finishBatch();
} else {
LOGGER.debug("Received tick tuple, but haven't reached batch interval, nothing to do");
}
} else {
// for a regular tuple we add it to the queue and then see if our queue size exceeds batch size
this.queue.add(tuple);
int queueSize = this.queue.size();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Current queue size is " + queueSize + ", and batch size is " + batchSize);
}
if (queueSize >= batchSize) {
LOGGER.debug("Queue Size is greater than or equal to batch size, executing batch");
finishBatch();
}
}
}
private void finishBatch() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Finishing batch of size " + queue.size());
}
lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
final List<Tuple> tuples = new ArrayList<>();
queue.drainTo(tuples);
if (tuples.size() == 0) {
LOGGER.debug("Finishing batch, but no tuples so returning...");
return;
}
try {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
if (transaction == null) {
throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
}
// convert each tuple to a NiFiDataPacket and send it as part of the transaction
for (Tuple tuple : tuples) {
final NiFiDataPacket dataPacket = builder.createNiFiDataPacket(tuple);
transaction.send(dataPacket.getContent(), dataPacket.getAttributes());
}
transaction.confirm();
transaction.complete();
// ack the tuples after successfully completing the transaction
for (Tuple tuple : tuples) {
collector.ack(tuple);
}
} catch(Exception e){
LOGGER.warn("Unable to process tuples due to: " + e.getMessage(), e);
for (Tuple tuple : tuples) {
collector.fail(tuple);
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void cleanup() {
super.cleanup();
if (client != null) {
try {
client.close();
} catch (final IOException ioe) {
LOGGER.error("Failed to close client", ioe);
}
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencySeconds);
return conf;
}
}
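
Usage note: the Javadoc above describes two batching modes. A minimal sketch of configuring each (illustrative only, not from this commit), assuming an existing SiteToSiteClientConfig named outputConfig and a NiFiDataPacketBuilder named packetBuilder:

// Send every tuple immediately, one Site-to-Site transaction per tuple (lowest latency).
final NiFiBolt perTupleBolt = new NiFiBolt(outputConfig, packetBuilder, 5)
        .withBatchSize(1);

// Micro-batch up to 500 tuples, flushing at least every 10 seconds via tick tuples.
final NiFiBolt batchingBolt = new NiFiBolt(outputConfig, packetBuilder, 5)
        .withBatchSize(500)
        .withBatchInterval(10);

The tick frequency (5 seconds here) only controls how often Storm delivers tick tuples to the bolt; the batch interval decides whether a given tick actually flushes the queue.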

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import backtype.storm.tuple.Tuple;
/**
* Converts a Tuple into a NiFiDataPacket.
*/
public interface NiFiDataPacketBuilder {
NiFiDataPacket createNiFiDataPacket(Tuple tuple);
}
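
For illustration, a hypothetical implementation of this interface (not included in this commit) that serializes the tuple's first field as UTF-8 and records the source component as an attribute, using the StandardNiFiDataPacket class added further down:

package org.apache.nifi.storm;

import backtype.storm.tuple.Tuple;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical example builder: turns the tuple's first field into a NiFiDataPacket.
 * The builder is Serializable because NiFiBolt holds it as a field and Storm
 * serializes bolts when distributing the topology.
 */
public class StringFieldDataPacketBuilder implements NiFiDataPacketBuilder, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public NiFiDataPacket createNiFiDataPacket(final Tuple tuple) {
        final byte[] content = tuple.getValue(0).toString().getBytes(StandardCharsets.UTF_8);
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("storm.source.component", tuple.getSourceComponent());
        return new StandardNiFiDataPacket(content, attributes);
    }
}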

View File

@@ -226,17 +226,7 @@ public class NiFiSpout extends BaseRichSpout {
StreamUtils.fillBuffer(inStream, data);
final Map<String, String> attributes = dataPacket.getAttributes();
final NiFiDataPacket niFiDataPacket = new NiFiDataPacket() {
@Override
public byte[] getContent() {
return data;
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
};
final NiFiDataPacket niFiDataPacket = new StandardNiFiDataPacket(data, attributes);
dataPackets.add(niFiDataPacket);
dataPacket = transaction.receive();

View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import java.io.Serializable;
import java.util.Map;
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
private static final long serialVersionUID = 6364005260220243322L;
private final byte[] content;
private final Map<String, String> attributes;
public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
this.content = content;
this.attributes = attributes;
}
@Override
public byte[] getContent() {
return content;
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
}

View File

@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;
import org.mockito.Mockito;
public final class MockTupleHelpers {
private MockTupleHelpers() {
}
public static Tuple mockTickTuple() {
return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
}
public static Tuple mockTuple(String componentId, String streamId) {
Tuple tuple = Mockito.mock(Tuple.class);
Mockito.when(tuple.getSourceComponent()).thenReturn(componentId);
Mockito.when(tuple.getSourceStreamId()).thenReturn(streamId);
return tuple;
}
}

View File

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import java.io.Serializable;
/**
* Example topology that pulls data from a NiFi Output Port named 'Data for Storm' and writes the same
* data back to a NiFi Input Port named 'Data from Storm'.
*/
public class NiFiStormTopology {
public static void main( String[] args ) {
// Build a Site-To-Site client config for pulling data
final SiteToSiteClientConfig inputConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data for Storm")
.buildConfig();
// Build a Site-To-Site client config for pushing data
final SiteToSiteClientConfig outputConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Storm")
.buildConfig();
final int tickFrequencySeconds = 5;
final NiFiDataPacketBuilder niFiDataPacketBuilder = new SimpleNiFiDataPacketBuilder();
final NiFiBolt niFiBolt = new NiFiBolt(outputConfig, niFiDataPacketBuilder, tickFrequencySeconds)
//.withBatchSize(1)
;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("nifiInput", new NiFiSpout(inputConfig));
builder.setBolt("nifiOutput", niFiBolt).shuffleGrouping("nifiInput");
// Submit the topology running in local mode
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(90000);
cluster.shutdown();
}
/**
* Simple builder that returns the incoming data packet.
*/
static class SimpleNiFiDataPacketBuilder implements NiFiDataPacketBuilder, Serializable {
private static final long serialVersionUID = 3067274587595578836L;
@Override
public NiFiDataPacket createNiFiDataPacket(Tuple tuple) {
return (NiFiDataPacket) tuple.getValueByField(NiFiSpout.NIFI_DATA_PACKET);
}
}
}
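
The example above runs in a LocalCluster for 90 seconds and then shuts down. As a hedged sketch (not part of this commit), the same wiring could be submitted to a real cluster with StormSubmitter; the topology name, worker count, and parallelism hint below are placeholders:

// Hypothetical helper: deploy the spout/bolt wiring to a Storm cluster instead of LocalCluster.
// Typically packaged as a shaded jar and launched with `storm jar`.
public static void submitToCluster(SiteToSiteClientConfig inputConfig, NiFiBolt niFiBolt) throws Exception {
    final TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("nifiInput", new NiFiSpout(inputConfig));
    builder.setBolt("nifiOutput", niFiBolt, 2).shuffleGrouping("nifiInput");

    final Config conf = new Config();
    conf.setNumWorkers(2);

    // backtype.storm.StormSubmitter; submitTopology throws checked exceptions, hence the broad throws clause.
    StormSubmitter.submitTopology("nifi-storm-example", conf, builder.createTopology());
}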

View File

@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class TestNiFiBolt {
private int tickFrequency;
private SiteToSiteClientConfig siteToSiteClientConfig;
private NiFiDataPacketBuilder niFiDataPacketBuilder;
@Before
public void setup() {
tickFrequency = 30;
siteToSiteClientConfig = mock(SiteToSiteClientConfig.class);
niFiDataPacketBuilder = mock(NiFiDataPacketBuilder.class);
// setup the builder to return empty data packets for testing
when(niFiDataPacketBuilder.createNiFiDataPacket(any(Tuple.class))).thenReturn(new NiFiDataPacket() {
@Override
public byte[] getContent() {
return new byte[0];
}
@Override
public Map<String, String> getAttributes() {
return new HashMap<>();
}
});
}
@Test
public void testTickTupleWhenNotExceedingBatchInterval() {
final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency);
// prepare the bolt
Map conf = mock(Map.class);
TopologyContext context = mock(TopologyContext.class);
OutputCollector collector = mock(OutputCollector.class);
bolt.prepare(conf, context, collector);
// process a regular tuple
Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple);
// process a tick tuple
Tuple tickTuple = MockTupleHelpers.mockTickTuple();
bolt.execute(tickTuple);
// should not have produced any NiFiDataPackets
verifyZeroInteractions(niFiDataPacketBuilder);
}
@Test
public void testTickTupleWhenExceedingBatchInterval() throws InterruptedException {
final int batchInterval = 1;
final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
.withBatchInterval(batchInterval);
// prepare the bolt
Map conf = mock(Map.class);
TopologyContext context = mock(TopologyContext.class);
OutputCollector collector = mock(OutputCollector.class);
bolt.prepare(conf, context, collector);
// process a regular tuple
Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple);
// sleep long enough to pass the batch interval (given in seconds) with some margin
Thread.sleep((batchInterval * 1000) + 1000);
// process a tick tuple
Tuple tickTuple = MockTupleHelpers.mockTickTuple();
bolt.execute(tickTuple);
// should have produced one data packet and acked it
verify(niFiDataPacketBuilder, times(1)).createNiFiDataPacket(eq(dataTuple));
verify(collector, times(1)).ack(eq(dataTuple));
}
@Test
public void testBatchSize() {
final int batchSize = 3;
final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
.withBatchSize(batchSize);
// prepare the bolt
Map conf = mock(Map.class);
TopologyContext context = mock(TopologyContext.class);
OutputCollector collector = mock(OutputCollector.class);
bolt.prepare(conf, context, collector);
// process a regular tuple, haven't hit batch size yet
Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple1);
verifyZeroInteractions(niFiDataPacketBuilder);
// process a regular tuple, haven't hit batch size yet
Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple2);
verifyZeroInteractions(niFiDataPacketBuilder);
// process a regular tuple, triggers batch size
Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple3);
verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class));
verify(collector, times(batchSize)).ack(any(Tuple.class));
}
@Test
public void testFailure() throws IOException {
final int batchSize = 3;
final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
.withBatchSize(batchSize);
when(((TestableNiFiBolt)bolt).transaction.complete())
.thenThrow(new RuntimeException("Could not complete transaction"));
// prepare the bolt
Map conf = mock(Map.class);
TopologyContext context = mock(TopologyContext.class);
OutputCollector collector = mock(OutputCollector.class);
bolt.prepare(conf, context, collector);
// process a regular tuple, haven't hit batch size yet
Tuple dataTuple1 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple1);
verifyZeroInteractions(niFiDataPacketBuilder);
// process a regular tuple, haven't hit batch size yet
Tuple dataTuple2 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple2);
verifyZeroInteractions(niFiDataPacketBuilder);
// process a regular tuple, triggers batch size
Tuple dataTuple3 = MockTupleHelpers.mockTuple("nifi", "nifi");
bolt.execute(dataTuple3);
verify(niFiDataPacketBuilder, times(batchSize)).createNiFiDataPacket(any(Tuple.class));
verify(collector, times(batchSize)).fail(any(Tuple.class));
}
/**
* Extend NiFiBolt to provide a mock SiteToSiteClient.
*/
private static final class TestableNiFiBolt extends NiFiBolt {
SiteToSiteClient mockSiteToSiteClient;
Transaction transaction;
public TestableNiFiBolt(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder builder, int tickFrequencySeconds) {
super(clientConfig, builder, tickFrequencySeconds);
mockSiteToSiteClient = mock(SiteToSiteClient.class);
transaction = mock(Transaction.class);
}
@Override
protected SiteToSiteClient createSiteToSiteClient() {
try {
when(mockSiteToSiteClient.createTransaction(eq(TransferDirection.SEND)))
.thenReturn(transaction);
} catch (IOException e) {
throw new RuntimeException(e);
}
return mockSiteToSiteClient;
}
}
}
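
A hedged sketch of one more test that could be added to this class (not part of the commit) to cover the batch-size-of-1 path described in the NiFiBolt Javadoc, reusing the existing TestableNiFiBolt and MockTupleHelpers:

// Hypothetical additional test: with a batch size of 1, each tuple is sent and
// acked immediately, without waiting for a tick tuple.
@Test
public void testBatchSizeOneSendsImmediately() {
    final NiFiBolt bolt = new TestableNiFiBolt(siteToSiteClientConfig, niFiDataPacketBuilder, tickFrequency)
            .withBatchSize(1);

    // prepare the bolt
    final Map conf = mock(Map.class);
    final TopologyContext context = mock(TopologyContext.class);
    final OutputCollector collector = mock(OutputCollector.class);
    bolt.prepare(conf, context, collector);

    // a single regular tuple already reaches the batch size
    final Tuple dataTuple = MockTupleHelpers.mockTuple("nifi", "nifi");
    bolt.execute(dataTuple);

    // one data packet built and the tuple acked, no tick tuple required
    verify(niFiDataPacketBuilder, times(1)).createNiFiDataPacket(eq(dataTuple));
    verify(collector, times(1)).ack(eq(dataTuple));
}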