From e70cad4c52b504b38fe05da2b9bd2ac622b597fe Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 9 Jul 2019 17:01:48 +0100 Subject: [PATCH] Remove node conn block after connection barrier (#44114) Today `testOnlyBlocksOnConnectionsToNewNodes` fails (extremely rarely) if the last attempt to connect to `node0` is delayed for so long that the test runs `nodeConnectionsBlocks.clear()` before the connection attempt obtains the expected connection block. We can turn this into a reliable failure with this delay: ```diff diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index f48413824d3..9a1d0336bcd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -300,6 +300,13 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { @Override protected void doRun() { + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + throw new AssertionError("unexpected", e); + } + assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; transportService.connectToNode(discoveryNode); consecutiveFailureCount.set(0); ``` This commit reverts the extra logging introduced in #43979 and fixes this failure by waiting for the connection attempt to hit the barrier before removing it. Fixes #40170 --- .../cluster/NodeConnectionsServiceTests.java | 40 ++----------------- 1 file changed, 4 insertions(+), 36 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index bd3b80daffc..377cbef84a4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -19,8 +19,6 @@ package org.elasticsearch.cluster; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -37,7 +35,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; @@ -59,10 +56,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; @@ -219,7 +214,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { assertConnectedExactlyToNodes(targetNodes); } - @TestLogging("org.elasticsearch.cluster.NodeConnectionsService:TRACE") // for https://github.com/elastic/elasticsearch/issues/40170 public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); @@ -232,7 +226,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { assertConnectedExactlyToNodes(nodes0); // connection attempts to node0 block indefinitely - final CyclicBarrier connectionBarrier = new VerboseCyclicBarrier(2); + final CyclicBarrier connectionBarrier = new CyclicBarrier(2); try { nodeConnectionBlocks.put(node0, connectionBarrier::await); transportService.disconnectFromNode(node0); @@ -259,8 +253,8 @@ public class NodeConnectionsServiceTests extends ESTestCase { expectThrows(ElasticsearchTimeoutException.class, () -> future3.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000)))); // once the connection is unblocked we successfully connect to it. - nodeConnectionBlocks.clear(); connectionBarrier.await(10, TimeUnit.SECONDS); + nodeConnectionBlocks.clear(); future3.actionGet(); assertConnectedExactlyToNodes(nodes01); @@ -292,8 +286,8 @@ public class NodeConnectionsServiceTests extends ESTestCase { future6.actionGet(); // completed even though the connection attempt is still blocked assertConnectedExactlyToNodes(nodes1); - nodeConnectionBlocks.clear(); connectionBarrier.await(10, TimeUnit.SECONDS); + nodeConnectionBlocks.clear(); ensureConnections(service); assertConnectedExactlyToNodes(nodes1); } finally { @@ -302,33 +296,6 @@ public class NodeConnectionsServiceTests extends ESTestCase { } } - // tracing barrier usage for https://github.com/elastic/elasticsearch/issues/40170 - private class VerboseCyclicBarrier extends CyclicBarrier { - VerboseCyclicBarrier(int parties) { - super(parties); - } - - @Override - public int await() throws InterruptedException, BrokenBarrierException { - final String waitUUID = UUIDs.randomBase64UUID(random()); - logger.info(new ParameterizedMessage("--> wait[{}] starting", waitUUID), - new ElasticsearchException("stack trace for CyclicBarrier#await()")); - final int result = super.await(); - logger.info("--> wait[{}] returning [{}]", waitUUID, result); - return result; - } - - @Override - public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { - final String waitUUID = UUIDs.randomBase64UUID(random()); - logger.info(new ParameterizedMessage("--> wait[{}] starting", waitUUID), - new ElasticsearchException("stack trace for CyclicBarrier#await(" + timeout + ", " + unit + ')')); - final int result = super.await(timeout, unit); - logger.info("--> wait[{}] returning [{}]", waitUUID, result); - return result; - } - } - private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) { while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) { if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) { @@ -414,6 +381,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { public void registerRequestHandler(RequestHandlerRegistry reg) { } + @SuppressWarnings("unchecked") @Override public RequestHandlerRegistry getRequestHandler(String action) { return null;