Snapshot/Restore: change metadata file format
This commit is contained in:
parent
5951f2580d
commit
831cfa52d5
|
@ -18,7 +18,6 @@
|
|||
*/
|
||||
package org.elasticsearch.repositories;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
|
@ -59,11 +58,11 @@ public interface Repository extends LifecycleComponent<Repository> {
|
|||
* <p/>
|
||||
* The returned meta data contains global metadata as well as metadata for all indices listed in the indices parameter.
|
||||
*
|
||||
* @param snapshotId snapshot ID
|
||||
* @param snapshot snapshot
|
||||
* @param indices list of indices
|
||||
* @return information about snapshot
|
||||
*/
|
||||
MetaData readSnapshotMetaData(SnapshotId snapshotId, List<String> indices) throws IOException;
|
||||
MetaData readSnapshotMetaData(SnapshotId snapshotId, Snapshot snapshot, List<String> indices) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the list of snapshots currently stored in the repository
|
||||
|
|
|
@ -21,13 +21,13 @@ package org.elasticsearch.repositories.blobstore;
|
|||
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.ByteStreams;
|
||||
|
||||
import org.apache.lucene.store.RateLimiter;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||
|
@ -137,7 +137,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
|
||||
private static final String TESTS_FILE = "tests-";
|
||||
|
||||
private static final String METADATA_PREFIX = "metadata-";
|
||||
private static final String METADATA_PREFIX = "meta-";
|
||||
|
||||
private static final String LEGACY_METADATA_PREFIX = "metadata-";
|
||||
|
||||
private static final String METADATA_SUFFIX = ".dat";
|
||||
|
||||
private final BlobStoreIndexShardRepository indexShardRepository;
|
||||
|
||||
|
@ -244,7 +248,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
}
|
||||
// Write Global MetaData
|
||||
// TODO: Check if metadata needs to be written
|
||||
try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId)))) {
|
||||
try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId, false)))) {
|
||||
writeGlobalMetaData(metaData, output);
|
||||
}
|
||||
for (String index : indices) {
|
||||
|
@ -270,8 +274,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
@Override
|
||||
public void deleteSnapshot(SnapshotId snapshotId) {
|
||||
List<String> indices = Collections.EMPTY_LIST;
|
||||
Snapshot snapshot = null;
|
||||
try {
|
||||
indices = readSnapshot(snapshotId).indices();
|
||||
snapshot = readSnapshot(snapshotId);
|
||||
indices = snapshot.indices();
|
||||
} catch (SnapshotMissingException ex) {
|
||||
throw ex;
|
||||
} catch (SnapshotException | ElasticsearchParseException ex) {
|
||||
|
@ -279,7 +285,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
}
|
||||
MetaData metaData = null;
|
||||
try {
|
||||
metaData = readSnapshotMetaData(snapshotId, indices, true);
|
||||
if (snapshot != null) {
|
||||
metaData = readSnapshotMetaData(snapshotId, snapshot.version(), indices, true);
|
||||
} else {
|
||||
try {
|
||||
metaData = readSnapshotMetaData(snapshotId, false, indices, true);
|
||||
} catch (IOException ex) {
|
||||
metaData = readSnapshotMetaData(snapshotId, true, indices, true);
|
||||
}
|
||||
}
|
||||
} catch (IOException | SnapshotException ex) {
|
||||
logger.warn("cannot read metadata for snapshot [{}]", ex, snapshotId);
|
||||
}
|
||||
|
@ -287,7 +301,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
String blobName = snapshotBlobName(snapshotId);
|
||||
// Delete snapshot file first so we wouldn't end up with partially deleted snapshot that looks OK
|
||||
snapshotsBlobContainer.deleteBlob(blobName);
|
||||
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId));
|
||||
if (snapshot != null) {
|
||||
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId, legacyMetaData(snapshot.version())));
|
||||
} else {
|
||||
// We don't know which version was the snapshot created with - try deleting both current and legacy metadata
|
||||
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId, true));
|
||||
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId, false));
|
||||
}
|
||||
// Delete snapshot from the snapshot list
|
||||
List<SnapshotId> snapshotIds = snapshots();
|
||||
if (snapshotIds.contains(snapshotId)) {
|
||||
|
@ -402,8 +422,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public MetaData readSnapshotMetaData(SnapshotId snapshotId, List<String> indices) throws IOException {
|
||||
return readSnapshotMetaData(snapshotId, indices, false);
|
||||
public MetaData readSnapshotMetaData(SnapshotId snapshotId, Snapshot snapshot, List<String> indices) throws IOException {
|
||||
return readSnapshotMetaData(snapshotId, snapshot.version(), indices, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -422,11 +442,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
}
|
||||
}
|
||||
|
||||
private MetaData readSnapshotMetaData(SnapshotId snapshotId, List<String> indices, boolean ignoreIndexErrors) throws IOException {
|
||||
private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<String> indices, boolean ignoreIndexErrors) throws IOException {
|
||||
return readSnapshotMetaData(snapshotId, legacyMetaData(snapshotVersion), indices, ignoreIndexErrors);
|
||||
}
|
||||
|
||||
private MetaData readSnapshotMetaData(SnapshotId snapshotId, boolean legacy, List<String> indices, boolean ignoreIndexErrors) throws IOException {
|
||||
MetaData metaData;
|
||||
try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId))) {
|
||||
byte[] data = ByteStreams.toByteArray(blob);
|
||||
metaData = readMetaData(data);
|
||||
try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId, legacy))) {
|
||||
metaData = readMetaData(ByteStreams.toByteArray(blob));
|
||||
} catch (FileNotFoundException | NoSuchFileException ex) {
|
||||
throw new SnapshotMissingException(snapshotId, ex);
|
||||
} catch (IOException ex) {
|
||||
|
@ -554,10 +577,24 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
|
|||
* Returns name of metadata blob
|
||||
*
|
||||
* @param snapshotId snapshot id
|
||||
* @param legacy true if legacy (pre-2.0.0) format should be used
|
||||
* @return name of metadata blob
|
||||
*/
|
||||
private String metaDataBlobName(SnapshotId snapshotId) {
|
||||
return METADATA_PREFIX + snapshotId.getSnapshot();
|
||||
private String metaDataBlobName(SnapshotId snapshotId, boolean legacy) {
|
||||
if (legacy) {
|
||||
return LEGACY_METADATA_PREFIX + snapshotId.getSnapshot();
|
||||
} else {
|
||||
return METADATA_PREFIX + snapshotId.getSnapshot() + METADATA_SUFFIX;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In v2.0.0 we changed the matadata file format
|
||||
* @param version
|
||||
* @return true if legacy version should be used false otherwise
|
||||
*/
|
||||
private boolean legacyMetaData(Version version) {
|
||||
return version.before(Version.V_2_0_0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -157,7 +157,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
final SnapshotId snapshotId = new SnapshotId(request.repository(), request.name());
|
||||
final Snapshot snapshot = repository.readSnapshot(snapshotId);
|
||||
List<String> filteredIndices = SnapshotUtils.filterIndices(snapshot.indices(), request.indices(), request.indicesOptions());
|
||||
MetaData metaDataIn = repository.readSnapshotMetaData(snapshotId, filteredIndices);
|
||||
MetaData metaDataIn = repository.readSnapshotMetaData(snapshotId, snapshot, filteredIndices);
|
||||
|
||||
final MetaData metaData;
|
||||
if (snapshot.version().before(Version.V_2_0_0)) {
|
||||
|
|
|
@ -500,7 +500,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
|
|||
Repository repository = repositoriesService.repository(snapshotId.getRepository());
|
||||
IndexShardRepository indexShardRepository = repositoriesService.indexShardRepository(snapshotId.getRepository());
|
||||
Snapshot snapshot = repository.readSnapshot(snapshotId);
|
||||
MetaData metaData = repository.readSnapshotMetaData(snapshotId, snapshot.indices());
|
||||
MetaData metaData = repository.readSnapshotMetaData(snapshotId, snapshot, snapshot.indices());
|
||||
for (String index : snapshot.indices()) {
|
||||
IndexMetaData indexMetaData = metaData.indices().get(index);
|
||||
if (indexMetaData != null) {
|
||||
|
@ -836,16 +836,20 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
|
|||
for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
|
||||
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.getKey());
|
||||
if (snapshotStatus != null) {
|
||||
if (snapshotStatus.stage() == IndexShardSnapshotStatus.Stage.STARTED) {
|
||||
snapshotStatus.abort();
|
||||
} else if (snapshotStatus.stage() == IndexShardSnapshotStatus.Stage.DONE) {
|
||||
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshotId(), shard.getKey());
|
||||
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.snapshotId(), shard.getKey(),
|
||||
new ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotMetaData.State.SUCCESS)));
|
||||
} else if (snapshotStatus.stage() == IndexShardSnapshotStatus.Stage.FAILURE) {
|
||||
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshotId(), shard.getKey());
|
||||
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.snapshotId(), shard.getKey(),
|
||||
new ShardSnapshotStatus(event.state().nodes().localNodeId(), State.FAILED, snapshotStatus.failure())));
|
||||
switch (snapshotStatus.stage()) {
|
||||
case STARTED:
|
||||
snapshotStatus.abort();
|
||||
break;
|
||||
case DONE:
|
||||
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshotId(), shard.getKey());
|
||||
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.snapshotId(), shard.getKey(),
|
||||
new ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotMetaData.State.SUCCESS)));
|
||||
break;
|
||||
case FAILURE:
|
||||
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshotId(), shard.getKey());
|
||||
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.snapshotId(), shard.getKey(),
|
||||
new ShardSnapshotStatus(event.state().nodes().localNodeId(), State.FAILED, snapshotStatus.failure())));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -870,7 +870,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
|
|||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
logger.info("--> delete index metadata and shard metadata");
|
||||
Path metadata = repo.resolve("metadata-test-snap-1");
|
||||
Path metadata = repo.resolve("meta-test-snap-1.dat");
|
||||
Files.delete(metadata);
|
||||
|
||||
logger.info("--> delete snapshot");
|
||||
|
|
Loading…
Reference in New Issue