mirror of https://github.com/apache/nifi.git
NIFI-12065 Removed nifi-spark-receiver module
This closes #7732 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
a1fca9b44e
commit
71dd5576db
|
@ -1,71 +0,0 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<!--
|
|
||||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
contributor license agreements. See the NOTICE file distributed with
|
|
||||||
this work for additional information regarding copyright ownership.
|
|
||||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
(the "License"); you may not use this file except in compliance with
|
|
||||||
the License. You may obtain a copy of the License at
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
-->
|
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
|
||||||
<parent>
|
|
||||||
<groupId>org.apache.nifi</groupId>
|
|
||||||
<artifactId>nifi-external</artifactId>
|
|
||||||
<version>2.0.0-SNAPSHOT</version>
|
|
||||||
</parent>
|
|
||||||
<groupId>org.apache.nifi</groupId>
|
|
||||||
<artifactId>nifi-spark-receiver</artifactId>
|
|
||||||
<dependencies>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.spark</groupId>
|
|
||||||
<artifactId>spark-streaming_2.13</artifactId>
|
|
||||||
<scope>provided</scope>
|
|
||||||
<version>3.3.2</version>
|
|
||||||
<exclusions>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>commons-logging</groupId>
|
|
||||||
<artifactId>commons-logging</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>org.apache.logging.log4j</groupId>
|
|
||||||
<artifactId>log4j-core</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
</exclusions>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.slf4j</groupId>
|
|
||||||
<artifactId>log4j-over-slf4j</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.slf4j</groupId>
|
|
||||||
<artifactId>jcl-over-slf4j</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.nifi</groupId>
|
|
||||||
<artifactId>nifi-site-to-site-client</artifactId>
|
|
||||||
<version>2.0.0-SNAPSHOT</version>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
|
||||||
<dependencyManagement>
|
|
||||||
<dependencies>
|
|
||||||
<!-- Override Hadoop -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop</groupId>
|
|
||||||
<artifactId>hadoop-client-api</artifactId>
|
|
||||||
<version>${hadoop.version}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop</groupId>
|
|
||||||
<artifactId>hadoop-client-runtime</artifactId>
|
|
||||||
<version>${hadoop.version}</version>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
|
||||||
</dependencyManagement>
|
|
||||||
</project>
|
|
|
@ -1,39 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.spark;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>
|
|
||||||
* The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
|
|
||||||
* a FlowFile's content and its attributes so that they can be processed by
|
|
||||||
* Spark
|
|
||||||
* </p>
|
|
||||||
*/
|
|
||||||
public interface NiFiDataPacket {

    /**
     * Provides the payload of the wrapped NiFi FlowFile.
     *
     * @return the FlowFile content as a byte array
     */
    byte[] getContent();

