[CORE] Consolidate index / shard deletion in IndicesService

Today the logic for deleting an index is spread across several
classes, which makes changes to this rather delicate part of the code base
very difficult. This commit consolidates that logic into `IndicesService`
and moves the handling of acknowledging the delete to the master entirely into
`IndicesClusterStateService`.
This commit is contained in:
Simon Willnauer 2015-02-05 20:40:35 +01:00
parent d3762d6427
commit 2f0d158692
11 changed files with 410 additions and 261 deletions

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster.action.index;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -27,12 +29,19 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
/**
*
@ -45,14 +54,16 @@ public class NodeIndexDeletedAction extends AbstractComponent {
private final ThreadPool threadPool;
private final TransportService transportService;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final NodeEnvironment nodeEnv;
@Inject
public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService) {
public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService, NodeEnvironment nodeEnv) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
transportService.registerHandler(INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedTransportHandler());
transportService.registerHandler(INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedTransportHandler());
this.nodeEnv = nodeEnv;
}
public void add(Listener listener) {
@ -64,32 +75,58 @@ public class NodeIndexDeletedAction extends AbstractComponent {
}
public void nodeIndexDeleted(final ClusterState clusterState, final String index, final String nodeId) throws ElasticsearchException {
DiscoveryNodes nodes = clusterState.nodes();
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new Runnable() {
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void run() {
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}
@Override
protected void doRun() throws Exception {
innerNodeIndexDeleted(index, nodeId);
lockIndexAndAck(index, nodes, nodeId, clusterState);
}
});
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}]failed to ack index store deleted for index", t, index);
}
@Override
protected void doRun() throws Exception {
lockIndexAndAck(index, nodes, nodeId, clusterState);
}
});
}
}
public void nodeIndexStoreDeleted(final ClusterState clusterState, final String index, final String nodeId) throws ElasticsearchException {
DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster()) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
private void lockIndexAndAck(String index, DiscoveryNodes nodes, String nodeId, ClusterState clusterState) throws IOException {
try {
// we are waiting until we can lock the index / all shards on the node and then we ack the delete of the store to the
// master. If we can't acquire the locks here immediately there might be a shard of this index still holding on to the lock
// due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be
// deleted by the time we get the lock
final List<ShardLock> locks = nodeEnv.lockAllForIndex(new Index(index), TimeUnit.MINUTES.toMillis(30));
try {
if (nodes.localNodeMaster()) {
innerNodeIndexStoreDeleted(index, nodeId);
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
}
});
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
} finally {
IOUtils.close(locks); // release them again
}
} catch (LockObtainFailedException exc) {
logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index);
}
}

View File

@ -21,15 +21,14 @@ package org.elasticsearch.gateway;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -51,7 +50,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -120,7 +119,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
private final ThreadPool threadPool;
private final LocalAllocateDangledIndices allocateDangledIndices;
private final NodeIndexDeletedAction nodeIndexDeletedAction;
@Nullable
private volatile MetaData currentMetaData;
@ -135,17 +133,17 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
private final TimeValue deleteTimeout;
private final Map<String, DanglingIndex> danglingIndices = ConcurrentCollections.newConcurrentMap();
private final Object danglingMutex = new Object();
private final IndicesService indicesService;
@Inject
public GatewayMetaState(Settings settings, ThreadPool threadPool, NodeEnvironment nodeEnv,
TransportNodesListGatewayMetaState nodesListGatewayMetaState, LocalAllocateDangledIndices allocateDangledIndices,
NodeIndexDeletedAction nodeIndexDeletedAction) throws Exception {
IndicesService indicesService) throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
this.threadPool = threadPool;
this.format = XContentType.fromRestContentType(settings.get("format", "smile"));
this.allocateDangledIndices = allocateDangledIndices;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
nodesListGatewayMetaState.init(this);
if (this.format == XContentType.SMILE) {
@ -186,6 +184,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
throw e;
}
}
this.indicesService = indicesService;
}
public MetaData loadMetaState() throws Exception {
@ -194,18 +193,19 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
final ClusterState state = event.state();
if (state.blocks().disableStatePersistence()) {
// reset the current metadata, we need to start fresh...
this.currentMetaData = null;
return;
}
MetaData newMetaData = event.state().metaData();
MetaData newMetaData = state.metaData();
// we don't check if metaData changed, since we might be called several times and we need to check dangling...
boolean success = true;
// only applied to master node, writing the global and index level states
if (event.state().nodes().localNode().masterNode()) {
if (state.nodes().localNode().masterNode()) {
// check if the global state changed?
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, newMetaData)) {
try {
@ -248,41 +248,6 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
}
}
// delete indices that were there before, but are deleted now
// we need to do it so they won't be detected as dangling
if (currentMetaData != null) {
// only delete indices when we already received a state (currentMetaData != null)
// and we had a go at processing dangling indices at least once
// this will also delete the _state of the index itself
for (IndexMetaData current : currentMetaData) {
if (danglingIndices.containsKey(current.index())) {
continue;
}
if (!newMetaData.hasIndex(current.index())) {
logger.debug("[{}] deleting index that is no longer part of the metadata (indices: [{}])", current.index(), newMetaData.indices().keys());
if (nodeEnv.hasNodeFile()) {
try {
final Index idx = new Index(current.index());
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(idx));
// it may take a couple of seconds for outstanding shard reference
// to release their refs (for example, on going recoveries)
// we are working on a better solution see: https://github.com/elasticsearch/elasticsearch/pull/8608
nodeEnv.deleteIndexDirectorySafe(idx, deleteTimeout.millis(), current.settings());
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, current.index());
} catch (Exception ex) {
logger.warn("[{}] failed to delete index", ex, current.index());
}
}
try {
nodeIndexDeletedAction.nodeIndexStoreDeleted(event.state(), current.index(), event.state().nodes().localNodeId());
} catch (Throwable e) {
logger.debug("[{}] failed to notify master on local index store deletion", e, current.index());
}
}
}
}
// handle dangling indices, we handle those for all nodes that have a node file (data or master)
if (nodeEnv.hasNodeFile()) {
if (danglingTimeout.millis() >= 0) {
@ -306,45 +271,20 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
continue;
}
final IndexMetaData indexMetaData = loadIndexState(indexName);
final Index index = new Index(indexName);
if (indexMetaData != null) {
try {
// the index deletion might not have worked due to shards still being locked
// we have three cases here:
// - we acquired all shards locks here --> we can import the dangling index
// - we failed to acquire the lock --> somebody else uses it - DON'T IMPORT
// - we acquired successfully but the lock list is empty --> no shards present - DON'T IMPORT
// in the last case we should in-fact try to delete the directory since it might be a leftover...
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, 0);
if (shardLocks.isEmpty()) {
// no shards - try to remove the directory
nodeEnv.deleteIndexDirectorySafe(index, 0, indexMetaData.settings());
continue;
}
IOUtils.closeWhileHandlingException(shardLocks);
} catch (IOException ex) {
logger.warn("[{}] skipping locked dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state is set to [{}]", ex, indexName, autoImportDangled);
continue;
}
if(autoImportDangled.shouldImport()){
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state [{}]", indexName, autoImportDangled);
danglingIndices.put(indexName, new DanglingIndex(indexName, null));
} else if (danglingTimeout.millis() == 0) {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, timeout set to 0, deleting now", indexName);
try {
nodeEnv.deleteIndexDirectorySafe(index, 0, indexMetaData.settings());
} catch (LockObtainFailedException ex) {
logger.debug("[{}] failed to delete index - at least one shards is still locked", ex, indexName);
} catch (Exception ex) {
logger.warn("[{}] failed to delete dangling index", ex, indexName);
}
indicesService.deleteIndexStore("dangling index with timeout set to 0", indexMetaData);
} else {
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
danglingIndices.put(indexName,
new DanglingIndex(indexName,
threadPool.schedule(danglingTimeout,
ThreadPool.Names.SAME,
new RemoveDanglingIndex(index, indexMetaData.settings()))));
new RemoveDanglingIndex(indexMetaData))));
}
}
}
@ -572,27 +512,23 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
class RemoveDanglingIndex implements Runnable {
private final Index index;
private final Settings indexSettings;
private final IndexMetaData metaData;
RemoveDanglingIndex(Index index, @IndexSettings Settings indexSettings) {
this.index = index;
this.indexSettings = indexSettings;
RemoveDanglingIndex(IndexMetaData metaData) {
this.metaData = metaData;
}
@Override
public void run() {
synchronized (danglingMutex) {
DanglingIndex remove = danglingIndices.remove(index.name());
DanglingIndex remove = danglingIndices.remove(metaData.index());
// no longer there...
if (remove == null) {
return;
}
logger.warn("[{}] deleting dangling index", index);
logger.warn("[{}] deleting dangling index", metaData.index());
try {
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
indicesService.deleteIndexStore("deleting dangling index", metaData);
} catch (Exception ex) {
logger.debug("failed to delete dangling index", ex);
}

View File

@ -73,6 +73,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogModule;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ShardsPluginsModule;
@ -122,6 +123,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
private final IndexSettingsService settingsService;
private final NodeEnvironment nodeEnv;
private final IndicesService indicesServices;
private volatile ImmutableMap<Integer, Tuple<IndexShard, Injector>> shards = ImmutableMap.of();
@ -133,7 +135,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService,
SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache,
IndexStore indexStore, IndexSettingsService settingsService,
IndexFieldDataService indexFieldData, BitsetFilterCache bitSetFilterCache) {
IndexFieldDataService indexFieldData, BitsetFilterCache bitSetFilterCache, IndicesService indicesServices) {
super(index, indexSettings);
this.injector = injector;
this.indexSettings = indexSettings;
@ -149,6 +151,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
this.bitsetFilterCache = bitSetFilterCache;
this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesServices = indicesServices;
this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
// inject workarounds for cyclic dep
@ -430,7 +433,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
private void onShardClose(ShardLock lock) {
if (deleted.get()) { // we remove that shards content if this index has been deleted
try {
nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings);
indicesServices.deleteShardStore("delete index", lock, indexSettings);
} catch (IOException e) {
logger.warn("{} failed to delete shard content", e, lock.getShardId());
}
@ -450,4 +453,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
onShardClose(lock);
}
}
/**
 * Returns the settings held in this service's {@code indexSettings} field.
 */
