Improve snapshot creation and deletion performance on repositories with large number of snapshots

Each shard repository contains a snapshot file for each snapshot - this file contains a map between each original physical file that is snapshotted and its representation in the repository. This data includes the original filename, checksum and length. When a new snapshot is created, elasticsearch needs to read all these snapshot files to figure out which files are already present in the repository and which files still have to be copied there. This change adds a new index file that contains all this information combined into a single file. So, if a repository has 1000 snapshots with 1000 shards, elasticsearch will only need to read 1000 blobs (one per shard) instead of 1,000,000 to delete a snapshot. This change should also improve snapshot creation speed on repositories with a large number of snapshots and high latency.

Fixes #8958
This commit is contained in:
Igor Motov 2015-06-02 12:18:44 -10:00
parent c381a64c66
commit 59d9f7e157
5 changed files with 517 additions and 116 deletions

View File

@ -19,9 +19,11 @@
package org.elasticsearch.index.snapshots.blobstore; package org.elasticsearch.index.snapshots.blobstore;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexFormatTooOldException;
@ -41,8 +43,11 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -65,10 +70,10 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.toStreamOutput;
/** /**
* Blob store based implementation of IndexShardRepository * Blob store based implementation of IndexShardRepository
@ -96,7 +101,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
private RateLimitingInputStream.Listener snapshotThrottleListener; private RateLimitingInputStream.Listener snapshotThrottleListener;
private static final String SNAPSHOT_PREFIX = "snapshot-"; private boolean compress;
protected static final String SNAPSHOT_PREFIX = "snapshot-";
protected static final String SNAPSHOT_INDEX_PREFIX = "index-";
protected static final String SNAPSHOT_TEMP_PREFIX = "pending-";
protected static final String DATA_BLOB_PREFIX = "__";
@Inject @Inject
public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) { public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
@ -115,7 +128,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
*/ */
public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize, public void initialize(BlobStore blobStore, BlobPath basePath, ByteSizeValue chunkSize,
RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter, RateLimiter snapshotRateLimiter, RateLimiter restoreRateLimiter,
final RateLimiterListener rateLimiterListener) { final RateLimiterListener rateLimiterListener, boolean compress) {
this.blobStore = blobStore; this.blobStore = blobStore;
this.basePath = basePath; this.basePath = basePath;
this.chunkSize = chunkSize; this.chunkSize = chunkSize;
@ -128,6 +141,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
rateLimiterListener.onSnapshotPause(nanos); rateLimiterListener.onSnapshotPause(nanos);
} }
}; };
this.compress = compress;
} }
/** /**
@ -232,11 +246,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* Serializes snapshot to JSON * Serializes snapshot to JSON
* *
* @param snapshot snapshot * @param snapshot snapshot
* @param stream the stream to output the snapshot JSON represetation to * @param output the stream to output the snapshot JSON representation to
* @throws IOException if an IOException occurs * @throws IOException if an IOException occurs
*/ */
public static void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, OutputStream stream) throws IOException { public void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, StreamOutput output) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint(); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, output).prettyPrint();
BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS); BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS);
builder.flush(); builder.flush();
} }
@ -249,12 +263,36 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* @throws IOException if an IOException occurs * @throws IOException if an IOException occurs
*/ */
public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream) throws IOException { public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream) throws IOException {
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(stream)) { byte[] data = ByteStreams.toByteArray(stream);
try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) {
parser.nextToken(); parser.nextToken();
return BlobStoreIndexShardSnapshot.fromXContent(parser); return BlobStoreIndexShardSnapshot.fromXContent(parser);
} }
} }
/**
* Parses the serialized index of all snapshots of a shard.
*
* The stream is read completely into memory before parsing starts.
*
* @param stream stream containing the serialized {@link BlobStoreIndexShardSnapshots}
* @return the parsed collection of shard snapshots
* @throws IOException if the stream cannot be read or parsed
*/
public static BlobStoreIndexShardSnapshots readSnapshots(InputStream stream) throws IOException {
// buffer the whole blob; NOTE(review): presumably needed so XContentHelper can inspect the bytes
// (content type / compression) before creating the parser - confirm
byte[] data = ByteStreams.toByteArray(stream);
try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) {
// position the parser on the first token before handing it to fromXContent
parser.nextToken();
return BlobStoreIndexShardSnapshots.fromXContent(parser);
}
}
/**
* Returns the value of the {@code compress} flag of this repository.
*
* @return {@code true} if metadata files should be compressed
*/
protected boolean isCompress() {
return compress;
}
/** /**
* Context for snapshot/restore operations * Context for snapshot/restore operations
*/ */
@ -287,7 +325,9 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e); throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e);
} }
BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs); Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
String commitPointName = snapshotBlobName(snapshotId); String commitPointName = snapshotBlobName(snapshotId);
@ -297,15 +337,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId); logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId);
} }
// delete all files that are not referenced by any commit point // Build a list of snapshots that should be preserved
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones List<SnapshotFiles> newSnapshotsList = Lists.newArrayList();
List<BlobStoreIndexShardSnapshot> newSnapshotsList = Lists.newArrayList(); for (SnapshotFiles point : snapshots) {
for (BlobStoreIndexShardSnapshot point : snapshots) {
if (!point.snapshot().equals(snapshotId.getSnapshot())) { if (!point.snapshot().equals(snapshotId.getSnapshot())) {
newSnapshotsList.add(point); newSnapshotsList.add(point);
} }
} }
cleanup(newSnapshotsList, blobs); // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalize(newSnapshotsList, fileListGeneration + 1, blobs);
} }
/** /**
@ -322,26 +362,63 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
} }
/** /**
* Removes all unreferenced files from the repository * Removes all unreferenced files from the repository and writes new index file
*
* We need to be really careful in handling index files in case of failures to make sure we don't end up with an index file that
* points to files that were deleted.
*
* *
* @param snapshots list of active snapshots in the container * @param snapshots list of active snapshots in the container
* @param fileListGeneration the generation number of the snapshot index file
* @param blobs list of blobs in the container * @param blobs list of blobs in the container
*/ */
protected void cleanup(List<BlobStoreIndexShardSnapshot> snapshots, ImmutableMap<String, BlobMetaData> blobs) { protected void finalize(List<SnapshotFiles> snapshots, int fileListGeneration, ImmutableMap<String, BlobMetaData> blobs) {
BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots); BlobStoreIndexShardSnapshots newSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
// now go over all the blobs, and if they don't exists in a snapshot, delete them // delete old index files first
for (String blobName : blobs.keySet()) { for (String blobName : blobs.keySet()) {
if (!blobName.startsWith("__")) { // delete old file lists
continue; if (blobName.startsWith(SNAPSHOT_TEMP_PREFIX) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
}
if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) {
try { try {
blobContainer.deleteBlob(blobName); blobContainer.deleteBlob(blobName);
} catch (IOException e) { } catch (IOException e) {
logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName); // We cannot delete index file - this is fatal, we cannot continue, otherwise we might end up
// with references to non-existing files
throw new IndexShardSnapshotFailedException(shardId, "error deleting index file [{}] during cleanup", e);
} }
} }
} }
// now go over all the blobs, and if they don't exists in a snapshot, delete them
for (String blobName : blobs.keySet()) {
// delete data blobs that are no longer referenced by any of the remaining snapshots
if (blobName.startsWith(DATA_BLOB_PREFIX)) {
if (newSnapshots.findNameFile(FileInfo.canonicalName(blobName)) == null) {
try {
blobContainer.deleteBlob(blobName);
} catch (IOException e) {
logger.debug("[{}] [{}] error deleting blob [{}] during cleanup", e, snapshotId, shardId, blobName);
}
}
}
}
// If we deleted all snapshots - we don't need to create the index file
if (snapshots.size() > 0) {
String newSnapshotIndexName = SNAPSHOT_INDEX_PREFIX + fileListGeneration;
try (OutputStream output = blobContainer.createOutput(SNAPSHOT_TEMP_PREFIX + fileListGeneration)) {
StreamOutput stream = compressIfNeeded(output);
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
newSnapshots.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.flush();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e);
}
try {
blobContainer.move(SNAPSHOT_TEMP_PREFIX + fileListGeneration, newSnapshotIndexName);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to rename file list", e);
}
}
} }
/** /**
@ -351,7 +428,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* @return the blob name * @return the blob name
*/ */
protected String fileNameFromGeneration(long generation) { protected String fileNameFromGeneration(long generation) {
return "__" + Long.toString(generation, Character.MAX_RADIX); return DATA_BLOB_PREFIX + Long.toString(generation, Character.MAX_RADIX);
} }
/** /**
@ -363,17 +440,17 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
protected long findLatestFileNameGeneration(ImmutableMap<String, BlobMetaData> blobs) { protected long findLatestFileNameGeneration(ImmutableMap<String, BlobMetaData> blobs) {
long generation = -1; long generation = -1;
for (String name : blobs.keySet()) { for (String name : blobs.keySet()) {
if (!name.startsWith("__")) { if (!name.startsWith(DATA_BLOB_PREFIX)) {
continue; continue;
} }
name = FileInfo.canonicalName(name); name = FileInfo.canonicalName(name);
try { try {
long currentGen = Long.parseLong(name.substring(2) /*__*/, Character.MAX_RADIX); long currentGen = Long.parseLong(name.substring(DATA_BLOB_PREFIX.length()), Character.MAX_RADIX);
if (currentGen > generation) { if (currentGen > generation) {
generation = currentGen; generation = currentGen;
} }
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
logger.warn("file [{}] does not conform to the '__' schema"); logger.warn("file [{}] does not conform to the '{}' schema", name, DATA_BLOB_PREFIX);
} }
} }
return generation; return generation;
@ -383,20 +460,47 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* Loads all available snapshots in the repository * Loads all available snapshots in the repository
* *
* @param blobs list of blobs in repository * @param blobs list of blobs in repository
* @return BlobStoreIndexShardSnapshots * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
*/ */
protected BlobStoreIndexShardSnapshots buildBlobStoreIndexShardSnapshots(ImmutableMap<String, BlobMetaData> blobs) { protected Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(ImmutableMap<String, BlobMetaData> blobs) {
List<BlobStoreIndexShardSnapshot> snapshots = Lists.newArrayList(); int latest = -1;
for (String name : blobs.keySet()) {
if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
if (gen > latest) {
latest = gen;
}
} catch (NumberFormatException ex) {
logger.warn("failed to parse index file name [{}]", name);
}
}
}
if (latest >= 0) {
try (InputStream stream = blobContainer.openInput(SNAPSHOT_INDEX_PREFIX + latest)) {
return new Tuple<>(readSnapshots(stream), latest);
} catch (IOException e) {
logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest);
}
}
// We couldn't load the index file - falling back to loading individual snapshots
List<SnapshotFiles> snapshots = Lists.newArrayList();
for (String name : blobs.keySet()) { for (String name : blobs.keySet()) {
if (name.startsWith(SNAPSHOT_PREFIX)) { if (name.startsWith(SNAPSHOT_PREFIX)) {
try (InputStream stream = blobContainer.openInput(name)) { try (InputStream stream = blobContainer.openInput(name)) {
snapshots.add(readSnapshot(stream)); BlobStoreIndexShardSnapshot snapshot = readSnapshot(stream);
snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
} catch (IOException e) { } catch (IOException e) {
logger.warn("failed to read commit point [{}]", e, name); logger.warn("failed to read commit point [{}]", e, name);
} }
} }
} }
return new BlobStoreIndexShardSnapshots(snapshots); return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1);
}
/**
* Wraps the given output stream into a {@link StreamOutput} by delegating to
* {@code BlobStoreRepository.toStreamOutput}, passing the result of {@link #isCompress()}
* as the compression flag.
*
* @param output raw output stream to wrap
* @return stream output to write to
* @throws IOException if the underlying helper fails to create the stream
*/
protected StreamOutput compressIfNeeded(OutputStream output) throws IOException {
return toStreamOutput(output, isCompress());
}
} }
@ -442,9 +546,10 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
} }
long generation = findLatestFileNameGeneration(blobs); long generation = findLatestFileNameGeneration(blobs);
BlobStoreIndexShardSnapshots snapshots = buildBlobStoreIndexShardSnapshots(blobs); Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = newArrayList(); final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = newArrayList();
int indexNumberOfFiles = 0; int indexNumberOfFiles = 0;
@ -464,23 +569,28 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
} }
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadata.get(fileName); final StoreFileMetaData md = metadata.get(fileName);
boolean snapshotRequired = false; FileInfo existingFileInfo = null;
BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName); ImmutableList<FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
try { if (filesInfo != null) {
// in 1.3.3 we added additional hashes for .si / segments_N files for (FileInfo fileInfo : filesInfo) {
// to ensure we don't double the space in the repo since old snapshots try {
// don't have this hash we try to read that hash from the blob store // in 1.3.3 we added additional hashes for .si / segments_N files
// in a bwc compatible way. // to ensure we don't double the space in the repo since old snapshots
maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); // don't have this hash we try to read that hash from the blob store
} catch (Throwable e) { // in a bwc compatible way.
logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata);
} catch (Throwable e) {
logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
}
if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
}
}
} }
if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) { if (existingFileInfo == null) {
// commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs
snapshotRequired = true;
}
if (snapshotRequired) {
indexNumberOfFiles++; indexNumberOfFiles++;
indexTotalFilesSize += md.length(); indexTotalFilesSize += md.length();
// create a new FileInfo // create a new FileInfo
@ -488,7 +598,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
indexCommitPointFiles.add(snapshotFileInfo); indexCommitPointFiles.add(snapshotFileInfo);
filesToSnapshot.add(snapshotFileInfo); filesToSnapshot.add(snapshotFileInfo);
} else { } else {
indexCommitPointFiles.add(fileInfo); indexCommitPointFiles.add(existingFileInfo);
} }
} }
@ -515,7 +625,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time. //TODO: The time stored in snapshot doesn't include cleanup time.
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try (OutputStream output = blobContainer.createOutput(snapshotBlobName)) { try (StreamOutput output = compressIfNeeded(blobContainer.createOutput(snapshotBlobName))) {
writeSnapshot(snapshot, output); writeSnapshot(snapshot, output);
} catch (IOException e) { } catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
@ -523,12 +633,13 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
// delete all files that are not referenced by any commit point // delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<BlobStoreIndexShardSnapshot> newSnapshotsList = Lists.newArrayList(); List<SnapshotFiles> newSnapshotsList = Lists.newArrayList();
newSnapshotsList.add(snapshot); newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
for (BlobStoreIndexShardSnapshot point : snapshots) { for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point); newSnapshotsList.add(point);
} }
cleanup(newSnapshotsList, blobs); // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalize(newSnapshotsList, fileListGeneration + 1, blobs);
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE); snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.DONE);
} finally { } finally {
store.decRef(); store.decRef();
@ -709,6 +820,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
try { try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
BlobStoreIndexShardSnapshot snapshot = loadSnapshot(); BlobStoreIndexShardSnapshot snapshot = loadSnapshot();
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
final Store.MetadataSnapshot recoveryTargetMetadata; final Store.MetadataSnapshot recoveryTargetMetadata;
try { try {
recoveryTargetMetadata = store.getMetadataOrEmpty(); recoveryTargetMetadata = store.getMetadataOrEmpty();
@ -786,6 +898,22 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
} }
recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion());
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (!Store.isChecksum(storeFile) && !snapshotFiles.containPhysicalIndexFile(storeFile)) {
try {
store.deleteQuiet("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile);
}
}
}
} catch (IOException e) {
logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId);
}
} finally { } finally {
store.decRef(); store.decRef();
} }

View File

@ -190,6 +190,24 @@ public class BlobStoreIndexShardSnapshot {
return metadata.isSame(md); return metadata.isSame(md);
} }
/**
* Checks if another file info describes the same file as this one.
*
* Two file infos are considered the same if they have the same number of parts, the same
* part bytes, the same name, the same part size (both may be {@code null}) and
* their metadata is the same according to {@code metadata.isSame}.
* The physical name is not compared directly by this method.
*
* @param fileInfo file info to compare with
* @return true if all of the checks above pass, false otherwise
*/
public boolean isSame(FileInfo fileInfo) {
if (numberOfParts != fileInfo.numberOfParts) return false;
if (partBytes != fileInfo.partBytes) return false;
if (!name.equals(fileInfo.name)) return false;
// null-safe comparison of the part size
if (partSize != null) {
if (!partSize.equals(fileInfo.partSize)) return false;
} else {
if (fileInfo.partSize != null) return false;
}
return metadata.isSame(fileInfo.metadata);
}
static final class Fields { static final class Fields {
static final XContentBuilderString NAME = new XContentBuilderString("name"); static final XContentBuilderString NAME = new XContentBuilderString("name");
static final XContentBuilderString PHYSICAL_NAME = new XContentBuilderString("physical_name"); static final XContentBuilderString PHYSICAL_NAME = new XContentBuilderString("physical_name");
@ -484,38 +502,4 @@ public class BlobStoreIndexShardSnapshot {
startTime, time, numberOfFiles, totalSize); startTime, time, numberOfFiles, totalSize);
} }
/**
* Returns true if this snapshot contains a file with a given original name
*
* @param physicalName original file name
* @return true if the file was found, false otherwise
*/
public boolean containPhysicalIndexFile(String physicalName) {
return findPhysicalIndexFile(physicalName) != null;
}
public FileInfo findPhysicalIndexFile(String physicalName) {
for (FileInfo file : indexFiles) {
if (file.physicalName().equals(physicalName)) {
return file;
}
}
return null;
}
/**
* Returns true if this snapshot contains a file with a given name
*
* @param name file name
* @return true if file was found, false otherwise
*/
public FileInfo findNameFile(String name) {
for (FileInfo file : indexFiles) {
if (file.name().equals(name)) {
return file;
}
}
return null;
}
} }

