Add submitDeleteByQueryTask method to RestHighLevelClient (#46833)

The HLRC has a method for reindex, that allows to trigger an async reindex by running RestHighLevelClient.submitReindexTask and RestHighLevelClient.reindex. The delete by query however only has an RestHighLevelClient.deleteByQuery method (and its async counterpart), but no RestHighLevelClient.submitDeleteByQueryTask. So add RestHighLevelClient.submitDeleteByQueryTask

Closes #46395
This commit is contained in:
maidoo 2019-09-24 16:01:07 +08:00 committed by Martijn van Groningen
parent 6986d7f968
commit 618efcfcf9
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
4 changed files with 92 additions and 25 deletions

View File

@ -553,6 +553,10 @@ final class RequestConverters {
return prepareReindexRequest(reindexRequest, false);
}
static Request submitDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
return prepareDeleteByQueryRequest(deleteByQueryRequest, false);
}
private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
@ -572,6 +576,36 @@ final class RequestConverters {
return request;
}
private static Request prepareDeleteByQueryRequest(DeleteByQueryRequest deleteByQueryRequest,
boolean waitForCompletion) throws IOException {
String endpoint =
endpoint(deleteByQueryRequest.indices(), deleteByQueryRequest.getDocTypes(), "_delete_by_query");
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params()
.withRouting(deleteByQueryRequest.getRouting())
.withRefresh(deleteByQueryRequest.isRefresh())
.withTimeout(deleteByQueryRequest.getTimeout())
.withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(deleteByQueryRequest.getRequestsPerSecond())
.withIndicesOptions(deleteByQueryRequest.indicesOptions())
.withWaitForCompletion(waitForCompletion);
if (deleteByQueryRequest.isAbortOnVersionConflict() == false) {
params.putParam("conflicts", "proceed");
}
if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize()));
}
if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
params.putParam("scroll", deleteByQueryRequest.getScrollTime());
}
if (deleteByQueryRequest.getMaxDocs() > 0) {
params.putParam("max_docs", Integer.toString(deleteByQueryRequest.getMaxDocs()));
}
request.addParameters(params.asMap());
request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws IOException {
String endpoint =
endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
@ -602,31 +636,7 @@ final class RequestConverters {
}
static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
String endpoint =
endpoint(deleteByQueryRequest.indices(), deleteByQueryRequest.getDocTypes(), "_delete_by_query");
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params()
.withRouting(deleteByQueryRequest.getRouting())
.withRefresh(deleteByQueryRequest.isRefresh())
.withTimeout(deleteByQueryRequest.getTimeout())
.withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(deleteByQueryRequest.getRequestsPerSecond())
.withIndicesOptions(deleteByQueryRequest.indicesOptions());
if (deleteByQueryRequest.isAbortOnVersionConflict() == false) {
params.putParam("conflicts", "proceed");
}
if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize()));
}
if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
params.putParam("scroll", deleteByQueryRequest.getScrollTime());
}
if (deleteByQueryRequest.getMaxDocs() > 0) {
params.putParam("max_docs", Integer.toString(deleteByQueryRequest.getMaxDocs()));
}
request.addParameters(params.asMap());
request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
return prepareDeleteByQueryRequest(deleteByQueryRequest, true);
}
static Request rethrottleReindex(RethrottleRequest rethrottleRequest) {

View File

@ -590,6 +590,21 @@ public class RestHighLevelClient implements Closeable {
);
}
/**
* Submits a delete by query task
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
* Delete By Query API on elastic.co</a>
* @param deleteByQueryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the submission response
*/
public final TaskSubmissionResponse submitDeleteByQueryTask(DeleteByQueryRequest deleteByQueryRequest,
RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deleteByQueryRequest, RequestConverters::submitDeleteByQuery, options, TaskSubmissionResponse::fromXContent, emptySet()
);
}
/**
* Asynchronously executes a delete by query request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">

View File

@ -436,6 +436,47 @@ public class ReindexIT extends ESRestHighLevelClientTestCase {
}
}
public void testDeleteByQueryTask() throws Exception {
final String sourceIndex = "source456";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
new BulkRequest()
.add(new IndexRequest(sourceIndex).id("1")
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
.add(new IndexRequest(sourceIndex).id("2")
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
.add(new IndexRequest(sourceIndex).id("3")
.source(Collections.singletonMap("foo", 3), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
RequestOptions.DEFAULT
).status()
);
}
{
// tag::submit-delete_by_query-task
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
deleteByQueryRequest.indices(sourceIndex);
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1"));
deleteByQueryRequest.setRefresh(true);
TaskSubmissionResponse deleteByQuerySubmission = highLevelClient()
.submitDeleteByQueryTask(deleteByQueryRequest, RequestOptions.DEFAULT);
String taskId = deleteByQuerySubmission.getTask();
// end::submit-delete_by_query-task
assertBusy(checkCompletionStatus(client(), taskId));
}
}
private static TaskId findTaskToRethrottle(String actionName) throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();

View File

@ -582,6 +582,7 @@ public class RequestConvertersTests extends ESTestCase {
}
setRandomIndicesOptions(deleteByQueryRequest::setIndicesOptions, deleteByQueryRequest::indicesOptions, expectedParams);
setRandomTimeout(deleteByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
expectedParams.put("wait_for_completion", Boolean.TRUE.toString());
Request request = RequestConverters.deleteByQuery(deleteByQueryRequest);
StringJoiner joiner = new StringJoiner("/", "/", "");
joiner.add(String.join(",", deleteByQueryRequest.indices()));