diff --git a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java index 7db80256d0e..2c33a5251b2 100644 --- a/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/index/NodeIndexDeletedAction.java @@ -29,10 +29,12 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -52,16 +54,16 @@ public class NodeIndexDeletedAction extends AbstractComponent { private final ThreadPool threadPool; private final TransportService transportService; private final List listeners = new CopyOnWriteArrayList<>(); - private final NodeEnvironment nodeEnv; + private final IndicesService indicesService; @Inject - public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService, NodeEnvironment nodeEnv) { + public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService, NodeEnvironment nodeEnv, IndicesService indicesService) { super(settings); this.threadPool = threadPool; this.transportService = transportService; transportService.registerHandler(INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedTransportHandler()); transportService.registerHandler(INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedTransportHandler()); - this.nodeEnv = nodeEnv; + this.indicesService = indicesService; } public void add(Listener listener) { @@ -120,16 +122,12 @@ public class NodeIndexDeletedAction extends AbstractComponent { // master. If we can't acquire the locks here immediately there might be a shard of this index still holding on to the lock // due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be // deleted by the time we get the lock - final List locks = nodeEnv.lockAllForIndex(new Index(index), TimeUnit.MINUTES.toMillis(30)); - try { - if (nodes.localNodeMaster()) { - innerNodeIndexStoreDeleted(index, nodeId); - } else { - transportService.sendRequest(clusterState.nodes().masterNode(), - INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); - } - } finally { - IOUtils.close(locks); // release them again + indicesService.processPendingDeletes(new Index(index), new TimeValue(30, TimeUnit.MINUTES)); + if (nodes.localNodeMaster()) { + innerNodeIndexStoreDeleted(index, nodeId); + } else { + transportService.sendRequest(clusterState.nodes().masterNode(), + INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME); } } catch (LockObtainFailedException exc) { logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index); diff --git a/src/main/java/org/elasticsearch/index/IndexService.java b/src/main/java/org/elasticsearch/index/IndexService.java index a9dbdf71a1d..69985a39134 100644 --- a/src/main/java/org/elasticsearch/index/IndexService.java +++ b/src/main/java/org/elasticsearch/index/IndexService.java @@ -440,7 +440,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone indicesServices.deleteShardStore("delete index", lock, indexSettings); } } catch (IOException e) { - logger.warn("{} failed to delete shard content", e, lock.getShardId()); + indicesServices.addPendingDelete(index(), lock.getShardId(), indexSettings); + logger.debug("{} failed to delete shard content - scheduled a retry", e, lock.getShardId()); } } } diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java index b3e02da4608..f21ac36c344 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -50,6 +50,7 @@ import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; @@ -92,13 +93,8 @@ import org.elasticsearch.plugins.PluginsService; import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; @@ -122,6 +118,7 @@ public class IndicesService extends AbstractLifecycleComponent i private final ClusterService clusterService; private volatile Map> indices = ImmutableMap.of(); + private final Map> pendingDeletes = new HashMap<>(); private final OldShardsStats oldShardsStats = new OldShardsStats(); @@ -604,4 +601,102 @@ public class IndicesService extends AbstractLifecycleComponent i builder.put(metaData.getSettings()); return builder.build(); } + + /** + * Adds a pending delete for the given index. + */ + public void addPendingDelete(Index index, ShardId shardId, Settings settings) { + synchronized (pendingDeletes) { + List list = pendingDeletes.get(index); + if (list == null) { + list = new ArrayList<>(); + pendingDeletes.put(index, list); + } + list.add(new PendingDelete(shardId, settings)); + } + } + + private static final class PendingDelete { + final ShardId shardId; + final Settings settings; + + public PendingDelete(ShardId shardId, Settings settings) { + this.shardId = shardId; + this.settings = settings; + } + + @Override + public String toString() { + return shardId.toString(); + } + } + + /** + * Processes all pending deletes for the given index. This method will acquire all locks for the given index and will + * process all pending deletes for this index. Pending deletes might occur if the OS doesn't allow deletion of files because + * they are used by a different process ie. on Windows where files might still be open by a virus scanner. On a shared + * filesystem a replica might not have been closed when the primary is deleted causing problems on delete calls so we + * schedule there deletes later. + * @param index the index to process the pending deletes for + * @param timeout the timeout used for processing pending deletes + */ + public void processPendingDeletes(Index index, TimeValue timeout) throws IOException { + final long startTime = System.currentTimeMillis(); + final List shardLocks = nodeEnv.lockAllForIndex(index, timeout.millis()); + try { + Map locks = new HashMap<>(); + for (ShardLock lock : shardLocks) { + locks.put(lock.getShardId(), lock); + } + final List remove; + synchronized (pendingDeletes) { + remove = pendingDeletes.remove(index); + } + final long maxSleepTimeMs = 10 * 1000; // ensure we retry after 10 sec + long sleepTime = 10; + do { + if (remove == null || remove.isEmpty()) { + break; + } + Iterator iterator = remove.iterator(); + while (iterator.hasNext()) { + PendingDelete delete = iterator.next(); + ShardLock shardLock = locks.get(delete.shardId); + if (shardLock != null) { + try { + deleteShardStore("pending delete", shardLock, delete.settings); + } catch (IOException ex) { + logger.debug("{} retry pending delete", shardLock.getShardId(), ex); + } + } else { + logger.warn("{} no shard lock for pending delete", delete.shardId); + } + iterator.remove(); + } + if (remove.isEmpty() == false) { + logger.warn("{} still pending deletes present for shards {} - retrying", index, remove.toString()); + try { + Thread.sleep(sleepTime); + sleepTime = Math.min(maxSleepTimeMs, sleepTime * 2); // increase the sleep time gradually + logger.debug("{} schedule pending delete retry after {} ms", index, sleepTime); + } catch (InterruptedException e) { + Thread.interrupted(); + return; + } + } + } while ((System.currentTimeMillis() - startTime) < timeout.millis()); + } finally { + IOUtils.close(shardLocks); + } + } + + int numPendingDeletes(Index index) { + synchronized (pendingDeletes) { + List deleteList = pendingDeletes.get(index); + if (deleteList == null) { + return 0; + } + return deleteList.size(); + } + } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java index 368bf7fef26..2ebc8e2d573 100644 --- a/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java +++ b/src/test/java/org/elasticsearch/indices/IndicesServiceTest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.indices; +import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -28,14 +29,18 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayMetaState; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ElasticsearchSingleNodeTest; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -120,4 +125,46 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest { assertAcked(client().admin().indices().prepareOpen("test")); ensureGreen("test"); } + + public void testPendingTasks() throws IOException { + IndicesService indicesService = getIndicesService(); + IndexService test = createIndex("test"); + NodeEnvironment nodeEnc = getInstanceFromNode(NodeEnvironment.class); + + assertTrue(test.hasShard(0)); + Path[] paths = nodeEnc.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings()); + try { + indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS)); + fail("can't get lock"); + } catch (LockObtainFailedException ex) { + + } + for (Path p : paths) { + assertTrue(Files.exists(p)); + } + indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings()); + assertAcked(client().admin().indices().prepareClose("test")); + for (Path p : paths) { + assertTrue(Files.exists(p)); + } + assertEquals(indicesService.numPendingDeletes(test.index()), 1); + // shard lock released... we can now delete + indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS)); + assertEquals(indicesService.numPendingDeletes(test.index()), 0); + for (Path p : paths) { + assertFalse(Files.exists(p)); + } + + if (randomBoolean()) { + indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings()); + indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 1), test.getIndexSettings()); + indicesService.addPendingDelete(new Index("bogus"), new ShardId("bogus", 1), test.getIndexSettings()); + assertEquals(indicesService.numPendingDeletes(test.index()), 2); + // shard lock released... we can now delete + indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS)); + assertEquals(indicesService.numPendingDeletes(test.index()), 0); + } + assertAcked(client().admin().indices().prepareOpen("test")); + + } } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 6bb805aca93..3b1ea2a0940 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -27,10 +27,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.Lists; -import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.util.AbstractRandomizedTest; -import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchException; @@ -1806,7 +1804,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase */ public void assertPathHasBeenCleared(Path path) throws Exception { logger.info("--> checking that [{}] has been cleared", path); - final List foundFiles = new ArrayList<>(); + int count = 0; StringBuilder sb = new StringBuilder(); sb.append("["); if (Files.exists(path)) { @@ -1816,7 +1814,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase if (Files.isDirectory(file)) { assertPathHasBeenCleared(file); } else if (Files.isRegularFile(file)) { - foundFiles.add(file); + count++; sb.append(file.toAbsolutePath().toString()); sb.append("\n"); } @@ -1824,17 +1822,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } } sb.append("]"); - if (Constants.WINDOWS) { - if (foundFiles.size() > 0) { - for (Path file : foundFiles) { - // for now on windows we only ensure that there is at least no segments_N file left on the path - // we don't have a retry mechanism in place yet. - assertFalse(foundFiles.size() + " files exist that should have been cleaned:\n" + sb.toString(), file.getFileName().toString().startsWith(IndexFileNames.SEGMENTS)); - } - } - } else { - assertThat(foundFiles.size() + " files exist that should have been cleaned:\n" + sb.toString(), foundFiles.size(), equalTo(0)); - } + assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0)); } protected static class NumShards {