Merge pull request #19825 from cbuescher/register-namedWritables-transportClient

Add NamedWriteables from plugins to TransportClient
This commit is contained in:
Christoph Büscher 2016-08-05 22:51:04 +02:00 committed by GitHub
commit fbbb633d81
3 changed files with 61 additions and 2 deletions

View File

@ -124,6 +124,9 @@ public abstract class TransportClient extends AbstractClient {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(); List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(networkModule.getNamedWriteables()); entries.addAll(networkModule.getNamedWriteables());
entries.addAll(searchModule.getNamedWriteables()); entries.addAll(searchModule.getNamedWriteables());
entries.addAll(pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedWriteables().stream())
.collect(Collectors.toList()));
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ModulesBuilder modules = new ModulesBuilder(); ModulesBuilder modules = new ModulesBuilder();
@ -167,7 +170,7 @@ public abstract class TransportClient extends AbstractClient {
transportService.start(); transportService.start();
transportService.acceptIncomingRequests(); transportService.acceptIncomingRequests();
ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy); ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy, namedWriteableRegistry);
resourcesToClose.clear(); resourcesToClose.clear();
return transportClient; return transportClient;
} finally { } finally {
@ -180,12 +183,15 @@ public abstract class TransportClient extends AbstractClient {
private final List<LifecycleComponent> pluginLifecycleComponents; private final List<LifecycleComponent> pluginLifecycleComponents;
private final TransportClientNodesService nodesService; private final TransportClientNodesService nodesService;
private final TransportProxyClient proxy; private final TransportProxyClient proxy;
private final NamedWriteableRegistry namedWriteableRegistry;
private ClientTemplate(Injector injector, List<LifecycleComponent> pluginLifecycleComponents, TransportClientNodesService nodesService, TransportProxyClient proxy) { private ClientTemplate(Injector injector, List<LifecycleComponent> pluginLifecycleComponents,
TransportClientNodesService nodesService, TransportProxyClient proxy, NamedWriteableRegistry namedWriteableRegistry) {
this.injector = injector; this.injector = injector;
this.pluginLifecycleComponents = pluginLifecycleComponents; this.pluginLifecycleComponents = pluginLifecycleComponents;
this.nodesService = nodesService; this.nodesService = nodesService;
this.proxy = proxy; this.proxy = proxy;
this.namedWriteableRegistry = namedWriteableRegistry;
} }
Settings getSettings() { Settings getSettings() {
@ -200,6 +206,7 @@ public abstract class TransportClient extends AbstractClient {
public static final String CLIENT_TYPE = "transport"; public static final String CLIENT_TYPE = "transport";
final Injector injector; final Injector injector;
final NamedWriteableRegistry namedWriteableRegistry;
private final List<LifecycleComponent> pluginLifecycleComponents; private final List<LifecycleComponent> pluginLifecycleComponents;
private final TransportClientNodesService nodesService; private final TransportClientNodesService nodesService;
@ -228,6 +235,7 @@ public abstract class TransportClient extends AbstractClient {
this.pluginLifecycleComponents = Collections.unmodifiableList(template.pluginLifecycleComponents); this.pluginLifecycleComponents = Collections.unmodifiableList(template.pluginLifecycleComponents);
this.nodesService = template.nodesService; this.nodesService = template.nodesService;
this.proxy = template.proxy; this.proxy = template.proxy;
this.namedWriteableRegistry = template.namedWriteableRegistry;
} }
/** /**

View File

@ -42,6 +42,7 @@ import static org.hamcrest.Matchers.startsWith;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 1.0) @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 1.0)
public class TransportClientIT extends ESIntegTestCase { public class TransportClientIT extends ESIntegTestCase {
public void testPickingUpChangesInDiscoveryNode() { public void testPickingUpChangesInDiscoveryNode() {
String nodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false)); String nodeName = internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false));

View File

@ -20,10 +20,20 @@
package org.elasticsearch.client.transport; package org.elasticsearch.client.transport;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.MockTransportClient;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
@ -38,4 +48,44 @@ public class TransportClientTests extends ESTestCase {
expectThrows(IllegalStateException.class, () -> client.admin().cluster().health(new ClusterHealthRequest()).get()); expectThrows(IllegalStateException.class, () -> client.admin().cluster().health(new ClusterHealthRequest()).get());
assertThat(e, hasToString(containsString("transport client is closed"))); assertThat(e, hasToString(containsString("transport client is closed")));
} }
/**
* test that when plugins are provided that want to register
* {@link NamedWriteable}, those are also made known to the
* {@link NamedWriteableRegistry} of the transport client
*/
public void testPluginNamedWriteablesRegistered() {
Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
assertNotNull(client.namedWriteableRegistry.getReader(MockPlugin.MockNamedWriteable.class, MockPlugin.MockNamedWriteable.NAME));
}
}
public static class MockPlugin extends Plugin {
@Override
public List<Entry> getNamedWriteables() {
return Arrays.asList(new Entry[]{ new Entry(MockNamedWriteable.class, MockNamedWriteable.NAME, MockNamedWriteable::new)});
}
public class MockNamedWriteable implements NamedWriteable {
static final String NAME = "mockNamedWritable";
MockNamedWriteable(StreamInput in) {
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
@Override
public String getWriteableName() {
return NAME;
}
}
}
} }