Snapshot/Restore: switch to write once mode for snapshot metadata files

This commit removes creation of in-progress snapshot file and makes creation of the final snapshot file atomic.

Fixes #8696
This commit is contained in:
Igor Motov 2014-12-04 10:56:38 -05:00
parent 0bab17ffde
commit 0b024ad2f3
15 changed files with 509 additions and 648 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -66,12 +67,14 @@ public class SnapshotMetaData implements MetaData.Custom {
private final ImmutableMap<ShardId, ShardSnapshotStatus> shards; private final ImmutableMap<ShardId, ShardSnapshotStatus> shards;
private final ImmutableList<String> indices; private final ImmutableList<String> indices;
private final ImmutableMap<String, ImmutableList<ShardId>> waitingIndices; private final ImmutableMap<String, ImmutableList<ShardId>> waitingIndices;
private final long startTime;
public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, ImmutableList<String> indices, ImmutableMap<ShardId, ShardSnapshotStatus> shards) { public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, ImmutableList<String> indices, long startTime, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
this.state = state; this.state = state;
this.snapshotId = snapshotId; this.snapshotId = snapshotId;
this.includeGlobalState = includeGlobalState; this.includeGlobalState = includeGlobalState;
this.indices = indices; this.indices = indices;
this.startTime = startTime;
if (shards == null) { if (shards == null) {
this.shards = ImmutableMap.of(); this.shards = ImmutableMap.of();
this.waitingIndices = ImmutableMap.of(); this.waitingIndices = ImmutableMap.of();
@ -81,6 +84,14 @@ public class SnapshotMetaData implements MetaData.Custom {
} }
} }
public Entry(Entry entry, State state, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshotId, entry.includeGlobalState, state, entry.indices, entry.startTime, shards);
}
public Entry(Entry entry, ImmutableMap<ShardId, ShardSnapshotStatus> shards) {
this(entry, entry.state, shards);
}
public SnapshotId snapshotId() { public SnapshotId snapshotId() {
return this.snapshotId; return this.snapshotId;
} }
@ -105,6 +116,10 @@ public class SnapshotMetaData implements MetaData.Custom {
return includeGlobalState; return includeGlobalState;
} }
public long startTime() {
return startTime;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
@ -113,10 +128,12 @@ public class SnapshotMetaData implements MetaData.Custom {
Entry entry = (Entry) o; Entry entry = (Entry) o;
if (includeGlobalState != entry.includeGlobalState) return false; if (includeGlobalState != entry.includeGlobalState) return false;
if (startTime != entry.startTime) return false;
if (!indices.equals(entry.indices)) return false; if (!indices.equals(entry.indices)) return false;
if (!shards.equals(entry.shards)) return false; if (!shards.equals(entry.shards)) return false;
if (!snapshotId.equals(entry.snapshotId)) return false; if (!snapshotId.equals(entry.snapshotId)) return false;
if (state != entry.state) return false; if (state != entry.state) return false;
if (!waitingIndices.equals(entry.waitingIndices)) return false;
return true; return true;
} }
@ -128,6 +145,8 @@ public class SnapshotMetaData implements MetaData.Custom {
result = 31 * result + (includeGlobalState ? 1 : 0); result = 31 * result + (includeGlobalState ? 1 : 0);
result = 31 * result + shards.hashCode(); result = 31 * result + shards.hashCode();
result = 31 * result + indices.hashCode(); result = 31 * result + indices.hashCode();
result = 31 * result + waitingIndices.hashCode();
result = 31 * result + (int) (startTime ^ (startTime >>> 32));
return result; return result;
} }
@ -331,7 +350,8 @@ public class SnapshotMetaData implements MetaData.Custom {
for (int j = 0; j < indices; j++) { for (int j = 0; j < indices; j++) {
indexBuilder.add(in.readString()); indexBuilder.add(in.readString());
} }
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableMap.<ShardId, ShardSnapshotStatus>builder(); long startTime = in.readLong();
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableMap.builder();
int shards = in.readVInt(); int shards = in.readVInt();
for (int j = 0; j < shards; j++) { for (int j = 0; j < shards; j++) {
ShardId shardId = ShardId.readShardId(in); ShardId shardId = ShardId.readShardId(in);
@ -339,7 +359,7 @@ public class SnapshotMetaData implements MetaData.Custom {
State shardState = State.fromValue(in.readByte()); State shardState = State.fromValue(in.readByte());
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState)); builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
} }
entries[i] = new Entry(snapshotId, includeGlobalState, state, indexBuilder.build(), builder.build()); entries[i] = new Entry(snapshotId, includeGlobalState, state, indexBuilder.build(), startTime, builder.build());
} }
return new SnapshotMetaData(entries); return new SnapshotMetaData(entries);
} }
@ -355,6 +375,7 @@ public class SnapshotMetaData implements MetaData.Custom {
for (String index : entry.indices()) { for (String index : entry.indices()) {
out.writeString(index); out.writeString(index);
} }
out.writeLong(entry.startTime());
out.writeVInt(entry.shards().size()); out.writeVInt(entry.shards().size());
for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) { for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
shardEntry.getKey().writeTo(out); shardEntry.getKey().writeTo(out);
@ -369,9 +390,24 @@ public class SnapshotMetaData implements MetaData.Custom {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
static final class Fields {
static final XContentBuilderString REPOSITORY = new XContentBuilderString("repository");
static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");
static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
static final XContentBuilderString INCLUDE_GLOBAL_STATE = new XContentBuilderString("include_global_state");
static final XContentBuilderString STATE = new XContentBuilderString("state");
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString START_TIME_MILLIS = new XContentBuilderString("start_time_millis");
static final XContentBuilderString START_TIME = new XContentBuilderString("start_time");
static final XContentBuilderString SHARDS = new XContentBuilderString("shards");
static final XContentBuilderString INDEX = new XContentBuilderString("index");
static final XContentBuilderString SHARD = new XContentBuilderString("shard");
static final XContentBuilderString NODE = new XContentBuilderString("node");
}
@Override @Override
public void toXContent(SnapshotMetaData customIndexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { public void toXContent(SnapshotMetaData customIndexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots"); builder.startArray(Fields.SNAPSHOTS);
for (Entry entry : customIndexMetaData.entries()) { for (Entry entry : customIndexMetaData.entries()) {
toXContent(entry, builder, params); toXContent(entry, builder, params);
} }
@ -380,33 +416,33 @@ public class SnapshotMetaData implements MetaData.Custom {
public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException { public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field("repository", entry.snapshotId().getRepository()); builder.field(Fields.REPOSITORY, entry.snapshotId().getRepository());
builder.field("snapshot", entry.snapshotId().getSnapshot()); builder.field(Fields.SNAPSHOT, entry.snapshotId().getSnapshot());
builder.field("include_global_state", entry.includeGlobalState()); builder.field(Fields.INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
builder.field("state", entry.state()); builder.field(Fields.STATE, entry.state());
builder.startArray("indices"); builder.startArray(Fields.INDICES);
{ {
for (String index : entry.indices()) { for (String index : entry.indices()) {
builder.value(index); builder.value(index);
} }
} }
builder.endArray(); builder.endArray();
builder.startArray("shards"); builder.timeValueField(Fields.START_TIME_MILLIS, Fields.START_TIME, entry.startTime());
builder.startArray(Fields.SHARDS);
{ {
for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards.entrySet()) { for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards.entrySet()) {
ShardId shardId = shardEntry.getKey(); ShardId shardId = shardEntry.getKey();
ShardSnapshotStatus status = shardEntry.getValue(); ShardSnapshotStatus status = shardEntry.getValue();
builder.startObject(); builder.startObject();
{ {
builder.field("index", shardId.getIndex()); builder.field(Fields.INDEX, shardId.getIndex());
builder.field("shard", shardId.getId()); builder.field(Fields.SHARD, shardId.getId());
builder.field("state", status.state()); builder.field(Fields.STATE, status.state());
builder.field("node", status.nodeId()); builder.field(Fields.NODE, status.nodeId());
} }
builder.endObject(); builder.endObject();
} }
} }
builder.endArray(); builder.endArray();
builder.endObject(); builder.endObject();
} }

View File

@ -51,13 +51,35 @@ public interface BlobContainer {
*/ */
OutputStream createOutput(String blobName) throws IOException; OutputStream createOutput(String blobName) throws IOException;
/**
* Deletes a blob with giving name.
*
* If blob exist but cannot be deleted an exception has to be thrown.
*/
void deleteBlob(String blobName) throws IOException; void deleteBlob(String blobName) throws IOException;
/**
* Deletes all blobs in the container that match the specified prefix.
*/
void deleteBlobsByPrefix(String blobNamePrefix) throws IOException; void deleteBlobsByPrefix(String blobNamePrefix) throws IOException;
/**
* Deletes all blobs in the container that match the supplied filter.
*/
void deleteBlobsByFilter(BlobNameFilter filter) throws IOException; void deleteBlobsByFilter(BlobNameFilter filter) throws IOException;
/**
* Lists all blobs in the container
*/
ImmutableMap<String, BlobMetaData> listBlobs() throws IOException; ImmutableMap<String, BlobMetaData> listBlobs() throws IOException;
/**
* Lists all blobs in the container that match specified prefix
*/
ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException; ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException;
/**
* Atomically renames source blob into target blob
*/
void move(String sourceBlobName, String targetBlobName) throws IOException;
} }

View File

@ -21,7 +21,6 @@ package org.elasticsearch.common.blobstore.fs;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
@ -30,8 +29,7 @@ import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import java.io.*; import java.io.*;
import java.nio.file.Files; import java.nio.file.*;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
/** /**
@ -99,4 +97,15 @@ public class FsBlobContainer extends AbstractBlobContainer {
} }
}, blobStore.bufferSizeInBytes()); }, blobStore.bufferSizeInBytes());
} }
@Override
public void move(String source, String target) throws IOException {
Path sourcePath = path.resolve(source);
Path targetPath = path.resolve(target);
// If the target file exists then Files.move() behaviour is implementation specific
// the existing file might be replaced or this method fails by throwing an IOException.
assert !Files.exists(targetPath);
Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE);
IOUtils.fsync(path, true);
}
} }

View File

@ -69,6 +69,11 @@ public class URLBlobContainer extends AbstractBlobContainer {
throw new UnsupportedOperationException("URL repository doesn't support this operation"); throw new UnsupportedOperationException("URL repository doesn't support this operation");
} }
@Override
public void move(String from, String to) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
/** /**
* This operation is not supported by URLBlobContainer * This operation is not supported by URLBlobContainer
*/ */

View File

@ -240,7 +240,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint(); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream).prettyPrint();
BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS); BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS);
builder.flush(); builder.flush();
builder.close();
} }
/** /**
@ -510,14 +509,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
// now create and write the commit point // now create and write the commit point
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);
String commitPointName = snapshotBlobName(snapshotId); String snapshotBlobName = snapshotBlobName(snapshotId);
BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(), BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(),
snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(),
// snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time. //TODO: The time stored in snapshot doesn't include cleanup time.
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try (OutputStream output = blobContainer.createOutput(commitPointName)) { try (OutputStream output = blobContainer.createOutput(snapshotBlobName)) {
writeSnapshot(snapshot, output); writeSnapshot(snapshot, output);
} catch (IOException e) { } catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);

View File

@ -22,8 +22,6 @@ import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.snapshots.SnapshotShardFailure;
@ -41,7 +39,7 @@ import java.io.IOException;
* <li>Master calls {@link #initializeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, com.google.common.collect.ImmutableList, org.elasticsearch.cluster.metadata.MetaData)} * <li>Master calls {@link #initializeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, com.google.common.collect.ImmutableList, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li> * with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(org.elasticsearch.cluster.metadata.SnapshotId, org.elasticsearch.index.shard.ShardId, org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit, org.elasticsearch.index.snapshots.IndexShardSnapshotStatus)} for each shard</li> * <li>Data nodes call {@link org.elasticsearch.index.snapshots.IndexShardRepository#snapshot(org.elasticsearch.cluster.metadata.SnapshotId, org.elasticsearch.index.shard.ShardId, org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit, org.elasticsearch.index.snapshots.IndexShardSnapshotStatus)} for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot(org.elasticsearch.cluster.metadata.SnapshotId, String, int, com.google.common.collect.ImmutableList)} * <li>When all shard calls return master calls {@link #finalizeSnapshot}
* with possible list of failures</li> * with possible list of failures</li>
* </ul> * </ul>
*/ */
@ -93,7 +91,7 @@ public interface Repository extends LifecycleComponent<Repository> {
* @param shardFailures list of shard failures * @param shardFailures list of shard failures
* @return snapshot description * @return snapshot description
*/ */
Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures); Snapshot finalizeSnapshot(SnapshotId snapshotId, ImmutableList<String> indices, long startTime, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures);
/** /**
* Deletes snapshot * Deletes snapshot
@ -115,7 +113,7 @@ public interface Repository extends LifecycleComponent<Repository> {
/** /**
* Verifies repository on the master node and returns the verification token. * Verifies repository on the master node and returns the verification token.
* * <p/>
* If the verification token is not null, it's passed to all data nodes for verification. If it's null - no * If the verification token is not null, it's passed to all data nodes for verification. If it's null - no
* additional verification is required * additional verification is required
* *
@ -125,7 +123,7 @@ public interface Repository extends LifecycleComponent<Repository> {
/** /**
* Called at the end of repository verification process. * Called at the end of repository verification process.
* * <p/>
* This method should perform all necessary cleanup of the temporary files created in the repository * This method should perform all necessary cleanup of the temporary files created in the repository
* *
* @param verificationToken verification request generated by {@link #startVerification} command * @param verificationToken verification request generated by {@link #startVerification} command

View File

@ -62,9 +62,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Lists.newArrayList;
@ -80,9 +78,9 @@ import static com.google.common.collect.Lists.newArrayList;
* {@code * {@code
* STORE_ROOT * STORE_ROOT
* |- index - list of all snapshot name as JSON array * |- index - list of all snapshot name as JSON array
* |- snapshot-20131010 - JSON serialized BlobStoreSnapshot for snapshot "20131010" * |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010"
* |- metadata-20131010 - JSON serialized MetaData for snapshot "20131010" (includes only global metadata) * |- metadata-20131010 - JSON serialized MetaData for snapshot "20131010" (includes only global metadata)
* |- snapshot-20131011 - JSON serialized BlobStoreSnapshot for snapshot "20131011" * |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011"
* |- metadata-20131011 - JSON serialized MetaData for snapshot "20131011" * |- metadata-20131011 - JSON serialized MetaData for snapshot "20131011"
* ..... * .....
* |- indices/ - data for all indices * |- indices/ - data for all indices
@ -118,6 +116,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
private static final String SNAPSHOT_PREFIX = "snapshot-"; private static final String SNAPSHOT_PREFIX = "snapshot-";
private static final String TEMP_SNAPSHOT_FILE_PREFIX = "pending-";
private static final String SNAPSHOTS_FILE = "index"; private static final String SNAPSHOTS_FILE = "index";
private static final String TESTS_FILE = "tests-"; private static final String TESTS_FILE = "tests-";
@ -224,19 +224,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
@Override @Override
public void initializeSnapshot(SnapshotId snapshotId, ImmutableList<String> indices, MetaData metaData) { public void initializeSnapshot(SnapshotId snapshotId, ImmutableList<String> indices, MetaData metaData) {
try { try {
BlobStoreSnapshot blobStoreSnapshot = BlobStoreSnapshot.builder()
.name(snapshotId.getSnapshot())
.indices(indices)
.startTime(System.currentTimeMillis())
.build();
String snapshotBlobName = snapshotBlobName(snapshotId); String snapshotBlobName = snapshotBlobName(snapshotId);
if (snapshotsBlobContainer.blobExists(snapshotBlobName)) { if (snapshotsBlobContainer.blobExists(snapshotBlobName)) {
// TODO: Can we make it atomic?
throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists"); throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists");
} }
try (OutputStream output = snapshotsBlobContainer.createOutput(snapshotBlobName)) {
writeSnapshot(blobStoreSnapshot, output);
}
// Write Global MetaData // Write Global MetaData
// TODO: Check if metadata needs to be written // TODO: Check if metadata needs to be written
try (OutputStream output = snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId))) { try (OutputStream output = snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId))) {
@ -320,42 +311,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
} }
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public Snapshot finalizeSnapshot(SnapshotId snapshotId, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures) { public Snapshot finalizeSnapshot(SnapshotId snapshotId, ImmutableList<String> indices, long startTime, String failure, int totalShards, ImmutableList<SnapshotShardFailure> shardFailures) {
BlobStoreSnapshot snapshot = (BlobStoreSnapshot) readSnapshot(snapshotId);
if (snapshot == null) {
throw new SnapshotMissingException(snapshotId);
}
if (snapshot.state().completed()) {
throw new SnapshotException(snapshotId, "snapshot is already closed");
}
try { try {
String tempBlobName = tempSnapshotBlobName(snapshotId);
String blobName = snapshotBlobName(snapshotId); String blobName = snapshotBlobName(snapshotId);
BlobStoreSnapshot.Builder updatedSnapshot = BlobStoreSnapshot.builder().snapshot(snapshot); Snapshot blobStoreSnapshot = new Snapshot(snapshotId.getSnapshot(), indices, startTime, failure, System.currentTimeMillis(), totalShards, shardFailures);
if (failure == null) { try (OutputStream output = snapshotsBlobContainer.createOutput(tempBlobName)) {
if (shardFailures.isEmpty()) { writeSnapshot(blobStoreSnapshot, output);
updatedSnapshot.success();
} else {
updatedSnapshot.partial();
}
updatedSnapshot.failures(totalShards, shardFailures);
} else {
updatedSnapshot.failed(failure);
}
updatedSnapshot.endTime(System.currentTimeMillis());
snapshot = updatedSnapshot.build();
try (OutputStream output = snapshotsBlobContainer.createOutput(blobName)) {
writeSnapshot(snapshot, output);
} }
snapshotsBlobContainer.move(tempBlobName, blobName);
ImmutableList<SnapshotId> snapshotIds = snapshots(); ImmutableList<SnapshotId> snapshotIds = snapshots();
if (!snapshotIds.contains(snapshotId)) { if (!snapshotIds.contains(snapshotId)) {
snapshotIds = ImmutableList.<SnapshotId>builder().addAll(snapshotIds).add(snapshotId).build(); snapshotIds = ImmutableList.<SnapshotId>builder().addAll(snapshotIds).add(snapshotId).build();
} }
writeSnapshotList(snapshotIds); writeSnapshotList(snapshotIds);
return snapshot; return blobStoreSnapshot;
} catch (IOException ex) { } catch (IOException ex) {
throw new RepositoryException(this.repositoryName, "failed to update snapshot in repository", ex); throw new RepositoryException(this.repositoryName, "failed to update snapshot in repository", ex);
} }
@ -400,29 +375,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
@Override @Override
public Snapshot readSnapshot(SnapshotId snapshotId) { public Snapshot readSnapshot(SnapshotId snapshotId) {
try { try {
String blobName = snapshotBlobName(snapshotId); try (InputStream blob = snapshotsBlobContainer.openInput(snapshotBlobName(snapshotId))) {
int retryCount = 0; return readSnapshot(ByteStreams.toByteArray(blob));
while (true) {
try (InputStream blob = snapshotsBlobContainer.openInput(blobName)) {
byte[] data = ByteStreams.toByteArray(blob);
// Because we are overriding snapshot during finalization, it's possible that
// we can get an empty or incomplete snapshot for a brief moment
// retrying after some what can resolve the issue
// TODO: switch to atomic update after non-local gateways are removed and we switch to java 1.7
try {
return readSnapshot(data);
} catch (ElasticsearchParseException ex) {
if (retryCount++ < 3) {
try {
Thread.sleep(50);
} catch (InterruptedException ex1) {
Thread.currentThread().interrupt();
}
} else {
throw ex;
}
}
}
} }
} catch (FileNotFoundException | NoSuchFileException ex) { } catch (FileNotFoundException | NoSuchFileException ex) {
throw new SnapshotMissingException(snapshotId, ex); throw new SnapshotMissingException(snapshotId, ex);
@ -498,13 +452,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
* @return parsed snapshot description * @return parsed snapshot description
* @throws IOException parse exceptions * @throws IOException parse exceptions
*/ */
private BlobStoreSnapshot readSnapshot(byte[] data) throws IOException { public Snapshot readSnapshot(byte[] data) throws IOException {
try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) { try (XContentParser parser = XContentHelper.createParser(data, 0, data.length)) {
XContentParser.Token token; XContentParser.Token token;
if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) { if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) {
if ((token = parser.nextToken()) == XContentParser.Token.FIELD_NAME) { if ((token = parser.nextToken()) == XContentParser.Token.FIELD_NAME) {
parser.nextToken(); parser.nextToken();
BlobStoreSnapshot snapshot = BlobStoreSnapshot.Builder.fromXContent(parser); Snapshot snapshot = Snapshot.fromXContent(parser);
if ((token = parser.nextToken()) == XContentParser.Token.END_OBJECT) { if ((token = parser.nextToken()) == XContentParser.Token.END_OBJECT) {
return snapshot; return snapshot;
} }
@ -549,6 +503,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
return SNAPSHOT_PREFIX + snapshotId.getSnapshot(); return SNAPSHOT_PREFIX + snapshotId.getSnapshot();
} }
/**
* Returns temporary name of snapshot blob
*
* @param snapshotId snapshot id
* @return name of snapshot blob
*/
private String tempSnapshotBlobName(SnapshotId snapshotId) {
return TEMP_SNAPSHOT_FILE_PREFIX + snapshotId.getSnapshot();
}
/** /**
* Returns name of metadata blob * Returns name of metadata blob
* *
@ -560,22 +524,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
} }
/** /**
* Serializes BlobStoreSnapshot into JSON * Serializes Snapshot into JSON
* *
* @param snapshot - snapshot description * @param snapshot - snapshot description
* @return BytesStreamOutput representing JSON serialized BlobStoreSnapshot * @return BytesStreamOutput representing JSON serialized Snapshot
* @throws IOException * @throws IOException
*/ */
private void writeSnapshot(BlobStoreSnapshot snapshot, OutputStream outputStream) throws IOException { private void writeSnapshot(Snapshot snapshot, OutputStream outputStream) throws IOException {
StreamOutput stream = new OutputStreamStreamOutput(outputStream); StreamOutput stream = new OutputStreamStreamOutput(outputStream);
if (isCompress()) { if (isCompress()) {
stream = CompressorFactory.defaultCompressor().streamOutput(stream); stream = CompressorFactory.defaultCompressor().streamOutput(stream);
} }
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
builder.startObject(); builder.startObject();
BlobStoreSnapshot.Builder.toXContent(snapshot, builder, snapshotOnlyFormatParams); snapshot.toXContent(builder, snapshotOnlyFormatParams);
builder.endObject(); builder.endObject();
builder.close(); builder.flush();
} }
/** /**
@ -594,7 +558,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
builder.startObject(); builder.startObject();
MetaData.Builder.toXContent(metaData, builder, snapshotOnlyFormatParams); MetaData.Builder.toXContent(metaData, builder, snapshotOnlyFormatParams);
builder.endObject(); builder.endObject();
builder.close(); builder.flush();
} }
/** /**
@ -681,9 +645,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
try { try {
String seed = Strings.randomBase64UUID(); String seed = Strings.randomBase64UUID();
byte[] testBytes = Strings.toUTF8Bytes(seed); byte[] testBytes = Strings.toUTF8Bytes(seed);
try (OutputStream outputStream = snapshotsBlobContainer.createOutput(testBlobPrefix(seed) + "-master")) { String blobName = testBlobPrefix(seed) + "-master";
try (OutputStream outputStream = snapshotsBlobContainer.createOutput(blobName + "-temp")) {
outputStream.write(testBytes); outputStream.write(testBytes);
} }
// Make sure that move is supported
snapshotsBlobContainer.move(blobName + "-temp", blobName);
return seed; return seed;
} catch (IOException exp) { } catch (IOException exp) {
throw new RepositoryVerificationException(repositoryName, "path " + basePath() + " is not accessible on master node", exp); throw new RepositoryVerificationException(repositoryName, "path " + basePath() + " is not accessible on master node", exp);

View File

@ -1,514 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.blobstore;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.Version;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
/**
* Immutable snapshot description for BlobStoreRepository
* <p/>
* Stored in the the root of the repository, see {@link BlobStoreRepository} for more information.
*/
public class BlobStoreSnapshot implements Snapshot {
private final String name;
private final Version version;
private final SnapshotState state;
private final String reason;
private final ImmutableList<String> indices;
private final long startTime;
private final long endTime;
private final int totalShard;
private final int successfulShards;
private final ImmutableList<SnapshotShardFailure> shardFailures;
private BlobStoreSnapshot(String name, ImmutableList<String> indices, SnapshotState state, String reason, Version version, long startTime, long endTime,
int totalShard, int successfulShards, ImmutableList<SnapshotShardFailure> shardFailures) {
assert name != null;
assert indices != null;
assert state != null;
assert shardFailures != null;
this.name = name;
this.indices = indices;
this.state = state;
this.reason = reason;
this.version = version;
this.startTime = startTime;
this.endTime = endTime;
this.totalShard = totalShard;
this.successfulShards = successfulShards;
this.shardFailures = shardFailures;
}
/**
* {@inheritDoc}
*/
@Override
public String name() {
return name;
}
/**
* {@inheritDoc}
*/
@Override
public SnapshotState state() {
return state;
}
/**
* {@inheritDoc}
*/
@Override
public String reason() {
return reason;
}
/**
* {@inheritDoc}
*/
@Override
public Version version() {
return version;
}
/**
* {@inheritDoc}
*/
@Override
public ImmutableList<String> indices() {
return indices;
}
/**
* {@inheritDoc}
*/
@Override
public long startTime() {
return startTime;
}
/**
* {@inheritDoc}
*/
@Override
public long endTime() {
return endTime;
}
/**
* {@inheritDoc}
*/
@Override
public int totalShard() {
return totalShard;
}
/**
* {@inheritDoc}
*/
@Override
public int successfulShards() {
return successfulShards;
}
/**
* {@inheritDoc}
*/
@Override
public ImmutableList<SnapshotShardFailure> shardFailures() {
return shardFailures;
}
/**
* Creates new BlobStoreSnapshot builder
*
* @return
*/
public static Builder builder() {
return new Builder();
}
/**
* Compares two snapshots by their start time
*
* @param o other snapshot
* @return the value {@code 0} if snapshots were created at the same time;
* a value less than {@code 0} if this snapshot was created before snapshot {@code o}; and
* a value greater than {@code 0} if this snapshot was created after snapshot {@code o};
*/
@Override
public int compareTo(Snapshot o) {
return Long.compare(startTime, ((BlobStoreSnapshot) o).startTime);
}
/**
* BlobStoreSnapshot builder
*/
public static class Builder {
private String name;
private Version version = Version.CURRENT;
private SnapshotState state = SnapshotState.IN_PROGRESS;
private String reason;
private ImmutableList<String> indices;
private long startTime;
private long endTime;
private int totalShard;
private int successfulShards;
private ImmutableList<SnapshotShardFailure> shardFailures = ImmutableList.of();
/**
* Copies data from another snapshot into the builder
*
* @param snapshot another snapshot
* @return this builder
*/
public Builder snapshot(BlobStoreSnapshot snapshot) {
name = snapshot.name();
indices = snapshot.indices();
version = snapshot.version();
reason = snapshot.reason();
state = snapshot.state();
startTime = snapshot.startTime();
endTime = snapshot.endTime();
totalShard = snapshot.totalShard();
successfulShards = snapshot.successfulShards();
shardFailures = snapshot.shardFailures();
return this;
}
/**
* Sets snapshot name
*
* @param name snapshot name
* @return this builder
*/
public Builder name(String name) {
this.name = name;
return this;
}
/**
* Sets list of indices in the snapshot
*
* @param indices list of indices
* @return this builder
*/
public Builder indices(Collection<String> indices) {
this.indices = ImmutableList.copyOf(indices);
return this;
}
/**
* Sets list of indices in the snapshot
*
* @param indices list of indices
* @return this builder
*/
public Builder indices(String[] indices) {
this.indices = ImmutableList.copyOf(indices);
return this;
}
/**
* Sets snapshot state
*
* @param state snapshot state
* @return this builder
*/
public Builder state(SnapshotState state) {
this.state = state;
return this;
}
/**
* Sets snapshot failure reason
*
* @param reason snapshot failure reason
* @return this builder
*/
public Builder reason(String reason) {
this.reason = reason;
return this;
}
/**
* Marks snapshot as successful
*
* @return this builder
*/
public Builder success() {
this.state = SnapshotState.SUCCESS;
return this;
}
/**
* Marks snapshot as partially successful
*
* @return this builder
*/
public Builder partial() {
this.state = SnapshotState.PARTIAL;
return this;
}
/**
* Marks snapshot as failed and saves failure reason
*
* @param reason failure reason
* @return this builder
*/
public Builder failed(String reason) {
this.state = SnapshotState.FAILED;
this.reason = reason;
return this;
}
/**
* Sets version of Elasticsearch that created this snapshot
*
* @param version version
* @return this builder
*/
public Builder version(Version version) {
this.version = version;
return this;
}
/**
* Sets snapshot start time
*
* @param startTime snapshot start time
* @return this builder
*/
public Builder startTime(long startTime) {
this.startTime = startTime;
return this;
}
/**
* Sets snapshot end time
*
* @param endTime snapshot end time
* @return this builder
*/
public Builder endTime(long endTime) {
this.endTime = endTime;
return this;
}
/**
* Sets total number of shards across all snapshot indices
*
* @param totalShard number of shards
* @return this builder
*/
public Builder totalShard(int totalShard) {
this.totalShard = totalShard;
return this;
}
/**
* Sets total number fo shards that were successfully snapshotted
*
* @param successfulShards number of successful shards
* @return this builder
*/
public Builder successfulShards(int successfulShards) {
this.successfulShards = successfulShards;
return this;
}
/**
* Sets the list of individual shard failures
*
* @param shardFailures list of shard failures
* @return this builder
*/
public Builder shardFailures(ImmutableList<SnapshotShardFailure> shardFailures) {
this.shardFailures = shardFailures;
return this;
}
/**
* Sets the total number of shards and the list of individual shard failures
*
* @param totalShard number of shards
* @param shardFailures list of shard failures
* @return this builder
*/
public Builder failures(int totalShard, ImmutableList<SnapshotShardFailure> shardFailures) {
this.totalShard = totalShard;
this.successfulShards = totalShard - shardFailures.size();
this.shardFailures = shardFailures;
return this;
}
/**
* Builds new BlobStoreSnapshot
*
* @return
*/
public BlobStoreSnapshot build() {
return new BlobStoreSnapshot(name, indices, state, reason, version, startTime, endTime, totalShard, successfulShards, shardFailures);
}
static final class Fields {
static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
static final XContentBuilderString NAME = new XContentBuilderString("name");
static final XContentBuilderString VERSION_ID = new XContentBuilderString("version_id");
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString STATE = new XContentBuilderString("state");
static final XContentBuilderString REASON = new XContentBuilderString("reason");
static final XContentBuilderString START_TIME = new XContentBuilderString("start_time");
static final XContentBuilderString END_TIME = new XContentBuilderString("end_time");
static final XContentBuilderString TOTAL_SHARDS = new XContentBuilderString("total_shards");
static final XContentBuilderString SUCCESSFUL_SHARDS = new XContentBuilderString("successful_shards");
static final XContentBuilderString FAILURES = new XContentBuilderString("failures");
}
/**
* Serializes the snapshot
*
* @param snapshot snapshot to be serialized
* @param builder XContent builder
* @param params serialization parameters
* @throws IOException
*/
public static void toXContent(BlobStoreSnapshot snapshot, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(Fields.SNAPSHOT);
builder.field(Fields.NAME, snapshot.name);
builder.field(Fields.VERSION_ID, snapshot.version.id);
builder.startArray(Fields.INDICES);
for (String index : snapshot.indices) {
builder.value(index);
}
builder.endArray();
builder.field(Fields.STATE, snapshot.state);
if (snapshot.reason != null) {
builder.field(Fields.REASON, snapshot.reason);
}
builder.field(Fields.START_TIME, snapshot.startTime);
builder.field(Fields.END_TIME, snapshot.endTime);
builder.field(Fields.TOTAL_SHARDS, snapshot.totalShard);
builder.field(Fields.SUCCESSFUL_SHARDS, snapshot.successfulShards);
builder.startArray(Fields.FAILURES);
for (SnapshotShardFailure shardFailure : snapshot.shardFailures) {
SnapshotShardFailure.toXContent(shardFailure, builder, params);
}
builder.endArray();
builder.endObject();
}
/**
* Parses the snapshot
*
* @param parser XContent parser
* @return snapshot
* @throws IOException
*/
public static BlobStoreSnapshot fromXContent(XContentParser parser) throws IOException {
Builder builder = new Builder();
XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.START_OBJECT) {
String currentFieldName = parser.currentName();
if ("snapshot".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
token = parser.nextToken();
if (token.isValue()) {
if ("name".equals(currentFieldName)) {
builder.name(parser.text());
} else if ("state".equals(currentFieldName)) {
builder.state(SnapshotState.valueOf(parser.text()));
} else if ("reason".equals(currentFieldName)) {
builder.reason(parser.text());
} else if ("start_time".equals(currentFieldName)) {
builder.startTime(parser.longValue());
} else if ("end_time".equals(currentFieldName)) {
builder.endTime(parser.longValue());
} else if ("total_shards".equals(currentFieldName)) {
builder.totalShard(parser.intValue());
} else if ("successful_shards".equals(currentFieldName)) {
builder.successfulShards(parser.intValue());
} else if ("version_id".equals(currentFieldName)) {
builder.version(Version.fromId(parser.intValue()));
}
} else if (token == XContentParser.Token.START_ARRAY) {
if ("indices".equals(currentFieldName)) {
ArrayList<String> indices = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
indices.add(parser.text());
}
builder.indices(indices);
} else if ("failures".equals(currentFieldName)) {
ArrayList<SnapshotShardFailure> failures = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
failures.add(SnapshotShardFailure.fromXContent(parser));
}
builder.shardFailures(ImmutableList.copyOf(failures));
} else {
// It was probably created by newer version - ignoring
parser.skipChildren();
}
} else if (token == XContentParser.Token.START_OBJECT) {
// It was probably created by newer version - ignoring
parser.skipChildren();
}
}
}
}
}
return builder.build();
}
}
}

