From 5556204f813e973e90c08f06b36da5aab91075c1 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 7 Dec 2018 11:17:11 -0700 Subject: [PATCH] Use MockNioTransport in MockTransportService (#36346) The default transport used in the MockTransportService is the MockTcpTransport. This commit changes that to be the MockNioTransport. --- .../TransportClientNodesServiceTests.java | 20 ++++++++----------- .../RemoteClusterConnectionTests.java | 4 ++-- .../test/transport/MockTransportService.java | 17 ++++++++-------- .../transport/nio/MockNioTransport.java | 6 +++--- 4 files changed, 22 insertions(+), 25 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index aa5bf950fee..208629c169a 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -392,14 +392,13 @@ public class TransportClientNodesServiceTests extends ESTestCase { assertEquals(1, clientService.connectionManager().size()); establishedConnections.clear(); - handler.blockRequest(); + handler.failToRespond(); Thread thread = new Thread(transportClientNodesService::doSample); thread.start(); assertBusy(() -> assertTrue(establishedConnections.size() >= 1)); assertFalse("Temporary ping connection must be opened", establishedConnections.get(0).isClosed()); - handler.releaseRequest(); thread.join(); assertTrue(establishedConnections.get(0).isClosed()); @@ -411,8 +410,8 @@ public class TransportClientNodesServiceTests extends ESTestCase { } class MockHandler implements TransportRequestHandler { - private final AtomicBoolean block = new AtomicBoolean(false); - private final CountDownLatch release = new CountDownLatch(1); + + private final AtomicBoolean failToRespond = new AtomicBoolean(false); private final MockTransportService transportService; MockHandler(MockTransportService transportService) { @@ -421,23 +420,20 @@ public class TransportClientNodesServiceTests extends ESTestCase { @Override public void messageReceived(ClusterStateRequest request, TransportChannel channel, Task task) throws Exception { - if (block.get()) { - release.await(); + if (failToRespond.get()) { return; } + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(transportService.getLocalDiscoNode()).build(); ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L, false)); } - void blockRequest() { - if (block.compareAndSet(false, true) == false) { - throw new AssertionError("Request handler is already marked as blocking"); + void failToRespond() { + if (failToRespond.compareAndSet(false, true) == false) { + throw new AssertionError("Request handler is already marked as failToRespond"); } } - void releaseRequest() { - release.countDown(); - } } public static class TestRequest extends TransportRequest { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 5ac71f197ce..50391162367 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -1439,8 +1439,8 @@ public class RemoteClusterConnectionTests extends ESTestCase { throw new IllegalArgumentException("nodeMap must be non-empty"); } - StubbableTransport stubbableTransport = new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY, Version - .CURRENT, threadPool)); + StubbableTransport stubbableTransport = new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY, + Version.CURRENT, threadPool)); stubbableTransport.setDefaultConnectBehavior((t, node, profile, listener) -> { Map proxyMapping = nodeMap.get(node.getAddress().toString()); if (proxyMapping == null) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index dfdaf148b7b..09f735cb0ab 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -49,7 +50,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.ConnectionProfile; -import org.elasticsearch.transport.MockTcpTransport; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; @@ -57,6 +57,7 @@ import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.nio.MockNioTransport; import java.io.IOException; import java.util.Arrays; @@ -99,12 +100,11 @@ public final class MockTransportService extends TransportService { public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool, @Nullable ClusterSettings clusterSettings) { - MockTcpTransport mockTcpTransport = newMockTransport(settings, version, threadPool); - return createNewService(settings, mockTcpTransport, version, threadPool, clusterSettings, - Collections.emptySet()); + MockNioTransport mockTransport = newMockTransport(settings, version, threadPool); + return createNewService(settings, mockTransport, version, threadPool, clusterSettings, Collections.emptySet()); } - public static MockTcpTransport newMockTransport(Settings settings, Version version, ThreadPool threadPool) { + public static MockNioTransport newMockTransport(Settings settings, Version version, ThreadPool threadPool) { // some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means // concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might // be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use @@ -112,8 +112,9 @@ public final class MockTransportService extends TransportService { int basePort = 10300 + (JVM_ORDINAL * 100); // use a non-default port otherwise some cluster in this JVM might reuse a port settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build(); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); - return new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, - new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); + return new MockNioTransport(settings, version, threadPool, new NetworkService(Collections.emptyList()), + BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry, + new NoneCircuitBreakerService()); } public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, @@ -160,7 +161,7 @@ public final class MockTransportService extends TransportService { this.original = transport.getDelegate(); } - public static TransportAddress[] extractTransportAddresses(TransportService transportService) { + private static TransportAddress[] extractTransportAddresses(TransportService transportService) { HashSet transportAddresses = new HashSet<>(); BoundTransportAddress boundTransportAddress = transportService.boundAddress(); transportAddresses.addAll(Arrays.asList(boundTransportAddress.boundAddresses())); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 8e766c91319..780913ae9f6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -74,9 +74,9 @@ public class MockNioTransport extends TcpTransport { private volatile NioGroup nioGroup; private volatile MockTcpChannelFactory clientChannelFactory; - MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, - CircuitBreakerService circuitBreakerService) { + public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, + CircuitBreakerService circuitBreakerService) { super("mock-nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); this.pageCacheRecycler = pageCacheRecycler; }