From 59d9f7e1579ca70973d14e94da0b8794e60f3b85 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 2 Jun 2015 12:18:44 -1000 Subject: [PATCH] Improve snapshot creation and deletion performance on repositories with large number of snapshots Each shard repository contains a snapshot file for each snapshot - this file contains a map between each original physical file that is snapshotted and its representation in the repository. This data includes the original filename, checksum and length. When a new snapshot is created, elasticsearch needs to read all these snapshot files to figure out which files are already present in the repository and which files still have to be copied there. This change adds a new index file that contains all this information combined into a single file. So, if a repository has 1000 snapshots with 1000 shards elasticsearch will only need to read 1000 blobs (one per shard) instead of 1,000,000 to delete a snapshot. This change should also improve snapshot creation speed on repositories with a large number of snapshots and high latency. 
Fixes #8958 --- .../BlobStoreIndexShardRepository.java | 236 +++++++++++++---- .../BlobStoreIndexShardSnapshot.java | 52 ++-- .../BlobStoreIndexShardSnapshots.java | 250 ++++++++++++++++-- .../snapshots/blobstore/SnapshotFiles.java | 81 ++++++ .../blobstore/BlobStoreRepository.java | 14 +- 5 files changed, 517 insertions(+), 116 deletions(-) create mode 100644 src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index 707e77dbefe..d498119c5fa 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -19,9 +19,11 @@ package org.elasticsearch.index.snapshots.blobstore; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.io.ByteStreams; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; @@ -41,8 +43,11 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import 
org.elasticsearch.common.settings.Settings; @@ -65,10 +70,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; import static com.google.common.collect.Lists.newArrayList; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.toStreamOutput; /** * Blob store based implementation of IndexShardRepository @@ -96,7 +101,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements private RateLimitingInputStream.Listener snapshotThrottleListener; - private static final String SNAPSHOT_PREFIX = "snapshot-"; + private boolean compress; + + protected static final String SNAPSHOT_PREFIX = "snapshot-"; + + protected static final String SNAPSHOT_INDEX_PREFIX = "index-"; + + protected static final String SNAPSHOT_TEMP_PREFIX = "pending-"; + + protected static final String DATA_BLOB_PREFIX = "__"; @Inject public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) { @@ -115,7 +128,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements */ public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize, RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter, - final RateLimiterListener rateLimiterListener) { + final RateLimiterListener rateLimiterListener, boolean compress) { this.blobStore = blobStore; this.basePath = basePath; this.chunkSize = chunkSize; @@ -128,6 +141,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements rateLimiterListener.onSnapshotPause(nanos); } }; + this.compress = compress; } /** @@ -232,11 +246,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * Serializes snapshot to JSON * * @param snapshot 
snapshot - * @param stream the stream to output the snapshot JSON represetation to + * @param output the stream to output the snapshot JSON representation to * @throws IOException if an IOException occurs */ - public static void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, OutputStream stream) throws IOException { - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint(); + public void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, StreamOutput output) throws IOException { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, output).prettyPrint(); BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS); builder.flush(); } @@ -249,12 +263,36 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * @throws IOException if an IOException occurs */ public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream) throws IOException { - try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(stream)) { + byte[] data = ByteStreams.toByteArray(stream); + try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) { parser.nextToken(); return BlobStoreIndexShardSnapshot.fromXContent(parser); } } + /** + * Parses JSON representation of a snapshot + * + * @param stream JSON + * @return snapshot + * @throws IOException if an IOException occurs + * */ + public static BlobStoreIndexShardSnapshots readSnapshots(InputStream stream) throws IOException { + byte[] data = ByteStreams.toByteArray(stream); + try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) { + parser.nextToken(); + return BlobStoreIndexShardSnapshots.fromXContent(parser); + } + } + /** + * Returns true if metadata files should be compressed + * + * @return true if compression is needed + */ + protected boolean isCompress() { + return compress; + } + /** * Context for snapshot/restore operations */ @@ 
-287,7 +325,9 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e); } - BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs); + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); + BlobStoreIndexShardSnapshots snapshots = tuple.v1(); + int fileListGeneration = tuple.v2(); String commitPointName = snapshotBlobName(snapshotId); @@ -297,15 +337,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId); } - // delete all files that are not referenced by any commit point - // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones - List newSnapshotsList = Lists.newArrayList(); - for (BlobStoreIndexShardSnapshot point : snapshots) { + // Build a list of snapshots that should be preserved + List newSnapshotsList = Lists.newArrayList(); + for (SnapshotFiles point : snapshots) { if (!point.snapshot().equals(snapshotId.getSnapshot())) { newSnapshotsList.add(point); } } - cleanup(newSnapshotsList, blobs); + // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index + finalize(newSnapshotsList, fileListGeneration + 1, blobs); } /** @@ -322,26 +362,63 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } /** - * Removes all unreferenced files from the repository + * Removes all unreferenced files from the repository and writes new index file + * + * We need to be really careful in handling index files in case of failures to make sure we don't end up with an index file that + * points to files that were deleted. 
+ * * * @param snapshots list of active snapshots in the container + * @param fileListGeneration the generation number of the snapshot index file * @param blobs list of blobs in the container */ - protected void cleanup(List snapshots, ImmutableMap blobs) { + protected void finalize(List snapshots, int fileListGeneration, ImmutableMap blobs) { BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots); - // now go over all the blobs, and if they don't exists in a snapshot, delete them + // delete old index files first for (String blobName : blobs.keySet()) { - if (!blobName.startsWith("__")) { - continue; - } - if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) { + // delete old file lists + if (blobName.startsWith(SNAPSHOT_TEMP_PREFIX) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { try { blobContainer.deleteBlob(blobName); } catch (IOException e) { - logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName); + // We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up + // with references to non-existing files + throw new IndexShardSnapshotFailedException(shardId, "error deleting index file [{}] during cleanup", e); } } } + + // now go over all the blobs, and if they don't exists in a snapshot, delete them + for (String blobName : blobs.keySet()) { + // delete old file lists + if (blobName.startsWith(DATA_BLOB_PREFIX)) { + if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) { + try { + blobContainer.deleteBlob(blobName); + } catch (IOException e) { + logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName); + } + } + } + } + + // If we deleted all snapshots - we don't need to create the index file + if (snapshots.size() > 0) { + String newSnapshotIndexName = SNAPSHOT_INDEX_PREFIX + fileListGeneration; + try (OutputStream output = 
blobContainer.createOutput(SNAPSHOT_TEMP_PREFIX + fileListGeneration)) { + StreamOutput stream = compressIfNeeded(output); + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); + newSnapshots.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.flush(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e); + } + try { + blobContainer.move(SNAPSHOT_TEMP_PREFIX + fileListGeneration, newSnapshotIndexName); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to rename file list", e); + } + } } /** @@ -351,7 +428,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * @return the blob name */ protected String fileNameFromGeneration(long generation) { - return "__" + Long.toString(generation, Character.MAX_RADIX); + return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX); } /** @@ -363,17 +440,17 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements protected long findLatestFileNameGeneration(ImmutableMap blobs) { long generation = -1; for (String name : blobs.keySet()) { - if (!name.startsWith("__")) { + if (!name.startsWith(DATA_BLOB_PREFIX)) { continue; } name = FileInfo.canonicalName(name); try { - long currentGen = Long.parseLong(name.substring(2) /*__*/, Character.MAX_RADIX); + long currentGen = Long.parseLong(name.substring(DATA_BLOB_PREFIX.length()), Character.MAX_RADIX); if (currentGen > generation) { generation = currentGen; } } catch (NumberFormatException e) { - logger.warn("file [{}] does not conform to the '__' schema"); + logger.warn("file [{}] does not conform to the '{}' schema", name, DATA_BLOB_PREFIX); } } return generation; @@ -383,20 +460,47 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * Loads all available snapshots in the repository * * @param blobs list of blobs in repository - * @return 
BlobStoreIndexShardSnapshots + * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation */ - protected BlobStoreIndexShardSnapshots buildBlobStoreIndexShardSnapshots(ImmutableMap blobs) { - List snapshots = Lists.newArrayList(); + protected Tuple buildBlobStoreIndexShardSnapshots(ImmutableMap blobs) { + int latest = -1; + for (String name : blobs.keySet()) { + if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) { + try { + int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length())); + if (gen > latest) { + latest = gen; + } + } catch (NumberFormatException ex) { + logger.warn("failed to parse index file name [{}]", name); + } + } + } + if (latest >= 0) { + try (InputStream stream = blobContainer.openInput(SNAPSHOT_INDEX_PREFIX + latest)) { + return new Tuple<>(readSnapshots(stream), latest); + } catch (IOException e) { + logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest); + } + } + + // We couldn't load the index file - falling back to loading individual snapshots + List snapshots = Lists.newArrayList(); for (String name : blobs.keySet()) { if (name.startsWith(SNAPSHOT_PREFIX)) { try (InputStream stream = blobContainer.openInput(name)) { - snapshots.add(readSnapshot(stream)); + BlobStoreIndexShardSnapshot snapshot = readSnapshot(stream); + snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); } catch (IOException e) { logger.warn("failed to read commit point [{}]", e, name); } } } - return new BlobStoreIndexShardSnapshots(snapshots); + return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1); + } + + protected StreamOutput compressIfNeeded(OutputStream output) throws IOException { + return toStreamOutput(output, isCompress()); } } @@ -442,9 +546,10 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } long generation = findLatestFileNameGeneration(blobs); - BlobStoreIndexShardSnapshots snapshots = 
buildBlobStoreIndexShardSnapshots(blobs); + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs); + BlobStoreIndexShardSnapshots snapshots = tuple.v1(); + int fileListGeneration = tuple.v2(); - final CopyOnWriteArrayList failures = new CopyOnWriteArrayList<>(); final List indexCommitPointFiles = newArrayList(); int indexNumberOfFiles = 0; @@ -464,23 +569,28 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); final StoreFileMetaData md = metadata.get(fileName); - boolean snapshotRequired = false; - BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName); - try { - // in 1.3.3 we added additional hashes for .si / segments_N files - // to ensure we don't double the space in the repo since old snapshots - // don't have this hash we try to read that hash from the blob store - // in a bwc compatible way. - maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); - } catch (Throwable e) { - logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + FileInfo existingFileInfo = null; + ImmutableList filesInfo = snapshots.findPhysicalIndexFiles(fileName); + if (filesInfo != null) { + for (FileInfo fileInfo : filesInfo) { + try { + // in 1.3.3 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. 
+ maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); + } catch (Throwable e) { + logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + } + if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) { + // a commit point file with the same name, size and checksum was already copied to repository + // we will reuse it for this snapshot + existingFileInfo = fileInfo; + break; + } + } } - if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) { - // commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs - snapshotRequired = true; - } - - if (snapshotRequired) { + if (existingFileInfo == null) { indexNumberOfFiles++; indexTotalFilesSize += md.length(); // create a new FileInfo @@ -488,7 +598,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements indexCommitPointFiles.add(snapshotFileInfo); filesToSnapshot.add(snapshotFileInfo); } else { - indexCommitPointFiles.add(fileInfo); + indexCommitPointFiles.add(existingFileInfo); } } @@ -515,7 +625,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); //TODO: The time stored in snapshot doesn't include cleanup time. 
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); - try (OutputStream output = blobContainer.createOutput(snapshotBlobName)) { + try (StreamOutput output = compressIfNeeded(blobContainer.createOutput(snapshotBlobName))) { writeSnapshot(snapshot, output); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); @@ -523,12 +633,13 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements // delete all files that are not referenced by any commit point // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones - List newSnapshotsList = Lists.newArrayList(); - newSnapshotsList.add(snapshot); - for (BlobStoreIndexShardSnapshot point : snapshots) { + List newSnapshotsList = Lists.newArrayList(); + newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + for (SnapshotFiles point : snapshots) { newSnapshotsList.add(point); } - cleanup(newSnapshotsList, blobs); + // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index + finalize(newSnapshotsList, fileListGeneration + 1, blobs); snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); } finally { store.decRef(); @@ -709,6 +820,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements try { logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); BlobStoreIndexShardSnapshot snapshot = loadSnapshot(); + SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); final Store.MetadataSnapshot recoveryTargetMetadata; try { recoveryTargetMetadata = store.getMetadataOrEmpty(); @@ -786,6 +898,22 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); } 
recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); + + /// now, go over and clean files that are in the store, but were not in the snapshot + try { + for (String storeFile : store.directory().listAll()) { + if (!Store.isChecksum(storeFile) && !snapshotFiles.containPhysicalIndexFile(storeFile)) { + try { + store.deleteQuiet("restore", storeFile); + store.directory().deleteFile(storeFile); + } catch (IOException e) { + logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile); + } + } + } + } catch (IOException e) { + logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId); + } } finally { store.decRef(); } diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java index b643bf234e5..e2acff96f57 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java @@ -190,6 +190,24 @@ public class BlobStoreIndexShardSnapshot { return metadata.isSame(md); } + /** + * Checks if a file in a store is the same file + * + * @param fileInfo file in a store + * @return true if file in a store this this file have the same checksum and length + */ + public boolean isSame(FileInfo fileInfo) { + if (numberOfParts != fileInfo.numberOfParts) return false; + if (partBytes != fileInfo.partBytes) return false; + if (!name.equals(fileInfo.name)) return false; + if (partSize != null) { + if (!partSize.equals(fileInfo.partSize)) return false; + } else { + if (fileInfo.partSize != null) return false; + } + return metadata.isSame(fileInfo.metadata); + } + static final class Fields { static final XContentBuilderString NAME = new XContentBuilderString("name"); static final XContentBuilderString PHYSICAL_NAME = new 
XContentBuilderString("physical_name"); @@ -484,38 +502,4 @@ public class BlobStoreIndexShardSnapshot { startTime, time, numberOfFiles, totalSize); } - /** - * Returns true if this snapshot contains a file with a given original name - * - * @param physicalName original file name - * @return true if the file was found, false otherwise - */ - public boolean containPhysicalIndexFile(String physicalName) { - return findPhysicalIndexFile(physicalName) != null; - } - - public FileInfo findPhysicalIndexFile(String physicalName) { - for (FileInfo file : indexFiles) { - if (file.physicalName().equals(physicalName)) { - return file; - } - } - return null; - } - - /** - * Returns true if this snapshot contains a file with a given name - * - * @param name file name - * @return true if file was found, false otherwise - */ - public FileInfo findNameFile(String name) { - for (FileInfo file : indexFiles) { - if (file.name().equals(name)) { - return file; - } - } - return null; - } - } diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java index a8657655122..11a8689bb26 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java @@ -20,10 +20,22 @@ package org.elasticsearch.index.snapshots.blobstore; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.common.xcontent.XContentParser; import 
org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; +import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Map; + +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Maps.newHashMap; /** * Contains information about all snapshot for the given shard in repository @@ -31,19 +43,72 @@ import java.util.List; * This class is used to find files that were already snapshoted and clear out files that no longer referenced by any * snapshots */ -public class BlobStoreIndexShardSnapshots implements Iterable { - private final ImmutableList shardSnapshots; +public class BlobStoreIndexShardSnapshots implements Iterable, ToXContent { + private final ImmutableList shardSnapshots; + private final ImmutableMap files; + private final ImmutableMap> physicalFiles; - public BlobStoreIndexShardSnapshots(List shardSnapshots) { + public BlobStoreIndexShardSnapshots(List shardSnapshots) { this.shardSnapshots = ImmutableList.copyOf(shardSnapshots); + // Map between blob names and file info + Map newFiles = newHashMap(); + // Map between original physical names and file info + Map> physicalFiles = newHashMap(); + for (SnapshotFiles snapshot : shardSnapshots) { + // First we build map between filenames in the repo and their original file info + // this map will be used in the next loop + for (FileInfo fileInfo : snapshot.indexFiles()) { + FileInfo oldFile = newFiles.put(fileInfo.name(), fileInfo); + assert oldFile == null || oldFile.isSame(fileInfo); + } + // We are doing it in two loops here so we keep only one copy of the fileInfo per blob + // the first loop de-duplicates fileInfo objects that were loaded from different snapshots but refer to + // the same blob + for (FileInfo fileInfo : snapshot.indexFiles()) { + List physicalFileList = physicalFiles.get(fileInfo.physicalName()); + if (physicalFileList == null) { + physicalFileList = newArrayList(); + 
physicalFiles.put(fileInfo.physicalName(), physicalFileList); + } + physicalFileList.add(newFiles.get(fileInfo.name())); + } + } + ImmutableMap.Builder> mapBuilder = ImmutableMap.builder(); + for (Map.Entry> entry : physicalFiles.entrySet()) { + mapBuilder.put(entry.getKey(), ImmutableList.copyOf(entry.getValue())); + } + this.physicalFiles = mapBuilder.build(); + this.files = ImmutableMap.copyOf(newFiles); } + private BlobStoreIndexShardSnapshots(ImmutableMap files, ImmutableList shardSnapshots) { + this.shardSnapshots = shardSnapshots; + this.files = files; + Map> physicalFiles = newHashMap(); + for (SnapshotFiles snapshot : shardSnapshots) { + for (FileInfo fileInfo : snapshot.indexFiles()) { + List physicalFileList = physicalFiles.get(fileInfo.physicalName()); + if (physicalFileList == null) { + physicalFileList = newArrayList(); + physicalFiles.put(fileInfo.physicalName(), physicalFileList); + } + physicalFileList.add(files.get(fileInfo.name())); + } + } + ImmutableMap.Builder> mapBuilder = ImmutableMap.builder(); + for (Map.Entry> entry : physicalFiles.entrySet()) { + mapBuilder.put(entry.getKey(), ImmutableList.copyOf(entry.getValue())); + } + this.physicalFiles = mapBuilder.build(); + } + + /** * Returns list of snapshots * * @return list of snapshots */ - public ImmutableList snapshots() { + public ImmutableList snapshots() { return this.shardSnapshots; } @@ -51,16 +116,10 @@ public class BlobStoreIndexShardSnapshots implements Iterable findPhysicalIndexFiles(String physicalName) { + return physicalFiles.get(physicalName); } /** @@ -70,17 +129,166 @@ public class BlobStoreIndexShardSnapshots implements Iterable iterator() { + public Iterator iterator() { return shardSnapshots.iterator(); } + + static final class Fields { + static final XContentBuilderString FILES = new XContentBuilderString("files"); + static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots"); + } + + static final class ParseFields { + static final ParseField 
FILES = new ParseField("files"); + static final ParseField SNAPSHOTS = new ParseField("snapshots"); + } + + /** + * Writes index file for the shard in the following format. + * <pre>
+     * {@code
+     * {
+     *     "files": [{
+     *         "name": "__3",
+     *         "physical_name": "_0.si",
+     *         "length": 310,
+     *         "checksum": "1tpsg3p",
+     *         "written_by": "5.1.0",
+     *         "meta_hash": "P9dsFxNMdWNlb......"
+     *     }, {
+     *         "name": "__2",
+     *         "physical_name": "segments_2",
+     *         "length": 150,
+     *         "checksum": "11qjpz6",
+     *         "written_by": "5.1.0",
+     *         "meta_hash": "P9dsFwhzZWdtZ......."
+     *     }, {
+     *         "name": "__1",
+     *         "physical_name": "_0.cfe",
+     *         "length": 363,
+     *         "checksum": "er9r9g",
+     *         "written_by": "5.1.0"
+     *     }, {
+     *         "name": "__0",
+     *         "physical_name": "_0.cfs",
+     *         "length": 3354,
+     *         "checksum": "491liz",
+     *         "written_by": "5.1.0"
+     *     }, {
+     *         "name": "__4",
+     *         "physical_name": "segments_3",
+     *         "length": 150,
+     *         "checksum": "134567",
+     *         "written_by": "5.1.0",
+     *         "meta_hash": "P9dsFwhzZWdtZ......."
+     *     }],
+     *     "snapshots": {
+     *         "snapshot_1": {
+     *             "files": ["__0", "__1", "__2", "__3"]
+     *         },
+     *         "snapshot_2": {
+     *             "files": ["__0", "__1", "__2", "__4"]
+     *         }
+     *     }
+     * }
+     * }
+     * </pre>
+ */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + // First we list all blobs with their file infos: + builder.startArray(Fields.FILES); + for (Map.Entry entry : files.entrySet()) { + FileInfo.toXContent(entry.getValue(), builder, params); + } + builder.endArray(); + // Then we list all snapshots with list of all blobs that are used by the snapshot + builder.startObject(Fields.SNAPSHOTS); + for (SnapshotFiles snapshot : shardSnapshots) { + builder.startObject(snapshot.snapshot(), XContentBuilder.FieldCaseConversion.NONE); + builder.startArray(Fields.FILES); + for (FileInfo fileInfo : snapshot.indexFiles()) { + builder.value(fileInfo.name()); + } + builder.endArray(); + builder.endObject(); + } + builder.endObject(); + + builder.endObject(); + return builder; + } + + public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.currentToken(); + Map> snapshotsMap = newHashMap(); + ImmutableMap.Builder filesBuilder = ImmutableMap.builder(); + if (token == XContentParser.Token.START_OBJECT) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token != XContentParser.Token.FIELD_NAME) { + throw new ElasticsearchParseException("unexpected token [" + token + "]"); + } + String currentFieldName = parser.currentName(); + token = parser.nextToken(); + if (token == XContentParser.Token.START_ARRAY) { + if (ParseFields.FILES.match(currentFieldName) == false) { + throw new ElasticsearchParseException("unknown array [" + currentFieldName + "]"); + } + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + FileInfo fileInfo = FileInfo.fromXContent(parser); + filesBuilder.put(fileInfo.name(), fileInfo); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (ParseFields.SNAPSHOTS.match(currentFieldName) == false) { + throw new ElasticsearchParseException("unknown 
object [" + currentFieldName + "]"); + } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token != XContentParser.Token.FIELD_NAME) { + throw new ElasticsearchParseException("unknown object [" + currentFieldName + "]"); + } + String snapshot = parser.currentName(); + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new ElasticsearchParseException("unknown object [" + currentFieldName + "]"); + } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + if (parser.nextToken() == XContentParser.Token.START_ARRAY) { + if (ParseFields.FILES.match(currentFieldName) == false) { + throw new ElasticsearchParseException("unknown array [" + currentFieldName + "]"); + } + List fileNames = newArrayList(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + fileNames.add(parser.text()); + } + snapshotsMap.put(snapshot, fileNames); + } + } + } + } + } else { + throw new ElasticsearchParseException("unexpected token [" + token + "]"); + } + } + } + + ImmutableMap files = filesBuilder.build(); + ImmutableList.Builder snapshots = ImmutableList.builder(); + for (Map.Entry> entry : snapshotsMap.entrySet()) { + ImmutableList.Builder fileInfosBuilder = ImmutableList.builder(); + for (String file : entry.getValue()) { + FileInfo fileInfo = files.get(file); + assert fileInfo != null; + fileInfosBuilder.add(fileInfo); + } + snapshots.add(new SnapshotFiles(entry.getKey(), fileInfosBuilder.build())); + } + return new BlobStoreIndexShardSnapshots(files, snapshots.build()); + } + } diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java new file mode 100644 index 00000000000..0dad85d2d54 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java @@ -0,0 +1,81 @@ 
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.snapshots.blobstore;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
+
+import java.util.Map;
+
+import static com.google.common.collect.Maps.newHashMap;
+
+/**
+ * Contains a list of files participating in a snapshot
+ */
+public class SnapshotFiles {
+
+    private final String snapshot;
+
+    private final ImmutableList<FileInfo> indexFiles;
+
+    /**
+     * Lookup from original (physical) file name to file info. Stays {@code null} until
+     * {@link #findPhysicalIndexFile(String)} is called for the first time.
+     */
+    private Map<String, FileInfo> physicalFiles = null;
+
+    /**
+     * Creates the file list of a single snapshot
+     *
+     * @param snapshot   snapshot name
+     * @param indexFiles files that belong to the snapshot
+     */
+    public SnapshotFiles(String snapshot, ImmutableList<FileInfo> indexFiles) {
+        this.snapshot = snapshot;
+        this.indexFiles = indexFiles;
+    }
+
+    /**
+     * Returns the snapshot name
+     */
+    public String snapshot() {
+        return snapshot;
+    }
+
+    /**
+     * Returns a list of files in the snapshot
+     */
+    public ImmutableList<FileInfo> indexFiles() {
+        return indexFiles;
+    }
+
+    /**
+     * Returns true if this snapshot contains a file with a given original name
+     *
+     * @param physicalName original file name
+     * @return true if the file was found, false otherwise
+     */
+    public boolean containPhysicalIndexFile(String physicalName) {
+        return findPhysicalIndexFile(physicalName) != null;
+    }
+
+    /**
+     * Returns information about a physical file with the given name
+     *
+     * @param physicalName the original file name
+     * @return information about this file, or {@code null} if the snapshot has no such file
+     */
+    public FileInfo findPhysicalIndexFile(String physicalName) {
+        if (physicalFiles == null) {
+            // Build the lookup on first use and publish it only once it is complete
+            Map<String, FileInfo> files = newHashMap();
+            for (FileInfo fileInfo : indexFiles) {
+                files.put(fileInfo.physicalName(), fileInfo);
+            }
+            this.physicalFiles = files;
+        }
+        return physicalFiles.get(physicalName);
+    }
+
+}
diff --git a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 2cf35a9905d..a505b4b5493 100644
--- a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -174,9 +174,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent snapshots) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); - StreamOutput stream = bStream; - if (isCompress()) { - stream = CompressorFactory.defaultCompressor().streamOutput(stream); - } + StreamOutput stream = compressIfNeeded(bStream); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); builder.startObject(); builder.startArray("snapshots");