View File

@ -21,52 +21,135 @@ package org.elasticsearch.snapshots;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.ArrayList;
/** /**
* Represent information about snapshot * Represent information about snapshot
*/ */
public interface Snapshot extends Comparable<Snapshot> { public class Snapshot implements Comparable<Snapshot>, ToXContent {
private final String name;
private final Version version;
private final SnapshotState state;
private final String reason;
private final ImmutableList<String> indices;
private final long startTime;
private final long endTime;
private final int totalShard;
private final int successfulShards;
private final ImmutableList<SnapshotShardFailure> shardFailures;
private final static ImmutableList<SnapshotShardFailure> NO_FAILURES = ImmutableList.of();
private Snapshot(String name, ImmutableList<String> indices, SnapshotState state, String reason, Version version, long startTime, long endTime,
int totalShard, int successfulShards, ImmutableList<SnapshotShardFailure> shardFailures) {
assert name != null;
assert indices != null;
assert state != null;
assert shardFailures != null;
this.name = name;
this.indices = indices;
this.state = state;
this.reason = reason;
this.version = version;
this.startTime = startTime;
this.endTime = endTime;
this.totalShard = totalShard;
this.successfulShards = successfulShards;
this.shardFailures = shardFailures;
}
public Snapshot(String name, ImmutableList<String> indices, long startTime) {
this(name, indices, SnapshotState.IN_PROGRESS, null, Version.CURRENT, startTime, 0L, 0, 0, NO_FAILURES);
}
public Snapshot(String name, ImmutableList<String> indices, long startTime, String reason, long endTime,
int totalShard, ImmutableList<SnapshotShardFailure> shardFailures) {
this(name, indices, snapshotState(reason, shardFailures), reason, Version.CURRENT,
startTime, endTime, totalShard, totalShard - shardFailures.size(), shardFailures);
}
private static SnapshotState snapshotState(String reason, ImmutableList<SnapshotShardFailure> shardFailures) {
if (reason == null) {
if (shardFailures.isEmpty()) {
return SnapshotState.SUCCESS;
} else {
return SnapshotState.PARTIAL;
}
} else {
return SnapshotState.FAILED;
}
}
/** /**
* Returns snapshot name * Returns snapshot name
* *
* @return snapshot name * @return snapshot name
*/ */
String name(); public String name() {
return name;
}
/** /**
* Returns current snapshot state * Returns current snapshot state
* *
* @return snapshot state * @return snapshot state
*/ */
SnapshotState state(); public SnapshotState state() {
return state;
}
/** /**
* Returns reason for complete snapshot failure * Returns reason for complete snapshot failure
* *
* @return snapshot failure reason * @return snapshot failure reason
*/ */
String reason(); public String reason() {
return reason;
}
/** /**
* Returns version of Elasticsearch that was used to create this snapshot * Returns version of Elasticsearch that was used to create this snapshot
* *
* @return Elasticsearch version * @return Elasticsearch version
*/ */
Version version(); public Version version() {
return version;
}
/** /**
* Returns indices that were included into this snapshot * Returns indices that were included into this snapshot
* *
* @return list of indices * @return list of indices
*/ */
ImmutableList<String> indices(); public ImmutableList<String> indices() {
return indices;
}
/** /**
* Returns time when snapshot started * Returns time when snapshot started
* *
* @return snapshot start time * @return snapshot start time
*/ */
long startTime(); public long startTime() {
return startTime;
}
/** /**
* Returns time when snapshot ended * Returns time when snapshot ended
@ -75,27 +158,175 @@ public interface Snapshot extends Comparable<Snapshot> {
* *
* @return snapshot end time * @return snapshot end time
*/ */
long endTime(); public long endTime() {
return endTime;
}
/** /**
* Returns total number of shards that were snapshotted * Returns total number of shards that were snapshotted
* *
* @return number of shards * @return number of shards
*/ */
int totalShard(); public int totalShard() {
return totalShard;
}
/** /**
* Returns total number of shards that were successfully snapshotted * Returns total number of shards that were successfully snapshotted
* *
* @return number of successful shards * @return number of successful shards
*/ */
int successfulShards(); public int successfulShards() {
return successfulShards;
}
/** /**
* Returns shard failures * Returns shard failures
*
* @return shard failures
*/ */
ImmutableList<SnapshotShardFailure> shardFailures(); public ImmutableList<SnapshotShardFailure> shardFailures() {
return shardFailures;
}
/**
* Compares two snapshots by their start time
*
* @param o other snapshot
* @return the value {@code 0} if snapshots were created at the same time;
* a value less than {@code 0} if this snapshot was created before snapshot {@code o}; and
* a value greater than {@code 0} if this snapshot was created after snapshot {@code o};
*/
@Override
public int compareTo(Snapshot o) {
return Long.compare(startTime, o.startTime);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Snapshot that = (Snapshot) o;
if (startTime != that.startTime) return false;
if (!name.equals(that.name)) return false;
return true;
}
@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + (int) (startTime ^ (startTime >>> 32));
return result;
}
static final class Fields {
static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
static final XContentBuilderString NAME = new XContentBuilderString("name");
static final XContentBuilderString VERSION_ID = new XContentBuilderString("version_id");
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString STATE = new XContentBuilderString("state");
static final XContentBuilderString REASON = new XContentBuilderString("reason");
static final XContentBuilderString START_TIME = new XContentBuilderString("start_time");
static final XContentBuilderString END_TIME = new XContentBuilderString("end_time");
static final XContentBuilderString TOTAL_SHARDS = new XContentBuilderString("total_shards");
static final XContentBuilderString SUCCESSFUL_SHARDS = new XContentBuilderString("successful_shards");
static final XContentBuilderString FAILURES = new XContentBuilderString("failures");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(Fields.SNAPSHOT);
builder.field(Fields.NAME, name);
builder.field(Fields.VERSION_ID, version.id);
builder.startArray(Fields.INDICES);
for (String index : indices) {
builder.value(index);
}
builder.endArray();
builder.field(Fields.STATE, state);
if (reason != null) {
builder.field(Fields.REASON, reason);
}
builder.field(Fields.START_TIME, startTime);
builder.field(Fields.END_TIME, endTime);
builder.field(Fields.TOTAL_SHARDS, totalShard);
builder.field(Fields.SUCCESSFUL_SHARDS, successfulShards);
builder.startArray(Fields.FAILURES);
for (SnapshotShardFailure shardFailure : shardFailures) {
SnapshotShardFailure.toXContent(shardFailure, builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}
public static Snapshot fromXContent(XContentParser parser) throws IOException {
String name = null;
Version version = Version.CURRENT;
SnapshotState state = SnapshotState.IN_PROGRESS;
String reason = null;
ImmutableList<String> indices = ImmutableList.of();
long startTime = 0;
long endTime = 0;
int totalShard = 0;
int successfulShards = 0;
ImmutableList<SnapshotShardFailure> shardFailures = NO_FAILURES;
XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.START_OBJECT) {
String currentFieldName = parser.currentName();
if ("snapshot".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
token = parser.nextToken();
if (token.isValue()) {
if ("name".equals(currentFieldName)) {
name = parser.text();
} else if ("state".equals(currentFieldName)) {
state = SnapshotState.valueOf(parser.text());
} else if ("reason".equals(currentFieldName)) {
reason = parser.text();
} else if ("start_time".equals(currentFieldName)) {
startTime = parser.longValue();
} else if ("end_time".equals(currentFieldName)) {
endTime = parser.longValue();
} else if ("total_shards".equals(currentFieldName)) {
totalShard = parser.intValue();
} else if ("successful_shards".equals(currentFieldName)) {
successfulShards = parser.intValue();
} else if ("version_id".equals(currentFieldName)) {
version = Version.fromId(parser.intValue());
}
} else if (token == XContentParser.Token.START_ARRAY) {
if ("indices".equals(currentFieldName)) {
ArrayList<String> indicesArray = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
indicesArray.add(parser.text());
}
indices = ImmutableList.copyOf(indicesArray);
} else if ("failures".equals(currentFieldName)) {
ArrayList<SnapshotShardFailure> shardFailureArrayList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
shardFailureArrayList.add(SnapshotShardFailure.fromXContent(parser));
}
shardFailures = ImmutableList.copyOf(shardFailureArrayList);
} else {
// It was probably created by newer version - ignoring
parser.skipChildren();
}
} else if (token == XContentParser.Token.START_OBJECT) {
// It was probably created by newer version - ignoring
parser.skipChildren();
}
}
}
}
}
return new Snapshot(name, indices, state, reason, version, startTime, endTime, totalShard, successfulShards, shardFailures);
}
} }

