HLRC: reindex API with wait_for_completion false (#35202)

Extend High Level Rest Client Reindex API to support requests with
wait_for_completion=false. This method will return a TaskSubmissionResult with task identifier as string and results can be queried with Task API

refers: #27205
This commit is contained in:
Przemyslaw Gomulka 2018-11-08 08:19:27 +01:00 committed by GitHub
parent a6073f5130
commit a90ef6bd6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 400 additions and 192 deletions

View File

@ -486,9 +486,18 @@ final class RequestConverters {
}
static Request reindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, true);
}
static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
return prepareReindexRequest(reindexRequest, false);
}
private static Request prepareReindexRequest(ReindexRequest reindexRequest, boolean waitForCompletion) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request)
.withWaitForCompletion(waitForCompletion)
.withRefresh(reindexRequest.isRefresh())
.withTimeout(reindexRequest.getTimeout())
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
@ -897,11 +906,8 @@ final class RequestConverters {
return this;
}
Params withWaitForCompletion(boolean waitForCompletion) {
if (waitForCompletion) {
return putParam("wait_for_completion", Boolean.TRUE.toString());
}
return this;
Params withWaitForCompletion(Boolean waitForCompletion) {
return putParam("wait_for_completion", waitForCompletion.toString());
}
Params withNodes(String[] nodes) {

View File

@ -60,6 +60,7 @@ import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.TermVectorsResponse;
import org.elasticsearch.client.core.TermVectorsRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
@ -476,6 +477,20 @@ public class RestHighLevelClient implements Closeable {
);
}
/**
* Submits a reindex task.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>
* @param reindexRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the submission response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public final TaskSubmissionResponse submitReindexTask(ReindexRequest reindexRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
reindexRequest, RequestConverters::submitReindex, options, TaskSubmissionResponse::fromXContent, emptySet()
);
}
/**
* Asynchronously executes a reindex request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html">Reindex API on elastic.co</a>

View File

@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.tasks;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
public class TaskSubmissionResponse extends ActionResponse {
private static final ParseField TASK = new ParseField("task");
public static final ConstructingObjectParser<TaskSubmissionResponse, Void> PARSER = new ConstructingObjectParser<>(
"task_submission_response",
true, a -> new TaskSubmissionResponse((String) a[0]));
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TASK);
}
private final String task;
TaskSubmissionResponse(String task) {
this.task = task;
}
/**
* Get the task id
*
* @return the id of the reindex task.
*/
public String getTask() {
return task;
}
@Override
public int hashCode() {
return Objects.hash(task);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
TaskSubmissionResponse that = (TaskSubmissionResponse) other;
return Objects.equals(task, that.task);
}
public static TaskSubmissionResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}

View File

@ -60,8 +60,6 @@ import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
@ -706,111 +704,6 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
}
public void testReindex() throws Exception {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
// test1: create one doc in dest
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(1, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test2: create 1 and update 1
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(2, bulkResponse.getTotal());
assertEquals(1, bulkResponse.getUpdated());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test reindex rethrottling
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
// this following settings are supposed to halt reindexing after first document
reindexRequest.setSourceBatchSize(1);
reindexRequest.setRequestsPerSecond(0.00001f);
final CountDownLatch reindexTaskFinished = new CountDownLatch(1);
highLevelClient().reindexAsync(reindexRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse response) {
reindexTaskFinished.countDown();
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});
TaskId taskIdToRethrottle = findTaskToRethrottle(ReindexAction.NAME);
float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
assertThat(response.getTasks(), hasSize(1));
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
assertEquals(Float.toString(requestsPerSecond),
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
reindexTaskFinished.await(2, TimeUnit.SECONDS);
// any rethrottling after the reindex is done performed with the same taskId should result in a failure
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
assertTrue(response.getTasks().isEmpty());
assertFalse(response.getNodeFailures().isEmpty());
assertEquals(1, response.getNodeFailures().size());
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
response.getNodeFailures().get(0).getCause().getMessage());
}
}
private TaskId findTaskToRethrottle(String actionName) throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();

View File

