NIFI-2332: Ensure that bulletin node address is set when clustered but not overwritten if already set

This closes #692

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Mark Payne 2016-07-20 14:59:58 -04:00 committed by jpercivall
parent 7f2bda29d9
commit 10b91ffe6c
3 changed files with 19 additions and 3 deletions

View File

@ -38,7 +38,7 @@ import static org.apache.nifi.reporting.BulletinRepository.MAX_BULLETINS_FOR_CON
import static org.apache.nifi.reporting.BulletinRepository.MAX_BULLETINS_PER_COMPONENT; import static org.apache.nifi.reporting.BulletinRepository.MAX_BULLETINS_PER_COMPONENT;
public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpoint<ControllerBulletinsEntity> implements EndpointResponseMerger { public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpoint<ControllerBulletinsEntity> implements EndpointResponseMerger {
public static final Pattern CONTROLLER_BULLETINS_URI_PATTERN = Pattern.compile("/nifi-api/controller/bulletins"); public static final Pattern CONTROLLER_BULLETINS_URI_PATTERN = Pattern.compile("/nifi-api/flow/controller/bulletins");
@Override @Override
public boolean canHandle(URI uri, String method) { public boolean canHandle(URI uri, String method) {
@ -60,24 +60,38 @@ public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpo
for (final Map.Entry<NodeIdentifier, ControllerBulletinsEntity> entry : entityMap.entrySet()) { for (final Map.Entry<NodeIdentifier, ControllerBulletinsEntity> entry : entityMap.entrySet()) {
final NodeIdentifier nodeIdentifier = entry.getKey(); final NodeIdentifier nodeIdentifier = entry.getKey();
final ControllerBulletinsEntity entity = entry.getValue(); final ControllerBulletinsEntity entity = entry.getValue();
final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
// consider the bulletins if present and authorized // consider the bulletins if present and authorized
if (entity.getBulletins() != null) { if (entity.getBulletins() != null) {
entity.getBulletins().forEach(bulletin -> { entity.getBulletins().forEach(bulletin -> {
if (bulletin.getNodeAddress() == null) {
bulletin.setNodeAddress(nodeAddress);
}
bulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin); bulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin);
}); });
} }
if (entity.getControllerServiceBulletins() != null) { if (entity.getControllerServiceBulletins() != null) {
entity.getControllerServiceBulletins().forEach(bulletin -> { entity.getControllerServiceBulletins().forEach(bulletin -> {
if (bulletin.getNodeAddress() == null) {
bulletin.setNodeAddress(nodeAddress);
}
controllerServiceBulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin); controllerServiceBulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin);
}); });
} }
if (entity.getReportingTaskBulletins() != null) { if (entity.getReportingTaskBulletins() != null) {
entity.getReportingTaskBulletins().forEach(bulletin -> { entity.getReportingTaskBulletins().forEach(bulletin -> {
if (bulletin.getNodeAddress() == null) {
bulletin.setNodeAddress(nodeAddress);
}
reportingTaskBulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin); reportingTaskBulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin);
}); });
} }
} }
clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos)); clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos));
clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos)); clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos));
clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos)); clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos));

View File

@ -50,7 +50,6 @@ public class ControllerStatusEndpointMerger extends AbstractSingleDTOEndpoint<Co
protected void mergeResponses(ControllerStatusDTO clientDto, Map<NodeIdentifier, ControllerStatusDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) { protected void mergeResponses(ControllerStatusDTO clientDto, Map<NodeIdentifier, ControllerStatusDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
ControllerStatusDTO mergedStatus = clientDto; ControllerStatusDTO mergedStatus = clientDto;
for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : dtoMap.entrySet()) { for (final Map.Entry<NodeIdentifier, ControllerStatusDTO> entry : dtoMap.entrySet()) {
final NodeIdentifier nodeId = entry.getKey();
final ControllerStatusDTO nodeStatus = entry.getValue(); final ControllerStatusDTO nodeStatus = entry.getValue();
if (nodeStatus == mergedStatus) { if (nodeStatus == mergedStatus) {

View File

@ -62,7 +62,10 @@ public final class BulletinMerger {
final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
for (final BulletinDTO bulletin : nodeBulletins) { for (final BulletinDTO bulletin : nodeBulletins) {
bulletin.setNodeAddress(nodeAddress); if (bulletin.getNodeAddress() == null) {
bulletin.setNodeAddress(nodeAddress);
}
bulletinDtos.add(bulletin); bulletinDtos.add(bulletin);
} }
} }