public Settings getIndexSettings() {
return indexSettings;
}
}

View File

@ -44,17 +44,6 @@ public interface IndexStore extends Closeable {
*/
Class<? extends DirectoryService> shardDirectory();
/**
 * Returns <tt>true</tt> if the given shard has storage files on this node that can be
 * deleted using {@code deleteUnallocated(ShardId, Settings)}.
 * NOTE(review): the previous wording said "allocated on this node"; the AbstractIndexStore
 * implementation returns <tt>false</tt> while the index service still holds the shard - confirm
 * the intended contract.
 */
boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings);
/**
 * Deletes the store of the given shard since it is no longer allocated.
 *
 * @throws IOException declared by the interface; implementations may surface I/O failures here
 */
void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException;
/**
* Return an array of all index folder locations for a given shard
*/

View File

@ -126,33 +126,6 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting;
}
/**
 * Answers <tt>true</tt> only when this store has locations, the index service does not
 * hold the shard, and the shard's paths exist on disk.
 */
@Override
public boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) {
    // evaluated left to right: no locations -> false, shard still held -> false, else check the file system
    return locations != null
            && !indexService.hasShard(shardId.id())
            && FileSystemUtils.exists(nodeEnv.shardPaths(shardId));
}
/**
 * Removes the on-disk directory of a shard that the index service no longer holds.
 * Does nothing when this store has no locations. Failures of the actual deletion are
 * logged at debug level and not propagated.
 *
 * @throws ElasticsearchIllegalStateException if the index service still holds the shard
 */
@Override
public void deleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) throws IOException {
    if (locations != null) {
        final boolean stillHeld = indexService.hasShard(shardId.id());
        if (stillHeld) {
            throw new ElasticsearchIllegalStateException(shardId + " allocated, can't be deleted");
        }
        try {
            nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
        } catch (Exception failure) {
            // best effort only
            logger.debug("failed to delete shard locations", failure);
        }
    }
}
/**
* Return an array of all index folder locations for a given shard. Uses
* the index settings to determine if a custom data path is set for the

View File

@ -66,7 +66,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(RecoverySettings.class).asEagerSingleton();
bind(RecoveryTarget.class).asEagerSingleton();
bind(RecoverySource.class).asEagerSingleton();
bind(IndicesStore.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(IndexingMemoryController.class).asEagerSingleton();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indices;
import com.google.common.base.Function;
import com.google.common.collect.*;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
@ -29,12 +30,20 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.*;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
import org.elasticsearch.index.analysis.AnalysisModule;
@ -69,6 +78,7 @@ import org.elasticsearch.plugins.PluginsService;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -95,21 +105,23 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private final Injector injector;
private final PluginsService pluginsService;
private final NodeEnvironment nodeEnv;
private final ClusterService clusterService;
private volatile Map<String, Tuple<IndexService, Injector>> indices = ImmutableMap.of();
private final OldShardsStats oldShardsStats = new OldShardsStats();
@Inject
public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector) {
public IndicesService(Settings settings, IndicesLifecycle indicesLifecycle, IndicesAnalysisService indicesAnalysisService, Injector injector, NodeEnvironment nodeEnv, ClusterService clusterService) {
super(settings);
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.clusterService = clusterService;
this.indicesAnalysisService = indicesAnalysisService;
this.injector = injector;
this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesLifecycle.addListener(oldShardsStats);
this.nodeEnv = nodeEnv;
}
@Override
@ -164,7 +176,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
* refresh and indexing, not for docs/store).
*/
public NodeIndicesStats stats(boolean includePrevious) {
return stats(true, new CommonStatsFlags().all());
return stats(includePrevious, new CommonStatsFlags().all());
}
public NodeIndicesStats stats(boolean includePrevious, CommonStatsFlags flags) {
@ -328,19 +340,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
removeIndex(index, reason, false);
}
/**
* Deletes the given index. Persistent parts of the index
* like the shards files, state and transaction logs are removed once all resources are released.
*
* Equivalent to {@link #removeIndex(String, String)} but fires
* different lifecycle events to ensure pending resources of this index are immediately removed.
* @param index the index to delete
* @param reason the high level reason causing this delete
*/
public void deleteIndex(String index, String reason) throws ElasticsearchException {
removeIndex(index, reason, true);
}
private void removeIndex(String index, String reason, boolean delete) throws ElasticsearchException {
try {
final IndexService indexService;
@ -390,7 +389,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
logger.debug("[{}] closed... (reason [{}])", index, reason);
indicesLifecycle.afterIndexClosed(indexService.index(), indexService.settingsService().getSettings());
if (delete) {
indicesLifecycle.afterIndexDeleted(indexService.index(), indexService.settingsService().getSettings());
final Settings indexSettings = indexService.getIndexSettings();
indicesLifecycle.afterIndexDeleted(indexService.index(), indexSettings);
// now we are done - try to wipe data on disk if possible
deleteIndexStore(reason, indexService.index(), indexSettings);
}
} catch (IOException ex) {
throw new ElasticsearchException("failed to remove index " + index, ex);
@ -419,4 +421,153 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
}
}
/**
 * Deletes the given index. Persistent parts of the index
 * like the shards files, state and transaction logs are removed once all resources are released.
 *
 * Equivalent to {@link #removeIndex(String, String)} but fires
 * different lifecycle events to ensure pending resources of this index are immediately removed.
 * Implemented by delegating to the private {@code removeIndex} with the delete flag set to
 * <code>true</code>.
 *
 * @param index the index to delete
 * @param reason the high level reason causing this delete
 * @throws IOException declared on the signature; NOTE(review): the visible private
 *         {@code removeIndex} only declares ElasticsearchException - confirm whether this is still needed
 */
public void deleteIndex(String index, String reason) throws IOException {
removeIndex(index, reason, true);
}
/**
 * Deletes the store of the index described by <code>metaData</code>, provided that index
 * is not part of the current cluster state. Does nothing on nodes without a node file.
 * An {@link IOException} raised while deleting is logged as a warning and not rethrown.
 *
 * @param reason   the reason for the deletion, passed through to the store deletion
 * @param metaData the metadata of the index to delete
 * @throws ElasticsearchIllegalStateException if the index is still part of the cluster state
 */
public void deleteClosedIndex(String reason, IndexMetaData metaData) {
    if (nodeEnv.hasNodeFile() == false) {
        return;
    }
    final String name = metaData.getIndex();
    try {
        final ClusterState currentState = clusterService.state();
        if (currentState.metaData().hasIndex(name)) {
            final IndexMetaData stillPresent = currentState.metaData().index(name);
            throw new ElasticsearchIllegalStateException("Can't delete closed index store for [" + name + "] - it's still part of the cluster state [" + stillPresent.getUUID() + "] [" + metaData.getUUID() + "]");
        }
        deleteIndexStore(reason, metaData);
    } catch (IOException e) {
        logger.warn("[{}] failed to delete closed index", e, metaData.index());
    }
}
/**
 * Deletes the store of the index described by <code>metaData</code>, trying to acquire all
 * shard locks of that index. The index metadata is deleted even if the shards can't be locked.
 * Does nothing on nodes without a node file.
 *
 * @param reason   the reason for the deletion
 * @param metaData the metadata of the index whose store should be deleted
 * @throws ElasticsearchIllegalStateException if the index is still registered with this service
 *         or is still part of the cluster state
 * @throws IOException if wiping the index metadata fails
 */
public void deleteIndexStore(String reason, IndexMetaData metaData) throws IOException {
    if (nodeEnv.hasNodeFile() == false) {
        return;
    }
    synchronized (this) {
        final String name = metaData.index();
        // refuse to wipe anything that is still registered locally ...
        final Tuple<IndexService, Injector> local = indices.get(name);
        if (local != null) {
            final String localUUid = local.v1().indexUUID();
            throw new ElasticsearchIllegalStateException("Can't delete index store for [" + metaData.getIndex() + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]");
        }
        // ... or that the cluster state still knows about
        final ClusterState currentState = clusterService.state();
        if (currentState.metaData().hasIndex(name)) {
            final IndexMetaData stillPresent = currentState.metaData().index(name);
            throw new ElasticsearchIllegalStateException("Can't delete closed index store for [" + name + "] - it's still part of the cluster state [" + stillPresent.getUUID() + "] [" + metaData.getUUID() + "]");
        }
    }
    deleteIndexStore(reason, new Index(metaData.index()), buildIndexSettings(metaData));
}
/**
 * Best-effort deletion of the index directory, always followed by wiping the index metadata state.
 * Failures of the directory deletion are logged (debug for a lock failure, warn otherwise) and
 * not rethrown.
 *
 * @param reason        the reason for the deletion, used for logging only
 * @param index         the index whose store should be deleted
 * @param indexSettings the settings passed on to the directory deletion
 * @throws IOException presumably if wiping the metadata state in the finally block fails - TODO confirm
 */
private void deleteIndexStore(String reason, Index index, Settings indexSettings) throws IOException {
try {
// We are trying to delete the index store here - not a big deal if the lock can't be obtained.
// The store metadata gets wiped anyway, even without the lock; this is just best effort since
// every shard deletes its content under the shard lock it owns.
logger.debug("{} deleting index store reason [{}]", index, reason);
// the 0 is presumably a zero lock timeout (other callers in this change pass a timeout in millis here)
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
} catch (LockObtainFailedException ex) {
logger.debug("{} failed to delete index store - at least one shards is still locked", ex, index);
} catch (Exception ex) {
logger.warn("{} failed to delete index", ex, index);
} finally {
// This is pure protection to make sure this index doesn't get re-imported as a dangling index.
// In the future we should write a tombstone rather than wiping the metadata.
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
}
}
/**
 * Deletes the content of a shard whose lock the caller already holds.
 *
 * @param reason the reason for the shard deletion, used for logging only
 * @param lock the already acquired lock of the shard to delete
 * @param indexSettings the settings of the index the shard belongs to
 * @throws ElasticsearchIllegalStateException if the shard content may not be deleted
 * @throws IOException if an IOException occurs
 */
public void deleteShardStore(String reason, ShardLock lock, Settings indexSettings) throws IOException {
    final ShardId lockedShard = lock.getShardId();
    final boolean deletable = canDeleteShardContent(lockedShard, indexSettings);
    if (!deletable) {
        throw new ElasticsearchIllegalStateException("Can't delete shard " + lockedShard);
    }
    logger.trace("{} deleting shard reason [{}]", lockedShard, reason);
    nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings);
}
/**
 * Deletes the shard contents on disk for the given shard ID. This method will fail if the shard deletion
 * is prevented by {@link #canDeleteShardContent(org.elasticsearch.index.shard.ShardId, org.elasticsearch.cluster.metadata.IndexMetaData)}
 * or if the shard's lock can not be acquired.
 *
 * @param reason the reason for the shard deletion, used for logging only
 * @param shardId the ID of the shard to delete
 * @param metaData the shard's index metadata. This is required to access the index settings etc.
 * @throws ElasticsearchIllegalStateException if the shard content may not be deleted
 * @throws IOException if an IOException occurs
 */
public void deleteShardStore(String reason, ShardId shardId, IndexMetaData metaData) throws IOException {
    final Settings indexSettings = buildIndexSettings(metaData);
    if (canDeleteShardContent(shardId, indexSettings) == false) {
        throw new ElasticsearchIllegalStateException("Can't delete shard " + shardId);
    }
    // log before deleting, like the lock-based overload, so the attempt is traced even if the delete throws
    logger.trace("{} deleting shard reason [{}]", shardId, reason);
    nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
}
/**
 * Tells whether the on-disk content of the given shard can be deleted.
 * The answer is <code>false</code> if:
 * <ul>
 * <li>the shard is still allocated / active on this node</li>
 * <li>the shard is located on shared storage and should not be deleted</li>
 * <li>the shard's data locations do not exist</li>
 * </ul>
 *
 * @param shardId the shard to delete.
 * @param metaData the shard's index metadata. This is required to access the index settings etc.
 * @return <code>true</code> iff the shard content can be deleted
 */
public boolean canDeleteShardContent(ShardId shardId, IndexMetaData metaData) {
    // Metadata is requested (rather than plain settings) because the complete settings are needed to
    // decide where the shard content lives, and more information may be needed in the future, e.g. for
    // shadow replicas. Asking for metadata also makes this method harder to misuse.
    assert shardId.getIndex().equals(metaData.getIndex());
    return canDeleteShardContent(shardId, buildIndexSettings(metaData));
}
/**
 * Settings-based variant of the shard content check. On a node without a node file the
 * answer is always <code>false</code>. If the index is registered with this service, the
 * content is deletable only when the index service does not hold the shard; otherwise it is
 * deletable when the shard's data paths exist.
 */
private boolean canDeleteShardContent(ShardId shardId, @IndexSettings Settings indexSettings) {
    final Tuple<IndexService, Injector> registered = this.indices.get(shardId.getIndex());
    // TODO add some protection here to prevent shard deletion if we are on a shared FS or have ShadowReplicas enabled.
    if (nodeEnv.hasNodeFile() == false) {
        return false;
    }
    if (registered != null) {
        return !registered.v1().hasShard(shardId.id());
    }
    final Path[] dataPaths = nodeEnv.shardDataPaths(shardId, indexSettings);
    return FileSystemUtils.exists(dataPaths);
}
/**
 * Builds the settings for an index by layering the index's own settings from
 * <code>metaData</code> on top of this component's node level settings.
 */
private Settings buildIndexSettings(IndexMetaData metaData) {
    // Play it safe and take node level settings into account: we might run on nodes that use a
    // shared filesystem, where in the future the actual content must not be deleted.
    return settingsBuilder()
            .put(settings)
            .put(metaData.getSettings())
            .build();
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
@ -177,42 +178,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
cleanFailedShards(event);
cleanMismatchedIndexUUIDs(event);
applyDeletedIndices(event);
applyNewIndices(event);
applyMappings(event);
applyAliases(event);
applyNewOrUpdatedShards(event);
applyDeletedIndices(event);
applyDeletedShards(event);
applyCleanedIndices(event);
applySettings(event);
sendIndexLifecycleEvents(event);
}
}
/**
 * Reports every index deleted by the given cluster state change through
 * {@code nodeIndexDeletedAction}, tagged with the local node id.
 * A failure for one index is logged at debug level and does not stop the remaining ones.
 *
 * @param event the cluster state change being applied
 */
private void sendIndexLifecycleEvents(final ClusterChangedEvent event) {
    final String nodeId = event.state().nodes().localNodeId();
    assert nodeId != null;
    for (final String deletedIndex : event.indicesDeleted()) {
        try {
            nodeIndexDeletedAction.nodeIndexDeleted(event.state(), deletedIndex, nodeId);
        } catch (Throwable failure) {
            // best effort: log and carry on with the next index
            logger.debug("failed to send to master index {} deleted event", failure, deletedIndex);
        }
    }
}
/**
 * Removes every local index whose UUID differs from the UUID recorded for the same index name
 * in the new cluster state. Indices that are absent from the new metadata are left untouched here.
 *
 * @param event the cluster state change being applied
 */
private void cleanMismatchedIndexUUIDs(final ClusterChangedEvent event) {
    for (IndexService localIndex : indicesService) {
        final IndexMetaData stateMetaData = event.state().metaData().index(localIndex.index().name());
        // a null entry means the index got deleted on us; it will be deleted later
        if (stateMetaData != null && stateMetaData.isSameUUID(localIndex.indexUUID()) == false) {
            logger.debug("[{}] mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated", stateMetaData.index());
            removeIndex(stateMetaData.index(), "mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated");
        }
    }
}
@ -246,15 +220,39 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
/**
 * Applies index deletions carried by the given cluster state change: walks the locally held index
 * services, then every index listed in {@code event.indicesDeleted()}, deleting local state and
 * reporting each deleted index through {@code nodeIndexDeletedAction}.
 * <p>
 * NOTE(review): the first loop below appears to interleave lines from two revisions of this method
 * (its braces do not balance) -- confirm against the actual file before relying on it.
 *
 * @param event the cluster state change being applied
 */
