From c2baa5f213cb181c3e4ba34fca9c11e8ce5d4d6b Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 23 Dec 2016 13:40:54 +0100 Subject: [PATCH] TransportService should capture listener before spawning background notification task Not doing this made it difficult to establish a happens before relationship between connecting to a node and adding a listeners. Causing test code like this to fail sproadically: ``` // connection to reuse handleA.transportService.connectToNode(handleB.node); // install a listener to check that no new connections are made handleA.transportService.addConnectionListener(new TransportConnectionListener() { @Override public void onConnectionOpened(DiscoveryNode node) { fail("should not open any connections. got [" + node + "]"); } }); ``` relates to #22277 --- .../transport/TransportService.java | 21 ++++++++++--------- .../discovery/zen/UnicastZenPingTests.java | 2 -- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index e76210ff195..9774e5e0f66 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -64,6 +64,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static org.elasticsearch.common.settings.Setting.listSetting; @@ -807,20 +808,20 @@ public class TransportService extends AbstractLifecycleComponent { @Override public void onNodeConnected(final DiscoveryNode node) { - threadPool.generic().execute(() -> { - for (TransportConnectionListener connectionListener : connectionListeners) { - connectionListener.onNodeConnected(node); - } - }); + // capture listeners before spawning the background callback so the following pattern won't trigger a call + // connectToNode(); connection is completed successfully + // addConnectionListener(); this listener shouldn't be called + final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); + threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node))); } @Override public void onConnectionOpened(DiscoveryNode node) { - threadPool.generic().execute(() -> { - for (TransportConnectionListener connectionListener : connectionListeners) { - connectionListener.onConnectionOpened(node); - } - }); + // capture listeners before spawning the background callback so the following pattern won't trigger a call + // connectToNode(); connection is completed successfully + // addConnectionListener(); this listener shouldn't be called + final Stream listenersToNotify = TransportService.this.connectionListeners.stream(); + threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(node))); } @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index db81956865a..6e263f474dc 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -41,7 +41,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -540,7 +539,6 @@ public class UnicastZenPingTests extends ESTestCase { } } - @TestLogging("org.elasticsearch:DEBUG,org.elasticsearch.discovery:TRACE,org.elasticsearch.transport:TRACE") public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException { final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();