Add a dedicated client/transport project for transport-client (#19435)

The `client/transport` project adds a new jar build project that
pulls in all dependencies and configures all required modules.

Preinstalled modules are:
 * transport-netty
 * lang-mustache
 * reindex
 * percolator

The `TransportClient` classes are still in core
while `TransportClient.Builder` has only a protected construcutor
such that users are redirected to use the new `TransportClientBuilder`
from the new jar.

Closes #19412
This commit is contained in:
Simon Willnauer 2016-07-18 15:42:24 +02:00 committed by GitHub
parent 398d70b567
commit 8394544548
23 changed files with 399 additions and 198 deletions

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.gradle.precommit.PrecommitTasks
apply plugin: 'elasticsearch.build'
group = 'org.elasticsearch.client'
dependencies {
compile "org.elasticsearch:elasticsearch:${version}"
compile project(path: ':modules:transport-netty3', configuration: 'runtime')
compile project(path: ':modules:reindex', configuration: 'runtime')
compile project(path: ':modules:lang-mustache', configuration: 'runtime')
compile project(path: ':modules:percolator', configuration: 'runtime')
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testCompile "junit:junit:${versions.junit}"
}
dependencyLicenses {
dependencies = project.configurations.runtime.fileCollection {
it.group.startsWith('org.elasticsearch') == false
}
}
forbiddenApisTest {
// we don't use the core test-framework, no lucene classes present so we don't want the es-test-signatures to
// be pulled in
signaturesURLs = [PrecommitTasks.getResource('/forbidden/jdk-signatures.txt'),
PrecommitTasks.getResource('/forbidden/es-all-signatures.txt')]
}
namingConventions {
testClass = 'com.carrotsearch.randomizedtesting.RandomizedTest'
//we don't have integration tests
skipIntegTestInDisguise = true
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.percolator.PercolatorPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.transport.Netty3Plugin;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
/**
* A builder to create an instance of {@link TransportClient}
* This class pre-installs the {@link Netty3Plugin}, {@link ReindexPlugin}, {@link PercolatorPlugin}, and {@link MustachePlugin}
* for the client. These plugins are all elasticsearch core modules required.
*/
@SuppressWarnings({"unchecked","varargs"})
public class PreBuiltTransportClient extends TransportClient {
private static final Collection<Class<? extends Plugin>> PRE_INSTALLED_PLUGINS = Collections.unmodifiableList(Arrays.asList(
TransportPlugin.class, ReindexPlugin.class, PercolatorPlugin.class, MustachePlugin.class));
@SafeVarargs
public PreBuiltTransportClient(Settings settings, Class<? extends Plugin>... plugins) {
this(settings, Arrays.asList(plugins));
}
public PreBuiltTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS));
}
/**
* The default transport implementation for the transport client.
*/
public static final class TransportPlugin extends Netty3Plugin {
// disable assertions for permissions since we might not have the permissions here
// compared to if we are loaded as a real module to the es server
public TransportPlugin(Settings settings) {
super(Settings.builder().put("netty.assert.buglevel", false).put(settings).build());
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.client;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.percolator.PercolatorPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.transport.Netty3Plugin;
import org.junit.Test;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class PreBuiltTransportClientTests extends RandomizedTest {
@Test
public void testPluginInstalled() {
try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)) {
Settings settings = client.settings();
assertEquals(Netty3Plugin.NETTY_TRANSPORT_NAME, NetworkModule.HTTP_DEFAULT_TYPE_SETTING.get(settings));
}
}
@Test
public void testInstallPluginTwice() {
for (Class<? extends Plugin> plugin : Arrays.asList(ReindexPlugin.class, PercolatorPlugin.class,
MustachePlugin.class)) {
try {
new PreBuiltTransportClient(Settings.EMPTY, plugin);
fail("exception expected");
} catch (IllegalArgumentException ex) {
assertEquals("plugin is already installed", ex.getMessage());
}
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client.transport;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -69,122 +70,120 @@ import org.elasticsearch.transport.TransportService;
* The transport client important modules used is the {@link org.elasticsearch.common.network.NetworkModule} which is
* started in client mode (only connects, no bind).
*/
public class TransportClient extends AbstractClient {
public abstract class TransportClient extends AbstractClient {
/**
* Handy method ot create a {@link org.elasticsearch.client.transport.TransportClient.Builder}.
*/
public static Builder builder() {
return new Builder();
private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
.put(InternalSettingsPreparer.prepareSettings(settings))
.put(NetworkService.NETWORK_SERVER.getKey(), false)
.put(CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE);
return new PluginsService(settingsBuilder.build(), null, null, plugins);
}
/**
* A builder used to create an instance of the transport client.
*/
public static class Builder {
protected static Collection<Class<? extends Plugin>> addPlugins(Collection<Class<? extends Plugin>> collection,
Class<? extends Plugin>... plugins) {
return addPlugins(collection, Arrays.asList(plugins));
}
private Settings providedSettings = Settings.EMPTY;
private List<Class<? extends Plugin>> pluginClasses = new ArrayList<>();
/**
* The settings to configure the transport client with.
*/
public Builder settings(Settings.Builder settings) {
return settings(settings.build());
}
/**
* The settings to configure the transport client with.
*/
public Builder settings(Settings settings) {
this.providedSettings = settings;
return this;
}
/**
* Add the given plugin to the client when it is created.
*/
public Builder addPlugin(Class<? extends Plugin> pluginClass) {
pluginClasses.add(pluginClass);
return this;
}
private PluginsService newPluginService(final Settings settings) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
.put(InternalSettingsPreparer.prepareSettings(settings))
.put(NetworkService.NETWORK_SERVER.getKey(), false)
.put(CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE);
return new PluginsService(settingsBuilder.build(), null, null, pluginClasses);
}
/**
* Builds a new instance of the transport client.
*/
public TransportClient build() {
final PluginsService pluginsService = newPluginService(providedSettings);
final Settings settings = pluginsService.updatedSettings();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final NetworkService networkService = new NetworkService(settings);
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
try {
final List<Setting<?>> additionalSettings = new ArrayList<>();
final List<String> additionalSettingsFilter = new ArrayList<>();
additionalSettings.addAll(pluginsService.getPluginSettings());
additionalSettingsFilter.addAll(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
modules.add(new NetworkModule(networkService, settings, true, namedWriteableRegistry));
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
modules.add(new SearchModule(settings, namedWriteableRegistry, true, pluginsService.filterPlugins(SearchPlugin.class)));
ActionModule actionModule = new ActionModule(false, true, settings, null, settingsModule.getClusterSettings(),
pluginsService.filterPlugins(ActionPlugin.class));
modules.add(actionModule);
pluginsService.processModules(modules);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
}));
Injector injector = modules.createInjector();
final TransportService transportService = injector.getInstance(TransportService.class);
final TransportClientNodesService nodesService =
new TransportClientNodesService(settings, transportService, threadPool);
final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,
actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
List<LifecycleComponent> pluginLifecycleComponents = new ArrayList<>();
pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
.map(injector::getInstance).collect(Collectors.toList()));
resourcesToClose.addAll(pluginLifecycleComponents);
transportService.start();
transportService.acceptIncomingRequests();
TransportClient transportClient = new TransportClient(injector, pluginLifecycleComponents, nodesService, proxy);
resourcesToClose.clear();
return transportClient;
} finally {
IOUtils.closeWhileHandlingException(resourcesToClose);
protected static Collection<Class<? extends Plugin>> addPlugins(Collection<Class<? extends Plugin>> collection,
Collection<Class<? extends Plugin>> plugins) {
ArrayList<Class<? extends Plugin>> list = new ArrayList<>(collection);
for (Class<? extends Plugin> p : plugins) {
if (list.contains(p)) {
throw new IllegalArgumentException("plugin already exists: " + p);
}
list.add(p);
}
return list;
}
private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,
Collection<Class<? extends Plugin>> plugins) {
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final NetworkService networkService = new NetworkService(settings);
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
try {
final List<Setting<?>> additionalSettings = new ArrayList<>();
final List<String> additionalSettingsFilter = new ArrayList<>();
additionalSettings.addAll(pluginsService.getPluginSettings());
additionalSettingsFilter.addAll(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
modules.add(new NetworkModule(networkService, settings, true, namedWriteableRegistry));
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
modules.add(new SearchModule(settings, namedWriteableRegistry, true, pluginsService.filterPlugins(SearchPlugin.class)));
ActionModule actionModule = new ActionModule(false, true, settings, null, settingsModule.getClusterSettings(),
pluginsService.filterPlugins(ActionPlugin.class));
modules.add(actionModule);
pluginsService.processModules(modules);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
}));
Injector injector = modules.createInjector();
final TransportService transportService = injector.getInstance(TransportService.class);
final TransportClientNodesService nodesService =
new TransportClientNodesService(settings, transportService, threadPool);
final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,
actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
List<LifecycleComponent> pluginLifecycleComponents = new ArrayList<>();
pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
.map(injector::getInstance).collect(Collectors.toList()));
resourcesToClose.addAll(pluginLifecycleComponents);
transportService.start();
transportService.acceptIncomingRequests();
ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy);
resourcesToClose.clear();
return transportClient;
} finally {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
private static final class ClientTemplate {
final Injector injector;
private final List<LifecycleComponent> pluginLifecycleComponents;
private final TransportClientNodesService nodesService;
private final TransportProxyClient proxy;
private ClientTemplate(Injector injector, List<LifecycleComponent> pluginLifecycleComponents, TransportClientNodesService nodesService, TransportProxyClient proxy) {
this.injector = injector;
this.pluginLifecycleComponents = pluginLifecycleComponents;
this.nodesService = nodesService;
this.proxy = proxy;
}
Settings getSettings() {
return injector.getInstance(Settings.class);
}
ThreadPool getThreadPool() {
return injector.getInstance(ThreadPool.class);
}
}
@ -196,13 +195,29 @@ public class TransportClient extends AbstractClient {
private final TransportClientNodesService nodesService;
private final TransportProxyClient proxy;
private TransportClient(Injector injector, List<LifecycleComponent> pluginLifecycleComponents,
TransportClientNodesService nodesService, TransportProxyClient proxy) {
super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class));
this.injector = injector;
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
this.nodesService = nodesService;
this.proxy = proxy;
/**
* Creates a new TransportClient with the given settings and plugins
*/
public TransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
this(buildTemplate(settings, Settings.EMPTY, plugins));
}
/**
* Creates a new TransportClient with the given settings, defaults and plugins.
* @param settings the client settings
* @param defaultSettings default settings that are merged after the plugins have added it's additional settings.
* @param plugins the client plugins
*/
protected TransportClient(Settings settings, Settings defaultSettings, Collection<Class<? extends Plugin>> plugins) {
this(buildTemplate(settings, defaultSettings, plugins));
}
private TransportClient(ClientTemplate template) {
super(template.getSettings(), template.getThreadPool());
this.injector = template.injector;
this.pluginLifecycleComponents = Collections.unmodifiableList(template.pluginLifecycleComponents);
this.nodesService = template.nodesService;
this.proxy = template.proxy;
}
/**

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -33,6 +32,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.MockTransportClient;
import java.util.Arrays;
import java.util.HashSet;
@ -158,9 +158,8 @@ public class BulkProcessorIT extends ESIntegTestCase {
//we create a transport client with no nodes to make sure it throws NoNodeAvailableException
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put("transport.type", "local")
.build();
Client transportClient = TransportClient.builder().settings(settings).build();
Client transportClient = new MockTransportClient(settings);
int bulkActions = randomIntBetween(10, 100);
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);

View File

@ -20,13 +20,12 @@
package org.elasticsearch.action.search;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.MockTransportClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -41,9 +40,8 @@ public class SearchRequestBuilderTests extends ESTestCase {
//that is why we create it but we don't add any transport address to it
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(NetworkModule.TRANSPORT_TYPE_KEY, "local")
.build();
client = TransportClient.builder().settings(settings).build();
client = new MockTransportClient(settings);
}
@AfterClass

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESBackcompatTestCase;
import org.elasticsearch.transport.MockTransportClient;
import java.util.HashMap;
import java.util.Map;
@ -101,6 +102,6 @@ public class ClusterStateBackwardsCompatIT extends ESBackcompatTestCase {
private TransportClient newTransportClient() {
Settings settings = Settings.builder().put("client.transport.ignore_cluster_name", true)
.put("node.name", "transport_client_" + getTestName()).build();
return TransportClient.builder().settings(settings).build();
return new MockTransportClient(settings);
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESBackcompatTestCase;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.MockTransportClient;
import java.lang.reflect.Method;
@ -43,7 +44,7 @@ public class NodesStatsBasicBackwardsCompatIT extends ESBackcompatTestCase {
// We explicitly connect to each node with a custom TransportClient
for (NodeInfo n : nodesInfo.getNodes()) {
TransportClient tc = TransportClient.builder().settings(settings).build().addTransportAddress(n.getNode().getAddress());
TransportClient tc = new MockTransportClient(settings).addTransportAddress(n.getNode().getAddress());
// Just verify that the NS can be sent and serialized/deserialized between nodes with basic indices
tc.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
tc.close();
@ -61,7 +62,7 @@ public class NodesStatsBasicBackwardsCompatIT extends ESBackcompatTestCase {
// We explicitly connect to each node with a custom TransportClient
for (NodeInfo n : nodesInfo.getNodes()) {
TransportClient tc = TransportClient.builder().settings(settings).build().addTransportAddress(n.getNode().getAddress());
TransportClient tc = new MockTransportClient(settings).addTransportAddress(n.getNode().getAddress());
// randomize the combination of flags set
// Uses reflection to find methods in an attempt to future-proof this test against newly added flags

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.CompositeTestCluster;
import org.elasticsearch.test.ESBackcompatTestCase;
import org.elasticsearch.transport.MockTransportClient;
import java.util.concurrent.ExecutionException;
@ -45,7 +46,7 @@ public class TransportClientBackwardsCompatibilityIT extends ESBackcompatTestCas
CompositeTestCluster compositeTestCluster = backwardsCluster();
TransportAddress transportAddress = compositeTestCluster.externalTransportAddress();
try(TransportClient client = TransportClient.builder().settings(settings).build()) {
try(TransportClient client = new MockTransportClient(settings)) {
client.addTransportAddress(transportAddress);
assertAcked(client.admin().indices().prepareCreate("test"));

View File

@ -40,6 +40,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -64,32 +65,27 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
@Override
protected Client buildClient(Settings headersSettings, GenericAction[] testedActions) {
TransportClient client = TransportClient.builder()
.settings(Settings.builder()
TransportClient client = new MockTransportClient(Settings.builder()
.put("client.transport.sniff", false)
.put("cluster.name", "cluster1")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "local")
.put("node.name", "transport_client_" + this.getTestName())
.put(headersSettings)
.build())
.addPlugin(InternalTransportService.TestPlugin.class).build();
.build(), InternalTransportService.TestPlugin.class);
client.addTransportAddress(address);
return client;
}
public void testWithSniffing() throws Exception {
try (TransportClient client = TransportClient.builder()
.settings(Settings.builder()
try (TransportClient client = new MockTransportClient(
Settings.builder()
.put("client.transport.sniff", true)
.put("cluster.name", "cluster1")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "local")
.put("node.name", "transport_client_" + this.getTestName() + "_1")
.put("client.transport.nodes_sampler_interval", "1s")
.put(HEADER_SETTINGS)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build())
.addPlugin(InternalTransportService.TestPlugin.class)
.build()) {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(),
InternalTransportService.TestPlugin.class)) {
client.addTransportAddress(address);
InternalTransportService service = (InternalTransportService) client.injector.getInstance(TransportService.class);

View File

@ -27,10 +27,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -91,9 +91,8 @@ public class TransportClientIT extends ESIntegTestCase {
public void testThatTransportClientSettingCannotBeChanged() {
Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.put("transport.type", "local")
.build();
try (TransportClient client = TransportClient.builder().settings(baseSettings).build()) {
try (TransportClient client = new MockTransportClient(baseSettings)) {
Settings settings = client.injector.getInstance(Settings.class);
assertThat(Client.CLIENT_TYPE_SETTING_S.get(settings), is("transport"));
}

View File

@ -25,20 +25,18 @@ import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ClusterScope(scope = Scope.TEST, numClientNodes = 0, supportsDedicatedMasters = false)
@ -53,11 +51,10 @@ public class TransportClientRetryIT extends ESIntegTestCase {
Settings.Builder builder = Settings.builder().put("client.transport.nodes_sampler_interval", "1s")
.put("node.name", "transport_client_retry_test")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "local")
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), internalCluster().getClusterName())
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir());
try (TransportClient client = TransportClient.builder().settings(builder.build()).build()) {
try (TransportClient client = new MockTransportClient(builder.build())) {
client.addTransportAddresses(addresses);
assertEquals(client.connectedNodes().size(), internalCluster().size());

View File

@ -20,9 +20,9 @@
package org.elasticsearch.client.transport;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.MockTransportClient;
import java.util.concurrent.ExecutionException;
@ -32,12 +32,10 @@ import static org.hamcrest.object.HasToString.hasToString;
public class TransportClientTests extends ESTestCase {
public void testThatUsingAClosedClientThrowsAnException() throws ExecutionException, InterruptedException {
final TransportClient client = TransportClient.builder().settings(Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "local"))
.build();
final TransportClient client = new MockTransportClient(Settings.EMPTY);
client.close();
final IllegalStateException e =
expectThrows(IllegalStateException.class, () -> client.admin().cluster().health(new ClusterHealthRequest()).get());
assertThat(e, hasToString(containsString("transport client is closed")));
}
}

View File

@ -17,7 +17,7 @@ that connects to a cluster.
The client must have the same major version (e.g. `2.x`, or `5.x`) as the
nodes in the cluster. Clients may connect to clusters which have a different
minor version (e.g. `2.3.x`) but it is possible that new funcionality may not
minor version (e.g. `2.3.x`) but it is possible that new functionality may not
be supported. Ideally, the client should have the same version as the
cluster.
@ -37,7 +37,7 @@ be "two hop" operations).
--------------------------------------------------
// on startup
TransportClient client = TransportClient.builder().build()
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
@ -53,7 +53,7 @@ Note that you have to set the cluster name if you use one different than
--------------------------------------------------
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build();
TransportClient client = TransportClient.builder().settings(settings).build();
TransportClient client = new PreBuiltTransportClient(settings);
//Add transport addresses and do something with the client...
--------------------------------------------------
@ -79,7 +79,7 @@ In order to enable sniffing, set `client.transport.sniff` to `true`:
--------------------------------------------------
Settings settings = Settings.settingsBuilder()
.put("client.transport.sniff", true).build();
TransportClient client = TransportClient.builder().settings(settings).build();
TransportClient client = new PreBuiltTransportClient(settings);
--------------------------------------------------
Other transport client level settings include:

View File

@ -34,6 +34,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.junit.annotations.Network;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.Netty3Plugin;
import java.net.InetAddress;
@ -75,7 +76,7 @@ public class Netty3TransportMultiPortIntegrationIT extends ESNetty3IntegTestCase
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty3")
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
try (TransportClient transportClient = TransportClient.builder().addPlugin(Netty3Plugin.class).settings(settings).build()) {
try (TransportClient transportClient = new MockTransportClient(settings, Netty3Plugin.class)) {
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), randomPort));
ClusterHealthResponse response = transportClient.admin().cluster().prepareHealth().get();
assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN));

View File

@ -22,5 +22,5 @@ apply plugin: 'elasticsearch.rest-test'
// TODO: this test works, but it isn't really a rest test...should we have another plugin for "non rest test that just needs N clusters?"
dependencies {
testCompile project(path: ':modules:transport-netty3', configuration: 'runtime') // randomly swapped in as a transport
testCompile project(path: ':client:transport', configuration: 'runtime') // randomly swapped in as a transport
}

View File

@ -26,14 +26,13 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.Netty3Plugin;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -44,8 +43,8 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;
@ -77,34 +76,19 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
private static String clusterAddresses;
protected String index;
public static final class BogusPlugin extends Plugin {
// se Netty3Plugin.... this runs without the permission from the netty3 module so it will fail since reindex can't set the property
// to make it still work we disable that check but need to register the setting first
private static final Setting<Boolean> ASSERT_NETTY_BUGLEVEL = Setting.boolSetting("netty.assert.buglevel", true,
Setting.Property.NodeScope);
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(ASSERT_NETTY_BUGLEVEL);
}
}
private static Client startClient(Path tempDir, TransportAddress... transportAddresses) {
TransportClient.Builder transportClientBuilder = TransportClient.builder();
Settings.Builder builder = Settings.builder()
.put("node.name", "qa_smoke_client_" + counter.getAndIncrement())
.put("client.transport.ignore_cluster_name", true)
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir);
final Collection<Class<? extends Plugin>> plugins;
if (random().nextBoolean()) {
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME);
transportClientBuilder.addPlugin(Netty3Plugin.class);
transportClientBuilder.addPlugin(BogusPlugin.class);
builder.put("netty.assert.buglevel", false); // see BogusPlugin
} else {
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME);
transportClientBuilder.addPlugin(MockTcpTransportPlugin.class);
plugins = Collections.singleton(MockTcpTransportPlugin.class);
} else {
plugins = Collections.emptyList();
}
transportClientBuilder.settings(builder.build());
TransportClient client = transportClientBuilder.build().addTransportAddresses(transportAddresses);
TransportClient client = new PreBuiltTransportClient(builder.build(), plugins).addTransportAddresses(transportAddresses);
logger.info("--> Elasticsearch Java TransportClient started");

View File

@ -7,6 +7,7 @@ List projects = [
'docs',
'client:rest',
'client:sniffer',
'client:transport',
'client:test',
'benchmarks',
'distribution:integ-test-zip',

View File

@ -32,7 +32,7 @@ import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.node.Node;
import org.elasticsearch.transport.MockTransportClient;
import java.io.Closeable;
import java.io.IOException;
@ -188,7 +188,7 @@ final class ExternalNode implements Closeable {
.put("client.transport.nodes_sampler_interval", "1s")
.put("node.name", "transport_client_" + nodeInfo.getNode().getName())
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName).put("client.transport.sniff", false).build();
TransportClient client = TransportClient.builder().settings(clientSettings).build();
TransportClient client = new MockTransportClient(clientSettings);
client.addTransportAddress(addr);
this.client = client;
}

View File

@ -34,15 +34,14 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.MockTransportClient;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
@ -78,20 +77,17 @@ public final class ExternalTestCluster extends TestCluster {
.put("node.name", InternalTestCluster.TRANSPORT_CLIENT_PREFIX + EXTERNAL_CLUSTER_PREFIX + counter.getAndIncrement())
.put("client.transport.ignore_cluster_name", true)
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir);
TransportClient.Builder transportClientBuilder = TransportClient.builder();
boolean addMockTcpTransport = additionalSettings.get(NetworkModule.TRANSPORT_TYPE_KEY) == null;
if (addMockTcpTransport) {
clientSettingsBuilder.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME);
if (pluginClasses.contains(MockTcpTransportPlugin.class) == false) {
transportClientBuilder.addPlugin(MockTcpTransportPlugin.class);
pluginClasses = new ArrayList<>(pluginClasses);
pluginClasses.add(MockTcpTransportPlugin.class);
}
}
Settings clientSettings = clientSettingsBuilder.build();
transportClientBuilder.settings(clientSettings);
for (Class<? extends Plugin> pluginClass : pluginClasses) {
transportClientBuilder.addPlugin(pluginClass);
}
TransportClient client = transportClientBuilder.build();
TransportClient client = new MockTransportClient(clientSettings, pluginClasses);
try {
client.addTransportAddresses(transportAddresses);

View File

@ -88,6 +88,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
@ -904,11 +905,7 @@ public final class InternalTestCluster extends TestCluster {
if ( NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings)) {
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NetworkModule.TRANSPORT_TYPE_SETTING.get(settings));
}
TransportClient.Builder clientBuilder = TransportClient.builder().settings(builder.build());
for (Class<? extends Plugin> plugin : plugins) {
clientBuilder.addPlugin(plugin);
}
TransportClient client = clientBuilder.build();
TransportClient client = new MockTransportClient(builder.build(), plugins);
client.addTransportAddress(addr);
return client;
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import java.util.Arrays;
import java.util.Collection;
@SuppressWarnings({"unchecked","varargs"})
public class MockTransportClient extends TransportClient {
private static final Settings DEFAULT_SETTINGS = Settings.builder().put("transport.type.default", "local").build();
public MockTransportClient(Settings settings, Class<? extends Plugin>... plugins) {
super(settings, DEFAULT_SETTINGS, Arrays.asList(plugins));
}
public MockTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
super(settings, DEFAULT_SETTINGS, plugins);
}
}

View File

@ -1,3 +1,4 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with