From c670161cb09855b28e444a4d32d37427b31e8422 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 21 Dec 2023 10:49:15 -0500 Subject: [PATCH] NIFI-12532: This closes #8179. Ensure that when CommunicateAction completes (exceptionally or otherwise) that it gets removed from the list of all CommunicationActions Signed-off-by: Joseph Witt --- .../server/ConnectionLoadBalanceServer.java | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java index 666620b955..f5c7c644ca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java @@ -17,6 +17,17 @@ package org.apache.nifi.controller.queue.clustered.server; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.io.socket.SocketUtils; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.security.util.TlsPlatform; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLServerSocket; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; @@ -31,17 +42,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLServerSocket; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.io.socket.SocketUtils; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.security.util.TlsPlatform; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ConnectionLoadBalanceServer { private static final Logger logger = LoggerFactory.getLogger(ConnectionLoadBalanceServer.class); @@ -104,10 +104,12 @@ public class ConnectionLoadBalanceServer { acceptConnection.stop(); } - final Iterator itr = communicationActions.iterator(); - while (itr.hasNext()) { - itr.next().stop(); - itr.remove(); + synchronized (communicationActions) { // Must synchronize on Synchronized List when using iterator + final Iterator itr = communicationActions.iterator(); + while (itr.hasNext()) { + itr.next().stop(); + itr.remove(); + } } } @@ -135,8 +137,7 @@ public class ConnectionLoadBalanceServer { private volatile boolean stopped = false; - // This should be final but it is not to allow override during testing; no production code modifies the value - private static int EXCEPTION_THRESHOLD_MILLIS = 10_000; + private static final int EXCEPTION_THRESHOLD_MILLIS = 10_000; private volatile long tlsErrorLastSeen = -1; public CommunicateAction(final LoadBalanceProtocol loadBalanceProtocol, final Socket socket, final EventReporter eventReporter) throws IOException { @@ -187,6 +188,8 @@ public class ConnectionLoadBalanceServer { logger.error("Failed to communicate over Channel {}", channelDescription, e); eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to " + e); } + + return; } } } @@ -265,11 +268,18 @@ public class ConnectionLoadBalanceServer { socket.setSoTimeout(connectionTimeoutMillis); final CommunicateAction communicateAction = new CommunicateAction(loadBalanceProtocol, socket, eventReporter); - final Thread commsThread = new Thread(communicateAction); + communicationActions.add(communicateAction); + + final Thread commsThread = new Thread(() -> { + try { + communicateAction.run(); + } finally { + communicationActions.remove(communicateAction); + } + }); + commsThread.setName("Load-Balance Server Thread-" + threadCounter.getAndIncrement()); commsThread.start(); - - communicationActions.add(communicateAction); } catch (final Exception e) { logger.error("{} Failed to accept connection from other node in cluster", ConnectionLoadBalanceServer.this, e); }