Make NodeConnectionsService non-blocking (#44211)
With connection management now being non-blocking, we can make NodeConnectionsService avoid the use of MANAGEMENT threads that are blocked during the connection attempts. I had to fiddle a bit with the tests as testPeriodicReconnection was using both the mock Threadpool from the DeterministicTaskQueue as well as the real ThreadPool initialized at the test class level, which resulted in races.
This commit is contained in:
parent
47ab2bda72
commit
ea5513f2cf
|
@ -297,16 +297,29 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
|
|||
|
||||
private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
|
||||
|
||||
private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
|
||||
private final Runnable connectActivity = new AbstractRunnable() {
|
||||
|
||||
final AbstractRunnable abstractRunnable = this;
|
||||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
|
||||
transportService.connectToNode(discoveryNode);
|
||||
transportService.connectToNode(discoveryNode, new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
|
||||
consecutiveFailureCount.set(0);
|
||||
logger.debug("connected to {}", discoveryNode);
|
||||
onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
abstractRunnable.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
|
||||
|
@ -322,7 +335,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
|
|||
public String toString() {
|
||||
return "connect to " + discoveryNode;
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
private final Runnable disconnectActivity = new AbstractRunnable() {
|
||||
@Override
|
||||
|
|
|
@ -131,7 +131,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
service.connectToNodes(nodes, () -> future.onResponse(null));
|
||||
future.actionGet();
|
||||
if (isDisrupting == false) {
|
||||
assertConnected(nodes);
|
||||
assertConnected(transportService, nodes);
|
||||
}
|
||||
service.disconnectFromNodesExcept(nodes);
|
||||
|
||||
|
@ -169,6 +169,11 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
final DeterministicTaskQueue deterministicTaskQueue
|
||||
= new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
|
||||
|
||||
MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool());
|
||||
TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool());
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
|
||||
final NodeConnectionsService service
|
||||
= new NodeConnectionsService(settings.build(), deterministicTaskQueue.getThreadPool(), transportService);
|
||||
service.start();
|
||||
|
@ -211,7 +216,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
transport.randomConnectionExceptions = false;
|
||||
logger.info("renewing connections");
|
||||
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + reconnectIntervalMillis);
|
||||
assertConnectedExactlyToNodes(targetNodes);
|
||||
assertConnectedExactlyToNodes(transportService, targetNodes);
|
||||
}
|
||||
|
||||
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
|
||||
|
@ -314,11 +319,15 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private void assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) {
|
||||
assertConnected(discoveryNodes);
|
||||
assertConnectedExactlyToNodes(transportService, discoveryNodes);
|
||||
}
|
||||
|
||||
private void assertConnectedExactlyToNodes(TransportService transportService, DiscoveryNodes discoveryNodes) {
|
||||
assertConnected(transportService, discoveryNodes);
|
||||
assertThat(transportService.getConnectionManager().size(), equalTo(discoveryNodes.getSize()));
|
||||
}
|
||||
|
||||
private void assertConnected(Iterable<DiscoveryNode> nodes) {
|
||||
private void assertConnected(TransportService transportService, Iterable<DiscoveryNode> nodes) {
|
||||
for (DiscoveryNode node : nodes) {
|
||||
assertTrue("not connected to " + node, transportService.nodeConnected(node));
|
||||
}
|
||||
|
@ -328,8 +337,9 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
this.threadPool = new TestThreadPool(getClass().getName());
|
||||
this.transport = new MockTransport();
|
||||
ThreadPool threadPool = new TestThreadPool(getClass().getName());
|
||||
this.threadPool = threadPool;
|
||||
this.transport = new MockTransport(threadPool);
|
||||
nodeConnectionBlocks = newConcurrentMap();
|
||||
transportService = new TestTransportService(transport, threadPool);
|
||||
transportService.start();
|
||||
|
@ -361,21 +371,35 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
|
||||
throw new AssertionError("no blocking connect");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectToNode(DiscoveryNode node, ActionListener<Void> listener) throws ConnectTransportException {
|
||||
final CheckedRunnable<Exception> connectionBlock = nodeConnectionBlocks.get(node);
|
||||
if (connectionBlock != null) {
|
||||
getThreadPool().generic().execute(() -> {
|
||||
try {
|
||||
connectionBlock.run();
|
||||
super.connectToNode(node, listener);
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
super.connectToNode(node, listener);
|
||||
}
|
||||
super.connectToNode(node);
|
||||
}
|
||||
}
|
||||
|
||||
private final class MockTransport implements Transport {
|
||||
private ResponseHandlers responseHandlers = new ResponseHandlers();
|
||||
private volatile boolean randomConnectionExceptions = false;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
MockTransport(ThreadPool threadPool) {
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
|
||||
|
|
Loading…
Reference in New Issue