diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
new file mode 100644
index 0000000000..f718581575
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A model object for referring to a remote destination (i.e., a Port) for site-to-site communications
+ */
+public interface RemoteDestination {
+ /**
+ * Returns the identifier of the remote destination
+ *
+ * @return
+ */
+ String getIdentifier();
+
+ /**
+ * Returns the amount of time that system should pause sending to a particular node if unable to
+ * send data to or receive data from this endpoint
+ * @param timeUnit
+ * @return
+ */
+ long getYieldPeriod(TimeUnit timeUnit);
+
+ /**
+ * Returns whether or not compression should be used when transferring data to or receiving
+ * data from the remote endpoint
+ * @return
+ */
+ boolean isUseCompression();
+}
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
new file mode 100644
index 0000000000..3fc00a2038
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -0,0 +1,43 @@
+
+
+ * Provides a transaction for performing site-to-site data transfers. + *
+ * + *+ * A Transaction is created by calling the + * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)} + * method of a {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The resulting Transaction + * can be used to either send or receive data but not both. A new Transaction must be created in order perform the + * other operation. + *
+ * + *+ * The general flow of execute of a Transaction is as follows: + *
receive()
should not be
+ * called a second time without first fully consuming the stream from the previous Packet that was received.+ * It is important that the Transaction be terminated in order to free the resources held + * by the Transaction. If a Transaction is not terminated, its resources will not be freed and + * if the Transaction holds connections from a connection pool, the connections in that pool + * will eventually become exhausted. A Transaction is terminated by calling one of the following + * methods: + *
+ * If at any point an IOException is thrown from one of the methods of the Transaction, that Transaction + * is automatically closed via a call to {@link #error()}. + *
+ * + *+ * The Transaction class should not be assumed to be thread-safe. + *
+ */ +public interface Transaction { + + /** + * Sends information to the remote NiFi instance. + * + * @param dataPacket the data packet to send + * @throws IOException + */ + void send(DataPacket dataPacket) throws IOException; + + /** + * Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return + * {@code null}. It is important to consume all data from the remote NiFi instance before attempting to + * call {@link #confirm()}. This is because the sender is always responsible for determining when the Transaction + * has finished. This is done in order to prevent the need for a round-trip network request to receive data for + * each data packet. + * + * @return the DataPacket received, or {@code null} if there is no more data to receive. + * @throws IOException + */ + DataPacket receive() throws IOException; + + /** + *+ * Confirms the data that was sent or received by comparing CRC32's of the data sent and the data received. + *
+ * + *+ * Even if the protocol being used to send the data is reliable and guarantees ordering of packets (such as TCP), + * it is still required that we confirm the transaction before completing the transaction. This is done as + * "safety net" or a defensive programming technique. Mistakes happen, and this mechanism helps to ensure that if + * a bug exists somewhere along the line that we do not end up sending or receiving corrupt data. If the + * CRC32 of the sender and the CRC32 of the receiver do not match, an IOException will be thrown and both the + * sender and receiver will cancel the transaction automatically. + *
+ * + *+ * If the {@link TransferDirection} of this Transaction is RECEIVE, this method will throw an Exception unless + * all data from the remote instance has been consumed (i.e., a call to {@link #receive()} returns {@code null}). + *
+ * + *+ * If the {@link TransferDirection} of this Transaction is SEND, calling this method dictates that no more data will be + * sent in this transaction. I.e., there will be no more calls to {@link #send(DataPacket)}. + *
+ * + * @throws IOException + */ + void confirm() throws IOException; + + /** + *+ * Completes the transaction and indicates to both the sender and receiver that the data transfer was + * successful. If receiving data, this method can also optionally request that the sender back off sending + * data for a short period of time. This is used, for instance, to apply backpressure or to notify the sender + * that the receiver is not ready to receive data and made not service another request in the short term. + *
+ * + * @param requestBackoff iftrue
and the TransferDirection is RECEIVE, indicates to sender that it
+ * should back off sending data for a short period of time. If false
or if the TransferDirection of
+ * this Transaction is SEND, then this argument is ignored.
+ *
+ * @throws IOException
+ */
+ void complete(boolean requestBackoff) throws IOException;
+
+ /**
+ * + * Cancels this transaction, indicating to the sender that the data has not been successfully received so that + * the sender can retry or handle however is appropriate. + *
+ * + * @param explanation an explanation to tell the other party why the transaction was canceled. + * @throws IOException + */ + void cancel(final String explanation) throws IOException; + + + /** + *+ * Sets the TransactionState of the Transaction to {@link TransactionState#ERROR}, and closes + * the Transaction. The underlying connection should not be returned to a connection pool in this case. + *
+ */ + void error(); + + + /** + * Returns the current state of the Transaction. + * @return + * @throws IOException + */ + TransactionState getState() throws IOException; + + + public enum TransactionState { + /** + * Transaction has been started but no data has been sent or received. + */ + TRANSACTION_STARTED, + + /** + * Transaction has been started and data has been sent or received. + */ + DATA_EXCHANGED, + + /** + * Data that has been transferred has been confirmed via its CRC. Transaction is + * ready to be completed. + */ + TRANSACTION_CONFIRMED, + + /** + * Transaction has been successfully completed. + */ + TRANSACTION_COMPLETED, + + /** + * The Transaction has been canceled. + */ + TRANSACTION_CANCELED, + + /** + * The Transaction ended in an error. + */ + ERROR; + } +} diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java similarity index 75% rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java index 56432d50f2..45029a4ca3 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java @@ -16,8 +16,19 @@ */ package org.apache.nifi.remote; -public enum TransferDirection { +/** + * An enumeration for specifying the direction in which data should be transferred between a client + * and a remote NiFi instance. + */ +public enum TransferDirection { + /** + * The client is to send data to the remote instance. + */ SEND, + + /** + * The client is to receive data from the remote instance. + */ RECEIVE; } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java similarity index 100% rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java new file mode 100644 index 0000000000..fa94b81255 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.socket.SocketClient; +import org.apache.nifi.remote.protocol.DataPacket; + +/** + *+ * The SiteToSiteClient provides a mechanism for sending data to a remote instance of NiFi + * (or NiFi cluster) and retrieving data from a remote instance of NiFi (or NiFi cluster). + *
+ * + *+ * When configuring the client via the {@link SiteToSiteClient.Builder}, the Builder must + * be provided the URL of the remote NiFi instance. If the URL points to a standalone instance + * of NiFi, all interaction will take place with that instance of NiFi. However, if the URL + * points to the NiFi Cluster Manager of a cluster, the client will automatically handle load + * balancing the transactions across the different nodes in the cluster. + *
+ * + *+ * The SiteToSiteClient provides a {@link Transaction} through which all interaction with the + * remote instance takes place. After data has been exchanged or it is determined that no data + * is available, the Transaction can then be canceled (via the {@link Transaction#cancel(String)} + * method) or can be completed (via the {@link Transaction#complete(boolean)} method). + *
+ * + *+ * An instance of SiteToSiteClient can be obtained by constructing a new instance of the + * {@link SiteToSiteClient.Builder} class, calling the appropriate methods to configured the + * client as desired, and then calling the {@link SiteToSiteClient.Builder#build() build()} method. + *
+ * + *+ * The SiteToSiteClient itself is immutable once constructed and is thread-safe. Many threads can + * share access to the same client. However, the {@link Transaction} that is created by the client + * is not thread safe and should not be shared among threads. + *
+ */ +public interface SiteToSiteClient extends Closeable { + + /** + * Creates a new Transaction that can be used to either send data to a remote NiFi instance + * or receive data from a remote NiFi instance, depending on the value passed for the {@code direction} argument. + * + * + * @param direction specifies which direction the data should be transferred. A value of {@link TransferDirection#SEND} + * indicates that this Transaction will send data to the remote instance; a value of {@link TransferDirection#RECEIVE} indicates + * that this Transaction will be used to receive data from the remote instance. + * + * @return + * @throws IOException + */ + Transaction createTransaction(TransferDirection direction) throws IOException; + + /** + *+ * Returns {@code true} if site-to-site communications with the remote instance are secure, + * {@code false} if site-to-site communications with the remote instance are not secure. Whether or not + * communications are secure depends on the server, not the client. + *
+ * + *+ * In order to determine whether the server is configured for secure communications, the client may have + * to query the server's RESTful interface. Doing so could result in an IOException. + *
+ * + * @return + * @throws IOException if unable to query the remote instance's RESTful interface or if the remote + * instance is not configured to allow site-to-site communications + */ + boolean isSecure() throws IOException; + + /** + *+ * Returns the configuration object that was built by the Builder + *
+ * @return + */ + SiteToSiteClientConfig getConfig(); + + /** + *+ * The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient. + * Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If + * a change in configuration should be desired, the client should be {@link Closeable#close() closed} + * and a new client created. + *
+ */ + public static class Builder { + private String url; + private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); + private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); + private SSLContext sslContext; + private EventReporter eventReporter; + private File peerPersistenceFile; + private boolean useCompression; + private String portName; + private String portIdentifier; + private int batchCount; + private long batchSize; + private long batchNanos; + + /** + * Specifies the URL of the remote NiFi instance. If this URL points to the Cluster Manager of + * a NiFi cluster, data transfer to and from nodes will be automatically load balanced across + * the different nodes. + * + * @param url + * @return + */ + public Builder url(final String url) { + this.url = url; + return this; + } + + /** + * Specifies the communications timeouts to use when interacting with the remote instances. The + * default value is 30 seconds. + * + * @param timeout + * @param unit + * @return + */ + public Builder timeout(final long timeout, final TimeUnit unit) { + this.timeoutNanos = unit.toNanos(timeout); + return this; + } + + /** + * If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster + * or the remote instance of NiFi if it is standalone), specifies how long the client should + * wait before attempting to communicate with that node again. While a particular node is penalized, + * all other nodes in the remote cluster (if any) will still be available for communication. + * The default value is 3 seconds. + * + * @param period + * @param unit + * @return + */ + public Builder nodePenalizationPeriod(final long period, final TimeUnit unit) { + this.penalizationNanos = unit.toNanos(period); + return this; + } + + /** + * Specifies the SSL Context to use when communicating with the remote NiFi instance(s). If not + * specified, communications will not be secure. The remote instance of NiFi always determines + * whether or not Site-to-Site communications are secure (i.e., the client will always use + * secure or non-secure communications, depending on what the server dictates). + * + * @param sslContext + * @return + */ + public Builder sslContext(final SSLContext sslContext) { + this.sslContext = sslContext; + return this; + } + + + /** + * Provides an EventReporter that can be used by the client in order to report any events that + * could be of interest when communicating with the remote instance. The EventReporter provided + * must be threadsafe. + * + * @param eventReporter + * @return + */ + public Builder eventReporter(final EventReporter eventReporter) { + this.eventReporter = eventReporter; + return this; + } + + + /** + * Specifies a file that the client can write to in order to persist the list of nodes in the + * remote cluster and recover the list of nodes upon restart. This allows the client to function + * if the remote Cluster Manager is unavailable, even after a restart of the client software. + * If not specified, the list of nodes will not be persisted and a failure of the Cluster Manager + * will result in not being able to communicate with the remote instance if a new client + * is created. + * + * @param peerPersistenceFile + * @return + */ + public Builder peerPersistenceFile(final File peerPersistenceFile) { + this.peerPersistenceFile = peerPersistenceFile; + return this; + } + + /** + * Specifies whether or not data should be compressed before being transferred to or from the + * remote instance. + * + * @param compress + * @return + */ + public Builder useCompression(final boolean compress) { + this.useCompression = compress; + return this; + } + + /** + * Specifies the name of the port to communicate with. Either the port name or the port identifier + * must be specified. + * + * @param portName + * @return + */ + public Builder portName(final String portName) { + this.portName = portName; + return this; + } + + /** + * Specifies the unique identifier of the port to communicate with. If it is known, this is preferred over providing + * the port name, as the port name may change. + * + * @param portIdentifier + * @return + */ + public Builder portIdentifier(final String portIdentifier) { + this.portIdentifier = portIdentifier; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This method specifies + * the preferred number of {@link DataPacket}s to include in a Transaction. + * + * @return + */ + public Builder requestBatchCount(final int count) { + this.batchCount = count; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This method specifies + * the preferred number of bytes to include in a Transaction. + * + * @return + */ + public Builder requestBatchSize(final long bytes) { + this.batchSize = bytes; + return this; + } + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This method specifies + * the preferred amount of time that a Transaction should span. + * + * @return + */ + public Builder requestBatchDuration(final long value, final TimeUnit unit) { + this.batchNanos = unit.toNanos(value); + return this; + } + + + /** + * Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi + * @return + */ + public SiteToSiteClient build() { + if ( url == null ) { + throw new IllegalStateException("Must specify URL to build Site-to-Site client"); + } + + if ( portName == null && portIdentifier == null ) { + throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client"); + } + + final SiteToSiteClientConfig config = new SiteToSiteClientConfig() { + + @Override + public boolean isUseCompression() { + return Builder.this.isUseCompression(); + } + + @Override + public String getUrl() { + return Builder.this.getUrl(); + } + + @Override + public long getTimeout(final TimeUnit timeUnit) { + return Builder.this.getTimeout(timeUnit); + } + + @Override + public SSLContext getSslContext() { + return Builder.this.getSslContext(); + } + + @Override + public String getPortName() { + return Builder.this.getPortName(); + } + + @Override + public String getPortIdentifier() { + return Builder.this.getPortIdentifier(); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return Builder.this.getPenalizationPeriod(timeUnit); + } + + @Override + public File getPeerPersistenceFile() { + return Builder.this.getPeerPersistenceFile(); + } + + @Override + public EventReporter getEventReporter() { + return Builder.this.getEventReporter(); + } + + @Override + public long getPreferredBatchDuration(final TimeUnit timeUnit) { + return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS); + } + + @Override + public long getPreferredBatchSize() { + return Builder.this.batchSize; + } + + @Override + public int getPreferredBatchCount() { + return Builder.this.batchCount; + } + }; + + return new SocketClient(config); + } + + /** + * Returns the configured URL for the remote NiFi instance + * @return + */ + public String getUrl() { + return url; + } + + /** + * Returns the communications timeout in nanoseconds + * @return + */ + public long getTimeout(final TimeUnit timeUnit) { + return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); + } + + /** + * Returns the amount of time that a particular node will be ignored after a + * communications error with that node occurs + * @param timeUnit + * @return + */ + public long getPenalizationPeriod(TimeUnit timeUnit) { + return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); + } + + /** + * Returns the SSL Context that is configured for this builder + * @return + */ + public SSLContext getSslContext() { + return sslContext; + } + + /** + * Returns the EventReporter that is to be used by clients to report events + * @return + */ + public EventReporter getEventReporter() { + return eventReporter; + } + + /** + * Returns the file that is to be used for persisting the nodes of a remote cluster, if any. + * @return + */ + public File getPeerPersistenceFile() { + return peerPersistenceFile; + } + + /** + * Returns a boolean indicating whether or not compression will be used to transfer data + * to and from the remote instance + * @return + */ + public boolean isUseCompression() { + return useCompression; + } + + /** + * Returns the name of the port that the client is to communicate with. + * @return + */ + public String getPortName() { + return portName; + } + + /** + * Returns the identifier of the port that the client is to communicate with. + * @return + */ + public String getPortIdentifier() { + return portIdentifier; + } + } +} diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java new file mode 100644 index 0000000000..37c48f856c --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.protocol.DataPacket; + +public interface SiteToSiteClientConfig { + + /** + * Returns the configured URL for the remote NiFi instance + * @return + */ + String getUrl(); + + /** + * Returns the communications timeout in nanoseconds + * @return + */ + long getTimeout(final TimeUnit timeUnit); + + /** + * Returns the amount of time that a particular node will be ignored after a + * communications error with that node occurs + * @param timeUnit + * @return + */ + long getPenalizationPeriod(TimeUnit timeUnit); + + /** + * Returns the SSL Context that is configured for this builder + * @return + */ + SSLContext getSslContext(); + + /** + * Returns the EventReporter that is to be used by clients to report events + * @return + */ + EventReporter getEventReporter(); + + /** + * Returns the file that is to be used for persisting the nodes of a remote cluster, if any. + * @return + */ + File getPeerPersistenceFile(); + + /** + * Returns a boolean indicating whether or not compression will be used to transfer data + * to and from the remote instance + * @return + */ + boolean isUseCompression(); + + /** + * Returns the name of the port that the client is to communicate with. + * @return + */ + String getPortName(); + + /** + * Returns the identifier of the port that the client is to communicate with. + * @return + */ + String getPortIdentifier(); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This returns the maximum + * amount of time that we will request a NiFi instance to send data to us in a Transaction. + * + * @param timeUnit + * @return + */ + long getPreferredBatchDuration(TimeUnit timeUnit); + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This returns the maximum + * number of bytes that we will request a NiFi instance to send data to us in a Transaction. + * + * @return + */ + long getPreferredBatchSize(); + + + /** + * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However, + * the client has the ability to request a particular batch size/duration. This returns the maximum + * number of {@link DataPacket}s that we will request a NiFi instance to send data to us in a Transaction. + * + * @return + */ + int getPreferredBatchCount(); +} diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java new file mode 100644 index 0000000000..f4ac727401 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; + +public class EndpointConnectionState { + private final Peer peer; + private final SocketClientProtocol socketClientProtocol; + private final FlowFileCodec codec; + private volatile long lastUsed; + + public EndpointConnectionState(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) { + this.peer = peer; + this.socketClientProtocol = socketClientProtocol; + this.codec = codec; + } + + public FlowFileCodec getCodec() { + return codec; + } + + public SocketClientProtocol getSocketClientProtocol() { + return socketClientProtocol; + } + + public Peer getPeer() { + return peer; + } + + public void setLastTimeUsed() { + lastUsed = System.currentTimeMillis(); + } + + public long getLastTimeUsed() { + return lastUsed; + } +} diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java new file mode 100644 index 0000000000..8c23e28813 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java @@ -0,0 +1,835 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; + +import javax.net.ssl.SSLContext; +import javax.security.cert.CertificateExpiredException; +import javax.security.cert.CertificateNotYetValidException; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.RemoteResourceInitiator; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.TransmissionDisabledException; +import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; +import org.apache.nifi.remote.util.PeerStatusCache; +import org.apache.nifi.remote.util.RemoteNiFiUtils; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EndpointConnectionStatePool { + public static final long PEER_REFRESH_PERIOD = 60000L; + public static final String CATEGORY = "Site-to-Site"; + public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); + + private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); + + private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class); + + private final BlockingQueuenull
if the remote instance
+ * is not configured to allow site-to-site communications.
+ *
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ private Integer getSiteToSitePort() throws IOException {
+ Integer listeningPort;
+ remoteInfoReadLock.lock();
+ try {
+ listeningPort = this.siteToSitePort;
+ if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+ return listeningPort;
+ }
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ final ControllerDTO controller = refreshRemoteInfo();
+ listeningPort = controller.getRemoteSiteListeningPort();
+
+ return listeningPort;
+ }
+
+ /**
+ * Returns {@code true} if the remote instance is configured for secure site-to-site communications,
+ * {@code false} otherwise.
+ *
+ * @return
+ * @throws IOException
+ */
+ public boolean isSecure() throws IOException {
+ remoteInfoReadLock.lock();
+ try {
+ final Boolean secure = this.siteToSiteSecure;
+ if (secure != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
+ return secure;
+ }
+ } finally {
+ remoteInfoReadLock.unlock();
+ }
+
+ final ControllerDTO controller = refreshRemoteInfo();
+ return controller.isSiteToSiteSecure();
+ }
+}
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
new file mode 100644
index 0000000000..0494d04fc8
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.util.ObjectHolder;
+
+public class SocketClient implements SiteToSiteClient {
+ private final SiteToSiteClientConfig config;
+ private final EndpointConnectionStatePool pool;
+ private final boolean compress;
+ private final String portName;
+ private final long penalizationNanos;
+ private volatile String portIdentifier;
+
+ public SocketClient(final SiteToSiteClientConfig config) {
+ pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
+ config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
+
+ this.config = config;
+ this.compress = config.isUseCompression();
+ this.portIdentifier = config.getPortIdentifier();
+ this.portName = config.getPortName();
+ this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public SiteToSiteClientConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public boolean isSecure() throws IOException {
+ return pool.isSecure();
+ }
+
+ private String getPortIdentifier(final TransferDirection direction) throws IOException {
+ final String id = this.portIdentifier;
+ if ( id != null ) {
+ return id;
+ }
+
+ if ( direction == TransferDirection.SEND ) {
+ return pool.getInputPortIdentifier(this.portName);
+ } else {
+ return pool.getOutputPortIdentifier(this.portName);
+ }
+ }
+
+
+ @Override
+ public Transaction createTransaction(final TransferDirection direction) throws IOException {
+ final String portId = getPortIdentifier(TransferDirection.SEND);
+
+ if ( portId == null ) {
+ throw new IOException("Could not find Port with name " + portName + " for remote NiFi instance");
+ }
+
+ final RemoteDestination remoteDestination = new RemoteDestination() {
+ @Override
+ public String getIdentifier() {
+ return portId;
+ }
+
+ @Override
+ public long getYieldPeriod(final TimeUnit timeUnit) {
+ return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public boolean isUseCompression() {
+ return compress;
+ }
+ };
+
+ final EndpointConnectionState connectionState;
+ try {
+ connectionState = pool.getEndpointConnectionState(remoteDestination, direction);
+ } catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) {
+ throw new IOException(e);
+ }
+
+ final Transaction transaction = connectionState.getSocketClientProtocol().startTransaction(
+ connectionState.getPeer(), connectionState.getCodec(), direction);
+
+ // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever
+ // the transaction is either completed or canceled.
+ final ObjectHolder
@@ -44,36 +43,29 @@ public interface FlowFileCodec extends VersionedRemoteResource {
public Listnull
if the stream
+ * @return the DataPacket that was created, or null
if the stream
* was out of data
*
* @throws IOException
* @throws ProtocolException if the input is malformed
+ * @throws TransmissionDisabledException if a user terminates the connection
*/
- FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException;
+ DataPacket decode(InputStream stream) throws IOException, ProtocolException, TransmissionDisabledException;
}
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
similarity index 58%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
index d18a4ee9f2..6fd92de266 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -26,14 +26,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
public class StandardFlowFileCodec implements FlowFileCodec {
public static final int MAX_NUM_ATTRIBUTES = 25000;
@@ -47,37 +45,26 @@ public class StandardFlowFileCodec implements FlowFileCodec {
}
@Override
- public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException {
+ public void encode(final DataPacket dataPacket, final OutputStream encodedOut) throws IOException {
final DataOutputStream out = new DataOutputStream(encodedOut);
- final Maptrue
if remote instance indicates that the port is
* invalid
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
similarity index 100%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
similarity index 100%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
similarity index 100%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
similarity index 79%
rename from nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
rename to nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
index c4519cd68e..f4fa4d02f3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -14,10 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.remote.protocol.socket;
+package org.apache.nifi.remote.protocol;
-public enum HandshakeProperty {
- GZIP,
- PORT_IDENTIFIER,
- REQUEST_EXPIRATION_MILLIS;
+import java.io.InputStream;
+import java.util.Map;
+
+public interface DataPacket {
+
+ Mapnull
if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port.
+ * @param timeoutMillis
+ * @return
+ * @throws IOException
+ */
+ public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteListeningPort(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getRemoteRootGroupId(uriObject, timeoutMillis);
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException {
+ try {
+ final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+ return getController(uriObject, timeoutMillis).getInstanceId();
+ } catch (URISyntaxException e) {
+ throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri);
+ }
+ }
+
+ /**
+ * Returns the port on which the remote instance is listening for Flow File transfers, or null
if the remote instance
+ * is not configured to use Site-to-Site transfers.
+ *
+ * @param uri the full URI to fetch, including the path.
+ * @return
+ * @throws IOException
+ */
+ private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
+ }
+
+ private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException {
+ return getController(uri, timeoutMillis).getId();
+ }
+
+ public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException {
+ final ClientResponse response = get(uri, timeoutMillis);
+
+ if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) {
+ final ControllerEntity entity = response.getEntity(ControllerEntity.class);
+ return entity.getController();
+ } else {
+ final String responseMessage = response.getEntity(String.class);
+ throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage);
+ }
+ }
+
+ /**
+ * Issues a registration request on behalf of the current user.
+ *
+ * @param baseApiUri
+ * @return
+ */
+ public ClientResponse issueRegistrationRequest(String baseApiUri) {
+ final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users"));
+
+ // set up the query params
+ MultivaluedMapImpl entity = new MultivaluedMapImpl();
+ entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first.");
+
+ // create the web resource
+ WebResource webResource = client.resource(uri);
+
+ // get the client utils and make the request
+ return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
+ }
+}
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
new file mode 100644
index 0000000000..bd1b50c5a4
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.MinimumLengthInputStream;
+
+public class StandardDataPacket implements DataPacket {
+
+ private final Mapnull
if the remote instance
- * is not configured to allow site-to-site communications.
- *
- * @throws IOException if unable to communicate with the remote instance
- */
- Integer getListeningPort() throws IOException;
-
/**
* Indicates whether or not the RemoteProcessGroup is currently scheduled to
* transmit data
@@ -211,11 +203,6 @@ public interface RemoteProcessGroup {
*/
void removeNonExistentPort(final RemoteGroupPort port);
- /**
- *
- * @return @throws IOException
- */
- CommunicationsSession establishSiteToSiteConnection() throws IOException;
/**
* Called whenever RemoteProcessGroup is removed from the flow, so that any
@@ -232,24 +219,4 @@ public interface RemoteProcessGroup {
void verifyCanStopTransmitting();
void verifyCanUpdate();
-
- /**
- * Returns a set of PeerStatus objects that describe the different peers
- * that we can communicate with for this RemoteProcessGroup.
- *
- * If the destination is a cluster, this set will contain PeerStatuses for
- * each of the nodes in the cluster.
- *
- * If the destination is a standalone instance, this set will contain just a
- * PeerStatus for the destination.
- *
- * Once the PeerStatuses have been obtained, they may be cached by this
- * RemoteProcessGroup for some amount of time.
- *
- * If unable to obtain the PeerStatuses or no peer status has yet been
- * obtained, will return null.
- *
- * @return
- */
- Setnull
if the remote instance
- * is not configured to allow site-to-site communications.
- *
- * @throws IOException if unable to communicate with the remote instance
- */
- @Override
- public Integer getListeningPort() throws IOException {
- Integer listeningPort;
- readLock.lock();
- try {
- listeningPort = this.listeningPort;
- if (listeningPort != null && this.listeningPortRetrievalTime > System.currentTimeMillis() - LISTENING_PORT_REFRESH_MILLIS) {
- return listeningPort;
- }
- } finally {
- readLock.unlock();
- }
-
- final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
- listeningPort = utils.getRemoteListeningPort(apiUri.toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
- writeLock.lock();
- try {
- this.listeningPort = listeningPort;
- this.listeningPortRetrievalTime = System.currentTimeMillis();
- } finally {
- writeLock.unlock();
- }
-
- return listeningPort;
- }
-
@Override
public boolean isTransmitting() {
return transmitting.get();
@@ -1255,52 +1136,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
- @Override
- public CommunicationsSession establishSiteToSiteConnection() throws IOException {
- final URI uri = apiUri;
- final String destinationUri = uri.toString();
- CommunicationsSession commsSession = null;
- try {
- if (isSecure()) {
- if (sslContext == null) {
- throw new IOException("Unable to communicate with " + getTargetUri() + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
- }
-
- final Integer listeningPort = getListeningPort();
- if (listeningPort == null) {
- throw new IOException("Remote instance is not configured to allow incoming Site-to-Site connections");
- }
-
- final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, uri.getHost(), listeningPort, true);
- socketChannel.connect();
- commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-
- try {
- commsSession.setUserDn(socketChannel.getDn());
- } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
- throw new IOException(ex);
- }
- } else {
- final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(uri.getHost(), getListeningPort()));
-
- commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
- }
-
- commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-
- commsSession.setUri("nifi://" + uri.getHost() + ":" + uri.getPort());
- } catch (final IOException e) {
- if (commsSession != null) {
- try {
- commsSession.close();
- } catch (final IOException ignore) {
- }
- }
-
- throw e;
- }
- return commsSession;
- }
@Override
public EventReporter getEventReporter() {
@@ -1312,7 +1147,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void run() {
try {
- final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
+ final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
final int statusCode = response.getStatus();
@@ -1385,6 +1220,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public String getYieldDuration() {
return yieldDuration;
}
+
+ @Override
+ public EndpointConnectionStatePool getConnectionPool() {
+ return endpointConnectionPool;
+ }
@Override
public void verifyCanDelete() {
@@ -1487,135 +1327,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
- @Override
- public Set