REST high-level client: add force merge API (#28896)

Relates to #27205
This commit is contained in:
Yu 2018-03-22 17:17:16 +01:00 committed by Luca Cavanna
parent 98f89c3952
commit 24c8d8f5ef
10 changed files with 341 additions and 5 deletions

View File

@ -34,15 +34,17 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeResponse;
@ -261,6 +263,28 @@ public final class IndicesClient {
listener, emptySet(), headers);
}
/**
* Force merge one or more indices using the Force Merge API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html">
* Force Merge API on elastic.co</a>
*/
public ForceMergeResponse forceMerge(ForceMergeRequest forceMergeRequest, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(forceMergeRequest, Request::forceMerge, ForceMergeResponse::fromXContent,
emptySet(), headers);
}
/**
* Asynchronously force merge one or more indices using the Force Merge API
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html">
* Force Merge API on elastic.co</a>
*/
public void forceMergeAsync(ForceMergeRequest forceMergeRequest, ActionListener<ForceMergeResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity(forceMergeRequest, Request::forceMerge, ForceMergeResponse::fromXContent,
listener, emptySet(), headers);
}
/**
* Clears the cache of one or more indices using the Clear Cache API
* <p>

View File

@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@ -233,6 +234,17 @@ public final class Request {
return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null);
}
static Request forceMerge(ForceMergeRequest forceMergeRequest) {
String[] indices = forceMergeRequest.indices() == null ? Strings.EMPTY_ARRAY : forceMergeRequest.indices();
String endpoint = endpoint(indices, "_forcemerge");
Params parameters = Params.builder();
parameters.withIndicesOptions(forceMergeRequest.indicesOptions());
parameters.putParam("max_num_segments", Integer.toString(forceMergeRequest.maxNumSegments()));
parameters.putParam("only_expunge_deletes", Boolean.toString(forceMergeRequest.onlyExpungeDeletes()));
parameters.putParam("flush", Boolean.toString(forceMergeRequest.flush()));
return new Request(HttpPost.METHOD_NAME, endpoint, parameters.getParams(), null);
}
static Request clearCache(ClearIndicesCacheRequest clearIndicesCacheRequest) {
String[] indices = clearIndicesCacheRequest.indices() == null ? Strings.EMPTY_ARRAY :clearIndicesCacheRequest.indices();
String endpoint = endpoint(indices, "_cache/clear");

View File

@ -38,6 +38,8 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@ -467,6 +469,32 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
}
}
public void testForceMerge() throws IOException {
{
String index = "index";
Settings settings = Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.build();
createIndex(index, settings);
ForceMergeRequest forceMergeRequest = new ForceMergeRequest(index);
ForceMergeResponse forceMergeResponse =
execute(forceMergeRequest, highLevelClient().indices()::forceMerge, highLevelClient().indices()::forceMergeAsync);
assertThat(forceMergeResponse.getTotalShards(), equalTo(1));
assertThat(forceMergeResponse.getSuccessfulShards(), equalTo(1));
assertThat(forceMergeResponse.getFailedShards(), equalTo(0));
assertThat(forceMergeResponse.getShardFailures(), equalTo(BroadcastResponse.EMPTY));
}
{
String nonExistentIndex = "non_existent_index";
assertFalse(indexExists(nonExistentIndex));
ForceMergeRequest forceMergeRequest = new ForceMergeRequest(nonExistentIndex);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(forceMergeRequest, highLevelClient().indices()::forceMerge, highLevelClient().indices()::forceMergeAsync));
assertEquals(RestStatus.NOT_FOUND, exception.status());
}
}
public void testExistsAlias() throws IOException {
GetAliasesRequest getAliasesRequest = new GetAliasesRequest("alias");
assertFalse(execute(getAliasesRequest, highLevelClient().indices()::existsAlias, highLevelClient().indices()::existsAliasAsync));

View File

@ -40,6 +40,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@ -621,6 +622,43 @@ public class RequestTests extends ESTestCase {
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
}
public void testForceMerge() {
String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
ForceMergeRequest forceMergeRequest;
if (randomBoolean()) {
forceMergeRequest = new ForceMergeRequest(indices);
} else {
forceMergeRequest = new ForceMergeRequest();
forceMergeRequest.indices(indices);
}
Map<String, String> expectedParams = new HashMap<>();
setRandomIndicesOptions(forceMergeRequest::indicesOptions, forceMergeRequest::indicesOptions, expectedParams);
if (randomBoolean()) {
forceMergeRequest.maxNumSegments(randomInt());
}
expectedParams.put("max_num_segments", Integer.toString(forceMergeRequest.maxNumSegments()));
if (randomBoolean()) {
forceMergeRequest.onlyExpungeDeletes(randomBoolean());
}
expectedParams.put("only_expunge_deletes", Boolean.toString(forceMergeRequest.onlyExpungeDeletes()));
if (randomBoolean()) {
forceMergeRequest.flush(randomBoolean());
}
expectedParams.put("flush", Boolean.toString(forceMergeRequest.flush()));
Request request = Request.forceMerge(forceMergeRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
if (indices != null && indices.length > 0) {
endpoint.add(String.join(",", indices));
}
endpoint.add("_forcemerge");
assertThat(request.getEndpoint(), equalTo(endpoint.toString()));
assertThat(request.getParameters(), equalTo(expectedParams));
assertThat(request.getEntity(), nullValue());
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
}
public void testClearCache() {
String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);
ClearIndicesCacheRequest clearIndicesCacheRequest;

View File

@ -37,6 +37,8 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
@ -771,6 +773,79 @@ public class IndicesClientDocumentationIT extends ESRestHighLevelClientTestCase
}
}
public void testForceMergeIndex() throws Exception {
RestHighLevelClient client = highLevelClient();
{
createIndex("index", Settings.EMPTY);
}
{
// tag::force-merge-request
ForceMergeRequest request = new ForceMergeRequest("index1"); // <1>
ForceMergeRequest requestMultiple = new ForceMergeRequest("index1", "index2"); // <2>
ForceMergeRequest requestAll = new ForceMergeRequest(); // <3>
// end::force-merge-request
// tag::force-merge-request-indicesOptions
request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1>
// end::force-merge-request-indicesOptions
// tag::force-merge-request-segments-num
request.maxNumSegments(1); // <1>
// end::force-merge-request-segments-num
// tag::force-merge-request-only-expunge-deletes
request.onlyExpungeDeletes(true); // <1>
// end::force-merge-request-only-expunge-deletes
// tag::force-merge-request-flush
request.flush(true); // <1>
// end::force-merge-request-flush
// tag::force-merge-execute
ForceMergeResponse forceMergeResponse = client.indices().forceMerge(request);
// end::force-merge-execute
// tag::force-merge-response
int totalShards = forceMergeResponse.getTotalShards(); // <1>
int successfulShards = forceMergeResponse.getSuccessfulShards(); // <2>
int failedShards = forceMergeResponse.getFailedShards(); // <3>
DefaultShardOperationFailedException[] failures = forceMergeResponse.getShardFailures(); // <4>
// end::force-merge-response
// tag::force-merge-execute-listener
ActionListener<ForceMergeResponse> listener = new ActionListener<ForceMergeResponse>() {
@Override
public void onResponse(ForceMergeResponse forceMergeResponse) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::force-merge-execute-listener
// tag::force-merge-execute-async
client.indices().forceMergeAsync(request, listener); // <1>
// end::force-merge-execute-async
}
{
// tag::force-merge-notfound
try {
ForceMergeRequest request = new ForceMergeRequest("does_not_exist");
client.indices().forceMerge(request);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.NOT_FOUND) {
// <1>
}
}
// end::force-merge-notfound
}
}
public void testClearCache() throws Exception {
RestHighLevelClient client = highLevelClient();
@ -855,7 +930,6 @@ public class IndicesClientDocumentationIT extends ESRestHighLevelClientTestCase
}
}
public void testCloseIndex() throws Exception {
RestHighLevelClient client = highLevelClient();

View File

@ -0,0 +1,102 @@
[[java-rest-high-force-merge]]
=== Force Merge API
[[java-rest-high-force-merge-request]]
==== Force merge Request
A `ForceMergeRequest` can be applied to one or more indices, or even on `_all` the indices:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-request]
--------------------------------------------------
<1> Force merge one index
<2> Force merge multiple indices
<3> Force merge all the indices
==== Optional arguments
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-request-indicesOptions]
--------------------------------------------------
<1> Setting `IndicesOptions` controls how unavailable indices are resolved and
how wildcard expressions are expanded
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-request-segments-num]
--------------------------------------------------
<1> Set `max_num_segments` to control the number of segments to merge down to.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-request-only-expunge-deletes]
--------------------------------------------------
<1> Set the `only_expunge_deletes` flag to `true`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-request-flush]
--------------------------------------------------
<1> Set the `flush` flag to `true`
[[java-rest-high-force-merge-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-execute]
--------------------------------------------------
[[java-rest-high-force-merge-async]]
==== Asynchronous Execution
The asynchronous execution of a force merge request requires both the `ForceMergeRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-execute-async]
--------------------------------------------------
<1> The `ForceMergeRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `ForceMergeResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of failure. The raised exception is provided as an argument
[[java-rest-high-force-merge-response]]
==== Force Merge Response
The returned `ForceMergeResponse` allows to retrieve information about the
executed operation as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-response]
--------------------------------------------------
<1> Total number of shards hit by the force merge request
<2> Number of shards where the force merge has succeeded
<3> Number of shards where the force merge has failed
<4> A list of failures if the operation failed on one or more shards
By default, if the indices were not found, an `ElasticsearchException` will be thrown:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[force-merge-notfound]
--------------------------------------------------
<1> Do something if the indices to be force merged were not found

View File

@ -60,6 +60,7 @@ Index Management::
* <<java-rest-high-refresh>>
* <<java-rest-high-flush>>
* <<java-rest-high-clear-cache>>
* <<java-rest-high-force-merge>>
* <<java-rest-high-rollover-index>>
Mapping Management::
@ -79,6 +80,7 @@ include::indices/split_index.asciidoc[]
include::indices/refresh.asciidoc[]
include::indices/flush.asciidoc[]
include::indices/clear_cache.asciidoc[]
include::indices/force_merge.asciidoc[]
include::indices/rollover.asciidoc[]
include::indices/put_mapping.asciidoc[]
include::indices/update_aliases.asciidoc[]

View File

@ -21,7 +21,10 @@ package org.elasticsearch.action.admin.indices.forcemerge;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.util.Arrays;
import java.util.List;
/**
@ -29,10 +32,25 @@ import java.util.List;
*/
public class ForceMergeResponse extends BroadcastResponse {
private static final ConstructingObjectParser<ForceMergeResponse, Void> PARSER = new ConstructingObjectParser<>("force_merge",
true, arg -> {
BroadcastResponse response = (BroadcastResponse) arg[0];
return new ForceMergeResponse(response.getTotalShards(), response.getSuccessfulShards(), response.getFailedShards(),
Arrays.asList(response.getShardFailures()));
});
static {
declareBroadcastFields(PARSER);
}
ForceMergeResponse() {
}
ForceMergeResponse(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}
public static ForceMergeResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
}

View File

@ -37,7 +37,6 @@ import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.rest.action.RestActions.buildBroadcastShardsHeader;
public class RestForceMergeAction extends BaseRestHandler {
public RestForceMergeAction(Settings settings, RestController controller) {
@ -62,7 +61,7 @@ public class RestForceMergeAction extends BaseRestHandler {
@Override
public RestResponse buildResponse(ForceMergeResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
buildBroadcastShardsHeader(builder, request, response);
response.toXContent(builder, request);
builder.endObject();
return new BytesRestResponse(OK, builder);
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.forcemerge;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.AbstractBroadcastResponseTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import java.util.List;
public class ForceMergeResponseTests extends AbstractBroadcastResponseTestCase<ForceMergeResponse> {
@Override
protected ForceMergeResponse createTestInstance(int totalShards, int successfulShards, int failedShards,
List<DefaultShardOperationFailedException> failures) {
return new ForceMergeResponse(totalShards, successfulShards, failedShards, failures);
}
@Override
protected ForceMergeResponse doParseInstance(XContentParser parser) {
return ForceMergeResponse.fromXContent(parser);
}
}