NIFI-3541: Add local network interface capability to site-to-site client and remote group and ports

This commit is contained in:
Mark Payne 2017-03-01 13:30:04 -05:00 committed by Aldrin Piri
parent 000414e7ea
commit 9e68f02f1f
10 changed files with 303 additions and 126 deletions

View File

@ -39,6 +39,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.LinkedHashSet;
@ -168,6 +169,7 @@ public interface SiteToSiteClient extends Closeable {
private int batchCount;
private long batchSize;
private long batchNanos;
private InetAddress localAddress;
private SiteToSiteTransportProtocol transportProtocol = SiteToSiteTransportProtocol.RAW;
private HttpProxy httpProxy;
@ -198,6 +200,7 @@ public interface SiteToSiteClient extends Closeable {
this.batchCount = config.getPreferredBatchCount();
this.batchSize = config.getPreferredBatchSize();
this.batchNanos = config.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
this.localAddress = config.getLocalAddress();
this.httpProxy = config.getHttpProxy();
return this;
@ -223,12 +226,31 @@ public interface SiteToSiteClient extends Closeable {
}
/**
* <p>Specifies the URLs of the remote NiFi instance.</p>
* <p>If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
* nodes will be automatically load balanced across the different nodes.</p>
* <p>
* Specifies the local address to use when communicating with the remote NiFi instance.
* </p>
*
* <p>Multiple urls provide better connectivity with a NiFi cluster, able to connect
* to the target cluster at long as one of the specified urls is accessible.</p>
* @param localAddress the local address to use, or <code>null</code> to use <code>anyLocal</code> address.
* @return the builder
*/
public Builder localAddress(final InetAddress localAddress) {
this.localAddress = localAddress;
return this;
}
/**
* <p>
* Specifies the URLs of the remote NiFi instance.
* </p>
* <p>
* If this URL points to a NiFi node in a NiFi cluster, data transfer to and from
* nodes will be automatically load balanced across the different nodes.
* </p>
*
* <p>
* Multiple urls provide better connectivity with a NiFi cluster, able to connect
* to the target cluster at long as one of the specified urls is accessible.
* </p>
*
* @param urls urls of remote instance
* @return the builder
@ -717,6 +739,7 @@ public interface SiteToSiteClient extends Closeable {
private final long batchSize;
private final long batchNanos;
private final HttpProxy httpProxy;
private final InetAddress localAddress;
// some serialization frameworks require a default constructor
private StandardSiteToSiteClientConfig() {
@ -740,6 +763,7 @@ public interface SiteToSiteClient extends Closeable {
this.batchNanos = 0;
this.transportProtocol = null;
this.httpProxy = null;
this.localAddress = null;
}
private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) {
@ -766,6 +790,7 @@ public interface SiteToSiteClient extends Closeable {
this.batchNanos = builder.batchNanos;
this.transportProtocol = builder.getTransportProtocol();
this.httpProxy = builder.getHttpProxy();
this.localAddress = builder.localAddress;
}
@Override
@ -931,5 +956,10 @@ public interface SiteToSiteClient extends Closeable {
public HttpProxy getHttpProxy() {
return httpProxy;
}
@Override
public InetAddress getLocalAddress() {
return localAddress;
}
}
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.remote.client;
import java.io.File;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -36,6 +37,7 @@ public interface SiteToSiteClientConfig extends Serializable {
* for backward compatibility for implementations that does not expect multiple URLs.
* {@link #getUrls()} should be used instead then should support multiple URLs when making requests.
*/
@Deprecated
String getUrl();
/**
@ -171,4 +173,9 @@ public interface SiteToSiteClientConfig extends Serializable {
*/
HttpProxy getHttpProxy();
/**
* @return the InetAddress to bind to for the local address when creating a socket, or
* {@code null} to bind to the {@code anyLocal} address.
*/
InetAddress getLocalAddress();
}

View File

@ -16,6 +16,55 @@
*/
package org.apache.nifi.remote.util;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
@ -87,53 +136,6 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
public class SiteToSiteRestApiClient implements Closeable {
private static final String EVENT_CATEGORY = "Site-to-Site";
@ -160,6 +162,7 @@ public class SiteToSiteRestApiClient implements Closeable {
private CloseableHttpAsyncClient httpAsyncClient;
private boolean compress = false;
private InetAddress localAddress = null;
private long requestExpirationMillis = 0;
private int serverTransactionTtl = 0;
private int batchCount = 0;
@ -239,6 +242,10 @@ public class SiteToSiteRestApiClient implements Closeable {
.setConnectTimeout(connectTimeoutMillis)
.setSocketTimeout(readTimeoutMillis);
if (localAddress != null) {
requestConfigBuilder.setLocalAddress(localAddress);
}
if (proxy != null) {
requestConfigBuilder.setProxy(proxy.getHttpHost());
}
@ -916,6 +923,8 @@ public class SiteToSiteRestApiClient implements Closeable {
extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator;
extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis;
extendingApiClient.readTimeoutMillis = this.readTimeoutMillis;
extendingApiClient.localAddress = this.localAddress;
final int extendFrequency = serverTransactionTtl / 2;
ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> {
@ -1197,10 +1206,12 @@ public class SiteToSiteRestApiClient implements Closeable {
public void setConnectTimeoutMillis(final int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
setupRequestConfig();
}
public void setReadTimeoutMillis(final int readTimeoutMillis) {
this.readTimeoutMillis = readTimeoutMillis;
setupRequestConfig();
}
public static String getFirstUrl(final String clusterUrlStr) {
@ -1337,6 +1348,10 @@ public class SiteToSiteRestApiClient implements Closeable {
this.compress = compress;
}
public void setLocalAddress(final InetAddress localAddress) {
this.localAddress = localAddress;
}
public void setRequestExpirationMillis(final long requestExpirationMillis) {
if (requestExpirationMillis < 0) {
throw new IllegalArgumentException("requestExpirationMillis can't be a negative value.");

View File

@ -16,6 +16,22 @@
*/
package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.nifi.authorization.Resource;
@ -36,22 +52,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.util.Objects.requireNonNull;
public abstract class AbstractPort implements Port {
public static final Relationship PORT_RELATIONSHIP = new Relationship.Builder()

View File

@ -17,18 +17,22 @@
package org.apache.nifi.groups;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Positionable;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import java.net.InetAddress;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable {
@Override
String getIdentifier();
String getTargetUri();
@ -154,6 +158,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
*/
String getAuthorizationIssue();
/**
* Validates the current configuration, returning ValidationResults for any
* invalid configuration parameter.
*
* @return Collection of validation result objects for any invalid findings
* only. If the collection is empty then the component is valid. Guaranteed
* non-null
*/
Collection<ValidationResult> validate();
/**
* @return the {@link EventReporter} that can be used to report any notable
* events
@ -180,6 +194,16 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
void setProxyPassword(String proxyPassword);
void setNetworkInterface(String interfaceName);
String getNetworkInterface();
/**
* Returns the InetAddress that the will this instance will bind to when communicating with a
* remote NiFi instance, or <code>null</code> if no specific address has been specified
*/
InetAddress getLocalAddress();
/**
* Initiates a task in the remote process group to re-initialize, as a
* result of clustering changes

View File

@ -33,6 +33,7 @@ public abstract class RemoteGroupPort extends AbstractPort implements Port, Remo
public abstract TransferDirection getTransferDirection();
@Override
public abstract boolean isUseCompression();
public abstract void setUseCompression(boolean useCompression);

View File

@ -18,14 +18,15 @@ package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -40,12 +41,15 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
@ -74,6 +78,11 @@ import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.ClientResponse.Status;
import com.sun.jersey.api.client.UniformInterfaceException;
/**
* Represents the Root Process Group of a remote NiFi Instance. Holds
* information about that remote instance, as well as {@link IncomingPort}s and
@ -99,7 +108,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final AtomicReference<String> comments = new AtomicReference<>();
private final AtomicReference<ProcessGroup> processGroup;
private final AtomicBoolean transmitting = new AtomicBoolean(false);
private final FlowController flowController;
private final SSLContext sslContext;
private volatile String communicationsTimeout = "30 sec";
@ -111,6 +119,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private volatile String proxyUser;
private volatile String proxyPassword;
private String networkInterfaceName;
private InetAddress localAddress;
private ValidationResult nicValidationResult;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
@ -135,7 +148,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final FlowController flowController, final SSLContext sslContext, final NiFiProperties nifiProperties) {
this.nifiProperties = nifiProperties;
this.id = requireNonNull(id);
this.flowController = requireNonNull(flowController);
this.targetUris = targetUris;
this.targetId = null;
@ -354,6 +366,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
return authorizationIssue;
}
@Override
public Collection<ValidationResult> validate() {
return (nicValidationResult == null) ? Collections.emptyList() : Collections.singletonList(nicValidationResult);
}
public int getInputPortCount() {
readLock.lock();
try {
@ -606,7 +623,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getName(), getProcessGroup(),
this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties);
this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties);
outputPorts.put(descriptor.getId(), port);
if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
@ -672,7 +689,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getName(), getProcessGroup(), this,
TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties);
TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties);
if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
port.setMaxConcurrentTasks(descriptor.getConcurrentlySchedulableTaskCount());
@ -741,15 +758,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
private ProcessGroup getRootGroup() {
return getRootGroup(getProcessGroup());
}
private ProcessGroup getRootGroup(final ProcessGroup context) {
final ProcessGroup parent = context.getParent();
return parent == null ? context : getRootGroup(parent);
}
@Override
public Date getLastRefreshTime() {
readLock.lock();
@ -856,10 +864,75 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
@Override
public String getNetworkInterface() {
readLock.lock();
try {
return networkInterfaceName;
} finally {
readLock.unlock();
}
}
@Override
public void setNetworkInterface(final String interfaceName) {
writeLock.lock();
try {
this.networkInterfaceName = interfaceName;
try {
final Enumeration<InetAddress> inetAddresses = NetworkInterface.getByName(interfaceName).getInetAddresses();
if (inetAddresses.hasMoreElements()) {
this.localAddress = inetAddresses.nextElement();
this.nicValidationResult = null;
} else {
this.localAddress = null;
this.nicValidationResult = new ValidationResult.Builder()
.input(interfaceName)
.subject("Network Interface Name")
.valid(false)
.explanation("No IP Address could be found that is bound to the interface with name " + interfaceName)
.build();
}
} catch (final Exception e) {
this.localAddress = null;
this.nicValidationResult = new ValidationResult.Builder()
.input(interfaceName)
.subject("Network Interface Name")
.valid(false)
.explanation("Could not obtain Network Interface with name " + interfaceName)
.build();
}
} finally {
writeLock.unlock();
}
}
@Override
public InetAddress getLocalAddress() {
readLock.lock();
try {
if (nicValidationResult != null && !nicValidationResult.isValid()) {
return null;
}
return localAddress;
} finally {
readLock.unlock();
}
}
private SiteToSiteRestApiClient getSiteToSiteRestApiClient() {
SiteToSiteRestApiClient apiClient = new SiteToSiteRestApiClient(sslContext, new HttpProxy(proxyHost, proxyPort, proxyUser, proxyPassword), getEventReporter());
apiClient.setConnectTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
apiClient.setReadTimeoutMillis(getCommunicationsTimeout(TimeUnit.MILLISECONDS));
final InetAddress localAddress = getLocalAddress();
if (localAddress != null) {
apiClient.setLocalAddress(localAddress);
}
return apiClient;
}
@ -886,17 +959,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
return remotePorts;
}
private RemoteProcessGroupPortDescriptor convertPortToRemotePortDescriptor(final Port port) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
descriptor.setComments(port.getComments());
descriptor.setExists(true);
descriptor.setGroupId(port.getProcessGroup().getIdentifier());
descriptor.setId(port.getIdentifier());
descriptor.setName(port.getName());
descriptor.setTargetRunning(port.isRunning());
return descriptor;
}
@Override
public boolean isTransmitting() {
return transmitting.get();
@ -1216,6 +1278,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (port.hasIncomingConnection() && !port.getTargetExists()) {
throw new IllegalStateException(this.getIdentifier() + " has a Connection to Port " + port.getIdentifier() + ", but that Port no longer exists on the remote system");
}
port.verifyCanStart();
}
for (final StandardRemoteGroupPort port : outputPorts.values()) {
@ -1226,6 +1290,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (!port.getConnections().isEmpty() && !port.getTargetExists()) {
throw new IllegalStateException(this.getIdentifier() + " has a Connection to Port " + port.getIdentifier() + ", but that Port no longer exists on the remote system");
}
port.verifyCanStart();
}
} finally {
readLock.unlock();

View File

@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -93,7 +94,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler,
final NiFiProperties nifiProperties) {
final NiFiProperties nifiProperties) {
// remote group port id needs to be unique but cannot just be the id of the port
// in the remote group instance. this supports referencing the same remote
// instance more than once.
@ -167,6 +168,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
.timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(remoteGroup.getTransportProtocol())
.httpProxy(new HttpProxy(remoteGroup.getProxyHost(), remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), remoteGroup.getProxyPassword()))
.localAddress(remoteGroup.getLocalAddress())
.build();
clientRef.set(client);
}
@ -407,8 +409,19 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override
public boolean isValid() {
return targetExists.get()
&& (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true);
if (!targetExists.get()) {
return false;
}
if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
// if it's an output port, ensure that there is an outbound connection
return false;
}
final boolean groupValid = remoteGroup.validate().stream()
.allMatch(result -> result.isValid());
return groupValid;
}
@Override
@ -444,6 +457,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty()) {
throw new IllegalStateException("Port " + getName() + " has no incoming connections");
}
final Optional<ValidationResult> resultOption = remoteGroup.validate().stream()
.filter(result -> !result.isValid())
.findFirst();
if (resultOption.isPresent()) {
throw new IllegalStateException("Remote Process Group is not valid: " + resultOption.get().toString());
}
}
@Override

View File

@ -110,6 +110,7 @@ public class TestStandardRemoteGroupPort {
connectableType = null;
break;
}
port = spy(new StandardRemoteGroupPort(ID, NAME,
processGroup, remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null)));

View File

@ -16,6 +16,32 @@
*/
package org.apache.nifi.web.api.dto;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.component.details.ComponentDetails;
import org.apache.nifi.action.component.details.ExtensionDetails;
@ -157,32 +183,6 @@ import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
import javax.ws.rs.WebApplicationException;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public final class DtoFactory {
@SuppressWarnings("rawtypes")
@ -1550,7 +1550,19 @@ public final class DtoFactory {
}
if (group.getAuthorizationIssue() != null) {
dto.setAuthorizationIssues(Arrays.asList(group.getAuthorizationIssue()));
final List<String> authIssues = new ArrayList<>();
final String authIssue = group.getAuthorizationIssue();
if (authIssue != null) {
authIssues.add(authIssue);
}
final Collection<ValidationResult> validationResults = group.validate();
validationResults.stream()
.filter(result -> !result.isValid())
.map(result -> result.toString())
.forEach(str -> authIssues.add(str));
dto.setAuthorizationIssues(authIssues);
}
dto.setActiveRemoteInputPortCount(activeRemoteInputPortCount);