[INDICES] Schedule pending delete if index store delete fails

We try to lock all shards when an index is deleted, but this likely does not
succeed since shards are still active. To ensure that shards that used to be
allocated on that node get cleaned up as well, we would have to retry or block
on the delete until we get the locks. This is not desirable since the delete
happens on the cluster state processing thread. Instead of blocking, this
commit schedules a pending delete for the index, just as we do when we can't delete shards.
This commit is contained in:
Simon Willnauer 2015-02-24 22:06:32 +01:00
parent 5a53ff6f1b
commit 306b7b0f2b
5 changed files with 126 additions and 61 deletions

View File

@ -230,19 +230,32 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
assert indexSettings != ImmutableSettings.EMPTY;
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, lockTimeoutMS);
try {
final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
IOUtils.rm(indexPaths);
if (hasCustomDataPath(indexSettings)) {
Path customLocation = resolveCustomLocation(indexSettings, index.name());
logger.trace("deleting custom index {} directory [{}]", index, customLocation);
IOUtils.rm(customLocation);
}
deleteIndexDirectoryUnderLock(index, indexSettings);
} finally {
IOUtils.closeWhileHandlingException(locks);
}
}
/**
 * Recursively deletes the data directories of the given index, including any
 * configured custom data location.
 * Note: this method assumes that the shard lock is acquired
 *
 * @param index the index to delete
 * @param indexSettings settings for the index being deleted
 */
