mirror of https://github.com/apache/nifi.git
NIFI-10362: When asynchronous node disconnect is issued, do not send disconnect to node if the node becomes reconnected in the interim. Also, addressed the issue in which a disconnected node acts on a replicated request during the first phase by detect that it's the first phase if configured for cluster, not when only when connected to a cluster.
This closes #6308 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
0afdd32bb5
commit
21503f6353
|
@ -989,6 +989,19 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
||||||
|
|
||||||
Exception lastException = null;
|
Exception lastException = null;
|
||||||
for (int i = 0; i < attempts; i++) {
|
for (int i = 0; i < attempts; i++) {
|
||||||
|
// If the node is restarted, it will attempt to reconnect. In that case, we don't want to disconnect the node
|
||||||
|
// again. So we instead log the fact that the state has now transitioned to this point and consider the task completed.
|
||||||
|
final NodeConnectionState currentConnectionState = getConnectionState(nodeId);
|
||||||
|
if (currentConnectionState == NodeConnectionState.CONNECTING || currentConnectionState == NodeConnectionState.CONNECTED) {
|
||||||
|
reportEvent(nodeId, Severity.INFO, String.format(
|
||||||
|
"State of Node %s has now transitioned from DISCONNECTED to %s so will no longer attempt to notify node that it is disconnected.", nodeId, currentConnectionState));
|
||||||
|
future.completeExceptionally(new IllegalStateException("Node was marked as disconnected but its state transitioned from DISCONNECTED back to " + currentConnectionState +
|
||||||
|
" before the node could be notified. This typically indicates that the node was restarted."));
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to send disconnect notice to the node
|
||||||
try {
|
try {
|
||||||
senderListener.disconnect(request);
|
senderListener.disconnect(request);
|
||||||
reportEvent(nodeId, Severity.INFO, "Node disconnected due to " + request.getExplanation());
|
reportEvent(nodeId, Severity.INFO, "Node disconnected due to " + request.getExplanation());
|
||||||
|
|
|
@ -371,7 +371,7 @@ public abstract class ApplicationResource {
|
||||||
*/
|
*/
|
||||||
protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) {
|
protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) {
|
||||||
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
|
final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
|
||||||
return transactionId != null && isConnectedToCluster();
|
return transactionId != null && isClustered();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -245,6 +245,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
||||||
getClientUtil().enableControllerService(countService);
|
getClientUtil().enableControllerService(countService);
|
||||||
getClientUtil().enableControllerService(sleepService);
|
getClientUtil().enableControllerService(sleepService);
|
||||||
getClientUtil().startReportingTask(reportingTask);
|
getClientUtil().startReportingTask(reportingTask);
|
||||||
|
getClientUtil().waitForValidProcessor(count.getId()); // Now that service was enabled, wait for processor to become valid.
|
||||||
getClientUtil().startProcessGroupComponents(group.getId());
|
getClientUtil().startProcessGroupComponents(group.getId());
|
||||||
getClientUtil().startProcessor(terminate);
|
getClientUtil().startProcessor(terminate);
|
||||||
getClientUtil().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
|
@ -603,6 +604,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
||||||
getClientUtil().enableControllerService(countService);
|
getClientUtil().enableControllerService(countService);
|
||||||
getClientUtil().enableControllerService(sleepService);
|
getClientUtil().enableControllerService(sleepService);
|
||||||
getClientUtil().startReportingTask(reportingTask);
|
getClientUtil().startReportingTask(reportingTask);
|
||||||
|
getClientUtil().waitForValidProcessor(count.getId()); // Now that service was enabled, wait for processor to become valid.
|
||||||
getClientUtil().startProcessGroupComponents(group.getId());
|
getClientUtil().startProcessGroupComponents(group.getId());
|
||||||
getClientUtil().startProcessor(terminate);
|
getClientUtil().startProcessor(terminate);
|
||||||
getClientUtil().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
|
|
Loading…
Reference in New Issue