Use MockNioTransport in MockTransportService (#36346)

The default transport used in the MockTransportService is the
MockTcpTransport. This commit changes that to be the
MockNioTransport.
This commit is contained in:
Tim Brooks 2018-12-07 11:17:11 -07:00 committed by GitHub
parent 03c577b477
commit 5556204f81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 25 deletions

View File

@ -392,14 +392,13 @@ public class TransportClientNodesServiceTests extends ESTestCase {
assertEquals(1, clientService.connectionManager().size()); assertEquals(1, clientService.connectionManager().size());
establishedConnections.clear(); establishedConnections.clear();
handler.blockRequest(); handler.failToRespond();
Thread thread = new Thread(transportClientNodesService::doSample); Thread thread = new Thread(transportClientNodesService::doSample);
thread.start(); thread.start();
assertBusy(() -> assertTrue(establishedConnections.size() >= 1)); assertBusy(() -> assertTrue(establishedConnections.size() >= 1));
assertFalse("Temporary ping connection must be opened", establishedConnections.get(0).isClosed()); assertFalse("Temporary ping connection must be opened", establishedConnections.get(0).isClosed());
handler.releaseRequest();
thread.join(); thread.join();
assertTrue(establishedConnections.get(0).isClosed()); assertTrue(establishedConnections.get(0).isClosed());
@ -411,8 +410,8 @@ public class TransportClientNodesServiceTests extends ESTestCase {
} }
class MockHandler implements TransportRequestHandler<ClusterStateRequest> { class MockHandler implements TransportRequestHandler<ClusterStateRequest> {
private final AtomicBoolean block = new AtomicBoolean(false);
private final CountDownLatch release = new CountDownLatch(1); private final AtomicBoolean failToRespond = new AtomicBoolean(false);
private final MockTransportService transportService; private final MockTransportService transportService;
MockHandler(MockTransportService transportService) { MockHandler(MockTransportService transportService) {
@ -421,23 +420,20 @@ public class TransportClientNodesServiceTests extends ESTestCase {
@Override @Override
public void messageReceived(ClusterStateRequest request, TransportChannel channel, Task task) throws Exception { public void messageReceived(ClusterStateRequest request, TransportChannel channel, Task task) throws Exception {
if (block.get()) { if (failToRespond.get()) {
release.await();
return; return;
} }
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(transportService.getLocalDiscoNode()).build(); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(transportService.getLocalDiscoNode()).build();
ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L, false)); channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L, false));
} }
void blockRequest() { void failToRespond() {
if (block.compareAndSet(false, true) == false) { if (failToRespond.compareAndSet(false, true) == false) {
throw new AssertionError("Request handler is already marked as blocking"); throw new AssertionError("Request handler is already marked as failToRespond");
} }
} }
void releaseRequest() {
release.countDown();
}
} }
public static class TestRequest extends TransportRequest { public static class TestRequest extends TransportRequest {

View File

@ -1439,8 +1439,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
throw new IllegalArgumentException("nodeMap must be non-empty"); throw new IllegalArgumentException("nodeMap must be non-empty");
} }
StubbableTransport stubbableTransport = new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY, Version StubbableTransport stubbableTransport = new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY,
.CURRENT, threadPool)); Version.CURRENT, threadPool));
stubbableTransport.setDefaultConnectBehavior((t, node, profile, listener) -> { stubbableTransport.setDefaultConnectBehavior((t, node, profile, listener) -> {
Map<String, DiscoveryNode> proxyMapping = nodeMap.get(node.getAddress().toString()); Map<String, DiscoveryNode> proxyMapping = nodeMap.get(node.getAddress().toString());
if (proxyMapping == null) { if (proxyMapping == null) {

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -49,7 +50,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
@ -57,6 +57,7 @@ import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransport;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -99,12 +100,11 @@ public final class MockTransportService extends TransportService {
public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool, public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool,
@Nullable ClusterSettings clusterSettings) { @Nullable ClusterSettings clusterSettings) {
MockTcpTransport mockTcpTransport = newMockTransport(settings, version, threadPool); MockNioTransport mockTransport = newMockTransport(settings, version, threadPool);
return createNewService(settings, mockTcpTransport, version, threadPool, clusterSettings, return createNewService(settings, mockTransport, version, threadPool, clusterSettings, Collections.emptySet());
Collections.emptySet());
} }
public static MockTcpTransport newMockTransport(Settings settings, Version version, ThreadPool threadPool) { public static MockNioTransport newMockTransport(Settings settings, Version version, ThreadPool threadPool) {
// some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means // some tests use MockTransportService to do network based testing. Yet, we run tests in multiple JVMs that means
// concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might // concurrent tests could claim port that another JVM just released and if that test tries to simulate a disconnect it might
// be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use // be smart enough to re-connect depending on what is tested. To reduce the risk, since this is very hard to debug we use
@ -112,8 +112,9 @@ public final class MockTransportService extends TransportService {
int basePort = 10300 + (JVM_ORDINAL * 100); // use a non-default port otherwise some cluster in this JVM might reuse a port int basePort = 10300 + (JVM_ORDINAL * 100); // use a non-default port otherwise some cluster in this JVM might reuse a port
settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build(); settings = Settings.builder().put(TcpTransport.PORT.getKey(), basePort + "-" + (basePort + 100)).put(settings).build();
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
return new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, return new MockNioTransport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version); BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry,
new NoneCircuitBreakerService());
} }
public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
@ -160,7 +161,7 @@ public final class MockTransportService extends TransportService {
this.original = transport.getDelegate(); this.original = transport.getDelegate();
} }
public static TransportAddress[] extractTransportAddresses(TransportService transportService) { private static TransportAddress[] extractTransportAddresses(TransportService transportService) {
HashSet<TransportAddress> transportAddresses = new HashSet<>(); HashSet<TransportAddress> transportAddresses = new HashSet<>();
BoundTransportAddress boundTransportAddress = transportService.boundAddress(); BoundTransportAddress boundTransportAddress = transportService.boundAddress();
transportAddresses.addAll(Arrays.asList(boundTransportAddress.boundAddresses())); transportAddresses.addAll(Arrays.asList(boundTransportAddress.boundAddresses()));

View File

@ -74,9 +74,9 @@ public class MockNioTransport extends TcpTransport {
private volatile NioGroup nioGroup; private volatile NioGroup nioGroup;
private volatile MockTcpChannelFactory clientChannelFactory; private volatile MockTcpChannelFactory clientChannelFactory;
MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) { CircuitBreakerService circuitBreakerService) {
super("mock-nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); super("mock-nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
this.pageCacheRecycler = pageCacheRecycler; this.pageCacheRecycler = pageCacheRecycler;
} }