diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index c45a8e1005e..5e74262fa20 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -107,7 +107,9 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.rankeval.RankEvalRequest; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.XPackUsageRequest; import org.elasticsearch.protocol.xpack.license.DeleteLicenseRequest; @@ -837,6 +839,33 @@ final class RequestConverters { return request; } + static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException { + String endpoint = + endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query"); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + Params params = new Params(request) + .withRouting(updateByQueryRequest.getRouting()) + .withPipeline(updateByQueryRequest.getPipeline()) + .withRefresh(updateByQueryRequest.isRefresh()) + .withTimeout(updateByQueryRequest.getTimeout()) + .withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards()) + .withIndicesOptions(updateByQueryRequest.indicesOptions()); + if (updateByQueryRequest.isAbortOnVersionConflict() == false) { + params.putParam("conflicts", "proceed"); + } + if (updateByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) { + params.putParam("scroll_size", Integer.toString(updateByQueryRequest.getBatchSize())); + } + if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) { + params.putParam("scroll", updateByQueryRequest.getScrollTime()); + } + if (updateByQueryRequest.getSize() > 0) { + params.putParam("size", Integer.toString(updateByQueryRequest.getSize())); + } + request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request rollover(RolloverRequest rolloverRequest) throws IOException { String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover") .addPathPart(rolloverRequest.getNewIndexName()).build(); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 4ac5bfd080f..6e3c5a6fb83 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -66,6 +66,7 @@ import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.rankeval.RankEvalResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.plugins.spi.NamedXContentProvider; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; @@ -424,6 +425,35 @@ public class RestHighLevelClient implements Closeable { ); } + /** + * Executes a update by query request. + * See + * Update By Query API on elastic.co + * @param updateByQueryRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, emptySet() + ); + } + + /** + * Asynchronously executes an update by query request. + * See + * Update By Query API on elastic.co + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, RequestOptions options, + ActionListener listener) { + performRequestAsyncAndParseEntity( + reindexRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet() + ); + } + /** * Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 7978d76c56d..e02d9f451eb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -51,6 +51,7 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -691,6 +692,72 @@ public class CrudIT extends ESRestHighLevelClientTestCase { } } + public void testUpdateByQuery() throws IOException { + final String sourceIndex = "source1"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1") + .source(Collections.singletonMap("foo", 1), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2") + .source(Collections.singletonMap("foo", 2), XContentType.JSON)) + .setRefreshPolicy(RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: create one doc in dest + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(sourceIndex); + updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type")); + updateByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(1, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + { + // test2: update using script + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(sourceIndex); + updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); + updateByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); + assertEquals(2, bulkResponse.getTotal()); + assertEquals(2, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + assertEquals( + 3, + (int) (highLevelClient().get(new GetRequest(sourceIndex, "type", "2"), RequestOptions.DEFAULT) + .getSourceAsMap().get("foo")) + ); + } + } + public void testBulkProcessorIntegration() throws IOException { int nbItems = randomIntBetween(10, 100); boolean[] errors = new boolean[nbItems]; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 44b4ae05b57..92930d14cf4 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -129,6 +129,7 @@ import org.elasticsearch.index.rankeval.RatedRequest; import org.elasticsearch.index.rankeval.RestRankEvalAction; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.migration.IndexUpgradeInfoRequest; import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest; @@ -137,6 +138,7 @@ import org.elasticsearch.protocol.xpack.graph.Hop; import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.rest.action.search.RestSearchAction; +import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.mustache.MultiSearchTemplateRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; @@ -470,6 +472,60 @@ public class RequestConvertersTests extends ESTestCase { assertToXContentBody(reindexRequest, request.getEntity()); } + public void testUpdateByQuery() throws IOException { + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(randomIndicesNames(1, 5)); + Map expectedParams = new HashMap<>(); + if (randomBoolean()) { + updateByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false)); + } + if (randomBoolean()) { + int batchSize = randomInt(100); + updateByQueryRequest.setBatchSize(batchSize); + expectedParams.put("scroll_size", Integer.toString(batchSize)); + } + if (randomBoolean()) { + updateByQueryRequest.setPipeline("my_pipeline"); + expectedParams.put("pipeline", "my_pipeline"); + } + if (randomBoolean()) { + updateByQueryRequest.setRouting("=cat"); + expectedParams.put("routing", "=cat"); + } + if (randomBoolean()) { + int size = randomIntBetween(100, 1000); + updateByQueryRequest.setSize(size); + expectedParams.put("size", Integer.toString(size)); + } + if (randomBoolean()) { + updateByQueryRequest.setAbortOnVersionConflict(false); + expectedParams.put("conflicts", "proceed"); + } + if (randomBoolean()) { + String ts = randomTimeValue(); + updateByQueryRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll")); + expectedParams.put("scroll", ts); + } + if (randomBoolean()) { + updateByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval")); + } + if (randomBoolean()) { + updateByQueryRequest.setScript(new Script("ctx._source.last = \"lastname\"")); + } + setRandomIndicesOptions(updateByQueryRequest::setIndicesOptions, updateByQueryRequest::indicesOptions, expectedParams); + setRandomTimeout(updateByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams); + Request request = RequestConverters.updateByQuery(updateByQueryRequest); + StringJoiner joiner = new StringJoiner("/", "/", ""); + joiner.add(String.join(",", updateByQueryRequest.indices())); + if (updateByQueryRequest.getDocTypes().length > 0) + joiner.add(String.join(",", updateByQueryRequest.getDocTypes())); + joiner.add("_update_by_query"); + assertEquals(joiner.toString(), request.getEndpoint()); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(expectedParams, request.getParameters()); + assertToXContentBody(updateByQueryRequest, request.getEntity()); + } + public void testPutMapping() throws IOException { PutMappingRequest putMappingRequest = new PutMappingRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index d2585b6f3f4..7df4e072576 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -664,8 +664,7 @@ public class RestHighLevelClientTests extends ESTestCase { "render_search_template", "scripts_painless_execute", "tasks.get", - "termvectors", - "update_by_query" + "termvectors" }; //These API are not required for high-level client feature completeness String[] notRequiredApi = new String[] { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index 9c69a2a4836..ac9d42c65ca 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -67,6 +68,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.ScrollableHitSource; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; @@ -899,6 +901,125 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { } } + public void testUpdateByQuery() throws Exception { + RestHighLevelClient client = highLevelClient(); + { + String mapping = + "\"doc\": {\n" + + " \"properties\": {\n" + + " \"user\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"field1\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"field2\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " }"; + createIndex("source1", Settings.EMPTY, mapping); + createIndex("source2", Settings.EMPTY, mapping); + createPipeline("my_pipeline"); + } + { + // tag::update-by-query-request + UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2"); // <1> + // end::update-by-query-request + // tag::update-by-query-request-conflicts + request.setConflicts("proceed"); // <1> + // end::update-by-query-request-conflicts + // tag::update-by-query-request-typeOrQuery + request.setDocTypes("doc"); // <1> + request.setQuery(new TermQueryBuilder("user", "kimchy")); // <2> + // end::update-by-query-request-typeOrQuery + // tag::update-by-query-request-size + request.setSize(10); // <1> + // end::update-by-query-request-size + // tag::update-by-query-request-scrollSize + request.setBatchSize(100); // <1> + // end::update-by-query-request-scrollSize + // tag::update-by-query-request-pipeline + request.setPipeline("my_pipeline"); // <1> + // end::update-by-query-request-pipeline + // tag::update-by-query-request-script + request.setScript( + new Script( + ScriptType.INLINE, "painless", + "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}", + Collections.emptyMap())); // <1> + // end::update-by-query-request-script + // tag::update-by-query-request-timeout + request.setTimeout(TimeValue.timeValueMinutes(2)); // <1> + // end::update-by-query-request-timeout + // tag::update-by-query-request-refresh + request.setRefresh(true); // <1> + // end::update-by-query-request-refresh + // tag::update-by-query-request-slices + request.setSlices(2); // <1> + // end::update-by-query-request-slices + // tag::update-by-query-request-scroll + request.setScroll(TimeValue.timeValueMinutes(10)); // <1> + // end::update-by-query-request-scroll + // tag::update-by-query-request-routing + request.setRouting("=cat"); // <1> + // end::update-by-query-request-routing + // tag::update-by-query-request-indicesOptions + request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // <1> + // end::update-by-query-request-indicesOptions + + // tag::update-by-query-execute + BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT); + // end::update-by-query-execute + assertSame(0, bulkResponse.getSearchFailures().size()); + assertSame(0, bulkResponse.getBulkFailures().size()); + // tag::update-by-query-response + TimeValue timeTaken = bulkResponse.getTook(); // <1> + boolean timedOut = bulkResponse.isTimedOut(); // <2> + long totalDocs = bulkResponse.getTotal(); // <3> + long updatedDocs = bulkResponse.getUpdated(); // <4> + long deletedDocs = bulkResponse.getDeleted(); // <5> + long batches = bulkResponse.getBatches(); // <6> + long noops = bulkResponse.getNoops(); // <7> + long versionConflicts = bulkResponse.getVersionConflicts(); // <8> + long bulkRetries = bulkResponse.getBulkRetries(); // <9> + long searchRetries = bulkResponse.getSearchRetries(); // <10> + TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <11> + TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <12> + List searchFailures = bulkResponse.getSearchFailures(); // <13> + List bulkFailures = bulkResponse.getBulkFailures(); // <14> + // end::update-by-query-response + } + { + UpdateByQueryRequest request = new UpdateByQueryRequest(); + request.indices("source1"); + + // tag::update-by-query-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::update-by-query-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::update-by-query-execute-async + client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::update-by-query-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testGet() throws Exception { RestHighLevelClient client = highLevelClient(); { diff --git a/docs/java-rest/high-level/document/update-by-query.asciidoc b/docs/java-rest/high-level/document/update-by-query.asciidoc new file mode 100644 index 00000000000..324385a442b --- /dev/null +++ b/docs/java-rest/high-level/document/update-by-query.asciidoc @@ -0,0 +1,181 @@ +[[java-rest-high-document-update-by-query]] +=== Update By Query API + +[[java-rest-high-document-update-by-query-request]] +==== Update By Query Request + +A `UpdateByQueryRequest` can be used to update documents in an index. + +It requires an existing index (or a set of indices) on which the update is to be performed. + +The simplest form of a `UpdateByQueryRequest` looks like follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request] +-------------------------------------------------- +<1> Creates the `UpdateByQueryRequest` on a set of indices. + +By default version conflicts abort the `UpdateByQueryRequest` process but you can just count them by settings it to +`proceed` in the request body + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-conflicts] +-------------------------------------------------- +<1> Set `proceed` on version conflict + +You can limit the documents by adding a type to the source or by adding a query. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-typeOrQuery] +-------------------------------------------------- +<1> Only copy `doc` type +<2> Only copy documents which have field `user` set to `kimchy` + +It’s also possible to limit the number of processed documents by setting size. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-size] +-------------------------------------------------- +<1> Only copy 10 documents + +By default `UpdateByQueryRequest` uses batches of 1000. You can change the batch size with `setBatchSize`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-scrollSize] +-------------------------------------------------- +<1> Use batches of 100 documents + +Update by query can also use the ingest feature by specifying a `pipeline`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-pipeline] +-------------------------------------------------- +<1> set pipeline to `my_pipeline` + +`UpdateByQueryRequest` also supports a `script` that modifies the document. The following example illustrates that. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-script] +-------------------------------------------------- +<1> `setScript` to increment the `likes` field on all documents with user `kimchy`. + +`UpdateByQueryRequest` also helps in automatically parallelizing using `sliced-scroll` to +slice on `_uid`. Use `setSlices` to specify the number of slices to use. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-slices] +-------------------------------------------------- +<1> set number of slices to use + +`UpdateByQueryRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-scroll] +-------------------------------------------------- +<1> set scroll time + +If you provide routing then the routing is copied to the scroll query, limiting the process to the shards that match +that routing value. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-routing] +-------------------------------------------------- +<1> set routing + + +==== Optional arguments +In addition to the options above the following arguments can optionally be also provided: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-timeout] +-------------------------------------------------- +<1> Timeout to wait for the update by query request to be performed as a `TimeValue` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-refresh] +-------------------------------------------------- +<1> Refresh index after calling update by query + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-request-indicesOptions] +-------------------------------------------------- +<1> Set indices options + + +[[java-rest-high-document-update-by-query-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-execute] +-------------------------------------------------- + +[[java-rest-high-document-update-by-query-async]] +==== Asynchronous Execution + +The asynchronous execution of an update by query request requires both the `UpdateByQueryRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-execute-async] +-------------------------------------------------- +<1> The `UpdateByQueryRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `BulkByScrollResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument and contains a list of individual results for each +operation that was executed. Note that one or more operations might have +failed while the others have been successfully executed. +<2> Called when the whole `UpdateByQueryRequest` fails. In this case the raised +exception is provided as an argument and no operation has been executed. + +[[java-rest-high-document-update-by-query-execute-listener-response]] +==== Update By Query Response + +The returned `BulkByScrollResponse` contains information about the executed operations and + allows to iterate over each result as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[update-by-query-response] +-------------------------------------------------- +<1> Get total time taken +<2> Check if the request timed out +<3> Get total number of docs processed +<4> Number of docs that were updated +<5> Number of docs that were deleted +<6> Number of batches that were executed +<7> Number of skipped docs +<8> Number of version conflicts +<9> Number of times request had to retry bulk index operations +<10> Number of times request had to retry search operations +<11> The total time this request has throttled itself not including the current throttle time if it is currently sleeping +<12> Remaining delay of any current throttle sleep or 0 if not sleeping +<13> Failures during search phase +<14> Failures during bulk index operation diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 68320fbfe9f..b791dbc0f8c 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -16,6 +16,7 @@ Multi-document APIs:: * <> * <> * <> +* <> include::document/index.asciidoc[] include::document/get.asciidoc[] @@ -25,6 +26,7 @@ include::document/update.asciidoc[] include::document/bulk.asciidoc[] include::document/multi-get.asciidoc[] include::document/reindex.asciidoc[] +include::document/update-by-query.asciidoc[] == Search APIs diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java index bf0adc6e142..72a2a0d7335 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestUpdateByQueryAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -63,7 +62,7 @@ public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler> consumers = new HashMap<>(); consumers.put("conflicts", o -> internal.setConflicts((String) o)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index cc848900b78..30621ab607b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -81,7 +81,7 @@ public class RoundTripTests extends ESTestCase { } public void testUpdateByQueryRequest() throws IOException { - UpdateByQueryRequest update = new UpdateByQueryRequest(new SearchRequest()); + UpdateByQueryRequest update = new UpdateByQueryRequest(); randomRequest(update); if (randomBoolean()) { update.setPipeline(randomAlphaOfLength(5)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java index b688ce019e3..d3f62af907d 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.index.reindex.ScrollableHitSource.Hit; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.settings.Settings; public class UpdateByQueryMetadataTests @@ -39,7 +38,7 @@ public class UpdateByQueryMetadataTests @Override protected UpdateByQueryRequest request() { - return new UpdateByQueryRequest(new SearchRequest()); + return new UpdateByQueryRequest(); } private class TestAction extends TransportUpdateByQueryAction.AsyncIndexBySearchAction { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java index 4006d16fbcb..8c9744aa0dd 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.ScriptService; @@ -50,7 +49,7 @@ public class UpdateByQueryWithScriptTests @Override protected UpdateByQueryRequest request() { - return new UpdateByQueryRequest(new SearchRequest()); + return new UpdateByQueryRequest(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 3b635c82387..b2e6f98f126 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -44,8 +44,8 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; public abstract class AbstractBulkByScrollRequest> extends ActionRequest { public static final int SIZE_ALL_MATCHES = -1; - static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5); - static final int DEFAULT_SCROLL_SIZE = 1000; + public static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5); + public static final int DEFAULT_SCROLL_SIZE = 1000; public static final int AUTO_SLICES = 0; public static final String AUTO_SLICES_VALUE = "auto"; diff --git a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 5beb86fae6b..7aa2c8a1b75 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -209,8 +209,8 @@ public class BulkByScrollTask extends CancellableTask { public static class StatusBuilder { private Integer sliceId = null; private Long total = null; - private Long updated = null; - private Long created = null; + private long updated = 0; // Not present during deleteByQuery + private long created = 0; // Not present during updateByQuery private Long deleted = null; private Integer batches = null; private Long versionConflicts = null; @@ -221,7 +221,7 @@ public class BulkByScrollTask extends CancellableTask { private Float requestsPerSecond = null; private String reasonCancelled = null; private TimeValue throttledUntil = null; - private List sliceStatuses = emptyList(); + private List sliceStatuses = new ArrayList<>(); public void setSliceId(Integer sliceId) { this.sliceId = sliceId; @@ -295,10 +295,14 @@ public class BulkByScrollTask extends CancellableTask { public void setSliceStatuses(List sliceStatuses) { if (sliceStatuses != null) { - this.sliceStatuses = sliceStatuses; + this.sliceStatuses.addAll(sliceStatuses); } } + public void addToSliceStatuses(StatusOrException statusOrException) { + this.sliceStatuses.add(statusOrException); + } + public Status buildStatus() { if (sliceStatuses.isEmpty()) { try { @@ -613,37 +617,20 @@ public class BulkByScrollTask extends CancellableTask { Token token = parser.currentToken(); String fieldName = parser.currentName(); ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser::getTokenLocation); - Integer sliceId = null; - Long total = null; - Long updated = null; - Long created = null; - Long deleted = null; - Integer batches = null; - Long versionConflicts = null; - Long noOps = null; - Long bulkRetries = null; - Long searchRetries = null; - TimeValue throttled = null; - Float requestsPerSecond = null; - String reasonCancelled = null; - TimeValue throttledUntil = null; - List sliceStatuses = new ArrayList<>(); + StatusBuilder builder = new StatusBuilder(); while ((token = parser.nextToken()) != Token.END_OBJECT) { if (token == Token.FIELD_NAME) { fieldName = parser.currentName(); } else if (token == Token.START_OBJECT) { if (fieldName.equals(Status.RETRIES_FIELD)) { - Tuple retries = - Status.RETRIES_PARSER.parse(parser, null); - bulkRetries = retries.v1(); - searchRetries = retries.v2(); + builder.setRetries(Status.RETRIES_PARSER.parse(parser, null)); } else { parser.skipChildren(); } } else if (token == Token.START_ARRAY) { if (fieldName.equals(Status.SLICES_FIELD)) { while ((token = parser.nextToken()) != Token.END_ARRAY) { - sliceStatuses.add(StatusOrException.fromXContent(parser)); + builder.addToSliceStatuses(StatusOrException.fromXContent(parser)); } } else { parser.skipChildren(); @@ -651,57 +638,47 @@ public class BulkByScrollTask extends CancellableTask { } else { // else if it is a value switch (fieldName) { case Status.SLICE_ID_FIELD: - sliceId = parser.intValue(); + builder.setSliceId(parser.intValue()); break; case Status.TOTAL_FIELD: - total = parser.longValue(); + builder.setTotal(parser.longValue()); break; case Status.UPDATED_FIELD: - updated = parser.longValue(); + builder.setUpdated(parser.longValue()); break; case Status.CREATED_FIELD: - created = parser.longValue(); + builder.setCreated(parser.longValue()); break; case Status.DELETED_FIELD: - deleted = parser.longValue(); + builder.setDeleted(parser.longValue()); break; case Status.BATCHES_FIELD: - batches = parser.intValue(); + builder.setBatches(parser.intValue()); break; case Status.VERSION_CONFLICTS_FIELD: - versionConflicts = parser.longValue(); + builder.setVersionConflicts(parser.longValue()); break; case Status.NOOPS_FIELD: - noOps = parser.longValue(); + builder.setNoops(parser.longValue()); break; case Status.THROTTLED_RAW_FIELD: - throttled = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS); + builder.setThrottled(parser.longValue()); break; case Status.REQUESTS_PER_SEC_FIELD: - requestsPerSecond = parser.floatValue(); - requestsPerSecond = requestsPerSecond == -1 ? Float.POSITIVE_INFINITY : requestsPerSecond; + builder.setRequestsPerSecond(parser.floatValue()); break; case Status.CANCELED_FIELD: - reasonCancelled = parser.text(); + builder.setReasonCancelled(parser.text()); break; case Status.THROTTLED_UNTIL_RAW_FIELD: - throttledUntil = new TimeValue(parser.longValue(), TimeUnit.MILLISECONDS); + builder.setThrottledUntil(parser.longValue()); break; default: break; } } } - if (sliceStatuses.isEmpty()) { - return - new Status( - sliceId, total, updated, created, deleted, batches, versionConflicts, noOps, bulkRetries, - searchRetries, throttled, requestsPerSecond, reasonCancelled, throttledUntil - ); - } else { - return new Status(sliceStatuses, reasonCancelled); - } - + return builder.buildStatus(); } @Override @@ -838,15 +815,15 @@ public class BulkByScrollTask extends CancellableTask { ); } - public boolean equalsWithoutSliceStatus(Object o) { + public boolean equalsWithoutSliceStatus(Object o, boolean includeUpdated, boolean includeCreated) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status other = (Status) o; return Objects.equals(sliceId, other.sliceId) && total == other.total && - updated == other.updated && - created == other.created && + (!includeUpdated || updated == other.updated) && + (!includeCreated || created == other.created) && deleted == other.deleted && batches == other.batches && versionConflicts == other.versionConflicts && @@ -861,7 +838,7 @@ public class BulkByScrollTask extends CancellableTask { @Override public boolean equals(Object o) { - if (equalsWithoutSliceStatus(o)) { + if (equalsWithoutSliceStatus(o, true, true)) { return Objects.equals(sliceStatuses, ((Status) o).sliceStatuses); } else { return false; diff --git a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java index eb4fd59a7bc..71ffadc9303 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java @@ -24,6 +24,9 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -34,16 +37,22 @@ import java.io.IOException; * representative set of subrequests. This is best-effort but better than {@linkplain ReindexRequest} because scripts can't change the * destination index and things. */ -public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest implements IndicesRequest.Replaceable { +public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest + implements IndicesRequest.Replaceable, ToXContentObject { /** * Ingest pipeline to set on index requests made by this action. */ private String pipeline; public UpdateByQueryRequest() { + this(new SearchRequest()); } - public UpdateByQueryRequest(SearchRequest search) { + public UpdateByQueryRequest(String... indices) { + this(new SearchRequest(indices)); + } + + UpdateByQueryRequest(SearchRequest search) { this(search, true); } @@ -59,8 +68,81 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest { + private boolean includeUpdated; + private boolean includeCreated; + public void testRountTrip() throws IOException { BulkByScrollResponse response = new BulkByScrollResponse(timeValueMillis(randomNonNegativeLong()), BulkByScrollTaskStatusTests.randomStatus(), randomIndexingFailures(), randomSearchFailures(), randomBoolean()); @@ -97,10 +104,11 @@ public class BulkByScrollResponseTests extends AbstractXContentTestCase params = new HashMap<>(); + if (randomBoolean()) { + includeUpdated = false; + params.put(Status.INCLUDE_UPDATED, "false"); + } else { + includeUpdated = true; + } + if (randomBoolean()) { + includeCreated = false; + params.put(Status.INCLUDE_CREATED, "false"); + } else { + includeCreated = true; + } + return new ToXContent.MapParams(params); + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java index 33c56bacd91..0d84b0e1412 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusOrExceptionTests.java @@ -55,11 +55,14 @@ public class BulkByScrollTaskStatusOrExceptionTests extends AbstractXContentTest return StatusOrException.fromXContent(parser); } - public static void assertEqualStatusOrException(StatusOrException expected, StatusOrException actual) { + public static void assertEqualStatusOrException(StatusOrException expected, StatusOrException actual, + boolean includeUpdated, boolean includeCreated) { if (expected != null && actual != null) { assertNotSame(expected, actual); if (expected.getException() == null) { - BulkByScrollTaskStatusTests.assertEqualStatus(expected.getStatus(), actual.getStatus()); + BulkByScrollTaskStatusTests + // we test includeCreated params in the Status tests + .assertEqualStatus(expected.getStatus(), actual.getStatus(), includeUpdated, includeCreated); } else { assertThat( actual.getException().getMessage(), @@ -74,7 +77,7 @@ public class BulkByScrollTaskStatusOrExceptionTests extends AbstractXContentTest @Override protected void assertEqualInstances(StatusOrException expected, StatusOrException actual) { - assertEqualStatusOrException(expected, actual); + assertEqualStatusOrException(expected, actual, true, true); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java index 368e1b3bdac..13db9f4766e 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskStatusTests.java @@ -33,7 +33,9 @@ import org.hamcrest.Matchers; import org.elasticsearch.index.reindex.BulkByScrollTask.Status; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.IntStream; @@ -44,6 +46,10 @@ import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.hamcrest.Matchers.equalTo; public class BulkByScrollTaskStatusTests extends AbstractXContentTestCase { + + private boolean includeUpdated; + private boolean includeCreated; + public void testBulkByTaskStatus() throws IOException { BulkByScrollTask.Status status = randomStatus(); BytesStreamOutput out = new BytesStreamOutput(); @@ -144,21 +150,21 @@ public class BulkByScrollTaskStatusTests extends AbstractXContentTestCase params = new HashMap<>(); + if (randomBoolean()) { + includeUpdated = false; + params.put(Status.INCLUDE_UPDATED, "false"); + } else { + includeUpdated = true; + } + if (randomBoolean()) { + includeCreated = false; + params.put(Status.INCLUDE_CREATED, "false"); + } else { + includeCreated = true; + } + return new ToXContent.MapParams(params); + } } diff --git a/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java b/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java index b30968cf056..47449eb7391 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import static org.apache.lucene.util.TestUtil.randomSimpleString; @@ -32,11 +31,11 @@ public class UpdateByQueryRequestTests extends AbstractBulkByScrollRequestTestCa indices[i] = randomSimpleString(random(), 1, 30); } - SearchRequest searchRequest = new SearchRequest(indices); IndicesOptions indicesOptions = IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()); - searchRequest.indicesOptions(indicesOptions); - UpdateByQueryRequest request = new UpdateByQueryRequest(searchRequest); + UpdateByQueryRequest request = new UpdateByQueryRequest(); + request.indices(indices); + request.setIndicesOptions(indicesOptions); for (int i = 0; i < numIndices; i++) { assertEquals(indices[i], request.indices()[i]); } @@ -60,7 +59,7 @@ public class UpdateByQueryRequestTests extends AbstractBulkByScrollRequestTestCa @Override protected UpdateByQueryRequest newRequest() { - return new UpdateByQueryRequest(new SearchRequest(randomAlphaOfLength(5))); + return new UpdateByQueryRequest(randomAlphaOfLength(5)); } @Override