View File

@ -132,6 +132,10 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @throws SnapshotMissingException if snapshot is not found * @throws SnapshotMissingException if snapshot is not found
*/ */
public Snapshot snapshot(SnapshotId snapshotId) { public Snapshot snapshot(SnapshotId snapshotId) {
ImmutableList<SnapshotMetaData.Entry> entries = currentSnapshots(snapshotId.getRepository(), new String[] {snapshotId.getSnapshot()});
if (!entries.isEmpty()) {
return inProgressSnapshot(entries.iterator().next());
}
return repositoriesService.repository(snapshotId.getRepository()).readSnapshot(snapshotId); return repositoriesService.repository(snapshotId.getRepository()).readSnapshot(snapshotId);
} }
@ -142,12 +146,17 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @return list of snapshots * @return list of snapshots
*/ */
public ImmutableList<Snapshot> snapshots(String repositoryName) { public ImmutableList<Snapshot> snapshots(String repositoryName) {
ArrayList<Snapshot> snapshotList = newArrayList(); Set<Snapshot> snapshotSet = newHashSet();
ImmutableList<SnapshotMetaData.Entry> entries = currentSnapshots(repositoryName, null);
for (SnapshotMetaData.Entry entry : entries) {
snapshotSet.add(inProgressSnapshot(entry));
}
Repository repository = repositoriesService.repository(repositoryName); Repository repository = repositoriesService.repository(repositoryName);
ImmutableList<SnapshotId> snapshotIds = repository.snapshots(); ImmutableList<SnapshotId> snapshotIds = repository.snapshots();
for (SnapshotId snapshotId : snapshotIds) { for (SnapshotId snapshotId : snapshotIds) {
snapshotList.add(repository.readSnapshot(snapshotId)); snapshotSet.add(repository.readSnapshot(snapshotId));
} }
ArrayList<Snapshot> snapshotList = newArrayList(snapshotSet);
CollectionUtil.timSort(snapshotList); CollectionUtil.timSort(snapshotList);
return ImmutableList.copyOf(snapshotList); return ImmutableList.copyOf(snapshotList);
} }
@ -178,7 +187,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
// Store newSnapshot here to be processed in clusterStateProcessed // Store newSnapshot here to be processed in clusterStateProcessed
ImmutableList<String> indices = ImmutableList.copyOf(metaData.concreteIndices(request.indicesOptions(), request.indices())); ImmutableList<String> indices = ImmutableList.copyOf(metaData.concreteIndices(request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", request.repository(), request.name(), indices); logger.trace("[{}][{}] creating snapshot for indices [{}]", request.repository(), request.name(), indices);
newSnapshot = new SnapshotMetaData.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, null); newSnapshot = new SnapshotMetaData.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, System.currentTimeMillis(), null);
snapshots = new SnapshotMetaData(newSnapshot); snapshots = new SnapshotMetaData(newSnapshot);
} else { } else {
// TODO: What should we do if a snapshot is already running? // TODO: What should we do if a snapshot is already running?
@ -297,17 +306,17 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
for (SnapshotMetaData.Entry entry : snapshots.entries()) { for (SnapshotMetaData.Entry entry : snapshots.entries()) {
if (entry.snapshotId().equals(snapshot.snapshotId())) { if (entry.snapshotId().equals(snapshot.snapshotId())) {
// Replace the snapshot that was just created // Replace the snapshot that was just created
ImmutableMap<ShardId, SnapshotMetaData.ShardSnapshotStatus> shards = shards(snapshot.snapshotId(), currentState, snapshot.indices()); ImmutableMap<ShardId, SnapshotMetaData.ShardSnapshotStatus> shards = shards(entry.snapshotId(), currentState, entry.indices());
if (!partial) { if (!partial) {
Set<String> indicesWithMissingShards = indicesWithMissingShards(shards); Set<String> indicesWithMissingShards = indicesWithMissingShards(shards);
if (indicesWithMissingShards != null) { if (indicesWithMissingShards != null) {
updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.FAILED, snapshot.indices(), shards); updatedSnapshot = new SnapshotMetaData.Entry(entry, State.FAILED, shards);
entries.add(updatedSnapshot); entries.add(updatedSnapshot);
failure = "Indices don't have primary shards +[" + indicesWithMissingShards + "]"; failure = "Indices don't have primary shards +[" + indicesWithMissingShards + "]";
continue; continue;
} }
} }
updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.STARTED, snapshot.indices(), shards); updatedSnapshot = new SnapshotMetaData.Entry(entry, State.STARTED, shards);
entries.add(updatedSnapshot); entries.add(updatedSnapshot);
if (!completed(shards.values())) { if (!completed(shards.values())) {
accepted = true; accepted = true;
@ -325,7 +334,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
logger.warn("[{}] failed to create snapshot", t, snapshot.snapshotId()); logger.warn("[{}] failed to create snapshot", t, snapshot.snapshotId());
removeSnapshotFromClusterState(snapshot.snapshotId(), null, t); removeSnapshotFromClusterState(snapshot.snapshotId(), null, t);
try { try {
repositoriesService.repository(snapshot.snapshotId().getRepository()).finalizeSnapshot(snapshot.snapshotId(), ExceptionsHelper.detailedMessage(t), 0, ImmutableList.<SnapshotShardFailure>of()); repositoriesService.repository(snapshot.snapshotId().getRepository()).finalizeSnapshot(
snapshot.snapshotId(), snapshot.indices(), snapshot.startTime(), ExceptionsHelper.detailedMessage(t), 0, ImmutableList.<SnapshotShardFailure>of());
} catch (Throwable t2) { } catch (Throwable t2) {
logger.warn("[{}] failed to close snapshot in repository", snapshot.snapshotId()); logger.warn("[{}] failed to close snapshot in repository", snapshot.snapshotId());
} }
@ -354,7 +364,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
removeSnapshotFromClusterState(snapshot.snapshotId(), null, t); removeSnapshotFromClusterState(snapshot.snapshotId(), null, t);
if (snapshotCreated) { if (snapshotCreated) {
try { try {
repositoriesService.repository(snapshot.snapshotId().getRepository()).finalizeSnapshot(snapshot.snapshotId(), ExceptionsHelper.detailedMessage(t), 0, ImmutableList.<SnapshotShardFailure>of()); repositoriesService.repository(snapshot.snapshotId().getRepository()).finalizeSnapshot(snapshot.snapshotId(), snapshot.indices(), snapshot.startTime(),
ExceptionsHelper.detailedMessage(t), 0, ImmutableList.<SnapshotShardFailure>of());
} catch (Throwable t2) { } catch (Throwable t2) {
logger.warn("[{}] failed to close snapshot in repository", snapshot.snapshotId()); logger.warn("[{}] failed to close snapshot in repository", snapshot.snapshotId());
} }
@ -363,6 +374,10 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
} }
} }
private Snapshot inProgressSnapshot(SnapshotMetaData.Entry entry) {
return new Snapshot(entry.snapshotId().getSnapshot(), entry.indices(), entry.startTime());
}
/** /**
* Returns status of the currently running snapshots * Returns status of the currently running snapshots
* <p> * <p>
@ -556,10 +571,10 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
changed = true; changed = true;
ImmutableMap<ShardId, ShardSnapshotStatus> shardsMap = shards.build(); ImmutableMap<ShardId, ShardSnapshotStatus> shardsMap = shards.build();
if (!snapshot.state().completed() && completed(shardsMap.values())) { if (!snapshot.state().completed() && completed(shardsMap.values())) {
updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.SUCCESS, snapshot.indices(), shardsMap); updatedSnapshot = new SnapshotMetaData.Entry(snapshot, State.SUCCESS, shardsMap);
endSnapshot(updatedSnapshot); endSnapshot(updatedSnapshot);
} else { } else {
updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), snapshot.state(), snapshot.indices(), shardsMap); updatedSnapshot = new SnapshotMetaData.Entry(snapshot, snapshot.state(), shardsMap);
} }
} }
entries.add(updatedSnapshot); entries.add(updatedSnapshot);
@ -616,10 +631,10 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
if (shards != null) { if (shards != null) {
changed = true; changed = true;
if (!snapshot.state().completed() && completed(shards.values())) { if (!snapshot.state().completed() && completed(shards.values())) {
updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), State.SUCCESS, snapshot.indices(), shards); updatedSnapshot = new SnapshotMetaData.Entry(snapshot, State.SUCCESS, shards);
endSnapshot(updatedSnapshot); endSnapshot(updatedSnapshot);
} else { } else {
updatedSnapshot = new SnapshotMetaData.Entry(snapshot.snapshotId(), snapshot.includeGlobalState(), snapshot.state(), snapshot.indices(), shards); updatedSnapshot = new SnapshotMetaData.Entry(snapshot, shards);
} }
} }
entries.add(updatedSnapshot); entries.add(updatedSnapshot);
@ -904,11 +919,11 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
logger.trace("[{}] Updating shard [{}] with status [{}]", request.snapshotId(), request.shardId(), request.status().state()); logger.trace("[{}] Updating shard [{}] with status [{}]", request.snapshotId(), request.shardId(), request.status().state());
shards.put(request.shardId(), request.status()); shards.put(request.shardId(), request.status());
if (!completed(shards.values())) { if (!completed(shards.values())) {
entries.add(new SnapshotMetaData.Entry(entry.snapshotId(), entry.includeGlobalState(), entry.state(), entry.indices(), ImmutableMap.copyOf(shards))); entries.add(new SnapshotMetaData.Entry(entry, ImmutableMap.copyOf(shards)));
} else { } else {
// Snapshot is finished - mark it as done // Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status? // TODO: Add PARTIAL_SUCCESS status?
SnapshotMetaData.Entry updatedEntry = new SnapshotMetaData.Entry(entry.snapshotId(), entry.includeGlobalState(), State.SUCCESS, entry.indices(), ImmutableMap.copyOf(shards)); SnapshotMetaData.Entry updatedEntry = new SnapshotMetaData.Entry(entry, State.SUCCESS, ImmutableMap.copyOf(shards));
entries.add(updatedEntry); entries.add(updatedEntry);
// Finalize snapshot in the repository // Finalize snapshot in the repository
endSnapshot(updatedEntry); endSnapshot(updatedEntry);
@ -973,7 +988,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId.getIndex(), shardId.id(), status.reason())); shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId.getIndex(), shardId.id(), status.reason()));
} }
} }
Snapshot snapshot = repository.finalizeSnapshot(snapshotId, failure, entry.shards().size(), ImmutableList.copyOf(shardFailures)); Snapshot snapshot = repository.finalizeSnapshot(snapshotId, entry.indices(), entry.startTime(), failure, entry.shards().size(), ImmutableList.copyOf(shardFailures));
removeSnapshotFromClusterState(snapshotId, new SnapshotInfo(snapshot), null); removeSnapshotFromClusterState(snapshotId, new SnapshotInfo(snapshot), null);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("[{}] failed to finalize snapshot", t, snapshotId); logger.warn("[{}] failed to finalize snapshot", t, snapshotId);
@ -1094,7 +1109,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
logger.trace("trying to delete completed snapshot - save to delete"); logger.trace("trying to delete completed snapshot - save to delete");
return currentState; return currentState;
} }
SnapshotMetaData.Entry newSnapshot = new SnapshotMetaData.Entry(snapshotId, snapshot.includeGlobalState(), State.ABORTED, snapshot.indices(), shards); SnapshotMetaData.Entry newSnapshot = new SnapshotMetaData.Entry(snapshot, State.ABORTED, shards);
snapshots = new SnapshotMetaData(newSnapshot); snapshots = new SnapshotMetaData(newSnapshot);
mdBuilder.putCustom(SnapshotMetaData.TYPE, snapshots); mdBuilder.putCustom(SnapshotMetaData.TYPE, snapshots);
return ClusterState.builder(currentState).metaData(mdBuilder).build(); return ClusterState.builder(currentState).metaData(mdBuilder).build();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.blobstore; package org.elasticsearch.common.blobstore;
import com.carrotsearch.randomizedtesting.LifecycleScope; import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.blobstore.fs.FsBlobStore; import org.elasticsearch.common.blobstore.fs.FsBlobStore;
@ -29,9 +30,16 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test; import org.junit.Test;
import java.io.*; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map;
import static com.google.common.collect.Maps.newHashMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
public class BlobStoreTest extends ElasticsearchTestCase { public class BlobStoreTest extends ElasticsearchTestCase {
@ -39,11 +47,7 @@ public class BlobStoreTest extends ElasticsearchTestCase {
public void testWriteRead() throws IOException { public void testWriteRead() throws IOException {
final BlobStore store = newBlobStore(); final BlobStore store = newBlobStore();
final BlobContainer container = store.blobContainer(new BlobPath()); final BlobContainer container = store.blobContainer(new BlobPath());
int length = randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
byte[] data = new byte[length];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) randomInt();
}
try (OutputStream stream = container.createOutput("foobar")) { try (OutputStream stream = container.createOutput("foobar")) {
stream.write(data); stream.write(data);
} }
@ -61,6 +65,80 @@ public class BlobStoreTest extends ElasticsearchTestCase {
store.close(); store.close();
} }
@Test
public void testMoveAndList() throws IOException {
final BlobStore store = newBlobStore();
final BlobContainer container = store.blobContainer(new BlobPath());
assertThat(container.listBlobs().size(), equalTo(0));
int numberOfFooBlobs = randomIntBetween(0, 10);
int numberOfBarBlobs = randomIntBetween(3, 20);
Map<String, Long> generatedBlobs = newHashMap();
for (int i = 0; i < numberOfFooBlobs; i++) {
int length = randomIntBetween(10, 100);
String name = "foo-" + i + "-";
generatedBlobs.put(name, (long) length);
createRandomBlob(container, name, length);
}
for (int i = 1; i < numberOfBarBlobs; i++) {
int length = randomIntBetween(10, 100);
String name = "bar-" + i + "-";
generatedBlobs.put(name, (long) length);
createRandomBlob(container, name, length);
}
int length = randomIntBetween(10, 100);
String name = "bar-0-";
generatedBlobs.put(name, (long) length);
byte[] data = createRandomBlob(container, name, length);
ImmutableMap<String, BlobMetaData> blobs = container.listBlobs();
assertThat(blobs.size(), equalTo(numberOfFooBlobs + numberOfBarBlobs));
for (Map.Entry<String, Long> generated : generatedBlobs.entrySet()) {
BlobMetaData blobMetaData = blobs.get(generated.getKey());
assertThat(generated.getKey(), blobMetaData, notNullValue());
assertThat(blobMetaData.name(), equalTo(generated.getKey()));
assertThat(blobMetaData.length(), equalTo(generated.getValue()));
}
assertThat(container.listBlobsByPrefix("foo-").size(), equalTo(numberOfFooBlobs));
assertThat(container.listBlobsByPrefix("bar-").size(), equalTo(numberOfBarBlobs));
assertThat(container.listBlobsByPrefix("baz-").size(), equalTo(0));
String newName = "bar-new";
// Move to a new location
container.move(name, newName);
assertThat(container.listBlobsByPrefix(name).size(), equalTo(0));
blobs = container.listBlobsByPrefix(newName);
assertThat(blobs.size(), equalTo(1));
assertThat(blobs.get(newName).length(), equalTo(generatedBlobs.get(name)));
assertThat(data, equalTo(readBlobFully(container, newName, length)));
store.close();
}
protected byte[] createRandomBlob(BlobContainer container, String name, int length) throws IOException {
byte[] data = randomBytes(length);
try (OutputStream stream = container.createOutput(name)) {
stream.write(data);
}
return data;
}
protected byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException {
byte[] data = new byte[length];
try (InputStream inputStream = container.openInput(name)) {
assertThat(inputStream.read(data), equalTo(length));
assertThat(inputStream.read(), equalTo(-1));
}
return data;
}
protected byte[] randomBytes(int length) {
byte[] data = new byte[length];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) randomInt();
}
return data;
}
protected BlobStore newBlobStore() throws IOException { protected BlobStore newBlobStore() throws IOException {
Path tempDir = newTempDir(LifecycleScope.TEST).toPath(); Path tempDir = newTempDir(LifecycleScope.TEST).toPath();
Settings settings = randomBoolean() ? ImmutableSettings.EMPTY : ImmutableSettings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build(); Settings settings = randomBoolean() ? ImmutableSettings.EMPTY : ImmutableSettings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();

View File

@ -308,10 +308,11 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> creating repository"); logger.info("--> creating repository");
File repo = newTempDir(LifecycleScope.TEST);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo") PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings( .setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
ImmutableSettings.settingsBuilder() ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.TEST)) .put("location", repo)
.put("random", randomAsciiOfLength(10)) .put("random", randomAsciiOfLength(10))
.put("wait_after_unblock", 200) .put("wait_after_unblock", 200)
).get(); ).get();
@ -322,6 +323,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
// Remove it from the list of available nodes // Remove it from the list of available nodes
nodes.remove(blockedNode); nodes.remove(blockedNode);
int numberOfFilesBeforeSnapshot = numberOfFiles(repo);
logger.info("--> snapshot"); logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
@ -347,6 +349,8 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
logger.info("--> making sure that snapshot no longer exists"); logger.info("--> making sure that snapshot no longer exists");
assertThrows(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute(), SnapshotMissingException.class); assertThrows(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute(), SnapshotMissingException.class);
// Subtract index file from the count
assertThat("not all files were deleted during snapshot cancellation", numberOfFilesBeforeSnapshot, equalTo(numberOfFiles(repo) - 1));
logger.info("--> done"); logger.info("--> done");
} }

View File

@ -682,7 +682,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.FAILED)); assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.FAILED));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0)); assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(3));
assertThat(createSnapshotResponse.getSnapshotInfo().reason(), startsWith("Indices don't have primary shards")); assertThat(createSnapshotResponse.getSnapshotInfo().reason(), startsWith("Indices don't have primary shards"));
} }

View File

@ -81,4 +81,9 @@ public class BlobContainerWrapper implements BlobContainer {
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException { public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
return delegate.listBlobsByPrefix(blobNamePrefix); return delegate.listBlobsByPrefix(blobNamePrefix);
} }
@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
delegate.move(sourceBlobName, targetBlobName);
}
} }

View File

@ -291,6 +291,12 @@ public class MockRepository extends FsRepository {
return super.listBlobsByPrefix(blobNamePrefix); return super.listBlobsByPrefix(blobNamePrefix);
} }
@Override
public void move(String sourceBlob, String targetBlob) throws IOException {
maybeIOExceptionOrBlock(targetBlob);
super.move(sourceBlob, targetBlob);
}
@Override @Override
public OutputStream createOutput(String blobName) throws IOException { public OutputStream createOutput(String blobName) throws IOException {
maybeIOExceptionOrBlock(blobName); maybeIOExceptionOrBlock(blobName);