HLRC: migration api - upgrade (#34898)

Implement high level client for migration upgrade API. It should wrap
RestHighLevelClient and expose high level IndexUpgradeRequest (new),
IndexTaskResponse for submissions with wait_for_completion=false and
BulkByScrollResponse (already used) objects.

refers: #29827
This commit is contained in:
Przemyslaw Gomulka 2018-11-13 15:01:53 +01:00 committed by GitHub
parent 0487181d0f
commit fd4cd80496
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 324 additions and 7 deletions

View File

@ -21,6 +21,11 @@ package org.elasticsearch.client;
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeInfoResponse;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.client.migration.IndexUpgradeRequest;
import java.io.IOException;
import java.util.Collections;
@ -52,4 +57,19 @@ public final class MigrationClient {
return restHighLevelClient.performRequestAndParseEntity(request, MigrationRequestConverters::getMigrationAssistance, options,
IndexUpgradeInfoResponse::fromXContent, Collections.emptySet());
}
public BulkByScrollResponse upgrade(IndexUpgradeRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, MigrationRequestConverters::migrate, options,
BulkByScrollResponse::fromXContent, Collections.emptySet());
}
public TaskSubmissionResponse submitUpgradeTask(IndexUpgradeRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, MigrationRequestConverters::submitMigrateTask, options,
TaskSubmissionResponse::fromXContent, Collections.emptySet());
}
public void upgradeAsync(IndexUpgradeRequest request, RequestOptions options, ActionListener<BulkByScrollResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request, MigrationRequestConverters::migrate, options,
BulkByScrollResponse::fromXContent, listener, Collections.emptySet());
}
}

View File

@ -20,11 +20,14 @@
package org.elasticsearch.client;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeRequest;
final class MigrationRequestConverters {
private MigrationRequestConverters() {}
private MigrationRequestConverters() {
}
static Request getMigrationAssistance(IndexUpgradeInfoRequest indexUpgradeInfoRequest) {
RequestConverters.EndpointBuilder endpointBuilder = new RequestConverters.EndpointBuilder()
@ -36,4 +39,26 @@ final class MigrationRequestConverters {
parameters.withIndicesOptions(indexUpgradeInfoRequest.indicesOptions());
return request;
}
static Request migrate(IndexUpgradeRequest indexUpgradeRequest) {
return prepareMigrateRequest(indexUpgradeRequest, true);
}
static Request submitMigrateTask(IndexUpgradeRequest indexUpgradeRequest) {
return prepareMigrateRequest(indexUpgradeRequest, false);
}
private static Request prepareMigrateRequest(IndexUpgradeRequest indexUpgradeRequest, boolean waitForCompletion) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack", "migration", "upgrade")
.addPathPart(indexUpgradeRequest.index())
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request)
.withWaitForCompletion(waitForCompletion);
return request;
}
}

View File

