From a71cc45023333b00e67a9d9874ddd4ec6997f1b6 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 20 Feb 2015 11:37:55 +0100 Subject: [PATCH] [INDICES] Retry if shard deletes fail due to IOExceptions Today if a shard deletion fails we simply ignore it and move on. On system like windows where a virus scanner can hold on to files or any other process ie. the admins explorer window we fail to delete shards leaving large amout of data behind. We should try best effort to clean those shards up before we ack the delete. --- .../action/index/NodeIndexDeletedAction.java | 24 ++-- .../org/elasticsearch/index/IndexService.java | 3 +- .../elasticsearch/indices/IndicesService.java | 109 ++++++++++++++++-- .../indices/IndicesServiceTest.java | 47 ++++++++ .../test/ElasticsearchIntegrationTest.java | 18 +-- 5 files changed, 165 insertions(+), 36 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java index 7db80256d0e..2c33a5251b2 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java @@ -29,10 +29,12 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -52,16 +54,16 @@ public class NodeIndexDeletedAction extends AbstractComponent { private final ThreadPool threadPool; private final TransportService transportService; private final List listeners = new CopyOnWriteArrayList<>(); - private final NodeEnvironment nodeEnv; + private final IndicesService indicesService; @Inject - public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService, NodeEnvironment nodeEnv) { + public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService, NodeEnvironment nodeEnv, IndicesService indicesService) { super(settings); this.threadPool = threadPool; this.transportService = transportService; transportService.registerHandler(INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedTransportHandler()); transportService.registerHandler(INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedTransportHandler()); - this.nodeEnv = nodeEnv; + this.indicesService = indicesService; } public void add(Listener listener) { @@ -120,16 +122,12 @@ public class NodeIndexDeletedAction extends AbstractComponent { // master. If we can't acquire the locks here immediately there might be a shard of this index still holding on to the lock // due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be // deleted by the time we get the lock - final List locks = nodeEnv.lockAllForIndex(new Index(index), TimeUnit.MINUTES.toMillis(30)); - try { - if (nodes.localNodeMaster()) { - innerNodeIndexStoreDeleted(index, nodeId); - } else { - transportService.sendRequest(clusterState.nodes().masterNode(), - INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); - } - } finally { - IOUtils.close(locks); // release them again + indicesService.processPendingDeletes(new Index(index), new TimeValue(30, TimeUnit.MINUTES)); + if (nodes.localNodeMaster()) { + innerNodeIndexStoreDeleted(index, nodeId); + } else { + transportService.sendRequest(clusterState.nodes().masterNode(), + INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); } } catch (LockObtainFailedException exc) { logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index); diff --git a/src/main/java/org/elasticsearch/index/IndexService.java b/src/main/java/org/elasticsearch/index/IndexService.java index a9dbdf71a1d..69985a39134 100644 --- a/src/main/java/org/elasticsearch/index/IndexService.java +++ b/src/main/java/org/elasticsearch/index/IndexService.java @@ -440,7 +440,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone indicesServices.deleteShardStore("delete index", lock, indexSettings); } } catch (IOException e) { - logger.warn("{} failed to delete shard content", e, lock.getShardId()); + indicesServices.addPendingDelete(index(), lock.getShardId(), indexSettings); + logger.debug("{} failed to delete shard content - scheduled a retry", e, lock.getShardId()); } } } diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java index b3e02da4608..f21ac36c344 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -50,6 +50,7 @@ import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; @@ -92,13 +93,8 @@ import org.elasticsearch.plugins.PluginsService; import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -122,6 +118,7 @@ public class IndicesService extends AbstractLifecycleComponent i private final ClusterService clusterService; private volatile Map> indices = ImmutableMap.of(); + private final Map> pendingDeletes = new HashMap<>(); private final OldShardsStats oldShardsStats = new OldShardsStats(); @@ -604,4 +601,102 @@ public class IndicesService extends AbstractLifecycleComponent i builder.put(metaData.getSettings()); return builder.build(); } + + /** + * Adds a pending delete for the given index. + */ + public void addPendingDelete(Index index, ShardId shardId, Settings settings) { + synchronized (pendingDeletes) { + List list = pendingDeletes.get(index); + if (list == null) { + list = new ArrayList<>(); + pendingDeletes.put(index, list); + } + list.add(new PendingDelete(shardId, settings)); + } + } + + private static final class PendingDelete { + final ShardId shardId; + final Settings settings; + + public PendingDelete(ShardId shardId, Settings settings) { + this.shardId = shardId; + this.settings = settings; + } + + @Override + public String toString() { + return shardId.toString(); + } + } + + /** + * Processes all pending deletes for the given index. This method will acquire all locks for the given index and will + * process all pending deletes for this index. Pending deletes might occur if the OS doesn't allow deletion of files because + * they are used by a different process ie. on Windows where files might still be open by a virus scanner. On a shared + * filesystem a replica might not have been closed when the primary is deleted causing problems on delete calls so we + * schedule there deletes later. + * @param index the index to process the pending deletes for + * @param timeout the timeout used for processing pending deletes + */ + public void processPendingDeletes(Index index, TimeValue timeout) throws IOException { + final long startTime = System.currentTimeMillis(); + final List shardLocks = nodeEnv.lockAllForIndex(index, timeout.millis()); + try { + Map locks = new HashMap<>(); + for (ShardLock lock : shardLocks) { + locks.put(lock.getShardId(), lock); + } + final List remove; + synchronized (pendingDeletes) { + remove = pendingDeletes.remove(index); + } + final long maxSleepTimeMs = 10 * 1000; // ensure we retry after 10 sec + long sleepTime = 10; + do { + if (remove == null || remove.isEmpty()) { + break; + } + Iterator iterator = remove.iterator(); + while (iterator.hasNext()) { + PendingDelete delete = iterator.next(); + ShardLock shardLock = locks.get(delete.shardId); + if (shardLock != null) { + try { + deleteShardStore("pending delete", shardLock, delete.settings); + } catch (IOException ex) { + logger.debug("{} retry pending delete", shardLock.getShardId(), ex); + } + } else { + logger.warn("{} no shard lock for pending delete", delete.shardId); + } + iterator.remove(); + } + if (remove.isEmpty() == false) { + logger.warn("{} still pending deletes present for shards {} - retrying", index, remove.toString()); + try { + Thread.sleep(sleepTime); + sleepTime = Math.min(maxSleepTimeMs, sleepTime * 2); // increase the sleep time gradually + logger.debug("{} schedule pending delete retry after {} ms", index, sleepTime); + } catch (InterruptedException e) { + Thread.interrupted(); + return; + } + } + } while ((System.currentTimeMillis() - startTime) < timeout.millis()); + } finally { + IOUtils.close(shardLocks); + } + } + + int numPendingDeletes(Index index) { + synchronized (pendingDeletes) { + List deleteList = pendingDeletes.get(index); + if (deleteList == null) { + return 0; + } + return deleteList.size(); + } + } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java index 368bf7fef26..2ebc8e2d573 100644 --- a/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java +++ b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.indices; +import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -28,14 +29,18 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayMetaState; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ElasticsearchSingleNodeTest; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -120,4 +125,46 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest { assertAcked(client().admin().indices().prepareOpen("test")); ensureGreen("test"); } + + public void testPendingTasks() throws IOException { + IndicesService indicesService = getIndicesService(); + IndexService test = createIndex("test"); + NodeEnvironment nodeEnc = getInstanceFromNode(NodeEnvironment.class); + + assertTrue(test.hasShard(0)); + Path[] paths = nodeEnc.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings()); + try { + indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS)); + fail("can't get lock"); + } catch (LockObtainFailedException ex) { + + } + for (Path p : paths) { + assertTrue(Files.exists(p)); + } + indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings()); + assertAcked(client().admin().indices().prepareClose("test")); + for (Path p : paths) { + assertTrue(Files.exists(p)); + } + assertEquals(indicesService.numPendingDeletes(test.index()), 1); + // shard lock released... we can now delete + indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS)); + assertEquals(indicesService.numPendingDeletes(test.index()), 0); + for (Path p : paths) { + assertFalse(Files.exists(p)); + } + + if (randomBoolean()) { + indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings()); + indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 1), test.getIndexSettings()); + indicesService.addPendingDelete(new Index("bogus"), new ShardId("bogus", 1), test.getIndexSettings()); + assertEquals(indicesService.numPendingDeletes(test.index()), 2); + // shard lock released... we can now delete + indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS)); + assertEquals(indicesService.numPendingDeletes(test.index()), 0); + } + assertAcked(client().admin().indices().prepareOpen("test")); + + } } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 6bb805aca93..3b1ea2a0940 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -27,10 +27,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.Lists; -import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.util.AbstractRandomizedTest; -import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchException; @@ -1806,7 +1804,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase */ public void assertPathHasBeenCleared(Path path) throws Exception { logger.info("--> checking that [{}] has been cleared", path); - final List foundFiles = new ArrayList<>(); + int count = 0; StringBuilder sb = new StringBuilder(); sb.append("["); if (Files.exists(path)) { @@ -1816,7 +1814,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase if (Files.isDirectory(file)) { assertPathHasBeenCleared(file); } else if (Files.isRegularFile(file)) { - foundFiles.add(file); + count++; sb.append(file.toAbsolutePath().toString()); sb.append("\n"); } @@ -1824,17 +1822,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } } sb.append("]"); - if (Constants.WINDOWS) { - if (foundFiles.size() > 0) { - for (Path file : foundFiles) { - // for now on windows we only ensure that there is at least no segments_N file left on the path - // we don't have a retry mechanism in place yet. - assertFalse(foundFiles.size() + " files exist that should have been cleaned:\n" + sb.toString(), file.getFileName().toString().startsWith(IndexFileNames.SEGMENTS)); - } - } - } else { - assertThat(foundFiles.size() + " files exist that should have been cleaned:\n" + sb.toString(), foundFiles.size(), equalTo(0)); - } + assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0)); } protected static class NumShards {