Convert upgrade action to broadcast by node

Several shard-level operations that previously broadcasted a request
per shard were converted to broadcast a request per node. This commit
converts upgrade action to this new model as well.

Closes #13204
This commit is contained in:
Jason Tedor 2015-08-30 20:40:34 -04:00
parent aa26b66e96
commit d1223b7369
10 changed files with 75 additions and 79 deletions

View File

@ -68,7 +68,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastByNodeAc
}
@Override
protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> responses, List<ShardOperationFailedException> shardFailures) {
protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new ClearIndicesCacheResponse(totalShards, successfulShards, failedShards, shardFailures);
}

View File

@ -62,7 +62,7 @@ public class TransportOptimizeAction extends TransportBroadcastByNodeAction<Opti
}
@Override
protected OptimizeResponse newResponse(OptimizeRequest request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> responses, List<ShardOperationFailedException> shardFailures) {
protected OptimizeResponse newResponse(OptimizeRequest request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new OptimizeResponse(totalShards, successfulShards, failedShards, shardFailures);
}

View File

@ -69,7 +69,7 @@ public class TransportRecoveryAction extends TransportBroadcastByNodeAction<Reco
@Override
protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, int successfulShards, int failedShards, List<RecoveryState> responses, List<ShardOperationFailedException> shardFailures) {
protected RecoveryResponse newResponse(RecoveryRequest request, int totalShards, int successfulShards, int failedShards, List<RecoveryState> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
Map<String, List<RecoveryState>> shardResponses = Maps.newHashMap();
for (RecoveryState recoveryState : responses) {
if (recoveryState == null) {

View File

@ -80,7 +80,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi
}
@Override
protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, int totalShards, int successfulShards, int failedShards, List<ShardSegments> results, List<ShardOperationFailedException> shardFailures) {
protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, int totalShards, int successfulShards, int failedShards, List<ShardSegments> results, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new IndicesSegmentResponse(results.toArray(new ShardSegments[results.size()]), totalShards, successfulShards, failedShards, shardFailures);
}

View File

@ -81,7 +81,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
}
@Override
protected IndicesStatsResponse newResponse(IndicesStatsRequest request, int totalShards, int successfulShards, int failedShards, List<ShardStats> responses, List<ShardOperationFailedException> shardFailures) {
protected IndicesStatsResponse newResponse(IndicesStatsRequest request, int totalShards, int successfulShards, int failedShards, List<ShardStats> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new IndicesStatsResponse(responses.toArray(new ShardStats[responses.size()]), totalShards, successfulShards, failedShards, shardFailures);
}

View File

@ -82,7 +82,7 @@ public class TransportUpgradeStatusAction extends TransportBroadcastByNodeAction
}
@Override
protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, int totalShards, int successfulShards, int failedShards, List<ShardUpgradeStatus> responses, List<ShardOperationFailedException> shardFailures) {
protected UpgradeStatusResponse newResponse(UpgradeStatusRequest request, int totalShards, int successfulShards, int failedShards, List<ShardUpgradeStatus> responses, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new UpgradeStatusResponse(responses.toArray(new ShardUpgradeStatus[responses.size()]), totalShards, successfulShards, failedShards, shardFailures);
}

View File

@ -20,9 +20,9 @@
package org.elasticsearch.action.admin.indices.upgrade.post;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
@ -31,7 +31,9 @@ import java.text.ParseException;
/**
*
*/
class ShardUpgradeResponse extends BroadcastShardResponse {
class ShardUpgradeResult implements Streamable {
private ShardId shardId;
private org.apache.lucene.util.Version oldestLuceneSegment;
@ -40,16 +42,20 @@ class ShardUpgradeResponse extends BroadcastShardResponse {
private boolean primary;
ShardUpgradeResponse() {
ShardUpgradeResult() {
}
ShardUpgradeResponse(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) {
super(shardId);
ShardUpgradeResult(ShardId shardId, boolean primary, Version upgradeVersion, org.apache.lucene.util.Version oldestLuceneSegment) {
this.shardId = shardId;
this.primary = primary;
this.upgradeVersion = upgradeVersion;
this.oldestLuceneSegment = oldestLuceneSegment;
}
public ShardId getShardId() {
return shardId;
}
public org.apache.lucene.util.Version oldestLuceneSegment() {
return this.oldestLuceneSegment;
}
@ -65,7 +71,7 @@ class ShardUpgradeResponse extends BroadcastShardResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
primary = in.readBoolean();
upgradeVersion = Version.readVersion(in);
try {
@ -78,10 +84,9 @@ class ShardUpgradeResponse extends BroadcastShardResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeBoolean(primary);
Version.writeVersion(upgradeVersion, out);
out.writeString(oldestLuceneSegment.toString());
}
}

View File

@ -24,32 +24,30 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.PrimaryMissingActionException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastAction;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.newHashSet;
@ -57,7 +55,7 @@ import static com.google.common.collect.Sets.newHashSet;
/**
* Upgrade index/indices action.
*/
public class TransportUpgradeAction extends TransportBroadcastAction<UpgradeRequest, UpgradeResponse, ShardUpgradeRequest, ShardUpgradeResponse> {
public class TransportUpgradeAction extends TransportBroadcastByNodeAction<UpgradeRequest, UpgradeResponse, ShardUpgradeResult> {
private final IndicesService indicesService;
@ -67,58 +65,42 @@ public class TransportUpgradeAction extends TransportBroadcastAction<UpgradeRequ
public TransportUpgradeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, TransportUpgradeSettingsAction upgradeSettingsAction) {
super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
UpgradeRequest.class, ShardUpgradeRequest.class, ThreadPool.Names.OPTIMIZE);
super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpgradeRequest.class, ThreadPool.Names.OPTIMIZE);
this.indicesService = indicesService;
this.upgradeSettingsAction = upgradeSettingsAction;
}
@Override
protected UpgradeResponse newResponse(UpgradeRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
protected UpgradeResponse newResponse(UpgradeRequest request, int totalShards, int successfulShards, int failedShards, List<ShardUpgradeResult> shardUpgradeResults, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
Map<String, Integer> successfulPrimaryShards = newHashMap();
Map<String, Tuple<Version, org.apache.lucene.util.Version>> versions = newHashMap();
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// a non active shard, ignore...
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
for (ShardUpgradeResult result : shardUpgradeResults) {
successfulShards++;
ShardUpgradeResponse shardUpgradeResponse = (ShardUpgradeResponse) shardResponse;
String index = shardUpgradeResponse.getIndex();
if (shardUpgradeResponse.primary()) {
String index = result.getShardId().getIndex();
if (result.primary()) {
Integer count = successfulPrimaryShards.get(index);
successfulPrimaryShards.put(index, count == null ? 1 : count + 1);
}
Tuple<Version, org.apache.lucene.util.Version> versionTuple = versions.get(index);
if (versionTuple == null) {
versions.put(index, new Tuple<>(shardUpgradeResponse.upgradeVersion(), shardUpgradeResponse.oldestLuceneSegment()));
versions.put(index, new Tuple<>(result.upgradeVersion(), result.oldestLuceneSegment()));
} else {
// We already have versions for this index - let's see if we need to update them based on the current shard
Version version = versionTuple.v1();
org.apache.lucene.util.Version luceneVersion = versionTuple.v2();
// For the metadata we are interested in the _latest_ elasticsearch version that was processing the metadata
// For the metadata we are interested in the _latest_ Elasticsearch version that was processing the metadata
// Since we rewrite the mapping during upgrade the metadata is always rewritten by the latest version
if (shardUpgradeResponse.upgradeVersion().after(versionTuple.v1())) {
version = shardUpgradeResponse.upgradeVersion();
if (result.upgradeVersion().after(versionTuple.v1())) {
version = result.upgradeVersion();
}
// For the lucene version we are interested in the _oldest_ lucene version since it determines the
// oldest version that we need to support
if (shardUpgradeResponse.oldestLuceneSegment().onOrAfter(versionTuple.v2()) == false) {
luceneVersion = shardUpgradeResponse.oldestLuceneSegment();
if (result.oldestLuceneSegment().onOrAfter(versionTuple.v2()) == false) {
luceneVersion = result.oldestLuceneSegment();
}
versions.put(index, new Tuple<>(version, luceneVersion));
}
}
}
Map<String, Tuple<org.elasticsearch.Version, String>> updatedVersions = newHashMap();
MetaData metaData = clusterState.metaData();
for (Map.Entry<String, Tuple<Version, org.apache.lucene.util.Version>> versionEntry : versions.entrySet()) {
@ -133,33 +115,37 @@ public class TransportUpgradeAction extends TransportBroadcastAction<UpgradeRequ
}
}
return new UpgradeResponse(updatedVersions, shardsResponses.length(), successfulShards, failedShards, shardFailures);
return new UpgradeResponse(updatedVersions, totalShards, successfulShards, failedShards, shardFailures);
}
@Override
protected ShardUpgradeRequest newShardRequest(int numShards, ShardRouting shard, UpgradeRequest request) {
return new ShardUpgradeRequest(shard.shardId(), request);
protected ShardUpgradeResult shardOperation(UpgradeRequest request, ShardRouting shardRouting) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
org.apache.lucene.util.Version oldestLuceneSegment = indexShard.upgrade(request);
// We are using the current version of Elasticsearch as upgrade version since we update mapping to match the current version
return new ShardUpgradeResult(shardRouting.shardId(), indexShard.routingEntry().primary(), Version.CURRENT, oldestLuceneSegment);
}
@Override
protected ShardUpgradeResponse newShardResponse() {
return new ShardUpgradeResponse();
protected ShardUpgradeResult readShardResult(StreamInput in) throws IOException {
ShardUpgradeResult result = new ShardUpgradeResult();
result.readFrom(in);
return result;
}
@Override
protected ShardUpgradeResponse shardOperation(ShardUpgradeRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
org.apache.lucene.util.Version oldestLuceneSegment = indexShard.upgrade(request.upgradeRequest());
// We are using the current version of elasticsearch as upgrade version since we update mapping to match the current version
return new ShardUpgradeResponse(request.shardId(), indexShard.routingEntry().primary(), Version.CURRENT, oldestLuceneSegment);
protected UpgradeRequest readRequestFrom(StreamInput in) throws IOException {
UpgradeRequest request = new UpgradeRequest();
request.readFrom(in);
return request;
}
/**
* The upgrade request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(ClusterState clusterState, UpgradeRequest request, String[] concreteIndices) {
GroupShardsIterator iterator = clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
protected ShardsIterator shards(ClusterState clusterState, UpgradeRequest request, String[] concreteIndices) {
ShardsIterator iterator = clusterState.routingTable().allShards(concreteIndices);
Set<String> indicesWithMissingPrimaries = indicesWithMissingPrimaries(clusterState, concreteIndices);
if (indicesWithMissingPrimaries.isEmpty()) {
return iterator;
@ -231,5 +217,4 @@ public class TransportUpgradeAction extends TransportBroadcastAction<UpgradeRequ
}
});
}
}

View File

@ -108,7 +108,12 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
}, executor, new BroadcastByNodeTransportRequestHandler());
}
private final Response newResponse(Request request, AtomicReferenceArray responses, List<NoShardAvailableActionException> unavailableShardExceptions, Map<String, List<ShardRouting>> nodes) {
private final Response newResponse(
Request request,
AtomicReferenceArray responses,
List<NoShardAvailableActionException> unavailableShardExceptions,
Map<String, List<ShardRouting>> nodes,
ClusterState clusterState) {
int totalShards = 0;
int successfulShards = 0;
List<ShardOperationResult> broadcastByNodeResponses = new ArrayList<>();
@ -134,7 +139,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
}
totalShards += unavailableShardExceptions.size();
int failedShards = exceptions.size();
return newResponse(request, totalShards, successfulShards, failedShards, broadcastByNodeResponses, exceptions);
return newResponse(request, totalShards, successfulShards, failedShards, broadcastByNodeResponses, exceptions, clusterState);
}
/**
@ -155,9 +160,10 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
* @param failedShards the total number of shards for which execution of the operation failed
* @param results the per-node aggregated shard-level results
* @param shardFailures the exceptions corresponding to shard operationa failures
* @param clusterState the cluster state
* @return the response
*/
protected abstract Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List<ShardOperationResult> results, List<ShardOperationFailedException> shardFailures);
protected abstract Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List<ShardOperationResult> results, List<ShardOperationFailedException> shardFailures, ClusterState clusterState);
/**
* Deserialize a request from an input stream
@ -341,7 +347,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
protected void onCompletion() {
Response response = null;
try {
response = newResponse(request, responses, unavailableShardExceptions, nodeIds);
response = newResponse(request, responses, unavailableShardExceptions, nodeIds, clusterState);
} catch (Throwable t) {
logger.debug("failed to combine responses from nodes", t);
listener.onFailure(t);

View File

@ -119,7 +119,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
}
@Override
protected Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> emptyResults, List<ShardOperationFailedException> shardFailures) {
protected Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> emptyResults, List<ShardOperationFailedException> shardFailures, ClusterState clusterState) {
return new Response(totalShards, successfulShards, failedShards, shardFailures);
}