NIFI-5153: If a node is disconnected due to failure to complete mutable request, the node should be allowed to rejoin

This closes #2677.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2018-05-04 09:31:51 -04:00 committed by Bryan Bende
parent 4544f3969d
commit 8acac9cba5
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
1 changed files with 6 additions and 3 deletions

View File

@ -373,7 +373,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
final Map<NodeConnectionState, List<NodeIdentifier>> connectionStates = new HashMap<>(); final Map<NodeConnectionState, List<NodeIdentifier>> connectionStates = new HashMap<>();
for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> entry : nodeStatuses.entrySet()) { for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> entry : nodeStatuses.entrySet()) {
final NodeConnectionState state = entry.getValue().getState(); final NodeConnectionState state = entry.getValue().getState();
final List<NodeIdentifier> nodeIds = connectionStates.computeIfAbsent(state, s -> new ArrayList<NodeIdentifier>()); final List<NodeIdentifier> nodeIds = connectionStates.computeIfAbsent(state, s -> new ArrayList<>());
nodeIds.add(entry.getKey()); nodeIds.add(entry.getKey());
} }
@ -998,9 +998,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// disconnect problematic nodes // disconnect problematic nodes
if (!problematicNodeResponses.isEmpty() && problematicNodeResponses.size() < nodeResponses.size()) { if (!problematicNodeResponses.isEmpty() && problematicNodeResponses.size() < nodeResponses.size()) {
final Set<NodeIdentifier> failedNodeIds = problematicNodeResponses.stream().map(response -> response.getNodeId()).collect(Collectors.toSet()); final Set<NodeIdentifier> failedNodeIds = problematicNodeResponses.stream().map(response -> response.getNodeId()).collect(Collectors.toSet());
logger.warn(String.format("The following nodes failed to process URI %s '%s'. Requesting each node disconnect from cluster.", uriPath, failedNodeIds)); logger.warn(String.format("The following nodes failed to process URI %s '%s'. Requesting each node reconnect to cluster.", uriPath, failedNodeIds));
for (final NodeIdentifier nodeId : failedNodeIds) { for (final NodeIdentifier nodeId : failedNodeIds) {
requestNodeDisconnect(nodeId, DisconnectionCode.FAILED_TO_SERVICE_REQUEST, "Failed to process request " + method + " " + uriPath); // Update the node to 'CONNECTING' status and request that the node connect
final NodeConnectionStatus reconnectionStatus = new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING);
updateNodeStatus(reconnectionStatus);
requestNodeConnect(nodeId, null);
} }
} }
} }