NIFI-365: Initial implementation of spark receiver

This commit is contained in:
Mark Payne 2015-02-18 21:19:58 -05:00
parent 7ab4392c15
commit 8506a0ce4a
6 changed files with 325 additions and 0 deletions

View File

@ -0,0 +1,19 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# nifi-external
The nifi-external module is a location where components can be developed by the NiFi team
that are not intended to be used directly by NiFi but are to be used within other frameworks
in order to integrate with NiFi.

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-receiver</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-site-to-site-client</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spark;
import java.util.Map;
/**
* <p>
* The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's
* content and its attributes so that they can be processed by Spark
* </p>
*/
public interface NiFiDataPacket {
/**
* Returns the contents of a NiFi FlowFile
* @return
*/
byte[] getContent();
/**
* Returns a Map of attributes that are associated with the NiFi FlowFile
* @return
*/
Map<String, String> getAttributes();
}

View File

@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spark;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
/**
* <p>
* The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to pull data
* from Apache NiFi so that it can be processed by Spark Streaming. The NiFi Receiver connects
* to NiFi instance provided in the config and requests data from
* the OutputPort that is named. In NiFi, when an OutputPort is added to the root process group,
* it acts as a queue of data for remote clients. This receiver is then able to pull that data
* from NiFi reliably.
* </p>
*
* <p>
* It is important to note that if pulling data from a NiFi cluster, the URL that should be used
* is that of the NiFi Cluster Manager. The Receiver will automatically handle determining the nodes
* in that cluster and pull from those nodes as appropriate.
* </p>
*
* <p>
* In order to use the NiFiReceiver, you will need to first build a {@link SiteToSiteClientConfig} to provide
* to the constructor. This can be achieved by using the {@link SiteToSiteClient.Builder}.
* Below is an example snippet of driver code to pull data from NiFi that is running on localhost:8080. This
* example assumes that NiFi exposes and OutputPort on the root group named "Data For Spark".
* Additionally, it assumes that the data that it will receive from this OutputPort is text
* data, as it will map the byte array received from NiFi to a UTF-8 Encoded string.
* </p>
*
* <code>
* <pre>
* Pattern SPACE = Pattern.compile(" ");
*
* // Build a Site-to-site client config
* SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
* .setUrl("http://localhost:8080/nifi")
* .setPortName("Data For Spark")
* .buildConfig();
*
* SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
* JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
*
* // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
* // specified Port
* JavaReceiverInputDStream<NiFiDataPacket> packetStream =
* ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY()));
*
* // Map the data from NiFi to text, ignoring the attributes
* JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
* public String call(final NiFiDataPacket dataPacket) throws Exception {
* return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
* }
* });
*
* // Split the words by spaces
* JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
* public Iterable<String> call(final String text) throws Exception {
* return Arrays.asList(SPACE.split(text));
* }
* });
*
* // Map each word to the number 1, then aggregate by key
* JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
* new PairFunction<String, String, Integer>() {
* public Tuple2<String, Integer> call(String s) {
* return new Tuple2<String, Integer>(s, 1);
* }
* }).reduceByKey(new Function2<Integer, Integer, Integer>() {
* public Integer call(Integer i1, Integer i2) {
* return i1 + i2;
* }
* }
* );
*
* // print the results
* wordCounts.print();
* ssc.start();
* ssc.awaitTermination();
* </pre>
* </code>
*/
public class NiFiReceiver extends Receiver<NiFiDataPacket> {
private static final long serialVersionUID = 3067274587595578836L;
private final SiteToSiteClientConfig clientConfig;
public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) {
super(storageLevel);
this.clientConfig = clientConfig;
}
@Override
public void onStart() {
final Thread thread = new Thread(new ReceiveRunnable());
thread.setDaemon(true);
thread.setName("NiFi Receiver");
thread.start();
}
@Override
public void onStop() {
}
class ReceiveRunnable implements Runnable {
public ReceiveRunnable() {
}
public void run() {
try {
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
try {
while ( !isStopped() ) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
DataPacket dataPacket = transaction.receive();
if ( dataPacket == null ) {
transaction.confirm();
transaction.complete();
// no data available. Wait a bit and try again
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {}
continue;
}
final List<NiFiDataPacket> dataPackets = new ArrayList<NiFiDataPacket>();
do {
// Read the data into a byte array and wrap it along with the attributes
// into a NiFiDataPacket.
final InputStream inStream = dataPacket.getData();
final byte[] data = new byte[(int) dataPacket.getSize()];
StreamUtils.fillBuffer(inStream, data);
final Map<String, String> attributes = dataPacket.getAttributes();
final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
public byte[] getContent() {
return data;
}
public Map<String, String> getAttributes() {
return attributes;
}
};
dataPackets.add(NiFiDataPacket);
dataPacket = transaction.receive();
} while ( dataPacket != null );
// Confirm transaction to verify the data
transaction.confirm();
store(dataPackets.iterator());
transaction.complete();
}
} finally {
try {
client.close();
} catch (final IOException ioe) {
reportError("Failed to close client", ioe);
}
}
} catch (final IOException ioe) {
restart("Failed to receive data from NiFi", ioe);
}
}
}
}

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-external</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-spark-receiver</module>
</modules>
</project>

View File

@ -65,6 +65,7 @@
<module>nifi-assembly</module>
<module>nifi-docs</module>
<module>nifi-maven-archetypes</module>
<module>nifi-external</module>
</modules>
<scm>
<connection>scm:git:git://git.apache.org/incubator-nifi.git</connection>