diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
index d0917b8d454..77eac4a6e2a 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
@@ -511,7 +511,7 @@ public class RestHighLevelClient implements Closeable {
      */
     public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
         return performRequestAndParseEntity(
-            reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet()
+            reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, singleton(409)
         );
     }
 
@@ -537,7 +537,7 @@ public class RestHighLevelClient implements Closeable {
      */
     public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
         performRequestAsyncAndParseEntity(
-            reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet()
+            reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, singleton(409)
         );
     }
 
@@ -551,7 +551,7 @@ public class RestHighLevelClient implements Closeable {
      */
     public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException {
         return performRequestAndParseEntity(
-            updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
+            updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, singleton(409)
         );
     }
 
@@ -566,7 +566,7 @@ public class RestHighLevelClient implements Closeable {
     public final void updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest, RequestOptions options,
                                          ActionListener<BulkByScrollResponse> listener) {
         performRequestAsyncAndParseEntity(
-            updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
+            updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, singleton(409)
         );
     }
 
@@ -580,7 +580,7 @@ public class RestHighLevelClient implements Closeable {
      */
     public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException {
         return performRequestAndParseEntity(
-            deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
+            deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, singleton(409)
         );
     }
 
@@ -595,7 +595,7 @@ public class RestHighLevelClient implements Closeable {
     public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options,
                                          ActionListener<BulkByScrollResponse> listener) {
         performRequestAsyncAndParseEntity(
-            deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
+            deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, singleton(409)
         );
     }
 
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java
index e2102236cc4..d5d59b07f55 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java
@@ -21,12 +21,8 @@ package org.elasticsearch.client;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
-import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
-import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -39,7 +35,6 @@ import org.elasticsearch.action.get.MultiGetRequest;
 import org.elasticsearch.action.get.MultiGetResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateResponse;
@@ -58,12 +53,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.get.GetResult;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.DeleteByQueryAction;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
-import org.elasticsearch.index.reindex.UpdateByQueryAction;
-import org.elasticsearch.index.reindex.UpdateByQueryRequest;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.document.RestBulkAction;
 import org.elasticsearch.rest.action.document.RestDeleteAction;
@@ -74,8 +63,6 @@ import org.elasticsearch.rest.action.document.RestUpdateAction;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
-import org.elasticsearch.tasks.RawTaskStatus;
-import org.elasticsearch.tasks.TaskId;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
@@ -85,18 +72,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.singletonMap;
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.lessThan;
 
 public class CrudIT extends ESRestHighLevelClientTestCase {
 
@@ -857,230 +838,6 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
         validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
     }
 
-    private TaskId findTaskToRethrottle(String actionName) throws IOException {
-        long start = System.nanoTime();
-        ListTasksRequest request = new ListTasksRequest();
-        request.setActions(actionName);
-        request.setDetailed(true);
-        do {
-            ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
-            list.rethrowFailures("Finding tasks to rethrottle");
-            assertThat("tasks are left over from the last execution of this test",
-                list.getTaskGroups(), hasSize(lessThan(2)));
-            if (0 == list.getTaskGroups().size()) {
-                // The parent task hasn't started yet
-                continue;
-            }
-            TaskGroup taskGroup = list.getTaskGroups().get(0);
-            assertThat(taskGroup.getChildTasks(), empty());
-            return taskGroup.getTaskInfo().getTaskId();
-        } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
-        throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
-                highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
-    }
-
-    public void testUpdateByQuery() throws Exception {
-        final String sourceIndex = "source1";
-        {
-            // Prepare
-            Settings settings = Settings.builder()
-                .put("number_of_shards", 1)
-                .put("number_of_replicas", 0)
-                .build();
-            createIndex(sourceIndex, settings);
-            assertEquals(
-                RestStatus.OK,
-                highLevelClient().bulk(
-                    new BulkRequest()
-                        .add(new IndexRequest(sourceIndex).id("1")
-                            .source(Collections.singletonMap("foo", 1), XContentType.JSON))
-                        .add(new IndexRequest(sourceIndex).id("2")
-                            .source(Collections.singletonMap("foo", 2), XContentType.JSON))
-                        .setRefreshPolicy(RefreshPolicy.IMMEDIATE),
-                    RequestOptions.DEFAULT
-                ).status()
-            );
-        }
-        {
-            // test1: create one doc in dest
-            UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
-            updateByQueryRequest.indices(sourceIndex);
-            updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
-            updateByQueryRequest.setRefresh(true);
-            BulkByScrollResponse bulkResponse =
-                execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
-            assertEquals(1, bulkResponse.getTotal());
-            assertEquals(1, bulkResponse.getUpdated());
-            assertEquals(0, bulkResponse.getNoops());
-            assertEquals(0, bulkResponse.getVersionConflicts());
-            assertEquals(1, bulkResponse.getBatches());
-            assertTrue(bulkResponse.getTook().getMillis() > 0);
-            assertEquals(1, bulkResponse.getBatches());
-            assertEquals(0, bulkResponse.getBulkFailures().size());
-            assertEquals(0, bulkResponse.getSearchFailures().size());
-        }
-        {
-            // test2: update using script
-            UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
-            updateByQueryRequest.indices(sourceIndex);
-            updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
-            updateByQueryRequest.setRefresh(true);
-            BulkByScrollResponse bulkResponse =
-                execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
-            assertEquals(2, bulkResponse.getTotal());
-            assertEquals(2, bulkResponse.getUpdated());
-            assertEquals(0, bulkResponse.getDeleted());
-            assertEquals(0, bulkResponse.getNoops());
-            assertEquals(0, bulkResponse.getVersionConflicts());
-            assertEquals(1, bulkResponse.getBatches());
-            assertTrue(bulkResponse.getTook().getMillis() > 0);
-            assertEquals(1, bulkResponse.getBatches());
-            assertEquals(0, bulkResponse.getBulkFailures().size());
-            assertEquals(0, bulkResponse.getSearchFailures().size());
-            assertEquals(
-                3,
-                (int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT)
-                    .getSourceAsMap().get("foo"))
-            );
-        }
-        {
-            // test update-by-query rethrottling
-            UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
-            updateByQueryRequest.indices(sourceIndex);
-            updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
-            updateByQueryRequest.setRefresh(true);
-
-            // this following settings are supposed to halt reindexing after first document
-            updateByQueryRequest.setBatchSize(1);
-            updateByQueryRequest.setRequestsPerSecond(0.00001f);
-            final CountDownLatch taskFinished = new CountDownLatch(1);
-            highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
-
-                @Override
-                public void onResponse(BulkByScrollResponse response) {
-                    taskFinished.countDown();
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    fail(e.toString());
-                }
-            });
-
-            TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
-            float requestsPerSecond = 1000f;
-            ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
-                highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
-            assertThat(response.getTasks(), hasSize(1));
-            assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
-            assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
-            assertEquals(Float.toString(requestsPerSecond),
-                ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
-            taskFinished.await(2, TimeUnit.SECONDS);
-
-            // any rethrottling after the update-by-query is done performed with the same taskId should result in a failure
-            response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
-                highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
-            assertTrue(response.getTasks().isEmpty());
-            assertFalse(response.getNodeFailures().isEmpty());
-            assertEquals(1, response.getNodeFailures().size());
-            assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
-                response.getNodeFailures().get(0).getCause().getMessage());
-        }
-    }
-
-    public void testDeleteByQuery() throws Exception {
-        final String sourceIndex = "source1";
-        {
-            // Prepare
-            Settings settings = Settings.builder()
-                .put("number_of_shards", 1)
-                .put("number_of_replicas", 0)
-                .build();
-            createIndex(sourceIndex, settings);
-            assertEquals(
-                RestStatus.OK,
-                highLevelClient().bulk(
-                    new BulkRequest()
-                        .add(new IndexRequest(sourceIndex).id("1")
-                            .source(Collections.singletonMap("foo", 1), XContentType.JSON))
-                        .add(new IndexRequest(sourceIndex).id("2")
-                            .source(Collections.singletonMap("foo", 2), XContentType.JSON))
-                        .add(new IndexRequest(sourceIndex).id("3")
-                            .source(Collections.singletonMap("foo", 3), XContentType.JSON))
-                        .setRefreshPolicy(RefreshPolicy.IMMEDIATE),
-                    RequestOptions.DEFAULT
-                ).status()
-            );
-        }
-        {
-            // test1: delete one doc
-            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
-            deleteByQueryRequest.indices(sourceIndex);
-            deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
-            deleteByQueryRequest.setRefresh(true);
-            BulkByScrollResponse bulkResponse =
-                execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync);
-            assertEquals(1, bulkResponse.getTotal());
-            assertEquals(1, bulkResponse.getDeleted());
-            assertEquals(0, bulkResponse.getNoops());
-            assertEquals(0, bulkResponse.getVersionConflicts());
-            assertEquals(1, bulkResponse.getBatches());
-            assertTrue(bulkResponse.getTook().getMillis() > 0);
-            assertEquals(1, bulkResponse.getBatches());
-            assertEquals(0, bulkResponse.getBulkFailures().size());
-            assertEquals(0, bulkResponse.getSearchFailures().size());
-            assertEquals(
-                2,
-                highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().getTotalHits().value
-            );
-        }
-        {
-            // test delete-by-query rethrottling
-            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
-            deleteByQueryRequest.indices(sourceIndex);
-            deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3"));
-            deleteByQueryRequest.setRefresh(true);
-
-            // this following settings are supposed to halt reindexing after first document
-            deleteByQueryRequest.setBatchSize(1);
-            deleteByQueryRequest.setRequestsPerSecond(0.00001f);
-            final CountDownLatch taskFinished = new CountDownLatch(1);
-            highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
-
-                @Override
-                public void onResponse(BulkByScrollResponse response) {
-                    taskFinished.countDown();
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    fail(e.toString());
-                }
-            });
-
-            TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
-            float requestsPerSecond = 1000f;
-            ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
-                highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
-            assertThat(response.getTasks(), hasSize(1));
-            assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
-            assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
-            assertEquals(Float.toString(requestsPerSecond),
-                ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
-            taskFinished.await(2, TimeUnit.SECONDS);
-
-            // any rethrottling after the delete-by-query is done performed with the same taskId should result in a failure
-            response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
-                highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
-            assertTrue(response.getTasks().isEmpty());
-            assertFalse(response.getNodeFailures().isEmpty());
-            assertEquals(1, response.getNodeFailures().size());
-            assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
-                response.getNodeFailures().get(0).getCause().getMessage());
-        }
-    }
-
     public void testBulkProcessorIntegration() throws IOException {
         int nbItems = randomIntBetween(10, 100);
         boolean[] errors = new boolean[nbItems];
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java
index cfdd29cdfbf..73cca7827e7 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java
@@ -19,23 +19,54 @@ package org.elasticsearch.client;
 
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
 import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.tasks.TaskSubmissionResponse;
 import org.elasticsearch.common.CheckedRunnable;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryAction;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.index.reindex.ReindexRequest;
+import org.elasticsearch.index.reindex.ScrollableHitSource;
+import org.elasticsearch.index.reindex.UpdateByQueryAction;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.tasks.RawTaskStatus;
+import org.elasticsearch.tasks.TaskId;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThan;
 
 public class ReindexIT extends ESRestHighLevelClientTestCase {
 
+    private static final String CONFLICT_PIPELINE_ID = "conflict_pipeline";
+
     public void testReindex() throws IOException {
         final String sourceIndex = "source1";
         final String destinationIndex = "dest";
@@ -122,10 +153,338 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
         }
     }
 
+    public void testReindexConflict() throws IOException {
+        final String sourceIndex = "testreindexconflict_source";
+        final String destIndex = "testreindexconflict_dest";
+
+        final Settings settings = Settings.builder()
+            .put("number_of_shards", 1)
+            .put("number_of_replicas", 0)
+            .build();
+        createIndex(sourceIndex, settings);
+        createIndex(destIndex, settings);
+        final BulkRequest bulkRequest = new BulkRequest()
+            .add(new IndexRequest(sourceIndex).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
+            .add(new IndexRequest(sourceIndex).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK));
+
+        putConflictPipeline();
+
+        final ReindexRequest reindexRequest = new ReindexRequest();
+        reindexRequest.setSourceIndices(sourceIndex);
+        reindexRequest.setDestIndex(destIndex);
+        reindexRequest.setRefresh(true);
+        reindexRequest.setDestPipeline(CONFLICT_PIPELINE_ID);
+        final BulkByScrollResponse response = highLevelClient().reindex(reindexRequest, RequestOptions.DEFAULT);
+
+        assertThat(response.getVersionConflicts(), equalTo(2L));
+        assertThat(response.getBulkFailures(), empty());
+        assertThat(response.getSearchFailures(), hasSize(2));
+        assertThat(
+            response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()),
+            everyItem(containsString("version conflict"))
+        );
+
+        assertThat(response.getTotal(), equalTo(2L));
+        assertThat(response.getCreated(), equalTo(0L));
+        assertThat(response.getUpdated(), equalTo(0L));
+        assertThat(response.getDeleted(), equalTo(0L));
+        assertThat(response.getNoops(), equalTo(0L));
+        assertThat(response.getBatches(), equalTo(1));
+        assertTrue(response.getTook().getMillis() > 0);
+    }
+
+    public void testUpdateByQuery() throws Exception {
+        final String sourceIndex = "source1";
+        {
+            // Prepare
+            Settings settings = Settings.builder()
+                .put("number_of_shards", 1)
+                .put("number_of_replicas", 0)
+                .build();
+            createIndex(sourceIndex, settings);
+            assertEquals(
+                RestStatus.OK,
+                highLevelClient().bulk(
+                    new BulkRequest()
+                        .add(new IndexRequest(sourceIndex).id("1")
+                            .source(Collections.singletonMap("foo", 1), XContentType.JSON))
+                        .add(new IndexRequest(sourceIndex).id("2")
+                            .source(Collections.singletonMap("foo", 2), XContentType.JSON))
+                        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
+                    RequestOptions.DEFAULT
+                ).status()
+            );
+        }
+        {
+            // test1: create one doc in dest
+            UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
+            updateByQueryRequest.indices(sourceIndex);
+            updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
+            updateByQueryRequest.setRefresh(true);
+            BulkByScrollResponse bulkResponse =
+                execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
+            assertEquals(1, bulkResponse.getTotal());
+            assertEquals(1, bulkResponse.getUpdated());
+            assertEquals(0, bulkResponse.getNoops());
+            assertEquals(0, bulkResponse.getVersionConflicts());
+            assertEquals(1, bulkResponse.getBatches());
+            assertTrue(bulkResponse.getTook().getMillis() > 0);
+            assertEquals(1, bulkResponse.getBatches());
+            assertEquals(0, bulkResponse.getBulkFailures().size());
+            assertEquals(0, bulkResponse.getSearchFailures().size());
+        }
+        {
+            // test2: update using script
+            UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
+            updateByQueryRequest.indices(sourceIndex);
+            updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
+            updateByQueryRequest.setRefresh(true);
+            BulkByScrollResponse bulkResponse =
+                execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
+            assertEquals(2, bulkResponse.getTotal());
+            assertEquals(2, bulkResponse.getUpdated());
+            assertEquals(0, bulkResponse.getDeleted());
+            assertEquals(0, bulkResponse.getNoops());
+            assertEquals(0, bulkResponse.getVersionConflicts());
+            assertEquals(1, bulkResponse.getBatches());
+            assertTrue(bulkResponse.getTook().getMillis() > 0);
+            assertEquals(1, bulkResponse.getBatches());
+            assertEquals(0, bulkResponse.getBulkFailures().size());
+            assertEquals(0, bulkResponse.getSearchFailures().size());
+            assertEquals(
+                3,
+                (int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT)
+                    .getSourceAsMap().get("foo"))
+            );
+        }
+        {
+            // test update-by-query rethrottling
+            UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
+            updateByQueryRequest.indices(sourceIndex);
+            updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
+            updateByQueryRequest.setRefresh(true);
+
+            // the following settings are supposed to halt reindexing after the first document
+            updateByQueryRequest.setBatchSize(1);
+            updateByQueryRequest.setRequestsPerSecond(0.00001f);
+            final CountDownLatch taskFinished = new CountDownLatch(1);
+            highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
+
+                @Override
+                public void onResponse(BulkByScrollResponse response) {
+                    taskFinished.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    fail(e.toString());
+                }
+            });
+
+            TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
+            float requestsPerSecond = 1000f;
+            ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
+                highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
+            assertThat(response.getTasks(), hasSize(1));
+            assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
+            assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
+            assertEquals(Float.toString(requestsPerSecond),
+                ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
+            taskFinished.await(2, TimeUnit.SECONDS);
+
+            // any rethrottling with the same taskId after the update-by-query is done should result in a failure
+            response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
+                highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
+            assertTrue(response.getTasks().isEmpty());
+            assertFalse(response.getNodeFailures().isEmpty());
+            assertEquals(1, response.getNodeFailures().size());
+            assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
+                response.getNodeFailures().get(0).getCause().getMessage());
+        }
+    }
+
+    public void testUpdateByQueryConflict() throws IOException {
+        final String index = "testupdatebyqueryconflict";
+
+        final Settings settings = Settings.builder()
+            .put("number_of_shards", 1)
+            .put("number_of_replicas", 0)
+            .build();
+        createIndex(index, settings);
+        final BulkRequest bulkRequest = new BulkRequest()
+            .add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
+            .add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK));
+
+        putConflictPipeline();
+
+        final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
+        updateByQueryRequest.indices(index);
+        updateByQueryRequest.setRefresh(true);
+        updateByQueryRequest.setPipeline(CONFLICT_PIPELINE_ID);
+        final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
+
+        assertThat(response.getVersionConflicts(), equalTo(1L));
+        assertThat(response.getBulkFailures(), empty());
+        assertThat(response.getSearchFailures(), hasSize(1));
+        assertThat(
+            response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()),
+            everyItem(containsString("version conflict"))
+        );
+
+        assertThat(response.getTotal(), equalTo(2L));
+        assertThat(response.getCreated(), equalTo(0L));
+        assertThat(response.getUpdated(), equalTo(1L));
+        assertThat(response.getDeleted(), equalTo(0L));
+        assertThat(response.getNoops(), equalTo(0L));
+        assertThat(response.getBatches(), equalTo(1));
+        assertTrue(response.getTook().getMillis() > 0);
+    }
+
+    public void testDeleteByQuery() throws Exception {
+        final String sourceIndex = "source1";
+        {
+            // Prepare
+            Settings settings = Settings.builder()
+                .put("number_of_shards", 1)
+                .put("number_of_replicas", 0)
+                .build();
+            createIndex(sourceIndex, settings);
+            assertEquals(
+                RestStatus.OK,
+                highLevelClient().bulk(
+                    new BulkRequest()
+                        .add(new IndexRequest(sourceIndex).id("1")
+                            .source(Collections.singletonMap("foo", 1), XContentType.JSON))
+                        .add(new IndexRequest(sourceIndex).id("2")
+                            .source(Collections.singletonMap("foo", 2), XContentType.JSON))
+                        .add(new IndexRequest(sourceIndex).id("3")
+                            .source(Collections.singletonMap("foo", 3), XContentType.JSON))
+                        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
+                    RequestOptions.DEFAULT
+                ).status()
+            );
+        }
+        {
+            // test1: delete one doc
+            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
+            deleteByQueryRequest.indices(sourceIndex);
+            deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
+            deleteByQueryRequest.setRefresh(true);
+            BulkByScrollResponse bulkResponse =
+                execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync);
+            assertEquals(1, bulkResponse.getTotal());
+            assertEquals(1, bulkResponse.getDeleted());
+            assertEquals(0, bulkResponse.getNoops());
+            assertEquals(0, bulkResponse.getVersionConflicts());
+            assertEquals(1, bulkResponse.getBatches());
+            assertTrue(bulkResponse.getTook().getMillis() > 0);
+            assertEquals(1, bulkResponse.getBatches());
+            assertEquals(0, bulkResponse.getBulkFailures().size());
+            assertEquals(0, bulkResponse.getSearchFailures().size());
+            assertEquals(
+                2,
+                highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().getTotalHits().value
+            );
+        }
+        {
+            // test delete-by-query rethrottling
+            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
+            deleteByQueryRequest.indices(sourceIndex);
+            deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3"));
+            deleteByQueryRequest.setRefresh(true);
+
+            // the following settings are supposed to halt reindexing after the first document
+            deleteByQueryRequest.setBatchSize(1);
+            deleteByQueryRequest.setRequestsPerSecond(0.00001f);
+            final CountDownLatch taskFinished = new CountDownLatch(1);
+            highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
+
+                @Override
+                public void onResponse(BulkByScrollResponse response) {
+                    taskFinished.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    fail(e.toString());
+                }
+            });
+
+            TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
+            float requestsPerSecond = 1000f;
+            ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
+                highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
+            assertThat(response.getTasks(), hasSize(1));
+            assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
+            assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
+            assertEquals(Float.toString(requestsPerSecond),
+                ((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
+            taskFinished.await(2, TimeUnit.SECONDS);
+
+            // any rethrottling with the same taskId after the delete-by-query is done should result in a failure
+            response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
+                highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
+            assertTrue(response.getTasks().isEmpty());
+            assertFalse(response.getNodeFailures().isEmpty());
+            assertEquals(1, response.getNodeFailures().size());
+            assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
+                response.getNodeFailures().get(0).getCause().getMessage());
+        }
+    }
+
+    private static TaskId findTaskToRethrottle(String actionName) throws IOException {
+        long start = System.nanoTime();
+        ListTasksRequest request = new ListTasksRequest();
+        request.setActions(actionName);
+        request.setDetailed(true);
+        do {
+            ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
+            list.rethrowFailures("Finding tasks to rethrottle");
+            assertThat("tasks are left over from the last execution of this test",
+                list.getTaskGroups(), hasSize(lessThan(2)));
+            if (0 == list.getTaskGroups().size()) {
+                // The parent task hasn't started yet
+                continue;
+            }
+            TaskGroup taskGroup = list.getTaskGroups().get(0);
+            assertThat(taskGroup.getChildTasks(), empty());
+            return taskGroup.getTaskInfo().getTaskId();
+        } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
+        throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
+                highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
+    }
+
     static CheckedRunnable<Exception> checkCompletionStatus(RestClient client, String taskId) {
         return () -> {
             Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId));
             assertTrue((boolean) entityAsMap(response).get("completed"));
         };
     }
+
+    private void putConflictPipeline() throws IOException {
+        final XContentBuilder pipelineBuilder = jsonBuilder()
+            .startObject()
+                .startArray("processors")
+                    .startObject()
+                        .startObject("set")
+                            .field("field", "_version")
+                            .field("value", 1)
+                        .endObject()
+                    .endObject()
+                    .startObject()
+                        .startObject("set")
+                            .field("field", "_id")
+                            .field("value", "1")
+                        .endObject()
+                    .endObject()
+                .endArray()
+            .endObject();
+        final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder),
+            pipelineBuilder.contentType());
+        assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged());
+    }
 }
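For callers of the high-level client, the practical effect of switching from emptySet() to singleton(409) is that a version-conflict response body is now parsed into a BulkByScrollResponse instead of surfacing as an exception. Below is a minimal sketch of how a caller might inspect conflicts after this change; the index name, query, and client bootstrap are illustrative assumptions, not part of this patch.

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

public class DeleteByQueryConflictExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical local cluster; adjust the host and port for a real deployment.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            DeleteByQueryRequest request = new DeleteByQueryRequest();
            request.indices("my-index");                          // assumed index name
            request.setQuery(new IdsQueryBuilder().addIds("1"));
            request.setRefresh(true);

            // With singleton(409) in place, a conflict no longer throws from the client;
            // it is reported on the parsed response instead.
            BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);
            System.out.println("deleted: " + response.getDeleted()
                + ", version conflicts: " + response.getVersionConflicts()
                + ", bulk failures: " + response.getBulkFailures().size()
                + ", search failures: " + response.getSearchFailures().size());
        }
    }
}

Callers that previously relied on catching an exception for a 409 should instead check getVersionConflicts(), getBulkFailures(), and getSearchFailures() on the response, which is what the new testReindexConflict and testUpdateByQueryConflict tests in this patch assert.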