Add wait_for_completion to reindex

It defaults to false and when false it returns a task identifier. Right
now all you can do is get the task to see if it is still running. Once
the task finishes it vanishes and you can't get any information about it.
This commit is contained in:
Nik Everett 2016-01-17 08:22:43 -05:00
parent 0d6d77328d
commit da42838cff
19 changed files with 256 additions and 33 deletions

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
/**
* An ActionListener that does nothing. Used when we need a listener but don't
* care to listen for the result.
*/
public final class NoopActionListener<Response> implements ActionListener<Response> {
/**
* Get the instance of NoopActionListener cast appropriately.
*/
@SuppressWarnings("unchecked") // Safe because we do nothing with the type.
public static <Response> ActionListener<Response> instance() {
return (ActionListener<Response>) INSTANCE;
}
private static final NoopActionListener<Object> INSTANCE = new NoopActionListener<Object>();
private NoopActionListener() {
}
@Override
public void onResponse(Response response) {
}
@Override
public void onFailure(Throwable e) {
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.reindex;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.NoopActionListener;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import java.io.IOException;
public abstract class AbstractBaseReindexRestHandler<Request extends ActionRequest<Request>, Response extends BulkIndexByScrollResponse, TA extends TransportAction<Request, Response>>
extends BaseRestHandler {
protected final IndicesQueriesRegistry indicesQueriesRegistry;
private final ClusterService clusterService;
private final TA action;
protected AbstractBaseReindexRestHandler(Settings settings, RestController controller, Client client,
IndicesQueriesRegistry indicesQueriesRegistry, ClusterService clusterService, TA action) {
super(settings, controller, client);
this.indicesQueriesRegistry = indicesQueriesRegistry;
this.clusterService = clusterService;
this.action = action;
}
protected void execute(RestRequest request, Request internalRequest, RestChannel channel) throws IOException {
if (request.paramAsBoolean("wait_for_completion", false)) {
action.execute(internalRequest, new BulkIndexByScrollResponseContentListener<Response>(channel));
return;
}
/*
* Lets try and validate before forking launching the task so we can
* return errors even if we aren't waiting.
*/
ActionRequestValidationException validationException = internalRequest.validate();
if (validationException != null) {
channel.sendResponse(new BytesRestResponse(channel, validationException));
return;
}
Task task = action.execute(internalRequest, NoopActionListener.instance());
sendTask(channel, task);
}
private void sendTask(RestChannel channel, Task task) throws IOException {
XContentBuilder builder = channel.newBuilder();
builder.startObject();
builder.startObject("task");
builder.field("node", clusterService.localNode().getId());
builder.field("id", task.getId());
builder.endObject();
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}
}

View File

@ -19,10 +19,15 @@
package org.elasticsearch.plugin.reindex;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.inject.Inject;
@ -36,26 +41,20 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.script.Script;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.plugin.reindex.ReindexAction.INSTANCE;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
/**
* Expose IndexBySearchRequest over rest.
*/
public class RestReindexAction extends BaseRestHandler {
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, ReindexResponse, TransportReindexAction> {
private static final ObjectParser<ReindexRequest, QueryParseContext> PARSER = new ObjectParser<>("reindex");
static {
ObjectParser.Parser<SearchRequest, QueryParseContext> sourceParser = (parser, search, context) -> {
@ -97,13 +96,10 @@ public class RestReindexAction extends BaseRestHandler {
PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts"));
}
private IndicesQueriesRegistry indicesQueriesRegistry;
@Inject
public RestReindexAction(Settings settings, RestController controller, Client client,
IndicesQueriesRegistry indicesQueriesRegistry) {
super(settings, controller, client);
this.indicesQueriesRegistry = indicesQueriesRegistry;
IndicesQueriesRegistry indicesQueriesRegistry, ClusterService clusterService, TransportReindexAction action) {
super(settings, controller, client, indicesQueriesRegistry, clusterService, action);
controller.registerHandler(POST, "/_reindex", this);
}
@ -125,7 +121,7 @@ public class RestReindexAction extends BaseRestHandler {
}
parseCommon(internalRequest, request);
client.execute(INSTANCE, internalRequest, new BulkIndexByScrollResponseContentListener<>(channel));
execute(request, internalRequest, channel);
}
private void badRequest(RestChannel channel, String message) {

View File

@ -19,8 +19,11 @@
package org.elasticsearch.plugin.reindex;
import java.util.Map;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@ -30,7 +33,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
@ -38,21 +40,17 @@ import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.script.Script;
import java.util.Map;
import static org.elasticsearch.plugin.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.plugin.reindex.RestReindexAction.parseCommon;
import static org.elasticsearch.plugin.reindex.UpdateByQueryAction.INSTANCE;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestUpdateByQueryAction extends BaseRestHandler {
private IndicesQueriesRegistry indicesQueriesRegistry;
public class RestUpdateByQueryAction extends
AbstractBaseReindexRestHandler<UpdateByQueryRequest, BulkIndexByScrollResponse, TransportUpdateByQueryAction> {
@Inject
public RestUpdateByQueryAction(Settings settings, RestController controller, Client client,
IndicesQueriesRegistry indicesQueriesRegistry) {
super(settings, controller, client);
this.indicesQueriesRegistry = indicesQueriesRegistry;
IndicesQueriesRegistry indicesQueriesRegistry, ClusterService clusterService,
TransportUpdateByQueryAction action) {
super(settings, controller, client, indicesQueriesRegistry, clusterService, action);
controller.registerHandler(POST, "/{index}/_update_by_query", this);
controller.registerHandler(POST, "/{index}/{type}/_update_by_query", this);
}
@ -108,6 +106,6 @@ public class RestUpdateByQueryAction extends BaseRestHandler {
internalRequest.setSize(internalRequest.getSource().source().size());
internalRequest.getSource().source().size(request.paramAsInt("scroll_size", scrollSize));
client.execute(INSTANCE, internalRequest, new BulkIndexByScrollResponseContentListener<>(channel));
execute(request, internalRequest, channel);
}
}

View File

@ -174,8 +174,8 @@ public class AsyncBulkByScrollActionTest extends ESTestCase {
}
@Override
@SuppressWarnings({ "rawtypes", "unchecked" }) // Declaration is raw
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
@SuppressWarnings("unchecked")
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
if (request instanceof ClearScrollRequest) {
ClearScrollRequest clearScroll = (ClearScrollRequest) request;

View File

@ -22,20 +22,13 @@ package org.elasticsearch.plugin.reindex;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.RestTestCandidate;
import org.elasticsearch.test.rest.parser.RestTestParseException;
import java.io.IOException;
import java.util.Collection;
public class ReindexRestIT extends ESRestTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return pluginList(ReindexPlugin.class);
}
public ReindexRestIT(@Name("yaml") RestTestCandidate testCandidate) {
super(testCandidate);
}

View File

@ -20,6 +20,11 @@
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Explicit write consistency setting for the operation"
},
"wait_for_completion": {
"type" : "boolean",
"default": false,
"description" : "Should the request should block until the reindex is complete."
}
}
},

View File

@ -185,6 +185,11 @@
"type": "integer",
"defaut_value": 100,
"description": "Size on the scroll request powering the update-by-query"
},
"wait_for_completion": {
"type" : "boolean",
"default": false,
"description" : "Should the request should block until the reindex is complete."
}
}
},

View File

@ -11,6 +11,7 @@
- do:
reindex:
wait_for_completion: true
body:
source:
index: source
@ -42,6 +43,7 @@
- do:
reindex:
wait_for_completion: true
body:
source:
index: source
@ -54,6 +56,34 @@
- match: {failures: []}
- is_true: took
---
"wait_for_completion=true":
- do:
index:
index: source
type: foo
id: 1
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
reindex:
body:
source:
index: source
dest:
index: dest
- is_true: task.node
- is_true: task.id
- is_false: updated
- is_false: version_conflicts
- is_false: batches
- is_false: failures
- is_false: noops
- is_false: took
- is_false: created
---
"Response format for version conflict":
- do:
@ -74,6 +104,7 @@
- do:
catch: conflict
reindex:
wait_for_completion: true
body:
source:
index: source
@ -113,6 +144,7 @@
- do:
reindex:
wait_for_completion: true
body:
conflicts: proceed
source:
@ -140,6 +172,7 @@
- do:
reindex:
wait_for_completion: true
refresh: true
body:
source:
@ -172,6 +205,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter
@ -204,6 +238,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter
@ -238,6 +273,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter
@ -272,6 +308,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: [twitter, blog]
@ -319,6 +356,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
size: 1
source:

View File

@ -18,6 +18,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: test
@ -52,6 +53,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
size: 1
source:

View File

@ -30,6 +30,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: src
@ -75,6 +76,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
conflicts: proceed
source:
@ -122,6 +124,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: src
@ -166,6 +169,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: src

View File

@ -12,6 +12,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: src
@ -42,6 +43,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: src

View File

@ -23,6 +23,7 @@
catch: unavailable
reindex:
timeout: 1s
wait_for_completion: true
body:
source:
index: src
@ -34,6 +35,7 @@
- do:
reindex:
consistency: one
wait_for_completion: true
body:
source:
index: src

View File

@ -12,6 +12,7 @@
- do:
update-by-query:
index: test
wait_for_completion: true
- match: {updated: 1}
- match: {version_conflicts: 0}
- match: {batches: 1}
@ -20,6 +21,30 @@
- is_true: took
- is_false: created # This shouldn't be included in the response
---
"wait_for_completion=false":
- do:
index:
index: test
type: foo
id: 1
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
update-by-query:
index: test
- is_true: task.node
- is_true: task.id
- is_false: updated
- is_false: version_conflicts
- is_false: batches
- is_false: failures
- is_false: noops
- is_false: took
- is_false: created
---
"Response for version conflict":
- do:
@ -47,6 +72,7 @@
catch: conflict
update-by-query:
index: test
wait_for_completion: true
- match: {updated: 0}
- match: {version_conflicts: 1}
- match: {batches: 1}
@ -87,6 +113,7 @@
update-by-query:
index: test
conflicts: proceed
wait_for_completion: true
- match: {updated: 0}
- match: {version_conflicts: 1}
- match: {batches: 1}
@ -114,6 +141,7 @@
- do:
update-by-query:
index: twitter
wait_for_completion: true
body:
query:
match:
@ -144,6 +172,7 @@
- do:
update-by-query:
index: twitter
wait_for_completion: true
size: 1
- match: {updated: 1}
- match: {version_conflicts: 0}
@ -184,4 +213,5 @@
update-by-query:
index: test
scroll_size: 1
wait_for_completion: true
- match: {batches: 3}

View File

@ -45,6 +45,7 @@
- do:
update-by-query:
index: test
wait_for_completion: true
- do:
indices.refresh: {}

View File

@ -12,6 +12,7 @@
- do:
update-by-query:
index: test
wait_for_completion: true
- match: {updated: 1}
- match: {version_conflicts: 0}

View File

@ -24,6 +24,7 @@
update-by-query:
index: test
timeout: 1s
wait_for_completion: true
- match:
failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
@ -31,6 +32,7 @@
update-by-query:
index: test
consistency: one
wait_for_completion: true
- match: {failures: []}
- match: {updated: 1}
- match: {version_conflicts: 0}

View File

@ -12,6 +12,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter
@ -51,6 +52,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter
@ -110,6 +112,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter
@ -153,6 +156,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter
@ -211,6 +215,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter
@ -256,6 +261,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter
@ -303,6 +309,7 @@
- do:
reindex:
wait_for_completion: true
body:
source:
index: twitter
@ -336,6 +343,7 @@
- do:
reindex:
wait_for_completion: true
refresh: true
body:
source:
@ -377,6 +385,7 @@
- do:
reindex:
refresh: true
wait_for_completion: true
body:
source:
index: twitter

View File

@ -13,6 +13,7 @@
update-by-query:
index: twitter
refresh: true
wait_for_completion: true
body:
script:
inline: ctx._source.user = "not" + ctx._source.user
@ -49,6 +50,7 @@
update-by-query:
refresh: true
index: twitter
wait_for_completion: true
body:
script:
inline: if (ctx._source.user == "kimchy") {ctx._source.user = "not" + ctx._source.user} else {ctx.op = "noop"}
@ -94,6 +96,7 @@
update-by-query:
refresh: true
index: twitter
wait_for_completion: true
body:
script:
inline: ctx.op = "noop"
@ -116,6 +119,7 @@
catch: /Invalid fields added to ctx \[junk\]/
update-by-query:
index: twitter
wait_for_completion: true
body:
script:
inline: ctx.junk = "stuff"
@ -135,6 +139,7 @@
catch: /Modifying \[_id\] not allowed/
update-by-query:
index: twitter
wait_for_completion: true
body:
script:
inline: ctx._id = "stuff"