Remove ReindexResponse in favor of BulkIndexByScrollResponse

This commit is contained in:
Tanguy Leroux 2016-05-09 14:20:17 +02:00
parent ef2e3a8c39
commit 8c52e8814b
21 changed files with 121 additions and 160 deletions

View File

@ -20,12 +20,15 @@
package org.elasticsearch.index.reindex; package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.IndexFieldMapper; import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
@ -44,6 +47,7 @@ import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -53,17 +57,15 @@ import static java.util.Collections.emptyMap;
* Abstract base for scrolling across a search and executing bulk indexes on all * Abstract base for scrolling across a search and executing bulk indexes on all
* results. * results.
*/ */
public abstract class AbstractAsyncBulkIndexByScrollAction< public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends AbstractBulkIndexByScrollRequest<Request>>
Request extends AbstractBulkIndexByScrollRequest<Request>, extends AbstractAsyncBulkByScrollAction<Request, BulkIndexByScrollResponse> {
Response extends BulkIndexByScrollResponse>
extends AbstractAsyncBulkByScrollAction<Request, Response> {
private final ScriptService scriptService; private final ScriptService scriptService;
private final CompiledScript script; private final CompiledScript script;
public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, ClusterState state, public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, ClusterState state,
ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest,
ActionListener<Response> listener) { ActionListener<BulkIndexByScrollResponse> listener) {
super(task, logger, client, threadPool, mainRequest, firstSearchRequest, listener); super(task, logger, client, threadPool, mainRequest, firstSearchRequest, listener);
this.scriptService = scriptService; this.scriptService = scriptService;
if (mainRequest.getScript() == null) { if (mainRequest.getScript() == null) {
@ -73,6 +75,12 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<
} }
} }
@Override
protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<BulkItemResponse.Failure> indexingFailures,
List<ShardSearchFailure> searchFailures, boolean timedOut) {
return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
}
/** /**
* Build the IndexRequest for a single search hit. This shouldn't handle * Build the IndexRequest for a single search hit. This shouldn't handle
* metadata or the script. That will be handled by copyMetadata and * metadata or the script. That will be handled by copyMetadata and

View File

@ -37,11 +37,12 @@ import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public abstract class AbstractBaseReindexRestHandler< public abstract class AbstractBaseReindexRestHandler<
Request extends AbstractBulkByScrollRequest<Request>, Request extends AbstractBulkByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse, TA extends TransportAction<Request, BulkIndexByScrollResponse>
TA extends TransportAction<Request, Response>
> extends BaseRestHandler { > extends BaseRestHandler {
/** /**
@ -87,14 +88,20 @@ public abstract class AbstractBaseReindexRestHandler<
this.action = action; this.action = action;
} }
protected void execute(RestRequest request, Request internalRequest, RestChannel channel) throws IOException { protected void execute(RestRequest request, Request internalRequest, RestChannel channel,
boolean includeCreated, boolean includeUpdated, boolean includeDeleted) throws IOException {
Float requestsPerSecond = parseRequestsPerSecond(request); Float requestsPerSecond = parseRequestsPerSecond(request);
if (requestsPerSecond != null) { if (requestsPerSecond != null) {
internalRequest.setRequestsPerSecond(requestsPerSecond); internalRequest.setRequestsPerSecond(requestsPerSecond);
} }
if (request.paramAsBoolean("wait_for_completion", true)) { if (request.paramAsBoolean("wait_for_completion", true)) {
action.execute(internalRequest, new BulkIndexByScrollResponseContentListener<Response>(channel)); Map<String, String> params = new HashMap<>();
params.put(BulkByScrollTask.Status.INCLUDE_CREATED, Boolean.toString(includeCreated));
params.put(BulkByScrollTask.Status.INCLUDE_UPDATED, Boolean.toString(includeUpdated));
params.put(BulkByScrollTask.Status.INCLUDE_DELETED, Boolean.toString(includeDeleted));
action.execute(internalRequest, new BulkIndexByScrollResponseContentListener<>(channel, params));
return; return;
} }
/* /*

View File

@ -108,6 +108,24 @@ public class BulkByScrollTask extends CancellableTask {
public static class Status implements Task.Status { public static class Status implements Task.Status {
public static final String NAME = "bulk-by-scroll"; public static final String NAME = "bulk-by-scroll";
/**
* XContent param name to indicate if "created" count must be included
* in the response.
*/
public static final String INCLUDE_CREATED = "include_created";
/**
* XContent param name to indicate if "updated" count must be included
* in the response.
*/
public static final String INCLUDE_UPDATED = "include_updated";
/**
* XContent param name to indicate if "deleted" count must be included
* in the response.
*/
public static final String INCLUDE_DELETED = "include_deleted";
private final long total; private final long total;
private final long updated; private final long updated;
private final long created; private final long created;
@ -171,18 +189,20 @@ public class BulkByScrollTask extends CancellableTask {
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
innerXContent(builder, params, true, true); innerXContent(builder, params);
return builder.endObject(); return builder.endObject();
} }
public XContentBuilder innerXContent(XContentBuilder builder, Params params, boolean includeCreated, boolean includeDeleted) public XContentBuilder innerXContent(XContentBuilder builder, Params params)
throws IOException { throws IOException {
builder.field("total", total); builder.field("total", total);
builder.field("updated", updated); if (params.paramAsBoolean(INCLUDE_UPDATED, true)) {
if (includeCreated) { builder.field("updated", updated);
}
if (params.paramAsBoolean(INCLUDE_CREATED, true)) {
builder.field("created", created); builder.field("created", created);
} }
if (includeDeleted) { if (params.paramAsBoolean(INCLUDE_DELETED, true)) {
builder.field("deleted", deleted); builder.field("deleted", deleted);
} }
builder.field("batches", batches); builder.field("batches", batches);
@ -202,18 +222,14 @@ public class BulkByScrollTask extends CancellableTask {
public String toString() { public String toString() {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("BulkIndexByScrollResponse["); builder.append("BulkIndexByScrollResponse[");
innerToString(builder, true, true); innerToString(builder);
return builder.append(']').toString(); return builder.append(']').toString();
} }
public void innerToString(StringBuilder builder, boolean includeCreated, boolean includeDeleted) { public void innerToString(StringBuilder builder) {
builder.append("updated=").append(updated); builder.append("updated=").append(updated);
if (includeCreated) { builder.append(",created=").append(created);
builder.append(",created=").append(created); builder.append(",deleted=").append(deleted);
}
if (includeDeleted) {
builder.append(",deleted=").append(deleted);
}
builder.append(",batches=").append(batches); builder.append(",batches=").append(batches);
builder.append(",versionConflicts=").append(versionConflicts); builder.append(",versionConflicts=").append(versionConflicts);
builder.append(",noops=").append(noops); builder.append(",noops=").append(noops);

View File

@ -51,7 +51,7 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
} }
public BulkIndexByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List<Failure> indexingFailures, public BulkIndexByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List<Failure> indexingFailures,
List<ShardSearchFailure> searchFailures, boolean timedOut) { List<ShardSearchFailure> searchFailures, boolean timedOut) {
this.took = took; this.took = took;
this.status = requireNonNull(status, "Null status not supported"); this.status = requireNonNull(status, "Null status not supported");
this.indexingFailures = indexingFailures; this.indexingFailures = indexingFailures;
@ -67,6 +67,14 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
return status; return status;
} }
public long getCreated() {
return status.getCreated();
}
public long getDeleted() {
return status.getDeleted();
}
public long getUpdated() { public long getUpdated() {
return status.getUpdated(); return status.getUpdated();
} }
@ -152,7 +160,7 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("took", took.millis()); builder.field("took", took.millis());
builder.field("timed_out", timedOut); builder.field("timed_out", timedOut);
status.innerXContent(builder, params, false, false); status.innerXContent(builder, params);
builder.startArray("failures"); builder.startArray("failures");
for (Failure failure: indexingFailures) { for (Failure failure: indexingFailures) {
builder.startObject(); builder.startObject();
@ -173,7 +181,8 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("BulkIndexByScrollResponse["); builder.append("BulkIndexByScrollResponse[");
builder.append("took=").append(took).append(','); builder.append("took=").append(took).append(',');
status.innerToString(builder, false, false); builder.append("timed_out=").append(timedOut).append(',');
status.innerToString(builder);
builder.append(",indexing_failures=").append(getIndexingFailures().subList(0, min(3, getIndexingFailures().size()))); builder.append(",indexing_failures=").append(getIndexingFailures().subList(0, min(3, getIndexingFailures().size())));
builder.append(",search_failures=").append(getSearchFailures().subList(0, min(3, getSearchFailures().size()))); builder.append(",search_failures=").append(getSearchFailures().subList(0, min(3, getSearchFailures().size())));
return builder.append(']').toString(); return builder.append(']').toString();

View File

@ -22,21 +22,37 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.support.RestToXContentListener; import org.elasticsearch.rest.action.support.RestBuilderListener;
import java.util.Map;
/** /**
* Just like RestToXContentListener but will return higher than 200 status if * RestBuilderListener that returns higher than 200 status if there are any failures and allows to set XContent.Params.
* there are any failures.
*/ */
public class BulkIndexByScrollResponseContentListener<R extends BulkIndexByScrollResponse> extends RestToXContentListener<R> { public class BulkIndexByScrollResponseContentListener<R extends BulkIndexByScrollResponse> extends RestBuilderListener<R> {
public BulkIndexByScrollResponseContentListener(RestChannel channel) {
private final Map<String, String> params;
public BulkIndexByScrollResponseContentListener(RestChannel channel, Map<String, String> params) {
super(channel); super(channel);
this.params = params;
} }
@Override @Override
protected RestStatus getStatus(R response) { public RestResponse buildResponse(R response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContent(builder, new ToXContent.DelegatingMapParams(params, channel.request()));
builder.endObject();
return new BytesRestResponse(getStatus(response), builder);
}
private RestStatus getStatus(R response) {
/* /*
* Return the highest numbered rest status under the assumption that higher numbered statuses are "more error" and thus more * Return the highest numbered rest status under the assumption that higher numbered statuses are "more error" and thus more
* interesting to the user. * interesting to the user.

View File

@ -22,7 +22,7 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
public class ReindexAction extends Action<ReindexRequest, ReindexResponse, ReindexRequestBuilder> { public class ReindexAction extends Action<ReindexRequest, BulkIndexByScrollResponse, ReindexRequestBuilder> {
public static final ReindexAction INSTANCE = new ReindexAction(); public static final ReindexAction INSTANCE = new ReindexAction();
public static final String NAME = "indices:data/write/reindex"; public static final String NAME = "indices:data/write/reindex";
@ -36,7 +36,7 @@ public class ReindexAction extends Action<ReindexRequest, ReindexResponse, Reind
} }
@Override @Override
public ReindexResponse newResponse() { public BulkIndexByScrollResponse newResponse() {
return new ReindexResponse(); return new BulkIndexByScrollResponse();
} }
} }

View File

@ -27,17 +27,17 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
public class ReindexRequestBuilder extends public class ReindexRequestBuilder extends
AbstractBulkIndexByScrollRequestBuilder<ReindexRequest, ReindexResponse, ReindexRequestBuilder> { AbstractBulkIndexByScrollRequestBuilder<ReindexRequest, BulkIndexByScrollResponse, ReindexRequestBuilder> {
private final IndexRequestBuilder destination; private final IndexRequestBuilder destination;
public ReindexRequestBuilder(ElasticsearchClient client, public ReindexRequestBuilder(ElasticsearchClient client,
Action<ReindexRequest, ReindexResponse, ReindexRequestBuilder> action) { Action<ReindexRequest, BulkIndexByScrollResponse, ReindexRequestBuilder> action) {
this(client, action, new SearchRequestBuilder(client, SearchAction.INSTANCE), this(client, action, new SearchRequestBuilder(client, SearchAction.INSTANCE),
new IndexRequestBuilder(client, IndexAction.INSTANCE)); new IndexRequestBuilder(client, IndexAction.INSTANCE));
} }
private ReindexRequestBuilder(ElasticsearchClient client, private ReindexRequestBuilder(ElasticsearchClient client,
Action<ReindexRequest, ReindexResponse, ReindexRequestBuilder> action, Action<ReindexRequest, BulkIndexByScrollResponse, ReindexRequestBuilder> action,
SearchRequestBuilder search, IndexRequestBuilder destination) { SearchRequestBuilder search, IndexRequestBuilder destination) {
super(client, action, search, new ReindexRequest(search.request(), destination.request())); super(client, action, search, new ReindexRequest(search.request(), destination.request()));
this.destination = destination; this.destination = destination;

View File

@ -1,75 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
import java.io.IOException;
import java.util.List;
/**
* Response for the ReindexAction.
*/
public class ReindexResponse extends BulkIndexByScrollResponse {
public ReindexResponse() {
}
public ReindexResponse(TimeValue took, Status status, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
boolean timedOut) {
super(took, status, indexingFailures, searchFailures, timedOut);
}
public long getCreated() {
return getStatus().getCreated();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("took", getTook().millis());
builder.field("timed_out", isTimedOut());
getStatus().innerXContent(builder, params, true, false);
builder.startArray("failures");
for (Failure failure: getIndexingFailures()) {
builder.startObject();
failure.toXContent(builder, params);
builder.endObject();
}
for (ShardSearchFailure failure: getSearchFailures()) {
builder.startObject();
failure.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("ReindexResponse[");
builder.append("took=").append(getTook()).append(',');
getStatus().innerToString(builder, true, false);
return builder.append(']').toString();
}
}

View File

@ -58,7 +58,7 @@ import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
/** /**
* Expose IndexBySearchRequest over rest. * Expose IndexBySearchRequest over rest.
*/ */
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, ReindexResponse, TransportReindexAction> { public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, TransportReindexAction> {
private static final ObjectParser<ReindexRequest, ReindexParseContext> PARSER = new ObjectParser<>("reindex"); private static final ObjectParser<ReindexRequest, ReindexParseContext> PARSER = new ObjectParser<>("reindex");
static { static {
ObjectParser.Parser<SearchRequest, ReindexParseContext> sourceParser = (parser, search, context) -> { ObjectParser.Parser<SearchRequest, ReindexParseContext> sourceParser = (parser, search, context) -> {
@ -130,7 +130,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
} }
parseCommon(internalRequest, request); parseCommon(internalRequest, request);
execute(request, internalRequest, channel); execute(request, internalRequest, channel, true, true, false);
} }
private void badRequest(RestChannel channel, String message) { private void badRequest(RestChannel channel, String message) {

View File

@ -46,8 +46,8 @@ import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_A
import static org.elasticsearch.index.reindex.RestReindexAction.parseCommon; import static org.elasticsearch.index.reindex.RestReindexAction.parseCommon;
import static org.elasticsearch.rest.RestRequest.Method.POST; import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestUpdateByQueryAction extends public class RestUpdateByQueryAction extends AbstractBaseReindexRestHandler<UpdateByQueryRequest, TransportUpdateByQueryAction> {
AbstractBaseReindexRestHandler<UpdateByQueryRequest, BulkIndexByScrollResponse, TransportUpdateByQueryAction> {
@Inject @Inject
public RestUpdateByQueryAction(Settings settings, RestController controller, Client client, public RestUpdateByQueryAction(Settings settings, RestController controller, Client client,
IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters, IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters,
@ -113,6 +113,6 @@ public class RestUpdateByQueryAction extends
internalRequest.getSearchRequest().source().timeout(request.paramAsTime("search_timeout", null)); internalRequest.getSearchRequest().source().timeout(request.paramAsTime("search_timeout", null));
} }
execute(request, internalRequest, channel); execute(request, internalRequest, channel, false, true, false);
} }
} }

View File

@ -21,10 +21,8 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
@ -46,13 +44,12 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.elasticsearch.index.VersionType.INTERNAL; import static org.elasticsearch.index.VersionType.INTERNAL;
public class TransportReindexAction extends HandledTransportAction<ReindexRequest, ReindexResponse> { public class TransportReindexAction extends HandledTransportAction<ReindexRequest, BulkIndexByScrollResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final ScriptService scriptService; private final ScriptService scriptService;
private final AutoCreateIndex autoCreateIndex; private final AutoCreateIndex autoCreateIndex;
@ -71,7 +68,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
} }
@Override @Override
protected void doExecute(Task task, ReindexRequest request, ActionListener<ReindexResponse> listener) { protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex, state); validateAgainstAliases(request.getSearchRequest(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex, state);
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task); ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
@ -79,7 +76,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
} }
@Override @Override
protected void doExecute(ReindexRequest request, ActionListener<ReindexResponse> listener) { protected void doExecute(ReindexRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
throw new UnsupportedOperationException("task required"); throw new UnsupportedOperationException("task required");
} }
@ -116,10 +113,10 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
* but this makes no attempt to do any of them so it can be as simple * but this makes no attempt to do any of them so it can be as simple
* possible. * possible.
*/ */
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> { static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest> {
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService,
ParentTaskAssigningClient client, ClusterState state, ThreadPool threadPool, ReindexRequest request, ParentTaskAssigningClient client, ClusterState state, ThreadPool threadPool, ReindexRequest request,
ActionListener<ReindexResponse> listener) { ActionListener<BulkIndexByScrollResponse> listener) {
super(task, logger, scriptService, state, client, threadPool, request, request.getSearchRequest(), listener); super(task, logger, scriptService, state, client, threadPool, request, request.getSearchRequest(), listener);
} }
@ -193,12 +190,6 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
} }
} }
@Override
protected ReindexResponse buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
boolean timedOut) {
return new ReindexResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
}
/* /*
* Methods below here handle script updating the index request. They try * Methods below here handle script updating the index request. They try
* to be pretty liberal with regards to types because script are often * to be pretty liberal with regards to types because script are often

View File

@ -20,9 +20,7 @@
package org.elasticsearch.index.reindex; package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -33,7 +31,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.IndexFieldMapper; import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
@ -48,8 +45,6 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.List;
public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkIndexByScrollResponse> { public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkIndexByScrollResponse> {
private final Client client; private final Client client;
private final ScriptService scriptService; private final ScriptService scriptService;
@ -82,7 +77,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
/** /**
* Simple implementation of update-by-query using scrolling and bulk. * Simple implementation of update-by-query using scrolling and bulk.
*/ */
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> { static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest> {
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService,
ParentTaskAssigningClient client, ThreadPool threadPool, ClusterState clusterState, UpdateByQueryRequest request, ParentTaskAssigningClient client, ThreadPool threadPool, ClusterState clusterState, UpdateByQueryRequest request,
ActionListener<BulkIndexByScrollResponse> listener) { ActionListener<BulkIndexByScrollResponse> listener) {
@ -102,12 +97,6 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
return index; return index;
} }
@Override
protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<Failure> indexingFailures,
List<ShardSearchFailure> searchFailures, boolean timedOut) {
return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
}
@Override @Override
protected void scriptChangedIndex(IndexRequest index, Object to) { protected void scriptChangedIndex(IndexRequest index, Object to) {
throw new IllegalArgumentException("Modifying [" + IndexFieldMapper.NAME + "] not allowed"); throw new IllegalArgumentException("Modifying [" + IndexFieldMapper.NAME + "] not allowed");

View File

@ -46,7 +46,7 @@ public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<
threadPool.shutdown(); threadPool.shutdown();
} }
protected abstract AbstractAsyncBulkIndexByScrollAction<Request, Response> action(); protected abstract AbstractAsyncBulkIndexByScrollAction<Request> action();
protected abstract Request request(); protected abstract Request request();

View File

@ -33,7 +33,7 @@ import static org.hamcrest.Matchers.equalTo;
*/ */
public class ReindexCancelTests extends ReindexTestCase { public class ReindexCancelTests extends ReindexTestCase {
public void testCancel() throws Exception { public void testCancel() throws Exception {
ReindexResponse response = CancelTestUtils.testCancel(this, reindex().destination("dest", "test"), ReindexAction.NAME); BulkIndexByScrollResponse response = CancelTestUtils.testCancel(this, reindex().destination("dest", "test"), ReindexAction.NAME);
assertThat(response, reindexResponseMatcher().created(1).reasonCancelled(equalTo("by user request"))); assertThat(response, reindexResponseMatcher().created(1).reasonCancelled(equalTo("by user request")));
refresh("dest"); refresh("dest");

View File

@ -57,7 +57,7 @@ public class ReindexFailureTests extends ReindexTestCase {
*/ */
copy.source().setSize(1); copy.source().setSize(1);
ReindexResponse response = copy.get(); BulkIndexByScrollResponse response = copy.get();
assertThat(response, reindexResponseMatcher() assertThat(response, reindexResponseMatcher()
.batches(1) .batches(1)
.failures(both(greaterThan(0)).and(lessThanOrEqualTo(maximumNumberOfShards())))); .failures(both(greaterThan(0)).and(lessThanOrEqualTo(maximumNumberOfShards()))));
@ -77,7 +77,7 @@ public class ReindexFailureTests extends ReindexTestCase {
// CREATE will cause the conflict to prevent the write. // CREATE will cause the conflict to prevent the write.
copy.destination().setOpType(CREATE); copy.destination().setOpType(CREATE);
ReindexResponse response = copy.get(); BulkIndexByScrollResponse response = copy.get();
assertThat(response, reindexResponseMatcher().batches(1).versionConflicts(1).failures(1).created(99)); assertThat(response, reindexResponseMatcher().batches(1).versionConflicts(1).failures(1).created(99));
for (Failure failure: response.getIndexingFailures()) { for (Failure failure: response.getIndexingFailures()) {
assertThat(failure.getMessage(), containsString("VersionConflictEngineException[[test][")); assertThat(failure.getMessage(), containsString("VersionConflictEngineException[[test]["));
@ -99,7 +99,7 @@ public class ReindexFailureTests extends ReindexTestCase {
indexDocs(100); indexDocs(100);
ReindexRequestBuilder copy = reindex().source("source").destination("dest"); ReindexRequestBuilder copy = reindex().source("source").destination("dest");
copy.source().setSize(10); copy.source().setSize(10);
Future<ReindexResponse> response = copy.execute(); Future<BulkIndexByScrollResponse> response = copy.execute();
client().admin().indices().prepareDelete("source").get(); client().admin().indices().prepareDelete("source").get();
try { try {

View File

@ -26,7 +26,7 @@ import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
/** /**
* Index-by-search test for ttl, timestamp, and routing. * Index-by-search test for ttl, timestamp, and routing.
*/ */
public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<ReindexRequest, ReindexResponse> { public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<ReindexRequest, BulkIndexByScrollResponse> {
public void testRoutingCopiedByDefault() throws Exception { public void testRoutingCopiedByDefault() throws Exception {
IndexRequest index = new IndexRequest(); IndexRequest index = new IndexRequest();
action().copyMetadata(index, doc(RoutingFieldMapper.NAME, "foo")); action().copyMetadata(index, doc(RoutingFieldMapper.NAME, "foo"));

View File

@ -30,7 +30,7 @@ import static org.hamcrest.Matchers.containsString;
/** /**
* Tests index-by-search with a script modifying the documents. * Tests index-by-search with a script modifying the documents.
*/ */
public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScriptTestCase<ReindexRequest, ReindexResponse> { public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScriptTestCase<ReindexRequest, BulkIndexByScrollResponse> {
public void testSetIndex() throws Exception { public void testSetIndex() throws Exception {
Object dest = randomFrom(new Object[] {234, 234L, "pancake"}); Object dest = randomFrom(new Object[] {234, 234L, "pancake"});
IndexRequest index = applyScript((Map<String, Object> ctx) -> ctx.put("_index", dest)); IndexRequest index = applyScript((Map<String, Object> ctx) -> ctx.put("_index", dest));
@ -133,7 +133,7 @@ public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScri
} }
@Override @Override
protected AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> action() { protected AbstractAsyncBulkIndexByScrollAction<ReindexRequest> action() {
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool, request(), listener()); return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool, request(), listener());
} }
} }

View File

@ -58,7 +58,7 @@ public abstract class ReindexTestCase extends ESIntegTestCase {
} }
protected static class IndexBySearchResponseMatcher protected static class IndexBySearchResponseMatcher
extends AbstractBulkIndexByScrollResponseMatcher<ReindexResponse, IndexBySearchResponseMatcher> { extends AbstractBulkIndexByScrollResponseMatcher<BulkIndexByScrollResponse, IndexBySearchResponseMatcher> {
private Matcher<Long> createdMatcher = equalTo(0L); private Matcher<Long> createdMatcher = equalTo(0L);
public IndexBySearchResponseMatcher created(Matcher<Long> updatedMatcher) { public IndexBySearchResponseMatcher created(Matcher<Long> updatedMatcher) {
@ -71,7 +71,7 @@ public abstract class ReindexTestCase extends ESIntegTestCase {
} }
@Override @Override
protected boolean matchesSafely(ReindexResponse item) { protected boolean matchesSafely(BulkIndexByScrollResponse item) {
return super.matchesSafely(item) && createdMatcher.matches(item.getCreated()); return super.matchesSafely(item) && createdMatcher.matches(item.getCreated());
} }

View File

@ -88,7 +88,7 @@ public class ReindexVersioningTests extends ReindexTestCase {
/** /**
* Perform a reindex with EXTERNAL versioning which has "refresh" semantics. * Perform a reindex with EXTERNAL versioning which has "refresh" semantics.
*/ */
private ReindexResponse reindexExternal() { private BulkIndexByScrollResponse reindexExternal() {
ReindexRequestBuilder reindex = reindex().source("source").destination("dest").abortOnVersionConflict(false); ReindexRequestBuilder reindex = reindex().source("source").destination("dest").abortOnVersionConflict(false);
reindex.destination().setVersionType(EXTERNAL); reindex.destination().setVersionType(EXTERNAL);
return reindex.get(); return reindex.get();
@ -97,7 +97,7 @@ public class ReindexVersioningTests extends ReindexTestCase {
/** /**
* Perform a reindex with INTERNAL versioning which has "overwrite" semantics. * Perform a reindex with INTERNAL versioning which has "overwrite" semantics.
*/ */
private ReindexResponse reindexInternal() { private BulkIndexByScrollResponse reindexInternal() {
ReindexRequestBuilder reindex = reindex().source("source").destination("dest").abortOnVersionConflict(false); ReindexRequestBuilder reindex = reindex().source("source").destination("dest").abortOnVersionConflict(false);
reindex.destination().setVersionType(INTERNAL); reindex.destination().setVersionType(INTERNAL);
return reindex.get(); return reindex.get();
@ -106,7 +106,7 @@ public class ReindexVersioningTests extends ReindexTestCase {
/** /**
* Perform a reindex with CREATE OpType which has "create" semantics. * Perform a reindex with CREATE OpType which has "create" semantics.
*/ */
private ReindexResponse reindexCreate() { private BulkIndexByScrollResponse reindexCreate() {
ReindexRequestBuilder reindex = reindex().source("source").destination("dest").abortOnVersionConflict(false); ReindexRequestBuilder reindex = reindex().source("source").destination("dest").abortOnVersionConflict(false);
reindex.destination().setOpType(CREATE); reindex.destination().setOpType(CREATE);
return reindex.get(); return reindex.get();

View File

@ -106,9 +106,9 @@ public class RoundTripTests extends ESTestCase {
} }
public void testReindexResponse() throws IOException { public void testReindexResponse() throws IOException {
ReindexResponse response = new ReindexResponse(timeValueMillis(randomPositiveLong()), randomStatus(), randomIndexingFailures(), BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(timeValueMillis(randomPositiveLong()), randomStatus(),
randomSearchFailures(), randomBoolean()); randomIndexingFailures(), randomSearchFailures(), randomBoolean());
ReindexResponse tripped = new ReindexResponse(); BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse();
roundTrip(response, tripped); roundTrip(response, tripped);
assertResponseEquals(response, tripped); assertResponseEquals(response, tripped);
} }

View File

@ -49,7 +49,7 @@ public class UpdateByQueryWithScriptTests
} }
@Override @Override
protected AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> action() { protected AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest> action() {
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, null, request(), listener()); return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, null, request(), listener());
} }
} }