NIFI-4973:

- Fixing RPG port merging.
- Adding unit tests.
- Removing unecessary sorting that wasn't maintained while clustered.

This closes #2551.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Matt Gilman 2018-03-14 16:31:41 -04:00 committed by Mark Payne
parent 86f162b611
commit a1c917656e
3 changed files with 126 additions and 49 deletions

View File

@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<RemoteProcessGroupEntity>, ComponentEntityStatusMerger<RemoteProcessGroupStatusDTO> {
@Override
public void merge(RemoteProcessGroupEntity clientEntity, Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap) {
ComponentEntityMerger.super.merge(clientEntity, entityMap);
@ -76,9 +77,10 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem
final Map<String, Set<NodeIdentifier>> authorizationErrorMap = new HashMap<>();
final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
Boolean mergedIsTargetSecure = null;
final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new HashSet<>();
final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new HashSet<>();
Set<RemoteProcessGroupPortDTO> mergedInputPorts = null;
Set<RemoteProcessGroupPortDTO> mergedOutputPorts = null;
for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry : dtoMap.entrySet()) {
final RemoteProcessGroupDTO nodeRemoteProcessGroup = nodeEntry.getValue();
@ -100,22 +102,32 @@ public class RemoteProcessGroupEntityMerger implements ComponentEntityMerger<Rem
// merge the ports in the contents
final RemoteProcessGroupContentsDTO nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroup.getContents();
if (remoteProcessGroupContents != null && nodeRemoteProcessGroupContentsDto != null) {
if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) {
mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts());
}
if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != null) {
mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts());
}
final Set<RemoteProcessGroupPortDTO> nodeInputPorts = nodeRemoteProcessGroupContentsDto.getInputPorts();
if (nodeInputPorts != null) {
if (mergedInputPorts == null) {
mergedInputPorts = new HashSet<>(nodeInputPorts);
} else {
mergedInputPorts.retainAll(nodeInputPorts);
}
}
final Set<RemoteProcessGroupPortDTO> nodeOutputPorts = nodeRemoteProcessGroupContentsDto.getOutputPorts();
if (nodeOutputPorts != null) {
if (mergedOutputPorts == null) {
mergedOutputPorts = new HashSet<>(nodeOutputPorts);
} else {
mergedOutputPorts.retainAll(nodeOutputPorts);
}
}
}
}
}
if (remoteProcessGroupContents != null) {
if (!mergedInputPorts.isEmpty()) {
if (mergedInputPorts != null && !mergedInputPorts.isEmpty()) {
remoteProcessGroupContents.setInputPorts(mergedInputPorts);
}
if (!mergedOutputPorts.isEmpty()) {
if (mergedOutputPorts != null && !mergedOutputPorts.isEmpty()) {
remoteProcessGroupContents.setOutputPorts(mergedOutputPorts);
}
}

View File

@ -0,0 +1,103 @@
package org.apache.nifi.cluster.manager;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.junit.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
public class RemoteProcessGroupEntityMergerTest {
@Test
public void testMergeRemoteProcessGroups() throws Exception {
final NodeIdentifier node1 = new NodeIdentifier("node-1", "host-1", 8080, "host-1", 19998, null, null, null, false);
final NodeIdentifier node2 = new NodeIdentifier("node-2", "host-2", 8081, "host-2", 19999, null, null, null, false);
final PermissionsDTO permissed = new PermissionsDTO();
permissed.setCanRead(true);
permissed.setCanWrite(true);
final RemoteProcessGroupStatusDTO status = new RemoteProcessGroupStatusDTO();
status.setAggregateSnapshot(new RemoteProcessGroupStatusSnapshotDTO());
final RemoteProcessGroupPortDTO in1_1 = new RemoteProcessGroupPortDTO();
in1_1.setName("in1");
final RemoteProcessGroupPortDTO in1_2 = new RemoteProcessGroupPortDTO();
in1_2.setName("in2");
final Set<RemoteProcessGroupPortDTO> inputs1 = new HashSet<>();
inputs1.add(in1_1);
inputs1.add(in1_2);
final RemoteProcessGroupPortDTO out1_1 = new RemoteProcessGroupPortDTO();
out1_1.setName("out1");
final Set<RemoteProcessGroupPortDTO> outputs1 = new HashSet<>();
outputs1.add(out1_1);
final RemoteProcessGroupContentsDTO contents1 = new RemoteProcessGroupContentsDTO();
contents1.setInputPorts(inputs1);
contents1.setOutputPorts(outputs1);
final RemoteProcessGroupDTO rpg1 = new RemoteProcessGroupDTO();
rpg1.setContents(contents1);
final RemoteProcessGroupEntity entity1 = new RemoteProcessGroupEntity();
entity1.setPermissions(permissed);
entity1.setStatus(status);
entity1.setComponent(rpg1);
final RemoteProcessGroupPortDTO in2_1 = new RemoteProcessGroupPortDTO();
in2_1.setName("in1");
final Set<RemoteProcessGroupPortDTO> inputs2 = new HashSet<>();
inputs2.add(in2_1);
final RemoteProcessGroupPortDTO out2_1 = new RemoteProcessGroupPortDTO();
out2_1.setName("out1");
final RemoteProcessGroupPortDTO out2_2 = new RemoteProcessGroupPortDTO();
out2_2.setName("out2");
final Set<RemoteProcessGroupPortDTO> outputs2 = new HashSet<>();
outputs2.add(out2_1);
outputs2.add(out2_2);
final RemoteProcessGroupContentsDTO contents2 = new RemoteProcessGroupContentsDTO();
contents2.setInputPorts(inputs2);
contents2.setOutputPorts(outputs2);
final RemoteProcessGroupDTO rpg2 = new RemoteProcessGroupDTO();
rpg2.setContents(contents2);
final RemoteProcessGroupEntity entity2 = new RemoteProcessGroupEntity();
entity2.setPermissions(permissed);
entity2.setStatus(status);
entity2.setComponent(rpg2);
final Map<NodeIdentifier, RemoteProcessGroupEntity> nodeMap = new HashMap<>();
nodeMap.put(node1, entity1);
nodeMap.put(node2, entity2);
final RemoteProcessGroupEntityMerger merger = new RemoteProcessGroupEntityMerger();
merger.merge(entity1, nodeMap);
// should only include ports in common to all rpg's
assertEquals(1, entity1.getComponent().getContents().getInputPorts().size());
assertEquals("in1", entity1.getComponent().getContents().getInputPorts().iterator().next().getName());
assertEquals(1, entity1.getComponent().getContents().getOutputPorts().size());
assertEquals("out1", entity1.getComponent().getContents().getOutputPorts().iterator().next().getName());
}
}

View File

@ -1595,8 +1595,8 @@ public final class DtoFactory {
return null;
}
final Set<RemoteProcessGroupPortDTO> inputPorts = new TreeSet<>(new DtoFactory.SortedRemoteGroupPortComparator());
final Set<RemoteProcessGroupPortDTO> outputPorts = new TreeSet<>(new DtoFactory.SortedRemoteGroupPortComparator());
final Set<RemoteProcessGroupPortDTO> inputPorts = new HashSet<>();
final Set<RemoteProcessGroupPortDTO> outputPorts = new HashSet<>();
int activeRemoteInputPortCount = 0;
int inactiveRemoteInputPortCount = 0;
@ -4082,44 +4082,6 @@ public final class DtoFactory {
return copy;
}
private static class SortedRemoteGroupPortComparator implements Comparator<RemoteProcessGroupPortDTO> {
@Override
public int compare(final RemoteProcessGroupPortDTO o1, final RemoteProcessGroupPortDTO o2) {
if (o2 == null) {
return -1;
} else if (o1 == null) {
return 1;
}
final String name1 = o1.getName();
final String name2 = o2.getName();
if (name2 == null) {
return -1;
} else if (name1 == null) {
return 1;
} else {
int compareResult = Collator.getInstance(Locale.US).compare(name2, name2);
// if the names are same, use the id
if (compareResult == 0) {
final String id1 = o1.getId();
final String id2 = o2.getId();
if (id2 == null) {
compareResult = -1;
} else if (id1 == null) {
compareResult = 1;
} else {
compareResult = id1.compareTo(id2);
}
}
return compareResult;
}
}
}
/**
* Factory method for creating a new RevisionDTO based on this controller.
*