From 35307054eaddcdd13b6188613308f39d053002de Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 15 Jan 2016 18:08:59 -0500 Subject: [PATCH] Add reindex progress indicator Adds a progress indicator for reindex and update_by_query requests that you can fetch like so: ``` curl 'localhost:9200/_tasks/*/*byquery*?pretty&detailed' ``` ``` { "nodes" : { "r1A2WoRbTwKZ516z6NEs5A" : { "name" : "Tyrannus", "transport_address" : "127.0.0.1:9300", "host" : "127.0.0.1", "ip" : "127.0.0.1:9300", "attributes" : { "testattr" : "test", "portsfile" : "true" }, "tasks" : [ { "node" : "r1A2WoRbTwKZ516z6NEs5A", "id" : 36619, "type" : "transport", "action" : "indices:data/write/update/byquery", "status" : { <---------------------------- Status is this "total" : 6154, "updated" : 3500, "created" : 0, "deleted" : 0, "batches" : 36, "version_conflicts" : 0, "noops" : 0 }, "description" : "update-by-query [test][test]" } ] } } } ``` The progress is just (updated + created + deleted) / total --- docs/plugins/reindex.asciidoc | 63 ++++ .../AbstractAsyncBulkByScrollAction.java | 133 ++++---- .../AbstractAsyncBulkIndexByScrollAction.java | 18 +- .../AbstractBaseReindexRestHandler.java | 4 +- .../reindex/AbstractBulkByScrollRequest.java | 8 +- .../plugin/reindex/BulkByScrollTask.java | 252 +++++++++++++++ .../reindex/BulkIndexByScrollResponse.java | 88 ++---- .../plugin/reindex/ReindexResponse.java | 78 ++--- .../reindex/TransportReindexAction.java | 28 +- .../reindex/TransportUpdateByQueryAction.java | 25 +- ...tAsyncBulkIndexByScrollActionTestCase.java | 2 + .../reindex/AsyncBulkByScrollActionTest.java | 189 ----------- .../reindex/AsyncBulkByScrollActionTests.java | 294 ++++++++++++++++++ .../plugin/reindex/BulkByScrollTaskTests.java | 112 +++++++ .../plugin/reindex/ReindexBasicTests.java | 10 +- .../plugin/reindex/ReindexMetadataTests.java | 2 +- .../reindex/ReindexParentChildTests.java | 6 +- .../plugin/reindex/ReindexScriptTests.java | 2 +- .../plugin/reindex/RoundTripTests.java | 97 +++--- .../reindex/UpdateByQueryMetadataTests.java | 2 +- .../reindex/UpdateByQueryWithScriptTests.java | 3 +- 21 files changed, 961 insertions(+), 455 deletions(-) create mode 100644 plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/BulkByScrollTask.java delete mode 100644 plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTest.java create mode 100644 plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTests.java create mode 100644 plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/BulkByScrollTaskTests.java diff --git a/docs/plugins/reindex.asciidoc b/docs/plugins/reindex.asciidoc index a1106941c67..319da504e26 100644 --- a/docs/plugins/reindex.asciidoc +++ b/docs/plugins/reindex.asciidoc @@ -526,6 +526,69 @@ from aborting the operation. The number of documents that were successfully created. This is not returned by `_update_by_query` because it isn't allowed to create documents. +[float] +=== Response body + +While `_reindex` and `_update_by_query` are running you can fetch their status +using the {ref}/task/list.html[Task List APIs]. This will fetch `_reindex`: + +[source,js] +-------------------------------------------------- +POST /_tasks/*/*reindex?pretty&detailed=true +-------------------------------------------------- +// AUTOSENSE + +and this will fetch `_update_by_query`: + +[source,js] +-------------------------------------------------- +POST /_tasks/*/*byquery?pretty&detailed=true +-------------------------------------------------- +// AUTOSENSE + +The responses looks like: + +[source,js] +-------------------------------------------------- +{ + "nodes" : { + "r1A2WoRbTwKZ516z6NEs5A" : { + "name" : "Tyrannus", + "transport_address" : "127.0.0.1:9300", + "host" : "127.0.0.1", + "ip" : "127.0.0.1:9300", + "attributes" : { + "testattr" : "test", + "portsfile" : "true" + }, + "tasks" : [ { + "node" : "r1A2WoRbTwKZ516z6NEs5A", + "id" : 36619, + "type" : "transport", + "action" : "indices:data/write/update/byquery", + "status" : { <1> + "total" : 6154, + "updated" : 3500, + "created" : 0, + "deleted" : 0, + "batches" : 36, + "version_conflicts" : 0, + "noops" : 0 + }, + "description" : "update-by-query [test][test]" + } ] + } + } +} +-------------------------------------------------- + +<1> this object contains the actual status. It is just like the response json +with the important addition of the `total` field. `total` is the total number +of operations that the reindex expects to perform. You can estimate the +progress by adding the `updated`, `created`, and `deleted` fields. The request +will finish when their sum is equal to the `total` field. + + [float] === Examples diff --git a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkByScrollAction.java b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkByScrollAction.java index 863a8c5c5c3..77d7e144372 100644 --- a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkByScrollAction.java +++ b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkByScrollAction.java @@ -37,24 +37,26 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; +import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; import static org.elasticsearch.plugin.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES; import static org.elasticsearch.rest.RestStatus.CONFLICT; import static org.elasticsearch.search.sort.SortBuilders.fieldSort; @@ -64,17 +66,15 @@ import static org.elasticsearch.search.sort.SortBuilders.fieldSort; * results. */ public abstract class AbstractAsyncBulkByScrollAction, Response> { + /** + * The request for this action. Named mainRequest because we create lots of request variables all representing child + * requests of this mainRequest. + */ protected final Request mainRequest; + protected final BulkByScrollTask task; private final AtomicLong startTime = new AtomicLong(-1); - private final AtomicLong updated = new AtomicLong(0); - private final AtomicLong created = new AtomicLong(0); - private final AtomicLong deleted = new AtomicLong(0); - private final AtomicInteger batches = new AtomicInteger(0); - private final AtomicLong versionConflicts = new AtomicLong(0); private final AtomicReference scroll = new AtomicReference<>(); - private final List indexingFailures = new CopyOnWriteArrayList<>(); - private final List searchFailures = new CopyOnWriteArrayList<>(); private final Set destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final ESLogger logger; @@ -83,8 +83,9 @@ public abstract class AbstractAsyncBulkByScrollAction listener; - public AbstractAsyncBulkByScrollAction(ESLogger logger, Client client, ThreadPool threadPool, Request mainRequest, - SearchRequest firstSearchRequest, ActionListener listener) { + public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, Client client, ThreadPool threadPool, + Request mainRequest, SearchRequest firstSearchRequest, ActionListener listener) { + this.task = task; this.logger = logger; this.client = client; this.threadPool = threadPool; @@ -95,54 +96,14 @@ public abstract class AbstractAsyncBulkByScrollAction docs); - protected abstract Response buildResponse(long took); + protected abstract Response buildResponse(TimeValue took, List indexingFailures, List searchFailures); public void start() { initialSearch(); } - /** - * Count of documents updated. - */ - public long updated() { - return updated.get(); - } - - /** - * Count of documents created. - */ - public long created() { - return created.get(); - } - - /** - * Count of successful delete operations. - */ - public long deleted() { - return deleted.get(); - } - - /** - * The number of scan responses this request has processed. - */ - public int batches() { - return batches.get(); - } - - public long versionConflicts() { - return versionConflicts.get(); - } - - public long successfullyProcessed() { - return updated.get() + created.get() + deleted.get(); - } - - public List indexingFailures() { - return unmodifiableList(indexingFailures); - } - - public List searchFailures() { - return unmodifiableList(searchFailures); + public BulkByScrollTask getTask() { + return task; } private void initialSearch() { @@ -179,10 +140,14 @@ public abstract class AbstractAsyncBulkByScrollAction 0) { - Collections.addAll(searchFailures, searchResponse.getShardFailures()); - startNormalTermination(); + startNormalTermination(emptyList(), unmodifiableList(Arrays.asList(searchResponse.getShardFailures()))); return; } + long total = searchResponse.getHits().totalHits(); + if (mainRequest.getSize() > 0) { + total = min(total, mainRequest.getSize()); + } + task.setTotal(total); threadPool.generic().execute(new AbstractRunnable() { @Override protected void doRun() throws Exception { @@ -190,14 +155,14 @@ public abstract class AbstractAsyncBulkByScrollAction docsIterable = Arrays.asList(docs); if (mainRequest.getSize() != SIZE_ALL_MATCHES) { // Truncate the docs if we have more than the request size - long remaining = max(0, mainRequest.getSize() - successfullyProcessed()); + long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed()); if (remaining < docs.length) { docsIterable = docsIterable.subList(0, (int) remaining); } @@ -243,10 +208,11 @@ public abstract class AbstractAsyncBulkByScrollAction failures = new ArrayList(); Set destinationIndicesThisBatch = new HashSet<>(); for (BulkItemResponse item : response) { if (item.isFailed()) { - recordFailure(item.getFailure()); + recordFailure(item.getFailure(), failures); continue; } @@ -255,13 +221,13 @@ public abstract class AbstractAsyncBulkByScrollAction= mainRequest.getSize()) { + if (mainRequest.getSize() != SIZE_ALL_MATCHES && task.getSuccessfullyProcessed() >= mainRequest.getSize()) { // We've processed all the requested docs. - startNormalTermination(); + startNormalTermination(emptyList(), emptyList()); return; } startNextScrollRequest(); @@ -303,19 +269,19 @@ public abstract class AbstractAsyncBulkByScrollAction failures) { if (failure.getStatus() == CONFLICT) { - versionConflicts.incrementAndGet(); + task.countVersionConflict(); if (false == mainRequest.isAbortOnVersionConflict()) { return; } } - indexingFailures.add(failure); + failures.add(failure); } - void startNormalTermination() { + void startNormalTermination(List indexingFailures, List searchFailures) { if (false == mainRequest.isRefresh()) { - finishHim(null); + finishHim(null, indexingFailures, searchFailures); return; } RefreshRequest refresh = new RefreshRequest(); @@ -323,7 +289,7 @@ public abstract class AbstractAsyncBulkByScrollAction() { @Override public void onResponse(RefreshResponse response) { - finishHim(null); + finishHim(null, indexingFailures, searchFailures); } @Override @@ -336,13 +302,20 @@ public abstract class AbstractAsyncBulkByScrollAction indexingFailures, List searchFailures) { String scrollId = scroll.get(); if (Strings.hasLength(scrollId)) { /* @@ -364,7 +337,7 @@ public abstract class AbstractAsyncBulkByScrollAction, Response extends BulkIndexByScrollResponse> extends AbstractAsyncBulkByScrollAction { - private final AtomicLong noops = new AtomicLong(0); private final ScriptService scriptService; private final CompiledScript script; - public AbstractAsyncBulkIndexByScrollAction(ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool, - Request mainRequest, SearchRequest firstSearchRequest, ActionListener listener) { - super(logger, client, threadPool, mainRequest, firstSearchRequest, listener); + public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, + Client client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, + ActionListener listener) { + super(task, logger, client, threadPool, mainRequest, firstSearchRequest, listener); this.scriptService = scriptService; if (mainRequest.getScript() == null) { script = null; @@ -78,13 +77,6 @@ public abstract class AbstractAsyncBulkIndexByScrollAction docs) { BulkRequest bulkRequest = new BulkRequest(); @@ -171,7 +163,7 @@ public abstract class AbstractAsyncBulkIndexByScrollAction, Response extends BulkIndexByScrollResponse, TA extends TransportAction> - extends BaseRestHandler { +public abstract class AbstractBaseReindexRestHandler, Response extends BulkIndexByScrollResponse, + TA extends TransportAction> extends BaseRestHandler { protected final IndicesQueriesRegistry indicesQueriesRegistry; private final ClusterService clusterService; private final TA action; diff --git a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractBulkByScrollRequest.java b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractBulkByScrollRequest.java index 4b795bd048a..fd7b32f2cb5 100644 --- a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractBulkByScrollRequest.java +++ b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/AbstractBulkByScrollRequest.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; import java.io.IOException; import java.util.Arrays; @@ -205,6 +206,11 @@ public abstract class AbstractBulkByScrollRequest description) { + super(id, type, action, description); + } + + @Override + public Status getStatus() { + return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get()); + } + + /** + * Total number of successfully processed documents. + */ + public long getSuccessfullyProcessed() { + return updated.get() + created.get() + deleted.get(); + } + + public static class Status implements Task.Status { + public static final Status PROTOTYPE = new Status(0, 0, 0, 0, 0, 0, 0); + + private final long total; + private final long updated; + private final long created; + private final long deleted; + private final int batches; + private final long versionConflicts; + private final long noops; + + public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops) { + this.total = checkPositive(total, "total"); + this.updated = checkPositive(updated, "updated"); + this.created = checkPositive(created, "created"); + this.deleted = checkPositive(deleted, "deleted"); + this.batches = checkPositive(batches, "batches"); + this.versionConflicts = checkPositive(versionConflicts, "versionConflicts"); + this.noops = checkPositive(noops, "noops"); + } + + public Status(StreamInput in) throws IOException { + total = in.readVLong(); + updated = in.readVLong(); + created = in.readVLong(); + deleted = in.readVLong(); + batches = in.readVInt(); + versionConflicts = in.readVLong(); + noops = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(total); + out.writeVLong(updated); + out.writeVLong(created); + out.writeVLong(deleted); + out.writeVInt(batches); + out.writeVLong(versionConflicts); + out.writeVLong(noops); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + innerXContent(builder, params, true, true); + return builder.endObject(); + } + + public XContentBuilder innerXContent(XContentBuilder builder, Params params, boolean includeCreated, boolean includeDeleted) + throws IOException { + builder.field("total", total); + builder.field("updated", updated); + if (includeCreated) { + builder.field("created", created); + } + if (includeDeleted) { + builder.field("deleted", deleted); + } + builder.field("batches", batches); + builder.field("version_conflicts", versionConflicts); + builder.field("noops", noops); + return builder; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("BulkIndexByScrollResponse["); + innerToString(builder, true, true); + return builder.append(']').toString(); + } + + public void innerToString(StringBuilder builder, boolean includeCreated, boolean includeDeleted) { + builder.append("updated=").append(updated); + if (includeCreated) { + builder.append(",created=").append(created); + } + if (includeDeleted) { + builder.append(",deleted=").append(deleted); + } + builder.append(",batches=").append(batches); + builder.append(",versionConflicts=").append(versionConflicts); + builder.append(",noops=").append(noops); + } + + @Override + public String getWriteableName() { + return "bulk-by-scroll"; + } + + @Override + public Status readFrom(StreamInput in) throws IOException { + return new Status(in); + } + + /** + * The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents + * to process. Its ok that these have the same meaning because any request with 0 actual documents should be quite short lived. + */ + public long getTotal() { + return total; + } + + /** + * Count of documents updated. + */ + public long getUpdated() { + return updated; + } + + /** + * Count of documents created. + */ + public long getCreated() { + return created; + } + + /** + * Count of successful delete operations. + */ + public long getDeleted() { + return deleted; + } + + /** + * Number of scan responses this request has processed. + */ + public int getBatches() { + return batches; + } + + /** + * Number of version conflicts this request has hit. + */ + public long getVersionConflicts() { + return versionConflicts; + } + + /** + * Number of noops (skipped bulk items) as part of this request. + */ + public long getNoops() { + return noops; + } + + private int checkPositive(int value, String name) { + if (value < 0) { + throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]"); + } + return value; + } + + private long checkPositive(long value, String name) { + if (value < 0) { + throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]"); + } + return value; + } + } + + void setTotal(long totalHits) { + total.set(totalHits); + } + + void countBatch() { + batch.incrementAndGet(); + } + + void countNoop() { + noops.incrementAndGet(); + } + + void countCreated() { + created.incrementAndGet(); + } + + void countUpdated() { + updated.incrementAndGet(); + } + + void countDeleted() { + deleted.incrementAndGet(); + } + + void countVersionConflict() { + versionConflicts.incrementAndGet(); + } +} diff --git a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/BulkIndexByScrollResponse.java b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/BulkIndexByScrollResponse.java index 01aaf06fc11..7ae1b32f9cb 100644 --- a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/BulkIndexByScrollResponse.java +++ b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/BulkIndexByScrollResponse.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -33,68 +34,64 @@ import java.util.List; import static java.lang.Math.min; import static java.util.Collections.unmodifiableList; +import static java.util.Objects.requireNonNull; import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure; /** * Response used for actions that index many documents using a scroll request. */ public class BulkIndexByScrollResponse extends ActionResponse implements ToXContent { - static final String TOOK_FIELD = "took"; - static final String UPDATED_FIELD = "updated"; - static final String BATCHES_FIELD = "batches"; - static final String VERSION_CONFLICTS_FIELD = "version_conflicts"; - static final String NOOPS_FIELD = "noops"; - static final String FAILURES_FIELD = "failures"; - - private long took; - private long updated; - private int batches; - private long versionConflicts; - private long noops; + private TimeValue took; + private BulkByScrollTask.Status status; private List indexingFailures; private List searchFailures; public BulkIndexByScrollResponse() { } - public BulkIndexByScrollResponse(long took, long updated, int batches, long versionConflicts, long noops, - List indexingFailures, List searchFailures) { + public BulkIndexByScrollResponse(TimeValue took, BulkByScrollTask.Status status, List indexingFailures, + List searchFailures) { this.took = took; - this.updated = updated; - this.batches = batches; - this.versionConflicts = versionConflicts; - this.noops = noops; + this.status = requireNonNull(status, "Null status not supported"); this.indexingFailures = indexingFailures; this.searchFailures = searchFailures; } - public long getTook() { + public TimeValue getTook() { return took; } + protected BulkByScrollTask.Status getStatus() { + return status; + } + public long getUpdated() { - return updated; + return status.getUpdated(); } public int getBatches() { - return batches; + return status.getBatches(); } public long getVersionConflicts() { - return versionConflicts; + return status.getVersionConflicts(); } public long getNoops() { - return noops; + return status.getNoops(); } /** - * Indexing failures. + * All of the indexing failures. Version conflicts are only included if the request sets abortOnVersionConflict to true (the + * default). */ public List getIndexingFailures() { return indexingFailures; } + /** + * All search failures. + */ public List getSearchFailures() { return searchFailures; } @@ -102,11 +99,8 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVLong(took); - out.writeVLong(updated); - out.writeVInt(batches); - out.writeVLong(versionConflicts); - out.writeVLong(noops); + took.writeTo(out); + status.writeTo(out); out.writeVInt(indexingFailures.size()); for (Failure failure: indexingFailures) { failure.writeTo(out); @@ -120,11 +114,8 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - took = in.readVLong(); - updated = in.readVLong(); - batches = in.readVInt(); - versionConflicts = in.readVLong(); - noops = in.readVLong(); + took = TimeValue.readTimeValue(in); + status = new BulkByScrollTask.Status(in); int indexingFailuresCount = in.readVInt(); List indexingFailures = new ArrayList<>(indexingFailuresCount); for (int i = 0; i < indexingFailuresCount; i++) { @@ -136,16 +127,14 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont for (int i = 0; i < searchFailuresCount; i++) { searchFailures.add(readShardSearchFailure(in)); } + this.searchFailures = unmodifiableList(searchFailures); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(TOOK_FIELD, took); - builder.field(UPDATED_FIELD, updated); - builder.field(BATCHES_FIELD, batches); - builder.field(VERSION_CONFLICTS_FIELD, versionConflicts); - builder.field(NOOPS_FIELD, noops); - builder.startArray(FAILURES_FIELD); + builder.field("took", took.millis()); + status.innerXContent(builder, params, false, false); + builder.startArray("failures"); for (Failure failure: indexingFailures) { builder.startObject(); failure.toXContent(builder, params); @@ -163,22 +152,11 @@ public class BulkIndexByScrollResponse extends ActionResponse implements ToXCont @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append(toStringName()).append("["); - builder.append("took=").append(took); - builder.append(",updated=").append(updated); - builder.append(",batches=").append(batches); - builder.append(",versionConflicts=").append(versionConflicts); - builder.append(",noops=").append(noops); + builder.append("BulkIndexByScrollResponse["); + builder.append("took=").append(took).append(','); + status.innerToString(builder, false, false); builder.append(",indexing_failures=").append(getIndexingFailures().subList(0, min(3, getIndexingFailures().size()))); builder.append(",search_failures=").append(getSearchFailures().subList(0, min(3, getSearchFailures().size()))); - innerToString(builder); - return builder.append("]").toString(); - } - - protected String toStringName() { - return "BulkIndexByScrollResponse"; - } - - protected void innerToString(StringBuilder builder) { + return builder.append(']').toString(); } } \ No newline at end of file diff --git a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/ReindexResponse.java b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/ReindexResponse.java index 74e50f60417..781399e51c8 100644 --- a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/ReindexResponse.java +++ b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/ReindexResponse.java @@ -21,75 +21,53 @@ package org.elasticsearch.plugin.reindex; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.plugin.reindex.BulkByScrollTask.Status; import java.io.IOException; -import java.util.Iterator; import java.util.List; +/** + * Response for the ReindexAction. + */ public class ReindexResponse extends BulkIndexByScrollResponse { - static final String CREATED_FIELD = "created"; - - private long created; - public ReindexResponse() { } - public ReindexResponse(long took, long created, long updated, int batches, long versionConflicts, long noops, List indexingFailures, List searchFailures) { - super(took, updated, batches, versionConflicts, noops, indexingFailures, searchFailures); - this.created = created; + public ReindexResponse(TimeValue took, Status status, List indexingFailures, List searchFailures) { + super(took, status, indexingFailures, searchFailures); } public long getCreated() { - return created; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVLong(created); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - created = in.readVLong(); + return getStatus().getCreated(); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - super.toXContent(builder, params); - builder.field(CREATED_FIELD, created); + builder.field("took", getTook()); + getStatus().innerXContent(builder, params, true, false); + builder.startArray("failures"); + for (Failure failure: getIndexingFailures()) { + builder.startObject(); + failure.toXContent(builder, params); + builder.endObject(); + } + for (ShardSearchFailure failure: getSearchFailures()) { + builder.startObject(); + failure.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); return builder; } @Override - protected String toStringName() { - return "ReindexResponse"; - } - - @Override - protected void innerToString(StringBuilder builder) { - builder.append(",created=").append(created); - } - - /** - * Get the first few failures to build a useful for toString. - */ - protected void truncatedFailures(StringBuilder builder) { - builder.append(",failures=["); - Iterator failures = getIndexingFailures().iterator(); - int written = 0; - while (failures.hasNext() && written < 3) { - Failure failure = failures.next(); - builder.append(failure.getMessage()); - if (written != 0) { - builder.append(", "); - } - written++; - } - builder.append(']'); + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("ReindexResponse["); + builder.append("took=").append(getTook()).append(','); + getStatus().innerToString(builder, true, false); + return builder.append(']').toString(); } } diff --git a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/TransportReindexAction.java b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/TransportReindexAction.java index 62f2f7aa433..acb9b0fe4fc 100644 --- a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/TransportReindexAction.java +++ b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/TransportReindexAction.java @@ -21,8 +21,10 @@ package org.elasticsearch.plugin.reindex; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.HandledTransportAction; @@ -39,9 +41,11 @@ import org.elasticsearch.index.mapper.internal.TTLFieldMapper; import org.elasticsearch.index.mapper.internal.VersionFieldMapper; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.List; import java.util.Objects; import static java.util.Objects.requireNonNull; @@ -66,10 +70,15 @@ public class TransportReindexAction extends HandledTransportAction listener) { + protected void doExecute(Task task, ReindexRequest request, ActionListener listener) { validateAgainstAliases(request.getSource(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex, clusterService.state()); - new AsyncIndexBySearchAction(logger, scriptService, client, threadPool, request, listener).start(); + new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, request, listener).start(); + } + + @Override + protected void doExecute(ReindexRequest request, ActionListener listener) { + throw new UnsupportedOperationException("task required"); } /** @@ -106,9 +115,9 @@ public class TransportReindexAction extends HandledTransportAction { - public AsyncIndexBySearchAction(ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool, + public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool, ReindexRequest request, ActionListener listener) { - super(logger, scriptService, client, threadPool, request, request.getSource(), listener); + super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener); } @Override @@ -162,17 +171,16 @@ public class TransportReindexAction extends HandledTransportAction indexingFailures, List searchFailures) { + return new ReindexResponse(took, task.getStatus(), indexingFailures, searchFailures); + } + /* * Methods below here handle script updating the index request. They try * to be pretty liberal with regards to types because script are often * dynamically typed. */ - @Override - protected ReindexResponse buildResponse(long took) { - return new ReindexResponse(took, created(), updated(), batches(), versionConflicts(), noops(), indexingFailures(), - searchFailures()); - } - @Override protected void scriptChangedIndex(IndexRequest index, Object to) { requireNonNull(to, "Can't reindex without a destination index!"); diff --git a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/TransportUpdateByQueryAction.java b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/TransportUpdateByQueryAction.java index feafa6cee83..46140c64301 100644 --- a/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/TransportUpdateByQueryAction.java +++ b/plugins/reindex/src/main/java/org/elasticsearch/plugin/reindex/TransportUpdateByQueryAction.java @@ -20,7 +20,9 @@ package org.elasticsearch.plugin.reindex; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; @@ -28,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.internal.IdFieldMapper; import org.elasticsearch.index.mapper.internal.IndexFieldMapper; @@ -38,9 +41,12 @@ import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.internal.TypeFieldMapper; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.List; + public class TransportUpdateByQueryAction extends HandledTransportAction { private final Client client; private final ScriptService scriptService; @@ -56,18 +62,23 @@ public class TransportUpdateByQueryAction extends HandledTransportAction listener) { - new AsyncIndexBySearchAction(logger, scriptService, client, threadPool, request, listener).start(); + new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, request, listener).start(); + } + + @Override + protected void doExecute(UpdateByQueryRequest request, ActionListener listener) { + throw new UnsupportedOperationException("task required"); } /** * Simple implementation of update-by-query using scrolling and bulk. */ static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction { - public AsyncIndexBySearchAction(ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool, + public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool, UpdateByQueryRequest request, ActionListener listener) { - super(logger, scriptService, client, threadPool, request, request.getSource(), listener); + super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener); } @Override @@ -83,9 +94,9 @@ public class TransportUpdateByQueryAction extends HandledTransportAction indexingFailures, + List searchFailures) { + return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures); } @Override diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java index cd607b5a040..65ba0c79908 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java @@ -28,10 +28,12 @@ import org.junit.Before; public abstract class AbstractAsyncBulkIndexByScrollActionTestCase, Response extends BulkIndexByScrollResponse> extends ESTestCase { protected ThreadPool threadPool; + protected BulkByScrollTask task; @Before public void setupForTest() { threadPool = new ThreadPool(getTestName()); + task = new BulkByScrollTask(1, "test", "test", () -> "test"); } @After diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTest.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTest.java deleted file mode 100644 index f69be1c6635..00000000000 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.plugin.reindex; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkItemResponse.Failure; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.search.ClearScrollRequest; -import org.elasticsearch.action.search.ClearScrollResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.FilterClient; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.client.NoOpClient; -import org.elasticsearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; - -import static org.apache.lucene.util.TestUtil.randomSimpleString; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.emptyCollectionOf; -import static org.hamcrest.Matchers.equalTo; - -public class AsyncBulkByScrollActionTest extends ESTestCase { - private MockClearScrollClient client; - private ThreadPool threadPool; - private DummyAbstractBulkByScrollRequest mainRequest; - private SearchRequest firstSearchRequest; - private PlainActionFuture listener; - private String scrollId; - - @Before - public void setupForTest() { - client = new MockClearScrollClient(new NoOpClient(getTestName())); - threadPool = new ThreadPool(getTestName()); - mainRequest = new DummyAbstractBulkByScrollRequest(); - firstSearchRequest = null; - listener = new PlainActionFuture<>(); - scrollId = null; - } - - @After - public void tearDownAndVerifyCommonStuff() { - client.close(); - threadPool.shutdown(); - if (scrollId != null) { - assertThat(client.scrollsCleared, contains(scrollId)); - } - } - - /** - * Generates a random scrollId and registers it so that when the test - * finishes we check that it was cleared. Subsequent calls reregister a new - * random scroll id so it is checked instead. - */ - private String scrollId() { - scrollId = randomSimpleString(random()); - return scrollId; - } - - /** - * Mimicks a ThreadPool rejecting execution of the task. - */ - public void testThreadPoolRejectionsAbortRequest() throws Exception { - threadPool.shutdown(); - threadPool = new ThreadPool(getTestName()) { - @Override - public Executor generic() { - return new Executor() { - @Override - public void execute(Runnable command) { - ((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test")); - } - }; - } - }; - new DummyAbstractAsyncBulkByScrollAction() - .onScrollResponse(new SearchResponse(null, scrollId(), 5, 4, randomLong(), null)); - try { - listener.get(); - fail("Expected a failure"); - } catch (ExecutionException e) { - assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]")); - } - } - - /** - * Mimicks shard search failures usually caused by the data node serving the - * scroll request going down. - */ - public void testShardFailuresAbortRequest() throws Exception { - ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test")); - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); - action.onScrollResponse(new SearchResponse(null, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure })); - listener.get(); - assertThat(action.indexingFailures(), emptyCollectionOf(Failure.class)); - assertThat(action.searchFailures(), contains(shardFailure)); - } - - /** - * Mimicks bulk indexing failures. - */ - public void testBulkFailuresAbortRequest() throws Exception { - Failure failure = new Failure("index", "type", "id", new RuntimeException("test")); - DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); - action.onBulkResponse(new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong())); - listener.get(); - assertThat(action.indexingFailures(), contains(failure)); - assertThat(action.searchFailures(), emptyCollectionOf(ShardSearchFailure.class)); - } - - private class DummyAbstractAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction { - public DummyAbstractAsyncBulkByScrollAction() { - super(logger, client, threadPool, AsyncBulkByScrollActionTest.this.mainRequest, firstSearchRequest, listener); - } - - @Override - protected BulkRequest buildBulk(Iterable docs) { - return new BulkRequest(); - } - - @Override - protected Object buildResponse(long took) { - return new Object(); - } - } - - private static class DummyAbstractBulkByScrollRequest extends AbstractBulkByScrollRequest { - @Override - protected DummyAbstractBulkByScrollRequest self() { - return this; - } - } - - private static class MockClearScrollClient extends FilterClient { - private List scrollsCleared = new ArrayList<>(); - - public MockClearScrollClient(Client in) { - super(in); - } - - @Override - @SuppressWarnings("unchecked") - protected , Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder> void doExecute( - Action action, Request request, ActionListener listener) { - if (request instanceof ClearScrollRequest) { - ClearScrollRequest clearScroll = (ClearScrollRequest) request; - scrollsCleared.addAll(clearScroll.getScrollIds()); - listener.onResponse((Response) new ClearScrollResponse(true, clearScroll.getScrollIds().size())); - return; - } - super.doExecute(action, request, listener); - } - } -} diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTests.java new file mode 100644 index 00000000000..79ed5b9f0f4 --- /dev/null +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/AsyncBulkByScrollActionTests.java @@ -0,0 +1,294 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.reindex; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.FilterClient; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.internal.InternalSearchHit; +import org.elasticsearch.search.internal.InternalSearchHits; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; + +import static java.util.Collections.emptyMap; +import static org.apache.lucene.util.TestUtil.randomSimpleString; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.emptyCollectionOf; +import static org.hamcrest.Matchers.equalTo; + +public class AsyncBulkByScrollActionTests extends ESTestCase { + private MockClearScrollClient client; + private ThreadPool threadPool; + private DummyAbstractBulkByScrollRequest mainRequest; + private SearchRequest firstSearchRequest; + private PlainActionFuture listener; + private String scrollId; + private BulkByScrollTask task; + + @Before + public void setupForTest() { + client = new MockClearScrollClient(new NoOpClient(getTestName())); + threadPool = new ThreadPool(getTestName()); + mainRequest = new DummyAbstractBulkByScrollRequest(); + firstSearchRequest = null; + listener = new PlainActionFuture<>(); + scrollId = null; + task = new BulkByScrollTask(0, "test", "test", () -> "test"); + } + + @After + public void tearDownAndVerifyCommonStuff() { + client.close(); + threadPool.shutdown(); + } + + /** + * Generates a random scrollId and registers it so that when the test + * finishes we check that it was cleared. Subsequent calls reregister a new + * random scroll id so it is checked instead. + */ + private String scrollId() { + scrollId = randomSimpleString(random(), 1, 1000); // Empty string's get special behavior we don't want + return scrollId; + } + + public void testScrollResponseSetsTotal() { + // Default is 0, meaning unstarted + assertEquals(0, task.getStatus().getTotal()); + + long total = randomIntBetween(0, Integer.MAX_VALUE); + InternalSearchHits hits = new InternalSearchHits(null, total, 0); + InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); + new DummyAbstractAsyncBulkByScrollAction() + .onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + assertEquals(total, task.getStatus().getTotal()); + } + + public void testEachScrollResponseIsABatch() { + // Replace the generic thread pool with one that executes immediately so the batch is updated immediately + threadPool.shutdown(); + threadPool = new ThreadPool(getTestName()) { + @Override + public Executor generic() { + return new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + } + }; + int maxBatches = randomIntBetween(0, 100); + for (int batches = 1; batches < maxBatches; batches++) { + InternalSearchHit hit = new InternalSearchHit(0, "id", new Text("type"), emptyMap()); + InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[] { hit }, 0, 0); + InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); + new DummyAbstractAsyncBulkByScrollAction() + .onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + + assertEquals(batches, task.getStatus().getBatches()); + } + } + + public void testBulkResponseSetsLotsOfStatus() { + mainRequest.setAbortOnVersionConflict(false); + int maxBatches = randomIntBetween(0, 100); + long versionConflicts = 0; + long created = 0; + long updated = 0; + long deleted = 0; + for (int batches = 0; batches < maxBatches; batches++) { + BulkItemResponse[] responses = new BulkItemResponse[randomIntBetween(0, 10000)]; + for (int i = 0; i < responses.length; i++) { + ShardId shardId = new ShardId(new Index("name", "uid"), 0); + String opType; + if (rarely()) { + opType = randomSimpleString(random()); + versionConflicts++; + responses[i] = new BulkItemResponse(i, opType, new Failure(shardId.getIndexName(), "type", "id" + i, + new VersionConflictEngineException(shardId, "type", "id", "test"))); + continue; + } + boolean createdResponse; + switch (randomIntBetween(0, 2)) { + case 0: + opType = randomFrom("index", "create"); + createdResponse = true; + created++; + break; + case 1: + opType = randomFrom("index", "create"); + createdResponse = false; + updated++; + break; + case 2: + opType = "delete"; + createdResponse = false; + deleted++; + break; + default: + throw new RuntimeException("Bad scenario"); + } + responses[i] = new BulkItemResponse(i, opType, new IndexResponse(shardId, "type", "id" + i, randomInt(), createdResponse)); + } + new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0)); + assertEquals(versionConflicts, task.getStatus().getVersionConflicts()); + assertEquals(updated, task.getStatus().getUpdated()); + assertEquals(created, task.getStatus().getCreated()); + assertEquals(deleted, task.getStatus().getDeleted()); + assertEquals(versionConflicts, task.getStatus().getVersionConflicts()); + } + } + + /** + * Mimicks a ThreadPool rejecting execution of the task. + */ + public void testThreadPoolRejectionsAbortRequest() throws Exception { + threadPool.shutdown(); + threadPool = new ThreadPool(getTestName()) { + @Override + public Executor generic() { + return new Executor() { + @Override + public void execute(Runnable command) { + ((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test")); + } + }; + } + }; + InternalSearchHits hits = new InternalSearchHits(null, 0, 0); + InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false); + new DummyAbstractAsyncBulkByScrollAction() + .onScrollResponse(new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null)); + try { + listener.get(); + fail("Expected a failure"); + } catch (ExecutionException e) { + assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]")); + } + assertThat(client.scrollsCleared, contains(scrollId)); + } + + /** + * Mimicks shard search failures usually caused by the data node serving the + * scroll request going down. + */ + public void testShardFailuresAbortRequest() throws Exception { + ShardSearchFailure shardFailure = new ShardSearchFailure(new RuntimeException("test")); + new DummyAbstractAsyncBulkByScrollAction() + .onScrollResponse(new SearchResponse(null, scrollId(), 5, 4, randomLong(), new ShardSearchFailure[] { shardFailure })); + BulkIndexByScrollResponse response = listener.get(); + assertThat(response.getIndexingFailures(), emptyCollectionOf(Failure.class)); + assertThat(response.getSearchFailures(), contains(shardFailure)); + assertThat(client.scrollsCleared, contains(scrollId)); + } + + /** + * Mimicks bulk indexing failures. + */ + public void testBulkFailuresAbortRequest() throws Exception { + Failure failure = new Failure("index", "type", "id", new RuntimeException("test")); + DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction(); + action.onBulkResponse(new BulkResponse(new BulkItemResponse[] {new BulkItemResponse(0, "index", failure)}, randomLong())); + BulkIndexByScrollResponse response = listener.get(); + assertThat(response.getIndexingFailures(), contains(failure)); + assertThat(response.getSearchFailures(), emptyCollectionOf(ShardSearchFailure.class)); + } + + private class DummyAbstractAsyncBulkByScrollAction + extends AbstractAsyncBulkByScrollAction { + public DummyAbstractAsyncBulkByScrollAction() { + super(AsyncBulkByScrollActionTests.this.task, logger, client, threadPool, + AsyncBulkByScrollActionTests.this.mainRequest, firstSearchRequest, listener); + } + + @Override + protected BulkRequest buildBulk(Iterable docs) { + return new BulkRequest(); + } + + @Override + protected BulkIndexByScrollResponse buildResponse(TimeValue took, List indexingFailures, + List searchFailures) { + return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures); + } + } + + private static class DummyAbstractBulkByScrollRequest extends AbstractBulkByScrollRequest { + @Override + protected DummyAbstractBulkByScrollRequest self() { + return this; + } + } + + private static class MockClearScrollClient extends FilterClient { + private List scrollsCleared = new ArrayList<>(); + + public MockClearScrollClient(Client in) { + super(in); + } + + @Override + @SuppressWarnings("unchecked") + protected , Response extends ActionResponse, + RequestBuilder extends ActionRequestBuilder> void doExecute( + Action action, Request request, ActionListener listener) { + if (request instanceof ClearScrollRequest) { + ClearScrollRequest clearScroll = (ClearScrollRequest) request; + scrollsCleared.addAll(clearScroll.getScrollIds()); + listener.onResponse((Response) new ClearScrollResponse(true, clearScroll.getScrollIds().size())); + return; + } + super.doExecute(action, request, listener); + } + } +} diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/BulkByScrollTaskTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/BulkByScrollTaskTests.java new file mode 100644 index 00000000000..7f01deca151 --- /dev/null +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/BulkByScrollTaskTests.java @@ -0,0 +1,112 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.reindex; + +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +public class BulkByScrollTaskTests extends ESTestCase { + private BulkByScrollTask task; + + @Before + public void createTask() { + task = new BulkByScrollTask(1, "test_type", "test_action", () -> "test"); + } + + public void testBasicData() { + assertEquals(1, task.getId()); + assertEquals("test_type", task.getType()); + assertEquals("test_action", task.getAction()); + } + + public void testProgress() { + long created = 0; + long updated = 0; + long deleted = 0; + long versionConflicts = 0; + long noops = 0; + int batch = 0; + BulkByScrollTask.Status status = task.getStatus(); + assertEquals(0, status.getTotal()); + assertEquals(created, status.getCreated()); + assertEquals(updated, status.getUpdated()); + assertEquals(deleted, status.getDeleted()); + assertEquals(versionConflicts, status.getVersionConflicts()); + assertEquals(batch, status.getBatches()); + assertEquals(noops, status.getNoops()); + + long totalHits = randomIntBetween(10, 1000); + task.setTotal(totalHits); + for (long p = 0; p < totalHits; p++) { + status = task.getStatus(); + assertEquals(totalHits, status.getTotal()); + assertEquals(created, status.getCreated()); + assertEquals(updated, status.getUpdated()); + assertEquals(deleted, status.getDeleted()); + assertEquals(versionConflicts, status.getVersionConflicts()); + assertEquals(batch, status.getBatches()); + assertEquals(noops, status.getNoops()); + + if (randomBoolean()) { + created++; + task.countCreated(); + } else if (randomBoolean()) { + updated++; + task.countUpdated(); + } else { + deleted++; + task.countDeleted(); + } + + if (rarely()) { + versionConflicts++; + task.countVersionConflict(); + } + + if (rarely()) { + batch++; + task.countBatch(); + } + + if (rarely()) { + noops++; + task.countNoop(); + } + } + status = task.getStatus(); + assertEquals(totalHits, status.getTotal()); + assertEquals(created, status.getCreated()); + assertEquals(updated, status.getUpdated()); + assertEquals(deleted, status.getDeleted()); + assertEquals(versionConflicts, status.getVersionConflicts()); + assertEquals(batch, status.getBatches()); + assertEquals(noops, status.getNoops()); + } + + public void testStatusHatesNegatives() { + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0)); + expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1)); + } +} diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexBasicTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexBasicTests.java index 0663bb26adb..233120330c4 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexBasicTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexBasicTests.java @@ -19,15 +19,15 @@ package org.elasticsearch.plugin.reindex; -import static org.elasticsearch.index.query.QueryBuilders.termQuery; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; import java.util.ArrayList; import java.util.List; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.action.index.IndexRequestBuilder; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; public class ReindexBasicTests extends ReindexTestCase { public void testFiltering() throws Exception { diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexMetadataTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexMetadataTests.java index 469f884f0f8..dffac062040 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexMetadataTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexMetadataTests.java @@ -67,7 +67,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe @Override protected TransportReindexAction.AsyncIndexBySearchAction action() { - return new TransportReindexAction.AsyncIndexBySearchAction(logger, null, null, threadPool, request(), listener()); + return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, request(), listener()); } @Override diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexParentChildTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexParentChildTests.java index fe2505e3d48..9f75e2c5000 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexParentChildTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexParentChildTests.java @@ -19,15 +19,15 @@ package org.elasticsearch.plugin.reindex; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.elasticsearch.index.query.QueryBuilder; + import static org.elasticsearch.index.query.QueryBuilders.hasParentQuery; import static org.elasticsearch.index.query.QueryBuilders.idsQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.Matchers.equalTo; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.index.query.QueryBuilder; - /** * Index-by-search tests for parent/child. */ diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexScriptTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexScriptTests.java index 5ae73d0a791..31bb9c9eea8 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexScriptTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/ReindexScriptTests.java @@ -134,6 +134,6 @@ public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScri @Override protected AbstractAsyncBulkIndexByScrollAction action() { - return new TransportReindexAction.AsyncIndexBySearchAction(logger, null, null, threadPool, request(), listener()); + return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, request(), listener()); } } \ No newline at end of file diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/RoundTripTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/RoundTripTests.java index 962f2c2c72f..1e1f7e5a99b 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/RoundTripTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/RoundTripTests.java @@ -42,6 +42,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static org.apache.lucene.util.TestUtil.randomSimpleString; +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; /** * Round trip tests for all Streamable things declared in this plugin. @@ -67,23 +68,6 @@ public class RoundTripTests extends ESTestCase { assertRequestEquals(update, tripped); } - public void testReindexResponse() throws IOException { - ReindexResponse response = new ReindexResponse(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), - randomPositiveInt(), randomPositiveLong(), randomPositiveLong(), randomIndexingFailures(), randomSearchFailures()); - ReindexResponse tripped = new ReindexResponse(); - roundTrip(response, tripped); - assertResponseEquals(response, tripped); - assertEquals(response.getCreated(), tripped.getCreated()); - } - - public void testBulkIndexByScrollResponse() throws IOException { - BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(randomPositiveLong(), randomPositiveLong(), randomPositiveInt(), - randomPositiveLong(), randomPositiveLong(), randomIndexingFailures(), randomSearchFailures()); - BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse(); - roundTrip(response, tripped); - assertResponseEquals(response, tripped); - } - private void randomRequest(AbstractBulkIndexByScrollRequest request) { request.getSource().indices("test"); request.getSource().source().size(between(1, 1000)); @@ -106,6 +90,35 @@ public class RoundTripTests extends ESTestCase { assertEquals(request.getScript(), tripped.getScript()); } + public void testBulkByTaskStatus() throws IOException { + BulkByScrollTask.Status status = randomStatus(); + BytesStreamOutput out = new BytesStreamOutput(); + status.writeTo(out); + BulkByScrollTask.Status tripped = new BulkByScrollTask.Status(out.bytes().streamInput()); + assertTaskStatusEquals(status, tripped); + } + + public void testReindexResponse() throws IOException { + ReindexResponse response = new ReindexResponse(timeValueMillis(randomPositiveLong()), randomStatus(), randomIndexingFailures(), + randomSearchFailures()); + ReindexResponse tripped = new ReindexResponse(); + roundTrip(response, tripped); + assertResponseEquals(response, tripped); + } + + public void testBulkIndexByScrollResponse() throws IOException { + BulkIndexByScrollResponse response = new BulkIndexByScrollResponse(timeValueMillis(randomPositiveLong()), randomStatus(), + randomIndexingFailures(), randomSearchFailures()); + BulkIndexByScrollResponse tripped = new BulkIndexByScrollResponse(); + roundTrip(response, tripped); + assertResponseEquals(response, tripped); + } + + private BulkByScrollTask.Status randomStatus() { + return new BulkByScrollTask.Status(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), + randomPositiveInt(), randomPositiveLong(), randomPositiveLong()); + } + private List randomIndexingFailures() { return usually() ? emptyList() : singletonList(new Failure(randomSimpleString(random()), randomSimpleString(random()), @@ -121,25 +134,6 @@ public class RoundTripTests extends ESTestCase { new SearchShardTarget(randomSimpleString(random()), index, randomInt()), randomFrom(RestStatus.values()))); } - - private void assertResponseEquals(BulkIndexByScrollResponse response, BulkIndexByScrollResponse tripped) { - assertEquals(response.getTook(), tripped.getTook()); - assertEquals(response.getUpdated(), tripped.getUpdated()); - assertEquals(response.getBatches(), tripped.getBatches()); - assertEquals(response.getVersionConflicts(), tripped.getVersionConflicts()); - assertEquals(response.getNoops(), tripped.getNoops()); - assertEquals(response.getIndexingFailures().size(), tripped.getIndexingFailures().size()); - for (int i = 0; i < response.getIndexingFailures().size(); i++) { - Failure expected = response.getIndexingFailures().get(i); - Failure actual = tripped.getIndexingFailures().get(i); - assertEquals(expected.getIndex(), actual.getIndex()); - assertEquals(expected.getType(), actual.getType()); - assertEquals(expected.getId(), actual.getId()); - assertEquals(expected.getMessage(), actual.getMessage()); - assertEquals(expected.getStatus(), actual.getStatus()); - } - } - private void roundTrip(Streamable example, Streamable empty) throws IOException { BytesStreamOutput out = new BytesStreamOutput(); example.writeTo(out); @@ -164,4 +158,35 @@ public class RoundTripTests extends ESTestCase { private int randomPositiveInt() { return randomInt(Integer.MAX_VALUE); } + + private void assertResponseEquals(BulkIndexByScrollResponse expected, BulkIndexByScrollResponse actual) { + assertEquals(expected.getTook(), actual.getTook()); + assertTaskStatusEquals(expected.getStatus(), actual.getStatus()); + assertEquals(expected.getIndexingFailures().size(), actual.getIndexingFailures().size()); + for (int i = 0; i < expected.getIndexingFailures().size(); i++) { + Failure expectedFailure = expected.getIndexingFailures().get(i); + Failure actualFailure = actual.getIndexingFailures().get(i); + assertEquals(expectedFailure.getIndex(), actualFailure.getIndex()); + assertEquals(expectedFailure.getType(), actualFailure.getType()); + assertEquals(expectedFailure.getId(), actualFailure.getId()); + assertEquals(expectedFailure.getMessage(), actualFailure.getMessage()); + assertEquals(expectedFailure.getStatus(), actualFailure.getStatus()); + } + assertEquals(expected.getSearchFailures().size(), actual.getSearchFailures().size()); + for (int i = 0; i < expected.getSearchFailures().size(); i++) { + ShardSearchFailure expectedFailure = expected.getSearchFailures().get(i); + ShardSearchFailure actualFailure = actual.getSearchFailures().get(i); + assertEquals(expectedFailure.shard(), actualFailure.shard()); + assertEquals(expectedFailure.status(), actualFailure.status()); + // We can't use getCause because throwable doesn't implement equals + assertEquals(expectedFailure.reason(), actualFailure.reason()); + } + } + + private void assertTaskStatusEquals(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) { + assertEquals(expected.getUpdated(), actual.getUpdated()); + assertEquals(expected.getBatches(), actual.getBatches()); + assertEquals(expected.getVersionConflicts(), actual.getVersionConflicts()); + assertEquals(expected.getNoops(), actual.getNoops()); + } } diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/UpdateByQueryMetadataTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/UpdateByQueryMetadataTests.java index 5399fe468d9..04b7cc259d5 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/UpdateByQueryMetadataTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/UpdateByQueryMetadataTests.java @@ -33,7 +33,7 @@ public class UpdateByQueryMetadataTests @Override protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action() { - return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(logger, null, null, threadPool, request(), listener()); + return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, request(), listener()); } @Override diff --git a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/UpdateByQueryWithScriptTests.java b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/UpdateByQueryWithScriptTests.java index c5165c86dfc..b989baf879f 100644 --- a/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/UpdateByQueryWithScriptTests.java +++ b/plugins/reindex/src/test/java/org/elasticsearch/plugin/reindex/UpdateByQueryWithScriptTests.java @@ -50,6 +50,7 @@ public class UpdateByQueryWithScriptTests @Override protected AbstractAsyncBulkIndexByScrollAction action() { - return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(logger, null, null, threadPool, request(), listener()); + return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, + request(), listener()); } }