diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java
index 151300c6536..5d78917b3fa 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java
@@ -62,8 +62,8 @@ public class ShardsAllocation extends AbstractComponent {
      */
     public RoutingTable applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards) {
         RoutingNodes routingNodes = clusterState.routingNodes();
-        boolean changed = applyStartedShards(routingNodes, startedShards);
         nodeAllocations.applyStartedShards(nodeAllocations, routingNodes, clusterState.nodes(), startedShards);
+        boolean changed = applyStartedShards(routingNodes, startedShards);
         if (!changed) {
             return clusterState.routingTable();
         }
@@ -78,8 +78,8 @@ public class ShardsAllocation extends AbstractComponent {
      */
     public RoutingTable applyFailedShards(ClusterState clusterState, List<? extends ShardRouting> failedShards) {
         RoutingNodes routingNodes = clusterState.routingNodes();
-        boolean changed = applyFailedShards(routingNodes, failedShards);
         nodeAllocations.applyFailedShards(nodeAllocations, routingNodes, clusterState.nodes(), failedShards);
+        boolean changed = applyFailedShards(routingNodes, failedShards);
         if (!changed) {
             return clusterState.routingTable();
         }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java
index d98e0e61ac8..0b8f84c1a8a 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java
@@ -28,7 +28,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
 import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
-import org.elasticsearch.common.collect.Maps;
+import org.elasticsearch.common.collect.Sets;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -47,6 +47,7 @@ import org.elasticsearch.transport.ConnectTransportException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -62,6 +63,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
 
     private final ConcurrentMap<ShardId, CommitPoint> cachedCommitPoints = ConcurrentCollections.newConcurrentMap();
 
+    private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
+
     @Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService,
                                                    TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
         super(settings);
@@ -74,12 +77,14 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
     @Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
         for (ShardRouting shardRouting : startedShards) {
             cachedCommitPoints.remove(shardRouting.shardId());
+            cachedStores.remove(shardRouting.shardId());
         }
     }
 
     @Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
         for (ShardRouting shardRouting : failedShards) {
             cachedCommitPoints.remove(shardRouting.shardId());
+            cachedStores.remove(shardRouting.shardId());
         }
     }
 
@@ -94,7 +99,6 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
             return changed;
         }
 
-        Map<ShardId, TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData> cachedNodesStoreFilesMetaData = Maps.newHashMap();
         Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
         while (unassignedIterator.hasNext()) {
             MutableShardRouting shard = unassignedIterator.next();
@@ -125,35 +129,16 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
                 continue;
             }
 
-            // go and fetch the shard store data for it
-            TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = cachedNodesStoreFilesMetaData.get(shard.shardId());
-            if (nodesStoreFilesMetaData == null) {
-                nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
-                cachedNodesStoreFilesMetaData.put(shard.shardId(), nodesStoreFilesMetaData);
-            }
-
-            if (logger.isDebugEnabled()) {
-                if (nodesStoreFilesMetaData.failures().length > 0) {
-                    StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
-                    for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
-                        Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
-                        if (cause instanceof ConnectTransportException) {
-                            continue;
-                        }
-                        sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
-                    }
-                    logger.debug(sb.toString());
-                }
-            }
+            ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard);
 
             long lastSizeMatched = 0;
             DiscoveryNode lastDiscoNodeMatched = null;
             RoutingNode lastNodeMatched = null;
 
-            for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
-                DiscoveryNode discoNode = nodeStoreFilesMetaData.node();
+            for (Map.Entry<DiscoveryNode, IndexStore.StoreFilesMetaData> nodeStoreEntry : shardStores.entrySet()) {
+                DiscoveryNode discoNode = nodeStoreEntry.getKey();
+                IndexStore.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue();
                 logger.trace("{}: checking node [{}]", shard, discoNode);
-                IndexStore.StoreFilesMetaData storeFilesMetaData = nodeStoreFilesMetaData.storeFilesMetaData();
 
                 if (storeFilesMetaData == null) {
                     // already allocated on that node...
@@ -179,7 +164,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
 
                 // if its a primary, it will be recovered from the gateway, find one that is closet to it
-                if (shard.primary() && indexService.gateway() instanceof BlobStoreIndexGateway) {
+                if (shard.primary()) {
                     BlobStoreIndexGateway indexGateway = (BlobStoreIndexGateway) indexService.gateway();
                     try {
                         CommitPoint commitPoint = cachedCommitPoints.get(shard.shardId());
@@ -232,36 +217,32 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
                         } else {
                             logger.trace("{}: node ignored for pre_allocation [{}], total_size_matched [{}] smaller than last_size_matched [{}]", shard, discoNode, new ByteSizeValue(sizeMatched), new ByteSizeValue(lastSizeMatched));
                         }
-
-                        continue;
                     } catch (Exception e) {
                         // failed, log and try and allocate based on size
                         logger.debug("Failed to guess allocation of primary based on gateway for " + shard, e);
                     }
-                }
-
-                // if its backup, see if there is a primary that *is* allocated, and try and assign a location that is closest to it
-                // note, since we replicate operations, this might not be the same (different flush intervals)
-                if (!shard.primary()) {
+                } else {
+                    // if its backup, see if there is a primary that *is* allocated, and try and assign a location that is closest to it
+                    // note, since we replicate operations, this might not be the same (different flush intervals)
                     MutableShardRouting primaryShard = routingNodes.findPrimaryForReplica(shard);
                     if (primaryShard != null && primaryShard.active()) {
-                        TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeStoreFileMetaData = nodesStoreFilesMetaData.nodesMap().get(primaryShard.currentNodeId());
-                        if (primaryNodeStoreFileMetaData != null && primaryNodeStoreFileMetaData.storeFilesMetaData() != null && primaryNodeStoreFileMetaData.storeFilesMetaData().allocated()) {
-                            long sizeMatched = 0;
+                        DiscoveryNode primaryNode = nodes.get(primaryShard.currentNodeId());
+                        if (primaryNode != null) {
+                            IndexStore.StoreFilesMetaData primaryNodeStore = shardStores.get(primaryNode);
+                            if (primaryNodeStore != null && primaryNodeStore.allocated()) {
+                                long sizeMatched = 0;
 
-                            IndexStore.StoreFilesMetaData primaryStoreFilesMetaData = primaryNodeStoreFileMetaData.storeFilesMetaData();
-                            for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
-                                if (primaryStoreFilesMetaData.fileExists(storeFileMetaData.name()) && primaryStoreFilesMetaData.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
-                                    sizeMatched += storeFileMetaData.length();
-                                }
+                                for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
+                                    if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
+                                        sizeMatched += storeFileMetaData.length();
+                                    }
+                                }
+                                if (sizeMatched > lastSizeMatched) {
+                                    lastSizeMatched = sizeMatched;
+                                    lastDiscoNodeMatched = discoNode;
+                                    lastNodeMatched = node;
+                                }
                             }
-                            if (sizeMatched > lastSizeMatched) {
-                                lastSizeMatched = sizeMatched;
-                                lastDiscoNodeMatched = discoNode;
-                                lastNodeMatched = node;
-                            }
-
-                            continue;
                         }
                     }
                 }
@@ -290,6 +271,69 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
         return changed;
     }
 
+    private ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
+        ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
+        if (shardStores == null) {
+            shardStores = ConcurrentCollections.newConcurrentMap();
+            TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
+            if (logger.isDebugEnabled()) {
+                if (nodesStoreFilesMetaData.failures().length > 0) {
+                    StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
+                    for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
+                        Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
+                        if (cause instanceof ConnectTransportException) {
+                            continue;
+                        }
+                        sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
+                    }
+                    logger.debug(sb.toString());
+                }
+            }
+
+            for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
+                shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
+            }
+            cachedStores.put(shard.shardId(), shardStores);
+        } else {
+            // clean nodes that have failed
+            for (DiscoveryNode node : shardStores.keySet()) {
+                if (!nodes.nodeExists(node.id())) {
+                    shardStores.remove(node);
+                }
+            }
+
+            // we have stored cached from before, see if the nodes changed, if they have, go fetch again
+            Set<String> fetchedNodes = Sets.newHashSet();
+            for (DiscoveryNode node : nodes.dataNodes().values()) {
+                if (!shardStores.containsKey(node)) {
+                    fetchedNodes.add(node.id());
+                }
+            }
+
+            if (!fetchedNodes.isEmpty()) {
+                TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, fetchedNodes, listTimeout).actionGet();
+                if (logger.isDebugEnabled()) {
+                    if (nodesStoreFilesMetaData.failures().length > 0) {
+                        StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
+                        for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
+                            Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
+                            if (cause instanceof ConnectTransportException) {
+                                continue;
+                            }
+                            sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
+                        }
+                        logger.debug(sb.toString());
+                    }
+                }
+
+                for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
+                    shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
+                }
+            }
+        }
+        return shardStores;
+    }
+
     @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
         return Decision.YES;
     }
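
Note (not part of the patch itself): the new buildShardStores method keeps one concurrent map per shard, keyed by DiscoveryNode, so repeated allocation rounds only go back to transportNodesListShardStoreMetaData.list(...) for nodes that have not been listed yet, while applyStartedShards/applyFailedShards drop the whole per-shard entry once the shard moves. The standalone Java sketch below illustrates the same invalidate-on-event, fetch-only-missing-nodes pattern; it uses simplified placeholder types (String ids, a StoreInfo record, a generic lister callback), none of which are Elasticsearch classes.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

// Hypothetical, simplified model of the decider's per-shard store-metadata cache.
final class PerShardStoreCache {

    // Placeholder for IndexStore.StoreFilesMetaData.
    record StoreInfo(long totalSizeInBytes) {}

    // One listing call per invocation: (shardId, nodes to ask) -> nodeId -> store info.
    private final BiFunction<String, Set<String>, Map<String, StoreInfo>> lister;

    private final ConcurrentMap<String, ConcurrentMap<String, StoreInfo>> cache = new ConcurrentHashMap<>();

    PerShardStoreCache(BiFunction<String, Set<String>, Map<String, StoreInfo>> lister) {
        this.lister = lister;
    }

    // Mirrors applyStartedShards/applyFailedShards: once the shard moved, its cached listing is stale.
    void invalidate(String shardId) {
        cache.remove(shardId);
    }

    // Mirrors buildShardStores: drop nodes that left the cluster, then list only the nodes
    // that are not cached yet and merge the result into the per-shard map.
    Map<String, StoreInfo> shardStores(String shardId, Set<String> liveDataNodes) {
        ConcurrentMap<String, StoreInfo> stores = cache.computeIfAbsent(shardId, s -> new ConcurrentHashMap<>());

        // clean nodes that have failed / departed
        stores.keySet().retainAll(liveDataNodes);

        // fetch store metadata only for nodes we have not listed before
        Set<String> missing = new HashSet<>(liveDataNodes);
        missing.removeAll(stores.keySet());
        if (!missing.isEmpty()) {
            stores.putAll(lister.apply(shardId, missing));
        }
        return stores;
    }
}

With this shape, a stable cluster pays the listing round-trip at most once per shard, and a newly joined node only triggers a listing for that node rather than for the whole cluster.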