cache commit points during smart allocation based on local storage when recovering from gateway

This commit is contained in:
kimchy 2010-08-25 01:12:16 +03:00
parent 244cd42298
commit 4bfd0a8c26
2 changed files with 42 additions and 6 deletions

View File

@ -22,19 +22,18 @@ package org.elasticsearch.gateway.blobstore;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation; import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.gateway.CommitPoint; import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway; import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import org.elasticsearch.index.service.InternalIndexService; import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
@ -42,6 +41,7 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -54,13 +54,15 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
private final TimeValue listTimeout; private final TimeValue listTimeout;
private final ConcurrentMap<ShardId, CommitPoint> cachedCommitPoints = ConcurrentCollections.newConcurrentMap();
@Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService, @Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) { TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
super(settings); super(settings);
this.indicesService = indicesService; this.indicesService = indicesService;
this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData; this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData;
this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueMillis(5000)); this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(60));
} }
@Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { @Override public boolean allocate(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
@ -70,6 +72,24 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
return changed; return changed;
} }
// clean cached commit points for primaries that are already active
for (ShardId shardId : cachedCommitPoints.keySet()) {
IndexRoutingTable indexRoutingTable = routingNodes.routingTable().index(shardId.index().name());
if (indexRoutingTable == null) {
cachedCommitPoints.remove(shardId);
continue;
}
ShardRouting primaryShardRouting = indexRoutingTable.shard(shardId.id()).primaryShard();
if (primaryShardRouting.active()) {
cachedCommitPoints.remove(shardId);
}
}
if (!routingNodes.hasUnassigned()) {
return changed;
}
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator(); Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) { while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next(); MutableShardRouting shard = unassignedIterator.next();
@ -134,7 +154,21 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
if (shard.primary() && indexService.gateway() instanceof BlobStoreIndexGateway) { if (shard.primary() && indexService.gateway() instanceof BlobStoreIndexGateway) {
BlobStoreIndexGateway indexGateway = (BlobStoreIndexGateway) indexService.gateway(); BlobStoreIndexGateway indexGateway = (BlobStoreIndexGateway) indexService.gateway();
try { try {
CommitPoint commitPoint = indexGateway.findCommitPoint(shard.id()); CommitPoint commitPoint = cachedCommitPoints.get(shard.shardId());
if (commitPoint == null) {
commitPoint = indexGateway.findCommitPoint(shard.id());
if (commitPoint != null) {
cachedCommitPoints.put(shard.shardId(), commitPoint);
} else {
cachedCommitPoints.put(shard.shardId(), CommitPoint.NULL);
}
} else if (commitPoint == CommitPoint.NULL) {
commitPoint = null;
}
if (commitPoint == null) {
break;
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n"); StringBuilder sb = new StringBuilder(shard + ": checking for pre_allocation (gateway) on node " + discoNode + "\n");

View File

@ -28,6 +28,8 @@ import java.util.List;
*/ */
public class CommitPoint { public class CommitPoint {
public static CommitPoint NULL = new CommitPoint(-1, "_null_", Type.GENERATED, ImmutableList.<CommitPoint.FileInfo>of(), ImmutableList.<CommitPoint.FileInfo>of());
public static class FileInfo { public static class FileInfo {
private final String name; private final String name;
private final String physicalName; private final String physicalName;