Don't load global state when only restoring indices (#29239)
Restoring a snapshot, or getting the status of finished snapshots, currently always load the global state metadata file from the repository even if it not required. This slows down the restore process (or listing statuses process) and can also be an issue if the global state cannot be deserialized (because it has unknown customs for example). This commit splits the Repository.getSnapshotMetadata() method into two distincts methods: getGlobalMetadata() and getIndexMetadata() that are now called only when needed.
This commit is contained in:
parent
1f6a3c1d80
commit
36f8531bf4
|
@ -20,6 +20,7 @@ package org.elasticsearch.repositories;
|
|||
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -78,15 +79,21 @@ public interface Repository extends LifecycleComponent {
|
|||
SnapshotInfo getSnapshotInfo(SnapshotId snapshotId);
|
||||
|
||||
/**
|
||||
* Returns global metadata associate with the snapshot.
|
||||
* <p>
|
||||
* The returned meta data contains global metadata as well as metadata for all indices listed in the indices parameter.
|
||||
* Returns global metadata associated with the snapshot.
|
||||
*
|
||||
* @param snapshot snapshot
|
||||
* @param indices list of indices
|
||||
* @return information about snapshot
|
||||
* @param snapshotId the snapshot id to load the global metadata from
|
||||
* @return the global metadata about the snapshot
|
||||
*/
|
||||
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException;
|
||||
MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId);
|
||||
|
||||
/**
|
||||
* Returns the index metadata associated with the snapshot.
|
||||
*
|
||||
* @param snapshotId the snapshot id to load the index metadata from
|
||||
* @param index the {@link IndexId} to load the metadata from
|
||||
* @return the index metadata about the given index for the given snapshot
|
||||
*/
|
||||
IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
|
||||
|
|
|
@ -480,11 +480,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
return blobStoreSnapshot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException {
|
||||
return readSnapshotMetaData(snapshot.snapshotId(), snapshot.version(), indices, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
|
||||
try {
|
||||
|
@ -496,38 +491,59 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<IndexId> indices, boolean ignoreIndexErrors) throws IOException {
|
||||
MetaData metaData;
|
||||
@Override
|
||||
public MetaData getSnapshotGlobalMetaData(final SnapshotId snapshotId) {
|
||||
try {
|
||||
return globalMetaDataFormat.read(snapshotsBlobContainer, snapshotId.getUUID());
|
||||
} catch (NoSuchFileException ex) {
|
||||
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
|
||||
} catch (IOException ex) {
|
||||
throw new SnapshotException(metadata.name(), snapshotId, "failed to read global metadata", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexMetaData getSnapshotIndexMetaData(final SnapshotId snapshotId, final IndexId index) throws IOException {
|
||||
final BlobPath indexPath = basePath().add("indices").add(index.getId());
|
||||
return indexMetaDataFormat.read(blobStore().blobContainer(indexPath), snapshotId.getUUID());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the global metadata associated with the snapshot.
|
||||
* <p>
|
||||
* The returned meta data contains global metadata as well as metadata
|
||||
* for all indices listed in the indices parameter.
|
||||
*/
|
||||
private MetaData readSnapshotMetaData(final SnapshotId snapshotId,
|
||||
final Version snapshotVersion,
|
||||
final List<IndexId> indices,
|
||||
final boolean ignoreErrors) throws IOException {
|
||||
if (snapshotVersion == null) {
|
||||
// When we delete corrupted snapshots we might not know which version we are dealing with
|
||||
// We can try detecting the version based on the metadata file format
|
||||
assert ignoreIndexErrors;
|
||||
assert ignoreErrors;
|
||||
if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getUUID()) == false) {
|
||||
throw new SnapshotMissingException(metadata.name(), snapshotId);
|
||||
}
|
||||
}
|
||||
try {
|
||||
metaData = globalMetaDataFormat.read(snapshotsBlobContainer, snapshotId.getUUID());
|
||||
} catch (NoSuchFileException ex) {
|
||||
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
|
||||
} catch (IOException ex) {
|
||||
throw new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex);
|
||||
}
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder(metaData);
|
||||
|
||||
final MetaData.Builder metaData = MetaData.builder(getSnapshotGlobalMetaData(snapshotId));
|
||||
if (indices != null) {
|
||||
for (IndexId index : indices) {
|
||||
BlobPath indexPath = basePath().add("indices").add(index.getId());
|
||||
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
|
||||
try {
|
||||
metaDataBuilder.put(indexMetaDataFormat.read(indexMetaDataBlobContainer, snapshotId.getUUID()), false);
|
||||
metaData.put(getSnapshotIndexMetaData(snapshotId, index), false);
|
||||
} catch (ElasticsearchParseException | IOException ex) {
|
||||
if (ignoreIndexErrors) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index.getName()), ex);
|
||||
if (ignoreErrors == false) {
|
||||
throw new SnapshotException(metadata.name(), snapshotId,
|
||||
"[" + index.getName() + "] failed to read metadata for index", ex);
|
||||
} else {
|
||||
throw ex;
|
||||
logger.warn(() ->
|
||||
new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index.getName()), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
return metaDataBuilder.build();
|
||||
}
|
||||
return metaData.build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
|
@ -91,6 +92,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
|
|||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_UPGRADED;
|
||||
import static org.elasticsearch.common.util.set.Sets.newHashSet;
|
||||
import static org.elasticsearch.snapshots.SnapshotUtils.filterIndices;
|
||||
|
||||
/**
|
||||
* Service responsible for restoring snapshots
|
||||
|
@ -182,17 +184,34 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
|
|||
if (matchingSnapshotId.isPresent() == false) {
|
||||
throw new SnapshotRestoreException(request.repositoryName, request.snapshotName, "snapshot does not exist");
|
||||
}
|
||||
|
||||
final SnapshotId snapshotId = matchingSnapshotId.get();
|
||||
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
|
||||
final Snapshot snapshot = new Snapshot(request.repositoryName, snapshotId);
|
||||
List<String> filteredIndices = SnapshotUtils.filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
|
||||
final MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices));
|
||||
|
||||
// Make sure that we can restore from this snapshot
|
||||
validateSnapshotRestorable(request.repositoryName, snapshotInfo);
|
||||
|
||||
// Find list of indices that we need to restore
|
||||
final Map<String, String> renamedIndices = renamedIndices(request, filteredIndices);
|
||||
// Resolve the indices from the snapshot that need to be restored
|
||||
final List<String> indicesInSnapshot = filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
|
||||
|
||||
final MetaData.Builder metaDataBuilder;
|
||||
if (request.includeGlobalState()) {
|
||||
metaDataBuilder = MetaData.builder(repository.getSnapshotGlobalMetaData(snapshotId));
|
||||
} else {
|
||||
metaDataBuilder = MetaData.builder();
|
||||
}
|
||||
|
||||
final List<IndexId> indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot);
|
||||
for (IndexId indexId : indexIdsInSnapshot) {
|
||||
metaDataBuilder.put(repository.getSnapshotIndexMetaData(snapshotId, indexId), false);
|
||||
}
|
||||
|
||||
final MetaData metaData = metaDataBuilder.build();
|
||||
|
||||
// Apply renaming on index names, returning a map of names where
|
||||
// the key is the renamed index and the value is the original name
|
||||
final Map<String, String> indices = renamedIndices(request, indicesInSnapshot);
|
||||
|
||||
// Now we can start the actual restore process by adding shards to be recovered in the cluster state
|
||||
// and updating cluster metadata (global and index) as needed
|
||||
|
@ -222,12 +241,13 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
|
|||
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
|
||||
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
|
||||
Set<String> aliases = new HashSet<>();
|
||||
if (!renamedIndices.isEmpty()) {
|
||||
|
||||
if (indices.isEmpty() == false) {
|
||||
// We have some indices to restore
|
||||
ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder();
|
||||
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
|
||||
.minimumIndexCompatibilityVersion();
|
||||
for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) {
|
||||
for (Map.Entry<String, String> indexEntry : indices.entrySet()) {
|
||||
String index = indexEntry.getValue();
|
||||
boolean partial = checkPartial(index);
|
||||
SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(snapshot, snapshotInfo.version(), index);
|
||||
|
@ -304,21 +324,42 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
|
|||
}
|
||||
|
||||
shards = shardsBuilder.build();
|
||||
RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, overallState(RestoreInProgress.State.INIT, shards), Collections.unmodifiableList(new ArrayList<>(renamedIndices.keySet())), shards);
|
||||
RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, overallState(RestoreInProgress.State.INIT, shards), Collections.unmodifiableList(new ArrayList<>(indices.keySet())), shards);
|
||||
builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restoreEntry));
|
||||
} else {
|
||||
shards = ImmutableOpenMap.of();
|
||||
}
|
||||
|
||||
checkAliasNameConflicts(renamedIndices, aliases);
|
||||
checkAliasNameConflicts(indices, aliases);
|
||||
|
||||
// Restore global state if needed
|
||||
restoreGlobalStateIfRequested(mdBuilder);
|
||||
if (request.includeGlobalState()) {
|
||||
if (metaData.persistentSettings() != null) {
|
||||
Settings settings = metaData.persistentSettings();
|
||||
clusterSettings.validateUpdate(settings);
|
||||
mdBuilder.persistentSettings(settings);
|
||||
}
|
||||
if (metaData.templates() != null) {
|
||||
// TODO: Should all existing templates be deleted first?
|
||||
for (ObjectCursor<IndexTemplateMetaData> cursor : metaData.templates().values()) {
|
||||
mdBuilder.put(cursor.value);
|
||||
}
|
||||
}
|
||||
if (metaData.customs() != null) {
|
||||
for (ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
|
||||
if (!RepositoriesMetaData.TYPE.equals(cursor.key)) {
|
||||
// Don't restore repositories while we are working with them
|
||||
// TODO: Should we restore them at the end?
|
||||
mdBuilder.putCustom(cursor.key, cursor.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (completed(shards)) {
|
||||
// We don't have any indices to restore - we are done
|
||||
restoreInfo = new RestoreInfo(snapshotId.getName(),
|
||||
Collections.unmodifiableList(new ArrayList<>(renamedIndices.keySet())),
|
||||
Collections.unmodifiableList(new ArrayList<>(indices.keySet())),
|
||||
shards.size(),
|
||||
shards.size() - failedShards(shards));
|
||||
}
|
||||
|
@ -426,32 +467,6 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
|
|||
return builder.settings(settingsBuilder).build();
|
||||
}
|
||||
|
||||
private void restoreGlobalStateIfRequested(MetaData.Builder mdBuilder) {
|
||||
if (request.includeGlobalState()) {
|
||||
if (metaData.persistentSettings() != null) {
|
||||
Settings settings = metaData.persistentSettings();
|
||||
clusterSettings.validateUpdate(settings);
|
||||
mdBuilder.persistentSettings(settings);
|
||||
}
|
||||
if (metaData.templates() != null) {
|
||||
// TODO: Should all existing templates be deleted first?
|
||||
for (ObjectCursor<IndexTemplateMetaData> cursor : metaData.templates().values()) {
|
||||
mdBuilder.put(cursor.value);
|
||||
}
|
||||
}
|
||||
if (metaData.customs() != null) {
|
||||
for (ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
|
||||
if (!RepositoriesMetaData.TYPE.equals(cursor.key)) {
|
||||
// Don't restore repositories while we are working with them
|
||||
// TODO: Should we restore them at the end?
|
||||
mdBuilder.putCustom(cursor.key, cursor.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);
|
||||
|
@ -757,7 +772,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
|
|||
"indices [" + index + "] and [" + previousIndex + "] are renamed into the same index [" + renamedIndex + "]");
|
||||
}
|
||||
}
|
||||
return renamedIndices;
|
||||
return Collections.unmodifiableMap(renamedIndices);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -148,7 +148,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* @throws SnapshotMissingException if snapshot is not found
|
||||
*/
|
||||
public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snapshotId) {
|
||||
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Arrays.asList(snapshotId.getName()));
|
||||
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Collections.singletonList(snapshotId.getName()));
|
||||
if (!entries.isEmpty()) {
|
||||
return inProgressSnapshot(entries.iterator().next());
|
||||
}
|
||||
|
@ -593,13 +593,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
*/
|
||||
public Map<ShardId, IndexShardSnapshotStatus> snapshotShards(final String repositoryName,
|
||||
final SnapshotInfo snapshotInfo) throws IOException {
|
||||
Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
|
||||
Repository repository = repositoriesService.repository(repositoryName);
|
||||
RepositoryData repositoryData = repository.getRepositoryData();
|
||||
MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(snapshotInfo.indices()));
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
final RepositoryData repositoryData = repository.getRepositoryData();
|
||||
|
||||
final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
|
||||
for (String index : snapshotInfo.indices()) {
|
||||
IndexId indexId = repositoryData.resolveIndexId(index);
|
||||
IndexMetaData indexMetaData = metaData.indices().get(index);
|
||||
IndexMetaData indexMetaData = repository.getSnapshotIndexMetaData(snapshotInfo.snapshotId(), indexId);
|
||||
if (indexMetaData != null) {
|
||||
int numberOfShards = indexMetaData.getNumberOfShards();
|
||||
for (int i = 0; i < numberOfShards; i++) {
|
||||
|
@ -633,7 +633,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
return unmodifiableMap(shardStatus);
|
||||
}
|
||||
|
||||
|
||||
private SnapshotShardFailure findShardFailure(List<SnapshotShardFailure> shardFailures, ShardId shardId) {
|
||||
for (SnapshotShardFailure shardFailure : shardFailures) {
|
||||
if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) {
|
||||
|
|
|
@ -2605,7 +2605,12 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException {
|
||||
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,208 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.snapshots;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
/**
|
||||
* This class tests whether global and index metadata are only loaded from the repository when needed.
|
||||
*/
|
||||
public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
/// This test uses a snapshot/restore plugin implementation that
|
||||
// counts the number of times metadata are loaded
|
||||
return Collections.singletonList(CountingMockRepositoryPlugin.class);
|
||||
}
|
||||
|
||||
public void testWhenMetadataAreLoaded() throws Exception {
|
||||
createIndex("docs");
|
||||
indexRandom(true,
|
||||
client().prepareIndex("docs", "doc", "1").setSource("rank", 1),
|
||||
client().prepareIndex("docs", "doc", "2").setSource("rank", 2),
|
||||
client().prepareIndex("docs", "doc", "3").setSource("rank", 3),
|
||||
client().prepareIndex("others", "other").setSource("rank", 4),
|
||||
client().prepareIndex("others", "other").setSource("rank", 5));
|
||||
|
||||
assertAcked(client().admin().cluster().preparePutRepository("repository")
|
||||
.setType("coutingmock")
|
||||
.setSettings(Settings.builder().put("location", randomRepoPath())));
|
||||
|
||||
// Creating a snapshot does not load any metadata
|
||||
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("repository", "snap")
|
||||
.setIncludeGlobalState(true)
|
||||
.setWaitForCompletion(true)
|
||||
.get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().failedShards(), equalTo(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().status(), equalTo(RestStatus.OK));
|
||||
assertGlobalMetadataLoads("snap", 0);
|
||||
assertIndexMetadataLoads("snap", "docs", 0);
|
||||
assertIndexMetadataLoads("snap", "others", 0);
|
||||
|
||||
// Getting a snapshot does not load any metadata
|
||||
GetSnapshotsResponse getSnapshotsResponse =
|
||||
client().admin().cluster().prepareGetSnapshots("repository").addSnapshots("snap").setVerbose(randomBoolean()).get();
|
||||
assertThat(getSnapshotsResponse.getSnapshots(), hasSize(1));
|
||||
assertGlobalMetadataLoads("snap", 0);
|
||||
assertIndexMetadataLoads("snap", "docs", 0);
|
||||
assertIndexMetadataLoads("snap", "others", 0);
|
||||
|
||||
// Getting the status of a snapshot loads indices metadata but not global metadata
|
||||
SnapshotsStatusResponse snapshotStatusResponse =
|
||||
client().admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get();
|
||||
assertThat(snapshotStatusResponse.getSnapshots(), hasSize(1));
|
||||
assertGlobalMetadataLoads("snap", 0);
|
||||
assertIndexMetadataLoads("snap", "docs", 1);
|
||||
assertIndexMetadataLoads("snap", "others", 1);
|
||||
|
||||
assertAcked(client().admin().indices().prepareDelete("docs", "others"));
|
||||
|
||||
// Restoring a snapshot loads indices metadata but not the global state
|
||||
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("repository", "snap")
|
||||
.setWaitForCompletion(true)
|
||||
.get();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().status(), equalTo(RestStatus.OK));
|
||||
assertGlobalMetadataLoads("snap", 0);
|
||||
assertIndexMetadataLoads("snap", "docs", 2);
|
||||
assertIndexMetadataLoads("snap", "others", 2);
|
||||
|
||||
assertAcked(client().admin().indices().prepareDelete("docs"));
|
||||
|
||||
// Restoring a snapshot with selective indices loads only required index metadata
|
||||
restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("repository", "snap")
|
||||
.setIndices("docs")
|
||||
.setWaitForCompletion(true)
|
||||
.get();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().status(), equalTo(RestStatus.OK));
|
||||
assertGlobalMetadataLoads("snap", 0);
|
||||
assertIndexMetadataLoads("snap", "docs", 3);
|
||||
assertIndexMetadataLoads("snap", "others", 2);
|
||||
|
||||
assertAcked(client().admin().indices().prepareDelete("docs", "others"));
|
||||
|
||||
// Restoring a snapshot including the global state loads it with the index metadata
|
||||
restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("repository", "snap")
|
||||
.setIndices("docs", "oth*")
|
||||
.setRestoreGlobalState(true)
|
||||
.setWaitForCompletion(true)
|
||||
.get();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().status(), equalTo(RestStatus.OK));
|
||||
assertGlobalMetadataLoads("snap", 1);
|
||||
assertIndexMetadataLoads("snap", "docs", 4);
|
||||
assertIndexMetadataLoads("snap", "others", 3);
|
||||
}
|
||||
|
||||
private void assertGlobalMetadataLoads(final String snapshot, final int times) {
|
||||
AtomicInteger count = getCountingMockRepository().globalMetadata.get(snapshot);
|
||||
if (times == 0) {
|
||||
assertThat("Global metadata for " + snapshot + " must not have been loaded", count, nullValue());
|
||||
} else {
|
||||
assertThat("Global metadata for " + snapshot + " must have been loaded " + times + " times", count.get(), equalTo(times));
|
||||
}
|
||||
}
|
||||
|
||||
private void assertIndexMetadataLoads(final String snapshot, final String index, final int times) {
|
||||
final String key = key(snapshot, index);
|
||||
AtomicInteger count = getCountingMockRepository().indicesMetadata.get(key);
|
||||
if (times == 0) {
|
||||
assertThat("Index metadata for " + key + " must not have been loaded", count, nullValue());
|
||||
} else {
|
||||
assertThat("Index metadata for " + key + " must have been loaded " + times + " times", count.get(), equalTo(times));
|
||||
}
|
||||
}
|
||||
|
||||
private CountingMockRepository getCountingMockRepository() {
|
||||
String master = internalCluster().getMasterName();
|
||||
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, master);
|
||||
Repository repository = repositoriesService.repository("repository");
|
||||
assertThat(repository, instanceOf(CountingMockRepository.class));
|
||||
return (CountingMockRepository) repository;
|
||||
}
|
||||
|
||||
/** Compute a map key for the given snapshot and index names **/
|
||||
private static String key(final String snapshot, final String index) {
|
||||
return snapshot + ":" + index;
|
||||
}
|
||||
|
||||
/** A mocked repository that counts the number of times global/index metadata are accessed **/
|
||||
public static class CountingMockRepository extends MockRepository {
|
||||
|
||||
final Map<String, AtomicInteger> globalMetadata = new ConcurrentHashMap<>();
|
||||
final Map<String, AtomicInteger> indicesMetadata = new ConcurrentHashMap<>();
|
||||
|
||||
public CountingMockRepository(final RepositoryMetaData metadata,
|
||||
final Environment environment,
|
||||
final NamedXContentRegistry namedXContentRegistry) throws IOException {
|
||||
super(metadata, environment, namedXContentRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
|
||||
globalMetadata.computeIfAbsent(snapshotId.getName(), (s) -> new AtomicInteger(0)).incrementAndGet();
|
||||
return super.getSnapshotGlobalMetaData(snapshotId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId indexId) throws IOException {
|
||||
indicesMetadata.computeIfAbsent(key(snapshotId.getName(), indexId.getName()), (s) -> new AtomicInteger(0)).incrementAndGet();
|
||||
return super.getSnapshotIndexMetaData(snapshotId, indexId);
|
||||
}
|
||||
}
|
||||
|
||||
/** A plugin that uses CountingMockRepository as implementation of the Repository **/
|
||||
public static class CountingMockRepositoryPlugin extends MockRepository.Plugin {
|
||||
@Override
|
||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
||||
return Collections.singletonMap("coutingmock", (metadata) -> new CountingMockRepository(metadata, env, namedXContentRegistry));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.snapshots;
|
||||
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
|
@ -74,6 +73,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
|
@ -85,6 +85,7 @@ import org.elasticsearch.ingest.IngestTestPlugin;
|
|||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.RepositoryException;
|
||||
import org.elasticsearch.script.MockScriptEngine;
|
||||
|
@ -109,6 +110,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -2590,12 +2592,155 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo("test-snap-1"));
|
||||
|
||||
try {
|
||||
client.admin().cluster().prepareGetSnapshots("test-repo").setIgnoreUnavailable(false).get().getSnapshots();
|
||||
} catch (SnapshotException ex) {
|
||||
final SnapshotException ex = expectThrows(SnapshotException.class, () ->
|
||||
client.admin().cluster().prepareGetSnapshots("test-repo").setIgnoreUnavailable(false).get());
|
||||
assertThat(ex.getRepositoryName(), equalTo("test-repo"));
|
||||
assertThat(ex.getSnapshotName(), equalTo("test-snap-2"));
|
||||
}
|
||||
|
||||
/** Tests that a snapshot with a corrupted global state file can still be restored */
|
||||
public void testRestoreSnapshotWithCorruptedGlobalState() throws Exception {
|
||||
final Path repo = randomRepoPath();
|
||||
|
||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs")
|
||||
.setSettings(Settings.builder()
|
||||
.put("location", repo)
|
||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||
|
||||
createIndex("test-idx-1", "test-idx-2");
|
||||
indexRandom(true,
|
||||
client().prepareIndex("test-idx-1", "_doc").setSource("foo", "bar"),
|
||||
client().prepareIndex("test-idx-2", "_doc").setSource("foo", "bar"),
|
||||
client().prepareIndex("test-idx-2", "_doc").setSource("foo", "bar"));
|
||||
flushAndRefresh("test-idx-1", "test-idx-2");
|
||||
|
||||
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
.setIncludeGlobalState(true)
|
||||
.setWaitForCompletion(true)
|
||||
.get();
|
||||
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
|
||||
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
|
||||
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
|
||||
|
||||
// Truncate the global state metadata file
|
||||
final Path globalStatePath = repo.resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat");
|
||||
try(SeekableByteChannel outChan = Files.newByteChannel(globalStatePath, StandardOpenOption.WRITE)) {
|
||||
outChan.truncate(randomInt(10));
|
||||
}
|
||||
|
||||
List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
|
||||
assertThat(snapshotInfos.size(), equalTo(1));
|
||||
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo("test-snap"));
|
||||
|
||||
SnapshotsStatusResponse snapshotStatusResponse =
|
||||
client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get();
|
||||
assertThat(snapshotStatusResponse.getSnapshots(), hasSize(1));
|
||||
assertThat(snapshotStatusResponse.getSnapshots().get(0).getSnapshot().getSnapshotId().getName(), equalTo("test-snap"));
|
||||
|
||||
assertAcked(client().admin().indices().prepareDelete("test-idx-1", "test-idx-2"));
|
||||
|
||||
SnapshotException ex = expectThrows(SnapshotException.class, () -> client().admin().cluster()
|
||||
.prepareRestoreSnapshot("test-repo", "test-snap")
|
||||
.setRestoreGlobalState(true)
|
||||
.setWaitForCompletion(true)
|
||||
.get());
|
||||
assertThat(ex.getRepositoryName(), equalTo("test-repo"));
|
||||
assertThat(ex.getSnapshotName(), equalTo("test-snap"));
|
||||
assertThat(ex.getMessage(), containsString("failed to read global metadata"));
|
||||
|
||||
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(true)
|
||||
.get();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(snapshotInfo.successfulShards()));
|
||||
|
||||
ensureGreen("test-idx-1", "test-idx-2");
|
||||
assertHitCount(client().prepareSearch("test-idx-*").setSize(0).get(), 3);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that a snapshot of multiple indices including one with a corrupted index metadata
|
||||
* file can still be used to restore the non corrupted indices
|
||||
* */
|
||||
public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception {
|
||||
final Client client = client();
|
||||
final Path repo = randomRepoPath();
|
||||
final int nbIndices = randomIntBetween(2, 3);
|
||||
|
||||
final Map<String, Integer> nbDocsPerIndex = new HashMap<>();
|
||||
for (int i = 0; i < nbIndices; i++) {
|
||||
String indexName = "test-idx-" + i;
|
||||
|
||||
assertAcked(prepareCreate(indexName).setSettings(Settings.builder()
|
||||
.put(SETTING_NUMBER_OF_SHARDS, Math.min(2, numberOfShards())).put(SETTING_NUMBER_OF_REPLICAS, 0)));
|
||||
|
||||
int nbDocs = randomIntBetween(1, 10);
|
||||
nbDocsPerIndex.put(indexName, nbDocs);
|
||||
|
||||
IndexRequestBuilder[] documents = new IndexRequestBuilder[nbDocs];
|
||||
for (int j = 0; j < nbDocs; j++) {
|
||||
documents[j] = client.prepareIndex(indexName, "_doc").setSource("foo", "bar");
|
||||
}
|
||||
indexRandom(true, documents);
|
||||
}
|
||||
flushAndRefresh();
|
||||
|
||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs")
|
||||
.setSettings(Settings.builder()
|
||||
.put("location", repo)));
|
||||
|
||||
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(true)
|
||||
.get();
|
||||
|
||||
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
|
||||
assertThat(snapshotInfo.failedShards(), equalTo(0));
|
||||
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
|
||||
assertThat(snapshotInfo.indices(), hasSize(nbIndices));
|
||||
|
||||
RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
|
||||
Repository repository = service.repository("test-repo");
|
||||
|
||||
final Map<String, IndexId> indexIds = repository.getRepositoryData().getIndices();
|
||||
assertThat(indexIds.size(), equalTo(nbIndices));
|
||||
|
||||
// Choose a random index from the snapshot
|
||||
final IndexId corruptedIndex = randomFrom(indexIds.values());
|
||||
final Path indexMetadataPath = repo.resolve("indices")
|
||||
.resolve(corruptedIndex.getId())
|
||||
.resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat");
|
||||
|
||||
// Truncate the index metadata file
|
||||
try(SeekableByteChannel outChan = Files.newByteChannel(indexMetadataPath, StandardOpenOption.WRITE)) {
|
||||
outChan.truncate(randomInt(10));
|
||||
}
|
||||
|
||||
List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
|
||||
assertThat(snapshotInfos.size(), equalTo(1));
|
||||
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo("test-snap"));
|
||||
|
||||
assertAcked(client().admin().indices().prepareDelete(nbDocsPerIndex.keySet().toArray(new String[nbDocsPerIndex.size()])));
|
||||
|
||||
Predicate<String> isRestorableIndex = index -> corruptedIndex.getName().equals(index) == false;
|
||||
|
||||
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
|
||||
.setIndices(nbDocsPerIndex.keySet().stream().filter(isRestorableIndex).toArray(String[]::new))
|
||||
.setRestoreGlobalState(randomBoolean())
|
||||
.setWaitForCompletion(true)
|
||||
.get();
|
||||
|
||||
ensureGreen();
|
||||
for (Map.Entry<String, Integer> entry : nbDocsPerIndex.entrySet()) {
|
||||
if (isRestorableIndex.test(entry.getKey())) {
|
||||
assertHitCount(client().prepareSearch(entry.getKey()).setSize(0).get(), entry.getValue().longValue());
|
||||
}
|
||||
}
|
||||
|
||||
assertAcked(client().admin().cluster().prepareDeleteSnapshot("test-repo", snapshotInfo.snapshotId().getName()).get());
|
||||
}
|
||||
|
||||
public void testCannotCreateSnapshotsWithSameName() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue