From fe83df2a503fbd7b91d3151fbd6c520375674008 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 18 May 2017 15:12:46 -0400 Subject: [PATCH] Refactor update shard logic for primaries This commit is a simple refactoring of the update shard logic for primaries. Namely, there was some duplicated code here that was annoying to have to read twice so it is now collapsed with this commit. --- .../cluster/IndicesClusterStateService.java | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 1b243bccd74..7371bde8939 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource.Type; import org.elasticsearch.cluster.routing.RoutingNode; @@ -551,18 +552,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple try { shard.updateRoutingEntry(shardRouting); if (shardRouting.primary()) { - IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); - Set activeIds = indexShardRoutingTable.activeShards().stream() - // filter to shards that track seq# and should be taken into consideration for checkpoint tracking - // shards on old nodes will go through a file based recovery which will also transfer seq# information. - .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) - .map(r -> r.allocationId().getId()) - .collect(Collectors.toSet()); - Set initializingIds = indexShardRoutingTable.getAllInitializingShards().stream() - .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) - .map(r -> r.allocationId().getId()) - .collect(Collectors.toSet()); - shard.updateAllocationIdsFromMaster(activeIds, initializingIds); + final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); + /* + * Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on + * old nodes will go through a file-based recovery which will also transfer sequence number information. + */ + final Set activeIds = + allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); + final Set initializingIds = + allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); + shard.updateAllocationIdsFromMaster(activeIds, initializingIds); } } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); @@ -586,6 +585,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple } } + private Set allocationIdsForShardsOnNodesThatUnderstandSeqNos( + final List shardRoutings, + final DiscoveryNodes nodes) { + return shardRoutings + .stream() + .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) + .map(ShardRouting::allocationId) + .map(AllocationId::getId) + .collect(Collectors.toSet()); + } + /** * Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard * routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not.