move list of local files to be indices level, so no need for index created on a node in order to list it
This commit is contained in:
parent
5635260922
commit
53a3df5d8e
|
@ -37,7 +37,6 @@ import org.elasticsearch.index.gateway.CommitPoint;
|
||||||
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
|
import org.elasticsearch.index.gateway.blobstore.BlobStoreIndexGateway;
|
||||||
import org.elasticsearch.index.service.InternalIndexService;
|
import org.elasticsearch.index.service.InternalIndexService;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.store.IndexStore;
|
|
||||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
||||||
|
@ -61,7 +60,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
|
||||||
|
|
||||||
private final ConcurrentMap<ShardId, CommitPoint> cachedCommitPoints = ConcurrentCollections.newConcurrentMap();
|
private final ConcurrentMap<ShardId, CommitPoint> cachedCommitPoints = ConcurrentCollections.newConcurrentMap();
|
||||||
|
|
||||||
private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
|
private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
|
||||||
|
|
||||||
@Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService,
|
@Inject public BlobReuseExistingNodeAllocation(Settings settings, IndicesService indicesService,
|
||||||
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
|
TransportNodesListShardStoreMetaData transportNodesListShardStoreMetaData) {
|
||||||
|
@ -130,15 +129,15 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard);
|
ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard);
|
||||||
|
|
||||||
long lastSizeMatched = 0;
|
long lastSizeMatched = 0;
|
||||||
DiscoveryNode lastDiscoNodeMatched = null;
|
DiscoveryNode lastDiscoNodeMatched = null;
|
||||||
RoutingNode lastNodeMatched = null;
|
RoutingNode lastNodeMatched = null;
|
||||||
|
|
||||||
for (Map.Entry<DiscoveryNode, IndexStore.StoreFilesMetaData> nodeStoreEntry : shardStores.entrySet()) {
|
for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> nodeStoreEntry : shardStores.entrySet()) {
|
||||||
DiscoveryNode discoNode = nodeStoreEntry.getKey();
|
DiscoveryNode discoNode = nodeStoreEntry.getKey();
|
||||||
IndexStore.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue();
|
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue();
|
||||||
logger.trace("{}: checking node [{}]", shard, discoNode);
|
logger.trace("{}: checking node [{}]", shard, discoNode);
|
||||||
|
|
||||||
if (storeFilesMetaData == null) {
|
if (storeFilesMetaData == null) {
|
||||||
|
@ -229,7 +228,7 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
|
||||||
if (primaryShard != null && primaryShard.active()) {
|
if (primaryShard != null && primaryShard.active()) {
|
||||||
DiscoveryNode primaryNode = nodes.get(primaryShard.currentNodeId());
|
DiscoveryNode primaryNode = nodes.get(primaryShard.currentNodeId());
|
||||||
if (primaryNode != null) {
|
if (primaryNode != null) {
|
||||||
IndexStore.StoreFilesMetaData primaryNodeStore = shardStores.get(primaryNode);
|
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryNodeStore = shardStores.get(primaryNode);
|
||||||
if (primaryNodeStore != null && primaryNodeStore.allocated()) {
|
if (primaryNodeStore != null && primaryNodeStore.allocated()) {
|
||||||
long sizeMatched = 0;
|
long sizeMatched = 0;
|
||||||
|
|
||||||
|
@ -272,8 +271,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
|
private ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
|
||||||
ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
|
ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
|
||||||
if (shardStores == null) {
|
if (shardStores == null) {
|
||||||
shardStores = ConcurrentCollections.newConcurrentMap();
|
shardStores = ConcurrentCollections.newConcurrentMap();
|
||||||
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
|
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = transportNodesListShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
|
||||||
|
@ -292,7 +291,9 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
|
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
|
||||||
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
|
if (nodeStoreFilesMetaData.storeFilesMetaData() != null) {
|
||||||
|
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cachedStores.put(shard.shardId(), shardStores);
|
cachedStores.put(shard.shardId(), shardStores);
|
||||||
} else {
|
} else {
|
||||||
|
@ -328,7 +329,9 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
|
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
|
||||||
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
|
if (nodeStoreFilesMetaData.storeFilesMetaData() != null) {
|
||||||
|
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.index.service.InternalIndexService;
|
import org.elasticsearch.index.service.InternalIndexService;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.store.IndexStore;
|
|
||||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
||||||
|
@ -63,7 +62,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
|
||||||
|
|
||||||
private final TransportNodesListShardStoreMetaData listShardStoreMetaData;
|
private final TransportNodesListShardStoreMetaData listShardStoreMetaData;
|
||||||
|
|
||||||
private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
|
private final ConcurrentMap<ShardId, ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData>> cachedStores = ConcurrentCollections.newConcurrentMap();
|
||||||
|
|
||||||
private final TimeValue listTimeout;
|
private final TimeValue listTimeout;
|
||||||
|
|
||||||
|
@ -278,15 +277,15 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard);
|
ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = buildShardStores(nodes, shard);
|
||||||
|
|
||||||
long lastSizeMatched = 0;
|
long lastSizeMatched = 0;
|
||||||
DiscoveryNode lastDiscoNodeMatched = null;
|
DiscoveryNode lastDiscoNodeMatched = null;
|
||||||
RoutingNode lastNodeMatched = null;
|
RoutingNode lastNodeMatched = null;
|
||||||
|
|
||||||
for (Map.Entry<DiscoveryNode, IndexStore.StoreFilesMetaData> nodeStoreEntry : shardStores.entrySet()) {
|
for (Map.Entry<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> nodeStoreEntry : shardStores.entrySet()) {
|
||||||
DiscoveryNode discoNode = nodeStoreEntry.getKey();
|
DiscoveryNode discoNode = nodeStoreEntry.getKey();
|
||||||
IndexStore.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue();
|
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue();
|
||||||
logger.trace("{}: checking node [{}]", shard, discoNode);
|
logger.trace("{}: checking node [{}]", shard, discoNode);
|
||||||
|
|
||||||
if (storeFilesMetaData == null) {
|
if (storeFilesMetaData == null) {
|
||||||
|
@ -316,7 +315,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
|
||||||
if (primaryShard != null && primaryShard.active()) {
|
if (primaryShard != null && primaryShard.active()) {
|
||||||
DiscoveryNode primaryNode = nodes.get(primaryShard.currentNodeId());
|
DiscoveryNode primaryNode = nodes.get(primaryShard.currentNodeId());
|
||||||
if (primaryNode != null) {
|
if (primaryNode != null) {
|
||||||
IndexStore.StoreFilesMetaData primaryNodeStore = shardStores.get(primaryNode);
|
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryNodeStore = shardStores.get(primaryNode);
|
||||||
if (primaryNodeStore != null && primaryNodeStore.allocated()) {
|
if (primaryNodeStore != null && primaryNodeStore.allocated()) {
|
||||||
long sizeMatched = 0;
|
long sizeMatched = 0;
|
||||||
|
|
||||||
|
@ -369,8 +368,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
|
private ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
|
||||||
ConcurrentMap<DiscoveryNode, IndexStore.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
|
ConcurrentMap<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
|
||||||
if (shardStores == null) {
|
if (shardStores == null) {
|
||||||
shardStores = ConcurrentCollections.newConcurrentMap();
|
shardStores = ConcurrentCollections.newConcurrentMap();
|
||||||
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
|
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodes.dataNodes().keySet(), listTimeout).actionGet();
|
||||||
|
@ -389,7 +388,9 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
|
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
|
||||||
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
|
if (nodeStoreFilesMetaData.storeFilesMetaData() != null) {
|
||||||
|
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cachedStores.put(shard.shardId(), shardStores);
|
cachedStores.put(shard.shardId(), shardStores);
|
||||||
} else {
|
} else {
|
||||||
|
@ -425,7 +426,9 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
|
for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
|
||||||
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
|
if (nodeStoreFilesMetaData.storeFilesMetaData() != null) {
|
||||||
|
shardStores.put(nodeStoreFilesMetaData.node(), nodeStoreFilesMetaData.storeFilesMetaData());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,17 +19,11 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.store;
|
package org.elasticsearch.index.store;
|
||||||
|
|
||||||
import org.elasticsearch.common.collect.Maps;
|
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
||||||
import org.elasticsearch.common.io.stream.Streamable;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.index.IndexComponent;
|
import org.elasticsearch.index.IndexComponent;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Index store is an index level information of the {@link Store} each shard will use.
|
* Index store is an index level information of the {@link Store} each shard will use.
|
||||||
|
@ -59,79 +53,4 @@ public interface IndexStore extends IndexComponent {
|
||||||
ByteSizeValue backingStoreFreeSpace();
|
ByteSizeValue backingStoreFreeSpace();
|
||||||
|
|
||||||
void deleteUnallocated(ShardId shardId) throws IOException;
|
void deleteUnallocated(ShardId shardId) throws IOException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Lists the store files metadata for a shard. Note, this should be able to list also
|
|
||||||
* metadata for shards that are no allocated as well.
|
|
||||||
*/
|
|
||||||
StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException;
|
|
||||||
|
|
||||||
static class StoreFilesMetaData implements Iterable<StoreFileMetaData>, Streamable {
|
|
||||||
private boolean allocated;
|
|
||||||
private ShardId shardId;
|
|
||||||
private Map<String, StoreFileMetaData> files;
|
|
||||||
|
|
||||||
StoreFilesMetaData() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public StoreFilesMetaData(boolean allocated, ShardId shardId, Map<String, StoreFileMetaData> files) {
|
|
||||||
this.allocated = allocated;
|
|
||||||
this.shardId = shardId;
|
|
||||||
this.files = files;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean allocated() {
|
|
||||||
return allocated;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ShardId shardId() {
|
|
||||||
return this.shardId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long totalSizeInBytes() {
|
|
||||||
long totalSizeInBytes = 0;
|
|
||||||
for (StoreFileMetaData file : this) {
|
|
||||||
totalSizeInBytes += file.length();
|
|
||||||
}
|
|
||||||
return totalSizeInBytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public Iterator<StoreFileMetaData> iterator() {
|
|
||||||
return files.values().iterator();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean fileExists(String name) {
|
|
||||||
return files.containsKey(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public StoreFileMetaData file(String name) {
|
|
||||||
return files.get(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static StoreFilesMetaData readStoreFilesMetaData(StreamInput in) throws IOException {
|
|
||||||
StoreFilesMetaData md = new StoreFilesMetaData();
|
|
||||||
md.readFrom(in);
|
|
||||||
return md;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void readFrom(StreamInput in) throws IOException {
|
|
||||||
allocated = in.readBoolean();
|
|
||||||
shardId = ShardId.readShardId(in);
|
|
||||||
int size = in.readVInt();
|
|
||||||
files = Maps.newHashMapWithExpectedSize(size);
|
|
||||||
for (int i = 0; i < size; i++) {
|
|
||||||
StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in);
|
|
||||||
files.put(md.name(), md);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
|
||||||
out.writeBoolean(allocated);
|
|
||||||
shardId.writeTo(out);
|
|
||||||
out.writeVInt(files.size());
|
|
||||||
for (StoreFileMetaData md : files.values()) {
|
|
||||||
md.writeTo(out);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,8 +20,6 @@
|
||||||
package org.elasticsearch.index.store.fs;
|
package org.elasticsearch.index.store.fs;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticSearchIllegalStateException;
|
import org.elasticsearch.ElasticSearchIllegalStateException;
|
||||||
import org.elasticsearch.common.collect.ImmutableMap;
|
|
||||||
import org.elasticsearch.common.collect.Maps;
|
|
||||||
import org.elasticsearch.common.io.FileSystemUtils;
|
import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
@ -30,12 +28,10 @@ import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.service.IndexService;
|
import org.elasticsearch.index.service.IndexService;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
|
||||||
import org.elasticsearch.index.store.support.AbstractIndexStore;
|
import org.elasticsearch.index.store.support.AbstractIndexStore;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author kimchy (shay.banon)
|
* @author kimchy (shay.banon)
|
||||||
|
@ -89,21 +85,6 @@ public abstract class FsIndexStore extends AbstractIndexStore {
|
||||||
FileSystemUtils.deleteRecursively(shardLocation(shardId));
|
FileSystemUtils.deleteRecursively(shardLocation(shardId));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override protected StoreFilesMetaData listUnallocatedStoreMetaData(ShardId shardId) throws IOException {
|
|
||||||
if (location == null) {
|
|
||||||
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
|
||||||
}
|
|
||||||
File shardIndexLocation = shardIndexLocation(shardId);
|
|
||||||
if (!shardIndexLocation.exists()) {
|
|
||||||
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
|
||||||
}
|
|
||||||
Map<String, StoreFileMetaData> files = Maps.newHashMap();
|
|
||||||
for (File file : shardIndexLocation.listFiles()) {
|
|
||||||
files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified()));
|
|
||||||
}
|
|
||||||
return new StoreFilesMetaData(false, shardId, files);
|
|
||||||
}
|
|
||||||
|
|
||||||
public File shardLocation(ShardId shardId) {
|
public File shardLocation(ShardId shardId) {
|
||||||
return new File(location, Integer.toString(shardId.id()));
|
return new File(location, Integer.toString(shardId.id()));
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,16 +19,13 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.store.support;
|
package org.elasticsearch.index.store.support;
|
||||||
|
|
||||||
import org.elasticsearch.common.collect.ImmutableMap;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.AbstractIndexComponent;
|
import org.elasticsearch.index.AbstractIndexComponent;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.service.IndexService;
|
import org.elasticsearch.index.service.IndexService;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
|
||||||
import org.elasticsearch.index.store.IndexStore;
|
import org.elasticsearch.index.store.IndexStore;
|
||||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -47,17 +44,4 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
|
||||||
@Override public void deleteUnallocated(ShardId shardId) throws IOException {
|
@Override public void deleteUnallocated(ShardId shardId) throws IOException {
|
||||||
// do nothing here...
|
// do nothing here...
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException {
|
|
||||||
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId.id());
|
|
||||||
if (indexShard == null) {
|
|
||||||
return listUnallocatedStoreMetaData(shardId);
|
|
||||||
} else {
|
|
||||||
return new StoreFilesMetaData(true, shardId, indexShard.store().list());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected StoreFilesMetaData listUnallocatedStoreMetaData(ShardId shardId) throws IOException {
|
|
||||||
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,23 +25,32 @@ import org.elasticsearch.action.FailedNodeException;
|
||||||
import org.elasticsearch.action.support.nodes.*;
|
import org.elasticsearch.action.support.nodes.*;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
import org.elasticsearch.common.collect.ImmutableMap;
|
||||||
import org.elasticsearch.common.collect.Lists;
|
import org.elasticsearch.common.collect.Lists;
|
||||||
|
import org.elasticsearch.common.collect.Maps;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.service.InternalIndexService;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
|
import org.elasticsearch.index.service.IndexService;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.store.IndexStore;
|
import org.elasticsearch.index.shard.service.InternalIndexShard;
|
||||||
|
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||||
|
|
||||||
|
@ -52,10 +61,13 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
|
||||||
|
|
||||||
private final IndicesService indicesService;
|
private final IndicesService indicesService;
|
||||||
|
|
||||||
|
private final NodeEnvironment nodeEnv;
|
||||||
|
|
||||||
@Inject public TransportNodesListShardStoreMetaData(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
|
@Inject public TransportNodesListShardStoreMetaData(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
|
||||||
IndicesService indicesService) {
|
IndicesService indicesService, NodeEnvironment nodeEnv) {
|
||||||
super(settings, clusterName, threadPool, clusterService, transportService);
|
super(settings, clusterName, threadPool, clusterService, transportService);
|
||||||
this.indicesService = indicesService;
|
this.indicesService = indicesService;
|
||||||
|
this.nodeEnv = nodeEnv;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated, Set<String> nodesIds, @Nullable TimeValue timeout) {
|
public ActionFuture<NodesStoreFilesMetaData> list(ShardId shardId, boolean onlyUnallocated, Set<String> nodesIds, @Nullable TimeValue timeout) {
|
||||||
|
@ -102,21 +114,128 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override protected NodeStoreFilesMetaData nodeOperation(NodeRequest request) throws ElasticSearchException {
|
@Override protected NodeStoreFilesMetaData nodeOperation(NodeRequest request) throws ElasticSearchException {
|
||||||
InternalIndexService indexService = (InternalIndexService) indicesService.indexServiceSafe(request.shardId.index().name());
|
if (request.unallocated) {
|
||||||
if (request.unallocated && indexService.hasShard(request.shardId.id())) {
|
IndexService indexService = indicesService.indexService(request.shardId.index().name());
|
||||||
|
if (indexService == null) {
|
||||||
|
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
|
||||||
|
}
|
||||||
|
if (!indexService.hasShard(request.shardId.id())) {
|
||||||
|
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
IndexMetaData metaData = clusterService.state().metaData().index(request.shardId.index().name());
|
||||||
|
if (metaData == null) {
|
||||||
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
|
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), indexService.store().listStoreMetaData(request.shardId));
|
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), listStoreMetaData(request.shardId));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ElasticSearchException("Failed to list store metadata for shard [" + request.shardId + "]", e);
|
throw new ElasticSearchException("Failed to list store metadata for shard [" + request.shardId + "]", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException {
|
||||||
|
IndexService indexService = indicesService.indexService(shardId.index().name());
|
||||||
|
if (indexService != null) {
|
||||||
|
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId.id());
|
||||||
|
if (indexShard != null) {
|
||||||
|
return new StoreFilesMetaData(true, shardId, indexShard.store().list());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// try and see if we an list unallocated
|
||||||
|
IndexMetaData metaData = clusterService.state().metaData().index(shardId.index().name());
|
||||||
|
if (metaData == null) {
|
||||||
|
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
||||||
|
}
|
||||||
|
String storeType = metaData.settings().get("index.store.type", "fs");
|
||||||
|
if (!storeType.contains("fs")) {
|
||||||
|
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
||||||
|
}
|
||||||
|
File indexFile = new File(new File(new File(new File(nodeEnv.nodeLocation(), "indices"), shardId.index().name()), Integer.toString(shardId.id())), "index");
|
||||||
|
if (!indexFile.exists()) {
|
||||||
|
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
||||||
|
}
|
||||||
|
Map<String, StoreFileMetaData> files = Maps.newHashMap();
|
||||||
|
for (File file : indexFile.listFiles()) {
|
||||||
|
files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length(), file.lastModified()));
|
||||||
|
}
|
||||||
|
return new StoreFilesMetaData(false, shardId, files);
|
||||||
|
}
|
||||||
|
|
||||||
@Override protected boolean accumulateExceptions() {
|
@Override protected boolean accumulateExceptions() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class StoreFilesMetaData implements Iterable<StoreFileMetaData>, Streamable {
|
||||||
|
private boolean allocated;
|
||||||
|
private ShardId shardId;
|
||||||
|
private Map<String, StoreFileMetaData> files;
|
||||||
|
|
||||||
|
StoreFilesMetaData() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public StoreFilesMetaData(boolean allocated, ShardId shardId, Map<String, StoreFileMetaData> files) {
|
||||||
|
this.allocated = allocated;
|
||||||
|
this.shardId = shardId;
|
||||||
|
this.files = files;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean allocated() {
|
||||||
|
return allocated;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ShardId shardId() {
|
||||||
|
return this.shardId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long totalSizeInBytes() {
|
||||||
|
long totalSizeInBytes = 0;
|
||||||
|
for (StoreFileMetaData file : this) {
|
||||||
|
totalSizeInBytes += file.length();
|
||||||
|
}
|
||||||
|
return totalSizeInBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public Iterator<StoreFileMetaData> iterator() {
|
||||||
|
return files.values().iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean fileExists(String name) {
|
||||||
|
return files.containsKey(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public StoreFileMetaData file(String name) {
|
||||||
|
return files.get(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static StoreFilesMetaData readStoreFilesMetaData(StreamInput in) throws IOException {
|
||||||
|
StoreFilesMetaData md = new StoreFilesMetaData();
|
||||||
|
md.readFrom(in);
|
||||||
|
return md;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void readFrom(StreamInput in) throws IOException {
|
||||||
|
allocated = in.readBoolean();
|
||||||
|
shardId = ShardId.readShardId(in);
|
||||||
|
int size = in.readVInt();
|
||||||
|
files = Maps.newHashMapWithExpectedSize(size);
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
StoreFileMetaData md = StoreFileMetaData.readStoreFileMetaData(in);
|
||||||
|
files.put(md.name(), md);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
out.writeBoolean(allocated);
|
||||||
|
shardId.writeTo(out);
|
||||||
|
out.writeVInt(files.size());
|
||||||
|
for (StoreFileMetaData md : files.values()) {
|
||||||
|
md.writeTo(out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static class Request extends NodesOperationRequest {
|
static class Request extends NodesOperationRequest {
|
||||||
|
|
||||||
private ShardId shardId;
|
private ShardId shardId;
|
||||||
|
@ -220,17 +339,17 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
|
||||||
|
|
||||||
public static class NodeStoreFilesMetaData extends NodeOperationResponse {
|
public static class NodeStoreFilesMetaData extends NodeOperationResponse {
|
||||||
|
|
||||||
private IndexStore.StoreFilesMetaData storeFilesMetaData;
|
private StoreFilesMetaData storeFilesMetaData;
|
||||||
|
|
||||||
NodeStoreFilesMetaData() {
|
NodeStoreFilesMetaData() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public NodeStoreFilesMetaData(DiscoveryNode node, IndexStore.StoreFilesMetaData storeFilesMetaData) {
|
public NodeStoreFilesMetaData(DiscoveryNode node, StoreFilesMetaData storeFilesMetaData) {
|
||||||
super(node);
|
super(node);
|
||||||
this.storeFilesMetaData = storeFilesMetaData;
|
this.storeFilesMetaData = storeFilesMetaData;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndexStore.StoreFilesMetaData storeFilesMetaData() {
|
public StoreFilesMetaData storeFilesMetaData() {
|
||||||
return storeFilesMetaData;
|
return storeFilesMetaData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,7 +362,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio
|
||||||
@Override public void readFrom(StreamInput in) throws IOException {
|
@Override public void readFrom(StreamInput in) throws IOException {
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
if (in.readBoolean()) {
|
if (in.readBoolean()) {
|
||||||
storeFilesMetaData = IndexStore.StoreFilesMetaData.readStoreFilesMetaData(in);
|
storeFilesMetaData = StoreFilesMetaData.readStoreFilesMetaData(in);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,4 +24,7 @@ package org.elasticsearch.test.integration.gateway.fs;
|
||||||
*/
|
*/
|
||||||
public class SimpleFsIndexGatewayTests extends AbstractSimpleIndexGatewayTests {
|
public class SimpleFsIndexGatewayTests extends AbstractSimpleIndexGatewayTests {
|
||||||
|
|
||||||
|
@Override public void testSnapshotOperations() throws Exception {
|
||||||
|
super.testSnapshotOperations(); //To change body of overridden methods use File | Settings | File Templates.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue