Support client-only mode for NioTransport (#25839)

Currently, NioTransport does start normal socket selectors and the
client when the network server setting is set to false. This commit
makes it so that the client will be started even when the network server
is not enabled.

Additionally, it randomly introduces the NioTransport as an option for
the MockTransportClient throughout tests.
This commit is contained in:
Tim Brooks 2017-07-26 10:27:15 -05:00 committed by GitHub
parent 03eb1460ad
commit 6d02b45f10
9 changed files with 139 additions and 33 deletions

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -41,6 +42,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportException;
@ -51,6 +53,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.NioTransportPlugin;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -79,10 +82,12 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null); transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null);
transportService.start(); transportService.start();
transportService.acceptIncomingRequests(); transportService.acceptIncomingRequests();
String transport = randomTestTransport();
TransportClient client = new MockTransportClient(Settings.builder() TransportClient client = new MockTransportClient(Settings.builder()
.put("client.transport.sniff", false) .put("client.transport.sniff", false)
.put("cluster.name", "cluster1") .put("cluster.name", "cluster1")
.put("node.name", "transport_client_" + this.getTestName()) .put("node.name", "transport_client_" + this.getTestName())
.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport)
.put(headersSettings) .put(headersSettings)
.build(), InternalTransportServiceInterceptor.TestPlugin.class); .build(), InternalTransportServiceInterceptor.TestPlugin.class);
InternalTransportServiceInterceptor.TestPlugin plugin = client.injector.getInstance(PluginsService.class) InternalTransportServiceInterceptor.TestPlugin plugin = client.injector.getInstance(PluginsService.class)
@ -94,12 +99,14 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
} }
public void testWithSniffing() throws Exception { public void testWithSniffing() throws Exception {
String transport = randomTestTransport();
try (TransportClient client = new MockTransportClient( try (TransportClient client = new MockTransportClient(
Settings.builder() Settings.builder()
.put("client.transport.sniff", true) .put("client.transport.sniff", true)
.put("cluster.name", "cluster1") .put("cluster.name", "cluster1")
.put("node.name", "transport_client_" + this.getTestName() + "_1") .put("node.name", "transport_client_" + this.getTestName() + "_1")
.put("client.transport.nodes_sampler_interval", "1s") .put("client.transport.nodes_sampler_interval", "1s")
.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport)
.put(HEADER_SETTINGS) .put(HEADER_SETTINGS)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(), .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(),
InternalTransportServiceInterceptor.TestPlugin.class)) { InternalTransportServiceInterceptor.TestPlugin.class)) {

View File

@ -19,9 +19,6 @@
package org.elasticsearch.client.transport; package org.elasticsearch.client.transport;
import java.io.IOException;
import java.util.Arrays;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -39,6 +36,10 @@ import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.NioTransportPlugin;
import java.io.IOException;
import java.util.Arrays;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -95,8 +96,10 @@ public class TransportClientIT extends ESIntegTestCase {
} }
public void testThatTransportClientSettingCannotBeChanged() { public void testThatTransportClientSettingCannotBeChanged() {
String transport = randomTestTransport();
Settings baseSettings = Settings.builder() Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport)
.build(); .build();
try (TransportClient client = new MockTransportClient(baseSettings)) { try (TransportClient client = new MockTransportClient(baseSettings)) {
Settings settings = client.injector.getInstance(Settings.class); Settings settings = client.injector.getInstance(Settings.class);

View File

@ -25,14 +25,17 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.NioTransportPlugin;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -49,9 +52,12 @@ public class TransportClientRetryIT extends ESIntegTestCase {
addresses[i++] = instance.boundAddress().publishAddress(); addresses[i++] = instance.boundAddress().publishAddress();
} }
String transport = randomTestTransport();
Settings.Builder builder = Settings.builder().put("client.transport.nodes_sampler_interval", "1s") Settings.Builder builder = Settings.builder().put("client.transport.nodes_sampler_interval", "1s")
.put("node.name", "transport_client_retry_test") .put("node.name", "transport_client_retry_test")
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), internalCluster().getClusterName()) .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), internalCluster().getClusterName())
.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(),transport)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()); .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir());
try (TransportClient client = new MockTransportClient(builder.build())) { try (TransportClient client = new MockTransportClient(builder.build())) {

View File

@ -102,6 +102,8 @@ import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.test.junit.listeners.LoggingListener; import org.elasticsearch.test.junit.listeners.LoggingListener;
import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.nio.NioTransportPlugin;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -883,6 +885,10 @@ public abstract class ESTestCase extends LuceneTestCase {
return geohashGenerator.ofStringLength(random(), minPrecision, maxPrecision); return geohashGenerator.ofStringLength(random(), minPrecision, maxPrecision);
} }
public static String randomTestTransport() {
return randomBoolean() ? NioTransportPlugin.NIO_TRANSPORT_NAME : MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME;
}
private static final GeohashGenerator geohashGenerator = new GeohashGenerator(); private static final GeohashGenerator geohashGenerator = new GeohashGenerator();
public static class GeohashGenerator extends CodepointSetGenerator { public static class GeohashGenerator extends CodepointSetGenerator {

View File

@ -94,10 +94,12 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.NioTransportPlugin;
import org.junit.Assert; import org.junit.Assert;
import java.io.Closeable; import java.io.Closeable;
@ -135,6 +137,7 @@ import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOU
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -971,8 +974,12 @@ public final class InternalTestCluster extends TestCluster {
.put("logger.prefix", nodeSettings.get("logger.prefix", "")) .put("logger.prefix", nodeSettings.get("logger.prefix", ""))
.put("logger.level", nodeSettings.get("logger.level", "INFO")) .put("logger.level", nodeSettings.get("logger.level", "INFO"))
.put(settings); .put(settings);
if ( NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings)) { if (NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings)) {
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NetworkModule.TRANSPORT_TYPE_SETTING.get(settings)); builder.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), NetworkModule.TRANSPORT_TYPE_SETTING.get(settings));
} else {
String transport = randomBoolean() ? NioTransportPlugin.NIO_TRANSPORT_NAME :
MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME;
builder.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport);
} }
TransportClient client = new MockTransportClient(builder.build(), plugins); TransportClient client = new MockTransportClient(builder.build(), plugins);
client.addTransportAddress(addr); client.addTransportAddress(addr);

