From 897f67ade84a0f01c035290dfaf6097e3ae4cfb3 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sat, 18 Feb 2012 01:44:22 +0200 Subject: [PATCH] Local Gateway: Delete dangling indices after a (configurable) timeout, and not immediately, closes #1718. --- .../elasticsearch/env/NodeEnvironment.java | 6 +- .../state/meta/LocalGatewayMetaState.java | 2 +- .../indices/store/IndicesStore.java | 91 ++++++++++++++++--- 3 files changed, 85 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 2ca6fdbe26b..3c667684627 100644 --- a/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -178,7 +178,7 @@ public class NodeEnvironment extends AbstractComponent { return shardLocations; } - public Set finalAllIndices() throws Exception { + public Set findAllIndices() throws Exception { if (nodeFiles == null || locks == null) { throw new ElasticSearchIllegalStateException("node is not configured to store local location"); } @@ -189,7 +189,9 @@ public class NodeEnvironment extends AbstractComponent { continue; } for (File indexLocation : indicesList) { - indices.add(indexLocation.getName()); + if (indexLocation.isDirectory()) { + indices.add(indexLocation.getName()); + } } } return indices; diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java index 2c58b8fc14d..076f74f862f 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java @@ -268,7 +268,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS metaDataBuilder.metaData(globalMetaData); } - Set indices = nodeEnv.finalAllIndices(); + Set indices = nodeEnv.findAllIndices(); for (String index : indices) { IndexMetaData indexMetaData = loadIndex(index); if (indexMetaData == null) { diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 37988505937..8c76e0631ca 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -27,12 +27,18 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; import java.io.File; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; /** * @@ -45,12 +51,34 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private final ClusterService clusterService; + private final ThreadPool threadPool; + + private final TimeValue danglingTimeout; + + private final Map danglingIndices = ConcurrentCollections.newConcurrentMap(); + + private final Object danglingMutex = new Object(); + + static class DanglingIndex { + public final String index; + public final ScheduledFuture future; + + DanglingIndex(String index, ScheduledFuture future) { + this.index = index; + this.future = future; + } + } + @Inject - public IndicesStore(Settings settings, NodeEnvironment nodeEnv, IndicesService indicesService, ClusterService clusterService) { + public IndicesStore(Settings settings, NodeEnvironment nodeEnv, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) { super(settings); this.nodeEnv = nodeEnv; this.indicesService = indicesService; this.clusterService = clusterService; + this.threadPool = threadPool; + + this.danglingTimeout = componentSettings.getAsTime("dangling_timeout", TimeValue.timeValueMinutes(2)); + clusterService.addLast(this); } @@ -139,20 +167,61 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe } } - // delete indices that are no longer part of the metadata - for (File indicesLocation : nodeEnv.indicesLocations()) { - File[] files = indicesLocation.listFiles(); - if (files != null) { - for (File file : files) { - // if we have the index on the metadata, don't delete it - if (event.state().metaData().hasIndex(file.getName())) { - continue; + if (danglingTimeout.millis() >= 0) { + synchronized (danglingMutex) { + for (String danglingIndex : danglingIndices.keySet()) { + if (event.state().metaData().hasIndex(danglingIndex)) { + logger.debug("[{}] no longer dangling (created), removing", danglingIndex); + DanglingIndex removed = danglingIndices.remove(danglingIndex); + removed.future.cancel(false); } - logger.debug("[{}] deleting index that is no longer in the cluster meta_date from [{}]", file.getName(), file); - FileSystemUtils.deleteRecursively(file); + } + // delete indices that are no longer part of the metadata + try { + for (String indexName : nodeEnv.findAllIndices()) { + // if we have the index on the metadata, don't delete it + if (event.state().metaData().hasIndex(indexName)) { + continue; + } + if (danglingIndices.containsKey(indexName)) { + // already dangling, continue + continue; + } + if (danglingTimeout.millis() == 0) { + logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName); + FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName))); + } else { + logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}]", indexName, danglingTimeout); + danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName)))); + } + } + } catch (Exception e) { + logger.warn("failed to find dangling indices", e); } } } } } + + class RemoveDanglingIndex implements Runnable { + + private final String index; + + RemoveDanglingIndex(String index) { + this.index = index; + } + + @Override + public void run() { + synchronized (danglingMutex) { + DanglingIndex remove = danglingIndices.remove(index); + // no longer there... + if (remove == null) { + return; + } + logger.info("[{}] deleting dangling index", index); + FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(index))); + } + } + } }