NIFI-4526: Allow Target URI's of Remote Process Groups to be changed. This closes #2298

This commit is contained in:
Mark Payne 2017-11-20 14:57:55 -05:00 committed by Matt Gilman
parent ce9787a414
commit 9766558cab
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
4 changed files with 69 additions and 13 deletions

View File

@ -39,6 +39,8 @@ public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable
String getTargetUris(); String getTargetUris();
void setTargetUris(String targetUris);
ProcessGroup getProcessGroup(); ProcessGroup getProcessGroup();
void setProcessGroup(ProcessGroup group); void setProcessGroup(ProcessGroup group);

View File

@ -98,7 +98,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private final String id; private final String id;
private final String targetUris; private volatile String targetUris;
private final ProcessScheduler scheduler; private final ProcessScheduler scheduler;
private final EventReporter eventReporter; private final EventReporter eventReporter;
private final NiFiProperties nifiProperties; private final NiFiProperties nifiProperties;
@ -176,8 +176,24 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}; };
final Runnable checkAuthorizations = new InitializationTask(); final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUris); backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS); backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS);
backgroundThreadExecutor.submit(() -> {
try {
refreshFlowContents();
} catch (final Exception e) {
logger.warn("Unable to communicate with remote instance {}", new Object[] {this, e});
}
});
}
@Override
public void setTargetUris(final String targetUris) {
requireNonNull(targetUris);
verifyCanUpdate();
this.targetUris = targetUris;
backgroundThreadExecutor.submit(new InitializationTask());
} }
@Override @Override
@ -425,12 +441,25 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final Map<String, StandardRemoteGroupPort> inputPortByTargetId = inputPorts.values().stream() final Map<String, StandardRemoteGroupPort> inputPortByTargetId = inputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity())); .collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity()));
if (!inputPortByTargetId.containsKey(descriptor.getTargetId())) { final Map<String, StandardRemoteGroupPort> inputPortByName = inputPorts.values().stream()
addInputPort(descriptor); .collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity()));
// Check if we have a matching port already and add the port if not. We determine a matching port
// by first finding a port that has the same Target ID. If none exists, then we try to find a port with
// the same name. We do this because if the URL of this RemoteProcessGroup is changed, then we expect
// the NiFi at the new URL to have a Port with the same name but a different Identifier. This is okay
// because Ports are required to have unique names.
StandardRemoteGroupPort sendPort = inputPortByTargetId.get(descriptor.getTargetId());
if (sendPort == null) {
sendPort = inputPortByName.get(descriptor.getName());
if (sendPort == null) {
sendPort = addInputPort(descriptor);
} else {
sendPort.setTargetIdentifier(descriptor.getTargetId());
}
} }
// set the comments to ensure current description // set the comments to ensure current description
final StandardRemoteGroupPort sendPort = inputPorts.get(descriptor.getId());
sendPort.setTargetExists(true); sendPort.setTargetExists(true);
sendPort.setName(descriptor.getName()); sendPort.setName(descriptor.getName());
if (descriptor.isTargetRunning() != null) { if (descriptor.isTargetRunning() != null) {
@ -495,15 +524,28 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
for (final RemoteProcessGroupPortDescriptor descriptor : requireNonNull(ports)) { for (final RemoteProcessGroupPortDescriptor descriptor : requireNonNull(ports)) {
newPortTargetIds.add(descriptor.getTargetId()); newPortTargetIds.add(descriptor.getTargetId());
final Map<String, StandardRemoteGroupPort> outputPortsByTargetId = outputPorts.values().stream() final Map<String, StandardRemoteGroupPort> outputPortByTargetId = outputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity())); .collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity()));
if (!outputPortsByTargetId.containsKey(descriptor.getTargetId())) { final Map<String, StandardRemoteGroupPort> outputPortByName = inputPorts.values().stream()
addOutputPort(descriptor); .collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity()));
// Check if we have a matching port already and add the port if not. We determine a matching port
// by first finding a port that has the same Target ID. If none exists, then we try to find a port with
// the same name. We do this because if the URL of this RemoteProcessGroup is changed, then we expect
// the NiFi at the new URL to have a Port with the same name but a different Identifier. This is okay
// because Ports are required to have unique names.
StandardRemoteGroupPort receivePort = outputPortByTargetId.get(descriptor.getTargetId());
if (receivePort == null) {
receivePort = outputPortByName.get(descriptor.getName());
if (receivePort == null) {
receivePort = addOutputPort(descriptor);
} else {
receivePort.setTargetIdentifier(descriptor.getTargetId());
}
} }
// set the comments to ensure current description // set the comments to ensure current description
final StandardRemoteGroupPort receivePort = outputPorts.get(descriptor.getId());
receivePort.setTargetExists(true); receivePort.setTargetExists(true);
receivePort.setName(descriptor.getName()); receivePort.setName(descriptor.getName());
if (descriptor.isTargetRunning() != null) { if (descriptor.isTargetRunning() != null) {
@ -624,7 +666,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
* @throws IllegalStateException if an Output Port already exists with the * @throws IllegalStateException if an Output Port already exists with the
* ID given by dto.getId() * ID given by dto.getId()
*/ */
private void addOutputPort(final RemoteProcessGroupPortDescriptor descriptor) { private StandardRemoteGroupPort addOutputPort(final RemoteProcessGroupPortDescriptor descriptor) {
writeLock.lock(); writeLock.lock();
try { try {
if (outputPorts.containsKey(requireNonNull(descriptor).getId())) { if (outputPorts.containsKey(requireNonNull(descriptor).getId())) {
@ -650,6 +692,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (!StringUtils.isBlank(descriptor.getBatchDuration())) { if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
port.setBatchDuration(descriptor.getBatchDuration()); port.setBatchDuration(descriptor.getBatchDuration());
} }
return port;
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -699,7 +743,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
* @throws IllegalStateException if an Input Port already exists with the ID * @throws IllegalStateException if an Input Port already exists with the ID
* given by the ID of the DTO. * given by the ID of the DTO.
*/ */
private void addInputPort(final RemoteProcessGroupPortDescriptor descriptor) { private StandardRemoteGroupPort addInputPort(final RemoteProcessGroupPortDescriptor descriptor) {
writeLock.lock(); writeLock.lock();
try { try {
if (inputPorts.containsKey(descriptor.getId())) { if (inputPorts.containsKey(descriptor.getId())) {
@ -730,6 +774,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
} }
inputPorts.put(descriptor.getId(), port); inputPorts.put(descriptor.getId(), port);
return port;
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }

View File

@ -90,7 +90,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final SSLContext sslContext; private final SSLContext sslContext;
private final TransferDirection transferDirection; private final TransferDirection transferDirection;
private final NiFiProperties nifiProperties; private final NiFiProperties nifiProperties;
private final String targetId; private volatile String targetId;
private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>(); private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>();
@ -116,7 +116,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override @Override
public String getTargetIdentifier() { public String getTargetIdentifier() {
return targetId == null ? getIdentifier() : targetId; final String target = this.targetId;
return target == null ? getIdentifier() : target;
}
public void setTargetIdentifier(final String targetId) {
this.targetId = targetId;
} }
private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties) { private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties) {

View File

@ -382,6 +382,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
verifyUpdate(remoteProcessGroup, remoteProcessGroupDTO); verifyUpdate(remoteProcessGroup, remoteProcessGroupDTO);
// configure the remote process group // configure the remote process group
final String targetUris = remoteProcessGroupDTO.getTargetUris();
final String name = remoteProcessGroupDTO.getName(); final String name = remoteProcessGroupDTO.getName();
final String comments = remoteProcessGroupDTO.getComments(); final String comments = remoteProcessGroupDTO.getComments();
final String communicationsTimeout = remoteProcessGroupDTO.getCommunicationsTimeout(); final String communicationsTimeout = remoteProcessGroupDTO.getCommunicationsTimeout();
@ -394,6 +395,9 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
final String transportProtocol = remoteProcessGroupDTO.getTransportProtocol(); final String transportProtocol = remoteProcessGroupDTO.getTransportProtocol();
final String localNetworkInterface = remoteProcessGroupDTO.getLocalNetworkInterface(); final String localNetworkInterface = remoteProcessGroupDTO.getLocalNetworkInterface();
if (isNotNull(targetUris)) {
remoteProcessGroup.setTargetUris(targetUris);
}
if (isNotNull(name)) { if (isNotNull(name)) {
remoteProcessGroup.setName(name); remoteProcessGroup.setName(name);
} }