NIFI-3026: Support multiple remote target URLs

- Added urls in addition to the existing url, to support multiple target
  URLs
- Backward compatibility is provided by returning the first url if
  multipe urls are specified, but component accessing the url doesn't
support multiple urls
- UI is not fully updated yet. Following UI components are planned to be updated
  by different commits
  - Search component: only the first URL is searchable and shown
  - Component status: RPG status shows only the first URL
  - Component action history: only the first URL is searchable and shown
  - Updated Search component to use URLs.

This closes #1208.
This commit is contained in:
Koji Kawamura 2016-11-30 09:28:16 +09:00 committed by Mark Payne
parent d8d29811f5
commit 7c5bd876bd
37 changed files with 788 additions and 309 deletions

View File

@ -16,30 +16,19 @@
*/
package org.apache.nifi.remote.client;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public abstract class AbstractSiteToSiteClient implements SiteToSiteClient {
protected final SiteToSiteClientConfig config;
protected final SiteInfoProvider siteInfoProvider;
protected final URI clusterUrl;
public AbstractSiteToSiteClient(final SiteToSiteClientConfig config) {
this.config = config;
try {
Objects.requireNonNull(config.getUrl(), "URL cannot be null");
clusterUrl = new URI(config.getUrl());
} catch (final URISyntaxException e) {
throw new IllegalArgumentException("Invalid Cluster URL: " + config.getUrl());
}
final int commsTimeout = (int) config.getTimeout(TimeUnit.MILLISECONDS);
siteInfoProvider = new SiteInfoProvider();
siteInfoProvider.setClusterUrl(clusterUrl);
siteInfoProvider.setClusterUrls(config.getUrls());
siteInfoProvider.setSslContext(config.getSslContext());
siteInfoProvider.setConnectTimeoutMillis(commsTimeout);
siteInfoProvider.setReadTimeoutMillis(commsTimeout);

View File

@ -18,8 +18,10 @@ package org.apache.nifi.remote.client;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -50,19 +52,26 @@ public class SiteInfoProvider {
private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier
private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
private URI clusterUrl;
private Set<String> clusterUrls;
private URI activeClusterUrl;
private SSLContext sslContext;
private int connectTimeoutMillis;
private int readTimeoutMillis;
private ControllerDTO refreshRemoteInfo() throws IOException {
final ControllerDTO controller;
try (final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP)) {
apiClient.setBaseUrl(SiteToSiteRestApiClient.resolveBaseUrl(clusterUrl));
final ControllerDTO controller;
final URI connectedClusterUrl;
try (final SiteToSiteRestApiClient apiClient = createSiteToSiteRestApiClient(sslContext, proxy)) {
apiClient.setConnectTimeoutMillis(connectTimeoutMillis);
apiClient.setReadTimeoutMillis(readTimeoutMillis);
controller = apiClient.getController();
controller = apiClient.getController(clusterUrls);
try {
connectedClusterUrl = new URI(apiClient.getBaseUrl());
} catch (URISyntaxException e) {
// This should not happen since apiClient has successfully communicated with this URL.
throw new RuntimeException("Failed to parse connected cluster URL due to " + e);
}
}
remoteInfoWriteLock.lock();
@ -70,6 +79,7 @@ public class SiteInfoProvider {
this.siteToSitePort = controller.getRemoteSiteListeningPort();
this.siteToSiteHttpPort = controller.getRemoteSiteHttpListeningPort();
this.siteToSiteSecure = controller.isSiteToSiteSecure();
this.activeClusterUrl = connectedClusterUrl;
inputPortMap.clear();
for (final PortDTO inputPort : controller.getInputPorts()) {
@ -89,8 +99,12 @@ public class SiteInfoProvider {
return controller;
}
protected SiteToSiteRestApiClient createSiteToSiteRestApiClient(final SSLContext sslContext, final HttpProxy proxy) {
return new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP);
}
public boolean isWebInterfaceSecure() {
return clusterUrl.toString().startsWith("https");
return clusterUrls.stream().anyMatch(url -> url.startsWith("https"));
}
/**
@ -162,7 +176,7 @@ public class SiteInfoProvider {
final ControllerDTO controller = refreshRemoteInfo();
final Boolean isSecure = controller.isSiteToSiteSecure();
if (isSecure == null) {
throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
throw new IOException("Remote NiFi instance " + clusterUrls + " is not currently configured to accept site-to-site connections");
}
return isSecure;
@ -207,8 +221,39 @@ public class SiteInfoProvider {
}
}
public void setClusterUrl(URI clusterUrl) {
this.clusterUrl = clusterUrl;
/**
* Return an active cluster URL that is known to work.
* If it is unknown yet or cache is expired, then remote info will be refreshed.
* @return an active cluster URL
*/
public URI getActiveClusterUrl() throws IOException {
URI resultClusterUrl;
remoteInfoReadLock.lock();
try {
resultClusterUrl = this.activeClusterUrl;
if (resultClusterUrl != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
return resultClusterUrl;
}
} finally {
remoteInfoReadLock.unlock();
}
refreshRemoteInfo();
remoteInfoReadLock.lock();
try {
return this.activeClusterUrl;
} finally {
remoteInfoReadLock.unlock();
}
}
public void setClusterUrls(Set<String> clusterUrls) {
this.clusterUrls = clusterUrls;
}
public Set<String> getClusterUrls() {
return clusterUrls;
}
public void setSslContext(SSLContext sslContext) {

View File

@ -41,6 +41,8 @@ import java.io.InputStream;
import java.io.Serializable;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@ -147,7 +149,7 @@ public interface SiteToSiteClient extends Closeable {
private static final long serialVersionUID = -4954962284343090219L;
private String url;
private Set<String> urls;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
@ -176,7 +178,7 @@ public interface SiteToSiteClient extends Closeable {
* @return the builder
*/
public Builder fromConfig(final SiteToSiteClientConfig config) {
this.url = config.getUrl();
this.urls = config.getUrls();
this.timeoutNanos = config.getTimeout(TimeUnit.NANOSECONDS);
this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS);
@ -202,15 +204,37 @@ public interface SiteToSiteClient extends Closeable {
}
/**
* Specifies the URL of the remote NiFi instance. If this URL points to
* the Cluster Manager of a NiFi cluster, data transfer to and from
* nodes will be automatically load balanced across the different nodes.
* <p>Specifies the URL of the remote NiFi instance.</p>
* <p>If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
* nodes will be automatically load balanced across the different nodes.</p>
*
* <p>For better connectivity with a NiFi cluster, use {@link #urls(Set)} instead.</p>
*
* @param url url of remote instance
* @return the builder
*/
public Builder url(final String url) {
this.url = url;
final Set<String> urls = new LinkedHashSet<>();
if (url != null && url.length() > 0) {
urls.add(url);
}
this.urls = urls;
return this;
}
/**
* <p>Specifies the URLs of the remote NiFi instance.</p>
* <p>If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
* nodes will be automatically load balanced across the different nodes.</p>
*
* <p>Multiple urls provide better connectivity with a NiFi cluster, able to connect
* to the target cluster at long as one of the specified urls is accessible.</p>
*
* @param urls urls of remote instance
* @return the builder
*/
public Builder urls(final Set<String> urls) {
this.urls = urls;
return this;
}
@ -542,7 +566,7 @@ public interface SiteToSiteClient extends Closeable {
* or if the transport protocol is not supported.
*/
public SiteToSiteClient build() {
if (url == null) {
if (urls == null) {
throw new IllegalStateException("Must specify URL to build Site-to-Site client");
}
@ -564,7 +588,10 @@ public interface SiteToSiteClient extends Closeable {
* @return the configured URL for the remote NiFi instance
*/
public String getUrl() {
return url;
if (urls != null && urls.size() > 0) {
return urls.iterator().next();
}
return null;
}
/**
@ -668,7 +695,8 @@ public interface SiteToSiteClient extends Closeable {
private static final long serialVersionUID = 1L;
private final String url;
// This Set instance has to be initialized here to be serialized via Kryo.
private final Set<String> urls = new LinkedHashSet<>();
private final long timeoutNanos;
private final long penalizationNanos;
private final long idleExpirationNanos;
@ -692,7 +720,6 @@ public interface SiteToSiteClient extends Closeable {
// some serialization frameworks require a default constructor
private StandardSiteToSiteClientConfig() {
this.url = null;
this.timeoutNanos = 0;
this.penalizationNanos = 0;
this.idleExpirationNanos = 0;
@ -716,7 +743,9 @@ public interface SiteToSiteClient extends Closeable {
}
private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) {
this.url = builder.url;
if (builder.urls != null) {
this.urls.addAll(builder.urls);
}
this.timeoutNanos = builder.timeoutNanos;
this.penalizationNanos = builder.penalizationNanos;
this.idleExpirationNanos = builder.idleExpirationNanos;
@ -746,7 +775,15 @@ public interface SiteToSiteClient extends Closeable {
@Override
public String getUrl() {
return url;
if (urls != null && urls.size() > 0) {
return urls.iterator().next();
}
return null;
}
@Override
public Set<String> getUrls() {
return urls;
}
@Override

View File

@ -18,6 +18,7 @@ package org.apache.nifi.remote.client;
import java.io.File;
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
@ -31,9 +32,20 @@ public interface SiteToSiteClientConfig extends Serializable {
/**
* @return the configured URL for the remote NiFi instance
* @deprecated This method only returns single URL string even if multiple URLs are set
* for backward compatibility for implementations that does not expect multiple URLs.
* {@link #getUrls()} should be used instead then should support multiple URLs when making requests.
*/
String getUrl();
/**
* SiteToSite implementations should support multiple URLs when establishing a SiteToSite connection with a remote
* NiFi instance to provide robust connectivity so that it can keep working as long as at least one of
* the configured URLs is accessible.
* @return the configured URLs for the remote NiFi instances.
*/
Set<String> getUrls();
/**
* @param timeUnit unit over which to report the timeout
* @return the communications timeout in given unit

View File

@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@ -92,12 +91,7 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
throw new IOException("Remote instance of NiFi is not configured to allow HTTP site-to-site communications");
}
final URI clusterUrl;
try {
clusterUrl = new URI(config.getUrl());
} catch (final URISyntaxException e) {
throw new IllegalArgumentException("Specified clusterUrl was: " + config.getUrl(), e);
}
final URI clusterUrl = siteInfoProvider.getActiveClusterUrl();
return new PeerDescription(clusterUrl.getHost(), siteInfoProvider.getSiteToSiteHttpPort(), siteInfoProvider.isSecure());
}
@ -135,8 +129,14 @@ public class HttpClient extends AbstractSiteToSiteClient implements PeerStatusPr
final CommunicationsSession commSession = new HttpCommunicationsSession();
final String nodeApiUrl = resolveNodeApiUrl(peerStatus.getPeerDescription());
final String clusterUrl = config.getUrl();
final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrl);
final StringBuilder clusterUrls = new StringBuilder();
config.getUrls().forEach(url -> {
if (clusterUrls.length() > 0) {
clusterUrls.append(",");
clusterUrls.append(url);
}
});
final Peer peer = new Peer(peerStatus.getPeerDescription(), commSession, nodeApiUrl, clusterUrls.toString());
final int penaltyMillis = (int) config.getPenalizationPeriod(TimeUnit.MILLISECONDS);
String portId = config.getPortIdentifier();

View File

@ -73,7 +73,6 @@ public class EndpointConnectionPool implements PeerStatusProvider {
private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap<>();
private final URI clusterUrl;
private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet<>());
@ -85,17 +84,14 @@ public class EndpointConnectionPool implements PeerStatusProvider {
private volatile int commsTimeout;
private volatile boolean shutdown = false;
private volatile Set<PeerStatus> lastFetchedQueryablePeers;
private final SiteInfoProvider siteInfoProvider;
private final PeerSelector peerSelector;
public EndpointConnectionPool(final URI clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider) {
Objects.requireNonNull(clusterUrl, "URL cannot be null");
Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
this.clusterUrl = clusterUrl;
this.remoteDestination = remoteDestination;
this.sslContext = sslContext;
this.eventReporter = eventReporter;
@ -156,6 +152,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
SocketClientProtocol protocol = null;
EndpointConnection connection;
Peer peer = null;
URI clusterUrl = siteInfoProvider.getActiveClusterUrl();
do {
final List<EndpointConnection> addBack = new ArrayList<>();
@ -361,7 +358,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
@Override
public PeerDescription getBootstrapPeerDescription() throws IOException {
final String hostname = clusterUrl.getHost();
final String hostname = siteInfoProvider.getActiveClusterUrl().getHost();
final Integer port = siteInfoProvider.getSiteToSitePort();
if (port == null) {
throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
@ -375,6 +372,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
public Set<PeerStatus> fetchRemotePeerStatuses(final PeerDescription peerDescription) throws IOException {
final String hostname = peerDescription.getHostname();
final int port = peerDescription.getPort();
final URI clusterUrl = siteInfoProvider.getActiveClusterUrl();
final PeerDescription clusterPeerDescription = new PeerDescription(hostname, port, clusterUrl.toString().startsWith("https://"));
final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
@ -522,7 +520,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
@Override
public String toString() {
return "EndpointConnectionPool[Cluster URL=" + clusterUrl + "]";
return "EndpointConnectionPool[Cluster URL=" + siteInfoProvider.getClusterUrls() + "]";
}
private class IdEnrichedRemoteDestination implements RemoteDestination {

View File

@ -47,7 +47,7 @@ public class SocketClient extends AbstractSiteToSiteClient {
super(config);
final int commsTimeout = (int) config.getTimeout(TimeUnit.MILLISECONDS);
pool = new EndpointConnectionPool(clusterUrl,
pool = new EndpointConnectionPool(
createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
commsTimeout,
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),

View File

@ -107,7 +107,10 @@ import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@ -118,6 +121,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isEmpty;
@ -316,7 +320,48 @@ public class SiteToSiteRestApiClient implements Closeable {
}
}
public ControllerDTO getController() throws IOException {
/**
* Parse the clusterUrls String, and try each URL in clusterUrls one by one to get a controller resource
* from those remote NiFi instances until a controller is successfully returned or try out all URLs.
* After this method execution, the base URL is set with the successful URL.
* @param clusterUrls url of the remote NiFi instance, multiple urls can be specified in comma-separated format
* @throws IllegalArgumentException when it fails to parse the URLs string,
* URLs string contains multiple protocols (http and https mix),
* or none of URL is specified.
*/
public ControllerDTO getController(final String clusterUrls) throws IOException {
return getController(parseClusterUrls(clusterUrls));
}
/**
* Try each URL in clusterUrls one by one to get a controller resource
* from those remote NiFi instances until a controller is successfully returned or try out all URLs.
* After this method execution, the base URL is set with the successful URL.
*/
public ControllerDTO getController(final Set<String> clusterUrls) throws IOException {
IOException lastException = null;
for (final String clusterUrl : clusterUrls) {
// The url may not be normalized if it passed directly without parsed with parseClusterUrls.
setBaseUrl(resolveBaseUrl(clusterUrl));
try {
return getController();
} catch (IOException e) {
lastException = e;
logger.warn("Failed to get controller from " + clusterUrl + " due to " + e);
if (logger.isDebugEnabled()) {
logger.debug("", e);
}
}
}
if (clusterUrls.size() > 1) {
throw new IOException("Tried all cluster URLs but none of those was accessible. Last Exception was " + lastException, lastException);
}
throw lastException;
}
private ControllerDTO getController() throws IOException {
try {
final HttpGet get = createGetControllerRequest();
return execute(get, ControllerEntity.class).getController();
@ -1158,15 +1203,78 @@ public class SiteToSiteRestApiClient implements Closeable {
this.readTimeoutMillis = readTimeoutMillis;
}
public static String resolveBaseUrl(final String clusterUrl) {
Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null.");
URI clusterUri;
try {
clusterUri = new URI(clusterUrl.trim());
} catch (final URISyntaxException e) {
throw new IllegalArgumentException("Specified clusterUrl was: " + clusterUrl, e);
public static String getFirstUrl(final String clusterUrlStr) {
if (clusterUrlStr == null) {
return null;
}
return resolveBaseUrl(clusterUri);
final int commaIndex = clusterUrlStr.indexOf(',');
if (commaIndex > -1) {
return clusterUrlStr.substring(0, commaIndex);
}
return clusterUrlStr;
}
/**
* Parse the comma-separated URLs string for the remote NiFi instances.
* @return A set containing one or more URLs
* @throws IllegalArgumentException when it fails to parse the URLs string,
* URLs string contains multiple protocols (http and https mix),
* or none of URL is specified.
*/
public static Set<String> parseClusterUrls(final String clusterUrlStr) {
final Set<String> urls = new LinkedHashSet<>();
if (clusterUrlStr != null && clusterUrlStr.length() > 0) {
Arrays.stream(clusterUrlStr.split(","))
.map(s -> s.trim())
.filter(s -> s.length() > 0)
.forEach(s -> {
validateUriString(s);
urls.add(resolveBaseUrl(s).intern());
});
}
if (urls.size() == 0) {
throw new IllegalArgumentException("Cluster URL was not specified.");
}
final Predicate<String> isHttps = url -> url.toLowerCase().startsWith("https:");
if (urls.stream().anyMatch(isHttps) && urls.stream().anyMatch(isHttps.negate())) {
throw new IllegalArgumentException("Different protocols are used in the cluster URLs " + clusterUrlStr);
}
return Collections.unmodifiableSet(urls);
}
private static void validateUriString(String s) {
// parse the uri
final URI uri;
try {
uri = URI.create(s);
} catch (final IllegalArgumentException e) {
throw new IllegalArgumentException("The specified remote process group URL is malformed: " + s);
}
// validate each part of the uri
if (uri.getScheme() == null || uri.getHost() == null) {
throw new IllegalArgumentException("The specified remote process group URL is malformed: " + s);
}
if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) {
throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + s);
}
}
private static String resolveBaseUrl(final String clusterUrl) {
Objects.requireNonNull(clusterUrl, "clusterUrl cannot be null.");
final URI uri;
try {
uri = new URI(clusterUrl.trim());
} catch (final URISyntaxException e) {
throw new IllegalArgumentException("The specified URL is malformed: " + clusterUrl);
}
return resolveBaseUrl(uri);
}
/**
@ -1179,7 +1287,17 @@ public class SiteToSiteRestApiClient implements Closeable {
* @param clusterUrl url to be resolved
* @return resolved url
*/
public static String resolveBaseUrl(final URI clusterUrl) {
private static String resolveBaseUrl(final URI clusterUrl) {
if (clusterUrl.getScheme() == null || clusterUrl.getHost() == null) {
throw new IllegalArgumentException("The specified URL is malformed: " + clusterUrl);
}
if (!(clusterUrl.getScheme().equalsIgnoreCase("http") || clusterUrl.getScheme().equalsIgnoreCase("https"))) {
throw new IllegalArgumentException("The specified URL is invalid because it is not http or https: " + clusterUrl);
}
String uriPath = clusterUrl.getPath().trim();
if (StringUtils.isEmpty(uriPath) || uriPath.equals("/")) {

View File

@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.client;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.junit.Test;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestSiteInfoProvider {
@Test
public void testSecure() throws Exception {
final Set<String> expectedClusterUrl = new LinkedHashSet<>(Arrays.asList(new String[]{"https://node1:8443", "https://node2:8443"}));
final String expectedActiveClusterUrl = "https://node2:8443/nifi-api";
final SSLContext expectedSslConText = mock(SSLContext.class);
final HttpProxy expectedHttpProxy = mock(HttpProxy.class);
final SiteInfoProvider siteInfoProvider = spy(new SiteInfoProvider());
siteInfoProvider.setClusterUrls(expectedClusterUrl);
siteInfoProvider.setSslContext(expectedSslConText);
siteInfoProvider.setProxy(expectedHttpProxy);
final ControllerDTO controllerDTO = new ControllerDTO();
final PortDTO inputPort1 = new PortDTO();
inputPort1.setName("input-one");
inputPort1.setId("input-0001");
final PortDTO inputPort2 = new PortDTO();
inputPort2.setName("input-two");
inputPort2.setId("input-0002");
final PortDTO outputPort1 = new PortDTO();
outputPort1.setName("output-one");
outputPort1.setId("output-0001");
final PortDTO outputPort2 = new PortDTO();
outputPort2.setName("output-two");
outputPort2.setId("output-0002");
final Set<PortDTO> inputPorts = new HashSet<>();
inputPorts.add(inputPort1);
inputPorts.add(inputPort2);
final Set<PortDTO> outputPorts = new HashSet<>();
outputPorts.add(outputPort1);
outputPorts.add(outputPort2);
controllerDTO.setInputPorts(inputPorts);
controllerDTO.setOutputPorts(outputPorts);
controllerDTO.setRemoteSiteListeningPort(8081);
controllerDTO.setRemoteSiteHttpListeningPort(8443);
controllerDTO.setSiteToSiteSecure(true);
// SiteInfoProvider uses SiteToSIteRestApiClient to get ControllerDTO.
doAnswer(invocation -> {
final SSLContext sslContext = invocation.getArgumentAt(0, SSLContext.class);
final HttpProxy httpProxy = invocation.getArgumentAt(1, HttpProxy.class);
assertEquals(expectedSslConText, sslContext);
assertEquals(expectedHttpProxy, httpProxy);
final SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
when(apiClient.getController(eq(expectedClusterUrl))).thenReturn(controllerDTO);
when(apiClient.getBaseUrl()).thenReturn(expectedActiveClusterUrl);
return apiClient;
}).when(siteInfoProvider).createSiteToSiteRestApiClient(any(), any());
// siteInfoProvider should expose correct information of the remote NiFi cluster.
assertEquals(controllerDTO.getRemoteSiteListeningPort(), siteInfoProvider.getSiteToSitePort());
assertEquals(controllerDTO.getRemoteSiteHttpListeningPort(), siteInfoProvider.getSiteToSiteHttpPort());
assertEquals(controllerDTO.isSiteToSiteSecure(), siteInfoProvider.isSecure());
assertTrue(siteInfoProvider.isWebInterfaceSecure());
assertEquals(inputPort1.getId(), siteInfoProvider.getInputPortIdentifier(inputPort1.getName()));
assertEquals(inputPort2.getId(), siteInfoProvider.getInputPortIdentifier(inputPort2.getName()));
assertEquals(outputPort1.getId(), siteInfoProvider.getOutputPortIdentifier(outputPort1.getName()));
assertEquals(outputPort2.getId(), siteInfoProvider.getOutputPortIdentifier(outputPort2.getName()));
assertNull(siteInfoProvider.getInputPortIdentifier("not-exist"));
assertNull(siteInfoProvider.getOutputPortIdentifier("not-exist"));
assertEquals(inputPort1.getId(), siteInfoProvider.getPortIdentifier(inputPort1.getName(), TransferDirection.SEND));
assertEquals(outputPort1.getId(), siteInfoProvider.getPortIdentifier(outputPort1.getName(), TransferDirection.RECEIVE));
assertEquals(expectedActiveClusterUrl, siteInfoProvider.getActiveClusterUrl().toString());
}
@Test
public void testPlain() throws Exception {
final Set<String> expectedClusterUrl = new LinkedHashSet<>(Arrays.asList(new String[]{"http://node1:8443, http://node2:8443"}));
final String expectedActiveClusterUrl = "http://node2:8443/nifi-api";
final SiteInfoProvider siteInfoProvider = spy(new SiteInfoProvider());
siteInfoProvider.setClusterUrls(expectedClusterUrl);
final ControllerDTO controllerDTO = new ControllerDTO();
controllerDTO.setInputPorts(Collections.emptySet());
controllerDTO.setOutputPorts(Collections.emptySet());
controllerDTO.setRemoteSiteListeningPort(8081);
controllerDTO.setRemoteSiteHttpListeningPort(8080);
controllerDTO.setSiteToSiteSecure(false);
// SiteInfoProvider uses SiteToSIteRestApiClient to get ControllerDTO.
doAnswer(invocation -> {
final SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
when(apiClient.getController(eq(expectedClusterUrl))).thenReturn(controllerDTO);
when(apiClient.getBaseUrl()).thenReturn(expectedActiveClusterUrl);
return apiClient;
}).when(siteInfoProvider).createSiteToSiteRestApiClient(any(), any());
// siteInfoProvider should expose correct information of the remote NiFi cluster.
assertEquals(controllerDTO.getRemoteSiteListeningPort(), siteInfoProvider.getSiteToSitePort());
assertEquals(controllerDTO.getRemoteSiteHttpListeningPort(), siteInfoProvider.getSiteToSiteHttpPort());
assertEquals(controllerDTO.isSiteToSiteSecure(), siteInfoProvider.isSecure());
assertFalse(siteInfoProvider.isWebInterfaceSecure());
assertEquals(expectedActiveClusterUrl, siteInfoProvider.getActiveClusterUrl().toString());
}
@Test
public void testConnectException() throws Exception {
final Set<String> expectedClusterUrl = new LinkedHashSet<>(Arrays.asList(new String[]{"http://node1:8443, http://node2:8443"}));
final SiteInfoProvider siteInfoProvider = spy(new SiteInfoProvider());
siteInfoProvider.setClusterUrls(expectedClusterUrl);
final ControllerDTO controllerDTO = new ControllerDTO();
controllerDTO.setInputPorts(Collections.emptySet());
controllerDTO.setOutputPorts(Collections.emptySet());
controllerDTO.setRemoteSiteListeningPort(8081);
controllerDTO.setRemoteSiteHttpListeningPort(8080);
controllerDTO.setSiteToSiteSecure(false);
// SiteInfoProvider uses SiteToSIteRestApiClient to get ControllerDTO.
doAnswer(invocation -> {
final SiteToSiteRestApiClient apiClient = mock(SiteToSiteRestApiClient.class);
when(apiClient.getController(eq(expectedClusterUrl))).thenThrow(new IOException("Connection refused."));
return apiClient;
}).when(siteInfoProvider).createSiteToSiteRestApiClient(any(), any());
try {
siteInfoProvider.getSiteToSitePort();
fail();
} catch (IOException e) {
}
try {
siteInfoProvider.getActiveClusterUrl();
fail();
} catch (IOException e) {
}
}
}

View File

@ -80,6 +80,7 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -730,10 +731,10 @@ public class TestHttpClient {
final URI uri = server.getURI();
try (
SiteToSiteClient client = getDefaultBuilder()
.url("http://" + uri.getHost() + ":" + uri.getPort() + "/wrong")
.portName("input-running")
.build()
SiteToSiteClient client = getDefaultBuilder()
.url("http://" + uri.getHost() + ":" + uri.getPort() + "/wrong")
.portName("input-running")
.build()
) {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
@ -811,6 +812,24 @@ public class TestHttpClient {
}
@Test
public void testSendSuccessMultipleUrls() throws Exception {
final Set<String> urls = new LinkedHashSet<>();
urls.add("http://localhost:9999");
urls.add("http://localhost:" + httpConnector.getLocalPort() + "/nifi");
try (
final SiteToSiteClient client = getDefaultBuilder()
.urls(urls)
.portName("input-running")
.build()
) {
testSend(client);
}
}
@Test
public void testSendSuccessWithProxy() throws Exception {

View File

@ -35,7 +35,9 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
public class TestSiteToSiteClient {
@ -128,10 +130,23 @@ public class TestSiteToSiteClient {
try {
SiteToSiteClientConfig clientConfig2 = kryo.readObject(input, SiteToSiteClient.StandardSiteToSiteClientConfig.class);
Assert.assertEquals(clientConfig.getUrl(), clientConfig2.getUrl());
Assert.assertEquals(clientConfig.getUrls(), clientConfig2.getUrls());
} finally {
input.close();
}
}
@Test
public void testGetUrlBackwardCompatibility() {
final Set<String> urls = new LinkedHashSet<>();
urls.add("http://node1:8080/nifi");
urls.add("http://node2:8080/nifi");
final SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
.urls(urls)
.buildConfig();
Assert.assertEquals("http://node1:8080/nifi", config.getUrl());
Assert.assertEquals(urls, config.getUrls());
}
}

View File

@ -16,119 +16,163 @@
*/
package org.apache.nifi.remote.util;
import org.apache.nifi.events.EventReporter;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import java.util.Iterator;
import java.util.Set;
import static org.apache.nifi.remote.util.SiteToSiteRestApiClient.parseClusterUrls;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestSiteToSiteRestApiClient {
private static void assertSingleUri(final String expected, final Set<String> urls) {
Assert.assertEquals(1, urls.size());
Assert.assertEquals(expected, urls.iterator().next().toString());
}
@Test
public void testResolveBaseUrlHttp() throws Exception{
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/nifi");
Assert.assertEquals("http://nifi.example.com/nifi-api", baseUrl);
assertSingleUri("http://nifi.example.com/nifi-api", parseClusterUrls("http://nifi.example.com/nifi"));
}
@Test
public void testResolveBaseUrlHttpSub() throws Exception{
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com/foo/bar/baz/nifi");
Assert.assertEquals("http://nifi.example.com/foo/bar/baz/nifi-api", baseUrl);
assertSingleUri("http://nifi.example.com/foo/bar/baz/nifi-api", parseClusterUrls("http://nifi.example.com/foo/bar/baz/nifi"));
}
@Test
public void testResolveBaseUrlHttpPort() {
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("http://nifi.example.com:8080/nifi");
Assert.assertEquals("http://nifi.example.com:8080/nifi-api", baseUrl);
assertSingleUri("http://nifi.example.com:8080/nifi-api", parseClusterUrls("http://nifi.example.com:8080/nifi"));
}
@Test
public void testResolveBaseUrlHttps() throws Exception{
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com/nifi");
Assert.assertEquals("https://nifi.example.com/nifi-api", baseUrl);
assertSingleUri("https://nifi.example.com/nifi-api", parseClusterUrls("https://nifi.example.com/nifi"));
}
@Test
public void testResolveBaseUrlHttpsPort() {
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
final String baseUrl = apiClient.resolveBaseUrl("https://nifi.example.com:8443/nifi");
Assert.assertEquals("https://nifi.example.com:8443/nifi-api", baseUrl);
assertSingleUri("https://nifi.example.com:8443/nifi-api", parseClusterUrls("https://nifi.example.com:8443/nifi"));
}
@Test
public void testResolveBaseUrlLeniency() {
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
String expectedUri = "http://localhost:8080/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080 "));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080 "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi/ "));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080/nifi/ "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi-api"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/nifi-api/"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080 "));
assertSingleUri(expectedUri, parseClusterUrls(" http://localhost:8080 "));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi/"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi/ "));
assertSingleUri(expectedUri, parseClusterUrls(" http://localhost:8080/nifi/ "));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi-api"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/nifi-api/"));
expectedUri = "http://localhost/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost/nifi-api"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost/"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost/nifi"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost/nifi-api"));
expectedUri = "http://localhost:8080/some/path/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path"));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" http://localhost:8080/some/path"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi-api"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("http://localhost:8080/some/path/nifi-api/"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path"));
assertSingleUri(expectedUri, parseClusterUrls(" http://localhost:8080/some/path"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path "));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi/"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi-api"));
assertSingleUri(expectedUri, parseClusterUrls("http://localhost:8080/some/path/nifi-api/"));
}
@Test
public void testResolveBaseUrlLeniencyHttps() {
final SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(null, null, EventReporter.NO_OP);
String expectedUri = "https://localhost:8443/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443 "));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443 "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi/ "));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443/nifi/ "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi-api"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/nifi-api/"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443 "));
assertSingleUri(expectedUri, parseClusterUrls(" https://localhost:8443 "));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi/"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi/ "));
assertSingleUri(expectedUri, parseClusterUrls(" https://localhost:8443/nifi/ "));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi-api"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/nifi-api/"));
expectedUri = "https://localhost/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost/nifi-api"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost/"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost/nifi"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost/nifi-api"));
expectedUri = "https://localhost:8443/some/path/nifi-api";
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path"));
assertEquals(expectedUri, apiClient.resolveBaseUrl(" https://localhost:8443/some/path"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path "));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi/"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi-api"));
assertEquals(expectedUri, apiClient.resolveBaseUrl("https://localhost:8443/some/path/nifi-api/"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path"));
assertSingleUri(expectedUri, parseClusterUrls(" https://localhost:8443/some/path"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path "));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi/"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi-api"));
assertSingleUri(expectedUri, parseClusterUrls("https://localhost:8443/some/path/nifi-api/"));
}
@Test
public void testGetUrlsEmpty() throws Exception {
try {
parseClusterUrls(null);
fail("Should fail if cluster URL was not specified.");
} catch (IllegalArgumentException e) {
}
try {
parseClusterUrls("");
fail("Should fail if cluster URL was not specified.");
} catch (IllegalArgumentException e) {
}
}
@Test
public void testGetUrlsOne() throws Exception {
final Set<String> urls = parseClusterUrls("http://localhost:8080/nifi");
Assert.assertEquals(1, urls.size());
Assert.assertEquals("http://localhost:8080/nifi-api", urls.iterator().next());
}
@Test
public void testGetUrlsThree() throws Exception {
final Set<String> urls = parseClusterUrls("http://host1:8080/nifi,http://host2:8080/nifi,http://host3:8080/nifi");
Assert.assertEquals(3, urls.size());
final Iterator<String> iterator = urls.iterator();
Assert.assertEquals("http://host1:8080/nifi-api", iterator.next());
Assert.assertEquals("http://host2:8080/nifi-api", iterator.next());
Assert.assertEquals("http://host3:8080/nifi-api", iterator.next());
}
@Test
public void testGetUrlsDifferentProtocols() throws Exception {
try {
parseClusterUrls("http://host1:8080/nifi,https://host2:8080/nifi,http://host3:8080/nifi");
fail("Should fail if cluster URLs contain different protocols.");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Different protocols"));
}
}
@Test
public void testGetUrlsMalformed() throws Exception {
try {
parseClusterUrls("http://host1:8080/nifi,host&2:8080,http://host3:8080/nifi");
fail("Should fail if cluster URLs contain illegal URL.");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("malformed"));
}
}
}

View File

@ -31,6 +31,7 @@ import java.util.Date;
public class RemoteProcessGroupDTO extends ComponentDTO {
private String targetUri;
private String targetUris;
private Boolean targetSecure;
private String name;
@ -74,15 +75,60 @@ public class RemoteProcessGroupDTO extends ComponentDTO {
}
/**
* @return target uri of this remote process group
* @return target uri of this remote process group.
* If target uri is not set, but uris are set, then returns the first url in the urls.
* If neither target uri nor uris are set, then returns null.
*/
@ApiModelProperty(
value = "The target URI of the remote process group."
value = "The target URI of the remote process group." +
" If target uri is not set, but uris are set, then returns the first url in the urls." +
" If neither target uri nor uris are set, then returns null."
)
public String getTargetUri() {
if (targetUri == null || targetUri.length() == 0) {
synchronized (this) {
if (targetUri == null || targetUri.length() == 0) {
if (targetUris != null && targetUris.length() > 0) {
if (targetUris.indexOf(',') > -1) {
targetUri = targetUris.substring(0, targetUris.indexOf(','));
} else {
targetUri = targetUris;
}
}
}
}
}
return this.targetUri;
}
public void setTargetUris(String targetUris) {
this.targetUris = targetUris;
}
/**
* @return target uris of this remote process group
* If targetUris was not set but target uri was set, then returns a collection containing the single uri.
* If neither target uris nor uri were set, then returns null.
*/
@ApiModelProperty(
value = "The target URI of the remote process group." +
" If target uris is not set but target uri is set," +
" then returns a collection containing the single target uri." +
" If neither target uris nor uris are set, then returns null."
)
public String getTargetUris() {
if (targetUris == null || targetUris.length() == 0) {
synchronized (this) {
if (targetUris == null || targetUris.length() == 0) {
targetUris = targetUri;
}
}
}
return this.targetUris;
}
/**
* @param name of this remote process group
*/

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class TestRemoteProcessGroupDTO {
@Test
public void testGetTargetUriAndUris() {
final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
assertNull(dto.getTargetUri());
dto.setTargetUris("http://node1:8080/nifi, http://node2:8080/nifi");
assertEquals("If targetUris are set but targetUri is not, it should returns the first uru of the targetUris",
"http://node1:8080/nifi", dto.getTargetUri());
assertEquals("http://node1:8080/nifi, http://node2:8080/nifi", dto.getTargetUris());
dto.setTargetUri("http://node3:9090/nifi");
assertEquals("If both targetUri and targetUris are set, each returns its own values",
"http://node3:9090/nifi", dto.getTargetUri());
assertEquals("http://node1:8080/nifi, http://node2:8080/nifi", dto.getTargetUris());
dto.setTargetUris(null);
assertEquals("http://node3:9090/nifi", dto.getTargetUri());
assertEquals("getTargetUris should return targetUri when it's not set",
"http://node3:9090/nifi", dto.getTargetUris());
}
}

View File

@ -23,7 +23,6 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import java.net.URI;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -32,7 +31,9 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
String getIdentifier();
URI getTargetUri();
String getTargetUri();
String getTargetUris();
ProcessGroup getProcessGroup();

View File

@ -1226,13 +1226,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* given URI
*
* @param id group id
* @param uri group uri
* @param uris group uris, multiple url can be specified in comma-separated format
* @return new group
* @throws NullPointerException if either argument is null
* @throws IllegalArgumentException if <code>uri</code> is not a valid URI.
*/
public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uri) {
return new StandardRemoteProcessGroup(requireNonNull(id).intern(), requireNonNull(uri).intern(), null, this, sslContext, nifiProperties);
public RemoteProcessGroup createRemoteProcessGroup(final String id, final String uris) {
return new StandardRemoteProcessGroup(requireNonNull(id).intern(), uris, null, this, sslContext, nifiProperties);
}
public ProcessGroup getRootGroup() {
@ -1769,7 +1769,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Instantiate Remote Process Groups
//
for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) {
final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUri());
final RemoteProcessGroup remoteGroup = createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUris());
remoteGroup.setComments(remoteGroupDTO.getComments());
remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
@ -2608,7 +2608,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final RemoteProcessGroupStatus status = new RemoteProcessGroupStatus();
status.setGroupId(remoteGroup.getProcessGroup().getIdentifier());
status.setName(isRemoteProcessGroupAuthorized ? remoteGroup.getName() : remoteGroup.getIdentifier());
status.setTargetUri(isRemoteProcessGroupAuthorized ? remoteGroup.getTargetUri().toString() : null);
status.setTargetUri(isRemoteProcessGroupAuthorized ? remoteGroup.getTargetUri() : null);
long lineageMillis = 0L;
int flowFilesRemoved = 0;

View File

@ -1091,7 +1091,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final List<Element> remoteProcessGroupNodeList = getChildrenByTagName(processGroupElement, "remoteProcessGroup");
for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) {
final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor);
final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUri());
final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUris());
remoteGroup.setComments(remoteGroupDto.getComments());
remoteGroup.setPosition(toPosition(remoteGroupDto.getPosition()));
final String name = remoteGroupDto.getName();

View File

@ -252,6 +252,7 @@ public class FlowFromDOMFactory {
dto.setId(getString(element, "id"));
dto.setName(getString(element, "name"));
dto.setTargetUri(getString(element, "url"));
dto.setTargetUris(getString(element, "urls"));
dto.setTransmitting(getBoolean(element, "transmitting"));
dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
dto.setCommunicationsTimeout(getString(element, "timeout"));

View File

@ -246,7 +246,8 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "name", remoteRef.getName());
addPosition(element, remoteRef.getPosition());
addTextElement(element, "comment", remoteRef.getComments());
addTextElement(element, "url", remoteRef.getTargetUri().toString());
addTextElement(element, "url", remoteRef.getTargetUri());
addTextElement(element, "urls", remoteRef.getTargetUris());
addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout());
addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration());
addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting()));

