From 2c27c58718f995793883a002e5e9ddcaae124cb7 Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Wed, 30 May 2018 13:32:52 +0200 Subject: [PATCH] REST high-level client: add synced flush API (2) (#30650) Adds the synced flush API to the high level REST client. Relates to #27205. --- .../elasticsearch/client/IndicesClient.java | 23 ++ .../client/RequestConverters.java | 9 + .../client/SyncedFlushResponse.java | 344 ++++++++++++++++++ .../elasticsearch/client/IndicesClientIT.java | 34 ++ .../client/RequestConvertersTests.java | 24 ++ .../client/SyncedFlushResponseTests.java | 269 ++++++++++++++ .../IndicesClientDocumentationIT.java | 89 ++++- .../high-level/indices/flush_synced.asciidoc | 91 +++++ .../high-level/supported-apis.asciidoc | 2 + 9 files changed, 881 insertions(+), 4 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java create mode 100644 docs/java-rest/high-level/indices/flush_synced.asciidoc diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java index 5aa64a5c137..b08b045d287 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -269,6 +270,28 @@ public final class IndicesClient { listener, emptySet(), headers); } + /** Initiate a synced flush manually using the synced flush API + *

+ * See + * Synced flush API on elastic.co + */ + public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, RequestConverters::flushSynced, + SyncedFlushResponse::fromXContent, emptySet(), headers); + } + + /** + * Asynchronously initiate a synced flush manually using the synced flush API + *

+ * See + * Synced flush API on elastic.co + */ + public void flushSyncedAsync(SyncedFlushRequest syncedFlushRequest, ActionListener listener, Header... headers) { + restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, RequestConverters::flushSynced, + SyncedFlushResponse::fromXContent, listener, emptySet(), headers); + } + + /** * Retrieve the settings of one or more indices *

diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 6126d59b16a..1f542736d7d 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -41,6 +41,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -211,6 +212,14 @@ final class RequestConverters { return request; } + static Request flushSynced(SyncedFlushRequest syncedFlushRequest) { + String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_flush/synced")); + Params parameters = new Params(request); + parameters.withIndicesOptions(syncedFlushRequest.indicesOptions()); + return request; + } + static Request forceMerge(ForceMergeRequest forceMergeRequest) { String[] indices = forceMergeRequest.indices() == null ? Strings.EMPTY_ARRAY : forceMergeRequest.indices(); Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_forcemerge")); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java new file mode 100644 index 00000000000..53f3f3358ba --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java @@ -0,0 +1,344 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; + +public class SyncedFlushResponse extends ActionResponse implements ToXContentFragment { + + public static final String SHARDS_FIELD = "_shards"; + + private ShardCounts totalCounts; + private Map indexResults; + + SyncedFlushResponse(ShardCounts totalCounts, Map indexResults) { + this.totalCounts = new ShardCounts(totalCounts.total, totalCounts.successful, totalCounts.failed); + this.indexResults = Collections.unmodifiableMap(indexResults); + } + + /** + * @return The total number of shard copies that were processed across all indexes + */ + public int totalShards() { + return totalCounts.total; + } + + /** + * @return The number of successful shard copies that were processed across all indexes + */ + public int successfulShards() { + return totalCounts.successful; + } + + /** + * @return The number of failed shard copies that were processed across all indexes + */ + public int failedShards() { + return totalCounts.failed; + } + + /** + * @return A map of results for each index where the keys of the map are the index names + * and the values are the results encapsulated in {@link IndexResult}. + */ + public Map getIndexResults() { + return indexResults; + } + + ShardCounts getShardCounts() { + return totalCounts; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(SHARDS_FIELD); + totalCounts.toXContent(builder, params); + builder.endObject(); + for (Map.Entry entry: indexResults.entrySet()) { + String indexName = entry.getKey(); + IndexResult indexResult = entry.getValue(); + builder.startObject(indexName); + indexResult.toXContent(builder, params); + builder.endObject(); + } + return builder; + } + + public static SyncedFlushResponse fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + ShardCounts totalCounts = null; + Map indexResults = new HashMap<>(); + XContentLocation startLoc = parser.getTokenLocation(); + while (parser.nextToken().equals(Token.FIELD_NAME)) { + if (parser.currentName().equals(SHARDS_FIELD)) { + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + totalCounts = ShardCounts.fromXContent(parser); + } else { + String indexName = parser.currentName(); + IndexResult indexResult = IndexResult.fromXContent(parser); + indexResults.put(indexName, indexResult); + } + } + if (totalCounts != null) { + return new SyncedFlushResponse(totalCounts, indexResults); + } else { + throw new ParsingException( + startLoc, + "Unable to reconstruct object. Total counts for shards couldn't be parsed." + ); + } + } + + /** + * Encapsulates the number of total successful and failed shard copies + */ + public static final class ShardCounts implements ToXContentFragment { + + public static final String TOTAL_FIELD = "total"; + public static final String SUCCESSFUL_FIELD = "successful"; + public static final String FAILED_FIELD = "failed"; + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "shardcounts", + a -> new ShardCounts((Integer) a[0], (Integer) a[1], (Integer) a[2]) + ); + static { + PARSER.declareInt(constructorArg(), new ParseField(TOTAL_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(SUCCESSFUL_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(FAILED_FIELD)); + } + + private int total; + private int successful; + private int failed; + + + ShardCounts(int total, int successful, int failed) { + this.total = total; + this.successful = successful; + this.failed = failed; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(TOTAL_FIELD, total); + builder.field(SUCCESSFUL_FIELD, successful); + builder.field(FAILED_FIELD, failed); + return builder; + } + + public static ShardCounts fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + public boolean equals(ShardCounts other) { + if (other != null) { + return + other.total == this.total && + other.successful == this.successful && + other.failed == this.failed; + } else { + return false; + } + } + + } + + /** + * Description for the flush/synced results for a particular index. + * This includes total, successful and failed copies along with failure description for each failed copy. + */ + public static final class IndexResult implements ToXContentFragment { + + public static final String TOTAL_FIELD = "total"; + public static final String SUCCESSFUL_FIELD = "successful"; + public static final String FAILED_FIELD = "failed"; + public static final String FAILURES_FIELD = "failures"; + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "indexresult", + a -> new IndexResult((Integer) a[0], (Integer) a[1], (Integer) a[2], (List)a[3]) + ); + static { + PARSER.declareInt(constructorArg(), new ParseField(TOTAL_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(SUCCESSFUL_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(FAILED_FIELD)); + PARSER.declareObjectArray(optionalConstructorArg(), ShardFailure.PARSER, new ParseField(FAILURES_FIELD)); + } + + private ShardCounts counts; + private List failures; + + IndexResult(int total, int successful, int failed, List failures) { + counts = new ShardCounts(total, successful, failed); + if (failures != null) { + this.failures = Collections.unmodifiableList(failures); + } else { + this.failures = Collections.unmodifiableList(new ArrayList<>()); + } + } + + /** + * @return The total number of shard copies that were processed for this index. + */ + public int totalShards() { + return counts.total; + } + + /** + * @return The number of successful shard copies that were processed for this index. + */ + public int successfulShards() { + return counts.successful; + } + + /** + * @return The number of failed shard copies that were processed for this index. + */ + public int failedShards() { + return counts.failed; + } + + /** + * @return A list of {@link ShardFailure} objects that describe each of the failed shard copies for this index. + */ + public List failures() { + return failures; + } + + ShardCounts getShardCounts() { + return counts; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + counts.toXContent(builder, params); + if (failures.size() > 0) { + builder.startArray(FAILURES_FIELD); + for (ShardFailure failure : failures) { + failure.toXContent(builder, params); + } + builder.endArray(); + } + return builder; + } + + public static IndexResult fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + } + + /** + * Description of a failed shard copy for an index. + */ + public static final class ShardFailure implements ToXContentFragment { + + public static String SHARD_ID_FIELD = "shard"; + public static String FAILURE_REASON_FIELD = "reason"; + public static String ROUTING_FIELD = "routing"; + + private int shardId; + private String failureReason; + private Map routing; + + @SuppressWarnings("unchecked") + static ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "shardfailure", + a -> new ShardFailure((Integer)a[0], (String)a[1], (Map)a[2]) + ); + static { + PARSER.declareInt(constructorArg(), new ParseField(SHARD_ID_FIELD)); + PARSER.declareString(constructorArg(), new ParseField(FAILURE_REASON_FIELD)); + PARSER.declareObject( + optionalConstructorArg(), + (parser, c) -> parser.map(), + new ParseField(ROUTING_FIELD) + ); + } + + ShardFailure(int shardId, String failureReason, Map routing) { + this.shardId = shardId; + this.failureReason = failureReason; + if (routing != null) { + this.routing = Collections.unmodifiableMap(routing); + } else { + this.routing = Collections.unmodifiableMap(new HashMap<>()); + } + } + + /** + * @return Id of the shard whose copy failed + */ + public int getShardId() { + return shardId; + } + + /** + * @return Reason for failure of the shard copy + */ + public String getFailureReason() { + return failureReason; + } + + /** + * @return Additional information about the failure. + */ + public Map getRouting() { + return routing; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(SHARD_ID_FIELD, shardId); + builder.field(FAILURE_REASON_FIELD, failureReason); + if (routing.size() > 0) { + builder.field(ROUTING_FIELD, routing); + } + builder.endObject(); + return builder; + } + + public static ShardFailure fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java index 88e4a256815..448ff0138d3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -563,6 +564,39 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase { } } + public void testSyncedFlush() throws IOException { + { + String index = "index"; + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(index, settings); + SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index); + SyncedFlushResponse flushResponse = + execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync); + assertThat(flushResponse.totalShards(), equalTo(1)); + assertThat(flushResponse.successfulShards(), equalTo(1)); + assertThat(flushResponse.failedShards(), equalTo(0)); + } + { + String nonExistentIndex = "non_existent_index"; + assertFalse(indexExists(nonExistentIndex)); + SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(nonExistentIndex); + ElasticsearchException exception = expectThrows( + ElasticsearchException.class, + () -> + execute( + syncedFlushRequest, + highLevelClient().indices()::flushSynced, + highLevelClient().indices()::flushSyncedAsync + ) + ); + assertEquals(RestStatus.NOT_FOUND, exception.status()); + } + } + + public void testClearCache() throws IOException { { String index = "index"; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 1573071da33..0f0c1f27588 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -645,6 +646,29 @@ public class RequestConvertersTests extends ESTestCase { assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); } + public void testSyncedFlush() { + String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5); + SyncedFlushRequest syncedFlushRequest; + if (randomBoolean()) { + syncedFlushRequest = new SyncedFlushRequest(indices); + } else { + syncedFlushRequest = new SyncedFlushRequest(); + syncedFlushRequest.indices(indices); + } + Map expectedParams = new HashMap<>(); + setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams); + Request request = RequestConverters.flushSynced(syncedFlushRequest); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + if (indices != null && indices.length > 0) { + endpoint.add(String.join(",", indices)); + } + endpoint.add("_flush/synced"); + assertThat(request.getEndpoint(), equalTo(endpoint.toString())); + assertThat(request.getParameters(), equalTo(expectedParams)); + assertThat(request.getEntity(), nullValue()); + assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); + } + public void testForceMerge() { String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5); ForceMergeRequest forceMergeRequest; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java new file mode 100644 index 00000000000..bc8fc90dd75 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java @@ -0,0 +1,269 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client; + +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.ObjectIntMap; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; +import org.elasticsearch.indices.flush.SyncedFlushService; +import org.elasticsearch.test.ESTestCase; + +public class SyncedFlushResponseTests extends ESTestCase { + + public void testXContentSerialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + TestPlan plan = createTestPlan(); + + XContentBuilder serverResponsebuilder = XContentBuilder.builder(xContentType.xContent()); + assertNotNull(plan.result); + serverResponsebuilder.startObject(); + plan.result.toXContent(serverResponsebuilder, ToXContent.EMPTY_PARAMS); + serverResponsebuilder.endObject(); + XContentBuilder clientResponsebuilder = XContentBuilder.builder(xContentType.xContent()); + assertNotNull(plan.result); + clientResponsebuilder.startObject(); + plan.clientResult.toXContent(clientResponsebuilder, ToXContent.EMPTY_PARAMS); + clientResponsebuilder.endObject(); + Map serverContentMap = convertFailureListToSet( + serverResponsebuilder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + BytesReference.bytes(serverResponsebuilder).streamInput() + ).map() + ); + Map clientContentMap = convertFailureListToSet( + clientResponsebuilder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + BytesReference.bytes(clientResponsebuilder).streamInput() + ) + .map() + ); + assertEquals(serverContentMap, clientContentMap); + } + + public void testXContentDeserialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + TestPlan plan = createTestPlan(); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + builder.startObject(); + plan.result.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + SyncedFlushResponse originalResponse = plan.clientResult; + SyncedFlushResponse parsedResponse = SyncedFlushResponse.fromXContent(parser); + assertNotNull(parsedResponse); + assertShardCounts(originalResponse.getShardCounts(), parsedResponse.getShardCounts()); + for (Map.Entry entry: originalResponse.getIndexResults().entrySet()) { + String index = entry.getKey(); + SyncedFlushResponse.IndexResult responseResult = entry.getValue(); + SyncedFlushResponse.IndexResult parsedResult = parsedResponse.getIndexResults().get(index); + assertNotNull(responseResult); + assertNotNull(parsedResult); + assertShardCounts(responseResult.getShardCounts(), parsedResult.getShardCounts()); + assertEquals(responseResult.failures().size(), parsedResult.failures().size()); + for (SyncedFlushResponse.ShardFailure responseShardFailure: responseResult.failures()) { + assertTrue(containsFailure(parsedResult.failures(), responseShardFailure)); + } + } + } + + static class TestPlan { + SyncedFlushResponse.ShardCounts totalCounts; + Map countsPerIndex = new HashMap<>(); + ObjectIntMap expectedFailuresPerIndex = new ObjectIntHashMap<>(); + org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse result; + SyncedFlushResponse clientResult; + } + + TestPlan createTestPlan() throws IOException { + final TestPlan testPlan = new TestPlan(); + final Map> indicesResults = new HashMap<>(); + Map indexResults = new HashMap<>(); + final XContentType xContentType = randomFrom(XContentType.values()); + final int indexCount = randomIntBetween(1, 10); + int totalShards = 0; + int totalSuccessful = 0; + int totalFailed = 0; + for (int i = 0; i < indexCount; i++) { + final String index = "index_" + i; + int shards = randomIntBetween(1, 4); + int replicas = randomIntBetween(0, 2); + int successful = 0; + int failed = 0; + int failures = 0; + List shardsResults = new ArrayList<>(); + List shardFailures = new ArrayList<>(); + for (int shard = 0; shard < shards; shard++) { + final ShardId shardId = new ShardId(index, "_na_", shard); + if (randomInt(5) < 2) { + // total shard failure + failed += replicas + 1; + failures++; + shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure")); + shardFailures.add( + new SyncedFlushResponse.ShardFailure( + shardId.id(), + "simulated total failure", + new HashMap<>() + ) + ); + } else { + Map shardResponses = new HashMap<>(); + for (int copy = 0; copy < replicas + 1; copy++) { + final ShardRouting shardRouting = + TestShardRouting.newShardRouting( + index, shard, "node_" + shardId + "_" + copy, null, + copy == 0, ShardRoutingState.STARTED + ); + if (randomInt(5) < 2) { + // shard copy failure + failed++; + failures++; + shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId)); + // Building the shardRouting map here. + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + Map routing = + shardRouting.toXContent(builder, ToXContent.EMPTY_PARAMS) + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, + BytesReference.bytes(builder).streamInput() + ) + .map(); + shardFailures.add( + new SyncedFlushResponse.ShardFailure( + shardId.id(), + "copy failure " + shardId, + routing + ) + ); + } else { + successful++; + shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse()); + } + } + shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses)); + } + } + indicesResults.put(index, shardsResults); + indexResults.put( + index, + new SyncedFlushResponse.IndexResult( + shards * (replicas + 1), + successful, + failed, + shardFailures + ) + ); + testPlan.countsPerIndex.put(index, new SyncedFlushResponse.ShardCounts(shards * (replicas + 1), successful, failed)); + testPlan.expectedFailuresPerIndex.put(index, failures); + totalFailed += failed; + totalShards += shards * (replicas + 1); + totalSuccessful += successful; + } + testPlan.result = new org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse(indicesResults); + testPlan.totalCounts = new SyncedFlushResponse.ShardCounts(totalShards, totalSuccessful, totalFailed); + testPlan.clientResult = new SyncedFlushResponse( + new SyncedFlushResponse.ShardCounts(totalShards, totalSuccessful, totalFailed), + indexResults + ); + return testPlan; + } + + public boolean containsFailure(List failures, SyncedFlushResponse.ShardFailure origFailure) { + for (SyncedFlushResponse.ShardFailure failure: failures) { + if (failure.getShardId() == origFailure.getShardId() && + failure.getFailureReason().equals(origFailure.getFailureReason()) && + failure.getRouting().equals(origFailure.getRouting())) { + return true; + } + } + return false; + } + + + public void assertShardCounts(SyncedFlushResponse.ShardCounts first, SyncedFlushResponse.ShardCounts second) { + if (first == null) { + assertNull(second); + } else { + assertTrue(first.equals(second)); + } + } + + public Map convertFailureListToSet(Map input) { + Map retMap = new HashMap<>(); + for (Map.Entry entry: input.entrySet()) { + if (entry.getKey().equals(SyncedFlushResponse.SHARDS_FIELD)) { + retMap.put(entry.getKey(), entry.getValue()); + } else { + // This was an index entry. + @SuppressWarnings("unchecked") + Map indexResult = (Map)entry.getValue(); + Map retResult = new HashMap<>(); + for (Map.Entry entry2: indexResult.entrySet()) { + if (entry2.getKey().equals(SyncedFlushResponse.IndexResult.FAILURES_FIELD)) { + @SuppressWarnings("unchecked") + List failures = (List)entry2.getValue(); + Set retSet = new HashSet<>(failures); + retResult.put(entry.getKey(), retSet); + } else { + retResult.put(entry2.getKey(), entry2.getValue()); + } + } + retMap.put(entry.getKey(), retResult); + } + } + return retMap; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java index 38a963fa33c..fd733b83d5a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -55,8 +56,6 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; import org.elasticsearch.action.admin.indices.shrink.ResizeResponse; import org.elasticsearch.action.admin.indices.shrink.ResizeType; -import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; -import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.support.ActiveShardCount; @@ -64,6 +63,7 @@ import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.SyncedFlushResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -81,8 +81,6 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.equalTo; - /** * This class is used to generate the Java Indices API documentation. * You need to wrap your code between two tags like: @@ -784,6 +782,89 @@ public class IndicesClientDocumentationIT extends ESRestHighLevelClientTestCase } } + public void testSyncedFlushIndex() throws Exception { + RestHighLevelClient client = highLevelClient(); + + { + createIndex("index1", Settings.EMPTY); + } + + { + // tag::flush-synced-request + SyncedFlushRequest request = new SyncedFlushRequest("index1"); // <1> + SyncedFlushRequest requestMultiple = new SyncedFlushRequest("index1", "index2"); // <2> + SyncedFlushRequest requestAll = new SyncedFlushRequest(); // <3> + // end::flush-synced-request + + // tag::flush-synced-request-indicesOptions + request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1> + // end::flush-synced-request-indicesOptions + + // tag::flush-synced-execute + SyncedFlushResponse flushSyncedResponse = client.indices().flushSynced(request); + // end::flush-synced-execute + + // tag::flush-synced-response + int totalShards = flushSyncedResponse.totalShards(); // <1> + int successfulShards = flushSyncedResponse.successfulShards(); // <2> + int failedShards = flushSyncedResponse.failedShards(); // <3> + + for (Map.Entry responsePerIndexEntry: + flushSyncedResponse.getIndexResults().entrySet()) { + String indexName = responsePerIndexEntry.getKey(); // <4> + SyncedFlushResponse.IndexResult indexResult = responsePerIndexEntry.getValue(); + int totalShardsForIndex = indexResult.totalShards(); // <5> + int successfulShardsForIndex = indexResult.successfulShards(); // <6> + int failedShardsForIndex = indexResult.failedShards(); // <7> + if (failedShardsForIndex > 0) { + for (SyncedFlushResponse.ShardFailure failureEntry: indexResult.failures()) { + int shardId = failureEntry.getShardId(); // <8> + String failureReason = failureEntry.getFailureReason(); // <9> + Map routing = failureEntry.getRouting(); // <10> + } + } + } + // end::flush-synced-response + + // tag::flush-synced-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(SyncedFlushResponse refreshResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::flush-synced-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::flush-synced-execute-async + client.indices().flushSyncedAsync(request, listener); // <1> + // end::flush-synced-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + + { + // tag::flush-synced-notfound + try { + SyncedFlushRequest request = new SyncedFlushRequest("does_not_exist"); + client.indices().flushSynced(request); + } catch (ElasticsearchException exception) { + if (exception.status() == RestStatus.NOT_FOUND) { + // <1> + } + } + // end::flush-synced-notfound + } + } + public void testGetSettings() throws Exception { RestHighLevelClient client = highLevelClient(); diff --git a/docs/java-rest/high-level/indices/flush_synced.asciidoc b/docs/java-rest/high-level/indices/flush_synced.asciidoc new file mode 100644 index 00000000000..66404fe0270 --- /dev/null +++ b/docs/java-rest/high-level/indices/flush_synced.asciidoc @@ -0,0 +1,91 @@ +[[java-rest-high-flush]] +=== Flush Synced API + +[[java-rest-high-flush-synced-request]] +==== Flush Synced Request + +A `SyncedFlushRequest` can be applied to one or more indices, or even on `_all` the indices: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request] +-------------------------------------------------- +<1> Flush synced one index +<2> Flush synced multiple indices +<3> Flush synced all the indices + +==== Optional arguments + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request-indicesOptions] +-------------------------------------------------- +<1> Setting `IndicesOptions` controls how unavailable indices are resolved and +how wildcard expressions are expanded + +[[java-rest-high-flush-synced-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute] +-------------------------------------------------- + +[[java-rest-high-flush-synced-async]] +==== Asynchronous Execution + +The asynchronous execution of a flush request requires both the `SyncedFlushRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-async] +-------------------------------------------------- +<1> The `SyncedFlushRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `SyncedFlushResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument +<2> Called in case of failure. The raised exception is provided as an argument + +[[java-rest-high-flush-response]] +==== Flush Synced Response + +The returned `SyncedFlushResponse` allows to retrieve information about the +executed operation as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-response] +-------------------------------------------------- +<1> Total number of shards hit by the flush request +<2> Number of shards where the flush has succeeded +<3> Number of shards where the flush has failed +<4> Name of the index whose results we are about to calculate. +<5> Total number of shards for index mentioned in 4. +<6> Successful shards for index mentioned in 4. +<7> Failed shards for index mentioned in 4. +<8> One of the failed shard ids of the failed index mentioned in 4. +<9> Reason for failure of copies of the shard mentioned in 8. +<10> JSON represented by a Map. Contains shard related information like id, state, version etc. +for the failed shard copies. If the entire shard failed then this returns an empty map. + +By default, if the indices were not found, an `ElasticsearchException` will be thrown: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-notfound] +-------------------------------------------------- +<1> Do something if the indices to be flushed were not found \ No newline at end of file diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 981c2caa543..41202e963a4 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -67,6 +67,7 @@ Index Management:: * <> * <> * <> +* <> * <> * <> * <> @@ -89,6 +90,7 @@ include::indices/shrink_index.asciidoc[] include::indices/split_index.asciidoc[] include::indices/refresh.asciidoc[] include::indices/flush.asciidoc[] +include::indices/flush_synced.asciidoc[] include::indices/clear_cache.asciidoc[] include::indices/force_merge.asciidoc[] include::indices/rollover.asciidoc[]