NIFI-5634: When merging RPG entities, ensure that we only send back the ports that are common to all nodes - even if that means sending back no ports

This closes #3030
This commit is contained in:
Mark Payne 2018-09-25 09:05:06 -04:00 committed by Matt Gilman
parent ca70dbbb36
commit ad4c886fbf
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 91 additions and 2 deletions

View File

@ -23,6 +23,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -124,11 +125,20 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem
} }
if (remoteProcessGroupContents != null) { if (remoteProcessGroupContents != null) {
if (mergedInputPorts != null && !mergedInputPorts.isEmpty()) { if (mergedInputPorts == null) {
remoteProcessGroupContents.setInputPorts(Collections.emptySet());
clientDto.setInputPortCount(0);
} else {
remoteProcessGroupContents.setInputPorts(mergedInputPorts); remoteProcessGroupContents.setInputPorts(mergedInputPorts);
clientDto.setInputPortCount(mergedInputPorts.size());
} }
if (mergedOutputPorts != null && !mergedOutputPorts.isEmpty()) {
if (mergedOutputPorts == null) {
remoteProcessGroupContents.setOutputPorts(Collections.emptySet());
clientDto.setOutputPortCount(0);
} else {
remoteProcessGroupContents.setOutputPorts(mergedOutputPorts); remoteProcessGroupContents.setOutputPorts(mergedOutputPorts);
clientDto.setOutputPortCount(mergedOutputPorts.size());
} }
} }

View File

@ -122,4 +122,83 @@ public class RemoteProcessGroupEntityMergerTest {
assertEquals(1, entity1.getComponent().getContents().getOutputPorts().size()); assertEquals(1, entity1.getComponent().getContents().getOutputPorts().size());
assertEquals("out1", entity1.getComponent().getContents().getOutputPorts().iterator().next().getName()); assertEquals("out1", entity1.getComponent().getContents().getOutputPorts().iterator().next().getName());
} }
@Test
public void testNoPortsAvailableOnOneNode() throws Exception {
final NodeIdentifier node1 = new NodeIdentifier("node-1", "host-1", 8080, "host-1", 19998, null, null, null, false);
final NodeIdentifier node2 = new NodeIdentifier("node-2", "host-2", 8081, "host-2", 19999, null, null, null, false);
final PermissionsDTO permissions = new PermissionsDTO();
permissions.setCanRead(true);
permissions.setCanWrite(true);
final PermissionsDTO opsPermissions = new PermissionsDTO();
opsPermissions.setCanRead(false);
opsPermissions.setCanWrite(false);
final RemoteProcessGroupStatusDTO status = new RemoteProcessGroupStatusDTO();
status.setAggregateSnapshot(new RemoteProcessGroupStatusSnapshotDTO());
final RemoteProcessGroupPortDTO in1_1 = new RemoteProcessGroupPortDTO();
in1_1.setName("in1");
final RemoteProcessGroupPortDTO in1_2 = new RemoteProcessGroupPortDTO();
in1_2.setName("in2");
final Set<RemoteProcessGroupPortDTO> inputs1 = new HashSet<>();
inputs1.add(in1_1);
inputs1.add(in1_2);
final RemoteProcessGroupPortDTO out1_1 = new RemoteProcessGroupPortDTO();
out1_1.setName("out1");
final Set<RemoteProcessGroupPortDTO> outputs1 = new HashSet<>();
outputs1.add(out1_1);
final RemoteProcessGroupContentsDTO contents1 = new RemoteProcessGroupContentsDTO();
contents1.setInputPorts(inputs1);
contents1.setOutputPorts(outputs1);
final RemoteProcessGroupDTO rpg1 = new RemoteProcessGroupDTO();
rpg1.setContents(contents1);
rpg1.setInputPortCount(2);
rpg1.setOutputPortCount(1);
final RemoteProcessGroupEntity entity1 = new RemoteProcessGroupEntity();
entity1.setPermissions(permissions);
entity1.setOperatePermissions(opsPermissions);
entity1.setStatus(status);
entity1.setComponent(rpg1);
final Set<RemoteProcessGroupPortDTO> inputs2 = new HashSet<>();
final Set<RemoteProcessGroupPortDTO> outputs2 = new HashSet<>();
final RemoteProcessGroupContentsDTO contents2 = new RemoteProcessGroupContentsDTO();
contents2.setInputPorts(inputs2);
contents2.setOutputPorts(outputs2);
final RemoteProcessGroupDTO rpg2 = new RemoteProcessGroupDTO();
rpg2.setContents(contents2);
rpg2.setInputPortCount(0);
rpg2.setOutputPortCount(0);
final RemoteProcessGroupEntity entity2 = new RemoteProcessGroupEntity();
entity2.setPermissions(permissions);
entity2.setOperatePermissions(opsPermissions);
entity2.setStatus(status);
entity2.setComponent(rpg2);
final Map<NodeIdentifier, RemoteProcessGroupEntity> nodeMap = new HashMap<>();
nodeMap.put(node1, entity1);
nodeMap.put(node2, entity2);
final RemoteProcessGroupEntityMerger merger = new RemoteProcessGroupEntityMerger();
merger.merge(entity1, nodeMap);
// should only include ports in common to all rpg's
assertEquals(0, entity1.getComponent().getContents().getInputPorts().size());
assertEquals(0, entity1.getComponent().getContents().getOutputPorts().size());
assertEquals(0, entity1.getComponent().getInputPortCount().intValue());
assertEquals(0, entity1.getComponent().getOutputPortCount().intValue());
}
} }