From 5cb86130ec65c508b25b503d228f0acfb1797663 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 17 Aug 2015 14:49:12 -0400 Subject: [PATCH] Add mechanism for transporting shard-level actions by node Currently, many shard-level operations are transported with a request per shard via TransportBroadcastAction. These shard-level requests are then submitted to unbounded execution queues for asynchronous execution on the receiving node. This transport mechanism and stuffing of the execution queues can be problematic on large clusters. A better mechanism would be to aggregate the shard-level requests, transport them via a single request per node, and execute the shard-level operations serially on the receiving node. This commit introduces TransportNodeBroadcastAction which is the high-level mechanism for transporting the shard-level operations in a single request per node. The shard-level operations are executed serially on the receiving node and per-node shard-level results are aggregated into a single response per node. These node-level results are then aggregated into a single response to the initial request. One item of note is a new mechanism for registering request handlers. This mechanism enables registrants to provide a callback for instantiating new instances of the request class. Doing this enables the inner class to be instantiated with the context of its outer class. This is done so that a single NodeRequest class can be defined rather than defining a class per operation. Closes #7990 --- .../cluster/stats/ClusterStatsIndices.java | 5 +- .../clear/ShardClearIndicesCacheRequest.java | 92 --- .../clear/ShardClearIndicesCacheResponse.java | 36 -- .../TransportClearIndicesCacheAction.java | 59 +- .../optimize/ShardOptimizeRequest.java | 60 -- .../optimize/ShardOptimizeResponse.java | 36 -- .../optimize/TransportOptimizeAction.java | 57 +- .../indices/recovery/RecoveryResponse.java | 39 +- .../recovery/ShardRecoveryResponse.java | 100 ---- .../recovery/TransportRecoveryAction.java | 114 ++-- .../segments/IndicesSegmentResponse.java | 5 +- .../admin/indices/segments/ShardSegments.java | 7 +- .../TransportIndicesSegmentsAction.java | 85 +-- .../indices/stats/IndicesStatsResponse.java | 5 +- .../admin/indices/stats/ShardStats.java | 9 +- .../stats/TransportIndicesStatsAction.java | 130 ++-- .../get/TransportUpgradeStatusAction.java | 71 +-- .../upgrade/get/UpgradeStatusRequest.java | 4 - .../upgrade/get/UpgradeStatusResponse.java | 11 +- .../node/TransportBroadcastByNodeAction.java | 562 ++++++++++++++++++ .../cluster/routing/RoutingTable.java | 33 + .../rest/action/cat/RestRecoveryAction.java | 21 +- .../rest/action/cat/RestSegmentsAction.java | 15 +- .../transport/RequestHandlerRegistry.java | 34 +- .../transport/TransportService.java | 33 +- .../action/IndicesRequestIT.java | 16 +- .../TransportBroadcastByNodeActionTests.java | 422 +++++++++++++ .../recovery/ReplicaRecoveryBenchmark.java | 8 +- .../cluster/routing/RoutingTableTest.java | 21 + .../discovery/zen/ZenDiscoveryIT.java | 10 +- .../elasticsearch/document/ShardInfoIT.java | 7 +- .../RecoveryBackwardsCompatibilityIT.java | 4 +- .../gateway/RecoveryFromGatewayIT.java | 8 +- .../index/shard/IndexShardTests.java | 11 +- .../indices/recovery/IndexRecoveryIT.java | 153 ++--- .../indices/warmer/SimpleIndicesWarmerIT.java | 8 +- .../recovery/RecoveryWhileUnderLoadIT.java | 2 +- .../elasticsearch/recovery/RelocationIT.java | 3 +- .../DedicatedClusterSnapshotRestoreIT.java | 22 +- 39 files changed, 1439 insertions(+), 879 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheRequest.java delete mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheResponse.java delete mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java delete mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeResponse.java delete mode 100644 core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java create mode 100644 core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java create mode 100644 core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java index e8243bf5d50..c219d85f9d5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.admin.cluster.stats; import com.carrotsearch.hppc.ObjectObjectHashMap; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -67,10 +66,10 @@ public class ClusterStatsIndices implements ToXContent, Streamable { for (ClusterStatsNodeResponse r : nodeResponses) { for (org.elasticsearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) { - ShardStats indexShardStats = countsPerIndex.get(shardStats.getIndex()); + ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndex()); if (indexShardStats == null) { indexShardStats = new ShardStats(); - countsPerIndex.put(shardStats.getIndex(), indexShardStats); + countsPerIndex.put(shardStats.getShardRouting().getIndex(), indexShardStats); } indexShardStats.total++; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheRequest.java deleted file mode 100644 index 3f583b7efa4..00000000000 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheRequest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.cache.clear; - -import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; - -/** - * - */ -class ShardClearIndicesCacheRequest extends BroadcastShardRequest { - - private boolean queryCache = false; - private boolean fieldDataCache = false; - private boolean recycler; - private boolean requestCache = false; - - private String[] fields = null; - - ShardClearIndicesCacheRequest() { - } - - ShardClearIndicesCacheRequest(ShardId shardId, ClearIndicesCacheRequest request) { - super(shardId, request); - queryCache = request.queryCache(); - fieldDataCache = request.fieldDataCache(); - fields = request.fields(); - recycler = request.recycler(); - requestCache = request.requestCache(); - } - - public boolean queryCache() { - return queryCache; - } - - public boolean requestCache() { - return requestCache; - } - - public boolean fieldDataCache() { - return this.fieldDataCache; - } - - public boolean recycler() { - return this.recycler; - } - - public String[] fields() { - return this.fields; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - queryCache = in.readBoolean(); - fieldDataCache = in.readBoolean(); - recycler = in.readBoolean(); - fields = in.readStringArray(); - requestCache = in.readBoolean(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(queryCache); - out.writeBoolean(fieldDataCache); - out.writeBoolean(recycler); - out.writeStringArrayNullable(fields); - out.writeBoolean(requestCache); - } -} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheResponse.java deleted file mode 100644 index c2931df6003..00000000000 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheResponse.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.cache.clear; - -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; -import org.elasticsearch.index.shard.ShardId; - -/** - * - */ -class ShardClearIndicesCacheResponse extends BroadcastShardResponse { - - ShardClearIndicesCacheResponse() { - } - - ShardClearIndicesCacheResponse(ShardId shardId) { - super(shardId); - } -} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index 2f8d7f84cda..ba90ca19263 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -21,17 +21,16 @@ package org.elasticsearch.action.admin.indices.cache.clear; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; @@ -40,14 +39,14 @@ import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; +import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; + /** * Indices clear cache action. */ -public class TransportClearIndicesCacheAction extends TransportBroadcastAction { +public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; private final IndicesRequestCache indicesRequestCache; @@ -58,48 +57,33 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastAction shardFailures = null; - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - successfulShards++; - } - } - return new ClearIndicesCacheResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected EmptyResult readShardResult(StreamInput in) throws IOException { + return EmptyResult.readEmptyResultFrom(in); } @Override - protected ShardClearIndicesCacheRequest newShardRequest(int numShards, ShardRouting shard, ClearIndicesCacheRequest request) { - return new ShardClearIndicesCacheRequest(shard.shardId(), request); + protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + return new ClearIndicesCacheResponse(totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardClearIndicesCacheResponse newShardResponse() { - return new ShardClearIndicesCacheResponse(); + protected ClearIndicesCacheRequest readRequestFrom(StreamInput in) throws IOException { + final ClearIndicesCacheRequest request = new ClearIndicesCacheRequest(); + request.readFrom(in); + return request; } @Override - protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRequest request) { - IndexService service = indicesService.indexService(request.shardId().getIndex()); + protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRouting shardRouting) { + IndexService service = indicesService.indexService(shardRouting.getIndex()); if (service != null) { - IndexShard shard = service.shard(request.shardId().id()); + IndexShard shard = service.shard(shardRouting.id()); boolean clearedAtLeastOne = false; if (request.queryCache()) { clearedAtLeastOne = true; @@ -137,15 +121,15 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastAction { +public class TransportOptimizeAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -54,55 +52,40 @@ public class TransportOptimizeAction extends TransportBroadcastAction shardFailures = null; - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // a non active shard, ignore... - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - successfulShards++; - } - } - return new OptimizeResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected EmptyResult readShardResult(StreamInput in) throws IOException { + return EmptyResult.readEmptyResultFrom(in); } @Override - protected ShardOptimizeRequest newShardRequest(int numShards, ShardRouting shard, OptimizeRequest request) { - return new ShardOptimizeRequest(shard.shardId(), request); + protected OptimizeResponse newResponse(OptimizeRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + return new OptimizeResponse(totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardOptimizeResponse newShardResponse() { - return new ShardOptimizeResponse(); + protected OptimizeRequest readRequestFrom(StreamInput in) throws IOException { + final OptimizeRequest request = new OptimizeRequest(); + request.readFrom(in); + return request; } @Override - protected ShardOptimizeResponse shardOperation(ShardOptimizeRequest request) { - IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id()); - indexShard.optimize(request.optimizeRequest()); - return new ShardOptimizeResponse(request.shardId()); + protected EmptyResult shardOperation(OptimizeRequest request, ShardRouting shardRouting) { + IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id()); + indexShard.optimize(request); + return EmptyResult.INSTANCE; } /** * The refresh request works against *all* shards. */ @Override - protected GroupShardsIterator shards(ClusterState clusterState, OptimizeRequest request, String[] concreteIndices) { - return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true); + protected ShardsIterator shards(ClusterState clusterState, OptimizeRequest request, String[] concreteIndices) { + return clusterState.routingTable().allShards(concreteIndices); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java index fea33688c14..0e0881d1729 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.indices.recovery.RecoveryState; import java.io.IOException; import java.util.ArrayList; @@ -38,7 +39,7 @@ import java.util.Map; public class RecoveryResponse extends BroadcastResponse implements ToXContent { private boolean detailed = false; - private Map> shardResponses = new HashMap<>(); + private Map> shardRecoveryStates = new HashMap<>(); public RecoveryResponse() { } @@ -50,18 +51,18 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent { * @param successfulShards Count of shards successfully processed * @param failedShards Count of shards which failed to process * @param detailed Display detailed metrics - * @param shardResponses Map of indices to shard recovery information + * @param shardRecoveryStates Map of indices to shard recovery information * @param shardFailures List of failures processing shards */ public RecoveryResponse(int totalShards, int successfulShards, int failedShards, boolean detailed, - Map> shardResponses, List shardFailures) { + Map> shardRecoveryStates, List shardFailures) { super(totalShards, successfulShards, failedShards, shardFailures); - this.shardResponses = shardResponses; + this.shardRecoveryStates = shardRecoveryStates; this.detailed = detailed; } public boolean hasRecoveries() { - return shardResponses.size() > 0; + return shardRecoveryStates.size() > 0; } public boolean detailed() { @@ -72,23 +73,23 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent { this.detailed = detailed; } - public Map> shardResponses() { - return shardResponses; + public Map> shardRecoveryStates() { + return shardRecoveryStates; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { if (hasRecoveries()) { - for (String index : shardResponses.keySet()) { - List responses = shardResponses.get(index); - if (responses == null || responses.size() == 0) { + for (String index : shardRecoveryStates.keySet()) { + List recoveryStates = shardRecoveryStates.get(index); + if (recoveryStates == null || recoveryStates.size() == 0) { continue; } builder.startObject(index); builder.startArray("shards"); - for (ShardRecoveryResponse recoveryResponse : responses) { + for (RecoveryState recoveryState : recoveryStates) { builder.startObject(); - recoveryResponse.toXContent(builder, params); + recoveryState.toXContent(builder, params); builder.endObject(); } builder.endArray(); @@ -101,12 +102,12 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVInt(shardResponses.size()); - for (Map.Entry> entry : shardResponses.entrySet()) { + out.writeVInt(shardRecoveryStates.size()); + for (Map.Entry> entry : shardRecoveryStates.entrySet()) { out.writeString(entry.getKey()); out.writeVInt(entry.getValue().size()); - for (ShardRecoveryResponse recoveryResponse : entry.getValue()) { - recoveryResponse.writeTo(out); + for (RecoveryState recoveryState : entry.getValue()) { + recoveryState.writeTo(out); } } } @@ -118,11 +119,11 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent { for (int i = 0; i < size; i++) { String s = in.readString(); int listSize = in.readVInt(); - List list = new ArrayList<>(listSize); + List list = new ArrayList<>(listSize); for (int j = 0; j < listSize; j++) { - list.add(ShardRecoveryResponse.readShardRecoveryResponse(in)); + list.add(RecoveryState.readRecoveryState(in)); } - shardResponses.put(s, list); + shardRecoveryStates.put(s, list); } } } \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java deleted file mode 100644 index a4104fbc449..00000000000 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.recovery; - -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.recovery.RecoveryState; - -import java.io.IOException; - -/** - * Information regarding the recovery state of a shard. - */ -public class ShardRecoveryResponse extends BroadcastShardResponse implements ToXContent { - - RecoveryState recoveryState; - - public ShardRecoveryResponse() { } - - /** - * Constructs shard recovery information for the given index and shard id. - * - * @param shardId Id of the shard - */ - ShardRecoveryResponse(ShardId shardId) { - super(shardId); - } - - /** - * Sets the recovery state information for the shard. - * - * @param recoveryState Recovery state - */ - public void recoveryState(RecoveryState recoveryState) { - this.recoveryState = recoveryState; - } - - /** - * Gets the recovery state information for the shard. Null if shard wasn't recovered / recovery didn't start yet. - * - * @return Recovery state - */ - @Nullable - public RecoveryState recoveryState() { - return recoveryState; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - recoveryState.toXContent(builder, params); - return builder; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - recoveryState.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - recoveryState = RecoveryState.readRecoveryState(in); - } - - /** - * Builds a new ShardRecoveryResponse from the give input stream. - * - * @param in Input stream - * @return A new ShardRecoveryResponse - * @throws IOException - */ - public static ShardRecoveryResponse readShardRecoveryResponse(StreamInput in) throws IOException { - ShardRecoveryResponse response = new ShardRecoveryResponse(); - response.readFrom(in); - return response; - } -} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index cee59c7eb2a..86817fd1cd9 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -19,40 +19,37 @@ package org.elasticsearch.action.admin.indices.recovery; +import com.google.common.collect.Maps; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * Transport action for shard recovery operation. This transport action does not actually * perform shard recovery, it only reports on recoveries (both active and complete). */ -public class TransportRecoveryAction extends TransportBroadcastAction { +public class TransportRecoveryAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -61,84 +58,55 @@ public class TransportRecoveryAction extends TransportBroadcastAction shardFailures = null; - Map> shardResponses = new HashMap<>(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); + @Override + protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + Map> shardResponses = Maps.newHashMap(); + for (RecoveryState recoveryState : responses) { + if (recoveryState == null) { + continue; + } + String indexName = recoveryState.getShardId().getIndex(); + if (!shardResponses.containsKey(indexName)) { + shardResponses.put(indexName, new ArrayList()); + } + if (request.activeOnly()) { + if (recoveryState.getStage() != RecoveryState.Stage.DONE) { + shardResponses.get(indexName).add(recoveryState); } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); } else { - ShardRecoveryResponse recoveryResponse = (ShardRecoveryResponse) shardResponse; - successfulShards++; - - if (recoveryResponse.recoveryState() == null) { - // recovery not yet started - continue; - } - - String indexName = recoveryResponse.getIndex(); - List responses = shardResponses.get(indexName); - - if (responses == null) { - responses = new ArrayList<>(); - shardResponses.put(indexName, responses); - } - - if (request.activeOnly()) { - if (recoveryResponse.recoveryState().getStage() != RecoveryState.Stage.DONE) { - responses.add(recoveryResponse); - } - } else { - responses.add(recoveryResponse); - } + shardResponses.get(indexName).add(recoveryState); } } - - return new RecoveryResponse(shardsResponses.length(), successfulShards, - failedShards, request.detailed(), shardResponses, shardFailures); + return new RecoveryResponse(totalShards, successfulShards, failedShards, request.detailed(), shardResponses, shardFailures); } @Override - protected ShardRecoveryRequest newShardRequest(int numShards, ShardRouting shard, RecoveryRequest request) { - return new ShardRecoveryRequest(shard.shardId(), request); + protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException { + final RecoveryRequest recoveryRequest = new RecoveryRequest(); + recoveryRequest.readFrom(in); + return recoveryRequest; } @Override - protected ShardRecoveryResponse newShardResponse() { - return new ShardRecoveryResponse(); + protected RecoveryState shardOperation(RecoveryRequest request, ShardRouting shardRouting) { + IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id()); + return indexShard.recoveryState(); } @Override - protected ShardRecoveryResponse shardOperation(ShardRecoveryRequest request) { - - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.shardSafe(request.shardId().id()); - ShardRecoveryResponse shardRecoveryResponse = new ShardRecoveryResponse(request.shardId()); - - RecoveryState state = indexShard.recoveryState(); - shardRecoveryResponse.recoveryState(state); - return shardRecoveryResponse; - } - - @Override - protected GroupShardsIterator shards(ClusterState state, RecoveryRequest request, String[] concreteIndices) { - return state.routingTable().allAssignedShardsGrouped(concreteIndices, true, true); + protected ShardsIterator shards(ClusterState state, RecoveryRequest request, String[] concreteIndices) { + return state.routingTable().allShardsIncludingRelocationTargets(concreteIndices); } @Override @@ -150,14 +118,4 @@ public class TransportRecoveryAction extends TransportBroadcastAction shardFailures) { + IndicesSegmentResponse(ShardSegments[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) { super(totalShards, successfulShards, failedShards, shardFailures); this.shards = shards; } @@ -63,7 +62,7 @@ public class IndicesSegmentResponse extends BroadcastResponse implements ToXCont Set indices = Sets.newHashSet(); for (ShardSegments shard : shards) { - indices.add(shard.getIndex()); + indices.add(shard.getShardRouting().getIndex()); } for (String index : indices) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java b/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java index 6e754a26210..4b3264fca40 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java @@ -20,10 +20,10 @@ package org.elasticsearch.action.admin.indices.segments; import com.google.common.collect.ImmutableList; -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.index.engine.Segment; import java.io.IOException; @@ -33,7 +33,7 @@ import java.util.List; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; -public class ShardSegments extends BroadcastShardResponse implements Iterable { +public class ShardSegments implements Streamable, Iterable { private ShardRouting shardRouting; @@ -43,7 +43,6 @@ public class ShardSegments extends BroadcastShardResponse implements Iterable segments) { - super(shardRouting.shardId()); this.shardRouting = shardRouting; this.segments = segments; } @@ -89,7 +88,6 @@ public class ShardSegments extends BroadcastShardResponse implements Iterable { +public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -59,7 +52,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastAction shardFailures = null; - final List shards = new ArrayList<>(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - shards.add((ShardSegments) shardResponse); - successfulShards++; - } - } - return new IndicesSegmentResponse(shards.toArray(new ShardSegments[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected ShardSegments readShardResult(StreamInput in) throws IOException { + return ShardSegments.readShardSegments(in); } @Override - protected IndexShardSegmentRequest newShardRequest(int numShards, ShardRouting shard, IndicesSegmentsRequest request) { - return new IndexShardSegmentRequest(shard.shardId(), request); + protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, int totalShards, int successfulShards, int failedShards, List results, List shardFailures) { + return new IndicesSegmentResponse(results.toArray(new ShardSegments[results.size()]), totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardSegments newShardResponse() { - return new ShardSegments(); + protected IndicesSegmentsRequest readRequestFrom(StreamInput in) throws IOException { + final IndicesSegmentsRequest request = new IndicesSegmentsRequest(); + request.readFrom(in); + return request; } @Override - protected ShardSegments shardOperation(IndexShardSegmentRequest request) { - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.shardSafe(request.shardId().id()); - return new ShardSegments(indexShard.routingEntry(), indexShard.engine().segments(request.verbose)); - } - - static class IndexShardSegmentRequest extends BroadcastShardRequest { - boolean verbose; - - IndexShardSegmentRequest() { - verbose = false; - } - - IndexShardSegmentRequest(ShardId shardId, IndicesSegmentsRequest request) { - super(shardId, request); - verbose = request.verbose(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(verbose); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - verbose = in.readBoolean(); - } + protected ShardSegments shardOperation(IndicesSegmentsRequest request, ShardRouting shardRouting) { + IndexService indexService = indicesService.indexServiceSafe(shardRouting.getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRouting.id()); + return new ShardSegments(indexShard.routingEntry(), indexShard.engine().segments(request.verbose())); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java index d9b8e9da77d..885dddeea6a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java @@ -24,7 +24,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BroadcastResponse; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -51,7 +50,7 @@ public class IndicesStatsResponse extends BroadcastResponse implements ToXConten } - IndicesStatsResponse(ShardStats[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List shardFailures) { + IndicesStatsResponse(ShardStats[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) { super(totalShards, successfulShards, failedShards, shardFailures); this.shards = shards; } @@ -90,7 +89,7 @@ public class IndicesStatsResponse extends BroadcastResponse implements ToXConten Set indices = Sets.newHashSet(); for (ShardStats shard : shards) { - indices.add(shard.getIndex()); + indices.add(shard.getShardRouting().getIndex()); } for (String index : indices) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index b3c87de3dd1..2ef6c4df41d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -19,17 +19,16 @@ package org.elasticsearch.action.admin.indices.stats; -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardPath; import java.io.IOException; @@ -37,7 +36,7 @@ import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEnt /** */ -public class ShardStats extends BroadcastShardResponse implements ToXContent { +public class ShardStats implements Streamable, ToXContent { private ShardRouting shardRouting; private CommonStats commonStats; @Nullable @@ -50,7 +49,6 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent { } public ShardStats(IndexShard indexShard, CommonStatsFlags flags) { - super(indexShard.shardId()); this.shardRouting = indexShard.routingEntry(); this.dataPath = indexShard.shardPath().getRootDataPath().toString(); this.statePath = indexShard.shardPath().getRootStatePath().toString(); @@ -94,7 +92,6 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); shardRouting = readShardRoutingEntry(in); commonStats = CommonStats.readCommonStats(in); commitStats = CommitStats.readOptionalCommitStatsFrom(in); @@ -105,7 +102,6 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); shardRouting.writeTo(out); commonStats.writeTo(out); out.writeOptionalStreamable(commitStats); @@ -146,5 +142,4 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent { static final XContentBuilderString NODE = new XContentBuilderString("node"); static final XContentBuilderString RELOCATING_NODE = new XContentBuilderString("relocating_node"); } - } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index 9ce5291ba66..bf38f4dc7c0 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -21,37 +21,30 @@ package org.elasticsearch.action.admin.indices.stats; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** */ -public class TransportIndicesStatsAction extends TransportBroadcastAction { +public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -60,7 +53,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastAction shardFailures = null; - final List shards = new ArrayList<>(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - shards.add((ShardStats) shardResponse); - successfulShards++; - } - } - return new IndicesStatsResponse(shards.toArray(new ShardStats[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected ShardStats readShardResult(StreamInput in) throws IOException { + return ShardStats.readShardStats(in); } @Override - protected IndexShardStatsRequest newShardRequest(int numShards, ShardRouting shard, IndicesStatsRequest request) { - return new IndexShardStatsRequest(shard.shardId(), request); + protected IndicesStatsResponse newResponse(IndicesStatsRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + return new IndicesStatsResponse(responses.toArray(new ShardStats[responses.size()]), totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardStats newShardResponse() { - return new ShardStats(); + protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException { + IndicesStatsRequest request = new IndicesStatsRequest(); + request.readFrom(in); + return request; } @Override - protected ShardStats shardOperation(IndexShardStatsRequest request) { - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.shardSafe(request.shardId().id()); + protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting shardRouting) { + IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id()); // if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet if (indexShard.routingEntry() == null) { throw new ShardNotFoundException(indexShard.shardId()); @@ -128,92 +103,65 @@ public class TransportIndicesStatsAction extends TransportBroadcastAction { +public class TransportUpgradeStatusAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -58,7 +54,7 @@ public class TransportUpgradeStatusAction extends TransportBroadcastAction shardFailures = null; - final List shards = new ArrayList<>(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - shards.add((ShardUpgradeStatus) shardResponse); - successfulShards++; - } - } - return new UpgradeStatusResponse(shards.toArray(new ShardUpgradeStatus[shards.size()]), shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected ShardUpgradeStatus readShardResult(StreamInput in) throws IOException { + return ShardUpgradeStatus.readShardUpgradeStatus(in); } @Override - protected IndexShardUpgradeStatusRequest newShardRequest(int numShards, ShardRouting shard, UpgradeStatusRequest request) { - return new IndexShardUpgradeStatusRequest(shard.shardId(), request); + protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + return new UpgradeStatusResponse(responses.toArray(new ShardUpgradeStatus[responses.size()]), totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardUpgradeStatus newShardResponse() { - return new ShardUpgradeStatus(); + protected UpgradeStatusRequest readRequestFrom(StreamInput in) throws IOException { + UpgradeStatusRequest request = new UpgradeStatusRequest(); + request.readFrom(in); + return request; } @Override - protected ShardUpgradeStatus shardOperation(IndexShardUpgradeStatusRequest request) { - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.shardSafe(request.shardId().id()); + protected ShardUpgradeStatus shardOperation(UpgradeStatusRequest request, ShardRouting shardRouting) { + IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id()); List segments = indexShard.engine().segments(false); long total_bytes = 0; long to_upgrade_bytes = 0; @@ -136,16 +115,4 @@ public class TransportUpgradeStatusAction extends TransportBroadcastAction { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java index 16e24ee66ae..82683625df8 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java @@ -36,14 +36,11 @@ import java.util.Map; import java.util.Set; public class UpgradeStatusResponse extends BroadcastResponse implements ToXContent { - - private ShardUpgradeStatus[] shards; private Map indicesUpgradeStatus; UpgradeStatusResponse() { - } UpgradeStatusResponse(ShardUpgradeStatus[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) { @@ -75,7 +72,6 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte return indicesUpgradeStats; } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -120,8 +116,6 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - - builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, getTotalBytes()); builder.byteSizeField(Fields.SIZE_TO_UPGRADE_IN_BYTES, Fields.SIZE_TO_UPGRADE, getToUpgradeBytes()); builder.byteSizeField(Fields.SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, Fields.SIZE_TO_UPGRADE_ANCIENT, getToUpgradeBytesAncient()); @@ -163,10 +157,8 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte } builder.endObject(); } - builder.endObject(); } - builder.endObject(); } return builder; @@ -186,6 +178,5 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT = new XContentBuilderString("size_to_upgrade_ancient"); static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes"); static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT_IN_BYTES = new XContentBuilderString("size_to_upgrade_ancient_in_bytes"); - } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java new file mode 100644 index 00000000000..036c0c5a607 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -0,0 +1,562 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.broadcast.node; + +import com.google.common.collect.Maps; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BaseTransportResponseHandler; +import org.elasticsearch.transport.NodeShouldNotConnectException; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * Abstraction for transporting aggregated shard-level operations in a single request (NodeRequest) per-node + * and executing the shard-level operations serially on the receiving node. Each shard-level operation can produce a + * result (ShardOperationResult), these per-node shard-level results are aggregated into a single result + * (BroadcastByNodeResponse) to the coordinating node. These per-node results are aggregated into a single result (Result) + * to the client. + * + * @param the underlying client request + * @param the response to the client request + * @param per-shard operation results + */ +public abstract class TransportBroadcastByNodeAction extends HandledTransportAction { + + private final ClusterService clusterService; + private final TransportService transportService; + + final String transportNodeBroadcastAction; + + public TransportBroadcastByNodeAction( + Settings settings, + String actionName, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + Class request, + String executor) { + super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); + + this.clusterService = clusterService; + this.transportService = transportService; + + transportNodeBroadcastAction = actionName + "[n]"; + + transportService.registerRequestHandler(transportNodeBroadcastAction, new Callable() { + @Override + public NodeRequest call() throws Exception { + return new NodeRequest(); + } + }, executor, new BroadcastByNodeTransportRequestHandler()); + } + + private final Response newResponse(Request request, AtomicReferenceArray responses, List unavailableShardExceptions, Map> nodes) { + int totalShards = 0; + int successfulShards = 0; + List broadcastByNodeResponses = new ArrayList<>(); + List exceptions = new ArrayList<>(); + for (int i = 0; i < responses.length(); i++) { + if (responses.get(i) instanceof FailedNodeException) { + FailedNodeException exception = (FailedNodeException) responses.get(i); + totalShards += nodes.get(exception.nodeId()).size(); + for (ShardRouting shard : nodes.get(exception.nodeId())) { + exceptions.add(new DefaultShardOperationFailedException(shard.getIndex(), shard.getId(), exception)); + } + } else { + NodeResponse response = (NodeResponse) responses.get(i); + broadcastByNodeResponses.addAll(response.results); + totalShards += response.getTotalShards(); + successfulShards += response.getSuccessfulShards(); + for (BroadcastShardOperationFailedException throwable : response.getExceptions()) { + if (!TransportActions.isShardNotAvailableException(throwable)) { + exceptions.add(new DefaultShardOperationFailedException(throwable.getIndex(), throwable.getShardId().getId(), throwable)); + } + } + } + } + totalShards += unavailableShardExceptions.size(); + int failedShards = exceptions.size(); + return newResponse(request, totalShards, successfulShards, failedShards, broadcastByNodeResponses, exceptions); + } + + /** + * Deserialize a shard-level result from an input stream + * + * @param in input stream + * @return a deserialized shard-level result + * @throws IOException + */ + protected abstract ShardOperationResult readShardResult(StreamInput in) throws IOException; + + /** + * Creates a new response to the underlying request. + * + * @param request the underlying request + * @param totalShards the total number of shards considered for execution of the operation + * @param successfulShards the total number of shards for which execution of the operation was successful + * @param failedShards the total number of shards for which execution of the operation failed + * @param results the per-node aggregated shard-level results + * @param shardFailures the exceptions corresponding to shard operationa failures + * @return the response + */ + protected abstract Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List results, List shardFailures); + + /** + * Deserialize a request from an input stream + * + * @param in input stream + * @return a de-serialized request + * @throws IOException + */ + protected abstract Request readRequestFrom(StreamInput in) throws IOException; + + /** + * Executes the shard-level operation. This method is called once per shard serially on the receiving node. + * + * @param request the node-level request + * @param shardRouting the shard on which to execute the operation + * @return the result of the shard-level operation for the shard + */ + protected abstract ShardOperationResult shardOperation(Request request, ShardRouting shardRouting); + + /** + * Determines the shards on which this operation will be executed on. The operation is executed once per shard. + * + * @param clusterState the cluster state + * @param request the underlying request + * @param concreteIndices the concrete indices on which to execute the operation + * @return the shards on which to execute the operation + */ + protected abstract ShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices); + + /** + * Executes a global block check before polling the cluster state. + * + * @param state the cluster state + * @param request the underlying request + * @return a non-null exception if the operation is blocked + */ + protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request); + + /** + * Executes a global request-level check before polling the cluster state. + * + * @param state the cluster state + * @param request the underlying request + * @param concreteIndices the concrete indices on which to execute the operation + * @return a non-null exception if the operation if blocked + */ + protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices); + + @Override + protected void doExecute(Request request, ActionListener listener) { + new AsyncAction(request, listener).start(); + } + + protected class AsyncAction { + private final Request request; + private final ActionListener listener; + private final ClusterState clusterState; + private final DiscoveryNodes nodes; + private final Map> nodeIds; + private final AtomicReferenceArray responses; + private final AtomicInteger counter = new AtomicInteger(); + private List unavailableShardExceptions = new ArrayList<>(); + + protected AsyncAction(Request request, ActionListener listener) { + this.request = request; + this.listener = listener; + + clusterState = clusterService.state(); + nodes = clusterState.nodes(); + + ClusterBlockException globalBlockException = checkGlobalBlock(clusterState, request); + if (globalBlockException != null) { + throw globalBlockException; + } + + String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request); + ClusterBlockException requestBlockException = checkRequestBlock(clusterState, request, concreteIndices); + if (requestBlockException != null) { + throw requestBlockException; + } + + logger.trace("resolving shards for [{}] based on cluster state version [{}]", actionName, clusterState.version()); + ShardsIterator shardIt = shards(clusterState, request, concreteIndices); + nodeIds = Maps.newHashMap(); + + for (ShardRouting shard : shardIt.asUnordered()) { + if (shard.assignedToNode()) { + String nodeId = shard.currentNodeId(); + if (!nodeIds.containsKey(nodeId)) { + nodeIds.put(nodeId, new ArrayList()); + } + nodeIds.get(nodeId).add(shard); + } else { + unavailableShardExceptions.add( + new NoShardAvailableActionException( + shard.shardId(), + " no shards available for shard " + shard.toString() + " while executing " + actionName + ) + ); + } + } + + responses = new AtomicReferenceArray<>(nodeIds.size()); + } + + public void start() { + if (nodeIds.size() == 0) { + try { + onCompletion(); + } catch (Throwable e) { + listener.onFailure(e); + } + } else { + int nodeIndex = -1; + for (Map.Entry> entry : nodeIds.entrySet()) { + nodeIndex++; + DiscoveryNode node = nodes.get(entry.getKey()); + sendNodeRequest(node, entry.getValue(), nodeIndex); + } + } + } + + private void sendNodeRequest(final DiscoveryNode node, List shards, final int nodeIndex) { + try { + NodeRequest nodeRequest = new NodeRequest(node.getId(), request, shards); + transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new BaseTransportResponseHandler() { + @Override + public NodeResponse newInstance() { + return new NodeResponse(); + } + + @Override + public void handleResponse(NodeResponse response) { + onNodeResponse(node, nodeIndex, response); + } + + @Override + public void handleException(TransportException exp) { + onNodeFailure(node, nodeIndex, exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } catch (Throwable e) { + onNodeFailure(node, nodeIndex, e); + } + } + + protected void onNodeResponse(DiscoveryNode node, int nodeIndex, NodeResponse response) { + logger.trace("received response for [{}] from node [{}]", actionName, node.id()); + + // this is defensive to protect against the possibility of double invocation + // the current implementation of TransportService#sendRequest guards against this + // but concurrency is hard, safety is important, and the small performance loss here does not matter + if (responses.compareAndSet(nodeIndex, null, response)) { + if (counter.incrementAndGet() == responses.length()) { + onCompletion(); + } + } + } + + protected void onNodeFailure(DiscoveryNode node, int nodeIndex, Throwable t) { + String nodeId = node.id(); + if (logger.isDebugEnabled() && !(t instanceof NodeShouldNotConnectException)) { + logger.debug("failed to execute [{}] on node [{}]", t, actionName, nodeId); + } + + // this is defensive to protect against the possibility of double invocation + // the current implementation of TransportService#sendRequest guards against this + // but concurrency is hard, safety is important, and the small performance loss here does not matter + if (responses.compareAndSet(nodeIndex, null, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t))) { + if (counter.incrementAndGet() == responses.length()) { + onCompletion(); + } + } + } + + protected void onCompletion() { + Response response = null; + try { + response = newResponse(request, responses, unavailableShardExceptions, nodeIds); + } catch (Throwable t) { + logger.debug("failed to combine responses from nodes", t); + listener.onFailure(t); + } + if (response != null) { + try { + listener.onResponse(response); + } catch (Throwable t) { + listener.onFailure(t); + } + } + } + } + + class BroadcastByNodeTransportRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(final NodeRequest request, TransportChannel channel) throws Exception { + List shards = request.getShards(); + final int totalShards = shards.size(); + logger.trace("[{}] executing operation on [{}] shards", actionName, totalShards); + final Object[] shardResultOrExceptions = new Object[totalShards]; + + int shardIndex = -1; + for (final ShardRouting shardRouting : shards) { + shardIndex++; + onShardOperation(request, shardResultOrExceptions, shardIndex, shardRouting); + } + + List accumulatedExceptions = new ArrayList<>(); + List results = new ArrayList<>(); + for (int i = 0; i < totalShards; i++) { + if (shardResultOrExceptions[i] instanceof BroadcastShardOperationFailedException) { + accumulatedExceptions.add((BroadcastShardOperationFailedException) shardResultOrExceptions[i]); + } else { + results.add((ShardOperationResult) shardResultOrExceptions[i]); + } + } + + channel.sendResponse(new NodeResponse(request.getNodeId(), totalShards, results, accumulatedExceptions)); + } + + private void onShardOperation(final NodeRequest request, final Object[] shardResults, final int shardIndex, final ShardRouting shardRouting) { + try { + logger.trace("[{}] executing operation for shard [{}]", actionName, shardRouting.shortSummary()); + ShardOperationResult result = shardOperation(request.indicesLevelRequest, shardRouting); + shardResults[shardIndex] = result; + logger.trace("[{}] completed operation for shard [{}]", actionName, shardRouting.shortSummary()); + } catch (Throwable t) { + BroadcastShardOperationFailedException e = new BroadcastShardOperationFailedException(shardRouting.shardId(), "operation " + actionName + " failed", t); + e.setIndex(shardRouting.getIndex()); + e.setShard(shardRouting.shardId()); + shardResults[shardIndex] = e; + logger.debug("[{}] failed to execute operation for shard [{}]", e, actionName, shardRouting.shortSummary()); + } + } + } + + protected class NodeRequest extends TransportRequest implements IndicesRequest { + private String nodeId; + + private List shards; + + protected Request indicesLevelRequest; + + protected NodeRequest() { + } + + @SuppressForbidden(reason = "debug") + public NodeRequest(String nodeId, Request request, List shards) { + super(request); + this.indicesLevelRequest = request; + this.shards = shards; + this.nodeId = nodeId; + System.out.println(TransportBroadcastByNodeAction.this.getClass().getName()); + } + + public List getShards() { + return shards; + } + + public String getNodeId() { + return nodeId; + } + + public String[] indices() { + return indicesLevelRequest.indices(); + } + + public IndicesOptions indicesOptions() { + return indicesLevelRequest.indicesOptions(); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + indicesLevelRequest = readRequestFrom(in); + int size = in.readVInt(); + shards = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + shards.add(ShardRouting.readShardRoutingEntry(in)); + } + nodeId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + indicesLevelRequest.writeTo(out); + int size = shards.size(); + out.writeVInt(size); + for (int i = 0; i < size; i++) { + shards.get(i).writeTo(out); + } + out.writeString(nodeId); + } + } + + class NodeResponse extends TransportResponse { + protected String nodeId; + protected int totalShards; + protected List exceptions; + protected List results; + + public NodeResponse() { + } + + public NodeResponse(String nodeId, + int totalShards, + List results, + List exceptions) { + this.nodeId = nodeId; + this.totalShards = totalShards; + this.results = results; + this.exceptions = exceptions; + } + + public String getNodeId() { + return nodeId; + } + + public int getTotalShards() { + return totalShards; + } + + public int getSuccessfulShards() { + return results.size(); + } + + public List getExceptions() { + return exceptions; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readString(); + totalShards = in.readVInt(); + int resultsSize = in.readVInt(); + results = new ArrayList<>(resultsSize); + for (; resultsSize > 0; resultsSize--) { + final ShardOperationResult result = in.readBoolean() ? readShardResult(in) : null; + results.add(result); + } + if (in.readBoolean()) { + int failureShards = in.readVInt(); + exceptions = new ArrayList<>(failureShards); + for (int i = 0; i < failureShards; i++) { + exceptions.add(new BroadcastShardOperationFailedException(in)); + } + } else { + exceptions = null; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(nodeId); + out.writeVInt(totalShards); + out.writeVInt(results.size()); + for (ShardOperationResult result : results) { + out.writeOptionalStreamable(result); + } + out.writeBoolean(exceptions != null); + if (exceptions != null) { + int failureShards = exceptions.size(); + out.writeVInt(failureShards); + for (int i = 0; i < failureShards; i++) { + exceptions.get(i).writeTo(out); + } + } + } + } + + /** + * Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting) shardOperation} for + * which there is no shard-level return value. + */ + public final static class EmptyResult implements Streamable { + public static EmptyResult INSTANCE = new EmptyResult(); + + private EmptyResult() { + } + + @Override + public void readFrom(StreamInput in) throws IOException { + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + } + + public static EmptyResult readEmptyResultFrom(StreamInput in) { + return INSTANCE; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 97749e63819..1e3bd9614dd 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing; import com.carrotsearch.hppc.IntSet; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.UnmodifiableIterator; @@ -223,6 +224,38 @@ public class RoutingTable implements Iterable, DiffablealwaysTrue(), false); + } + + public ShardsIterator allShardsIncludingRelocationTargets(String[] indices) { + return allShardsSatisfyingPredicate(indices, Predicates.alwaysTrue(), true); + } + + // TODO: replace with JDK 8 native java.util.function.Predicate + private ShardsIterator allShardsSatisfyingPredicate(String[] indices, Predicate predicate, boolean includeRelocationTargets) { + // use list here since we need to maintain identity across shards + List shards = new ArrayList<>(); + for (String index : indices) { + IndexRoutingTable indexRoutingTable = index(index); + if (indexRoutingTable == null) { + continue; + // we simply ignore indices that don't exists (make sense for operations that use it currently) + } + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + if (predicate.apply(shardRouting)) { + shards.add(shardRouting); + if (includeRelocationTargets && shardRouting.relocating()) { + shards.add(shardRouting.buildTargetRelocatingShard()); + } + } + } + } + } + return new PlainShardsIterator(shards); + } + /** * All the *active* primary shards for the provided indices grouped (each group is a single element, consisting * of the primary shard). This is handy for components that expect to get group iterators, but still want in some diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java index 9d58f8f74df..4c9b3de5c79 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java @@ -22,7 +22,6 @@ package org.elasticsearch.rest.action.cat; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; @@ -116,19 +115,19 @@ public class RestRecoveryAction extends AbstractCatAction { Table t = getTableWithHeader(request); - for (String index : response.shardResponses().keySet()) { + for (String index : response.shardRecoveryStates().keySet()) { - List shardRecoveryResponses = response.shardResponses().get(index); - if (shardRecoveryResponses.size() == 0) { + List shardRecoveryStates = response.shardRecoveryStates().get(index); + if (shardRecoveryStates.size() == 0) { continue; } // Sort ascending by shard id for readability - CollectionUtil.introSort(shardRecoveryResponses, new Comparator() { + CollectionUtil.introSort(shardRecoveryStates, new Comparator() { @Override - public int compare(ShardRecoveryResponse o1, ShardRecoveryResponse o2) { - int id1 = o1.recoveryState().getShardId().id(); - int id2 = o2.recoveryState().getShardId().id(); + public int compare(RecoveryState o1, RecoveryState o2) { + int id1 = o1.getShardId().id(); + int id2 = o2.getShardId().id(); if (id1 < id2) { return -1; } else if (id1 > id2) { @@ -139,12 +138,10 @@ public class RestRecoveryAction extends AbstractCatAction { } }); - for (ShardRecoveryResponse shardResponse : shardRecoveryResponses) { - - RecoveryState state = shardResponse.recoveryState(); + for (RecoveryState state: shardRecoveryStates) { t.startRow(); t.addCell(index); - t.addCell(shardResponse.getShardId()); + t.addCell(state.getShardId().id()); t.addCell(state.getTimer().time()); t.addCell(state.getType().toString().toLowerCase(Locale.ROOT)); t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT)); diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java index 972a1bc0110..734fb340090 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java @@ -21,7 +21,11 @@ package org.elasticsearch.rest.action.cat; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.admin.indices.segments.*; +import org.elasticsearch.action.admin.indices.segments.IndexSegments; +import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; @@ -29,7 +33,10 @@ import org.elasticsearch.common.Table; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Segment; -import org.elasticsearch.rest.*; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.support.RestActionListener; import org.elasticsearch.rest.action.support.RestResponseListener; import org.elasticsearch.rest.action.support.RestTable; @@ -120,8 +127,8 @@ public class RestSegmentsAction extends AbstractCatAction { for (Segment segment : segments) { table.startRow(); - table.addCell(shardSegment.getIndex()); - table.addCell(shardSegment.getShardId()); + table.addCell(shardSegment.getShardRouting().getIndex()); + table.addCell(shardSegment.getShardRouting().getId()); table.addCell(shardSegment.getShardRouting().primary() ? "p" : "r"); table.addCell(nodes.get(shardSegment.getShardRouting().currentNodeId()).getHostAddress()); table.addCell(shardSegment.getShardRouting().currentNodeId()); diff --git a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index 2b8caf8f055..2aa6818c878 100644 --- a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport; import java.lang.reflect.Constructor; +import java.util.concurrent.Callable; /** * @@ -28,20 +29,19 @@ import java.lang.reflect.Constructor; public class RequestHandlerRegistry { private final String action; - private final Constructor requestConstructor; private final TransportRequestHandler handler; private final boolean forceExecution; private final String executor; + private final Callable requestFactory; RequestHandlerRegistry(String action, Class request, TransportRequestHandler handler, String executor, boolean forceExecution) { + this(action, new ReflectionFactory<>(request), handler, executor, forceExecution); + } + + public RequestHandlerRegistry(String action, Callable requestFactory, TransportRequestHandler handler, String executor, boolean forceExecution) { this.action = action; - try { - this.requestConstructor = request.getDeclaredConstructor(); - } catch (NoSuchMethodException e) { - throw new IllegalStateException("failed to create constructor (does it have a default constructor?) for request " + request, e); - } - this.requestConstructor.setAccessible(true); + this.requestFactory = requestFactory; assert newRequest() != null; this.handler = handler; this.forceExecution = forceExecution; @@ -54,7 +54,7 @@ public class RequestHandlerRegistry { public Request newRequest() { try { - return requestConstructor.newInstance(); + return requestFactory.call(); } catch (Exception e) { throw new IllegalStateException("failed to instantiate request ", e); } @@ -71,4 +71,22 @@ public class RequestHandlerRegistry { public String getExecutor() { return executor; } + + private final static class ReflectionFactory implements Callable { + private final Constructor requestConstructor; + + public ReflectionFactory(Class request) { + try { + this.requestConstructor = request.getDeclaredConstructor(); + } catch (NoSuchMethodException e) { + throw new IllegalStateException("failed to create constructor (does it have a default constructor?) for request " + request, e); + } + this.requestConstructor.setAccessible(true); + } + + @Override + public Request call() throws Exception { + return requestConstructor.newInstance(); + } + } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index b70589ce52d..40fa908c2b3 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -22,8 +22,6 @@ package org.elasticsearch.transport; import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.settings.ClusterDynamicSettings; -import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -35,12 +33,21 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.concurrent.*; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -399,6 +406,18 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Callable requestFactory, String executor, TransportRequestHandler handler) { + RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, requestFactory, handler, executor, false); + registerRequestHandler(reg); + } + /** * Registers a new request handler * @param action The action the request handler is associated with @@ -408,8 +427,12 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Class request, String executor, boolean forceExecution, TransportRequestHandler handler) { + RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, handler, executor, forceExecution); + registerRequestHandler(reg); + } + + protected void registerRequestHandler(RequestHandlerRegistry reg) { synchronized (requestHandlerMutex) { - RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, handler, executor, forceExecution); RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction()); requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); if (replaced != null) { diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 10b34493334..0f0e6a28426 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -113,6 +113,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -395,7 +396,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testOptimize() { - String optimizeShardAction = OptimizeAction.NAME + "[s]"; + String optimizeShardAction = OptimizeAction.NAME + "[n]"; interceptTransportActions(optimizeShardAction); OptimizeRequest optimizeRequest = new OptimizeRequest(randomIndicesOrAliases()); @@ -419,7 +420,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testClearCache() { - String clearCacheAction = ClearIndicesCacheAction.NAME + "[s]"; + String clearCacheAction = ClearIndicesCacheAction.NAME + "[n]"; interceptTransportActions(clearCacheAction); ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(randomIndicesOrAliases()); @@ -431,7 +432,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testRecovery() { - String recoveryAction = RecoveryAction.NAME + "[s]"; + String recoveryAction = RecoveryAction.NAME + "[n]"; interceptTransportActions(recoveryAction); RecoveryRequest recoveryRequest = new RecoveryRequest(randomIndicesOrAliases()); @@ -443,7 +444,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testSegments() { - String segmentsAction = IndicesSegmentsAction.NAME + "[s]"; + String segmentsAction = IndicesSegmentsAction.NAME + "[n]"; interceptTransportActions(segmentsAction); IndicesSegmentsRequest segmentsRequest = new IndicesSegmentsRequest(randomIndicesOrAliases()); @@ -455,7 +456,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testIndicesStats() { - String indicesStats = IndicesStatsAction.NAME + "[s]"; + String indicesStats = IndicesStatsAction.NAME + "[n]"; interceptTransportActions(indicesStats); IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(randomIndicesOrAliases()); @@ -888,6 +889,11 @@ public class IndicesRequestIT extends ESIntegTestCase { super.registerRequestHandler(action, request, executor, forceExecution, new InterceptingRequestHandler(action, handler)); } + @Override + public void registerRequestHandler(String action, Callable requestFactory, String executor, TransportRequestHandler handler) { + super.registerRequestHandler(action, requestFactory, executor, new InterceptingRequestHandler(action, handler)); + } + private class InterceptingRequestHandler implements TransportRequestHandler { private final TransportRequestHandler requestHandler; diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java new file mode 100644 index 00000000000..c6e2f2cf942 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -0,0 +1,422 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.broadcast.node; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.cluster.TestClusterService; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseOptions; +import org.elasticsearch.transport.TransportService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.object.HasToString.hasToString; + +public class TransportBroadcastByNodeActionTests extends ESTestCase { + + private static final String TEST_INDEX = "test-index"; + private static final String TEST_CLUSTER = "test-cluster"; + private static ThreadPool THREAD_POOL; + + private TestClusterService clusterService; + private CapturingTransport transport; + private TransportService transportService; + + private TestTransportBroadcastByNodeAction action; + + public static class Request extends BroadcastRequest { + public Request() { + } + + public Request(String[] indices) { + super(indices); + } + } + + public static class Response extends BroadcastResponse { + public Response() { + } + + public Response(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); + } + } + + class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction { + private final Map shards = new HashMap<>(); + + public TestTransportBroadcastByNodeAction(Settings settings, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Class request, String executor) { + super(settings, "indices:admin/test", THREAD_POOL, TransportBroadcastByNodeActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request, executor); + } + + @Override + protected EmptyResult readShardResult(StreamInput in) throws IOException { + return EmptyResult.readEmptyResultFrom(in); + } + + @Override + protected Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List emptyResults, List shardFailures) { + return new Response(totalShards, successfulShards, failedShards, shardFailures); + } + + @Override + protected Request readRequestFrom(StreamInput in) throws IOException { + final Request request = new Request(); + request.readFrom(in); + return request; + } + + @Override + protected EmptyResult shardOperation(Request request, ShardRouting shardRouting) { + if (rarely()) { + shards.put(shardRouting, Boolean.TRUE); + return EmptyResult.INSTANCE; + } else { + ElasticsearchException e = new ElasticsearchException("operation failed"); + shards.put(shardRouting, e); + throw e; + } + } + + @Override + protected ShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices) { + return clusterState.routingTable().allShards(new String[]{TEST_INDEX}); + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices); + } + + public Map getResults() { + return shards; + } + } + + class MyResolver extends IndexNameExpressionResolver { + public MyResolver() { + super(Settings.EMPTY); + } + + @Override + public String[] concreteIndices(ClusterState state, IndicesRequest request) { + return request.indices(); + } + } + + @BeforeClass + public static void startThreadPool() { + THREAD_POOL = new ThreadPool(TransportBroadcastByNodeActionTests.class.getSimpleName()); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + transport = new CapturingTransport(); + clusterService = new TestClusterService(THREAD_POOL); + transportService = new TransportService(transport, THREAD_POOL); + transportService.start(); + setClusterState(clusterService, TEST_INDEX); + action = new TestTransportBroadcastByNodeAction( + Settings.EMPTY, + transportService, + new ActionFilters(new HashSet()), + new MyResolver(), + Request.class, + ThreadPool.Names.SAME + ); + } + + void setClusterState(TestClusterService clusterService, String index) { + int numberOfNodes = randomIntBetween(3, 5); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index); + + int shardIndex = -1; + for (int i = 0; i < numberOfNodes; i++) { + final DiscoveryNode node = newNode(i); + discoBuilder = discoBuilder.put(node); + int numberOfShards = randomIntBetween(0, 10); + for (int j = 0; j < numberOfShards; j++) { + final ShardId shardId = new ShardId(index, ++shardIndex); + ShardRouting shard = TestShardRouting.newShardRouting(index, shardId.getId(), node.id(), true, ShardRoutingState.STARTED, 1); + IndexShardRoutingTable.Builder indexShard = new IndexShardRoutingTable.Builder(shardId); + indexShard.addShard(shard); + indexRoutingTable.addIndexShard(indexShard.build()); + } + } + discoBuilder.localNodeId(newNode(0).id()); + discoBuilder.masterNodeId(newNode(numberOfNodes - 1).id()); + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName(TEST_CLUSTER)); + stateBuilder.nodes(discoBuilder); + stateBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable.build()).build()); + ClusterState clusterState = stateBuilder.build(); + clusterService.setState(clusterState); + } + + static DiscoveryNode newNode(int nodeId) { + return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT); + } + + @AfterClass + public static void destroyThreadPool() { + ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS); + // since static must set to null to be eligible for collection + THREAD_POOL = null; + } + + public void testGlobalBlock() { + Request request = new Request(new String[]{TEST_INDEX}); + PlainActionFuture listener = new PlainActionFuture<>(); + + ClusterBlocks.Builder block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + try { + action.new AsyncAction(request, listener).start(); + fail("expected ClusterBlockException"); + } catch (ClusterBlockException expected) { + + } + } + + public void testRequestBlock() { + Request request = new Request(new String[]{TEST_INDEX}); + PlainActionFuture listener = new PlainActionFuture<>(); + + ClusterBlocks.Builder block = ClusterBlocks.builder() + .addIndexBlock(TEST_INDEX, new ClusterBlock(1, "test-block", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + try { + action.new AsyncAction(request, listener).start(); + fail("expected ClusterBlockException"); + } catch (ClusterBlockException expected) { + + } + } + + public void testOneRequestIsSentToEachNodeHoldingAShard() { + Request request = new Request(new String[]{TEST_INDEX}); + PlainActionFuture listener = new PlainActionFuture<>(); + + action.new AsyncAction(request, listener).start(); + Map> capturedRequests = transport.capturedRequestsByTargetNode(); + + ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX}); + Set set = new HashSet<>(); + for (ShardRouting shard : shardIt.asUnordered()) { + set.add(shard.currentNodeId()); + } + + // check a request was sent to the right number of nodes + assertEquals(set.size(), capturedRequests.size()); + + // check requests were sent to the right nodes + assertEquals(set, capturedRequests.keySet()); + for (Map.Entry> entry : capturedRequests.entrySet()) { + // check one request was sent to each node + assertEquals(1, entry.getValue().size()); + } + } + + public void testOperationExecution() throws Exception { + ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX}); + Set shards = new HashSet<>(); + String nodeId = shardIt.asUnordered().iterator().next().currentNodeId(); + for (ShardRouting shard : shardIt.asUnordered()) { + if (nodeId.equals(shard.currentNodeId())) { + shards.add(shard); + } + } + final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = + action.new BroadcastByNodeTransportRequestHandler(); + + TestTransportChannel channel = new TestTransportChannel(); + + handler.messageReceived(action.new NodeRequest(nodeId, new Request(), new ArrayList<>(shards)), channel); + + // check the operation was executed only on the expected shards + assertEquals(shards, action.getResults().keySet()); + + TransportResponse response = channel.getCapturedResponse(); + assertTrue(response instanceof TransportBroadcastByNodeAction.NodeResponse); + TransportBroadcastByNodeAction.NodeResponse nodeResponse = (TransportBroadcastByNodeAction.NodeResponse)response; + + // check the operation was executed on the correct node + assertEquals("node id", nodeId, nodeResponse.getNodeId()); + + int successfulShards = 0; + int failedShards = 0; + for (Object result : action.getResults().values()) { + if (!(result instanceof ElasticsearchException)) { + successfulShards++; + } else { + failedShards++; + } + } + + // check the operation results + assertEquals("successful shards", successfulShards, nodeResponse.getSuccessfulShards()); + assertEquals("total shards", action.getResults().size(), nodeResponse.getTotalShards()); + assertEquals("failed shards", failedShards, nodeResponse.getExceptions().size()); + List exceptions = nodeResponse.getExceptions(); + for (BroadcastShardOperationFailedException exception : exceptions) { + assertThat(exception.getMessage(), is("operation indices:admin/test failed")); + assertThat(exception, hasToString(containsString("operation failed"))); + } + } + + public void testResultAggregation() throws ExecutionException, InterruptedException { + Request request = new Request(new String[]{TEST_INDEX}); + PlainActionFuture listener = new PlainActionFuture<>(); + + action.new AsyncAction(request, listener).start(); + Map> capturedRequests = transport.capturedRequestsByTargetNode(); + transport.clear(); + + ShardsIterator shardIt = clusterService.state().getRoutingTable().allShards(new String[]{TEST_INDEX}); + Map> map = new HashMap<>(); + for (ShardRouting shard : shardIt.asUnordered()) { + if (!map.containsKey(shard.currentNodeId())) { + map.put(shard.currentNodeId(), new ArrayList()); + } + map.get(shard.currentNodeId()).add(shard); + } + + int totalShards = 0; + int totalSuccessfulShards = 0; + int totalFailedShards = 0; + for (Map.Entry> entry : capturedRequests.entrySet()) { + List exceptions = new ArrayList<>(); + long requestId = entry.getValue().get(0).requestId; + if (rarely()) { + // simulate node failure + totalShards += map.get(entry.getKey()).size(); + totalFailedShards += map.get(entry.getKey()).size(); + transport.handleResponse(requestId, new Exception()); + } else { + List shards = map.get(entry.getKey()); + List shardResults = new ArrayList<>(); + for (ShardRouting shard : shards) { + totalShards++; + if (rarely()) { + // simulate operation failure + totalFailedShards++; + exceptions.add(new BroadcastShardOperationFailedException(shard.shardId(), "operation indices:admin/test failed")); + } else { + shardResults.add(TransportBroadcastByNodeAction.EmptyResult.INSTANCE); + } + } + totalSuccessfulShards += shardResults.size(); + TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse(entry.getKey(), shards.size(), shardResults, exceptions); + transport.handleResponse(requestId, nodeResponse); + } + } + + Response response = listener.get(); + assertEquals("total shards", totalShards, response.getTotalShards()); + assertEquals("successful shards", totalSuccessfulShards, response.getSuccessfulShards()); + assertEquals("failed shards", totalFailedShards, response.getFailedShards()); + assertEquals("accumulated exceptions", totalFailedShards, response.getShardFailures().length); + } + + public class TestTransportChannel implements TransportChannel { + private TransportResponse capturedResponse; + + public TransportResponse getCapturedResponse() { + return capturedResponse; + } + + @Override + public String action() { + return null; + } + + @Override + public String getProfileName() { + return ""; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + capturedResponse = response; + } + + @Override + public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + } + + @Override + public void sendResponse(Throwable error) throws IOException { + } + } +} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java b/core/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java index ad2d1a79eda..e7ef3259199 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java +++ b/core/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java @@ -19,7 +19,6 @@ package org.elasticsearch.benchmark.recovery; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.bootstrap.BootstrapForTesting; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -30,6 +29,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.node.Node; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.transport.TransportModule; @@ -128,12 +128,12 @@ public class ReplicaRecoveryBenchmark { long currentTime = System.currentTimeMillis(); long currentDocs = indexer.totalIndexedDocs(); RecoveryResponse recoveryResponse = client1.admin().indices().prepareRecoveries(INDEX_NAME).setActiveOnly(true).get(); - List indexRecoveries = recoveryResponse.shardResponses().get(INDEX_NAME); + List indexRecoveries = recoveryResponse.shardRecoveryStates().get(INDEX_NAME); long translogOps; long bytes; if (indexRecoveries.size() > 0) { - translogOps = indexRecoveries.get(0).recoveryState().getTranslog().recoveredOperations(); - bytes = recoveryResponse.shardResponses().get(INDEX_NAME).get(0).recoveryState().getIndex().recoveredBytes(); + translogOps = indexRecoveries.get(0).getTranslog().recoveredOperations(); + bytes = recoveryResponse.shardRecoveryStates().get(INDEX_NAME).get(0).getIndex().recoveredBytes(); } else { bytes = lastBytes = 0; translogOps = lastTranslogOps = 0; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java index 2cbc364bafe..9a0cbb2bcca 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java @@ -243,4 +243,25 @@ public class RoutingTableTest extends ESAllocationTestCase { fail("Calling with non-existing index should be ignored at the moment"); } } + + public void testAllShardsForMultipleIndices() { + assertThat(this.emptyRoutingTable.allShards(new String[0]).size(), is(0)); + + assertThat(this.testRoutingTable.allShards(new String[]{TEST_INDEX_1}).size(), is(this.shardsPerIndex)); + + initPrimaries(); + assertThat(this.testRoutingTable.allShards(new String[]{TEST_INDEX_1}).size(), is(this.shardsPerIndex)); + + startInitializingShards(TEST_INDEX_1); + assertThat(this.testRoutingTable.allShards(new String[]{TEST_INDEX_1}).size(), is(this.shardsPerIndex)); + + startInitializingShards(TEST_INDEX_2); + assertThat(this.testRoutingTable.allShards(new String[]{TEST_INDEX_1, TEST_INDEX_2}).size(), is(this.totalNumberOfShards)); + + try { + this.testRoutingTable.allShards(new String[]{TEST_INDEX_1, "not_exists"}); + } catch (IndexNotFoundException e) { + fail("Calling with non-existing index should be ignored at the moment"); + } + } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index cc293375a2c..55204365688 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -42,7 +42,11 @@ import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.BytesTransportRequest; +import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; import org.hamcrest.Matchers; import org.junit.Test; @@ -112,7 +116,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { createIndex("test"); ensureSearchable("test"); RecoveryResponse r = client().admin().indices().prepareRecoveries("test").get(); - int numRecoveriesBeforeNewMaster = r.shardResponses().get("test").size(); + int numRecoveriesBeforeNewMaster = r.shardRecoveryStates().get("test").size(); final String oldMaster = internalCluster().getMasterName(); internalCluster().stopCurrentMasterNode(); @@ -127,7 +131,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { ensureSearchable("test"); r = client().admin().indices().prepareRecoveries("test").get(); - int numRecoveriesAfterNewMaster = r.shardResponses().get("test").size(); + int numRecoveriesAfterNewMaster = r.shardRecoveryStates().get("test").size(); assertThat(numRecoveriesAfterNewMaster, equalTo(numRecoveriesBeforeNewMaster)); } diff --git a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java index f1c6d476eb5..529b60562bf 100644 --- a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java +++ b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java @@ -35,7 +35,10 @@ import org.elasticsearch.test.ESIntegTestCase; import org.junit.Test; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; /** */ @@ -146,7 +149,7 @@ public class ShardInfoIT extends ESIntegTestCase { RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx") .setActiveOnly(true) .get(); - assertThat(recoveryResponse.shardResponses().get("idx").size(), equalTo(0)); + assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0)); } }); } diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryBackwardsCompatibilityIT.java index 745c98f664e..fbd8b973fad 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryBackwardsCompatibilityIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.gateway; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -99,8 +98,7 @@ public class RecoveryBackwardsCompatibilityIT extends ESBackcompatTestCase { HashMap map = new HashMap<>(); map.put("details", "true"); final ToXContent.Params params = new ToXContent.MapParams(map); - for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) { - RecoveryState recoveryState = response.recoveryState(); + for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { final String recoverStateAsJSON = XContentHelper.toString(recoveryState, params); if (!recoveryState.getPrimary()) { RecoveryState.Index index = recoveryState.getIndex(); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index aa229305a2f..341139ba88b 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -20,7 +20,6 @@ package org.elasticsearch.gateway; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; @@ -400,8 +399,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { assertSyncIdsNotNull(); } RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); - for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) { - RecoveryState recoveryState = response.recoveryState(); + for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { long recovered = 0; for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) { if (file.name().startsWith("segments")) { @@ -410,7 +408,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { } if (!recoveryState.getPrimary() && (useSyncIds == false)) { logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}", - response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), + recoveryState.getShardId().getId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered)); assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0l)); @@ -422,7 +420,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { } else { if (useSyncIds && !recoveryState.getPrimary()) { logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}", - response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), + recoveryState.getShardId().getId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); } assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0l)); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3ec731156be..656d251b1bf 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -28,7 +28,10 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -43,7 +46,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryParsingException; @@ -65,7 +67,10 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutionException; -import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.EMPTY_PARAMS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index c2860c75033..7c27a792d79 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -53,7 +52,12 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; import org.junit.Test; import java.io.IOException; @@ -67,7 +71,13 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.ESIntegTestCase.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; /** * @@ -155,18 +165,17 @@ public class IndexRecoveryIT extends ESIntegTestCase { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - assertThat(response.shardResponses().size(), equalTo(SHARD_COUNT)); - assertThat(response.shardResponses().get(INDEX_NAME).size(), equalTo(1)); + assertThat(response.shardRecoveryStates().size(), equalTo(SHARD_COUNT)); + assertThat(response.shardRecoveryStates().get(INDEX_NAME).size(), equalTo(1)); - List shardResponses = response.shardResponses().get(INDEX_NAME); - assertThat(shardResponses.size(), equalTo(1)); + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + assertThat(recoveryStates.size(), equalTo(1)); - ShardRecoveryResponse shardResponse = shardResponses.get(0); - RecoveryState state = shardResponse.recoveryState(); + RecoveryState recoveryState = recoveryStates.get(0); - assertRecoveryState(state, 0, Type.STORE, Stage.DONE, node, node, false); + assertRecoveryState(recoveryState, 0, Type.STORE, Stage.DONE, node, node, false); - validateIndexRecoveryState(state.getIndex()); + validateIndexRecoveryState(recoveryState.getIndex()); } @Test @@ -183,8 +192,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).setActiveOnly(true).execute().actionGet(); - List shardResponses = response.shardResponses().get(INDEX_NAME); - assertThat(shardResponses.size(), equalTo(0)); // Should not expect any responses back + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + assertThat(recoveryStates.size(), equalTo(0)); // Should not expect any responses back } @Test @@ -209,23 +218,23 @@ public class IndexRecoveryIT extends ESIntegTestCase { RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); // we should now have two total shards, one primary and one replica - List shardResponses = response.shardResponses().get(INDEX_NAME); - assertThat(shardResponses.size(), equalTo(2)); + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + assertThat(recoveryStates.size(), equalTo(2)); - List nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses); + List nodeAResponses = findRecoveriesForTargetNode(nodeA, recoveryStates); assertThat(nodeAResponses.size(), equalTo(1)); - List nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses); + List nodeBResponses = findRecoveriesForTargetNode(nodeB, recoveryStates); assertThat(nodeBResponses.size(), equalTo(1)); // validate node A recovery - ShardRecoveryResponse nodeAShardResponse = nodeAResponses.get(0); - assertRecoveryState(nodeAShardResponse.recoveryState(), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); - validateIndexRecoveryState(nodeAShardResponse.recoveryState().getIndex()); + RecoveryState nodeARecoveryState = nodeAResponses.get(0); + assertRecoveryState(nodeARecoveryState, 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); + validateIndexRecoveryState(nodeARecoveryState.getIndex()); // validate node B recovery - ShardRecoveryResponse nodeBShardResponse = nodeBResponses.get(0); - assertRecoveryState(nodeBShardResponse.recoveryState(), 0, Type.REPLICA, Stage.DONE, nodeA, nodeB, false); - validateIndexRecoveryState(nodeBShardResponse.recoveryState().getIndex()); + RecoveryState nodeBRecoveryState = nodeBResponses.get(0); + assertRecoveryState(nodeBRecoveryState, 0, Type.REPLICA, Stage.DONE, nodeA, nodeB, false); + validateIndexRecoveryState(nodeBRecoveryState.getIndex()); } @Test @@ -266,17 +275,17 @@ public class IndexRecoveryIT extends ESIntegTestCase { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - List shardResponses = response.shardResponses().get(INDEX_NAME); - List nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses); - assertThat(nodeAResponses.size(), equalTo(1)); - List nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses); - assertThat(nodeBResponses.size(), equalTo(1)); + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + List nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); + assertThat(nodeARecoveryStates.size(), equalTo(1)); + List nodeBRecoveryStates = findRecoveriesForTargetNode(nodeB, recoveryStates); + assertThat(nodeBRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeAResponses.get(0).recoveryState(), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); - validateIndexRecoveryState(nodeAResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); + validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - assertOnGoingRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, nodeA, nodeB, false); - validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex()); + assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, nodeA, nodeB, false); + validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); logger.info("--> request node recovery stats"); NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); @@ -325,11 +334,11 @@ public class IndexRecoveryIT extends ESIntegTestCase { response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - shardResponses = response.shardResponses().get(INDEX_NAME); - assertThat(shardResponses.size(), equalTo(1)); + recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + assertThat(recoveryStates.size(), equalTo(1)); - assertRecoveryState(shardResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); - validateIndexRecoveryState(shardResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(recoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + validateIndexRecoveryState(recoveryStates.get(0).getIndex()); statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); assertThat(statsResponse.getNodes(), arrayWithSize(2)); @@ -377,45 +386,45 @@ public class IndexRecoveryIT extends ESIntegTestCase { .execute().actionGet().getState(); response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - shardResponses = response.shardResponses().get(INDEX_NAME); + recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); - nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses); - assertThat(nodeAResponses.size(), equalTo(1)); - nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses); - assertThat(nodeBResponses.size(), equalTo(1)); - List nodeCResponses = findRecoveriesForTargetNode(nodeC, shardResponses); - assertThat(nodeCResponses.size(), equalTo(1)); + nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); + assertThat(nodeARecoveryStates.size(), equalTo(1)); + nodeBRecoveryStates = findRecoveriesForTargetNode(nodeB, recoveryStates); + assertThat(nodeBRecoveryStates.size(), equalTo(1)); + List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeAResponses.get(0).recoveryState(), 0, Type.REPLICA, Stage.DONE, nodeB, nodeA, false); - validateIndexRecoveryState(nodeAResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.REPLICA, Stage.DONE, nodeB, nodeA, false); + validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - assertRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); - validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) - assertOnGoingRecoveryState(nodeCResponses.get(0).recoveryState(), 0, Type.REPLICA, nodeB, nodeC, false); - validateIndexRecoveryState(nodeCResponses.get(0).recoveryState().getIndex()); + assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, Type.REPLICA, nodeB, nodeC, false); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); logger.info("--> speeding up recoveries"); restoreRecoverySpeed(); ensureGreen(); response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - shardResponses = response.shardResponses().get(INDEX_NAME); + recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); - nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses); - assertThat(nodeAResponses.size(), equalTo(0)); - nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses); - assertThat(nodeBResponses.size(), equalTo(1)); - nodeCResponses = findRecoveriesForTargetNode(nodeC, shardResponses); - assertThat(nodeCResponses.size(), equalTo(1)); + nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); + assertThat(nodeARecoveryStates.size(), equalTo(0)); + nodeBRecoveryStates = findRecoveriesForTargetNode(nodeB, recoveryStates); + assertThat(nodeBRecoveryStates.size(), equalTo(1)); + nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); - validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) - assertRecoveryState(nodeCResponses.get(0).recoveryState(), 0, Type.REPLICA, Stage.DONE, nodeB, nodeC, false); - validateIndexRecoveryState(nodeCResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeCRecoveryStates.get(0), 0, Type.REPLICA, Stage.DONE, nodeB, nodeC, false); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); } @Test @@ -457,24 +466,24 @@ public class IndexRecoveryIT extends ESIntegTestCase { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - for (Map.Entry> shardRecoveryResponse : response.shardResponses().entrySet()) { + for (Map.Entry> indexRecoveryStates : response.shardRecoveryStates().entrySet()) { - assertThat(shardRecoveryResponse.getKey(), equalTo(INDEX_NAME)); - List shardRecoveryResponses = shardRecoveryResponse.getValue(); - assertThat(shardRecoveryResponses.size(), equalTo(totalShards)); + assertThat(indexRecoveryStates.getKey(), equalTo(INDEX_NAME)); + List recoveryStates = indexRecoveryStates.getValue(); + assertThat(recoveryStates.size(), equalTo(totalShards)); - for (ShardRecoveryResponse shardResponse : shardRecoveryResponses) { - assertRecoveryState(shardResponse.recoveryState(), 0, Type.SNAPSHOT, Stage.DONE, null, nodeA, true); - validateIndexRecoveryState(shardResponse.recoveryState().getIndex()); + for (RecoveryState recoveryState : recoveryStates) { + assertRecoveryState(recoveryState, 0, Type.SNAPSHOT, Stage.DONE, null, nodeA, true); + validateIndexRecoveryState(recoveryState.getIndex()); } } } - private List findRecoveriesForTargetNode(String nodeName, List responses) { - List nodeResponses = new ArrayList<>(); - for (ShardRecoveryResponse response : responses) { - if (response.recoveryState().getTargetNode().getName().equals(nodeName)) { - nodeResponses.add(response); + private List findRecoveriesForTargetNode(String nodeName, List recoveryStates) { + List nodeResponses = new ArrayList<>(); + for (RecoveryState recoveryState : recoveryStates) { + if (recoveryState.getTargetNode().getName().equals(nodeName)) { + nodeResponses.add(recoveryState); } } return nodeResponses; diff --git a/core/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerIT.java b/core/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerIT.java index 695845bca0d..3a00be0aeb9 100644 --- a/core/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerIT.java +++ b/core/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerIT.java @@ -21,7 +21,6 @@ package org.elasticsearch.indices.warmer; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.collect.ImmutableList; - import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; @@ -49,7 +48,10 @@ import org.junit.Test; import java.util.Locale; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; public class SimpleIndicesWarmerIT extends ESIntegTestCase { @@ -272,7 +274,7 @@ public class SimpleIndicesWarmerIT extends ESIntegTestCase { for (IndexShardSegments indexShardSegments : indicesSegments) { for (ShardSegments shardSegments : indexShardSegments) { for (Segment segment : shardSegments) { - logger.debug("+=" + segment.memoryInBytes + " " + indexShardSegments.getShardId() + " " + shardSegments.getIndex()); + logger.debug("+=" + segment.memoryInBytes + " " + indexShardSegments.getShardId() + " " + shardSegments.getShardRouting().getIndex()); total += segment.memoryInBytes; } } diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java index 60a6acb7ed9..450212b75b5 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java @@ -284,7 +284,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().get(); for (ShardStats shardStats : indicesStatsResponse.getShards()) { DocsStats docsStats = shardStats.getStats().docs; - logger.info("shard [{}] - count {}, primary {}", shardStats.getShardId(), docsStats.getCount(), shardStats.getShardRouting().primary()); + logger.info("shard [{}] - count {}, primary {}", shardStats.getShardRouting().id(), docsStats.getCount(), shardStats.getShardRouting().primary()); } //if there was an error we try to wait and see if at some point it'll get fixed diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 39622287ea2..628a886e6ae 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -63,7 +63,6 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -422,7 +421,7 @@ public class RelocationIT extends ESIntegTestCase { public boolean apply(Object input) { RecoveryResponse recoveryResponse = internalCluster().client(redNodeName).admin().indices().prepareRecoveries(indexName) .get(); - return !recoveryResponse.shardResponses().get(indexName).isEmpty(); + return !recoveryResponse.shardRecoveryStates().get(indexName).isEmpty(); } } ); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index d36ecfa7d46..5c7fc445fbf 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -32,7 +32,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AbstractDiffable; @@ -55,6 +54,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.index.store.IndexStore; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.rest.RestChannel; @@ -81,8 +81,18 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.ESIntegTestCase.ClusterScope; import static org.elasticsearch.test.ESIntegTestCase.Scope; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; -import static org.hamcrest.Matchers.*; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; /** */ @@ -592,9 +602,9 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L)); IntSet reusedShards = new IntHashSet(); - for (ShardRecoveryResponse response : client().admin().indices().prepareRecoveries("test-idx").get().shardResponses().get("test-idx")) { - if (response.recoveryState().getIndex().reusedBytes() > 0) { - reusedShards.add(response.getShardId()); + for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries("test-idx").get().shardRecoveryStates().get("test-idx")) { + if (recoveryState.getIndex().reusedBytes() > 0) { + reusedShards.add(recoveryState.getShardId().getId()); } } logger.info("--> check that at least half of the shards had some reuse: [{}]", reusedShards);