HLRC: UpdateByQuery API with wait_for_completion being false (#58552) (#61081)

(cherry picked from commit 291f5bd1b2e889e9447d660e5407f3120cffb1a5)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>

Co-authored-by: Tuan Le <23419763+tumile@users.noreply.github.com>
Andrei Dan 2020-08-13 11:06:41 +01:00 committed by GitHub
parent 8e775394ac
commit 186e8b865d
7 changed files with 348 additions and 228 deletions
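The new entry point added here is RestHighLevelClient#submitUpdateByQueryTask, which sends the update-by-query request with wait_for_completion=false and returns the task id rather than blocking on a BulkByScrollResponse. A minimal usage sketch (the client instance, the "source1" index name, and the polling step are placeholders, not part of this change):

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;

// Assumes an already configured RestHighLevelClient named `client`.
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices("source1");                       // placeholder index name
updateByQueryRequest.setScript(
    new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
updateByQueryRequest.setRefresh(true);

// Returns as soon as the task is submitted; the response body carries only the task id.
TaskSubmissionResponse submission =
    client.submitUpdateByQueryTask(updateByQueryRequest, RequestOptions.DEFAULT);
String taskId = submission.getTask();
// Progress and completion can then be checked through the Tasks API (GET /_tasks/{taskId}).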


@@ -572,10 +572,22 @@ final class RequestConverters {
return prepareReindexRequest(reindexRequest, false);
}
static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
return prepareDeleteByQueryRequest(deleteByQueryRequest, true);
}
static Request submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
return prepareDeleteByQueryRequest(deleteByQueryRequest, false);
}
static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
return prepareUpdateByQueryRequest(updateByQueryRequest, true);
}
static Request submitUpdateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
return prepareUpdateByQueryRequest(updateByQueryRequest, false);
}
private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
@@ -626,9 +638,10 @@ final class RequestConverters {
return request;
}
static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
static Request prepareUpdateByQueryRequest(UpdateByQueryRequest updateByQueryRequest,
boolean waitForCompletion) throws IOException {
String endpoint =
endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params()
.withRouting(updateByQueryRequest.getRouting())
@@ -638,6 +651,7 @@ final class RequestConverters {
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond())
.withIndicesOptions(updateByQueryRequest.indicesOptions())
.withWaitForCompletion(waitForCompletion)
.withSlices(updateByQueryRequest.getSlices());
if (updateByQueryRequest.isAbortOnVersionConflict() == false) {
params.putParam("conflicts", "proceed");
@@ -656,10 +670,6 @@ final class RequestConverters {
return request;
}
static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
return prepareDeleteByQueryRequest(deleteByQueryRequest, true);
}
static Request rethrottleReindex(RethrottleRequest rethrottleRequest) {
return rethrottle(rethrottleRequest, "_reindex");
}
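
At the wire level the only difference between the blocking updateByQuery converter and the new submitUpdateByQuery variant is the wait_for_completion flag on the same _update_by_query endpoint. A rough low-level equivalent of what prepareUpdateByQueryRequest builds when waitForCompletion is false (a sketch; the RestClient instance lowLevelClient and the "source1" index are placeholders):

// Low-level sketch of the request produced with waitForCompletion == false.
Request request = new Request("POST", "/source1/_update_by_query");
request.addParameter("wait_for_completion", "false");
Response response = lowLevelClient.performRequest(request);
// The response body contains the task id, which TaskSubmissionResponse parses in the high-level client.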


@@ -601,6 +601,21 @@ public class RestHighLevelClient implements Closeable {
);
}
/**
* Submits an update by query task.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
* Update By Query API on elastic.co</a>
* @param updateByQueryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the submission response
*/
public final TaskSubmissionResponse submitUpdateByQueryTask(UpdateByQueryRequest updateByQueryRequest,
RequestOptions options) throws IOException {
return performRequestAndParseEntity(
updateByQueryRequest, RequestConverters::submitUpdateByQuery, options, TaskSubmissionResponse::fromXContent, emptySet()
);
}
/**
* Asynchronously executes an update by query request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">


@@ -21,6 +21,9 @@ package org.elasticsearch.client;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
@@ -31,6 +34,7 @@ import org.elasticsearch.client.cluster.RemoteInfoRequest;
import org.elasticsearch.client.cluster.RemoteInfoResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -42,6 +46,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
@@ -51,18 +56,25 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
protected static final String CONFLICT_PIPELINE_ID = "conflict_pipeline";
private static RestHighLevelClient restHighLevelClient;
private static boolean async = Booleans.parseBoolean(System.getProperty("tests.rest.async", "false"));
@@ -216,6 +228,29 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
request, highLevelClient().cluster()::putSettings, highLevelClient().cluster()::putSettingsAsync).isAcknowledged());
}
protected void putConflictPipeline() throws IOException {
final XContentBuilder pipelineBuilder = jsonBuilder()
.startObject()
.startArray("processors")
.startObject()
.startObject("set")
.field("field", "_version")
.field("value", 1)
.endObject()
.endObject()
.startObject()
.startObject("set")
.field("field", "_id")
.field("value", "1")
.endObject()
.endObject()
.endArray()
.endObject();
final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType());
assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged());
}
@Override
protected Settings restClientSettings() {
final String user = Objects.requireNonNull(System.getProperty("tests.rest.cluster.username"));
@@ -280,4 +315,36 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
protected static Map<String, Object> toMap(Response response) throws IOException {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
}
protected static TaskId findTaskToRethrottle(String actionName, String description) throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();
request.setActions(actionName);
request.setDetailed(true);
do {
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
list.rethrowFailures("Finding tasks to rethrottle");
List<TaskGroup> taskGroups =
list.getTaskGroups().stream()
.filter(taskGroup -> taskGroup.getTaskInfo().getDescription().equals(description)).collect(Collectors.toList());
assertThat("tasks are left over from the last execution of this test",
taskGroups, hasSize(lessThan(2)));
if (0 == taskGroups.size()) {
// The parent task hasn't started yet
continue;
}
TaskGroup taskGroup = taskGroups.get(0);
assertThat(taskGroup.getChildTasks(), empty());
return taskGroup.getTaskInfo().getTaskId();
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
}
protected static CheckedRunnable<Exception> checkTaskCompletionStatus(RestClient client, String taskId) {
return () -> {
Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId));
assertTrue((boolean) entityAsMap(response).get("completed"));
};
}
}
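
For reference, the relocated helpers are consumed from the integration tests roughly as follows (a sketch pieced together from the call sites in this change; taskId comes from a TaskSubmissionResponse and updateByQueryRequest is the request whose task is being throttled):

// Block (via assertBusy) until GET /_tasks/{taskId} reports "completed": true.
assertBusy(checkTaskCompletionStatus(client(), taskId));

// Find the running update-by-query task by action name and request description, e.g. to rethrottle it.
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME, updateByQueryRequest.getDescription());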


@@ -20,54 +20,39 @@
package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
public class ReindexIT extends ESRestHighLevelClientTestCase {
private static final String CONFLICT_PIPELINE_ID = "conflict_pipeline";
public void testReindex() throws IOException {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
@@ -150,7 +135,7 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
String taskId = reindexSubmission.getTask(); // <3>
// end::submit-reindex-task
assertBusy(checkCompletionStatus(client(), taskId));
assertBusy(checkTaskCompletionStatus(client(), taskId));
}
}
@@ -196,155 +181,6 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
assertTrue(response.getTook().getMillis() > 0);
}
public void testUpdateByQuery() throws Exception {
final String sourceIndex = "source1";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
new BulkRequest()
.add(new IndexRequest(sourceIndex).id("1")
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
.add(new IndexRequest(sourceIndex).id("2")
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT
).status()
);
}
{
// test1: update one doc matched by id
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices(sourceIndex);
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
updateByQueryRequest.setRefresh(true);
BulkByScrollResponse bulkResponse =
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
assertEquals(1, bulkResponse.getTotal());
assertEquals(1, bulkResponse.getUpdated());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test2: update using script
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices(sourceIndex);
updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
updateByQueryRequest.setRefresh(true);
BulkByScrollResponse bulkResponse =
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
assertEquals(2, bulkResponse.getTotal());
assertEquals(2, bulkResponse.getUpdated());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
assertEquals(
3,
(int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT)
.getSourceAsMap().get("foo"))
);
}
{
// test update-by-query rethrottling
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices(sourceIndex);
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
updateByQueryRequest.setRefresh(true);
// the following settings are supposed to halt the update by query after the first document
updateByQueryRequest.setBatchSize(1);
updateByQueryRequest.setRequestsPerSecond(0.00001f);
final CountDownLatch taskFinished = new CountDownLatch(1);
highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse response) {
taskFinished.countDown();
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME, updateByQueryRequest.getDescription());
float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
assertThat(response.getTasks(), hasSize(1));
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
assertEquals(Float.toString(requestsPerSecond),
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
assertTrue(taskFinished.await(10, TimeUnit.SECONDS));
// any rethrottling performed with the same taskId after the update-by-query is done should result in a failure
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
assertTrue(response.getTasks().isEmpty());
assertFalse(response.getNodeFailures().isEmpty());
assertEquals(1, response.getNodeFailures().size());
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
response.getNodeFailures().get(0).getCause().getMessage());
}
}
public void testUpdateByQueryConflict() throws IOException {
final String index = "testupdatebyqueryconflict";
final Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(index, settings);
final BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK));
putConflictPipeline();
final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices(index);
updateByQueryRequest.setRefresh(true);
updateByQueryRequest.setPipeline(CONFLICT_PIPELINE_ID);
final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
assertThat(response.getVersionConflicts(), equalTo(1L));
assertThat(response.getSearchFailures(), empty());
assertThat(response.getBulkFailures(), hasSize(1));
assertThat(
response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()),
everyItem(containsString("version conflict"))
);
assertThat(response.getTotal(), equalTo(2L));
assertThat(response.getCreated(), equalTo(0L));
assertThat(response.getUpdated(), equalTo(1L));
assertThat(response.getDeleted(), equalTo(0L));
assertThat(response.getNoops(), equalTo(0L));
assertThat(response.getBatches(), equalTo(1));
assertTrue(response.getTook().getMillis() > 0);
}
public void testDeleteByQuery() throws Exception {
final String sourceIndex = "source1";
{
@@ -474,62 +310,7 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
String taskId = deleteByQuerySubmission.getTask();
// end::submit-delete_by_query-task
assertBusy(checkCompletionStatus(client(), taskId));
assertBusy(checkTaskCompletionStatus(client(), taskId));
}
}
private static TaskId findTaskToRethrottle(String actionName, String description) throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();
request.setActions(actionName);
request.setDetailed(true);
do {
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
list.rethrowFailures("Finding tasks to rethrottle");
List<TaskGroup> taskGroups =
list.getTaskGroups().stream()
.filter(taskGroup -> taskGroup.getTaskInfo().getDescription().equals(description)).collect(Collectors.toList());
assertThat("tasks are left over from the last execution of this test",
taskGroups, hasSize(lessThan(2)));
if (0 == taskGroups.size()) {
// The parent task hasn't started yet
continue;
}
TaskGroup taskGroup = taskGroups.get(0);
assertThat(taskGroup.getChildTasks(), empty());
return taskGroup.getTaskInfo().getTaskId();
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
}
static CheckedRunnable<Exception> checkCompletionStatus(RestClient client, String taskId) {
return () -> {
Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId));
assertTrue((boolean) entityAsMap(response).get("completed"));
};
}
private void putConflictPipeline() throws IOException {
final XContentBuilder pipelineBuilder = jsonBuilder()
.startObject()
.startArray("processors")
.startObject()
.startObject("set")
.field("field", "_version")
.field("value", 1)
.endObject()
.endObject()
.startObject()
.startObject("set")
.field("field", "_id")
.field("value", "1")
.endObject()
.endObject()
.endArray()
.endObject();
final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType());
assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged());
}
}


@@ -583,6 +583,7 @@ public class RequestConvertersTests extends ESTestCase {
}
setRandomIndicesOptions(updateByQueryRequest::setIndicesOptions, updateByQueryRequest::indicesOptions, expectedParams);
setRandomTimeout(updateByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
expectedParams.put("wait_for_completion", Boolean.TRUE.toString());
Request request = RequestConverters.updateByQuery(updateByQueryRequest);
StringJoiner joiner = new StringJoiner("/", "/", "");
joiner.add(String.join(",", updateByQueryRequest.indices()));


@@ -121,7 +121,7 @@ public class TasksIT extends ESRestHighLevelClientTestCase {
assertEquals("reindex from [source1] to [dest][_doc]", info.getDescription());
assertEquals("indices:data/write/reindex", info.getAction());
if (taskResponse.isCompleted() == false) {
assertBusy(ReindexIT.checkCompletionStatus(client(), taskId.toString()));
assertBusy(checkTaskCompletionStatus(client(), taskId.toString()));
}
}


@@ -0,0 +1,246 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
public class UpdateByQueryIT extends ESRestHighLevelClientTestCase {
public void testUpdateByQuery() throws Exception {
final String sourceIndex = "source1";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
new BulkRequest()
.add(new IndexRequest(sourceIndex).id("1")
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
.add(new IndexRequest(sourceIndex).id("2")
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT
).status()
);
}
{
// test1: update one doc matched by id
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices(sourceIndex);
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
updateByQueryRequest.setRefresh(true);
BulkByScrollResponse bulkResponse =
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
assertEquals(1, bulkResponse.getTotal());
assertEquals(1, bulkResponse.getUpdated());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test2: update using script
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices(sourceIndex);
updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
updateByQueryRequest.setRefresh(true);
BulkByScrollResponse bulkResponse =
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
assertEquals(2, bulkResponse.getTotal());
assertEquals(2, bulkResponse.getUpdated());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
assertEquals(
3,
(int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT)
.getSourceAsMap().get("foo"))
);
}
{
// test update-by-query rethrottling
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices(sourceIndex);
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
updateByQueryRequest.setRefresh(true);
// the following settings are supposed to halt the update by query after the first document
updateByQueryRequest.setBatchSize(1);
updateByQueryRequest.setRequestsPerSecond(0.00001f);
final CountDownLatch taskFinished = new CountDownLatch(1);
highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse response) {
taskFinished.countDown();
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME, updateByQueryRequest.getDescription());
float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
assertThat(response.getTasks(), hasSize(1));
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
assertEquals(Float.toString(requestsPerSecond),
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
assertTrue(taskFinished.await(10, TimeUnit.SECONDS));
// any rethrottling performed with the same taskId after the update-by-query is done should result in a failure
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
assertTrue(response.getTasks().isEmpty());
assertFalse(response.getNodeFailures().isEmpty());
assertEquals(1, response.getNodeFailures().size());
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
response.getNodeFailures().get(0).getCause().getMessage());
}
}
public void testUpdateByQueryTask() throws Exception {
final String sourceIndex = "testupdatebyquerytask";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
new BulkRequest()
.add(new IndexRequest(sourceIndex).id("1")
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
.add(new IndexRequest(sourceIndex).id("2")
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
.add(new IndexRequest(sourceIndex).id("3")
.source(Collections.singletonMap("foo", 3), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT
).status()
);
}
{
// tag::submit-update_by_query-task
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices(sourceIndex);
updateByQueryRequest.setScript(
new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
updateByQueryRequest.setRefresh(true);
TaskSubmissionResponse updateByQuerySubmission = highLevelClient()
.submitUpdateByQueryTask(updateByQueryRequest, RequestOptions.DEFAULT);
String taskId = updateByQuerySubmission.getTask();
// end::submit-update_by_query-task
assertBusy(checkTaskCompletionStatus(client(), taskId));
}
}
public void testUpdateByQueryConflict() throws IOException {
final String index = "testupdatebyqueryconflict";
final Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(index, settings);
final BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK));
putConflictPipeline();
final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
updateByQueryRequest.indices(index);
updateByQueryRequest.setRefresh(true);
updateByQueryRequest.setPipeline(CONFLICT_PIPELINE_ID);
final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
assertThat(response.getVersionConflicts(), equalTo(1L));
assertThat(response.getSearchFailures(), empty());
assertThat(response.getBulkFailures(), hasSize(1));
assertThat(
response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toSet()),
everyItem(containsString("version conflict"))
);
assertThat(response.getTotal(), equalTo(2L));
assertThat(response.getCreated(), equalTo(0L));
assertThat(response.getUpdated(), equalTo(1L));
assertThat(response.getDeleted(), equalTo(0L));
assertThat(response.getNoops(), equalTo(0L));
assertThat(response.getBatches(), equalTo(1));
assertTrue(response.getTook().getMillis() > 0);
}
}