diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index 9d364d77c20..1474afb4ce2 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -41,6 +42,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; @@ -51,6 +53,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.nio.NioTransportPlugin; import java.util.Collections; import java.util.List; @@ -79,10 +82,12 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase { transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null); transportService.start(); transportService.acceptIncomingRequests(); + String transport = randomTestTransport(); TransportClient client = new MockTransportClient(Settings.builder() .put("client.transport.sniff", false) .put("cluster.name", "cluster1") .put("node.name", "transport_client_" + this.getTestName()) + .put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport) .put(headersSettings) .build(), InternalTransportServiceInterceptor.TestPlugin.class); InternalTransportServiceInterceptor.TestPlugin plugin = client.injector.getInstance(PluginsService.class) @@ -94,12 +99,14 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase { } public void testWithSniffing() throws Exception { + String transport = randomTestTransport(); try (TransportClient client = new MockTransportClient( Settings.builder() .put("client.transport.sniff", true) .put("cluster.name", "cluster1") .put("node.name", "transport_client_" + this.getTestName() + "_1") .put("client.transport.nodes_sampler_interval", "1s") + .put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport) .put(HEADER_SETTINGS) .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(), InternalTransportServiceInterceptor.TestPlugin.class)) { diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java index 0772e87d900..6b738685f89 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java @@ -19,9 +19,6 @@ package org.elasticsearch.client.transport; -import java.io.IOException; -import java.util.Arrays; - import org.elasticsearch.Version; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -39,6 +36,10 @@ import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.nio.NioTransportPlugin; + +import java.io.IOException; +import java.util.Arrays; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -95,8 +96,10 @@ public class TransportClientIT extends ESIntegTestCase { } public void testThatTransportClientSettingCannotBeChanged() { + String transport = randomTestTransport(); Settings baseSettings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) + .put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport) .build(); try (TransportClient client = new MockTransportClient(baseSettings)) { Settings settings = client.injector.getInstance(Settings.class); diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java index 0247d8f343f..5bf5e9b6f5b 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java @@ -25,14 +25,17 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.nio.NioTransportPlugin; import java.io.IOException; import java.util.concurrent.ExecutionException; @@ -49,9 +52,12 @@ public class TransportClientRetryIT extends ESIntegTestCase { addresses[i++] = instance.boundAddress().publishAddress(); } + String transport = randomTestTransport(); + Settings.Builder builder = Settings.builder().put("client.transport.nodes_sampler_interval", "1s") .put("node.name", "transport_client_retry_test") .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), internalCluster().getClusterName()) + .put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(),transport) .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()); try (TransportClient client = new MockTransportClient(builder.build())) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index bf8888fa28e..047286404c4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -102,6 +102,8 @@ import org.elasticsearch.search.MockSearchService; import org.elasticsearch.test.junit.listeners.LoggingListener; import org.elasticsearch.test.junit.listeners.ReproduceInfoPrinter; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.MockTcpTransportPlugin; +import org.elasticsearch.transport.nio.NioTransportPlugin; import org.joda.time.DateTimeZone; import org.junit.After; import org.junit.AfterClass; @@ -883,6 +885,10 @@ public abstract class ESTestCase extends LuceneTestCase { return geohashGenerator.ofStringLength(random(), minPrecision, maxPrecision); } + public static String randomTestTransport() { + return randomBoolean() ? NioTransportPlugin.NIO_TRANSPORT_NAME : MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME; + } + private static final GeohashGenerator geohashGenerator = new GeohashGenerator(); public static class GeohashGenerator extends CodepointSetGenerator { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 72d2a690b5e..33cae7adceb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -94,10 +94,12 @@ import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.nio.NioTransportPlugin; import org.junit.Assert; import java.io.Closeable; @@ -135,6 +137,7 @@ import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOU import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy; +import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; @@ -971,8 +974,12 @@ public final class InternalTestCluster extends TestCluster { .put("logger.prefix", nodeSettings.get("logger.prefix", "")) .put("logger.level", nodeSettings.get("logger.level", "INFO")) .put(settings); - if ( NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings)) { - builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NetworkModule.TRANSPORT_TYPE_SETTING.get(settings)); + if (NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings)) { + builder.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), NetworkModule.TRANSPORT_TYPE_SETTING.get(settings)); + } else { + String transport = randomBoolean() ? NioTransportPlugin.NIO_TRANSPORT_NAME : + MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME; + builder.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport); } TransportClient client = new MockTransportClient(builder.build(), plugins); client.addTransportAddress(addr); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTransportClient.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTransportClient.java index 566f8ca11e7..1f712adb161 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTransportClient.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTransportClient.java @@ -19,8 +19,10 @@ package org.elasticsearch.transport; import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.nio.NioTransportPlugin; import java.util.ArrayList; import java.util.Arrays; @@ -37,19 +39,34 @@ public class MockTransportClient extends TransportClient { } public MockTransportClient(Settings settings, Collection> plugins) { - this(settings, addMockTransportIfMissing(plugins), null); + this(settings, plugins, null); } public MockTransportClient(Settings settings, Collection> plugins, HostFailureListener listener) { - super(settings, DEFAULT_SETTINGS, addMockTransportIfMissing(plugins), listener); + super(settings, DEFAULT_SETTINGS, addMockTransportIfMissing(settings, plugins), listener); } - private static Collection> addMockTransportIfMissing(Collection> plugins) { - if (plugins.contains(MockTcpTransportPlugin.class)) { - return plugins; + private static Collection> addMockTransportIfMissing(Settings settings, + Collection> plugins) { + boolean settingExists = NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings); + String transportType = NetworkModule.TRANSPORT_TYPE_SETTING.get(settings); + if (settingExists == false || MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME.equals(transportType)) { + if (plugins.contains(MockTcpTransportPlugin.class)) { + return plugins; + } else { + plugins = new ArrayList<>(plugins); + plugins.add(MockTcpTransportPlugin.class); + return plugins; + } + } else if (NioTransportPlugin.NIO_TRANSPORT_NAME.equals(transportType)) { + if (plugins.contains(NioTransportPlugin.class)) { + return plugins; + } else { + plugins = new ArrayList<>(plugins); + plugins.add(NioTransportPlugin.class); + return plugins; + } } - plugins = new ArrayList<>(plugins); - plugins.add(MockTcpTransportPlugin.class); return plugins; } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index ad95b0baeda..a6219251400 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -105,7 +105,7 @@ public class NioTransport extends TcpTransport { if (channel != null && channel.isOpen()) { // We do not need to wait for the close operation to complete. If the close operation fails due // to an IOException, the selector's handler will log the exception. Additionally, in the case - // of transport shutdown, where we do want to ensure that all channels to finished closing, the + // of transport shutdown, where we do want to ensure that all channels are finished closing, the // NioShutdown class will block on close. futures.add(channel.closeAsync()); } @@ -163,13 +163,23 @@ public class NioTransport extends TcpTransport { protected void doStart() { boolean success = false; try { - if (NetworkService.NETWORK_SERVER.get(settings)) { - int workerCount = NioTransport.NIO_WORKER_COUNT.get(settings); - for (int i = 0; i < workerCount; ++i) { - SocketSelector selector = new SocketSelector(getSocketEventHandler()); - socketSelectors.add(selector); - } + int workerCount = NioTransport.NIO_WORKER_COUNT.get(settings); + for (int i = 0; i < workerCount; ++i) { + SocketSelector selector = new SocketSelector(getSocketEventHandler()); + socketSelectors.add(selector); + } + for (SocketSelector selector : socketSelectors) { + if (selector.isRunning() == false) { + ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX); + threadFactory.newThread(selector::runLoop).start(); + selector.isRunningFuture().actionGet(); + } + } + + client = createClient(); + + if (NetworkService.NETWORK_SERVER.get(settings)) { int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings); for (int i = 0; i < acceptorCount; ++i) { Supplier selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors); @@ -178,16 +188,6 @@ public class NioTransport extends TcpTransport { acceptors.add(acceptor); } - client = createClient(); - - for (SocketSelector selector : socketSelectors) { - if (selector.isRunning() == false) { - ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX); - threadFactory.newThread(selector::runLoop).start(); - selector.isRunningFuture().actionGet(); - } - } - for (AcceptingSelector acceptor : acceptors) { if (acceptor.isRunning() == false) { ThreadFactory threadFactory = daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java new file mode 100644 index 00000000000..733351fe429 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport.nio; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.plugins.NetworkPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; + +public class NioTransportPlugin extends Plugin implements NetworkPlugin { + + public static final String NIO_TRANSPORT_NAME = "nio-transport"; + + @Override + public Map> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, + CircuitBreakerService circuitBreakerService, + NamedWriteableRegistry namedWriteableRegistry, + NetworkService networkService) { + Settings settings1; + if (NioTransport.NIO_WORKER_COUNT.exists(settings) == false) { + // As this is only used for tests right now, limit the number of worker threads. + settings1 = Settings.builder().put(settings).put(NioTransport.NIO_WORKER_COUNT.getKey(), 2).build(); + } else { + settings1 = settings; + } + return Collections.singletonMap(NIO_TRANSPORT_NAME, + () -> new NioTransport(settings1, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService)); + } +} diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index 708b65b9235..451f80ebf0d 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.transport.MockTcpTransportPlugin; import org.elasticsearch.transport.TcpTransport; +import org.elasticsearch.transport.nio.NioTransportPlugin; import java.io.IOException; import java.nio.file.Files; @@ -176,6 +177,7 @@ public class InternalTestClusterTests extends ESTestCase { final int numClientNodes = randomIntBetween(0, 2); final String clusterName1 = "shared1"; final String clusterName2 = "shared2"; + String transportClient = randomTestTransport(); NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { @Override public Settings nodeSettings(int nodeOrdinal) { @@ -201,7 +203,7 @@ public class InternalTestClusterTests extends ESTestCase { @Override public Settings transportClientSettings() { return Settings.builder() - .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); + .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build(); } }; @@ -253,6 +255,7 @@ public class InternalTestClusterTests extends ESTestCase { final int maxNumDataNodes = 2; final int numClientNodes = randomIntBetween(0, 2); final String clusterName1 = "shared1"; + String transportClient = randomTestTransport(); NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { @Override public Settings nodeSettings(int nodeOrdinal) { @@ -272,7 +275,7 @@ public class InternalTestClusterTests extends ESTestCase { @Override public Settings transportClientSettings() { return Settings.builder() - .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); + .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build(); } }; boolean enableHttpPipelining = randomBoolean(); @@ -360,6 +363,8 @@ public class InternalTestClusterTests extends ESTestCase { public void testDifferentRolesMaintainPathOnRestart() throws Exception { final Path baseDir = createTempDir(); final int numNodes = 5; + + String transportClient = randomTestTransport(); InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, false, false, 0, 0, "test", new NodeConfigurationSource() { @Override @@ -383,7 +388,7 @@ public class InternalTestClusterTests extends ESTestCase { @Override public Settings transportClientSettings() { return Settings.builder() - .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); + .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build(); } }, 0, randomBoolean(), "", Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class), Function.identity()); cluster.beforeTest(random(), 0.0); @@ -447,6 +452,7 @@ public class InternalTestClusterTests extends ESTestCase { } public void testTwoNodeCluster() throws Exception { + String transportClient = randomTestTransport(); NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() { @Override public Settings nodeSettings(int nodeOrdinal) { @@ -464,7 +470,7 @@ public class InternalTestClusterTests extends ESTestCase { @Override public Settings transportClientSettings() { return Settings.builder() - .put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build(); + .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build(); } }; boolean enableHttpPipelining = randomBoolean();