From 9f5e95505bf4adb28517604a46c2a9a531c402fb Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 18 Sep 2020 14:04:58 +0200 Subject: [PATCH] Also abort ongoing file restores when snapshot restore is aborted (#62441) (#62607) Today when a snapshot restore is aborted (for example when the index is explicitly deleted) while the restoration of the files from the repository has already started the file restores are not interrupted. It means that Elasticsearch will continue to read the files from the repository and will continue to write them to disk until all files are restored; the store will then be closed and files will be deleted from disk at some point but this can take a while. This will also take some slots in the SNAPSHOT thread pool too. The Recovery API won't show any files actively being recovered, the only notable indicator would be the active threads in the SNAPSHOT thread pool. This commit adds a check before reading a file to restore and before writing bytes on disk so that a closing store can be detected more quickly and the file recovery process aborted. This way the file restores just stops and for most of the repository implementations it means that no more bytes are read (see #62370 for S3), finishing threads in the SNAPSHOT thread pool more quickly too. --- .../snapshots/AbortedRestoreIT.java | 123 ++++++++++++++++++ .../org/elasticsearch/index/store/Store.java | 10 +- .../blobstore/BlobStoreRepository.java | 12 ++ .../AbstractSnapshotIntegTestCase.java | 18 ++- .../snapshots/mockstore/MockRepository.java | 31 ++++- 5 files changed, 188 insertions(+), 6 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/snapshots/AbortedRestoreIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/AbortedRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/AbortedRestoreIT.java new file mode 100644 index 00000000000..fb132ea84c1 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/AbortedRestoreIT.java @@ -0,0 +1,123 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; +import org.hamcrest.Matcher; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class AbortedRestoreIT extends AbstractSnapshotIntegTestCase { + + public void testAbortedRestoreAlsoAbortFileRestores() throws Exception { + internalCluster().startMasterOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + + final String indexName = "test-abort-restore"; + createIndex(indexName, indexSettingsNoReplicas(1).build()); + indexRandomDocs(indexName, scaledRandomIntBetween(10, 1_000)); + ensureGreen(); + forceMerge(); + + final String repositoryName = "repository"; + createRepository(repositoryName, "mock"); + + final String snapshotName = "snapshot"; + createFullSnapshot(repositoryName, snapshotName); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + logger.info("--> blocking all data nodes for repository [{}]", repositoryName); + blockAllDataNodes(repositoryName); + failReadsAllDataNodes(repositoryName); + + logger.info("--> starting restore"); + final ActionFuture future = client().admin().cluster().prepareRestoreSnapshot(repositoryName, snapshotName) + .setWaitForCompletion(true) + .setIndices(indexName) + .execute(); + + assertBusy(() -> { + final RecoveryResponse recoveries = client().admin().indices().prepareRecoveries(indexName) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setActiveOnly(true).get(); + assertThat(recoveries.hasRecoveries(), is(true)); + final List shardRecoveries = recoveries.shardRecoveryStates().get(indexName); + assertThat(shardRecoveries, hasSize(1)); + assertThat(future.isDone(), is(false)); + + for (RecoveryState shardRecovery : shardRecoveries) { + assertThat(shardRecovery.getRecoverySource().getType(), equalTo(RecoverySource.Type.SNAPSHOT)); + assertThat(shardRecovery.getStage(), equalTo(RecoveryState.Stage.INDEX)); + } + }); + + final ThreadPool.Info snapshotThreadPoolInfo = threadPool(dataNode).info(ThreadPool.Names.SNAPSHOT); + assertThat(snapshotThreadPoolInfo.getMax(), greaterThan(0)); + + logger.info("--> waiting for snapshot thread [max={}] pool to be full", snapshotThreadPoolInfo.getMax()); + waitForMaxActiveSnapshotThreads(dataNode, equalTo(snapshotThreadPoolInfo.getMax())); + + logger.info("--> aborting restore by deleting the index"); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + logger.info("--> unblocking repository [{}]", repositoryName); + unblockAllDataNodes(repositoryName); + + logger.info("--> restore should have failed"); + final RestoreSnapshotResponse restoreSnapshotResponse = future.get(); + assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(1)); + assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(0)); + + logger.info("--> waiting for snapshot thread pool to be empty"); + waitForMaxActiveSnapshotThreads(dataNode, equalTo(0)); + } + + private static void waitForMaxActiveSnapshotThreads(final String node, final Matcher matcher) throws Exception { + assertBusy(() -> assertThat(threadPoolStats(node, ThreadPool.Names.SNAPSHOT).getActive(), matcher), 30L, TimeUnit.SECONDS); + } + + private static ThreadPool threadPool(final String node) { + return internalCluster().getInstance(ClusterService.class, node).getClusterApplierService().threadPool(); + } + + private static ThreadPoolStats.Stats threadPoolStats(final String node, final String threadPoolName) { + return StreamSupport.stream(threadPool(node).stats().spliterator(), false) + .filter(threadPool -> threadPool.getName().equals(threadPoolName)) + .findFirst() + .orElseThrow(() -> new AssertionError("Failed to find thread pool " + threadPoolName)); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index de30d564198..d040e6f44c7 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -412,7 +412,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref @Override public void close() { - if (isClosed.compareAndSet(false, true)) { // only do this once! decRef(); @@ -420,6 +419,15 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } } + /** + * @return true if the {@link Store#close()} method has been called. This indicates that the current + * store is either closed or being closed waiting for all references to it to be released. + * You might prefer to use {@link Store#ensureOpen()} instead. + */ + public boolean isClosing() { + return isClosed.get(); + } + private void closeInternal() { // Leverage try-with-resources to close the shard lock for us try (Closeable c = shardLock) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 4cbc5404b7a..cebd5e267d0 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -26,6 +26,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; @@ -2102,6 +2103,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException { + ensureNotClosing(store); boolean success = false; try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { @@ -2113,12 +2115,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp try (InputStream stream = maybeRateLimitRestores(new SlicedInputStream(fileInfo.numberOfParts()) { @Override protected InputStream openSlice(int slice) throws IOException { + ensureNotClosing(store); return container.readBlob(fileInfo.partName(slice)); } })) { final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileInfo.length()))]; int length; while ((length = stream.read(buffer)) > 0) { + ensureNotClosing(store); indexOutput.writeBytes(buffer, 0, length); recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); } @@ -2141,6 +2145,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } } + + void ensureNotClosing(final Store store) throws AlreadyClosedException { + assert store.refCount() > 0; + if (store.isClosing()) { + throw new AlreadyClosedException("store is closing"); + } + } + }.restore(snapshotFiles, store, l); })); } diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 344bde72802..2abb0d5b69b 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -277,6 +277,13 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { } } + public static void failReadsAllDataNodes(String repository) { + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository); + mockRepository.setFailReadsAfterUnblock(true); + } + } + public static void waitForBlockOnAnyDataNode(String repository, TimeValue timeout) throws InterruptedException { final boolean blocked = waitUntil(() -> { for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { @@ -307,11 +314,16 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { } protected void createRepository(String repoName, String type) { - Settings.Builder settings = Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()); + createRepository(repoName, type, randomRepositorySettings()); + } + + protected Settings.Builder randomRepositorySettings() { + final Settings.Builder settings = Settings.builder(); + settings.put("location", randomRepoPath()).put("compress", randomBoolean()); if (rarely()) { - settings = settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES); + settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES); } - createRepository(repoName, type, settings); + return settings; } protected static Settings.Builder indexSettingsNoReplicas(int shards) { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 0f27ec6cb37..95355a58508 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -123,6 +123,12 @@ public class MockRepository extends FsRepository { */ private volatile boolean failOnIndexLatest = false; + /** + * Reading blobs will fail with an {@link AssertionError} once the repository has been blocked once. + */ + private volatile boolean failReadsAfterUnblock; + private volatile boolean throwReadErrorAfterUnblock = false; + private volatile boolean blocked = false; public MockRepository(RepositoryMetadata metadata, Environment environment, @@ -206,6 +212,10 @@ public class MockRepository extends FsRepository { blockOnDeleteIndexN = true; } + public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) { + this.failReadsAfterUnblock = failReadsAfterUnblock; + } + public boolean blocked() { return blocked; } @@ -228,6 +238,10 @@ public class MockRepository extends FsRepository { Thread.currentThread().interrupt(); } logger.debug("[{}] Unblocking execution", metadata.name()); + if (wasBlocked && failReadsAfterUnblock) { + logger.debug("[{}] Next read operations will fail", metadata.name()); + this.throwReadErrorAfterUnblock = true; + } return wasBlocked; } @@ -255,7 +269,6 @@ public class MockRepository extends FsRepository { } private class MockBlobContainer extends FilterBlobContainer { - private MessageDigest digest; private boolean shouldFail(String blobName, double probability) { if (probability > 0.0) { @@ -270,7 +283,7 @@ public class MockRepository extends FsRepository { private int hashCode(String path) { try { - digest = MessageDigest.getInstance("MD5"); + MessageDigest digest = MessageDigest.getInstance("MD5"); byte[] bytes = digest.digest(path.getBytes("UTF-8")); int i = 0; return ((bytes[i++] & 0xFF) << 24) | ((bytes[i++] & 0xFF) << 16) @@ -331,6 +344,12 @@ public class MockRepository extends FsRepository { throw new IOException("exception after block"); } + private void maybeReadErrorAfterBlock(final String blobName) { + if (throwReadErrorAfterUnblock) { + throw new AssertionError("Read operation are not allowed anymore at this point [blob=" + blobName + "]"); + } + } + MockBlobContainer(BlobContainer delegate) { super(delegate); } @@ -342,10 +361,18 @@ public class MockRepository extends FsRepository { @Override public InputStream readBlob(String name) throws IOException { + maybeReadErrorAfterBlock(name); maybeIOExceptionOrBlock(name); return super.readBlob(name); } + @Override + public InputStream readBlob(String name, long position, long length) throws IOException { + maybeReadErrorAfterBlock(name); + maybeIOExceptionOrBlock(name); + return super.readBlob(name, position, length); + } + @Override public DeleteResult delete() throws IOException { DeleteResult deleteResult = DeleteResult.ZERO;