This commit is contained in:
joewitt 2015-04-24 16:44:04 -04:00
parent dca93a5070
commit d68f71b126
3 changed files with 141 additions and 132 deletions

View File

@ -23,15 +23,15 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-receiver</artifactId> <artifactId>nifi-spark-receiver</artifactId>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId> <artifactId>spark-streaming_2.10</artifactId>
<version>1.2.0</version> <version>1.2.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId> <artifactId>nifi-site-to-site-client</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -20,21 +20,20 @@ import java.util.Map;
/** /**
* <p> * <p>
* The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
* content and its attributes so that they can be processed by Spark * a FlowFile's content and its attributes so that they can be processed by
* Spark
* </p> * </p>
*/ */
public interface NiFiDataPacket { public interface NiFiDataPacket {
/** /**
* Returns the contents of a NiFi FlowFile * @return the contents of a NiFi FlowFile
* @return */
*/ byte[] getContent();
byte[] getContent();
/** /**
* Returns a Map of attributes that are associated with the NiFi FlowFile * @return a Map of attributes that are associated with the NiFi FlowFile
* @return */
*/ Map<String, String> getAttributes();
Map<String, String> getAttributes();
} }

View File

@ -31,64 +31,67 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.spark.storage.StorageLevel; import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver; import org.apache.spark.streaming.receiver.Receiver;
/** /**
* <p> * <p>
* The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to pull data * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to
* from Apache NiFi so that it can be processed by Spark Streaming. The NiFi Receiver connects * pull data from Apache NiFi so that it can be processed by Spark Streaming.
* to NiFi instance provided in the config and requests data from * The NiFi Receiver connects to NiFi instance provided in the config and
* the OutputPort that is named. In NiFi, when an OutputPort is added to the root process group, * requests data from the OutputPort that is named. In NiFi, when an OutputPort
* it acts as a queue of data for remote clients. This receiver is then able to pull that data * is added to the root process group, it acts as a queue of data for remote
* from NiFi reliably. * clients. This receiver is then able to pull that data from NiFi reliably.
* </p> * </p>
* *
* <p> * <p>
* It is important to note that if pulling data from a NiFi cluster, the URL that should be used * It is important to note that if pulling data from a NiFi cluster, the URL
* is that of the NiFi Cluster Manager. The Receiver will automatically handle determining the nodes * that should be used is that of the NiFi Cluster Manager. The Receiver will
* in that cluster and pull from those nodes as appropriate. * automatically handle determining the nodes in that cluster and pull from
* those nodes as appropriate.
* </p> * </p>
* *
* <p> * <p>
* In order to use the NiFiReceiver, you will need to first build a {@link SiteToSiteClientConfig} to provide * In order to use the NiFiReceiver, you will need to first build a
* to the constructor. This can be achieved by using the {@link SiteToSiteClient.Builder}. * {@link SiteToSiteClientConfig} to provide to the constructor. This can be
* Below is an example snippet of driver code to pull data from NiFi that is running on localhost:8080. This * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
* example assumes that NiFi exposes and OutputPort on the root group named "Data For Spark". * snippet of driver code to pull data from NiFi that is running on
* Additionally, it assumes that the data that it will receive from this OutputPort is text * localhost:8080. This example assumes that NiFi exposes and OutputPort on the
* data, as it will map the byte array received from NiFi to a UTF-8 Encoded string. * root group named "Data For Spark". Additionally, it assumes that the data
* that it will receive from this OutputPort is text data, as it will map the
* byte array received from NiFi to a UTF-8 Encoded string.
* </p> * </p>
* *
* <code> * <code>
* <pre> * <pre>
* {@code
* Pattern SPACE = Pattern.compile(" "); * Pattern SPACE = Pattern.compile(" ");
* *
* // Build a Site-to-site client config * // Build a Site-to-site client config
* SiteToSiteClientConfig config = new SiteToSiteClient.Builder() * SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
* .setUrl("http://localhost:8080/nifi") * .setUrl("http://localhost:8080/nifi")
* .setPortName("Data For Spark") * .setPortName("Data For Spark")
* .buildConfig(); * .buildConfig();
* *
* SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example"); * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
* JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L)); * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
* *
* // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
* // specified Port * // specified Port
* JavaReceiverInputDStream<NiFiDataPacket> packetStream = * JavaReceiverInputDStream<NiFiDataPacket> packetStream =
* ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY())); * ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY()));
* *
* // Map the data from NiFi to text, ignoring the attributes * // Map the data from NiFi to text, ignoring the attributes
* JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() { * JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
* public String call(final NiFiDataPacket dataPacket) throws Exception { * public String call(final NiFiDataPacket dataPacket) throws Exception {
* return new String(dataPacket.getContent(), StandardCharsets.UTF_8); * return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
* } * }
* }); * });
* *
* // Split the words by spaces * // Split the words by spaces
* JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() { * JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
* public Iterable<String> call(final String text) throws Exception { * public Iterable<String> call(final String text) throws Exception {
* return Arrays.asList(SPACE.split(text)); * return Arrays.asList(SPACE.split(text));
* } * }
* }); * });
* *
* // Map each word to the number 1, then aggregate by key * // Map each word to the number 1, then aggregate by key
* JavaPairDStream<String, Integer> wordCounts = words.mapToPair( * JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
* new PairFunction<String, String, Integer>() { * new PairFunction<String, String, Integer>() {
@ -101,98 +104,105 @@ import org.apache.spark.streaming.receiver.Receiver;
* } * }
* } * }
* ); * );
* *
* // print the results * // print the results
* wordCounts.print(); * wordCounts.print();
* ssc.start(); * ssc.start();
* ssc.awaitTermination(); * ssc.awaitTermination();
* }
* </pre> * </pre>
* </code> * </code>
*/ */
public class NiFiReceiver extends Receiver<NiFiDataPacket> { public class NiFiReceiver extends Receiver<NiFiDataPacket> {
private static final long serialVersionUID = 3067274587595578836L;
private final SiteToSiteClientConfig clientConfig;
public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) {
super(storageLevel);
this.clientConfig = clientConfig;
}
@Override
public void onStart() {
final Thread thread = new Thread(new ReceiveRunnable());
thread.setDaemon(true);
thread.setName("NiFi Receiver");
thread.start();
}
@Override private static final long serialVersionUID = 3067274587595578836L;
public void onStop() { private final SiteToSiteClientConfig clientConfig;
}
class ReceiveRunnable implements Runnable { public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) {
public ReceiveRunnable() { super(storageLevel);
} this.clientConfig = clientConfig;
}
public void run() {
try {
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
try {
while ( !isStopped() ) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
DataPacket dataPacket = transaction.receive();
if ( dataPacket == null ) {
transaction.confirm();
transaction.complete();
// no data available. Wait a bit and try again
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {}
continue;
}
final List<NiFiDataPacket> dataPackets = new ArrayList<NiFiDataPacket>();
do {
// Read the data into a byte array and wrap it along with the attributes
// into a NiFiDataPacket.
final InputStream inStream = dataPacket.getData();
final byte[] data = new byte[(int) dataPacket.getSize()];
StreamUtils.fillBuffer(inStream, data);
final Map<String, String> attributes = dataPacket.getAttributes();
final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
public byte[] getContent() {
return data;
}
public Map<String, String> getAttributes() { @Override
return attributes; public void onStart() {
} final Thread thread = new Thread(new ReceiveRunnable());
}; thread.setDaemon(true);
thread.setName("NiFi Receiver");
dataPackets.add(NiFiDataPacket); thread.start();
dataPacket = transaction.receive(); }
} while ( dataPacket != null );
// Confirm transaction to verify the data @Override
transaction.confirm(); public void onStop() {
}
store(dataPackets.iterator());
class ReceiveRunnable implements Runnable {
transaction.complete();
} public ReceiveRunnable() {
} finally { }
try {
client.close(); @Override
} catch (final IOException ioe) { public void run() {
reportError("Failed to close client", ioe); try {
} final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
} try {
} catch (final IOException ioe) { while (!isStopped()) {
restart("Failed to receive data from NiFi", ioe); final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
} DataPacket dataPacket = transaction.receive();
} if (dataPacket == null) {
} transaction.confirm();
transaction.complete();
// no data available. Wait a bit and try again
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
continue;
}
final List<NiFiDataPacket> dataPackets = new ArrayList<>();
do {
// Read the data into a byte array and wrap it along with the attributes
// into a NiFiDataPacket.
final InputStream inStream = dataPacket.getData();
final byte[] data = new byte[(int) dataPacket.getSize()];
StreamUtils.fillBuffer(inStream, data);
final Map<String, String> attributes = dataPacket.getAttributes();
final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
@Override
public byte[] getContent() {
return data;
}
@Override
public Map<String, String> getAttributes() {
return attributes;
}
};
dataPackets.add(NiFiDataPacket);
dataPacket = transaction.receive();
} while (dataPacket != null);
// Confirm transaction to verify the data
transaction.confirm();
store(dataPackets.iterator());
transaction.complete();
}
} finally {
try {
client.close();
} catch (final IOException ioe) {
reportError("Failed to close client", ioe);
}
}
} catch (final IOException ioe) {
restart("Failed to receive data from NiFi", ioe);
}
}
}
} }