diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
new file mode 100644
index 0000000000..508ab37d40
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A model object for referring to a remote destination (i.e., a Port) for site-to-site communications
+ */
+public interface RemoteDestination {
+ /**
+ * Returns the identifier of the remote destination
+ *
+ * @return
+ */
+ String getIdentifier();
+
+ /**
+ * Returns the human-readable name of the remote destination
+ * @return
+ */
+ String getName();
+
+ /**
+ * Returns the amount of time that system should pause sending to a particular node if unable to
+ * send data to or receive data from this endpoint
+ * @param timeUnit
+ * @return
+ */
+ long getYieldPeriod(TimeUnit timeUnit);
+
+ /**
+ * Returns whether or not compression should be used when transferring data to or receiving
+ * data from the remote endpoint
+ * @return
+ */
+ boolean isUseCompression();
+}
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
new file mode 100644
index 0000000000..0d21a3d3e2
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -0,0 +1,45 @@
+
+
+ * Provides a transaction for performing site-to-site data transfers.
+ *
+ * A Transaction is created by calling the
+ * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)}
+ * method of a {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The resulting Transaction
+ * can be used to either send or receive data but not both. A new Transaction must be created in order perform the
+ * other operation.
+ *
+ * The general flow of execute of a Transaction is as follows:
+ * null
if the Distinguished Name is unknown
+ * @return
+ */
+ String getDistinguishedName();
+}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
similarity index 53%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/Peer.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index 2422fe144a..24280781ca 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -18,31 +18,50 @@ package org.apache.nifi.remote;
import java.io.IOException;
import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.nifi.remote.protocol.CommunicationsSession;
-public class Peer {
+public class Peer implements Communicant {
+ private final PeerDescription description;
private final CommunicationsSession commsSession;
private final String url;
+ private final String clusterUrl;
private final String host;
- private long penalizationExpiration = 0L;
+ private final int port;
+
+ private final Map
+ *
+ * receive()
should not be
+ * called a second time without first fully consuming the stream from the previous Packet that was received.
+ * It is important that the Transaction be terminated in order to free the resources held + * by the Transaction. If a Transaction is not terminated, its resources will not be freed and + * if the Transaction holds connections from a connection pool, the connections in that pool + * will eventually become exhausted. A Transaction is terminated by calling one of the following + * methods: + *
+ * If at any point an IOException is thrown from one of the methods of the Transaction, that Transaction + * is automatically closed via a call to {@link #error()}. + *
+ * + *+ * The Transaction class should not be assumed to be thread-safe. + *
+ */ +public interface Transaction { + + /** + * Sends information to the remote NiFi instance. + * + * @param dataPacket the data packet to send + * @throws IOException + */ + void send(DataPacket dataPacket) throws IOException; + + /** + * Sends the given byte array as the content of a {@link DataPacket} along with the + * provided attributes + * + * @param content + * @param attributes + * @throws IOException + */ + void send(byte[] content, Map+ * Confirms the data that was sent or received by comparing CRC32's of the data sent and the data received. + *
+ * + *+ * Even if the protocol being used to send the data is reliable and guarantees ordering of packets (such as TCP), + * it is still required that we confirm the transaction before completing the transaction. This is done as + * "safety net" or a defensive programming technique. Mistakes happen, and this mechanism helps to ensure that if + * a bug exists somewhere along the line that we do not end up sending or receiving corrupt data. If the + * CRC32 of the sender and the CRC32 of the receiver do not match, an IOException will be thrown and both the + * sender and receiver will cancel the transaction automatically. + *
+ * + *+ * If the {@link TransferDirection} of this Transaction is RECEIVE, this method will throw an Exception unless + * all data from the remote instance has been consumed (i.e., a call to {@link #receive()} returns {@code null}). + *
+ * + *+ * If the {@link TransferDirection} of this Transaction is SEND, calling this method dictates that no more data will be + * sent in this transaction. I.e., there will be no more calls to {@link #send(DataPacket)}. + *
+ * + * @throws IOException + */ + void confirm() throws IOException; + + /** + *+ * Completes the transaction and indicates to both the sender and receiver that the data transfer was + * successful. + *
+ * + * @throws IOException + * + * @return a TransactionCompletion that contains details about the Transaction + */ + TransactionCompletion complete() throws IOException; + + /** + *+ * Cancels this transaction, indicating to the sender that the data has not been successfully received so that + * the sender can retry or handle however is appropriate. + *
+ * + * @param explanation an explanation to tell the other party why the transaction was canceled. + * @throws IOException + */ + void cancel(final String explanation) throws IOException; + + + /** + *+ * Sets the TransactionState of the Transaction to {@link TransactionState#ERROR}, and closes + * the Transaction. The underlying connection should not be returned to a connection pool in this case. + *
+ */ + void error(); + + + /** + * Returns the current state of the Transaction. + * @return + * @throws IOException + */ + TransactionState getState() throws IOException; + + /** + * Returns a Communicant that represents the other side of this Transaction (i.e., + * the remote NiFi instance) + * @return + */ + Communicant getCommunicant(); + + + public enum TransactionState { + /** + * Transaction has been started but no data has been sent or received. + */ + TRANSACTION_STARTED, + + /** + * Transaction has been started and data has been sent or received. + */ + DATA_EXCHANGED, + + /** + * Data that has been transferred has been confirmed via its CRC. Transaction is + * ready to be completed. + */ + TRANSACTION_CONFIRMED, + + /** + * Transaction has been successfully completed. + */ + TRANSACTION_COMPLETED, + + /** + * The Transaction has been canceled. + */ + TRANSACTION_CANCELED, + + /** + * The Transaction ended in an error. + */ + ERROR; + } +} diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java new file mode 100644 index 0000000000..be5f73a84e --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.io.InputStream; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.remote.protocol.DataPacket; + + +/** + * A TransactionCompletion provides information about a {@link Transaction} that has completed successfully. + */ +public interface TransactionCompletion { + + /** + * When a sending to a NiFi instance, the server may accept the content sent to it + * but indicate that its queues are full and that the client should backoff sending + * data for a bit. This method returnstrue
if the server did in fact
+ * request that, false
otherwise.
+ * @return
+ */
+ boolean isBackoff();
+
+ /**
+ * Returns the number of Data Packets that were sent to or received from the remote
+ * NiFi instance in the Transaction
+ * @return
+ */
+ int getDataPacketsTransferred();
+
+ /**
+ * Returns the number of bytes of DataPacket content that were sent to or received from
+ * the remote NiFI instance in the Transaction. Note that this is different than the number
+ * of bytes actually transferred between the client and server, as it does not take into
+ * account the attributes or protocol-specific information that is exchanged but rather
+ * takes into account only the data in the {@link InputStream} of the {@link DataPacket}
+ * @return
+ */
+ long getBytesTransferred();
+
+ /**
+ * Returns the amount of time that the Transaction took, from the time that the Transaction
+ * was created to the time that the Transaction was completed.
+ * @param timeUnit
+ * @return
+ */
+ long getDuration(TimeUnit timeUnit);
+}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
similarity index 75%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
index 56432d50f2..45029a4ca3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java
@@ -16,8 +16,19 @@
*/
package org.apache.nifi.remote;
-public enum TransferDirection {
+/**
+ * An enumeration for specifying the direction in which data should be transferred between a client
+ * and a remote NiFi instance.
+ */
+public enum TransferDirection {
+ /**
+ * The client is to send data to the remote instance.
+ */
SEND,
+
+ /**
+ * The client is to receive data from the remote instance.
+ */
RECEIVE;
}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
similarity index 100%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
new file mode 100644
index 0000000000..629032a07e
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.socket.SocketClient;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * + * The SiteToSiteClient provides a mechanism for sending data to a remote instance of NiFi + * (or NiFi cluster) and retrieving data from a remote instance of NiFi (or NiFi cluster). + *
+ * + *+ * When configuring the client via the {@link SiteToSiteClient.Builder}, the Builder must + * be provided the URL of the remote NiFi instance. If the URL points to a standalone instance + * of NiFi, all interaction will take place with that instance of NiFi. However, if the URL + * points to the NiFi Cluster Manager of a cluster, the client will automatically handle load + * balancing the transactions across the different nodes in the cluster. + *
+ * + *+ * The SiteToSiteClient provides a {@link Transaction} through which all interaction with the + * remote instance takes place. After data has been exchanged or it is determined that no data + * is available, the Transaction can then be canceled (via the {@link Transaction#cancel(String)} + * method) or can be completed (via the {@link Transaction#complete(boolean)} method). + *
+ * + *+ * An instance of SiteToSiteClient can be obtained by constructing a new instance of the + * {@link SiteToSiteClient.Builder} class, calling the appropriate methods to configured the + * client as desired, and then calling the {@link SiteToSiteClient.Builder#build() build()} method. + *
+ * + *+ * The SiteToSiteClient itself is immutable once constructed and is thread-safe. Many threads can + * share access to the same client. However, the {@link Transaction} that is created by the client + * is not thread safe and should not be shared among threads. + *
+ */ +public interface SiteToSiteClient extends Closeable { + + /** + *+ * Creates a new Transaction that can be used to either send data to a remote NiFi instance + * or receive data from a remote NiFi instance, depending on the value passed for the {@code direction} argument. + *
+ * + *
+ * Note: If all of the nodes are penalized (See {@link Builder#nodePenalizationPeriod(long, TimeUnit)}), then
+ * this method will return null
.
+ *
null
if all nodes are penalized.
+ * @throws IOException
+ */
+ Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException;
+
+ /**
+ * + * Returns {@code true} if site-to-site communications with the remote instance are secure, + * {@code false} if site-to-site communications with the remote instance are not secure. Whether or not + * communications are secure depends on the server, not the client. + *
+ * + *+ * In order to determine whether the server is configured for secure communications, the client may have + * to query the server's RESTful interface. Doing so could result in an IOException. + *
+ * + * @return + * @throws IOException if unable to query the remote instance's RESTful interface or if the remote + * instance is not configured to allow site-to-site communications + */ + boolean isSecure() throws IOException; + + /** + *+ * Returns the configuration object that was built by the Builder + *
+ * @return + */ + SiteToSiteClientConfig getConfig(); + + /** + *+ * The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient. + * Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If + * a change in configuration should be desired, the client should be {@link Closeable#close() closed} + * and a new client created. + *
+ */ + public static class Builder implements Serializable { + private static final long serialVersionUID = -4954962284343090219L; + + private String url; + private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); + private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); + private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L); + private SSLContext sslContext; + private EventReporter eventReporter; + private File peerPersistenceFile; + private boolean useCompression; + private String portName; + private String portIdentifier; + private int batchCount; + private long batchSize; + private long batchNanos; + + /** + * Populates the builder with values from the provided config + * @param config + * @return + */ + public Builder fromConfig(final SiteToSiteClientConfig config) { + this.url = config.getUrl(); + this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS); + this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); + this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS); + this.sslContext = config.getSslContext(); + this.eventReporter = config.getEventReporter(); + this.peerPersistenceFile = config.getPeerPersistenceFile(); + this.useCompression = config.isUseCompression(); + this.portName = config.getPortName(); + this.portIdentifier = config.getPortIdentifier(); + this.batchCount = config.getPreferredBatchCount(); + this.batchSize = config.getPreferredBatchSize(); + this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + + return this; + } + + /** + * Specifies the URL of the remote NiFi instance. If this URL points to the Cluster Manager of + * a NiFi cluster, data transfer to and from nodes will be automatically load balanced across + * the different nodes. + * + * @param url + * @return + */ + public Builder url(final String url) { + this.url = url; + return this; + } + + /** + * Specifies the communications timeouts to use when interacting with the remote instances. The + * default value is 30 seconds. + * + * @param timeout + * @param unit + * @return + */ + public Builder timeout(final long timeout, final TimeUnit unit) { + this.timeoutNanos = unit.toNanos(timeout); + return this; + } + + /** + * Specifies the amount of time that a connection can remain idle in the connection pool before it + * is "expired" and shutdown. The default value is 30 seconds. + * + * @param timeout + * @param unit + * @return + */ + public Builder idleExpiration(final long timeout, final TimeUnit unit) { + this.idleExpirationNanos = unit.toNanos(timeout); + return this; + } + + /** + * If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster + * or the remote instance of NiFi if it is standalone), specifies how long the client should + * wait before attempting to communicate with that node again. While a particular node is penalized, + * all other nodes in the remote cluster (if any) will still be available for communication. + * The default value is 3 seconds. + * + * @param period + * @param unit + * @return + */ + public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) { + this.penalizationNanos = unit.toNanos(period); + return this; + } + + /** + * Specifies the SSL Context to use when communicating with the remote NiFi instance(s). If not + * specified, communications will not be secure. The remote instance of NiFi always determines + * whether or not Site-to-Site communications are secure (i.e., the client will always use + * secure or non-secure communications, depending on what the server dictates). + * + * @param sslContext + * @return + */ + public Builder sslContext(final SSLContext sslContext) { + this.sslContext = sslContext; + return this; + } + + + /** + * Provides an EventReporter that can be used by the client in order to report any events that + * could be of interest when communicating with the remote instance. The EventReporter provided + * must be threadsafe. + * + * @param eventReporter + * @return + */ + public Builder eventReporter(final EventReporter eventReporter) { + this.eventReporter = eventReporter; + return this; + } + + + /** + * Specifies a file that the client can write to in order to persist the list of nodes in the + * remote cluster and recover the list of nodes upon restart. This allows the client to function + * if the remote Cluster Manager is unavailable, even after a restart of the client software. + * If not specified, the list of nodes will not be persisted and a failure of the Cluster Manager + * will result in not being able to communicate with the remote instance if a new client + * is created. + * + * @param peerPersistenceFile + * @return + */ + public Builder peerPersistenceFile(final File peerPersistenceFile) { + this.peerPersistenceFile = peerPersistenceFile; + return this; + } + + /** + * Specifies whether or not data should be compressed before being transferred to or from the + * remote instance. + * + * @param compress + * @return + */ + public Builder useCompression(final boolean compress) { + this.useCompression = compress; + return this; + } + + /** + * Specifies the name of the port to communicate with. Either the port name or the port identifier + * must be specified. + * + * @param portName + * @return + */ + public Builder portName(final String portName) { + this.portName = portName; + return this; + } + + /** + * Specifies the unique identifier of the port to communicate with. If it is known, this is preferred over providing + * the port name, as the port name may change. + * + * @param portIdentifier + * @return + */ + public Builder portIdentifier(final String portIdentifier) { + this.portIdentifier = portIdentifier; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This method specifies + * the preferred number of {@link DataPacket}s to include in a Transaction. + * + * @return + */ + public Builder requestBatchCount(final int count) { + this.batchCount = count; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This method specifies + * the preferred number of bytes to include in a Transaction. + * + * @return + */ + public Builder requestBatchSize(final long bytes) { + this.batchSize = bytes; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This method specifies + * the preferred amount of time that a Transaction should span. + * + * @return + */ + public Builder requestBatchDuration(final long value, final TimeUnit unit) { + this.batchNanos = unit.toNanos(value); + return this; + } + + /** + * Returns a {@link SiteToSiteClientConfig} for the configured values but does not create a SiteToSiteClient + * @return + */ + public SiteToSiteClientConfig buildConfig() { + final SiteToSiteClientConfig config = new SiteToSiteClientConfig() { + private static final long serialVersionUID = 1323119754841633818L; + + @Override + public boolean isUseCompression() { + return Builder.this.isUseCompression(); + } + + @Override + public String getUrl() { + return Builder.this.getUrl(); + } + + @Override + public long getTimeout(final TimeUnit timeUnit) { + return Builder.this.getTimeout(timeUnit); + } + + @Override + public long getIdleConnectionExpiration(final TimeUnit timeUnit) { + return Builder.this.getIdleConnectionExpiration(timeUnit); + } + + @Override + public SSLContext getSslContext() { + return Builder.this.getSslContext(); + } + + @Override + public String getPortName() { + return Builder.this.getPortName(); + } + + @Override + public String getPortIdentifier() { + return Builder.this.getPortIdentifier(); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return Builder.this.getPenalizationPeriod(timeUnit); + } + + @Override + public File getPeerPersistenceFile() { + return Builder.this.getPeerPersistenceFile(); + } + + @Override + public EventReporter getEventReporter() { + return Builder.this.getEventReporter(); + } + + @Override + public long getPreferredBatchDuration(final TimeUnit timeUnit) { + return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS); + } + + @Override + public long getPreferredBatchSize() { + return Builder.this.batchSize; + } + + @Override + public int getPreferredBatchCount() { + return Builder.this.batchCount; + } + }; + + return config; + } + + /** + * Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi + * @return + * + * @throws IllegalStateException if either the url is not set or neither the port name nor port identifier + * is set. + */ + public SiteToSiteClient build() { + if ( url == null ) { + throw new IllegalStateException("Must specify URL to build Site-to-Site client"); + } + + if ( portName == null && portIdentifier == null ) { + throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client"); + } + + return new SocketClient(buildConfig()); + } + + /** + * Returns the configured URL for the remote NiFi instance + * @return + */ + public String getUrl() { + return url; + } + + /** + * Returns the communications timeout + * @return + */ + public long getTimeout(final TimeUnit timeUnit) { + return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); + } + + /** + * Returns the amount of of time that a connection can remain idle in the connection + * pool before being shutdown + * @param timeUnit + * @return + */ + public long getIdleConnectionExpiration(final TimeUnit timeUnit) { + return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS); + } + + /** + * Returns the amount of time that a particular node will be ignored after a + * communications error with that node occurs + * @param timeUnit + * @return + */ + public long getPenalizationPeriod(TimeUnit timeUnit) { + return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); + } + + /** + * Returns the SSL Context that is configured for this builder + * @return + */ + public SSLContext getSslContext() { + return sslContext; + } + + /** + * Returns the EventReporter that is to be used by clients to report events + * @return + */ + public EventReporter getEventReporter() { + return eventReporter; + } + + /** + * Returns the file that is to be used for persisting the nodes of a remote cluster, if any. + * @return + */ + public File getPeerPersistenceFile() { + return peerPersistenceFile; + } + + /** + * Returns a boolean indicating whether or not compression will be used to transfer data + * to and from the remote instance + * @return + */ + public boolean isUseCompression() { + return useCompression; + } + + /** + * Returns the name of the port that the client is to communicate with. + * @return + */ + public String getPortName() { + return portName; + } + + /** + * Returns the identifier of the port that the client is to communicate with. + * @return + */ + public String getPortIdentifier() { + return portIdentifier; + } + } +} diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java new file mode 100644 index 0000000000..5e7fbe8571 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.io.File; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.protocol.DataPacket; + +public interface SiteToSiteClientConfig extends Serializable { + + /** + * Returns the configured URL for the remote NiFi instance + * @return + */ + String getUrl(); + + /** + * Returns the communications timeout in nanoseconds + * @return + */ + long getTimeout(final TimeUnit timeUnit); + + /** + * Returns the amount of time that a connection can remain idle before it is + * "expired" and shut down + * @param timeUnit + * @return + */ + long getIdleConnectionExpiration(TimeUnit timeUnit); + + /** + * Returns the amount of time that a particular node will be ignored after a + * communications error with that node occurs + * @param timeUnit + * @return + */ + long getPenalizationPeriod(TimeUnit timeUnit); + + /** + * Returns the SSL Context that is configured for this builder + * @return + */ + SSLContext getSslContext(); + + /** + * Returns the file that is to be used for persisting the nodes of a remote cluster, if any. + * @return + */ + File getPeerPersistenceFile(); + + /** + * Returns a boolean indicating whether or not compression will be used to transfer data + * to and from the remote instance + * @return + */ + boolean isUseCompression(); + + /** + * Returns the name of the port that the client is to communicate with. + * @return + */ + String getPortName(); + + /** + * Returns the identifier of the port that the client is to communicate with. + * @return + */ + String getPortIdentifier(); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This returns the maximum + * amount of time that we will request a NiFi instance to send data to us in a Transaction. + * + * @param timeUnit + * @return + */ + long getPreferredBatchDuration(TimeUnit timeUnit); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This returns the maximum + * number of bytes that we will request a NiFi instance to send data to us in a Transaction. + * + * @return + */ + long getPreferredBatchSize(); + + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This returns the maximum + * number of {@link DataPacket}s that we will request a NiFi instance to send data to us in a Transaction. + * + * @return + */ + int getPreferredBatchCount(); + + /** + * Returns the EventReporter that is to be used by clients to report events + * @return + */ + EventReporter getEventReporter(); + +} diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java new file mode 100644 index 0000000000..651ae5064b --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnection.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; + +public class EndpointConnection { + private final Peer peer; + private final SocketClientProtocol socketClientProtocol; + private final FlowFileCodec codec; + private volatile long lastUsed; + + public EndpointConnection(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) { + this.peer = peer; + this.socketClientProtocol = socketClientProtocol; + this.codec = codec; + } + + public FlowFileCodec getCodec() { + return codec; + } + + public SocketClientProtocol getSocketClientProtocol() { + return socketClientProtocol; + } + + public Peer getPeer() { + return peer; + } + + public void setLastTimeUsed() { + lastUsed = System.currentTimeMillis(); + } + + public long getLastTimeUsed() { + return lastUsed; + } +} diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java new file mode 100644 index 0000000000..42428f60e2 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -0,0 +1,970 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; + +import javax.net.ssl.SSLContext; +import javax.security.cert.CertificateExpiredException; +import javax.security.cert.CertificateNotYetValidException; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerDescription; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.RemoteResourceInitiator; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.TransmissionDisabledException; +import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; +import org.apache.nifi.remote.util.NiFiRestApiUtil; +import org.apache.nifi.remote.util.PeerStatusCache; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EndpointConnectionPool { + public static final long PEER_REFRESH_PERIOD = 60000L; + public static final String CATEGORY = "Site-to-Site"; + public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); + + private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); + + private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class); + + private final ConcurrentMapnull
if the remote instance
+ * is not configured to allow site-to-site communications.
+ *
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ private Integer getSiteToSitePort() throws IOException {
+ Integer listeningPort;
+ remoteInfoReadLock.lock();
+ try {
+ listeningPort = this.siteToSitePort;
+ if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+ return listeningPort;
+ }
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ final ControllerDTO controller = refreshRemoteInfo();
+ listeningPort = controller.getRemoteSiteListeningPort();
+
+ return listeningPort;
+ }
+
+ @Override
+ public String toString() {
+ return "EndpointConnectionPool[Cluster URL=" + clusterUrl + "]";
+ }
+
+
+ /**
+ * Returns {@code true} if the remote instance is configured for secure site-to-site communications,
+ * {@code false} otherwise.
+ *
+ * @return
+ * @throws IOException
+ */
+ public boolean isSecure() throws IOException {
+ remoteInfoReadLock.lock();
+ try {
+ final Boolean secure = this.siteToSiteSecure;
+ if (secure != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+ return secure;
+ }
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ final ControllerDTO controller = refreshRemoteInfo();
+ final Boolean isSecure = controller.isSiteToSiteSecure();
+ if ( isSecure == null ) {
+ throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
+ }
+
+ return isSecure;
+ }
+
+
+ private class IdEnrichedRemoteDestination implements RemoteDestination {
+ private final RemoteDestination original;
+ private final String identifier;
+
+ public IdEnrichedRemoteDestination(final RemoteDestination original, final String identifier) {
+ this.original = original;
+ this.identifier = identifier;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public String getName() {
+ return original.getName();
+ }
+
+ @Override
+ public long getYieldPeriod(final TimeUnit timeUnit) {
+ return original.getYieldPeriod(timeUnit);
+ }
+
+ @Override
+ public boolean isUseCompression() {
+ return original.isUseCompression();
+ }
+ }
+}
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
new file mode 100644
index 0000000000..bd9319f8c1
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.Communicant;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.util.ObjectHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClient implements SiteToSiteClient {
+ private static final Logger logger = LoggerFactory.getLogger(SocketClient.class);
+
+ private final SiteToSiteClientConfig config;
+ private final EndpointConnectionPool pool;
+ private final boolean compress;
+ private final String portName;
+ private final long penalizationNanos;
+ private volatile String portIdentifier;
+ private volatile boolean closed = false;
+
+ public SocketClient(final SiteToSiteClientConfig config) {
+ pool = new EndpointConnectionPool(config.getUrl(),
+ createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
+ (int) config.getTimeout(TimeUnit.MILLISECONDS),
+ (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
+ config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
+
+ this.config = config;
+ this.compress = config.isUseCompression();
+ this.portIdentifier = config.getPortIdentifier();
+ this.portName = config.getPortName();
+ this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public SiteToSiteClientConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public boolean isSecure() throws IOException {
+ return pool.isSecure();
+ }
+
+ private String getPortIdentifier(final TransferDirection direction) throws IOException {
+ final String id = this.portIdentifier;
+ if ( id != null ) {
+ return id;
+ }
+
+ final String portId;
+ if ( direction == TransferDirection.SEND ) {
+ portId = pool.getInputPortIdentifier(this.portName);
+ } else {
+ portId = pool.getOutputPortIdentifier(this.portName);
+ }
+
+ if (portId == null) {
+ logger.debug("Unable to resolve port [{}] to an identifier", portName);
+ } else {
+ logger.debug("Resolved port [{}] to identifier [{}]", portName, portId);
+ }
+
+ return portId;
+ }
+
+
+ private RemoteDestination createRemoteDestination(final String portId, final String portName) {
+ return new RemoteDestination() {
+ @Override
+ public String getIdentifier() {
+ return portId;
+ }
+
+ @Override
+ public String getName() {
+ return portName;
+ }
+
+ @Override
+ public long getYieldPeriod(final TimeUnit timeUnit) {
+ return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public boolean isUseCompression() {
+ return compress;
+ }
+ };
+ }
+
+ @Override
+ public Transaction createTransaction(final TransferDirection direction) throws IOException {
+ if ( closed ) {
+ throw new IllegalStateException("Client is closed");
+ }
+ final String portId = getPortIdentifier(direction);
+
+ if ( portId == null ) {
+ throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance");
+ }
+
+ final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig());
+ if ( connectionState == null ) {
+ return null;
+ }
+
+ final Transaction transaction = connectionState.getSocketClientProtocol().startTransaction(
+ connectionState.getPeer(), connectionState.getCodec(), direction);
+
+ // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever
+ // the transaction is either completed or canceled.
+ final ObjectHolder
@@ -44,36 +43,29 @@ public interface FlowFileCodec extends VersionedRemoteResource {
public List
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's
+ * content and its attributes so that they can be processed by Spark
+ *
+ * The
+ * It is important to note that if pulling data from a NiFi cluster, the URL that should be used
+ * is that of the NiFi Cluster Manager. The Receiver will automatically handle determining the nodes
+ * in that cluster and pull from those nodes as appropriate.
+ *
+ * In order to use the NiFiReceiver, you will need to first build a {@link SiteToSiteClientConfig} to provide
+ * to the constructor. This can be achieved by using the {@link SiteToSiteClient.Builder}.
+ * Below is an example snippet of driver code to pull data from NiFi that is running on localhost:8080. This
+ * example assumes that NiFi exposes and OutputPort on the root group named "Data For Spark".
+ * Additionally, it assumes that the data that it will receive from this OutputPort is text
+ * data, as it will map the byte array received from NiFi to a UTF-8 Encoded string.
+ *
@@ -167,7 +167,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, hdfsResources.get().getKey(), compressionType);
session.transfer(flowFile, RELATIONSHIP_SUCCESS);
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
- } catch (Exception e) {
+ } catch (ProcessException e) {
getLogger().error("Failed to create Sequence File. Transferring {} to 'failure'", new Object[]{flowFile}, e);
session.transfer(flowFile, RELATIONSHIP_FAILURE);
}
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index d9175e06ca..cd272ff4e2 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -28,6 +28,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.codec.binary.Base64OutputStream;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
@@ -36,11 +41,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
import org.apache.nifi.util.StopWatch;
@@ -136,7 +137,7 @@ public class Base64EncodeContent extends AbstractProcessor {
logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
- } catch (Exception e) {
+ } catch (ProcessException e) {
logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
index cf20f16d40..e631cd0e62 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java
@@ -32,30 +32,31 @@ import java.util.concurrent.TimeUnit;
import lzma.sdk.lzma.Decoder;
import lzma.streams.LzmaInputStream;
import lzma.streams.LzmaOutputStream;
+
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
-
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
-import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
-import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.tukaani.xz.LZMA2Options;
import org.tukaani.xz.XZInputStream;
import org.tukaani.xz.XZOutputStream;
@@ -290,7 +291,7 @@ public class CompressContent extends AbstractProcessor {
compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
- } catch (final Exception e) {
+ } catch (final ProcessException e) {
logger.error("Unable to {} {} using {} compression format due to {}; routing to failure", new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, e});
session.transfer(flowFile, REL_FAILURE);
}
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
index 827653bec0..9f8a16c4d9 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
@@ -28,23 +28,23 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.NullOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ObjectHolder;
@EventDriven
@@ -143,7 +143,7 @@ public class HashContent extends AbstractProcessor {
logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()});
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_SUCCESS);
- } catch (final Exception e) {
+ } catch (final ProcessException e) {
logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
index 2fa71c8994..eb6b1cca63 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutEmail.java
@@ -32,6 +32,7 @@ import java.util.Set;
import javax.activation.DataHandler;
import javax.mail.Message;
import javax.mail.Message.RecipientType;
+import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.URLName;
import javax.mail.internet.AddressException;
@@ -56,9 +57,9 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-
import org.apache.commons.codec.binary.Base64;
import com.sun.mail.smtp.SMTPTransport;
@@ -263,7 +264,7 @@ public class PutEmail extends AbstractProcessor {
session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
session.transfer(flowFile, REL_SUCCESS);
logger.info("Sent email as a result of receiving {}", new Object[]{flowFile});
- } catch (final Exception e) {
+ } catch (final ProcessException | MessagingException | IOException e) {
context.yield();
logger.error("Failed to send email for {}: {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
index dfdd401033..cf0539ed71 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
@@ -25,6 +25,11 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -34,12 +39,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@EventDriven
@@ -102,62 +101,57 @@ public class SegmentContent extends AbstractProcessor {
return;
}
- try {
- final String segmentId = UUID.randomUUID().toString();
- final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
+ final String segmentId = UUID.randomUUID().toString();
+ final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
- final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
- if (flowFile.getSize() <= segmentSize) {
- flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
- flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
- flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
- flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
+ if (flowFile.getSize() <= segmentSize) {
+ flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
+ flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
+ flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
+ flowFile = session.putAttribute(flowFile, SEGMENT_ORIGINAL_FILENAME, originalFileName);
- flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
- flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
- flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
+ flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
+ flowFile = session.putAttribute(flowFile, FRAGMENT_INDEX, "1");
+ flowFile = session.putAttribute(flowFile, FRAGMENT_COUNT, "1");
- FlowFile clone = session.clone(flowFile);
- session.transfer(flowFile, REL_ORIGINAL);
- session.transfer(clone, REL_SEGMENTS);
- return;
- }
-
- int totalSegments = (int) (flowFile.getSize() / segmentSize);
- if (totalSegments * segmentSize < flowFile.getSize()) {
- totalSegments++;
- }
-
- final Mapnull
if the stream
+ * @return the DataPacket that was created, or null
if the stream
* was out of data
*
* @throws IOException
* @throws ProtocolException if the input is malformed
+ * @throws TransmissionDisabledException if a user terminates the connection
*/
- FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException;
+ DataPacket decode(InputStream stream) throws IOException, ProtocolException, TransmissionDisabledException;
}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
similarity index 58%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
index d18a4ee9f2..6fd92de266 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -26,14 +26,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
public class StandardFlowFileCodec implements FlowFileCodec {
public static final int MAX_NUM_ATTRIBUTES = 25000;
@@ -47,37 +45,26 @@ public class StandardFlowFileCodec implements FlowFileCodec {
}
@Override
- public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException {
+ public void encode(final DataPacket dataPacket, final OutputStream encodedOut) throws IOException {
final DataOutputStream out = new DataOutputStream(encodedOut);
- final Maptrue
if remote instance indicates that the port is
* invalid
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
similarity index 87%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
index d2e2946c0d..5e5690239a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -21,6 +21,12 @@ import java.io.InputStream;
public interface CommunicationsInput {
+ /**
+ * Reads all data currently on the socket and throws it away
+ * @throws IOException
+ */
+ void consume() throws IOException;
+
InputStream getInputStream() throws IOException;
long getBytesRead();
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
similarity index 100%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
similarity index 100%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
new file mode 100644
index 0000000000..3f0ec4ffda
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.InputStream;
+import java.util.Map;
+
+
+/**
+ * Represents a piece of data that is to be sent to or that was received from a NiFi instance.
+ */
+public interface DataPacket {
+
+ /**
+ * The key-value attributes that are to be associated with the data
+ * @return
+ */
+ MapNiFiReceiver
is a Reliable Receiver that provides a way to pull data
+ * from Apache NiFi so that it can be processed by Spark Streaming. The NiFi Receiver connects
+ * to NiFi instance provided in the config and requests data from
+ * the OutputPort that is named. In NiFi, when an OutputPort is added to the root process group,
+ * it acts as a queue of data for remote clients. This receiver is then able to pull that data
+ * from NiFi reliably.
+ *
+ *
+ */
+public class NiFiReceiver extends Receiver
+ * Pattern SPACE = Pattern.compile(" ");
+ *
+ * // Build a Site-to-site client config
+ * SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
+ * .setUrl("http://localhost:8080/nifi")
+ * .setPortName("Data For Spark")
+ * .buildConfig();
+ *
+ * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
+ * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
+ *
+ * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
+ * // specified Port
+ * JavaReceiverInputDStream
+ * null
if the remote instance
- * is not configured to allow site-to-site communications.
- *
- * @throws IOException if unable to communicate with the remote instance
- */
- Integer getListeningPort() throws IOException;
-
/**
* Indicates whether or not the RemoteProcessGroup is currently scheduled to
* transmit data
@@ -211,11 +199,6 @@ public interface RemoteProcessGroup {
*/
void removeNonExistentPort(final RemoteGroupPort port);
- /**
- *
- * @return @throws IOException
- */
- CommunicationsSession establishSiteToSiteConnection() throws IOException;
/**
* Called whenever RemoteProcessGroup is removed from the flow, so that any
@@ -232,24 +215,4 @@ public interface RemoteProcessGroup {
void verifyCanStopTransmitting();
void verifyCanUpdate();
-
- /**
- * Returns a set of PeerStatus objects that describe the different peers
- * that we can communicate with for this RemoteProcessGroup.
- *
- * If the destination is a cluster, this set will contain PeerStatuses for
- * each of the nodes in the cluster.
- *
- * If the destination is a standalone instance, this set will contain just a
- * PeerStatus for the destination.
- *
- * Once the PeerStatuses have been obtained, they may be cached by this
- * RemoteProcessGroup for some amount of time.
- *
- * If unable to obtain the PeerStatuses or no peer status has yet been
- * obtained, will return null.
- *
- * @return
- */
- Settrue
if the Connectable should be yielded, false
otherwise.
+ */
+public class ContinuallyRunConnectableTask implements Callabletrue
if the processor should be yielded, false
otherwise.
+ */
+public class ContinuallyRunProcessorTask implements Callablenull
if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
+ * @param timeoutMillis
+ * @return
+ * @throws IOException
+ */
+ public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteListeningPort(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteRootGroupId(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getController(uriObject, timeoutMillis).getInstanceId();
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ /**
+ * Returns the port on which the remote instance is listening for Flow File transfers, or null
if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the full URI to fetch, including the path.
+ * @return
+ * @throws IOException
+ */
+ private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
+ }
+
+ private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getId();
+ }
+
+ public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
+ final ClientResponse response = get(uri, timeoutMillis);
+
+ if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
+ final ControllerEntity entity = response.getEntity(ControllerEntity.class);
+ return entity.getController();
+ } else {
+ final String responseMessage = response.getEntity(String.class);
+ throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
+ }
+ }
+
+ /**
+ * Issues a registration request on behalf of the current user.
+ *
+ * @param baseApiUri
+ * @return
+ */
+ public ClientResponse issueRegistrationRequest(String baseApiUri) {
+ final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
+
+ // set up the query params
+ MultivaluedMapImpl entity = new MultivaluedMapImpl();
+ entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
+
+ // create the web resource
+ WebResource webResource = client.resource(uri);
+
+ // get the client utils and make the request
+ return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
+ }
+}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 2c1b0857f2..6b70fe69ea 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -18,21 +18,9 @@ package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -49,11 +37,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
import javax.ws.rs.core.Response;
import org.apache.nifi.connectable.ConnectableType;
@@ -64,7 +49,6 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.util.RemoteProcessGroupUtils;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
@@ -72,16 +56,6 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.remote.exception.BadRequestException;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
@@ -108,7 +82,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public static final String CONTROLLER_URI_PATH = "/controller";
public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status";
- public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
// status codes
public static final int OK_STATUS_CODE = Status.OK.getStatusCode();
@@ -150,15 +123,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private ProcessGroupCounts counts = new ProcessGroupCounts(0, 0, 0, 0, 0, 0, 0, 0);
private Long refreshContentsTimestamp = null;
- private Integer listeningPort;
- private long listeningPortRetrievalTime = 0L;
private Boolean destinationSecure;
+ private Integer listeningPort;
private volatile String authorizationIssue;
- private volatile PeerStatusCache peerStatusCache;
- private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
-
private final ScheduledExecutorService backgroundThreadExecutor;
public StandardRemoteProcessGroup(final String id, final String targetUri, final ProcessGroup processGroup,
@@ -200,72 +169,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
};
- final Runnable socketCleanup = new Runnable() {
- @Override
- public void run() {
- final Setnull
if the remote instance
- * is not configured to allow site-to-site communications.
- *
- * @throws IOException if unable to communicate with the remote instance
- */
- @Override
- public Integer getListeningPort() throws IOException {
- Integer listeningPort;
- readLock.lock();
- try {
- listeningPort = this.listeningPort;
- if (listeningPort != null && this.listeningPortRetrievalTime > System.currentTimeMillis() - LISTENING_PORT_REFRESH_MILLIS) {
- return listeningPort;
- }
- } finally {
- readLock.unlock();
- }
-
- final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
- listeningPort = utils.getRemoteListeningPort(apiUri.toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
- writeLock.lock();
- try {
- this.listeningPort = listeningPort;
- this.listeningPortRetrievalTime = System.currentTimeMillis();
- } finally {
- writeLock.unlock();
- }
-
- return listeningPort;
- }
-
@Override
public boolean isTransmitting() {
return transmitting.get();
@@ -1255,52 +1128,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
- @Override
- public CommunicationsSession establishSiteToSiteConnection() throws IOException {
- final URI uri = apiUri;
- final String destinationUri = uri.toString();
- CommunicationsSession commsSession = null;
- try {
- if (isSecure()) {
- if (sslContext == null) {
- throw new IOException("Unable to communicate with " + getTargetUri() + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
- }
-
- final Integer listeningPort = getListeningPort();
- if (listeningPort == null) {
- throw new IOException("Remote instance is not configured to allow incoming Site-to-Site connections");
- }
-
- final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, uri.getHost(), listeningPort, true);
- socketChannel.connect();
- commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-
- try {
- commsSession.setUserDn(socketChannel.getDn());
- } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
- throw new IOException(ex);
- }
- } else {
- final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(uri.getHost(), getListeningPort()));
-
- commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
- }
-
- commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-
- commsSession.setUri("nifi://" + uri.getHost() + ":" + uri.getPort());
- } catch (final IOException e) {
- if (commsSession != null) {
- try {
- commsSession.close();
- } catch (final IOException ignore) {
- }
- }
-
- throw e;
- }
- return commsSession;
- }
@Override
public EventReporter getEventReporter() {
@@ -1312,7 +1139,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void run() {
try {
- final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+ final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
final int statusCode = response.getStatus();
@@ -1385,7 +1212,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public String getYieldDuration() {
return yieldDuration;
}
-
+
@Override
public void verifyCanDelete() {
verifyCanDelete(false);
@@ -1487,135 +1314,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
- @Override
- public Set