Remove node conn block after connection barrier (#44114)

Today `testOnlyBlocksOnConnectionsToNewNodes` fails (extremely rarely) if the
last attempt to connect to `node0` is delayed for so long that the test runs
`nodeConnectionsBlocks.clear()` before the connection attempt obtains the
expected connection block. We can turn this into a reliable failure with this
delay:

```diff
diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java
index f48413824d3..9a1d0336bcd 100644
--- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java
@@ -300,6 +300,13 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
         private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
             @Override
             protected void doRun() {
+
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    throw new AssertionError("unexpected", e);
+                }
+
                 assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
                 transportService.connectToNode(discoveryNode);
                 consecutiveFailureCount.set(0);
```

This commit reverts the extra logging introduced in #43979 and fixes this
failure by waiting for the connection attempt to hit the barrier before
removing it.

Fixes #40170
This commit is contained in:
David Turner 2019-07-09 17:01:48 +01:00
parent a406ef1d38
commit e70cad4c52
1 changed files with 4 additions and 36 deletions

View File

@ -19,8 +19,6 @@
package org.elasticsearch.cluster; package org.elasticsearch.cluster;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -37,7 +35,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
@ -59,10 +56,8 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -219,7 +214,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
assertConnectedExactlyToNodes(targetNodes); assertConnectedExactlyToNodes(targetNodes);
} }
@TestLogging("org.elasticsearch.cluster.NodeConnectionsService:TRACE") // for https://github.com/elastic/elasticsearch/issues/40170
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
@ -232,7 +226,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
assertConnectedExactlyToNodes(nodes0); assertConnectedExactlyToNodes(nodes0);
// connection attempts to node0 block indefinitely // connection attempts to node0 block indefinitely
final CyclicBarrier connectionBarrier = new VerboseCyclicBarrier(2); final CyclicBarrier connectionBarrier = new CyclicBarrier(2);
try { try {
nodeConnectionBlocks.put(node0, connectionBarrier::await); nodeConnectionBlocks.put(node0, connectionBarrier::await);
transportService.disconnectFromNode(node0); transportService.disconnectFromNode(node0);
@ -259,8 +253,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
expectThrows(ElasticsearchTimeoutException.class, () -> future3.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000)))); expectThrows(ElasticsearchTimeoutException.class, () -> future3.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000))));
// once the connection is unblocked we successfully connect to it. // once the connection is unblocked we successfully connect to it.
nodeConnectionBlocks.clear();
connectionBarrier.await(10, TimeUnit.SECONDS); connectionBarrier.await(10, TimeUnit.SECONDS);
nodeConnectionBlocks.clear();
future3.actionGet(); future3.actionGet();
assertConnectedExactlyToNodes(nodes01); assertConnectedExactlyToNodes(nodes01);
@ -292,8 +286,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
future6.actionGet(); // completed even though the connection attempt is still blocked future6.actionGet(); // completed even though the connection attempt is still blocked
assertConnectedExactlyToNodes(nodes1); assertConnectedExactlyToNodes(nodes1);
nodeConnectionBlocks.clear();
connectionBarrier.await(10, TimeUnit.SECONDS); connectionBarrier.await(10, TimeUnit.SECONDS);
nodeConnectionBlocks.clear();
ensureConnections(service); ensureConnections(service);
assertConnectedExactlyToNodes(nodes1); assertConnectedExactlyToNodes(nodes1);
} finally { } finally {
@ -302,33 +296,6 @@ public class NodeConnectionsServiceTests extends ESTestCase {
} }
} }
// tracing barrier usage for https://github.com/elastic/elasticsearch/issues/40170
private class VerboseCyclicBarrier extends CyclicBarrier {
VerboseCyclicBarrier(int parties) {
super(parties);
}
@Override
public int await() throws InterruptedException, BrokenBarrierException {
final String waitUUID = UUIDs.randomBase64UUID(random());
logger.info(new ParameterizedMessage("--> wait[{}] starting", waitUUID),
new ElasticsearchException("stack trace for CyclicBarrier#await()"));
final int result = super.await();
logger.info("--> wait[{}] returning [{}]", waitUUID, result);
return result;
}
@Override
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
final String waitUUID = UUIDs.randomBase64UUID(random());
logger.info(new ParameterizedMessage("--> wait[{}] starting", waitUUID),
new ElasticsearchException("stack trace for CyclicBarrier#await(" + timeout + ", " + unit + ')'));
final int result = super.await(timeout, unit);
logger.info("--> wait[{}] returning [{}]", waitUUID, result);
return result;
}
}
private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) { private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) {
while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) { while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) {
if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) { if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) {
@ -414,6 +381,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) { public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
} }
@SuppressWarnings("unchecked")
@Override @Override
public RequestHandlerRegistry getRequestHandler(String action) { public RequestHandlerRegistry getRequestHandler(String action) {
return null; return null;