[CORE] Ensure shards are deleted under lock on close
Today there is a race condition between the actual deletion of the shard and the release of the lock in the store. This race can cause rare imports of dangeling indices if the cluster state update loop tires to import the dangeling index in that particular windonw. This commit adds more safety to the import of dangeling indices and removes the race condition by holding on to the lock on store closing while the listener is notified.
This commit is contained in:
parent
abc0bc4c7f
commit
a6e6c4efc4
|
@ -45,6 +45,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -287,6 +288,25 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
|||
}
|
||||
final IndexMetaData indexMetaData = loadIndexState(indexName);
|
||||
if (indexMetaData != null) {
|
||||
final Index index = new Index(indexName);
|
||||
try {
|
||||
// the index deletion might not have worked due to shards still being locked
|
||||
// we have three cases here:
|
||||
// - we acquired all shards locks here --> we can import the dangeling index
|
||||
// - we failed to acquire the lock --> somebody else uses it - DON'T IMPORT
|
||||
// - we acquired successfully but the lock list is empty --> no shards present - DON'T IMPORT
|
||||
// in the last case we should in-fact try to delete the directory since it might be a leftover...
|
||||
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index);
|
||||
if (shardLocks.isEmpty()) {
|
||||
// no shards - try to remove the directory
|
||||
nodeEnv.deleteIndexDirectorySafe(index);
|
||||
continue;
|
||||
}
|
||||
IOUtils.closeWhileHandlingException(shardLocks);
|
||||
} catch (IOException ex) {
|
||||
logger.warn("[{}] skipping locked dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state is set to [{}]", ex, indexName, autoImportDangled);
|
||||
continue;
|
||||
}
|
||||
if(autoImportDangled.shouldImport()){
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state [{}]", indexName, autoImportDangled);
|
||||
danglingIndices.put(indexName, new DanglingIndex(indexName, null));
|
||||
|
@ -300,12 +320,6 @@ public class LocalGatewayMetaState extends AbstractComponent implements ClusterS
|
|||
logger.warn("[{}] failed to delete dangling index", ex, indexName);
|
||||
}
|
||||
} else {
|
||||
try { // the index deletion might not have worked due to shards still being locked
|
||||
IOUtils.closeWhileHandlingException(nodeEnv.lockAllForIndex(new Index(indexName)));
|
||||
} catch (IOException ex) {
|
||||
logger.warn("[{}] skipping locked dangling index, exists on local file system, but not in cluster metadata, auto import to cluster state is set to [{}]", ex, indexName, autoImportDangled);
|
||||
continue;
|
||||
}
|
||||
logger.info("[{}] dangling index, exists on local file system, but not in cluster metadata, scheduling to delete in [{}], auto import to cluster state [{}]", indexName, danglingTimeout, autoImportDangled);
|
||||
danglingIndices.put(indexName, new DanglingIndex(indexName, threadPool.schedule(danglingTimeout, ThreadPool.Names.SAME, new RemoveDanglingIndex(indexName))));
|
||||
}
|
||||
|
|
|
@ -362,9 +362,6 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
|||
directory.innerClose(); // this closes the distributorDirectory as well
|
||||
} catch (IOException e) {
|
||||
logger.debug("failed to close directory", e);
|
||||
} finally {
|
||||
try {
|
||||
IOUtils.closeWhileHandlingException(shardLock);
|
||||
} finally {
|
||||
try {
|
||||
if (listener != null) {
|
||||
|
@ -372,7 +369,8 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
|||
}
|
||||
} catch (Exception ex){
|
||||
logger.debug("OnCloseListener threw an exception", ex);
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeWhileHandlingException(shardLock);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -113,7 +113,9 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
|
|||
public void onAllShardsClosed(Index index, List<Throwable> failures);
|
||||
|
||||
/**
|
||||
* Invoked once the last resource using the given shard ID is released
|
||||
* Invoked once the last resource using the given shard ID is released.
|
||||
* Yet, this method is called while still holding the shards lock such that
|
||||
* operations on the shards data can safely be executed in this callback.
|
||||
*/
|
||||
public void onShardClosed(ShardId shardId);
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.indices;
|
||||
|
||||
import com.google.common.collect.*;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStats;
|
||||
|
@ -357,7 +358,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
@Override
|
||||
public void onShardClosed(ShardId shardId) {
|
||||
try {
|
||||
nodeEnv.deleteShardDirectorySafe(shardId);
|
||||
// this is called under the shard lock - we can safely delete it
|
||||
IOUtils.rm(nodeEnv.shardPaths(shardId));
|
||||
logger.debug("deleted shard [{}] from filesystem", shardId);
|
||||
} catch (IOException e) {
|
||||
logger.warn("Can't delete shard {} ", e, shardId);
|
||||
|
|
Loading…
Reference in New Issue