From 2d3e5abf81d5fa70cd6efd60644755b51c785369 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 6 Nov 2017 10:10:29 -0500 Subject: [PATCH] NIFI-3155, NIFI-4571: Fix issue of Remote Group Port having ID the same as on the remote system, by adding an additional targetId field RemoteGroupPort's; this also fixes issue of stats being mixed together for RPG's with the same URL. This closes #2253 --- .../api/dto/RemoteProcessGroupPortDTO.java | 12 ++++- .../RemoteProcessGroupPortDescriptor.java | 7 ++- .../apache/nifi/remote/RemoteGroupPort.java | 2 + .../nifi/controller/FlowController.java | 1 + .../serialization/FlowFromDOMFactory.java | 4 ++ .../serialization/StandardFlowSerializer.java | 1 + .../nifi/fingerprint/FingerprintFactory.java | 2 +- .../remote/StandardRemoteProcessGroup.java | 51 ++++++++++++------- ...ndardRemoteProcessGroupPortDescriptor.java | 10 ++++ .../src/main/resources/FlowConfiguration.xsd | 1 + .../nifi/remote/StandardRemoteGroupPort.java | 11 +++- .../remote/TestStandardRemoteGroupPort.java | 2 +- .../apache/nifi/web/api/dto/DtoFactory.java | 2 + .../apache/nifi/web/util/SnippetUtils.java | 15 +++++- .../audit/TestRemoteProcessGroupAuditor.java | 1 + .../TestStandardRemoteProcessGroupDAO.java | 1 + 16 files changed, 98 insertions(+), 25 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java index 83bd59eb78..1ebb70e61b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java @@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlType; public class RemoteProcessGroupPortDTO { private String id; + private String targetId; private String groupId; private String name; private String comments; @@ -70,7 +71,7 @@ public class RemoteProcessGroupPortDTO { * @return id of the target port */ @ApiModelProperty( - value = "The id of the target port." + value = "The id of the port." ) public String getId() { return id; @@ -79,6 +80,15 @@ public class RemoteProcessGroupPortDTO { public void setId(String id) { this.id = id; } + + @ApiModelProperty("The id of the target port.") + public String getTargetId() { + return targetId; + } + + public void setTargetId(String targetId) { + this.targetId = targetId; + } /** * @return id of the remote process group that this port resides in diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java index c330c1342d..b7977494f3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java @@ -30,10 +30,15 @@ public interface RemoteProcessGroupPortDescriptor { Integer getConcurrentlySchedulableTaskCount(); /** - * @return id of the target port + * @return id of the port */ String getId(); + /** + * @return the id of the target port + */ + String getTargetId(); + /** * @return id of the remote process group that this port resides in */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java index 07faf42036..5d5fe7ddb7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java @@ -40,6 +40,8 @@ public abstract class RemoteGroupPort extends AbstractPort implements Port, Remo public abstract boolean getTargetExists(); + public abstract String getTargetIdentifier(); + public abstract boolean isTargetRunning(); public abstract Integer getBatchCount(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 99d8ed0801..49c17899a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2061,6 +2061,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final RemoteProcessGroupPortDTO port : ports) { final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); descriptor.setId(port.getId()); + descriptor.setTargetId(port.getTargetId()); descriptor.setName(port.getName()); descriptor.setComments(port.getComments()); descriptor.setTargetRunning(port.isTargetRunning()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index d4550435ce..f67ecd927d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -366,6 +366,10 @@ public class FlowFromDOMFactory { } descriptor.setId(id); + + final String targetId = getString(element, "targetId"); + descriptor.setTargetId(targetId == null ? id : targetId); + descriptor.setName(getString(element, "name")); descriptor.setComments(getString(element, "comments")); descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks")); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index 2a8df96aba..bc28a255cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -326,6 +326,7 @@ public class StandardFlowSerializer implements FlowSerializer { addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name()); + addTextElement(element, "targetId", port.getTargetIdentifier()); addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks()); addTextElement(element, "useCompression", String.valueOf(port.isUseCompression())); final Integer batchCount = port.getBatchCount(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 39579557a8..7c68475837 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -544,7 +544,7 @@ public class FingerprintFactory { } private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) { - for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) { + for (final String childName : new String[] {"id", "targetId", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) { appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName)); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 2b2e1fb974..49fd90fc8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.NetworkInterface; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -34,6 +35,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,6 +43,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; @@ -410,11 +414,14 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { public void setInputPorts(final Set ports) { writeLock.lock(); try { - final List newPortIds = new ArrayList<>(); + final List newPortTargetIds = new ArrayList<>(); for (final RemoteProcessGroupPortDescriptor descriptor : ports) { - newPortIds.add(descriptor.getId()); + newPortTargetIds.add(descriptor.getTargetId()); - if (!inputPorts.containsKey(descriptor.getId())) { + final Map inputPortByTargetId = inputPorts.values().stream() + .collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity())); + + if (!inputPortByTargetId.containsKey(descriptor.getTargetId())) { addInputPort(descriptor); } @@ -430,11 +437,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // See if we have any ports that no longer exist; cannot be removed within the loop because it would cause // a ConcurrentModificationException. - final Iterator> itr = inputPorts.entrySet().iterator(); + final Iterator itr = inputPorts.values().iterator(); while (itr.hasNext()) { - final Map.Entry entry = itr.next(); - if (!newPortIds.contains(entry.getKey())) { - final StandardRemoteGroupPort port = entry.getValue(); + final StandardRemoteGroupPort port = itr.next(); + if (!newPortTargetIds.contains(port.getTargetIdentifier())) { port.setTargetExists(false); port.setTargetRunning(false); @@ -481,11 +487,14 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { public void setOutputPorts(final Set ports) { writeLock.lock(); try { - final List newPortIds = new ArrayList<>(); + final List newPortTargetIds = new ArrayList<>(); for (final RemoteProcessGroupPortDescriptor descriptor : requireNonNull(ports)) { - newPortIds.add(descriptor.getId()); + newPortTargetIds.add(descriptor.getTargetId()); - if (!outputPorts.containsKey(descriptor.getId())) { + final Map outputPortsByTargetId = outputPorts.values().stream() + .collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity())); + + if (!outputPortsByTargetId.containsKey(descriptor.getTargetId())) { addOutputPort(descriptor); } @@ -501,11 +510,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // See if we have any ports that no longer exist; cannot be removed within the loop because it would cause // a ConcurrentModificationException. - final Iterator> itr = outputPorts.entrySet().iterator(); + final Iterator itr = outputPorts.values().iterator(); while (itr.hasNext()) { - final Map.Entry entry = itr.next(); - if (!newPortIds.contains(entry.getKey())) { - final StandardRemoteGroupPort port = entry.getValue(); + final StandardRemoteGroupPort port = itr.next(); + if (!newPortTargetIds.contains(port.getTargetIdentifier())) { port.setTargetExists(false); port.setTargetRunning(false); @@ -619,7 +627,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { throw new IllegalStateException("Output Port with ID " + descriptor.getId() + " already exists"); } - final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getName(), getProcessGroup(), + final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(), this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties); outputPorts.put(descriptor.getId(), port); @@ -694,7 +702,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { throw new IllegalStateException("Input Port with ID " + descriptor.getId() + " already exists"); } - final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getName(), getProcessGroup(), this, + // We need to generate the port's UUID deterministically because we need + // all nodes in a cluster to use the same UUID. However, we want the ID to be + // unique for each Remote Group Port, so that if we have multiple RPG's pointing + // to the same target, we have unique ID's for each of those ports. + final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(), this, TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties); if (descriptor.getConcurrentlySchedulableTaskCount() != null) { @@ -719,6 +731,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } } + private String generatePortId(final String targetId) { + return UUID.nameUUIDFromBytes((this.getIdentifier() + targetId).getBytes(StandardCharsets.UTF_8)).toString(); + } + @Override public RemoteGroupPort getOutputPort(final String portIdentifier) { readLock.lock(); @@ -964,7 +980,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { for (final PortDTO port : ports) { final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); final ScheduledState scheduledState = ScheduledState.valueOf(port.getState()); - descriptor.setId(port.getId()); + descriptor.setId(generatePortId(port.getId())); + descriptor.setTargetId(port.getId()); descriptor.setName(port.getName()); descriptor.setComments(port.getComments()); descriptor.setTargetRunning(ScheduledState.RUNNING.equals(scheduledState)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java index c3a8f5e195..7dbcec1503 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java @@ -21,6 +21,7 @@ import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGroupPortDescriptor { private String id; + private String targetId; private String groupId; private String name; private String comments; @@ -61,6 +62,15 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr this.id = id; } + @Override + public String getTargetId() { + return targetId; + } + + public void setTargetId(String targetId) { + this.targetId = targetId; + } + @Override public String getGroupId() { return groupId; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 247a79034b..19e1c55efb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -285,6 +285,7 @@ + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 1be5b42214..606c32ab34 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -90,6 +90,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { private final SSLContext sslContext; private final TransferDirection transferDirection; private final NiFiProperties nifiProperties; + private final String targetId; private final AtomicReference clientRef = new AtomicReference<>(); @@ -97,7 +98,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return clientRef.get(); } - public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, + public StandardRemoteGroupPort(final String id, final String targetId, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) { // remote group port id needs to be unique but cannot just be the id of the port @@ -105,6 +106,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { // instance more than once. super(id, name, processGroup, type, scheduler); + this.targetId = targetId; this.remoteGroup = remoteGroup; this.transferDirection = direction; this.sslContext = sslContext; @@ -112,6 +114,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos"); } + @Override + public String getTargetIdentifier() { + return targetId == null ? getIdentifier() : targetId; + } + private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties) { final File stateDir = nifiProperties.getPersistentStateDirectory(); return new File(stateDir, portId + ".peers"); @@ -164,7 +171,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder() .urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris())) - .portIdentifier(getIdentifier()) + .portIdentifier(getTargetIdentifier()) .sslContext(sslContext) .useCompression(isUseCompression()) .eventReporter(remoteGroup.getEventReporter()) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index f677b88db1..1b4194373b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -120,7 +120,7 @@ public class TestStandardRemoteGroupPort { break; } - port = spy(new StandardRemoteGroupPort(ID, NAME, + port = spy(new StandardRemoteGroupPort(ID, ID, NAME, processGroup, remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null))); doReturn(true).when(remoteGroup).isTransmitting(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index f995f6bcd3..37e146c215 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1502,6 +1502,7 @@ public final class DtoFactory { final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO(); dto.setId(port.getIdentifier()); + dto.setTargetId(port.getTargetIdentifier()); dto.setName(port.getName()); dto.setComments(port.getComments()); dto.setTransmitting(port.isRunning()); @@ -3168,6 +3169,7 @@ public final class DtoFactory { public RemoteProcessGroupPortDTO copy(final RemoteProcessGroupPortDTO original) { final RemoteProcessGroupPortDTO copy = new RemoteProcessGroupPortDTO(); copy.setId(original.getId()); + copy.setTargetId(original.getTargetId()); copy.setGroupId(original.getGroupId()); copy.setName(original.getName()); copy.setComments(original.getComments()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index db7a5b0347..4469ea1e94 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -658,13 +658,24 @@ public final class SnippetUtils { if (contents != null && contents.getInputPorts() != null) { for (final RemoteProcessGroupPortDTO remotePort : contents.getInputPorts()) { remotePort.setGroupId(cp.getId()); - connectableMap.put(remoteGroupDTO.getId() + "-" + remotePort.getId(), dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_INPUT_PORT)); + final String originalId = remotePort.getId(); + if (remotePort.getTargetId() == null) { + remotePort.setTargetId(originalId); + } + remotePort.setId(generateId(remotePort.getId(), idGenerationSeed, isCopy)); + + connectableMap.put(remoteGroupDTO.getId() + "-" + originalId, dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_INPUT_PORT)); } } if (contents != null && contents.getOutputPorts() != null) { for (final RemoteProcessGroupPortDTO remotePort : contents.getOutputPorts()) { remotePort.setGroupId(cp.getId()); - connectableMap.put(remoteGroupDTO.getId() + "-" + remotePort.getId(), dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_OUTPUT_PORT)); + final String originalId = remotePort.getId(); + if (remotePort.getTargetId() == null) { + remotePort.setTargetId(originalId); + } + remotePort.setId(generateId(remotePort.getId(), idGenerationSeed, isCopy)); + connectableMap.put(remoteGroupDTO.getId() + "-" + originalId, dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_OUTPUT_PORT)); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java index 68be1cdce7..fb9e43c8f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java @@ -415,6 +415,7 @@ public class TestRemoteProcessGroupAuditor { final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class); final String remoteProcessGroupId = "remote-process-group-id"; inputRPGPortDTO.setId(remoteProcessGroupId); + inputRPGPortDTO.setTargetId(remoteProcessGroupId); final String targetUrl = "http://localhost:8080/nifi"; when(existingRPG.getIdentifier()).thenReturn(remoteProcessGroupId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java index f68a115d6e..d8840ecac8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java @@ -79,6 +79,7 @@ public class TestStandardRemoteProcessGroupDAO { final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO(); dto.setGroupId(remoteProcessGroupId); dto.setId(remoteProcessGroupInputPortId); + dto.setTargetId(remoteProcessGroupInputPortId); final BatchSettingsDTO batchSettings = new BatchSettingsDTO(); dto.setBatchSettings(batchSettings);