diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java index 7a09f5f1c6..7e5b303260 100644 --- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java +++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java @@ -18,6 +18,7 @@ package org.apache.nifi.remote.io.socket.ssl; import java.io.Closeable; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; @@ -68,9 +69,13 @@ public class SSLSocketChannel implements Closeable { private boolean closed = false; private volatile boolean interrupted = false; - public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException { + public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final InetAddress localAddress, final boolean client) throws IOException { this.socketAddress = new InetSocketAddress(hostname, port); this.channel = SocketChannel.open(); + if (localAddress != null) { + final SocketAddress localSocketAddress = new InetSocketAddress(localAddress, 0); + this.channel.bind(localSocketAddress); + } this.hostname = hostname; this.port = port; this.engine = sslContext.createSSLEngine(); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 6f08f73f98..926e4b4d75 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -23,7 +23,9 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.URI; import java.nio.channels.SocketChannel; import java.security.cert.CertificateException; @@ -87,9 +89,11 @@ public class EndpointConnectionPool implements PeerStatusProvider { private final SiteInfoProvider siteInfoProvider; private final PeerSelector peerSelector; + private final InetAddress localAddress; public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis, - final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider) { + final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider, + final InetAddress localAddress) { Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null"); this.remoteDestination = remoteDestination; @@ -97,6 +101,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { this.eventReporter = eventReporter; this.commsTimeout = commsTimeoutMillis; this.idleExpirationMillis = idleExpirationMillis; + this.localAddress = localAddress; this.siteInfoProvider = siteInfoProvider; @@ -440,7 +445,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); } - final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true); + final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, localAddress, true); socketChannel.connect(); commsSession = new SSLSocketChannelCommunicationsSession(socketChannel); @@ -452,6 +457,11 @@ public class EndpointConnectionPool implements PeerStatusProvider { } } else { final SocketChannel socketChannel = SocketChannel.open(); + if (localAddress != null) { + final SocketAddress localSocketAddress = new InetSocketAddress(localAddress, 0); + socketChannel.socket().bind(localSocketAddress); + } + socketChannel.socket().connect(new InetSocketAddress(hostname, port), commsTimeout); socketChannel.socket().setSoTimeout(commsTimeout); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index 1d3cce7d48..ba6839ce43 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -52,7 +52,7 @@ public class SocketClient extends AbstractSiteToSiteClient { commsTimeout, (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile(), - siteInfoProvider + siteInfoProvider, config.getLocalAddress() ); this.compress = config.isUseCompression(); diff --git a/nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png b/nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png old mode 100755 new mode 100644 index 9a13c2708b..a1477da231 Binary files a/nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png and b/nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png differ diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc index 2fe43742aa..085785eee8 100644 --- a/nifi-docs/src/main/asciidoc/user-guide.adoc +++ b/nifi-docs/src/main/asciidoc/user-guide.adoc @@ -367,9 +367,9 @@ image:iconRemoteProcessGroup.png["Remote Process Group", width=32] *Remote Process Group*: Remote Process Groups appear and behave similar to Process Groups. However, the Remote Process Group (RPG) references a remote instance of NiFi. When an RPG is dragged onto the canvas, rather than being prompted for a name, the DFM is prompted for the URL of the remote NiFi instance. If the remote NiFi is a clustered instance, the URL that should be used -is the URL of the remote instance's NiFi Cluster Manager (NCM). When data is transferred to a clustered instance of NiFi -via an RPG, the RPG will first connect to the remote instance's NCM to determine which nodes are in the cluster and -how busy each node is. This information is then used to load balance the data that is pushed to each node. The remote NCM is +is the URL of any NiFi instance in that cluster. When data is transferred to a clustered instance of NiFi +via an RPG, the RPG will first connect to the remote instance whose URL is configured to determine which nodes are in the cluster and +how busy each node is. This information is then used to load balance the data that is pushed to each node. The remote instances are then interrogated periodically to determine information about any nodes that are dropped from or added to the cluster and to recalculate the load balancing based on each node's load. For more information, see the section on <>. @@ -995,7 +995,13 @@ link:administration-guide.html[System Administrator’s Guide]. image:configure-remote-process-group.png["Configure Remote Process Group"] -By default, it is set to _RAW_ which uses raw socket communication using a dedicated port. _HTTP_ transport protocol is especially useful if the remote NiFi instance is in a restricted network that only allow access through HTTP(S) protocol or only accessible from a specific HTTP Proxy server. For accessing through a HTTP Proxy Server, BASIC and DIGEST authentication are supported. +By default, it is set to _RAW_ which uses raw socket communication using a dedicated port. _HTTP_ transport protocol is especially useful if the remote +NiFi instance is in a restricted network that only allow access through HTTP(S) protocol or only accessible from a specific HTTP Proxy server. +For accessing through a HTTP Proxy Server, BASIC and DIGEST authentication are supported. + +*Local Network Interface*: In some cases, it may be desirable to prefer one network interface over another. For example, if a wired interface and a wireless +interface both exist, the wired interface may be preferred. This can be configured by specifying the name of the network interface to use in this box. If the +value entered is not valid, the Remote Process Group will not be valid and will not communicate with other NiFi instances until this is resolved. ==== Configure Site-to-Site server NiFi instance diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java index df01b82e1a..ee64b69f52 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java @@ -39,12 +39,14 @@ public class RemoteProcessGroupDTO extends ComponentDTO { private String communicationsTimeout; private String yieldDuration; private String transportProtocol; + private String localNetworkInterface; private String proxyHost; private Integer proxyPort; private String proxyUser; private String proxyPassword; private Collection authorizationIssues; + private Collection validationErrors; private Boolean transmitting; private Integer inputPortCount; @@ -349,6 +351,25 @@ public class RemoteProcessGroupDTO extends ComponentDTO { this.transportProtocol = transportProtocol; } + @ApiModelProperty("The local network interface to send/receive data. If not specified, any local address is used. If clustered, all nodes must have an interface with this identifier.") + public String getLocalNetworkInterface() { + return localNetworkInterface; + } + + public void setLocalNetworkInterface(String localNetworkInterface) { + this.localNetworkInterface = localNetworkInterface; + } + + @ApiModelProperty( + "The validation errors for the remote process group. These validation errors represent the problems with the remote process group that must be resolved before it can transmit." + ) + public Collection getValidationErrors() { + return validationErrors; + } + + public void setValidationErrors(Collection validationErrors) { + this.validationErrors = validationErrors; + } public String getProxyHost() { return proxyHost; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java index 3209e025c1..a426d939e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/RemoteProcessGroupEntityMerger.java @@ -33,7 +33,6 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger entityMap) { ComponentEntityMerger.super.merge(clientEntity, entityMap); for (Map.Entry entry : entityMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); final RemoteProcessGroupEntity entityStatus = entry.getValue(); if (entityStatus != clientEntity) { mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey()); @@ -47,6 +46,7 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger entityMap) { final RemoteProcessGroupDTO clientDto = clientEntity.getComponent(); final Map dtoMap = new HashMap<>(); @@ -75,6 +75,7 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger> authorizationErrorMap = new HashMap<>(); + final Map> validationErrorMap = new HashMap<>(); Boolean mergedIsTargetSecure = null; final Set mergedInputPorts = new HashSet<>(); final Set mergedOutputPorts = new HashSet<>(); @@ -88,6 +89,7 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger clusterNodeInfo, String remoteInputHost, - int remoteInputPort, - int remoteInputHttpPort, + Integer remoteInputPort, + Integer remoteInputHttpPort, boolean isSiteToSiteSecure) throws IOException; void shutdown(Peer peer); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index d8654756a5..db9e68b821 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -45,6 +45,7 @@ import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.cluster.protocol.DataFlow; @@ -1126,6 +1127,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { remoteGroup.setProxyPassword(remoteGroupDto.getProxyPassword()); } + if (StringUtils.isBlank(remoteGroupDto.getLocalNetworkInterface())) { + remoteGroup.setNetworkInterface(null); + } else { + remoteGroup.setNetworkInterface(remoteGroupDto.getLocalNetworkInterface()); + } + final Set inputPorts = new HashSet<>(); for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "inputPort")) { inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 6c39f167cb..f8d38bc815 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -262,6 +262,7 @@ public class FlowFromDOMFactory { dto.setProxyHost(getString(element, "proxyHost")); dto.setProxyPort(getOptionalInt(element, "proxyPort")); dto.setProxyUser(getString(element, "proxyUser")); + dto.setLocalNetworkInterface(getString(element, "networkInterface")); final String rawPassword = getString(element, "proxyPassword"); final String proxyPassword = encryptor == null ? rawPassword : decrypt(rawPassword, encryptor); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index b8936ba1c3..f6e3d2b288 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -69,7 +69,7 @@ import org.w3c.dom.Node; */ public class StandardFlowSerializer implements FlowSerializer { - private static final String MAX_ENCODING_VERSION = "1.0"; + private static final String MAX_ENCODING_VERSION = "1.1"; private final StringEncryptor encryptor; @@ -261,6 +261,9 @@ public class StandardFlowSerializer implements FlowSerializer { final String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX; addTextElement(element, "proxyPassword", value); } + if (remoteRef.getNetworkInterface() != null) { + addTextElement(element, "networkInterface", remoteRef.getNetworkInterface()); + } for (final RemoteGroupPort port : remoteRef.getInputPorts()) { if (port.hasIncomingConnection()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 3634850b3b..3679b9819f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -679,6 +679,7 @@ public class FingerprintFactory { private StringBuilder addRemoteProcessGroupFingerprint(final StringBuilder builder, final Element remoteProcessGroupElem) throws FingerprintException { appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "id")); appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "url")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "networkInterface")); final NodeList inputPortList = DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "inputPort"); final NodeList outputPortList = DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "outputPort"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 67c8f11839..286b2dcef1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -879,30 +879,33 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { writeLock.lock(); try { this.networkInterfaceName = interfaceName; + if (interfaceName == null) { + this.nicValidationResult = null; + } else { + try { + final Enumeration inetAddresses = NetworkInterface.getByName(interfaceName).getInetAddresses(); - try { - final Enumeration inetAddresses = NetworkInterface.getByName(interfaceName).getInetAddresses(); - - if (inetAddresses.hasMoreElements()) { - this.localAddress = inetAddresses.nextElement(); - this.nicValidationResult = null; - } else { + if (inetAddresses.hasMoreElements()) { + this.localAddress = inetAddresses.nextElement(); + this.nicValidationResult = null; + } else { + this.localAddress = null; + this.nicValidationResult = new ValidationResult.Builder() + .input(interfaceName) + .subject("Network Interface Name") + .valid(false) + .explanation("No IP Address could be found that is bound to the interface with name " + interfaceName) + .build(); + } + } catch (final Exception e) { this.localAddress = null; this.nicValidationResult = new ValidationResult.Builder() .input(interfaceName) .subject("Network Interface Name") .valid(false) - .explanation("No IP Address could be found that is bound to the interface with name " + interfaceName) + .explanation("Could not obtain Network Interface with name " + interfaceName) .build(); } - } catch (final Exception e) { - this.localAddress = null; - this.nicValidationResult = new ValidationResult.Builder() - .input(interfaceName) - .subject("Network Interface Name") - .valid(false) - .explanation("Could not obtain Network Interface with name " + interfaceName) - .build(); } } finally { writeLock.unlock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 4607320270..02a9ca51d5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -205,6 +205,7 @@ +