diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 3d7baccf2c..daff70d662 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -39,6 +39,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.net.InetAddress; import java.security.KeyStore; import java.security.SecureRandom; import java.util.LinkedHashSet; @@ -168,6 +169,7 @@ public interface SiteToSiteClient extends Closeable { private int batchCount; private long batchSize; private long batchNanos; + private InetAddress localAddress; private SiteToSiteTransportProtocol transportProtocol = SiteToSiteTransportProtocol.RAW; private HttpProxy httpProxy; @@ -198,6 +200,7 @@ public interface SiteToSiteClient extends Closeable { this.batchCount = config.getPreferredBatchCount(); this.batchSize = config.getPreferredBatchSize(); this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + this.localAddress = config.getLocalAddress(); this.httpProxy = config.getHttpProxy(); return this; @@ -223,12 +226,31 @@ public interface SiteToSiteClient extends Closeable { } /** - *
Specifies the URLs of the remote NiFi instance.
- *If this URL points to a NiFi node in a NiFi cluster, data transfer to and from - * nodes will be automatically load balanced across the different nodes.
+ *+ * Specifies the local address to use when communicating with the remote NiFi instance. + *
* - *Multiple urls provide better connectivity with a NiFi cluster, able to connect - * to the target cluster at long as one of the specified urls is accessible.
+ * @param localAddress the local address to use, ornull
to use anyLocal
address.
+ * @return the builder
+ */
+ public Builder localAddress(final InetAddress localAddress) {
+ this.localAddress = localAddress;
+ return this;
+ }
+
+ /**
+ * + * Specifies the URLs of the remote NiFi instance. + *
+ *+ * If this URL points to a NiFi node in a NiFi cluster, data transfer to and from + * nodes will be automatically load balanced across the different nodes. + *
+ * + *+ * Multiple urls provide better connectivity with a NiFi cluster, able to connect + * to the target cluster at long as one of the specified urls is accessible. + *
* * @param urls urls of remote instance * @return the builder @@ -717,6 +739,7 @@ public interface SiteToSiteClient extends Closeable { private final long batchSize; private final long batchNanos; private final HttpProxy httpProxy; + private final InetAddress localAddress; // some serialization frameworks require a default constructor private StandardSiteToSiteClientConfig() { @@ -740,6 +763,7 @@ public interface SiteToSiteClient extends Closeable { this.batchNanos = 0; this.transportProtocol = null; this.httpProxy = null; + this.localAddress = null; } private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) { @@ -766,6 +790,7 @@ public interface SiteToSiteClient extends Closeable { this.batchNanos = builder.batchNanos; this.transportProtocol = builder.getTransportProtocol(); this.httpProxy = builder.getHttpProxy(); + this.localAddress = builder.localAddress; } @Override @@ -931,5 +956,10 @@ public interface SiteToSiteClient extends Closeable { public HttpProxy getHttpProxy() { return httpProxy; } + + @Override + public InetAddress getLocalAddress() { + return localAddress; + } } } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index 5bdeee45f0..83e8328471 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -18,6 +18,7 @@ package org.apache.nifi.remote.client; import java.io.File; import java.io.Serializable; +import java.net.InetAddress; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -36,6 +37,7 @@ public interface SiteToSiteClientConfig extends Serializable { * for backward compatibility for implementations that does not expect multiple URLs. * {@link #getUrls()} should be used instead then should support multiple URLs when making requests. */ + @Deprecated String getUrl(); /** @@ -171,4 +173,9 @@ public interface SiteToSiteClientConfig extends Serializable { */ HttpProxy getHttpProxy(); + /** + * @return the InetAddress to bind to for the local address when creating a socket, or + * {@code null} to bind to the {@code anyLocal} address. + */ + InetAddress getLocalAddress(); } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index 89da6a0f9d..e6777b0eb2 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -16,6 +16,55 @@ */ package org.apache.nifi.remote.util; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; +import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; + import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; import org.apache.http.HttpEntity; @@ -87,53 +136,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSession; -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.security.cert.Certificate; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Predicate; -import java.util.regex.Pattern; - -import static org.apache.commons.lang3.StringUtils.isEmpty; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME; -import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; - public class SiteToSiteRestApiClient implements Closeable { private static final String EVENT_CATEGORY = "Site-to-Site"; @@ -160,6 +162,7 @@ public class SiteToSiteRestApiClient implements Closeable { private CloseableHttpAsyncClient httpAsyncClient; private boolean compress = false; + private InetAddress localAddress = null; private long requestExpirationMillis = 0; private int serverTransactionTtl = 0; private int batchCount = 0; @@ -239,6 +242,10 @@ public class SiteToSiteRestApiClient implements Closeable { .setConnectTimeout(connectTimeoutMillis) .setSocketTimeout(readTimeoutMillis); + if (localAddress != null) { + requestConfigBuilder.setLocalAddress(localAddress); + } + if (proxy != null) { requestConfigBuilder.setProxy(proxy.getHttpHost()); } @@ -916,6 +923,8 @@ public class SiteToSiteRestApiClient implements Closeable { extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator; extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis; extendingApiClient.readTimeoutMillis = this.readTimeoutMillis; + extendingApiClient.localAddress = this.localAddress; + final int extendFrequency = serverTransactionTtl / 2; ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> { @@ -1197,10 +1206,12 @@ public class SiteToSiteRestApiClient implements Closeable { public void setConnectTimeoutMillis(final int connectTimeoutMillis) { this.connectTimeoutMillis = connectTimeoutMillis; + setupRequestConfig(); } public void setReadTimeoutMillis(final int readTimeoutMillis) { this.readTimeoutMillis = readTimeoutMillis; + setupRequestConfig(); } public static String getFirstUrl(final String clusterUrlStr) { @@ -1336,6 +1347,10 @@ public class SiteToSiteRestApiClient implements Closeable { public void setCompress(final boolean compress) { this.compress = compress; } + + public void setLocalAddress(final InetAddress localAddress) { + this.localAddress = localAddress; + } public void setRequestExpirationMillis(final long requestExpirationMillis) { if (requestExpirationMillis < 0) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java index 1177dad1af..4d061b872f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java @@ -16,6 +16,22 @@ */ package org.apache.nifi.controller; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.nifi.authorization.Resource; @@ -36,22 +52,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.FormatUtils; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; - public abstract class AbstractPort implements Port { public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder() diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 64e2ca0989..cb1e6c8748 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -17,18 +17,22 @@ package org.apache.nifi.groups; import org.apache.nifi.authorization.resource.ComponentAuthorizable; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.Positionable; import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import java.net.InetAddress; +import java.util.Collection; import java.util.Date; import java.util.Set; import java.util.concurrent.TimeUnit; public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable { + @Override String getIdentifier(); String getTargetUri(); @@ -154,6 +158,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable */ String getAuthorizationIssue(); + /** + * Validates the current configuration, returning ValidationResults for any + * invalid configuration parameter. + * + * @return Collection of validation result objects for any invalid findings + * only. If the collection is empty then the component is valid. Guaranteed + * non-null + */ + Collectionnull
if no specific address has been specified
+ */
+ InetAddress getLocalAddress();
+
/**
* Initiates a task in the remote process group to re-initialize, as a
* result of clustering changes
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
index 8cad103434..f8f4b20922 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
@@ -33,6 +33,7 @@ public abstract class RemoteGroupPort extends AbstractPort implements Port, Remo
public abstract TransferDirection getTransferDirection();
+ @Override
public abstract boolean isUseCompression();
public abstract void setUseCompression(boolean useCompression);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 855dab7549..67c8f11839 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -18,14 +18,15 @@ package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.UniformInterfaceException;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -40,12 +41,15 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
+
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
@@ -74,6 +78,11 @@ import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.UniformInterfaceException;
+
/**
* Represents the Root Process Group of a remote NiFi Instance. Holds
* information about that remote instance, as well as {@link IncomingPort}s and
@@ -99,7 +108,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final AtomicReference