We don't need to read the SnapshotInfo for a snapshot to determine the indices that need to be updated when it is deleted as the `RepositoryData` contains that information already. This PR makes it so the `RepositoryData` is used to determine which indices to update and also removes the special handling for deleting snapshot metadata and the CS snapshot blob and has those simply be deleted as part of the deleting of other unreferenced blobs in the last step of the delete. This makes the snapshot delete a little faster and more resilient by removing two RPC calls (the separate delete and the get). Also, this shortens the diff with #46250 as a side-effect.
This commit is contained in:
parent
ec9b77deaa
commit
c1be7a802c
|
@ -123,6 +123,20 @@ public final class RepositoryData {
|
||||||
return indices;
|
return indices;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the list of {@link IndexId} that have their snapshots updated but not removed (because they are still referenced by other
|
||||||
|
* snapshots) after removing the given snapshot from the repository.
|
||||||
|
*
|
||||||
|
* @param snapshotId SnapshotId to remove
|
||||||
|
* @return List of indices that are changed but not removed
|
||||||
|
*/
|
||||||
|
public List<IndexId> indicesToUpdateAfterRemovingSnapshot(SnapshotId snapshotId) {
|
||||||
|
return indexSnapshots.entrySet().stream()
|
||||||
|
.filter(entry -> entry.getValue().size() > 1 && entry.getValue().contains(snapshotId))
|
||||||
|
.map(Map.Entry::getKey)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a snapshot and its indices to the repository; returns a new instance. If the snapshot
|
* Add a snapshot and its indices to the repository; returns a new instance. If the snapshot
|
||||||
* already exists in the repository data, this method throws an IllegalArgumentException.
|
* already exists in the repository data, this method throws an IllegalArgumentException.
|
||||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.lucene.store.IOContext;
|
||||||
import org.apache.lucene.store.IndexInput;
|
import org.apache.lucene.store.IndexInput;
|
||||||
import org.apache.lucene.store.RateLimiter;
|
import org.apache.lucene.store.RateLimiter;
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
@ -61,7 +60,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.common.util.set.Sets;
|
|
||||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
|
@ -101,17 +99,13 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.file.NoSuchFileException;
|
import java.nio.file.NoSuchFileException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
|
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
|
||||||
|
@ -407,6 +401,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
try {
|
try {
|
||||||
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
|
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
|
||||||
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
|
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
|
||||||
|
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
|
||||||
|
// delete an index that was created by another master node after writing this index-N blob.
|
||||||
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
|
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
|
||||||
doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, listener);
|
doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, listener);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
|
@ -432,36 +428,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData,
|
Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData,
|
||||||
ActionListener<Void> listener) throws IOException {
|
ActionListener<Void> listener) throws IOException {
|
||||||
final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
|
final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
|
||||||
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
|
|
||||||
// delete an index that was created by another master node after writing this index-N blob.
|
|
||||||
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
||||||
SnapshotInfo snapshot = null;
|
|
||||||
try {
|
|
||||||
snapshot = getSnapshotInfo(snapshotId);
|
|
||||||
} catch (SnapshotMissingException ex) {
|
|
||||||
listener.onFailure(ex);
|
|
||||||
return;
|
|
||||||
} catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) {
|
|
||||||
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
|
|
||||||
}
|
|
||||||
final List<String> snapMetaFilesToDelete =
|
|
||||||
Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID()));
|
|
||||||
try {
|
|
||||||
blobContainer().deleteBlobsIgnoringIfNotExists(snapMetaFilesToDelete);
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e);
|
|
||||||
}
|
|
||||||
final Map<String, IndexId> survivingIndices = updatedRepositoryData.getIndices();
|
|
||||||
deleteIndices(
|
deleteIndices(
|
||||||
updatedRepositoryData,
|
updatedRepositoryData,
|
||||||
Optional.ofNullable(snapshot).map(info -> info.indices().stream().filter(survivingIndices::containsKey)
|
repositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId),
|
||||||
.map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList())).orElse(Collections.emptyList()),
|
|
||||||
snapshotId,
|
snapshotId,
|
||||||
ActionListener.delegateFailure(listener,
|
ActionListener.delegateFailure(listener,
|
||||||
(l, v) -> cleanupStaleBlobs(foundIndices,
|
(l, v) -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null))));
|
||||||
Sets.difference(rootBlobs.keySet(), new HashSet<>(snapMetaFilesToDelete)).stream().collect(
|
|
||||||
Collectors.toMap(Function.identity(), rootBlobs::get)),
|
|
||||||
updatedRepositoryData, ActionListener.map(l, ignored -> null))));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -42,6 +42,7 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
|
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
|
||||||
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.greaterThan;
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
|
|
||||||
|
@ -57,6 +58,17 @@ public class RepositoryDataTests extends ESTestCase {
|
||||||
assertEquals(repositoryData1.hashCode(), repositoryData2.hashCode());
|
assertEquals(repositoryData1.hashCode(), repositoryData2.hashCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIndicesToUpdateAfterRemovingSnapshot() {
|
||||||
|
final RepositoryData repositoryData = generateRandomRepoData();
|
||||||
|
final List<IndexId> indicesBefore = new ArrayList<>(repositoryData.getIndices().values());
|
||||||
|
final SnapshotId randomSnapshot = randomFrom(repositoryData.getSnapshotIds());
|
||||||
|
final IndexId[] indicesToUpdate = indicesBefore.stream().filter(index -> {
|
||||||
|
final Set<SnapshotId> snapshotIds = repositoryData.getSnapshots(index);
|
||||||
|
return snapshotIds.contains(randomSnapshot) && snapshotIds.size() > 1;
|
||||||
|
}).toArray(IndexId[]::new);
|
||||||
|
assertThat(repositoryData.indicesToUpdateAfterRemovingSnapshot(randomSnapshot), containsInAnyOrder(indicesToUpdate));
|
||||||
|
}
|
||||||
|
|
||||||
public void testXContent() throws IOException {
|
public void testXContent() throws IOException {
|
||||||
RepositoryData repositoryData = generateRandomRepoData();
|
RepositoryData repositoryData = generateRandomRepoData();
|
||||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||||
|
|
Loading…
Reference in New Issue