From 7c5bd876bdd49ec14482a24dd31a073757f06ed4 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 30 Nov 2016 09:28:16 +0900 Subject: [PATCH] NIFI-3026: Support multiple remote target URLs - Added urls in addition to the existing url, to support multiple target URLs - Backward compatibility is provided by returning the first url if multipe urls are specified, but component accessing the url doesn't support multiple urls - UI is not fully updated yet. Following UI components are planned to be updated by different commits - Search component: only the first URL is searchable and shown - Component status: RPG status shows only the first URL - Component action history: only the first URL is searchable and shown - Updated Search component to use URLs. This closes #1208. --- .../client/AbstractSiteToSiteClient.java | 13 +- .../nifi/remote/client/SiteInfoProvider.java | 63 +++++- .../nifi/remote/client/SiteToSiteClient.java | 61 +++++- .../remote/client/SiteToSiteClientConfig.java | 12 + .../nifi/remote/client/http/HttpClient.java | 18 +- .../client/socket/EndpointConnectionPool.java | 12 +- .../remote/client/socket/SocketClient.java | 2 +- .../remote/util/SiteToSiteRestApiClient.java | 138 +++++++++++- .../remote/client/TestSiteInfoProvider.java | 207 ++++++++++++++++++ .../remote/client/http/TestHttpClient.java | 27 ++- .../client/socket/TestSiteToSiteClient.java | 17 +- .../util/TestSiteToSiteRestApiClient.java | 182 +++++++++------ .../web/api/dto/RemoteProcessGroupDTO.java | 50 ++++- .../api/dto/TestRemoteProcessGroupDTO.java | 49 +++++ .../nifi/groups/RemoteProcessGroup.java | 5 +- .../nifi/controller/FlowController.java | 10 +- .../controller/StandardFlowSynchronizer.java | 2 +- .../serialization/FlowFromDOMFactory.java | 1 + .../serialization/StandardFlowSerializer.java | 3 +- .../remote/StandardRemoteProcessGroup.java | 72 ++---- .../TestStandardRemoteProcessGroup.java | 59 ----- .../nifi/remote/StandardRemoteGroupPort.java | 7 +- .../remote/TestStandardRemoteGroupPort.java | 3 +- .../nifi/audit/RemoteProcessGroupAuditor.java | 2 +- .../nifi/web/api/ProcessGroupResource.java | 30 +-- .../apache/nifi/web/api/dto/DtoFactory.java | 4 +- .../nifi/web/controller/ControllerFacade.java | 2 +- .../impl/StandardRemoteProcessGroupDAO.java | 8 +- .../new-remote-process-group-dialog.jsp | 6 +- .../remote-process-group-configuration.jsp | 4 +- .../canvas/remote-process-group-details.jsp | 4 +- .../canvas/remote-process-group-ports.jsp | 4 +- .../nf-ng-remote-process-group-component.js | 6 +- .../nf-remote-process-group-configuration.js | 4 +- .../canvas/nf-remote-process-group-details.js | 4 +- .../canvas/nf-remote-process-group-ports.js | 4 +- .../js/nf/canvas/nf-remote-process-group.js | 2 +- 37 files changed, 788 insertions(+), 309 deletions(-) create mode 100644 nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestSiteInfoProvider.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/test/java/org/apache/nifi/web/api/dto/TestRemoteProcessGroupDTO.java delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/TestStandardRemoteProcessGroup.java diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java index 0dec3dfea9..05cf1f9c53 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/AbstractSiteToSiteClient.java @@ -16,30 +16,19 @@ */ package org.apache.nifi.remote.client; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Objects; import java.util.concurrent.TimeUnit; public abstract class AbstractSiteToSiteClient implements SiteToSiteClient { protected final SiteToSiteClientConfig config; protected final SiteInfoProvider siteInfoProvider; - protected final URI clusterUrl; public AbstractSiteToSiteClient(final SiteToSiteClientConfig config) { this.config = config; - try { - Objects.requireNonNull(config.getUrl(), "URL cannot be null"); - clusterUrl = new URI(config.getUrl()); - } catch (final URISyntaxException e) { - throw new IllegalArgumentException("Invalid Cluster URL: " + config.getUrl()); - } - final int commsTimeout = (int) config.getTimeout(TimeUnit.MILLISECONDS); siteInfoProvider = new SiteInfoProvider(); - siteInfoProvider.setClusterUrl(clusterUrl); + siteInfoProvider.setClusterUrls(config.getUrls()); siteInfoProvider.setSslContext(config.getSslContext()); siteInfoProvider.setConnectTimeoutMillis(commsTimeout); siteInfoProvider.setReadTimeoutMillis(commsTimeout); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java index aac7912ce9..a1a9a9c32b 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteInfoProvider.java @@ -18,8 +18,10 @@ package org.apache.nifi.remote.client; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -50,19 +52,26 @@ public class SiteInfoProvider { private final Map inputPortMap = new HashMap<>(); // map input port name to identifier private final Map outputPortMap = new HashMap<>(); // map output port name to identifier - private URI clusterUrl; + private Set clusterUrls; + private URI activeClusterUrl; private SSLContext sslContext; private int connectTimeoutMillis; private int readTimeoutMillis; private ControllerDTO refreshRemoteInfo() throws IOException { - final ControllerDTO controller; - try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP)) { - apiClient.setBaseUrl(SiteToSiteRestApiClient.resolveBaseUrl(clusterUrl)); + final ControllerDTO controller; + final URI connectedClusterUrl; + try (final SiteToSiteRestApiClient apiClient = createSiteToSiteRestApiClient(sslContext, proxy)) { apiClient.setConnectTimeoutMillis(connectTimeoutMillis); apiClient.setReadTimeoutMillis(readTimeoutMillis); - controller = apiClient.getController(); + controller = apiClient.getController(clusterUrls); + try { + connectedClusterUrl = new URI(apiClient.getBaseUrl()); + } catch (URISyntaxException e) { + // This should not happen since apiClient has successfully communicated with this URL. + throw new RuntimeException("Failed to parse connected cluster URL due to " + e); + } } remoteInfoWriteLock.lock(); @@ -70,6 +79,7 @@ public class SiteInfoProvider { this.siteToSitePort = controller.getRemoteSiteListeningPort(); this.siteToSiteHttpPort = controller.getRemoteSiteHttpListeningPort(); this.siteToSiteSecure = controller.isSiteToSiteSecure(); + this.activeClusterUrl = connectedClusterUrl; inputPortMap.clear(); for (final PortDTO inputPort : controller.getInputPorts()) { @@ -89,8 +99,12 @@ public class SiteInfoProvider { return controller; } + protected SiteToSiteRestApiClient createSiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) { + return new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP); + } + public boolean isWebInterfaceSecure() { - return clusterUrl.toString().startsWith("https"); + return clusterUrls.stream().anyMatch(url -> url.startsWith("https")); } /** @@ -162,7 +176,7 @@ public class SiteInfoProvider { final ControllerDTO controller = refreshRemoteInfo(); final Boolean isSecure = controller.isSiteToSiteSecure(); if (isSecure == null) { - throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections"); + throw new IOException("Remote NiFi instance " + clusterUrls + " is not currently configured to accept site-to-site connections"); } return isSecure; @@ -207,8 +221,39 @@ public class SiteInfoProvider { } } - public void setClusterUrl(URI clusterUrl) { - this.clusterUrl = clusterUrl; + /** + * Return an active cluster URL that is known to work. + * If it is unknown yet or cache is expired, then remote info will be refreshed. + * @return an active cluster URL + */ + public URI getActiveClusterUrl() throws IOException { + URI resultClusterUrl; + remoteInfoReadLock.lock(); + try { + resultClusterUrl = this.activeClusterUrl; + if (resultClusterUrl != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) { + return resultClusterUrl; + } + } finally { + remoteInfoReadLock.unlock(); + } + + refreshRemoteInfo(); + + remoteInfoReadLock.lock(); + try { + return this.activeClusterUrl; + } finally { + remoteInfoReadLock.unlock(); + } + } + + public void setClusterUrls(Set clusterUrls) { + this.clusterUrls = clusterUrls; + } + + public Set getClusterUrls() { + return clusterUrls; } public void setSslContext(SSLContext sslContext) { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 94ebca6395..3d7baccf2c 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -41,6 +41,8 @@ import java.io.InputStream; import java.io.Serializable; import java.security.KeyStore; import java.security.SecureRandom; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -147,7 +149,7 @@ public interface SiteToSiteClient extends Closeable { private static final long serialVersionUID = -4954962284343090219L; - private String url; + private Set urls; private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L); @@ -176,7 +178,7 @@ public interface SiteToSiteClient extends Closeable { * @return the builder */ public Builder fromConfig(final SiteToSiteClientConfig config) { - this.url = config.getUrl(); + this.urls = config.getUrls(); this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS); this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS); @@ -202,15 +204,37 @@ public interface SiteToSiteClient extends Closeable { } /** - * Specifies the URL of the remote NiFi instance. If this URL points to - * the Cluster Manager of a NiFi cluster, data transfer to and from - * nodes will be automatically load balanced across the different nodes. + *

Specifies the URL of the remote NiFi instance.

+ *

If this URL points to a NiFi node in a NiFi cluster, data transfer to and from + * nodes will be automatically load balanced across the different nodes.

+ * + *

For better connectivity with a NiFi cluster, use {@link #urls(Set)} instead.

* * @param url url of remote instance * @return the builder */ public Builder url(final String url) { - this.url = url; + final Set urls = new LinkedHashSet<>(); + if (url != null && url.length() > 0) { + urls.add(url); + } + this.urls = urls; + return this; + } + + /** + *

Specifies the URLs of the remote NiFi instance.

+ *

If this URL points to a NiFi node in a NiFi cluster, data transfer to and from + * nodes will be automatically load balanced across the different nodes.

+ * + *

Multiple urls provide better connectivity with a NiFi cluster, able to connect + * to the target cluster at long as one of the specified urls is accessible.

+ * + * @param urls urls of remote instance + * @return the builder + */ + public Builder urls(final Set urls) { + this.urls = urls; return this; } @@ -542,7 +566,7 @@ public interface SiteToSiteClient extends Closeable { * or if the transport protocol is not supported. */ public SiteToSiteClient build() { - if (url == null) { + if (urls == null) { throw new IllegalStateException("Must specify URL to build Site-to-Site client"); } @@ -564,7 +588,10 @@ public interface SiteToSiteClient extends Closeable { * @return the configured URL for the remote NiFi instance */ public String getUrl() { - return url; + if (urls != null && urls.size() > 0) { + return urls.iterator().next(); + } + return null; } /** @@ -668,7 +695,8 @@ public interface SiteToSiteClient extends Closeable { private static final long serialVersionUID = 1L; - private final String url; + // This Set instance has to be initialized here to be serialized via Kryo. + private final Set urls = new LinkedHashSet<>(); private final long timeoutNanos; private final long penalizationNanos; private final long idleExpirationNanos; @@ -692,7 +720,6 @@ public interface SiteToSiteClient extends Closeable { // some serialization frameworks require a default constructor private StandardSiteToSiteClientConfig() { - this.url = null; this.timeoutNanos = 0; this.penalizationNanos = 0; this.idleExpirationNanos = 0; @@ -716,7 +743,9 @@ public interface SiteToSiteClient extends Closeable { } private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) { - this.url = builder.url; + if (builder.urls != null) { + this.urls.addAll(builder.urls); + } this.timeoutNanos = builder.timeoutNanos; this.penalizationNanos = builder.penalizationNanos; this.idleExpirationNanos = builder.idleExpirationNanos; @@ -746,7 +775,15 @@ public interface SiteToSiteClient extends Closeable { @Override public String getUrl() { - return url; + if (urls != null && urls.size() > 0) { + return urls.iterator().next(); + } + return null; + } + + @Override + public Set getUrls() { + return urls; } @Override diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java index 65a7cfca3d..5bdeee45f0 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java @@ -18,6 +18,7 @@ package org.apache.nifi.remote.client; import java.io.File; import java.io.Serializable; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; @@ -31,9 +32,20 @@ public interface SiteToSiteClientConfig extends Serializable { /** * @return the configured URL for the remote NiFi instance + * @deprecated This method only returns single URL string even if multiple URLs are set + * for backward compatibility for implementations that does not expect multiple URLs. + * {@link #getUrls()} should be used instead then should support multiple URLs when making requests. */ String getUrl(); + /** + * SiteToSite implementations should support multiple URLs when establishing a SiteToSite connection with a remote + * NiFi instance to provide robust connectivity so that it can keep working as long as at least one of + * the configured URLs is accessible. + * @return the configured URLs for the remote NiFi instances. + */ + Set getUrls(); + /** * @param timeUnit unit over which to report the timeout * @return the communications timeout in given unit diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java index b27526543c..c933db789e 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/http/HttpClient.java @@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -92,12 +91,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr throw new IOException("Remote instance of NiFi is not configured to allow HTTP site-to-site communications"); } - final URI clusterUrl; - try { - clusterUrl = new URI(config.getUrl()); - } catch (final URISyntaxException e) { - throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e); - } + final URI clusterUrl = siteInfoProvider.getActiveClusterUrl(); return new PeerDescription(clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort(), siteInfoProvider.isSecure()); } @@ -135,8 +129,14 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr final CommunicationsSession commSession = new HttpCommunicationsSession(); final String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription()); - final String clusterUrl = config.getUrl(); - final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl); + final StringBuilder clusterUrls = new StringBuilder(); + config.getUrls().forEach(url -> { + if (clusterUrls.length() > 0) { + clusterUrls.append(","); + clusterUrls.append(url); + } + }); + final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrls.toString()); final int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS); String portId = config.getPortIdentifier(); diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index a17deaabe5..6f08f73f98 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -73,7 +73,6 @@ public class EndpointConnectionPool implements PeerStatusProvider { private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class); private final ConcurrentMap> connectionQueueMap = new ConcurrentHashMap<>(); - private final URI clusterUrl; private final Set activeConnections = Collections.synchronizedSet(new HashSet<>()); @@ -85,17 +84,14 @@ public class EndpointConnectionPool implements PeerStatusProvider { private volatile int commsTimeout; private volatile boolean shutdown = false; - private volatile Set lastFetchedQueryablePeers; private final SiteInfoProvider siteInfoProvider; private final PeerSelector peerSelector; - public EndpointConnectionPool(final URI clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis, + public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider) { - Objects.requireNonNull(clusterUrl, "URL cannot be null"); Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null"); - this.clusterUrl = clusterUrl; this.remoteDestination = remoteDestination; this.sslContext = sslContext; this.eventReporter = eventReporter; @@ -156,6 +152,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { SocketClientProtocol protocol = null; EndpointConnection connection; Peer peer = null; + URI clusterUrl = siteInfoProvider.getActiveClusterUrl(); do { final List addBack = new ArrayList<>(); @@ -361,7 +358,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { @Override public PeerDescription getBootstrapPeerDescription() throws IOException { - final String hostname = clusterUrl.getHost(); + final String hostname = siteInfoProvider.getActiveClusterUrl().getHost(); final Integer port = siteInfoProvider.getSiteToSitePort(); if (port == null) { throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications"); @@ -375,6 +372,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { public Set fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException { final String hostname = peerDescription.getHostname(); final int port = peerDescription.getPort(); + final URI clusterUrl = siteInfoProvider.getActiveClusterUrl(); final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://")); final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port); @@ -522,7 +520,7 @@ public class EndpointConnectionPool implements PeerStatusProvider { @Override public String toString() { - return "EndpointConnectionPool[Cluster URL=" + clusterUrl + "]"; + return "EndpointConnectionPool[Cluster URL=" + siteInfoProvider.getClusterUrls() + "]"; } private class IdEnrichedRemoteDestination implements RemoteDestination { diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index d04234f8a2..1d3cce7d48 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -47,7 +47,7 @@ public class SocketClient extends AbstractSiteToSiteClient { super(config); final int commsTimeout = (int) config.getTimeout(TimeUnit.MILLISECONDS); - pool = new EndpointConnectionPool(clusterUrl, + pool = new EndpointConnectionPool( createRemoteDestination(config.getPortIdentifier(), config.getPortName()), commsTimeout, (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index 67bd75e758..89da6a0f9d 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -107,7 +107,10 @@ import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -118,6 +121,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import java.util.regex.Pattern; import static org.apache.commons.lang3.StringUtils.isEmpty; @@ -316,7 +320,48 @@ public class SiteToSiteRestApiClient implements Closeable { } } - public ControllerDTO getController() throws IOException { + /** + * Parse the clusterUrls String, and try each URL in clusterUrls one by one to get a controller resource + * from those remote NiFi instances until a controller is successfully returned or try out all URLs. + * After this method execution, the base URL is set with the successful URL. + * @param clusterUrls url of the remote NiFi instance, multiple urls can be specified in comma-separated format + * @throws IllegalArgumentException when it fails to parse the URLs string, + * URLs string contains multiple protocols (http and https mix), + * or none of URL is specified. + */ + public ControllerDTO getController(final String clusterUrls) throws IOException { + return getController(parseClusterUrls(clusterUrls)); + } + + /** + * Try each URL in clusterUrls one by one to get a controller resource + * from those remote NiFi instances until a controller is successfully returned or try out all URLs. + * After this method execution, the base URL is set with the successful URL. + */ + public ControllerDTO getController(final Set clusterUrls) throws IOException { + + IOException lastException = null; + for (final String clusterUrl : clusterUrls) { + // The url may not be normalized if it passed directly without parsed with parseClusterUrls. + setBaseUrl(resolveBaseUrl(clusterUrl)); + try { + return getController(); + } catch (IOException e) { + lastException = e; + logger.warn("Failed to get controller from " + clusterUrl + " due to " + e); + if (logger.isDebugEnabled()) { + logger.debug("", e); + } + } + } + + if (clusterUrls.size() > 1) { + throw new IOException("Tried all cluster URLs but none of those was accessible. Last Exception was " + lastException, lastException); + } + throw lastException; + } + + private ControllerDTO getController() throws IOException { try { final HttpGet get = createGetControllerRequest(); return execute(get, ControllerEntity.class).getController(); @@ -1158,15 +1203,78 @@ public class SiteToSiteRestApiClient implements Closeable { this.readTimeoutMillis = readTimeoutMillis; } - public static String resolveBaseUrl(final String clusterUrl) { - Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null."); - URI clusterUri; - try { - clusterUri = new URI(clusterUrl.trim()); - } catch (final URISyntaxException e) { - throw new IllegalArgumentException("Specified clusterUrl was: " + clusterUrl, e); + public static String getFirstUrl(final String clusterUrlStr) { + if (clusterUrlStr == null) { + return null; } - return resolveBaseUrl(clusterUri); + + final int commaIndex = clusterUrlStr.indexOf(','); + if (commaIndex > -1) { + return clusterUrlStr.substring(0, commaIndex); + } + return clusterUrlStr; + } + + /** + * Parse the comma-separated URLs string for the remote NiFi instances. + * @return A set containing one or more URLs + * @throws IllegalArgumentException when it fails to parse the URLs string, + * URLs string contains multiple protocols (http and https mix), + * or none of URL is specified. + */ + public static Set parseClusterUrls(final String clusterUrlStr) { + final Set urls = new LinkedHashSet<>(); + if (clusterUrlStr != null && clusterUrlStr.length() > 0) { + Arrays.stream(clusterUrlStr.split(",")) + .map(s -> s.trim()) + .filter(s -> s.length() > 0) + .forEach(s -> { + validateUriString(s); + urls.add(resolveBaseUrl(s).intern()); + }); + } + + if (urls.size() == 0) { + throw new IllegalArgumentException("Cluster URL was not specified."); + } + + final Predicate isHttps = url -> url.toLowerCase().startsWith("https:"); + if (urls.stream().anyMatch(isHttps) && urls.stream().anyMatch(isHttps.negate())) { + throw new IllegalArgumentException("Different protocols are used in the cluster URLs " + clusterUrlStr); + } + + return Collections.unmodifiableSet(urls); + } + + private static void validateUriString(String s) { + // parse the uri + final URI uri; + try { + uri = URI.create(s); + } catch (final IllegalArgumentException e) { + throw new IllegalArgumentException("The specified remote process group URL is malformed: " + s); + } + + // validate each part of the uri + if (uri.getScheme() == null || uri.getHost() == null) { + throw new IllegalArgumentException("The specified remote process group URL is malformed: " + s); + } + + if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { + throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + s); + } + } + + private static String resolveBaseUrl(final String clusterUrl) { + Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null."); + final URI uri; + try { + uri = new URI(clusterUrl.trim()); + } catch (final URISyntaxException e) { + throw new IllegalArgumentException("The specified URL is malformed: " + clusterUrl); + } + + return resolveBaseUrl(uri); } /** @@ -1179,7 +1287,17 @@ public class SiteToSiteRestApiClient implements Closeable { * @param clusterUrl url to be resolved * @return resolved url */ - public static String resolveBaseUrl(final URI clusterUrl) { + private static String resolveBaseUrl(final URI clusterUrl) { + + if (clusterUrl.getScheme() == null || clusterUrl.getHost() == null) { + throw new IllegalArgumentException("The specified URL is malformed: " + clusterUrl); + } + + if (!(clusterUrl.getScheme().equalsIgnoreCase("http") || clusterUrl.getScheme().equalsIgnoreCase("https"))) { + throw new IllegalArgumentException("The specified URL is invalid because it is not http or https: " + clusterUrl); + } + + String uriPath = clusterUrl.getPath().trim(); if (StringUtils.isEmpty(uriPath) || uriPath.equals("/")) { diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestSiteInfoProvider.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestSiteInfoProvider.java new file mode 100644 index 0000000000..d9ba4edd5f --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestSiteInfoProvider.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.protocol.http.HttpProxy; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.dto.PortDTO; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class TestSiteInfoProvider { + + @Test + public void testSecure() throws Exception { + + final Set expectedClusterUrl = new LinkedHashSet<>(Arrays.asList(new String[]{"https://node1:8443", "https://node2:8443"})); + final String expectedActiveClusterUrl = "https://node2:8443/nifi-api"; + final SSLContext expectedSslConText = mock(SSLContext.class); + final HttpProxy expectedHttpProxy = mock(HttpProxy.class); + + final SiteInfoProvider siteInfoProvider = spy(new SiteInfoProvider()); + siteInfoProvider.setClusterUrls(expectedClusterUrl); + siteInfoProvider.setSslContext(expectedSslConText); + siteInfoProvider.setProxy(expectedHttpProxy); + + final ControllerDTO controllerDTO = new ControllerDTO(); + + final PortDTO inputPort1 = new PortDTO(); + inputPort1.setName("input-one"); + inputPort1.setId("input-0001"); + + final PortDTO inputPort2 = new PortDTO(); + inputPort2.setName("input-two"); + inputPort2.setId("input-0002"); + + final PortDTO outputPort1 = new PortDTO(); + outputPort1.setName("output-one"); + outputPort1.setId("output-0001"); + + final PortDTO outputPort2 = new PortDTO(); + outputPort2.setName("output-two"); + outputPort2.setId("output-0002"); + + final Set inputPorts = new HashSet<>(); + inputPorts.add(inputPort1); + inputPorts.add(inputPort2); + + final Set outputPorts = new HashSet<>(); + outputPorts.add(outputPort1); + outputPorts.add(outputPort2); + + controllerDTO.setInputPorts(inputPorts); + controllerDTO.setOutputPorts(outputPorts); + controllerDTO.setRemoteSiteListeningPort(8081); + controllerDTO.setRemoteSiteHttpListeningPort(8443); + controllerDTO.setSiteToSiteSecure(true); + + // SiteInfoProvider uses SiteToSIteRestApiClient to get ControllerDTO. + doAnswer(invocation -> { + final SSLContext sslContext = invocation.getArgumentAt(0, SSLContext.class); + final HttpProxy httpProxy = invocation.getArgumentAt(1, HttpProxy.class); + + assertEquals(expectedSslConText, sslContext); + assertEquals(expectedHttpProxy, httpProxy); + + final SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); + + when(apiClient.getController(eq(expectedClusterUrl))).thenReturn(controllerDTO); + + when(apiClient.getBaseUrl()).thenReturn(expectedActiveClusterUrl); + + return apiClient; + }).when(siteInfoProvider).createSiteToSiteRestApiClient(any(), any()); + + // siteInfoProvider should expose correct information of the remote NiFi cluster. + assertEquals(controllerDTO.getRemoteSiteListeningPort(), siteInfoProvider.getSiteToSitePort()); + assertEquals(controllerDTO.getRemoteSiteHttpListeningPort(), siteInfoProvider.getSiteToSiteHttpPort()); + assertEquals(controllerDTO.isSiteToSiteSecure(), siteInfoProvider.isSecure()); + assertTrue(siteInfoProvider.isWebInterfaceSecure()); + + assertEquals(inputPort1.getId(), siteInfoProvider.getInputPortIdentifier(inputPort1.getName())); + assertEquals(inputPort2.getId(), siteInfoProvider.getInputPortIdentifier(inputPort2.getName())); + assertEquals(outputPort1.getId(), siteInfoProvider.getOutputPortIdentifier(outputPort1.getName())); + assertEquals(outputPort2.getId(), siteInfoProvider.getOutputPortIdentifier(outputPort2.getName())); + assertNull(siteInfoProvider.getInputPortIdentifier("not-exist")); + assertNull(siteInfoProvider.getOutputPortIdentifier("not-exist")); + + assertEquals(inputPort1.getId(), siteInfoProvider.getPortIdentifier(inputPort1.getName(), TransferDirection.SEND)); + assertEquals(outputPort1.getId(), siteInfoProvider.getPortIdentifier(outputPort1.getName(), TransferDirection.RECEIVE)); + + assertEquals(expectedActiveClusterUrl, siteInfoProvider.getActiveClusterUrl().toString()); + + } + + @Test + public void testPlain() throws Exception { + + final Set expectedClusterUrl = new LinkedHashSet<>(Arrays.asList(new String[]{"http://node1:8443, http://node2:8443"})); + final String expectedActiveClusterUrl = "http://node2:8443/nifi-api"; + + final SiteInfoProvider siteInfoProvider = spy(new SiteInfoProvider()); + siteInfoProvider.setClusterUrls(expectedClusterUrl); + + final ControllerDTO controllerDTO = new ControllerDTO(); + + controllerDTO.setInputPorts(Collections.emptySet()); + controllerDTO.setOutputPorts(Collections.emptySet()); + controllerDTO.setRemoteSiteListeningPort(8081); + controllerDTO.setRemoteSiteHttpListeningPort(8080); + controllerDTO.setSiteToSiteSecure(false); + + // SiteInfoProvider uses SiteToSIteRestApiClient to get ControllerDTO. + doAnswer(invocation -> { + final SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); + + when(apiClient.getController(eq(expectedClusterUrl))).thenReturn(controllerDTO); + + when(apiClient.getBaseUrl()).thenReturn(expectedActiveClusterUrl); + + return apiClient; + }).when(siteInfoProvider).createSiteToSiteRestApiClient(any(), any()); + + // siteInfoProvider should expose correct information of the remote NiFi cluster. + assertEquals(controllerDTO.getRemoteSiteListeningPort(), siteInfoProvider.getSiteToSitePort()); + assertEquals(controllerDTO.getRemoteSiteHttpListeningPort(), siteInfoProvider.getSiteToSiteHttpPort()); + assertEquals(controllerDTO.isSiteToSiteSecure(), siteInfoProvider.isSecure()); + assertFalse(siteInfoProvider.isWebInterfaceSecure()); + + assertEquals(expectedActiveClusterUrl, siteInfoProvider.getActiveClusterUrl().toString()); + + } + + @Test + public void testConnectException() throws Exception { + + final Set expectedClusterUrl = new LinkedHashSet<>(Arrays.asList(new String[]{"http://node1:8443, http://node2:8443"})); + + final SiteInfoProvider siteInfoProvider = spy(new SiteInfoProvider()); + siteInfoProvider.setClusterUrls(expectedClusterUrl); + + final ControllerDTO controllerDTO = new ControllerDTO(); + + controllerDTO.setInputPorts(Collections.emptySet()); + controllerDTO.setOutputPorts(Collections.emptySet()); + controllerDTO.setRemoteSiteListeningPort(8081); + controllerDTO.setRemoteSiteHttpListeningPort(8080); + controllerDTO.setSiteToSiteSecure(false); + + // SiteInfoProvider uses SiteToSIteRestApiClient to get ControllerDTO. + doAnswer(invocation -> { + final SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class); + + when(apiClient.getController(eq(expectedClusterUrl))).thenThrow(new IOException("Connection refused.")); + + return apiClient; + }).when(siteInfoProvider).createSiteToSiteRestApiClient(any(), any()); + + try { + siteInfoProvider.getSiteToSitePort(); + fail(); + } catch (IOException e) { + } + + try { + siteInfoProvider.getActiveClusterUrl(); + fail(); + } catch (IOException e) { + } + + } + +} diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java index 9e76a7894c..1ae9f2e458 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java @@ -80,6 +80,7 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -730,10 +731,10 @@ public class TestHttpClient { final URI uri = server.getURI(); try ( - SiteToSiteClient client = getDefaultBuilder() - .url("http://" + uri.getHost() + ":" + uri.getPort() + "/wrong") - .portName("input-running") - .build() + SiteToSiteClient client = getDefaultBuilder() + .url("http://" + uri.getHost() + ":" + uri.getPort() + "/wrong") + .portName("input-running") + .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.SEND); @@ -811,6 +812,24 @@ public class TestHttpClient { } + @Test + public void testSendSuccessMultipleUrls() throws Exception { + + final Set urls = new LinkedHashSet<>(); + urls.add("http://localhost:9999"); + urls.add("http://localhost:" + httpConnector.getLocalPort() + "/nifi"); + + try ( + final SiteToSiteClient client = getDefaultBuilder() + .urls(urls) + .portName("input-running") + .build() + ) { + testSend(client); + } + + } + @Test public void testSendSuccessWithProxy() throws Exception { diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java index 194a167039..c0b5e83f5a 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java @@ -35,7 +35,9 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.Map; +import java.util.Set; public class TestSiteToSiteClient { @@ -128,10 +130,23 @@ public class TestSiteToSiteClient { try { SiteToSiteClientConfig clientConfig2 = kryo.readObject(input, SiteToSiteClient.StandardSiteToSiteClientConfig.class); - Assert.assertEquals(clientConfig.getUrl(), clientConfig2.getUrl()); + Assert.assertEquals(clientConfig.getUrls(), clientConfig2.getUrls()); } finally { input.close(); } } + @Test + public void testGetUrlBackwardCompatibility() { + final Set urls = new LinkedHashSet<>(); + urls.add("http://node1:8080/nifi"); + urls.add("http://node2:8080/nifi"); + final SiteToSiteClientConfig config = new SiteToSiteClient.Builder() + .urls(urls) + .buildConfig(); + + Assert.assertEquals("http://node1:8080/nifi", config.getUrl()); + Assert.assertEquals(urls, config.getUrls()); + } + } diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java index 0dfb90ccf7..22b192baa6 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestSiteToSiteRestApiClient.java @@ -16,119 +16,163 @@ */ package org.apache.nifi.remote.util; -import org.apache.nifi.events.EventReporter; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; +import java.util.Iterator; +import java.util.Set; + +import static org.apache.nifi.remote.util.SiteToSiteRestApiClient.parseClusterUrls; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestSiteToSiteRestApiClient { + private static void assertSingleUri(final String expected, final Set urls) { + Assert.assertEquals(1, urls.size()); + Assert.assertEquals(expected, urls.iterator().next().toString()); + } + @Test public void testResolveBaseUrlHttp() throws Exception{ - - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); - - final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/nifi"); - Assert.assertEquals("http://nifi.example.com/nifi-api", baseUrl); + assertSingleUri("http://nifi.example.com/nifi-api", parseClusterUrls("http://nifi.example.com/nifi")); } @Test public void testResolveBaseUrlHttpSub() throws Exception{ - - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); - - final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/foo/bar/baz/nifi"); - Assert.assertEquals("http://nifi.example.com/foo/bar/baz/nifi-api", baseUrl); + assertSingleUri("http://nifi.example.com/foo/bar/baz/nifi-api", parseClusterUrls("http://nifi.example.com/foo/bar/baz/nifi")); } @Test public void testResolveBaseUrlHttpPort() { - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); - - final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com:8080/nifi"); - Assert.assertEquals("http://nifi.example.com:8080/nifi-api", baseUrl); + assertSingleUri("http://nifi.example.com:8080/nifi-api", parseClusterUrls("http://nifi.example.com:8080/nifi")); } @Test public void testResolveBaseUrlHttps() throws Exception{ - - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); - - final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com/nifi"); - Assert.assertEquals("https://nifi.example.com/nifi-api", baseUrl); + assertSingleUri("https://nifi.example.com/nifi-api", parseClusterUrls("https://nifi.example.com/nifi")); } @Test public void testResolveBaseUrlHttpsPort() { - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); - - final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com:8443/nifi"); - Assert.assertEquals("https://nifi.example.com:8443/nifi-api", baseUrl); + assertSingleUri("https://nifi.example.com:8443/nifi-api", parseClusterUrls("https://nifi.example.com:8443/nifi")); } @Test public void testResolveBaseUrlLeniency() { - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); String expectedUri = "http://localhost:8080/nifi-api"; - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080 ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080 ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi/")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi/ ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080/nifi/ ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi-api")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi-api/")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080 ")); + assertSingleUri(expectedUri, parseClusterUrls(" http://localhost:8080 ")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi/")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi/ ")); + assertSingleUri(expectedUri, parseClusterUrls(" http://localhost:8080/nifi/ ")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi-api")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi-api/")); expectedUri = "http://localhost/nifi-api"; - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/nifi")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/nifi-api")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost/")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost/nifi")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost/nifi-api")); expectedUri = "http://localhost:8080/some/path/nifi-api"; - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path")); - assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080/some/path")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi/")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi-api")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi-api/")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path")); + assertSingleUri(expectedUri, parseClusterUrls(" http://localhost:8080/some/path")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path ")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi/")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi-api")); + assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi-api/")); } @Test public void testResolveBaseUrlLeniencyHttps() { - final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP); String expectedUri = "https://localhost:8443/nifi-api"; - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443 ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443 ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi/")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi/ ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443/nifi/ ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi-api")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi-api/")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443 ")); + assertSingleUri(expectedUri, parseClusterUrls(" https://localhost:8443 ")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi/")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi/ ")); + assertSingleUri(expectedUri, parseClusterUrls(" https://localhost:8443/nifi/ ")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi-api")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi-api/")); expectedUri = "https://localhost/nifi-api"; - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/nifi")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/nifi-api")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost/")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost/nifi")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost/nifi-api")); expectedUri = "https://localhost:8443/some/path/nifi-api"; - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path")); - assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443/some/path")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path ")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi/")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi-api")); - assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi-api/")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path")); + assertSingleUri(expectedUri, parseClusterUrls(" https://localhost:8443/some/path")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path ")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi/")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi-api")); + assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi-api/")); } + @Test + public void testGetUrlsEmpty() throws Exception { + try { + parseClusterUrls(null); + fail("Should fail if cluster URL was not specified."); + } catch (IllegalArgumentException e) { + } + + try { + parseClusterUrls(""); + fail("Should fail if cluster URL was not specified."); + } catch (IllegalArgumentException e) { + } + } + + @Test + public void testGetUrlsOne() throws Exception { + final Set urls = parseClusterUrls("http://localhost:8080/nifi"); + + Assert.assertEquals(1, urls.size()); + Assert.assertEquals("http://localhost:8080/nifi-api", urls.iterator().next()); + } + + @Test + public void testGetUrlsThree() throws Exception { + final Set urls = parseClusterUrls("http://host1:8080/nifi,http://host2:8080/nifi,http://host3:8080/nifi"); + + Assert.assertEquals(3, urls.size()); + final Iterator iterator = urls.iterator(); + Assert.assertEquals("http://host1:8080/nifi-api", iterator.next()); + Assert.assertEquals("http://host2:8080/nifi-api", iterator.next()); + Assert.assertEquals("http://host3:8080/nifi-api", iterator.next()); + } + + @Test + public void testGetUrlsDifferentProtocols() throws Exception { + + try { + parseClusterUrls("http://host1:8080/nifi,https://host2:8080/nifi,http://host3:8080/nifi"); + fail("Should fail if cluster URLs contain different protocols."); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Different protocols")); + } + } + + @Test + public void testGetUrlsMalformed() throws Exception { + + try { + parseClusterUrls("http://host1:8080/nifi,host&2:8080,http://host3:8080/nifi"); + fail("Should fail if cluster URLs contain illegal URL."); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("malformed")); + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java index 0afc1d53a2..df01b82e1a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupDTO.java @@ -31,6 +31,7 @@ import java.util.Date; public class RemoteProcessGroupDTO extends ComponentDTO { private String targetUri; + private String targetUris; private Boolean targetSecure; private String name; @@ -74,15 +75,60 @@ public class RemoteProcessGroupDTO extends ComponentDTO { } /** - * @return target uri of this remote process group + * @return target uri of this remote process group. + * If target uri is not set, but uris are set, then returns the first url in the urls. + * If neither target uri nor uris are set, then returns null. */ @ApiModelProperty( - value = "The target URI of the remote process group." + value = "The target URI of the remote process group." + + " If target uri is not set, but uris are set, then returns the first url in the urls." + + " If neither target uri nor uris are set, then returns null." ) public String getTargetUri() { + if (targetUri == null || targetUri.length() == 0) { + synchronized (this) { + if (targetUri == null || targetUri.length() == 0) { + if (targetUris != null && targetUris.length() > 0) { + if (targetUris.indexOf(',') > -1) { + targetUri = targetUris.substring(0, targetUris.indexOf(',')); + } else { + targetUri = targetUris; + } + } + } + } + } + return this.targetUri; } + public void setTargetUris(String targetUris) { + this.targetUris = targetUris; + } + + /** + * @return target uris of this remote process group + * If targetUris was not set but target uri was set, then returns a collection containing the single uri. + * If neither target uris nor uri were set, then returns null. + */ + @ApiModelProperty( + value = "The target URI of the remote process group." + + " If target uris is not set but target uri is set," + + " then returns a collection containing the single target uri." + + " If neither target uris nor uris are set, then returns null." + ) + public String getTargetUris() { + if (targetUris == null || targetUris.length() == 0) { + synchronized (this) { + if (targetUris == null || targetUris.length() == 0) { + targetUris = targetUri; + } + } + } + + return this.targetUris; + } + /** * @param name of this remote process group */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/test/java/org/apache/nifi/web/api/dto/TestRemoteProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/test/java/org/apache/nifi/web/api/dto/TestRemoteProcessGroupDTO.java new file mode 100644 index 0000000000..ff8c61bbfb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/test/java/org/apache/nifi/web/api/dto/TestRemoteProcessGroupDTO.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TestRemoteProcessGroupDTO { + + @Test + public void testGetTargetUriAndUris() { + final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO(); + + assertNull(dto.getTargetUri()); + + dto.setTargetUris("http://node1:8080/nifi, http://node2:8080/nifi"); + assertEquals("If targetUris are set but targetUri is not, it should returns the first uru of the targetUris", + "http://node1:8080/nifi", dto.getTargetUri()); + assertEquals("http://node1:8080/nifi, http://node2:8080/nifi", dto.getTargetUris()); + + dto.setTargetUri("http://node3:9090/nifi"); + assertEquals("If both targetUri and targetUris are set, each returns its own values", + "http://node3:9090/nifi", dto.getTargetUri()); + assertEquals("http://node1:8080/nifi, http://node2:8080/nifi", dto.getTargetUris()); + + dto.setTargetUris(null); + assertEquals("http://node3:9090/nifi", dto.getTargetUri()); + assertEquals("getTargetUris should return targetUri when it's not set", + "http://node3:9090/nifi", dto.getTargetUris()); + + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index 133f274e23..64e2ca0989 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -23,7 +23,6 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; -import java.net.URI; import java.util.Date; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -32,7 +31,9 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable String getIdentifier(); - URI getTargetUri(); + String getTargetUri(); + + String getTargetUris(); ProcessGroup getProcessGroup(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index f78363e651..0f22f51231 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1226,13 +1226,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * given URI * * @param id group id - * @param uri group uri + * @param uris group uris, multiple url can be specified in comma-separated format * @return new group * @throws NullPointerException if either argument is null * @throws IllegalArgumentException if uri is not a valid URI. */ - public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) { - return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext, nifiProperties); + public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) { + return new StandardRemoteProcessGroup(requireNonNull(id).intern(), uris, null, this, sslContext, nifiProperties); } public ProcessGroup getRootGroup() { @@ -1769,7 +1769,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Instantiate Remote Process Groups // for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) { - final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUri()); + final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUris()); remoteGroup.setComments(remoteGroupDTO.getComments()); remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition())); remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout()); @@ -2608,7 +2608,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final RemoteProcessGroupStatus status = new RemoteProcessGroupStatus(); status.setGroupId(remoteGroup.getProcessGroup().getIdentifier()); status.setName(isRemoteProcessGroupAuthorized ? remoteGroup.getName() : remoteGroup.getIdentifier()); - status.setTargetUri(isRemoteProcessGroupAuthorized ? remoteGroup.getTargetUri().toString() : null); + status.setTargetUri(isRemoteProcessGroupAuthorized ? remoteGroup.getTargetUri() : null); long lineageMillis = 0L; int flowFilesRemoved = 0; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 1d7ebdea12..d8654756a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -1091,7 +1091,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final List remoteProcessGroupNodeList = getChildrenByTagName(processGroupElement, "remoteProcessGroup"); for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) { final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor); - final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUri()); + final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUris()); remoteGroup.setComments(remoteGroupDto.getComments()); remoteGroup.setPosition(toPosition(remoteGroupDto.getPosition())); final String name = remoteGroupDto.getName(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index f1e4232c04..6c39f167cb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -252,6 +252,7 @@ public class FlowFromDOMFactory { dto.setId(getString(element, "id")); dto.setName(getString(element, "name")); dto.setTargetUri(getString(element, "url")); + dto.setTargetUris(getString(element, "urls")); dto.setTransmitting(getBoolean(element, "transmitting")); dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); dto.setCommunicationsTimeout(getString(element, "timeout")); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index 0ead668f23..b8936ba1c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -246,7 +246,8 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "name", remoteRef.getName()); addPosition(element, remoteRef.getPosition()); addTextElement(element, "comment", remoteRef.getComments()); - addTextElement(element, "url", remoteRef.getTargetUri().toString()); + addTextElement(element, "url", remoteRef.getTargetUri()); + addTextElement(element, "urls", remoteRef.getTargetUris()); addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout()); addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration()); addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting())); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 091f00e2b4..1fd4d32770 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -24,8 +24,6 @@ import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.UniformInterfaceException; import java.io.File; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -91,10 +89,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final String id; - private final URI targetUri; - private final URI apiUri; - private final String host; - private final String protocol; + private final String targetUris; private final ProcessScheduler scheduler; private final EventReporter eventReporter; private final NiFiProperties nifiProperties; @@ -136,30 +131,18 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final ScheduledExecutorService backgroundThreadExecutor; - public StandardRemoteProcessGroup(final String id, final String targetUri, final ProcessGroup processGroup, - final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) { + public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup, + final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) { this.nifiProperties = nifiProperties; this.id = requireNonNull(id); this.flowController = requireNonNull(flowController); - final URI uri; - try { - uri = new URI(requireNonNull(targetUri.trim())); - final String apiPath = SiteToSiteRestApiClient.resolveBaseUrl(uri); - - apiUri = new URI(apiPath); - } catch (final URISyntaxException e) { - throw new IllegalArgumentException(e); - } - - this.host = uri.getHost(); - this.protocol = uri.getAuthority(); - this.targetUri = uri; + this.targetUris = targetUris; this.targetId = null; this.processGroup = new AtomicReference<>(processGroup); this.sslContext = sslContext; this.scheduler = flowController.getProcessScheduler(); - this.authorizationIssue = "Establishing connection to " + targetUri; + this.authorizationIssue = "Establishing connection to " + targetUris; final BulletinRepository bulletinRepository = flowController.getBulletinRepository(); eventReporter = new EventReporter() { @@ -176,7 +159,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { }; final Runnable checkAuthorizations = new InitializationTask(); - backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri); + backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUris); backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS); } @@ -298,14 +281,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { return targetId; } - public String getProtocol() { - return protocol; - } - @Override public String getName() { final String name = this.name.get(); - return name == null ? targetUri.toString() : name; + return name == null ? getTargetUri() : name; } @Override @@ -361,8 +340,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } @Override - public URI getTargetUri() { - return targetUri; + public String getTargetUri() { + return SiteToSiteRestApiClient.getFirstUrl(targetUris); + } + + @Override + public String getTargetUris() { + return targetUris; } @Override @@ -370,10 +354,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { return authorizationIssue; } - public String getHost() { - return host; - } - public int getInputPortCount() { readLock.lock(); try { @@ -739,7 +719,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public String toString() { - return "RemoteProcessGroup[" + targetUri + "]"; + return "RemoteProcessGroup[" + targetUris + "]"; } @Override @@ -786,7 +766,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // perform the request final ControllerDTO dto; try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) { - dto = apiClient.getController(); + dto = apiClient.getController(targetUris); } catch (IOException e) { writeLock.lock(); try { @@ -807,7 +787,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { writeLock.unlock(); } - throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + getApiUri() + " due to: " + e.getMessage()); + throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + targetUris + " due to: " + e.getMessage()); } writeLock.lock(); @@ -878,16 +858,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private SiteToSiteRestApiClient getSiteToSiteRestApiClient() { SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter()); - apiClient.setBaseUrl(getApiUri()); apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS)); return apiClient; } - protected String getApiUri() { - return apiUri.toString(); - } - /** * Converts a set of ports into a set of remote process group ports. * @@ -1092,10 +1067,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } } - private boolean isWebApiSecure() { - return targetUri.toString().toLowerCase().startsWith("https"); - } - @Override public boolean isSiteToSiteEnabled() { readLock.lock(); @@ -1117,7 +1088,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { public void run() { try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) { try { - final ControllerDTO dto = apiClient.getController(); + final ControllerDTO dto = apiClient.getController(targetUris); if (dto.getRemoteSiteListeningPort() == null && SiteToSiteTransportProtocol.RAW.equals(transportProtocol)) { authorizationIssue = "Remote instance is not configured to allow RAW Site-to-Site communications at this time."; @@ -1140,8 +1111,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { if (e.getResponseCode() == UNAUTHORIZED_STATUS_CODE) { try { // attempt to issue a registration request in case the target instance is a 0.x - final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null); - final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); + final boolean isApiSecure = apiClient.getBaseUrl().toLowerCase().startsWith("https"); + final RemoteNiFiUtils utils = new RemoteNiFiUtils(isApiSecure ? sslContext : null); + final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiClient.getBaseUrl()); if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily())) { logger.info("{} Issued a Request to communicate with remote instance", this); } else { @@ -1169,7 +1141,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } catch (final Exception e) { logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e)); getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s", - StandardRemoteProcessGroup.this.getTargetUri().toString(), e)); + StandardRemoteProcessGroup.this.getTargetUris(), e)); } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/TestStandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/TestStandardRemoteProcessGroup.java deleted file mode 100644 index 69d38e9239..0000000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/remote/TestStandardRemoteProcessGroup.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.remote; - -import static org.junit.Assert.assertEquals; - -import org.apache.nifi.controller.FlowController; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.util.NiFiProperties; -import org.junit.Test; -import org.mockito.Mockito; - -public class TestStandardRemoteProcessGroup { - - @Test - public void testApiUri() { - final NiFiProperties properties = Mockito.mock(NiFiProperties.class); - final FlowController controller = Mockito.mock(FlowController.class); - final ProcessGroup group = Mockito.mock(ProcessGroup.class); - - final String expectedUri = "http://localhost:8080/nifi-api"; - StandardRemoteProcessGroup rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/nifi", group, controller, null, properties); - assertEquals(expectedUri, rpg.getApiUri()); - - rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/nifi/", group, controller, null, properties); - assertEquals(expectedUri, rpg.getApiUri()); - - rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/nifi/ ", group, controller, null, properties); - assertEquals(expectedUri, rpg.getApiUri()); - - rpg = new StandardRemoteProcessGroup("id", " http://localhost:8080/nifi/ ", group, controller, null, properties); - assertEquals(expectedUri, rpg.getApiUri()); - - rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/", group, controller, null, properties); - assertEquals(expectedUri, rpg.getApiUri()); - - rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080", group, controller, null, properties); - assertEquals(expectedUri, rpg.getApiUri()); - - rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080 ", group, controller, null, properties); - assertEquals(expectedUri, rpg.getApiUri()); - } - -} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 48d60d6993..3a23601984 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -49,6 +49,7 @@ import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.http.HttpProxy; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -143,7 +144,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final long penalizationMillis = FormatUtils.getTimeDuration(remoteGroup.getYieldDuration(), TimeUnit.MILLISECONDS); final SiteToSiteClient client = new SiteToSiteClient.Builder() - .url(remoteGroup.getTargetUri().toString()) + .urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris())) .portIdentifier(getIdentifier()) .sslContext(sslContext) .useCompression(isUseCompression()) @@ -169,7 +170,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return; } - final String url = getRemoteProcessGroup().getTargetUri().toString(); + final String url = getRemoteProcessGroup().getTargetUri(); // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise, // we don't want to create a transaction at all. @@ -433,7 +434,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { @Override public String toString() { - return "RemoteGroupPort[name=" + getName() + ",target=" + remoteGroup.getTargetUri().toString() + "]"; + return "RemoteGroupPort[name=" + getName() + ",targets=" + remoteGroup.getTargetUris() + "]"; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index b44f118083..23d3fdaf60 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -39,7 +39,6 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.InputStream; -import java.net.URI; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Map; @@ -108,7 +107,7 @@ public class TestStandardRemoteGroupPort { doReturn(true).when(remoteGroup).isTransmitting(); doReturn(protocol).when(remoteGroup).getTransportProtocol(); - doReturn(new URI(REMOTE_CLUSTER_URL)).when(remoteGroup).getTargetUri(); + doReturn(REMOTE_CLUSTER_URL).when(remoteGroup).getTargetUri(); doReturn(siteToSiteClient).when(port).getSiteToSiteClient(); doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction)); doReturn(eventReporter).when(remoteGroup).getEventReporter(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java index d9a5df6614..e119437ce4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java @@ -236,7 +236,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { // create the remote process group details FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails(); - remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri().toString()); + remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri()); // save the actions if necessary if (!details.isEmpty()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 8b9366fb60..5809a06611 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -36,6 +36,7 @@ import org.apache.nifi.authorization.TemplateAuthorizable; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; +import org.apache.nifi.remote.util.SiteToSiteRestApiClient; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.Revision; @@ -1356,31 +1357,12 @@ public class ProcessGroupResource extends ApplicationResource { // set the processor id as appropriate remoteProcessGroupDTO.setId(generateUuid()); - // parse the uri - final URI uri; - try { - uri = URI.create(remoteProcessGroupDTO.getTargetUri()); - } catch (final IllegalArgumentException e) { - throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroupDTO.getTargetUri()); - } + // parse the uri to check if the uri is valid + final String targetUris = remoteProcessGroupDTO.getTargetUris(); + SiteToSiteRestApiClient.parseClusterUrls(targetUris); - // validate each part of the uri - if (uri.getScheme() == null || uri.getHost() == null) { - throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroupDTO.getTargetUri()); - } - - if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { - throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + remoteProcessGroupDTO.getTargetUri()); - } - - // normalize the uri to the other controller - String controllerUri = uri.toString(); - if (controllerUri.endsWith("/")) { - controllerUri = StringUtils.substringBeforeLast(controllerUri, "/"); - } - - // since the uri is valid, use the normalized version - remoteProcessGroupDTO.setTargetUri(controllerUri); + // since the uri is valid, use it + remoteProcessGroupDTO.setTargetUris(targetUris); // create the remote process group final Revision revision = getRevision(remoteProcessGroupEntity, remoteProcessGroupDTO.getId()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 3f481288db..ec0392d806 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1533,7 +1533,7 @@ public final class DtoFactory { dto.setCommunicationsTimeout(group.getCommunicationsTimeout()); dto.setYieldDuration(group.getYieldDuration()); dto.setParentGroupId(group.getProcessGroup().getIdentifier()); - dto.setTargetUri(group.getTargetUri().toString()); + dto.setTargetUris(group.getTargetUris()); dto.setFlowRefreshed(group.getLastRefreshTime()); dto.setContents(contents); dto.setTransportProtocol(group.getTransportProtocol().name()); @@ -2857,7 +2857,7 @@ public final class DtoFactory { copy.setActiveRemoteOutputPortCount(original.getActiveRemoteOutputPortCount()); copy.setInactiveRemoteOutputPortCount(original.getInactiveRemoteOutputPortCount()); copy.setParentGroupId(original.getParentGroupId()); - copy.setTargetUri(original.getTargetUri()); + copy.setTargetUris(original.getTargetUris()); copy.setTransportProtocol(original.getTransportProtocol()); copy.setProxyHost(original.getProxyHost()); copy.setProxyPort(original.getProxyPort()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 30fcbd740b..2db2bbe835 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -1805,7 +1805,7 @@ public class ControllerFacade implements Authorizable { addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches); addIfAppropriate(searchStr, group.getName(), "Name", matches); addIfAppropriate(searchStr, group.getComments(), "Comments", matches); - addIfAppropriate(searchStr, group.getTargetUri().toString(), "URL", matches); + addIfAppropriate(searchStr, group.getTargetUris(), "URLs", matches); // consider the transmission status if ((StringUtils.containsIgnoreCase("transmitting", searchStr) || StringUtils.containsIgnoreCase("transmission enabled", searchStr)) && group.isTransmitting()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index bf4c96e259..d022b15b0b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -75,13 +75,13 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Remote Process Group is being added."); } - final String rawTargetUri = remoteProcessGroupDTO.getTargetUri(); - if (rawTargetUri == null) { - throw new IllegalArgumentException("Cannot add a Remote Process Group without specifying the Target URI"); + final String targetUris = remoteProcessGroupDTO.getTargetUris(); + if (targetUris == null || targetUris.length() == 0) { + throw new IllegalArgumentException("Cannot add a Remote Process Group without specifying the Target URI(s)"); } // create the remote process group - RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), rawTargetUri); + RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris); // set other properties updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-remote-process-group-dialog.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-remote-process-group-dialog.jsp index ab6c3aeea2..4af046e609 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-remote-process-group-dialog.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/new-remote-process-group-dialog.jsp @@ -18,9 +18,11 @@
-
URL
+
URLs
- +
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-ports.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-ports.jsp index 8899f331b1..672800c6a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-ports.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-process-group-ports.jsp @@ -33,9 +33,9 @@
 
-
URL
+
URLs
- +
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js index 7bf2633df7..90e67b6fa8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/header/components/nf-ng-remote-process-group-component.js @@ -34,7 +34,7 @@ nf.ng.RemoteProcessGroupComponent = function (serviceProvider) { } }), 'component': { - 'targetUri': $('#new-remote-process-group-uri').val(), + 'targetUris': $('#new-remote-process-group-uris').val(), 'position': { 'x': pt.x, 'y': pt.y @@ -125,7 +125,7 @@ nf.ng.RemoteProcessGroupComponent = function (serviceProvider) { headerText: 'Add Remote Process Group', handler: { close: function () { - $('#new-remote-process-group-uri').val(''); + $('#new-remote-process-group-uris').val(''); $('#new-remote-process-group-timeout').val(defaultTimeout); $('#new-remote-process-group-yield-duration').val(defaultYieldDuration); $('#new-remote-process-group-transport-protocol-combo').combo('setSelectedOption', { @@ -265,7 +265,7 @@ nf.ng.RemoteProcessGroupComponent = function (serviceProvider) { this.modal.show(); // set the focus and key handlers - $('#new-remote-process-group-uri').focus().off('keyup').on('keyup', function (e) { + $('#new-remote-process-group-uris').focus().off('keyup').on('keyup', function (e) { var code = e.keyCode ? e.keyCode : e.which; if (code === $.ui.keyCode.ENTER) { addRemoteProcessGroup(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js index 6a33af7950..ece8be69fe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-configuration.js @@ -106,7 +106,7 @@ nf.RemoteProcessGroupConfiguration = (function () { // clear the remote process group details $('#remote-process-group-id').text(''); $('#remote-process-group-name').text(''); - $('#remote-process-group-url').text(''); + $('#remote-process-group-urls').text(''); $('#remote-process-group-timeout').val(''); $('#remote-process-group-yield-duration').val(''); $('#remote-process-group-transport-protocol-combo').combo('setSelectedOption', { @@ -144,7 +144,7 @@ nf.RemoteProcessGroupConfiguration = (function () { // populate the port settings $('#remote-process-group-id').text(selectionData.id); $('#remote-process-group-name').text(selectionData.component.name); - $('#remote-process-group-url').text(selectionData.component.targetUri); + $('#remote-process-group-urls').text(selectionData.component.targetUris); // populate the text fields $('#remote-process-group-timeout').val(selectionData.component.communicationsTimeout); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js index d757496f09..ebe116c8aa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-details.js @@ -41,7 +41,7 @@ nf.RemoteProcessGroupDetails = (function () { // clear the remote process group details nf.Common.clearField('read-only-remote-process-group-id'); nf.Common.clearField('read-only-remote-process-group-name'); - nf.Common.clearField('read-only-remote-process-group-url'); + nf.Common.clearField('read-only-remote-process-group-urls'); nf.Common.clearField('read-only-remote-process-group-timeout'); nf.Common.clearField('read-only-remote-process-group-yield-duration'); nf.Common.clearField('read-only-remote-process-group-transport-protocol'); @@ -67,7 +67,7 @@ nf.RemoteProcessGroupDetails = (function () { // populate the port settings nf.Common.populateField('read-only-remote-process-group-id', selectionData.id); nf.Common.populateField('read-only-remote-process-group-name', selectionData.component.name); - nf.Common.populateField('read-only-remote-process-group-url', selectionData.component.targetUri); + nf.Common.populateField('read-only-remote-process-group-urls', selectionData.component.targetUris); nf.Common.populateField('read-only-remote-process-group-timeout', selectionData.component.communicationsTimeout); nf.Common.populateField('read-only-remote-process-group-yield-duration', selectionData.component.yieldDuration); nf.Common.populateField('read-only-remote-process-group-transport-protocol', selectionData.component.transportProtocol); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js index 634a65f046..529cda7017 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js @@ -179,7 +179,7 @@ nf.RemoteProcessGroupPorts = (function () { // clear the remote process group details $('#remote-process-group-ports-id').text(''); $('#remote-process-group-ports-name').text(''); - $('#remote-process-group-ports-url').text(''); + $('#remote-process-group-ports-urls').text(''); // clear any tooltips var dialog = $('#remote-process-group-ports'); @@ -484,7 +484,7 @@ nf.RemoteProcessGroupPorts = (function () { // populate the port settings $('#remote-process-group-ports-id').text(remoteProcessGroup.id); $('#remote-process-group-ports-name').text(remoteProcessGroup.name); - $('#remote-process-group-ports-url').text(remoteProcessGroup.targetUri); + $('#remote-process-group-ports-urls').text(remoteProcessGroup.targetUris); // get the contents var remoteProcessGroupContents = remoteProcessGroup.contents; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js index 6e14bf6e2b..1903e44a18 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group.js @@ -499,7 +499,7 @@ nf.RemoteProcessGroup = (function () { remoteProcessGroupUri.text(null).selectAll('title').remove(); // apply ellipsis to the remote process group name as necessary - nf.CanvasUtils.ellipsis(remoteProcessGroupUri, d.component.targetUri); + nf.CanvasUtils.ellipsis(remoteProcessGroupUri, d.component.targetUris); }).append('title').text(function (d) { return d.component.name; });