diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
index 707e77dbefe..d498119c5fa 100644
--- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
+++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
@@ -19,9 +19,11 @@ package org.elasticsearch.index.snapshots.blobstore;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexFormatTooNewException;
 import org.apache.lucene.index.IndexFormatTooOldException;
@@ -41,8 +43,11 @@ import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.settings.Settings;
@@ -65,10 +70,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix;
+import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.toStreamOutput;
 
 /**
  * Blob store based implementation of IndexShardRepository
@@ -96,7 +101,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
 
     private RateLimitingInputStream.Listener snapshotThrottleListener;
 
-    private static final String SNAPSHOT_PREFIX = "snapshot-";
+    private boolean compress;
+
+    protected static final String SNAPSHOT_PREFIX = "snapshot-";
+
+    protected static final String SNAPSHOT_INDEX_PREFIX = "index-";
+
+    protected static final String SNAPSHOT_TEMP_PREFIX = "pending-";
+
+    protected static final String DATA_BLOB_PREFIX = "__";
 
     @Inject
     public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
@@ -115,7 +128,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
      */
     public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize,
                            RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter,
-                           final RateLimiterListener rateLimiterListener) {
+                           final RateLimiterListener rateLimiterListener, boolean compress) {
         this.blobStore = blobStore;
         this.basePath = basePath;
         this.chunkSize = chunkSize;
@@ -128,6 +141,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
                 rateLimiterListener.onSnapshotPause(nanos);
             }
         };
+        this.compress = compress;
     }
 
     /**
@@ -232,11 +246,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
      * Serializes snapshot to JSON
      *
      * @param snapshot snapshot
-     * @param stream the stream to output the snapshot JSON represetation to
+     * @param output the stream to output the snapshot JSON representation to
      * @throws IOException if an IOException occurs
      */
-    public static void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, OutputStream stream) throws IOException {
-        XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint();
+    public void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, StreamOutput output) throws IOException {
+        XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, output).prettyPrint();
         BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS);
         builder.flush();
     }
@@ -249,12 +263,36 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
      * @throws IOException if an IOException occurs
      */
     public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream) throws IOException {
-        try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(stream)) {
+        byte[] data = ByteStreams.toByteArray(stream);
+        try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) {
             parser.nextToken();
             return BlobStoreIndexShardSnapshot.fromXContent(parser);
         }
     }
 
+    /**
+     * Parses the JSON representation of a list of snapshots
+     *
+     * @param stream JSON
+     * @return snapshot list
+     * @throws IOException if an IOException occurs
+     */
+    public static BlobStoreIndexShardSnapshots readSnapshots(InputStream stream) throws IOException {
+        byte[] data = ByteStreams.toByteArray(stream);
+        try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) {
+            parser.nextToken();
+            return BlobStoreIndexShardSnapshots.fromXContent(parser);
+        }
+    }
+
+    /**
+     * Returns true if metadata files should be compressed
+     *
+     * @return true if compression is needed
+     */
+    protected boolean isCompress() {
+        return compress;
+    }
+
     /**
      * Context for snapshot/restore operations
      */
@@ -287,7 +325,9 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
                 throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e);
             }
 
-            BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs);
+            Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
+            BlobStoreIndexShardSnapshots snapshots = tuple.v1();
+            int fileListGeneration = tuple.v2();
 
             String commitPointName = snapshotBlobName(snapshotId);
 
@@ -297,15 +337,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
                 logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId);
             }
 
-            // delete all files that are not referenced by any commit point
-            // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
-            List<BlobStoreIndexShardSnapshot> newSnapshotsList = Lists.newArrayList();
-            for (BlobStoreIndexShardSnapshot point : snapshots) {
+            // Build a list of snapshots that should be preserved
+            List<SnapshotFiles> newSnapshotsList = Lists.newArrayList();
+            for (SnapshotFiles point : snapshots) {
                 if (!point.snapshot().equals(snapshotId.getSnapshot())) {
                     newSnapshotsList.add(point);
                 }
             }
-            cleanup(newSnapshotsList, blobs);
+            // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
+            finalize(newSnapshotsList, fileListGeneration + 1, blobs);
         }
 
         /**
@@ -322,26 +362,63 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
         }
 
         /**
-         * Removes all unreferenced files from the repository
+         * Removes all unreferenced files from the repository and writes a new index file
+         *
+         * We need to be really careful in handling index files in case of failures to make sure we don't
+         * end up with an index file that points to files that were deleted.
          *
         * @param snapshots          list of active snapshots in the container
+         * @param fileListGeneration the generation number of the snapshot index file
         * @param blobs              list of blobs in the container
         */
-        protected void cleanup(List<BlobStoreIndexShardSnapshot> snapshots, ImmutableMap<String, BlobMetaData> blobs) {
+        protected void finalize(List<SnapshotFiles> snapshots, int fileListGeneration, ImmutableMap<String, BlobMetaData> blobs) {
             BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
-            // now go over all the blobs, and if they don't exists in a snapshot, delete them
+            // delete old index files first
             for (String blobName : blobs.keySet()) {
-                if (!blobName.startsWith("__")) {
-                    continue;
-                }
-                if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) {
+                // delete old file lists
+                if (blobName.startsWith(SNAPSHOT_TEMP_PREFIX) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
                     try {
                         blobContainer.deleteBlob(blobName);
                     } catch (IOException e) {
-                        logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName);
+                        // We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up
+                        // with references to non-existing files
+                        throw new IndexShardSnapshotFailedException(shardId, "error deleting index file [" + blobName + "] during cleanup", e);
                     }
                 }
             }
+
+            // now go over all the blobs, and if they don't exist in any snapshot, delete them
+            for (String blobName : blobs.keySet()) {
+                // delete unused data blobs
+                if (blobName.startsWith(DATA_BLOB_PREFIX)) {
+                    if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) {
+                        try {
+                            blobContainer.deleteBlob(blobName);
+                        } catch (IOException e) {
+                            logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName);
+                        }
+                    }
+                }
+            }
+
+            // If we deleted all snapshots - we don't need to create the index file
+            if (snapshots.size() > 0) {
+                String newSnapshotIndexName = SNAPSHOT_INDEX_PREFIX + fileListGeneration;
+                try (OutputStream output = blobContainer.createOutput(SNAPSHOT_TEMP_PREFIX + fileListGeneration)) {
+                    StreamOutput stream = compressIfNeeded(output);
+                    XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
+                    newSnapshots.toXContent(builder, ToXContent.EMPTY_PARAMS);
+                    builder.flush();
+                } catch (IOException e) {
+                    throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e);
+                }
+                try {
+                    blobContainer.move(SNAPSHOT_TEMP_PREFIX + fileListGeneration, newSnapshotIndexName);
+                } catch (IOException e) {
+                    throw new IndexShardSnapshotFailedException(shardId, "Failed to rename file list", e);
+                }
+            }
         }
 
         /**
@@ -351,7 +428,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
         * @return the blob name
         */
        protected String fileNameFromGeneration(long generation) {
-            return "__" + Long.toString(generation, Character.MAX_RADIX);
+            return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX);
        }
 
        /**
@@ -363,17 +440,17 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
        protected long findLatestFileNameGeneration(ImmutableMap<String, BlobMetaData> blobs) {
            long generation = -1;
            for (String name : blobs.keySet()) {
-                if (!name.startsWith("__")) {
+                if (!name.startsWith(DATA_BLOB_PREFIX)) {
                    continue;
                }
                name = FileInfo.canonicalName(name);
                try {
-                    long currentGen = Long.parseLong(name.substring(2) /*__*/, Character.MAX_RADIX);
+                    long currentGen = Long.parseLong(name.substring(DATA_BLOB_PREFIX.length()), Character.MAX_RADIX);
                    if (currentGen > generation) {
                        generation = currentGen;
                    }
                } catch (NumberFormatException e) {
-                    logger.warn("file [{}] does not conform to the '__' schema");
+                    logger.warn("file [{}] does not conform to the '{}' schema", name, DATA_BLOB_PREFIX);
                }
            }
            return generation;
@@ -383,20 +460,47 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
         * Loads all available snapshots in the repository
         *
         * @param blobs list of blobs in repository
-         * @return BlobStoreIndexShardSnapshots
+         * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
         */
-        protected BlobStoreIndexShardSnapshots buildBlobStoreIndexShardSnapshots(ImmutableMap<String, BlobMetaData> blobs) {
-            List<BlobStoreIndexShardSnapshot> snapshots = Lists.newArrayList();
+        protected Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(ImmutableMap<String, BlobMetaData> blobs) {
+            int latest = -1;
+            for (String name : blobs.keySet()) {
+                if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
+                    try {
+                        int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
+                        if (gen > latest) {
+                            latest = gen;
+                        }
+                    } catch (NumberFormatException ex) {
+                        logger.warn("failed to parse index file name [{}]", name);
+                    }
+                }
+            }
+            if (latest >= 0) {
+                try (InputStream stream = blobContainer.openInput(SNAPSHOT_INDEX_PREFIX + latest)) {
+                    return new Tuple<>(readSnapshots(stream), latest);
+                } catch (IOException e) {
+                    logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest);
+                }
+            }
+
+            // We couldn't load the index file - falling back to loading individual snapshots
+            List<SnapshotFiles> snapshots = Lists.newArrayList();
            for (String name : blobs.keySet()) {
                if (name.startsWith(SNAPSHOT_PREFIX)) {
                    try (InputStream stream = blobContainer.openInput(name)) {
-                        snapshots.add(readSnapshot(stream));
+                        BlobStoreIndexShardSnapshot snapshot = readSnapshot(stream);
+                        snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
                    } catch (IOException e) {
                        logger.warn("failed to read commit point [{}]", e, name);
                    }
                }
            }
-            return new BlobStoreIndexShardSnapshots(snapshots);
+            return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1);
+        }
+
+        protected StreamOutput compressIfNeeded(OutputStream output) throws IOException {
+            return toStreamOutput(output, isCompress());
+        }
    }
 
@@ -442,9 +546,10 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
            }
 
            long generation = findLatestFileNameGeneration(blobs);
-            BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs);
+            Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
+            BlobStoreIndexShardSnapshots snapshots = tuple.v1();
+            int fileListGeneration = tuple.v2();
 
-            final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
            final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = newArrayList();
 
            int indexNumberOfFiles = 0;
@@ -464,23 +569,28 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
                }
                logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
                final StoreFileMetaData md = metadata.get(fileName);
-                boolean snapshotRequired = false;
-                BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName);
-                try {
-                    // in 1.3.3 we added additional hashes for .si / segments_N files
-                    // to ensure we don't double the space in the repo since old snapshots
-                    // don't have this hash we try to read that hash from the blob store
-                    // in a bwc compatible way.
-                    maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata);
-                } catch (Throwable e) {
-                    logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
+                FileInfo existingFileInfo = null;
+                ImmutableList<FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
+                if (filesInfo != null) {
+                    for (FileInfo fileInfo : filesInfo) {
+                        try {
+                            // in 1.3.3 we added additional hashes for .si / segments_N files
+                            // to ensure we don't double the space in the repo since old snapshots
+                            // don't have this hash we try to read that hash from the blob store
+                            // in a bwc compatible way.
+                            maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata);
+                        } catch (Throwable e) {
+                            logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
+                        }
+                        if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) {
+                            // a commit point file with the same name, size and checksum was already copied to repository
+                            // we will reuse it for this snapshot
+                            existingFileInfo = fileInfo;
+                            break;
+                        }
+                    }
                }
-                if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) {
-                    // commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs
-                    snapshotRequired = true;
-                }
-
-                if (snapshotRequired) {
+                if (existingFileInfo == null) {
                    indexNumberOfFiles++;
                    indexTotalFilesSize += md.length();
                    // create a new FileInfo
@@ -488,7 +598,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
                    indexCommitPointFiles.add(snapshotFileInfo);
                    filesToSnapshot.add(snapshotFileInfo);
                } else {
-                    indexCommitPointFiles.add(fileInfo);
+                    indexCommitPointFiles.add(existingFileInfo);
                }
            }
 
@@ -515,7 +625,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
                    System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
            //TODO: The time stored in snapshot doesn't include cleanup time.
            logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
-            try (OutputStream output = blobContainer.createOutput(snapshotBlobName)) {
+            try (StreamOutput output = compressIfNeeded(blobContainer.createOutput(snapshotBlobName))) {
                writeSnapshot(snapshot, output);
            } catch (IOException e) {
                throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
@@ -523,12 +633,13 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
 
            // delete all files that are not referenced by any commit point
            // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
-            List<BlobStoreIndexShardSnapshot> newSnapshotsList = Lists.newArrayList();
-            newSnapshotsList.add(snapshot);
-            for (BlobStoreIndexShardSnapshot point : snapshots) {
+            List<SnapshotFiles> newSnapshotsList = Lists.newArrayList();
+            newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
+            for (SnapshotFiles point : snapshots) {
                newSnapshotsList.add(point);
            }
-            cleanup(newSnapshotsList, blobs);
+            // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
+            finalize(newSnapshotsList, fileListGeneration + 1, blobs);
            snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE);
        } finally {
            store.decRef();
@@ -709,6 +820,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
            try {
                logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
                BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
+                SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
                final Store.MetadataSnapshot recoveryTargetMetadata;
                try {
                    recoveryTargetMetadata = store.getMetadataOrEmpty();
@@ -786,6 +898,22 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements IndexShardRepository
                    throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
                }
                recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion());
+
+                // now, go over and clean files that are in the store, but were not in the snapshot
+                try {
+                    for (String storeFile : store.directory().listAll()) {
+                        if (!Store.isChecksum(storeFile) && !snapshotFiles.containPhysicalIndexFile(storeFile)) {
+                            try {
+                                store.deleteQuiet("restore", storeFile);
+                                store.directory().deleteFile(storeFile);
+                            } catch (IOException e) {
+                                logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile);
+                            }
+                        }
+                    }
+                } catch (IOException e) {
+                    logger.warn("[{}] failed to list directory - some files might not be deleted", snapshotId);
+                }
            } finally {
                store.decRef();
            }
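Note on the naming scheme this file now relies on: shard snapshot metadata lives in snapshot-<name> blobs, the new per-shard file list lives in index-<generation> blobs (written first as pending-<generation>, then renamed), and data blobs are named __<generation> with the generation encoded in radix 36. A minimal standalone sketch of that scheme follows; the class and method names here are illustrative only, not part of the patch:

    import java.util.Arrays;
    import java.util.List;

    public class GenerationNamingSketch {
        static final String SNAPSHOT_INDEX_PREFIX = "index-";
        static final String DATA_BLOB_PREFIX = "__";

        // Mirrors the scan in buildBlobStoreIndexShardSnapshots: the highest
        // parseable suffix wins; blobs that merely share the prefix are skipped.
        static int latestIndexGeneration(List<String> blobNames) {
            int latest = -1;
            for (String name : blobNames) {
                if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
                    try {
                        int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
                        latest = Math.max(latest, gen);
                    } catch (NumberFormatException ignored) {
                        // not a generation file, skip it
                    }
                }
            }
            return latest;
        }

        // Mirrors fileNameFromGeneration: data blobs use radix-36 generations.
        static String dataBlobName(long generation) {
            return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX);
        }

        public static void main(String[] args) {
            List<String> blobs = Arrays.asList("snapshot-s1", "index-0", "index-3", "__a");
            System.out.println(latestIndexGeneration(blobs)); // 3
            System.out.println(dataBlobName(35));             // __z
        }
    }

The pending-then-move dance in finalize() exists so a half-written file list can never be mistaken for a committed one: readers only ever look at index-N blobs, and the rename is the commit point.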
diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java
index b643bf234e5..e2acff96f57 100644
--- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java
+++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java
@@ -190,6 +190,24 @@ public class BlobStoreIndexShardSnapshot {
             return metadata.isSame(md);
         }
 
+        /**
+         * Checks if a file in a store is the same file
+         *
+         * @param fileInfo file in a store
+         * @return true if the given file and this file have the same checksum and length
+         */
+        public boolean isSame(FileInfo fileInfo) {
+            if (numberOfParts != fileInfo.numberOfParts) return false;
+            if (partBytes != fileInfo.partBytes) return false;
+            if (!name.equals(fileInfo.name)) return false;
+            if (partSize != null) {
+                if (!partSize.equals(fileInfo.partSize)) return false;
+            } else {
+                if (fileInfo.partSize != null) return false;
+            }
+            return metadata.isSame(fileInfo.metadata);
+        }
+
         static final class Fields {
             static final XContentBuilderString NAME = new XContentBuilderString("name");
             static final XContentBuilderString PHYSICAL_NAME = new XContentBuilderString("physical_name");
@@ -484,38 +502,4 @@ public class BlobStoreIndexShardSnapshot {
                 startTime, time, numberOfFiles, totalSize);
     }
 
-    /**
-     * Returns true if this snapshot contains a file with a given original name
-     *
-     * @param physicalName original file name
-     * @return true if the file was found, false otherwise
-     */
-    public boolean containPhysicalIndexFile(String physicalName) {
-        return findPhysicalIndexFile(physicalName) != null;
-    }
-
-    public FileInfo findPhysicalIndexFile(String physicalName) {
-        for (FileInfo file : indexFiles) {
-            if (file.physicalName().equals(physicalName)) {
-                return file;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Returns true if this snapshot contains a file with a given name
-     *
-     * @param name file name
-     * @return true if file was found, false otherwise
-     */
-    public FileInfo findNameFile(String name) {
-        for (FileInfo file : indexFiles) {
-            if (file.name().equals(name)) {
-                return file;
-            }
-        }
-        return null;
-    }
-
 }
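The new isSame(FileInfo) compares numberOfParts, partBytes, name, partSize and the metadata hash; partSize gets the explicit null dance because chunking may be unset on either side. On Java 7+ that branch collapses to the standard null-safe equality helper, shown here in isolation (standalone illustration, not part of the patch):

    import java.util.Objects;

    public class NullSafeEqualsSketch {
        public static void main(String[] args) {
            // Both null -> equal; one null -> not equal; otherwise delegate to equals().
            System.out.println(Objects.equals(null, null));   // true
            System.out.println(Objects.equals("1mb", null));  // false
            System.out.println(Objects.equals("1mb", "1mb")); // true
        }
    }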
diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java
index a8657655122..11a8689bb26 100644
--- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java
+++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java
@@ -20,10 +20,22 @@ package org.elasticsearch.index.snapshots.blobstore;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newHashMap;
 
 /**
  * Contains information about all snapshot for the given shard in repository
@@ -31,19 +43,72 @@ import java.util.List;
  * This class is used to find files that were already snapshoted and clear out files that no longer referenced by any
  * snapshots
  */
-public class BlobStoreIndexShardSnapshots implements Iterable<BlobStoreIndexShardSnapshot> {
-    private final ImmutableList<BlobStoreIndexShardSnapshot> shardSnapshots;
+public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, ToXContent {
+    private final ImmutableList<SnapshotFiles> shardSnapshots;
+    private final ImmutableMap<String, FileInfo> files;
+    private final ImmutableMap<String, ImmutableList<FileInfo>> physicalFiles;
 
-    public BlobStoreIndexShardSnapshots(List<BlobStoreIndexShardSnapshot> shardSnapshots) {
+    public BlobStoreIndexShardSnapshots(List<SnapshotFiles> shardSnapshots) {
         this.shardSnapshots = ImmutableList.copyOf(shardSnapshots);
+        // Map between blob names and file info
+        Map<String, FileInfo> newFiles = newHashMap();
+        // Map between original physical names and file info
+        Map<String, List<FileInfo>> physicalFiles = newHashMap();
+        for (SnapshotFiles snapshot : shardSnapshots) {
+            // First we build map between filenames in the repo and their original file info
+            // this map will be used in the next loop
+            for (FileInfo fileInfo : snapshot.indexFiles()) {
+                FileInfo oldFile = newFiles.put(fileInfo.name(), fileInfo);
+                assert oldFile == null || oldFile.isSame(fileInfo);
+            }
+            // We are doing it in two loops here so we keep only one copy of the fileInfo per blob
+            // the first loop de-duplicates fileInfo objects that were loaded from different snapshots but refer to
+            // the same blob
+            for (FileInfo fileInfo : snapshot.indexFiles()) {
+                List<FileInfo> physicalFileList = physicalFiles.get(fileInfo.physicalName());
+                if (physicalFileList == null) {
+                    physicalFileList = newArrayList();
+                    physicalFiles.put(fileInfo.physicalName(), physicalFileList);
+                }
+                physicalFileList.add(newFiles.get(fileInfo.name()));
+            }
+        }
+        ImmutableMap.Builder<String, ImmutableList<FileInfo>> mapBuilder = ImmutableMap.builder();
+        for (Map.Entry<String, List<FileInfo>> entry : physicalFiles.entrySet()) {
+            mapBuilder.put(entry.getKey(), ImmutableList.copyOf(entry.getValue()));
+        }
+        this.physicalFiles = mapBuilder.build();
+        this.files = ImmutableMap.copyOf(newFiles);
     }
 
+    private BlobStoreIndexShardSnapshots(ImmutableMap<String, FileInfo> files, ImmutableList<SnapshotFiles> shardSnapshots) {
+        this.shardSnapshots = shardSnapshots;
+        this.files = files;
+        Map<String, List<FileInfo>> physicalFiles = newHashMap();
+        for (SnapshotFiles snapshot : shardSnapshots) {
+            for (FileInfo fileInfo : snapshot.indexFiles()) {
+                List<FileInfo> physicalFileList = physicalFiles.get(fileInfo.physicalName());
+                if (physicalFileList == null) {
+                    physicalFileList = newArrayList();
+                    physicalFiles.put(fileInfo.physicalName(), physicalFileList);
+                }
+                physicalFileList.add(files.get(fileInfo.name()));
+            }
+        }
+        ImmutableMap.Builder<String, ImmutableList<FileInfo>> mapBuilder = ImmutableMap.builder();
+        for (Map.Entry<String, List<FileInfo>> entry : physicalFiles.entrySet()) {
+            mapBuilder.put(entry.getKey(), ImmutableList.copyOf(entry.getValue()));
+        }
+        this.physicalFiles = mapBuilder.build();
+    }
+
     /**
      * Returns list of snapshots
      *
      * @return list of snapshots
      */
-    public ImmutableList<BlobStoreIndexShardSnapshot> snapshots() {
+    public ImmutableList<SnapshotFiles> snapshots() {
         return this.shardSnapshots;
     }
 
@@ -51,16 +116,10 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, ToXContent
      * Finds reference to a snapshotted file by its original name
      *
      * @param physicalName original name
-     * @return file info or null if file is not present in any of snapshots
+     * @return a list of file infos or null if the file is not present in any of the snapshots
      */
-    public FileInfo findPhysicalIndexFile(String physicalName) {
-        for (BlobStoreIndexShardSnapshot snapshot : shardSnapshots) {
-            FileInfo fileInfo = snapshot.findPhysicalIndexFile(physicalName);
-            if (fileInfo != null) {
-                return fileInfo;
-            }
-        }
-        return null;
+    public ImmutableList<FileInfo> findPhysicalIndexFiles(String physicalName) {
+        return physicalFiles.get(physicalName);
     }
 
     /**
@@ -70,17 +129,166 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, ToXContent
      * Finds reference to a snapshotted file by its snapshot name
      *
      * @param name file name
      * @return file info or null if file is not present in any of snapshots
      */
     public FileInfo findNameFile(String name) {
-        for (BlobStoreIndexShardSnapshot snapshot : shardSnapshots) {
-            FileInfo fileInfo = snapshot.findNameFile(name);
-            if (fileInfo != null) {
-                return fileInfo;
-            }
-        }
-        return null;
+        return files.get(name);
     }
 
     @Override
-    public Iterator<BlobStoreIndexShardSnapshot> iterator() {
+    public Iterator<SnapshotFiles> iterator() {
         return shardSnapshots.iterator();
     }
+
+    static final class Fields {
+        static final XContentBuilderString FILES = new XContentBuilderString("files");
+        static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");
+    }
+
+    static final class ParseFields {
+        static final ParseField FILES = new ParseField("files");
+        static final ParseField SNAPSHOTS = new ParseField("snapshots");
+    }
+
+    /**
+     * Writes index file for the shard in the following format.
+     * <pre>
+     * {@code
+     * {
+     *     "files": [{
+     *         "name": "__3",
+     *         "physical_name": "_0.si",
+     *         "length": 310,
+     *         "checksum": "1tpsg3p",
+     *         "written_by": "5.1.0",
+     *         "meta_hash": "P9dsFxNMdWNlb......"
+     *     }, {
+     *         "name": "__2",
+     *         "physical_name": "segments_2",
+     *         "length": 150,
+     *         "checksum": "11qjpz6",
+     *         "written_by": "5.1.0",
+     *         "meta_hash": "P9dsFwhzZWdtZ......."
+     *     }, {
+     *         "name": "__1",
+     *         "physical_name": "_0.cfe",
+     *         "length": 363,
+     *         "checksum": "er9r9g",
+     *         "written_by": "5.1.0"
+     *     }, {
+     *         "name": "__0",
+     *         "physical_name": "_0.cfs",
+     *         "length": 3354,
+     *         "checksum": "491liz",
+     *         "written_by": "5.1.0"
+     *     }, {
+     *         "name": "__4",
+     *         "physical_name": "segments_3",
+     *         "length": 150,
+     *         "checksum": "134567",
+     *         "written_by": "5.1.0",
+     *         "meta_hash": "P9dsFwhzZWdtZ......."
+     *     }],
+     *     "snapshots": {
+     *         "snapshot_1": {
+     *             "files": ["__0", "__1", "__2", "__3"]
+     *         },
+     *         "snapshot_2": {
+     *             "files": ["__0", "__1", "__2", "__4"]
+     *         }
+     *     }
+     * }
+     * }
+     * </pre>
+     */
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        // First we list all blobs with their file infos:
+        builder.startArray(Fields.FILES);
+        for (Map.Entry<String, FileInfo> entry : files.entrySet()) {
+            FileInfo.toXContent(entry.getValue(), builder, params);
+        }
+        builder.endArray();
+        // Then we list all snapshots with list of all blobs that are used by the snapshot
+        builder.startObject(Fields.SNAPSHOTS);
+        for (SnapshotFiles snapshot : shardSnapshots) {
+            builder.startObject(snapshot.snapshot(), XContentBuilder.FieldCaseConversion.NONE);
+            builder.startArray(Fields.FILES);
+            for (FileInfo fileInfo : snapshot.indexFiles()) {
+                builder.value(fileInfo.name());
+            }
+            builder.endArray();
+            builder.endObject();
+        }
+        builder.endObject();
+
+        builder.endObject();
+        return builder;
+    }
+
+    public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException {
+        XContentParser.Token token = parser.currentToken();
+        Map<String, List<String>> snapshotsMap = newHashMap();
+        ImmutableMap.Builder<String, FileInfo> filesBuilder = ImmutableMap.builder();
+        if (token == XContentParser.Token.START_OBJECT) {
+            while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+                if (token != XContentParser.Token.FIELD_NAME) {
+                    throw new ElasticsearchParseException("unexpected token [" + token + "]");
+                }
+                String currentFieldName = parser.currentName();
+                token = parser.nextToken();
+                if (token == XContentParser.Token.START_ARRAY) {
+                    if (ParseFields.FILES.match(currentFieldName) == false) {
+                        throw new ElasticsearchParseException("unknown array [" + currentFieldName + "]");
+                    }
+                    while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
+                        FileInfo fileInfo = FileInfo.fromXContent(parser);
+                        filesBuilder.put(fileInfo.name(), fileInfo);
+                    }
+                } else if (token == XContentParser.Token.START_OBJECT) {
+                    if (ParseFields.SNAPSHOTS.match(currentFieldName) == false) {
+                        throw new ElasticsearchParseException("unknown object [" + currentFieldName + "]");
+                    }
+                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+                        if (token != XContentParser.Token.FIELD_NAME) {
+                            throw new ElasticsearchParseException("unknown object [" + currentFieldName + "]");
+                        }
+                        String snapshot = parser.currentName();
+                        if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
+                            throw new ElasticsearchParseException("unknown object [" + currentFieldName + "]");
+                        }
+                        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+                            if (token == XContentParser.Token.FIELD_NAME) {
+                                currentFieldName = parser.currentName();
+                                if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
+                                    if (ParseFields.FILES.match(currentFieldName) == false) {
+                                        throw new ElasticsearchParseException("unknown array [" + currentFieldName + "]");
+                                    }
+                                    List<String> fileNames = newArrayList();
+                                    while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
+                                        fileNames.add(parser.text());
+                                    }
+                                    snapshotsMap.put(snapshot, fileNames);
+                                }
+                            }
+                        }
+                    }
+                } else {
+                    throw new ElasticsearchParseException("unexpected token [" + token + "]");
+                }
+            }
+        }
+
+        ImmutableMap<String, FileInfo> files = filesBuilder.build();
+        ImmutableList.Builder<SnapshotFiles> snapshots = ImmutableList.builder();
+        for (Map.Entry<String, List<String>> entry : snapshotsMap.entrySet()) {
+            ImmutableList.Builder<FileInfo> fileInfosBuilder = ImmutableList.builder();
+            for (String file : entry.getValue()) {
+                FileInfo fileInfo = files.get(file);
+                assert fileInfo != null;
+                fileInfosBuilder.add(fileInfo);
+            }
+            snapshots.add(new SnapshotFiles(entry.getKey(), fileInfosBuilder.build()));
+        }
+        return new BlobStoreIndexShardSnapshots(files, snapshots.build());
+    }
+
 }
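The two-pass construction in the public constructor above first canonicalizes one FileInfo per blob name, then groups the canonical instances by original Lucene file name; that grouping is what lets findPhysicalIndexFiles return every blob ever written for a given physical file. A toy standalone version of the same idea, with plain maps and strings standing in for FileInfo (all names here are illustrative):

    import java.util.*;

    public class FileIndexSketch {
        public static void main(String[] args) {
            // blob name -> physical name, one map per snapshot (stand-ins for FileInfo)
            Map<String, String> snap1 = new LinkedHashMap<>();
            snap1.put("__0", "_0.cfs");
            snap1.put("__1", "_0.cfe");
            Map<String, String> snap2 = new LinkedHashMap<>();
            snap2.put("__0", "_0.cfs"); // blob shared with the first snapshot
            snap2.put("__2", "segments_2");
            List<Map<String, String>> snapshots = Arrays.asList(snap1, snap2);

            // Pass 1: one canonical entry per blob name (the "files" map in the patch)
            Map<String, String> files = new LinkedHashMap<>();
            for (Map<String, String> snapshot : snapshots) {
                for (Map.Entry<String, String> e : snapshot.entrySet()) {
                    if (!files.containsKey(e.getKey())) {
                        files.put(e.getKey(), e.getValue());
                    }
                }
            }

            // Pass 2: group blob names by physical name ("physicalFiles" in the patch)
            Map<String, List<String>> physicalFiles = new LinkedHashMap<>();
            for (Map<String, String> snapshot : snapshots) {
                for (Map.Entry<String, String> e : snapshot.entrySet()) {
                    List<String> list = physicalFiles.get(e.getValue());
                    if (list == null) {
                        list = new ArrayList<>();
                        physicalFiles.put(e.getValue(), list);
                    }
                    list.add(e.getKey());
                }
            }
            System.out.println(physicalFiles.get("_0.cfs")); // [__0, __0]
        }
    }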
diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java
new file mode 100644
index 00000000000..0dad85d2d54
--- /dev/null
+++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.snapshots.blobstore;
+
+import com.google.common.collect.ImmutableList;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
+
+import java.util.Map;
+
+import static com.google.common.collect.Maps.newHashMap;
+
+/**
+ * Contains a list of files participating in a snapshot
+ */
+public class SnapshotFiles {
+
+    private final String snapshot;
+
+    private final ImmutableList<FileInfo> indexFiles;
+
+    private Map<String, FileInfo> physicalFiles = null;
+
+    public String snapshot() {
+        return snapshot;
+    }
+
+    public SnapshotFiles(String snapshot, ImmutableList<FileInfo> indexFiles) {
+        this.snapshot = snapshot;
+        this.indexFiles = indexFiles;
+    }
+
+    /**
+     * Returns the list of files in the snapshot
+     */
+    public ImmutableList<FileInfo> indexFiles() {
+        return indexFiles;
+    }
+
+    /**
+     * Returns true if this snapshot contains a file with a given original name
+     *
+     * @param physicalName original file name
+     * @return true if the file was found, false otherwise
+     */
+    public boolean containPhysicalIndexFile(String physicalName) {
+        return findPhysicalIndexFile(physicalName) != null;
+    }
+
+    /**
+     * Returns information about a physical file with the given name
+     *
+     * @param physicalName the original file name
+     * @return information about this file
+     */
+    public FileInfo findPhysicalIndexFile(String physicalName) {
+        if (physicalFiles == null) {
+            Map<String, FileInfo> files = newHashMap();
+            for (FileInfo fileInfo : indexFiles) {
+                files.put(fileInfo.physicalName(), fileInfo);
+            }
+            this.physicalFiles = files;
+        }
+        return physicalFiles.get(physicalName);
+    }
+
+}
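findPhysicalIndexFile above builds its lookup map on first use, so callers that never query by physical name (most snapshot paths) pay nothing for it. The pattern in isolation, with String[] pairs standing in for FileInfo and illustrative names throughout:

    import java.util.*;

    public class LazyLookupSketch {
        private final List<String[]> indexFiles;     // {blob name, physical name} pairs
        private Map<String, String[]> physicalFiles; // built lazily, like SnapshotFiles

        LazyLookupSketch(List<String[]> indexFiles) {
            this.indexFiles = indexFiles;
        }

        String[] findPhysicalIndexFile(String physicalName) {
            if (physicalFiles == null) {
                Map<String, String[]> files = new HashMap<>();
                for (String[] file : indexFiles) {
                    files.put(file[1], file);
                }
                physicalFiles = files; // single-threaded use assumed, as in a restore context
            }
            return physicalFiles.get(physicalName);
        }

        public static void main(String[] args) {
            LazyLookupSketch files = new LazyLookupSketch(Arrays.asList(
                    new String[]{"__0", "_0.cfs"}, new String[]{"__1", "_0.cfe"}));
            System.out.println(files.findPhysicalIndexFile("_0.cfs")[0]); // __0
        }
    }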
diff --git a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 2cf35a9905d..a505b4b5493 100644
--- a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -174,9 +174,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Repository> implements Repository, RateLimiterListener
     protected void writeSnapshotList(ImmutableList<SnapshotId> snapshots) throws IOException {
         BytesStreamOutput bStream = new BytesStreamOutput();
-        StreamOutput stream = bStream;
-        if (isCompress()) {
-            stream = CompressorFactory.defaultCompressor().streamOutput(stream);
-        }
+        StreamOutput stream = compressIfNeeded(bStream);
         XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
         builder.startObject();
         builder.startArray("snapshots");
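For reference, the compressIfNeeded(...) helper that both repositories now call wraps the raw stream in a compressing one only when the repository was configured with compression enabled. The shape of that helper, sketched with java.util.zip GZIP as a stand-in (the real code returns an Elasticsearch StreamOutput backed by CompressorFactory.defaultCompressor(), not GZIP):

    import java.io.*;
    import java.util.zip.GZIPOutputStream;

    public class CompressIfNeededSketch {
        // GZIP is a stand-in for Elasticsearch's CompressorFactory-based stream.
        static OutputStream compressIfNeeded(OutputStream out, boolean compress) throws IOException {
            return compress ? new GZIPOutputStream(out) : out;
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (OutputStream out = compressIfNeeded(sink, true)) {
                out.write("{\"snapshots\":[]}".getBytes("UTF-8"));
            }
            System.out.println("wrote " + sink.size() + " compressed bytes");
        }
    }

Centralizing the conditional wrapping in one helper keeps the write sites (shard snapshot files, index-N file lists, the repository-level snapshot list) from each repeating the isCompress() branch.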