diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java index e8243bf5d50..c219d85f9d5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.admin.cluster.stats; import com.carrotsearch.hppc.ObjectObjectHashMap; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -67,10 +66,10 @@ public class ClusterStatsIndices implements ToXContent, Streamable { for (ClusterStatsNodeResponse r : nodeResponses) { for (org.elasticsearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) { - ShardStats indexShardStats = countsPerIndex.get(shardStats.getIndex()); + ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndex()); if (indexShardStats == null) { indexShardStats = new ShardStats(); - countsPerIndex.put(shardStats.getIndex(), indexShardStats); + countsPerIndex.put(shardStats.getShardRouting().getIndex(), indexShardStats); } indexShardStats.total++; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheRequest.java deleted file mode 100644 index 3f583b7efa4..00000000000 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheRequest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.cache.clear; - -import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; - -/** - * - */ -class ShardClearIndicesCacheRequest extends BroadcastShardRequest { - - private boolean queryCache = false; - private boolean fieldDataCache = false; - private boolean recycler; - private boolean requestCache = false; - - private String[] fields = null; - - ShardClearIndicesCacheRequest() { - } - - ShardClearIndicesCacheRequest(ShardId shardId, ClearIndicesCacheRequest request) { - super(shardId, request); - queryCache = request.queryCache(); - fieldDataCache = request.fieldDataCache(); - fields = request.fields(); - recycler = request.recycler(); - requestCache = request.requestCache(); - } - - public boolean queryCache() { - return queryCache; - } - - public boolean requestCache() { - return requestCache; - } - - public boolean fieldDataCache() { - return this.fieldDataCache; - } - - public boolean recycler() { - return this.recycler; - } - - public String[] fields() { - return this.fields; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - queryCache = in.readBoolean(); - fieldDataCache = in.readBoolean(); - recycler = in.readBoolean(); - fields = in.readStringArray(); - requestCache = in.readBoolean(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(queryCache); - out.writeBoolean(fieldDataCache); - out.writeBoolean(recycler); - out.writeStringArrayNullable(fields); - out.writeBoolean(requestCache); - } -} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheResponse.java deleted file mode 100644 index c2931df6003..00000000000 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ShardClearIndicesCacheResponse.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.cache.clear; - -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; -import org.elasticsearch.index.shard.ShardId; - -/** - * - */ -class ShardClearIndicesCacheResponse extends BroadcastShardResponse { - - ShardClearIndicesCacheResponse() { - } - - ShardClearIndicesCacheResponse(ShardId shardId) { - super(shardId); - } -} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index 2f8d7f84cda..ba90ca19263 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -21,17 +21,16 @@ package org.elasticsearch.action.admin.indices.cache.clear; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; @@ -40,14 +39,14 @@ import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; +import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; + /** * Indices clear cache action. */ -public class TransportClearIndicesCacheAction extends TransportBroadcastAction { +public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; private final IndicesRequestCache indicesRequestCache; @@ -58,48 +57,33 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastAction shardFailures = null; - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - successfulShards++; - } - } - return new ClearIndicesCacheResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected EmptyResult readShardResult(StreamInput in) throws IOException { + return EmptyResult.readEmptyResultFrom(in); } @Override - protected ShardClearIndicesCacheRequest newShardRequest(int numShards, ShardRouting shard, ClearIndicesCacheRequest request) { - return new ShardClearIndicesCacheRequest(shard.shardId(), request); + protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + return new ClearIndicesCacheResponse(totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardClearIndicesCacheResponse newShardResponse() { - return new ShardClearIndicesCacheResponse(); + protected ClearIndicesCacheRequest readRequestFrom(StreamInput in) throws IOException { + final ClearIndicesCacheRequest request = new ClearIndicesCacheRequest(); + request.readFrom(in); + return request; } @Override - protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRequest request) { - IndexService service = indicesService.indexService(request.shardId().getIndex()); + protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRouting shardRouting) { + IndexService service = indicesService.indexService(shardRouting.getIndex()); if (service != null) { - IndexShard shard = service.shard(request.shardId().id()); + IndexShard shard = service.shard(shardRouting.id()); boolean clearedAtLeastOne = false; if (request.queryCache()) { clearedAtLeastOne = true; @@ -137,15 +121,15 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastAction { +public class TransportOptimizeAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -54,55 +52,40 @@ public class TransportOptimizeAction extends TransportBroadcastAction shardFailures = null; - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // a non active shard, ignore... - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - successfulShards++; - } - } - return new OptimizeResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected EmptyResult readShardResult(StreamInput in) throws IOException { + return EmptyResult.readEmptyResultFrom(in); } @Override - protected ShardOptimizeRequest newShardRequest(int numShards, ShardRouting shard, OptimizeRequest request) { - return new ShardOptimizeRequest(shard.shardId(), request); + protected OptimizeResponse newResponse(OptimizeRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + return new OptimizeResponse(totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardOptimizeResponse newShardResponse() { - return new ShardOptimizeResponse(); + protected OptimizeRequest readRequestFrom(StreamInput in) throws IOException { + final OptimizeRequest request = new OptimizeRequest(); + request.readFrom(in); + return request; } @Override - protected ShardOptimizeResponse shardOperation(ShardOptimizeRequest request) { - IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id()); - indexShard.optimize(request.optimizeRequest()); - return new ShardOptimizeResponse(request.shardId()); + protected EmptyResult shardOperation(OptimizeRequest request, ShardRouting shardRouting) { + IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id()); + indexShard.optimize(request); + return EmptyResult.INSTANCE; } /** * The refresh request works against *all* shards. */ @Override - protected GroupShardsIterator shards(ClusterState clusterState, OptimizeRequest request, String[] concreteIndices) { - return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true); + protected ShardsIterator shards(ClusterState clusterState, OptimizeRequest request, String[] concreteIndices) { + return clusterState.routingTable().allShards(concreteIndices); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java index fea33688c14..0e0881d1729 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.indices.recovery.RecoveryState; import java.io.IOException; import java.util.ArrayList; @@ -38,7 +39,7 @@ import java.util.Map; public class RecoveryResponse extends BroadcastResponse implements ToXContent { private boolean detailed = false; - private Map> shardResponses = new HashMap<>(); + private Map> shardRecoveryStates = new HashMap<>(); public RecoveryResponse() { } @@ -50,18 +51,18 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent { * @param successfulShards Count of shards successfully processed * @param failedShards Count of shards which failed to process * @param detailed Display detailed metrics - * @param shardResponses Map of indices to shard recovery information + * @param shardRecoveryStates Map of indices to shard recovery information * @param shardFailures List of failures processing shards */ public RecoveryResponse(int totalShards, int successfulShards, int failedShards, boolean detailed, - Map> shardResponses, List shardFailures) { + Map> shardRecoveryStates, List shardFailures) { super(totalShards, successfulShards, failedShards, shardFailures); - this.shardResponses = shardResponses; + this.shardRecoveryStates = shardRecoveryStates; this.detailed = detailed; } public boolean hasRecoveries() { - return shardResponses.size() > 0; + return shardRecoveryStates.size() > 0; } public boolean detailed() { @@ -72,23 +73,23 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent { this.detailed = detailed; } - public Map> shardResponses() { - return shardResponses; + public Map> shardRecoveryStates() { + return shardRecoveryStates; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { if (hasRecoveries()) { - for (String index : shardResponses.keySet()) { - List responses = shardResponses.get(index); - if (responses == null || responses.size() == 0) { + for (String index : shardRecoveryStates.keySet()) { + List recoveryStates = shardRecoveryStates.get(index); + if (recoveryStates == null || recoveryStates.size() == 0) { continue; } builder.startObject(index); builder.startArray("shards"); - for (ShardRecoveryResponse recoveryResponse : responses) { + for (RecoveryState recoveryState : recoveryStates) { builder.startObject(); - recoveryResponse.toXContent(builder, params); + recoveryState.toXContent(builder, params); builder.endObject(); } builder.endArray(); @@ -101,12 +102,12 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVInt(shardResponses.size()); - for (Map.Entry> entry : shardResponses.entrySet()) { + out.writeVInt(shardRecoveryStates.size()); + for (Map.Entry> entry : shardRecoveryStates.entrySet()) { out.writeString(entry.getKey()); out.writeVInt(entry.getValue().size()); - for (ShardRecoveryResponse recoveryResponse : entry.getValue()) { - recoveryResponse.writeTo(out); + for (RecoveryState recoveryState : entry.getValue()) { + recoveryState.writeTo(out); } } } @@ -118,11 +119,11 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent { for (int i = 0; i < size; i++) { String s = in.readString(); int listSize = in.readVInt(); - List list = new ArrayList<>(listSize); + List list = new ArrayList<>(listSize); for (int j = 0; j < listSize; j++) { - list.add(ShardRecoveryResponse.readShardRecoveryResponse(in)); + list.add(RecoveryState.readRecoveryState(in)); } - shardResponses.put(s, list); + shardRecoveryStates.put(s, list); } } } \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java deleted file mode 100644 index a4104fbc449..00000000000 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.admin.indices.recovery; - -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.recovery.RecoveryState; - -import java.io.IOException; - -/** - * Information regarding the recovery state of a shard. - */ -public class ShardRecoveryResponse extends BroadcastShardResponse implements ToXContent { - - RecoveryState recoveryState; - - public ShardRecoveryResponse() { } - - /** - * Constructs shard recovery information for the given index and shard id. - * - * @param shardId Id of the shard - */ - ShardRecoveryResponse(ShardId shardId) { - super(shardId); - } - - /** - * Sets the recovery state information for the shard. - * - * @param recoveryState Recovery state - */ - public void recoveryState(RecoveryState recoveryState) { - this.recoveryState = recoveryState; - } - - /** - * Gets the recovery state information for the shard. Null if shard wasn't recovered / recovery didn't start yet. - * - * @return Recovery state - */ - @Nullable - public RecoveryState recoveryState() { - return recoveryState; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - recoveryState.toXContent(builder, params); - return builder; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - recoveryState.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - recoveryState = RecoveryState.readRecoveryState(in); - } - - /** - * Builds a new ShardRecoveryResponse from the give input stream. - * - * @param in Input stream - * @return A new ShardRecoveryResponse - * @throws IOException - */ - public static ShardRecoveryResponse readShardRecoveryResponse(StreamInput in) throws IOException { - ShardRecoveryResponse response = new ShardRecoveryResponse(); - response.readFrom(in); - return response; - } -} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index cee59c7eb2a..86817fd1cd9 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -19,40 +19,37 @@ package org.elasticsearch.action.admin.indices.recovery; +import com.google.common.collect.Maps; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * Transport action for shard recovery operation. This transport action does not actually * perform shard recovery, it only reports on recoveries (both active and complete). */ -public class TransportRecoveryAction extends TransportBroadcastAction { +public class TransportRecoveryAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -61,84 +58,55 @@ public class TransportRecoveryAction extends TransportBroadcastAction shardFailures = null; - Map> shardResponses = new HashMap<>(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); + @Override + protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + Map> shardResponses = Maps.newHashMap(); + for (RecoveryState recoveryState : responses) { + if (recoveryState == null) { + continue; + } + String indexName = recoveryState.getShardId().getIndex(); + if (!shardResponses.containsKey(indexName)) { + shardResponses.put(indexName, new ArrayList()); + } + if (request.activeOnly()) { + if (recoveryState.getStage() != RecoveryState.Stage.DONE) { + shardResponses.get(indexName).add(recoveryState); } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); } else { - ShardRecoveryResponse recoveryResponse = (ShardRecoveryResponse) shardResponse; - successfulShards++; - - if (recoveryResponse.recoveryState() == null) { - // recovery not yet started - continue; - } - - String indexName = recoveryResponse.getIndex(); - List responses = shardResponses.get(indexName); - - if (responses == null) { - responses = new ArrayList<>(); - shardResponses.put(indexName, responses); - } - - if (request.activeOnly()) { - if (recoveryResponse.recoveryState().getStage() != RecoveryState.Stage.DONE) { - responses.add(recoveryResponse); - } - } else { - responses.add(recoveryResponse); - } + shardResponses.get(indexName).add(recoveryState); } } - - return new RecoveryResponse(shardsResponses.length(), successfulShards, - failedShards, request.detailed(), shardResponses, shardFailures); + return new RecoveryResponse(totalShards, successfulShards, failedShards, request.detailed(), shardResponses, shardFailures); } @Override - protected ShardRecoveryRequest newShardRequest(int numShards, ShardRouting shard, RecoveryRequest request) { - return new ShardRecoveryRequest(shard.shardId(), request); + protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException { + final RecoveryRequest recoveryRequest = new RecoveryRequest(); + recoveryRequest.readFrom(in); + return recoveryRequest; } @Override - protected ShardRecoveryResponse newShardResponse() { - return new ShardRecoveryResponse(); + protected RecoveryState shardOperation(RecoveryRequest request, ShardRouting shardRouting) { + IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id()); + return indexShard.recoveryState(); } @Override - protected ShardRecoveryResponse shardOperation(ShardRecoveryRequest request) { - - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.shardSafe(request.shardId().id()); - ShardRecoveryResponse shardRecoveryResponse = new ShardRecoveryResponse(request.shardId()); - - RecoveryState state = indexShard.recoveryState(); - shardRecoveryResponse.recoveryState(state); - return shardRecoveryResponse; - } - - @Override - protected GroupShardsIterator shards(ClusterState state, RecoveryRequest request, String[] concreteIndices) { - return state.routingTable().allAssignedShardsGrouped(concreteIndices, true, true); + protected ShardsIterator shards(ClusterState state, RecoveryRequest request, String[] concreteIndices) { + return state.routingTable().allShardsIncludingRelocationTargets(concreteIndices); } @Override @@ -150,14 +118,4 @@ public class TransportRecoveryAction extends TransportBroadcastAction shardFailures) { + IndicesSegmentResponse(ShardSegments[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) { super(totalShards, successfulShards, failedShards, shardFailures); this.shards = shards; } @@ -63,7 +62,7 @@ public class IndicesSegmentResponse extends BroadcastResponse implements ToXCont Set indices = Sets.newHashSet(); for (ShardSegments shard : shards) { - indices.add(shard.getIndex()); + indices.add(shard.getShardRouting().getIndex()); } for (String index : indices) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java b/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java index 6e754a26210..4b3264fca40 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java @@ -20,10 +20,10 @@ package org.elasticsearch.action.admin.indices.segments; import com.google.common.collect.ImmutableList; -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.index.engine.Segment; import java.io.IOException; @@ -33,7 +33,7 @@ import java.util.List; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; -public class ShardSegments extends BroadcastShardResponse implements Iterable { +public class ShardSegments implements Streamable, Iterable { private ShardRouting shardRouting; @@ -43,7 +43,6 @@ public class ShardSegments extends BroadcastShardResponse implements Iterable segments) { - super(shardRouting.shardId()); this.shardRouting = shardRouting; this.segments = segments; } @@ -89,7 +88,6 @@ public class ShardSegments extends BroadcastShardResponse implements Iterable { +public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -59,7 +52,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastAction shardFailures = null; - final List shards = new ArrayList<>(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - shards.add((ShardSegments) shardResponse); - successfulShards++; - } - } - return new IndicesSegmentResponse(shards.toArray(new ShardSegments[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected ShardSegments readShardResult(StreamInput in) throws IOException { + return ShardSegments.readShardSegments(in); } @Override - protected IndexShardSegmentRequest newShardRequest(int numShards, ShardRouting shard, IndicesSegmentsRequest request) { - return new IndexShardSegmentRequest(shard.shardId(), request); + protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, int totalShards, int successfulShards, int failedShards, List results, List shardFailures) { + return new IndicesSegmentResponse(results.toArray(new ShardSegments[results.size()]), totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardSegments newShardResponse() { - return new ShardSegments(); + protected IndicesSegmentsRequest readRequestFrom(StreamInput in) throws IOException { + final IndicesSegmentsRequest request = new IndicesSegmentsRequest(); + request.readFrom(in); + return request; } @Override - protected ShardSegments shardOperation(IndexShardSegmentRequest request) { - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.shardSafe(request.shardId().id()); - return new ShardSegments(indexShard.routingEntry(), indexShard.engine().segments(request.verbose)); - } - - static class IndexShardSegmentRequest extends BroadcastShardRequest { - boolean verbose; - - IndexShardSegmentRequest() { - verbose = false; - } - - IndexShardSegmentRequest(ShardId shardId, IndicesSegmentsRequest request) { - super(shardId, request); - verbose = request.verbose(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeBoolean(verbose); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - verbose = in.readBoolean(); - } + protected ShardSegments shardOperation(IndicesSegmentsRequest request, ShardRouting shardRouting) { + IndexService indexService = indicesService.indexServiceSafe(shardRouting.getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRouting.id()); + return new ShardSegments(indexShard.routingEntry(), indexShard.engine().segments(request.verbose())); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java index d9b8e9da77d..885dddeea6a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java @@ -24,7 +24,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BroadcastResponse; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -51,7 +50,7 @@ public class IndicesStatsResponse extends BroadcastResponse implements ToXConten } - IndicesStatsResponse(ShardStats[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List shardFailures) { + IndicesStatsResponse(ShardStats[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) { super(totalShards, successfulShards, failedShards, shardFailures); this.shards = shards; } @@ -90,7 +89,7 @@ public class IndicesStatsResponse extends BroadcastResponse implements ToXConten Set indices = Sets.newHashSet(); for (ShardStats shard : shards) { - indices.add(shard.getIndex()); + indices.add(shard.getShardRouting().getIndex()); } for (String index : indices) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index b3c87de3dd1..2ef6c4df41d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -19,17 +19,16 @@ package org.elasticsearch.action.admin.indices.stats; -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardPath; import java.io.IOException; @@ -37,7 +36,7 @@ import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEnt /** */ -public class ShardStats extends BroadcastShardResponse implements ToXContent { +public class ShardStats implements Streamable, ToXContent { private ShardRouting shardRouting; private CommonStats commonStats; @Nullable @@ -50,7 +49,6 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent { } public ShardStats(IndexShard indexShard, CommonStatsFlags flags) { - super(indexShard.shardId()); this.shardRouting = indexShard.routingEntry(); this.dataPath = indexShard.shardPath().getRootDataPath().toString(); this.statePath = indexShard.shardPath().getRootStatePath().toString(); @@ -94,7 +92,6 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); shardRouting = readShardRoutingEntry(in); commonStats = CommonStats.readCommonStats(in); commitStats = CommitStats.readOptionalCommitStatsFrom(in); @@ -105,7 +102,6 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); shardRouting.writeTo(out); commonStats.writeTo(out); out.writeOptionalStreamable(commitStats); @@ -146,5 +142,4 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent { static final XContentBuilderString NODE = new XContentBuilderString("node"); static final XContentBuilderString RELOCATING_NODE = new XContentBuilderString("relocating_node"); } - } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index 9ce5291ba66..bf38f4dc7c0 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -21,37 +21,30 @@ package org.elasticsearch.action.admin.indices.stats; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** */ -public class TransportIndicesStatsAction extends TransportBroadcastAction { +public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -60,7 +53,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastAction shardFailures = null; - final List shards = new ArrayList<>(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - shards.add((ShardStats) shardResponse); - successfulShards++; - } - } - return new IndicesStatsResponse(shards.toArray(new ShardStats[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected ShardStats readShardResult(StreamInput in) throws IOException { + return ShardStats.readShardStats(in); } @Override - protected IndexShardStatsRequest newShardRequest(int numShards, ShardRouting shard, IndicesStatsRequest request) { - return new IndexShardStatsRequest(shard.shardId(), request); + protected IndicesStatsResponse newResponse(IndicesStatsRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + return new IndicesStatsResponse(responses.toArray(new ShardStats[responses.size()]), totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardStats newShardResponse() { - return new ShardStats(); + protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException { + IndicesStatsRequest request = new IndicesStatsRequest(); + request.readFrom(in); + return request; } @Override - protected ShardStats shardOperation(IndexShardStatsRequest request) { - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.shardSafe(request.shardId().id()); + protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting shardRouting) { + IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id()); // if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet if (indexShard.routingEntry() == null) { throw new ShardNotFoundException(indexShard.shardId()); @@ -128,92 +103,65 @@ public class TransportIndicesStatsAction extends TransportBroadcastAction { +public class TransportUpgradeStatusAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -58,7 +54,7 @@ public class TransportUpgradeStatusAction extends TransportBroadcastAction shardFailures = null; - final List shards = new ArrayList<>(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // simply ignore non active shards - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); - } else { - shards.add((ShardUpgradeStatus) shardResponse); - successfulShards++; - } - } - return new UpgradeStatusResponse(shards.toArray(new ShardUpgradeStatus[shards.size()]), shardsResponses.length(), successfulShards, failedShards, shardFailures); + protected ShardUpgradeStatus readShardResult(StreamInput in) throws IOException { + return ShardUpgradeStatus.readShardUpgradeStatus(in); } @Override - protected IndexShardUpgradeStatusRequest newShardRequest(int numShards, ShardRouting shard, UpgradeStatusRequest request) { - return new IndexShardUpgradeStatusRequest(shard.shardId(), request); + protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + return new UpgradeStatusResponse(responses.toArray(new ShardUpgradeStatus[responses.size()]), totalShards, successfulShards, failedShards, shardFailures); } @Override - protected ShardUpgradeStatus newShardResponse() { - return new ShardUpgradeStatus(); + protected UpgradeStatusRequest readRequestFrom(StreamInput in) throws IOException { + UpgradeStatusRequest request = new UpgradeStatusRequest(); + request.readFrom(in); + return request; } @Override - protected ShardUpgradeStatus shardOperation(IndexShardUpgradeStatusRequest request) { - IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.shardSafe(request.shardId().id()); + protected ShardUpgradeStatus shardOperation(UpgradeStatusRequest request, ShardRouting shardRouting) { + IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); + IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id()); List segments = indexShard.engine().segments(false); long total_bytes = 0; long to_upgrade_bytes = 0; @@ -136,16 +115,4 @@ public class TransportUpgradeStatusAction extends TransportBroadcastAction { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java index 16e24ee66ae..82683625df8 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java @@ -36,14 +36,11 @@ import java.util.Map; import java.util.Set; public class UpgradeStatusResponse extends BroadcastResponse implements ToXContent { - - private ShardUpgradeStatus[] shards; private Map indicesUpgradeStatus; UpgradeStatusResponse() { - } UpgradeStatusResponse(ShardUpgradeStatus[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) { @@ -75,7 +72,6 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte return indicesUpgradeStats; } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -120,8 +116,6 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - - builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, getTotalBytes()); builder.byteSizeField(Fields.SIZE_TO_UPGRADE_IN_BYTES, Fields.SIZE_TO_UPGRADE, getToUpgradeBytes()); builder.byteSizeField(Fields.SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, Fields.SIZE_TO_UPGRADE_ANCIENT, getToUpgradeBytesAncient()); @@ -163,10 +157,8 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte } builder.endObject(); } - builder.endObject(); } - builder.endObject(); } return builder; @@ -186,6 +178,5 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT = new XContentBuilderString("size_to_upgrade_ancient"); static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes"); static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT_IN_BYTES = new XContentBuilderString("size_to_upgrade_ancient_in_bytes"); - } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java new file mode 100644 index 00000000000..036c0c5a607 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -0,0 +1,562 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.broadcast.node; + +import com.google.common.collect.Maps; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BaseTransportResponseHandler; +import org.elasticsearch.transport.NodeShouldNotConnectException; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * Abstraction for transporting aggregated shard-level operations in a single request (NodeRequest) per-node + * and executing the shard-level operations serially on the receiving node. Each shard-level operation can produce a + * result (ShardOperationResult), these per-node shard-level results are aggregated into a single result + * (BroadcastByNodeResponse) to the coordinating node. These per-node results are aggregated into a single result (Result) + * to the client. + * + * @param the underlying client request + * @param the response to the client request + * @param per-shard operation results + */ +public abstract class TransportBroadcastByNodeAction extends HandledTransportAction { + + private final ClusterService clusterService; + private final TransportService transportService; + + final String transportNodeBroadcastAction; + + public TransportBroadcastByNodeAction( + Settings settings, + String actionName, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + Class request, + String executor) { + super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); + + this.clusterService = clusterService; + this.transportService = transportService; + + transportNodeBroadcastAction = actionName + "[n]"; + + transportService.registerRequestHandler(transportNodeBroadcastAction, new Callable() { + @Override + public NodeRequest call() throws Exception { + return new NodeRequest(); + } + }, executor, new BroadcastByNodeTransportRequestHandler()); + } + + private final Response newResponse(Request request, AtomicReferenceArray responses, List unavailableShardExceptions, Map> nodes) { + int totalShards = 0; + int successfulShards = 0; + List broadcastByNodeResponses = new ArrayList<>(); + List exceptions = new ArrayList<>(); + for (int i = 0; i < responses.length(); i++) { + if (responses.get(i) instanceof FailedNodeException) { + FailedNodeException exception = (FailedNodeException) responses.get(i); + totalShards += nodes.get(exception.nodeId()).size(); + for (ShardRouting shard : nodes.get(exception.nodeId())) { + exceptions.add(new DefaultShardOperationFailedException(shard.getIndex(), shard.getId(), exception)); + } + } else { + NodeResponse response = (NodeResponse) responses.get(i); + broadcastByNodeResponses.addAll(response.results); + totalShards += response.getTotalShards(); + successfulShards += response.getSuccessfulShards(); + for (BroadcastShardOperationFailedException throwable : response.getExceptions()) { + if (!TransportActions.isShardNotAvailableException(throwable)) { + exceptions.add(new DefaultShardOperationFailedException(throwable.getIndex(), throwable.getShardId().getId(), throwable)); + } + } + } + } + totalShards += unavailableShardExceptions.size(); + int failedShards = exceptions.size(); + return newResponse(request, totalShards, successfulShards, failedShards, broadcastByNodeResponses, exceptions); + } + + /** + * Deserialize a shard-level result from an input stream + * + * @param in input stream + * @return a deserialized shard-level result + * @throws IOException + */ + protected abstract ShardOperationResult readShardResult(StreamInput in) throws IOException; + + /** + * Creates a new response to the underlying request. + * + * @param request the underlying request + * @param totalShards the total number of shards considered for execution of the operation + * @param successfulShards the total number of shards for which execution of the operation was successful + * @param failedShards the total number of shards for which execution of the operation failed + * @param results the per-node aggregated shard-level results + * @param shardFailures the exceptions corresponding to shard operationa failures + * @return the response + */ + protected abstract Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List results, List shardFailures); + + /** + * Deserialize a request from an input stream + * + * @param in input stream + * @return a de-serialized request + * @throws IOException + */ + protected abstract Request readRequestFrom(StreamInput in) throws IOException; + + /** + * Executes the shard-level operation. This method is called once per shard serially on the receiving node. + * + * @param request the node-level request + * @param shardRouting the shard on which to execute the operation + * @return the result of the shard-level operation for the shard + */ + protected abstract ShardOperationResult shardOperation(Request request, ShardRouting shardRouting); + + /** + * Determines the shards on which this operation will be executed on. The operation is executed once per shard. + * + * @param clusterState the cluster state + * @param request the underlying request + * @param concreteIndices the concrete indices on which to execute the operation + * @return the shards on which to execute the operation + */ + protected abstract ShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices); + + /** + * Executes a global block check before polling the cluster state. + * + * @param state the cluster state + * @param request the underlying request + * @return a non-null exception if the operation is blocked + */ + protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request); + + /** + * Executes a global request-level check before polling the cluster state. + * + * @param state the cluster state + * @param request the underlying request + * @param concreteIndices the concrete indices on which to execute the operation + * @return a non-null exception if the operation if blocked + */ + protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices); + + @Override + protected void doExecute(Request request, ActionListener listener) { + new AsyncAction(request, listener).start(); + } + + protected class AsyncAction { + private final Request request; + private final ActionListener listener; + private final ClusterState clusterState; + private final DiscoveryNodes nodes; + private final Map> nodeIds; + private final AtomicReferenceArray responses; + private final AtomicInteger counter = new AtomicInteger(); + private List unavailableShardExceptions = new ArrayList<>(); + + protected AsyncAction(Request request, ActionListener listener) { + this.request = request; + this.listener = listener; + + clusterState = clusterService.state(); + nodes = clusterState.nodes(); + + ClusterBlockException globalBlockException = checkGlobalBlock(clusterState, request); + if (globalBlockException != null) { + throw globalBlockException; + } + + String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request); + ClusterBlockException requestBlockException = checkRequestBlock(clusterState, request, concreteIndices); + if (requestBlockException != null) { + throw requestBlockException; + } + + logger.trace("resolving shards for [{}] based on cluster state version [{}]", actionName, clusterState.version()); + ShardsIterator shardIt = shards(clusterState, request, concreteIndices); + nodeIds = Maps.newHashMap(); + + for (ShardRouting shard : shardIt.asUnordered()) { + if (shard.assignedToNode()) { + String nodeId = shard.currentNodeId(); + if (!nodeIds.containsKey(nodeId)) { + nodeIds.put(nodeId, new ArrayList()); + } + nodeIds.get(nodeId).add(shard); + } else { + unavailableShardExceptions.add( + new NoShardAvailableActionException( + shard.shardId(), + " no shards available for shard " + shard.toString() + " while executing " + actionName + ) + ); + } + } + + responses = new AtomicReferenceArray<>(nodeIds.size()); + } + + public void start() { + if (nodeIds.size() == 0) { + try { + onCompletion(); + } catch (Throwable e) { + listener.onFailure(e); + } + } else { + int nodeIndex = -1; + for (Map.Entry> entry : nodeIds.entrySet()) { + nodeIndex++; + DiscoveryNode node = nodes.get(entry.getKey()); + sendNodeRequest(node, entry.getValue(), nodeIndex); + } + } + } + + private void sendNodeRequest(final DiscoveryNode node, List shards, final int nodeIndex) { + try { + NodeRequest nodeRequest = new NodeRequest(node.getId(), request, shards); + transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new BaseTransportResponseHandler() { + @Override + public NodeResponse newInstance() { + return new NodeResponse(); + } + + @Override + public void handleResponse(NodeResponse response) { + onNodeResponse(node, nodeIndex, response); + } + + @Override + public void handleException(TransportException exp) { + onNodeFailure(node, nodeIndex, exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + } catch (Throwable e) { + onNodeFailure(node, nodeIndex, e); + } + } + + protected void onNodeResponse(DiscoveryNode node, int nodeIndex, NodeResponse response) { + logger.trace("received response for [{}] from node [{}]", actionName, node.id()); + + // this is defensive to protect against the possibility of double invocation + // the current implementation of TransportService#sendRequest guards against this + // but concurrency is hard, safety is important, and the small performance loss here does not matter + if (responses.compareAndSet(nodeIndex, null, response)) { + if (counter.incrementAndGet() == responses.length()) { + onCompletion(); + } + } + } + + protected void onNodeFailure(DiscoveryNode node, int nodeIndex, Throwable t) { + String nodeId = node.id(); + if (logger.isDebugEnabled() && !(t instanceof NodeShouldNotConnectException)) { + logger.debug("failed to execute [{}] on node [{}]", t, actionName, nodeId); + } + + // this is defensive to protect against the possibility of double invocation + // the current implementation of TransportService#sendRequest guards against this + // but concurrency is hard, safety is important, and the small performance loss here does not matter + if (responses.compareAndSet(nodeIndex, null, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t))) { + if (counter.incrementAndGet() == responses.length()) { + onCompletion(); + } + } + } + + protected void onCompletion() { + Response response = null; + try { + response = newResponse(request, responses, unavailableShardExceptions, nodeIds); + } catch (Throwable t) { + logger.debug("failed to combine responses from nodes", t); + listener.onFailure(t); + } + if (response != null) { + try { + listener.onResponse(response); + } catch (Throwable t) { + listener.onFailure(t); + } + } + } + } + + class BroadcastByNodeTransportRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(final NodeRequest request, TransportChannel channel) throws Exception { + List shards = request.getShards(); + final int totalShards = shards.size(); + logger.trace("[{}] executing operation on [{}] shards", actionName, totalShards); + final Object[] shardResultOrExceptions = new Object[totalShards]; + + int shardIndex = -1; + for (final ShardRouting shardRouting : shards) { + shardIndex++; + onShardOperation(request, shardResultOrExceptions, shardIndex, shardRouting); + } + + List accumulatedExceptions = new ArrayList<>(); + List results = new ArrayList<>(); + for (int i = 0; i < totalShards; i++) { + if (shardResultOrExceptions[i] instanceof BroadcastShardOperationFailedException) { + accumulatedExceptions.add((BroadcastShardOperationFailedException) shardResultOrExceptions[i]); + } else { + results.add((ShardOperationResult) shardResultOrExceptions[i]); + } + } + + channel.sendResponse(new NodeResponse(request.getNodeId(), totalShards, results, accumulatedExceptions)); + } + + private void onShardOperation(final NodeRequest request, final Object[] shardResults, final int shardIndex, final ShardRouting shardRouting) { + try { + logger.trace("[{}] executing operation for shard [{}]", actionName, shardRouting.shortSummary()); + ShardOperationResult result = shardOperation(request.indicesLevelRequest, shardRouting); + shardResults[shardIndex] = result; + logger.trace("[{}] completed operation for shard [{}]", actionName, shardRouting.shortSummary()); + } catch (Throwable t) { + BroadcastShardOperationFailedException e = new BroadcastShardOperationFailedException(shardRouting.shardId(), "operation " + actionName + " failed", t); + e.setIndex(shardRouting.getIndex()); + e.setShard(shardRouting.shardId()); + shardResults[shardIndex] = e; + logger.debug("[{}] failed to execute operation for shard [{}]", e, actionName, shardRouting.shortSummary()); + } + } + } + + protected class NodeRequest extends TransportRequest implements IndicesRequest { + private String nodeId; + + private List shards; + + protected Request indicesLevelRequest; + + protected NodeRequest() { + } + + @SuppressForbidden(reason = "debug") + public NodeRequest(String nodeId, Request request, List shards) { + super(request); + this.indicesLevelRequest = request; + this.shards = shards; + this.nodeId = nodeId; + System.out.println(TransportBroadcastByNodeAction.this.getClass().getName()); + } + + public List getShards() { + return shards; + } + + public String getNodeId() { + return nodeId; + } + + public String[] indices() { + return indicesLevelRequest.indices(); + } + + public IndicesOptions indicesOptions() { + return indicesLevelRequest.indicesOptions(); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + indicesLevelRequest = readRequestFrom(in); + int size = in.readVInt(); + shards = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + shards.add(ShardRouting.readShardRoutingEntry(in)); + } + nodeId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + indicesLevelRequest.writeTo(out); + int size = shards.size(); + out.writeVInt(size); + for (int i = 0; i < size; i++) { + shards.get(i).writeTo(out); + } + out.writeString(nodeId); + } + } + + class NodeResponse extends TransportResponse { + protected String nodeId; + protected int totalShards; + protected List exceptions; + protected List results; + + public NodeResponse() { + } + + public NodeResponse(String nodeId, + int totalShards, + List results, + List exceptions) { + this.nodeId = nodeId; + this.totalShards = totalShards; + this.results = results; + this.exceptions = exceptions; + } + + public String getNodeId() { + return nodeId; + } + + public int getTotalShards() { + return totalShards; + } + + public int getSuccessfulShards() { + return results.size(); + } + + public List getExceptions() { + return exceptions; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + nodeId = in.readString(); + totalShards = in.readVInt(); + int resultsSize = in.readVInt(); + results = new ArrayList<>(resultsSize); + for (; resultsSize > 0; resultsSize--) { + final ShardOperationResult result = in.readBoolean() ? readShardResult(in) : null; + results.add(result); + } + if (in.readBoolean()) { + int failureShards = in.readVInt(); + exceptions = new ArrayList<>(failureShards); + for (int i = 0; i < failureShards; i++) { + exceptions.add(new BroadcastShardOperationFailedException(in)); + } + } else { + exceptions = null; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(nodeId); + out.writeVInt(totalShards); + out.writeVInt(results.size()); + for (ShardOperationResult result : results) { + out.writeOptionalStreamable(result); + } + out.writeBoolean(exceptions != null); + if (exceptions != null) { + int failureShards = exceptions.size(); + out.writeVInt(failureShards); + for (int i = 0; i < failureShards; i++) { + exceptions.get(i).writeTo(out); + } + } + } + } + + /** + * Can be used for implementations of {@link #shardOperation(BroadcastRequest, ShardRouting) shardOperation} for + * which there is no shard-level return value. + */ + public final static class EmptyResult implements Streamable { + public static EmptyResult INSTANCE = new EmptyResult(); + + private EmptyResult() { + } + + @Override + public void readFrom(StreamInput in) throws IOException { + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + } + + public static EmptyResult readEmptyResultFrom(StreamInput in) { + return INSTANCE; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 97749e63819..1e3bd9614dd 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing; import com.carrotsearch.hppc.IntSet; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.UnmodifiableIterator; @@ -223,6 +224,38 @@ public class RoutingTable implements Iterable, DiffablealwaysTrue(), false); + } + + public ShardsIterator allShardsIncludingRelocationTargets(String[] indices) { + return allShardsSatisfyingPredicate(indices, Predicates.alwaysTrue(), true); + } + + // TODO: replace with JDK 8 native java.util.function.Predicate + private ShardsIterator allShardsSatisfyingPredicate(String[] indices, Predicate predicate, boolean includeRelocationTargets) { + // use list here since we need to maintain identity across shards + List shards = new ArrayList<>(); + for (String index : indices) { + IndexRoutingTable indexRoutingTable = index(index); + if (indexRoutingTable == null) { + continue; + // we simply ignore indices that don't exists (make sense for operations that use it currently) + } + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + if (predicate.apply(shardRouting)) { + shards.add(shardRouting); + if (includeRelocationTargets && shardRouting.relocating()) { + shards.add(shardRouting.buildTargetRelocatingShard()); + } + } + } + } + } + return new PlainShardsIterator(shards); + } + /** * All the *active* primary shards for the provided indices grouped (each group is a single element, consisting * of the primary shard). This is handy for components that expect to get group iterators, but still want in some diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java index 9d58f8f74df..4c9b3de5c79 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestRecoveryAction.java @@ -22,7 +22,6 @@ package org.elasticsearch.rest.action.cat; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; @@ -116,19 +115,19 @@ public class RestRecoveryAction extends AbstractCatAction { Table t = getTableWithHeader(request); - for (String index : response.shardResponses().keySet()) { + for (String index : response.shardRecoveryStates().keySet()) { - List shardRecoveryResponses = response.shardResponses().get(index); - if (shardRecoveryResponses.size() == 0) { + List shardRecoveryStates = response.shardRecoveryStates().get(index); + if (shardRecoveryStates.size() == 0) { continue; } // Sort ascending by shard id for readability - CollectionUtil.introSort(shardRecoveryResponses, new Comparator() { + CollectionUtil.introSort(shardRecoveryStates, new Comparator() { @Override - public int compare(ShardRecoveryResponse o1, ShardRecoveryResponse o2) { - int id1 = o1.recoveryState().getShardId().id(); - int id2 = o2.recoveryState().getShardId().id(); + public int compare(RecoveryState o1, RecoveryState o2) { + int id1 = o1.getShardId().id(); + int id2 = o2.getShardId().id(); if (id1 < id2) { return -1; } else if (id1 > id2) { @@ -139,12 +138,10 @@ public class RestRecoveryAction extends AbstractCatAction { } }); - for (ShardRecoveryResponse shardResponse : shardRecoveryResponses) { - - RecoveryState state = shardResponse.recoveryState(); + for (RecoveryState state: shardRecoveryStates) { t.startRow(); t.addCell(index); - t.addCell(shardResponse.getShardId()); + t.addCell(state.getShardId().id()); t.addCell(state.getTimer().time()); t.addCell(state.getType().toString().toLowerCase(Locale.ROOT)); t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT)); diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java index 972a1bc0110..734fb340090 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java @@ -21,7 +21,11 @@ package org.elasticsearch.rest.action.cat; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.admin.indices.segments.*; +import org.elasticsearch.action.admin.indices.segments.IndexSegments; +import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; @@ -29,7 +33,10 @@ import org.elasticsearch.common.Table; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Segment; -import org.elasticsearch.rest.*; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.support.RestActionListener; import org.elasticsearch.rest.action.support.RestResponseListener; import org.elasticsearch.rest.action.support.RestTable; @@ -120,8 +127,8 @@ public class RestSegmentsAction extends AbstractCatAction { for (Segment segment : segments) { table.startRow(); - table.addCell(shardSegment.getIndex()); - table.addCell(shardSegment.getShardId()); + table.addCell(shardSegment.getShardRouting().getIndex()); + table.addCell(shardSegment.getShardRouting().getId()); table.addCell(shardSegment.getShardRouting().primary() ? "p" : "r"); table.addCell(nodes.get(shardSegment.getShardRouting().currentNodeId()).getHostAddress()); table.addCell(shardSegment.getShardRouting().currentNodeId()); diff --git a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index 2b8caf8f055..2aa6818c878 100644 --- a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport; import java.lang.reflect.Constructor; +import java.util.concurrent.Callable; /** * @@ -28,20 +29,19 @@ import java.lang.reflect.Constructor; public class RequestHandlerRegistry { private final String action; - private final Constructor requestConstructor; private final TransportRequestHandler handler; private final boolean forceExecution; private final String executor; + private final Callable requestFactory; RequestHandlerRegistry(String action, Class request, TransportRequestHandler handler, String executor, boolean forceExecution) { + this(action, new ReflectionFactory<>(request), handler, executor, forceExecution); + } + + public RequestHandlerRegistry(String action, Callable requestFactory, TransportRequestHandler handler, String executor, boolean forceExecution) { this.action = action; - try { - this.requestConstructor = request.getDeclaredConstructor(); - } catch (NoSuchMethodException e) { - throw new IllegalStateException("failed to create constructor (does it have a default constructor?) for request " + request, e); - } - this.requestConstructor.setAccessible(true); + this.requestFactory = requestFactory; assert newRequest() != null; this.handler = handler; this.forceExecution = forceExecution; @@ -54,7 +54,7 @@ public class RequestHandlerRegistry { public Request newRequest() { try { - return requestConstructor.newInstance(); + return requestFactory.call(); } catch (Exception e) { throw new IllegalStateException("failed to instantiate request ", e); } @@ -71,4 +71,22 @@ public class RequestHandlerRegistry { public String getExecutor() { return executor; } + + private final static class ReflectionFactory implements Callable { + private final Constructor requestConstructor; + + public ReflectionFactory(Class request) { + try { + this.requestConstructor = request.getDeclaredConstructor(); + } catch (NoSuchMethodException e) { + throw new IllegalStateException("failed to create constructor (does it have a default constructor?) for request " + request, e); + } + this.requestConstructor.setAccessible(true); + } + + @Override + public Request call() throws Exception { + return requestConstructor.newInstance(); + } + } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index b70589ce52d..40fa908c2b3 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -22,8 +22,6 @@ package org.elasticsearch.transport; import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.settings.ClusterDynamicSettings; -import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -35,12 +33,21 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.util.concurrent.*; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -399,6 +406,18 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Callable requestFactory, String executor, TransportRequestHandler handler) { + RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, requestFactory, handler, executor, false); + registerRequestHandler(reg); + } + /** * Registers a new request handler * @param action The action the request handler is associated with @@ -408,8 +427,12 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Class request, String executor, boolean forceExecution, TransportRequestHandler handler) { + RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, handler, executor, forceExecution); + registerRequestHandler(reg); + } + + protected void registerRequestHandler(RequestHandlerRegistry reg) { synchronized (requestHandlerMutex) { - RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, handler, executor, forceExecution); RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction()); requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); if (replaced != null) { diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 10b34493334..0f0e6a28426 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -113,6 +113,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -395,7 +396,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testOptimize() { - String optimizeShardAction = OptimizeAction.NAME + "[s]"; + String optimizeShardAction = OptimizeAction.NAME + "[n]"; interceptTransportActions(optimizeShardAction); OptimizeRequest optimizeRequest = new OptimizeRequest(randomIndicesOrAliases()); @@ -419,7 +420,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testClearCache() { - String clearCacheAction = ClearIndicesCacheAction.NAME + "[s]"; + String clearCacheAction = ClearIndicesCacheAction.NAME + "[n]"; interceptTransportActions(clearCacheAction); ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(randomIndicesOrAliases()); @@ -431,7 +432,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testRecovery() { - String recoveryAction = RecoveryAction.NAME + "[s]"; + String recoveryAction = RecoveryAction.NAME + "[n]"; interceptTransportActions(recoveryAction); RecoveryRequest recoveryRequest = new RecoveryRequest(randomIndicesOrAliases()); @@ -443,7 +444,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testSegments() { - String segmentsAction = IndicesSegmentsAction.NAME + "[s]"; + String segmentsAction = IndicesSegmentsAction.NAME + "[n]"; interceptTransportActions(segmentsAction); IndicesSegmentsRequest segmentsRequest = new IndicesSegmentsRequest(randomIndicesOrAliases()); @@ -455,7 +456,7 @@ public class IndicesRequestIT extends ESIntegTestCase { @Test public void testIndicesStats() { - String indicesStats = IndicesStatsAction.NAME + "[s]"; + String indicesStats = IndicesStatsAction.NAME + "[n]"; interceptTransportActions(indicesStats); IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest().indices(randomIndicesOrAliases()); @@ -888,6 +889,11 @@ public class IndicesRequestIT extends ESIntegTestCase { super.registerRequestHandler(action, request, executor, forceExecution, new InterceptingRequestHandler(action, handler)); } + @Override + public void registerRequestHandler(String action, Callable requestFactory, String executor, TransportRequestHandler handler) { + super.registerRequestHandler(action, requestFactory, executor, new InterceptingRequestHandler(action, handler)); + } + private class InterceptingRequestHandler implements TransportRequestHandler { private final TransportRequestHandler requestHandler; diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java new file mode 100644 index 00000000000..c6e2f2cf942 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -0,0 +1,422 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.broadcast.node; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.cluster.TestClusterService; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseOptions; +import org.elasticsearch.transport.TransportService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.object.HasToString.hasToString; + +public class TransportBroadcastByNodeActionTests extends ESTestCase { + + private static final String TEST_INDEX = "test-index"; + private static final String TEST_CLUSTER = "test-cluster"; + private static ThreadPool THREAD_POOL; + + private TestClusterService clusterService; + private CapturingTransport transport; + private TransportService transportService; + + private TestTransportBroadcastByNodeAction action; + + public static class Request extends BroadcastRequest { + public Request() { + } + + public Request(String[] indices) { + super(indices); + } + } + + public static class Response extends BroadcastResponse { + public Response() { + } + + public Response(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); + } + } + + class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction { + private final Map shards = new HashMap<>(); + + public TestTransportBroadcastByNodeAction(Settings settings, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Class request, String executor) { + super(settings, "indices:admin/test", THREAD_POOL, TransportBroadcastByNodeActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request, executor); + } + + @Override + protected EmptyResult readShardResult(StreamInput in) throws IOException { + return EmptyResult.readEmptyResultFrom(in); + } + + @Override + protected Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List emptyResults, List shardFailures) { + return new Response(totalShards, successfulShards, failedShards, shardFailures); + } + + @Override + protected Request readRequestFrom(StreamInput in) throws IOException { + final Request request = new Request(); + request.readFrom(in); + return request; + } + + @Override + protected EmptyResult shardOperation(Request request, ShardRouting shardRouting) { + if (rarely()) { + shards.put(shardRouting, Boolean.TRUE); + return EmptyResult.INSTANCE; + } else { + ElasticsearchException e = new ElasticsearchException("operation failed"); + shards.put(shardRouting, e); + throw e; + } + } + + @Override + protected ShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices) { + return clusterState.routingTable().allShards(new String[]{TEST_INDEX}); + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices); + } + + public Map getResults() { + return shards; + } + } + + class MyResolver extends IndexNameExpressionResolver { + public MyResolver() { + super(Settings.EMPTY); + } + + @Override + public String[] concreteIndices(ClusterState state, IndicesRequest request) { + return request.indices(); + } + } + + @BeforeClass + public static void startThreadPool() { + THREAD_POOL = new ThreadPool(TransportBroadcastByNodeActionTests.class.getSimpleName()); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + transport = new CapturingTransport(); + clusterService = new TestClusterService(THREAD_POOL); + transportService = new TransportService(transport, THREAD_POOL); + transportService.start(); + setClusterState(clusterService, TEST_INDEX); + action = new TestTransportBroadcastByNodeAction( + Settings.EMPTY, + transportService, + new ActionFilters(new HashSet()), + new MyResolver(), + Request.class, + ThreadPool.Names.SAME + ); + } + + void setClusterState(TestClusterService clusterService, String index) { + int numberOfNodes = randomIntBetween(3, 5); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index); + + int shardIndex = -1; + for (int i = 0; i < numberOfNodes; i++) { + final DiscoveryNode node = newNode(i); + discoBuilder = discoBuilder.put(node); + int numberOfShards = randomIntBetween(0, 10); + for (int j = 0; j < numberOfShards; j++) { + final ShardId shardId = new ShardId(index, ++shardIndex); + ShardRouting shard = TestShardRouting.newShardRouting(index, shardId.getId(), node.id(), true, ShardRoutingState.STARTED, 1); + IndexShardRoutingTable.Builder indexShard = new IndexShardRoutingTable.Builder(shardId); + indexShard.addShard(shard); + indexRoutingTable.addIndexShard(indexShard.build()); + } + } + discoBuilder.localNodeId(newNode(0).id()); + discoBuilder.masterNodeId(newNode(numberOfNodes - 1).id()); + ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName(TEST_CLUSTER)); + stateBuilder.nodes(discoBuilder); + stateBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable.build()).build()); + ClusterState clusterState = stateBuilder.build(); + clusterService.setState(clusterState); + } + + static DiscoveryNode newNode(int nodeId) { + return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT); + } + + @AfterClass + public static void destroyThreadPool() { + ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS); + // since static must set to null to be eligible for collection + THREAD_POOL = null; + } + + public void testGlobalBlock() { + Request request = new Request(new String[]{TEST_INDEX}); + PlainActionFuture listener = new PlainActionFuture<>(); + + ClusterBlocks.Builder block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + try { + action.new AsyncAction(request, listener).start(); + fail("expected ClusterBlockException"); + } catch (ClusterBlockException expected) { + + } + } + + public void testRequestBlock() { + Request request = new Request(new String[]{TEST_INDEX}); + PlainActionFuture listener = new PlainActionFuture<>(); + + ClusterBlocks.Builder block = ClusterBlocks.builder() + .addIndexBlock(TEST_INDEX, new ClusterBlock(1, "test-block", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + try { + action.new AsyncAction(request, listener).start(); + fail("expected ClusterBlockException"); + } catch (ClusterBlockException expected) { + + } + } + + public void testOneRequestIsSentToEachNodeHoldingAShard() { + Request request = new Request(new String[]{TEST_INDEX}); + PlainActionFuture listener = new PlainActionFuture<>(); + + action.new AsyncAction(request, listener).start(); + Map> capturedRequests = transport.capturedRequestsByTargetNode(); + + ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX}); + Set set = new HashSet<>(); + for (ShardRouting shard : shardIt.asUnordered()) { + set.add(shard.currentNodeId()); + } + + // check a request was sent to the right number of nodes + assertEquals(set.size(), capturedRequests.size()); + + // check requests were sent to the right nodes + assertEquals(set, capturedRequests.keySet()); + for (Map.Entry> entry : capturedRequests.entrySet()) { + // check one request was sent to each node + assertEquals(1, entry.getValue().size()); + } + } + + public void testOperationExecution() throws Exception { + ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX}); + Set shards = new HashSet<>(); + String nodeId = shardIt.asUnordered().iterator().next().currentNodeId(); + for (ShardRouting shard : shardIt.asUnordered()) { + if (nodeId.equals(shard.currentNodeId())) { + shards.add(shard); + } + } + final TransportBroadcastByNodeAction.BroadcastByNodeTransportRequestHandler handler = + action.new BroadcastByNodeTransportRequestHandler(); + + TestTransportChannel channel = new TestTransportChannel(); + + handler.messageReceived(action.new NodeRequest(nodeId, new Request(), new ArrayList<>(shards)), channel); + + // check the operation was executed only on the expected shards + assertEquals(shards, action.getResults().keySet()); + + TransportResponse response = channel.getCapturedResponse(); + assertTrue(response instanceof TransportBroadcastByNodeAction.NodeResponse); + TransportBroadcastByNodeAction.NodeResponse nodeResponse = (TransportBroadcastByNodeAction.NodeResponse)response; + + // check the operation was executed on the correct node + assertEquals("node id", nodeId, nodeResponse.getNodeId()); + + int successfulShards = 0; + int failedShards = 0; + for (Object result : action.getResults().values()) { + if (!(result instanceof ElasticsearchException)) { + successfulShards++; + } else { + failedShards++; + } + } + + // check the operation results + assertEquals("successful shards", successfulShards, nodeResponse.getSuccessfulShards()); + assertEquals("total shards", action.getResults().size(), nodeResponse.getTotalShards()); + assertEquals("failed shards", failedShards, nodeResponse.getExceptions().size()); + List exceptions = nodeResponse.getExceptions(); + for (BroadcastShardOperationFailedException exception : exceptions) { + assertThat(exception.getMessage(), is("operation indices:admin/test failed")); + assertThat(exception, hasToString(containsString("operation failed"))); + } + } + + public void testResultAggregation() throws ExecutionException, InterruptedException { + Request request = new Request(new String[]{TEST_INDEX}); + PlainActionFuture listener = new PlainActionFuture<>(); + + action.new AsyncAction(request, listener).start(); + Map> capturedRequests = transport.capturedRequestsByTargetNode(); + transport.clear(); + + ShardsIterator shardIt = clusterService.state().getRoutingTable().allShards(new String[]{TEST_INDEX}); + Map> map = new HashMap<>(); + for (ShardRouting shard : shardIt.asUnordered()) { + if (!map.containsKey(shard.currentNodeId())) { + map.put(shard.currentNodeId(), new ArrayList()); + } + map.get(shard.currentNodeId()).add(shard); + } + + int totalShards = 0; + int totalSuccessfulShards = 0; + int totalFailedShards = 0; + for (Map.Entry> entry : capturedRequests.entrySet()) { + List exceptions = new ArrayList<>(); + long requestId = entry.getValue().get(0).requestId; + if (rarely()) { + // simulate node failure + totalShards += map.get(entry.getKey()).size(); + totalFailedShards += map.get(entry.getKey()).size(); + transport.handleResponse(requestId, new Exception()); + } else { + List shards = map.get(entry.getKey()); + List shardResults = new ArrayList<>(); + for (ShardRouting shard : shards) { + totalShards++; + if (rarely()) { + // simulate operation failure + totalFailedShards++; + exceptions.add(new BroadcastShardOperationFailedException(shard.shardId(), "operation indices:admin/test failed")); + } else { + shardResults.add(TransportBroadcastByNodeAction.EmptyResult.INSTANCE); + } + } + totalSuccessfulShards += shardResults.size(); + TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse(entry.getKey(), shards.size(), shardResults, exceptions); + transport.handleResponse(requestId, nodeResponse); + } + } + + Response response = listener.get(); + assertEquals("total shards", totalShards, response.getTotalShards()); + assertEquals("successful shards", totalSuccessfulShards, response.getSuccessfulShards()); + assertEquals("failed shards", totalFailedShards, response.getFailedShards()); + assertEquals("accumulated exceptions", totalFailedShards, response.getShardFailures().length); + } + + public class TestTransportChannel implements TransportChannel { + private TransportResponse capturedResponse; + + public TransportResponse getCapturedResponse() { + return capturedResponse; + } + + @Override + public String action() { + return null; + } + + @Override + public String getProfileName() { + return ""; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + capturedResponse = response; + } + + @Override + public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + } + + @Override + public void sendResponse(Throwable error) throws IOException { + } + } +} \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java b/core/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java index ad2d1a79eda..e7ef3259199 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java +++ b/core/src/test/java/org/elasticsearch/benchmark/recovery/ReplicaRecoveryBenchmark.java @@ -19,7 +19,6 @@ package org.elasticsearch.benchmark.recovery; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.bootstrap.BootstrapForTesting; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -30,6 +29,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.node.Node; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.transport.TransportModule; @@ -128,12 +128,12 @@ public class ReplicaRecoveryBenchmark { long currentTime = System.currentTimeMillis(); long currentDocs = indexer.totalIndexedDocs(); RecoveryResponse recoveryResponse = client1.admin().indices().prepareRecoveries(INDEX_NAME).setActiveOnly(true).get(); - List indexRecoveries = recoveryResponse.shardResponses().get(INDEX_NAME); + List indexRecoveries = recoveryResponse.shardRecoveryStates().get(INDEX_NAME); long translogOps; long bytes; if (indexRecoveries.size() > 0) { - translogOps = indexRecoveries.get(0).recoveryState().getTranslog().recoveredOperations(); - bytes = recoveryResponse.shardResponses().get(INDEX_NAME).get(0).recoveryState().getIndex().recoveredBytes(); + translogOps = indexRecoveries.get(0).getTranslog().recoveredOperations(); + bytes = recoveryResponse.shardRecoveryStates().get(INDEX_NAME).get(0).getIndex().recoveredBytes(); } else { bytes = lastBytes = 0; translogOps = lastTranslogOps = 0; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java index 2cbc364bafe..9a0cbb2bcca 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTest.java @@ -243,4 +243,25 @@ public class RoutingTableTest extends ESAllocationTestCase { fail("Calling with non-existing index should be ignored at the moment"); } } + + public void testAllShardsForMultipleIndices() { + assertThat(this.emptyRoutingTable.allShards(new String[0]).size(), is(0)); + + assertThat(this.testRoutingTable.allShards(new String[]{TEST_INDEX_1}).size(), is(this.shardsPerIndex)); + + initPrimaries(); + assertThat(this.testRoutingTable.allShards(new String[]{TEST_INDEX_1}).size(), is(this.shardsPerIndex)); + + startInitializingShards(TEST_INDEX_1); + assertThat(this.testRoutingTable.allShards(new String[]{TEST_INDEX_1}).size(), is(this.shardsPerIndex)); + + startInitializingShards(TEST_INDEX_2); + assertThat(this.testRoutingTable.allShards(new String[]{TEST_INDEX_1, TEST_INDEX_2}).size(), is(this.totalNumberOfShards)); + + try { + this.testRoutingTable.allShards(new String[]{TEST_INDEX_1, "not_exists"}); + } catch (IndexNotFoundException e) { + fail("Calling with non-existing index should be ignored at the moment"); + } + } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index cc293375a2c..55204365688 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -42,7 +42,11 @@ import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.BytesTransportRequest; +import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; import org.hamcrest.Matchers; import org.junit.Test; @@ -112,7 +116,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { createIndex("test"); ensureSearchable("test"); RecoveryResponse r = client().admin().indices().prepareRecoveries("test").get(); - int numRecoveriesBeforeNewMaster = r.shardResponses().get("test").size(); + int numRecoveriesBeforeNewMaster = r.shardRecoveryStates().get("test").size(); final String oldMaster = internalCluster().getMasterName(); internalCluster().stopCurrentMasterNode(); @@ -127,7 +131,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { ensureSearchable("test"); r = client().admin().indices().prepareRecoveries("test").get(); - int numRecoveriesAfterNewMaster = r.shardResponses().get("test").size(); + int numRecoveriesAfterNewMaster = r.shardRecoveryStates().get("test").size(); assertThat(numRecoveriesAfterNewMaster, equalTo(numRecoveriesBeforeNewMaster)); } diff --git a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java index f1c6d476eb5..529b60562bf 100644 --- a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java +++ b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java @@ -35,7 +35,10 @@ import org.elasticsearch.test.ESIntegTestCase; import org.junit.Test; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; /** */ @@ -146,7 +149,7 @@ public class ShardInfoIT extends ESIntegTestCase { RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("idx") .setActiveOnly(true) .get(); - assertThat(recoveryResponse.shardResponses().get("idx").size(), equalTo(0)); + assertThat(recoveryResponse.shardRecoveryStates().get("idx").size(), equalTo(0)); } }); } diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryBackwardsCompatibilityIT.java index 745c98f664e..fbd8b973fad 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryBackwardsCompatibilityIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.gateway; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -99,8 +98,7 @@ public class RecoveryBackwardsCompatibilityIT extends ESBackcompatTestCase { HashMap map = new HashMap<>(); map.put("details", "true"); final ToXContent.Params params = new ToXContent.MapParams(map); - for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) { - RecoveryState recoveryState = response.recoveryState(); + for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { final String recoverStateAsJSON = XContentHelper.toString(recoveryState, params); if (!recoveryState.getPrimary()) { RecoveryState.Index index = recoveryState.getIndex(); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index aa229305a2f..341139ba88b 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -20,7 +20,6 @@ package org.elasticsearch.gateway; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; @@ -400,8 +399,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { assertSyncIdsNotNull(); } RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); - for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) { - RecoveryState recoveryState = response.recoveryState(); + for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { long recovered = 0; for (RecoveryState.File file : recoveryState.getIndex().fileDetails()) { if (file.name().startsWith("segments")) { @@ -410,7 +408,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { } if (!recoveryState.getPrimary() && (useSyncIds == false)) { logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}", - response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), + recoveryState.getShardId().getId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered)); assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0l)); @@ -422,7 +420,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { } else { if (useSyncIds && !recoveryState.getPrimary()) { logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}", - response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), + recoveryState.getShardId().getId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes()); } assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0l)); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3ec731156be..656d251b1bf 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -28,7 +28,10 @@ import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.*; +import org.elasticsearch.cluster.ClusterInfoService; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -43,7 +46,6 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryParsingException; @@ -65,7 +67,10 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutionException; -import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.EMPTY_PARAMS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index c2860c75033..7c27a792d79 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -53,7 +52,12 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; import org.junit.Test; import java.io.IOException; @@ -67,7 +71,13 @@ import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.ESIntegTestCase.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; /** * @@ -155,18 +165,17 @@ public class IndexRecoveryIT extends ESIntegTestCase { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - assertThat(response.shardResponses().size(), equalTo(SHARD_COUNT)); - assertThat(response.shardResponses().get(INDEX_NAME).size(), equalTo(1)); + assertThat(response.shardRecoveryStates().size(), equalTo(SHARD_COUNT)); + assertThat(response.shardRecoveryStates().get(INDEX_NAME).size(), equalTo(1)); - List shardResponses = response.shardResponses().get(INDEX_NAME); - assertThat(shardResponses.size(), equalTo(1)); + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + assertThat(recoveryStates.size(), equalTo(1)); - ShardRecoveryResponse shardResponse = shardResponses.get(0); - RecoveryState state = shardResponse.recoveryState(); + RecoveryState recoveryState = recoveryStates.get(0); - assertRecoveryState(state, 0, Type.STORE, Stage.DONE, node, node, false); + assertRecoveryState(recoveryState, 0, Type.STORE, Stage.DONE, node, node, false); - validateIndexRecoveryState(state.getIndex()); + validateIndexRecoveryState(recoveryState.getIndex()); } @Test @@ -183,8 +192,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).setActiveOnly(true).execute().actionGet(); - List shardResponses = response.shardResponses().get(INDEX_NAME); - assertThat(shardResponses.size(), equalTo(0)); // Should not expect any responses back + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + assertThat(recoveryStates.size(), equalTo(0)); // Should not expect any responses back } @Test @@ -209,23 +218,23 @@ public class IndexRecoveryIT extends ESIntegTestCase { RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); // we should now have two total shards, one primary and one replica - List shardResponses = response.shardResponses().get(INDEX_NAME); - assertThat(shardResponses.size(), equalTo(2)); + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + assertThat(recoveryStates.size(), equalTo(2)); - List nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses); + List nodeAResponses = findRecoveriesForTargetNode(nodeA, recoveryStates); assertThat(nodeAResponses.size(), equalTo(1)); - List nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses); + List nodeBResponses = findRecoveriesForTargetNode(nodeB, recoveryStates); assertThat(nodeBResponses.size(), equalTo(1)); // validate node A recovery - ShardRecoveryResponse nodeAShardResponse = nodeAResponses.get(0); - assertRecoveryState(nodeAShardResponse.recoveryState(), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); - validateIndexRecoveryState(nodeAShardResponse.recoveryState().getIndex()); + RecoveryState nodeARecoveryState = nodeAResponses.get(0); + assertRecoveryState(nodeARecoveryState, 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); + validateIndexRecoveryState(nodeARecoveryState.getIndex()); // validate node B recovery - ShardRecoveryResponse nodeBShardResponse = nodeBResponses.get(0); - assertRecoveryState(nodeBShardResponse.recoveryState(), 0, Type.REPLICA, Stage.DONE, nodeA, nodeB, false); - validateIndexRecoveryState(nodeBShardResponse.recoveryState().getIndex()); + RecoveryState nodeBRecoveryState = nodeBResponses.get(0); + assertRecoveryState(nodeBRecoveryState, 0, Type.REPLICA, Stage.DONE, nodeA, nodeB, false); + validateIndexRecoveryState(nodeBRecoveryState.getIndex()); } @Test @@ -266,17 +275,17 @@ public class IndexRecoveryIT extends ESIntegTestCase { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - List shardResponses = response.shardResponses().get(INDEX_NAME); - List nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses); - assertThat(nodeAResponses.size(), equalTo(1)); - List nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses); - assertThat(nodeBResponses.size(), equalTo(1)); + List recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + List nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); + assertThat(nodeARecoveryStates.size(), equalTo(1)); + List nodeBRecoveryStates = findRecoveriesForTargetNode(nodeB, recoveryStates); + assertThat(nodeBRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeAResponses.get(0).recoveryState(), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); - validateIndexRecoveryState(nodeAResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); + validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - assertOnGoingRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, nodeA, nodeB, false); - validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex()); + assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, nodeA, nodeB, false); + validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); logger.info("--> request node recovery stats"); NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); @@ -325,11 +334,11 @@ public class IndexRecoveryIT extends ESIntegTestCase { response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - shardResponses = response.shardResponses().get(INDEX_NAME); - assertThat(shardResponses.size(), equalTo(1)); + recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); + assertThat(recoveryStates.size(), equalTo(1)); - assertRecoveryState(shardResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); - validateIndexRecoveryState(shardResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(recoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + validateIndexRecoveryState(recoveryStates.get(0).getIndex()); statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); assertThat(statsResponse.getNodes(), arrayWithSize(2)); @@ -377,45 +386,45 @@ public class IndexRecoveryIT extends ESIntegTestCase { .execute().actionGet().getState(); response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - shardResponses = response.shardResponses().get(INDEX_NAME); + recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); - nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses); - assertThat(nodeAResponses.size(), equalTo(1)); - nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses); - assertThat(nodeBResponses.size(), equalTo(1)); - List nodeCResponses = findRecoveriesForTargetNode(nodeC, shardResponses); - assertThat(nodeCResponses.size(), equalTo(1)); + nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); + assertThat(nodeARecoveryStates.size(), equalTo(1)); + nodeBRecoveryStates = findRecoveriesForTargetNode(nodeB, recoveryStates); + assertThat(nodeBRecoveryStates.size(), equalTo(1)); + List nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeAResponses.get(0).recoveryState(), 0, Type.REPLICA, Stage.DONE, nodeB, nodeA, false); - validateIndexRecoveryState(nodeAResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.REPLICA, Stage.DONE, nodeB, nodeA, false); + validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - assertRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); - validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) - assertOnGoingRecoveryState(nodeCResponses.get(0).recoveryState(), 0, Type.REPLICA, nodeB, nodeC, false); - validateIndexRecoveryState(nodeCResponses.get(0).recoveryState().getIndex()); + assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, Type.REPLICA, nodeB, nodeC, false); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); logger.info("--> speeding up recoveries"); restoreRecoverySpeed(); ensureGreen(); response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - shardResponses = response.shardResponses().get(INDEX_NAME); + recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); - nodeAResponses = findRecoveriesForTargetNode(nodeA, shardResponses); - assertThat(nodeAResponses.size(), equalTo(0)); - nodeBResponses = findRecoveriesForTargetNode(nodeB, shardResponses); - assertThat(nodeBResponses.size(), equalTo(1)); - nodeCResponses = findRecoveriesForTargetNode(nodeC, shardResponses); - assertThat(nodeCResponses.size(), equalTo(1)); + nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates); + assertThat(nodeARecoveryStates.size(), equalTo(0)); + nodeBRecoveryStates = findRecoveriesForTargetNode(nodeB, recoveryStates); + assertThat(nodeBRecoveryStates.size(), equalTo(1)); + nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); + assertThat(nodeCRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeBResponses.get(0).recoveryState(), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); - validateIndexRecoveryState(nodeBResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) - assertRecoveryState(nodeCResponses.get(0).recoveryState(), 0, Type.REPLICA, Stage.DONE, nodeB, nodeC, false); - validateIndexRecoveryState(nodeCResponses.get(0).recoveryState().getIndex()); + assertRecoveryState(nodeCRecoveryStates.get(0), 0, Type.REPLICA, Stage.DONE, nodeB, nodeC, false); + validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); } @Test @@ -457,24 +466,24 @@ public class IndexRecoveryIT extends ESIntegTestCase { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - for (Map.Entry> shardRecoveryResponse : response.shardResponses().entrySet()) { + for (Map.Entry> indexRecoveryStates : response.shardRecoveryStates().entrySet()) { - assertThat(shardRecoveryResponse.getKey(), equalTo(INDEX_NAME)); - List shardRecoveryResponses = shardRecoveryResponse.getValue(); - assertThat(shardRecoveryResponses.size(), equalTo(totalShards)); + assertThat(indexRecoveryStates.getKey(), equalTo(INDEX_NAME)); + List recoveryStates = indexRecoveryStates.getValue(); + assertThat(recoveryStates.size(), equalTo(totalShards)); - for (ShardRecoveryResponse shardResponse : shardRecoveryResponses) { - assertRecoveryState(shardResponse.recoveryState(), 0, Type.SNAPSHOT, Stage.DONE, null, nodeA, true); - validateIndexRecoveryState(shardResponse.recoveryState().getIndex()); + for (RecoveryState recoveryState : recoveryStates) { + assertRecoveryState(recoveryState, 0, Type.SNAPSHOT, Stage.DONE, null, nodeA, true); + validateIndexRecoveryState(recoveryState.getIndex()); } } } - private List findRecoveriesForTargetNode(String nodeName, List responses) { - List nodeResponses = new ArrayList<>(); - for (ShardRecoveryResponse response : responses) { - if (response.recoveryState().getTargetNode().getName().equals(nodeName)) { - nodeResponses.add(response); + private List findRecoveriesForTargetNode(String nodeName, List recoveryStates) { + List nodeResponses = new ArrayList<>(); + for (RecoveryState recoveryState : recoveryStates) { + if (recoveryState.getTargetNode().getName().equals(nodeName)) { + nodeResponses.add(recoveryState); } } return nodeResponses; diff --git a/core/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerIT.java b/core/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerIT.java index 695845bca0d..3a00be0aeb9 100644 --- a/core/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerIT.java +++ b/core/src/test/java/org/elasticsearch/indices/warmer/SimpleIndicesWarmerIT.java @@ -21,7 +21,6 @@ package org.elasticsearch.indices.warmer; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import com.google.common.collect.ImmutableList; - import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; @@ -49,7 +48,10 @@ import org.junit.Test; import java.util.Locale; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; public class SimpleIndicesWarmerIT extends ESIntegTestCase { @@ -272,7 +274,7 @@ public class SimpleIndicesWarmerIT extends ESIntegTestCase { for (IndexShardSegments indexShardSegments : indicesSegments) { for (ShardSegments shardSegments : indexShardSegments) { for (Segment segment : shardSegments) { - logger.debug("+=" + segment.memoryInBytes + " " + indexShardSegments.getShardId() + " " + shardSegments.getIndex()); + logger.debug("+=" + segment.memoryInBytes + " " + indexShardSegments.getShardId() + " " + shardSegments.getShardRouting().getIndex()); total += segment.memoryInBytes; } } diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java index 60a6acb7ed9..450212b75b5 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java @@ -284,7 +284,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase { IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().get(); for (ShardStats shardStats : indicesStatsResponse.getShards()) { DocsStats docsStats = shardStats.getStats().docs; - logger.info("shard [{}] - count {}, primary {}", shardStats.getShardId(), docsStats.getCount(), shardStats.getShardRouting().primary()); + logger.info("shard [{}] - count {}, primary {}", shardStats.getShardRouting().id(), docsStats.getCount(), shardStats.getShardRouting().primary()); } //if there was an error we try to wait and see if at some point it'll get fixed diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 39622287ea2..628a886e6ae 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -63,7 +63,6 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -422,7 +421,7 @@ public class RelocationIT extends ESIntegTestCase { public boolean apply(Object input) { RecoveryResponse recoveryResponse = internalCluster().client(redNodeName).admin().indices().prepareRecoveries(indexName) .get(); - return !recoveryResponse.shardResponses().get(indexName).isEmpty(); + return !recoveryResponse.shardRecoveryStates().get(indexName).isEmpty(); } } ); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index d36ecfa7d46..5c7fc445fbf 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -32,7 +32,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; -import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AbstractDiffable; @@ -55,6 +54,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.index.store.IndexStore; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.rest.RestChannel; @@ -81,8 +81,18 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.ESIntegTestCase.ClusterScope; import static org.elasticsearch.test.ESIntegTestCase.Scope; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; -import static org.hamcrest.Matchers.*; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; /** */ @@ -592,9 +602,9 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest assertThat(client().prepareCount("test-idx").get().getCount(), equalTo(100L)); IntSet reusedShards = new IntHashSet(); - for (ShardRecoveryResponse response : client().admin().indices().prepareRecoveries("test-idx").get().shardResponses().get("test-idx")) { - if (response.recoveryState().getIndex().reusedBytes() > 0) { - reusedShards.add(response.getShardId()); + for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries("test-idx").get().shardRecoveryStates().get("test-idx")) { + if (recoveryState.getIndex().reusedBytes() > 0) { + reusedShards.add(recoveryState.getShardId().getId()); } } logger.info("--> check that at least half of the shards had some reuse: [{}]", reusedShards);