don't use the index service to get the stored blobs in the shared gateway, move it up to the node level

kimchy 2010-10-19 18:03:48 +02:00
parent 53a3df5d8e
commit 69b8b0f437
3 changed files with 37 additions and 45 deletions
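
For orientation, here is a minimal before/after sketch of the API movement in this commit, using only the signatures visible in the diffs below (not part of the commit itself; bodies omitted, and the real classes carry more members):

import org.elasticsearch.index.gateway.CommitPoint;

import java.io.IOException;

interface CommitPointLookupBeforeAfter {
    // Before: lived on the per-index BlobStoreIndexGateway, so a caller such as the
    // allocator needed an IndicesService/InternalIndexService just to reach it.
    CommitPoint findCommitPoint(int shardId) throws IOException;

    // After: lives on the node-level BlobStoreGateway and takes the index name
    // explicitly, so no index service has to exist on the allocating node.
    CommitPoint findCommitPoint(String index, int shardId) throws IOException;
}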

View File

@@ -33,13 +33,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.transport.ConnectTransportException;
import java.util.Iterator;
@@ -52,7 +52,7 @@ import java.util.concurrent.ConcurrentMap;
*/
public class BlobReuseExistingNodeAllocation extends NodeAllocation {
private final IndicesService indicesService;
private final Node node;
private final TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData;
@@ -62,10 +62,10 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
@Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService,
@Inject public BlobReuseExistingNodeAllocation(Settings settings, Node node,
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
super(settings);
this.indicesService = indicesService;
this.node = node; // YACK!, we need the Gateway, but it creates crazy circular dependency
this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData;
this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30));
@@ -102,14 +102,6 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();
InternalIndexService indexService = (InternalIndexService) indicesService.indexService(shard.index());
if (indexService == null) {
continue;
}
// if the store is not persistent, it makes no sense to test for special allocation
if (!indexService.store().persistent()) {
continue;
}
// pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
boolean canBeAllocatedToAtLeastOneNode = false;
@@ -165,11 +157,10 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
// if it's a primary, it will be recovered from the gateway, find one that is closest to it
if (shard.primary()) {
BlobStoreIndexGateway indexGateway = (BlobStoreIndexGateway) indexService.gateway();
try {
CommitPoint commitPoint = cachedCommitPoints.get(shard.shardId());
if (commitPoint == null) {
commitPoint = indexGateway.findCommitPoint(shard.id());
commitPoint = ((BlobStoreGateway) ((InternalNode) this.node).injector().getInstance(Gateway.class)).findCommitPoint(shard.index(), shard.id());
if (commitPoint != null) {
cachedCommitPoints.put(shard.shardId(), commitPoint);
} else {
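
The injector round-trip above (flagged by the in-line "YACK!" comment) is the price of avoiding a circular dependency: the allocator now takes Node instead of Gateway and resolves the gateway lazily. A minimal sketch of that pattern, assuming the InternalNode#injector() call shown in the diff; the class and method names here are illustrative only:

import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalNode;

class LazyGatewayResolver {
    private final Node node;

    LazyGatewayResolver(Node node) {
        // depend on the node, not the gateway, so construction does not form a cycle
        this.node = node;
    }

    BlobStoreGateway blobStoreGateway() {
        // resolved at call time from the node's injector; the cast assumes a
        // blob-store based shared gateway is configured
        return (BlobStoreGateway) ((InternalNode) node).injector().getInstance(Gateway.class);
    }
}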

View File

@@ -25,15 +25,20 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.blobstore.*;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.gateway.shared.SharedStorageGateway;
import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.CommitPoints;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;
/**
* @author kimchy (shay.banon)
@@ -102,6 +107,27 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
}
}
public CommitPoint findCommitPoint(String index, int shardId) throws IOException {
ImmutableBlobContainer container = blobStore.immutableBlobContainer(BlobStoreIndexGateway.shardPath(basePath, index, shardId));
ImmutableMap<String, BlobMetaData> blobs = container.listBlobs();
List<CommitPoint> commitPointsList = Lists.newArrayList();
for (String name : blobs.keySet()) {
if (name.startsWith("commit-")) {
try {
commitPointsList.add(CommitPoints.fromXContent(container.readBlobFully(name)));
} catch (Exception e) {
logger.warn("failed to read commit point [{}]", e, name);
}
}
}
CommitPoints commitPoints = new CommitPoints(commitPointsList);
if (commitPoints.commits().isEmpty()) {
return null;
}
return commitPoints.commits().get(0);
}
@Override public void write(MetaData metaData) throws GatewayException {
final String newMetaData = "metadata-" + (currentIndex + 1);
XContentBuilder builder;
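
For context, a usage sketch of the new node-level lookup, mirroring the caching the allocator does in the first file. The caller class, its field names, and the ShardId accessors are illustrative assumptions, not part of this commit; findCommitPoint returns null when no "commit-" blob exists for the shard yet:

import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CachedCommitPointLookup {
    private final BlobStoreGateway gateway;
    private final ConcurrentMap<ShardId, CommitPoint> cachedCommitPoints =
            new ConcurrentHashMap<ShardId, CommitPoint>();

    CachedCommitPointLookup(BlobStoreGateway gateway) {
        this.gateway = gateway;
    }

    CommitPoint find(ShardId shardId) throws IOException {
        CommitPoint commitPoint = cachedCommitPoints.get(shardId);
        if (commitPoint == null) {
            // lists the "commit-" blobs under <basePath>/indices/<index>/<shard>
            // and returns the head of the resulting CommitPoints list
            commitPoint = gateway.findCommitPoint(shardId.index().name(), shardId.id());
            if (commitPoint != null) {
                cachedCommitPoints.put(shardId, commitPoint);
            }
        }
        return commitPoint;
    }
}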

View File

@@ -20,12 +20,8 @@
package org.elasticsearch.index.gateway.blobstore;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.Gateway;
@@ -33,14 +29,9 @@ import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.CommitPoint;
import org.elasticsearch.index.gateway.CommitPoints;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.settings.IndexSettings;
import java.io.IOException;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
@@ -69,26 +60,6 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple
this.indexPath = this.gateway.basePath().add("indices").add(index.name());
}
public CommitPoint findCommitPoint(int shardId) throws IOException {
ImmutableBlobContainer container = blobStore.immutableBlobContainer(shardPath(shardId));
ImmutableMap<String, BlobMetaData> blobs = container.listBlobs();
List<CommitPoint> commitPointsList = Lists.newArrayList();
for (String name : blobs.keySet()) {
if (name.startsWith("commit-")) {
try {
commitPointsList.add(CommitPoints.fromXContent(container.readBlobFully(name)));
} catch (Exception e) {
logger.warn("failed to read commit point [{}]", e, name);
}
}
}
CommitPoints commitPoints = new CommitPoints(commitPointsList);
if (commitPoints.commits().isEmpty()) {
return null;
}
return commitPoints.commits().get(0);
}
@Override public String toString() {
return type() + "://" + blobStore + "/" + indexPath;
}
@@ -105,6 +76,10 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple
return indexPath.add(Integer.toString(shardId));
}
public static BlobPath shardPath(BlobPath basePath, String index, int shardId) {
return basePath.add("indices").add(index).add(Integer.toString(shardId));
}
@Override public void close(boolean delete) throws ElasticSearchException {
if (delete) {
blobStore.delete(indexPath);
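
For reference, a small illustration of the blob layout both lookups rely on, built with the static shardPath helper added above; the base path, index name, and shard id are example values only:

import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;

class ShardPathExample {
    static BlobPath shardContainerPath(BlobPath gatewayBasePath) {
        // resolves to <gatewayBasePath>/indices/twitter/0, the container whose
        // "commit-" blobs the node-level findCommitPoint(String, int) lists
        return BlobStoreIndexGateway.shardPath(gatewayBasePath, "twitter", 0);
    }
}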