mirror of https://github.com/apache/nifi.git
NIFI-3541: - Allowing the user to specify the network interface to send/receive data for a Remote Process Group.
This closes #1550. Signed-off-by: Mark Payne <markap14@hotmail.com> Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
9e68f02f1f
commit
16bde02ed0
|
@ -18,6 +18,7 @@ package org.apache.nifi.remote.io.socket.ssl;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
|
@ -68,9 +69,13 @@ public class SSLSocketChannel implements Closeable {
|
|||
private boolean closed = false;
|
||||
private volatile boolean interrupted = false;
|
||||
|
||||
public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException {
|
||||
public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final InetAddress localAddress, final boolean client) throws IOException {
|
||||
this.socketAddress = new InetSocketAddress(hostname, port);
|
||||
this.channel = SocketChannel.open();
|
||||
if (localAddress != null) {
|
||||
final SocketAddress localSocketAddress = new InetSocketAddress(localAddress, 0);
|
||||
this.channel.bind(localSocketAddress);
|
||||
}
|
||||
this.hostname = hostname;
|
||||
this.port = port;
|
||||
this.engine = sslContext.createSSLEngine();
|
||||
|
|
|
@ -23,7 +23,9 @@ import java.io.DataInputStream;
|
|||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.security.cert.CertificateException;
|
||||
|
@ -87,9 +89,11 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
|||
|
||||
private final SiteInfoProvider siteInfoProvider;
|
||||
private final PeerSelector peerSelector;
|
||||
private final InetAddress localAddress;
|
||||
|
||||
public EndpointConnectionPool(final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis,
|
||||
final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider) {
|
||||
final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile, final SiteInfoProvider siteInfoProvider,
|
||||
final InetAddress localAddress) {
|
||||
Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
|
||||
|
||||
this.remoteDestination = remoteDestination;
|
||||
|
@ -97,6 +101,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
|||
this.eventReporter = eventReporter;
|
||||
this.commsTimeout = commsTimeoutMillis;
|
||||
this.idleExpirationMillis = idleExpirationMillis;
|
||||
this.localAddress = localAddress;
|
||||
|
||||
this.siteInfoProvider = siteInfoProvider;
|
||||
|
||||
|
@ -440,7 +445,7 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
|||
+ " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
|
||||
}
|
||||
|
||||
final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
|
||||
final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, localAddress, true);
|
||||
socketChannel.connect();
|
||||
|
||||
commsSession = new SSLSocketChannelCommunicationsSession(socketChannel);
|
||||
|
@ -452,6 +457,11 @@ public class EndpointConnectionPool implements PeerStatusProvider {
|
|||
}
|
||||
} else {
|
||||
final SocketChannel socketChannel = SocketChannel.open();
|
||||
if (localAddress != null) {
|
||||
final SocketAddress localSocketAddress = new InetSocketAddress(localAddress, 0);
|
||||
socketChannel.socket().bind(localSocketAddress);
|
||||
}
|
||||
|
||||
socketChannel.socket().connect(new InetSocketAddress(hostname, port), commsTimeout);
|
||||
socketChannel.socket().setSoTimeout(commsTimeout);
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ public class SocketClient extends AbstractSiteToSiteClient {
|
|||
commsTimeout,
|
||||
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
|
||||
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile(),
|
||||
siteInfoProvider
|
||||
siteInfoProvider, config.getLocalAddress()
|
||||
);
|
||||
|
||||
this.compress = config.isUseCompression();
|
||||
|
|
BIN
nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png
Executable file → Normal file
BIN
nifi-docs/src/main/asciidoc/images/configure-remote-process-group.png
Executable file → Normal file
Binary file not shown.
Before Width: | Height: | Size: 48 KiB After Width: | Height: | Size: 51 KiB |
|
@ -367,9 +367,9 @@ image:iconRemoteProcessGroup.png["Remote Process Group", width=32]
|
|||
*Remote Process Group*: Remote Process Groups appear and behave similar to Process Groups. However, the Remote Process Group (RPG)
|
||||
references a remote instance of NiFi. When an RPG is dragged onto the canvas, rather than being prompted for a name, the DFM
|
||||
is prompted for the URL of the remote NiFi instance. If the remote NiFi is a clustered instance, the URL that should be used
|
||||
is the URL of the remote instance's NiFi Cluster Manager (NCM). When data is transferred to a clustered instance of NiFi
|
||||
via an RPG, the RPG will first connect to the remote instance's NCM to determine which nodes are in the cluster and
|
||||
how busy each node is. This information is then used to load balance the data that is pushed to each node. The remote NCM is
|
||||
is the URL of any NiFi instance in that cluster. When data is transferred to a clustered instance of NiFi
|
||||
via an RPG, the RPG will first connect to the remote instance whose URL is configured to determine which nodes are in the cluster and
|
||||
how busy each node is. This information is then used to load balance the data that is pushed to each node. The remote instances are
|
||||
then interrogated periodically to determine information about any nodes that are dropped from or added to the cluster and to
|
||||
recalculate the load balancing based on each node's load. For more information, see the section on <<site-to-site,Site-to-Site>>.
|
||||
|
||||
|
@ -995,7 +995,13 @@ link:administration-guide.html[System Administrator’s Guide].
|
|||
|
||||
image:configure-remote-process-group.png["Configure Remote Process Group"]
|
||||
|
||||
By default, it is set to _RAW_ which uses raw socket communication using a dedicated port. _HTTP_ transport protocol is especially useful if the remote NiFi instance is in a restricted network that only allow access through HTTP(S) protocol or only accessible from a specific HTTP Proxy server. For accessing through a HTTP Proxy Server, BASIC and DIGEST authentication are supported.
|
||||
By default, it is set to _RAW_ which uses raw socket communication using a dedicated port. _HTTP_ transport protocol is especially useful if the remote
|
||||
NiFi instance is in a restricted network that only allow access through HTTP(S) protocol or only accessible from a specific HTTP Proxy server.
|
||||
For accessing through a HTTP Proxy Server, BASIC and DIGEST authentication are supported.
|
||||
|
||||
*Local Network Interface*: In some cases, it may be desirable to prefer one network interface over another. For example, if a wired interface and a wireless
|
||||
interface both exist, the wired interface may be preferred. This can be configured by specifying the name of the network interface to use in this box. If the
|
||||
value entered is not valid, the Remote Process Group will not be valid and will not communicate with other NiFi instances until this is resolved.
|
||||
|
||||
==== Configure Site-to-Site server NiFi instance
|
||||
|
||||
|
|
|
@ -39,12 +39,14 @@ public class RemoteProcessGroupDTO extends ComponentDTO {
|
|||
private String communicationsTimeout;
|
||||
private String yieldDuration;
|
||||
private String transportProtocol;
|
||||
private String localNetworkInterface;
|
||||
private String proxyHost;
|
||||
private Integer proxyPort;
|
||||
private String proxyUser;
|
||||
private String proxyPassword;
|
||||
|
||||
private Collection<String> authorizationIssues;
|
||||
private Collection<String> validationErrors;
|
||||
private Boolean transmitting;
|
||||
|
||||
private Integer inputPortCount;
|
||||
|
@ -349,6 +351,25 @@ public class RemoteProcessGroupDTO extends ComponentDTO {
|
|||
this.transportProtocol = transportProtocol;
|
||||
}
|
||||
|
||||
@ApiModelProperty("The local network interface to send/receive data. If not specified, any local address is used. If clustered, all nodes must have an interface with this identifier.")
|
||||
public String getLocalNetworkInterface() {
|
||||
return localNetworkInterface;
|
||||
}
|
||||
|
||||
public void setLocalNetworkInterface(String localNetworkInterface) {
|
||||
this.localNetworkInterface = localNetworkInterface;
|
||||
}
|
||||
|
||||
@ApiModelProperty(
|
||||
"The validation errors for the remote process group. These validation errors represent the problems with the remote process group that must be resolved before it can transmit."
|
||||
)
|
||||
public Collection<String> getValidationErrors() {
|
||||
return validationErrors;
|
||||
}
|
||||
|
||||
public void setValidationErrors(Collection<String> validationErrors) {
|
||||
this.validationErrors = validationErrors;
|
||||
}
|
||||
|
||||
public String getProxyHost() {
|
||||
return proxyHost;
|
||||
|
|
|
@ -33,7 +33,6 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem
|
|||
public void merge(RemoteProcessGroupEntity clientEntity, Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap) {
|
||||
ComponentEntityMerger.super.merge(clientEntity, entityMap);
|
||||
for (Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : entityMap.entrySet()) {
|
||||
final NodeIdentifier nodeId = entry.getKey();
|
||||
final RemoteProcessGroupEntity entityStatus = entry.getValue();
|
||||
if (entityStatus != clientEntity) {
|
||||
mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey());
|
||||
|
@ -47,6 +46,7 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem
|
|||
* @param clientEntity the entity being returned to the client
|
||||
* @param entityMap all node responses
|
||||
*/
|
||||
@Override
|
||||
public void mergeComponents(final RemoteProcessGroupEntity clientEntity, final Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap) {
|
||||
final RemoteProcessGroupDTO clientDto = clientEntity.getComponent();
|
||||
final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap = new HashMap<>();
|
||||
|
@ -75,6 +75,7 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem
|
|||
final RemoteProcessGroupContentsDTO remoteProcessGroupContents = clientDto.getContents();
|
||||
|
||||
final Map<String, Set<NodeIdentifier>> authorizationErrorMap = new HashMap<>();
|
||||
final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
|
||||
Boolean mergedIsTargetSecure = null;
|
||||
final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new HashSet<>();
|
||||
final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new HashSet<>();
|
||||
|
@ -88,6 +89,7 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem
|
|||
|
||||
// merge the authorization errors
|
||||
ErrorMerger.mergeErrors(authorizationErrorMap, nodeId, nodeRemoteProcessGroup.getAuthorizationIssues());
|
||||
ErrorMerger.mergeErrors(validationErrorMap, nodeId, nodeRemoteProcessGroup.getValidationErrors());
|
||||
|
||||
// use the first target secure flag since they will all be the same
|
||||
final Boolean nodeIsTargetSecure = nodeRemoteProcessGroup.isTargetSecure();
|
||||
|
@ -124,5 +126,6 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem
|
|||
|
||||
// set the merged the validation errors
|
||||
clientDto.setAuthorizationIssues(ErrorMerger.normalizedMergedErrors(authorizationErrorMap, dtoMap.size()));
|
||||
clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -145,8 +145,8 @@ public interface ServerProtocol extends VersionedRemoteResource {
|
|||
Peer peer,
|
||||
Optional<ClusterNodeInformation> clusterNodeInfo,
|
||||
String remoteInputHost,
|
||||
int remoteInputPort,
|
||||
int remoteInputHttpPort,
|
||||
Integer remoteInputPort,
|
||||
Integer remoteInputHttpPort,
|
||||
boolean isSiteToSiteSecure) throws IOException;
|
||||
|
||||
void shutdown(Peer peer);
|
||||
|
|
|
@ -45,6 +45,7 @@ import javax.xml.validation.Schema;
|
|||
import javax.xml.validation.SchemaFactory;
|
||||
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
|
||||
import org.apache.nifi.authorization.Authorizer;
|
||||
import org.apache.nifi.cluster.protocol.DataFlow;
|
||||
|
@ -1126,6 +1127,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
remoteGroup.setProxyPassword(remoteGroupDto.getProxyPassword());
|
||||
}
|
||||
|
||||
if (StringUtils.isBlank(remoteGroupDto.getLocalNetworkInterface())) {
|
||||
remoteGroup.setNetworkInterface(null);
|
||||
} else {
|
||||
remoteGroup.setNetworkInterface(remoteGroupDto.getLocalNetworkInterface());
|
||||
}
|
||||
|
||||
final Set<RemoteProcessGroupPortDescriptor> inputPorts = new HashSet<>();
|
||||
for (final Element portElement : getChildrenByTagName(remoteProcessGroupElement, "inputPort")) {
|
||||
inputPorts.add(FlowFromDOMFactory.getRemoteProcessGroupPort(portElement));
|
||||
|
|
|
@ -262,6 +262,7 @@ public class FlowFromDOMFactory {
|
|||
dto.setProxyHost(getString(element, "proxyHost"));
|
||||
dto.setProxyPort(getOptionalInt(element, "proxyPort"));
|
||||
dto.setProxyUser(getString(element, "proxyUser"));
|
||||
dto.setLocalNetworkInterface(getString(element, "networkInterface"));
|
||||
|
||||
final String rawPassword = getString(element, "proxyPassword");
|
||||
final String proxyPassword = encryptor == null ? rawPassword : decrypt(rawPassword, encryptor);
|
||||
|
|
|
@ -69,7 +69,7 @@ import org.w3c.dom.Node;
|
|||
*/
|
||||
public class StandardFlowSerializer implements FlowSerializer {
|
||||
|
||||
private static final String MAX_ENCODING_VERSION = "1.0";
|
||||
private static final String MAX_ENCODING_VERSION = "1.1";
|
||||
|
||||
private final StringEncryptor encryptor;
|
||||
|
||||
|
@ -261,6 +261,9 @@ public class StandardFlowSerializer implements FlowSerializer {
|
|||
final String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX;
|
||||
addTextElement(element, "proxyPassword", value);
|
||||
}
|
||||
if (remoteRef.getNetworkInterface() != null) {
|
||||
addTextElement(element, "networkInterface", remoteRef.getNetworkInterface());
|
||||
}
|
||||
|
||||
for (final RemoteGroupPort port : remoteRef.getInputPorts()) {
|
||||
if (port.hasIncomingConnection()) {
|
||||
|
|
|
@ -679,6 +679,7 @@ public class FingerprintFactory {
|
|||
private StringBuilder addRemoteProcessGroupFingerprint(final StringBuilder builder, final Element remoteProcessGroupElem) throws FingerprintException {
|
||||
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "id"));
|
||||
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "url"));
|
||||
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "networkInterface"));
|
||||
|
||||
final NodeList inputPortList = DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "inputPort");
|
||||
final NodeList outputPortList = DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "outputPort");
|
||||
|
|
|
@ -879,7 +879,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
|
|||
writeLock.lock();
|
||||
try {
|
||||
this.networkInterfaceName = interfaceName;
|
||||
|
||||
if (interfaceName == null) {
|
||||
this.nicValidationResult = null;
|
||||
} else {
|
||||
try {
|
||||
final Enumeration<InetAddress> inetAddresses = NetworkInterface.getByName(interfaceName).getInetAddresses();
|
||||
|
||||
|
@ -904,6 +906,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
|
|||
.explanation("Could not obtain Network Interface with name " + interfaceName)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
|
|
@ -205,6 +205,7 @@
|
|||
<xs:element name="proxyPort" type="xs:int" minOccurs="0" maxOccurs="1" />
|
||||
<xs:element name="proxyUser" type="xs:string" minOccurs="0" maxOccurs="1" />
|
||||
<xs:element name="proxyPassword" type="xs:string" minOccurs="0" maxOccurs="1" />
|
||||
<xs:element name="networkInterface" type="xs:string" minOccurs="0" maxOccurs="1" />
|
||||
|
||||
<!-- The input ports and output ports of the remote group may change without our knowledge; however,
|
||||
they are persisted here because on a restart of NiFi, we need to have the Input & Output Ports' IDs
|
||||
|
|
|
@ -228,7 +228,7 @@ public class StandardHttpFlowFileServerProtocol extends AbstractFlowFileServerPr
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendPeerList(Peer peer, Optional<ClusterNodeInformation> clusterNodeInfo, String remoteInputHost, int remoteInputPort, int remoteInputHttpPort,
|
||||
public void sendPeerList(Peer peer, Optional<ClusterNodeInformation> clusterNodeInfo, String remoteInputHost, Integer remoteInputPort, Integer remoteInputHttpPort,
|
||||
boolean isSiteToSiteSecure) throws IOException {
|
||||
}
|
||||
|
||||
|
|
|
@ -156,8 +156,8 @@ public class SocketFlowFileServerProtocol extends AbstractFlowFileServerProtocol
|
|||
final Peer peer,
|
||||
final Optional<ClusterNodeInformation> clusterNodeInfo,
|
||||
final String remoteInputHost,
|
||||
final int remoteInputPort,
|
||||
final int remoteInputHttpPort,
|
||||
final Integer remoteInputPort,
|
||||
final Integer remoteInputHttpPort,
|
||||
final boolean isSiteToSiteSecure) throws IOException {
|
||||
if (!handshakeCompleted) {
|
||||
throw new IllegalStateException("Handshake has not been completed");
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.nifi.web.api.dto;
|
|||
|
||||
import java.text.Collator;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -1550,21 +1551,21 @@ public final class DtoFactory {
|
|||
}
|
||||
|
||||
if (group.getAuthorizationIssue() != null) {
|
||||
final List<String> authIssues = new ArrayList<>();
|
||||
final String authIssue = group.getAuthorizationIssue();
|
||||
if (authIssue != null) {
|
||||
authIssues.add(authIssue);
|
||||
dto.setAuthorizationIssues(Arrays.asList(group.getAuthorizationIssue()));
|
||||
}
|
||||
|
||||
final Collection<ValidationResult> validationResults = group.validate();
|
||||
validationResults.stream()
|
||||
.filter(result -> !result.isValid())
|
||||
.map(result -> result.toString())
|
||||
.forEach(str -> authIssues.add(str));
|
||||
|
||||
dto.setAuthorizationIssues(authIssues);
|
||||
final Collection<ValidationResult> validationErrors = group.validate();
|
||||
if (validationErrors != null && !validationErrors.isEmpty()) {
|
||||
final List<String> errors = new ArrayList<>();
|
||||
for (final ValidationResult validationResult : validationErrors) {
|
||||
errors.add(validationResult.toString());
|
||||
}
|
||||
|
||||
dto.setValidationErrors(errors);
|
||||
}
|
||||
|
||||
dto.setLocalNetworkInterface(group.getNetworkInterface());
|
||||
|
||||
dto.setActiveRemoteInputPortCount(activeRemoteInputPortCount);
|
||||
dto.setInactiveRemoteInputPortCount(inactiveRemoteInputPortCount);
|
||||
dto.setActiveRemoteOutputPortCount(activeRemoteOutputPortCount);
|
||||
|
|
|
@ -16,6 +16,14 @@
|
|||
*/
|
||||
package org.apache.nifi.web.dao.impl;
|
||||
|
||||
import static org.apache.nifi.util.StringUtils.isEmpty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.connectable.Position;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.controller.exception.ValidationException;
|
||||
|
@ -32,13 +40,6 @@ import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Matcher;
|
||||
|
||||
import static org.apache.nifi.util.StringUtils.isEmpty;
|
||||
|
||||
public class StandardRemoteProcessGroupDAO extends ComponentDAO implements RemoteProcessGroupDAO {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(StandardRemoteProcessGroupDAO.class);
|
||||
|
@ -144,6 +145,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
|
|||
|
||||
// if any remote group properties are changing, verify update
|
||||
if (isAnyNotNull(remoteProcessGroupDto.getYieldDuration(),
|
||||
remoteProcessGroupDto.getLocalNetworkInterface(),
|
||||
remoteProcessGroupDto.getCommunicationsTimeout(),
|
||||
remoteProcessGroupDto.getProxyHost(),
|
||||
remoteProcessGroupDto.getProxyPort(),
|
||||
|
@ -359,6 +361,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
|
|||
final String proxyPassword = remoteProcessGroupDTO.getProxyPassword();
|
||||
|
||||
final String transportProtocol = remoteProcessGroupDTO.getTransportProtocol();
|
||||
final String localNetworkInterface = remoteProcessGroupDTO.getLocalNetworkInterface();
|
||||
|
||||
if (isNotNull(name)) {
|
||||
remoteProcessGroup.setName(name);
|
||||
|
@ -391,6 +394,13 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
|
|||
remoteProcessGroup.setProxyPassword(proxyPassword);
|
||||
}
|
||||
}
|
||||
if (localNetworkInterface != null) {
|
||||
if (StringUtils.isBlank(localNetworkInterface)) {
|
||||
remoteProcessGroup.setNetworkInterface(null);
|
||||
} else {
|
||||
remoteProcessGroup.setNetworkInterface(localNetworkInterface);
|
||||
}
|
||||
}
|
||||
|
||||
final Boolean isTransmitting = remoteProcessGroupDTO.isTransmitting();
|
||||
if (isNotNull(isTransmitting)) {
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
</div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
Transport Protocol
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify the transport protocol to use for this Remote Process Group communication."></div>
|
||||
|
@ -34,8 +35,19 @@
|
|||
<div id="new-remote-process-group-transport-protocol-combo"></div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
Local Network Interface
|
||||
<div class="fa fa-question-circle" alt="Info" title="The local network interface to send/receive data. If not specified, any local address is used. If clustered, all nodes must have an interface with this identifier."></div>
|
||||
</div>
|
||||
<div class="setting-field">
|
||||
<input type="text" class="small-setting-input" id="new-remote-process-group-local-network-interface"/>
|
||||
</div>
|
||||
</div>
|
||||
<div class="clear"></div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-proxy-host-setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy server hostname
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify the proxy server's hostname to use. If not specified, HTTP traffics are sent directly to the target NiFi instance."></div>
|
||||
|
@ -44,7 +56,7 @@
|
|||
<input type="text" class="small-setting-input" id="new-remote-process-group-proxy-host"/>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-proxy-port-setting">
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy server port
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify the proxy server's port number, optional. If not specified, default port 80 will be used."></div>
|
||||
|
@ -56,7 +68,7 @@
|
|||
<div class="clear"></div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-proxy-user-setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy user
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify an user name to connect to the proxy server, optional."></div>
|
||||
|
@ -65,7 +77,7 @@
|
|||
<input type="text" class="small-setting-input" id="new-remote-process-group-proxy-user"/>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-proxy-password-setting">
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy password
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify an user password to connect to the proxy server, optional."></div>
|
||||
|
@ -77,7 +89,7 @@
|
|||
<div class="clear"></div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-timeout-setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
Communications timeout
|
||||
<div class="fa fa-question-circle" alt="Info" title="When communication with this remote process group takes longer than this amount of time, it will timeout."></div>
|
||||
|
@ -86,7 +98,7 @@
|
|||
<input type="text" class="small-setting-input" id="new-remote-process-group-timeout"/>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-yield-duration-setting">
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
Yield duration
|
||||
<div class="fa fa-question-circle" alt="Info" title="When communication with this remote process group fails, it will not be scheduled again until this amount of time elapses."></div>
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
</div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
Transport Protocol
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify the transport protocol to use for this Remote Process Group communication."></div>
|
||||
|
@ -44,8 +45,19 @@
|
|||
<div id="remote-process-group-transport-protocol-combo"></div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
Local Network Interface
|
||||
<div class="fa fa-question-circle" alt="Info" title="The local network interface to send/receive data. If not specified, any local address is used. If clustered, all nodes must have an interface with this identifier."></div>
|
||||
</div>
|
||||
<div class="setting-field">
|
||||
<input type="text" class="small-setting-input" id="remote-process-group-local-network-interface"/>
|
||||
</div>
|
||||
</div>
|
||||
<div class="clear"></div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-proxy-host-setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy server hostname
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify the proxy server's hostname to use. If not specified, HTTP traffics are sent directly to the target NiFi instance."></div>
|
||||
|
@ -54,7 +66,7 @@
|
|||
<input type="text" class="small-setting-input" id="remote-process-group-proxy-host"/>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-proxy-port-setting">
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy server port
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify the proxy server's port number, optional. If not specified, default port 80 will be used."></div>
|
||||
|
@ -66,7 +78,7 @@
|
|||
<div class="clear"></div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-proxy-user-setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy user
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify an user name to connect to the proxy server, optional."></div>
|
||||
|
@ -75,7 +87,7 @@
|
|||
<input type="text" class="small-setting-input" id="remote-process-group-proxy-user"/>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-proxy-password-setting">
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy password
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify an user password to connect to the proxy server, optional."></div>
|
||||
|
@ -87,7 +99,7 @@
|
|||
<div class="clear"></div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-timeout-setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
Communications timeout
|
||||
<div class="fa fa-question-circle" alt="Info" title="When communication with this remote process group takes longer than this amount of time, it will timeout."></div>
|
||||
|
@ -96,7 +108,7 @@
|
|||
<input type="text" class="small-setting-input" id="remote-process-group-timeout"/>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-yield-duration-setting">
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
Yield duration
|
||||
<div class="fa fa-question-circle" alt="Info" title="When communication with this remote process group fails, it will not be scheduled again until this amount of time elapses."></div>
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
</div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
Transport Protocol
|
||||
<div class="fa fa-question-circle" alt="Info" title="Transport protocol to use for this Remote Process Group communication."></div>
|
||||
|
@ -44,8 +45,19 @@
|
|||
<div id="read-only-remote-process-group-transport-protocol"></div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
Local Network Interface
|
||||
<div class="fa fa-question-circle" alt="Info" title="The local network interface to send/receive data. If not specified, any local address is used. If clustered, all nodes must have an interface with this identifier."></div>
|
||||
</div>
|
||||
<div class="setting-field">
|
||||
<span id="read-only-remote-process-group-local-network-interface"></span>
|
||||
</div>
|
||||
</div>
|
||||
<div class="clear"></div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-proxy-host-setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy server hostname
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify the proxy server's hostname to use. If not specified, HTTP traffics are sent directly to the target NiFi instance."></div>
|
||||
|
@ -54,7 +66,7 @@
|
|||
<span id="read-only-remote-process-group-proxy-host"></span>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-proxy-port-setting">
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy server port
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify the proxy server's port number, optional. If not specified, default port 80 will be used."></div>
|
||||
|
@ -66,7 +78,7 @@
|
|||
<div class="clear"></div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-proxy-user-setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy user
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify an user name to connect to the proxy server, optional."></div>
|
||||
|
@ -75,7 +87,7 @@
|
|||
<span id="read-only-remote-process-group-proxy-user"></span>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-proxy-password-setting">
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
HTTP Proxy password
|
||||
<div class="fa fa-question-circle" alt="Info" title="Specify an user password to connect to the proxy server, optional."></div>
|
||||
|
@ -87,7 +99,7 @@
|
|||
<div class="clear"></div>
|
||||
</div>
|
||||
<div class="setting">
|
||||
<div class="remote-process-group-timeout-setting">
|
||||
<div class="remote-process-group-setting-left">
|
||||
<div class="setting-name">
|
||||
Communications timeout
|
||||
<div class="fa fa-question-circle" alt="Info" title="When communication with this remote process group takes longer than this amount of time, it will timeout."></div>
|
||||
|
@ -96,7 +108,7 @@
|
|||
<span id="read-only-remote-process-group-timeout"></span>
|
||||
</div>
|
||||
</div>
|
||||
<div class="remote-process-group-yield-duration-setting">
|
||||
<div class="remote-process-group-setting-right">
|
||||
<div class="setting-name">
|
||||
Yield duration
|
||||
<div class="fa fa-question-circle" alt="Info" title="When communication with this remote process group fails, it will not be scheduled again until this amount of time elapses."></div>
|
||||
|
|
|
@ -18,10 +18,6 @@
|
|||
Specific dialog settings.
|
||||
*/
|
||||
|
||||
#new-remote-process-group-transport-protocol-combo {
|
||||
width: 160px;
|
||||
}
|
||||
|
||||
#fill-color-dialog {
|
||||
display: none;
|
||||
width: 240px;
|
||||
|
|
|
@ -49,22 +49,13 @@
|
|||
display: none;
|
||||
}
|
||||
|
||||
#remote-process-group-transport-protocol-combo {
|
||||
width: 160px;
|
||||
}
|
||||
|
||||
div.remote-process-group-timeout-setting, div.remote-process-group-yield-duration-setting,
|
||||
div.remote-process-group-proxy-host-setting, div.remote-process-group-proxy-port-setting,
|
||||
div.remote-process-group-proxy-user-setting, div.remote-process-group-proxy-password-setting {
|
||||
div.remote-process-group-setting-left, div.remote-process-group-setting-right {
|
||||
float: left;
|
||||
width: 49%;
|
||||
}
|
||||
|
||||
div.remote-process-group-yield-duration-setting,
|
||||
div.remote-process-group-proxy-port-setting,
|
||||
div.remote-process-group-proxy-password-setting {
|
||||
div.remote-process-group-setting-right {
|
||||
margin-left: 2%;
|
||||
width: 49%;
|
||||
}
|
||||
|
||||
/* remote process group port configuration */
|
||||
|
|
|
@ -78,6 +78,7 @@
|
|||
'communicationsTimeout': $('#new-remote-process-group-timeout').val(),
|
||||
'yieldDuration': $('#new-remote-process-group-yield-duration').val(),
|
||||
'transportProtocol': $('#new-remote-process-group-transport-protocol-combo').combo('getSelectedOption').value,
|
||||
'localNetworkInterface': $('#new-remote-process-group-local-network-interface').val(),
|
||||
'proxyHost': $('#new-remote-process-group-proxy-host').val(),
|
||||
'proxyPort': $('#new-remote-process-group-proxy-port').val(),
|
||||
'proxyUser': $('#new-remote-process-group-proxy-user').val(),
|
||||
|
@ -155,6 +156,7 @@
|
|||
init: function () {
|
||||
var defaultTimeout = "30 sec";
|
||||
var defaultYieldDuration = "10 sec";
|
||||
|
||||
// configure the new remote process group dialog
|
||||
this.getElement().modal({
|
||||
scrollableContentStyle: 'scrollable',
|
||||
|
@ -167,6 +169,7 @@
|
|||
$('#new-remote-process-group-transport-protocol-combo').combo('setSelectedOption', {
|
||||
value: 'RAW'
|
||||
});
|
||||
$('#new-remote-process-group-local-network-interface').val('');
|
||||
$('#new-remote-process-group-proxy-host').val('');
|
||||
$('#new-remote-process-group-proxy-port').val('');
|
||||
$('#new-remote-process-group-proxy-user').val('');
|
||||
|
@ -174,9 +177,11 @@
|
|||
}
|
||||
}
|
||||
});
|
||||
|
||||
// set default values
|
||||
$('#new-remote-process-group-timeout').val(defaultTimeout);
|
||||
$('#new-remote-process-group-yield-duration').val(defaultYieldDuration);
|
||||
|
||||
// initialize the transport protocol combo
|
||||
$('#new-remote-process-group-transport-protocol-combo').combo({
|
||||
options: [{
|
||||
|
|
|
@ -84,7 +84,8 @@
|
|||
proxyHost: $('#remote-process-group-proxy-host').val(),
|
||||
proxyPort: $('#remote-process-group-proxy-port').val(),
|
||||
proxyUser: $('#remote-process-group-proxy-user').val(),
|
||||
proxyPassword: $('#remote-process-group-proxy-password').val()
|
||||
proxyPassword: $('#remote-process-group-proxy-password').val(),
|
||||
localNetworkInterface: $('#remote-process-group-local-network-interface').val()
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -150,6 +151,7 @@
|
|||
$('#remote-process-group-transport-protocol-combo').combo('setSelectedOption', {
|
||||
value: 'RAW'
|
||||
});
|
||||
$('#remote-process-group-local-network-interface').val('');
|
||||
$('#remote-process-group-proxy-host').val('');
|
||||
$('#remote-process-group-proxy-port').val('');
|
||||
$('#remote-process-group-proxy-user').val('');
|
||||
|
@ -191,6 +193,7 @@
|
|||
$('#remote-process-group-proxy-port').val(selectionData.component.proxyPort);
|
||||
$('#remote-process-group-proxy-user').val(selectionData.component.proxyUser);
|
||||
$('#remote-process-group-proxy-password').val(selectionData.component.proxyPassword);
|
||||
$('#remote-process-group-local-network-interface').val(selectionData.component.localNetworkInterface);
|
||||
|
||||
// select the appropriate transport-protocol
|
||||
$('#remote-process-group-transport-protocol-combo').combo('setSelectedOption', {
|
||||
|
|
|
@ -65,6 +65,7 @@
|
|||
nfCommon.clearField('read-only-remote-process-group-timeout');
|
||||
nfCommon.clearField('read-only-remote-process-group-yield-duration');
|
||||
nfCommon.clearField('read-only-remote-process-group-transport-protocol');
|
||||
nfCommon.clearField('read-only-remote-process-group-local-network-interface');
|
||||
nfCommon.clearField('read-only-remote-process-group-proxy-host');
|
||||
nfCommon.clearField('read-only-remote-process-group-proxy-port');
|
||||
nfCommon.clearField('read-only-remote-process-group-proxy-user');
|
||||
|
@ -91,6 +92,7 @@
|
|||
nfCommon.populateField('read-only-remote-process-group-timeout', selectionData.component.communicationsTimeout);
|
||||
nfCommon.populateField('read-only-remote-process-group-yield-duration', selectionData.component.yieldDuration);
|
||||
nfCommon.populateField('read-only-remote-process-group-transport-protocol', selectionData.component.transportProtocol);
|
||||
nfCommon.populateField('read-only-remote-process-group-local-network-interface', selectionData.component.localNetworkInterface);
|
||||
nfCommon.populateField('read-only-remote-process-group-proxy-host', selectionData.component.proxyHost);
|
||||
nfCommon.populateField('read-only-remote-process-group-proxy-port', selectionData.component.proxyPort);
|
||||
nfCommon.populateField('read-only-remote-process-group-proxy-user', selectionData.component.proxyUser);
|
||||
|
|
|
@ -667,6 +667,21 @@
|
|||
});
|
||||
};
|
||||
|
||||
var hasIssues = function (d) {
|
||||
return !nfCommon.isEmpty(d.component.authorizationIssues) || !nfCommon.isEmpty(d.component.validationErrors);
|
||||
};
|
||||
|
||||
var getIssues = function (d) {
|
||||
var issues = [];
|
||||
if (!nfCommon.isEmpty(d.component.authorizationIssues)) {
|
||||
issues = issues.concat(d.component.authorizationIssues);
|
||||
}
|
||||
if (!nfCommon.isEmpty(d.component.validationErrors)) {
|
||||
issues = issues.concat(d.component.validationErrors);
|
||||
}
|
||||
return issues;
|
||||
};
|
||||
|
||||
/**
|
||||
* Updates the process group status.
|
||||
*
|
||||
|
@ -723,7 +738,7 @@
|
|||
.text(function (d) {
|
||||
var icon = '';
|
||||
if (d.permissions.canRead) {
|
||||
if (!nfCommon.isEmpty(d.component.authorizationIssues)) {
|
||||
if (hasIssues(d)) {
|
||||
icon = '\uf071';
|
||||
} else if (d.component.transmitting === true) {
|
||||
icon = '\uf140';
|
||||
|
@ -736,7 +751,7 @@
|
|||
.attr('font-family', function (d) {
|
||||
var family = '';
|
||||
if (d.permissions.canRead) {
|
||||
if (!nfCommon.isEmpty(d.component.authorizationIssues) || d.component.transmitting) {
|
||||
if (hasIssues(d) || d.component.transmitting) {
|
||||
family = 'FontAwesome';
|
||||
} else {
|
||||
family = 'flowfont';
|
||||
|
@ -745,20 +760,20 @@
|
|||
return family;
|
||||
})
|
||||
.classed('invalid', function (d) {
|
||||
return d.permissions.canRead && !nfCommon.isEmpty(d.component.authorizationIssues);
|
||||
return d.permissions.canRead && hasIssues(d);
|
||||
})
|
||||
.classed('transmitting', function (d) {
|
||||
return d.permissions.canRead && nfCommon.isEmpty(d.component.authorizationIssues) && d.component.transmitting === true;
|
||||
return d.permissions.canRead && !hasIssues(d) && d.component.transmitting === true;
|
||||
})
|
||||
.classed('not-transmitting', function (d) {
|
||||
return d.permissions.canRead && nfCommon.isEmpty(d.component.authorizationIssues) && d.component.transmitting === false;
|
||||
return d.permissions.canRead && !hasIssues(d) && d.component.transmitting === false;
|
||||
})
|
||||
.each(function (d) {
|
||||
// get the tip
|
||||
var tip = d3.select('#authorization-issues-' + d.id);
|
||||
|
||||
// if there are validation errors generate a tooltip
|
||||
if (d.permissions.canRead && !nfCommon.isEmpty(d.component.authorizationIssues)) {
|
||||
if (d.permissions.canRead && hasIssues(d)) {
|
||||
// create the tip if necessary
|
||||
if (tip.empty()) {
|
||||
tip = d3.select('#remote-process-group-tooltips').append('div')
|
||||
|
@ -770,7 +785,7 @@
|
|||
|
||||
// update the tip
|
||||
tip.html(function () {
|
||||
var list = nfCommon.formatUnorderedList(d.component.authorizationIssues);
|
||||
var list = nfCommon.formatUnorderedList(getIssues(d));
|
||||
if (list === null || list.length === 0) {
|
||||
return '';
|
||||
} else {
|
||||
|
|
|
@ -45,7 +45,7 @@ public class SSLCommsSession implements CommsSession {
|
|||
private int protocolVersion;
|
||||
|
||||
public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
|
||||
sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
|
||||
sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, null, true);
|
||||
|
||||
in = new SSLSocketChannelInputStream(sslSocketChannel);
|
||||
bufferedIn = new BufferedInputStream(in);
|
||||
|
|
Loading…
Reference in New Issue