diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java index 1257a3ce2e..ac2a561cee 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java @@ -357,7 +357,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { return null; } finally { - polledPartitions.forEach(partitionQueue::offer); + partitionQueue.addAll(polledPartitions); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java index 5636e224aa..b6990529d3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java @@ -27,6 +27,9 @@ import org.apache.nifi.reporting.Severity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; + public class NioAsyncLoadBalanceClientTask implements Runnable { private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClientTask.class); private static final String EVENT_CATEGORY = "Load-Balanced Connection"; @@ -34,6 +37,7 @@ public class NioAsyncLoadBalanceClientTask implements Runnable { private final NioAsyncLoadBalanceClientRegistry clientRegistry; private final ClusterCoordinator clusterCoordinator; private final EventReporter eventReporter; + private final Map nodeConnectionStates = new HashMap<>(); private volatile boolean running = true; public NioAsyncLoadBalanceClientTask(final NioAsyncLoadBalanceClientRegistry clientRegistry, final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) { @@ -66,7 +70,8 @@ public class NioAsyncLoadBalanceClientTask implements Runnable { } final NodeConnectionState connectionState = connectionStatus.getState(); - if (connectionState != NodeConnectionState.CONNECTED) { + final NodeConnectionState previousState = nodeConnectionStates.put(client.getNodeIdentifier(), connectionState); + if (connectionState != NodeConnectionState.CONNECTED && previousState == NodeConnectionState.CONNECTED) { logger.debug("Notifying Client {} that node is not connected because current state is {}", client, connectionState); client.nodeDisconnected(); continue; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java index 144a043f5b..d25203a9bb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java @@ -220,13 +220,17 @@ public class RemoteQueuePartition implements QueuePartition { // determine that now FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over // until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother // creating the connection to send data. - final BooleanSupplier emptySupplier = () -> !priorityQueue.isFlowFileAvailable(); + final BooleanSupplier emptySupplier = this::isQueueEmpty; clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier, this::getFlowFile, failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes); running = true; } + private boolean isQueueEmpty() { + return !priorityQueue.isFlowFileAvailable(); + } + public void onRemoved() { clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier); }