From c79ad1502e7938822e5c10693c294538996f5a61 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 12 Nov 2020 10:35:21 -0500 Subject: [PATCH] NIFI-7999: Do not call NioAsyncLoadBalanceClient.nodeDisconnected() if node was already in a disconnected state. Doing so was resulting in that method being called constantly on startup, and with the synchronization in place that can result in a huge performance hit on startup. Also updated RemoteQueuePartition to move a small predicate into its own method. This was done because the predicate was previously defined within a synchronized method, which meant that invoking that predicate required synchronization. Signed-off-by: Pierre Villard This closes #4657. --- .../client/async/nio/NioAsyncLoadBalanceClient.java | 2 +- .../client/async/nio/NioAsyncLoadBalanceClientTask.java | 7 ++++++- .../queue/clustered/partition/RemoteQueuePartition.java | 6 +++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java index 1257a3ce2e..ac2a561cee 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java @@ -357,7 +357,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient { return null; } finally { - polledPartitions.forEach(partitionQueue::offer); + partitionQueue.addAll(polledPartitions); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java index 5636e224aa..b6990529d3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.java @@ -27,6 +27,9 @@ import org.apache.nifi.reporting.Severity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; + public class NioAsyncLoadBalanceClientTask implements Runnable { private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClientTask.class); private static final String EVENT_CATEGORY = "Load-Balanced Connection"; @@ -34,6 +37,7 @@ public class NioAsyncLoadBalanceClientTask implements Runnable { private final NioAsyncLoadBalanceClientRegistry clientRegistry; private final ClusterCoordinator clusterCoordinator; private final EventReporter eventReporter; + private final Map nodeConnectionStates = new HashMap<>(); private volatile boolean running = true; public NioAsyncLoadBalanceClientTask(final NioAsyncLoadBalanceClientRegistry clientRegistry, final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) { @@ -66,7 +70,8 @@ public class NioAsyncLoadBalanceClientTask implements Runnable { } final NodeConnectionState connectionState = connectionStatus.getState(); - if (connectionState != NodeConnectionState.CONNECTED) { + final NodeConnectionState previousState = nodeConnectionStates.put(client.getNodeIdentifier(), connectionState); + if (connectionState != NodeConnectionState.CONNECTED && previousState == NodeConnectionState.CONNECTED) { logger.debug("Notifying Client {} that node is not connected because current state is {}", client, connectionState); client.nodeDisconnected(); continue; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java index 144a043f5b..d25203a9bb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java @@ -220,13 +220,17 @@ public class RemoteQueuePartition implements QueuePartition { // determine that now FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over // until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother // creating the connection to send data. - final BooleanSupplier emptySupplier = () -> !priorityQueue.isFlowFileAvailable(); + final BooleanSupplier emptySupplier = this::isQueueEmpty; clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier, this::getFlowFile, failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes); running = true; } + private boolean isQueueEmpty() { + return !priorityQueue.isFlowFileAvailable(); + } + public void onRemoved() { clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier); }