View File

@ -19,8 +19,10 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.nio.NioTransportPlugin;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -37,19 +39,34 @@ public class MockTransportClient extends TransportClient {
} }
public MockTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) { public MockTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
this(settings, addMockTransportIfMissing(plugins), null); this(settings, plugins, null);
} }
public MockTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins, HostFailureListener listener) { public MockTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins, HostFailureListener listener) {
super(settings, DEFAULT_SETTINGS, addMockTransportIfMissing(plugins), listener); super(settings, DEFAULT_SETTINGS, addMockTransportIfMissing(settings, plugins), listener);
} }
private static Collection<Class<? extends Plugin>> addMockTransportIfMissing(Collection<Class<? extends Plugin>> plugins) { private static Collection<Class<? extends Plugin>> addMockTransportIfMissing(Settings settings,
Collection<Class<? extends Plugin>> plugins) {
boolean settingExists = NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings);
String transportType = NetworkModule.TRANSPORT_TYPE_SETTING.get(settings);
if (settingExists == false || MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME.equals(transportType)) {
if (plugins.contains(MockTcpTransportPlugin.class)) { if (plugins.contains(MockTcpTransportPlugin.class)) {
return plugins; return plugins;
} } else {
plugins = new ArrayList<>(plugins); plugins = new ArrayList<>(plugins);
plugins.add(MockTcpTransportPlugin.class); plugins.add(MockTcpTransportPlugin.class);
return plugins; return plugins;
} }
} else if (NioTransportPlugin.NIO_TRANSPORT_NAME.equals(transportType)) {
if (plugins.contains(NioTransportPlugin.class)) {
return plugins;
} else {
plugins = new ArrayList<>(plugins);
plugins.add(NioTransportPlugin.class);
return plugins;
}
}
return plugins;
}
} }

View File

