Add reindex progress indicator

Adds a progress indicator for reindex and update_by_query requests that you
can fetch like so:
```
curl 'localhost:9200/_tasks/*/*byquery*?pretty&detailed'
```

```
{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "Tyrannus",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : [ {
        "node" : "r1A2WoRbTwKZ516z6NEs5A",
        "id" : 36619,
        "type" : "transport",
        "action" : "indices:data/write/update/byquery",
        "status" : {       <---------------------------- Status is this
          "total" : 6154,
          "updated" : 3500,
          "created" : 0,
          "deleted" : 0,
          "batches" : 36,
          "version_conflicts" : 0,
          "noops" : 0
        },
        "description" : "update-by-query [test][test]"
      } ]
    }
  }
}
```

The progress is just (updated + created + deleted) / total
This commit is contained in:
Nik Everett 2016-01-15 18:08:59 -05:00
parent bb4d8b79fe
commit 35307054ea
21 changed files with 961 additions and 455 deletions

View File

@ -526,6 +526,69 @@ from aborting the operation.
The number of documents that were successfully created. This is not returned by
`_update_by_query` because it isn't allowed to create documents.
[float]
=== Response body
While `_reindex` and `_update_by_query` are running you can fetch their status
using the {ref}/task/list.html[Task List APIs]. This will fetch `_reindex`:
[source,js]
--------------------------------------------------
POST /_tasks/*/*reindex?pretty&detailed=true
--------------------------------------------------
// AUTOSENSE
and this will fetch `_update_by_query`:
[source,js]
--------------------------------------------------
POST /_tasks/*/*byquery?pretty&detailed=true
--------------------------------------------------
// AUTOSENSE
The responses looks like:
[source,js]
--------------------------------------------------
{
"nodes" : {
"r1A2WoRbTwKZ516z6NEs5A" : {
"name" : "Tyrannus",
"transport_address" : "127.0.0.1:9300",
"host" : "127.0.0.1",
"ip" : "127.0.0.1:9300",
"attributes" : {
"testattr" : "test",
"portsfile" : "true"
},
"tasks" : [ {
"node" : "r1A2WoRbTwKZ516z6NEs5A",
"id" : 36619,
"type" : "transport",
"action" : "indices:data/write/update/byquery",
"status" : { <1>
"total" : 6154,
"updated" : 3500,
"created" : 0,
"deleted" : 0,
"batches" : 36,
"version_conflicts" : 0,
"noops" : 0
},
"description" : "update-by-query [test][test]"
} ]
}
}
}
--------------------------------------------------
<1> this object contains the actual status. It is just like the response json
with the important addition of the `total` field. `total` is the total number
of operations that the reindex expects to perform. You can estimate the
progress by adding the `updated`, `created`, and `deleted` fields. The request
will finish when their sum is equal to the `total` field.
[float]
=== Examples

View File

