Today when a snapshot restore is aborted (for example when the index is explicitly deleted) while the restoration of the files from the repository has already started the file restores are not interrupted. It means that Elasticsearch will continue to read the files from the repository and will continue to write them to disk until all files are restored; the store will then be closed and files will be deleted from disk at some point but this can take a while. This will also take some slots in the SNAPSHOT thread pool too. The Recovery API won't show any files actively being recovered, the only notable indicator would be the active threads in the SNAPSHOT thread pool. This commit adds a check before reading a file to restore and before writing bytes on disk so that a closing store can be detected more quickly and the file recovery process aborted. This way the file restores just stops and for most of the repository implementations it means that no more bytes are read (see #62370 for S3), finishing threads in the SNAPSHOT thread pool more quickly too.
This commit is contained in:
parent
8bea6b3711
commit
9f5e95505b
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.snapshots;
|
||||
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPoolStats;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
public class AbortedRestoreIT extends AbstractSnapshotIntegTestCase {
|
||||
|
||||
public void testAbortedRestoreAlsoAbortFileRestores() throws Exception {
|
||||
internalCluster().startMasterOnlyNode();
|
||||
final String dataNode = internalCluster().startDataOnlyNode();
|
||||
|
||||
final String indexName = "test-abort-restore";
|
||||
createIndex(indexName, indexSettingsNoReplicas(1).build());
|
||||
indexRandomDocs(indexName, scaledRandomIntBetween(10, 1_000));
|
||||
ensureGreen();
|
||||
forceMerge();
|
||||
|
||||
final String repositoryName = "repository";
|
||||
createRepository(repositoryName, "mock");
|
||||
|
||||
final String snapshotName = "snapshot";
|
||||
createFullSnapshot(repositoryName, snapshotName);
|
||||
assertAcked(client().admin().indices().prepareDelete(indexName));
|
||||
|
||||
logger.info("--> blocking all data nodes for repository [{}]", repositoryName);
|
||||
blockAllDataNodes(repositoryName);
|
||||
failReadsAllDataNodes(repositoryName);
|
||||
|
||||
logger.info("--> starting restore");
|
||||
final ActionFuture<RestoreSnapshotResponse> future = client().admin().cluster().prepareRestoreSnapshot(repositoryName, snapshotName)
|
||||
.setWaitForCompletion(true)
|
||||
.setIndices(indexName)
|
||||
.execute();
|
||||
|
||||
assertBusy(() -> {
|
||||
final RecoveryResponse recoveries = client().admin().indices().prepareRecoveries(indexName)
|
||||
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setActiveOnly(true).get();
|
||||
assertThat(recoveries.hasRecoveries(), is(true));
|
||||
final List<RecoveryState> shardRecoveries = recoveries.shardRecoveryStates().get(indexName);
|
||||
assertThat(shardRecoveries, hasSize(1));
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
for (RecoveryState shardRecovery : shardRecoveries) {
|
||||
assertThat(shardRecovery.getRecoverySource().getType(), equalTo(RecoverySource.Type.SNAPSHOT));
|
||||
assertThat(shardRecovery.getStage(), equalTo(RecoveryState.Stage.INDEX));
|
||||
}
|
||||
});
|
||||
|
||||
final ThreadPool.Info snapshotThreadPoolInfo = threadPool(dataNode).info(ThreadPool.Names.SNAPSHOT);
|
||||
assertThat(snapshotThreadPoolInfo.getMax(), greaterThan(0));
|
||||
|
||||
logger.info("--> waiting for snapshot thread [max={}] pool to be full", snapshotThreadPoolInfo.getMax());
|
||||
waitForMaxActiveSnapshotThreads(dataNode, equalTo(snapshotThreadPoolInfo.getMax()));
|
||||
|
||||
logger.info("--> aborting restore by deleting the index");
|
||||
assertAcked(client().admin().indices().prepareDelete(indexName));
|
||||
|
||||
logger.info("--> unblocking repository [{}]", repositoryName);
|
||||
unblockAllDataNodes(repositoryName);
|
||||
|
||||
logger.info("--> restore should have failed");
|
||||
final RestoreSnapshotResponse restoreSnapshotResponse = future.get();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(1));
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(0));
|
||||
|
||||
logger.info("--> waiting for snapshot thread pool to be empty");
|
||||
waitForMaxActiveSnapshotThreads(dataNode, equalTo(0));
|
||||
}
|
||||
|
||||
private static void waitForMaxActiveSnapshotThreads(final String node, final Matcher<Integer> matcher) throws Exception {
|
||||
assertBusy(() -> assertThat(threadPoolStats(node, ThreadPool.Names.SNAPSHOT).getActive(), matcher), 30L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private static ThreadPool threadPool(final String node) {
|
||||
return internalCluster().getInstance(ClusterService.class, node).getClusterApplierService().threadPool();
|
||||
}
|
||||
|
||||
private static ThreadPoolStats.Stats threadPoolStats(final String node, final String threadPoolName) {
|
||||
return StreamSupport.stream(threadPool(node).stats().spliterator(), false)
|
||||
.filter(threadPool -> threadPool.getName().equals(threadPoolName))
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new AssertionError("Failed to find thread pool " + threadPoolName));
|
||||
}
|
||||
}
|
|
@ -412,7 +412,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
if (isClosed.compareAndSet(false, true)) {
|
||||
// only do this once!
|
||||
decRef();
|
||||
|
@ -420,6 +419,15 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the {@link Store#close()} method has been called. This indicates that the current
|
||||
* store is either closed or being closed waiting for all references to it to be released.
|
||||
* You might prefer to use {@link Store#ensureOpen()} instead.
|
||||
*/
|
||||
public boolean isClosing() {
|
||||
return isClosed.get();
|
||||
}
|
||||
|
||||
private void closeInternal() {
|
||||
// Leverage try-with-resources to close the shard lock for us
|
||||
try (Closeable c = shardLock) {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.lucene.index.CorruptIndexException;
|
|||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.apache.lucene.index.IndexFormatTooNewException;
|
||||
import org.apache.lucene.index.IndexFormatTooOldException;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
|
@ -2102,6 +2103,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
|
||||
ensureNotClosing(store);
|
||||
boolean success = false;
|
||||
try (IndexOutput indexOutput =
|
||||
store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
|
||||
|
@ -2113,12 +2115,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
try (InputStream stream = maybeRateLimitRestores(new SlicedInputStream(fileInfo.numberOfParts()) {
|
||||
@Override
|
||||
protected InputStream openSlice(int slice) throws IOException {
|
||||
ensureNotClosing(store);
|
||||
return container.readBlob(fileInfo.partName(slice));
|
||||
}
|
||||
})) {
|
||||
final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileInfo.length()))];
|
||||
int length;
|
||||
while ((length = stream.read(buffer)) > 0) {
|
||||
ensureNotClosing(store);
|
||||
indexOutput.writeBytes(buffer, 0, length);
|
||||
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
|
||||
}
|
||||
|
@ -2141,6 +2145,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ensureNotClosing(final Store store) throws AlreadyClosedException {
|
||||
assert store.refCount() > 0;
|
||||
if (store.isClosing()) {
|
||||
throw new AlreadyClosedException("store is closing");
|
||||
}
|
||||
}
|
||||
|
||||
}.restore(snapshotFiles, store, l);
|
||||
}));
|
||||
}
|
||||
|
|
|
@ -277,6 +277,13 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public static void failReadsAllDataNodes(String repository) {
|
||||
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
|
||||
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
|
||||
mockRepository.setFailReadsAfterUnblock(true);
|
||||
}
|
||||
}
|
||||
|
||||
public static void waitForBlockOnAnyDataNode(String repository, TimeValue timeout) throws InterruptedException {
|
||||
final boolean blocked = waitUntil(() -> {
|
||||
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
|
||||
|
@ -307,11 +314,16 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
protected void createRepository(String repoName, String type) {
|
||||
Settings.Builder settings = Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean());
|
||||
createRepository(repoName, type, randomRepositorySettings());
|
||||
}
|
||||
|
||||
protected Settings.Builder randomRepositorySettings() {
|
||||
final Settings.Builder settings = Settings.builder();
|
||||
settings.put("location", randomRepoPath()).put("compress", randomBoolean());
|
||||
if (rarely()) {
|
||||
settings = settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
|
||||
settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
|
||||
}
|
||||
createRepository(repoName, type, settings);
|
||||
return settings;
|
||||
}
|
||||
|
||||
protected static Settings.Builder indexSettingsNoReplicas(int shards) {
|
||||
|
|
|
@ -123,6 +123,12 @@ public class MockRepository extends FsRepository {
|
|||
*/
|
||||
private volatile boolean failOnIndexLatest = false;
|
||||
|
||||
/**
|
||||
* Reading blobs will fail with an {@link AssertionError} once the repository has been blocked once.
|
||||
*/
|
||||
private volatile boolean failReadsAfterUnblock;
|
||||
private volatile boolean throwReadErrorAfterUnblock = false;
|
||||
|
||||
private volatile boolean blocked = false;
|
||||
|
||||
public MockRepository(RepositoryMetadata metadata, Environment environment,
|
||||
|
@ -206,6 +212,10 @@ public class MockRepository extends FsRepository {
|
|||
blockOnDeleteIndexN = true;
|
||||
}
|
||||
|
||||
public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
|
||||
this.failReadsAfterUnblock = failReadsAfterUnblock;
|
||||
}
|
||||
|
||||
public boolean blocked() {
|
||||
return blocked;
|
||||
}
|
||||
|
@ -228,6 +238,10 @@ public class MockRepository extends FsRepository {
|
|||
Thread.currentThread().interrupt();
|
||||
}
|
||||
logger.debug("[{}] Unblocking execution", metadata.name());
|
||||
if (wasBlocked && failReadsAfterUnblock) {
|
||||
logger.debug("[{}] Next read operations will fail", metadata.name());
|
||||
this.throwReadErrorAfterUnblock = true;
|
||||
}
|
||||
return wasBlocked;
|
||||
}
|
||||
|
||||
|
@ -255,7 +269,6 @@ public class MockRepository extends FsRepository {
|
|||
}
|
||||
|
||||
private class MockBlobContainer extends FilterBlobContainer {
|
||||
private MessageDigest digest;
|
||||
|
||||
private boolean shouldFail(String blobName, double probability) {
|
||||
if (probability > 0.0) {
|
||||
|
@ -270,7 +283,7 @@ public class MockRepository extends FsRepository {
|
|||
|
||||
private int hashCode(String path) {
|
||||
try {
|
||||
digest = MessageDigest.getInstance("MD5");
|
||||
MessageDigest digest = MessageDigest.getInstance("MD5");
|
||||
byte[] bytes = digest.digest(path.getBytes("UTF-8"));
|
||||
int i = 0;
|
||||
return ((bytes[i++] & 0xFF) << 24) | ((bytes[i++] & 0xFF) << 16)
|
||||
|
@ -331,6 +344,12 @@ public class MockRepository extends FsRepository {
|
|||
throw new IOException("exception after block");
|
||||
}
|
||||
|
||||
private void maybeReadErrorAfterBlock(final String blobName) {
|
||||
if (throwReadErrorAfterUnblock) {
|
||||
throw new AssertionError("Read operation are not allowed anymore at this point [blob=" + blobName + "]");
|
||||
}
|
||||
}
|
||||
|
||||
MockBlobContainer(BlobContainer delegate) {
|
||||
super(delegate);
|
||||
}
|
||||
|
@ -342,10 +361,18 @@ public class MockRepository extends FsRepository {
|
|||
|
||||
@Override
|
||||
public InputStream readBlob(String name) throws IOException {
|
||||
maybeReadErrorAfterBlock(name);
|
||||
maybeIOExceptionOrBlock(name);
|
||||
return super.readBlob(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream readBlob(String name, long position, long length) throws IOException {
|
||||
maybeReadErrorAfterBlock(name);
|
||||
maybeIOExceptionOrBlock(name);
|
||||
return super.readBlob(name, position, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteResult delete() throws IOException {
|
||||
DeleteResult deleteResult = DeleteResult.ZERO;
|
||||
|
|
Loading…
Reference in New Issue