From d68f71b126c58112070d81f14e804497f9649e5b Mon Sep 17 00:00:00 2001 From: joewitt Date: Fri, 24 Apr 2015 16:44:04 -0400 Subject: [PATCH] NIFI-271 --- .../nifi-external/nifi-spark-receiver/pom.xml | 22 +- .../org/apache/nifi/spark/NiFiDataPacket.java | 23 +- .../org/apache/nifi/spark/NiFiReceiver.java | 228 +++++++++--------- 3 files changed, 141 insertions(+), 132 deletions(-) diff --git a/nifi/nifi-external/nifi-spark-receiver/pom.xml b/nifi/nifi-external/nifi-spark-receiver/pom.xml index 5c93f6b2d1..a6d9378f8f 100644 --- a/nifi/nifi-external/nifi-spark-receiver/pom.xml +++ b/nifi/nifi-external/nifi-spark-receiver/pom.xml @@ -23,15 +23,15 @@ org.apache.nifi nifi-spark-receiver - - - org.apache.spark - spark-streaming_2.10 - 1.2.0 - - - org.apache.nifi - nifi-site-to-site-client - - + + + org.apache.spark + spark-streaming_2.10 + 1.2.0 + + + org.apache.nifi + nifi-site-to-site-client + + \ No newline at end of file diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java index 2f08dc5c6d..484c2a95a8 100644 --- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java +++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java @@ -20,21 +20,20 @@ import java.util.Map; /** *

- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's - * content and its attributes so that they can be processed by Spark + * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both + * a FlowFile's content and its attributes so that they can be processed by + * Spark *

*/ public interface NiFiDataPacket { - /** - * Returns the contents of a NiFi FlowFile - * @return - */ - byte[] getContent(); + /** + * @return the contents of a NiFi FlowFile + */ + byte[] getContent(); - /** - * Returns a Map of attributes that are associated with the NiFi FlowFile - * @return - */ - Map getAttributes(); + /** + * @return a Map of attributes that are associated with the NiFi FlowFile + */ + Map getAttributes(); } diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java index 9f3106210c..8cbf60c484 100644 --- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java +++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java @@ -31,64 +31,67 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.receiver.Receiver; - /** *

- * The NiFiReceiver is a Reliable Receiver that provides a way to pull data - * from Apache NiFi so that it can be processed by Spark Streaming. The NiFi Receiver connects - * to NiFi instance provided in the config and requests data from - * the OutputPort that is named. In NiFi, when an OutputPort is added to the root process group, - * it acts as a queue of data for remote clients. This receiver is then able to pull that data - * from NiFi reliably. + * The NiFiReceiver is a Reliable Receiver that provides a way to + * pull data from Apache NiFi so that it can be processed by Spark Streaming. + * The NiFi Receiver connects to the NiFi instance provided in the config and + * requests data from the OutputPort that is named. In NiFi, when an OutputPort + * is added to the root process group, it acts as a queue of data for remote + * clients. This receiver is then able to pull that data from NiFi reliably. *

- * + * *

- * It is important to note that if pulling data from a NiFi cluster, the URL that should be used - * is that of the NiFi Cluster Manager. The Receiver will automatically handle determining the nodes - * in that cluster and pull from those nodes as appropriate. + * It is important to note that if pulling data from a NiFi cluster, the URL + * that should be used is that of the NiFi Cluster Manager. The Receiver will + * automatically handle determining the nodes in that cluster and pull from + * those nodes as appropriate. *

- * + * *

- * In order to use the NiFiReceiver, you will need to first build a {@link SiteToSiteClientConfig} to provide - * to the constructor. This can be achieved by using the {@link SiteToSiteClient.Builder}. - * Below is an example snippet of driver code to pull data from NiFi that is running on localhost:8080. This - * example assumes that NiFi exposes and OutputPort on the root group named "Data For Spark". - * Additionally, it assumes that the data that it will receive from this OutputPort is text - * data, as it will map the byte array received from NiFi to a UTF-8 Encoded string. + * In order to use the NiFiReceiver, you will need to first build a + * {@link SiteToSiteClientConfig} to provide to the constructor. This can be + * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example + * snippet of driver code to pull data from NiFi that is running on + * localhost:8080. This example assumes that NiFi exposes an OutputPort on the + * root group named "Data For Spark". Additionally, it assumes that the data + * that it will receive from this OutputPort is text data, as it will map the + * byte array received from NiFi to a UTF-8 encoded string. *

- * + * * *
+ * {@code
  * Pattern SPACE = Pattern.compile(" ");
- * 
+ *
  * // Build a Site-to-site client config
  * SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
  *   .setUrl("http://localhost:8080/nifi")
  *   .setPortName("Data For Spark")
  *   .buildConfig();
- * 
+ *
  * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
  * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
- * 
- * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from 
+ *
+ * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from the
  * // specified Port
- * JavaReceiverInputDStream packetStream = 
+ * JavaReceiverInputDStream packetStream =
  *     ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY()));
- * 
+ *
  * // Map the data from NiFi to text, ignoring the attributes
  * JavaDStream text = packetStream.map(new Function() {
  *   public String call(final NiFiDataPacket dataPacket) throws Exception {
  *     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
  *   }
  * });
- * 
+ *
  * // Split the words by spaces
  * JavaDStream words = text.flatMap(new FlatMapFunction() {
  *   public Iterable call(final String text) throws Exception {
  *     return Arrays.asList(SPACE.split(text));
  *   }
  * });
- * 	    
+ *
  * // Map each word to the number 1, then aggregate by key
  * JavaPairDStream wordCounts = words.mapToPair(
  *   new PairFunction() {
@@ -101,98 +104,105 @@ import org.apache.spark.streaming.receiver.Receiver;
  *     }
  *    }
  *  );
- * 
+ *
  * // print the results
  * wordCounts.print();
  * ssc.start();
  * ssc.awaitTermination();
+ * }
  * 
*
*/ public class NiFiReceiver extends Receiver { - private static final long serialVersionUID = 3067274587595578836L; - private final SiteToSiteClientConfig clientConfig; - - public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) { - super(storageLevel); - this.clientConfig = clientConfig; - } - - @Override - public void onStart() { - final Thread thread = new Thread(new ReceiveRunnable()); - thread.setDaemon(true); - thread.setName("NiFi Receiver"); - thread.start(); - } - @Override - public void onStop() { - } + private static final long serialVersionUID = 3067274587595578836L; + private final SiteToSiteClientConfig clientConfig; - class ReceiveRunnable implements Runnable { - public ReceiveRunnable() { - } - - public void run() { - try { - final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); - try { - while ( !isStopped() ) { - final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - DataPacket dataPacket = transaction.receive(); - if ( dataPacket == null ) { - transaction.confirm(); - transaction.complete(); - - // no data available. Wait a bit and try again - try { - Thread.sleep(1000L); - } catch (InterruptedException e) {} - - continue; - } - - final List dataPackets = new ArrayList(); - do { - // Read the data into a byte array and wrap it along with the attributes - // into a NiFiDataPacket. 
- final InputStream inStream = dataPacket.getData(); - final byte[] data = new byte[(int) dataPacket.getSize()]; - StreamUtils.fillBuffer(inStream, data); - - final Map attributes = dataPacket.getAttributes(); - final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() { - public byte[] getContent() { - return data; - } + public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) { + super(storageLevel); + this.clientConfig = clientConfig; + } - public Map getAttributes() { - return attributes; - } - }; - - dataPackets.add(NiFiDataPacket); - dataPacket = transaction.receive(); - } while ( dataPacket != null ); + @Override + public void onStart() { + final Thread thread = new Thread(new ReceiveRunnable()); + thread.setDaemon(true); + thread.setName("NiFi Receiver"); + thread.start(); + } - // Confirm transaction to verify the data - transaction.confirm(); - - store(dataPackets.iterator()); - - transaction.complete(); - } - } finally { - try { - client.close(); - } catch (final IOException ioe) { - reportError("Failed to close client", ioe); - } - } - } catch (final IOException ioe) { - restart("Failed to receive data from NiFi", ioe); - } - } - } + @Override + public void onStop() { + } + + class ReceiveRunnable implements Runnable { + + public ReceiveRunnable() { + } + + @Override + public void run() { + try { + final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); + try { + while (!isStopped()) { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + DataPacket dataPacket = transaction.receive(); + if (dataPacket == null) { + transaction.confirm(); + transaction.complete(); + + // no data available. 
Wait a bit and try again + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + } + + continue; + } + + final List dataPackets = new ArrayList<>(); + do { + // Read the data into a byte array and wrap it along with the attributes + // into a NiFiDataPacket. + final InputStream inStream = dataPacket.getData(); + final byte[] data = new byte[(int) dataPacket.getSize()]; + StreamUtils.fillBuffer(inStream, data); + + final Map attributes = dataPacket.getAttributes(); + final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() { + @Override + public byte[] getContent() { + return data; + } + + @Override + public Map getAttributes() { + return attributes; + } + }; + + dataPackets.add(NiFiDataPacket); + dataPacket = transaction.receive(); + } while (dataPacket != null); + + // Confirm transaction to verify the data + transaction.confirm(); + + store(dataPackets.iterator()); + + transaction.complete(); + } + } finally { + try { + client.close(); + } catch (final IOException ioe) { + reportError("Failed to close client", ioe); + } + } + } catch (final IOException ioe) { + restart("Failed to receive data from NiFi", ioe); + } + } + } }