mirror of https://github.com/apache/nifi.git
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
This commit is contained in:
commit
9033173eb3
|
@ -20,21 +20,20 @@ import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's
|
* The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
|
||||||
* content and its attributes so that they can be processed by Spark
|
* a FlowFile's content and its attributes so that they can be processed by
|
||||||
|
* Spark
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
public interface NiFiDataPacket {
|
public interface NiFiDataPacket {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the contents of a NiFi FlowFile
|
* @return the contents of a NiFi FlowFile
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
byte[] getContent();
|
byte[] getContent();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a Map of attributes that are associated with the NiFi FlowFile
|
* @return a Map of attributes that are associated with the NiFi FlowFile
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
Map<String, String> getAttributes();
|
Map<String, String> getAttributes();
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,34 +31,37 @@ import org.apache.nifi.stream.io.StreamUtils;
|
||||||
import org.apache.spark.storage.StorageLevel;
|
import org.apache.spark.storage.StorageLevel;
|
||||||
import org.apache.spark.streaming.receiver.Receiver;
|
import org.apache.spark.streaming.receiver.Receiver;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to pull data
|
* The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to
|
||||||
* from Apache NiFi so that it can be processed by Spark Streaming. The NiFi Receiver connects
|
* pull data from Apache NiFi so that it can be processed by Spark Streaming.
|
||||||
* to NiFi instance provided in the config and requests data from
|
* The NiFi Receiver connects to NiFi instance provided in the config and
|
||||||
* the OutputPort that is named. In NiFi, when an OutputPort is added to the root process group,
|
* requests data from the OutputPort that is named. In NiFi, when an OutputPort
|
||||||
* it acts as a queue of data for remote clients. This receiver is then able to pull that data
|
* is added to the root process group, it acts as a queue of data for remote
|
||||||
* from NiFi reliably.
|
* clients. This receiver is then able to pull that data from NiFi reliably.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* It is important to note that if pulling data from a NiFi cluster, the URL that should be used
|
* It is important to note that if pulling data from a NiFi cluster, the URL
|
||||||
* is that of the NiFi Cluster Manager. The Receiver will automatically handle determining the nodes
|
* that should be used is that of the NiFi Cluster Manager. The Receiver will
|
||||||
* in that cluster and pull from those nodes as appropriate.
|
* automatically handle determining the nodes in that cluster and pull from
|
||||||
|
* those nodes as appropriate.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* In order to use the NiFiReceiver, you will need to first build a {@link SiteToSiteClientConfig} to provide
|
* In order to use the NiFiReceiver, you will need to first build a
|
||||||
* to the constructor. This can be achieved by using the {@link SiteToSiteClient.Builder}.
|
* {@link SiteToSiteClientConfig} to provide to the constructor. This can be
|
||||||
* Below is an example snippet of driver code to pull data from NiFi that is running on localhost:8080. This
|
* achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
|
||||||
* example assumes that NiFi exposes and OutputPort on the root group named "Data For Spark".
|
* snippet of driver code to pull data from NiFi that is running on
|
||||||
* Additionally, it assumes that the data that it will receive from this OutputPort is text
|
* localhost:8080. This example assumes that NiFi exposes and OutputPort on the
|
||||||
* data, as it will map the byte array received from NiFi to a UTF-8 Encoded string.
|
* root group named "Data For Spark". Additionally, it assumes that the data
|
||||||
|
* that it will receive from this OutputPort is text data, as it will map the
|
||||||
|
* byte array received from NiFi to a UTF-8 Encoded string.
|
||||||
* </p>
|
* </p>
|
||||||
*
|
*
|
||||||
* <code>
|
* <code>
|
||||||
* <pre>
|
* <pre>
|
||||||
|
* {@code
|
||||||
* Pattern SPACE = Pattern.compile(" ");
|
* Pattern SPACE = Pattern.compile(" ");
|
||||||
*
|
*
|
||||||
* // Build a Site-to-site client config
|
* // Build a Site-to-site client config
|
||||||
|
@ -106,10 +109,12 @@ import org.apache.spark.streaming.receiver.Receiver;
|
||||||
* wordCounts.print();
|
* wordCounts.print();
|
||||||
* ssc.start();
|
* ssc.start();
|
||||||
* ssc.awaitTermination();
|
* ssc.awaitTermination();
|
||||||
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
* </code>
|
* </code>
|
||||||
*/
|
*/
|
||||||
public class NiFiReceiver extends Receiver<NiFiDataPacket> {
|
public class NiFiReceiver extends Receiver<NiFiDataPacket> {
|
||||||
|
|
||||||
private static final long serialVersionUID = 3067274587595578836L;
|
private static final long serialVersionUID = 3067274587595578836L;
|
||||||
private final SiteToSiteClientConfig clientConfig;
|
private final SiteToSiteClientConfig clientConfig;
|
||||||
|
|
||||||
|
@ -131,9 +136,11 @@ public class NiFiReceiver extends Receiver<NiFiDataPacket> {
|
||||||
}
|
}
|
||||||
|
|
||||||
class ReceiveRunnable implements Runnable {
|
class ReceiveRunnable implements Runnable {
|
||||||
|
|
||||||
public ReceiveRunnable() {
|
public ReceiveRunnable() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
|
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
|
||||||
|
@ -148,12 +155,13 @@ public class NiFiReceiver extends Receiver<NiFiDataPacket> {
|
||||||
// no data available. Wait a bit and try again
|
// no data available. Wait a bit and try again
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000L);
|
Thread.sleep(1000L);
|
||||||
} catch (InterruptedException e) {}
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<NiFiDataPacket> dataPackets = new ArrayList<NiFiDataPacket>();
|
final List<NiFiDataPacket> dataPackets = new ArrayList<>();
|
||||||
do {
|
do {
|
||||||
// Read the data into a byte array and wrap it along with the attributes
|
// Read the data into a byte array and wrap it along with the attributes
|
||||||
// into a NiFiDataPacket.
|
// into a NiFiDataPacket.
|
||||||
|
@ -163,10 +171,12 @@ public class NiFiReceiver extends Receiver<NiFiDataPacket> {
|
||||||
|
|
||||||
final Map<String, String> attributes = dataPacket.getAttributes();
|
final Map<String, String> attributes = dataPacket.getAttributes();
|
||||||
final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
|
final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
|
||||||
|
@Override
|
||||||
public byte[] getContent() {
|
public byte[] getContent() {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Map<String, String> getAttributes() {
|
public Map<String, String> getAttributes() {
|
||||||
return attributes;
|
return attributes;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue