Add back pending deletes (#18698)

Triggering the pending deletes logic was accidentally removed in the clean up PR #18602.
This commit is contained in:
Yannick Welsch 2016-06-06 15:14:09 +02:00
parent 371c73e140
commit 0a8afa2e72
6 changed files with 76 additions and 17 deletions

View File

@ -112,6 +112,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -141,6 +142,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private final CircuitBreakerService circuitBreakerService;
private volatile Map<String, IndexService> indices = emptyMap();
private final Map<Index, List<PendingDelete>> pendingDeletes = new HashMap<>();
private final AtomicInteger numUncompletedDeletes = new AtomicInteger();
private final OldShardsStats oldShardsStats = new OldShardsStats();
private final IndexStoreConfig indexStoreConfig;
private final MapperRegistry mapperRegistry;
@ -782,6 +784,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
pendingDeletes.put(index, list);
}
list.add(pendingDelete);
numUncompletedDeletes.incrementAndGet();
}
}
@ -840,6 +843,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
logger.debug("{} processing pending deletes", index);
final long startTimeNS = System.nanoTime();
final List<ShardLock> shardLocks = nodeEnv.lockAllForIndex(index, indexSettings, timeout.millis());
int numRemoved = 0;
try {
Map<ShardId, ShardLock> locks = new HashMap<>();
for (ShardLock lock : shardLocks) {
@ -850,6 +854,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
remove = pendingDeletes.remove(index);
}
if (remove != null && remove.isEmpty() == false) {
numRemoved = remove.size();
CollectionUtil.timSort(remove); // make sure we delete indices first
final long maxSleepTimeMs = 10 * 1000; // ensure we retry after 10 sec
long sleepTime = 10;
@ -896,6 +901,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
} finally {
IOUtils.close(shardLocks);
if (numRemoved > 0) {
int remainingUncompletedDeletes = numUncompletedDeletes.addAndGet(-numRemoved);
assert remainingUncompletedDeletes >= 0;
}
}
}
@ -909,6 +918,14 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
}
}
/**
* Checks if all pending deletes have completed. Used by tests to ensure we don't check directory contents while deletion still ongoing.
* The reason is that, on Windows, browsing the directory contents can interfere with the deletion process and delay it unnecessarily.
*/
public boolean hasUncompletedPendingDeletes() {
return numUncompletedDeletes.get() > 0;
}
/**
* Returns this nodes {@link IndicesQueriesRegistry}
*/

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices.cluster;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@ -41,11 +42,14 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.mapper.DocumentMapper;
@ -67,7 +71,6 @@ import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
@ -75,6 +78,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
@ -213,11 +218,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
}
final IndexService idxService = indicesService.indexService(index);
final IndexSettings indexSettings;
if (idxService != null) {
indexSettings = idxService.getIndexSettings();
deleteIndex(index, "index no longer part of the metadata");
} else if (previousState.metaData().hasIndex(index.getName())) {
// The deleted index was part of the previous cluster state, but not loaded on the local node
final IndexMetaData metaData = previousState.metaData().index(index);
indexSettings = new IndexSettings(metaData, settings);
indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metaData, event.state());
} else {
// The previous cluster state's metadata also does not contain the index,
@ -227,7 +235,35 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// First, though, verify the precondition for applying this case by
// asserting that the previous cluster state is not initialized/recovered.
assert previousState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
indicesService.verifyIndexIsDeleted(index, event.state());
final IndexMetaData metaData = indicesService.verifyIndexIsDeleted(index, event.state());
if (metaData != null) {
indexSettings = new IndexSettings(metaData, settings);
} else {
indexSettings = null;
}
}
if (indexSettings != null) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to complete pending deletion for index", t, index);
}
@Override
protected void doRun() throws Exception {
try {
// we are waiting until we can lock the index / all shards on the node and then we ack the delete of the store to the
// master. If we can't acquire the locks here immediately there might be a shard of this index still holding on to the lock
// due to a "currently canceled recovery" or so. The shard will delete itself BEFORE the lock is released so it's guaranteed to be
// deleted by the time we get the lock
indicesService.processPendingDeletes(index, indexSettings, new TimeValue(30, TimeUnit.MINUTES));
} catch (LockObtainFailedException exc) {
logger.warn("[{}] failed to lock all shards for index - timed out after 30 seconds", index);
} catch (InterruptedException e) {
logger.warn("[{}] failed to lock all shards for index - interrupted", index);
}
}
});
}
}

View File