public void deleteIndexDirectoryUnderLock(Index index, @IndexSettings Settings indexSettings) throws IOException {
    // This is to ensure someone doesn't use ImmutableSettings.EMPTY
    assert indexSettings != ImmutableSettings.EMPTY;
    final Path[] paths = indexPaths(index);
    logger.trace("deleting index {} directory, paths({}): [{}]", index, paths.length, paths);
    IOUtils.rm(paths);
    if (hasCustomDataPath(indexSettings) == false) {
        return; // no custom data path configured - nothing more to remove
    }
    final Path custom = resolveCustomLocation(indexSettings, index.name());
    logger.trace("deleting custom index {} directory [{}]", index, custom);
    IOUtils.rm(custom);
}
/**
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired

View File

@ -444,7 +444,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
indicesServices.deleteShardStore("delete index", lock, indexSettings);
}
} catch (IOException e) {
indicesServices.addPendingDelete(index(), lock.getShardId(), indexSettings);
indicesServices.addPendingDelete(lock.getShardId(), indexSettings);
logger.debug("{} failed to delete shard content - scheduled a retry", e, lock.getShardId().id());
}
}

View File

@ -92,7 +92,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public static final String INDEX_STORE_STATS_REFRESH_INTERVAL = "index.store.stats_refresh_interval";
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final DirectoryService directoryService;
private final StoreDirectory directory;
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
@ -114,7 +113,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
@Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock, OnClose onClose) throws IOException {
super(shardId, indexSettings);
this.directoryService = directoryService;
this.directory = new StoreDirectory(directoryService.newFromDistributor(distributor), Loggers.getLogger("index.store.deletes", indexSettings, shardId));
this.shardLock = shardLock;
this.onClose = onClose;

View File

@ -28,14 +28,17 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -485,6 +488,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
private void deleteIndexStore(String reason, Index index, Settings indexSettings) throws IOException {
boolean success = false;
try {
// we are trying to delete the index store here - not a big deal if the lock can't be obtained
// the store metadata gets wiped anyway even without the lock this is just best effort since
@ -493,11 +497,15 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
if (canDeleteIndexContents(index, indexSettings)) {
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
}
success = true;
} catch (LockObtainFailedException ex) {
logger.debug("{} failed to delete index store - at least one shards is still locked", ex, index);
} catch (Exception ex) {
logger.warn("{} failed to delete index", ex, index);
} finally {
if (success == false) {
addPendingDelete(index, indexSettings);
}
// this is purely a protection to make sure this index doesn't get re-imported as a dangling index.
// we should in the future rather write a tombstone rather than wiping the metadata.
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
@ -603,32 +611,61 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
/**
* Adds a pending delete for the given index.
* Adds a pending delete for the given index shard.
*/
public void addPendingDelete(Index index, ShardId shardId, Settings settings) {
public void addPendingDelete(ShardId shardId, @IndexSettings Settings settings) {
if (shardId == null) {
throw new ElasticsearchIllegalArgumentException("shardId must not be null");
}
if (settings == null) {
throw new ElasticsearchIllegalArgumentException("settings must not be null");
}
PendingDelete pendingDelete = new PendingDelete(shardId, settings, false);
addPendingDelete(shardId.index(), pendingDelete);
}
/**
 * Registers the given pending delete under the index it belongs to.
 * Thread-safe: all access to the pendingDeletes map is guarded by the map itself.
 */
private void addPendingDelete(Index index, PendingDelete pendingDelete) {
    synchronized (pendingDeletes) {
        List<PendingDelete> deletes = pendingDeletes.get(index);
        if (deletes == null) {
            // first pending delete for this index - create the bucket lazily
            deletes = new ArrayList<>();
            pendingDeletes.put(index, deletes);
        }
        deletes.add(pendingDelete);
    }
}
private static final class PendingDelete {
/**
 * Adds a pending delete for the entire given index (rather than a single shard).
 *
 * @param index the index whose store should be deleted once all shard locks can be obtained
 * @param settings the settings of the index being deleted
 * @throws ElasticsearchIllegalArgumentException if {@code index} or {@code settings} is null
 */
public void addPendingDelete(Index index, @IndexSettings Settings settings) {
    // Validate arguments for consistency with the ShardId overload; a null index
    // would otherwise end up as a bogus map key, a null settings would NPE later.
    if (index == null) {
        throw new ElasticsearchIllegalArgumentException("index must not be null");
    }
    if (settings == null) {
        throw new ElasticsearchIllegalArgumentException("settings must not be null");
    }
    // index-level delete: no shardId, deleteIndex == true
    PendingDelete pendingDelete = new PendingDelete(null, settings, true);
    addPendingDelete(index, pendingDelete);
}
private static final class PendingDelete implements Comparable<PendingDelete> {
final ShardId shardId;
final Settings settings;
final boolean deleteIndex;
public PendingDelete(ShardId shardId, Settings settings) {
public PendingDelete(ShardId shardId, Settings settings, boolean deleteIndex) {
this.shardId = shardId;
this.settings = settings;
this.deleteIndex = deleteIndex;
assert deleteIndex || shardId != null;
}
@Override
public String toString() {
return shardId.toString();
}
@Override
public int compareTo(PendingDelete o) {
int left = deleteIndex ? -1 : shardId.id();
int right = o.deleteIndex ? -1 : o.shardId.id();
return Integer.compare(left, right);
}
}
/**
@ -653,40 +690,54 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
synchronized (pendingDeletes) {
remove = pendingDeletes.remove(index);
}
final long maxSleepTimeMs = 10 * 1000; // ensure we retry after 10 sec
long sleepTime = 10;
do {
if (remove == null || remove.isEmpty()) {
break;
}
Iterator<PendingDelete> iterator = remove.iterator();
while (iterator.hasNext()) {
PendingDelete delete = iterator.next();
ShardLock shardLock = locks.get(delete.shardId);
if (shardLock != null) {
try {
deleteShardStore("pending delete", shardLock, delete.settings);
iterator.remove();
} catch (IOException ex) {
logger.debug("{} retry pending delete", ex, shardLock.getShardId());
if (remove != null && remove.isEmpty() == false) {
CollectionUtil.timSort(remove); // make sure we delete indices first
final long maxSleepTimeMs = 10 * 1000; // ensure we retry after 10 sec
long sleepTime = 10;
do {
if (remove.isEmpty()) {
break;
}
Iterator<PendingDelete> iterator = remove.iterator();
while (iterator.hasNext()) {
PendingDelete delete = iterator.next();
if (delete.deleteIndex) {
logger.debug("{} deleting index store reason [{}]", index, "pending delete");
try {
nodeEnv.deleteIndexDirectoryUnderLock(index, indexSettings);
iterator.remove();
} catch (IOException ex) {
logger.debug("{} retry pending delete", ex, index);
}
} else {
ShardLock shardLock = locks.get(delete.shardId);
if (shardLock != null) {
try {
deleteShardStore("pending delete", shardLock, delete.settings);
iterator.remove();
} catch (IOException ex) {
logger.debug("{} retry pending delete", ex, shardLock.getShardId());
}
} else {
logger.warn("{} no shard lock for pending delete", delete.shardId);
iterator.remove();
}
}
} else {
logger.warn("{} no shard lock for pending delete", delete.shardId);
iterator.remove();
}
}
if (remove.isEmpty() == false) {
logger.warn("{} still pending deletes present for shards {} - retrying", index, remove.toString());
try {
Thread.sleep(sleepTime);
sleepTime = Math.min(maxSleepTimeMs, sleepTime * 2); // increase the sleep time gradually
logger.debug("{} schedule pending delete retry after {} ms", index, sleepTime);
} catch (InterruptedException e) {
Thread.interrupted();
return;
if (remove.isEmpty() == false) {
logger.warn("{} still pending deletes present for shards {} - retrying", index, remove.toString());
try {
Thread.sleep(sleepTime);
sleepTime = Math.min(maxSleepTimeMs, sleepTime * 2); // increase the sleep time gradually
logger.debug("{} schedule pending delete retry after {} ms", index, sleepTime);
} catch (InterruptedException e) {
Thread.interrupted();
return;
}
}
}
} while ((System.currentTimeMillis() - startTime) < timeout.millis());
} while ((System.currentTimeMillis() - startTime) < timeout.millis());
}
} finally {
IOUtils.close(shardLocks);
}

View File

@ -21,18 +21,12 @@ package org.elasticsearch.indices;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
@ -130,10 +124,10 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
public void testPendingTasks() throws IOException {
IndicesService indicesService = getIndicesService();
IndexService test = createIndex("test");
NodeEnvironment nodeEnc = getInstanceFromNode(NodeEnvironment.class);
NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class);
assertTrue(test.hasShard(0));
Path[] paths = nodeEnc.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings());
Path[] paths = nodeEnv.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings());
try {
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
fail("can't get lock");
@ -143,23 +137,32 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
for (Path p : paths) {
assertTrue(Files.exists(p));
}
indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings());
int numPending = 1;
if (randomBoolean()) {
indicesService.addPendingDelete(new ShardId(test.index(), 0), test.getIndexSettings());
} else {
if (randomBoolean()) {
numPending++;
indicesService.addPendingDelete(new ShardId(test.index(), 0), test.getIndexSettings());
}
indicesService.addPendingDelete(test.index(), test.getIndexSettings());
}
assertAcked(client().admin().indices().prepareClose("test"));
for (Path p : paths) {
assertTrue(Files.exists(p));
}
assertEquals(indicesService.numPendingDeletes(test.index()), 1);
assertEquals(indicesService.numPendingDeletes(test.index()), numPending);
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
assertEquals(indicesService.numPendingDeletes(test.index()), 0);
for (Path p : paths) {
assertFalse(Files.exists(p));
}
if (randomBoolean()) {
indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings());
indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 1), test.getIndexSettings());
indicesService.addPendingDelete(new Index("bogus"), new ShardId("bogus", 1), test.getIndexSettings());
indicesService.addPendingDelete(new ShardId(test.index(), 0), test.getIndexSettings());
indicesService.addPendingDelete(new ShardId(test.index(), 1), test.getIndexSettings());
indicesService.addPendingDelete(new ShardId("bogus", 1), test.getIndexSettings());
assertEquals(indicesService.numPendingDeletes(test.index()), 2);
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));