Refactor update shard logic for primaries

This commit is a simple refactoring of the update shard logic for
primaries. Namely, there was some duplicated code here that was annoying
to have to read twice so it is now collapsed with this commit.
This commit is contained in:
Jason Tedor 2017-05-18 15:12:46 -04:00
parent 3d5154191c
commit fe83df2a50
1 changed files with 22 additions and 12 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource.Type;
import org.elasticsearch.cluster.routing.RoutingNode;
@ -551,18 +552,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
try {
shard.updateRoutingEntry(shardRouting);
if (shardRouting.primary()) {
IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
Set<String> activeIds = indexShardRoutingTable.activeShards().stream()
// filter to shards that track seq# and should be taken into consideration for checkpoint tracking
// shards on old nodes will go through a file based recovery which will also transfer seq# information.
.filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED))
.map(r -> r.allocationId().getId())
.collect(Collectors.toSet());
Set<String> initializingIds = indexShardRoutingTable.getAllInitializingShards().stream()
.filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED))
.map(r -> r.allocationId().getId())
.collect(Collectors.toSet());
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
/*
* Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on
* old nodes will go through a file-based recovery which will also transfer sequence number information.
*/
final Set<String> activeIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState);
@ -586,6 +585,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
}
}
private Set<String> allocationIdsForShardsOnNodesThatUnderstandSeqNos(
final List<ShardRouting> shardRoutings,
final DiscoveryNodes nodes) {
return shardRoutings
.stream()
.filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED))
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toSet());
}
/**
* Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard
* routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not.