mirror of https://github.com/apache/nifi.git
NIFI-2488 Fixed BulletinDTOs not getting their node addresses set properly during bulletin merging. This closes #888
This commit is contained in:
parent
46b81058c7
commit
31ca8d02f7
|
@ -22,10 +22,8 @@ import org.apache.nifi.web.api.entity.BulletinEntity;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
public final class BulletinMerger {
|
public final class BulletinMerger {
|
||||||
|
|
||||||
|
@ -54,23 +52,26 @@ public final class BulletinMerger {
|
||||||
* @param bulletins bulletins
|
* @param bulletins bulletins
|
||||||
*/
|
*/
|
||||||
public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins) {
|
public static List<BulletinEntity> mergeBulletins(final Map<NodeIdentifier, List<BulletinEntity>> bulletins) {
|
||||||
final List<BulletinEntity> bulletinDtos = new ArrayList<>();
|
final List<BulletinEntity> bulletinEntities = new ArrayList<>();
|
||||||
|
|
||||||
for (final Map.Entry<NodeIdentifier, List<BulletinEntity>> entry : bulletins.entrySet()) {
|
for (final Map.Entry<NodeIdentifier, List<BulletinEntity>> entry : bulletins.entrySet()) {
|
||||||
final NodeIdentifier nodeId = entry.getKey();
|
final NodeIdentifier nodeId = entry.getKey();
|
||||||
final List<BulletinEntity> nodeBulletins = entry.getValue();
|
final List<BulletinEntity> nodeBulletins = entry.getValue();
|
||||||
final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
|
final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
|
||||||
|
|
||||||
for (final BulletinEntity bulletin : nodeBulletins) {
|
for (final BulletinEntity bulletinEntity : nodeBulletins) {
|
||||||
if (bulletin.getNodeAddress() == null) {
|
if (bulletinEntity.getNodeAddress() == null) {
|
||||||
bulletin.setNodeAddress(nodeAddress);
|
bulletinEntity.setNodeAddress(nodeAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
bulletinDtos.add(bulletin);
|
if (bulletinEntity.getCanRead() && bulletinEntity.getBulletin() != null && bulletinEntity.getBulletin().getNodeAddress() == null) {
|
||||||
|
bulletinEntity.getBulletin().setNodeAddress(nodeAddress);
|
||||||
|
}
|
||||||
|
bulletinEntities.add(bulletinEntity);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Collections.sort(bulletinDtos, (BulletinEntity o1, BulletinEntity o2) -> {
|
Collections.sort(bulletinEntities, (BulletinEntity o1, BulletinEntity o2) -> {
|
||||||
final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp());
|
final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp());
|
||||||
if (timeComparison != 0) {
|
if (timeComparison != 0) {
|
||||||
return timeComparison;
|
return timeComparison;
|
||||||
|
@ -79,28 +80,6 @@ public final class BulletinMerger {
|
||||||
return o1.getNodeAddress().compareTo(o2.getNodeAddress());
|
return o1.getNodeAddress().compareTo(o2.getNodeAddress());
|
||||||
});
|
});
|
||||||
|
|
||||||
return bulletinDtos;
|
return bulletinEntities;
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Normalizes the validation errors.
|
|
||||||
*
|
|
||||||
* @param validationErrorMap validation errors for each node
|
|
||||||
* @param totalNodes total number of nodes
|
|
||||||
* @return the normalized validation errors
|
|
||||||
*/
|
|
||||||
public static Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
|
|
||||||
final Set<String> normalizedValidationErrors = new HashSet<>();
|
|
||||||
for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) {
|
|
||||||
final String msg = validationEntry.getKey();
|
|
||||||
final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
|
|
||||||
|
|
||||||
if (nodeIds.size() == totalNodes) {
|
|
||||||
normalizedValidationErrors.add(msg);
|
|
||||||
} else {
|
|
||||||
nodeIds.forEach(id -> normalizedValidationErrors.add(id.getApiAddress() + ":" + id.getApiPort() + " -- " + msg));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return normalizedValidationErrors;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue