From 2f0d1586925af8daf6971319f19f7a19c5d37c99 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 5 Feb 2015 20:40:35 +0100 Subject: [PATCH] [CORE] Consolidate index / shard deletion in IndicesService Today the logic related to deleting an index is spread across several classes which makes changes to this rather delicate part of the code-base very difficult. This commit consolidates this logic into the IndicesService and moves the handling of ack-ing the delete to the master entirely into `IndicesClusterStateService`. --- .../action/index/NodeIndexDeletedAction.java | 65 ++++-- .../gateway/GatewayMetaState.java | 98 ++------- .../org/elasticsearch/index/IndexService.java | 11 +- .../elasticsearch/index/store/IndexStore.java | 11 -- .../store/support/AbstractIndexStore.java | 27 --- .../elasticsearch/indices/IndicesModule.java | 1 - .../elasticsearch/indices/IndicesService.java | 187 ++++++++++++++++-- .../cluster/IndicesClusterStateService.java | 67 ++++--- .../indices/store/IndicesStore.java | 64 +----- .../indices/IndicesServiceTest.java | 123 ++++++++++++ .../indices/store/IndicesStoreTests.java | 17 +- 11 files changed, 410 insertions(+), 261 deletions(-) create mode 100644 src/test/java/org/elasticsearch/indices/IndicesServiceTest.java diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java index 110dddfead3..7f799302352 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster.action.index; +import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -27,12 +29,19 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.ShardLock; +import org.elasticsearch.index.Index; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; +import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; /** * @@ -45,14 +54,16 @@ public class NodeIndexDeletedAction extends AbstractComponent { private final ThreadPool threadPool; private final TransportService transportService; private final List listeners = new CopyOnWriteArrayList<>(); + private final NodeEnvironment nodeEnv; @Inject - public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService) { + public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService, NodeEnvironment nodeEnv) { super(settings); this.threadPool = threadPool; this.transportService = transportService; transportService.registerHandler(INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedTransportHandler()); transportService.registerHandler(INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedTransportHandler()); + this.nodeEnv = nodeEnv; } public void add(Listener listener) { @@ -64,32 +75,58 @@ public class NodeIndexDeletedAction extends AbstractComponent { } public void nodeIndexDeleted(final ClusterState clusterState, final String index, final String nodeId) throws ElasticsearchException { - DiscoveryNodes nodes = clusterState.nodes(); + final DiscoveryNodes nodes = clusterState.nodes(); if (nodes.localNodeMaster()) { - threadPool.generic().execute(new Runnable() { + threadPool.generic().execute(new AbstractRunnable() { + @Override - public void run() { + public void onFailure(Throwable t) { + logger.warn("[{}]failed to ack index store deleted for index", t, index); + } + + @Override + protected void doRun() throws Exception { innerNodeIndexDeleted(index, nodeId); + lockIndexAndAck(index, nodes, nodeId, clusterState); + } }); } else { transportService.sendRequest(clusterState.nodes().masterNode(), INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + logger.warn("[{}]failed to ack index store deleted for index", t, index); + } + + @Override + protected void doRun() throws Exception { + lockIndexAndAck(index, nodes, nodeId, clusterState); + } + }); } } - public void nodeIndexStoreDeleted(final ClusterState clusterState, final String index, final String nodeId) throws ElasticsearchException { - DiscoveryNodes nodes = clusterState.nodes(); - if (nodes.localNodeMaster()) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { + private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState) throws IOException { + try { + // we are waiting until we can lock the index / all shards on the node and then we ack the delete of the store to the + // master. If we can't acquire the locks here immediately there might be a shard of this index still holding on to the lock + // due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be + // deleted by the time we get the lock + final List locks = nodeEnv.lockAllForIndex(new Index(index), TimeUnit.MINUTES.toMillis(30)); + try { + if (nodes.localNodeMaster()) { innerNodeIndexStoreDeleted(index, nodeId); + } else { + transportService.sendRequest(clusterState.nodes().masterNode(), + INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); } - }); - } else { - transportService.sendRequest(clusterState.nodes().masterNode(), - INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); + } finally { + IOUtils.close(locks); // release them again + } + } catch (LockObtainFailedException exc) { + logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index); } } diff --git a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index b97d55592a2..29dab15f78c 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -21,15 +21,14 @@ package org.elasticsearch.gateway; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -51,7 +50,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; -import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -120,7 +119,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL private final ThreadPool threadPool; private final LocalAllocateDangledIndices allocateDangledIndices; - private final NodeIndexDeletedAction nodeIndexDeletedAction; @Nullable private volatile MetaData currentMetaData; @@ -135,17 +133,17 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL private final TimeValue deleteTimeout; private final Map danglingIndices = ConcurrentCollections.newConcurrentMap(); private final Object danglingMutex = new Object(); + private final IndicesService indicesService; @Inject public GatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv, TransportNodesListGatewayMetaState nodesListGatewayMetaState, LocalAllocateDangledIndices allocateDangledIndices, - NodeIndexDeletedAction nodeIndexDeletedAction) throws Exception { + IndicesService indicesService) throws Exception { super(settings); this.nodeEnv = nodeEnv; this.threadPool = threadPool; this.format = XContentType.fromRestContentType(settings.get("format", "smile")); this.allocateDangledIndices = allocateDangledIndices; - this.nodeIndexDeletedAction = nodeIndexDeletedAction; nodesListGatewayMetaState.init(this); if (this.format == XContentType.SMILE) { @@ -186,6 +184,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL throw e; } } + this.indicesService = indicesService; } public MetaData loadMetaState() throws Exception { @@ -194,18 +193,19 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL @Override public void clusterChanged(ClusterChangedEvent event) { - if (event.state().blocks().disableStatePersistence()) { + final ClusterState state = event.state(); + if (state.blocks().disableStatePersistence()) { // reset the current metadata, we need to start fresh... this.currentMetaData = null; return; } - MetaData newMetaData = event.state().metaData(); + MetaData newMetaData = state.metaData(); // we don't check if metaData changed, since we might be called several times and we need to check dangling... boolean success = true; // only applied to master node, writing the global and index level states - if (event.state().nodes().localNode().masterNode()) { + if (state.nodes().localNode().masterNode()) { // check if the global state changed? if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) { try { @@ -248,41 +248,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL } } - // delete indices that were there before, but are deleted now - // we need to do it so they won't be detected as dangling - if (currentMetaData != null) { - // only delete indices when we already received a state (currentMetaData != null) - // and we had a go at processing dangling indices at least once - // this will also delete the _state of the index itself - for (IndexMetaData current : currentMetaData) { - if (danglingIndices.containsKey(current.index())) { - continue; - } - if (!newMetaData.hasIndex(current.index())) { - logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys()); - if (nodeEnv.hasNodeFile()) { - try { - final Index idx = new Index(current.index()); - MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(idx)); - // it may take a couple of seconds for outstanding shard reference - // to release their refs (for example, on going recoveries) - // we are working on a better solution see: https://github.com/elasticsearch/elasticsearch/pull/8608 - nodeEnv.deleteIndexDirectorySafe(idx, deleteTimeout.millis(), current.settings()); - } catch (LockObtainFailedException ex) { - logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index()); - } catch (Exception ex) { - logger.warn("[{}] failed to delete index", ex, current.index()); - } - } - try { - nodeIndexDeletedAction.nodeIndexStoreDeleted(event.state(), current.index(), event.state().nodes().localNodeId()); - } catch (Throwable e) { - logger.debug("[{}] failed to notify master on local index store deletion", e, current.index()); - } - } - } - } - // handle dangling indices, we handle those for all nodes that have a node file (data or master) if (nodeEnv.hasNodeFile()) { if (danglingTimeout.millis() >= 0) { @@ -306,45 +271,20 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL continue; } final IndexMetaData indexMetaData = loadIndexState(indexName); - final Index index = new Index(indexName); if (indexMetaData != null) { - try { - // the index deletion might not have worked due to shards still being locked - // we have three cases here: - // - we acquired all shards locks here --> we can import the dangling index - // - we failed to acquire the lock --> somebody else uses it - DON'T IMPORT - // - we acquired successfully but the lock list is empty --> no shards present - DON'T IMPORT - // in the last case we should in-fact try to delete the directory since it might be a leftover... - final List shardLocks = nodeEnv.lockAllForIndex(index, 0); - if (shardLocks.isEmpty()) { - // no shards - try to remove the directory - nodeEnv.deleteIndexDirectorySafe(index, 0, indexMetaData.settings()); - continue; - } - IOUtils.closeWhileHandlingException(shardLocks); - } catch (IOException ex) { - logger.warn("[{}] skipping locked dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state is set to [{}]", ex, indexName, autoImportDangled); - continue; - } if(autoImportDangled.shouldImport()){ logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state [{}]", indexName, autoImportDangled); danglingIndices.put(indexName, new DanglingIndex(indexName, null)); } else if (danglingTimeout.millis() == 0) { logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName); - try { - nodeEnv.deleteIndexDirectorySafe(index, 0, indexMetaData.settings()); - } catch (LockObtainFailedException ex) { - logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName); - } catch (Exception ex) { - logger.warn("[{}] failed to delete dangling index", ex, indexName); - } + indicesService.deleteIndexStore("dangling index with timeout set to 0", indexMetaData); } else { logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled); danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, - new RemoveDanglingIndex(index, indexMetaData.settings())))); + new RemoveDanglingIndex(indexMetaData)))); } } } @@ -572,27 +512,23 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL class RemoveDanglingIndex implements Runnable { - private final Index index; - private final Settings indexSettings; + private final IndexMetaData metaData; - RemoveDanglingIndex(Index index, @IndexSettings Settings indexSettings) { - this.index = index; - this.indexSettings = indexSettings; + RemoveDanglingIndex(IndexMetaData metaData) { + this.metaData = metaData; } @Override public void run() { synchronized (danglingMutex) { - DanglingIndex remove = danglingIndices.remove(index.name()); + DanglingIndex remove = danglingIndices.remove(metaData.index()); // no longer there... if (remove == null) { return; } - logger.warn("[{}] deleting dangling index", index); - + logger.warn("[{}] deleting dangling index", metaData.index()); try { - MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index)); - nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings); + indicesService.deleteIndexStore("deleting dangling index", metaData); } catch (Exception ex) { logger.debug("failed to delete dangling index", ex); } diff --git a/src/main/java/org/elasticsearch/index/IndexService.java b/src/main/java/org/elasticsearch/index/IndexService.java index f6f608f2010..be85ace475e 100644 --- a/src/main/java/org/elasticsearch/index/IndexService.java +++ b/src/main/java/org/elasticsearch/index/IndexService.java @@ -73,6 +73,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogModule; import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.indices.IndicesLifecycle; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.ShardsPluginsModule; @@ -122,6 +123,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone private final IndexSettingsService settingsService; private final NodeEnvironment nodeEnv; + private final IndicesService indicesServices; private volatile ImmutableMap> shards = ImmutableMap.of(); @@ -133,7 +135,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexStore indexStore, IndexSettingsService settingsService, - IndexFieldDataService indexFieldData, BitsetFilterCache bitSetFilterCache) { + IndexFieldDataService indexFieldData, BitsetFilterCache bitSetFilterCache, IndicesService indicesServices) { super(index, indexSettings); this.injector = injector; this.indexSettings = indexSettings; @@ -149,6 +151,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone this.bitsetFilterCache = bitSetFilterCache; this.pluginsService = injector.getInstance(PluginsService.class); + this.indicesServices = indicesServices; this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class); // inject workarounds for cyclic dep @@ -430,7 +433,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone private void onShardClose(ShardLock lock) { if (deleted.get()) { // we remove that shards content if this index has been deleted try { - nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings); + indicesServices.deleteShardStore("delete index", lock, indexSettings); } catch (IOException e) { logger.warn("{} failed to delete shard content", e, lock.getShardId()); } @@ -450,4 +453,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone onShardClose(lock); } } + + public Settings getIndexSettings() { + return indexSettings; + } } diff --git a/src/main/java/org/elasticsearch/index/store/IndexStore.java b/src/main/java/org/elasticsearch/index/store/IndexStore.java index d979075694b..3e334c3817c 100644 --- a/src/main/java/org/elasticsearch/index/store/IndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/IndexStore.java @@ -44,17 +44,6 @@ public interface IndexStore extends Closeable { */ Class shardDirectory(); - /** - * Returns true if this shard is allocated on this node. Allocated means - * that it has storage files that can be deleted using {@code deleteUnallocated(ShardId, Settings)}. - */ - boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings); - - /** - * Deletes this shard store since its no longer allocated. - */ - void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException; - /** * Return an array of all index folder locations for a given shard */ diff --git a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java index bb73c669047..13bd61fc5f9 100644 --- a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java @@ -126,33 +126,6 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting; } - - @Override - public boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) { - if (locations == null) { - return false; - } - if (indexService.hasShard(shardId.id())) { - return false; - } - return FileSystemUtils.exists(nodeEnv.shardPaths(shardId)); - } - - @Override - public void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException { - if (locations == null) { - return; - } - if (indexService.hasShard(shardId.id())) { - throw new ElasticsearchIllegalStateException(shardId + " allocated, can't be deleted"); - } - try { - nodeEnv.deleteShardDirectorySafe(shardId, indexSettings); - } catch (Exception ex) { - logger.debug("failed to delete shard locations", ex); - } - } - /** * Return an array of all index folder locations for a given shard. Uses * the index settings to determine if a custom data path is set for the diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index 14adb00651c..102fa1854d4 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -66,7 +66,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules { bind(RecoverySettings.class).asEagerSingleton(); bind(RecoveryTarget.class).asEagerSingleton(); bind(RecoverySource.class).asEagerSingleton(); - bind(IndicesStore.class).asEagerSingleton(); bind(IndicesClusterStateService.class).asEagerSingleton(); bind(IndexingMemoryController.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java index e7e460b5c45..a3b1df54db7 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -21,6 +21,7 @@ package org.elasticsearch.indices; import com.google.common.base.Function; import com.google.common.collect.*; +import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; @@ -29,12 +30,20 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.*; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.ShardLock; +import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.index.*; import org.elasticsearch.index.aliases.IndexAliasesServiceModule; import org.elasticsearch.index.analysis.AnalysisModule; @@ -69,6 +78,7 @@ import org.elasticsearch.plugins.PluginsService; import java.io.Closeable; import java.io.IOException; +import java.nio.file.Path; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -95,21 +105,23 @@ public class IndicesService extends AbstractLifecycleComponent i private final Injector injector; private final PluginsService pluginsService; + private final NodeEnvironment nodeEnv; + private final ClusterService clusterService; private volatile Map> indices = ImmutableMap.of(); private final OldShardsStats oldShardsStats = new OldShardsStats(); @Inject - public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector) { + public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector, NodeEnvironment nodeEnv, ClusterService clusterService) { super(settings); this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; + this.clusterService = clusterService; this.indicesAnalysisService = indicesAnalysisService; this.injector = injector; - this.pluginsService = injector.getInstance(PluginsService.class); - this.indicesLifecycle.addListener(oldShardsStats); + this.nodeEnv = nodeEnv; } @Override @@ -164,7 +176,7 @@ public class IndicesService extends AbstractLifecycleComponent i * refresh and indexing, not for docs/store). */ public NodeIndicesStats stats(boolean includePrevious) { - return stats(true, new CommonStatsFlags().all()); + return stats(includePrevious, new CommonStatsFlags().all()); } public NodeIndicesStats stats(boolean includePrevious, CommonStatsFlags flags) { @@ -328,19 +340,6 @@ public class IndicesService extends AbstractLifecycleComponent i removeIndex(index, reason, false); } - /** - * Deletes the given index. Persistent parts of the index - * like the shards files, state and transaction logs are removed once all resources are released. - * - * Equivalent to {@link #removeIndex(String, String)} but fires - * different lifecycle events to ensure pending resources of this index are immediately removed. - * @param index the index to delete - * @param reason the high level reason causing this delete - */ - public void deleteIndex(String index, String reason) throws ElasticsearchException { - removeIndex(index, reason, true); - } - private void removeIndex(String index, String reason, boolean delete) throws ElasticsearchException { try { final IndexService indexService; @@ -390,7 +389,10 @@ public class IndicesService extends AbstractLifecycleComponent i logger.debug("[{}] closed... (reason [{}])", index, reason); indicesLifecycle.afterIndexClosed(indexService.index(), indexService.settingsService().getSettings()); if (delete) { - indicesLifecycle.afterIndexDeleted(indexService.index(), indexService.settingsService().getSettings()); + final Settings indexSettings = indexService.getIndexSettings(); + indicesLifecycle.afterIndexDeleted(indexService.index(), indexSettings); + // now we are done - try to wipe data on disk if possible + deleteIndexStore(reason, indexService.index(), indexSettings); } } catch (IOException ex) { throw new ElasticsearchException("failed to remove index " + index, ex); @@ -419,4 +421,153 @@ public class IndicesService extends AbstractLifecycleComponent i } } } + + /** + * Deletes the given index. Persistent parts of the index + * like the shards files, state and transaction logs are removed once all resources are released. + * + * Equivalent to {@link #removeIndex(String, String)} but fires + * different lifecycle events to ensure pending resources of this index are immediately removed. + * @param index the index to delete + * @param reason the high level reason causing this delete + */ + public void deleteIndex(String index, String reason) throws IOException { + removeIndex(index, reason, true); + } + + public void deleteClosedIndex(String reason, IndexMetaData metaData) { + if (nodeEnv.hasNodeFile()) { + String indexName = metaData.getIndex(); + try { + ClusterState clusterState = clusterService.state(); + if (clusterState.metaData().hasIndex(indexName)) { + final IndexMetaData index = clusterState.metaData().index(indexName); + throw new ElasticsearchIllegalStateException("Can't delete closed index store for [" + indexName + "] - it's still part of the cluster state [" + index.getUUID() + "] [" + metaData.getUUID() + "]"); + } + deleteIndexStore(reason, metaData); + } catch (IOException e) { + logger.warn("[{}] failed to delete closed index", e, metaData.index()); + } + } + } + + /** + * Deletes the index store trying to acquire all shards locks for this index. + * This method will delete the metadata for the index even if the actual shards can't be locked. + */ + public void deleteIndexStore(String reason, IndexMetaData metaData) throws IOException { + if (nodeEnv.hasNodeFile()) { + synchronized (this) { + String indexName = metaData.index(); + if (indices.containsKey(metaData.index())) { + String localUUid = indices.get(metaData.index()).v1().indexUUID(); + throw new ElasticsearchIllegalStateException("Can't delete index store for [" + metaData.getIndex() + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]"); + } + ClusterState clusterState = clusterService.state(); + if (clusterState.metaData().hasIndex(indexName)) { + final IndexMetaData index = clusterState.metaData().index(indexName); + throw new ElasticsearchIllegalStateException("Can't delete closed index store for [" + indexName + "] - it's still part of the cluster state [" + index.getUUID() + "] [" + metaData.getUUID() + "]"); + } + } + Index index = new Index(metaData.index()); + final Settings indexSettings = buildIndexSettings(metaData); + deleteIndexStore(reason, index, indexSettings); + } + } + + private void deleteIndexStore(String reason, Index index, Settings indexSettings) throws IOException { + try { + // we are trying to delete the index store here - not a big deal if the lock can't be obtained + // the store metadata gets wiped anyway even without the lock this is just best effort since + // every shards deletes its content under the shard lock it owns. + logger.debug("{} deleting index store reason [{}]", index, reason); + nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings); + } catch (LockObtainFailedException ex) { + logger.debug("{} failed to delete index store - at least one shards is still locked", ex, index); + } catch (Exception ex) { + logger.warn("{} failed to delete index", ex, index); + } finally { + // this is a pure protection to make sure this index doesn't get re-imported as a dangeling index. + // we should in the future rather write a tombstone rather than wiping the metadata. + MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index)); + } + } + + /** + * Deletes the shard with an already acquired shard lock. + * @param reason the reason for the shard deletion + * @param lock the lock of the shard to delete + * @param indexSettings the shards index settings. + * @throws IOException if an IOException occurs + */ + public void deleteShardStore(String reason, ShardLock lock, Settings indexSettings) throws IOException { + ShardId shardId = lock.getShardId(); + if (canDeleteShardContent(shardId, indexSettings) == false) { + throw new ElasticsearchIllegalStateException("Can't delete shard " + shardId); + } + logger.trace("{} deleting shard reason [{}]", shardId, reason); + nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings); + } + + /** + * This method deletes the shard contents on disk for the given shard ID. This method will fail if the shard deleting + * is prevented by {@link #canDeleteShardContent(org.elasticsearch.index.shard.ShardId, org.elasticsearch.cluster.metadata.IndexMetaData)} + * of if the shards lock can not be acquired. + * @param reason the reason for the shard deletion + * @param shardId the shards ID to delete + * @param metaData the shards index metadata. This is required to access the indexes settings etc. + * @throws IOException if an IOException occurs + */ + public void deleteShardStore(String reason, ShardId shardId, IndexMetaData metaData) throws IOException { + final Settings indexSettings = buildIndexSettings(metaData); + if (canDeleteShardContent(shardId, indexSettings) == false) { + throw new ElasticsearchIllegalStateException("Can't delete shard " + shardId); + } + nodeEnv.deleteShardDirectorySafe(shardId, indexSettings); + logger.trace("{} deleting shard reason [{}]", shardId, reason); + } + + /** + * Returns true iff the shards content for the given shard can be deleted. + * This method will return false if: + *
    + *
  • if the shard is still allocated / active on this node
  • + *
  • if for instance if the shard is located on shared and should not be deleted
  • + *
  • if the shards data locations do not exists
  • + *
+ * + * @param shardId the shard to delete. + * @param metaData the shards index metadata. This is required to access the indexes settings etc. + */ + public boolean canDeleteShardContent(ShardId shardId, IndexMetaData metaData) { + // we need the metadata here since we have to build the complete settings + // to decide where the shard content lives. In the future we might even need more info here ie. for shadow replicas + // The plan was to make it harder to miss-use and ask for metadata instead of simple settings + assert shardId.getIndex().equals(metaData.getIndex()); + final Settings indexSettings = buildIndexSettings(metaData); + return canDeleteShardContent(shardId, indexSettings); + } + + private boolean canDeleteShardContent(ShardId shardId, @IndexSettings Settings indexSettings) { + final Tuple indexServiceInjectorTuple = this.indices.get(shardId.getIndex()); + // TODO add some protection here to prevent shard deletion if we are on a shard FS or have ShadowReplicas enabled. + if (indexServiceInjectorTuple != null && nodeEnv.hasNodeFile()) { + final IndexService indexService = indexServiceInjectorTuple.v1(); + return indexService.hasShard(shardId.id()) == false; + } else if (nodeEnv.hasNodeFile()) { + final Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings); + return FileSystemUtils.exists(shardLocations); + } + return false; + } + + private Settings buildIndexSettings(IndexMetaData metaData) { + // play safe here and make sure that we take node level settings into account. + // we might run on nodes where we use shard FS and then in the future don't delete + // actual content. + ImmutableSettings.Builder builder = settingsBuilder(); + builder.put(settings); + builder.put(metaData.getSettings()); + return builder.build(); + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 5c533202394..0c4a0ac5620 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -28,6 +28,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; @@ -177,42 +178,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent= 1.3.0 at some point we get back here and have the chance to - // run this api. (when cluster state is then updated) - if (node.getVersion().before(Version.V_1_3_0)) { - logger.debug("Skip deleting deleting shard instance [{}], a node holding a shard instance is < 1.3.0", shardRouting); - return false; - } if (shardRouting.relocatingNodeId() != null) { node = state.nodes().get(shardRouting.relocatingNodeId()); if (node == null) { return false; } - if (node.getVersion().before(Version.V_1_3_0)) { - logger.debug("Skip deleting deleting shard instance [{}], a node holding a shard instance is < 1.3.0", shardRouting); - return false; - } } // check if shard is active on the current node or is getting relocated to the our node @@ -318,38 +293,11 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion()); return currentState; } - - IndexService indexService = indicesService.indexService(shardId.getIndex()); IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex()); - if (indexService == null) { - // not physical allocation of the index, delete it from the file system if applicable - if (nodeEnv.hasNodeFile()) { - Path[] shardLocations = nodeEnv.shardPaths(shardId); - if (FileSystemUtils.exists(shardLocations)) { - logger.debug("{} deleting shard that is no longer used", shardId); - try { - nodeEnv.deleteShardDirectorySafe(shardId, indexMeta.settings()); - } catch (Exception ex) { - logger.debug("failed to delete shard locations", ex); - } - } - } - } else { - if (!indexService.hasShard(shardId.id())) { - if (indexService.store().canDeleteUnallocated(shardId, indexMeta.settings())) { - logger.debug("{} deleting shard that is no longer used", shardId); - try { - indexService.store().deleteUnallocated(shardId, indexMeta.settings()); - } catch (Exception e) { - logger.debug("{} failed to delete unallocated shard, ignoring", e, shardId); - } - } - } else { - // this state is weird, should we log? - // basically, it means that the shard is not allocated on this node using the routing - // but its still physically exists on an IndexService - // Note, this listener should run after IndicesClusterStateService... - } + try { + indicesService.deleteShardStore("no longer used", shardId, indexMeta); + } catch (Exception ex) { + logger.debug("{} failed to delete unallocated shard, ignoring", ex, shardId); } return currentState; } diff --git a/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java new file mode 100644 index 00000000000..368bf7fef26 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices; + +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Priority; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.GatewayMetaState; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ElasticsearchSingleNodeTest; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; + +public class IndicesServiceTest extends ElasticsearchSingleNodeTest { + + public IndicesService getIndicesService() { + return getInstanceFromNode(IndicesService.class); + } + + protected boolean resetNodeAfterTest() { + return true; + } + + public void testCanDeleteShardContent() { + IndicesService indicesService = getIndicesService(); + IndexMetaData meta = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas( + 1).build(); + assertFalse("no shard location", indicesService.canDeleteShardContent(new ShardId("test", 0), meta)); + IndexService test = createIndex("test"); + assertTrue(test.hasShard(0)); + assertFalse("shard is allocated", indicesService.canDeleteShardContent(new ShardId("test", 0), meta)); + test.removeShard(0, "boom"); + assertTrue("shard is removed", indicesService.canDeleteShardContent(new ShardId("test", 0), meta)); + } + + public void testDeleteIndexStore() throws Exception { + IndicesService indicesService = getIndicesService(); + IndexService test = createIndex("test"); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + IndexMetaData firstMetaData = clusterService.state().metaData().index("test"); + assertTrue(test.hasShard(0)); + + try { + indicesService.deleteIndexStore("boom", firstMetaData); + fail(); + } catch (ElasticsearchIllegalStateException ex) { + // all good + } + + GatewayMetaState gwMetaState = getInstanceFromNode(GatewayMetaState.class); + MetaData meta = gwMetaState.loadMetaState(); + assertNotNull(meta); + assertNotNull(meta.index("test")); + assertAcked(client().admin().indices().prepareDelete("test")); + + meta = gwMetaState.loadMetaState(); + assertNotNull(meta); + assertNull(meta.index("test")); + + + createIndex("test"); + client().prepareIndex("test", "type", "1").setSource("field", "value").setRefresh(true).get(); + client().admin().indices().prepareFlush("test").get(); + assertHitCount(client().prepareSearch("test").get(), 1); + IndexMetaData secondMetaData = clusterService.state().metaData().index("test"); + assertAcked(client().admin().indices().prepareClose("test")); + NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class); + Path[] paths = nodeEnv.shardDataPaths(new ShardId("test", 0), clusterService.state().getMetaData().index("test").getSettings()); + for (Path path : paths) { + assertTrue(Files.exists(path)); + } + + try { + indicesService.deleteIndexStore("boom", secondMetaData); + fail(); + } catch (ElasticsearchIllegalStateException ex) { + // all good + } + + for (Path path : paths) { + assertTrue(Files.exists(path)); + } + + // now delete the old one and make sure we resolve against the name + try { + indicesService.deleteIndexStore("boom", firstMetaData); + fail(); + } catch (ElasticsearchIllegalStateException ex) { + // all good + } + assertAcked(client().admin().indices().prepareOpen("test")); + ensureGreen("test"); + } +} diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java index 4dd1f764d73..0949033df1a 100644 --- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java +++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreTests.java @@ -161,15 +161,8 @@ public class IndicesStoreTests extends ElasticsearchTestCase { } } - final boolean canBeDeleted; - if (nodeVersion.before(Version.V_1_3_0)) { - canBeDeleted = false; - } else { - canBeDeleted = true; - } - // shard exist on other node (abc) - assertThat(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()), is(canBeDeleted)); + assertTrue(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build())); } @Test @@ -194,14 +187,8 @@ public class IndicesStoreTests extends ElasticsearchTestCase { } } - final boolean canBeDeleted; - if (nodeVersion.before(Version.V_1_3_0)) { - canBeDeleted = false; - } else { - canBeDeleted = true; - } // shard exist on other node (abc and def) - assertThat(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()), is(canBeDeleted)); + assertTrue(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build())); } }