Merge pull request #19276 from rjernst/transport_client_deguice

Remove guice from transport client helper classes
This commit is contained in:
Ryan Ernst 2016-07-05 20:34:20 -07:00 committed by GitHub
commit 5907534877
7 changed files with 41 additions and 51 deletions

View File

@ -19,6 +19,12 @@
package org.elasticsearch.action;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
@ -303,12 +309,6 @@ import org.elasticsearch.rest.action.termvectors.RestMultiTermVectorsAction;
import org.elasticsearch.rest.action.termvectors.RestTermVectorsAction;
import org.elasticsearch.rest.action.update.RestUpdateAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;
@ -338,6 +338,10 @@ public class ActionModule extends AbstractModule {
restController = new RestController(settings);
}
public Map<String, ActionHandler<?, ?>> getActions() {
return actions;
}
static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
// Subclass NamedRegistry for easy registration
class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
@ -621,14 +625,6 @@ public class ActionModule extends AbstractModule {
bind(ActionFilters.class).asEagerSingleton();
bind(DestructiveOperations.class).toInstance(destructiveOperations);
// register Name -> GenericAction Map that can be injected to instances.
@SuppressWarnings("rawtypes")
MapBinder<String, GenericAction> actionsBinder
= MapBinder.newMapBinder(binder(), String.class, GenericAction.class);
for (Map.Entry<String, ActionHandler<?, ?>> entry : actions.entrySet()) {
actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().getAction());
}
if (false == transportClient) {
// Supporting classes only used when not a transport client
bind(AutoCreateIndex.class).toInstance(autoCreateIndex);

View File

@ -35,7 +35,6 @@ public class TransportActionNodeProxy<Request extends ActionRequest, Response ex
private final GenericAction<Request, Response> action;
private final TransportRequestOptions transportOptions;
@Inject
public TransportActionNodeProxy(Settings settings, GenericAction<Request, Response> action, TransportService transportService) {
super(settings);
this.action = action;

View File

@ -19,6 +19,12 @@
package org.elasticsearch.client.transport;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@ -53,11 +59,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* The transport client allows to create a client that is not part of the cluster, but simply connects to one
* or more nodes directly by adding their respective addresses using {@link #addTransportAddress(org.elasticsearch.common.transport.TransportAddress)}.
@ -143,8 +144,9 @@ public class TransportClient extends AbstractClient {
modules.add(new NetworkModule(networkService, settings, true, namedWriteableRegistry));
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
modules.add(new SearchModule(settings, namedWriteableRegistry, true));
modules.add(new ActionModule(false, true, settings, null, settingsModule.getClusterSettings(),
pluginsService.filterPlugins(ActionPlugin.class)));
ActionModule actionModule = new ActionModule(false, true, settings, null, settingsModule.getClusterSettings(),
pluginsService.filterPlugins(ActionPlugin.class));
modules.add(actionModule);
pluginsService.processModules(modules);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
@ -161,9 +163,14 @@ public class TransportClient extends AbstractClient {
Injector injector = modules.createInjector();
final TransportService transportService = injector.getInstance(TransportService.class);
final TransportClientNodesService nodesService =
new TransportClientNodesService(settings, transportService, threadPool);
final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,
actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
transportService.start();
transportService.acceptIncomingRequests();
TransportClient transportClient = new TransportClient(injector);
TransportClient transportClient = new TransportClient(injector, nodesService, proxy);
resourcesToClose.clear();
return transportClient;
} finally {
@ -179,15 +186,11 @@ public class TransportClient extends AbstractClient {
private final TransportClientNodesService nodesService;
private final TransportProxyClient proxy;
private TransportClient(Injector injector) {
private TransportClient(Injector injector, TransportClientNodesService nodesService, TransportProxyClient proxy) {
super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class));
this.injector = injector;
nodesService = injector.getInstance(TransportClientNodesService.class);
proxy = injector.getInstance(TransportProxyClient.class);
}
TransportClientNodesService nodeService() {
return nodesService;
this.nodesService = nodesService;
this.proxy = proxy;
}
/**
@ -263,7 +266,7 @@ public class TransportClient extends AbstractClient {
@Override
public void close() {
List<Closeable> closeables = new ArrayList<>();
closeables.add(injector.getInstance(TransportClientNodesService.class));
closeables.add(nodesService);
closeables.add(injector.getInstance(TransportService.class));
for (Class<? extends LifecycleComponent> plugin : injector.getInstance(PluginsService.class).nodeServices()) {

View File

@ -113,7 +113,6 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
Setting.boolSetting("client.transport.sniff", false, Property.NodeScope);
@Inject
public TransportClientNodesService(Settings settings,TransportService transportService,
ThreadPool threadPool) {
super(settings);
@ -282,6 +281,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
}
@Override
public void close() {
synchronized (mutex) {
if (closed) {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.unmodifiableMap;
@ -45,11 +46,11 @@ public class TransportProxyClient {
private final TransportClientNodesService nodesService;
private final Map<Action, TransportActionNodeProxy> proxies;
@Inject
public TransportProxyClient(Settings settings, TransportService transportService, TransportClientNodesService nodesService, Map<String, GenericAction> actions) {
public TransportProxyClient(Settings settings, TransportService transportService,
TransportClientNodesService nodesService, List<GenericAction> actions) {
this.nodesService = nodesService;
Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
for (GenericAction action : actions.values()) {
for (GenericAction action : actions) {
if (action instanceof Action) {
proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
}
@ -59,11 +60,6 @@ public class TransportProxyClient {
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener) {
final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxy.execute(node, request, listener);
}
}, listener);
nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
}
}

View File

@ -160,10 +160,7 @@ public class NetworkModule extends AbstractModule {
String defaultTransport = DiscoveryNode.isLocalNode(settings) ? LOCAL_TRANSPORT : NETTY_TRANSPORT;
transportTypes.bindType(binder(), settings, TRANSPORT_TYPE_KEY, defaultTransport);
if (transportClient) {
bind(TransportProxyClient.class).asEagerSingleton();
bind(TransportClientNodesService.class).asEagerSingleton();
} else {
if (transportClient == false) {
if (HTTP_ENABLED.get(settings)) {
bind(HttpServer.class).asEagerSingleton();
httpTransportTypes.bindType(binder(), settings, HTTP_TYPE_SETTING.getKey(), NETTY_TRANSPORT);

View File

@ -51,7 +51,6 @@ public class TransportClientIT extends ESIntegTestCase {
public void testNodeVersionIsUpdated() throws IOException {
TransportClient client = (TransportClient) internalCluster().client();
TransportClientNodesService nodeService = client.nodeService();
Node node = new Node(Settings.builder()
.put(internalCluster().getDefaultSettings())
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
@ -65,19 +64,19 @@ public class TransportClientIT extends ESIntegTestCase {
TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();
client.addTransportAddress(transportAddress);
// since we force transport clients there has to be one node started that we connect to.
assertThat(nodeService.connectedNodes().size(), greaterThanOrEqualTo(1));
assertThat(client.connectedNodes().size(), greaterThanOrEqualTo(1));
// connected nodes have updated version
for (DiscoveryNode discoveryNode : nodeService.connectedNodes()) {
for (DiscoveryNode discoveryNode : client.connectedNodes()) {
assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT));
}
for (DiscoveryNode discoveryNode : nodeService.listedNodes()) {
for (DiscoveryNode discoveryNode : client.listedNodes()) {
assertThat(discoveryNode.getId(), startsWith("#transport#-"));
assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT.minimumCompatibilityVersion()));
}
assertThat(nodeService.filteredNodes().size(), equalTo(1));
for (DiscoveryNode discoveryNode : nodeService.filteredNodes()) {
assertThat(client.filteredNodes().size(), equalTo(1));
for (DiscoveryNode discoveryNode : client.filteredNodes()) {
assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT.minimumCompatibilityVersion()));
}
} finally {