diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index f48413824d3..b004af9d38a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -297,14 +297,27 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { private final AtomicInteger consecutiveFailureCount = new AtomicInteger(); - private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + private final Runnable connectActivity = new AbstractRunnable() { + + final AbstractRunnable abstractRunnable = this; + @Override protected void doRun() { assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - transportService.connectToNode(discoveryNode); - consecutiveFailureCount.set(0); - logger.debug("connected to {}", discoveryNode); - onCompletion(ActivityType.CONNECTING, null, disconnectActivity); + transportService.connectToNode(discoveryNode, new ActionListener() { + @Override + public void onResponse(Void aVoid) { + assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; + consecutiveFailureCount.set(0); + logger.debug("connected to {}", discoveryNode); + onCompletion(ActivityType.CONNECTING, null, disconnectActivity); + } + + @Override + public void onFailure(Exception e) { + abstractRunnable.onFailure(e); + } + }); } @Override @@ -322,7 +335,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { public String toString() { return "connect to " + discoveryNode; } - }); + }; private final Runnable disconnectActivity = new AbstractRunnable() { @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 377cbef84a4..e84b8391a7e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -131,7 +131,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { service.connectToNodes(nodes, () -> future.onResponse(null)); future.actionGet(); if (isDisrupting == false) { - assertConnected(nodes); + assertConnected(transportService, nodes); } service.disconnectFromNodesExcept(nodes); @@ -169,6 +169,11 @@ public class NodeConnectionsServiceTests extends ESTestCase { final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random()); + MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool()); + TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool()); + transportService.start(); + transportService.acceptIncomingRequests(); + final NodeConnectionsService service = new NodeConnectionsService(settings.build(), deterministicTaskQueue.getThreadPool(), transportService); service.start(); @@ -211,7 +216,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { transport.randomConnectionExceptions = false; logger.info("renewing connections"); runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + reconnectIntervalMillis); - assertConnectedExactlyToNodes(targetNodes); + assertConnectedExactlyToNodes(transportService, targetNodes); } public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { @@ -314,11 +319,15 @@ public class NodeConnectionsServiceTests extends ESTestCase { } private void assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) { - assertConnected(discoveryNodes); + assertConnectedExactlyToNodes(transportService, discoveryNodes); + } + + private void assertConnectedExactlyToNodes(TransportService transportService, DiscoveryNodes discoveryNodes) { + assertConnected(transportService, discoveryNodes); assertThat(transportService.getConnectionManager().size(), equalTo(discoveryNodes.getSize())); } - private void assertConnected(Iterable nodes) { + private void assertConnected(TransportService transportService, Iterable nodes) { for (DiscoveryNode node : nodes) { assertTrue("not connected to " + node, transportService.nodeConnected(node)); } @@ -328,8 +337,9 @@ public class NodeConnectionsServiceTests extends ESTestCase { @Before public void setUp() throws Exception { super.setUp(); - this.threadPool = new TestThreadPool(getClass().getName()); - this.transport = new MockTransport(); + ThreadPool threadPool = new TestThreadPool(getClass().getName()); + this.threadPool = threadPool; + this.transport = new MockTransport(threadPool); nodeConnectionBlocks = newConcurrentMap(); transportService = new TestTransportService(transport, threadPool); transportService.start(); @@ -361,21 +371,35 @@ public class NodeConnectionsServiceTests extends ESTestCase { @Override public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + throw new AssertionError("no blocking connect"); + } + + @Override + public void connectToNode(DiscoveryNode node, ActionListener listener) throws ConnectTransportException { final CheckedRunnable connectionBlock = nodeConnectionBlocks.get(node); if (connectionBlock != null) { - try { - connectionBlock.run(); - } catch (Exception e) { - throw new AssertionError(e); - } + getThreadPool().generic().execute(() -> { + try { + connectionBlock.run(); + super.connectToNode(node, listener); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } else { + super.connectToNode(node, listener); } - super.connectToNode(node); } } private final class MockTransport implements Transport { private ResponseHandlers responseHandlers = new ResponseHandlers(); private volatile boolean randomConnectionExceptions = false; + private final ThreadPool threadPool; + + MockTransport(ThreadPool threadPool) { + this.threadPool = threadPool; + } @Override public void registerRequestHandler(RequestHandlerRegistry reg) {