mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
ignore 409 conflict in reindex responses (#39543)
The reindex family of APIs (reindex, update-by-query, delete-by-query) can sometimes return responses that have an error status code (409 Conflict in this case) but still have a body in the usual BulkByScrollResponse format. When the HLRC tries to handle such responses, it blows up because it tris to parse it expecting the error format that errors in general use. This change prompts the HLRC to parse the response using the expected BulkByScrollResponse format.
This commit is contained in:
parent
6563dc7ed9
commit
c0c6d702a2
@ -511,7 +511,7 @@ public class RestHighLevelClient implements Closeable {
|
||||
*/
|
||||
public final BulkByScrollResponse reindex(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
|
||||
return performRequestAndParseEntity(
|
||||
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, emptySet()
|
||||
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, singleton(409)
|
||||
);
|
||||
}
|
||||
|
||||
@ -537,7 +537,7 @@ public class RestHighLevelClient implements Closeable {
|
||||
*/
|
||||
public final void reindexAsync(ReindexRequest reindexRequest, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
|
||||
performRequestAsyncAndParseEntity(
|
||||
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, emptySet()
|
||||
reindexRequest, RequestConverters::reindex, options, BulkByScrollResponse::fromXContent, listener, singleton(409)
|
||||
);
|
||||
}
|
||||
|
||||
@ -551,7 +551,7 @@ public class RestHighLevelClient implements Closeable {
|
||||
*/
|
||||
public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) throws IOException {
|
||||
return performRequestAndParseEntity(
|
||||
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
|
||||
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, singleton(409)
|
||||
);
|
||||
}
|
||||
|
||||
@ -566,7 +566,7 @@ public class RestHighLevelClient implements Closeable {
|
||||
public final void updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest, RequestOptions options,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
performRequestAsyncAndParseEntity(
|
||||
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
|
||||
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, singleton(409)
|
||||
);
|
||||
}
|
||||
|
||||
@ -580,7 +580,7 @@ public class RestHighLevelClient implements Closeable {
|
||||
*/
|
||||
public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException {
|
||||
return performRequestAndParseEntity(
|
||||
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
|
||||
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, singleton(409)
|
||||
);
|
||||
}
|
||||
|
||||
@ -595,7 +595,7 @@ public class RestHighLevelClient implements Closeable {
|
||||
public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options,
|
||||
ActionListener<BulkByScrollResponse> listener) {
|
||||
performRequestAsyncAndParseEntity(
|
||||
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
|
||||
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, singleton(409)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -21,12 +21,8 @@ package org.elasticsearch.client;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
@ -39,7 +35,6 @@ import org.elasticsearch.action.get.MultiGetRequest;
|
||||
import org.elasticsearch.action.get.MultiGetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
@ -58,12 +53,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryAction;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.action.document.RestBulkAction;
|
||||
import org.elasticsearch.rest.action.document.RestDeleteAction;
|
||||
@ -74,8 +63,6 @@ import org.elasticsearch.rest.action.document.RestUpdateAction;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptType;
|
||||
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
||||
import org.elasticsearch.tasks.RawTaskStatus;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.format.DateTimeFormat;
|
||||
@ -85,18 +72,12 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
|
||||
public class CrudIT extends ESRestHighLevelClientTestCase {
|
||||
|
||||
@ -857,230 +838,6 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
|
||||
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
|
||||
}
|
||||
|
||||
private TaskId findTaskToRethrottle(String actionName) throws IOException {
|
||||
long start = System.nanoTime();
|
||||
ListTasksRequest request = new ListTasksRequest();
|
||||
request.setActions(actionName);
|
||||
request.setDetailed(true);
|
||||
do {
|
||||
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
|
||||
list.rethrowFailures("Finding tasks to rethrottle");
|
||||
assertThat("tasks are left over from the last execution of this test",
|
||||
list.getTaskGroups(), hasSize(lessThan(2)));
|
||||
if (0 == list.getTaskGroups().size()) {
|
||||
// The parent task hasn't started yet
|
||||
continue;
|
||||
}
|
||||
TaskGroup taskGroup = list.getTaskGroups().get(0);
|
||||
assertThat(taskGroup.getChildTasks(), empty());
|
||||
return taskGroup.getTaskInfo().getTaskId();
|
||||
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
|
||||
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
|
||||
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
|
||||
}
|
||||
|
||||
public void testUpdateByQuery() throws Exception {
|
||||
final String sourceIndex = "source1";
|
||||
{
|
||||
// Prepare
|
||||
Settings settings = Settings.builder()
|
||||
.put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0)
|
||||
.build();
|
||||
createIndex(sourceIndex, settings);
|
||||
assertEquals(
|
||||
RestStatus.OK,
|
||||
highLevelClient().bulk(
|
||||
new BulkRequest()
|
||||
.add(new IndexRequest(sourceIndex).id("1")
|
||||
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
|
||||
.add(new IndexRequest(sourceIndex).id("2")
|
||||
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
|
||||
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
|
||||
RequestOptions.DEFAULT
|
||||
).status()
|
||||
);
|
||||
}
|
||||
{
|
||||
// test1: create one doc in dest
|
||||
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(sourceIndex);
|
||||
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
|
||||
updateByQueryRequest.setRefresh(true);
|
||||
BulkByScrollResponse bulkResponse =
|
||||
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
|
||||
assertEquals(1, bulkResponse.getTotal());
|
||||
assertEquals(1, bulkResponse.getUpdated());
|
||||
assertEquals(0, bulkResponse.getNoops());
|
||||
assertEquals(0, bulkResponse.getVersionConflicts());
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertTrue(bulkResponse.getTook().getMillis() > 0);
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertEquals(0, bulkResponse.getBulkFailures().size());
|
||||
assertEquals(0, bulkResponse.getSearchFailures().size());
|
||||
}
|
||||
{
|
||||
// test2: update using script
|
||||
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(sourceIndex);
|
||||
updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
|
||||
updateByQueryRequest.setRefresh(true);
|
||||
BulkByScrollResponse bulkResponse =
|
||||
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
|
||||
assertEquals(2, bulkResponse.getTotal());
|
||||
assertEquals(2, bulkResponse.getUpdated());
|
||||
assertEquals(0, bulkResponse.getDeleted());
|
||||
assertEquals(0, bulkResponse.getNoops());
|
||||
assertEquals(0, bulkResponse.getVersionConflicts());
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertTrue(bulkResponse.getTook().getMillis() > 0);
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertEquals(0, bulkResponse.getBulkFailures().size());
|
||||
assertEquals(0, bulkResponse.getSearchFailures().size());
|
||||
assertEquals(
|
||||
3,
|
||||
(int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT)
|
||||
.getSourceAsMap().get("foo"))
|
||||
);
|
||||
}
|
||||
{
|
||||
// test update-by-query rethrottling
|
||||
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(sourceIndex);
|
||||
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
|
||||
updateByQueryRequest.setRefresh(true);
|
||||
|
||||
// this following settings are supposed to halt reindexing after first document
|
||||
updateByQueryRequest.setBatchSize(1);
|
||||
updateByQueryRequest.setRequestsPerSecond(0.00001f);
|
||||
final CountDownLatch taskFinished = new CountDownLatch(1);
|
||||
highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(BulkByScrollResponse response) {
|
||||
taskFinished.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
});
|
||||
|
||||
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
|
||||
float requestsPerSecond = 1000f;
|
||||
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
|
||||
assertThat(response.getTasks(), hasSize(1));
|
||||
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
|
||||
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
|
||||
assertEquals(Float.toString(requestsPerSecond),
|
||||
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
|
||||
taskFinished.await(2, TimeUnit.SECONDS);
|
||||
|
||||
// any rethrottling after the update-by-query is done performed with the same taskId should result in a failure
|
||||
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
|
||||
assertTrue(response.getTasks().isEmpty());
|
||||
assertFalse(response.getNodeFailures().isEmpty());
|
||||
assertEquals(1, response.getNodeFailures().size());
|
||||
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
|
||||
response.getNodeFailures().get(0).getCause().getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testDeleteByQuery() throws Exception {
|
||||
final String sourceIndex = "source1";
|
||||
{
|
||||
// Prepare
|
||||
Settings settings = Settings.builder()
|
||||
.put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0)
|
||||
.build();
|
||||
createIndex(sourceIndex, settings);
|
||||
assertEquals(
|
||||
RestStatus.OK,
|
||||
highLevelClient().bulk(
|
||||
new BulkRequest()
|
||||
.add(new IndexRequest(sourceIndex).id("1")
|
||||
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
|
||||
.add(new IndexRequest(sourceIndex).id("2")
|
||||
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
|
||||
.add(new IndexRequest(sourceIndex).id("3")
|
||||
.source(Collections.singletonMap("foo", 3), XContentType.JSON))
|
||||
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
|
||||
RequestOptions.DEFAULT
|
||||
).status()
|
||||
);
|
||||
}
|
||||
{
|
||||
// test1: delete one doc
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
|
||||
deleteByQueryRequest.indices(sourceIndex);
|
||||
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
|
||||
deleteByQueryRequest.setRefresh(true);
|
||||
BulkByScrollResponse bulkResponse =
|
||||
execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync);
|
||||
assertEquals(1, bulkResponse.getTotal());
|
||||
assertEquals(1, bulkResponse.getDeleted());
|
||||
assertEquals(0, bulkResponse.getNoops());
|
||||
assertEquals(0, bulkResponse.getVersionConflicts());
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertTrue(bulkResponse.getTook().getMillis() > 0);
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertEquals(0, bulkResponse.getBulkFailures().size());
|
||||
assertEquals(0, bulkResponse.getSearchFailures().size());
|
||||
assertEquals(
|
||||
2,
|
||||
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().getTotalHits().value
|
||||
);
|
||||
}
|
||||
{
|
||||
// test delete-by-query rethrottling
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
|
||||
deleteByQueryRequest.indices(sourceIndex);
|
||||
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3"));
|
||||
deleteByQueryRequest.setRefresh(true);
|
||||
|
||||
// this following settings are supposed to halt reindexing after first document
|
||||
deleteByQueryRequest.setBatchSize(1);
|
||||
deleteByQueryRequest.setRequestsPerSecond(0.00001f);
|
||||
final CountDownLatch taskFinished = new CountDownLatch(1);
|
||||
highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(BulkByScrollResponse response) {
|
||||
taskFinished.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
});
|
||||
|
||||
TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
|
||||
float requestsPerSecond = 1000f;
|
||||
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
|
||||
assertThat(response.getTasks(), hasSize(1));
|
||||
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
|
||||
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
|
||||
assertEquals(Float.toString(requestsPerSecond),
|
||||
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
|
||||
taskFinished.await(2, TimeUnit.SECONDS);
|
||||
|
||||
// any rethrottling after the delete-by-query is done performed with the same taskId should result in a failure
|
||||
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
|
||||
assertTrue(response.getTasks().isEmpty());
|
||||
assertFalse(response.getNodeFailures().isEmpty());
|
||||
assertEquals(1, response.getNodeFailures().size());
|
||||
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
|
||||
response.getNodeFailures().get(0).getCause().getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testBulkProcessorIntegration() throws IOException {
|
||||
int nbItems = randomIntBetween(10, 100);
|
||||
boolean[] errors = new boolean[nbItems];
|
||||
|
@ -19,23 +19,54 @@
|
||||
|
||||
package org.elasticsearch.client;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
|
||||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.IdsQueryBuilder;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryAction;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryAction;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.tasks.RawTaskStatus;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.everyItem;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
|
||||
public class ReindexIT extends ESRestHighLevelClientTestCase {
|
||||
|
||||
private static final String CONFLICT_PIPELINE_ID = "conflict_pipeline";
|
||||
|
||||
public void testReindex() throws IOException {
|
||||
final String sourceIndex = "source1";
|
||||
final String destinationIndex = "dest";
|
||||
@ -122,10 +153,338 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testReindexConflict() throws IOException {
|
||||
final String sourceIndex = "testreindexconflict_source";
|
||||
final String destIndex = "testreindexconflict_dest";
|
||||
|
||||
final Settings settings = Settings.builder()
|
||||
.put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0)
|
||||
.build();
|
||||
createIndex(sourceIndex, settings);
|
||||
createIndex(destIndex, settings);
|
||||
final BulkRequest bulkRequest = new BulkRequest()
|
||||
.add(new IndexRequest(sourceIndex).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
|
||||
.add(new IndexRequest(sourceIndex).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK));
|
||||
|
||||
putConflictPipeline();
|
||||
|
||||
final ReindexRequest reindexRequest = new ReindexRequest();
|
||||
reindexRequest.setSourceIndices(sourceIndex);
|
||||
reindexRequest.setDestIndex(destIndex);
|
||||
reindexRequest.setRefresh(true);
|
||||
reindexRequest.setDestPipeline(CONFLICT_PIPELINE_ID);
|
||||
final BulkByScrollResponse response = highLevelClient().reindex(reindexRequest, RequestOptions.DEFAULT);
|
||||
|
||||
assertThat(response.getVersionConflicts(), equalTo(2L));
|
||||
assertThat(response.getBulkFailures(), empty());
|
||||
assertThat(response.getSearchFailures(), hasSize(2));
|
||||
assertThat(
|
||||
response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()),
|
||||
everyItem(containsString("version conflict"))
|
||||
);
|
||||
|
||||
assertThat(response.getTotal(), equalTo(2L));
|
||||
assertThat(response.getCreated(), equalTo(0L));
|
||||
assertThat(response.getUpdated(), equalTo(0L));
|
||||
assertThat(response.getDeleted(), equalTo(0L));
|
||||
assertThat(response.getNoops(), equalTo(0L));
|
||||
assertThat(response.getBatches(), equalTo(1));
|
||||
assertTrue(response.getTook().getMillis() > 0);
|
||||
}
|
||||
|
||||
public void testUpdateByQuery() throws Exception {
|
||||
final String sourceIndex = "source1";
|
||||
{
|
||||
// Prepare
|
||||
Settings settings = Settings.builder()
|
||||
.put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0)
|
||||
.build();
|
||||
createIndex(sourceIndex, settings);
|
||||
assertEquals(
|
||||
RestStatus.OK,
|
||||
highLevelClient().bulk(
|
||||
new BulkRequest()
|
||||
.add(new IndexRequest(sourceIndex).id("1")
|
||||
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
|
||||
.add(new IndexRequest(sourceIndex).id("2")
|
||||
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
|
||||
RequestOptions.DEFAULT
|
||||
).status()
|
||||
);
|
||||
}
|
||||
{
|
||||
// test1: create one doc in dest
|
||||
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(sourceIndex);
|
||||
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
|
||||
updateByQueryRequest.setRefresh(true);
|
||||
BulkByScrollResponse bulkResponse =
|
||||
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
|
||||
assertEquals(1, bulkResponse.getTotal());
|
||||
assertEquals(1, bulkResponse.getUpdated());
|
||||
assertEquals(0, bulkResponse.getNoops());
|
||||
assertEquals(0, bulkResponse.getVersionConflicts());
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertTrue(bulkResponse.getTook().getMillis() > 0);
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertEquals(0, bulkResponse.getBulkFailures().size());
|
||||
assertEquals(0, bulkResponse.getSearchFailures().size());
|
||||
}
|
||||
{
|
||||
// test2: update using script
|
||||
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(sourceIndex);
|
||||
updateByQueryRequest.setScript(new Script("if (ctx._source.foo == 2) ctx._source.foo++;"));
|
||||
updateByQueryRequest.setRefresh(true);
|
||||
BulkByScrollResponse bulkResponse =
|
||||
execute(updateByQueryRequest, highLevelClient()::updateByQuery, highLevelClient()::updateByQueryAsync);
|
||||
assertEquals(2, bulkResponse.getTotal());
|
||||
assertEquals(2, bulkResponse.getUpdated());
|
||||
assertEquals(0, bulkResponse.getDeleted());
|
||||
assertEquals(0, bulkResponse.getNoops());
|
||||
assertEquals(0, bulkResponse.getVersionConflicts());
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertTrue(bulkResponse.getTook().getMillis() > 0);
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertEquals(0, bulkResponse.getBulkFailures().size());
|
||||
assertEquals(0, bulkResponse.getSearchFailures().size());
|
||||
assertEquals(
|
||||
3,
|
||||
(int) (highLevelClient().get(new GetRequest(sourceIndex, "2"), RequestOptions.DEFAULT)
|
||||
.getSourceAsMap().get("foo"))
|
||||
);
|
||||
}
|
||||
{
|
||||
// test update-by-query rethrottling
|
||||
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(sourceIndex);
|
||||
updateByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
|
||||
updateByQueryRequest.setRefresh(true);
|
||||
|
||||
// this following settings are supposed to halt reindexing after first document
|
||||
updateByQueryRequest.setBatchSize(1);
|
||||
updateByQueryRequest.setRequestsPerSecond(0.00001f);
|
||||
final CountDownLatch taskFinished = new CountDownLatch(1);
|
||||
highLevelClient().updateByQueryAsync(updateByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(BulkByScrollResponse response) {
|
||||
taskFinished.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
});
|
||||
|
||||
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
|
||||
float requestsPerSecond = 1000f;
|
||||
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
|
||||
assertThat(response.getTasks(), hasSize(1));
|
||||
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
|
||||
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
|
||||
assertEquals(Float.toString(requestsPerSecond),
|
||||
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
|
||||
taskFinished.await(2, TimeUnit.SECONDS);
|
||||
|
||||
// any rethrottling after the update-by-query is done performed with the same taskId should result in a failure
|
||||
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
|
||||
assertTrue(response.getTasks().isEmpty());
|
||||
assertFalse(response.getNodeFailures().isEmpty());
|
||||
assertEquals(1, response.getNodeFailures().size());
|
||||
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
|
||||
response.getNodeFailures().get(0).getCause().getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void testUpdateByQueryConflict() throws IOException {
|
||||
final String index = "testupdatebyqueryconflict";
|
||||
|
||||
final Settings settings = Settings.builder()
|
||||
.put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0)
|
||||
.build();
|
||||
createIndex(index, settings);
|
||||
final BulkRequest bulkRequest = new BulkRequest()
|
||||
.add(new IndexRequest(index).id("1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
|
||||
.add(new IndexRequest(index).id("2").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||
assertThat(highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status(), equalTo(RestStatus.OK));
|
||||
|
||||
putConflictPipeline();
|
||||
|
||||
final UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
|
||||
updateByQueryRequest.indices(index);
|
||||
updateByQueryRequest.setRefresh(true);
|
||||
updateByQueryRequest.setPipeline(CONFLICT_PIPELINE_ID);
|
||||
final BulkByScrollResponse response = highLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
|
||||
|
||||
assertThat(response.getVersionConflicts(), equalTo(1L));
|
||||
assertThat(response.getBulkFailures(), empty());
|
||||
assertThat(response.getSearchFailures(), hasSize(1));
|
||||
assertThat(
|
||||
response.getSearchFailures().stream().map(ScrollableHitSource.SearchFailure::toString).collect(Collectors.toSet()),
|
||||
everyItem(containsString("version conflict"))
|
||||
);
|
||||
|
||||
assertThat(response.getTotal(), equalTo(2L));
|
||||
assertThat(response.getCreated(), equalTo(0L));
|
||||
assertThat(response.getUpdated(), equalTo(1L));
|
||||
assertThat(response.getDeleted(), equalTo(0L));
|
||||
assertThat(response.getNoops(), equalTo(0L));
|
||||
assertThat(response.getBatches(), equalTo(1));
|
||||
assertTrue(response.getTook().getMillis() > 0);
|
||||
}
|
||||
|
||||
public void testDeleteByQuery() throws Exception {
|
||||
final String sourceIndex = "source1";
|
||||
{
|
||||
// Prepare
|
||||
Settings settings = Settings.builder()
|
||||
.put("number_of_shards", 1)
|
||||
.put("number_of_replicas", 0)
|
||||
.build();
|
||||
createIndex(sourceIndex, settings);
|
||||
assertEquals(
|
||||
RestStatus.OK,
|
||||
highLevelClient().bulk(
|
||||
new BulkRequest()
|
||||
.add(new IndexRequest(sourceIndex).id("1")
|
||||
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
|
||||
.add(new IndexRequest(sourceIndex).id("2")
|
||||
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
|
||||
.add(new IndexRequest(sourceIndex).id("3")
|
||||
.source(Collections.singletonMap("foo", 3), XContentType.JSON))
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
|
||||
RequestOptions.DEFAULT
|
||||
).status()
|
||||
);
|
||||
}
|
||||
{
|
||||
// test1: delete one doc
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
|
||||
deleteByQueryRequest.indices(sourceIndex);
|
||||
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
|
||||
deleteByQueryRequest.setRefresh(true);
|
||||
BulkByScrollResponse bulkResponse =
|
||||
execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync);
|
||||
assertEquals(1, bulkResponse.getTotal());
|
||||
assertEquals(1, bulkResponse.getDeleted());
|
||||
assertEquals(0, bulkResponse.getNoops());
|
||||
assertEquals(0, bulkResponse.getVersionConflicts());
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertTrue(bulkResponse.getTook().getMillis() > 0);
|
||||
assertEquals(1, bulkResponse.getBatches());
|
||||
assertEquals(0, bulkResponse.getBulkFailures().size());
|
||||
assertEquals(0, bulkResponse.getSearchFailures().size());
|
||||
assertEquals(
|
||||
2,
|
||||
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().getTotalHits().value
|
||||
);
|
||||
}
|
||||
{
|
||||
// test delete-by-query rethrottling
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
|
||||
deleteByQueryRequest.indices(sourceIndex);
|
||||
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("2", "3"));
|
||||
deleteByQueryRequest.setRefresh(true);
|
||||
|
||||
// this following settings are supposed to halt reindexing after first document
|
||||
deleteByQueryRequest.setBatchSize(1);
|
||||
deleteByQueryRequest.setRequestsPerSecond(0.00001f);
|
||||
final CountDownLatch taskFinished = new CountDownLatch(1);
|
||||
highLevelClient().deleteByQueryAsync(deleteByQueryRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(BulkByScrollResponse response) {
|
||||
taskFinished.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
});
|
||||
|
||||
TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
|
||||
float requestsPerSecond = 1000f;
|
||||
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
|
||||
assertThat(response.getTasks(), hasSize(1));
|
||||
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
|
||||
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
|
||||
assertEquals(Float.toString(requestsPerSecond),
|
||||
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
|
||||
taskFinished.await(2, TimeUnit.SECONDS);
|
||||
|
||||
// any rethrottling after the delete-by-query is done performed with the same taskId should result in a failure
|
||||
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
|
||||
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
|
||||
assertTrue(response.getTasks().isEmpty());
|
||||
assertFalse(response.getNodeFailures().isEmpty());
|
||||
assertEquals(1, response.getNodeFailures().size());
|
||||
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
|
||||
response.getNodeFailures().get(0).getCause().getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private static TaskId findTaskToRethrottle(String actionName) throws IOException {
|
||||
long start = System.nanoTime();
|
||||
ListTasksRequest request = new ListTasksRequest();
|
||||
request.setActions(actionName);
|
||||
request.setDetailed(true);
|
||||
do {
|
||||
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
|
||||
list.rethrowFailures("Finding tasks to rethrottle");
|
||||
assertThat("tasks are left over from the last execution of this test",
|
||||
list.getTaskGroups(), hasSize(lessThan(2)));
|
||||
if (0 == list.getTaskGroups().size()) {
|
||||
// The parent task hasn't started yet
|
||||
continue;
|
||||
}
|
||||
TaskGroup taskGroup = list.getTaskGroups().get(0);
|
||||
assertThat(taskGroup.getChildTasks(), empty());
|
||||
return taskGroup.getTaskInfo().getTaskId();
|
||||
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
|
||||
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
|
||||
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
|
||||
}
|
||||
|
||||
static CheckedRunnable<Exception> checkCompletionStatus(RestClient client, String taskId) {
|
||||
return () -> {
|
||||
Response response = client.performRequest(new Request("GET", "/_tasks/" + taskId));
|
||||
assertTrue((boolean) entityAsMap(response).get("completed"));
|
||||
};
|
||||
}
|
||||
|
||||
private void putConflictPipeline() throws IOException {
|
||||
final XContentBuilder pipelineBuilder = jsonBuilder()
|
||||
.startObject()
|
||||
.startArray("processors")
|
||||
.startObject()
|
||||
.startObject("set")
|
||||
.field("field", "_version")
|
||||
.field("value", 1)
|
||||
.endObject()
|
||||
.endObject()
|
||||
.startObject()
|
||||
.startObject("set")
|
||||
.field("field", "_id")
|
||||
.field("value", "1")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endArray()
|
||||
.endObject();
|
||||
final PutPipelineRequest putPipelineRequest = new PutPipelineRequest(CONFLICT_PIPELINE_ID, BytesReference.bytes(pipelineBuilder),
|
||||
pipelineBuilder.contentType());
|
||||
assertTrue(highLevelClient().ingest().putPipeline(putPipelineRequest, RequestOptions.DEFAULT).isAcknowledged());
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user