Local Gateway: Delete dangling indices after a (configurable) timeout, and not immediately, closes #1718.

This commit is contained in:
Shay Banon 2012-02-18 01:44:22 +02:00
parent cc20852642
commit 897f67ade8
3 changed files with 85 additions and 14 deletions

View File

@ -178,7 +178,7 @@ public class NodeEnvironment extends AbstractComponent {
return shardLocations;
}
public Set<String> finalAllIndices() throws Exception {
public Set<String> findAllIndices() throws Exception {
if (nodeFiles == null || locks == null) {
throw new ElasticSearchIllegalStateException("node is not configured to store local location");
}
@ -189,9 +189,11 @@ public class NodeEnvironment extends AbstractComponent {
continue;
}
for (File indexLocation : indicesList) {
if (indexLocation.isDirectory()) {
indices.add(indexLocation.getName());
}
}
}
return indices;
}

View File

@ -268,7 +268,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
metaDataBuilder.metaData(globalMetaData);
}
Set<String> indices = nodeEnv.finalAllIndices();
Set<String> indices = nodeEnv.findAllIndices();
for (String index : indices) {
IndexMetaData indexMetaData = loadIndex(index);
if (indexMetaData == null) {

View File

@ -27,12 +27,18 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
/**
*
@ -45,12 +51,34 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final TimeValue danglingTimeout;
private final Map<String, DanglingIndex> danglingIndices = ConcurrentCollections.newConcurrentMap();
private final Object danglingMutex = new Object();
static class DanglingIndex {
public final String index;
public final ScheduledFuture future;
DanglingIndex(String index, ScheduledFuture future) {
this.index = index;
this.future = future;
}
}
@Inject
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, IndicesService indicesService, ClusterService clusterService) {
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) {
super(settings);
this.nodeEnv = nodeEnv;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.danglingTimeout = componentSettings.getAsTime("dangling_timeout", TimeValue.timeValueMinutes(2));
clusterService.addLast(this);
}
@ -139,20 +167,61 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
}
if (danglingTimeout.millis() >= 0) {
synchronized (danglingMutex) {
for (String danglingIndex : danglingIndices.keySet()) {
if (event.state().metaData().hasIndex(danglingIndex)) {
logger.debug("[{}] no longer dangling (created), removing", danglingIndex);
DanglingIndex removed = danglingIndices.remove(danglingIndex);
removed.future.cancel(false);
}
}
// delete indices that are no longer part of the metadata
for (File indicesLocation : nodeEnv.indicesLocations()) {
File[] files = indicesLocation.listFiles();
if (files != null) {
for (File file : files) {
try {
for (String indexName : nodeEnv.findAllIndices()) {
// if we have the index on the metadata, don't delete it
if (event.state().metaData().hasIndex(file.getName())) {
if (event.state().metaData().hasIndex(indexName)) {
continue;
}
logger.debug("[{}] deleting index that is no longer in the cluster meta_date from [{}]", file.getName(), file);
FileSystemUtils.deleteRecursively(file);
if (danglingIndices.containsKey(indexName)) {
// already dangling, continue
continue;
}
if (danglingTimeout.millis() == 0) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName)));
} else {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}]", indexName, danglingTimeout);
danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName))));
}
}
} catch (Exception e) {
logger.warn("failed to find dangling indices", e);
}
}
}
}
}
class RemoveDanglingIndex implements Runnable {
private final String index;
RemoveDanglingIndex(String index) {
this.index = index;
}
@Override
public void run() {
synchronized (danglingMutex) {
DanglingIndex remove = danglingIndices.remove(index);
// no longer there...
if (remove == null) {
return;
}
logger.info("[{}] deleting dangling index", index);
FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(index)));
}
}
}
}