From d1223b73693887a8035b6480df278cf4cde026b3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 30 Aug 2015 20:40:34 -0400 Subject: [PATCH] Convert upgrade action to broadcast by node Several shard-level operations that previously broadcasted a request per shard were converted to broadcast a request per node. This commit converts upgrade action to this new model as well. Closes #13204 --- .../TransportClearIndicesCacheAction.java | 2 +- .../optimize/TransportOptimizeAction.java | 2 +- .../recovery/TransportRecoveryAction.java | 2 +- .../TransportIndicesSegmentsAction.java | 2 +- .../stats/TransportIndicesStatsAction.java | 2 +- .../get/TransportUpgradeStatusAction.java | 2 +- ...eResponse.java => ShardUpgradeResult.java} | 21 ++-- .../upgrade/post/TransportUpgradeAction.java | 105 ++++++++---------- .../node/TransportBroadcastByNodeAction.java | 14 ++- .../TransportBroadcastByNodeActionTests.java | 2 +- 10 files changed, 75 insertions(+), 79 deletions(-) rename core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/{ShardUpgradeResponse.java => ShardUpgradeResult.java} (83%) diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index ba90ca19263..4cbefe9b695 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -68,7 +68,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc } @Override - protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) { return new ClearIndicesCacheResponse(totalShards, successfulShards, failedShards, shardFailures); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java index c5fed3144c5..549ce8fd57b 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java @@ -62,7 +62,7 @@ public class TransportOptimizeAction extends TransportBroadcastByNodeAction responses, List shardFailures) { + protected OptimizeResponse newResponse(OptimizeRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) { return new OptimizeResponse(totalShards, successfulShards, failedShards, shardFailures); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index 86817fd1cd9..af00f8b7895 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -69,7 +69,7 @@ public class TransportRecoveryAction extends TransportBroadcastByNodeAction responses, List shardFailures) { + protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) { Map> shardResponses = Maps.newHashMap(); for (RecoveryState recoveryState : responses) { if (recoveryState == null) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java index c03a1cecde0..ef029594c1c 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java @@ -80,7 +80,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi } @Override - protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, int totalShards, int successfulShards, int failedShards, List results, List shardFailures) { + protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, int totalShards, int successfulShards, int failedShards, List results, List shardFailures, ClusterState clusterState) { return new IndicesSegmentResponse(results.toArray(new ShardSegments[results.size()]), totalShards, successfulShards, failedShards, shardFailures); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index bf38f4dc7c0..d5e47ab769e 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -81,7 +81,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction< } @Override - protected IndicesStatsResponse newResponse(IndicesStatsRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + protected IndicesStatsResponse newResponse(IndicesStatsRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) { return new IndicesStatsResponse(responses.toArray(new ShardStats[responses.size()]), totalShards, successfulShards, failedShards, shardFailures); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/TransportUpgradeStatusAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/TransportUpgradeStatusAction.java index b12a7d81465..a201a43e00b 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/TransportUpgradeStatusAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/get/TransportUpgradeStatusAction.java @@ -82,7 +82,7 @@ public class TransportUpgradeStatusAction extends TransportBroadcastByNodeAction } @Override - protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures) { + protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, int totalShards, int successfulShards, int failedShards, List responses, List shardFailures, ClusterState clusterState) { return new UpgradeStatusResponse(responses.toArray(new ShardUpgradeStatus[responses.size()]), totalShards, successfulShards, failedShards, shardFailures); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResult.java similarity index 83% rename from core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java rename to core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResult.java index d3942038164..46c51757159 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/ShardUpgradeResult.java @@ -20,9 +20,9 @@ package org.elasticsearch.action.admin.indices.upgrade.post; import org.elasticsearch.Version; -import org.elasticsearch.action.support.broadcast.BroadcastShardResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -31,7 +31,9 @@ import java.text.ParseException; /** * */ -class ShardUpgradeResponse extends BroadcastShardResponse { +class ShardUpgradeResult implements Streamable { + + private ShardId shardId; private org.apache.lucene.util.Version oldestLuceneSegment; @@ -40,16 +42,20 @@ class ShardUpgradeResponse extends BroadcastShardResponse { private boolean primary; - ShardUpgradeResponse() { + ShardUpgradeResult() { } - ShardUpgradeResponse(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) { - super(shardId); + ShardUpgradeResult(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) { + this.shardId = shardId; this.primary = primary; this.upgradeVersion = upgradeVersion; this.oldestLuceneSegment = oldestLuceneSegment; } + public ShardId getShardId() { + return shardId; + } + public org.apache.lucene.util.Version oldestLuceneSegment() { return this.oldestLuceneSegment; } @@ -65,7 +71,7 @@ class ShardUpgradeResponse extends BroadcastShardResponse { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + shardId = ShardId.readShardId(in); primary = in.readBoolean(); upgradeVersion = Version.readVersion(in); try { @@ -78,10 +84,9 @@ class ShardUpgradeResponse extends BroadcastShardResponse { @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + shardId.writeTo(out); out.writeBoolean(primary); Version.writeVersion(upgradeVersion, out); out.writeString(oldestLuceneSegment.toString()); } - } \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java index 23d40a55cf0..11bc190aef0 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java @@ -24,32 +24,30 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.PrimaryMissingActionException; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.TransportBroadcastAction; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReferenceArray; import static com.google.common.collect.Maps.newHashMap; import static com.google.common.collect.Sets.newHashSet; @@ -57,7 +55,7 @@ import static com.google.common.collect.Sets.newHashSet; /** * Upgrade index/indices action. */ -public class TransportUpgradeAction extends TransportBroadcastAction { +public class TransportUpgradeAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; @@ -67,56 +65,40 @@ public class TransportUpgradeAction extends TransportBroadcastAction shardFailures = null; + protected UpgradeResponse newResponse(UpgradeRequest request, int totalShards, int successfulShards, int failedShards, List shardUpgradeResults, List shardFailures, ClusterState clusterState) { Map successfulPrimaryShards = newHashMap(); Map> versions = newHashMap(); - for (int i = 0; i < shardsResponses.length(); i++) { - Object shardResponse = shardsResponses.get(i); - if (shardResponse == null) { - // a non active shard, ignore... - } else if (shardResponse instanceof BroadcastShardOperationFailedException) { - failedShards++; - if (shardFailures == null) { - shardFailures = new ArrayList<>(); - } - shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); + for (ShardUpgradeResult result : shardUpgradeResults) { + successfulShards++; + String index = result.getShardId().getIndex(); + if (result.primary()) { + Integer count = successfulPrimaryShards.get(index); + successfulPrimaryShards.put(index, count == null ? 1 : count + 1); + } + Tuple versionTuple = versions.get(index); + if (versionTuple == null) { + versions.put(index, new Tuple<>(result.upgradeVersion(), result.oldestLuceneSegment())); } else { - successfulShards++; - ShardUpgradeResponse shardUpgradeResponse = (ShardUpgradeResponse) shardResponse; - String index = shardUpgradeResponse.getIndex(); - if (shardUpgradeResponse.primary()) { - Integer count = successfulPrimaryShards.get(index); - successfulPrimaryShards.put(index, count == null ? 1 : count + 1); + // We already have versions for this index - let's see if we need to update them based on the current shard + Version version = versionTuple.v1(); + org.apache.lucene.util.Version luceneVersion = versionTuple.v2(); + // For the metadata we are interested in the _latest_ Elasticsearch version that was processing the metadata + // Since we rewrite the mapping during upgrade the metadata is always rewritten by the latest version + if (result.upgradeVersion().after(versionTuple.v1())) { + version = result.upgradeVersion(); } - Tuple versionTuple = versions.get(index); - if (versionTuple == null) { - versions.put(index, new Tuple<>(shardUpgradeResponse.upgradeVersion(), shardUpgradeResponse.oldestLuceneSegment())); - } else { - // We already have versions for this index - let's see if we need to update them based on the current shard - Version version = versionTuple.v1(); - org.apache.lucene.util.Version luceneVersion = versionTuple.v2(); - // For the metadata we are interested in the _latest_ elasticsearch version that was processing the metadata - // Since we rewrite the mapping during upgrade the metadata is always rewritten by the latest version - if (shardUpgradeResponse.upgradeVersion().after(versionTuple.v1())) { - version = shardUpgradeResponse.upgradeVersion(); - } - // For the lucene version we are interested in the _oldest_ lucene version since it determines the - // oldest version that we need to support - if (shardUpgradeResponse.oldestLuceneSegment().onOrAfter(versionTuple.v2()) == false) { - luceneVersion = shardUpgradeResponse.oldestLuceneSegment(); - } - versions.put(index, new Tuple<>(version, luceneVersion)); + // For the lucene version we are interested in the _oldest_ lucene version since it determines the + // oldest version that we need to support + if (result.oldestLuceneSegment().onOrAfter(versionTuple.v2()) == false) { + luceneVersion = result.oldestLuceneSegment(); } + versions.put(index, new Tuple<>(version, luceneVersion)); } } Map> updatedVersions = newHashMap(); @@ -133,33 +115,37 @@ public class TransportUpgradeAction extends TransportBroadcastAction indicesWithMissingPrimaries = indicesWithMissingPrimaries(clusterState, concreteIndices); if (indicesWithMissingPrimaries.isEmpty()) { return iterator; @@ -231,5 +217,4 @@ public class TransportUpgradeAction extends TransportBroadcastAction unavailableShardExceptions, Map> nodes) { + private final Response newResponse( + Request request, + AtomicReferenceArray responses, + List unavailableShardExceptions, + Map> nodes, + ClusterState clusterState) { int totalShards = 0; int successfulShards = 0; List broadcastByNodeResponses = new ArrayList<>(); @@ -134,7 +139,7 @@ public abstract class TransportBroadcastByNodeAction results, List shardFailures); + protected abstract Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List results, List shardFailures, ClusterState clusterState); /** * Deserialize a request from an input stream @@ -341,7 +347,7 @@ public abstract class TransportBroadcastByNodeAction emptyResults, List shardFailures) { + protected Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List emptyResults, List shardFailures, ClusterState clusterState) { return new Response(totalShards, successfulShards, failedShards, shardFailures); }