From 0b024ad2f3a65902b4b0791c208ee85b7912ad21 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 4 Dec 2014 10:56:38 -0500 Subject: [PATCH] Snapshot/Restore: switch to write once mode for snapshot metadata files This commit removes creation of in-progress snapshot file and makes creation of the final snapshot file atomic. Fixes #8696 --- .../cluster/metadata/SnapshotMetaData.java | 66 ++- .../common/blobstore/BlobContainer.java | 22 + .../common/blobstore/fs/FsBlobContainer.java | 15 +- .../blobstore/url/URLBlobContainer.java | 5 + .../BlobStoreIndexShardRepository.java | 5 +- .../repositories/Repository.java | 10 +- .../blobstore/BlobStoreRepository.java | 107 ++-- .../blobstore/BlobStoreSnapshot.java | 514 ------------------ .../org/elasticsearch/snapshots/Snapshot.java | 257 ++++++++- .../snapshots/SnapshotsService.java | 47 +- .../common/blobstore/BlobStoreTest.java | 90 ++- .../DedicatedClusterSnapshotRestoreTests.java | 6 +- .../SharedClusterSnapshotRestoreTests.java | 2 +- .../mockstore/BlobContainerWrapper.java | 5 + .../snapshots/mockstore/MockRepository.java | 6 + 15 files changed, 509 insertions(+), 648 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreSnapshot.java diff --git a/src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java index edf55d40884..b759fe5daeb 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.shard.ShardId; @@ -66,12 +67,14 @@ public class SnapshotMetaData implements MetaData.Custom { private final ImmutableMap shards; private final ImmutableList indices; private final ImmutableMap> waitingIndices; + private final long startTime; - public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, ImmutableList indices, ImmutableMap shards) { + public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, ImmutableList indices, long startTime, ImmutableMap shards) { this.state = state; this.snapshotId = snapshotId; this.includeGlobalState = includeGlobalState; this.indices = indices; + this.startTime = startTime; if (shards == null) { this.shards = ImmutableMap.of(); this.waitingIndices = ImmutableMap.of(); @@ -81,6 +84,14 @@ public class SnapshotMetaData implements MetaData.Custom { } } + public Entry(Entry entry, State state, ImmutableMap shards) { + this(entry.snapshotId, entry.includeGlobalState, state, entry.indices, entry.startTime, shards); + } + + public Entry(Entry entry, ImmutableMap shards) { + this(entry, entry.state, shards); + } + public SnapshotId snapshotId() { return this.snapshotId; } @@ -105,6 +116,10 @@ public class SnapshotMetaData implements MetaData.Custom { return includeGlobalState; } + public long startTime() { + return startTime; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -113,10 +128,12 @@ public class SnapshotMetaData implements MetaData.Custom { Entry entry = (Entry) o; if (includeGlobalState != entry.includeGlobalState) return false; + if (startTime != entry.startTime) return false; if (!indices.equals(entry.indices)) return false; if (!shards.equals(entry.shards)) return false; if (!snapshotId.equals(entry.snapshotId)) return false; if (state != entry.state) return false; + if (!waitingIndices.equals(entry.waitingIndices)) return false; return true; } @@ -128,6 +145,8 @@ public class SnapshotMetaData implements MetaData.Custom { result = 31 * result + (includeGlobalState ? 1 : 0); result = 31 * result + shards.hashCode(); result = 31 * result + indices.hashCode(); + result = 31 * result + waitingIndices.hashCode(); + result = 31 * result + (int) (startTime ^ (startTime >>> 32)); return result; } @@ -331,7 +350,8 @@ public class SnapshotMetaData implements MetaData.Custom { for (int j = 0; j < indices; j++) { indexBuilder.add(in.readString()); } - ImmutableMap.Builder builder = ImmutableMap.builder(); + long startTime = in.readLong(); + ImmutableMap.Builder builder = ImmutableMap.builder(); int shards = in.readVInt(); for (int j = 0; j < shards; j++) { ShardId shardId = ShardId.readShardId(in); @@ -339,7 +359,7 @@ public class SnapshotMetaData implements MetaData.Custom { State shardState = State.fromValue(in.readByte()); builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState)); } - entries[i] = new Entry(snapshotId, includeGlobalState, state, indexBuilder.build(), builder.build()); + entries[i] = new Entry(snapshotId, includeGlobalState, state, indexBuilder.build(), startTime, builder.build()); } return new SnapshotMetaData(entries); } @@ -355,6 +375,7 @@ public class SnapshotMetaData implements MetaData.Custom { for (String index : entry.indices()) { out.writeString(index); } + out.writeLong(entry.startTime()); out.writeVInt(entry.shards().size()); for (Map.Entry shardEntry : entry.shards().entrySet()) { shardEntry.getKey().writeTo(out); @@ -369,9 +390,24 @@ public class SnapshotMetaData implements MetaData.Custom { throw new UnsupportedOperationException(); } + static final class Fields { + static final XContentBuilderString REPOSITORY = new XContentBuilderString("repository"); + static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots"); + static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot"); + static final XContentBuilderString INCLUDE_GLOBAL_STATE = new XContentBuilderString("include_global_state"); + static final XContentBuilderString STATE = new XContentBuilderString("state"); + static final XContentBuilderString INDICES = new XContentBuilderString("indices"); + static final XContentBuilderString START_TIME_MILLIS = new XContentBuilderString("start_time_millis"); + static final XContentBuilderString START_TIME = new XContentBuilderString("start_time"); + static final XContentBuilderString SHARDS = new XContentBuilderString("shards"); + static final XContentBuilderString INDEX = new XContentBuilderString("index"); + static final XContentBuilderString SHARD = new XContentBuilderString("shard"); + static final XContentBuilderString NODE = new XContentBuilderString("node"); + } + @Override public void toXContent(SnapshotMetaData customIndexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startArray("snapshots"); + builder.startArray(Fields.SNAPSHOTS); for (Entry entry : customIndexMetaData.entries()) { toXContent(entry, builder, params); } @@ -380,33 +416,33 @@ public class SnapshotMetaData implements MetaData.Custom { public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); - builder.field("repository", entry.snapshotId().getRepository()); - builder.field("snapshot", entry.snapshotId().getSnapshot()); - builder.field("include_global_state", entry.includeGlobalState()); - builder.field("state", entry.state()); - builder.startArray("indices"); + builder.field(Fields.REPOSITORY, entry.snapshotId().getRepository()); + builder.field(Fields.SNAPSHOT, entry.snapshotId().getSnapshot()); + builder.field(Fields.INCLUDE_GLOBAL_STATE, entry.includeGlobalState()); + builder.field(Fields.STATE, entry.state()); + builder.startArray(Fields.INDICES); { for (String index : entry.indices()) { builder.value(index); } } builder.endArray(); - builder.startArray("shards"); + builder.timeValueField(Fields.START_TIME_MILLIS, Fields.START_TIME, entry.startTime()); + builder.startArray(Fields.SHARDS); { for (Map.Entry shardEntry : entry.shards.entrySet()) { ShardId shardId = shardEntry.getKey(); ShardSnapshotStatus status = shardEntry.getValue(); builder.startObject(); { - builder.field("index", shardId.getIndex()); - builder.field("shard", shardId.getId()); - builder.field("state", status.state()); - builder.field("node", status.nodeId()); + builder.field(Fields.INDEX, shardId.getIndex()); + builder.field(Fields.SHARD, shardId.getId()); + builder.field(Fields.STATE, status.state()); + builder.field(Fields.NODE, status.nodeId()); } builder.endObject(); } } - builder.endArray(); builder.endObject(); } diff --git a/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 3ff5158a39a..a7afda2055b 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -51,13 +51,35 @@ public interface BlobContainer { */ OutputStream createOutput(String blobName) throws IOException; + /** + * Deletes a blob with giving name. + * + * If blob exist but cannot be deleted an exception has to be thrown. + */ void deleteBlob(String blobName) throws IOException; + /** + * Deletes all blobs in the container that match the specified prefix. + */ void deleteBlobsByPrefix(String blobNamePrefix) throws IOException; + /** + * Deletes all blobs in the container that match the supplied filter. + */ void deleteBlobsByFilter(BlobNameFilter filter) throws IOException; + /** + * Lists all blobs in the container + */ ImmutableMap listBlobs() throws IOException; + /** + * Lists all blobs in the container that match specified prefix + */ ImmutableMap listBlobsByPrefix(String blobNamePrefix) throws IOException; + + /** + * Atomically renames source blob into target blob + */ + void move(String sourceBlobName, String targetBlobName) throws IOException; } diff --git a/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 28e556d39ae..852c3399845 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -21,7 +21,6 @@ package org.elasticsearch.common.blobstore.fs; import com.google.common.collect.ImmutableMap; import org.apache.lucene.util.IOUtils; -import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; @@ -30,8 +29,7 @@ import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.FileSystemUtils; import java.io.*; -import java.nio.file.Files; -import java.nio.file.Path; +import java.nio.file.*; import java.nio.file.attribute.BasicFileAttributes; /** @@ -99,4 +97,15 @@ public class FsBlobContainer extends AbstractBlobContainer { } }, blobStore.bufferSizeInBytes()); } + + @Override + public void move(String source, String target) throws IOException { + Path sourcePath = path.resolve(source); + Path targetPath = path.resolve(target); + // If the target file exists then Files.move() behaviour is implementation specific + // the existing file might be replaced or this method fails by throwing an IOException. + assert !Files.exists(targetPath); + Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE); + IOUtils.fsync(path, true); + } } diff --git a/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index e7ee3803402..d08afe2d797 100644 --- a/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -69,6 +69,11 @@ public class URLBlobContainer extends AbstractBlobContainer { throw new UnsupportedOperationException("URL repository doesn't support this operation"); } + @Override + public void move(String from, String to) throws IOException { + throw new UnsupportedOperationException("URL repository doesn't support this operation"); + } + /** * This operation is not supported by URLBlobContainer */ diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index e6f5ed0276a..3d481be34ff 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -240,7 +240,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint(); BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS); builder.flush(); - builder.close(); } /** @@ -510,14 +509,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements // now create and write the commit point snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); - String commitPointName = snapshotBlobName(snapshotId); + String snapshotBlobName = snapshotBlobName(snapshotId); BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(), snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); //TODO: The time stored in snapshot doesn't include cleanup time. logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); - try (OutputStream output = blobContainer.createOutput(commitPointName)) { + try (OutputStream output = blobContainer.createOutput(snapshotBlobName)) { writeSnapshot(snapshot, output); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); diff --git a/src/main/java/org/elasticsearch/repositories/Repository.java b/src/main/java/org/elasticsearch/repositories/Repository.java index a5619c9c927..cced4040648 100644 --- a/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/src/main/java/org/elasticsearch/repositories/Repository.java @@ -22,8 +22,6 @@ import com.google.common.collect.ImmutableList; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotShardFailure; @@ -41,7 +39,7 @@ import java.io.IOException; *
  • Master calls {@link #initializeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, com.google.common.collect.ImmutableList, org.elasticsearch.cluster.metadata.MetaData)} * with list of indices that will be included into the snapshot
  • *
  • Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(org.elasticsearch.cluster.metadata.SnapshotId, org.elasticsearch.index.shard.ShardId, org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit, org.elasticsearch.index.snapshots.IndexShardSnapshotStatus)} for each shard
  • - *
  • When all shard calls return master calls {@link #finalizeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, String, int, com.google.common.collect.ImmutableList)} + *
  • When all shard calls return master calls {@link #finalizeSnapshot} * with possible list of failures
  • * */ @@ -93,7 +91,7 @@ public interface Repository extends LifecycleComponent { * @param shardFailures list of shard failures * @return snapshot description */ - Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int totalShards, ImmutableList shardFailures); + Snapshot finalizeSnapshot(SnapshotId snapshotId, ImmutableList indices, long startTime, String failure, int totalShards, ImmutableList shardFailures); /** * Deletes snapshot @@ -115,7 +113,7 @@ public interface Repository extends LifecycleComponent { /** * Verifies repository on the master node and returns the verification token. - * + *

    * If the verification token is not null, it's passed to all data nodes for verification. If it's null - no * additional verification is required * @@ -125,7 +123,7 @@ public interface Repository extends LifecycleComponent { /** * Called at the end of repository verification process. - * + *

    * This method should perform all necessary cleanup of the temporary files created in the repository * * @param verificationToken verification request generated by {@link #startVerification} command diff --git a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index a525e527189..894550811fb 100644 --- a/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -62,9 +62,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.NoSuchFileException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; import static com.google.common.collect.Lists.newArrayList; @@ -80,9 +78,9 @@ import static com.google.common.collect.Lists.newArrayList; * {@code * STORE_ROOT * |- index - list of all snapshot name as JSON array - * |- snapshot-20131010 - JSON serialized BlobStoreSnapshot for snapshot "20131010" + * |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010" * |- metadata-20131010 - JSON serialized MetaData for snapshot "20131010" (includes only global metadata) - * |- snapshot-20131011 - JSON serialized BlobStoreSnapshot for snapshot "20131011" + * |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011" * |- metadata-20131011 - JSON serialized MetaData for snapshot "20131011" * ..... * |- indices/ - data for all indices @@ -118,6 +116,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent indices, MetaData metaData) { try { - BlobStoreSnapshot blobStoreSnapshot = BlobStoreSnapshot.builder() - .name(snapshotId.getSnapshot()) - .indices(indices) - .startTime(System.currentTimeMillis()) - .build(); String snapshotBlobName = snapshotBlobName(snapshotId); if (snapshotsBlobContainer.blobExists(snapshotBlobName)) { - // TODO: Can we make it atomic? throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists"); } - try (OutputStream output = snapshotsBlobContainer.createOutput(snapshotBlobName)) { - writeSnapshot(blobStoreSnapshot, output); - } // Write Global MetaData // TODO: Check if metadata needs to be written try (OutputStream output = snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId))) { @@ -320,42 +311,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent shardFailures) { - BlobStoreSnapshot snapshot = (BlobStoreSnapshot) readSnapshot(snapshotId); - if (snapshot == null) { - throw new SnapshotMissingException(snapshotId); - } - if (snapshot.state().completed()) { - throw new SnapshotException(snapshotId, "snapshot is already closed"); - } + public Snapshot finalizeSnapshot(SnapshotId snapshotId, ImmutableList indices, long startTime, String failure, int totalShards, ImmutableList shardFailures) { try { + String tempBlobName = tempSnapshotBlobName(snapshotId); String blobName = snapshotBlobName(snapshotId); - BlobStoreSnapshot.Builder updatedSnapshot = BlobStoreSnapshot.builder().snapshot(snapshot); - if (failure == null) { - if (shardFailures.isEmpty()) { - updatedSnapshot.success(); - } else { - updatedSnapshot.partial(); - } - updatedSnapshot.failures(totalShards, shardFailures); - } else { - updatedSnapshot.failed(failure); - } - updatedSnapshot.endTime(System.currentTimeMillis()); - snapshot = updatedSnapshot.build(); - try (OutputStream output = snapshotsBlobContainer.createOutput(blobName)) { - writeSnapshot(snapshot, output); + Snapshot blobStoreSnapshot = new Snapshot(snapshotId.getSnapshot(), indices, startTime, failure, System.currentTimeMillis(), totalShards, shardFailures); + try (OutputStream output = snapshotsBlobContainer.createOutput(tempBlobName)) { + writeSnapshot(blobStoreSnapshot, output); } + snapshotsBlobContainer.move(tempBlobName, blobName); ImmutableList snapshotIds = snapshots(); if (!snapshotIds.contains(snapshotId)) { snapshotIds = ImmutableList.builder().addAll(snapshotIds).add(snapshotId).build(); } writeSnapshotList(snapshotIds); - return snapshot; + return blobStoreSnapshot; } catch (IOException ex) { throw new RepositoryException(this.repositoryName, "failed to update snapshot in repository", ex); } @@ -400,29 +375,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent - * Stored in the the root of the repository, see {@link BlobStoreRepository} for more information. - */ -public class BlobStoreSnapshot implements Snapshot { - private final String name; - - private final Version version; - - private final SnapshotState state; - - private final String reason; - - private final ImmutableList indices; - - private final long startTime; - - private final long endTime; - - private final int totalShard; - - private final int successfulShards; - - private final ImmutableList shardFailures; - - private BlobStoreSnapshot(String name, ImmutableList indices, SnapshotState state, String reason, Version version, long startTime, long endTime, - int totalShard, int successfulShards, ImmutableList shardFailures) { - assert name != null; - assert indices != null; - assert state != null; - assert shardFailures != null; - this.name = name; - this.indices = indices; - this.state = state; - this.reason = reason; - this.version = version; - this.startTime = startTime; - this.endTime = endTime; - this.totalShard = totalShard; - this.successfulShards = successfulShards; - this.shardFailures = shardFailures; - } - - /** - * {@inheritDoc} - */ - @Override - public String name() { - return name; - } - - /** - * {@inheritDoc} - */ - @Override - public SnapshotState state() { - return state; - } - - /** - * {@inheritDoc} - */ - @Override - public String reason() { - return reason; - } - - /** - * {@inheritDoc} - */ - @Override - public Version version() { - return version; - } - - /** - * {@inheritDoc} - */ - @Override - public ImmutableList indices() { - return indices; - } - - /** - * {@inheritDoc} - */ - @Override - public long startTime() { - return startTime; - } - - /** - * {@inheritDoc} - */ - @Override - public long endTime() { - return endTime; - } - - /** - * {@inheritDoc} - */ - @Override - public int totalShard() { - return totalShard; - } - - /** - * {@inheritDoc} - */ - @Override - public int successfulShards() { - return successfulShards; - } - - /** - * {@inheritDoc} - */ - @Override - public ImmutableList shardFailures() { - return shardFailures; - } - - /** - * Creates new BlobStoreSnapshot builder - * - * @return - */ - public static Builder builder() { - return new Builder(); - } - - /** - * Compares two snapshots by their start time - * - * @param o other snapshot - * @return the value {@code 0} if snapshots were created at the same time; - * a value less than {@code 0} if this snapshot was created before snapshot {@code o}; and - * a value greater than {@code 0} if this snapshot was created after snapshot {@code o}; - */ - @Override - public int compareTo(Snapshot o) { - return Long.compare(startTime, ((BlobStoreSnapshot) o).startTime); - } - - /** - * BlobStoreSnapshot builder - */ - public static class Builder { - - private String name; - - private Version version = Version.CURRENT; - - private SnapshotState state = SnapshotState.IN_PROGRESS; - - private String reason; - - private ImmutableList indices; - - private long startTime; - - private long endTime; - - private int totalShard; - - private int successfulShards; - - private ImmutableList shardFailures = ImmutableList.of(); - - /** - * Copies data from another snapshot into the builder - * - * @param snapshot another snapshot - * @return this builder - */ - public Builder snapshot(BlobStoreSnapshot snapshot) { - name = snapshot.name(); - indices = snapshot.indices(); - version = snapshot.version(); - reason = snapshot.reason(); - state = snapshot.state(); - startTime = snapshot.startTime(); - endTime = snapshot.endTime(); - totalShard = snapshot.totalShard(); - successfulShards = snapshot.successfulShards(); - shardFailures = snapshot.shardFailures(); - return this; - } - - /** - * Sets snapshot name - * - * @param name snapshot name - * @return this builder - */ - public Builder name(String name) { - this.name = name; - return this; - } - - /** - * Sets list of indices in the snapshot - * - * @param indices list of indices - * @return this builder - */ - public Builder indices(Collection indices) { - this.indices = ImmutableList.copyOf(indices); - return this; - } - - /** - * Sets list of indices in the snapshot - * - * @param indices list of indices - * @return this builder - */ - public Builder indices(String[] indices) { - this.indices = ImmutableList.copyOf(indices); - return this; - } - - /** - * Sets snapshot state - * - * @param state snapshot state - * @return this builder - */ - public Builder state(SnapshotState state) { - this.state = state; - return this; - } - - /** - * Sets snapshot failure reason - * - * @param reason snapshot failure reason - * @return this builder - */ - public Builder reason(String reason) { - this.reason = reason; - return this; - } - - /** - * Marks snapshot as successful - * - * @return this builder - */ - public Builder success() { - this.state = SnapshotState.SUCCESS; - return this; - } - - /** - * Marks snapshot as partially successful - * - * @return this builder - */ - public Builder partial() { - this.state = SnapshotState.PARTIAL; - return this; - } - - /** - * Marks snapshot as failed and saves failure reason - * - * @param reason failure reason - * @return this builder - */ - public Builder failed(String reason) { - this.state = SnapshotState.FAILED; - this.reason = reason; - return this; - } - - /** - * Sets version of Elasticsearch that created this snapshot - * - * @param version version - * @return this builder - */ - public Builder version(Version version) { - this.version = version; - return this; - } - - /** - * Sets snapshot start time - * - * @param startTime snapshot start time - * @return this builder - */ - public Builder startTime(long startTime) { - this.startTime = startTime; - return this; - } - - /** - * Sets snapshot end time - * - * @param endTime snapshot end time - * @return this builder - */ - public Builder endTime(long endTime) { - this.endTime = endTime; - return this; - } - - /** - * Sets total number of shards across all snapshot indices - * - * @param totalShard number of shards - * @return this builder - */ - public Builder totalShard(int totalShard) { - this.totalShard = totalShard; - return this; - } - - /** - * Sets total number fo shards that were successfully snapshotted - * - * @param successfulShards number of successful shards - * @return this builder - */ - public Builder successfulShards(int successfulShards) { - this.successfulShards = successfulShards; - return this; - } - - /** - * Sets the list of individual shard failures - * - * @param shardFailures list of shard failures - * @return this builder - */ - public Builder shardFailures(ImmutableList shardFailures) { - this.shardFailures = shardFailures; - return this; - } - - /** - * Sets the total number of shards and the list of individual shard failures - * - * @param totalShard number of shards - * @param shardFailures list of shard failures - * @return this builder - */ - public Builder failures(int totalShard, ImmutableList shardFailures) { - this.totalShard = totalShard; - this.successfulShards = totalShard - shardFailures.size(); - this.shardFailures = shardFailures; - return this; - } - - /** - * Builds new BlobStoreSnapshot - * - * @return - */ - public BlobStoreSnapshot build() { - return new BlobStoreSnapshot(name, indices, state, reason, version, startTime, endTime, totalShard, successfulShards, shardFailures); - } - - static final class Fields { - static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot"); - static final XContentBuilderString NAME = new XContentBuilderString("name"); - static final XContentBuilderString VERSION_ID = new XContentBuilderString("version_id"); - static final XContentBuilderString INDICES = new XContentBuilderString("indices"); - static final XContentBuilderString STATE = new XContentBuilderString("state"); - static final XContentBuilderString REASON = new XContentBuilderString("reason"); - static final XContentBuilderString START_TIME = new XContentBuilderString("start_time"); - static final XContentBuilderString END_TIME = new XContentBuilderString("end_time"); - static final XContentBuilderString TOTAL_SHARDS = new XContentBuilderString("total_shards"); - static final XContentBuilderString SUCCESSFUL_SHARDS = new XContentBuilderString("successful_shards"); - static final XContentBuilderString FAILURES = new XContentBuilderString("failures"); - } - - /** - * Serializes the snapshot - * - * @param snapshot snapshot to be serialized - * @param builder XContent builder - * @param params serialization parameters - * @throws IOException - */ - public static void toXContent(BlobStoreSnapshot snapshot, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject(Fields.SNAPSHOT); - builder.field(Fields.NAME, snapshot.name); - builder.field(Fields.VERSION_ID, snapshot.version.id); - builder.startArray(Fields.INDICES); - for (String index : snapshot.indices) { - builder.value(index); - } - builder.endArray(); - builder.field(Fields.STATE, snapshot.state); - if (snapshot.reason != null) { - builder.field(Fields.REASON, snapshot.reason); - } - builder.field(Fields.START_TIME, snapshot.startTime); - builder.field(Fields.END_TIME, snapshot.endTime); - builder.field(Fields.TOTAL_SHARDS, snapshot.totalShard); - builder.field(Fields.SUCCESSFUL_SHARDS, snapshot.successfulShards); - builder.startArray(Fields.FAILURES); - for (SnapshotShardFailure shardFailure : snapshot.shardFailures) { - SnapshotShardFailure.toXContent(shardFailure, builder, params); - } - builder.endArray(); - builder.endObject(); - } - - /** - * Parses the snapshot - * - * @param parser XContent parser - * @return snapshot - * @throws IOException - */ - public static BlobStoreSnapshot fromXContent(XContentParser parser) throws IOException { - Builder builder = new Builder(); - - XContentParser.Token token = parser.currentToken(); - if (token == XContentParser.Token.START_OBJECT) { - String currentFieldName = parser.currentName(); - if ("snapshot".equals(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - token = parser.nextToken(); - if (token.isValue()) { - if ("name".equals(currentFieldName)) { - builder.name(parser.text()); - } else if ("state".equals(currentFieldName)) { - builder.state(SnapshotState.valueOf(parser.text())); - } else if ("reason".equals(currentFieldName)) { - builder.reason(parser.text()); - } else if ("start_time".equals(currentFieldName)) { - builder.startTime(parser.longValue()); - } else if ("end_time".equals(currentFieldName)) { - builder.endTime(parser.longValue()); - } else if ("total_shards".equals(currentFieldName)) { - builder.totalShard(parser.intValue()); - } else if ("successful_shards".equals(currentFieldName)) { - builder.successfulShards(parser.intValue()); - } else if ("version_id".equals(currentFieldName)) { - builder.version(Version.fromId(parser.intValue())); - } - } else if (token == XContentParser.Token.START_ARRAY) { - if ("indices".equals(currentFieldName)) { - ArrayList indices = new ArrayList<>(); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - indices.add(parser.text()); - } - builder.indices(indices); - } else if ("failures".equals(currentFieldName)) { - ArrayList failures = new ArrayList<>(); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - failures.add(SnapshotShardFailure.fromXContent(parser)); - } - builder.shardFailures(ImmutableList.copyOf(failures)); - } else { - // It was probably created by newer version - ignoring - parser.skipChildren(); - } - } else if (token == XContentParser.Token.START_OBJECT) { - // It was probably created by newer version - ignoring - parser.skipChildren(); - } - } - } - } - } - return builder.build(); - } - } -} diff --git a/src/main/java/org/elasticsearch/snapshots/Snapshot.java b/src/main/java/org/elasticsearch/snapshots/Snapshot.java index 80d65710948..365f88a0048 100644 --- a/src/main/java/org/elasticsearch/snapshots/Snapshot.java +++ b/src/main/java/org/elasticsearch/snapshots/Snapshot.java @@ -21,52 +21,135 @@ package org.elasticsearch.snapshots; import com.google.common.collect.ImmutableList; import org.elasticsearch.Version; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; /** * Represent information about snapshot */ -public interface Snapshot extends Comparable { +public class Snapshot implements Comparable, ToXContent { + + private final String name; + + private final Version version; + + private final SnapshotState state; + + private final String reason; + + private final ImmutableList indices; + + private final long startTime; + + private final long endTime; + + private final int totalShard; + + private final int successfulShards; + + private final ImmutableList shardFailures; + + private final static ImmutableList NO_FAILURES = ImmutableList.of(); + + private Snapshot(String name, ImmutableList indices, SnapshotState state, String reason, Version version, long startTime, long endTime, + int totalShard, int successfulShards, ImmutableList shardFailures) { + assert name != null; + assert indices != null; + assert state != null; + assert shardFailures != null; + this.name = name; + this.indices = indices; + this.state = state; + this.reason = reason; + this.version = version; + this.startTime = startTime; + this.endTime = endTime; + this.totalShard = totalShard; + this.successfulShards = successfulShards; + this.shardFailures = shardFailures; + } + + + public Snapshot(String name, ImmutableList indices, long startTime) { + this(name, indices, SnapshotState.IN_PROGRESS, null, Version.CURRENT, startTime, 0L, 0, 0, NO_FAILURES); + } + + public Snapshot(String name, ImmutableList indices, long startTime, String reason, long endTime, + int totalShard, ImmutableList shardFailures) { + this(name, indices, snapshotState(reason, shardFailures), reason, Version.CURRENT, + startTime, endTime, totalShard, totalShard - shardFailures.size(), shardFailures); + } + + private static SnapshotState snapshotState(String reason, ImmutableList shardFailures) { + if (reason == null) { + if (shardFailures.isEmpty()) { + return SnapshotState.SUCCESS; + } else { + return SnapshotState.PARTIAL; + } + } else { + return SnapshotState.FAILED; + } + } + /** * Returns snapshot name * * @return snapshot name */ - String name(); + public String name() { + return name; + } /** * Returns current snapshot state * * @return snapshot state */ - SnapshotState state(); + public SnapshotState state() { + return state; + } /** * Returns reason for complete snapshot failure * * @return snapshot failure reason */ - String reason(); + public String reason() { + return reason; + } /** * Returns version of Elasticsearch that was used to create this snapshot * * @return Elasticsearch version */ - Version version(); + public Version version() { + return version; + } /** * Returns indices that were included into this snapshot * * @return list of indices */ - ImmutableList indices(); + public ImmutableList indices() { + return indices; + } /** * Returns time when snapshot started * * @return snapshot start time */ - long startTime(); + public long startTime() { + return startTime; + } /** * Returns time when snapshot ended @@ -75,27 +158,175 @@ public interface Snapshot extends Comparable { * * @return snapshot end time */ - long endTime(); + public long endTime() { + return endTime; + } /** * Returns total number of shards that were snapshotted * * @return number of shards */ - int totalShard(); + public int totalShard() { + return totalShard; + } /** * Returns total number of shards that were successfully snapshotted * * @return number of successful shards */ - int successfulShards(); + public int successfulShards() { + return successfulShards; + } /** * Returns shard failures - * - * @return shard failures */ - ImmutableList shardFailures(); + public ImmutableList shardFailures() { + return shardFailures; + } + + /** + * Compares two snapshots by their start time + * + * @param o other snapshot + * @return the value {@code 0} if snapshots were created at the same time; + * a value less than {@code 0} if this snapshot was created before snapshot {@code o}; and + * a value greater than {@code 0} if this snapshot was created after snapshot {@code o}; + */ + @Override + public int compareTo(Snapshot o) { + return Long.compare(startTime, o.startTime); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Snapshot that = (Snapshot) o; + + if (startTime != that.startTime) return false; + if (!name.equals(that.name)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + (int) (startTime ^ (startTime >>> 32)); + return result; + } + + static final class Fields { + static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot"); + static final XContentBuilderString NAME = new XContentBuilderString("name"); + static final XContentBuilderString VERSION_ID = new XContentBuilderString("version_id"); + static final XContentBuilderString INDICES = new XContentBuilderString("indices"); + static final XContentBuilderString STATE = new XContentBuilderString("state"); + static final XContentBuilderString REASON = new XContentBuilderString("reason"); + static final XContentBuilderString START_TIME = new XContentBuilderString("start_time"); + static final XContentBuilderString END_TIME = new XContentBuilderString("end_time"); + static final XContentBuilderString TOTAL_SHARDS = new XContentBuilderString("total_shards"); + static final XContentBuilderString SUCCESSFUL_SHARDS = new XContentBuilderString("successful_shards"); + static final XContentBuilderString FAILURES = new XContentBuilderString("failures"); + } + + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(Fields.SNAPSHOT); + builder.field(Fields.NAME, name); + builder.field(Fields.VERSION_ID, version.id); + builder.startArray(Fields.INDICES); + for (String index : indices) { + builder.value(index); + } + builder.endArray(); + builder.field(Fields.STATE, state); + if (reason != null) { + builder.field(Fields.REASON, reason); + } + builder.field(Fields.START_TIME, startTime); + builder.field(Fields.END_TIME, endTime); + builder.field(Fields.TOTAL_SHARDS, totalShard); + builder.field(Fields.SUCCESSFUL_SHARDS, successfulShards); + builder.startArray(Fields.FAILURES); + for (SnapshotShardFailure shardFailure : shardFailures) { + SnapshotShardFailure.toXContent(shardFailure, builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + + public static Snapshot fromXContent(XContentParser parser) throws IOException { + String name = null; + Version version = Version.CURRENT; + SnapshotState state = SnapshotState.IN_PROGRESS; + String reason = null; + ImmutableList indices = ImmutableList.of(); + long startTime = 0; + long endTime = 0; + int totalShard = 0; + int successfulShards = 0; + ImmutableList shardFailures = NO_FAILURES; + + XContentParser.Token token = parser.currentToken(); + if (token == XContentParser.Token.START_OBJECT) { + String currentFieldName = parser.currentName(); + if ("snapshot".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + token = parser.nextToken(); + if (token.isValue()) { + if ("name".equals(currentFieldName)) { + name = parser.text(); + } else if ("state".equals(currentFieldName)) { + state = SnapshotState.valueOf(parser.text()); + } else if ("reason".equals(currentFieldName)) { + reason = parser.text(); + } else if ("start_time".equals(currentFieldName)) { + startTime = parser.longValue(); + } else if ("end_time".equals(currentFieldName)) { + endTime = parser.longValue(); + } else if ("total_shards".equals(currentFieldName)) { + totalShard = parser.intValue(); + } else if ("successful_shards".equals(currentFieldName)) { + successfulShards = parser.intValue(); + } else if ("version_id".equals(currentFieldName)) { + version = Version.fromId(parser.intValue()); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("indices".equals(currentFieldName)) { + ArrayList indicesArray = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + indicesArray.add(parser.text()); + } + indices = ImmutableList.copyOf(indicesArray); + } else if ("failures".equals(currentFieldName)) { + ArrayList shardFailureArrayList = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + shardFailureArrayList.add(SnapshotShardFailure.fromXContent(parser)); + } + shardFailures = ImmutableList.copyOf(shardFailureArrayList); + } else { + // It was probably created by newer version - ignoring + parser.skipChildren(); + } + } else if (token == XContentParser.Token.START_OBJECT) { + // It was probably created by newer version - ignoring + parser.skipChildren(); + } + } + } + } + } + return new Snapshot(name, indices, state, reason, version, startTime, endTime, totalShard, successfulShards, shardFailures); + } } diff --git a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 44474749fa3..f7701c2ea0b 100644 --- a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -132,6 +132,10 @@ public class SnapshotsService extends AbstractLifecycleComponent entries = currentSnapshots(snapshotId.getRepository(), new String[] {snapshotId.getSnapshot()}); + if (!entries.isEmpty()) { + return inProgressSnapshot(entries.iterator().next()); + } return repositoriesService.repository(snapshotId.getRepository()).readSnapshot(snapshotId); } @@ -142,12 +146,17 @@ public class SnapshotsService extends AbstractLifecycleComponent snapshots(String repositoryName) { - ArrayList snapshotList = newArrayList(); + Set snapshotSet = newHashSet(); + ImmutableList entries = currentSnapshots(repositoryName, null); + for (SnapshotMetaData.Entry entry : entries) { + snapshotSet.add(inProgressSnapshot(entry)); + } Repository repository = repositoriesService.repository(repositoryName); ImmutableList snapshotIds = repository.snapshots(); for (SnapshotId snapshotId : snapshotIds) { - snapshotList.add(repository.readSnapshot(snapshotId)); + snapshotSet.add(repository.readSnapshot(snapshotId)); } + ArrayList snapshotList = newArrayList(snapshotSet); CollectionUtil.timSort(snapshotList); return ImmutableList.copyOf(snapshotList); } @@ -178,7 +187,7 @@ public class SnapshotsService extends AbstractLifecycleComponent indices = ImmutableList.copyOf(metaData.concreteIndices(request.indicesOptions(), request.indices())); logger.trace("[{}][{}] creating snapshot for indices [{}]", request.repository(), request.name(), indices); - newSnapshot = new SnapshotMetaData.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, null); + newSnapshot = new SnapshotMetaData.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, System.currentTimeMillis(), null); snapshots = new SnapshotMetaData(newSnapshot); } else { // TODO: What should we do if a snapshot is already running? @@ -297,17 +306,17 @@ public class SnapshotsService extends AbstractLifecycleComponent shards = shards(snapshot.snapshotId(), currentState, snapshot.indices()); + ImmutableMap shards = shards(entry.snapshotId(), currentState, entry.indices()); if (!partial) { Set indicesWithMissingShards = indicesWithMissingShards(shards); if (indicesWithMissingShards != null) { - updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.FAILED, snapshot.indices(), shards); + updatedSnapshot = new SnapshotMetaData.Entry(entry, State.FAILED, shards); entries.add(updatedSnapshot); failure = "Indices don't have primary shards +[" + indicesWithMissingShards + "]"; continue; } } - updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.STARTED, snapshot.indices(), shards); + updatedSnapshot = new SnapshotMetaData.Entry(entry, State.STARTED, shards); entries.add(updatedSnapshot); if (!completed(shards.values())) { accepted = true; @@ -325,7 +334,8 @@ public class SnapshotsService extends AbstractLifecycleComponentof()); + repositoriesService.repository(snapshot.snapshotId().getRepository()).finalizeSnapshot( + snapshot.snapshotId(), snapshot.indices(), snapshot.startTime(), ExceptionsHelper.detailedMessage(t), 0, ImmutableList.of()); } catch (Throwable t2) { logger.warn("[{}] failed to close snapshot in repository", snapshot.snapshotId()); } @@ -354,7 +364,8 @@ public class SnapshotsService extends AbstractLifecycleComponentof()); + repositoriesService.repository(snapshot.snapshotId().getRepository()).finalizeSnapshot(snapshot.snapshotId(), snapshot.indices(), snapshot.startTime(), + ExceptionsHelper.detailedMessage(t), 0, ImmutableList.of()); } catch (Throwable t2) { logger.warn("[{}] failed to close snapshot in repository", snapshot.snapshotId()); } @@ -363,6 +374,10 @@ public class SnapshotsService extends AbstractLifecycleComponent @@ -556,10 +571,10 @@ public class SnapshotsService extends AbstractLifecycleComponent shardsMap = shards.build(); if (!snapshot.state().completed() && completed(shardsMap.values())) { - updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.SUCCESS, snapshot.indices(), shardsMap); + updatedSnapshot = new SnapshotMetaData.Entry(snapshot, State.SUCCESS, shardsMap); endSnapshot(updatedSnapshot); } else { - updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), snapshot.state(), snapshot.indices(), shardsMap); + updatedSnapshot = new SnapshotMetaData.Entry(snapshot, snapshot.state(), shardsMap); } } entries.add(updatedSnapshot); @@ -616,10 +631,10 @@ public class SnapshotsService extends AbstractLifecycleComponent generatedBlobs = newHashMap(); + for (int i = 0; i < numberOfFooBlobs; i++) { + int length = randomIntBetween(10, 100); + String name = "foo-" + i + "-"; + generatedBlobs.put(name, (long) length); + createRandomBlob(container, name, length); + } + for (int i = 1; i < numberOfBarBlobs; i++) { + int length = randomIntBetween(10, 100); + String name = "bar-" + i + "-"; + generatedBlobs.put(name, (long) length); + createRandomBlob(container, name, length); + } + int length = randomIntBetween(10, 100); + String name = "bar-0-"; + generatedBlobs.put(name, (long) length); + byte[] data = createRandomBlob(container, name, length); + + ImmutableMap blobs = container.listBlobs(); + assertThat(blobs.size(), equalTo(numberOfFooBlobs + numberOfBarBlobs)); + for (Map.Entry generated : generatedBlobs.entrySet()) { + BlobMetaData blobMetaData = blobs.get(generated.getKey()); + assertThat(generated.getKey(), blobMetaData, notNullValue()); + assertThat(blobMetaData.name(), equalTo(generated.getKey())); + assertThat(blobMetaData.length(), equalTo(generated.getValue())); + } + + assertThat(container.listBlobsByPrefix("foo-").size(), equalTo(numberOfFooBlobs)); + assertThat(container.listBlobsByPrefix("bar-").size(), equalTo(numberOfBarBlobs)); + assertThat(container.listBlobsByPrefix("baz-").size(), equalTo(0)); + + String newName = "bar-new"; + // Move to a new location + container.move(name, newName); + assertThat(container.listBlobsByPrefix(name).size(), equalTo(0)); + blobs = container.listBlobsByPrefix(newName); + assertThat(blobs.size(), equalTo(1)); + assertThat(blobs.get(newName).length(), equalTo(generatedBlobs.get(name))); + assertThat(data, equalTo(readBlobFully(container, newName, length))); + store.close(); + } + + protected byte[] createRandomBlob(BlobContainer container, String name, int length) throws IOException { + byte[] data = randomBytes(length); + try (OutputStream stream = container.createOutput(name)) { + stream.write(data); + } + return data; + } + + protected byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException { + byte[] data = new byte[length]; + try (InputStream inputStream = container.openInput(name)) { + assertThat(inputStream.read(data), equalTo(length)); + assertThat(inputStream.read(), equalTo(-1)); + } + return data; + } + + protected byte[] randomBytes(int length) { + byte[] data = new byte[length]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) randomInt(); + } + return data; + } + protected BlobStore newBlobStore() throws IOException { Path tempDir = newTempDir(LifecycleScope.TEST).toPath(); Settings settings = randomBoolean() ? ImmutableSettings.EMPTY : ImmutableSettings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build(); diff --git a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java index 2949f109d6f..87df7a825a1 100644 --- a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java @@ -308,10 +308,11 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); logger.info("--> creating repository"); + File repo = newTempDir(LifecycleScope.TEST); PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") .setType(MockRepositoryModule.class.getCanonicalName()).setSettings( ImmutableSettings.settingsBuilder() - .put("location", newTempDir(LifecycleScope.TEST)) + .put("location", repo) .put("random", randomAsciiOfLength(10)) .put("wait_after_unblock", 200) ).get(); @@ -322,6 +323,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests // Remove it from the list of available nodes nodes.remove(blockedNode); + int numberOfFilesBeforeSnapshot = numberOfFiles(repo); logger.info("--> snapshot"); client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); @@ -347,6 +349,8 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests logger.info("--> making sure that snapshot no longer exists"); assertThrows(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute(), SnapshotMissingException.class); + // Subtract index file from the count + assertThat("not all files were deleted during snapshot cancellation", numberOfFilesBeforeSnapshot, equalTo(numberOfFiles(repo) - 1)); logger.info("--> done"); } diff --git a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java index 3bed9527692..8949278ceb5 100644 --- a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java @@ -682,7 +682,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.FAILED)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(3)); assertThat(createSnapshotResponse.getSnapshotInfo().reason(), startsWith("Indices don't have primary shards")); } diff --git a/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java index 17c92a1c271..4fe92362132 100644 --- a/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java +++ b/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java @@ -81,4 +81,9 @@ public class BlobContainerWrapper implements BlobContainer { public ImmutableMap listBlobsByPrefix(String blobNamePrefix) throws IOException { return delegate.listBlobsByPrefix(blobNamePrefix); } + + @Override + public void move(String sourceBlobName, String targetBlobName) throws IOException { + delegate.move(sourceBlobName, targetBlobName); + } } diff --git a/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 1e8f0b24da0..97b0b50259e 100644 --- a/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -291,6 +291,12 @@ public class MockRepository extends FsRepository { return super.listBlobsByPrefix(blobNamePrefix); } + @Override + public void move(String sourceBlob, String targetBlob) throws IOException { + maybeIOExceptionOrBlock(targetBlob); + super.move(sourceBlob, targetBlob); + } + @Override public OutputStream createOutput(String blobName) throws IOException { maybeIOExceptionOrBlock(blobName);