diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index e76210ff195..9774e5e0f66 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -64,6 +64,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static org.elasticsearch.common.settings.Setting.listSetting; @@ -807,20 +808,20 @@ public class TransportService extends AbstractLifecycleComponent { @Override public void onNodeConnected(final DiscoveryNode node) { - threadPool.generic().execute(() -> { - for (TransportConnectionListener connectionListener : connectionListeners) { - connectionListener.onNodeConnected(node); - } - }); + // capture listeners before spawning the background callback so the following pattern won't trigger a call + // connectToNode(); connection is completed successfully + // addConnectionListener(); this listener shouldn't be called + final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); + threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node))); } @Override public void onConnectionOpened(DiscoveryNode node) { - threadPool.generic().execute(() -> { - for (TransportConnectionListener connectionListener : connectionListeners) { - connectionListener.onConnectionOpened(node); - } - }); + // capture listeners before spawning the background callback so the following pattern won't trigger a call + // connectToNode(); connection is completed successfully + // addConnectionListener(); this listener shouldn't be called + final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); + threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(node))); } @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index db81956865a..6e263f474dc 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -540,7 +539,6 @@ public class UnicastZenPingTests extends ESTestCase { } } - @TestLogging("org.elasticsearch:DEBUG,org.elasticsearch.discovery:TRACE,org.elasticsearch.transport:TRACE") public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException { final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();