From 69b8b0f4371e551a858e24b43502abcec0eb2ec0 Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 19 Oct 2010 18:03:48 +0200 Subject: [PATCH] don't use the index serivce to get the stored blobs in shared gateway, move it to upper node level --- .../BlobReuseExistingNodeAllocation.java | 23 ++++--------- .../gateway/blobstore/BlobStoreGateway.java | 26 +++++++++++++++ .../blobstore/BlobStoreIndexGateway.java | 33 +++---------------- 3 files changed, 37 insertions(+), 45 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java index 3f914e7c93a..d5753efd90a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java @@ -33,13 +33,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.gateway.Gateway; import org.elasticsearch.index.gateway.CommitPoint; -import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway; -import org.elasticsearch.index.service.InternalIndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.StoreFileMetaData; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.transport.ConnectTransportException; import java.util.Iterator; @@ -52,7 +52,7 @@ import java.util.concurrent.ConcurrentMap; */ public class BlobReuseExistingNodeAllocation extends NodeAllocation { - private final IndicesService indicesService; + private final Node node; private final TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData; @@ -62,10 +62,10 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { private final ConcurrentMap> cachedStores = ConcurrentCollections.newConcurrentMap(); - @Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService, + @Inject public BlobReuseExistingNodeAllocation(Settings settings, Node node, TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) { super(settings); - this.indicesService = indicesService; + this.node = node; // YACK!, we need the Gateway, but it creates crazy circular dependency this.transportNodesListShardStoreMetaData = transportNodesListShardStoreMetaData; this.listTimeout = componentSettings.getAsTime("list_timeout", TimeValue.timeValueSeconds(30)); @@ -102,14 +102,6 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { Iterator unassignedIterator = routingNodes.unassigned().iterator(); while (unassignedIterator.hasNext()) { MutableShardRouting shard = unassignedIterator.next(); - InternalIndexService indexService = (InternalIndexService) indicesService.indexService(shard.index()); - if (indexService == null) { - continue; - } - // if the store is not persistent, it makes no sense to test for special allocation - if (!indexService.store().persistent()) { - continue; - } // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing boolean canBeAllocatedToAtLeastOneNode = false; @@ -165,11 +157,10 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { // if its a primary, it will be recovered from the gateway, find one that is closet to it if (shard.primary()) { - BlobStoreIndexGateway indexGateway = (BlobStoreIndexGateway) indexService.gateway(); try { CommitPoint commitPoint = cachedCommitPoints.get(shard.shardId()); if (commitPoint == null) { - commitPoint = indexGateway.findCommitPoint(shard.id()); + commitPoint = ((BlobStoreGateway) ((InternalNode) this.node).injector().getInstance(Gateway.class)).findCommitPoint(shard.index(), shard.id()); if (commitPoint != null) { cachedCommitPoints.put(shard.shardId(), commitPoint); } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java index 60c406a17d2..aa2cb3542b6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java @@ -25,15 +25,20 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.common.blobstore.*; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.gateway.GatewayException; import org.elasticsearch.gateway.shared.SharedStorageGateway; +import org.elasticsearch.index.gateway.CommitPoint; +import org.elasticsearch.index.gateway.CommitPoints; +import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway; import javax.annotation.Nullable; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.List; /** * @author kimchy (shay.banon) @@ -102,6 +107,27 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { } } + public CommitPoint findCommitPoint(String index, int shardId) throws IOException { + ImmutableBlobContainer container = blobStore.immutableBlobContainer(BlobStoreIndexGateway.shardPath(basePath, index, shardId)); + ImmutableMap blobs = container.listBlobs(); + List commitPointsList = Lists.newArrayList(); + for (String name : blobs.keySet()) { + if (name.startsWith("commit-")) { + try { + commitPointsList.add(CommitPoints.fromXContent(container.readBlobFully(name))); + } catch (Exception e) { + logger.warn("failed to read commit point [{}]", e, name); + } + } + } + CommitPoints commitPoints = new CommitPoints(commitPointsList); + if (commitPoints.commits().isEmpty()) { + return null; + } + return commitPoints.commits().get(0); + } + + @Override public void write(MetaData metaData) throws GatewayException { final String newMetaData = "metadata-" + (currentIndex + 1); XContentBuilder builder; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java index 343e7262b03..1b976bdc628 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexGateway.java @@ -20,12 +20,8 @@ package org.elasticsearch.index.gateway.blobstore; import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.blobstore.ImmutableBlobContainer; -import org.elasticsearch.common.collect.ImmutableMap; -import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.gateway.Gateway; @@ -33,14 +29,9 @@ import org.elasticsearch.gateway.blobstore.BlobStoreGateway; import org.elasticsearch.gateway.none.NoneGateway; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; -import org.elasticsearch.index.gateway.CommitPoint; -import org.elasticsearch.index.gateway.CommitPoints; import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.settings.IndexSettings; -import java.io.IOException; -import java.util.List; - /** * @author kimchy (shay.banon) */ @@ -69,26 +60,6 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple this.indexPath = this.gateway.basePath().add("indices").add(index.name()); } - public CommitPoint findCommitPoint(int shardId) throws IOException { - ImmutableBlobContainer container = blobStore.immutableBlobContainer(shardPath(shardId)); - ImmutableMap blobs = container.listBlobs(); - List commitPointsList = Lists.newArrayList(); - for (String name : blobs.keySet()) { - if (name.startsWith("commit-")) { - try { - commitPointsList.add(CommitPoints.fromXContent(container.readBlobFully(name))); - } catch (Exception e) { - logger.warn("failed to read commit point [{}]", e, name); - } - } - } - CommitPoints commitPoints = new CommitPoints(commitPointsList); - if (commitPoints.commits().isEmpty()) { - return null; - } - return commitPoints.commits().get(0); - } - @Override public String toString() { return type() + "://" + blobStore + "/" + indexPath; } @@ -105,6 +76,10 @@ public abstract class BlobStoreIndexGateway extends AbstractIndexComponent imple return indexPath.add(Integer.toString(shardId)); } + public static BlobPath shardPath(BlobPath basePath, String index, int shardId) { + return basePath.add("indices").add(index).add(Integer.toString(shardId)); + } + @Override public void close(boolean delete) throws ElasticSearchException { if (delete) { blobStore.delete(indexPath);