NIFI-5585: Adjustments to the Connection Load Balancing to ensure that node offloading works smoothly

Signed-off-by: Jeff Storck <jtswork@gmail.com>
This commit is contained in:
Mark Payne 2018-10-08 09:53:14 -04:00
parent 01e2098d24
commit a1a4c99763
4 changed files with 24 additions and 10 deletions

View File

@ -39,6 +39,8 @@ public interface AsyncLoadBalanceClient {
void unregister(String connectionId);
int getRegisteredConnectionCount();
boolean isRunning();
boolean isPenalized();

View File

@ -119,6 +119,10 @@ public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
registeredPartitions.remove(connectionId);
}
public synchronized int getRegisteredConnectionCount() {
return registeredPartitions.size();
}
private synchronized Map<String, RegisteredPartition> getRegisteredPartitions() {
return new HashMap<>(registeredPartitions);
}

View File

@ -67,15 +67,27 @@ public class NioAsyncLoadBalanceClientRegistry implements AsyncLoadBalanceClient
@Override
public synchronized void unregister(final String connectionId, final NodeIdentifier nodeId) {
final Set<AsyncLoadBalanceClient> clients = clientMap.remove(nodeId);
final Set<AsyncLoadBalanceClient> clients = clientMap.get(nodeId);
if (clients == null) {
return;
}
clients.forEach(client -> client.unregister(connectionId));
final Set<AsyncLoadBalanceClient> toRemove = new HashSet<>();
for (final AsyncLoadBalanceClient client : clients) {
client.unregister(connectionId);
if (client.getRegisteredConnectionCount() == 0) {
toRemove.add(client);
}
}
allClients.removeAll(clients);
logger.debug("Un-registered Connection with ID {} so that it will no longer send data to Node {}", connectionId, nodeId);
clients.removeAll(toRemove);
allClients.removeAll(toRemove);
if (clients.isEmpty()) {
clientMap.remove(nodeId);
}
logger.debug("Un-registered Connection with ID {} so that it will no longer send data to Node {}; {} clients were removed", connectionId, nodeId, toRemove.size());
}
private Set<AsyncLoadBalanceClient> registerClients(final NodeIdentifier nodeId) {

View File

@ -66,13 +66,9 @@ public class NioAsyncLoadBalanceClientTask implements Runnable {
}
final NodeConnectionState connectionState = connectionStatus.getState();
if (connectionState == NodeConnectionState.DISCONNECTED || connectionState == NodeConnectionState.DISCONNECTING) {
client.nodeDisconnected();
continue;
}
if (connectionState != NodeConnectionState.CONNECTED) {
logger.debug("Client {} is for node that is not currently connected (state = {}) so will not communicate with node", client, connectionState);
logger.debug("Notifying Client {} that node is not connected because current state is {}", client, connectionState);
client.nodeDisconnected();
continue;
}