View File

@ -20,10 +20,22 @@
package org.elasticsearch.index.snapshots.blobstore; package org.elasticsearch.index.snapshots.blobstore;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Maps.newHashMap;
/** /**
* Contains information about all snapshot for the given shard in repository * Contains information about all snapshot for the given shard in repository
@ -31,19 +43,72 @@ import java.util.List;
* This class is used to find files that were already snapshoted and clear out files that no longer referenced by any * This class is used to find files that were already snapshoted and clear out files that no longer referenced by any
* snapshots * snapshots
*/ */
public class BlobStoreIndexShardSnapshots implements Iterable<BlobStoreIndexShardSnapshot> { public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, ToXContent {
private final ImmutableList<BlobStoreIndexShardSnapshot> shardSnapshots; private final ImmutableList<SnapshotFiles> shardSnapshots;
private final ImmutableMap<String, FileInfo> files;
private final ImmutableMap<String, ImmutableList<FileInfo>> physicalFiles;
public BlobStoreIndexShardSnapshots(List<BlobStoreIndexShardSnapshot> shardSnapshots) { public BlobStoreIndexShardSnapshots(List<SnapshotFiles> shardSnapshots) {
this.shardSnapshots = ImmutableList.copyOf(shardSnapshots); this.shardSnapshots = ImmutableList.copyOf(shardSnapshots);
// Map between blob names and file info
Map<String, FileInfo> newFiles = newHashMap();
// Map between original physical names and file info
Map<String, List<FileInfo>> physicalFiles = newHashMap();
for (SnapshotFiles snapshot : shardSnapshots) {
// First we build map between filenames in the repo and their original file info
// this map will be used in the next loop
for (FileInfo fileInfo : snapshot.indexFiles()) {
FileInfo oldFile = newFiles.put(fileInfo.name(), fileInfo);
assert oldFile == null || oldFile.isSame(fileInfo);
}
// We are doing it in two loops here so we keep only one copy of the fileInfo per blob
// the first loop de-duplicates fileInfo objects that were loaded from different snapshots but refer to
// the same blob
for (FileInfo fileInfo : snapshot.indexFiles()) {
List<FileInfo> physicalFileList = physicalFiles.get(fileInfo.physicalName());
if (physicalFileList == null) {
physicalFileList = newArrayList();
physicalFiles.put(fileInfo.physicalName(), physicalFileList);
}
physicalFileList.add(newFiles.get(fileInfo.name()));
}
}
ImmutableMap.Builder<String, ImmutableList<FileInfo>> mapBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<FileInfo>> entry : physicalFiles.entrySet()) {
mapBuilder.put(entry.getKey(), ImmutableList.copyOf(entry.getValue()));
}
this.physicalFiles = mapBuilder.build();
this.files = ImmutableMap.copyOf(newFiles);
} }
/**
* Creates the snapshot collection from an already built map of blob names to file infos.
*
* The given map and list are stored as is; only the map from physical file names to
* file infos is computed here.
*
* @param files map between blob names and their file info
* @param shardSnapshots list of snapshots of the shard
*/
private BlobStoreIndexShardSnapshots(ImmutableMap<String, FileInfo> files, ImmutableList<SnapshotFiles> shardSnapshots) {
this.shardSnapshots = shardSnapshots;
this.files = files;
// Map between original physical names and the file infos of all blobs created for them
Map<String, List<FileInfo>> physicalFiles = newHashMap();
for (SnapshotFiles snapshot : shardSnapshots) {
for (FileInfo fileInfo : snapshot.indexFiles()) {
List<FileInfo> physicalFileList = physicalFiles.get(fileInfo.physicalName());
if (physicalFileList == null) {
physicalFileList = newArrayList();
physicalFiles.put(fileInfo.physicalName(), physicalFileList);
}
// use the instance from the files map rather than the one attached to the snapshot
// NOTE(review): if files has no entry for this blob name a null is added here and
// ImmutableList.copyOf below will reject it - confirm callers always pass a complete map
physicalFileList.add(files.get(fileInfo.name()));
}
}
// freeze the per-physical-name lists
ImmutableMap.Builder<String, ImmutableList<FileInfo>> mapBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<FileInfo>> entry : physicalFiles.entrySet()) {
mapBuilder.put(entry.getKey(), ImmutableList.copyOf(entry.getValue()));
}
this.physicalFiles = mapBuilder.build();
}
/** /**
* Returns list of snapshots * Returns list of snapshots
* *
* @return list of snapshots * @return list of snapshots
*/ */
public ImmutableList<BlobStoreIndexShardSnapshot> snapshots() { public ImmutableList<SnapshotFiles> snapshots() {
return this.shardSnapshots; return this.shardSnapshots;
} }
@ -51,16 +116,10 @@ public class BlobStoreIndexShardSnapshots implements Iterable<BlobStoreIndexShar
* Finds reference to a snapshotted file by its original name * Finds reference to a snapshotted file by its original name
* *
* @param physicalName original name * @param physicalName original name
* @return file info or null if file is not present in any of snapshots * @return a list of file infos that match specified physical file or null if the file is not present in any of snapshots
*/ */
public FileInfo findPhysicalIndexFile(String physicalName) { public ImmutableList<FileInfo> findPhysicalIndexFiles(String physicalName) {
for (BlobStoreIndexShardSnapshot snapshot : shardSnapshots) { return physicalFiles.get(physicalName);
FileInfo fileInfo = snapshot.findPhysicalIndexFile(physicalName);
if (fileInfo != null) {
return fileInfo;
}
}
return null;
} }
/** /**
@ -70,17 +129,166 @@ public class BlobStoreIndexShardSnapshots implements Iterable<BlobStoreIndexShar
* @return file info or null if file is not present in any of snapshots * @return file info or null if file is not present in any of snapshots
*/ */
public FileInfo findNameFile(String name) { public FileInfo findNameFile(String name) {
for (BlobStoreIndexShardSnapshot snapshot : shardSnapshots) { return files.get(name);
FileInfo fileInfo = snapshot.findNameFile(name);
if (fileInfo != null) {
return fileInfo;
}
}
return null;
} }
@Override @Override
public Iterator<BlobStoreIndexShardSnapshot> iterator() { public Iterator<SnapshotFiles> iterator() {
return shardSnapshots.iterator(); return shardSnapshots.iterator();
} }
/**
 * Field names of the shard index file in the form accepted by the content builder.
 */
static final class Fields {
    static final XContentBuilderString FILES = new XContentBuilderString("files");
    static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");

    private Fields() {
        // constants holder, never instantiated
    }
}
/**
 * Field names of the shard index file in the form used for matching while parsing.
 */
static final class ParseFields {
    static final ParseField FILES = new ParseField("files");
    static final ParseField SNAPSHOTS = new ParseField("snapshots");

    private ParseFields() {
        // constants holder, never instantiated
    }
}
/**
 * Serializes the shard index: first every known blob with its file info, then every
 * snapshot with the names of the blobs it references.
 * <pre>
 * {@code
 * {
 *     "files": [{
 *         "name": "__3",
 *         "physical_name": "_0.si",
 *         "length": 310,
 *         "checksum": "1tpsg3p",
 *         "written_by": "5.1.0",
 *         "meta_hash": "P9dsFxNMdWNlb......"
 *     }, {
 *         "name": "__2",
 *         "physical_name": "segments_2",
 *         "length": 150,
 *         "checksum": "11qjpz6",
 *         "written_by": "5.1.0",
 *         "meta_hash": "P9dsFwhzZWdtZ......."
 *     }, {
 *         "name": "__1",
 *         "physical_name": "_0.cfe",
 *         "length": 363,
 *         "checksum": "er9r9g",
 *         "written_by": "5.1.0"
 *     }, {
 *         "name": "__0",
 *         "physical_name": "_0.cfs",
 *         "length": 3354,
 *         "checksum": "491liz",
 *         "written_by": "5.1.0"
 *     }, {
 *         "name": "__4",
 *         "physical_name": "segments_3",
 *         "length": 150,
 *         "checksum": "134567",
 *         "written_by": "5.1.0",
 *         "meta_hash": "P9dsFwhzZWdtZ......."
 *     }],
 *     "snapshots": {
 *         "snapshot_1": {
 *             "files": ["__0", "__1", "__2", "__3"]
 *         },
 *         "snapshot_2": {
 *             "files": ["__0", "__1", "__2", "__4"]
 *         }
 *     }
 * }
 * }
 * </pre>
 *
 * @param builder content builder to write to
 * @param params  serialization parameters, handed on to the file info serialization
 * @return the builder that was passed in
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    builder.startObject();

    // "files": the full description of every blob, each written exactly once
    builder.startArray(Fields.FILES);
    for (FileInfo blobInfo : files.values()) {
        FileInfo.toXContent(blobInfo, builder, params);
    }
    builder.endArray();

    // "snapshots": per snapshot only the names of the blobs it uses
    builder.startObject(Fields.SNAPSHOTS);
    for (SnapshotFiles snapshotFiles : shardSnapshots) {
        builder.startObject(snapshotFiles.snapshot(), XContentBuilder.FieldCaseConversion.NONE);
        builder.startArray(Fields.FILES);
        for (FileInfo referenced : snapshotFiles.indexFiles()) {
            builder.value(referenced.name());
        }
        builder.endArray();
        builder.endObject();
    }
    builder.endObject();

    builder.endObject();
    return builder;
}
/**
 * Parses a shard index consisting of a "files" array (one file info per blob) and a
 * "snapshots" object that maps each snapshot name to the names of the blobs it uses.
 * <p>
 * The parser is expected to be positioned on the opening START_OBJECT token; when it is
 * positioned on anything else nothing is consumed and an index without files and
 * snapshots is returned.
 *
 * @param parser parser positioned on the start of the index object
 * @return the parsed shard index
 * @throws ElasticsearchParseException if the content has an unexpected structure or a snapshot
 *                                     references a file that is not listed in "files"
 * @throws IOException                 if reading from the parser fails
 */
