diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 1b243bccd74..7371bde8939 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource.Type; import org.elasticsearch.cluster.routing.RoutingNode; @@ -551,18 +552,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple try { shard.updateRoutingEntry(shardRouting); if (shardRouting.primary()) { - IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); - Set activeIds = indexShardRoutingTable.activeShards().stream() - // filter to shards that track seq# and should be taken into consideration for checkpoint tracking - // shards on old nodes will go through a file based recovery which will also transfer seq# information. - .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) - .map(r -> r.allocationId().getId()) - .collect(Collectors.toSet()); - Set initializingIds = indexShardRoutingTable.getAllInitializingShards().stream() - .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) - .map(r -> r.allocationId().getId()) - .collect(Collectors.toSet()); - shard.updateAllocationIdsFromMaster(activeIds, initializingIds); + final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); + /* + * Filter to shards that track sequence numbers and should be taken into consideration for checkpoint tracking. Shards on + * old nodes will go through a file-based recovery which will also transfer sequence number information. + */ + final Set activeIds = + allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); + final Set initializingIds = + allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); + shard.updateAllocationIdsFromMaster(activeIds, initializingIds); } } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); @@ -586,6 +585,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple } } + private Set allocationIdsForShardsOnNodesThatUnderstandSeqNos( + final List shardRoutings, + final DiscoveryNodes nodes) { + return shardRoutings + .stream() + .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) + .map(ShardRouting::allocationId) + .map(AllocationId::getId) + .collect(Collectors.toSet()); + } + /** * Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard * routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not.