NIFI-606 Add a NiFi Storm Spout

Fixing JavaDoc to pass check-styles

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
bbende 2015-05-30 10:00:54 -04:00 committed by Aldrin Piri
parent ba78556468
commit 960560723d
4 changed files with 310 additions and 0 deletions

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-external</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-storm-spout</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import java.util.Map;
/**
* <p>
* The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
* a FlowFile's content and its attributes so that they can be processed by
* Storm
* </p>
*/
public interface NiFiDataPacket {
/**
* @return the contents of a NiFi FlowFile
*/
byte[] getContent();
/**
* @return a Map of attributes that are associated with the NiFi FlowFile
*/
Map<String, String> getAttributes();
}

View File

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.storm;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
/**
* <p>
* The <code>NiFiSpout</code> provides a way to pull data from Apache NiFi so
* that it can be processed by Apache Storm. The NiFi Spout connects to a NiFi
* instance provided in the config and requests data from the OutputPort that
* is named. In NiFi, when an OutputPort is added to the root process group,
* it acts as a queue of data for remote clients. This spout is then able to
* pull that data from NiFi reliably.
* </p>
*
* <p>
* It is important to note that if pulling data from a NiFi cluster, the URL
* that should be used is that of the NiFi Cluster Manager. The Receiver will
* automatically handle determining the nodes in that cluster and pull from
* those nodes as appropriate.
* </p>
*
* <p>
* In order to use the NiFiSpout, you will need to first build a
* {@link SiteToSiteClientConfig} to provide to the constructor. This can be
* achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
* snippet of driver code to pull data from NiFi that is running on
* localhost:8080. This example assumes that NiFi exposes and OutputPort on the
* root group named "Data For Storm". Additionally, it assumes that the data
* that it will receive from this OutputPort is text data, as it will map the
* byte array received from NiFi to a UTF-8 Encoded string.
* </p>
*
* <code>
* <pre>
* {@code
*
* // Build a Site-To-Site client config
* SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
* .url("http://localhost:8080/nifi")
* .portName("Data for Storm")
* .buildConfig();
*
* // Build a topology starting with a NiFiSpout
* TopologyBuilder builder = new TopologyBuilder();
* builder.setSpout("nifi", new NiFiSpout(clientConfig));
*
* // Add a bolt that prints the attributes and content
* builder.setBolt("print", new BaseBasicBolt() {
* @Override
* public void execute(Tuple tuple, BasicOutputCollector collector) {
* NiFiDataPacket dp = (NiFiDataPacket) tuple.getValueByField("nifiDataPacket");
* System.out.println("Attributes: " + dp.getAttributes());
* System.out.println("Content: " + new String(dp.getContent()));
* }
*
* @Override
* public void declareOutputFields(OutputFieldsDeclarer declarer) {}
*
* }).shuffleGrouping("nifi");
*
* // Submit the topology running in local mode
* Config conf = new Config();
* LocalCluster cluster = new LocalCluster();
* cluster.submitTopology("test", conf, builder.createTopology());
*
* Utils.sleep(90000);
* cluster.shutdown();
* }
* </pre>
* </code>
*/
public class NiFiSpout extends BaseRichSpout {
private static final long serialVersionUID = 3067274587595578836L;
public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
private NiFiSpoutReceiver spoutReceiver;
private LinkedBlockingQueue<NiFiDataPacket> queue;
private SpoutOutputCollector spoutOutputCollector;
private final SiteToSiteClientConfig clientConfig;
public NiFiSpout(SiteToSiteClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
this.queue = new LinkedBlockingQueue<>(1000);
this.spoutReceiver = new NiFiSpoutReceiver();
this.spoutReceiver.setDaemon(true);
this.spoutReceiver.setName("NiFi Spout Receiver");
this.spoutReceiver.start();
}
@Override
public void nextTuple() {
NiFiDataPacket data = queue.poll();
if (data == null) {
Utils.sleep(50);
} else {
spoutOutputCollector.emit(new Values(data));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("nifiDataPacket"));
}
@Override
public void close() {
super.close();
spoutReceiver.shutdown();
}
class NiFiSpoutReceiver extends Thread {
private boolean shutdown = false;
public synchronized void shutdown() {
this.shutdown = true;
}
@Override
public void run() {
try {
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
try {
while (!shutdown) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
DataPacket dataPacket = transaction.receive();
if (dataPacket == null) {
transaction.confirm();
transaction.complete();
// no data available. Wait a bit and try again
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
continue;
}
final List<NiFiDataPacket> dataPackets = new ArrayList<>();
do {
// Read the data into a byte array and wrap it along with the attributes
// into a NiFiDataPacket.
final InputStream inStream = dataPacket.getData();
final byte[] data = new byte[(int) dataPacket.getSize()];
StreamUtils.fillBuffer(inStream, data);
final Map<String, String> attributes = dataPacket.getAttributes();
final NiFiDataPacket niFiDataPacket = new NiFiDataPacket() {
@Override
public byte[] getContent() {
return data;
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
};
dataPackets.add(niFiDataPacket);
dataPacket = transaction.receive();
} while (dataPacket != null);
// Confirm transaction to verify the data
transaction.confirm();
for (NiFiDataPacket dp : dataPackets) {
queue.offer(dp);
}
transaction.complete();
}
} finally {
try {
client.close();
} catch (final IOException ioe) {
LOGGER.error("Failed to close client", ioe);
}
}
} catch (final IOException ioe) {
LOGGER.error("Failed to receive data from NiFi", ioe);
}
}
}
}

View File

@ -25,5 +25,6 @@
<packaging>pom</packaging> <packaging>pom</packaging>
<modules> <modules>
<module>nifi-spark-receiver</module> <module>nifi-spark-receiver</module>
<module>nifi-storm-spout</module>
</modules> </modules>
</project> </project>