Remove ClusterService from ctors in reindex (#22539)

Moves fetching the local node id into `NodeClient` which is a
fairly useful place to put it so you can generate task ids from
`NodeClient#executeLocally`.
This commit is contained in:
Nik Everett 2017-01-10 18:26:06 -05:00 committed by GitHub
parent 4df2e182bf
commit b71b8acf59
9 changed files with 33 additions and 28 deletions

View File

@ -28,12 +28,14 @@ import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskListener; import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map; import java.util.Map;
import java.util.function.Supplier;
/** /**
* Client that executes actions on the local node. * Client that executes actions on the local node.
@ -41,13 +43,19 @@ import java.util.Map;
public class NodeClient extends AbstractClient { public class NodeClient extends AbstractClient {
private Map<GenericAction, TransportAction> actions; private Map<GenericAction, TransportAction> actions;
/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
* {@link #executeLocally(GenericAction, ActionRequest, TaskListener)}.
*/
private Supplier<String> localNodeId;
public NodeClient(Settings settings, ThreadPool threadPool) { public NodeClient(Settings settings, ThreadPool threadPool) {
super(settings, threadPool); super(settings, threadPool);
} }
public void initialize(Map<GenericAction, TransportAction> actions) { public void initialize(Map<GenericAction, TransportAction> actions, Supplier<String> localNodeId) {
this.actions = actions; this.actions = actions;
this.localNodeId = localNodeId;
} }
@Override @Override
@ -85,6 +93,14 @@ public class NodeClient extends AbstractClient {
return transportAction(action).execute(request, listener); return transportAction(action).execute(request, listener);
} }
/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
* {@link #executeLocally(GenericAction, ActionRequest, TaskListener)}.
*/
public String getLocalNodeId() {
return localNodeId.get();
}
/** /**
* Get the {@link TransportAction} for an {@link Action}, throwing exceptions if the action isn't available. * Get the {@link TransportAction} for an {@link Action}, throwing exceptions if the action isn't available.
*/ */

View File

@ -463,7 +463,8 @@ public class Node implements Closeable {
.map(injector::getInstance).collect(Collectors.toList())); .map(injector::getInstance).collect(Collectors.toList()));
resourcesToClose.addAll(pluginLifecycleComponents); resourcesToClose.addAll(pluginLifecycleComponents);
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {})); client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
() -> clusterService.localNode().getId());
logger.info("initialized"); logger.info("initialized");

View File

