diff --git a/core/pom.xml b/core/pom.xml
index 3186718248f..c9f8656eacb 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -314,6 +314,7 @@
org/elasticsearch/common/util/MockBigArrays.class
org/elasticsearch/common/util/MockBigArrays$*.class
org/elasticsearch/node/NodeMocksPlugin.class
+ org/elasticsearch/node/MockNode.class
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java
index e8243bf5d50..c219d85f9d5 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java
@@ -21,7 +21,6 @@ package org.elasticsearch.action.admin.cluster.stats;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -67,10 +66,10 @@ public class ClusterStatsIndices implements ToXContent, Streamable {
for (ClusterStatsNodeResponse r : nodeResponses) {
for (org.elasticsearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
- ShardStats indexShardStats = countsPerIndex.get(shardStats.getIndex());
+ ShardStats indexShardStats = countsPerIndex.get(shardStats.getShardRouting().getIndex());
if (indexShardStats == null) {
indexShardStats = new ShardStats();
- countsPerIndex.put(shardStats.getIndex(), indexShardStats);
+ countsPerIndex.put(shardStats.getShardRouting().getIndex(), indexShardStats);
}
indexShardStats.total++;
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
index 26e78264534..5ed40c5db0d 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
@@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionFilters;
@@ -106,7 +107,7 @@ public class TransportClusterStatsAction extends TransportNodesAction {
+public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAction {
private final IndicesService indicesService;
private final IndicesRequestCache indicesRequestCache;
@@ -58,48 +57,33 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastAction shardFailures = null;
- for (int i = 0; i < shardsResponses.length(); i++) {
- Object shardResponse = shardsResponses.get(i);
- if (shardResponse == null) {
- // simply ignore non active shards
- } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
- failedShards++;
- if (shardFailures == null) {
- shardFailures = new ArrayList<>();
- }
- shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
- } else {
- successfulShards++;
- }
- }
- return new ClearIndicesCacheResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
+ protected EmptyResult readShardResult(StreamInput in) throws IOException {
+ return EmptyResult.readEmptyResultFrom(in);
}
@Override
- protected ShardClearIndicesCacheRequest newShardRequest(int numShards, ShardRouting shard, ClearIndicesCacheRequest request) {
- return new ShardClearIndicesCacheRequest(shard.shardId(), request);
+ protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) {
+ return new ClearIndicesCacheResponse(totalShards, successfulShards, failedShards, shardFailures);
}
@Override
- protected ShardClearIndicesCacheResponse newShardResponse() {
- return new ShardClearIndicesCacheResponse();
+ protected ClearIndicesCacheRequest readRequestFrom(StreamInput in) throws IOException {
+ final ClearIndicesCacheRequest request = new ClearIndicesCacheRequest();
+ request.readFrom(in);
+ return request;
}
@Override
- protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRequest request) {
- IndexService service = indicesService.indexService(request.shardId().getIndex());
+ protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRouting shardRouting) {
+ IndexService service = indicesService.indexService(shardRouting.getIndex());
if (service != null) {
- IndexShard shard = service.shard(request.shardId().id());
+ IndexShard shard = service.shard(shardRouting.id());
boolean clearedAtLeastOne = false;
if (request.queryCache()) {
clearedAtLeastOne = true;
@@ -137,15 +121,15 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastAction {
+public class TransportOptimizeAction extends TransportBroadcastByNodeAction {
private final IndicesService indicesService;
@@ -54,55 +52,40 @@ public class TransportOptimizeAction extends TransportBroadcastAction shardFailures = null;
- for (int i = 0; i < shardsResponses.length(); i++) {
- Object shardResponse = shardsResponses.get(i);
- if (shardResponse == null) {
- // a non active shard, ignore...
- } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
- failedShards++;
- if (shardFailures == null) {
- shardFailures = new ArrayList<>();
- }
- shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
- } else {
- successfulShards++;
- }
- }
- return new OptimizeResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
+ protected EmptyResult readShardResult(StreamInput in) throws IOException {
+ return EmptyResult.readEmptyResultFrom(in);
}
@Override
- protected ShardOptimizeRequest newShardRequest(int numShards, ShardRouting shard, OptimizeRequest request) {
- return new ShardOptimizeRequest(shard.shardId(), request);
+ protected OptimizeResponse newResponse(OptimizeRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) {
+ return new OptimizeResponse(totalShards, successfulShards, failedShards, shardFailures);
}
@Override
- protected ShardOptimizeResponse newShardResponse() {
- return new ShardOptimizeResponse();
+ protected OptimizeRequest readRequestFrom(StreamInput in) throws IOException {
+ final OptimizeRequest request = new OptimizeRequest();
+ request.readFrom(in);
+ return request;
}
@Override
- protected ShardOptimizeResponse shardOperation(ShardOptimizeRequest request) {
- IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
- indexShard.optimize(request.optimizeRequest());
- return new ShardOptimizeResponse(request.shardId());
+ protected EmptyResult shardOperation(OptimizeRequest request, ShardRouting shardRouting) {
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
+ indexShard.optimize(request);
+ return EmptyResult.INSTANCE;
}
/**
* The refresh request works against *all* shards.
*/
@Override
- protected GroupShardsIterator shards(ClusterState clusterState, OptimizeRequest request, String[] concreteIndices) {
- return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
+ protected ShardsIterator shards(ClusterState clusterState, OptimizeRequest request, String[] concreteIndices) {
+ return clusterState.routingTable().allShards(concreteIndices);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java
index fea33688c14..0e0881d1729 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java
@@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.indices.recovery.RecoveryState;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,7 +39,7 @@ import java.util.Map;
public class RecoveryResponse extends BroadcastResponse implements ToXContent {
private boolean detailed = false;
- private Map> shardResponses = new HashMap<>();
+ private Map> shardRecoveryStates = new HashMap<>();
public RecoveryResponse() { }
@@ -50,18 +51,18 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent {
* @param successfulShards Count of shards successfully processed
* @param failedShards Count of shards which failed to process
* @param detailed Display detailed metrics
- * @param shardResponses Map of indices to shard recovery information
+ * @param shardRecoveryStates Map of indices to shard recovery information
* @param shardFailures List of failures processing shards
*/
public RecoveryResponse(int totalShards, int successfulShards, int failedShards, boolean detailed,
- Map> shardResponses, List shardFailures) {
+ Map> shardRecoveryStates, List shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
- this.shardResponses = shardResponses;
+ this.shardRecoveryStates = shardRecoveryStates;
this.detailed = detailed;
}
public boolean hasRecoveries() {
- return shardResponses.size() > 0;
+ return shardRecoveryStates.size() > 0;
}
public boolean detailed() {
@@ -72,23 +73,23 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent {
this.detailed = detailed;
}
- public Map> shardResponses() {
- return shardResponses;
+ public Map> shardRecoveryStates() {
+ return shardRecoveryStates;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (hasRecoveries()) {
- for (String index : shardResponses.keySet()) {
- List responses = shardResponses.get(index);
- if (responses == null || responses.size() == 0) {
+ for (String index : shardRecoveryStates.keySet()) {
+ List recoveryStates = shardRecoveryStates.get(index);
+ if (recoveryStates == null || recoveryStates.size() == 0) {
continue;
}
builder.startObject(index);
builder.startArray("shards");
- for (ShardRecoveryResponse recoveryResponse : responses) {
+ for (RecoveryState recoveryState : recoveryStates) {
builder.startObject();
- recoveryResponse.toXContent(builder, params);
+ recoveryState.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
@@ -101,12 +102,12 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
- out.writeVInt(shardResponses.size());
- for (Map.Entry> entry : shardResponses.entrySet()) {
+ out.writeVInt(shardRecoveryStates.size());
+ for (Map.Entry> entry : shardRecoveryStates.entrySet()) {
out.writeString(entry.getKey());
out.writeVInt(entry.getValue().size());
- for (ShardRecoveryResponse recoveryResponse : entry.getValue()) {
- recoveryResponse.writeTo(out);
+ for (RecoveryState recoveryState : entry.getValue()) {
+ recoveryState.writeTo(out);
}
}
}
@@ -118,11 +119,11 @@ public class RecoveryResponse extends BroadcastResponse implements ToXContent {
for (int i = 0; i < size; i++) {
String s = in.readString();
int listSize = in.readVInt();
- List list = new ArrayList<>(listSize);
+ List list = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
- list.add(ShardRecoveryResponse.readShardRecoveryResponse(in));
+ list.add(RecoveryState.readRecoveryState(in));
}
- shardResponses.put(s, list);
+ shardRecoveryStates.put(s, list);
}
}
}
\ No newline at end of file
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java
deleted file mode 100644
index a4104fbc449..00000000000
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/ShardRecoveryResponse.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.admin.indices.recovery;
-
-import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.recovery.RecoveryState;
-
-import java.io.IOException;
-
-/**
- * Information regarding the recovery state of a shard.
- */
-public class ShardRecoveryResponse extends BroadcastShardResponse implements ToXContent {
-
- RecoveryState recoveryState;
-
- public ShardRecoveryResponse() { }
-
- /**
- * Constructs shard recovery information for the given index and shard id.
- *
- * @param shardId Id of the shard
- */
- ShardRecoveryResponse(ShardId shardId) {
- super(shardId);
- }
-
- /**
- * Sets the recovery state information for the shard.
- *
- * @param recoveryState Recovery state
- */
- public void recoveryState(RecoveryState recoveryState) {
- this.recoveryState = recoveryState;
- }
-
- /**
- * Gets the recovery state information for the shard. Null if shard wasn't recovered / recovery didn't start yet.
- *
- * @return Recovery state
- */
- @Nullable
- public RecoveryState recoveryState() {
- return recoveryState;
- }
-
- @Override
- public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- recoveryState.toXContent(builder, params);
- return builder;
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- super.writeTo(out);
- recoveryState.writeTo(out);
- }
-
- @Override
- public void readFrom(StreamInput in) throws IOException {
- super.readFrom(in);
- recoveryState = RecoveryState.readRecoveryState(in);
- }
-
- /**
- * Builds a new ShardRecoveryResponse from the give input stream.
- *
- * @param in Input stream
- * @return A new ShardRecoveryResponse
- * @throws IOException
- */
- public static ShardRecoveryResponse readShardRecoveryResponse(StreamInput in) throws IOException {
- ShardRecoveryResponse response = new ShardRecoveryResponse();
- response.readFrom(in);
- return response;
- }
-}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java
index cee59c7eb2a..af00f8b7895 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java
@@ -19,40 +19,37 @@
package org.elasticsearch.action.admin.indices.recovery;
+import com.google.common.collect.Maps;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.DefaultShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.BroadcastShardRequest;
-import org.elasticsearch.action.support.broadcast.TransportBroadcastAction;
+import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* Transport action for shard recovery operation. This transport action does not actually
* perform shard recovery, it only reports on recoveries (both active and complete).
*/
-public class TransportRecoveryAction extends TransportBroadcastAction {
+public class TransportRecoveryAction extends TransportBroadcastByNodeAction {
private final IndicesService indicesService;
@@ -61,84 +58,55 @@ public class TransportRecoveryAction extends TransportBroadcastAction shardFailures = null;
- Map> shardResponses = new HashMap<>();
- for (int i = 0; i < shardsResponses.length(); i++) {
- Object shardResponse = shardsResponses.get(i);
- if (shardResponse == null) {
- // simply ignore non active shards
- } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
- failedShards++;
- if (shardFailures == null) {
- shardFailures = new ArrayList<>();
+ @Override
+ protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) {
+ Map> shardResponses = Maps.newHashMap();
+ for (RecoveryState recoveryState : responses) {
+ if (recoveryState == null) {
+ continue;
+ }
+ String indexName = recoveryState.getShardId().getIndex();
+ if (!shardResponses.containsKey(indexName)) {
+ shardResponses.put(indexName, new ArrayList());
+ }
+ if (request.activeOnly()) {
+ if (recoveryState.getStage() != RecoveryState.Stage.DONE) {
+ shardResponses.get(indexName).add(recoveryState);
}
- shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
- ShardRecoveryResponse recoveryResponse = (ShardRecoveryResponse) shardResponse;
- successfulShards++;
-
- if (recoveryResponse.recoveryState() == null) {
- // recovery not yet started
- continue;
- }
-
- String indexName = recoveryResponse.getIndex();
- List responses = shardResponses.get(indexName);
-
- if (responses == null) {
- responses = new ArrayList<>();
- shardResponses.put(indexName, responses);
- }
-
- if (request.activeOnly()) {
- if (recoveryResponse.recoveryState().getStage() != RecoveryState.Stage.DONE) {
- responses.add(recoveryResponse);
- }
- } else {
- responses.add(recoveryResponse);
- }
+ shardResponses.get(indexName).add(recoveryState);
}
}
-
- return new RecoveryResponse(shardsResponses.length(), successfulShards,
- failedShards, request.detailed(), shardResponses, shardFailures);
+ return new RecoveryResponse(totalShards, successfulShards, failedShards, request.detailed(), shardResponses, shardFailures);
}
@Override
- protected ShardRecoveryRequest newShardRequest(int numShards, ShardRouting shard, RecoveryRequest request) {
- return new ShardRecoveryRequest(shard.shardId(), request);
+ protected RecoveryRequest readRequestFrom(StreamInput in) throws IOException {
+ final RecoveryRequest recoveryRequest = new RecoveryRequest();
+ recoveryRequest.readFrom(in);
+ return recoveryRequest;
}
@Override
- protected ShardRecoveryResponse newShardResponse() {
- return new ShardRecoveryResponse();
+ protected RecoveryState shardOperation(RecoveryRequest request, ShardRouting shardRouting) {
+ IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
+ IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id());
+ return indexShard.recoveryState();
}
@Override
- protected ShardRecoveryResponse shardOperation(ShardRecoveryRequest request) {
-
- IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
- IndexShard indexShard = indexService.shardSafe(request.shardId().id());
- ShardRecoveryResponse shardRecoveryResponse = new ShardRecoveryResponse(request.shardId());
-
- RecoveryState state = indexShard.recoveryState();
- shardRecoveryResponse.recoveryState(state);
- return shardRecoveryResponse;
- }
-
- @Override
- protected GroupShardsIterator shards(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
- return state.routingTable().allAssignedShardsGrouped(concreteIndices, true, true);
+ protected ShardsIterator shards(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
+ return state.routingTable().allShardsIncludingRelocationTargets(concreteIndices);
}
@Override
@@ -150,14 +118,4 @@ public class TransportRecoveryAction extends TransportBroadcastAction shardFailures) {
+ IndicesSegmentResponse(ShardSegments[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shards = shards;
}
@@ -63,7 +62,7 @@ public class IndicesSegmentResponse extends BroadcastResponse implements ToXCont
Set indices = Sets.newHashSet();
for (ShardSegments shard : shards) {
- indices.add(shard.getIndex());
+ indices.add(shard.getShardRouting().getIndex());
}
for (String index : indices) {
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java b/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java
index 6e754a26210..4b3264fca40 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java
@@ -20,10 +20,10 @@
package org.elasticsearch.action.admin.indices.segments;
import com.google.common.collect.ImmutableList;
-import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.engine.Segment;
import java.io.IOException;
@@ -33,7 +33,7 @@ import java.util.List;
import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
-public class ShardSegments extends BroadcastShardResponse implements Iterable {
+public class ShardSegments implements Streamable, Iterable {
private ShardRouting shardRouting;
@@ -43,7 +43,6 @@ public class ShardSegments extends BroadcastShardResponse implements Iterable segments) {
- super(shardRouting.shardId());
this.shardRouting = shardRouting;
this.segments = segments;
}
@@ -89,7 +88,6 @@ public class ShardSegments extends BroadcastShardResponse implements Iterable {
+public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeAction {
private final IndicesService indicesService;
@@ -59,7 +52,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastAction shardFailures = null;
- final List shards = new ArrayList<>();
- for (int i = 0; i < shardsResponses.length(); i++) {
- Object shardResponse = shardsResponses.get(i);
- if (shardResponse == null) {
- // simply ignore non active shards
- } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
- failedShards++;
- if (shardFailures == null) {
- shardFailures = new ArrayList<>();
- }
- shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
- } else {
- shards.add((ShardSegments) shardResponse);
- successfulShards++;
- }
- }
- return new IndicesSegmentResponse(shards.toArray(new ShardSegments[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures);
+ protected ShardSegments readShardResult(StreamInput in) throws IOException {
+ return ShardSegments.readShardSegments(in);
}
@Override
- protected IndexShardSegmentRequest newShardRequest(int numShards, ShardRouting shard, IndicesSegmentsRequest request) {
- return new IndexShardSegmentRequest(shard.shardId(), request);
+ protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, int totalShards, int successfulShards, int failedShards, List results, List shardFailures, ClusterState clusterState) {
+ return new IndicesSegmentResponse(results.toArray(new ShardSegments[results.size()]), totalShards, successfulShards, failedShards, shardFailures);
}
@Override
- protected ShardSegments newShardResponse() {
- return new ShardSegments();
+ protected IndicesSegmentsRequest readRequestFrom(StreamInput in) throws IOException {
+ final IndicesSegmentsRequest request = new IndicesSegmentsRequest();
+ request.readFrom(in);
+ return request;
}
@Override
- protected ShardSegments shardOperation(IndexShardSegmentRequest request) {
- IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
- IndexShard indexShard = indexService.shardSafe(request.shardId().id());
- return new ShardSegments(indexShard.routingEntry(), indexShard.engine().segments(request.verbose));
- }
-
- static class IndexShardSegmentRequest extends BroadcastShardRequest {
- boolean verbose;
-
- IndexShardSegmentRequest() {
- verbose = false;
- }
-
- IndexShardSegmentRequest(ShardId shardId, IndicesSegmentsRequest request) {
- super(shardId, request);
- verbose = request.verbose();
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- super.writeTo(out);
- out.writeBoolean(verbose);
- }
-
- @Override
- public void readFrom(StreamInput in) throws IOException {
- super.readFrom(in);
- verbose = in.readBoolean();
- }
+ protected ShardSegments shardOperation(IndicesSegmentsRequest request, ShardRouting shardRouting) {
+ IndexService indexService = indicesService.indexServiceSafe(shardRouting.getIndex());
+ IndexShard indexShard = indexService.shardSafe(shardRouting.id());
+ return new ShardSegments(indexShard.routingEntry(), indexShard.engine().segments(request.verbose()));
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java
index d9b8e9da77d..885dddeea6a 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
-import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -51,7 +50,7 @@ public class IndicesStatsResponse extends BroadcastResponse implements ToXConten
}
- IndicesStatsResponse(ShardStats[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List shardFailures) {
+ IndicesStatsResponse(ShardStats[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shards = shards;
}
@@ -90,7 +89,7 @@ public class IndicesStatsResponse extends BroadcastResponse implements ToXConten
Set indices = Sets.newHashSet();
for (ShardStats shard : shards) {
- indices.add(shard.getIndex());
+ indices.add(shard.getShardRouting().getIndex());
}
for (String index : indices) {
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java
index b3c87de3dd1..8fea8c795eb 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java
@@ -19,11 +19,11 @@
package org.elasticsearch.action.admin.indices.stats;
-import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
@@ -37,7 +37,7 @@ import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEnt
/**
*/
-public class ShardStats extends BroadcastShardResponse implements ToXContent {
+public class ShardStats implements Streamable, ToXContent {
private ShardRouting shardRouting;
private CommonStats commonStats;
@Nullable
@@ -49,14 +49,13 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent {
ShardStats() {
}
- public ShardStats(IndexShard indexShard, CommonStatsFlags flags) {
- super(indexShard.shardId());
- this.shardRouting = indexShard.routingEntry();
- this.dataPath = indexShard.shardPath().getRootDataPath().toString();
- this.statePath = indexShard.shardPath().getRootStatePath().toString();
- this.isCustomDataPath = indexShard.shardPath().isCustomDataPath();
- this.commonStats = new CommonStats(indexShard, flags);
- this.commitStats = indexShard.commitStats();
+ public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats) {
+ this.shardRouting = routing;
+ this.dataPath = shardPath.getRootDataPath().toString();
+ this.statePath = shardPath.getRootStatePath().toString();
+ this.isCustomDataPath = shardPath.isCustomDataPath();
+ this.commitStats = commitStats;
+ this.commonStats = commonStats;
}
/**
@@ -94,7 +93,6 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent {
@Override
public void readFrom(StreamInput in) throws IOException {
- super.readFrom(in);
shardRouting = readShardRoutingEntry(in);
commonStats = CommonStats.readCommonStats(in);
commitStats = CommitStats.readOptionalCommitStatsFrom(in);
@@ -105,7 +103,6 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent {
@Override
public void writeTo(StreamOutput out) throws IOException {
- super.writeTo(out);
shardRouting.writeTo(out);
commonStats.writeTo(out);
out.writeOptionalStreamable(commitStats);
@@ -146,5 +143,4 @@ public class ShardStats extends BroadcastShardResponse implements ToXContent {
static final XContentBuilderString NODE = new XContentBuilderString("node");
static final XContentBuilderString RELOCATING_NODE = new XContentBuilderString("relocating_node");
}
-
}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java
index 9ce5291ba66..0f0cc1afa14 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java
@@ -21,37 +21,30 @@ package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.DefaultShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.BroadcastShardRequest;
-import org.elasticsearch.action.support.broadcast.TransportBroadcastAction;
+import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*/
-public class TransportIndicesStatsAction extends TransportBroadcastAction {
+public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction {
private final IndicesService indicesService;
@@ -60,7 +53,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastAction shardFailures = null;
- final List shards = new ArrayList<>();
- for (int i = 0; i < shardsResponses.length(); i++) {
- Object shardResponse = shardsResponses.get(i);
- if (shardResponse == null) {
- // simply ignore non active shards
- } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
- failedShards++;
- if (shardFailures == null) {
- shardFailures = new ArrayList<>();
- }
- shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
- } else {
- shards.add((ShardStats) shardResponse);
- successfulShards++;
- }
- }
- return new IndicesStatsResponse(shards.toArray(new ShardStats[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures);
+ protected ShardStats readShardResult(StreamInput in) throws IOException {
+ return ShardStats.readShardStats(in);
}
@Override
- protected IndexShardStatsRequest newShardRequest(int numShards, ShardRouting shard, IndicesStatsRequest request) {
- return new IndexShardStatsRequest(shard.shardId(), request);
+ protected IndicesStatsResponse newResponse(IndicesStatsRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) {
+ return new IndicesStatsResponse(responses.toArray(new ShardStats[responses.size()]), totalShards, successfulShards, failedShards, shardFailures);
}
@Override
- protected ShardStats newShardResponse() {
- return new ShardStats();
+ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException {
+ IndicesStatsRequest request = new IndicesStatsRequest();
+ request.readFrom(in);
+ return request;
}
@Override
- protected ShardStats shardOperation(IndexShardStatsRequest request) {
- IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
- IndexShard indexShard = indexService.shardSafe(request.shardId().id());
+ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting shardRouting) {
+ IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
+ IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id());
// if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet
if (indexShard.routingEntry() == null) {
throw new ShardNotFoundException(indexShard.shardId());
@@ -128,92 +103,65 @@ public class TransportIndicesStatsAction extends TransportBroadcastAction {
+public class TransportUpgradeStatusAction extends TransportBroadcastByNodeAction {
private final IndicesService indicesService;
@@ -58,7 +54,7 @@ public class TransportUpgradeStatusAction extends TransportBroadcastAction shardFailures = null;
- final List shards = new ArrayList<>();
- for (int i = 0; i < shardsResponses.length(); i++) {
- Object shardResponse = shardsResponses.get(i);
- if (shardResponse == null) {
- // simply ignore non active shards
- } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
- failedShards++;
- if (shardFailures == null) {
- shardFailures = new ArrayList<>();
- }
- shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
- } else {
- shards.add((ShardUpgradeStatus) shardResponse);
- successfulShards++;
- }
- }
- return new UpgradeStatusResponse(shards.toArray(new ShardUpgradeStatus[shards.size()]), shardsResponses.length(), successfulShards, failedShards, shardFailures);
+ protected ShardUpgradeStatus readShardResult(StreamInput in) throws IOException {
+ return ShardUpgradeStatus.readShardUpgradeStatus(in);
}
@Override
- protected IndexShardUpgradeStatusRequest newShardRequest(int numShards, ShardRouting shard, UpgradeStatusRequest request) {
- return new IndexShardUpgradeStatusRequest(shard.shardId(), request);
+ protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) {
+ return new UpgradeStatusResponse(responses.toArray(new ShardUpgradeStatus[responses.size()]), totalShards, successfulShards, failedShards, shardFailures);
}
@Override
- protected ShardUpgradeStatus newShardResponse() {
- return new ShardUpgradeStatus();
+ protected UpgradeStatusRequest readRequestFrom(StreamInput in) throws IOException {
+ UpgradeStatusRequest request = new UpgradeStatusRequest();
+ request.readFrom(in);
+ return request;
}
@Override
- protected ShardUpgradeStatus shardOperation(IndexShardUpgradeStatusRequest request) {
- IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
- IndexShard indexShard = indexService.shardSafe(request.shardId().id());
+ protected ShardUpgradeStatus shardOperation(UpgradeStatusRequest request, ShardRouting shardRouting) {
+ IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
+ IndexShard indexShard = indexService.shardSafe(shardRouting.shardId().id());
List segments = indexShard.engine().segments(false);
long total_bytes = 0;
long to_upgrade_bytes = 0;
@@ -136,16 +115,4 @@ public class TransportUpgradeStatusAction extends TransportBroadcastAction {
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java
index 16e24ee66ae..82683625df8 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/UpgradeStatusResponse.java
@@ -36,14 +36,11 @@ import java.util.Map;
import java.util.Set;
public class UpgradeStatusResponse extends BroadcastResponse implements ToXContent {
-
-
private ShardUpgradeStatus[] shards;
private Map indicesUpgradeStatus;
UpgradeStatusResponse() {
-
}
UpgradeStatusResponse(ShardUpgradeStatus[] shards, int totalShards, int successfulShards, int failedShards, List shardFailures) {
@@ -75,7 +72,6 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte
return indicesUpgradeStats;
}
-
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@@ -120,8 +116,6 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-
-
builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, getTotalBytes());
builder.byteSizeField(Fields.SIZE_TO_UPGRADE_IN_BYTES, Fields.SIZE_TO_UPGRADE, getToUpgradeBytes());
builder.byteSizeField(Fields.SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, Fields.SIZE_TO_UPGRADE_ANCIENT, getToUpgradeBytesAncient());
@@ -163,10 +157,8 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte
}
builder.endObject();
}
-
builder.endObject();
}
-
builder.endObject();
}
return builder;
@@ -186,6 +178,5 @@ public class UpgradeStatusResponse extends BroadcastResponse implements ToXConte
static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT = new XContentBuilderString("size_to_upgrade_ancient");
static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes");
static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT_IN_BYTES = new XContentBuilderString("size_to_upgrade_ancient_in_bytes");
-
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResult.java
similarity index 83%
rename from core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java
rename to core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResult.java
index d3942038164..46c51757159 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResult.java
@@ -20,9 +20,9 @@
package org.elasticsearch.action.admin.indices.upgrade.post;
import org.elasticsearch.Version;
-import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
@@ -31,7 +31,9 @@ import java.text.ParseException;
/**
*
*/
-class ShardUpgradeResponse extends BroadcastShardResponse {
+class ShardUpgradeResult implements Streamable {
+
+ private ShardId shardId;
private org.apache.lucene.util.Version oldestLuceneSegment;
@@ -40,16 +42,20 @@ class ShardUpgradeResponse extends BroadcastShardResponse {
private boolean primary;
- ShardUpgradeResponse() {
+ ShardUpgradeResult() {
}
- ShardUpgradeResponse(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) {
- super(shardId);
+ ShardUpgradeResult(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) {
+ this.shardId = shardId;
this.primary = primary;
this.upgradeVersion = upgradeVersion;
this.oldestLuceneSegment = oldestLuceneSegment;
}
+ public ShardId getShardId() {
+ return shardId;
+ }
+
public org.apache.lucene.util.Version oldestLuceneSegment() {
return this.oldestLuceneSegment;
}
@@ -65,7 +71,7 @@ class ShardUpgradeResponse extends BroadcastShardResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
- super.readFrom(in);
+ shardId = ShardId.readShardId(in);
primary = in.readBoolean();
upgradeVersion = Version.readVersion(in);
try {
@@ -78,10 +84,9 @@ class ShardUpgradeResponse extends BroadcastShardResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
- super.writeTo(out);
+ shardId.writeTo(out);
out.writeBoolean(primary);
Version.writeVersion(upgradeVersion, out);
out.writeString(oldestLuceneSegment.toString());
}
-
}
\ No newline at end of file
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java
index 23d40a55cf0..11bc190aef0 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java
@@ -24,32 +24,30 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.PrimaryMissingActionException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.DefaultShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.TransportBroadcastAction;
+import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
-import java.util.ArrayList;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.newHashSet;
@@ -57,7 +55,7 @@ import static com.google.common.collect.Sets.newHashSet;
/**
* Upgrade index/indices action.
*/
-public class TransportUpgradeAction extends TransportBroadcastAction {
+public class TransportUpgradeAction extends TransportBroadcastByNodeAction {
private final IndicesService indicesService;
@@ -67,56 +65,40 @@ public class TransportUpgradeAction extends TransportBroadcastAction shardFailures = null;
+ protected UpgradeResponse newResponse(UpgradeRequest request, int totalShards, int successfulShards, int failedShards, List shardUpgradeResults, List shardFailures, ClusterState clusterState) {
Map successfulPrimaryShards = newHashMap();
Map> versions = newHashMap();
- for (int i = 0; i < shardsResponses.length(); i++) {
- Object shardResponse = shardsResponses.get(i);
- if (shardResponse == null) {
- // a non active shard, ignore...
- } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
- failedShards++;
- if (shardFailures == null) {
- shardFailures = new ArrayList<>();
- }
- shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
+ for (ShardUpgradeResult result : shardUpgradeResults) {
+ successfulShards++;
+ String index = result.getShardId().getIndex();
+ if (result.primary()) {
+ Integer count = successfulPrimaryShards.get(index);
+ successfulPrimaryShards.put(index, count == null ? 1 : count + 1);
+ }
+ Tuple versionTuple = versions.get(index);
+ if (versionTuple == null) {
+ versions.put(index, new Tuple<>(result.upgradeVersion(), result.oldestLuceneSegment()));
} else {
- successfulShards++;
- ShardUpgradeResponse shardUpgradeResponse = (ShardUpgradeResponse) shardResponse;
- String index = shardUpgradeResponse.getIndex();
- if (shardUpgradeResponse.primary()) {
- Integer count = successfulPrimaryShards.get(index);
- successfulPrimaryShards.put(index, count == null ? 1 : count + 1);
+ // We already have versions for this index - let's see if we need to update them based on the current shard
+ Version version = versionTuple.v1();
+ org.apache.lucene.util.Version luceneVersion = versionTuple.v2();
+ // For the metadata we are interested in the _latest_ Elasticsearch version that was processing the metadata
+ // Since we rewrite the mapping during upgrade the metadata is always rewritten by the latest version
+ if (result.upgradeVersion().after(versionTuple.v1())) {
+ version = result.upgradeVersion();
}
- Tuple versionTuple = versions.get(index);
- if (versionTuple == null) {
- versions.put(index, new Tuple<>(shardUpgradeResponse.upgradeVersion(), shardUpgradeResponse.oldestLuceneSegment()));
- } else {
- // We already have versions for this index - let's see if we need to update them based on the current shard
- Version version = versionTuple.v1();
- org.apache.lucene.util.Version luceneVersion = versionTuple.v2();
- // For the metadata we are interested in the _latest_ elasticsearch version that was processing the metadata
- // Since we rewrite the mapping during upgrade the metadata is always rewritten by the latest version
- if (shardUpgradeResponse.upgradeVersion().after(versionTuple.v1())) {
- version = shardUpgradeResponse.upgradeVersion();
- }
- // For the lucene version we are interested in the _oldest_ lucene version since it determines the
- // oldest version that we need to support
- if (shardUpgradeResponse.oldestLuceneSegment().onOrAfter(versionTuple.v2()) == false) {
- luceneVersion = shardUpgradeResponse.oldestLuceneSegment();
- }
- versions.put(index, new Tuple<>(version, luceneVersion));
+ // For the lucene version we are interested in the _oldest_ lucene version since it determines the
+ // oldest version that we need to support
+ if (result.oldestLuceneSegment().onOrAfter(versionTuple.v2()) == false) {
+ luceneVersion = result.oldestLuceneSegment();
}
+ versions.put(index, new Tuple<>(version, luceneVersion));
}
}
Map> updatedVersions = newHashMap();
@@ -133,33 +115,37 @@ public class TransportUpgradeAction extends TransportBroadcastAction indicesWithMissingPrimaries = indicesWithMissingPrimaries(clusterState, concreteIndices);
if (indicesWithMissingPrimaries.isEmpty()) {
return iterator;
@@ -231,5 +217,4 @@ public class TransportUpgradeAction extends TransportBroadcastAction the underlying client request
+ * @param the response to the client request
+ * @param per-shard operation results
+ */
+public abstract class TransportBroadcastByNodeAction extends HandledTransportAction {
+
+ private final ClusterService clusterService;
+ private final TransportService transportService;
+
+ final String transportNodeBroadcastAction;
+
+ public TransportBroadcastByNodeAction(
+ Settings settings,
+ String actionName,
+ ThreadPool threadPool,
+ ClusterService clusterService,
+ TransportService transportService,
+ ActionFilters actionFilters,
+ IndexNameExpressionResolver indexNameExpressionResolver,
+ Class request,
+ String executor) {
+ super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
+
+ this.clusterService = clusterService;
+ this.transportService = transportService;
+
+ transportNodeBroadcastAction = actionName + "[n]";
+
+ transportService.registerRequestHandler(transportNodeBroadcastAction, new Callable() {
+ @Override
+ public NodeRequest call() throws Exception {
+ return new NodeRequest();
+ }
+ }, executor, new BroadcastByNodeTransportRequestHandler());
+ }
+
+ private final Response newResponse(
+ Request request,
+ AtomicReferenceArray responses,
+ List unavailableShardExceptions,
+ Map> nodes,
+ ClusterState clusterState) {
+ int totalShards = 0;
+ int successfulShards = 0;
+ List broadcastByNodeResponses = new ArrayList<>();
+ List exceptions = new ArrayList<>();
+ for (int i = 0; i < responses.length(); i++) {
+ if (responses.get(i) instanceof FailedNodeException) {
+ FailedNodeException exception = (FailedNodeException) responses.get(i);
+ totalShards += nodes.get(exception.nodeId()).size();
+ for (ShardRouting shard : nodes.get(exception.nodeId())) {
+ exceptions.add(new DefaultShardOperationFailedException(shard.getIndex(), shard.getId(), exception));
+ }
+ } else {
+ NodeResponse response = (NodeResponse) responses.get(i);
+ broadcastByNodeResponses.addAll(response.results);
+ totalShards += response.getTotalShards();
+ successfulShards += response.getSuccessfulShards();
+ for (BroadcastShardOperationFailedException throwable : response.getExceptions()) {
+ if (!TransportActions.isShardNotAvailableException(throwable)) {
+ exceptions.add(new DefaultShardOperationFailedException(throwable.getIndex(), throwable.getShardId().getId(), throwable));
+ }
+ }
+ }
+ }
+ totalShards += unavailableShardExceptions.size();
+ int failedShards = exceptions.size();
+ return newResponse(request, totalShards, successfulShards, failedShards, broadcastByNodeResponses, exceptions, clusterState);
+ }
+
+ /**
+ * Deserialize a shard-level result from an input stream
+ *
+ * @param in input stream
+ * @return a deserialized shard-level result
+ * @throws IOException
+ */
+ protected abstract ShardOperationResult readShardResult(StreamInput in) throws IOException;
+
+ /**
+ * Creates a new response to the underlying request.
+ *
+ * @param request the underlying request
+ * @param totalShards the total number of shards considered for execution of the operation
+ * @param successfulShards the total number of shards for which execution of the operation was successful
+ * @param failedShards the total number of shards for which execution of the operation failed
+ * @param results the per-node aggregated shard-level results
+ * @param shardFailures the exceptions corresponding to shard operationa failures
+ * @param clusterState the cluster state
+ * @return the response
+ */
+ protected abstract Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List results, List shardFailures, ClusterState clusterState);
+
+ /**
+ * Deserialize a request from an input stream
+ *
+ * @param in input stream
+ * @return a de-serialized request
+ * @throws IOException
+ */
+ protected abstract Request readRequestFrom(StreamInput in) throws IOException;
+
+ /**
+ * Executes the shard-level operation. This method is called once per shard serially on the receiving node.
+ *
+ * @param request the node-level request
+ * @param shardRouting the shard on which to execute the operation
+ * @return the result of the shard-level operation for the shard
+ */
+ protected abstract ShardOperationResult shardOperation(Request request, ShardRouting shardRouting);
+
+ /**
+ * Determines the shards on which this operation will be executed on. The operation is executed once per shard.
+ *
+ * @param clusterState the cluster state
+ * @param request the underlying request
+ * @param concreteIndices the concrete indices on which to execute the operation
+ * @return the shards on which to execute the operation
+ */
+ protected abstract ShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices);
+
+ /**
+ * Executes a global block check before polling the cluster state.
+ *
+ * @param state the cluster state
+ * @param request the underlying request
+ * @return a non-null exception if the operation is blocked
+ */
+ protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
+
+ /**
+ * Executes a global request-level check before polling the cluster state.
+ *
+ * @param state the cluster state
+ * @param request the underlying request
+ * @param concreteIndices the concrete indices on which to execute the operation
+ * @return a non-null exception if the operation if blocked
+ */
+ protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices);
+
+ @Override
+ protected void doExecute(Request request, ActionListener listener) {
+ new AsyncAction(request, listener).start();
+ }
+
+ protected class AsyncAction {
+ private final Request request;
+ private final ActionListener listener;
+ private final ClusterState clusterState;
+ private final DiscoveryNodes nodes;
+ private final Map> nodeIds;
+ private final AtomicReferenceArray