All snapshot metadata files use UUID for the blob ID
commit a0a4d67eae (parent 630218a16f)
@@ -35,6 +35,7 @@ import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.HashMap;
 import java.util.Map;
@@ -106,8 +107,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
     @Override
     public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
         final Path file = path.resolve(blobName);
-        // TODO: why is this not specifying CREATE_NEW? Do we really need to be able to truncate existing files?
-        try (OutputStream outputStream = Files.newOutputStream(file)) {
+        try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
             Streams.copy(inputStream, outputStream, new byte[blobStore.bufferSizeInBytes()]);
         }
         IOUtils.fsync(file, false);
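The switch to StandardOpenOption.CREATE_NEW makes writeBlob fail fast when the target file already exists, instead of silently truncating it. A minimal, self-contained sketch of that java.nio behavior (the file name example-blob is hypothetical, standing in for path.resolve(blobName)):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class CreateNewSketch {
    public static void main(String[] args) throws IOException {
        Path blob = Paths.get("example-blob"); // hypothetical path
        // First write succeeds: CREATE_NEW atomically creates the file.
        try (OutputStream out = Files.newOutputStream(blob, StandardOpenOption.CREATE_NEW)) {
            out.write("payload".getBytes());
        }
        // Second write is rejected: CREATE_NEW throws instead of truncating the existing blob.
        try (OutputStream out = Files.newOutputStream(blob, StandardOpenOption.CREATE_NEW)) {
            out.write("overwrite attempt".getBytes());
        } catch (FileAlreadyExistsException e) {
            System.out.println("overwrite rejected: " + e.getFile());
        }
        Files.delete(blob); // once deleted, the same name can be written again
    }
}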
@@ -119,7 +119,7 @@ import static java.util.Collections.unmodifiableMap;
 * {@code
 * STORE_ROOT
 * |- index-N - list of all snapshot name as JSON array, N is the generation of the file
-* |- index-latest - contains the numeric value of the latest generation of the index file (i.e. N from above)
+* |- index.latest - contains the numeric value of the latest generation of the index file (i.e. N from above)
 * |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010"
 * |- meta-20131010.dat - JSON serialized MetaData for snapshot "20131010" (includes only global metadata)
 * |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011"
@@ -320,12 +320,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with such name already exists");
             }
             // Write Global MetaData
-            globalMetaDataFormat.write(clusterMetadata, snapshotsBlobContainer, snapshotName);
+            globalMetaDataFormat.write(clusterMetadata, snapshotsBlobContainer, blobId(snapshotId));
             for (String index : indices) {
                 final IndexMetaData indexMetaData = clusterMetadata.index(index);
                 final BlobPath indexPath = basePath().add("indices").add(index);
                 final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
-                indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotName);
+                indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, blobId(snapshotId));
             }
         } catch (IOException ex) {
             throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
@@ -367,14 +367,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             // delete the snapshot file
             safeSnapshotBlobDelete(snapshot, blobId(snapshotId));
             // delete the global metadata file
-            safeGlobalMetaDataBlobDelete(snapshot, snapshotId.getName());
+            safeGlobalMetaDataBlobDelete(snapshot, blobId(snapshotId));

             // Now delete all indices
             for (String index : indices) {
                 BlobPath indexPath = basePath().add("indices").add(index);
                 BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
                 try {
-                    indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, snapshotId.getName());
+                    indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, blobId(snapshotId));
                 } catch (IOException ex) {
                     logger.warn("[{}] failed to delete metadata for index [{}]", ex, snapshotId, index);
                 }
@@ -517,7 +517,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             // When we delete corrupted snapshots we might not know which version we are dealing with
             // We can try detecting the version based on the metadata file format
             assert ignoreIndexErrors;
-            if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getName())) {
+            if (globalMetaDataFormat.exists(snapshotsBlobContainer, blobId(snapshotId))) {
                 snapshotVersion = Version.CURRENT;
             } else if (globalMetaDataLegacyFormat.exists(snapshotsBlobContainer, snapshotId.getName())) {
                 throw new SnapshotException(metadata.name(), snapshotId, "snapshot is too old");
@@ -526,7 +526,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
         }
         try {
-            metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getName());
+            metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, blobId(snapshotId));
         } catch (FileNotFoundException | NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException ex) {
@@ -537,7 +537,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             BlobPath indexPath = basePath().add("indices").add(index);
             BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
             try {
-                metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, snapshotId.getName()), false);
+                metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, blobId(snapshotId)), false);
             } catch (ElasticsearchParseException | IOException ex) {
                 if (ignoreIndexErrors) {
                     logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index);
@@ -764,14 +764,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }

-    // Package private for testing
-    static String blobId(final SnapshotId snapshotId) {
+    public static String blobId(final SnapshotId snapshotId) {
         final String uuid = snapshotId.getUUID();
         if (uuid.equals(SnapshotId.UNASSIGNED_UUID)) {
             // the old snapshot blob naming
             return snapshotId.getName();
         }
-        return snapshotId.getName() + "-" + uuid;
+        return uuid;
     }

     /**
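The net effect of the blobId change, in isolation: a snapshot that carries a real UUID is stored under the bare UUID, while legacy snapshots created before UUIDs existed keep using the snapshot name. A standalone sketch with a simplified stand-in for SnapshotId (the "_na_" sentinel is an assumption mirroring SnapshotId.UNASSIGNED_UUID):

// Simplified stand-in for org.elasticsearch.snapshots.SnapshotId.
final class SnapshotIdSketch {
    static final String UNASSIGNED_UUID = "_na_"; // assumed sentinel for pre-UUID snapshots
    final String name;
    final String uuid;

    SnapshotIdSketch(String name, String uuid) {
        this.name = name;
        this.uuid = uuid;
    }

    // Same decision as BlobStoreRepository.blobId(): legacy snapshots fall back
    // to the snapshot name; snapshots with a real UUID use the UUID alone.
    static String blobId(SnapshotIdSketch id) {
        return UNASSIGNED_UUID.equals(id.uuid) ? id.name : id.uuid;
    }

    public static void main(String[] args) {
        System.out.println(blobId(new SnapshotIdSketch("snap-1", UNASSIGNED_UUID))); // prints: snap-1
        System.out.println(blobId(new SnapshotIdSketch("snap-1", "Wp9nJ3kDQ1iGM0TyxK8hVA"))); // prints the UUID
    }
}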
@@ -976,7 +975,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         int fileListGeneration = tuple.v2();

         try {
-            indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName());
+            indexShardSnapshotFormat(version).delete(blobContainer, blobId(snapshotId));
         } catch (IOException e) {
             logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId);
         }
@@ -997,7 +996,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         */
        public BlobStoreIndexShardSnapshot loadSnapshot() {
            try {
-                return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName());
+                return indexShardSnapshotFormat(version).read(blobContainer, blobId(snapshotId));
            } catch (IOException ex) {
                throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex);
            }
@@ -1126,7 +1125,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 try {
                     BlobStoreIndexShardSnapshot snapshot = null;
                     if (name.startsWith(SNAPSHOT_PREFIX)) {
-                        snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name);
+                        snapshot = indexShardSnapshotFormat.readBlob(blobContainer, blobId(snapshotId));
                     } else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) {
                         snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name);
                     }
@@ -1266,7 +1265,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             //TODO: The time stored in snapshot doesn't include cleanup time.
             logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
             try {
-                indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName());
+                indexShardSnapshotFormat.write(snapshot, blobContainer, blobId(snapshotId));
             } catch (IOException e) {
                 throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
             }
@@ -183,10 +183,10 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         assertThat(blobId(snapshotId), equalTo("abc-123")); // just the snapshot name
         String uuid = UUIDs.randomBase64UUID();
         snapshotId = new SnapshotId("abc123", uuid);
-        assertThat(blobId(snapshotId), equalTo("abc123-" + uuid)); // snapshot name + '-' + uuid
+        assertThat(blobId(snapshotId), equalTo(uuid)); // uuid only
         uuid = UUIDs.randomBase64UUID();
         snapshotId = new SnapshotId("abc-123", uuid);
-        assertThat(blobId(snapshotId), equalTo("abc-123-" + uuid)); // snapshot name + '-' + uuid
+        assertThat(blobId(snapshotId), equalTo(uuid)); // uuid only
     }

     private BlobStoreRepository setupRepo() {
@@ -884,7 +884,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

         logger.info("--> delete index metadata and shard metadata");
-        Path metadata = repo.resolve("meta-test-snap-1.dat");
+        Path metadata = repo.resolve("meta-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
         Files.delete(metadata);

         logger.info("--> delete snapshot");
@@ -917,7 +917,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

         logger.info("--> truncate snapshot file to make it unreadable");
-        Path snapshotPath = repo.resolve("snap-test-snap-1-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
+        Path snapshotPath = repo.resolve("snap-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
         try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
             outChan.truncate(randomInt(10));
         }
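These two tests pin down the resulting blob file names: meta-<uuid>.dat for global metadata and snap-<uuid>.dat for the snapshot file, with the snapshot name no longer embedded. A small helper sketch deriving those names (the prefixes are taken from the tests above; the repository root and UUID are hypothetical examples):

import java.nio.file.Path;
import java.nio.file.Paths;

public class SnapshotBlobNames {
    // Prefixes as exercised by the tests above; assumed to match the
    // repository's snapshot/metadata blob naming.
    static final String SNAPSHOT_PREFIX = "snap-";
    static final String METADATA_PREFIX = "meta-";
    static final String SUFFIX = ".dat";

    static Path snapshotBlob(Path repoRoot, String snapshotUuid) {
        return repoRoot.resolve(SNAPSHOT_PREFIX + snapshotUuid + SUFFIX);
    }

    static Path globalMetaDataBlob(Path repoRoot, String snapshotUuid) {
        return repoRoot.resolve(METADATA_PREFIX + snapshotUuid + SUFFIX);
    }

    public static void main(String[] args) {
        Path repo = Paths.get("/tmp/es-repo");  // hypothetical repository root
        String uuid = "Wp9nJ3kDQ1iGM0TyxK8hVA"; // example snapshot UUID
        System.out.println(snapshotBlob(repo, uuid));       // /tmp/es-repo/snap-Wp9nJ3kDQ1iGM0TyxK8hVA.dat
        System.out.println(globalMetaDataBlob(repo, uuid)); // /tmp/es-repo/meta-Wp9nJ3kDQ1iGM0TyxK8hVA.dat
    }
}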
@@ -113,7 +113,7 @@ public class MockRepository extends FsRepository {

     @Override
     public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
-        if (blockOnInitialization ) {
+        if (blockOnInitialization) {
             blockExecution();
         }
         super.initializeSnapshot(snapshotId, indices, clusterMetadata);
@@ -129,16 +129,17 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
     }

     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/15579")
-    public void testOverwriteFails() throws IOException {
+    public void testVerifyOverwriteFails() throws IOException {
         try (final BlobStore store = newBlobStore()) {
             final String blobName = "foobar";
             final BlobContainer container = store.blobContainer(new BlobPath());
             byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
             final BytesArray bytesArray = new BytesArray(data);
             container.writeBlob(blobName, bytesArray);
+            // should not be able to overwrite existing blob
             expectThrows(IOException.class, () -> container.writeBlob(blobName, bytesArray));
             container.deleteBlob(blobName);
-            container.writeBlob(blobName, bytesArray); // deleted it, so should be able to write it again
+            container.writeBlob(blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again
         }
     }