View File

@ -24,8 +24,6 @@ import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@ -91,10 +89,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final String id;
private final URI targetUri;
private final URI apiUri;
private final String host;
private final String protocol;
private final String targetUris;
private final ProcessScheduler scheduler;
private final EventReporter eventReporter;
private final NiFiProperties nifiProperties;
@ -136,30 +131,18 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final ScheduledExecutorService backgroundThreadExecutor;
public StandardRemoteProcessGroup(final String id, final String targetUri, final ProcessGroup processGroup,
final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) {
public StandardRemoteProcessGroup(final String id, final String targetUris, final ProcessGroup processGroup,
final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) {
this.nifiProperties = nifiProperties;
this.id = requireNonNull(id);
this.flowController = requireNonNull(flowController);
final URI uri;
try {
uri = new URI(requireNonNull(targetUri.trim()));
final String apiPath = SiteToSiteRestApiClient.resolveBaseUrl(uri);
apiUri = new URI(apiPath);
} catch (final URISyntaxException e) {
throw new IllegalArgumentException(e);
}
this.host = uri.getHost();
this.protocol = uri.getAuthority();
this.targetUri = uri;
this.targetUris = targetUris;
this.targetId = null;
this.processGroup = new AtomicReference<>(processGroup);
this.sslContext = sslContext;
this.scheduler = flowController.getProcessScheduler();
this.authorizationIssue = "Establishing connection to " + targetUri;
this.authorizationIssue = "Establishing connection to " + targetUris;
final BulletinRepository bulletinRepository = flowController.getBulletinRepository();
eventReporter = new EventReporter() {
@ -176,7 +159,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
};
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri);
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUris);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS);
}
@ -298,14 +281,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
return targetId;
}
public String getProtocol() {
return protocol;
}
@Override
public String getName() {
final String name = this.name.get();
return name == null ? targetUri.toString() : name;
return name == null ? getTargetUri() : name;
}
@Override
@ -361,8 +340,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
@Override
public URI getTargetUri() {
return targetUri;
public String getTargetUri() {
return SiteToSiteRestApiClient.getFirstUrl(targetUris);
}
@Override
public String getTargetUris() {
return targetUris;
}
@Override
@ -370,10 +354,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
return authorizationIssue;
}
public String getHost() {
return host;
}
public int getInputPortCount() {
readLock.lock();
try {
@ -739,7 +719,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public String toString() {
return "RemoteProcessGroup[" + targetUri + "]";
return "RemoteProcessGroup[" + targetUris + "]";
}
@Override
@ -786,7 +766,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// perform the request
final ControllerDTO dto;
try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
dto = apiClient.getController();
dto = apiClient.getController(targetUris);
} catch (IOException e) {
writeLock.lock();
try {
@ -807,7 +787,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
writeLock.unlock();
}
throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + getApiUri() + " due to: " + e.getMessage());
throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + targetUris + " due to: " + e.getMessage());
}
writeLock.lock();
@ -878,16 +858,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private SiteToSiteRestApiClient getSiteToSiteRestApiClient() {
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter());
apiClient.setBaseUrl(getApiUri());
apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
return apiClient;
}
protected String getApiUri() {
return apiUri.toString();
}
/**
* Converts a set of ports into a set of remote process group ports.
*
@ -1092,10 +1067,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
private boolean isWebApiSecure() {
return targetUri.toString().toLowerCase().startsWith("https");
}
@Override
public boolean isSiteToSiteEnabled() {
readLock.lock();
@ -1117,7 +1088,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public void run() {
try (final SiteToSiteRestApiClient apiClient = getSiteToSiteRestApiClient()) {
try {
final ControllerDTO dto = apiClient.getController();
final ControllerDTO dto = apiClient.getController(targetUris);
if (dto.getRemoteSiteListeningPort() == null && SiteToSiteTransportProtocol.RAW.equals(transportProtocol)) {
authorizationIssue = "Remote instance is not configured to allow RAW Site-to-Site communications at this time.";
@ -1140,8 +1111,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (e.getResponseCode() == UNAUTHORIZED_STATUS_CODE) {
try {
// attempt to issue a registration request in case the target instance is a 0.x
final RemoteNiFiUtils utils = new RemoteNiFiUtils(isWebApiSecure() ? sslContext : null);
final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString());
final boolean isApiSecure = apiClient.getBaseUrl().toLowerCase().startsWith("https");
final RemoteNiFiUtils utils = new RemoteNiFiUtils(isApiSecure ? sslContext : null);
final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiClient.getBaseUrl());
if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily())) {
logger.info("{} Issued a Request to communicate with remote instance", this);
} else {
@ -1169,7 +1141,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
} catch (final Exception e) {
logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));
getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s",
StandardRemoteProcessGroup.this.getTargetUri().toString(), e));
StandardRemoteProcessGroup.this.getTargetUris(), e));
}
}
}

View File

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
import static org.junit.Assert.assertEquals;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Test;
import org.mockito.Mockito;
public class TestStandardRemoteProcessGroup {
@Test
public void testApiUri() {
final NiFiProperties properties = Mockito.mock(NiFiProperties.class);
final FlowController controller = Mockito.mock(FlowController.class);
final ProcessGroup group = Mockito.mock(ProcessGroup.class);
final String expectedUri = "http://localhost:8080/nifi-api";
StandardRemoteProcessGroup rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/nifi", group, controller, null, properties);
assertEquals(expectedUri, rpg.getApiUri());
rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/nifi/", group, controller, null, properties);
assertEquals(expectedUri, rpg.getApiUri());
rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/nifi/ ", group, controller, null, properties);
assertEquals(expectedUri, rpg.getApiUri());
rpg = new StandardRemoteProcessGroup("id", " http://localhost:8080/nifi/ ", group, controller, null, properties);
assertEquals(expectedUri, rpg.getApiUri());
rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080/", group, controller, null, properties);
assertEquals(expectedUri, rpg.getApiUri());
rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080", group, controller, null, properties);
assertEquals(expectedUri, rpg.getApiUri());
rpg = new StandardRemoteProcessGroup("id", "http://localhost:8080 ", group, controller, null, properties);
assertEquals(expectedUri, rpg.getApiUri());
}
}

View File

@ -49,6 +49,7 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.http.HttpProxy;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
@ -143,7 +144,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final long penalizationMillis = FormatUtils.getTimeDuration(remoteGroup.getYieldDuration(), TimeUnit.MILLISECONDS);
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url(remoteGroup.getTargetUri().toString())
.urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris()))
.portIdentifier(getIdentifier())
.sslContext(sslContext)
.useCompression(isUseCompression())
@ -169,7 +170,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return;
}
final String url = getRemoteProcessGroup().getTargetUri().toString();
final String url = getRemoteProcessGroup().getTargetUri();
// If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise,
// we don't want to create a transaction at all.
@ -433,7 +434,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override
public String toString() {
return "RemoteGroupPort[name=" + getName() + ",target=" + remoteGroup.getTargetUri().toString() + "]";
return "RemoteGroupPort[name=" + getName() + ",targets=" + remoteGroup.getTargetUris() + "]";
}
@Override

View File

@ -39,7 +39,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.InputStream;
import java.net.URI;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
@ -108,7 +107,7 @@ public class TestStandardRemoteGroupPort {
doReturn(true).when(remoteGroup).isTransmitting();
doReturn(protocol).when(remoteGroup).getTransportProtocol();
doReturn(new URI(REMOTE_CLUSTER_URL)).when(remoteGroup).getTargetUri();
doReturn(REMOTE_CLUSTER_URL).when(remoteGroup).getTargetUri();
doReturn(siteToSiteClient).when(port).getSiteToSiteClient();
doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction));
doReturn(eventReporter).when(remoteGroup).getEventReporter();

View File

@ -236,7 +236,7 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
// create the remote process group details
FlowChangeRemoteProcessGroupDetails remoteProcessGroupDetails = new FlowChangeRemoteProcessGroupDetails();
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri().toString());
remoteProcessGroupDetails.setUri(remoteProcessGroup.getTargetUri());
// save the actions if necessary
if (!details.isEmpty()) {

View File

@ -36,6 +36,7 @@ import org.apache.nifi.authorization.TemplateAuthorizable;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
@ -1356,31 +1357,12 @@ public class ProcessGroupResource extends ApplicationResource {
// set the processor id as appropriate
remoteProcessGroupDTO.setId(generateUuid());
// parse the uri
final URI uri;
try {
uri = URI.create(remoteProcessGroupDTO.getTargetUri());
} catch (final IllegalArgumentException e) {
throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroupDTO.getTargetUri());
}
// parse the uri to check if the uri is valid
final String targetUris = remoteProcessGroupDTO.getTargetUris();
SiteToSiteRestApiClient.parseClusterUrls(targetUris);
// validate each part of the uri
if (uri.getScheme() == null || uri.getHost() == null) {
throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroupDTO.getTargetUri());
}
if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) {
throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + remoteProcessGroupDTO.getTargetUri());
}
// normalize the uri to the other controller
String controllerUri = uri.toString();
if (controllerUri.endsWith("/")) {
controllerUri = StringUtils.substringBeforeLast(controllerUri, "/");
}
// since the uri is valid, use the normalized version
remoteProcessGroupDTO.setTargetUri(controllerUri);
// since the uri is valid, use it
remoteProcessGroupDTO.setTargetUris(targetUris);
// create the remote process group
final Revision revision = getRevision(remoteProcessGroupEntity, remoteProcessGroupDTO.getId());

View File

@ -1533,7 +1533,7 @@ public final class DtoFactory {
dto.setCommunicationsTimeout(group.getCommunicationsTimeout());
dto.setYieldDuration(group.getYieldDuration());
dto.setParentGroupId(group.getProcessGroup().getIdentifier());
dto.setTargetUri(group.getTargetUri().toString());
dto.setTargetUris(group.getTargetUris());
dto.setFlowRefreshed(group.getLastRefreshTime());
dto.setContents(contents);
dto.setTransportProtocol(group.getTransportProtocol().name());
@ -2857,7 +2857,7 @@ public final class DtoFactory {
copy.setActiveRemoteOutputPortCount(original.getActiveRemoteOutputPortCount());
copy.setInactiveRemoteOutputPortCount(original.getInactiveRemoteOutputPortCount());
copy.setParentGroupId(original.getParentGroupId());
copy.setTargetUri(original.getTargetUri());
copy.setTargetUris(original.getTargetUris());
copy.setTransportProtocol(original.getTransportProtocol());
copy.setProxyHost(original.getProxyHost());
copy.setProxyPort(original.getProxyPort());

View File

@ -1805,7 +1805,7 @@ public class ControllerFacade implements Authorizable {
addIfAppropriate(searchStr, group.getIdentifier(), "Id", matches);
addIfAppropriate(searchStr, group.getName(), "Name", matches);
addIfAppropriate(searchStr, group.getComments(), "Comments", matches);
addIfAppropriate(searchStr, group.getTargetUri().toString(), "URL", matches);
addIfAppropriate(searchStr, group.getTargetUris(), "URLs", matches);
// consider the transmission status
if ((StringUtils.containsIgnoreCase("transmitting", searchStr) || StringUtils.containsIgnoreCase("transmission enabled", searchStr)) && group.isTransmitting()) {

View File

@ -75,13 +75,13 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
throw new IllegalArgumentException("Cannot specify a different Parent Group ID than the Group to which the Remote Process Group is being added.");
}
final String rawTargetUri = remoteProcessGroupDTO.getTargetUri();
if (rawTargetUri == null) {
throw new IllegalArgumentException("Cannot add a Remote Process Group without specifying the Target URI");
final String targetUris = remoteProcessGroupDTO.getTargetUris();
if (targetUris == null || targetUris.length() == 0) {
throw new IllegalArgumentException("Cannot add a Remote Process Group without specifying the Target URI(s)");
}
// create the remote process group
RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), rawTargetUri);
RemoteProcessGroup remoteProcessGroup = flowController.createRemoteProcessGroup(remoteProcessGroupDTO.getId(), targetUris);
// set other properties
updateRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupDTO);

View File

@ -18,9 +18,11 @@
<div id="new-remote-process-group-dialog" class="hidden large-dialog">
<div class="dialog-content">
<div class="setting">
<div class="setting-name">URL</div>
<div class="setting-name">URLs
<div class="fa fa-question-circle" alt="Info" title="Specify the remote target NiFi URLs. Multiple URLs can be specified in comma-separated format. Different protocols cannot be mixed. If remote NiFi is a cluster, two or more node URLs are recommended for better connection establishment availability."></div>
</div>
<div class="setting-field">
<input id="new-remote-process-group-uri" type="text" placeholder="https://remotehost:8080/nifi"/>
<input id="new-remote-process-group-uris" type="text" placeholder="https://remotehost:8080/nifi"/>
</div>
</div>
<div class="setting">

View File

@ -30,9 +30,9 @@
</div>
</div>
<div class="setting">
<div class="setting-name">URL</div>
<div class="setting-name">URLs</div>
<div class="setting-field">
<span id="remote-process-group-url"></span>
<span id="remote-process-group-urls"></span>
</div>
</div>
<div class="setting">

View File

@ -30,9 +30,9 @@
</div>
</div>
<div class="setting">
<div class="setting-name">URL</div>
<div class="setting-name">URLs</div>
<div class="setting-field">
<span id="read-only-remote-process-group-url"></span>
<span id="read-only-remote-process-group-urls"></span>
</div>
</div>
<div class="setting">

View File

@ -33,9 +33,9 @@
<div class="spacer">&nbsp;</div>
<div class="settings-right">
<div class="setting">
<div class="setting-name">URL</div>
<div class="setting-name">URLs</div>
<div class="setting-field">
<span id="remote-process-group-ports-url"></span>
<span id="remote-process-group-ports-urls"></span>
</div>
</div>
<div class="remote-port-header">

View File

@ -34,7 +34,7 @@ nf.ng.RemoteProcessGroupComponent = function (serviceProvider) {
}
}),
'component': {
'targetUri': $('#new-remote-process-group-uri').val(),
'targetUris': $('#new-remote-process-group-uris').val(),
'position': {
'x': pt.x,
'y': pt.y
@ -125,7 +125,7 @@ nf.ng.RemoteProcessGroupComponent = function (serviceProvider) {
headerText: 'Add Remote Process Group',
handler: {
close: function () {
$('#new-remote-process-group-uri').val('');
$('#new-remote-process-group-uris').val('');
$('#new-remote-process-group-timeout').val(defaultTimeout);
$('#new-remote-process-group-yield-duration').val(defaultYieldDuration);
$('#new-remote-process-group-transport-protocol-combo').combo('setSelectedOption', {
@ -265,7 +265,7 @@ nf.ng.RemoteProcessGroupComponent = function (serviceProvider) {
this.modal.show();
// set the focus and key handlers
$('#new-remote-process-group-uri').focus().off('keyup').on('keyup', function (e) {
$('#new-remote-process-group-uris').focus().off('keyup').on('keyup', function (e) {
var code = e.keyCode ? e.keyCode : e.which;
if (code === $.ui.keyCode.ENTER) {
addRemoteProcessGroup();

View File

@ -106,7 +106,7 @@ nf.RemoteProcessGroupConfiguration = (function () {
// clear the remote process group details
$('#remote-process-group-id').text('');
$('#remote-process-group-name').text('');
$('#remote-process-group-url').text('');
$('#remote-process-group-urls').text('');
$('#remote-process-group-timeout').val('');
$('#remote-process-group-yield-duration').val('');
$('#remote-process-group-transport-protocol-combo').combo('setSelectedOption', {
@ -144,7 +144,7 @@ nf.RemoteProcessGroupConfiguration = (function () {
// populate the port settings
$('#remote-process-group-id').text(selectionData.id);
$('#remote-process-group-name').text(selectionData.component.name);
$('#remote-process-group-url').text(selectionData.component.targetUri);
$('#remote-process-group-urls').text(selectionData.component.targetUris);
// populate the text fields
$('#remote-process-group-timeout').val(selectionData.component.communicationsTimeout);

View File

@ -41,7 +41,7 @@ nf.RemoteProcessGroupDetails = (function () {
// clear the remote process group details
nf.Common.clearField('read-only-remote-process-group-id');
nf.Common.clearField('read-only-remote-process-group-name');
nf.Common.clearField('read-only-remote-process-group-url');
nf.Common.clearField('read-only-remote-process-group-urls');
nf.Common.clearField('read-only-remote-process-group-timeout');
nf.Common.clearField('read-only-remote-process-group-yield-duration');
nf.Common.clearField('read-only-remote-process-group-transport-protocol');
@ -67,7 +67,7 @@ nf.RemoteProcessGroupDetails = (function () {
// populate the port settings
nf.Common.populateField('read-only-remote-process-group-id', selectionData.id);
nf.Common.populateField('read-only-remote-process-group-name', selectionData.component.name);
nf.Common.populateField('read-only-remote-process-group-url', selectionData.component.targetUri);
nf.Common.populateField('read-only-remote-process-group-urls', selectionData.component.targetUris);
nf.Common.populateField('read-only-remote-process-group-timeout', selectionData.component.communicationsTimeout);
nf.Common.populateField('read-only-remote-process-group-yield-duration', selectionData.component.yieldDuration);
nf.Common.populateField('read-only-remote-process-group-transport-protocol', selectionData.component.transportProtocol);

View File

@ -179,7 +179,7 @@ nf.RemoteProcessGroupPorts = (function () {
// clear the remote process group details
$('#remote-process-group-ports-id').text('');
$('#remote-process-group-ports-name').text('');
$('#remote-process-group-ports-url').text('');
$('#remote-process-group-ports-urls').text('');
// clear any tooltips
var dialog = $('#remote-process-group-ports');
@ -484,7 +484,7 @@ nf.RemoteProcessGroupPorts = (function () {
// populate the port settings
$('#remote-process-group-ports-id').text(remoteProcessGroup.id);
$('#remote-process-group-ports-name').text(remoteProcessGroup.name);
$('#remote-process-group-ports-url').text(remoteProcessGroup.targetUri);
$('#remote-process-group-ports-urls').text(remoteProcessGroup.targetUris);
// get the contents
var remoteProcessGroupContents = remoteProcessGroup.contents;

View File

@ -499,7 +499,7 @@ nf.RemoteProcessGroup = (function () {
remoteProcessGroupUri.text(null).selectAll('title').remove();
// apply ellipsis to the remote process group name as necessary
nf.CanvasUtils.ellipsis(remoteProcessGroupUri, d.component.targetUri);
nf.CanvasUtils.ellipsis(remoteProcessGroupUri, d.component.targetUris);
}).append('title').text(function (d) {
return d.component.name;
});