public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException {
    XContentParser.Token token = parser.currentToken();
    Map<String, List<String>> snapshotsMap = newHashMap();
    ImmutableMap.Builder<String, FileInfo> filesBuilder = ImmutableMap.builder();
    if (token == XContentParser.Token.START_OBJECT) {
        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token != XContentParser.Token.FIELD_NAME) {
                throw new ElasticsearchParseException("unexpected token [" + token + "]");
            }
            String currentFieldName = parser.currentName();
            token = parser.nextToken();
            if (token == XContentParser.Token.START_ARRAY) {
                // Top-level array: the list of all blobs with their file infos
                if (ParseFields.FILES.match(currentFieldName) == false) {
                    throw new ElasticsearchParseException("unknown array [" + currentFieldName + "]");
                }
                while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                    FileInfo fileInfo = FileInfo.fromXContent(parser);
                    filesBuilder.put(fileInfo.name(), fileInfo);
                }
            } else if (token == XContentParser.Token.START_OBJECT) {
                // Top-level object: snapshot name -> { "files": [blob names] }
                if (ParseFields.SNAPSHOTS.match(currentFieldName) == false) {
                    throw new ElasticsearchParseException("unknown object [" + currentFieldName + "]");
                }
                while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                    if (token != XContentParser.Token.FIELD_NAME) {
                        throw new ElasticsearchParseException("unexpected token [" + token + "]");
                    }
                    String snapshot = parser.currentName();
                    token = parser.nextToken();
                    if (token != XContentParser.Token.START_OBJECT) {
                        throw new ElasticsearchParseException("unexpected token [" + token + "] for snapshot [" + snapshot + "]");
                    }
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        if (token == XContentParser.Token.FIELD_NAME) {
                            currentFieldName = parser.currentName();
                            if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
                                if (ParseFields.FILES.match(currentFieldName) == false) {
                                    throw new ElasticsearchParseException("unknown array [" + currentFieldName + "]");
                                }
                                List<String> fileNames = newArrayList();
                                while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                                    fileNames.add(parser.text());
                                }
                                snapshotsMap.put(snapshot, fileNames);
                            }
                        }
                    }
                }
            } else {
                throw new ElasticsearchParseException("unexpected token [" + token + "]");
            }
        }
    }

    // Resolve the blob names recorded for every snapshot against the parsed file infos
    ImmutableMap<String, FileInfo> files = filesBuilder.build();
    ImmutableList.Builder<SnapshotFiles> snapshots = ImmutableList.builder();
    for (Map.Entry<String, List<String>> entry : snapshotsMap.entrySet()) {
        ImmutableList.Builder<FileInfo> fileInfosBuilder = ImmutableList.builder();
        for (String file : entry.getValue()) {
            FileInfo fileInfo = files.get(file);
            if (fileInfo == null) {
                // A dangling reference means the index content is inconsistent; report it with
                // context instead of failing on the null inside the immutable list builder
                throw new ElasticsearchParseException("snapshot [" + entry.getKey() + "] references unknown file [" + file + "]");
            }
            fileInfosBuilder.add(fileInfo);
        }
        snapshots.add(new SnapshotFiles(entry.getKey(), fileInfosBuilder.build()));
    }
    return new BlobStoreIndexShardSnapshots(files, snapshots.build());
}
} }

