From 9766558cab563a89e89e113082fc0bd2fabb1907 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 20 Nov 2017 14:57:55 -0500 Subject: [PATCH] NIFI-4526: Allow Target URI's of Remote Process Groups to be changed. This closes #2298 --- .../nifi/groups/RemoteProcessGroup.java | 2 + .../remote/StandardRemoteProcessGroup.java | 67 ++++++++++++++++--- .../nifi/remote/StandardRemoteGroupPort.java | 9 ++- .../impl/StandardRemoteProcessGroupDAO.java | 4 ++ 4 files changed, 69 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index cb1e6c8748..79b95092f8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -39,6 +39,8 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable String getTargetUris(); + void setTargetUris(String targetUris); + ProcessGroup getProcessGroup(); void setProcessGroup(ProcessGroup group); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 8689d7156b..53d5c9f824 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -98,7 +98,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private final String id; - private final String targetUris; + private volatile String targetUris; private final ProcessScheduler scheduler; private final EventReporter eventReporter; private final NiFiProperties nifiProperties; @@ -176,8 +176,24 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { }; final Runnable checkAuthorizations = new InitializationTask(); - backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUris); + backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id); backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS); + backgroundThreadExecutor.submit(() -> { + try { + refreshFlowContents(); + } catch (final Exception e) { + logger.warn("Unable to communicate with remote instance {}", new Object[] {this, e}); + } + }); + } + + @Override + public void setTargetUris(final String targetUris) { + requireNonNull(targetUris); + verifyCanUpdate(); + + this.targetUris = targetUris; + backgroundThreadExecutor.submit(new InitializationTask()); } @Override @@ -425,12 +441,25 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { final Map inputPortByTargetId = inputPorts.values().stream() .collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity())); - if (!inputPortByTargetId.containsKey(descriptor.getTargetId())) { - addInputPort(descriptor); + final Map inputPortByName = inputPorts.values().stream() + .collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity())); + + // Check if we have a matching port already and add the port if not. We determine a matching port + // by first finding a port that has the same Target ID. If none exists, then we try to find a port with + // the same name. We do this because if the URL of this RemoteProcessGroup is changed, then we expect + // the NiFi at the new URL to have a Port with the same name but a different Identifier. This is okay + // because Ports are required to have unique names. + StandardRemoteGroupPort sendPort = inputPortByTargetId.get(descriptor.getTargetId()); + if (sendPort == null) { + sendPort = inputPortByName.get(descriptor.getName()); + if (sendPort == null) { + sendPort = addInputPort(descriptor); + } else { + sendPort.setTargetIdentifier(descriptor.getTargetId()); + } } // set the comments to ensure current description - final StandardRemoteGroupPort sendPort = inputPorts.get(descriptor.getId()); sendPort.setTargetExists(true); sendPort.setName(descriptor.getName()); if (descriptor.isTargetRunning() != null) { @@ -495,15 +524,28 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { for (final RemoteProcessGroupPortDescriptor descriptor : requireNonNull(ports)) { newPortTargetIds.add(descriptor.getTargetId()); - final Map outputPortsByTargetId = outputPorts.values().stream() + final Map outputPortByTargetId = outputPorts.values().stream() .collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity())); - if (!outputPortsByTargetId.containsKey(descriptor.getTargetId())) { - addOutputPort(descriptor); + final Map outputPortByName = inputPorts.values().stream() + .collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity())); + + // Check if we have a matching port already and add the port if not. We determine a matching port + // by first finding a port that has the same Target ID. If none exists, then we try to find a port with + // the same name. We do this because if the URL of this RemoteProcessGroup is changed, then we expect + // the NiFi at the new URL to have a Port with the same name but a different Identifier. This is okay + // because Ports are required to have unique names. + StandardRemoteGroupPort receivePort = outputPortByTargetId.get(descriptor.getTargetId()); + if (receivePort == null) { + receivePort = outputPortByName.get(descriptor.getName()); + if (receivePort == null) { + receivePort = addOutputPort(descriptor); + } else { + receivePort.setTargetIdentifier(descriptor.getTargetId()); + } } // set the comments to ensure current description - final StandardRemoteGroupPort receivePort = outputPorts.get(descriptor.getId()); receivePort.setTargetExists(true); receivePort.setName(descriptor.getName()); if (descriptor.isTargetRunning() != null) { @@ -624,7 +666,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { * @throws IllegalStateException if an Output Port already exists with the * ID given by dto.getId() */ - private void addOutputPort(final RemoteProcessGroupPortDescriptor descriptor) { + private StandardRemoteGroupPort addOutputPort(final RemoteProcessGroupPortDescriptor descriptor) { writeLock.lock(); try { if (outputPorts.containsKey(requireNonNull(descriptor).getId())) { @@ -650,6 +692,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { if (!StringUtils.isBlank(descriptor.getBatchDuration())) { port.setBatchDuration(descriptor.getBatchDuration()); } + + return port; } finally { writeLock.unlock(); } @@ -699,7 +743,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { * @throws IllegalStateException if an Input Port already exists with the ID * given by the ID of the DTO. */ - private void addInputPort(final RemoteProcessGroupPortDescriptor descriptor) { + private StandardRemoteGroupPort addInputPort(final RemoteProcessGroupPortDescriptor descriptor) { writeLock.lock(); try { if (inputPorts.containsKey(descriptor.getId())) { @@ -730,6 +774,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } inputPorts.put(descriptor.getId(), port); + return port; } finally { writeLock.unlock(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 606c32ab34..6802436a63 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -90,7 +90,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { private final SSLContext sslContext; private final TransferDirection transferDirection; private final NiFiProperties nifiProperties; - private final String targetId; + private volatile String targetId; private final AtomicReference clientRef = new AtomicReference<>(); @@ -116,7 +116,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { @Override public String getTargetIdentifier() { - return targetId == null ? getIdentifier() : targetId; + final String target = this.targetId; + return target == null ? getIdentifier() : target; + } + + public void setTargetIdentifier(final String targetId) { + this.targetId = targetId; } private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index 039244227c..d638839848 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -382,6 +382,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot verifyUpdate(remoteProcessGroup, remoteProcessGroupDTO); // configure the remote process group + final String targetUris = remoteProcessGroupDTO.getTargetUris(); final String name = remoteProcessGroupDTO.getName(); final String comments = remoteProcessGroupDTO.getComments(); final String communicationsTimeout = remoteProcessGroupDTO.getCommunicationsTimeout(); @@ -394,6 +395,9 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot final String transportProtocol = remoteProcessGroupDTO.getTransportProtocol(); final String localNetworkInterface = remoteProcessGroupDTO.getLocalNetworkInterface(); + if (isNotNull(targetUris)) { + remoteProcessGroup.setTargetUris(targetUris); + } if (isNotNull(name)) { remoteProcessGroup.setName(name); }