more work on reusing work directory, clean unallocated shards when they are not needed
This commit is contained in:
parent
b43f0f5965
commit
a7c13826da
|
@ -80,6 +80,16 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
return shards();
|
||||
}
|
||||
|
||||
public int countWithState(ShardRoutingState state) {
|
||||
int count = 0;
|
||||
for (ShardRouting shard : this) {
|
||||
if (state == shard.state()) {
|
||||
count++;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
public ShardsIterator shardsIt() {
|
||||
return new IndexShardsIterator(0);
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.elasticsearch.index.query.IndexQueryParserService;
|
|||
import org.elasticsearch.index.routing.OperationRouting;
|
||||
import org.elasticsearch.index.shard.service.IndexShard;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -53,6 +54,8 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard>, Clos
|
|||
|
||||
IndexEngine indexEngine();
|
||||
|
||||
IndexStore store();
|
||||
|
||||
IndexShard createShard(int sShardId) throws ElasticSearchException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -160,6 +160,10 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
|
|||
return injector;
|
||||
}
|
||||
|
||||
@Override public IndexStore store() {
|
||||
return indexStore;
|
||||
}
|
||||
|
||||
@Override public IndexCache cache() {
|
||||
return indexCache;
|
||||
}
|
||||
|
|
|
@ -58,6 +58,13 @@ public interface IndexStore extends IndexComponent {
|
|||
*/
|
||||
ByteSizeValue backingStoreFreeSpace();
|
||||
|
||||
/**
|
||||
* Lists all unallocated stores.
|
||||
*/
|
||||
StoreFilesMetaData[] listUnallocatedStores() throws IOException;
|
||||
|
||||
void deleteUnallocated(ShardId shardId) throws IOException;
|
||||
|
||||
/**
|
||||
* Lists the store files metadata for a shard. Note, this should be able to list also
|
||||
* metadata for shards that are no allocated as well.
|
||||
|
@ -66,13 +73,15 @@ public interface IndexStore extends IndexComponent {
|
|||
|
||||
static class StoreFilesMetaData implements Iterable<StoreFileMetaData>, Streamable {
|
||||
private boolean allocated;
|
||||
private ShardId shardId;
|
||||
private Map<String, StoreFileMetaData> files;
|
||||
|
||||
StoreFilesMetaData() {
|
||||
}
|
||||
|
||||
public StoreFilesMetaData(boolean allocated, Map<String, StoreFileMetaData> files) {
|
||||
public StoreFilesMetaData(boolean allocated, ShardId shardId, Map<String, StoreFileMetaData> files) {
|
||||
this.allocated = allocated;
|
||||
this.shardId = shardId;
|
||||
this.files = files;
|
||||
}
|
||||
|
||||
|
@ -80,6 +89,18 @@ public interface IndexStore extends IndexComponent {
|
|||
return allocated;
|
||||
}
|
||||
|
||||
public ShardId shardId() {
|
||||
return this.shardId;
|
||||
}
|
||||
|
||||
public long totalSizeInBytes() {
|
||||
long totalSizeInBytes = 0;
|
||||
for (StoreFileMetaData file : this) {
|
||||
totalSizeInBytes += file.sizeInBytes();
|
||||
}
|
||||
return totalSizeInBytes;
|
||||
}
|
||||
|
||||
@Override public Iterator<StoreFileMetaData> iterator() {
|
||||
return files.values().iterator();
|
||||
}
|
||||
|
@ -96,6 +117,7 @@ public interface IndexStore extends IndexComponent {
|
|||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
allocated = in.readBoolean();
|
||||
shardId = ShardId.readShardId(in);
|
||||
int size = in.readVInt();
|
||||
files = Maps.newHashMapWithExpectedSize(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
|
@ -106,6 +128,7 @@ public interface IndexStore extends IndexComponent {
|
|||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeBoolean(allocated);
|
||||
shardId.writeTo(out);
|
||||
out.writeVInt(files.size());
|
||||
for (StoreFileMetaData md : files.values()) {
|
||||
out.writeUTF(md.name());
|
||||
|
|
|
@ -19,8 +19,11 @@
|
|||
|
||||
package org.elasticsearch.index.store.fs;
|
||||
|
||||
import org.elasticsearch.ElasticSearchIllegalStateException;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
|
@ -32,6 +35,7 @@ import org.elasticsearch.index.store.support.AbstractIndexStore;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -74,23 +78,55 @@ public abstract class FsIndexStore extends AbstractIndexStore {
|
|||
return new ByteSizeValue(usableSpace);
|
||||
}
|
||||
|
||||
@Override public void deleteUnallocated(ShardId shardId) throws IOException {
|
||||
if (indexService.hasShard(shardId.id())) {
|
||||
throw new ElasticSearchIllegalStateException(shardId + " allocated, can't be deleted");
|
||||
}
|
||||
FileSystemUtils.deleteRecursively(shardLocation(shardId));
|
||||
}
|
||||
|
||||
@Override public StoreFilesMetaData[] listUnallocatedStores() throws IOException {
|
||||
File[] shardLocations = location.listFiles();
|
||||
if (shardLocations == null || shardLocations.length == 0) {
|
||||
return new StoreFilesMetaData[0];
|
||||
}
|
||||
List<StoreFilesMetaData> shards = Lists.newArrayList();
|
||||
for (File shardLocation : shardLocations) {
|
||||
int shardId = Integer.parseInt(shardLocation.getName());
|
||||
if (!indexService.hasShard(shardId)) {
|
||||
shards.add(listUnallocatedStoreMetaData(new ShardId(index, shardId)));
|
||||
}
|
||||
}
|
||||
return shards.toArray(new StoreFilesMetaData[shards.size()]);
|
||||
}
|
||||
|
||||
@Override protected StoreFilesMetaData listUnallocatedStoreMetaData(ShardId shardId) throws IOException {
|
||||
File shardIndexLocation = shardIndexLocation(shardId);
|
||||
if (!shardIndexLocation.exists()) {
|
||||
return new StoreFilesMetaData(false, ImmutableMap.<String, StoreFileMetaData>of());
|
||||
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
||||
}
|
||||
Map<String, StoreFileMetaData> files = Maps.newHashMap();
|
||||
for (File file : shardIndexLocation.listFiles()) {
|
||||
files.put(file.getName(), new StoreFileMetaData(file.getName(), file.length()));
|
||||
}
|
||||
return new StoreFilesMetaData(false, files);
|
||||
return new StoreFilesMetaData(false, shardId, files);
|
||||
}
|
||||
|
||||
public File location() {
|
||||
return location;
|
||||
}
|
||||
|
||||
public File shardLocation(ShardId shardId) {
|
||||
return new File(location, Integer.toString(shardId.id()));
|
||||
}
|
||||
|
||||
public File shardIndexLocation(ShardId shardId) {
|
||||
return new File(new File(location, Integer.toString(shardId.id())), "index");
|
||||
return new File(shardLocation(shardId), "index");
|
||||
}
|
||||
|
||||
// not used currently, but here to state that this store also defined a file based translog location
|
||||
|
||||
public File shardTranslogLocation(ShardId shardId) {
|
||||
return new File(shardLocation(shardId), "translog");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,13 +38,21 @@ import java.util.Map;
|
|||
*/
|
||||
public abstract class AbstractIndexStore extends AbstractIndexComponent implements IndexStore {
|
||||
|
||||
private final IndexService indexService;
|
||||
protected final IndexService indexService;
|
||||
|
||||
protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService) {
|
||||
super(index, indexSettings);
|
||||
this.indexService = indexService;
|
||||
}
|
||||
|
||||
@Override public void deleteUnallocated(ShardId shardId) throws IOException {
|
||||
// do nothing here...
|
||||
}
|
||||
|
||||
@Override public StoreFilesMetaData[] listUnallocatedStores() throws IOException {
|
||||
return new StoreFilesMetaData[0];
|
||||
}
|
||||
|
||||
@Override public StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException {
|
||||
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId.id());
|
||||
if (indexShard == null) {
|
||||
|
@ -54,11 +62,11 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
|
|||
for (String file : indexShard.store().directory().listAll()) {
|
||||
files.put(file, new StoreFileMetaData(file, indexShard.store().directory().fileLength(file)));
|
||||
}
|
||||
return new StoreFilesMetaData(true, files);
|
||||
return new StoreFilesMetaData(true, shardId, files);
|
||||
}
|
||||
}
|
||||
|
||||
protected StoreFilesMetaData listUnallocatedStoreMetaData(ShardId shardId) throws IOException {
|
||||
return new StoreFilesMetaData(false, ImmutableMap.<String, StoreFileMetaData>of());
|
||||
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.elasticsearch.index.similarity.SimilarityModule;
|
|||
import org.elasticsearch.index.store.IndexStoreModule;
|
||||
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
import org.elasticsearch.indices.store.IndicesStore;
|
||||
import org.elasticsearch.plugins.IndicesPluginsModule;
|
||||
import org.elasticsearch.plugins.PluginsService;
|
||||
|
||||
|
@ -77,6 +78,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
|
||||
private final IndicesAnalysisService indicesAnalysisService;
|
||||
|
||||
private final IndicesStore indicesStore;
|
||||
|
||||
private final Injector injector;
|
||||
|
||||
private final PluginsService pluginsService;
|
||||
|
@ -86,11 +89,12 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
private volatile ImmutableMap<String, IndexService> indices = ImmutableMap.of();
|
||||
|
||||
@Inject public InternalIndicesService(Settings settings, IndicesClusterStateService clusterStateService,
|
||||
IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector) {
|
||||
IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, IndicesStore indicesStore, Injector injector) {
|
||||
super(settings);
|
||||
this.clusterStateService = clusterStateService;
|
||||
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
|
||||
this.indicesAnalysisService = indicesAnalysisService;
|
||||
this.indicesStore = indicesStore;
|
||||
this.injector = injector;
|
||||
|
||||
this.pluginsService = injector.getInstance(PluginsService.class);
|
||||
|
@ -109,6 +113,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
|
||||
@Override protected void doClose() throws ElasticSearchException {
|
||||
clusterStateService.close();
|
||||
indicesStore.close();
|
||||
indicesAnalysisService.close();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices.store;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.service.IndexService;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class IndicesStore extends AbstractComponent implements ClusterStateListener {
|
||||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
@Inject public IndicesStore(Settings settings, IndicesService indicesService, ClusterService clusterService) {
|
||||
super(settings);
|
||||
this.indicesService = indicesService;
|
||||
this.clusterService = clusterService;
|
||||
clusterService.add(this);
|
||||
}
|
||||
|
||||
public void close() {
|
||||
clusterService.remove(this);
|
||||
}
|
||||
|
||||
@Override public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (!event.routingTableChanged()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// when all shards are started within a shard replication group, delete an unallocated shard on this node
|
||||
RoutingTable routingTable = event.state().routingTable();
|
||||
for (IndexRoutingTable indexRoutingTable : routingTable) {
|
||||
IndexService indexService = indicesService.indexService(indexRoutingTable.index());
|
||||
if (indexService == null) {
|
||||
// not allocated on this node yet...
|
||||
continue;
|
||||
}
|
||||
// if the store is not persistent, don't bother trying to check if it can be deleted
|
||||
if (!indexService.store().persistent()) {
|
||||
continue;
|
||||
}
|
||||
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
|
||||
// if it has been created on this node, we don't want to delete it
|
||||
if (indexService.hasShard(indexShardRoutingTable.shardId().id())) {
|
||||
continue;
|
||||
}
|
||||
// only delete an unallocated shard if all (other shards) are started
|
||||
if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}][{}] deleting unallocated shard", indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id());
|
||||
}
|
||||
try {
|
||||
indexService.store().deleteUnallocated(indexShardRoutingTable.shardId());
|
||||
} catch (Exception e) {
|
||||
logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue