diff --git a/nifi-external/nifi-spark-receiver/pom.xml b/nifi-external/nifi-spark-receiver/pom.xml deleted file mode 100644 index 3917a109c8..0000000000 --- a/nifi-external/nifi-spark-receiver/pom.xml +++ /dev/null @@ -1,71 +0,0 @@ - - - - 4.0.0 - - org.apache.nifi - nifi-external - 2.0.0-SNAPSHOT - - org.apache.nifi - nifi-spark-receiver - - - org.apache.spark - spark-streaming_2.13 - provided - 3.3.2 - - - commons-logging - commons-logging - - - org.apache.logging.log4j - log4j-core - - - - - org.slf4j - log4j-over-slf4j - - - org.slf4j - jcl-over-slf4j - - - org.apache.nifi - nifi-site-to-site-client - 2.0.0-SNAPSHOT - - - - - - - org.apache.hadoop - hadoop-client-api - ${hadoop.version} - - - org.apache.hadoop - hadoop-client-runtime - ${hadoop.version} - - - - diff --git a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java deleted file mode 100644 index 608aa2c2fe..0000000000 --- a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spark; - -import java.util.Map; - -/** - *

- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both - * a FlowFile's content and its attributes so that they can be processed by - * Spark - *

- */ -public interface NiFiDataPacket { - - /** - * @return the contents of a NiFi FlowFile - */ - byte[] getContent(); - - /** - * @return a Map of attributes that are associated with the NiFi FlowFile - */ - Map getAttributes(); -} diff --git a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java deleted file mode 100644 index 83a7e42ed7..0000000000 --- a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spark; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; -import org.apache.nifi.remote.protocol.DataPacket; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.receiver.Receiver; - -/** - *

- * The NiFiReceiver is a Reliable Receiver that provides a way to - * pull data from Apache NiFi so that it can be processed by Spark Streaming. - * The NiFi Receiver connects to NiFi instance provided in the config and - * requests data from the OutputPort that is named. In NiFi, when an OutputPort - * is added to the root process group, it acts as a queue of data for remote - * clients. This receiver is then able to pull that data from NiFi reliably. - *

- * - *

- * It is important to note that if pulling data from a NiFi cluster, the URL - * that should be used is that of the NiFi Cluster Manager. The Receiver will - * automatically handle determining the nodes in that cluster and pull from - * those nodes as appropriate. - *

- * - *

- * In order to use the NiFiReceiver, you will need to first build a - * {@link SiteToSiteClientConfig} to provide to the constructor. This can be - * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example - * snippet of driver code to pull data from NiFi that is running on - * localhost:8080. This example assumes that NiFi exposes and OutputPort on the - * root group named "Data For Spark". Additionally, it assumes that the data - * that it will receive from this OutputPort is text data, as it will map the - * byte array received from NiFi to a UTF-8 Encoded string. - *

- * - * - *
- * {@code
- * Pattern SPACE = Pattern.compile(" ");
- *
- * // Build a Site-to-site client config
- * SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
- *   .setUrl("http://localhost:8080/nifi")
- *   .setPortName("Data For Spark")
- *   .buildConfig();
- *
- * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
- * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
- *
- * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
- * // specified Port
- * JavaReceiverInputDStream packetStream =
- *     ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY()));
- *
- * // Map the data from NiFi to text, ignoring the attributes
- * JavaDStream text = packetStream.map(new Function() {
- *   public String call(final NiFiDataPacket dataPacket) throws Exception {
- *     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
- *   }
- * });
- *
- * // Split the words by spaces
- * JavaDStream words = text.flatMap(new FlatMapFunction() {
- *   public Iterable call(final String text) throws Exception {
- *     return Arrays.asList(SPACE.split(text));
- *   }
- * });
- *
- * // Map each word to the number 1, then aggregate by key
- * JavaPairDStream wordCounts = words.mapToPair(
- *   new PairFunction() {
- *     public Tuple2 call(String s) {
- *       return new Tuple2(s, 1);
- *     }
- *   }).reduceByKey(new Function2() {
- *     public Integer call(Integer i1, Integer i2) {
- *       return i1 + i2;
- *     }
- *    }
- *  );
- *
- * // print the results
- * wordCounts.print();
- * ssc.start();
- * ssc.awaitTermination();
- * }
- * 
- *
- */ -public class NiFiReceiver extends Receiver { - - private static final long serialVersionUID = 3067274587595578836L; - private final SiteToSiteClientConfig clientConfig; - - public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) { - super(storageLevel); - this.clientConfig = clientConfig; - } - - @Override - public void onStart() { - final Thread thread = new Thread(new ReceiveRunnable()); - thread.setDaemon(true); - thread.setName("NiFi Receiver"); - thread.start(); - } - - @Override - public void onStop() { - } - - class ReceiveRunnable implements Runnable { - - public ReceiveRunnable() { - } - - @Override - public void run() { - try { - final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); - try { - while (!isStopped()) { - final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - DataPacket dataPacket = transaction.receive(); - if (dataPacket == null) { - transaction.confirm(); - transaction.complete(); - - // no data available. Wait a bit and try again - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - } - - continue; - } - - final List dataPackets = new ArrayList<>(); - do { - // Read the data into a byte array and wrap it along with the attributes - // into a NiFiDataPacket. - final InputStream inStream = dataPacket.getData(); - final byte[] data = new byte[(int) dataPacket.getSize()]; - StreamUtils.fillBuffer(inStream, data); - - final Map attributes = dataPacket.getAttributes(); - final NiFiDataPacket NiFiDataPacket = new StandardNiFiDataPacket(data, attributes); - dataPackets.add(NiFiDataPacket); - dataPacket = transaction.receive(); - } while (dataPacket != null); - - // Confirm transaction to verify the data - transaction.confirm(); - - store(dataPackets.iterator()); - - transaction.complete(); - } - } finally { - try { - client.close(); - } catch (final IOException ioe) { - reportError("Failed to close client", ioe); - } - } - } catch (final IOException ioe) { - restart("Failed to receive data from NiFi", ioe); - } - } - } -} diff --git a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/StandardNiFiDataPacket.java b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/StandardNiFiDataPacket.java deleted file mode 100644 index 80bbe8366d..0000000000 --- a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/StandardNiFiDataPacket.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spark; - -import java.io.Serializable; -import java.util.Map; - -public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable { - private static final long serialVersionUID = 6364005260220243322L; - - private final byte[] content; - private final Map attributes; - - public StandardNiFiDataPacket(final byte[] content, final Map attributes) { - this.content = content; - this.attributes = attributes; - } - - @Override - public byte[] getContent() { - return content; - } - - @Override - public Map getAttributes() { - return attributes; - } - -} diff --git a/nifi-external/pom.xml b/nifi-external/pom.xml index aeb8fe5d84..d393f7e2d3 100644 --- a/nifi-external/pom.xml +++ b/nifi-external/pom.xml @@ -23,7 +23,6 @@ nifi-external pom - nifi-spark-receiver nifi-example-bundle nifi-kafka-connect