@ -105,7 +105,7 @@ public class NioTransport extends TcpTransport<NioChannel> {
if (channel != null && channel.isOpen()) { if (channel != null && channel.isOpen()) {
// We do not need to wait for the close operation to complete. If the close operation fails due // We do not need to wait for the close operation to complete. If the close operation fails due
// to an IOException, the selector's handler will log the exception. Additionally, in the case // to an IOException, the selector's handler will log the exception. Additionally, in the case
// of transport shutdown, where we do want to ensure that all channels to finished closing, the // of transport shutdown, where we do want to ensure that all channels are finished closing, the
// NioShutdown class will block on close. // NioShutdown class will block on close.
futures.add(channel.closeAsync()); futures.add(channel.closeAsync());
} }
@ -163,23 +163,12 @@ public class NioTransport extends TcpTransport<NioChannel> {
protected void doStart() { protected void doStart() {
boolean success = false; boolean success = false;
try { try {
if (NetworkService.NETWORK_SERVER.get(settings)) {
int workerCount = NioTransport.NIO_WORKER_COUNT.get(settings); int workerCount = NioTransport.NIO_WORKER_COUNT.get(settings);
for (int i = 0; i < workerCount; ++i) { for (int i = 0; i < workerCount; ++i) {
SocketSelector selector = new SocketSelector(getSocketEventHandler()); SocketSelector selector = new SocketSelector(getSocketEventHandler());
socketSelectors.add(selector); socketSelectors.add(selector);
} }
int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
for (int i = 0; i < acceptorCount; ++i) {
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
AcceptorEventHandler eventHandler = new AcceptorEventHandler(logger, openChannels, selectorSupplier);
AcceptingSelector acceptor = new AcceptingSelector(eventHandler);
acceptors.add(acceptor);
}
client = createClient();
for (SocketSelector selector : socketSelectors) { for (SocketSelector selector : socketSelectors) {
if (selector.isRunning() == false) { if (selector.isRunning() == false) {
ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX); ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
@ -188,6 +177,17 @@ public class NioTransport extends TcpTransport<NioChannel> {
} }
} }
client = createClient();
if (NetworkService.NETWORK_SERVER.get(settings)) {
int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
for (int i = 0; i < acceptorCount; ++i) {
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
AcceptorEventHandler eventHandler = new AcceptorEventHandler(logger, openChannels, selectorSupplier);
AcceptingSelector acceptor = new AcceptingSelector(eventHandler);
acceptors.add(acceptor);
}
for (AcceptingSelector acceptor : acceptors) { for (AcceptingSelector acceptor : acceptors) {
if (acceptor.isRunning() == false) { if (acceptor.isRunning() == false) {
ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX); ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX);

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
public class NioTransportPlugin extends Plugin implements NetworkPlugin {
public static final String NIO_TRANSPORT_NAME = "nio-transport";
@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
Settings settings1;
if (NioTransport.NIO_WORKER_COUNT.exists(settings) == false) {
// As this is only used for tests right now, limit the number of worker threads.
settings1 = Settings.builder().put(settings).put(NioTransport.NIO_WORKER_COUNT.getKey(), 2).build();
} else {
settings1 = settings;
}
return Collections.singletonMap(NIO_TRANSPORT_NAME,
() -> new NioTransport(settings1, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
}
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.nio.NioTransportPlugin;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@ -176,6 +177,7 @@ public class InternalTestClusterTests extends ESTestCase {
final int numClientNodes = randomIntBetween(0, 2); final int numClientNodes = randomIntBetween(0, 2);
final String clusterName1 = "shared1"; final String clusterName1 = "shared1";
final String clusterName2 = "shared2"; final String clusterName2 = "shared2";
String transportClient = randomTestTransport();
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override @Override
public Settings nodeSettings(int nodeOrdinal) { public Settings nodeSettings(int nodeOrdinal) {
@ -201,7 +203,7 @@ public class InternalTestClusterTests extends ESTestCase {
@Override @Override
public Settings transportClientSettings() { public Settings transportClientSettings() {
return Settings.builder() return Settings.builder()
.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build();
} }
}; };
@ -253,6 +255,7 @@ public class InternalTestClusterTests extends ESTestCase {
final int maxNumDataNodes = 2; final int maxNumDataNodes = 2;
final int numClientNodes = randomIntBetween(0, 2); final int numClientNodes = randomIntBetween(0, 2);
final String clusterName1 = "shared1"; final String clusterName1 = "shared1";
String transportClient = randomTestTransport();
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override @Override
public Settings nodeSettings(int nodeOrdinal) { public Settings nodeSettings(int nodeOrdinal) {
@ -272,7 +275,7 @@ public class InternalTestClusterTests extends ESTestCase {
@Override @Override
public Settings transportClientSettings() { public Settings transportClientSettings() {
return Settings.builder() return Settings.builder()
.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build();
} }
}; };
boolean enableHttpPipelining = randomBoolean(); boolean enableHttpPipelining = randomBoolean();
@ -360,6 +363,8 @@ public class InternalTestClusterTests extends ESTestCase {
public void testDifferentRolesMaintainPathOnRestart() throws Exception { public void testDifferentRolesMaintainPathOnRestart() throws Exception {
final Path baseDir = createTempDir(); final Path baseDir = createTempDir();
final int numNodes = 5; final int numNodes = 5;
String transportClient = randomTestTransport();
InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, false, InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, false,
false, 0, 0, "test", new NodeConfigurationSource() { false, 0, 0, "test", new NodeConfigurationSource() {
@Override @Override
@ -383,7 +388,7 @@ public class InternalTestClusterTests extends ESTestCase {
@Override @Override
public Settings transportClientSettings() { public Settings transportClientSettings() {
return Settings.builder() return Settings.builder()
.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build();
} }
}, 0, randomBoolean(), "", Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class), Function.identity()); }, 0, randomBoolean(), "", Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class), Function.identity());
cluster.beforeTest(random(), 0.0); cluster.beforeTest(random(), 0.0);
@ -447,6 +452,7 @@ public class InternalTestClusterTests extends ESTestCase {
} }
public void testTwoNodeCluster() throws Exception { public void testTwoNodeCluster() throws Exception {
String transportClient = randomTestTransport();
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override @Override
public Settings nodeSettings(int nodeOrdinal) { public Settings nodeSettings(int nodeOrdinal) {
@ -464,7 +470,7 @@ public class InternalTestClusterTests extends ESTestCase {
@Override @Override
public Settings transportClientSettings() { public Settings transportClientSettings() {
return Settings.builder() return Settings.builder()
.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build();
} }
}; };
boolean enableHttpPipelining = randomBoolean(); boolean enableHttpPipelining = randomBoolean();