From 16685335566048af304b6e9ada81f78d3e083e38 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 2 Jul 2012 17:17:16 +0200 Subject: [PATCH] improve dangling index support to not detect explicit deleted index as dangling, harden when we delete the _state of an index --- .../gateway/blobstore/BlobStoreGateway.java | 7 ++ .../state/meta/LocalGatewayMetaState.java | 86 ++++++++++++------- .../TransportNodesListGatewayMetaState.java | 6 +- .../gateway/shared/SharedStorageGateway.java | 54 +++++++++--- .../cluster/IndicesClusterStateService.java | 6 +- .../indices/store/IndicesStore.java | 4 +- 6 files changed, 116 insertions(+), 47 deletions(-) diff --git a/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java b/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java index b3240d21fbc..d72a9d7a7a7 100644 --- a/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java +++ b/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java @@ -21,8 +21,10 @@ package org.elasticsearch.gateway.blobstore; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.*; @@ -140,6 +142,11 @@ public abstract class BlobStoreGateway extends SharedStorageGateway { return commitPoints.commits().get(0); } + @Override + protected void delete(IndexMetaData indexMetaData) throws ElasticSearchException { + BlobPath indexPath = basePath().add("indices").add(indexMetaData.index()); + blobStore.delete(indexPath); + } @Override public void write(MetaData metaData) throws GatewayException { diff --git 
a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java index 2824e62318a..203e4bba2eb 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/LocalGatewayMetaState.java @@ -98,6 +98,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS private final LocalAllocateDangledIndices allocateDangledIndices; + @Nullable private volatile MetaData currentMetaData; private final XContentType format; @@ -145,8 +146,8 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS } } - public MetaData currentMetaData() { - return currentMetaData; + public MetaData loadMetaState() throws Exception { + return loadState(); } public boolean isDangling(String index) { @@ -156,27 +157,36 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS @Override public void clusterChanged(ClusterChangedEvent event) { if (event.state().blocks().disableStatePersistence()) { + // reset the current metadata, we need to start fresh... + this.currentMetaData = null; return; } + MetaData newMetaData = event.state().metaData(); // we don't check if metaData changed, since we might be called several times and we need to check dangling... boolean success = true; // only applied to master node, writing the global and index level states if (event.state().nodes().localNode().masterNode()) { // check if the global state changed? 
- if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, event.state().metaData())) { + if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) { try { - writeGlobalState("changed", event.state().metaData(), currentMetaData); + writeGlobalState("changed", newMetaData, currentMetaData); } catch (Exception e) { success = false; } } // check and write changes in indices - for (IndexMetaData indexMetaData : event.state().metaData()) { + for (IndexMetaData indexMetaData : newMetaData) { String writeReason = null; - IndexMetaData currentIndexMetaData = currentMetaData == null ? null : currentMetaData.index(indexMetaData.index()); + IndexMetaData currentIndexMetaData; + if (currentMetaData == null) { + // a new event..., check from the state stored + currentIndexMetaData = loadIndex(indexMetaData.index()); + } else { + currentIndexMetaData = currentMetaData.index(indexMetaData.index()); + } if (currentIndexMetaData == null) { writeReason = "freshly created"; } else if (currentIndexMetaData.version() != indexMetaData.version()) { @@ -196,12 +206,31 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS } } + // delete indices that were there before, but are deleted now + // we need to do it so they won't be detected as dangling + if (nodeEnv.hasNodeFile()) { + if (currentMetaData != null) { + // only delete indices when we already received a state (currentMetaData != null) + // and we had a go at processing dangling indices at least once + // this will also delete the _state of the index itself + for (IndexMetaData current : currentMetaData) { + if (danglingIndices.containsKey(current.index())) { + continue; + } + if (!newMetaData.hasIndex(current.index())) { + logger.debug("[{}] deleting index that is no longer part of the metadata", current.index()); + FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(current.index()))); + } + } + } + } + // handle dangling indices, we handle those 
for all nodes that have a node file (data or master) if (nodeEnv.hasNodeFile()) { if (danglingTimeout.millis() >= 0) { synchronized (danglingMutex) { for (String danglingIndex : danglingIndices.keySet()) { - if (event.state().metaData().hasIndex(danglingIndex)) { + if (newMetaData.hasIndex(danglingIndex)) { logger.debug("[{}] no longer dangling (created), removing", danglingIndex); DanglingIndex removed = danglingIndices.remove(danglingIndex); removed.future.cancel(false); @@ -211,19 +240,22 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS try { for (String indexName : nodeEnv.findAllIndices()) { // if we have the index on the metadata, don't delete it - if (event.state().metaData().hasIndex(indexName)) { + if (newMetaData.hasIndex(indexName)) { continue; } if (danglingIndices.containsKey(indexName)) { // already dangling, continue continue; } - if (danglingTimeout.millis() == 0) { - logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName); - FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName))); - } else { - logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled); - danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName)))); + IndexMetaData indexMetaData = loadIndex(indexName); + if (indexMetaData != null) { + if (danglingTimeout.millis() == 0) { + logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName); + FileSystemUtils.deleteRecursively(nodeEnv.indexLocations(new Index(indexName))); + } else { + logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to 
delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled); + danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName)))); + } } } } catch (Exception e) { @@ -235,6 +267,10 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS final List dangled = Lists.newArrayList(); for (String indexName : danglingIndices.keySet()) { IndexMetaData indexMetaData = loadIndex(indexName); + if (indexMetaData == null) { + logger.debug("failed to find state for dangling index [{}]", indexName); + continue; + } // we might have someone copying over an index, renaming the directory, handle that if (!indexMetaData.index().equals(indexName)) { logger.info("dangled index directory name is [{}], state name is [{}], renaming to directory name", indexName, indexMetaData.index()); @@ -266,21 +302,8 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS } } - if (event.state().nodes().localNode().masterNode()) { - // delete indices that are no longer there..., allocated dangled ones - if (currentMetaData != null) { - for (IndexMetaData current : currentMetaData) { - if (event.state().metaData().index(current.index()) == null) { - if (!danglingIndices.containsKey(current.index())) { - deleteIndex(current.index()); - } - } - } - } - } - if (success) { - currentMetaData = event.state().metaData(); + currentMetaData = newMetaData; } } @@ -383,7 +406,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS } // delete the old files - if (previousMetaData != null && previousMetaData.version() != currentMetaData.version()) { + if (previousMetaData != null && previousMetaData.version() != metaData.version()) { for (File dataLocation : nodeEnv.nodeDataLocations()) { File stateFile = new File(new File(dataLocation, "_state"), "global-" + previousMetaData.version()); 
stateFile.delete(); @@ -394,7 +417,7 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS } } - private void loadState() throws Exception { + private MetaData loadState() throws Exception { MetaData.Builder metaDataBuilder = MetaData.builder(); MetaData globalMetaData = loadGlobalState(); if (globalMetaData != null) { @@ -410,9 +433,10 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS metaDataBuilder.put(indexMetaData, false); } } - currentMetaData = metaDataBuilder.build(); + return metaDataBuilder.build(); } + @Nullable private IndexMetaData loadIndex(String index) { long highestVersion = -1; IndexMetaData indexMetaData = null; diff --git a/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java index a39113a349b..f150fd8134c 100644 --- a/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/local/state/meta/TransportNodesListGatewayMetaState.java @@ -116,7 +116,11 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA @Override protected NodeLocalGatewayMetaState nodeOperation(NodeRequest request) throws ElasticSearchException { - return new NodeLocalGatewayMetaState(clusterService.localNode(), metaState.currentMetaData()); + try { + return new NodeLocalGatewayMetaState(clusterService.localNode(), metaState.loadMetaState()); + } catch (Exception e) { + throw new ElasticSearchException("failed to load metadata", e); + } } @Override diff --git a/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java b/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java index 34457607625..07d1eb32a9d 100644 --- a/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java +++ 
b/src/main/java/org/elasticsearch/gateway/shared/SharedStorageGateway.java @@ -25,12 +25,17 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; +import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.ExecutorService; @@ -50,12 +55,21 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent tuple : seenMappings.keySet()) { diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 3a001fbfb04..18042977f56 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -163,8 +163,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe } } - // do the reverse, and delete dangling indices / shards that might remain on that node - // this can happen when deleting a closed index, or when a node joins and it has deleted indices / shards + // do the reverse, and delete dangling shards that might remain on that node + // but are allocated on other nodes if (nodeEnv.hasNodeFile()) { // delete unused shards for existing indices for (IndexRoutingTable indexRoutingTable : routingTable) {