@ -0,0 +1,133 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.function.BooleanSupplier;
public class ReindexIT extends ESRestHighLevelClientTestCase {
public void testReindex() throws IOException {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
// reindex one document with id 1 from source to destination
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync);
assertEquals(1, bulkResponse.getCreated());
assertEquals(1, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(1, bulkResponse.getBatches());
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
}
public void testReindexTask() throws IOException, InterruptedException {
final String sourceIndex = "source123";
final String destinationIndex = "dest2";
{
// Prepare
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
bulkRequest,
RequestOptions.DEFAULT
).status()
);
}
{
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type"));
reindexRequest.setRefresh(true);
TaskSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT);
BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask());
awaitBusy(hasUpgradeCompleted);
}
}
private BooleanSupplier checkCompletionStatus(String taskId) {
return () -> {
try {
Response response = client().performRequest(new Request("GET", "/_tasks/" + taskId));
return (boolean) entityAsMap(response).get("completed");
} catch (IOException e) {
fail(e.getMessage());
return false;
}
};
}
}

View File

@ -348,6 +348,7 @@ public class RequestConvertersTests extends ESTestCase {
setRandomTimeout(reindexRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
setRandomWaitForActiveShards(reindexRequest::setWaitForActiveShards, ActiveShardCount.DEFAULT, expectedParams);
expectedParams.put("scroll", reindexRequest.getScrollTime().getStringRep());
expectedParams.put("wait_for_completion", Boolean.TRUE.toString());
Request request = RequestConverters.reindex(reindexRequest);
assertEquals("/_reindex", request.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, request.getMethod());

View File

@ -83,6 +83,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestApi;
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec;
import org.hamcrest.Matchers;
import org.junit.Before;
import java.io.IOException;
@ -114,6 +115,8 @@ import static org.mockito.Mockito.when;
public class RestHighLevelClientTests extends ESTestCase {
private static final String SUBMIT_TASK_PREFIX = "submit_";
private static final String SUBMIT_TASK_SUFFIX = "_task";
private static final ProtocolVersion HTTP_PROTOCOL = new ProtocolVersion("http", 1, 1);
private static final RequestLine REQUEST_LINE = new BasicRequestLine(HttpGet.METHOD_NAME, "/", HTTP_PROTOCOL);
@ -728,47 +731,11 @@ public class RestHighLevelClientTests extends ESTestCase {
//we convert all the method names to snake case, hence we need to look for the '_async' suffix rather than 'Async'
if (apiName.endsWith("_async")) {
assertTrue("async method [" + method.getName() + "] doesn't have corresponding sync method",
methods.containsKey(apiName.substring(0, apiName.length() - 6)));
assertThat("async method [" + method + "] should return void", method.getReturnType(), equalTo(Void.TYPE));
assertEquals("async method [" + method + "] should not throw any exceptions", 0, method.getExceptionTypes().length);
if (apiName.equals("security.authenticate_async") || apiName.equals("security.get_ssl_certificates_async")) {
assertEquals(2, method.getParameterTypes().length);
assertThat(method.getParameterTypes()[0], equalTo(RequestOptions.class));
assertThat(method.getParameterTypes()[1], equalTo(ActionListener.class));
assertAsyncMethod(methods, method, apiName);
} else if (isSubmitTaskMethod(apiName)) {
assertSubmitTaskMethod(methods, method, apiName, restSpec);
} else {
assertEquals("async method [" + method + "] has the wrong number of arguments", 3, method.getParameterTypes().length);
assertThat("the first parameter to async method [" + method + "] should be a request type",
method.getParameterTypes()[0].getSimpleName(), endsWith("Request"));
assertThat("the second parameter to async method [" + method + "] is the wrong type",
method.getParameterTypes()[1], equalTo(RequestOptions.class));
assertThat("the third parameter to async method [" + method + "] is the wrong type",
method.getParameterTypes()[2], equalTo(ActionListener.class));
}
} else {
//A few methods return a boolean rather than a response object
if (apiName.equals("ping") || apiName.contains("exist")) {
assertThat("the return type for method [" + method + "] is incorrect",
method.getReturnType().getSimpleName(), equalTo("boolean"));
} else {
assertThat("the return type for method [" + method + "] is incorrect",
method.getReturnType().getSimpleName(), endsWith("Response"));
}
assertEquals("incorrect number of exceptions for method [" + method + "]", 1, method.getExceptionTypes().length);
//a few methods don't accept a request object as argument
if (apiName.equals("ping") || apiName.equals("info") || apiName.equals("security.get_ssl_certificates")
|| apiName.equals("security.authenticate")) {
assertEquals("incorrect number of arguments for method [" + method + "]", 1, method.getParameterTypes().length);
assertThat("the parameter to method [" + method + "] is the wrong type",
method.getParameterTypes()[0], equalTo(RequestOptions.class));
} else {
assertEquals("incorrect number of arguments for method [" + method + "]", 2, method.getParameterTypes().length);
assertThat("the first parameter to method [" + method + "] is the wrong type",
method.getParameterTypes()[0].getSimpleName(), endsWith("Request"));
assertThat("the second parameter to method [" + method + "] is the wrong type",
method.getParameterTypes()[1], equalTo(RequestOptions.class));
}
assertSyncMethod(method, apiName);
boolean remove = apiSpec.remove(apiName);
if (remove == false) {
@ -804,6 +771,74 @@ public class RestHighLevelClientTests extends ESTestCase {
assertThat("Some API are not supported but they should be: " + apiSpec, apiSpec.size(), equalTo(0));
}
private void assertSyncMethod(Method method, String apiName) {
//A few methods return a boolean rather than a response object
if (apiName.equals("ping") || apiName.contains("exist")) {
assertThat("the return type for method [" + method + "] is incorrect",
method.getReturnType().getSimpleName(), equalTo("boolean"));
} else {
assertThat("the return type for method [" + method + "] is incorrect",
method.getReturnType().getSimpleName(), endsWith("Response"));
}
assertEquals("incorrect number of exceptions for method [" + method + "]", 1, method.getExceptionTypes().length);
//a few methods don't accept a request object as argument
if (apiName.equals("ping") || apiName.equals("info") || apiName.equals("security.get_ssl_certificates")
|| apiName.equals("security.authenticate")) {
assertEquals("incorrect number of arguments for method [" + method + "]", 1, method.getParameterTypes().length);
assertThat("the parameter to method [" + method + "] is the wrong type",
method.getParameterTypes()[0], equalTo(RequestOptions.class));
} else {
assertEquals("incorrect number of arguments for method [" + method + "]", 2, method.getParameterTypes().length);
assertThat("the first parameter to method [" + method + "] is the wrong type",
method.getParameterTypes()[0].getSimpleName(), endsWith("Request"));
assertThat("the second parameter to method [" + method + "] is the wrong type",
method.getParameterTypes()[1], equalTo(RequestOptions.class));
}
}
private void assertAsyncMethod(Map<String, Method> methods, Method method, String apiName) {
assertTrue("async method [" + method.getName() + "] doesn't have corresponding sync method",
methods.containsKey(apiName.substring(0, apiName.length() - 6)));
assertThat("async method [" + method + "] should return void", method.getReturnType(), equalTo(Void.TYPE));
assertEquals("async method [" + method + "] should not throw any exceptions", 0, method.getExceptionTypes().length);
if (apiName.equals("security.authenticate_async") || apiName.equals("security.get_ssl_certificates_async")) {
assertEquals(2, method.getParameterTypes().length);
assertThat(method.getParameterTypes()[0], equalTo(RequestOptions.class));
assertThat(method.getParameterTypes()[1], equalTo(ActionListener.class));
} else {
assertEquals("async method [" + method + "] has the wrong number of arguments", 3, method.getParameterTypes().length);
assertThat("the first parameter to async method [" + method + "] should be a request type",
method.getParameterTypes()[0].getSimpleName(), endsWith("Request"));
assertThat("the second parameter to async method [" + method + "] is the wrong type",
method.getParameterTypes()[1], equalTo(RequestOptions.class));
assertThat("the third parameter to async method [" + method + "] is the wrong type",
method.getParameterTypes()[2], equalTo(ActionListener.class));
}
}
private void assertSubmitTaskMethod(Map<String, Method> methods, Method method, String apiName, ClientYamlSuiteRestSpec restSpec) {
String methodName = extractMethodName(apiName);
assertTrue("submit task method [" + method.getName() + "] doesn't have corresponding sync method",
methods.containsKey(methodName));
assertEquals("submit task method [" + method + "] has the wrong number of arguments", 2, method.getParameterTypes().length);
assertThat("the first parameter to submit task method [" + method + "] is the wrong type",
method.getParameterTypes()[0].getSimpleName(), endsWith("Request"));
assertThat("the second parameter to submit task method [" + method + "] is the wrong type",
method.getParameterTypes()[1], equalTo(RequestOptions.class));
assertThat("submit task method [" + method + "] must have wait_for_completion parameter in rest spec",
restSpec.getApi(methodName).getParams(), Matchers.hasKey("wait_for_completion"));
}
private String extractMethodName(String apiName) {
return apiName.substring(SUBMIT_TASK_PREFIX.length(), apiName.length() - SUBMIT_TASK_SUFFIX.length());
}
private boolean isSubmitTaskMethod(String apiName) {
return apiName.startsWith(SUBMIT_TASK_PREFIX) && apiName.endsWith(SUBMIT_TASK_SUFFIX);
}
private static Stream<Tuple<String, Method>> getSubClientMethods(String namespace, Class<?> clientClass) {
return Arrays.stream(clientClass.getMethods()).filter(method -> method.getDeclaringClass().equals(clientClass))
.map(method -> Tuple.tuple(namespace + "." + toSnakeCase(method.getName()), method))

View File

@ -61,14 +61,14 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
RequestConvertersTests.setRandomLocal(getRepositoriesRequest, expectedParams);
if (randomBoolean()) {
String[] entries = new String[] { "a", "b", "c" };
String[] entries = new String[]{"a", "b", "c"};
getRepositoriesRequest.repositories(entries);
endpoint.append("/" + String.join(",", entries));
}
Request request = SnapshotRequestConverters.getRepositories(getRepositoriesRequest);
assertThat(endpoint.toString(), equalTo(request.getEndpoint()));
assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod()));
assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
assertThat(request.getMethod(), equalTo(HttpGet.METHOD_NAME));
assertThat(expectedParams, equalTo(request.getParameters()));
}
@ -88,8 +88,8 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
.build());
Request request = SnapshotRequestConverters.createRepository(putRepositoryRequest);
assertThat(endpoint, equalTo(request.getEndpoint()));
assertThat(HttpPut.METHOD_NAME, equalTo(request.getMethod()));
assertThat(request.getEndpoint(), equalTo(endpoint));
assertThat(request.getMethod(), equalTo(HttpPut.METHOD_NAME));
RequestConvertersTests.assertToXContentBody(putRepositoryRequest, request.getEntity());
}
@ -105,9 +105,9 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
RequestConvertersTests.setRandomTimeout(deleteRepositoryRequest::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams);
Request request = SnapshotRequestConverters.deleteRepository(deleteRepositoryRequest);
assertThat(endpoint.toString(), equalTo(request.getEndpoint()));
assertThat(HttpDelete.METHOD_NAME, equalTo(request.getMethod()));
assertThat(expectedParams, equalTo(request.getParameters()));
assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
assertThat(request.getMethod(), equalTo(HttpDelete.METHOD_NAME));
assertThat(request.getParameters(), equalTo(expectedParams));
assertNull(request.getEntity());
}
@ -121,9 +121,9 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
RequestConvertersTests.setRandomTimeout(verifyRepositoryRequest::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams);
Request request = SnapshotRequestConverters.verifyRepository(verifyRepositoryRequest);
assertThat(endpoint, equalTo(request.getEndpoint()));
assertThat(HttpPost.METHOD_NAME, equalTo(request.getMethod()));
assertThat(expectedParams, equalTo(request.getParameters()));
assertThat(request.getEndpoint(), equalTo(endpoint));
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
assertThat(request.getParameters(), equalTo(expectedParams));
}
public void testCreateSnapshot() throws IOException {
@ -137,14 +137,12 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
Boolean waitForCompletion = randomBoolean();
createSnapshotRequest.waitForCompletion(waitForCompletion);
if (waitForCompletion) {
expectedParams.put("wait_for_completion", waitForCompletion.toString());
}
Request request = SnapshotRequestConverters.createSnapshot(createSnapshotRequest);
assertThat(endpoint, equalTo(request.getEndpoint()));
assertThat(HttpPut.METHOD_NAME, equalTo(request.getMethod()));
assertThat(expectedParams, equalTo(request.getParameters()));
assertThat(request.getEndpoint(), equalTo(endpoint));
assertThat(request.getMethod(), equalTo(HttpPut.METHOD_NAME));
assertThat(request.getParameters(), equalTo(expectedParams));
RequestConvertersTests.assertToXContentBody(createSnapshotRequest, request.getEntity());
}
@ -178,9 +176,9 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
}
Request request = SnapshotRequestConverters.getSnapshots(getSnapshotsRequest);
assertThat(endpoint, equalTo(request.getEndpoint()));
assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod()));
assertThat(expectedParams, equalTo(request.getParameters()));
assertThat(request.getEndpoint(), equalTo(endpoint));
assertThat(request.getMethod(), equalTo(HttpGet.METHOD_NAME));
assertThat(request.getParameters(), equalTo(expectedParams));
assertNull(request.getEntity());
}
@ -202,9 +200,9 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
expectedParams.put("verbose", Boolean.toString(verbose));
Request request = SnapshotRequestConverters.getSnapshots(getSnapshotsRequest);
assertThat(endpoint, equalTo(request.getEndpoint()));
assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod()));
assertThat(expectedParams, equalTo(request.getParameters()));
assertThat(request.getEndpoint(), equalTo(endpoint));
assertThat(request.getMethod(), equalTo(HttpGet.METHOD_NAME));
assertThat(request.getParameters(), equalTo(expectedParams));
assertNull(request.getEntity());
}
@ -239,10 +237,10 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(repository, snapshot);
RequestConvertersTests.setRandomMasterTimeout(restoreSnapshotRequest, expectedParams);
if (randomBoolean()) {
restoreSnapshotRequest.waitForCompletion(true);
expectedParams.put("wait_for_completion", "true");
}
boolean waitForCompletion = randomBoolean();
restoreSnapshotRequest.waitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", Boolean.toString(waitForCompletion));
if (randomBoolean()) {
String timeout = randomTimeValue();
restoreSnapshotRequest.masterNodeTimeout(timeout);
@ -250,9 +248,9 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
}
Request request = SnapshotRequestConverters.restoreSnapshot(restoreSnapshotRequest);
assertThat(endpoint, equalTo(request.getEndpoint()));
assertThat(HttpPost.METHOD_NAME, equalTo(request.getMethod()));
assertThat(expectedParams, equalTo(request.getParameters()));
assertThat(request.getEndpoint(), equalTo(endpoint));
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
assertThat(request.getParameters(), equalTo(expectedParams));
RequestConvertersTests.assertToXContentBody(restoreSnapshotRequest, request.getEntity());
}
@ -269,9 +267,9 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
RequestConvertersTests.setRandomMasterTimeout(deleteSnapshotRequest, expectedParams);
Request request = SnapshotRequestConverters.deleteSnapshot(deleteSnapshotRequest);
assertThat(endpoint, equalTo(request.getEndpoint()));
assertThat(HttpDelete.METHOD_NAME, equalTo(request.getMethod()));
assertThat(expectedParams, equalTo(request.getParameters()));
assertThat(request.getEndpoint(), equalTo(endpoint));
assertThat(request.getMethod(), equalTo(HttpDelete.METHOD_NAME));
assertThat(request.getParameters(), equalTo(expectedParams));
assertNull(request.getEntity());
}
}

View File

@ -62,12 +62,10 @@ public class TasksRequestConvertersTests extends ESTestCase {
expectedParams.put("detailed", "true");
}
}
if (randomBoolean()) {
request.setWaitForCompletion(randomBoolean());
if (request.getWaitForCompletion()) {
expectedParams.put("wait_for_completion", "true");
}
}
expectedParams.put("wait_for_completion", Boolean.toString(request.getWaitForCompletion()));
if (randomBoolean()) {
String timeout = randomTimeValue();
request.setTimeout(timeout);

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.tasks;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
public class TaskSubmissionResponseTests extends ESTestCase {
public void testFromXContent() throws IOException {
xContentTester(
this::createParser,
this::createTestInstance,
this::toXContent,
TaskSubmissionResponse::fromXContent)
.supportsUnknownFields(true)
.test();
}
private void toXContent(TaskSubmissionResponse response, XContentBuilder xContentBuilder) throws IOException {
xContentBuilder.startObject();
xContentBuilder.field("task", response.getTask());
xContentBuilder.endObject();
}
private TaskSubmissionResponse createTestInstance() {
String taskId = randomAlphaOfLength(5) + ":" + randomLong();
return new TaskSubmissionResponse(taskId);
}
}