    /**
     * Provides the attributes carried by the wrapped NiFi FlowFile.
     *
     * @return a Map from attribute name to attribute value
     */
    Map<String, String> getAttributes();
}
|
|
|
@ -1,197 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.spark;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.nifi.remote.Transaction;
|
|
||||||
import org.apache.nifi.remote.TransferDirection;
|
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
|
||||||
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
|
||||||
import org.apache.nifi.remote.protocol.DataPacket;
|
|
||||||
import org.apache.nifi.stream.io.StreamUtils;
|
|
||||||
import org.apache.spark.storage.StorageLevel;
|
|
||||||
import org.apache.spark.streaming.receiver.Receiver;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>
|
|
||||||
* The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to
|
|
||||||
* pull data from Apache NiFi so that it can be processed by Spark Streaming.
|
|
||||||
* The NiFi Receiver connects to the NiFi instance provided in the config and
|
|
||||||
* requests data from the OutputPort that is named. In NiFi, when an OutputPort
|
|
||||||
* is added to the root process group, it acts as a queue of data for remote
|
|
||||||
* clients. This receiver is then able to pull that data from NiFi reliably.
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* <p>
|
|
||||||
* It is important to note that if pulling data from a NiFi cluster, the URL
|
|
||||||
* that should be used is that of the NiFi Cluster Manager. The Receiver will
|
|
||||||
* automatically handle determining the nodes in that cluster and pull from
|
|
||||||
* those nodes as appropriate.
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* <p>
|
|
||||||
* In order to use the NiFiReceiver, you will need to first build a
|
|
||||||
* {@link SiteToSiteClientConfig} to provide to the constructor. This can be
|
|
||||||
* achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
|
|
||||||
* snippet of driver code to pull data from NiFi that is running on
|
|
||||||
* localhost:8080. This example assumes that NiFi exposes an OutputPort on the
|
|
||||||
* root group named "Data For Spark". Additionally, it assumes that the data
|
|
||||||
* that it will receive from this OutputPort is text data, as it will map the
|
|
||||||
* byte array received from NiFi to a UTF-8 Encoded string.
|
|
||||||
* </p>
|
|
||||||
*
|
|
||||||
* <code>
|
|
||||||
* <pre>
|
|
||||||
* {@code
|
|
||||||
* Pattern SPACE = Pattern.compile(" ");
|
|
||||||
*
|
|
||||||
* // Build a Site-to-site client config
|
|
||||||
* SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
|
|
||||||
* .setUrl("http://localhost:8080/nifi")
|
|
||||||
* .setPortName("Data For Spark")
|
|
||||||
* .buildConfig();
|
|
||||||
*
|
|
||||||
* SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
|
|
||||||
* JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
|
|
||||||
*
|
|
||||||
* // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
|
|
||||||
* // specified Port
|
|
||||||
* JavaReceiverInputDStream<NiFiDataPacket> packetStream =
|
|
||||||
* ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));
|
|
||||||
*
|
|
||||||
* // Map the data from NiFi to text, ignoring the attributes
|
|
||||||
* JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
|
|
||||||
* public String call(final NiFiDataPacket dataPacket) throws Exception {
|
|
||||||
* return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
|
|
||||||
* }
|
|
||||||
* });
|
|
||||||
*
|
|
||||||
* // Split the words by spaces
|
|
||||||
* JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
|
|
||||||
* public Iterable<String> call(final String text) throws Exception {
|
|
||||||
* return Arrays.asList(SPACE.split(text));
|
|
||||||
* }
|
|
||||||
* });
|
|
||||||
*
|
|
||||||
* // Map each word to the number 1, then aggregate by key
|
|
||||||
* JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
|
|
||||||
* new PairFunction<String, String, Integer>() {
|
|
||||||
* public Tuple2<String, Integer> call(String s) {
|
|
||||||
* return new Tuple2<String, Integer>(s, 1);
|
|
||||||
* }
|
|
||||||
* }).reduceByKey(new Function2<Integer, Integer, Integer>() {
|
|
||||||
* public Integer call(Integer i1, Integer i2) {
|
|
||||||
* return i1 + i2;
|
|
||||||
* }
|
|
||||||
* }
|
|
||||||
* );
|
|
||||||
*
|
|
||||||
* // print the results
|
|
||||||
* wordCounts.print();
|
|
||||||
* ssc.start();
|
|
||||||
* ssc.awaitTermination();
|
|
||||||
* }
|
|
||||||
* </pre>
|
|
||||||
* </code>
|
|
||||||
*/
|
|
||||||
public class NiFiReceiver extends Receiver<NiFiDataPacket> {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 3067274587595578836L;
|
|
||||||
private final SiteToSiteClientConfig clientConfig;
|
|
||||||
|
|
||||||
public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) {
|
|
||||||
super(storageLevel);
|
|
||||||
this.clientConfig = clientConfig;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onStart() {
|
|
||||||
final Thread thread = new Thread(new ReceiveRunnable());
|
|
||||||
thread.setDaemon(true);
|
|
||||||
thread.setName("NiFi Receiver");
|
|
||||||
thread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onStop() {
|
|
||||||
}
|
|
||||||
|
|
||||||
class ReceiveRunnable implements Runnable {
|
|
||||||
|
|
||||||
public ReceiveRunnable() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
|
|
||||||
try {
|
|
||||||
while (!isStopped()) {
|
|
||||||
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
|
|
||||||
DataPacket dataPacket = transaction.receive();
|
|
||||||
if (dataPacket == null) {
|
|
||||||
transaction.confirm();
|
|
||||||
transaction.complete();
|
|
||||||
|
|
||||||
// no data available. Wait a bit and try again
|
|
||||||
try {
|
|
||||||
Thread.sleep(1000L);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
}
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
final List<NiFiDataPacket> dataPackets = new ArrayList<>();
|
|
||||||
do {
|
|
||||||
// Read the data into a byte array and wrap it along with the attributes
|
|
||||||
// into a NiFiDataPacket.
|
|
||||||
final InputStream inStream = dataPacket.getData();
|
|
||||||
final byte[] data = new byte[(int) dataPacket.getSize()];
|
|
||||||
StreamUtils.fillBuffer(inStream, data);
|
|
||||||
|
|
||||||
final Map<String, String> attributes = dataPacket.getAttributes();
|
|
||||||
final NiFiDataPacket NiFiDataPacket = new StandardNiFiDataPacket(data, attributes);
|
|
||||||
dataPackets.add(NiFiDataPacket);
|
|
||||||
dataPacket = transaction.receive();
|
|
||||||
} while (dataPacket != null);
|
|
||||||
|
|
||||||
// Confirm transaction to verify the data
|
|
||||||
transaction.confirm();
|
|
||||||
|
|
||||||
store(dataPackets.iterator());
|
|
||||||
|
|
||||||
transaction.complete();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
client.close();
|
|
||||||
} catch (final IOException ioe) {
|
|
||||||
reportError("Failed to close client", ioe);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (final IOException ioe) {
|
|
||||||
restart("Failed to receive data from NiFi", ioe);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,43 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.spark;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
|
|
||||||
private static final long serialVersionUID = 6364005260220243322L;
|
|
||||||
|
|
||||||
private final byte[] content;
|
|
||||||
private final Map<String, String> attributes;
|
|
||||||
|
|
||||||
public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) {
|
|
||||||
this.content = content;
|
|
||||||
this.attributes = attributes;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public byte[] getContent() {
|
|
||||||
return content;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, String> getAttributes() {
|
|
||||||
return attributes;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -23,7 +23,6 @@
|
||||||
<artifactId>nifi-external</artifactId>
|
<artifactId>nifi-external</artifactId>
|
||||||
<packaging>pom</packaging>
|
<packaging>pom</packaging>
|
||||||
<modules>
|
<modules>
|
||||||
<module>nifi-spark-receiver</module>
|
|
||||||
<module>nifi-example-bundle</module>
|
<module>nifi-example-bundle</module>
|
||||||
<module>nifi-kafka-connect</module>
|
<module>nifi-kafka-connect</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
Loading…
Reference in New Issue