View File

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.snapshots.blobstore;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import java.util.Map;
import static com.google.common.collect.Maps.newHashMap;
/**
 * The set of files that make up one snapshot, identified by the snapshot name.
 */
public class SnapshotFiles {

    private final String snapshot;

    private final ImmutableList<FileInfo> indexFiles;

    // Lookup by original file name; stays null until the first lookup builds it (no synchronization)
    private Map<String, FileInfo> physicalFiles = null;

    /**
     * @param snapshot   name of the snapshot
     * @param indexFiles files that belong to the snapshot
     */
    public SnapshotFiles(String snapshot, ImmutableList<FileInfo> indexFiles ) {
        this.snapshot = snapshot;
        this.indexFiles = indexFiles;
    }

    /**
     * Returns the name of the snapshot
     */
    public String snapshot() {
        return snapshot;
    }

    /**
     * Returns the list of files in the snapshot
     */
    public ImmutableList<FileInfo> indexFiles() {
        return indexFiles;
    }

    /**
     * Checks whether the snapshot contains a file with the given original name
     *
     * @param physicalName original file name
     * @return true if the file was found, false otherwise
     */
    public boolean containPhysicalIndexFile(String physicalName) {
        FileInfo match = findPhysicalIndexFile(physicalName);
        return match != null;
    }

