NIFI-3155, NIFI-4571: Fix issue of Remote Group Port having ID the same as on the remote system, by adding an additional targetId field RemoteGroupPort's; this also fixes issue of stats being mixed together for RPG's with the same URL. This closes #2253

This commit is contained in:
Mark Payne 2017-11-06 10:10:29 -05:00 committed by Matt Gilman
parent c70a86eac0
commit 2d3e5abf81
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
16 changed files with 98 additions and 25 deletions

View File

@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlType;
public class RemoteProcessGroupPortDTO {
private String id;
private String targetId;
private String groupId;
private String name;
private String comments;
@ -70,7 +71,7 @@ public class RemoteProcessGroupPortDTO {
* @return id of the target port
*/
@ApiModelProperty(
value = "The id of the target port."
value = "The id of the port."
)
public String getId() {
return id;
@ -80,6 +81,15 @@ public class RemoteProcessGroupPortDTO {
this.id = id;
}
@ApiModelProperty("The id of the target port.")
public String getTargetId() {
return targetId;
}
public void setTargetId(String targetId) {
this.targetId = targetId;
}
/**
* @return id of the remote process group that this port resides in
*/

View File

@ -30,10 +30,15 @@ public interface RemoteProcessGroupPortDescriptor {
Integer getConcurrentlySchedulableTaskCount();
/**
* @return id of the target port
* @return id of the port
*/
String getId();
/**
* @return the id of the target port
*/
String getTargetId();
/**
* @return id of the remote process group that this port resides in
*/

View File

@ -40,6 +40,8 @@ public abstract class RemoteGroupPort extends AbstractPort implements Port, Remo
public abstract boolean getTargetExists();
public abstract String getTargetIdentifier();
public abstract boolean isTargetRunning();
public abstract Integer getBatchCount();

View File

@ -2061,6 +2061,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
for (final RemoteProcessGroupPortDTO port : ports) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
descriptor.setId(port.getId());
descriptor.setTargetId(port.getTargetId());
descriptor.setName(port.getName());
descriptor.setComments(port.getComments());
descriptor.setTargetRunning(port.isTargetRunning());

View File

@ -366,6 +366,10 @@ public class FlowFromDOMFactory {
}
descriptor.setId(id);
final String targetId = getString(element, "targetId");
descriptor.setTargetId(targetId == null ? id : targetId);
descriptor.setName(getString(element, "name"));
descriptor.setComments(getString(element, "comments"));
descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));

View File

@ -326,6 +326,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addPosition(element, port.getPosition());
addTextElement(element, "comments", port.getComments());
addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
addTextElement(element, "targetId", port.getTargetIdentifier());
addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
addTextElement(element, "useCompression", String.valueOf(port.isUseCompression()));
final Integer batchCount = port.getBatchCount();

View File

@ -544,7 +544,7 @@ public class FingerprintFactory {
}
private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) {
for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) {
for (final String childName : new String[] {"id", "targetId", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName));
}

View File

@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -34,6 +35,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -41,6 +43,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
@ -410,11 +414,14 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public void setInputPorts(final Set<RemoteProcessGroupPortDescriptor> ports) {
writeLock.lock();
try {
final List<String> newPortIds = new ArrayList<>();
final List<String> newPortTargetIds = new ArrayList<>();
for (final RemoteProcessGroupPortDescriptor descriptor : ports) {
newPortIds.add(descriptor.getId());
newPortTargetIds.add(descriptor.getTargetId());
if (!inputPorts.containsKey(descriptor.getId())) {
final Map<String, StandardRemoteGroupPort> inputPortByTargetId = inputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity()));
if (!inputPortByTargetId.containsKey(descriptor.getTargetId())) {
addInputPort(descriptor);
}
@ -430,11 +437,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// See if we have any ports that no longer exist; cannot be removed within the loop because it would cause
// a ConcurrentModificationException.
final Iterator<Map.Entry<String, StandardRemoteGroupPort>> itr = inputPorts.entrySet().iterator();
final Iterator<StandardRemoteGroupPort> itr = inputPorts.values().iterator();
while (itr.hasNext()) {
final Map.Entry<String, StandardRemoteGroupPort> entry = itr.next();
if (!newPortIds.contains(entry.getKey())) {
final StandardRemoteGroupPort port = entry.getValue();
final StandardRemoteGroupPort port = itr.next();
if (!newPortTargetIds.contains(port.getTargetIdentifier())) {
port.setTargetExists(false);
port.setTargetRunning(false);
@ -481,11 +487,14 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public void setOutputPorts(final Set<RemoteProcessGroupPortDescriptor> ports) {
writeLock.lock();
try {
final List<String> newPortIds = new ArrayList<>();
final List<String> newPortTargetIds = new ArrayList<>();
for (final RemoteProcessGroupPortDescriptor descriptor : requireNonNull(ports)) {
newPortIds.add(descriptor.getId());
newPortTargetIds.add(descriptor.getTargetId());
if (!outputPorts.containsKey(descriptor.getId())) {
final Map<String, StandardRemoteGroupPort> outputPortsByTargetId = outputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity()));
if (!outputPortsByTargetId.containsKey(descriptor.getTargetId())) {
addOutputPort(descriptor);
}
@ -501,11 +510,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// See if we have any ports that no longer exist; cannot be removed within the loop because it would cause
// a ConcurrentModificationException.
final Iterator<Map.Entry<String, StandardRemoteGroupPort>> itr = outputPorts.entrySet().iterator();
final Iterator<StandardRemoteGroupPort> itr = outputPorts.values().iterator();
while (itr.hasNext()) {
final Map.Entry<String, StandardRemoteGroupPort> entry = itr.next();
if (!newPortIds.contains(entry.getKey())) {
final StandardRemoteGroupPort port = entry.getValue();
final StandardRemoteGroupPort port = itr.next();
if (!newPortTargetIds.contains(port.getTargetIdentifier())) {
port.setTargetExists(false);
port.setTargetRunning(false);
@ -619,7 +627,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
throw new IllegalStateException("Output Port with ID " + descriptor.getId() + " already exists");
}
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getName(), getProcessGroup(),
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(),
this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties);
outputPorts.put(descriptor.getId(), port);
@ -694,7 +702,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
throw new IllegalStateException("Input Port with ID " + descriptor.getId() + " already exists");
}
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getName(), getProcessGroup(), this,
// We need to generate the port's UUID deterministically because we need
// all nodes in a cluster to use the same UUID. However, we want the ID to be
// unique for each Remote Group Port, so that if we have multiple RPG's pointing
// to the same target, we have unique ID's for each of those ports.
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(), this,
TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties);
if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
@ -719,6 +731,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
}
private String generatePortId(final String targetId) {
return UUID.nameUUIDFromBytes((this.getIdentifier() + targetId).getBytes(StandardCharsets.UTF_8)).toString();
}
@Override
public RemoteGroupPort getOutputPort(final String portIdentifier) {
readLock.lock();
@ -964,7 +980,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
for (final PortDTO port : ports) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
final ScheduledState scheduledState = ScheduledState.valueOf(port.getState());
descriptor.setId(port.getId());
descriptor.setId(generatePortId(port.getId()));
descriptor.setTargetId(port.getId());
descriptor.setName(port.getName());
descriptor.setComments(port.getComments());
descriptor.setTargetRunning(ScheduledState.RUNNING.equals(scheduledState));

View File

@ -21,6 +21,7 @@ import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGroupPortDescriptor {
private String id;
private String targetId;
private String groupId;
private String name;
private String comments;
@ -61,6 +62,15 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr
this.id = id;
}
@Override
public String getTargetId() {
return targetId;
}
public void setTargetId(String targetId) {
this.targetId = targetId;
}
@Override
public String getGroupId() {
return groupId;

View File

@ -285,6 +285,7 @@
<xs:complexContent>
<xs:extension base="PortType">
<xs:sequence>
<xs:element name="targetId" type="NonEmptyStringType" minOccurs="0" maxOccurs="1" />
<xs:element name="maxConcurrentTasks" type="xs:positiveInteger"></xs:element>
<xs:element name="useCompression" type="xs:boolean"></xs:element>
<xs:element name="batchCount" type="xs:positiveInteger" minOccurs="0" maxOccurs="1" />

View File

@ -90,6 +90,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final SSLContext sslContext;
private final TransferDirection transferDirection;
private final NiFiProperties nifiProperties;
private final String targetId;
private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>();
@ -97,7 +98,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return clientRef.get();
}
public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
public StandardRemoteGroupPort(final String id, final String targetId, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler,
final NiFiProperties nifiProperties) {
// remote group port id needs to be unique but cannot just be the id of the port
@ -105,6 +106,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
// instance more than once.
super(id, name, processGroup, type, scheduler);
this.targetId = targetId;
this.remoteGroup = remoteGroup;
this.transferDirection = direction;
this.sslContext = sslContext;
@ -112,6 +114,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
}
@Override
public String getTargetIdentifier() {
return targetId == null ? getIdentifier() : targetId;
}
private static File getPeerPersistenceFile(final String portId, final NiFiProperties nifiProperties) {
final File stateDir = nifiProperties.getPersistentStateDirectory();
return new File(stateDir, portId + ".peers");
@ -164,7 +171,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder()
.urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris()))
.portIdentifier(getIdentifier())
.portIdentifier(getTargetIdentifier())
.sslContext(sslContext)
.useCompression(isUseCompression())
.eventReporter(remoteGroup.getEventReporter())

View File

@ -120,7 +120,7 @@ public class TestStandardRemoteGroupPort {
break;
}
port = spy(new StandardRemoteGroupPort(ID, NAME,
port = spy(new StandardRemoteGroupPort(ID, ID, NAME,
processGroup, remoteGroup, direction, connectableType, null, scheduler, NiFiProperties.createBasicNiFiProperties(null, null)));
doReturn(true).when(remoteGroup).isTransmitting();

View File

@ -1502,6 +1502,7 @@ public final class DtoFactory {
final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO();
dto.setId(port.getIdentifier());
dto.setTargetId(port.getTargetIdentifier());
dto.setName(port.getName());
dto.setComments(port.getComments());
dto.setTransmitting(port.isRunning());
@ -3168,6 +3169,7 @@ public final class DtoFactory {
public RemoteProcessGroupPortDTO copy(final RemoteProcessGroupPortDTO original) {
final RemoteProcessGroupPortDTO copy = new RemoteProcessGroupPortDTO();
copy.setId(original.getId());
copy.setTargetId(original.getTargetId());
copy.setGroupId(original.getGroupId());
copy.setName(original.getName());
copy.setComments(original.getComments());

View File

@ -658,13 +658,24 @@ public final class SnippetUtils {
if (contents != null && contents.getInputPorts() != null) {
for (final RemoteProcessGroupPortDTO remotePort : contents.getInputPorts()) {
remotePort.setGroupId(cp.getId());
connectableMap.put(remoteGroupDTO.getId() + "-" + remotePort.getId(), dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_INPUT_PORT));
final String originalId = remotePort.getId();
if (remotePort.getTargetId() == null) {
remotePort.setTargetId(originalId);
}
remotePort.setId(generateId(remotePort.getId(), idGenerationSeed, isCopy));
connectableMap.put(remoteGroupDTO.getId() + "-" + originalId, dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_INPUT_PORT));
}
}
if (contents != null && contents.getOutputPorts() != null) {
for (final RemoteProcessGroupPortDTO remotePort : contents.getOutputPorts()) {
remotePort.setGroupId(cp.getId());
connectableMap.put(remoteGroupDTO.getId() + "-" + remotePort.getId(), dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_OUTPUT_PORT));
final String originalId = remotePort.getId();
if (remotePort.getTargetId() == null) {
remotePort.setTargetId(originalId);
}
remotePort.setId(generateId(remotePort.getId(), idGenerationSeed, isCopy));
connectableMap.put(remoteGroupDTO.getId() + "-" + originalId, dtoFactory.createConnectableDto(remotePort, ConnectableType.REMOTE_OUTPUT_PORT));
}
}

View File

@ -415,6 +415,7 @@ public class TestRemoteProcessGroupAuditor {
final ProceedingJoinPoint joinPoint = mock(ProceedingJoinPoint.class);
final String remoteProcessGroupId = "remote-process-group-id";
inputRPGPortDTO.setId(remoteProcessGroupId);
inputRPGPortDTO.setTargetId(remoteProcessGroupId);
final String targetUrl = "http://localhost:8080/nifi";
when(existingRPG.getIdentifier()).thenReturn(remoteProcessGroupId);

View File

@ -79,6 +79,7 @@ public class TestStandardRemoteProcessGroupDAO {
final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO();
dto.setGroupId(remoteProcessGroupId);
dto.setId(remoteProcessGroupInputPortId);
dto.setTargetId(remoteProcessGroupInputPortId);
final BatchSettingsDTO batchSettings = new BatchSettingsDTO();
dto.setBatchSettings(batchSettings);