@ -37,24 +37,26 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.plugin.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
@ -64,17 +66,15 @@ import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
* results.
*/
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>, Response> {
/**
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
* requests of this mainRequest.
*/
protected final Request mainRequest;
protected final BulkByScrollTask task;
private final AtomicLong startTime = new AtomicLong(-1);
private final AtomicLong updated = new AtomicLong(0);
private final AtomicLong created = new AtomicLong(0);
private final AtomicLong deleted = new AtomicLong(0);
private final AtomicInteger batches = new AtomicInteger(0);
private final AtomicLong versionConflicts = new AtomicLong(0);
private final AtomicReference<String> scroll = new AtomicReference<>();
private final List<Failure> indexingFailures = new CopyOnWriteArrayList<>();
private final List<ShardSearchFailure> searchFailures = new CopyOnWriteArrayList<>();
private final Set<String> destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ESLogger logger;
@ -83,8 +83,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final SearchRequest firstSearchRequest;
private final ActionListener<Response> listener;
public AbstractAsyncBulkByScrollAction(ESLogger logger, Client client, ThreadPool threadPool, Request mainRequest,
SearchRequest firstSearchRequest, ActionListener<Response> listener) {
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, Client client, ThreadPool threadPool,
Request mainRequest, SearchRequest firstSearchRequest, ActionListener<Response> listener) {
this.task = task;
this.logger = logger;
this.client = client;
this.threadPool = threadPool;
@ -95,54 +96,14 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
protected abstract BulkRequest buildBulk(Iterable<SearchHit> docs);
protected abstract Response buildResponse(long took);
protected abstract Response buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures);
public void start() {
initialSearch();
}
/**
* Count of documents updated.
*/
public long updated() {
return updated.get();
}
/**
* Count of documents created.
*/
public long created() {
return created.get();
}
/**
* Count of successful delete operations.
*/
public long deleted() {
return deleted.get();
}
/**
* The number of scan responses this request has processed.
*/
public int batches() {
return batches.get();
}
public long versionConflicts() {
return versionConflicts.get();
}
public long successfullyProcessed() {
return updated.get() + created.get() + deleted.get();
}
public List<Failure> indexingFailures() {
return unmodifiableList(indexingFailures);
}
public List<ShardSearchFailure> searchFailures() {
return unmodifiableList(searchFailures);
public BulkByScrollTask getTask() {
return task;
}
private void initialSearch() {
@ -179,10 +140,14 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
void onScrollResponse(SearchResponse searchResponse) {
scroll.set(searchResponse.getScrollId());
if (searchResponse.getShardFailures() != null && searchResponse.getShardFailures().length > 0) {
Collections.addAll(searchFailures, searchResponse.getShardFailures());
startNormalTermination();
startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures())));
return;
}
long total = searchResponse.getHits().totalHits();
if (mainRequest.getSize() > 0) {
total = min(total, mainRequest.getSize());
}
task.setTotal(total);
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
@ -190,14 +155,14 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
SearchHit[] docs = searchResponse.getHits().getHits();
logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId());
if (docs.length == 0) {
startNormalTermination();
startNormalTermination(emptyList(), emptyList());
return;
}
batches.incrementAndGet();
task.countBatch();
List<SearchHit> docsIterable = Arrays.asList(docs);
if (mainRequest.getSize() != SIZE_ALL_MATCHES) {
// Truncate the docs if we have more than the request size
long remaining = max(0, mainRequest.getSize() - successfullyProcessed());
long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed());
if (remaining < docs.length) {
docsIterable = docsIterable.subList(0, (int) remaining);
}
@ -243,10 +208,11 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
void onBulkResponse(BulkResponse response) {
try {
List<Failure> failures = new ArrayList<Failure>();
Set<String> destinationIndicesThisBatch = new HashSet<>();
for (BulkItemResponse item : response) {
if (item.isFailed()) {
recordFailure(item.getFailure());
recordFailure(item.getFailure(), failures);
continue;
}
@ -255,13 +221,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
case "create":
IndexResponse ir = item.getResponse();
if (ir.isCreated()) {
created.incrementAndGet();
task.countCreated();
} else {
updated.incrementAndGet();
task.countUpdated();
}
break;
case "delete":
deleted.incrementAndGet();
task.countDeleted();
break;
default:
throw new IllegalArgumentException("Unknown op type: " + item.getOpType());
@ -271,14 +237,14 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
}
destinationIndices.addAll(destinationIndicesThisBatch);
if (false == indexingFailures.isEmpty()) {
startNormalTermination();
if (false == failures.isEmpty()) {
startNormalTermination(unmodifiableList(failures), emptyList());
return;
}
if (mainRequest.getSize() != SIZE_ALL_MATCHES && successfullyProcessed() >= mainRequest.getSize()) {
if (mainRequest.getSize() != SIZE_ALL_MATCHES && task.getSuccessfullyProcessed() >= mainRequest.getSize()) {
// We've processed all the requested docs.
startNormalTermination();
startNormalTermination(emptyList(), emptyList());
return;
}
startNextScrollRequest();
@ -303,19 +269,19 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
});
}
private void recordFailure(Failure failure) {
private void recordFailure(Failure failure, List<Failure> failures) {
if (failure.getStatus() == CONFLICT) {
versionConflicts.incrementAndGet();
task.countVersionConflict();
if (false == mainRequest.isAbortOnVersionConflict()) {
return;
}
}
indexingFailures.add(failure);
failures.add(failure);
}
void startNormalTermination() {
void startNormalTermination(List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
if (false == mainRequest.isRefresh()) {
finishHim(null);
finishHim(null, indexingFailures, searchFailures);
return;
}
RefreshRequest refresh = new RefreshRequest();
@ -323,7 +289,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
client.admin().indices().refresh(refresh, new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse response) {
finishHim(null);
finishHim(null, indexingFailures, searchFailures);
}
@Override
@ -336,13 +302,20 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
/**
* Finish the request.
*
* @param failure
* the failure that caused the request to fail prematurely if not
* null. If not null this doesn't mean the request was entirely
* successful - it may have accumulated failures in the failures
* list.
* @param failure if non null then the request failed catastrophically with this exception
*/
void finishHim(Throwable failure) {
finishHim(failure, emptyList(), emptyList());
}
/**
* Finish the request.
*
* @param failure if non null then the request failed catastrophically with this exception
* @param indexingFailures any indexing failures accumulated during the request
* @param searchFailures any search failures accumulated during the request
*/
void finishHim(Throwable failure, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
String scrollId = scroll.get();
if (Strings.hasLength(scrollId)) {
/*
@ -364,7 +337,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
});
}
if (failure == null) {
listener.onResponse(buildResponse(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get())));
listener.onResponse(buildResponse(timeValueNanos(System.nanoTime() - startTime.get()), indexingFailures, searchFailures));
} else {
listener.onFailure(failure);
}

View File

@ -45,7 +45,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.emptyMap;
@ -56,13 +55,13 @@ import static java.util.Collections.emptyMap;
public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends AbstractBulkIndexByScrollRequest<Request>, Response extends BulkIndexByScrollResponse>
extends AbstractAsyncBulkByScrollAction<Request, Response> {
private final AtomicLong noops = new AtomicLong(0);
private final ScriptService scriptService;
private final CompiledScript script;
public AbstractAsyncBulkIndexByScrollAction(ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool,
Request mainRequest, SearchRequest firstSearchRequest, ActionListener<Response> listener) {
super(logger, client, threadPool, mainRequest, firstSearchRequest, listener);
public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService,
Client client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest,
ActionListener<Response> listener) {
super(task, logger, client, threadPool, mainRequest, firstSearchRequest, listener);
this.scriptService = scriptService;
if (mainRequest.getScript() == null) {
script = null;
@ -78,13 +77,6 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
*/
protected abstract IndexRequest buildIndexRequest(SearchHit doc);
/**
* The number of noops (skipped bulk items) as part of this request.
*/
public long noops() {
return noops.get();
}
@Override
protected BulkRequest buildBulk(Iterable<SearchHit> docs) {
BulkRequest bulkRequest = new BulkRequest();
@ -171,7 +163,7 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
throw new IllegalArgumentException("Script cleared op!");
}
if ("noop".equals(newOp)) {
noops.incrementAndGet();
task.countNoop();
return false;
}
if (false == "update".equals(newOp)) {

View File

@ -37,8 +37,8 @@ import org.elasticsearch.tasks.Task;
import java.io.IOException;
public abstract class AbstractBaseReindexRestHandler<Request extends ActionRequest<Request>, Response extends BulkIndexByScrollResponse, TA extends TransportAction<Request, Response>>
extends BaseRestHandler {
public abstract class AbstractBaseReindexRestHandler<Request extends ActionRequest<Request>, Response extends BulkIndexByScrollResponse,
TA extends TransportAction<Request, Response>> extends BaseRestHandler {
protected final IndicesQueriesRegistry indicesQueriesRegistry;
private final ClusterService clusterService;
private final TA action;

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import java.io.IOException;
import java.util.Arrays;
@ -205,6 +206,11 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
return self();
}
@Override
public Task createTask(long id, String type, String action) {
return new BulkByScrollTask(id, type, action, this::getDescription);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -239,7 +245,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
b.append("[all indices]");
}
if (source.types() != null && source.types().length != 0) {
b.append(source.types());
b.append(Arrays.toString(source.types()));
}
}

View File

@ -0,0 +1,252 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.reindex;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* Task storing information about a currently running BulkByScroll request.
*/
public class BulkByScrollTask extends Task {
/**
* The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents
* to process. Its ok that these have the same meaning because any request with 0 actual documents should be quite short lived.
*/
private final AtomicLong total = new AtomicLong(0);
private final AtomicLong updated = new AtomicLong(0);
private final AtomicLong created = new AtomicLong(0);
private final AtomicLong deleted = new AtomicLong(0);
private final AtomicLong noops = new AtomicLong(0);
private final AtomicInteger batch = new AtomicInteger(0);
private final AtomicLong versionConflicts = new AtomicLong(0);
public BulkByScrollTask(long id, String type, String action, Provider<String> description) {
super(id, type, action, description);
}
@Override
public Status getStatus() {
return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get());
}
/**
* Total number of successfully processed documents.
*/
public long getSuccessfullyProcessed() {
return updated.get() + created.get() + deleted.get();
}
public static class Status implements Task.Status {
public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0);
private final long total;
private final long updated;
private final long created;
private final long deleted;
private final int batches;
private final long versionConflicts;
private final long noops;
public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops) {
this.total = checkPositive(total, "total");
this.updated = checkPositive(updated, "updated");
this.created = checkPositive(created, "created");
this.deleted = checkPositive(deleted, "deleted");
this.batches = checkPositive(batches, "batches");
this.versionConflicts = checkPositive(versionConflicts, "versionConflicts");
this.noops = checkPositive(noops, "noops");
}
public Status(StreamInput in) throws IOException {
total = in.readVLong();
updated = in.readVLong();
created = in.readVLong();
deleted = in.readVLong();
batches = in.readVInt();
versionConflicts = in.readVLong();
noops = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);
out.writeVLong(updated);
out.writeVLong(created);
out.writeVLong(deleted);
out.writeVInt(batches);
out.writeVLong(versionConflicts);
out.writeVLong(noops);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerXContent(builder, params, true, true);
return builder.endObject();
}
public XContentBuilder innerXContent(XContentBuilder builder, Params params, boolean includeCreated, boolean includeDeleted)
throws IOException {
builder.field("total", total);
builder.field("updated", updated);
if (includeCreated) {
builder.field("created", created);
}
if (includeDeleted) {
builder.field("deleted", deleted);
}
builder.field("batches", batches);
builder.field("version_conflicts", versionConflicts);
builder.field("noops", noops);
return builder;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("BulkIndexByScrollResponse[");
innerToString(builder, true, true);
return builder.append(']').toString();
}
public void innerToString(StringBuilder builder, boolean includeCreated, boolean includeDeleted) {
builder.append("updated=").append(updated);
if (includeCreated) {
builder.append(",created=").append(created);
}
if (includeDeleted) {
builder.append(",deleted=").append(deleted);
}
builder.append(",batches=").append(batches);
builder.append(",versionConflicts=").append(versionConflicts);
builder.append(",noops=").append(noops);
}
@Override
public String getWriteableName() {
return "bulk-by-scroll";
}
@Override
public Status readFrom(StreamInput in) throws IOException {
return new Status(in);
}
/**
* The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents
* to process. Its ok that these have the same meaning because any request with 0 actual documents should be quite short lived.
*/
public long getTotal() {
return total;
}
/**
* Count of documents updated.
*/
public long getUpdated() {
return updated;
}
/**
* Count of documents created.
*/
public long getCreated() {
return created;
}
/**
* Count of successful delete operations.
*/
public long getDeleted() {
return deleted;
}
/**
* Number of scan responses this request has processed.
*/
public int getBatches() {
return batches;
}
/**
* Number of version conflicts this request has hit.
*/
public long getVersionConflicts() {
return versionConflicts;
}
/**
* Number of noops (skipped bulk items) as part of this request.
*/
public long getNoops() {
return noops;
}
private int checkPositive(int value, String name) {
if (value < 0) {
throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]");
}
return value;
}
private long checkPositive(long value, String name) {
if (value < 0) {
throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]");
}
return value;
}
}
void setTotal(long totalHits) {
total.set(totalHits);
}
void countBatch() {
batch.incrementAndGet();
}
void countNoop() {
noops.incrementAndGet();
}
void countCreated() {
created.incrementAndGet();
}
void countUpdated() {
updated.incrementAndGet();
}
void countDeleted() {
deleted.incrementAndGet();
}
void countVersionConflict() {
versionConflicts.incrementAndGet();
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -33,68 +34,64 @@ import java.util.List;
import static java.lang.Math.min;
import static java.util.Collections.unmodifiableList;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
/**
* Response used for actions that index many documents using a scroll request.
*/
public class BulkIndexByScrollResponse extends ActionResponse implements ToXContent {
static final String TOOK_FIELD = "took";
static final String UPDATED_FIELD = "updated";
static final String BATCHES_FIELD = "batches";
static final String VERSION_CONFLICTS_FIELD = "version_conflicts";
static final String NOOPS_FIELD = "noops";
static final String FAILURES_FIELD = "failures";
private long took;
private long updated;
private int batches;
private long versionConflicts;
private long noops;
private TimeValue took;
private BulkByScrollTask.Status status;
private List<Failure> indexingFailures;
private List<ShardSearchFailure> searchFailures;
public BulkIndexByScrollResponse() {
}
public BulkIndexByScrollResponse(long took, long updated, int batches, long versionConflicts, long noops,
List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
public BulkIndexByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List<Failure> indexingFailures,
List<ShardSearchFailure> searchFailures) {
this.took = took;
this.updated = updated;
this.batches = batches;
this.versionConflicts = versionConflicts;
this.noops = noops;
this.status = requireNonNull(status, "Null status not supported");
this.indexingFailures = indexingFailures;
this.searchFailures = searchFailures;
}
public long getTook() {
public TimeValue getTook() {
return took;
}
protected BulkByScrollTask.Status getStatus() {
return status;
}
public long getUpdated() {
return updated;
return status.getUpdated();
}
public int getBatches() {
return batches;
return status.getBatches();
}
public long getVersionConflicts() {
return versionConflicts;
return status.getVersionConflicts();
}
public long getNoops() {
return noops;
return status.getNoops();
}
/**
* Indexing failures.
* All of the indexing failures. Version conflicts are only included if the request sets abortOnVersionConflict to true (the
* default).
*/
public List<Failure> getIndexingFailures() {
return indexingFailures;
}
/**
* All search failures.
*/
public List<ShardSearchFailure> getSearchFailures() {
return searchFailures;
}
@ -102,11 +99,8 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(took);
out.writeVLong(updated);
out.writeVInt(batches);
out.writeVLong(versionConflicts);
out.writeVLong(noops);
took.writeTo(out);
status.writeTo(out);
out.writeVInt(indexingFailures.size());
for (Failure failure: indexingFailures) {
failure.writeTo(out);
@ -120,11 +114,8 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
took = in.readVLong();
updated = in.readVLong();
batches = in.readVInt();
versionConflicts = in.readVLong();
noops = in.readVLong();
took = TimeValue.readTimeValue(in);
status = new BulkByScrollTask.Status(in);
int indexingFailuresCount = in.readVInt();
List<Failure> indexingFailures = new ArrayList<>(indexingFailuresCount);
for (int i = 0; i < indexingFailuresCount; i++) {
@ -136,16 +127,14 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
for (int i = 0; i < searchFailuresCount; i++) {
searchFailures.add(readShardSearchFailure(in));
}
this.searchFailures = unmodifiableList(searchFailures);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(TOOK_FIELD, took);
builder.field(UPDATED_FIELD, updated);
builder.field(BATCHES_FIELD, batches);
builder.field(VERSION_CONFLICTS_FIELD, versionConflicts);
builder.field(NOOPS_FIELD, noops);
builder.startArray(FAILURES_FIELD);
builder.field("took", took.millis());
status.innerXContent(builder, params, false, false);
builder.startArray("failures");
for (Failure failure: indexingFailures) {
builder.startObject();
failure.toXContent(builder, params);
@ -163,22 +152,11 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(toStringName()).append("[");
builder.append("took=").append(took);
builder.append(",updated=").append(updated);
builder.append(",batches=").append(batches);
builder.append(",versionConflicts=").append(versionConflicts);
builder.append(",noops=").append(noops);
builder.append("BulkIndexByScrollResponse[");
builder.append("took=").append(took).append(',');
status.innerToString(builder, false, false);
builder.append(",indexing_failures=").append(getIndexingFailures().subList(0, min(3, getIndexingFailures().size())));
builder.append(",search_failures=").append(getSearchFailures().subList(0, min(3, getSearchFailures().size())));
innerToString(builder);
return builder.append("]").toString();
}
protected String toStringName() {
return "BulkIndexByScrollResponse";
}
protected void innerToString(StringBuilder builder) {
return builder.append(']').toString();
}
}

View File

@ -21,75 +21,53 @@ package org.elasticsearch.plugin.reindex;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.reindex.BulkByScrollTask.Status;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
* Response for the ReindexAction.
*/
public class ReindexResponse extends BulkIndexByScrollResponse {
static final String CREATED_FIELD = "created";
private long created;
public ReindexResponse() {
}
public ReindexResponse(long took, long created, long updated, int batches, long versionConflicts, long noops, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
super(took, updated, batches, versionConflicts, noops, indexingFailures, searchFailures);
this.created = created;
public ReindexResponse(TimeValue took, Status status, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
super(took, status, indexingFailures, searchFailures);
}
public long getCreated() {
return created;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(created);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
created = in.readVLong();
return getStatus().getCreated();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
super.toXContent(builder, params);
builder.field(CREATED_FIELD, created);
builder.field("took", getTook());
getStatus().innerXContent(builder, params, true, false);
builder.startArray("failures");
for (Failure failure: getIndexingFailures()) {
builder.startObject();
failure.toXContent(builder, params);
builder.endObject();
}
for (ShardSearchFailure failure: getSearchFailures()) {
builder.startObject();
failure.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}
@Override
protected String toStringName() {
return "ReindexResponse";
}
@Override
protected void innerToString(StringBuilder builder) {
builder.append(",created=").append(created);
}
/**
* Get the first few failures to build a useful for toString.
*/
protected void truncatedFailures(StringBuilder builder) {
builder.append(",failures=[");
Iterator<Failure> failures = getIndexingFailures().iterator();
int written = 0;
while (failures.hasNext() && written < 3) {
Failure failure = failures.next();
builder.append(failure.getMessage());
if (written != 0) {
builder.append(", ");
}
written++;
}
builder.append(']');
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("ReindexResponse[");
builder.append("took=").append(getTook()).append(',');
getStatus().innerToString(builder, true, false);
return builder.append(']').toString();
}
}

View File

@ -21,8 +21,10 @@ package org.elasticsearch.plugin.reindex;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
@ -39,9 +41,11 @@ import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Objects;
import static java.util.Objects.requireNonNull;
@ -66,10 +70,15 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
}
@Override
protected void doExecute(ReindexRequest request, ActionListener<ReindexResponse> listener) {
protected void doExecute(Task task, ReindexRequest request, ActionListener<ReindexResponse> listener) {
validateAgainstAliases(request.getSource(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex,
clusterService.state());
new AsyncIndexBySearchAction(logger, scriptService, client, threadPool, request, listener).start();
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, request, listener).start();
}
@Override
protected void doExecute(ReindexRequest request, ActionListener<ReindexResponse> listener) {
throw new UnsupportedOperationException("task required");
}
/**
@ -106,9 +115,9 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
* possible.
*/
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> {
public AsyncIndexBySearchAction(ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool,
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool,
ReindexRequest request, ActionListener<ReindexResponse> listener) {
super(logger, scriptService, client, threadPool, request, request.getSource(), listener);
super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener);
}
@Override
@ -162,17 +171,16 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
}
}
@Override
protected ReindexResponse buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures) {
return new ReindexResponse(took, task.getStatus(), indexingFailures, searchFailures);
}
/*
* Methods below here handle script updating the index request. They try
* to be pretty liberal with regards to types because script are often
* dynamically typed.
*/
@Override
protected ReindexResponse buildResponse(long took) {
return new ReindexResponse(took, created(), updated(), batches(), versionConflicts(), noops(), indexingFailures(),
searchFailures());
}
@Override
protected void scriptChangedIndex(IndexRequest index, Object to) {
requireNonNull(to, "Can't reindex without a destination index!");

View File

@ -20,7 +20,9 @@
package org.elasticsearch.plugin.reindex;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
@ -28,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
@ -38,9 +41,12 @@ import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkIndexByScrollResponse> {
private final Client client;
private final ScriptService scriptService;
@ -56,18 +62,23 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
}
@Override
protected void doExecute(UpdateByQueryRequest request,
protected void doExecute(Task task, UpdateByQueryRequest request,
ActionListener<BulkIndexByScrollResponse> listener) {
new AsyncIndexBySearchAction(logger, scriptService, client, threadPool, request, listener).start();
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, request, listener).start();
}
@Override
protected void doExecute(UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
throw new UnsupportedOperationException("task required");
}
/**
* Simple implementation of update-by-query using scrolling and bulk.
*/
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> {
public AsyncIndexBySearchAction(ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool,
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool,
UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
super(logger, scriptService, client, threadPool, request, request.getSource(), listener);
super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener);
}
@Override
@ -83,9 +94,9 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
}
@Override
protected BulkIndexByScrollResponse buildResponse(long took) {
return new BulkIndexByScrollResponse(took, updated(), batches(), versionConflicts(), noops(), indexingFailures(),
searchFailures());
protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<Failure> indexingFailures,
List<ShardSearchFailure> searchFailures) {
return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures);
}
@Override

View File

@ -28,10 +28,12 @@ import org.junit.Before;
public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<Request extends AbstractBulkIndexByScrollRequest<Request>, Response extends BulkIndexByScrollResponse>
extends ESTestCase {
protected ThreadPool threadPool;
protected BulkByScrollTask task;
@Before
public void setupForTest() {
threadPool = new ThreadPool(getTestName());
task = new BulkByScrollTask(1, "test", "test", () -> "test");
}
@After

View File

@ -1,189 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.reindex;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.equalTo;
public class AsyncBulkByScrollActionTest extends ESTestCase {
private MockClearScrollClient client;
private ThreadPool threadPool;
private DummyAbstractBulkByScrollRequest mainRequest;
private SearchRequest firstSearchRequest;
private PlainActionFuture<Object> listener;
private String scrollId;
@Before
public void setupForTest() {
client = new MockClearScrollClient(new NoOpClient(getTestName()));
threadPool = new ThreadPool(getTestName());
mainRequest = new DummyAbstractBulkByScrollRequest();
firstSearchRequest = null;
listener = new PlainActionFuture<>();
scrollId = null;
}
@After
public void tearDownAndVerifyCommonStuff() {
client.close();
threadPool.shutdown();
if (scrollId != null) {
assertThat(client.scrollsCleared, contains(scrollId));
}
}
/**
* Generates a random scrollId and registers it so that when the test
* finishes we check that it was cleared. Subsequent calls reregister a new
* random scroll id so it is checked instead.
*/
private String scrollId() {
scrollId = randomSimpleString(random());
return scrollId;
}
/**
* Mimicks a ThreadPool rejecting execution of the task.
*/
public void testThreadPoolRejectionsAbortRequest() throws Exception {
threadPool.shutdown();
threadPool = new ThreadPool(getTestName()) {
@Override
public Executor generic() {
return new Executor() {
@Override
public void execute(Runnable command) {
((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test"));
}
};
}
};
new DummyAbstractAsyncBulkByScrollAction()
.onScrollResponse(new SearchResponse(null, scrollId(), 5, 4, randomLong(), null));
try {
listener.get();
fail("Expected a failure");
} catch (ExecutionException e) {
assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]"));
}
}
/**
* Mimicks shard search failures usually caused by the data node serving the
* scroll request going down.
*/
public void testShardFailuresAbortRequest() throws Exception {
ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test"));
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
action.onScrollResponse(new SearchResponse(null, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure }));
listener.get();
assertThat(action.indexingFailures(), emptyCollectionOf(Failure.class));
assertThat(action.searchFailures(), contains(shardFailure));
}
/**
* Mimicks bulk indexing failures.
*/
public void testBulkFailuresAbortRequest() throws Exception {
Failure failure = new Failure("index", "type", "id", new RuntimeException("test"));
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
action.onBulkResponse(new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong()));
listener.get();
assertThat(action.indexingFailures(), contains(failure));
assertThat(action.searchFailures(), emptyCollectionOf(ShardSearchFailure.class));
}
private class DummyAbstractAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, Object> {
public DummyAbstractAsyncBulkByScrollAction() {
super(logger, client, threadPool, AsyncBulkByScrollActionTest.this.mainRequest, firstSearchRequest, listener);
}
@Override
protected BulkRequest buildBulk(Iterable<SearchHit> docs) {
return new BulkRequest();
}
@Override
protected Object buildResponse(long took) {
return new Object();
}
}
private static class DummyAbstractBulkByScrollRequest extends AbstractBulkByScrollRequest<DummyAbstractBulkByScrollRequest> {
@Override
protected DummyAbstractBulkByScrollRequest self() {
return this;
}
}
private static class MockClearScrollClient extends FilterClient {
private List<String> scrollsCleared = new ArrayList<>();
public MockClearScrollClient(Client in) {
super(in);
}
@Override
@SuppressWarnings("unchecked")
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
if (request instanceof ClearScrollRequest) {
ClearScrollRequest clearScroll = (ClearScrollRequest) request;
scrollsCleared.addAll(clearScroll.getScrollIds());
listener.onResponse((Response) new ClearScrollResponse(true, clearScroll.getScrollIds().size()));
return;
}
super.doExecute(action, request, listener);
}
}
}

View File

@ -0,0 +1,294 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.reindex;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import static java.util.Collections.emptyMap;
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.equalTo;
public class AsyncBulkByScrollActionTests extends ESTestCase {
private MockClearScrollClient client;
private ThreadPool threadPool;
private DummyAbstractBulkByScrollRequest mainRequest;
private SearchRequest firstSearchRequest;
private PlainActionFuture<BulkIndexByScrollResponse> listener;
private String scrollId;
private BulkByScrollTask task;
@Before
public void setupForTest() {
client = new MockClearScrollClient(new NoOpClient(getTestName()));
threadPool = new ThreadPool(getTestName());
mainRequest = new DummyAbstractBulkByScrollRequest();
firstSearchRequest = null;
listener = new PlainActionFuture<>();
scrollId = null;
task = new BulkByScrollTask(0, "test", "test", () -> "test");
}
@After
public void tearDownAndVerifyCommonStuff() {
client.close();
threadPool.shutdown();
}
/**
* Generates a random scrollId and registers it so that when the test
* finishes we check that it was cleared. Subsequent calls reregister a new
* random scroll id so it is checked instead.
*/
private String scrollId() {
scrollId = randomSimpleString(random(), 1, 1000); // Empty string's get special behavior we don't want
return scrollId;
}
public void testScrollResponseSetsTotal() {
// Default is 0, meaning unstarted
assertEquals(0, task.getStatus().getTotal());
long total = randomIntBetween(0, Integer.MAX_VALUE);
InternalSearchHits hits = new InternalSearchHits(null, total, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
new DummyAbstractAsyncBulkByScrollAction()
.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
assertEquals(total, task.getStatus().getTotal());
}
public void testEachScrollResponseIsABatch() {
// Replace the generic thread pool with one that executes immediately so the batch is updated immediately
threadPool.shutdown();
threadPool = new ThreadPool(getTestName()) {
@Override
public Executor generic() {
return new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
};
}
};
int maxBatches = randomIntBetween(0, 100);
for (int batches = 1; batches < maxBatches; batches++) {
InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap());
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
new DummyAbstractAsyncBulkByScrollAction()
.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
assertEquals(batches, task.getStatus().getBatches());
}
}
public void testBulkResponseSetsLotsOfStatus() {
mainRequest.setAbortOnVersionConflict(false);
int maxBatches = randomIntBetween(0, 100);
long versionConflicts = 0;
long created = 0;
long updated = 0;
long deleted = 0;
for (int batches = 0; batches < maxBatches; batches++) {
BulkItemResponse[] responses = new BulkItemResponse[randomIntBetween(0, 10000)];
for (int i = 0; i < responses.length; i++) {
ShardId shardId = new ShardId(new Index("name", "uid"), 0);
String opType;
if (rarely()) {
opType = randomSimpleString(random());
versionConflicts++;
responses[i] = new BulkItemResponse(i, opType, new Failure(shardId.getIndexName(), "type", "id" + i,
new VersionConflictEngineException(shardId, "type", "id", "test")));
continue;
}
boolean createdResponse;
switch (randomIntBetween(0, 2)) {
case 0:
opType = randomFrom("index", "create");
createdResponse = true;
created++;
break;
case 1:
opType = randomFrom("index", "create");
createdResponse = false;
updated++;
break;
case 2:
opType = "delete";
createdResponse = false;
deleted++;
break;
default:
throw new RuntimeException("Bad scenario");
}
responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse));
}
new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0));
assertEquals(versionConflicts, task.getStatus().getVersionConflicts());
assertEquals(updated, task.getStatus().getUpdated());
assertEquals(created, task.getStatus().getCreated());
assertEquals(deleted, task.getStatus().getDeleted());
assertEquals(versionConflicts, task.getStatus().getVersionConflicts());
}
}
/**
* Mimicks a ThreadPool rejecting execution of the task.
*/
public void testThreadPoolRejectionsAbortRequest() throws Exception {
threadPool.shutdown();
threadPool = new ThreadPool(getTestName()) {
@Override
public Executor generic() {
return new Executor() {
@Override
public void execute(Runnable command) {
((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test"));
}
};
}
};
InternalSearchHits hits = new InternalSearchHits(null, 0, 0);
InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
new DummyAbstractAsyncBulkByScrollAction()
.onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
try {
listener.get();
fail("Expected a failure");
} catch (ExecutionException e) {
assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]"));
}
assertThat(client.scrollsCleared, contains(scrollId));
}
/**
* Mimicks shard search failures usually caused by the data node serving the
* scroll request going down.
*/
public void testShardFailuresAbortRequest() throws Exception {
ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test"));
new DummyAbstractAsyncBulkByScrollAction()
.onScrollResponse(new SearchResponse(null, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure }));
BulkIndexByScrollResponse response = listener.get();
assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class));
assertThat(response.getSearchFailures(), contains(shardFailure));
assertThat(client.scrollsCleared, contains(scrollId));
}
/**
* Mimicks bulk indexing failures.
*/
public void testBulkFailuresAbortRequest() throws Exception {
Failure failure = new Failure("index", "type", "id", new RuntimeException("test"));
DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
action.onBulkResponse(new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong()));
BulkIndexByScrollResponse response = listener.get();
assertThat(response.getIndexingFailures(), contains(failure));
assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class));
}
private class DummyAbstractAsyncBulkByScrollAction
extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, BulkIndexByScrollResponse> {
public DummyAbstractAsyncBulkByScrollAction() {
super(AsyncBulkByScrollActionTests.this.task, logger, client, threadPool,
AsyncBulkByScrollActionTests.this.mainRequest, firstSearchRequest, listener);
}
@Override
protected BulkRequest buildBulk(Iterable<SearchHit> docs) {
return new BulkRequest();
}
@Override
protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<Failure> indexingFailures,
List<ShardSearchFailure> searchFailures) {
return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures);
}
}
private static class DummyAbstractBulkByScrollRequest extends AbstractBulkByScrollRequest<DummyAbstractBulkByScrollRequest> {
@Override
protected DummyAbstractBulkByScrollRequest self() {
return this;
}
}
private static class MockClearScrollClient extends FilterClient {
private List<String> scrollsCleared = new ArrayList<>();
public MockClearScrollClient(Client in) {
super(in);
}
@Override
@SuppressWarnings("unchecked")
protected <Request extends ActionRequest<Request>, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
if (request instanceof ClearScrollRequest) {
ClearScrollRequest clearScroll = (ClearScrollRequest) request;
scrollsCleared.addAll(clearScroll.getScrollIds());
listener.onResponse((Response) new ClearScrollResponse(true, clearScroll.getScrollIds().size()));
return;
}
super.doExecute(action, request, listener);
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.reindex;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
public class BulkByScrollTaskTests extends ESTestCase {
private BulkByScrollTask task;
@Before
public void createTask() {
task = new BulkByScrollTask(1, "test_type", "test_action", () -> "test");
}
public void testBasicData() {
assertEquals(1, task.getId());
assertEquals("test_type", task.getType());
assertEquals("test_action", task.getAction());
}
public void testProgress() {
long created = 0;
long updated = 0;
long deleted = 0;
long versionConflicts = 0;
long noops = 0;
int batch = 0;
BulkByScrollTask.Status status = task.getStatus();
assertEquals(0, status.getTotal());
assertEquals(created, status.getCreated());
assertEquals(updated, status.getUpdated());
assertEquals(deleted, status.getDeleted());
assertEquals(versionConflicts, status.getVersionConflicts());
assertEquals(batch, status.getBatches());
assertEquals(noops, status.getNoops());
long totalHits = randomIntBetween(10, 1000);
task.setTotal(totalHits);
for (long p = 0; p < totalHits; p++) {
status = task.getStatus();
assertEquals(totalHits, status.getTotal());
assertEquals(created, status.getCreated());
assertEquals(updated, status.getUpdated());
assertEquals(deleted, status.getDeleted());
assertEquals(versionConflicts, status.getVersionConflicts());
assertEquals(batch, status.getBatches());
assertEquals(noops, status.getNoops());
if (randomBoolean()) {
created++;
task.countCreated();
} else if (randomBoolean()) {
updated++;
task.countUpdated();
} else {
deleted++;
task.countDeleted();
}
if (rarely()) {
versionConflicts++;
task.countVersionConflict();
}
if (rarely()) {
batch++;
task.countBatch();
}
if (rarely()) {
noops++;
task.countNoop();
}
}
status = task.getStatus();
assertEquals(totalHits, status.getTotal());
assertEquals(created, status.getCreated());
assertEquals(updated, status.getUpdated());
assertEquals(deleted, status.getDeleted());
assertEquals(versionConflicts, status.getVersionConflicts());
assertEquals(batch, status.getBatches());
assertEquals(noops, status.getNoops());
}
public void testStatusHatesNegatives() {
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0));
expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1));
}
}

View File

@ -19,15 +19,15 @@
package org.elasticsearch.plugin.reindex;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
public class ReindexBasicTests extends ReindexTestCase {
public void testFiltering() throws Exception {

View File

@ -67,7 +67,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
@Override
protected TransportReindexAction.AsyncIndexBySearchAction action() {
return new TransportReindexAction.AsyncIndexBySearchAction(logger, null, null, threadPool, request(), listener());
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, request(), listener());
}
@Override

View File

@ -19,15 +19,15 @@
package org.elasticsearch.plugin.reindex;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import static org.elasticsearch.index.query.QueryBuilders.hasParentQuery;
import static org.elasticsearch.index.query.QueryBuilders.idsQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.hamcrest.Matchers.equalTo;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.index.query.QueryBuilder;
/**
* Index-by-search tests for parent/child.
*/

View File

@ -134,6 +134,6 @@ public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScri
@Override
protected AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> action() {
return new TransportReindexAction.AsyncIndexBySearchAction(logger, null, null, threadPool, request(), listener());
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, request(), listener());
}
}

View File

@ -42,6 +42,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
/**
* Round trip tests for all Streamable things declared in this plugin.
@ -67,23 +68,6 @@ public class RoundTripTests extends ESTestCase {
assertRequestEquals(update, tripped);
}
public void testReindexResponse() throws IOException {
ReindexResponse response = new ReindexResponse(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
randomPositiveInt(), randomPositiveLong(), randomPositiveLong(), randomIndexingFailures(), randomSearchFailures());
ReindexResponse tripped = new ReindexResponse();
roundTrip(response, tripped);
assertResponseEquals(response, tripped);
assertEquals(response.getCreated(), tripped.getCreated());
}
public void testBulkIndexByScrollResponse() throws IOException {
BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(randomPositiveLong(), randomPositiveLong(), randomPositiveInt(),
randomPositiveLong(), randomPositiveLong(), randomIndexingFailures(), randomSearchFailures());
BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse();
roundTrip(response, tripped);
assertResponseEquals(response, tripped);
}
private void randomRequest(AbstractBulkIndexByScrollRequest<?> request) {
request.getSource().indices("test");
request.getSource().source().size(between(1, 1000));
@ -106,6 +90,35 @@ public class RoundTripTests extends ESTestCase {
assertEquals(request.getScript(), tripped.getScript());
}
public void testBulkByTaskStatus() throws IOException {
BulkByScrollTask.Status status = randomStatus();
BytesStreamOutput out = new BytesStreamOutput();
status.writeTo(out);
BulkByScrollTask.Status tripped = new BulkByScrollTask.Status(out.bytes().streamInput());
assertTaskStatusEquals(status, tripped);
}
public void testReindexResponse() throws IOException {
ReindexResponse response = new ReindexResponse(timeValueMillis(randomPositiveLong()), randomStatus(), randomIndexingFailures(),
randomSearchFailures());
ReindexResponse tripped = new ReindexResponse();
roundTrip(response, tripped);
assertResponseEquals(response, tripped);
}
public void testBulkIndexByScrollResponse() throws IOException {
BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(timeValueMillis(randomPositiveLong()), randomStatus(),
randomIndexingFailures(), randomSearchFailures());
BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse();
roundTrip(response, tripped);
assertResponseEquals(response, tripped);
}
private BulkByScrollTask.Status randomStatus() {
return new BulkByScrollTask.Status(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
randomPositiveInt(), randomPositiveLong(), randomPositiveLong());
}
private List<Failure> randomIndexingFailures() {
return usually() ? emptyList()
: singletonList(new Failure(randomSimpleString(random()), randomSimpleString(random()),
@ -121,25 +134,6 @@ public class RoundTripTests extends ESTestCase {
new SearchShardTarget(randomSimpleString(random()), index, randomInt()), randomFrom(RestStatus.values())));
}
private void assertResponseEquals(BulkIndexByScrollResponse response, BulkIndexByScrollResponse tripped) {
assertEquals(response.getTook(), tripped.getTook());
assertEquals(response.getUpdated(), tripped.getUpdated());
assertEquals(response.getBatches(), tripped.getBatches());
assertEquals(response.getVersionConflicts(), tripped.getVersionConflicts());
assertEquals(response.getNoops(), tripped.getNoops());
assertEquals(response.getIndexingFailures().size(), tripped.getIndexingFailures().size());
for (int i = 0; i < response.getIndexingFailures().size(); i++) {
Failure expected = response.getIndexingFailures().get(i);
Failure actual = tripped.getIndexingFailures().get(i);
assertEquals(expected.getIndex(), actual.getIndex());
assertEquals(expected.getType(), actual.getType());
assertEquals(expected.getId(), actual.getId());
assertEquals(expected.getMessage(), actual.getMessage());
assertEquals(expected.getStatus(), actual.getStatus());
}
}
private void roundTrip(Streamable example, Streamable empty) throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
example.writeTo(out);
@ -164,4 +158,35 @@ public class RoundTripTests extends ESTestCase {
private int randomPositiveInt() {
return randomInt(Integer.MAX_VALUE);
}
private void assertResponseEquals(BulkIndexByScrollResponse expected, BulkIndexByScrollResponse actual) {
assertEquals(expected.getTook(), actual.getTook());
assertTaskStatusEquals(expected.getStatus(), actual.getStatus());
assertEquals(expected.getIndexingFailures().size(), actual.getIndexingFailures().size());
for (int i = 0; i < expected.getIndexingFailures().size(); i++) {
Failure expectedFailure = expected.getIndexingFailures().get(i);
Failure actualFailure = actual.getIndexingFailures().get(i);
assertEquals(expectedFailure.getIndex(), actualFailure.getIndex());
assertEquals(expectedFailure.getType(), actualFailure.getType());
assertEquals(expectedFailure.getId(), actualFailure.getId());
assertEquals(expectedFailure.getMessage(), actualFailure.getMessage());
assertEquals(expectedFailure.getStatus(), actualFailure.getStatus());
}
assertEquals(expected.getSearchFailures().size(), actual.getSearchFailures().size());
for (int i = 0; i < expected.getSearchFailures().size(); i++) {
ShardSearchFailure expectedFailure = expected.getSearchFailures().get(i);
ShardSearchFailure actualFailure = actual.getSearchFailures().get(i);
assertEquals(expectedFailure.shard(), actualFailure.shard());
assertEquals(expectedFailure.status(), actualFailure.status());
// We can't use getCause because throwable doesn't implement equals
assertEquals(expectedFailure.reason(), actualFailure.reason());
}
}
private void assertTaskStatusEquals(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {
assertEquals(expected.getUpdated(), actual.getUpdated());
assertEquals(expected.getBatches(), actual.getBatches());
assertEquals(expected.getVersionConflicts(), actual.getVersionConflicts());
assertEquals(expected.getNoops(), actual.getNoops());
}
}

View File

@ -33,7 +33,7 @@ public class UpdateByQueryMetadataTests
@Override
protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action() {
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(logger, null, null, threadPool, request(), listener());
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, request(), listener());
}
@Override

View File

@ -50,6 +50,7 @@ public class UpdateByQueryWithScriptTests
@Override
protected AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> action() {
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(logger, null, null, threadPool, request(), listener());
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool,
request(), listener());
}
}