NIFI-7999: Do not call NioAsyncLoadBalanceClient.nodeDisconnected() if node was already in a disconnected state. Doing so was resulting in that method being called constantly on startup, and with the synchronization in place that can result in a huge performance hit on startup. Also updated RemoteQueuePartition to move a small predicate into its own method. This was done because the predicate was previously defined within a synchronized method, which meant that invoking that predicate required synchronization.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4657.
This commit is contained in:
Mark Payne 2020-11-12 10:35:21 -05:00 committed by Pierre Villard
parent 018778a25d
commit c79ad1502e
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 12 additions and 3 deletions

View File

@ -357,7 +357,7 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
return null;
} finally {
polledPartitions.forEach(partitionQueue::offer);
partitionQueue.addAll(polledPartitions);
}
}

View File

@ -27,6 +27,9 @@ import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class NioAsyncLoadBalanceClientTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClientTask.class);
private static final String EVENT_CATEGORY = "Load-Balanced Connection";
@ -34,6 +37,7 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
private final NioAsyncLoadBalanceClientRegistry clientRegistry;
private final ClusterCoordinator clusterCoordinator;
private final EventReporter eventReporter;
private final Map<NodeIdentifier, NodeConnectionState> nodeConnectionStates = new HashMap<>();
private volatile boolean running = true;
public NioAsyncLoadBalanceClientTask(final NioAsyncLoadBalanceClientRegistry clientRegistry, final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) {
@ -66,7 +70,8 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
}
final NodeConnectionState connectionState = connectionStatus.getState();
if (connectionState != NodeConnectionState.CONNECTED) {
final NodeConnectionState previousState = nodeConnectionStates.put(client.getNodeIdentifier(), connectionState);
if (connectionState != NodeConnectionState.CONNECTED && previousState == NodeConnectionState.CONNECTED) {
logger.debug("Notifying Client {} that node is not connected because current state is {}", client, connectionState);
client.nodeDisconnected();
continue;

View File

@ -220,13 +220,17 @@ public class RemoteQueuePartition implements QueuePartition {
// determine that now FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over
// until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother
// creating the connection to send data.
final BooleanSupplier emptySupplier = () -> !priorityQueue.isFlowFileAvailable();
final BooleanSupplier emptySupplier = this::isQueueEmpty;
clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier, this::getFlowFile,
failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes);
running = true;
}
private boolean isQueueEmpty() {
return !priorityQueue.isFlowFileAvailable();
}
public void onRemoved() {
clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
}