@ -26,6 +26,10 @@ import org.elasticsearch.common.Strings;
import java.util.Arrays;
import java.util.Objects;
/**
* A request for retrieving upgrade information
* Part of Migration API
*/
public class IndexUpgradeInfoRequest extends TimedRequest implements IndicesRequest.Replaceable {
private String[] indices = Strings.EMPTY_ARRAY;

View File

@ -28,6 +28,9 @@ import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
/**
* Response object that contains information about indices to be upgraded
*/
public class IndexUpgradeInfoResponse {
private static final ParseField INDICES = new ParseField("indices");

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.migration;
import org.elasticsearch.client.Validatable;
import java.util.Objects;
/**
* A request for performing Upgrade on Index
* Part of Migration API
*/
public class IndexUpgradeRequest implements Validatable {
private String index;
public IndexUpgradeRequest(String index) {
this.index = index;
}
public String index() {
return index;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IndexUpgradeRequest request = (IndexUpgradeRequest) o;
return Objects.equals(index, request.index);
}
@Override
public int hashCode() {
return Objects.hash(index);
}
}

View File

@ -19,25 +19,73 @@
package org.elasticsearch.client;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeInfoResponse;
import org.elasticsearch.client.migration.IndexUpgradeRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.util.function.BooleanSupplier;
import static org.hamcrest.Matchers.containsString;
public class MigrationIT extends ESRestHighLevelClientTestCase {
public void testGetAssistance() throws IOException {
RestHighLevelClient client = highLevelClient();
{
IndexUpgradeInfoResponse response = client.migration().getAssistance(new IndexUpgradeInfoRequest(), RequestOptions.DEFAULT);
IndexUpgradeInfoResponse response = highLevelClient().migration()
.getAssistance(new IndexUpgradeInfoRequest(), RequestOptions.DEFAULT);
assertEquals(0, response.getActions().size());
}
{
client.indices().create(new CreateIndexRequest("test"), RequestOptions.DEFAULT);
IndexUpgradeInfoResponse response = client.migration().getAssistance(
createIndex("test", Settings.EMPTY);
IndexUpgradeInfoResponse response = highLevelClient().migration().getAssistance(
new IndexUpgradeInfoRequest("test"), RequestOptions.DEFAULT);
assertEquals(0, response.getActions().size());
}
}
public void testUpgradeWhenIndexCannotBeUpgraded() throws IOException {
createIndex("test", Settings.EMPTY);
ThrowingRunnable execute = () -> execute(new IndexUpgradeRequest("test"),
highLevelClient().migration()::upgrade,
highLevelClient().migration()::upgradeAsync);
ElasticsearchStatusException responseException = expectThrows(ElasticsearchStatusException.class, execute);
assertThat(responseException.getDetailedMessage(), containsString("cannot be upgraded"));
}
public void testUpgradeWithTaskApi() throws IOException, InterruptedException {
createIndex("test", Settings.EMPTY);
IndexUpgradeRequest request = new IndexUpgradeRequest("test");
TaskSubmissionResponse upgrade = highLevelClient().migration()
.submitUpgradeTask(request, RequestOptions.DEFAULT);
assertNotNull(upgrade.getTask());
BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(upgrade);
awaitBusy(hasUpgradeCompleted);
}
/**
* Using low-level api as high-level-rest-client's getTaskById work is in progress.
* TODO revisit once that work is finished
*/
private BooleanSupplier checkCompletionStatus(TaskSubmissionResponse upgrade) {
return () -> {
try {
Response response = client().performRequest(new Request("GET", "/_tasks/" + upgrade.getTask()));
return (boolean) entityAsMap(response).get("completed");
} catch (IOException e) {
fail(e.getMessage());
return false;
}
};
}
}

View File

@ -20,7 +20,9 @@
package org.elasticsearch.client;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeRequest;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
@ -45,4 +47,20 @@ public class MigrationRequestConvertersTests extends ESTestCase {
assertNull(request.getEntity());
assertEquals(expectedParams, request.getParameters());
}
public void testUpgradeRequest() {
String[] indices = RequestConvertersTests.randomIndicesNames(1, 1);
IndexUpgradeRequest upgradeInfoRequest = new IndexUpgradeRequest(indices[0]);
String expectedEndpoint = "/_xpack/migration/upgrade/" + indices[0];
Map<String, String> expectedParams = new HashMap<>();
expectedParams.put("wait_for_completion", Boolean.TRUE.toString());
Request request = MigrationRequestConverters.migrate(upgradeInfoRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(expectedEndpoint, request.getEndpoint());
assertNull(request.getEntity());
assertEquals(expectedParams, request.getParameters());
}
}

View File

@ -19,17 +19,30 @@
package org.elasticsearch.client.documentation;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.client.migration.IndexUpgradeInfoRequest;
import org.elasticsearch.client.migration.IndexUpgradeInfoResponse;
import org.elasticsearch.client.migration.IndexUpgradeRequest;
import org.elasticsearch.client.migration.UpgradeActionRequired;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.not;
/**
* This class is used to generate the Java Migration API documentation.
@ -80,4 +93,66 @@ public class MigrationClientDocumentationIT extends ESRestHighLevelClientTestCas
}
// end::get-assistance-response
}
public void testUpgrade() throws IOException {
RestHighLevelClient client = highLevelClient();
createIndex("test", Settings.EMPTY);
// tag::upgrade-request
IndexUpgradeRequest request = new IndexUpgradeRequest("test"); // <1>
// end::upgrade-request
try {
// tag::upgrade-execute
BulkByScrollResponse response = client.migration().upgrade(request, RequestOptions.DEFAULT);
// end::upgrade-execute
} catch (ElasticsearchStatusException e) {
assertThat(e.getMessage(), containsString("cannot be upgraded"));
}
}
public void testUpgradeAsync() throws IOException, InterruptedException {
RestHighLevelClient client = highLevelClient();
createIndex("test", Settings.EMPTY);
final CountDownLatch latch = new CountDownLatch(1);
// tag::upgrade-async-listener
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkResponse) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::upgrade-async-listener
listener = new LatchedActionListener<>(listener, latch);
// tag::upgrade-async-execute
client.migration().upgradeAsync(new IndexUpgradeRequest("test"), RequestOptions.DEFAULT, listener); // <1>
// end::upgrade-async-execute
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
public void testUpgradeWithTaskApi() throws IOException {
createIndex("test", Settings.EMPTY);
RestHighLevelClient client = highLevelClient();
// tag::upgrade-task-api
IndexUpgradeRequest request = new IndexUpgradeRequest("test");
TaskSubmissionResponse response = client.migration()
.submitUpgradeTask(request, RequestOptions.DEFAULT);
String taskId = response.getTask();
// end::upgrade-task-api
assertThat(taskId, not(isEmptyOrNullString()));
}
}

View File

@ -0,0 +1,69 @@
[[java-rest-high-migration-upgrade]]
=== Migration Upgrade
[[java-rest-high-migraton-upgrade-request]]
==== Index Upgrade Request
An `IndexUpgradeRequest` requires an index argument. Only one index at the time should be upgraded:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MigrationClientDocumentationIT.java[upgrade-request]
--------------------------------------------------
<1> Create a new request instance
[[java-rest-high-migration-upgrade-execution]]
==== Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MigrationClientDocumentationIT.java[upgrade-execute]
--------------------------------------------------
[[java-rest-high-migration-upgrade-response]]
==== Response
The returned `BulkByScrollResponse` contains information about the executed operation
[[java-rest-high-migraton-async-upgrade-request]]
==== Asynchronous Execution
The asynchronous execution of a upgrade request requires both the `IndexUpgradeRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:
A typical listener for `BulkResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MigrationClientDocumentationIT.java[upgrade-async-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument and contains a list of individual results for each
operation that was executed. Note that one or more operations might have
failed while the others have been successfully executed.
<2> Called when the whole `IndexUpgradeRequest` fails. In this case the raised
exception is provided as an argument and no operation has been executed.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MigrationClientDocumentationIT.java[upgrade-async-execute]
--------------------------------------------------
<1> The `IndexUpgradeRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
=== Migration Upgrade with Task API
Submission of upgrade request task will requires the `IndexUpgradeRequest` and will return
`IndexUpgradeSubmissionResponse`. The `IndexUpgradeSubmissionResponse` can later be use to fetch
TaskId and query the Task API for results.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MigrationClientDocumentationIT.java[upgrade-task-api]
--------------------------------------------------

View File

@ -296,8 +296,10 @@ include::ml/put-filter.asciidoc[]
The Java High Level REST Client supports the following Migration APIs:
* <<java-rest-high-migration-get-assistance>>
* <<java-rest-high-migration-upgrade>>
include::migration/get-assistance.asciidoc[]
include::migration/upgrade.asciidoc[]
== Rollup APIs