diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java index 10019128b0..0aa705c5ce 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java @@ -61,7 +61,7 @@ public class BulletinBoardEndpointMerger extends AbstractSingleDTOEndpoint BULLETIN_COMPARATOR = new Comparator() { @@ -54,7 +55,7 @@ public final class BulletinMerger { * * @param bulletins bulletins */ - public static List mergeBulletins(final Map> bulletins) { + public static List mergeBulletins(final Map> bulletins, final int totalNodes) { final List bulletinEntities = new ArrayList<>(); for (final Map.Entry> entry : bulletins.entrySet()) { @@ -76,9 +77,42 @@ public final class BulletinMerger { final List entities = Lists.newArrayList(); - final Map> groupingEntities = bulletinEntities.stream().collect(Collectors.groupingBy(b -> b.getBulletin().getMessage())); - groupingEntities.values().stream().map(e -> e.get(0)).forEach(entities::add); + // group by message when permissions allow + final Map> groupingEntities = bulletinEntities.stream() + .filter(bulletinEntity -> bulletinEntity.getCanRead()) + .collect(Collectors.groupingBy(b -> b.getBulletin().getMessage())); + // add one from each grouped bulletin when all nodes report the same message + groupingEntities.forEach((message, groupedBulletinEntities) -> { + if (groupedBulletinEntities.size() == totalNodes) { + // get the most current bulletin + final BulletinEntity selectedBulletinEntity = groupedBulletinEntities.stream() + .max(Comparator.comparingLong(bulletinEntity -> { + if (bulletinEntity.getTimestamp() == null) { + return 0; + } else { + return bulletinEntity.getTimestamp().getTime(); + } + })).orElse(null); + + // should never be null, but just in case + if (selectedBulletinEntity != null) { + selectedBulletinEntity.setNodeAddress(ALL_NODES_MESSAGE); + selectedBulletinEntity.getBulletin().setNodeAddress(ALL_NODES_MESSAGE); + entities.add(selectedBulletinEntity); + } + } else { + // since all nodes didn't report the exact same bulletin, keep them all + entities.addAll(groupedBulletinEntities); + } + }); + + // ensure we also get the remainder of the bulletin entities + bulletinEntities.stream() + .filter(bulletinEntity -> !bulletinEntity.getCanRead()) + .forEach(entities::add); + + // ensure the bulletins are sorted by time Collections.sort(entities, (BulletinEntity o1, BulletinEntity o2) -> { final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp()); if (timeComparison != 0) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java index eda3c0f668..f7c28fd889 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java @@ -59,7 +59,7 @@ public interface ComponentEntityMerger> nodeMap = new HashMap<>(); + nodeMap.put(node1, new ArrayList<>()); + nodeMap.put(node2, new ArrayList<>()); + + nodeMap.get(node1).add(bulletinEntity1); + nodeMap.get(node1).add(bulletinEntity2); + nodeMap.get(node1).add(unauthorizedBulletin); + + nodeMap.get(node2).add(copyOfBulletin1); + + final List bulletinEntities = BulletinMerger.mergeBulletins(nodeMap, nodeMap.size()); + assertEquals(bulletinEntities.size(), 3); + assertTrue(bulletinEntities.contains(copyOfBulletin1)); + assertEquals(copyOfBulletin1.getNodeAddress(), ALL_NODES_MESSAGE); + assertTrue(bulletinEntities.contains(bulletinEntity2)); + assertTrue(bulletinEntities.contains(unauthorizedBulletin)); + } + +} \ No newline at end of file