mirror of https://github.com/apache/nifi.git
NIFI-3868: - Ensuring we do not attempt to group bulletins that lack permissions. - Only group bulletins when all nodes report the same message. - Retain the most recent bulletin.
Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #1801
This commit is contained in:
parent
ce1bc42ac5
commit
6b71b4cbb8
|
@ -61,7 +61,7 @@ public class BulletinBoardEndpointMerger extends AbstractSingleDTOEndpoint<Bulle
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
clientDto.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities));
|
clientDto.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities, dtoMap.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,9 +92,9 @@ public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos));
|
clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos, entityMap.size()));
|
||||||
clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos));
|
clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos, entityMap.size()));
|
||||||
clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos));
|
clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos, entityMap.size()));
|
||||||
|
|
||||||
// sort the bulletins
|
// sort the bulletins
|
||||||
Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);
|
Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.cluster.manager;
|
package org.apache.nifi.cluster.manager;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||||
import org.apache.nifi.web.api.entity.BulletinEntity;
|
import org.apache.nifi.web.api.entity.BulletinEntity;
|
||||||
|
|
||||||
|
@ -26,10 +27,10 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
public final class BulletinMerger {
|
public final class BulletinMerger {
|
||||||
|
|
||||||
|
final static String ALL_NODES_MESSAGE = "All Nodes";
|
||||||
|
|
||||||
private BulletinMerger() {}
|
private BulletinMerger() {}
|
||||||
|
|
||||||
public static Comparator<BulletinEntity> BULLETIN_COMPARATOR = new Comparator<BulletinEntity>() {
|
public static Comparator<BulletinEntity> BULLETIN_COMPARATOR = new Comparator<BulletinEntity>() {
|
||||||
|
@ -54,7 +55,7 @@ public final class BulletinMerger {
|
||||||
*
|
*
|
||||||
* @param bulletins bulletins
|
* @param bulletins bulletins
|
||||||
*/
|
*/
|
||||||
public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins) {
|
public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins, final int totalNodes) {
|
||||||
final List<BulletinEntity> bulletinEntities = new ArrayList<>();
|
final List<BulletinEntity> bulletinEntities = new ArrayList<>();
|
||||||
|
|
||||||
for (final Map.Entry<NodeIdentifier, List<BulletinEntity>> entry : bulletins.entrySet()) {
|
for (final Map.Entry<NodeIdentifier, List<BulletinEntity>> entry : bulletins.entrySet()) {
|
||||||
|
@ -76,9 +77,42 @@ public final class BulletinMerger {
|
||||||
|
|
||||||
final List<BulletinEntity> entities = Lists.newArrayList();
|
final List<BulletinEntity> entities = Lists.newArrayList();
|
||||||
|
|
||||||
final Map<String,List<BulletinEntity>> groupingEntities = bulletinEntities.stream().collect(Collectors.groupingBy(b -> b.getBulletin().getMessage()));
|
// group by message when permissions allow
|
||||||
groupingEntities.values().stream().map(e -> e.get(0)).forEach(entities::add);
|
final Map<String,List<BulletinEntity>> groupingEntities = bulletinEntities.stream()
|
||||||
|
.filter(bulletinEntity -> bulletinEntity.getCanRead())
|
||||||
|
.collect(Collectors.groupingBy(b -> b.getBulletin().getMessage()));
|
||||||
|
|
||||||
|
// add one from each grouped bulletin when all nodes report the same message
|
||||||
|
groupingEntities.forEach((message, groupedBulletinEntities) -> {
|
||||||
|
if (groupedBulletinEntities.size() == totalNodes) {
|
||||||
|
// get the most current bulletin
|
||||||
|
final BulletinEntity selectedBulletinEntity = groupedBulletinEntities.stream()
|
||||||
|
.max(Comparator.comparingLong(bulletinEntity -> {
|
||||||
|
if (bulletinEntity.getTimestamp() == null) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return bulletinEntity.getTimestamp().getTime();
|
||||||
|
}
|
||||||
|
})).orElse(null);
|
||||||
|
|
||||||
|
// should never be null, but just in case
|
||||||
|
if (selectedBulletinEntity != null) {
|
||||||
|
selectedBulletinEntity.setNodeAddress(ALL_NODES_MESSAGE);
|
||||||
|
selectedBulletinEntity.getBulletin().setNodeAddress(ALL_NODES_MESSAGE);
|
||||||
|
entities.add(selectedBulletinEntity);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// since all nodes didn't report the exact same bulletin, keep them all
|
||||||
|
entities.addAll(groupedBulletinEntities);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// ensure we also get the remainder of the bulletin entities
|
||||||
|
bulletinEntities.stream()
|
||||||
|
.filter(bulletinEntity -> !bulletinEntity.getCanRead())
|
||||||
|
.forEach(entities::add);
|
||||||
|
|
||||||
|
// ensure the bulletins are sorted by time
|
||||||
Collections.sort(entities, (BulletinEntity o1, BulletinEntity o2) -> {
|
Collections.sort(entities, (BulletinEntity o1, BulletinEntity o2) -> {
|
||||||
final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp());
|
final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp());
|
||||||
if (timeComparison != 0) {
|
if (timeComparison != 0) {
|
||||||
|
|
|
@ -59,7 +59,7 @@ public interface ComponentEntityMerger<EntityType extends ComponentEntity & Perm
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities));
|
clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinEntities, entityMap.size()));
|
||||||
|
|
||||||
// sort the results
|
// sort the results
|
||||||
Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);
|
Collections.sort(clientEntity.getBulletins(), BULLETIN_COMPARATOR);
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.cluster.manager;
|
||||||
|
|
||||||
|
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||||
|
import org.apache.nifi.web.api.dto.BulletinDTO;
|
||||||
|
import org.apache.nifi.web.api.entity.BulletinEntity;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.apache.nifi.cluster.manager.BulletinMerger.ALL_NODES_MESSAGE;
|
||||||
|
import static org.testng.Assert.assertEquals;
|
||||||
|
import static org.testng.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class BulletinMergerTest {
|
||||||
|
|
||||||
|
long bulletinId = 0;
|
||||||
|
|
||||||
|
private BulletinEntity createBulletin(final String message) {
|
||||||
|
final BulletinDTO bulletin = new BulletinDTO();
|
||||||
|
bulletin.setId(bulletinId++);
|
||||||
|
bulletin.setMessage(message);
|
||||||
|
bulletin.setTimestamp(new Date());
|
||||||
|
|
||||||
|
final BulletinEntity entity = new BulletinEntity();
|
||||||
|
entity.setId(bulletin.getId());
|
||||||
|
entity.setTimestamp(bulletin.getTimestamp());
|
||||||
|
entity.setCanRead(true);
|
||||||
|
entity.setBulletin(bulletin);
|
||||||
|
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mergeBulletins() throws Exception {
|
||||||
|
final BulletinEntity bulletinEntity1 = createBulletin("This is bulletin 1");
|
||||||
|
final BulletinEntity bulletinEntity2 = createBulletin("This is bulletin 2");
|
||||||
|
|
||||||
|
final BulletinEntity unauthorizedBulletin = new BulletinEntity();
|
||||||
|
unauthorizedBulletin.setId(bulletinId++);
|
||||||
|
unauthorizedBulletin.setTimestamp(new Date());
|
||||||
|
unauthorizedBulletin.setCanRead(false);
|
||||||
|
|
||||||
|
final BulletinEntity copyOfBulletin1 = createBulletin("This is bulletin 1");
|
||||||
|
|
||||||
|
final NodeIdentifier node1 = new NodeIdentifier("node-1", "host-1", 8080, "host-1", 19998, null, null, null, false);
|
||||||
|
final NodeIdentifier node2 = new NodeIdentifier("node-2", "host-2", 8081, "host-2", 19999, null, null, null, false);
|
||||||
|
|
||||||
|
final Map<NodeIdentifier, List<BulletinEntity>> nodeMap = new HashMap<>();
|
||||||
|
nodeMap.put(node1, new ArrayList<>());
|
||||||
|
nodeMap.put(node2, new ArrayList<>());
|
||||||
|
|
||||||
|
nodeMap.get(node1).add(bulletinEntity1);
|
||||||
|
nodeMap.get(node1).add(bulletinEntity2);
|
||||||
|
nodeMap.get(node1).add(unauthorizedBulletin);
|
||||||
|
|
||||||
|
nodeMap.get(node2).add(copyOfBulletin1);
|
||||||
|
|
||||||
|
final List<BulletinEntity> bulletinEntities = BulletinMerger.mergeBulletins(nodeMap, nodeMap.size());
|
||||||
|
assertEquals(bulletinEntities.size(), 3);
|
||||||
|
assertTrue(bulletinEntities.contains(copyOfBulletin1));
|
||||||
|
assertEquals(copyOfBulletin1.getNodeAddress(), ALL_NODES_MESSAGE);
|
||||||
|
assertTrue(bulletinEntities.contains(bulletinEntity2));
|
||||||
|
assertTrue(bulletinEntities.contains(unauthorizedBulletin));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue