From c0c6d702a2638f4ce65bf01ebab44b8515084d6e Mon Sep 17 00:00:00 2001 From: Andy Bristol Date: Wed, 27 Mar 2019 13:27:17 -0700 Subject: [PATCH] ignore 409 conflict in reindex responses (#39543) The reindex family of APIs (reindex, update-by-query, delete-by-query) can sometimes return responses that have an error status code (409 Conflict in this case) but still have a body in the usual BulkByScrollResponse format. When the HLRC tries to handle such responses, it blows up because it tris to parse it expecting the error format that errors in general use. This change prompts the HLRC to parse the response using the expected BulkByScrollResponse format. --- .../client/RestHighLevelClient.java | 12 +- .../java/org/elasticsearch/client/CrudIT.java | 243 ------------ .../org/elasticsearch/client/ReindexIT.java | 359 ++++++++++++++++++ 3 files changed, 365 insertions(+), 249 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index d0917b8d454..77eac4a6e2a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -511,7 +511,7 @@ public class RestHighLevelClient implements Closeable { */ public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException { return performRequestAndParseEntity( - reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet() + reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, singleton(409) ); } @@ -537,7 +537,7 @@ public class RestHighLevelClient implements Closeable { */ public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener listener) { performRequestAsyncAndParseEntity( - reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet() + reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, singleton(409) ); } @@ -551,7 +551,7 @@ public class RestHighLevelClient implements Closeable { */ public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException { return performRequestAndParseEntity( - updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, emptySet() + updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, singleton(409) ); } @@ -566,7 +566,7 @@ public class RestHighLevelClient implements Closeable { public final void updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest, RequestOptions options, ActionListener listener) { performRequestAsyncAndParseEntity( - updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet() + updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, singleton(409) ); } @@ -580,7 +580,7 @@ public class RestHighLevelClient implements Closeable { */ public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException { return performRequestAndParseEntity( - deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, emptySet() + deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, singleton(409) ); } @@ -595,7 +595,7 @@ public class RestHighLevelClient implements Closeable { public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options, ActionListener listener) { performRequestAsyncAndParseEntity( - deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet() + deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, singleton(409) ); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index e2102236cc4..d5d59b07f55 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -21,12 +21,8 @@ package org.elasticsearch.client; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; @@ -39,7 +35,6 @@ import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; @@ -58,12 +53,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.get.GetResult; -import org.elasticsearch.index.query.IdsQueryBuilder; -import org.elasticsearch.index.reindex.BulkByScrollResponse; -import org.elasticsearch.index.reindex.DeleteByQueryAction; -import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.index.reindex.UpdateByQueryAction; -import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.rest.action.document.RestDeleteAction; @@ -74,8 +63,6 @@ import org.elasticsearch.rest.action.document.RestUpdateAction; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; -import org.elasticsearch.tasks.RawTaskStatus; -import org.elasticsearch.tasks.TaskId; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -85,18 +72,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.lessThan; public class CrudIT extends ESRestHighLevelClientTestCase { @@ -857,230 +838,6 @@ public class CrudIT extends ESRestHighLevelClientTestCase { validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest); } - private TaskId findTaskToRethrottle(String actionName) throws IOException { - long start = System.nanoTime(); - ListTasksRequest request = new ListTasksRequest(); - request.setActions(actionName); - request.setDetailed(true); - do { - ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT); - list.rethrowFailures("Finding tasks to rethrottle"); - assertThat("tasks are left over from the last execution of this test", - list.getTaskGroups(), hasSize(lessThan(2))); - if (0 == list.getTaskGroups().size()) { - // The parent task hasn't started yet - continue; - } - TaskGroup taskGroup = list.getTaskGroups().get(0); - assertThat(taskGroup.getChildTasks(), empty()); - return taskGroup.getTaskInfo().getTaskId(); - } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10)); - throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " + - highLevelClient().tasks().list(request, RequestOptions.DEFAULT)); - } - - public void testUpdateByQuery() throws Exception { - final String sourceIndex = "source1"; - { - // Prepare - Settings settings = Settings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - .build(); - createIndex(sourceIndex, settings); - assertEquals( - RestStatus.OK, - highLevelClient().bulk( - new BulkRequest() - .add(new IndexRequest(sourceIndex).id("1") - .source(Collections.singletonMap("foo", 1), XContentType.JSON)) - .add(new IndexRequest(sourceIndex).id("2") - .source(Collections.singletonMap("foo", 2), XContentType.JSON)) - .setRefreshPolicy(RefreshPolicy.IMMEDIATE), - RequestOptions.DEFAULT - ).status() - ); - } - { - // test1: create one doc in dest - UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.indices(sourceIndex); - updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); - updateByQueryRequest.setRefresh(true); - BulkByScrollResponse bulkResponse = - execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); - assertEquals(1, bulkResponse.getTotal()); - assertEquals(1, bulkResponse.getUpdated()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(1, bulkResponse.getBatches()); - assertEquals(0, bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - } - { - // test2: update using script - UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.indices(sourceIndex); - updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); - updateByQueryRequest.setRefresh(true); - BulkByScrollResponse bulkResponse = - execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); - assertEquals(2, bulkResponse.getTotal()); - assertEquals(2, bulkResponse.getUpdated()); - assertEquals(0, bulkResponse.getDeleted()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(1, bulkResponse.getBatches()); - assertEquals(0, bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - assertEquals( - 3, - (int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT) - .getSourceAsMap().get("foo")) - ); - } - { - // test update-by-query rethrottling - UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); - updateByQueryRequest.indices(sourceIndex); - updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); - updateByQueryRequest.setRefresh(true); - - // this following settings are supposed to halt reindexing after first document - updateByQueryRequest.setBatchSize(1); - updateByQueryRequest.setRequestsPerSecond(0.00001f); - final CountDownLatch taskFinished = new CountDownLatch(1); - highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener() { - - @Override - public void onResponse(BulkByScrollResponse response) { - taskFinished.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - }); - - TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME); - float requestsPerSecond = 1000f; - ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), - highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); - assertThat(response.getTasks(), hasSize(1)); - assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId()); - assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class)); - assertEquals(Float.toString(requestsPerSecond), - ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString()); - taskFinished.await(2, TimeUnit.SECONDS); - - // any rethrottling after the update-by-query is done performed with the same taskId should result in a failure - response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), - highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); - assertTrue(response.getTasks().isEmpty()); - assertFalse(response.getNodeFailures().isEmpty()); - assertEquals(1, response.getNodeFailures().size()); - assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]", - response.getNodeFailures().get(0).getCause().getMessage()); - } - } - - public void testDeleteByQuery() throws Exception { - final String sourceIndex = "source1"; - { - // Prepare - Settings settings = Settings.builder() - .put("number_of_shards", 1) - .put("number_of_replicas", 0) - .build(); - createIndex(sourceIndex, settings); - assertEquals( - RestStatus.OK, - highLevelClient().bulk( - new BulkRequest() - .add(new IndexRequest(sourceIndex).id("1") - .source(Collections.singletonMap("foo", 1), XContentType.JSON)) - .add(new IndexRequest(sourceIndex).id("2") - .source(Collections.singletonMap("foo", 2), XContentType.JSON)) - .add(new IndexRequest(sourceIndex).id("3") - .source(Collections.singletonMap("foo", 3), XContentType.JSON)) - .setRefreshPolicy(RefreshPolicy.IMMEDIATE), - RequestOptions.DEFAULT - ).status() - ); - } - { - // test1: delete one doc - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); - deleteByQueryRequest.indices(sourceIndex); - deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); - deleteByQueryRequest.setRefresh(true); - BulkByScrollResponse bulkResponse = - execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync); - assertEquals(1, bulkResponse.getTotal()); - assertEquals(1, bulkResponse.getDeleted()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(1, bulkResponse.getBatches()); - assertEquals(0, bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - assertEquals( - 2, - highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().getTotalHits().value - ); - } - { - // test delete-by-query rethrottling - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); - deleteByQueryRequest.indices(sourceIndex); - deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3")); - deleteByQueryRequest.setRefresh(true); - - // this following settings are supposed to halt reindexing after first document - deleteByQueryRequest.setBatchSize(1); - deleteByQueryRequest.setRequestsPerSecond(0.00001f); - final CountDownLatch taskFinished = new CountDownLatch(1); - highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener() { - - @Override - public void onResponse(BulkByScrollResponse response) { - taskFinished.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - }); - - TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME); - float requestsPerSecond = 1000f; - ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), - highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync); - assertThat(response.getTasks(), hasSize(1)); - assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId()); - assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class)); - assertEquals(Float.toString(requestsPerSecond), - ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString()); - taskFinished.await(2, TimeUnit.SECONDS); - - // any rethrottling after the delete-by-query is done performed with the same taskId should result in a failure - response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), - highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync); - assertTrue(response.getTasks().isEmpty()); - assertFalse(response.getNodeFailures().isEmpty()); - assertEquals(1, response.getNodeFailures().size()); - assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]", - response.getNodeFailures().get(0).getCause().getMessage()); - } - } - public void testBulkProcessorIntegration() throws IOException { int nbItems = randomIntBetween(10, 100); boolean[] errors = new boolean[nbItems]; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index cfdd29cdfbf..73cca7827e7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -19,23 +19,54 @@ package org.elasticsearch.client; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.ScrollableHitSource; +import org.elasticsearch.index.reindex.UpdateByQueryAction; +import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.Script; +import org.elasticsearch.tasks.RawTaskStatus; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; public class ReindexIT extends ESRestHighLevelClientTestCase { + private static final String CONFLICT_PIPELINE_ID = "conflict_pipeline"; + public void testReindex() throws IOException { final String sourceIndex = "source1"; final String destinationIndex = "dest"; @@ -122,10 +153,338 @@ public class ReindexIT extends ESRestHighLevelClientTestCase { } } + public void testReindexConflict() throws IOException { + final String sourceIndex = "testreindexconflict_source"; + final String destIndex = "testreindexconflict_dest"; + + final Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + createIndex(destIndex, settings); + final BulkRequest bulkRequest = new BulkRequest() + .add(new IndexRequest(sourceIndex).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(sourceIndex).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK)); + + putConflictPipeline(); + + final ReindexRequest reindexRequest = new ReindexRequest(); + reindexRequest.setSourceIndices(sourceIndex); + reindexRequest.setDestIndex(destIndex); + reindexRequest.setRefresh(true); + reindexRequest.setDestPipeline(CONFLICT_PIPELINE_ID); + final BulkByScrollResponse response = highLevelClient().reindex(reindexRequest, RequestOptions.DEFAULT); + + assertThat(response.getVersionConflicts(), equalTo(2L)); + assertThat(response.getBulkFailures(), empty()); + assertThat(response.getSearchFailures(), hasSize(2)); + assertThat( + response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()), + everyItem(containsString("version conflict")) + ); + + assertThat(response.getTotal(), equalTo(2L)); + assertThat(response.getCreated(), equalTo(0L)); + assertThat(response.getUpdated(), equalTo(0L)); + assertThat(response.getDeleted(), equalTo(0L)); + assertThat(response.getNoops(), equalTo(0L)); + assertThat(response.getBatches(), equalTo(1)); + assertTrue(response.getTook().getMillis() > 0); + } + + public void testUpdateByQuery() throws Exception { + final String sourceIndex = "source1"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex).id("1") + .source(Collections.singletonMap("foo", 1), XContentType.JSON)) + .add(new IndexRequest(sourceIndex).id("2") + .source(Collections.singletonMap("foo", 2), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: create one doc in dest + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(sourceIndex); + updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); + updateByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(1, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + } + { + // test2: update using script + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(sourceIndex); + updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;")); + updateByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync); + assertEquals(2, bulkResponse.getTotal()); + assertEquals(2, bulkResponse.getUpdated()); + assertEquals(0, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + assertEquals( + 3, + (int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT) + .getSourceAsMap().get("foo")) + ); + } + { + // test update-by-query rethrottling + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(sourceIndex); + updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); + updateByQueryRequest.setRefresh(true); + + // this following settings are supposed to halt reindexing after first document + updateByQueryRequest.setBatchSize(1); + updateByQueryRequest.setRequestsPerSecond(0.00001f); + final CountDownLatch taskFinished = new CountDownLatch(1); + highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener() { + + @Override + public void onResponse(BulkByScrollResponse response) { + taskFinished.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + }); + + TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME); + float requestsPerSecond = 1000f; + ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), + highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); + assertThat(response.getTasks(), hasSize(1)); + assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId()); + assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class)); + assertEquals(Float.toString(requestsPerSecond), + ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString()); + taskFinished.await(2, TimeUnit.SECONDS); + + // any rethrottling after the update-by-query is done performed with the same taskId should result in a failure + response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), + highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); + assertTrue(response.getTasks().isEmpty()); + assertFalse(response.getNodeFailures().isEmpty()); + assertEquals(1, response.getNodeFailures().size()); + assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]", + response.getNodeFailures().get(0).getCause().getMessage()); + } + } + + public void testUpdateByQueryConflict() throws IOException { + final String index = "testupdatebyqueryconflict"; + + final Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(index, settings); + final BulkRequest bulkRequest = new BulkRequest() + .add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK)); + + putConflictPipeline(); + + final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(); + updateByQueryRequest.indices(index); + updateByQueryRequest.setRefresh(true); + updateByQueryRequest.setPipeline(CONFLICT_PIPELINE_ID); + final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); + + assertThat(response.getVersionConflicts(), equalTo(1L)); + assertThat(response.getBulkFailures(), empty()); + assertThat(response.getSearchFailures(), hasSize(1)); + assertThat( + response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()), + everyItem(containsString("version conflict")) + ); + + assertThat(response.getTotal(), equalTo(2L)); + assertThat(response.getCreated(), equalTo(0L)); + assertThat(response.getUpdated(), equalTo(1L)); + assertThat(response.getDeleted(), equalTo(0L)); + assertThat(response.getNoops(), equalTo(0L)); + assertThat(response.getBatches(), equalTo(1)); + assertTrue(response.getTook().getMillis() > 0); + } + + public void testDeleteByQuery() throws Exception { + final String sourceIndex = "source1"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex).id("1") + .source(Collections.singletonMap("foo", 1), XContentType.JSON)) + .add(new IndexRequest(sourceIndex).id("2") + .source(Collections.singletonMap("foo", 2), XContentType.JSON)) + .add(new IndexRequest(sourceIndex).id("3") + .source(Collections.singletonMap("foo", 3), XContentType.JSON)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: delete one doc + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + deleteByQueryRequest.indices(sourceIndex); + deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1")); + deleteByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(1, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + assertEquals( + 2, + highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().getTotalHits().value + ); + } + { + // test delete-by-query rethrottling + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + deleteByQueryRequest.indices(sourceIndex); + deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3")); + deleteByQueryRequest.setRefresh(true); + + // this following settings are supposed to halt reindexing after first document + deleteByQueryRequest.setBatchSize(1); + deleteByQueryRequest.setRequestsPerSecond(0.00001f); + final CountDownLatch taskFinished = new CountDownLatch(1); + highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener() { + + @Override + public void onResponse(BulkByScrollResponse response) { + taskFinished.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + }); + + TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME); + float requestsPerSecond = 1000f; + ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), + highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync); + assertThat(response.getTasks(), hasSize(1)); + assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId()); + assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class)); + assertEquals(Float.toString(requestsPerSecond), + ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString()); + taskFinished.await(2, TimeUnit.SECONDS); + + // any rethrottling after the delete-by-query is done performed with the same taskId should result in a failure + response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), + highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync); + assertTrue(response.getTasks().isEmpty()); + assertFalse(response.getNodeFailures().isEmpty()); + assertEquals(1, response.getNodeFailures().size()); + assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]", + response.getNodeFailures().get(0).getCause().getMessage()); + } + } + + private static TaskId findTaskToRethrottle(String actionName) throws IOException { + long start = System.nanoTime(); + ListTasksRequest request = new ListTasksRequest(); + request.setActions(actionName); + request.setDetailed(true); + do { + ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT); + list.rethrowFailures("Finding tasks to rethrottle"); + assertThat("tasks are left over from the last execution of this test", + list.getTaskGroups(), hasSize(lessThan(2))); + if (0 == list.getTaskGroups().size()) { + // The parent task hasn't started yet + continue; + } + TaskGroup taskGroup = list.getTaskGroups().get(0); + assertThat(taskGroup.getChildTasks(), empty()); + return taskGroup.getTaskInfo().getTaskId(); + } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10)); + throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " + + highLevelClient().tasks().list(request, RequestOptions.DEFAULT)); + } + static CheckedRunnable checkCompletionStatus(RestClient client, String taskId) { return () -> { Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId)); assertTrue((boolean) entityAsMap(response).get("completed")); }; } + + private void putConflictPipeline() throws IOException { + final XContentBuilder pipelineBuilder = jsonBuilder() + .startObject() + .startArray("processors") + .startObject() + .startObject("set") + .field("field", "_version") + .field("value", 1) + .endObject() + .endObject() + .startObject() + .startObject("set") + .field("field", "_id") + .field("value", "1") + .endObject() + .endObject() + .endArray() + .endObject(); + final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder), + pipelineBuilder.contentType()); + assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged()); + } }