NIFI-2319: Ensure that when we set cluster node id's and node addresses, that we do so only if they are not already populated

This closes #680

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Mark Payne 2016-07-19 15:33:59 -04:00 committed by jpercivall
parent 108c815988
commit 2a7f135f1c
3 changed files with 17 additions and 7 deletions

View File

@ -74,8 +74,11 @@ public class ComponentStateEndpointMerger extends AbstractSingleDTOEndpoint<Comp
totalStateEntries += nodeLocalStateMap.getTotalEntryCount();
for (final StateEntryDTO nodeStateEntry : nodeLocalStateMap.getState()) {
if (nodeStateEntry.getClusterNodeId() == null || nodeStateEntry.getClusterNodeAddress() == null) {
nodeStateEntry.setClusterNodeId(nodeId.getId());
nodeStateEntry.setClusterNodeAddress(nodeAddress);
}
localStateEntries.add(nodeStateEntry);
}
}

View File

@ -124,8 +124,10 @@ public class ListFlowFilesEndpointMerger extends AbstractSingleDTOEndpoint<Listi
if (nodeRequest.getFlowFileSummaries() != null) {
for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) {
if (summaryDTO.getClusterNodeId() == null || summaryDTO.getClusterNodeAddress() == null) {
summaryDTO.setClusterNodeId(nodeIdentifier.getId());
summaryDTO.setClusterNodeAddress(nodeAddress);
}
flowFileSummaries.add(summaryDTO);

View File

@ -48,8 +48,13 @@ public class ProvenanceEventEndpointMerger extends AbstractSingleDTOEndpoint<Pro
@Override
protected void mergeResponses(ProvenanceEventDTO clientDto, Map<NodeIdentifier, ProvenanceEventDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
// The request for a Provenance Event is replicated to a single Node. We simply update its cluster node info.
// However, we only do this if the cluster node info isn't set, because if this is replicated across the cluster,
// the cluster coordinator will have already set it, and we will be receiving the response from the cluster
// coordinator. We do not want to overwrite this value on all DTO's with the cluster coordinator's information.
if (clientDto.getClusterNodeId() == null || clientDto.getClusterNodeAddress() == null) {
final NodeIdentifier nodeId = successfulResponses.iterator().next().getNodeId();
clientDto.setClusterNodeId(nodeId.getId());
clientDto.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort());
}
}
}