NIFI-12532: This closes #8179. Ensure that when CommunicateAction completes (exceptionally or otherwise) that it gets removed from the list of all CommunicationActions

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2023-12-21 10:49:15 -05:00 committed by Joseph Witt
parent 77093671e0
commit c670161cb0
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A

View File

@ -17,6 +17,17 @@
package org.apache.nifi.controller.queue.clustered.server;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.security.util.TlsPlatform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLServerSocket;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
@ -31,17 +42,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLServerSocket;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.security.util.TlsPlatform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConnectionLoadBalanceServer {
private static final Logger logger = LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
@ -104,10 +104,12 @@ public class ConnectionLoadBalanceServer {
acceptConnection.stop();
}
final Iterator<CommunicateAction> itr = communicationActions.iterator();
while (itr.hasNext()) {
itr.next().stop();
itr.remove();
synchronized (communicationActions) { // Must synchronize on Synchronized List when using iterator
final Iterator<CommunicateAction> itr = communicationActions.iterator();
while (itr.hasNext()) {
itr.next().stop();
itr.remove();
}
}
}
@ -135,8 +137,7 @@ public class ConnectionLoadBalanceServer {
private volatile boolean stopped = false;
// This should be final but it is not to allow override during testing; no production code modifies the value
private static int EXCEPTION_THRESHOLD_MILLIS = 10_000;
private static final int EXCEPTION_THRESHOLD_MILLIS = 10_000;
private volatile long tlsErrorLastSeen = -1;
public CommunicateAction(final LoadBalanceProtocol loadBalanceProtocol, final Socket socket, final EventReporter eventReporter) throws IOException {
@ -187,6 +188,8 @@ public class ConnectionLoadBalanceServer {
logger.error("Failed to communicate over Channel {}", channelDescription, e);
eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to " + e);
}
return;
}
}
}
@ -265,11 +268,18 @@ public class ConnectionLoadBalanceServer {
socket.setSoTimeout(connectionTimeoutMillis);
final CommunicateAction communicateAction = new CommunicateAction(loadBalanceProtocol, socket, eventReporter);
final Thread commsThread = new Thread(communicateAction);
communicationActions.add(communicateAction);
final Thread commsThread = new Thread(() -> {
try {
communicateAction.run();
} finally {
communicationActions.remove(communicateAction);
}
});
commsThread.setName("Load-Balance Server Thread-" + threadCounter.getAndIncrement());
commsThread.start();
communicationActions.add(communicateAction);
} catch (final Exception e) {
logger.error("{} Failed to accept connection from other node in cluster", ConnectionLoadBalanceServer.this, e);
}