@ -43,7 +43,7 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTestCase {
Settings settings = HEADER_SETTINGS; Settings settings = HEADER_SETTINGS;
Actions actions = new Actions(settings, threadPool, testedActions); Actions actions = new Actions(settings, threadPool, testedActions);
NodeClient client = new NodeClient(settings, threadPool); NodeClient client = new NodeClient(settings, threadPool);
client.initialize(actions); client.initialize(actions, () -> "test");
return client; return client;
} }

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BaseRestHandler;
@ -44,14 +43,11 @@ public abstract class AbstractBaseReindexRestHandler<
> extends BaseRestHandler { > extends BaseRestHandler {
protected final SearchRequestParsers searchRequestParsers; protected final SearchRequestParsers searchRequestParsers;
private final ClusterService clusterService;
private final A action; private final A action;
protected AbstractBaseReindexRestHandler(Settings settings, SearchRequestParsers searchRequestParsers, ClusterService clusterService, protected AbstractBaseReindexRestHandler(Settings settings, SearchRequestParsers searchRequestParsers, A action) {
A action) {
super(settings); super(settings);
this.searchRequestParsers = searchRequestParsers; this.searchRequestParsers = searchRequestParsers;
this.clusterService = clusterService;
this.action = action; this.action = action;
} }
@ -80,7 +76,7 @@ public abstract class AbstractBaseReindexRestHandler<
if (validationException != null) { if (validationException != null) {
throw validationException; throw validationException;
} }
return sendTask(client.executeLocally(action, internal, LoggingTaskListener.instance())); return sendTask(client.getLocalNodeId(), client.executeLocally(action, internal, LoggingTaskListener.instance()));
} }
/** /**
@ -111,11 +107,11 @@ public abstract class AbstractBaseReindexRestHandler<
return request; return request;
} }
private RestChannelConsumer sendTask(Task task) throws IOException { private RestChannelConsumer sendTask(String localNodeId, Task task) throws IOException {
return channel -> { return channel -> {
try (XContentBuilder builder = channel.newBuilder()) { try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject(); builder.startObject();
builder.field("task", clusterService.localNode().getId() + ":" + task.getId()); builder.field("task", localNodeId + ":" + task.getId());
builder.endObject(); builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
} }

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.reindex;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
@ -44,9 +43,8 @@ public abstract class AbstractBulkByQueryRestHandler<
Request extends AbstractBulkByScrollRequest<Request>, Request extends AbstractBulkByScrollRequest<Request>,
A extends GenericAction<Request, BulkIndexByScrollResponse>> extends AbstractBaseReindexRestHandler<Request, A> { A extends GenericAction<Request, BulkIndexByScrollResponse>> extends AbstractBaseReindexRestHandler<Request, A> {
protected AbstractBulkByQueryRestHandler(Settings settings, SearchRequestParsers searchRequestParsers, ClusterService clusterService, protected AbstractBulkByQueryRestHandler(Settings settings, SearchRequestParsers searchRequestParsers, A action) {
A action) { super(settings, searchRequestParsers, action);
super(settings, searchRequestParsers, clusterService, action);
} }
protected void parseInternalRequest(Request internal, RestRequest restRequest, protected void parseInternalRequest(Request internal, RestRequest restRequest,

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
@ -39,9 +38,8 @@ import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<DeleteByQueryRequest, DeleteByQueryAction> { public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<DeleteByQueryRequest, DeleteByQueryAction> {
@Inject @Inject
public RestDeleteByQueryAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers, public RestDeleteByQueryAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers) {
ClusterService clusterService) { super(settings, searchRequestParsers, DeleteByQueryAction.INSTANCE);
super(settings, searchRequestParsers, clusterService, DeleteByQueryAction.INSTANCE);
controller.registerHandler(POST, "/{index}/_delete_by_query", this); controller.registerHandler(POST, "/{index}/_delete_by_query", this);
controller.registerHandler(POST, "/{index}/{type}/_delete_by_query", this); controller.registerHandler(POST, "/{index}/{type}/_delete_by_query", this);
} }

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier; import org.elasticsearch.common.ParseFieldMatcherSupplier;
@ -102,9 +101,8 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
} }
@Inject @Inject
public RestReindexAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers, public RestReindexAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers) {
ClusterService clusterService) { super(settings, searchRequestParsers, ReindexAction.INSTANCE);
super(settings, searchRequestParsers, clusterService, ReindexAction.INSTANCE);
controller.registerHandler(POST, "/_reindex", this); controller.registerHandler(POST, "/_reindex", this);
} }

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -45,9 +44,8 @@ import static org.elasticsearch.script.Script.DEFAULT_SCRIPT_LANG;
public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<UpdateByQueryRequest, UpdateByQueryAction> { public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<UpdateByQueryRequest, UpdateByQueryAction> {
@Inject @Inject
public RestUpdateByQueryAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers, public RestUpdateByQueryAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers) {
ClusterService clusterService) { super(settings, searchRequestParsers, UpdateByQueryAction.INSTANCE);
super(settings, searchRequestParsers, clusterService, UpdateByQueryAction.INSTANCE);
controller.registerHandler(POST, "/{index}/_update_by_query", this); controller.registerHandler(POST, "/{index}/_update_by_query", this);
controller.registerHandler(POST, "/{index}/{type}/_update_by_query", this); controller.registerHandler(POST, "/{index}/{type}/_update_by_query", this);
} }

View File

@ -133,7 +133,7 @@ public class RestReindexActionTests extends ESTestCase {
public void testPipelineQueryParameterIsError() throws IOException { public void testPipelineQueryParameterIsError() throws IOException {
SearchRequestParsers parsers = new SearchRequestParsers(); SearchRequestParsers parsers = new SearchRequestParsers();
RestReindexAction action = new RestReindexAction(Settings.EMPTY, mock(RestController.class), parsers, null); RestReindexAction action = new RestReindexAction(Settings.EMPTY, mock(RestController.class), parsers);
FakeRestRequest.Builder request = new FakeRestRequest.Builder(xContentRegistry()); FakeRestRequest.Builder request = new FakeRestRequest.Builder(xContentRegistry());
try (XContentBuilder body = JsonXContent.contentBuilder().prettyPrint()) { try (XContentBuilder body = JsonXContent.contentBuilder().prettyPrint()) {