private void applyDeletedIndices(final ClusterChangedEvent event) {
// the previous state is needed below to look up metadata of indices that have no local instance
final ClusterState previousState = event.previousState();
final String localNodeId = event.state().nodes().localNodeId();
assert localNodeId != null;
// pass over the index services currently held by this node
for (IndexService indexService : indicesService) {
final String index = indexService.index().name();
if (!event.state().metaData().hasIndex(index)) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
IndexMetaData indexMetaData = event.state().metaData().index(indexService.index().name());
if (indexMetaData != null) {
// same index name but a different UUID in the cluster state: delete the local copy
if (!indexMetaData.isSameUUID(indexService.indexUUID())) {
logger.debug("[{}] mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated", indexMetaData.index());
deleteIndex(indexMetaData.index(), "mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated");
}
deleteIndex(index, "index no longer part of the metadata");
}
}
// pass over the indices reported as deleted by this cluster state change
for (String index : event.indicesDeleted()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
}
if (indicesService.hasIndex(index)) {
deleteIndex(index, "index no longer part of the metadata");
} else {
// the index is not known to the local IndicesService: use its metadata from the previous cluster state
IndexMetaData metaData = previousState.metaData().index(index);
assert metaData != null;
indicesService.deleteClosedIndex("closed index no longer part of the metadata", metaData);
}
// best effort notification: a failure is only logged at debug level
try {
nodeIndexDeletedAction.nodeIndexDeleted(event.state(), index, localNodeId);
} catch (Throwable e) {
logger.debug("failed to send to master index {} deleted event", e, index);
}
}
}
private void applyDeletedShards(final ClusterChangedEvent event) {
@ -872,6 +870,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
// clear seen mappings as well
clearSeenMappings(index);
}
private class FailedEngineHandler implements Engine.FailedEngineListener {

View File

@ -20,7 +20,6 @@
package org.elasticsearch.indices.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -30,7 +29,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -49,7 +47,6 @@ import org.elasticsearch.transport.*;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@ -160,20 +157,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) {
ShardId shardId = indexShardRoutingTable.shardId();
IndexService indexService = indicesService.indexService(shardId.getIndex());
if (indexService == null) {
if (nodeEnv.hasNodeFile()) {
Path[] shardLocations = nodeEnv.shardPaths(shardId);
if (FileSystemUtils.exists(shardLocations)) {
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
}
}
} else {
if (!indexService.hasShard(shardId.id())) {
if (indexService.store().canDeleteUnallocated(shardId, indexService.settingsService().getSettings())) {
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
}
}
if (indicesService.canDeleteShardContent(shardId, event.state().getMetaData().index(shardId.getIndex()))) {
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
}
}
}
@ -201,21 +186,11 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
if (node == null) {
return false;
}
// If all nodes have been upgraded to >= 1.3.0 at some point we get back here and have the chance to
// run this api. (when cluster state is then updated)
if (node.getVersion().before(Version.V_1_3_0)) {
logger.debug("Skip deleting deleting shard instance [{}], a node holding a shard instance is < 1.3.0", shardRouting);
return false;
}
if (shardRouting.relocatingNodeId() != null) {
node = state.nodes().get(shardRouting.relocatingNodeId());
if (node == null) {
return false;
}
if (node.getVersion().before(Version.V_1_3_0)) {
logger.debug("Skip deleting deleting shard instance [{}], a node holding a shard instance is < 1.3.0", shardRouting);
return false;
}
}
// check if shard is active on the current node or is getting relocated to the our node
@ -318,38 +293,11 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion());
return currentState;
}
IndexService indexService = indicesService.indexService(shardId.getIndex());
IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex());
if (indexService == null) {
// not physical allocation of the index, delete it from the file system if applicable
if (nodeEnv.hasNodeFile()) {
Path[] shardLocations = nodeEnv.shardPaths(shardId);
if (FileSystemUtils.exists(shardLocations)) {
logger.debug("{} deleting shard that is no longer used", shardId);
try {
nodeEnv.deleteShardDirectorySafe(shardId, indexMeta.settings());
} catch (Exception ex) {
logger.debug("failed to delete shard locations", ex);
}
}
}
} else {
if (!indexService.hasShard(shardId.id())) {
if (indexService.store().canDeleteUnallocated(shardId, indexMeta.settings())) {
logger.debug("{} deleting shard that is no longer used", shardId);
try {
indexService.store().deleteUnallocated(shardId, indexMeta.settings());
} catch (Exception e) {
logger.debug("{} failed to delete unallocated shard, ignoring", e, shardId);
}
}
} else {
// this state is weird, should we log?
// basically, it means that the shard is not allocated on this node using the routing
// but its still physically exists on an IndexService
// Note, this listener should run after IndicesClusterStateService...
}
try {
indicesService.deleteShardStore("no longer used", shardId, indexMeta);
} catch (Exception ex) {
logger.debug("{} failed to delete unallocated shard, ignoring", ex, shardId);
}
return currentState;
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import java.nio.file.Files;
import java.nio.file.Path;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
/**
 * Single node tests for the shard / index store deletion checks exposed by {@link IndicesService}.
 */
public class IndicesServiceTest extends ElasticsearchSingleNodeTest {

    private static final String INDEX = "test";

    public IndicesService getIndicesService() {
        return getInstanceFromNode(IndicesService.class);
    }

    protected boolean resetNodeAfterTest() {
        return true;
    }

    /**
     * Shard content is deletable only once the index exists locally and the shard has been removed from it.
     */
    public void testCanDeleteShardContent() {
        final IndicesService service = getIndicesService();
        final IndexMetaData meta = IndexMetaData.builder(INDEX)
                .settings(settings(Version.CURRENT))
                .numberOfShards(1)
                .numberOfReplicas(1)
                .build();
        final ShardId shard = new ShardId(INDEX, 0);

        assertFalse("no shard location", service.canDeleteShardContent(shard, meta));

        final IndexService index = createIndex(INDEX);
        assertTrue(index.hasShard(0));
        assertFalse("shard is allocated", service.canDeleteShardContent(shard, meta));

        index.removeShard(0, "boom");
        assertTrue("shard is removed", service.canDeleteShardContent(shard, meta));
    }

    /**
     * Deleting the index store must be rejected while the index is still part of the cluster state,
     * whether it is open or closed, and whichever incarnation of the metadata is passed in.
     */
    public void testDeleteIndexStore() throws Exception {
        final IndicesService service = getIndicesService();
        final IndexService index = createIndex(INDEX);
        final ClusterService clusterService = getInstanceFromNode(ClusterService.class);
        final IndexMetaData firstMetaData = clusterService.state().metaData().index(INDEX);
        assertTrue(index.hasShard(0));

        assertDeleteIndexStoreRejected(service, firstMetaData);

        // the persisted metadata still knows the index until it is really deleted
        final GatewayMetaState gatewayState = getInstanceFromNode(GatewayMetaState.class);
        MetaData persisted = gatewayState.loadMetaState();
        assertNotNull(persisted);
        assertNotNull(persisted.index(INDEX));
        assertAcked(client().admin().indices().prepareDelete(INDEX));

        persisted = gatewayState.loadMetaState();
        assertNotNull(persisted);
        assertNull(persisted.index(INDEX));

        // recreate the index under the same name, put a document in it and close it
        createIndex(INDEX);
        client().prepareIndex(INDEX, "type", "1").setSource("field", "value").setRefresh(true).get();
        client().admin().indices().prepareFlush(INDEX).get();
        assertHitCount(client().prepareSearch(INDEX).get(), 1);
        final IndexMetaData secondMetaData = clusterService.state().metaData().index(INDEX);
        assertAcked(client().admin().indices().prepareClose(INDEX));

        final NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class);
        final Path[] shardPaths = nodeEnv.shardDataPaths(new ShardId(INDEX, 0), clusterService.state().getMetaData().index(INDEX).getSettings());
        assertAllExist(shardPaths);

        assertDeleteIndexStoreRejected(service, secondMetaData);
        assertAllExist(shardPaths);

        // now delete the old one and make sure we resolve against the name
        assertDeleteIndexStoreRejected(service, firstMetaData);

        assertAcked(client().admin().indices().prepareOpen(INDEX));
        ensureGreen(INDEX);
    }

    /** Asserts that deleting the index store for the given metadata fails with an illegal state exception. */
    private void assertDeleteIndexStoreRejected(IndicesService service, IndexMetaData metaData) throws Exception {
        try {
            service.deleteIndexStore("boom", metaData);
            fail();
        } catch (ElasticsearchIllegalStateException ex) {
            // all good
        }
    }

    /** Asserts that every one of the given paths exists on disk. */
    private void assertAllExist(Path[] paths) {
        for (Path path : paths) {
            assertTrue(Files.exists(path));
        }
    }
}

View File

@ -161,15 +161,8 @@ public class IndicesStoreTests extends ElasticsearchTestCase {
}
}
final boolean canBeDeleted;
if (nodeVersion.before(Version.V_1_3_0)) {
canBeDeleted = false;
} else {
canBeDeleted = true;
}
// shard exist on other node (abc)
assertThat(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()), is(canBeDeleted));
assertTrue(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
}
@Test
@ -194,14 +187,8 @@ public class IndicesStoreTests extends ElasticsearchTestCase {
}
}
final boolean canBeDeleted;
if (nodeVersion.before(Version.V_1_3_0)) {
canBeDeleted = false;
} else {
canBeDeleted = true;
}
// shard exist on other node (abc and def)
assertThat(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()), is(canBeDeleted));
assertTrue(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
}
}