    /**
     * Looks up the file info recorded for an original file name. The lookup table is built
     * from the file list on the first call and reused afterwards; when several files carry
     * the same original name, the last one in the list is the one returned.
     *
     * @param physicalName the original file name
     * @return information about this file, or null if the snapshot has no file with that name
     */
    public FileInfo findPhysicalIndexFile(String physicalName) {
        Map<String, FileInfo> lookup = physicalFiles;
        if (lookup == null) {
            lookup = newHashMap();
            for (FileInfo candidate : indexFiles) {
                lookup.put(candidate.physicalName(), candidate);
            }
            // Publish only after the table is fully populated
            physicalFiles = lookup;
        }
        return lookup.get(physicalName);
    }
}

View File

@ -174,9 +174,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
*/ */
@Override @Override
protected void doStart() { protected void doStart() {
this.snapshotsBlobContainer = blobStore().blobContainer(basePath()); this.snapshotsBlobContainer = blobStore().blobContainer(basePath());
indexShardRepository.initialize(blobStore(), basePath(), chunkSize(), snapshotRateLimiter, restoreRateLimiter, this); indexShardRepository.initialize(blobStore(), basePath(), chunkSize(), snapshotRateLimiter, restoreRateLimiter, this, isCompress());
} }
/** /**
@ -329,11 +328,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
} }
private StreamOutput compressIfNeeded(OutputStream output) throws IOException { private StreamOutput compressIfNeeded(OutputStream output) throws IOException {
return toStreamOutput(output, isCompress());
}
public static StreamOutput toStreamOutput(OutputStream output, boolean compress) throws IOException {
StreamOutput out = null; StreamOutput out = null;
boolean success = false; boolean success = false;
try { try {
out = new OutputStreamStreamOutput(output); out = new OutputStreamStreamOutput(output);
if (isCompress()) { if (compress) {
out = CompressorFactory.defaultCompressor().streamOutput(out); out = CompressorFactory.defaultCompressor().streamOutput(out);
} }
success = true; success = true;
@ -596,10 +599,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
*/ */
protected void writeSnapshotList(ImmutableList<SnapshotId> snapshots) throws IOException { protected void writeSnapshotList(ImmutableList<SnapshotId> snapshots) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput(); BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = bStream; StreamOutput stream = compressIfNeeded(bStream);
if (isCompress()) {
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
builder.startObject(); builder.startObject();
builder.startArray("snapshots"); builder.startArray("snapshots");