From 7f5e29ddb21ee5935852633f0dff483c8fbd34b2 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Tue, 28 Aug 2018 19:02:23 +0200 Subject: [PATCH] HLREST: add reindex API (#32679) Adds the reindex API to the high level REST client. --- .../client/RequestConverters.java | 16 + .../client/RestHighLevelClient.java | 29 ++ .../java/org/elasticsearch/client/CrudIT.java | 67 +++ .../client/RequestConvertersTests.java | 63 +++ .../client/RestHighLevelClientTests.java | 1 - .../documentation/CRUDDocumentationIT.java | 149 ++++++ .../high-level/document/reindex.asciidoc | 215 +++++++++ .../high-level/supported-apis.asciidoc | 2 + .../index/reindex/RestReindexAction.java | 3 +- .../index/reindex/ReindexMetadataTests.java | 3 +- .../index/reindex/ReindexScriptTests.java | 3 +- .../index/reindex/RestReindexActionTests.java | 4 +- .../index/reindex/RoundTripTests.java | 3 +- .../action/bulk/BulkItemResponse.java | 35 +- .../reindex/AbstractBulkByScrollRequest.java | 8 + .../index/reindex/BulkByScrollResponse.java | 113 ++++- .../reindex/BulkByScrollResponseBuilder.java | 76 +++ .../index/reindex/BulkByScrollTask.java | 435 +++++++++++++++++- .../index/reindex/ReindexRequest.java | 176 ++++++- .../index/reindex/RemoteInfo.java | 25 +- .../index/reindex/ScrollableHitSource.java | 13 +- .../search/builder/SearchSourceBuilder.java | 11 +- .../reindex/BulkByScrollResponseTests.java | 51 +- ...ulkByScrollTaskStatusOrExceptionTests.java | 101 ++++ .../reindex/BulkByScrollTaskStatusTests.java | 92 +++- .../index/reindex/ReindexRequestTests.java | 8 +- .../xpack/upgrade/InternalIndexReindexer.java | 10 +- 27 files changed, 1644 insertions(+), 68 deletions(-) create mode 100644 docs/java-rest/high-level/document/reindex.asciidoc create mode 100644 server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java create mode 100644 server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 9dd316a0fb0..15bbde6b5bf 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -106,6 +106,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.rankeval.RankEvalRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.XPackUsageRequest; import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest; @@ -820,6 +821,21 @@ final class RequestConverters { return request; } + static Request reindex(ReindexRequest reindexRequest) throws IOException { + String endpoint = new EndpointBuilder().addPathPart("_reindex").build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + Params params = new Params(request) + .withRefresh(reindexRequest.isRefresh()) + .withTimeout(reindexRequest.getTimeout()) + .withWaitForActiveShards(reindexRequest.getWaitForActiveShards()); + + if (reindexRequest.getScrollTime() != null) { + params.putParam("scroll", reindexRequest.getScrollTime()); + } + request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request rollover(RolloverRequest rolloverRequest) throws IOException { String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover") .addPathPart(rolloverRequest.getNewIndexName()).build(); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index c45ec048ae8..4ac5bfd080f 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -64,6 +64,8 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.rankeval.RankEvalResponse; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.plugins.spi.NamedXContentProvider; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; @@ -395,6 +397,33 @@ public class RestHighLevelClient implements Closeable { performRequestAsyncAndParseEntity(bulkRequest, RequestConverters::bulk, options, BulkResponse::fromXContent, listener, emptySet()); } + /** + * Executes a reindex request. + * See Reindex API on elastic.co + * @param reindexRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet() + ); + } + + /** + * Asynchronously executes a reindex request. + * See Reindex API on elastic.co + * @param reindexRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener listener) { + performRequestAsyncAndParseEntity( + reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet() + ); + } + /** * Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 89f357477fa..7978d76c56d 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -41,12 +41,16 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -624,6 +628,69 @@ public class CrudIT extends ESRestHighLevelClientTestCase { validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest); } + public void testReindex() throws IOException { + final String sourceIndex = "source1"; + final String destinationIndex = "dest"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + createIndex(destinationIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1") + .source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2") + .source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) + .setRefreshPolicy(RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: create one doc in dest + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndex); + reindexRequest.setDestIndex(destinationIndex); + reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); + reindexRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); + assertEquals(1, bulkResponse.getCreated()); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + { + // test2: create 1 and update 1 + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndex); + reindexRequest.setDestIndex(destinationIndex); + BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); + assertEquals(1, bulkResponse.getCreated()); + assertEquals(2, bulkResponse.getTotal()); + assertEquals(1, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + } + public void testBulkProcessorIntegration() throws IOException { int nbItems = randomIntBetween(10, 100); boolean[] errors = new boolean[nbItems]; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index ebabb8f95b5..44b4ae05b57 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -116,6 +116,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.RandomCreateIndexGenerator; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.query.QueryBuilder; @@ -126,6 +127,8 @@ import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.rankeval.RankEvalSpec; import org.elasticsearch.index.rankeval.RatedRequest; import org.elasticsearch.index.rankeval.RestRankEvalAction; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.migration.IndexUpgradeInfoRequest; import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest; @@ -172,6 +175,7 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE; import static org.elasticsearch.client.RequestConverters.enforceSameContentType; @@ -179,6 +183,7 @@ import static org.elasticsearch.index.RandomCreateIndexGenerator.randomAliases; import static org.elasticsearch.index.RandomCreateIndexGenerator.randomCreateIndexRequest; import static org.elasticsearch.index.RandomCreateIndexGenerator.randomIndexSettings; import static org.elasticsearch.index.alias.RandomAliasActionsGenerator.randomAliasAction; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; import static org.hamcrest.CoreMatchers.equalTo; @@ -407,6 +412,64 @@ public class RequestConvertersTests extends ESTestCase { assertToXContentBody(indicesAliasesRequest, request.getEntity()); } + public void testReindex() throws IOException { + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices("source_idx"); + reindexRequest.setDestIndex("dest_idx"); + Map expectedParams = new HashMap<>(); + if (randomBoolean()) { + XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); + RemoteInfo remoteInfo = new RemoteInfo("http", "remote-host", 9200, null, + BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)), + "user", + "pass", + emptyMap(), + RemoteInfo.DEFAULT_SOCKET_TIMEOUT, + RemoteInfo.DEFAULT_CONNECT_TIMEOUT + ); + reindexRequest.setRemoteInfo(remoteInfo); + } + if (randomBoolean()) { + reindexRequest.setSourceDocTypes("doc", "tweet"); + } + if (randomBoolean()) { + reindexRequest.setSourceBatchSize(randomInt(100)); + } + if (randomBoolean()) { + reindexRequest.setDestDocType("tweet_and_doc"); + } + if (randomBoolean()) { + reindexRequest.setDestOpType("create"); + } + if (randomBoolean()) { + reindexRequest.setDestPipeline("my_pipeline"); + } + if (randomBoolean()) { + reindexRequest.setDestRouting("=cat"); + } + if (randomBoolean()) { + reindexRequest.setSize(randomIntBetween(100, 1000)); + } + if (randomBoolean()) { + reindexRequest.setAbortOnVersionConflict(false); + } + if (randomBoolean()) { + String ts = randomTimeValue(); + reindexRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll")); + } + if (reindexRequest.getRemoteInfo() == null && randomBoolean()) { + reindexRequest.setSourceQuery(new TermQueryBuilder("foo", "fooval")); + } + setRandomTimeout(reindexRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams); + setRandomWaitForActiveShards(reindexRequest::setWaitForActiveShards, ActiveShardCount.DEFAULT, expectedParams); + expectedParams.put("scroll", reindexRequest.getScrollTime().getStringRep()); + Request request = RequestConverters.reindex(reindexRequest); + assertEquals("/_reindex", request.getEndpoint()); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(expectedParams, request.getParameters()); + assertToXContentBody(reindexRequest, request.getEntity()); + } + public void testPutMapping() throws IOException { PutMappingRequest putMappingRequest = new PutMappingRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 15f2b80b252..d2585b6f3f4 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -660,7 +660,6 @@ public class RestHighLevelClientTests extends ESTestCase { "indices.put_alias", "mtermvectors", "put_script", - "reindex", "reindex_rethrottle", "render_search_template", "scripts_painless_execute", diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index ad41c139ddc..9c69a2a4836 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -50,6 +50,8 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -59,13 +61,22 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.RemoteInfo; +import org.elasticsearch.index.reindex.ScrollableHitSource; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.search.sort.SortOrder; +import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -750,6 +761,144 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { } } + public void testReindex() throws Exception { + RestHighLevelClient client = highLevelClient(); + { + String mapping = + "\"doc\": {\n" + + " \"properties\": {\n" + + " \"user\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"field1\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"field2\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " }"; + createIndex("source1", Settings.EMPTY, mapping); + createIndex("source2", Settings.EMPTY, mapping); + createPipeline("my_pipeline"); + } + { + // tag::reindex-request + ReindexRequest request = new ReindexRequest(); // <1> + request.setSourceIndices("source1", "source2"); // <2> + request.setDestIndex("dest"); // <3> + // end::reindex-request + // tag::reindex-request-versionType + request.setDestVersionType(VersionType.EXTERNAL); // <1> + // end::reindex-request-versionType + // tag::reindex-request-opType + request.setDestOpType("create"); // <1> + // end::reindex-request-opType + // tag::reindex-request-conflicts + request.setConflicts("proceed"); // <1> + // end::reindex-request-conflicts + // tag::reindex-request-typeOrQuery + request.setSourceDocTypes("doc"); // <1> + request.setSourceQuery(new TermQueryBuilder("user", "kimchy")); // <2> + // end::reindex-request-typeOrQuery + // tag::reindex-request-size + request.setSize(10); // <1> + // end::reindex-request-size + // tag::reindex-request-sourceSize + request.setSourceBatchSize(100); // <1> + // end::reindex-request-sourceSize + // tag::reindex-request-pipeline + request.setDestPipeline("my_pipeline"); // <1> + // end::reindex-request-pipeline + // tag::reindex-request-sort + request.addSortField("field1", SortOrder.DESC); // <1> + request.addSortField("field2", SortOrder.ASC); // <2> + // end::reindex-request-sort + // tag::reindex-request-script + request.setScript( + new Script( + ScriptType.INLINE, "painless", + "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", + Collections.emptyMap())); // <1> + // end::reindex-request-script + // tag::reindex-request-remote + request.setRemoteInfo( + new RemoteInfo( + "https", "localhost", 9002, null, new BytesArray(new MatchAllQueryBuilder().toString()), + "user", "pass", Collections.emptyMap(), new TimeValue(100, TimeUnit.MILLISECONDS), + new TimeValue(100, TimeUnit.SECONDS) + ) + ); // <1> + // end::reindex-request-remote + request.setRemoteInfo(null); // Remove it for tests + // tag::reindex-request-timeout + request.setTimeout(TimeValue.timeValueMinutes(2)); // <1> + // end::reindex-request-timeout + // tag::reindex-request-refresh + request.setRefresh(true); // <1> + // end::reindex-request-refresh + // tag::reindex-request-slices + request.setSlices(2); // <1> + // end::reindex-request-slices + // tag::reindex-request-scroll + request.setScroll(TimeValue.timeValueMinutes(10)); // <1> + // end::reindex-request-scroll + + + // tag::reindex-execute + BulkByScrollResponse bulkResponse = client.reindex(request, RequestOptions.DEFAULT); + // end::reindex-execute + assertSame(0, bulkResponse.getSearchFailures().size()); + assertSame(0, bulkResponse.getBulkFailures().size()); + // tag::reindex-response + TimeValue timeTaken = bulkResponse.getTook(); // <1> + boolean timedOut = bulkResponse.isTimedOut(); // <2> + long totalDocs = bulkResponse.getTotal(); // <3> + long updatedDocs = bulkResponse.getUpdated(); // <4> + long createdDocs = bulkResponse.getCreated(); // <5> + long deletedDocs = bulkResponse.getDeleted(); // <6> + long batches = bulkResponse.getBatches(); // <7> + long noops = bulkResponse.getNoops(); // <8> + long versionConflicts = bulkResponse.getVersionConflicts(); // <9> + long bulkRetries = bulkResponse.getBulkRetries(); // <10> + long searchRetries = bulkResponse.getSearchRetries(); // <11> + TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <12> + TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <13> + List searchFailures = bulkResponse.getSearchFailures(); // <14> + List bulkFailures = bulkResponse.getBulkFailures(); // <15> + // end::reindex-response + } + { + ReindexRequest request = new ReindexRequest(); + request.setSourceIndices("source1"); + request.setDestIndex("dest"); + + // tag::reindex-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::reindex-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::reindex-execute-async + client.reindexAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::reindex-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testGet() throws Exception { RestHighLevelClient client = highLevelClient(); { diff --git a/docs/java-rest/high-level/document/reindex.asciidoc b/docs/java-rest/high-level/document/reindex.asciidoc new file mode 100644 index 00000000000..b6d98b42dc5 --- /dev/null +++ b/docs/java-rest/high-level/document/reindex.asciidoc @@ -0,0 +1,215 @@ +[[java-rest-high-document-reindex]] +=== Reindex API + +[[java-rest-high-document-reindex-request]] +==== Reindex Request + +A `ReindexRequest` can be used to copy documents from one or more indexes into a destination index. + +It requires an existing source index and a target index which may or may not exist pre-request. Reindex does not attempt +to set up the destination index. It does not copy the settings of the source index. You should set up the destination +index prior to running a _reindex action, including setting up mappings, shard counts, replicas, etc. + +The simplest form of a `ReindexRequest` looks like follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request] +-------------------------------------------------- +<1> Creates the `ReindexRequest` +<2> Adds a list of sources to copy from +<3> Adds the destination index + +The `dest` element can be configured like the index API to control optimistic concurrency control. Just leaving out +`versionType` (as above) or setting it to internal will cause Elasticsearch to blindly dump documents into the target. +Setting `versionType` to external will cause Elasticsearch to preserve the version from the source, create any documents +that are missing, and update any documents that have an older version in the destination index than they do in the +source index. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-versionType] +-------------------------------------------------- +<1> Set the versionType to `EXTERNAL` + +Setting `opType` to `create` will cause `_reindex` to only create missing documents in the target index. All existing +documents will cause a version conflict. The default `opType` is `index`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-opType] +-------------------------------------------------- +<1> Set the opType to `create` + +By default version conflicts abort the `_reindex` process but you can just count them by settings it to `proceed` +in the request body + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-conflicts] +-------------------------------------------------- +<1> Set `proceed` on version conflict + +You can limit the documents by adding a type to the source or by adding a query. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-typeOrQuery] +-------------------------------------------------- +<1> Only copy `doc` type +<2> Only copy documents which have field `user` set to `kimchy` + +It’s also possible to limit the number of processed documents by setting size. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-size] +-------------------------------------------------- +<1> Only copy 10 documents + +By default `_reindex` uses batches of 1000. You can change the batch size with `sourceBatchSize`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-sourceSize] +-------------------------------------------------- +<1> Use batches of 100 documents + +Reindex can also use the ingest feature by specifying a `pipeline`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-pipeline] +-------------------------------------------------- +<1> set pipeline to `my_pipeline` + +If you want a particular set of documents from the source index you’ll need to use sort. If possible, prefer a more +selective query to size and sort. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-sort] +-------------------------------------------------- +<1> add descending sort to`field1` +<2> add ascending sort to `field2` + +`ReindexRequest` also supports a `script` that modifies the document. It allows you to also change the document's +metadata. The following example illustrates that. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-script] +-------------------------------------------------- +<1> `setScript` to increment the `likes` field on all documents with user `kimchy`. + +`ReindexRequest` supports reindexing from a remote Elasticsearch cluster. When using a remote cluster the query should be +specified inside the `RemoteInfo` object and not using `setSourceQuery`. If both the remote info and the source query are +set it results in a validation error during the request. The reason for this is that the remote Elasticsearch may not +understand queries built by the modern query builders. The remote cluster support works all the way back to Elasticsearch +0.90 and the query language has changed since then. When reaching older versions, it is safer to write the query by hand +in JSON. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-remote] +-------------------------------------------------- +<1> set remote elastic cluster + +`ReindexRequest` also helps in automatically parallelizing using `sliced-scroll` to +slice on `_uid`. Use `setSlices` to specify the number of slices to use. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-slices] +-------------------------------------------------- +<1> set number of slices to use + +`ReindexRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive. +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-scroll] +-------------------------------------------------- +<1> set scroll time + + +==== Optional arguments +In addition to the options above the following arguments can optionally be also provided: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-timeout] +-------------------------------------------------- +<1> Timeout to wait for the reindex request to be performed as a `TimeValue` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-request-refresh] +-------------------------------------------------- +<1> Refresh index after calling reindex + + +[[java-rest-high-document-reindex-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute] +-------------------------------------------------- + +[[java-rest-high-document-reindex-async]] +==== Asynchronous Execution + +The asynchronous execution of a reindex request requires both the `ReindexRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute-async] +-------------------------------------------------- +<1> The `ReindexRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `BulkByScrollResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument and contains a list of individual results for each +operation that was executed. Note that one or more operations might have +failed while the others have been successfully executed. +<2> Called when the whole `ReindexRequest` fails. In this case the raised +exception is provided as an argument and no operation has been executed. + +[[java-rest-high-document-reindex-response]] +==== Reindex Response + +The returned `BulkByScrollResponse` contains information about the executed operations and + allows to iterate over each result as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[reindex-response] +-------------------------------------------------- +<1> Get total time taken +<2> Check if the request timed out +<3> Get total number of docs processed +<4> Number of docs that were updated +<5> Number of docs that were created +<6> Number of docs that were deleted +<7> Number of batches that were executed +<8> Number of skipped docs +<9> Number of version conflicts +<10> Number of times request had to retry bulk index operations +<11> Number of times request had to retry search operations +<12> The total time this request has throttled itself not including the current throttle time if it is currently sleeping +<13> Remaining delay of any current throttle sleep or 0 if not sleeping +<14> Failures during search phase +<15> Failures during bulk index operation diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index e04e391f3e0..64c95912b5e 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -15,6 +15,7 @@ Single document APIs:: Multi-document APIs:: * <> * <> +* <> include::document/index.asciidoc[] include::document/get.asciidoc[] @@ -23,6 +24,7 @@ include::document/delete.asciidoc[] include::document/update.asciidoc[] include::document/bulk.asciidoc[] include::document/multi-get.asciidoc[] +include::document/reindex.asciidoc[] == Search APIs diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index a5520c90b0f..50d01535d7f 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -118,7 +117,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler PARSER = + new ConstructingObjectParser<>( + "bulk_failures", + true, + a -> + new Failure( + (String)a[0], (String)a[1], (String)a[2], (Exception)a[3], RestStatus.fromCode((int)a[4]) + ) + ); + static { + PARSER.declareString(constructorArg(), new ParseField(INDEX_FIELD)); + PARSER.declareString(constructorArg(), new ParseField(TYPE_FIELD)); + PARSER.declareString(optionalConstructorArg(), new ParseField(ID_FIELD)); + PARSER.declareObject(constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), new ParseField(CAUSE_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(STATUS_FIELD)); + } + /** * For write failures before operation was assigned a sequence number. * @@ -322,6 +343,10 @@ public class BulkItemResponse implements Streamable, StatusToXContentObject { return builder; } + public static Failure fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + @Override public String toString() { return Strings.toString(this); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 8536337bfdb..3b635c82387 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -252,6 +252,14 @@ public abstract class AbstractBulkByScrollRequest searchFailures; private boolean timedOut; + private static final String TOOK_FIELD = "took"; + private static final String TIMED_OUT_FIELD = "timed_out"; + private static final String FAILURES_FIELD = "failures"; + + @SuppressWarnings("unchecked") + private static final ObjectParser PARSER = + new ObjectParser<>( + "bulk_by_scroll_response", + true, + BulkByScrollResponseBuilder::new + ); + static { + PARSER.declareLong(BulkByScrollResponseBuilder::setTook, new ParseField(TOOK_FIELD)); + PARSER.declareBoolean(BulkByScrollResponseBuilder::setTimedOut, new ParseField(TIMED_OUT_FIELD)); + PARSER.declareObjectArray( + BulkByScrollResponseBuilder::setFailures, (p, c) -> parseFailure(p), new ParseField(FAILURES_FIELD) + ); + // since the result of BulkByScrollResponse.Status are mixed we also parse that in this + Status.declareFields(PARSER); + } + public BulkByScrollResponse() { } @@ -87,6 +118,10 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr return status.getCreated(); } + public long getTotal() { + return status.getTotal(); + } + public long getDeleted() { return status.getDeleted(); } @@ -171,8 +206,8 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field("took", took.millis()); - builder.field("timed_out", timedOut); + builder.field(TOOK_FIELD, took.millis()); + builder.field(TIMED_OUT_FIELD, timedOut); status.innerXContent(builder, params); builder.startArray("failures"); for (Failure failure: bulkFailures) { @@ -187,6 +222,80 @@ public class BulkByScrollResponse extends ActionResponse implements ToXContentFr return builder; } + public static BulkByScrollResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null).buildResponse(); + } + + private static Object parseFailure(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + Token token; + String index = null; + String type = null; + String id = null; + Integer status = null; + Integer shardId = null; + String nodeId = null; + ElasticsearchException bulkExc = null; + ElasticsearchException searchExc = null; + while ((token = parser.nextToken()) != Token.END_OBJECT) { + ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation); + String name = parser.currentName(); + token = parser.nextToken(); + if (token == Token.START_ARRAY) { + parser.skipChildren(); + } else if (token == Token.START_OBJECT) { + switch (name) { + case SearchFailure.REASON_FIELD: + bulkExc = ElasticsearchException.fromXContent(parser); + break; + case Failure.CAUSE_FIELD: + searchExc = ElasticsearchException.fromXContent(parser); + break; + default: + parser.skipChildren(); + } + } else if (token == Token.VALUE_STRING) { + switch (name) { + // This field is the same as SearchFailure.index + case Failure.INDEX_FIELD: + index = parser.text(); + break; + case Failure.TYPE_FIELD: + type = parser.text(); + break; + case Failure.ID_FIELD: + id = parser.text(); + break; + case SearchFailure.NODE_FIELD: + nodeId = parser.text(); + break; + default: + // Do nothing + break; + } + } else if (token == Token.VALUE_NUMBER) { + switch (name) { + case Failure.STATUS_FIELD: + status = parser.intValue(); + break; + case SearchFailure.SHARD_FIELD: + shardId = parser.intValue(); + break; + default: + // Do nothing + break; + } + } + } + if (bulkExc != null) { + return new Failure(index, type, id, bulkExc, RestStatus.fromCode(status)); + } else if (searchExc != null) { + return new SearchFailure(searchExc, index, shardId, nodeId); + } else { + throw new ElasticsearchParseException("failed to parse failures array. At least one of {reason,cause} must be present"); + } + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java new file mode 100644 index 00000000000..ad5bfd6e03c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollResponseBuilder.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; +import org.elasticsearch.index.reindex.BulkByScrollTask.StatusBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Helps build a {@link BulkByScrollResponse}. Used by an instance of {@link ObjectParser} when parsing from XContent. + */ +class BulkByScrollResponseBuilder extends StatusBuilder { + private TimeValue took; + private BulkByScrollTask.Status status; + private List bulkFailures = new ArrayList<>(); + private List searchFailures = new ArrayList<>(); + private boolean timedOut; + + BulkByScrollResponseBuilder() {} + + public void setTook(long took) { + setTook(new TimeValue(took, TimeUnit.MILLISECONDS)); + } + + public void setTook(TimeValue took) { + this.took = took; + } + + public void setStatus(BulkByScrollTask.Status status) { + this.status = status; + } + + public void setFailures(List failures) { + if (failures != null) { + for (Object object: failures) { + if (object instanceof Failure) { + bulkFailures.add((Failure) object); + } else if (object instanceof SearchFailure) { + searchFailures.add((SearchFailure) object); + } + } + } + } + + public void setTimedOut(boolean timedOut) { + this.timedOut = timedOut; + } + + public BulkByScrollResponse buildResponse() { + status = super.buildStatus(); + return new BulkByScrollResponse(took, status, bulkFailures, searchFailures, timedOut); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 66e83907d49..5beb86fae6b 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -21,27 +21,40 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParseException; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; import static java.lang.Math.min; import static java.util.Collections.emptyList; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * Task storing information about a currently running BulkByScroll request. @@ -188,6 +201,120 @@ public class BulkByScrollTask extends CancellableTask { return true; } + /** + * This class acts as a builder for {@link Status}. Once the {@link Status} object is built by calling + * {@link #buildStatus()} it is immutable. Used by an instance of {@link ObjectParser} when parsing from + * XContent. + */ + public static class StatusBuilder { + private Integer sliceId = null; + private Long total = null; + private Long updated = null; + private Long created = null; + private Long deleted = null; + private Integer batches = null; + private Long versionConflicts = null; + private Long noops = null; + private Long bulkRetries = null; + private Long searchRetries = null; + private TimeValue throttled = null; + private Float requestsPerSecond = null; + private String reasonCancelled = null; + private TimeValue throttledUntil = null; + private List sliceStatuses = emptyList(); + + public void setSliceId(Integer sliceId) { + this.sliceId = sliceId; + } + + public void setTotal(Long total) { + this.total = total; + } + + public void setUpdated(Long updated) { + this.updated = updated; + } + + public void setCreated(Long created) { + this.created = created; + } + + public void setDeleted(Long deleted) { + this.deleted = deleted; + } + + public void setBatches(Integer batches) { + this.batches = batches; + } + + public void setVersionConflicts(Long versionConflicts) { + this.versionConflicts = versionConflicts; + } + + public void setNoops(Long noops) { + this.noops = noops; + } + + public void setRetries(Tuple retries) { + if (retries != null) { + setBulkRetries(retries.v1()); + setSearchRetries(retries.v2()); + } + } + + public void setBulkRetries(Long bulkRetries) { + this.bulkRetries = bulkRetries; + } + + public void setSearchRetries(Long searchRetries) { + this.searchRetries = searchRetries; + } + + public void setThrottled(Long throttled) { + if (throttled != null) { + this.throttled = new TimeValue(throttled, TimeUnit.MILLISECONDS); + } + } + + public void setRequestsPerSecond(Float requestsPerSecond) { + if (requestsPerSecond != null) { + requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond; + this.requestsPerSecond = requestsPerSecond; + } + } + + public void setReasonCancelled(String reasonCancelled) { + this.reasonCancelled = reasonCancelled; + } + + public void setThrottledUntil(Long throttledUntil) { + if (throttledUntil != null) { + this.throttledUntil = new TimeValue(throttledUntil, TimeUnit.MILLISECONDS); + } + } + + public void setSliceStatuses(List sliceStatuses) { + if (sliceStatuses != null) { + this.sliceStatuses = sliceStatuses; + } + } + + public Status buildStatus() { + if (sliceStatuses.isEmpty()) { + try { + return new Status( + sliceId, total, updated, created, deleted, batches, versionConflicts, noops, bulkRetries, + searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil + ); + } catch (NullPointerException npe) { + throw new IllegalArgumentException("a required field is null when building Status"); + } + } else { + return new Status(sliceStatuses, reasonCancelled); + } + } + } + public static class Status implements Task.Status, SuccessfullyProcessed { public static final String NAME = "bulk-by-scroll"; @@ -203,6 +330,76 @@ public class BulkByScrollTask extends CancellableTask { */ public static final String INCLUDE_UPDATED = "include_updated"; + public static final String SLICE_ID_FIELD = "slice_id"; + public static final String TOTAL_FIELD = "total"; + public static final String UPDATED_FIELD = "updated"; + public static final String CREATED_FIELD = "created"; + public static final String DELETED_FIELD = "deleted"; + public static final String BATCHES_FIELD = "batches"; + public static final String VERSION_CONFLICTS_FIELD = "version_conflicts"; + public static final String NOOPS_FIELD = "noops"; + public static final String RETRIES_FIELD = "retries"; + public static final String RETRIES_BULK_FIELD = "bulk"; + public static final String RETRIES_SEARCH_FIELD = "search"; + public static final String THROTTLED_RAW_FIELD = "throttled_millis"; + public static final String THROTTLED_HR_FIELD = "throttled"; + public static final String REQUESTS_PER_SEC_FIELD = "requests_per_second"; + public static final String CANCELED_FIELD = "canceled"; + public static final String THROTTLED_UNTIL_RAW_FIELD = "throttled_until_millis"; + public static final String THROTTLED_UNTIL_HR_FIELD = "throttled_until"; + public static final String SLICES_FIELD = "slices"; + + public static Set FIELDS_SET = new HashSet<>(); + static { + FIELDS_SET.add(SLICE_ID_FIELD); + FIELDS_SET.add(TOTAL_FIELD); + FIELDS_SET.add(UPDATED_FIELD); + FIELDS_SET.add(CREATED_FIELD); + FIELDS_SET.add(DELETED_FIELD); + FIELDS_SET.add(BATCHES_FIELD); + FIELDS_SET.add(VERSION_CONFLICTS_FIELD); + FIELDS_SET.add(NOOPS_FIELD); + FIELDS_SET.add(RETRIES_FIELD); + // No need for inner level fields for retries in the set of outer level fields + FIELDS_SET.add(THROTTLED_RAW_FIELD); + FIELDS_SET.add(THROTTLED_HR_FIELD); + FIELDS_SET.add(REQUESTS_PER_SEC_FIELD); + FIELDS_SET.add(CANCELED_FIELD); + FIELDS_SET.add(THROTTLED_UNTIL_RAW_FIELD); + FIELDS_SET.add(THROTTLED_UNTIL_HR_FIELD); + FIELDS_SET.add(SLICES_FIELD); + } + + @SuppressWarnings("unchecked") + static ConstructingObjectParser, Void> RETRIES_PARSER = new ConstructingObjectParser<>( + "bulk_by_scroll_task_status_retries", + true, + a -> new Tuple(a[0], a[1]) + ); + static { + RETRIES_PARSER.declareLong(constructorArg(), new ParseField(RETRIES_BULK_FIELD)); + RETRIES_PARSER.declareLong(constructorArg(), new ParseField(RETRIES_SEARCH_FIELD)); + } + + public static void declareFields(ObjectParser parser) { + parser.declareInt(StatusBuilder::setSliceId, new ParseField(SLICE_ID_FIELD)); + parser.declareLong(StatusBuilder::setTotal, new ParseField(TOTAL_FIELD)); + parser.declareLong(StatusBuilder::setUpdated, new ParseField(UPDATED_FIELD)); + parser.declareLong(StatusBuilder::setCreated, new ParseField(CREATED_FIELD)); + parser.declareLong(StatusBuilder::setDeleted, new ParseField(DELETED_FIELD)); + parser.declareInt(StatusBuilder::setBatches, new ParseField(BATCHES_FIELD)); + parser.declareLong(StatusBuilder::setVersionConflicts, new ParseField(VERSION_CONFLICTS_FIELD)); + parser.declareLong(StatusBuilder::setNoops, new ParseField(NOOPS_FIELD)); + parser.declareObject(StatusBuilder::setRetries, RETRIES_PARSER, new ParseField(RETRIES_FIELD)); + parser.declareLong(StatusBuilder::setThrottled, new ParseField(THROTTLED_RAW_FIELD)); + parser.declareFloat(StatusBuilder::setRequestsPerSecond, new ParseField(REQUESTS_PER_SEC_FIELD)); + parser.declareString(StatusBuilder::setReasonCancelled, new ParseField(CANCELED_FIELD)); + parser.declareLong(StatusBuilder::setThrottledUntil, new ParseField(THROTTLED_UNTIL_RAW_FIELD)); + parser.declareObjectArray( + StatusBuilder::setSliceStatuses, (p, c) -> StatusOrException.fromXContent(p), new ParseField(SLICES_FIELD) + ); + } + private final Integer sliceId; private final long total; private final long updated; @@ -353,35 +550,40 @@ public class BulkByScrollTask extends CancellableTask { return builder.endObject(); } + /** + * We need to write a manual parser for this because of {@link StatusOrException}. Since + * {@link StatusOrException#fromXContent(XContentParser)} tries to peek at a field first before deciding + * what needs to be it cannot use an {@link ObjectParser}. + */ public XContentBuilder innerXContent(XContentBuilder builder, Params params) throws IOException { if (sliceId != null) { - builder.field("slice_id", sliceId); + builder.field(SLICE_ID_FIELD, sliceId); } - builder.field("total", total); + builder.field(TOTAL_FIELD, total); if (params.paramAsBoolean(INCLUDE_UPDATED, true)) { - builder.field("updated", updated); + builder.field(UPDATED_FIELD, updated); } if (params.paramAsBoolean(INCLUDE_CREATED, true)) { - builder.field("created", created); + builder.field(CREATED_FIELD, created); } - builder.field("deleted", deleted); - builder.field("batches", batches); - builder.field("version_conflicts", versionConflicts); - builder.field("noops", noops); - builder.startObject("retries"); { - builder.field("bulk", bulkRetries); - builder.field("search", searchRetries); + builder.field(DELETED_FIELD, deleted); + builder.field(BATCHES_FIELD, batches); + builder.field(VERSION_CONFLICTS_FIELD, versionConflicts); + builder.field(NOOPS_FIELD, noops); + builder.startObject(RETRIES_FIELD); { + builder.field(RETRIES_BULK_FIELD, bulkRetries); + builder.field(RETRIES_SEARCH_FIELD, searchRetries); } builder.endObject(); - builder.humanReadableField("throttled_millis", "throttled", throttled); - builder.field("requests_per_second", requestsPerSecond == Float.POSITIVE_INFINITY ? -1 : requestsPerSecond); + builder.humanReadableField(THROTTLED_RAW_FIELD, THROTTLED_HR_FIELD, throttled); + builder.field(REQUESTS_PER_SEC_FIELD, requestsPerSecond == Float.POSITIVE_INFINITY ? -1 : requestsPerSecond); if (reasonCancelled != null) { - builder.field("canceled", reasonCancelled); + builder.field(CANCELED_FIELD, reasonCancelled); } - builder.humanReadableField("throttled_until_millis", "throttled_until", throttledUntil); + builder.humanReadableField(THROTTLED_UNTIL_RAW_FIELD, THROTTLED_UNTIL_HR_FIELD, throttledUntil); if (false == sliceStatuses.isEmpty()) { - builder.startArray("slices"); + builder.startArray(SLICES_FIELD); for (StatusOrException slice : sliceStatuses) { if (slice == null) { builder.nullValue(); @@ -394,6 +596,114 @@ public class BulkByScrollTask extends CancellableTask { return builder; } + public static Status fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token; + if (parser.currentToken() == Token.START_OBJECT) { + token = parser.nextToken(); + } else { + token = parser.nextToken(); + } + ensureExpectedToken(Token.START_OBJECT, token, parser::getTokenLocation); + token = parser.nextToken(); + ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation); + return innerFromXContent(parser); + } + + public static Status innerFromXContent(XContentParser parser) throws IOException { + Token token = parser.currentToken(); + String fieldName = parser.currentName(); + ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation); + Integer sliceId = null; + Long total = null; + Long updated = null; + Long created = null; + Long deleted = null; + Integer batches = null; + Long versionConflicts = null; + Long noOps = null; + Long bulkRetries = null; + Long searchRetries = null; + TimeValue throttled = null; + Float requestsPerSecond = null; + String reasonCancelled = null; + TimeValue throttledUntil = null; + List sliceStatuses = new ArrayList<>(); + while ((token = parser.nextToken()) != Token.END_OBJECT) { + if (token == Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == Token.START_OBJECT) { + if (fieldName.equals(Status.RETRIES_FIELD)) { + Tuple retries = + Status.RETRIES_PARSER.parse(parser, null); + bulkRetries = retries.v1(); + searchRetries = retries.v2(); + } else { + parser.skipChildren(); + } + } else if (token == Token.START_ARRAY) { + if (fieldName.equals(Status.SLICES_FIELD)) { + while ((token = parser.nextToken()) != Token.END_ARRAY) { + sliceStatuses.add(StatusOrException.fromXContent(parser)); + } + } else { + parser.skipChildren(); + } + } else { // else if it is a value + switch (fieldName) { + case Status.SLICE_ID_FIELD: + sliceId = parser.intValue(); + break; + case Status.TOTAL_FIELD: + total = parser.longValue(); + break; + case Status.UPDATED_FIELD: + updated = parser.longValue(); + break; + case Status.CREATED_FIELD: + created = parser.longValue(); + break; + case Status.DELETED_FIELD: + deleted = parser.longValue(); + break; + case Status.BATCHES_FIELD: + batches = parser.intValue(); + break; + case Status.VERSION_CONFLICTS_FIELD: + versionConflicts = parser.longValue(); + break; + case Status.NOOPS_FIELD: + noOps = parser.longValue(); + break; + case Status.THROTTLED_RAW_FIELD: + throttled = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS); + break; + case Status.REQUESTS_PER_SEC_FIELD: + requestsPerSecond = parser.floatValue(); + requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond; + break; + case Status.CANCELED_FIELD: + reasonCancelled = parser.text(); + break; + case Status.THROTTLED_UNTIL_RAW_FIELD: + throttledUntil = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS); + break; + default: + break; + } + } + } + if (sliceStatuses.isEmpty()) { + return + new Status( + sliceId, total, updated, created, deleted, batches, versionConflicts, noOps, bulkRetries, + searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil + ); + } else { + return new Status(sliceStatuses, reasonCancelled); + } + + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -520,6 +830,44 @@ public class BulkByScrollTask extends CancellableTask { return sliceStatuses; } + @Override + public int hashCode() { + return Objects.hash( + sliceId, total, updated, created, deleted, batches, versionConflicts, noops, searchRetries, + bulkRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil, sliceStatuses + ); + } + + public boolean equalsWithoutSliceStatus(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status other = (Status) o; + return + Objects.equals(sliceId, other.sliceId) && + total == other.total && + updated == other.updated && + created == other.created && + deleted == other.deleted && + batches == other.batches && + versionConflicts == other.versionConflicts && + noops == other.noops && + searchRetries == other.searchRetries && + bulkRetries == other.bulkRetries && + Objects.equals(throttled, other.throttled) && + requestsPerSecond == other.requestsPerSecond && + Objects.equals(reasonCancelled, other.reasonCancelled) && + Objects.equals(throttledUntil, other.throttledUntil); + } + + @Override + public boolean equals(Object o) { + if (equalsWithoutSliceStatus(o)) { + return Objects.equals(sliceStatuses, ((Status) o).sliceStatuses); + } else { + return false; + } + } + private int checkPositive(int value, String name) { if (value < 0) { throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]"); @@ -543,6 +891,19 @@ public class BulkByScrollTask extends CancellableTask { private final Status status; private final Exception exception; + public static Set EXPECTED_EXCEPTION_FIELDS = new HashSet<>(); + static { + EXPECTED_EXCEPTION_FIELDS.add("type"); + EXPECTED_EXCEPTION_FIELDS.add("reason"); + EXPECTED_EXCEPTION_FIELDS.add("caused_by"); + EXPECTED_EXCEPTION_FIELDS.add("suppressed"); + EXPECTED_EXCEPTION_FIELDS.add("stack_trace"); + EXPECTED_EXCEPTION_FIELDS.add("header"); + EXPECTED_EXCEPTION_FIELDS.add("error"); + EXPECTED_EXCEPTION_FIELDS.add("root_cause"); + } + + public StatusOrException(Status status) { this.status = status; exception = null; @@ -597,6 +958,48 @@ public class BulkByScrollTask extends CancellableTask { return builder; } + /** + * Since {@link StatusOrException} can contain either an {@link Exception} or a {@link Status} we need to peek + * at a field first before deciding what needs to be parsed since the same object could contains either. + * The {@link #EXPECTED_EXCEPTION_FIELDS} contains the fields that are expected when the serialised object + * was an instance of exception and the {@link Status#FIELDS_SET} is the set of fields expected when the + * serialized object was an instance of Status. + */ + public static StatusOrException fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token == null) { + token = parser.nextToken(); + } + if (token == Token.VALUE_NULL) { + return null; + } else { + ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation); + token = parser.nextToken(); + // This loop is present only to ignore unknown tokens. It breaks as soon as we find a field + // that is allowed. + while (token != Token.END_OBJECT) { + ensureExpectedToken(Token.FIELD_NAME, token, parser::getTokenLocation); + String fieldName = parser.currentName(); + // weird way to ignore unknown tokens + if (Status.FIELDS_SET.contains(fieldName)) { + return new StatusOrException( + Status.innerFromXContent(parser) + ); + } else if (EXPECTED_EXCEPTION_FIELDS.contains(fieldName)){ + return new StatusOrException(ElasticsearchException.innerFromXContent(parser, false)); + } else { + // Ignore unknown tokens + token = parser.nextToken(); + if (token == Token.START_OBJECT || token == Token.START_ARRAY) { + parser.skipChildren(); + } + token = parser.nextToken(); + } + } + throw new XContentParseException("Unable to parse StatusFromException. Expected fields not found."); + } + } + @Override public boolean equals(Object obj) { if (obj == null) { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java index e45d039edae..52a1c89d4b3 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java @@ -26,6 +26,11 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -39,7 +44,8 @@ import static org.elasticsearch.index.VersionType.INTERNAL; * of reasons, not least of which that scripts are allowed to change the destination request in drastic ways, including changing the index * to which documents are written. */ -public class ReindexRequest extends AbstractBulkIndexByScrollRequest implements CompositeIndicesRequest { +public class ReindexRequest extends AbstractBulkIndexByScrollRequest + implements CompositeIndicesRequest, ToXContentObject { /** * Prototype for index requests. */ @@ -48,9 +54,10 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest 0) { + builder.field("size", getSize()); + } + if (getScript() != null) { + builder.field("script", getScript()); + } + if (isAbortOnVersionConflict() == false) { + builder.field("conflicts", "proceed"); + } + } + builder.endObject(); + return builder; + } } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java b/server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java index 3ebd261b584..e255b4db34e 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java @@ -26,6 +26,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.HashMap; @@ -35,7 +37,7 @@ import static java.util.Collections.unmodifiableMap; import static java.util.Objects.requireNonNull; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; -public class RemoteInfo implements Writeable { +public class RemoteInfo implements Writeable, ToXContentObject { /** * Default {@link #socketTimeout} for requests that don't have one set. */ @@ -190,4 +192,25 @@ public class RemoteInfo implements Writeable { } return b.toString(); } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (username != null) { + builder.field("username", username); + } + if (password != null) { + builder.field("password", password); + } + builder.field("host", scheme + "://" + host + ":" + port + + (pathPrefix == null ? "" : "/" + pathPrefix)); + if (headers.size() >0 ) { + builder.field("headers", headers); + } + builder.field("socket_timeout", socketTimeout.getStringRep()); + builder.field("connect_timeout", connectTimeout.getStringRep()); + builder.field("query", query); + builder.endObject(); + return builder; + } } diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java index 917b57a9c97..a3901bb7a56 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java @@ -284,6 +284,11 @@ public abstract class ScrollableHitSource { @Nullable private final String nodeId; + public static final String INDEX_FIELD = "index"; + public static final String SHARD_FIELD = "shard"; + public static final String NODE_FIELD = "node"; + public static final String REASON_FIELD = "reason"; + public SearchFailure(Throwable reason, @Nullable String index, @Nullable Integer shardId, @Nullable String nodeId) { this.index = index; this.shardId = shardId; @@ -337,15 +342,15 @@ public abstract class ScrollableHitSource { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (index != null) { - builder.field("index", index); + builder.field(INDEX_FIELD, index); } if (shardId != null) { - builder.field("shard", shardId); + builder.field(SHARD_FIELD, shardId); } if (nodeId != null) { - builder.field("node", nodeId); + builder.field(NODE_FIELD, nodeId); } - builder.field("reason"); + builder.field(REASON_FIELD); { builder.startObject(); ElasticsearchException.generateThrowableXContent(builder, params, reason); diff --git a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index c7564dc5ea8..92ae481a830 100644 --- a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -1150,9 +1150,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R } } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); + public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { if (from != -1) { builder.field(FROM_FIELD.getPreferredName(), from); } @@ -1290,6 +1288,13 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R if (collapse != null) { builder.field(COLLAPSE.getPreferredName(), collapse); } + return builder; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + innerToXContent(builder, params); builder.endObject(); return builder; } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java index a288328391a..0dd4d6bc849 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollResponseTests.java @@ -24,7 +24,8 @@ import org.elasticsearch.Version; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; import java.io.IOException; import java.util.List; @@ -33,8 +34,9 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; +import static org.hamcrest.Matchers.containsString; -public class BulkByScrollResponseTests extends ESTestCase { +public class BulkByScrollResponseTests extends AbstractXContentTestCase { public void testRountTrip() throws IOException { BulkByScrollResponse response = new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()), @@ -94,4 +96,49 @@ public class BulkByScrollResponseTests extends ESTestCase { assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage()); } } + + @Override + protected void assertEqualInstances(BulkByScrollResponse expected, BulkByScrollResponse actual) { + assertEquals(expected.getTook(), actual.getTook()); + BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus()); + assertEquals(expected.getBulkFailures().size(), actual.getBulkFailures().size()); + for (int i = 0; i < expected.getBulkFailures().size(); i++) { + Failure expectedFailure = expected.getBulkFailures().get(i); + Failure actualFailure = actual.getBulkFailures().get(i); + assertEquals(expectedFailure.getIndex(), actualFailure.getIndex()); + assertEquals(expectedFailure.getType(), actualFailure.getType()); + assertEquals(expectedFailure.getId(), actualFailure.getId()); + assertThat(expectedFailure.getMessage(), containsString(actualFailure.getMessage())); + assertEquals(expectedFailure.getStatus(), actualFailure.getStatus()); + } + assertEquals(expected.getSearchFailures().size(), actual.getSearchFailures().size()); + for (int i = 0; i < expected.getSearchFailures().size(); i++) { + ScrollableHitSource.SearchFailure expectedFailure = expected.getSearchFailures().get(i); + ScrollableHitSource.SearchFailure actualFailure = actual.getSearchFailures().get(i); + assertEquals(expectedFailure.getIndex(), actualFailure.getIndex()); + assertEquals(expectedFailure.getShardId(), actualFailure.getShardId()); + assertEquals(expectedFailure.getNodeId(), actualFailure.getNodeId()); + assertThat(expectedFailure.getReason().getMessage(), containsString(actualFailure.getReason().getMessage())); + } + } + + @Override + protected BulkByScrollResponse createTestInstance() { + // failures are tested separately, so we can test XContent equivalence at least when we have no failures + return + new BulkByScrollResponse( + timeValueMillis(randomNonNegativeLong()), BulkByScrollTaskStatusTests.randomStatusWithoutException(), + emptyList(), emptyList(), randomBoolean() + ); + } + + @Override + protected BulkByScrollResponse doParseInstance(XContentParser parser) throws IOException { + return BulkByScrollResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java new file mode 100644 index 00000000000..33c56bacd91 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; +import org.elasticsearch.index.reindex.BulkByScrollTask.StatusOrException; + +import java.io.IOException; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.containsString; + +public class BulkByScrollTaskStatusOrExceptionTests extends AbstractXContentTestCase { + @Override + protected StatusOrException createTestInstance() { + // failures are tested separately, so we can test XContent equivalence at least when we have no failures + return createTestInstanceWithoutExceptions(); + } + + static StatusOrException createTestInstanceWithoutExceptions() { + return new StatusOrException(BulkByScrollTaskStatusTests.randomStatusWithoutException()); + } + + static StatusOrException createTestInstanceWithExceptions() { + if (randomBoolean()) { + return new StatusOrException(new ElasticsearchException("test_exception")); + } else { + return new StatusOrException(BulkByScrollTaskStatusTests.randomStatus()); + } + } + + @Override + protected StatusOrException doParseInstance(XContentParser parser) throws IOException { + return StatusOrException.fromXContent(parser); + } + + public static void assertEqualStatusOrException(StatusOrException expected, StatusOrException actual) { + if (expected != null && actual != null) { + assertNotSame(expected, actual); + if (expected.getException() == null) { + BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus()); + } else { + assertThat( + actual.getException().getMessage(), + containsString(expected.getException().getMessage()) + ); + } + } else { + // If one of them is null both of them should be null + assertSame(expected, actual); + } + } + + @Override + protected void assertEqualInstances(StatusOrException expected, StatusOrException actual) { + assertEqualStatusOrException(expected, actual); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + /** + * Test parsing {@link StatusOrException} with inner failures as they don't support asserting on xcontent equivalence, given that + * exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()} + * without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end. + */ + public void testFromXContentWithFailures() throws IOException { + Supplier instanceSupplier = BulkByScrollTaskStatusOrExceptionTests::createTestInstanceWithExceptions; + //with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata, + //but that does not bother our assertions, as we only want to test that we don't break. + boolean supportsUnknownFields = true; + //exceptions are not of the same type whenever parsed back + boolean assertToXContentEquivalence = false; + AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY, + getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, + this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java index dff07e0f215..368e1b3bdac 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java @@ -23,20 +23,27 @@ import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; import org.hamcrest.Matchers; +import org.elasticsearch.index.reindex.BulkByScrollTask.Status; import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import java.util.stream.IntStream; import static java.lang.Math.abs; import static java.util.stream.Collectors.toList; import static org.apache.lucene.util.TestUtil.randomSimpleString; -import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; +import static org.hamcrest.Matchers.equalTo; -public class BulkByScrollTaskStatusTests extends ESTestCase { +public class BulkByScrollTaskStatusTests extends AbstractXContentTestCase { public void testBulkByTaskStatus() throws IOException { BulkByScrollTask.Status status = randomStatus(); BytesStreamOutput out = new BytesStreamOutput(); @@ -98,6 +105,22 @@ public class BulkByScrollTaskStatusTests extends ESTestCase { return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null); } + public static BulkByScrollTask.Status randomStatusWithoutException() { + if (randomBoolean()) { + return randomWorkingStatus(null); + } + boolean canHaveNullStatues = randomBoolean(); + List statuses = IntStream.range(0, between(0, 10)) + .mapToObj(i -> { + if (canHaveNullStatues && LuceneTestCase.rarely()) { + return null; + } + return new BulkByScrollTask.StatusOrException(randomWorkingStatus(i)); + }) + .collect(toList()); + return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null); + } + private static BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) { // These all should be believably small because we sum them if we have multiple workers int total = between(0, 10000000); @@ -109,8 +132,65 @@ public class BulkByScrollTaskStatusTests extends ESTestCase { long versionConflicts = between(0, total); long bulkRetries = between(0, 10000000); long searchRetries = between(0, 100000); - return new BulkByScrollTask.Status(sliceId, total, updated, created, deleted, batches, versionConflicts, noops, bulkRetries, - searchRetries, parseTimeValue(randomPositiveTimeValue(), "test"), abs(Randomness.get().nextFloat()), - randomBoolean() ? null : randomSimpleString(Randomness.get()), parseTimeValue(randomPositiveTimeValue(), "test")); + // smallest unit of time during toXContent is Milliseconds + TimeUnit[] timeUnits = {TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS}; + TimeValue throttled = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits)); + TimeValue throttledUntil = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits)); + return + new BulkByScrollTask.Status( + sliceId, total, updated, created, deleted, batches, versionConflicts, noops, + bulkRetries, searchRetries, throttled, abs(Randomness.get().nextFloat()), + randomBoolean() ? null : randomSimpleString(Randomness.get()), throttledUntil + ); + } + + public static void assertEqualStatus(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) { + assertNotSame(expected, actual); + assertTrue(expected.equalsWithoutSliceStatus(actual)); + assertThat(expected.getSliceStatuses().size(), equalTo(actual.getSliceStatuses().size())); + for (int i = 0; i< expected.getSliceStatuses().size(); i++) { + BulkByScrollTaskStatusOrExceptionTests.assertEqualStatusOrException( + expected.getSliceStatuses().get(i), + actual.getSliceStatuses().get(i) + ); + } + } + + @Override + protected void assertEqualInstances(BulkByScrollTask.Status first, BulkByScrollTask.Status second) { + assertEqualStatus(first, second); + } + + @Override + protected BulkByScrollTask.Status createTestInstance() { + // failures are tested separately, so we can test xcontent equivalence at least when we have no failures + return randomStatusWithoutException(); + } + + @Override + protected BulkByScrollTask.Status doParseInstance(XContentParser parser) throws IOException { + return BulkByScrollTask.Status.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + /** + * Test parsing {@link Status} with inner failures as they don't support asserting on xcontent equivalence, given that + * exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()} + * without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end. + */ + public void testFromXContentWithFailures() throws IOException { + Supplier instanceSupplier = BulkByScrollTaskStatusTests::randomStatus; + //with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata, + //but that does not bother our assertions, as we only want to test that we don't break. + boolean supportsUnknownFields = true; + //exceptions are not of the same type whenever parsed back + boolean assertToXContentEquivalence = false; + AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, Strings.EMPTY_ARRAY, + getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, + this::assertEqualInstances, assertToXContentEquivalence, ToXContent.EMPTY_PARAMS); } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java b/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java index 6c1988a1440..1c3d539263e 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java @@ -20,8 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.search.slice.SliceBuilder; @@ -89,9 +87,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase { private void reindex(ParentTaskAssigningClient parentAwareClient, String index, String newIndex, ActionListener listener) { - SearchRequest sourceRequest = new SearchRequest(index); - sourceRequest.types(types); - IndexRequest destinationRequest = new IndexRequest(newIndex); - ReindexRequest reindexRequest = new ReindexRequest(sourceRequest, destinationRequest); + ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(index); + reindexRequest.setSourceDocTypes(types); + reindexRequest.setDestIndex(newIndex); reindexRequest.setRefresh(true); reindexRequest.setScript(transformScript); parentAwareClient.execute(ReindexAction.INSTANCE, reindexRequest, listener);