improve caching of stored metadata fetched from nodes during allocation in order to reuse existing shards

kimchy 2010-08-25 23:12:07 +03:00
parent 2910b6ab7f
commit dfa24f6d03
2 changed files with 93 additions and 49 deletions
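
At a glance: the allocator previously re-fetched shard store metadata from the data nodes on every allocation pass; this commit adds a per-shard cache (`cachedStores`, alongside the existing `cachedCommitPoints`) that is invalidated whenever a shard starts or fails. A minimal sketch of the pattern in modern Java (hypothetical, simplified types; the real code uses `ConcurrentCollections.newConcurrentMap()` with `ShardId` and `DiscoveryNode` keys):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: ShardId, Node, and StoreMetaData stand in for the real
// org.elasticsearch types used in the diff below.
class ShardStoreCache<ShardId, Node, StoreMetaData> {
    private final ConcurrentMap<ShardId, ConcurrentMap<Node, StoreMetaData>> cachedStores =
            new ConcurrentHashMap<>();

    // Reuse the per-node metadata gathered on an earlier allocation pass, if any.
    ConcurrentMap<Node, StoreMetaData> get(ShardId shardId) {
        return cachedStores.get(shardId);
    }

    void put(ShardId shardId, ConcurrentMap<Node, StoreMetaData> stores) {
        cachedStores.put(shardId, stores);
    }

    // A started or failed shard changes on-disk state, so its cached file
    // listings are stale; drop them (see applyStartedShards/applyFailedShards below).
    void invalidate(ShardId shardId) {
        cachedStores.remove(shardId);
    }
}
```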

File: ShardsAllocation.java

@@ -62,8 +62,8 @@ public class ShardsAllocation extends AbstractComponent {
*/
public RoutingTable applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards) {
RoutingNodes routingNodes = clusterState.routingNodes();
- boolean changed = applyStartedShards(routingNodes, startedShards);
+ nodeAllocations.applyStartedShards(nodeAllocations, routingNodes, clusterState.nodes(), startedShards);
+ boolean changed = applyStartedShards(routingNodes, startedShards);
if (!changed) {
return clusterState.routingTable();
}
@@ -78,8 +78,8 @@ public class ShardsAllocation extends AbstractComponent {
*/
public RoutingTable applyFailedShards(ClusterState clusterState, List<? extends ShardRouting> failedShards) {
RoutingNodes routingNodes = clusterState.routingNodes();
- boolean changed = applyFailedShards(routingNodes, failedShards);
+ nodeAllocations.applyFailedShards(nodeAllocations, routingNodes, clusterState.nodes(), failedShards);
+ boolean changed = applyFailedShards(routingNodes, failedShards);
if (!changed) {
return clusterState.routingTable();
}
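
The two hunks above wire a notification step into `ShardsAllocation`: before the routing table is recomputed, every registered `NodeAllocation` is told which shards started or failed, giving cache-holding allocators such as `BlobReuseExistingNodeAllocation` (second file) a chance to invalidate. A listener-style sketch of that notify-then-apply ordering, with hypothetical names:

```java
import java.util.List;

// Hypothetical sketch of the hook added above: allocators observe shard
// lifecycle events so they can drop cached state before the next round.
interface ShardLifecycleListener<Shard> {
    void shardsStarted(List<Shard> started);

    void shardsFailed(List<Shard> failed);
}

class AllocationService<Shard> {
    private final List<ShardLifecycleListener<Shard>> allocations;

    AllocationService(List<ShardLifecycleListener<Shard>> allocations) {
        this.allocations = allocations;
    }

    void applyStartedShards(List<Shard> started) {
        // notify first, so stale caches are gone ...
        allocations.forEach(a -> a.shardsStarted(started));
        // ... then apply the routing changes, as in the diff above
    }
}
```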

File: BlobReuseExistingNodeAllocation.java

@@ -28,7 +28,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
- import org.elasticsearch.common.collect.Maps;
+ import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -47,6 +47,7 @@ import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
@@ -62,6 +63,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
private final ConcurrentMap<ShardId, CommitPoint> cachedCommitPoints = ConcurrentCollections.newConcurrentMap();
+ private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
@Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
super(settings);
@@ -74,12 +77,14 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
@Override public void applyStartedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards) {
for (ShardRouting shardRouting : startedShards) {
cachedCommitPoints.remove(shardRouting.shardId());
+ cachedStores.remove(shardRouting.shardId());
}
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
for (ShardRouting shardRouting : failedShards) {
cachedCommitPoints.remove(shardRouting.shardId());
+ cachedStores.remove(shardRouting.shardId());
}
}
@@ -94,7 +99,6 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
return changed;
}
- Map<ShardId, TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData> cachedNodesStoreFilesMetaData = Maps.newHashMap();
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
@@ -125,35 +129,16 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
continue;
}
// go and fetch the shard store data for it
- TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = cachedNodesStoreFilesMetaData.get(shard.shardId());
- if (nodesStoreFilesMetaData == null) {
- nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
- cachedNodesStoreFilesMetaData.put(shard.shardId(), nodesStoreFilesMetaData);
- }
- if (logger.isDebugEnabled()) {
- if (nodesStoreFilesMetaData.failures().length > 0) {
- StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
- for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
- Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
- if (cause instanceof ConnectTransportException) {
- continue;
- }
- sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
- }
- logger.debug(sb.toString());
- }
- }
+ ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard);
long lastSizeMatched = 0;
DiscoveryNode lastDiscoNodeMatched = null;
RoutingNode lastNodeMatched = null;
- for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
- DiscoveryNode discoNode = nodeStoreFilesMetaData.node();
+ for (Map.Entry<DiscoveryNode, IndexStore.StoreFilesMetaData> nodeStoreEntry : shardStores.entrySet()) {
+ DiscoveryNode discoNode = nodeStoreEntry.getKey();
+ IndexStore.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue();
logger.trace("{}: checking node [{}]", shard, discoNode);
- IndexStore.StoreFilesMetaData storeFilesMetaData = nodeStoreFilesMetaData.storeFilesMetaData();
if (storeFilesMetaData == null) {
// already allocated on that node...
@@ -179,7 +164,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
// if its a primary, it will be recovered from the gateway, find one that is closet to it
- if (shard.primary() && indexService.gateway() instanceof BlobStoreIndexGateway) {
+ if (shard.primary()) {
BlobStoreIndexGateway indexGateway = (BlobStoreIndexGateway) indexService.gateway();
try {
CommitPoint commitPoint = cachedCommitPoints.get(shard.shardId());
@@ -232,36 +217,32 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
} else {
logger.trace("{}: node ignored for pre_allocation [{}], total_size_matched [{}] smaller than last_size_matched [{}]", shard, discoNode, new ByteSizeValue(sizeMatched), new ByteSizeValue(lastSizeMatched));
}
- continue;
} catch (Exception e) {
// failed, log and try and allocate based on size
logger.debug("Failed to guess allocation of primary based on gateway for " + shard, e);
}
}
- // if its backup, see if there is a primary that *is* allocated, and try and assign a location that is closest to it
- // note, since we replicate operations, this might not be the same (different flush intervals)
- if (!shard.primary()) {
+ } else {
+ // if its backup, see if there is a primary that *is* allocated, and try and assign a location that is closest to it
+ // note, since we replicate operations, this might not be the same (different flush intervals)
MutableShardRouting primaryShard = routingNodes.findPrimaryForReplica(shard);
if (primaryShard != null && primaryShard.active()) {
- TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData primaryNodeStoreFileMetaData = nodesStoreFilesMetaData.nodesMap().get(primaryShard.currentNodeId());
- if (primaryNodeStoreFileMetaData != null && primaryNodeStoreFileMetaData.storeFilesMetaData() != null && primaryNodeStoreFileMetaData.storeFilesMetaData().allocated()) {
- long sizeMatched = 0;
+ DiscoveryNode primaryNode = nodes.get(primaryShard.currentNodeId());
+ if (primaryNode != null) {
+ IndexStore.StoreFilesMetaData primaryNodeStore = shardStores.get(primaryNode);
+ if (primaryNodeStore != null && primaryNodeStore.allocated()) {
+ long sizeMatched = 0;
- IndexStore.StoreFilesMetaData primaryStoreFilesMetaData = primaryNodeStoreFileMetaData.storeFilesMetaData();
- for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
- if (primaryStoreFilesMetaData.fileExists(storeFileMetaData.name()) && primaryStoreFilesMetaData.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
- sizeMatched += storeFileMetaData.length();
+ for (StoreFileMetaData storeFileMetaData : storeFilesMetaData) {
+ if (primaryNodeStore.fileExists(storeFileMetaData.name()) && primaryNodeStore.file(storeFileMetaData.name()).length() == storeFileMetaData.length()) {
+ sizeMatched += storeFileMetaData.length();
+ }
+ }
+ if (sizeMatched > lastSizeMatched) {
+ lastSizeMatched = sizeMatched;
+ lastDiscoNodeMatched = discoNode;
+ lastNodeMatched = node;
+ }
+ }
- if (sizeMatched > lastSizeMatched) {
- lastSizeMatched = sizeMatched;
- lastDiscoNodeMatched = discoNode;
- lastNodeMatched = node;
- }
- continue;
}
}
}
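
This rework keeps the replica-placement heuristic but reads file listings from the cached per-node map (`shardStores.get(primaryNode)`) instead of the raw transport response. The heuristic itself: score a candidate node by the total bytes of its files that also exist on the primary with an identical name and length, and keep the best-scoring node, since overlapping bytes don't need to be copied during recovery. A self-contained sketch of that scoring (hypothetical `FileMeta` type mirroring `StoreFileMetaData.name()`/`length()`):

```java
import java.util.List;
import java.util.Map;

// Hypothetical simplified type: a file identified by name and length.
record FileMeta(String name, long length) {}

class SizeMatcher {
    // Sum the sizes of candidate files that exist on the primary with the
    // same length; a larger sum means less data to transfer on recovery.
    static long sizeMatched(List<FileMeta> candidateFiles, Map<String, Long> primaryFiles) {
        long matched = 0;
        for (FileMeta file : candidateFiles) {
            Long primaryLength = primaryFiles.get(file.name());
            if (primaryLength != null && primaryLength == file.length()) {
                matched += file.length();
            }
        }
        return matched;
    }
}
```

Matching on name plus length is a cheap proxy for identical segment files; as the comment in the diff notes, a replica can legitimately diverge from its primary (different flush intervals), so this is a best-effort size match, not an equality check.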
@@ -290,6 +271,69 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
return changed;
}
+ private ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
+ ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
+ if (shardStores == null) {
+ shardStores = ConcurrentCollections.newConcurrentMap();
+ TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
+ if (logger.isDebugEnabled()) {
+ if (nodesStoreFilesMetaData.failures().length > 0) {
+ StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
+ for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
+ Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
+ if (cause instanceof ConnectTransportException) {
+ continue;
+ }
+ sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
+ }
+ logger.debug(sb.toString());
+ }
+ }
+ for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
+ shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
+ }
+ cachedStores.put(shard.shardId(), shardStores);
+ } else {
+ // clean nodes that have failed
+ for (DiscoveryNode node : shardStores.keySet()) {
+ if (!nodes.nodeExists(node.id())) {
+ shardStores.remove(node);
+ }
+ }
+ // we have stored cached from before, see if the nodes changed, if they have, go fetch again
+ Set<String> fetchedNodes = Sets.newHashSet();
+ for (DiscoveryNode node : nodes.dataNodes().values()) {
+ if (!shardStores.containsKey(node)) {
+ fetchedNodes.add(node.id());
+ }
+ }
+ if (!fetchedNodes.isEmpty()) {
+ TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, fetchedNodes, listTimeout).actionGet();
+ if (logger.isDebugEnabled()) {
+ if (nodesStoreFilesMetaData.failures().length > 0) {
+ StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
+ for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
+ Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
+ if (cause instanceof ConnectTransportException) {
+ continue;
+ }
+ sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
+ }
+ logger.debug(sb.toString());
+ }
+ }
+ for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
+ shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
+ }
+ }
+ }
+ return shardStores;
+ }
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingNodes routingNodes) {
return Decision.YES;
}
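
The new `buildShardStores` helper is where the caching pays off: on a miss it fetches store metadata from all data nodes and caches the result; on a hit it reconciles the cached map with current cluster membership, pruning entries for departed nodes and querying only nodes that have no cached entry yet. A compact sketch of that reconciliation step (hypothetical, simplified types):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class CacheReconciler {
    // Keep a per-shard cache in sync with cluster membership between rounds.
    static <Node, Store> void reconcile(Map<Node, Store> cached,
                                        Set<Node> currentDataNodes,
                                        Function<Set<Node>, Map<Node, Store>> fetchFromNodes) {
        // 1) drop cached entries for nodes that left the cluster
        cached.keySet().removeIf(node -> !currentDataNodes.contains(node));
        // 2) fetch store metadata only from nodes that joined since last time
        Set<Node> missing = new HashSet<>(currentDataNodes);
        missing.removeAll(cached.keySet());
        if (!missing.isEmpty()) {
            cached.putAll(fetchFromNodes.apply(missing));
        }
    }
}
```

The design choice worth noting: when a single node joins, only that node is queried rather than the whole cluster, which is what keeps repeated allocation rounds cheap.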