@ -70,7 +70,6 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -586,8 +585,8 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
logger.info("--> deleting index " + IDX);
assertAcked(client().admin().indices().prepareDelete(IDX));
// assertBusy(() -> assertPathHasBeenCleared(dataPath), 1, TimeUnit.MINUTES);
assertAllIndicesRemovedAndDeletionCompleted(internalCluster().getInstances(IndicesService.class));
assertPathHasBeenCleared(dataPath);
//norelease
//TODO: uncomment the test below when https://github.com/elastic/elasticsearch/issues/17695 is resolved.
//assertIndicesDirsDeleted(nodes);
@ -647,8 +646,8 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
assertHitCount(resp, docCount);
assertAcked(client().admin().indices().prepareDelete(IDX));
// assertBusy(() -> assertPathHasBeenCleared(dataPath), 1, TimeUnit.MINUTES);
assertAllIndicesRemovedAndDeletionCompleted(internalCluster().getInstances(IndicesService.class));
assertPathHasBeenCleared(dataPath);
//norelease
//TODO: uncomment the test below when https://github.com/elastic/elasticsearch/issues/17695 is resolved.
//assertIndicesDirsDeleted(nodes);
@ -839,8 +838,8 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
logger.info("--> deleting closed index");
client().admin().indices().prepareDelete(IDX).get();
assertBusy(() -> assertPathHasBeenCleared(dataPath), 1, TimeUnit.MINUTES);
assertAllIndicesRemovedAndDeletionCompleted(internalCluster().getInstances(IndicesService.class));
assertPathHasBeenCleared(dataPath);
assertIndicesDirsDeleted(nodes);
}

View File

@ -558,7 +558,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
SearchResponse response = client().prepareSearch("test").get();
assertHitCount(response, 1L);
client().admin().indices().prepareDelete("test").get();
assertBusyPathHasBeenCleared(idxPath);
assertAllIndicesRemovedAndDeletionCompleted(Collections.singleton(getInstanceFromNode(IndicesService.class)));
assertPathHasBeenCleared(idxPath);
}
public void testExpectedShardSizeIsPresent() throws InterruptedException {
@ -641,8 +642,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertThat("found the hit", resp.getHits().getTotalHits(), equalTo(1L));
assertAcked(client().admin().indices().prepareDelete(INDEX));
assertBusyPathHasBeenCleared(startDir.toAbsolutePath());
assertBusyPathHasBeenCleared(endDir.toAbsolutePath());
assertAllIndicesRemovedAndDeletionCompleted(Collections.singleton(getInstanceFromNode(IndicesService.class)));
assertPathHasBeenCleared(startDir.toAbsolutePath());
assertPathHasBeenCleared(endDir.toAbsolutePath());
}
public void testShardStats() throws IOException {

View File

@ -190,10 +190,12 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
assertTrue(path.exists());
assertEquals(indicesService.numPendingDeletes(test.index()), numPending);
assertTrue(indicesService.hasUncompletedPendingDeletes());
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
assertEquals(indicesService.numPendingDeletes(test.index()), 0);
assertFalse(indicesService.hasUncompletedPendingDeletes());
assertFalse(path.exists());
if (randomBoolean()) {
@ -201,9 +203,11 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
indicesService.addPendingDelete(new ShardId(test.index(), 1), test.getIndexSettings());
indicesService.addPendingDelete(new ShardId("bogus", "_na_", 1), test.getIndexSettings());
assertEquals(indicesService.numPendingDeletes(test.index()), 2);
assertTrue(indicesService.hasUncompletedPendingDeletes());
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
assertEquals(indicesService.numPendingDeletes(test.index()), 0);
assertFalse(indicesService.hasUncompletedPendingDeletes());
}
assertAcked(client().admin().indices().prepareOpen("test"));

View File

@ -56,6 +56,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.test.junit.listeners.LoggingListener;
@ -672,11 +673,11 @@ public abstract class ESTestCase extends LuceneTestCase {
return enabled;
}
/**
* Asserts busily that there are no files in the specified path
*/
public void assertBusyPathHasBeenCleared(Path path) throws Exception {
assertBusy(() -> assertPathHasBeenCleared(path));
public void assertAllIndicesRemovedAndDeletionCompleted(Iterable<IndicesService> indicesServices) throws Exception {
for (IndicesService indicesService : indicesServices) {
assertBusy(() -> assertFalse(indicesService.iterator().hasNext()), 1, TimeUnit.MINUTES);
assertBusy(() -> assertFalse(indicesService.hasUncompletedPendingDeletes()), 1, TimeUnit.MINUTES);
}
}
/**