From 10b91ffe6c192b15c643f6d17c29924676208cc0 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 20 Jul 2016 14:59:58 -0400 Subject: [PATCH] NIFI-2332: Ensure that bulletin node address is set when clustered but not overwritten if already set This closes #692 Signed-off-by: jpercivall --- .../ControllerBulletinsEndpointMerger.java | 16 +++++++++++++++- .../ControllerStatusEndpointMerger.java | 1 - .../nifi/cluster/manager/BulletinMerger.java | 5 ++++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java index 6e79bc1be7..690f1cccde 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerBulletinsEndpointMerger.java @@ -38,7 +38,7 @@ import static org.apache.nifi.reporting.BulletinRepository.MAX_BULLETINS_FOR_CON import static org.apache.nifi.reporting.BulletinRepository.MAX_BULLETINS_PER_COMPONENT; public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpoint implements EndpointResponseMerger { - public static final Pattern CONTROLLER_BULLETINS_URI_PATTERN = Pattern.compile("/nifi-api/controller/bulletins"); + public static final Pattern CONTROLLER_BULLETINS_URI_PATTERN = Pattern.compile("/nifi-api/flow/controller/bulletins"); @Override public boolean canHandle(URI uri, String method) { @@ -60,24 +60,38 @@ public class ControllerBulletinsEndpointMerger extends AbstractSingleEntityEndpo for (final Map.Entry entry : entityMap.entrySet()) { final NodeIdentifier nodeIdentifier = entry.getKey(); final ControllerBulletinsEntity entity = entry.getValue(); + final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort(); // consider the bulletins if present and authorized if (entity.getBulletins() != null) { entity.getBulletins().forEach(bulletin -> { + if (bulletin.getNodeAddress() == null) { + bulletin.setNodeAddress(nodeAddress); + } + bulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin); }); } if (entity.getControllerServiceBulletins() != null) { entity.getControllerServiceBulletins().forEach(bulletin -> { + if (bulletin.getNodeAddress() == null) { + bulletin.setNodeAddress(nodeAddress); + } + controllerServiceBulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin); }); } if (entity.getReportingTaskBulletins() != null) { entity.getReportingTaskBulletins().forEach(bulletin -> { + if (bulletin.getNodeAddress() == null) { + bulletin.setNodeAddress(nodeAddress); + } + reportingTaskBulletinDtos.computeIfAbsent(nodeIdentifier, nodeId -> new ArrayList<>()).add(bulletin); }); } } + clientEntity.setBulletins(BulletinMerger.mergeBulletins(bulletinDtos)); clientEntity.setControllerServiceBulletins(BulletinMerger.mergeBulletins(controllerServiceBulletinDtos)); clientEntity.setReportingTaskBulletins(BulletinMerger.mergeBulletins(reportingTaskBulletinDtos)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java index 26c7de6858..46346240cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerStatusEndpointMerger.java @@ -50,7 +50,6 @@ public class ControllerStatusEndpointMerger extends AbstractSingleDTOEndpoint dtoMap, Set successfulResponses, Set problematicResponses) { ControllerStatusDTO mergedStatus = clientDto; for (final Map.Entry entry : dtoMap.entrySet()) { - final NodeIdentifier nodeId = entry.getKey(); final ControllerStatusDTO nodeStatus = entry.getValue(); if (nodeStatus == mergedStatus) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java index 0d32244c64..431c83458f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/BulletinMerger.java @@ -62,7 +62,10 @@ public final class BulletinMerger { final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); for (final BulletinDTO bulletin : nodeBulletins) { - bulletin.setNodeAddress(nodeAddress); + if (bulletin.getNodeAddress() == null) { + bulletin.setNodeAddress(nodeAddress); + } + bulletinDtos.add(bulletin); } }