[INDICES] Retry if shard deletes fail due to IOExceptions

Today, if a shard deletion fails we simply ignore it and move on. On systems like
Windows, where a virus scanner or any other process (e.g. the admin's Explorer window)
can hold on to files, we fail to delete shards and leave large amounts of data behind.
We should make a best effort to clean those shards up before we ack the delete.
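As a rough sketch of the resulting flow (illustration only, simplified from the changes below; indicesService, index, lock and indexSettings stand in for the surrounding context in IndexService and NodeIndexDeletedAction):

    // If deleting a shard store fails, record the shard instead of silently dropping the error.
    try {
        indicesService.deleteShardStore("delete index", lock, indexSettings);
    } catch (IOException e) {
        // e.g. a virus scanner on Windows still holds one of the shard's files
        indicesService.addPendingDelete(index, lock.getShardId(), indexSettings);
    }

    // Before the index delete is acked, retry everything still pending while holding the
    // index's shard locks, backing off between attempts until the timeout expires.
    indicesService.processPendingDeletes(index, new TimeValue(30, TimeUnit.MINUTES));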
Simon Willnauer 2015-02-20 11:37:55 +01:00
parent 8e09070246
commit a71cc45023
5 changed files with 165 additions and 36 deletions

NodeIndexDeletedAction.java

@@ -29,10 +29,12 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@@ -52,16 +54,16 @@ public class NodeIndexDeletedAction extends AbstractComponent {
private final ThreadPool threadPool;
private final TransportService transportService;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final NodeEnvironment nodeEnv;
private final IndicesService indicesService;
@Inject
public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService, NodeEnvironment nodeEnv) {
public NodeIndexDeletedAction(Settings settings, ThreadPool threadPool, TransportService transportService, NodeEnvironment nodeEnv, IndicesService indicesService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
transportService.registerHandler(INDEX_DELETED_ACTION_NAME, new NodeIndexDeletedTransportHandler());
transportService.registerHandler(INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedTransportHandler());
this.nodeEnv = nodeEnv;
this.indicesService = indicesService;
}
public void add(Listener listener) {
@@ -120,17 +122,13 @@ public class NodeIndexDeletedAction extends AbstractComponent {
// master. If we can't acquire the locks here immediately, there might be a shard of this index still holding on to its lock
// due to a currently cancelled recovery or similar. The shard will delete itself BEFORE the lock is released, so it's guaranteed to be
// deleted by the time we get the lock.
final List<ShardLock> locks = nodeEnv.lockAllForIndex(new Index(index), TimeUnit.MINUTES.toMillis(30));
try {
indicesService.processPendingDeletes(new Index(index), new TimeValue(30, TimeUnit.MINUTES));
if (nodes.localNodeMaster()) {
innerNodeIndexStoreDeleted(index, nodeId);
} else {
transportService.sendRequest(clusterState.nodes().masterNode(),
INDEX_STORE_DELETED_ACTION_NAME, new NodeIndexStoreDeletedMessage(index, nodeId), EmptyTransportResponseHandler.INSTANCE_SAME);
}
} finally {
IOUtils.close(locks); // release them again
}
} catch (LockObtainFailedException exc) {
logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index);
}

IndexService.java

@@ -440,7 +440,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
indicesServices.deleteShardStore("delete index", lock, indexSettings);
}
} catch (IOException e) {
logger.warn("{} failed to delete shard content", e, lock.getShardId());
indicesServices.addPendingDelete(index(), lock.getShardId(), indexSettings);
logger.debug("{} failed to delete shard content - scheduled a retry", e, lock.getShardId());
}
}
}

IndicesService.java

@@ -50,6 +50,7 @@ import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
@@ -92,13 +93,8 @@ import org.elasticsearch.plugins.PluginsService;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import static com.google.common.collect.Maps.newHashMap;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@@ -122,6 +118,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private final ClusterService clusterService;
private volatile Map<String, Tuple<IndexService, Injector>> indices = ImmutableMap.of();
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
private final OldShardsStats oldShardsStats = new OldShardsStats();
@@ -604,4 +601,102 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
builder.put(metaData.getSettings());
return builder.build();
}
/**
* Adds a pending delete for the given shard of the given index.
*/
public void addPendingDelete(Index index, ShardId shardId, Settings settings) {
synchronized (pendingDeletes) {
List<PendingDelete> list = pendingDeletes.get(index);
if (list == null) {
list = new ArrayList<>();
pendingDeletes.put(index, list);
}
list.add(new PendingDelete(shardId, settings));
}
}
private static final class PendingDelete {
final ShardId shardId;
final Settings settings;
public PendingDelete(ShardId shardId, Settings settings) {
this.shardId = shardId;
this.settings = settings;
}
@Override
public String toString() {
return shardId.toString();
}
}
/**
* Processes all pending deletes for the given index. This method acquires all shard locks for the index and then
* processes the pending deletes. Pending deletes can occur when the OS refuses to delete files because they are still
* in use by another process, e.g. on Windows where files might still be open by a virus scanner. On a shared
* filesystem a replica might not have been closed yet when the primary is deleted, causing delete calls to fail, so we
* schedule those deletes to be retried later.
* @param index the index to process the pending deletes for
* @param timeout the timeout used for processing pending deletes
*/
public void processPendingDeletes(Index index, TimeValue timeout) throws IOException {
final long startTime = System.currentTimeMillis();
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, timeout.millis());
try {
Map<ShardId, ShardLock> locks = new HashMap<>();
for (ShardLock lock : shardLocks) {
locks.put(lock.getShardId(), lock);
}
final List<PendingDelete> remove;
synchronized (pendingDeletes) {
remove = pendingDeletes.remove(index);
}
final long maxSleepTimeMs = 10 * 1000; // cap the sleep between retries at 10 seconds
long sleepTime = 10;
do {
if (remove == null || remove.isEmpty()) {
break;
}
Iterator<PendingDelete> iterator = remove.iterator();
while (iterator.hasNext()) {
PendingDelete delete = iterator.next();
ShardLock shardLock = locks.get(delete.shardId);
if (shardLock != null) {
try {
deleteShardStore("pending delete", shardLock, delete.settings);
} catch (IOException ex) {
logger.debug("{} retry pending delete", shardLock.getShardId(), ex);
}
} else {
logger.warn("{} no shard lock for pending delete", delete.shardId);
}
iterator.remove();
}
if (remove.isEmpty() == false) {
logger.warn("{} still pending deletes present for shards {} - retrying", index, remove.toString());
try {
Thread.sleep(sleepTime);
sleepTime = Math.min(maxSleepTimeMs, sleepTime * 2); // increase the sleep time gradually
logger.debug("{} schedule pending delete retry after {} ms", index, sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt status rather than swallow it
return;
}
}
} while ((System.currentTimeMillis() - startTime) < timeout.millis());
} finally {
IOUtils.close(shardLocks);
}
}
int numPendingDeletes(Index index) {
synchronized (pendingDeletes) {
List<PendingDelete> deleteList = pendingDeletes.get(index);
if (deleteList == null) {
return 0;
}
return deleteList.size();
}
}
}

IndicesServiceTest.java

@@ -18,6 +18,7 @@
*/
package org.elasticsearch.indices;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -28,14 +29,18 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -120,4 +125,46 @@ public class IndicesServiceTest extends ElasticsearchSingleNodeTest {
assertAcked(client().admin().indices().prepareOpen("test"));
ensureGreen("test");
}
public void testPendingTasks() throws IOException {
IndicesService indicesService = getIndicesService();
IndexService test = createIndex("test");
NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class);
assertTrue(test.hasShard(0));
Path[] paths = nodeEnv.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings());
try {
indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS));
fail("can't get lock");
} catch (LockObtainFailedException ex) {
}
for (Path p : paths) {
assertTrue(Files.exists(p));
}
indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings());
assertAcked(client().admin().indices().prepareClose("test"));
for (Path p : paths) {
assertTrue(Files.exists(p));
}
assertEquals(indicesService.numPendingDeletes(test.index()), 1);
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS));
assertEquals(indicesService.numPendingDeletes(test.index()), 0);
for (Path p : paths) {
assertFalse(Files.exists(p));
}
if (randomBoolean()) {
indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings());
indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 1), test.getIndexSettings());
indicesService.addPendingDelete(new Index("bogus"), new ShardId("bogus", 1), test.getIndexSettings());
assertEquals(indicesService.numPendingDeletes(test.index()), 2);
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), new TimeValue(0, TimeUnit.MILLISECONDS));
assertEquals(indicesService.numPendingDeletes(test.index()), 0);
}
assertAcked(client().admin().indices().prepareOpen("test"));
}
}

ElasticsearchIntegrationTest.java

@@ -27,10 +27,8 @@ import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.AbstractRandomizedTest;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchException;
@@ -1806,7 +1804,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
*/
public void assertPathHasBeenCleared(Path path) throws Exception {
logger.info("--> checking that [{}] has been cleared", path);
final List<Path> foundFiles = new ArrayList<>();
int count = 0;
StringBuilder sb = new StringBuilder();
sb.append("[");
if (Files.exists(path)) {
@@ -1816,7 +1814,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
if (Files.isDirectory(file)) {
assertPathHasBeenCleared(file);
} else if (Files.isRegularFile(file)) {
foundFiles.add(file);
count++;
sb.append(file.toAbsolutePath().toString());
sb.append("\n");
}
@@ -1824,17 +1822,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
}
sb.append("]");
if (Constants.WINDOWS) {
if (foundFiles.size() > 0) {
for (Path file : foundFiles) {
// for now on windows we only ensure that there is at least no segments_N file left on the path
// we don't have a retry mechanism in place yet.
assertFalse(foundFiles.size() + " files exist that should have been cleaned:\n" + sb.toString(), file.getFileName().toString().startsWith(IndexFileNames.SEGMENTS));
}
}
} else {
assertThat(foundFiles.size() + " files exist that should have been cleaned:\n" + sb.toString(), foundFiles.size(), equalTo(0));
}
assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0));
}
protected static class NumShards {