Add checksum to snapshot metadata files

This commit adds checksum to snapshot files that store global and index based metadata as well as shard information.

Closes #11589
This commit is contained in:
Igor Motov 2015-07-15 18:52:55 -04:00
parent c62d0b9ee3
commit bfbee383bd
24 changed files with 1310 additions and 388 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.routing.HashFunction; import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.CompressedXContent;
@ -40,10 +41,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.loader.SettingsLoader; import org.elasticsearch.common.settings.loader.SettingsLoader;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.warmer.IndexWarmersMetaData; import org.elasticsearch.search.warmer.IndexWarmersMetaData;
@ -64,7 +62,7 @@ import static org.elasticsearch.common.settings.Settings.*;
/** /**
* *
*/ */
public class IndexMetaData implements Diffable<IndexMetaData> { public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuilder<IndexMetaData>, ToXContent {
public static final IndexMetaData PROTO = IndexMetaData.builder("") public static final IndexMetaData PROTO = IndexMetaData.builder("")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
@ -515,6 +513,17 @@ public class IndexMetaData implements Diffable<IndexMetaData> {
return new IndexMetaDataDiff(in); return new IndexMetaDataDiff(in);
} }
@Override
public IndexMetaData fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {
return Builder.fromXContent(parser);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Builder.toXContent(this, builder, params);
return builder;
}
private static class IndexMetaDataDiff implements Diff<IndexMetaData> { private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
private final String index; private final String index;

View File

@ -36,6 +36,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -60,7 +61,7 @@ import java.util.*;
import static org.elasticsearch.common.settings.Settings.*; import static org.elasticsearch.common.settings.Settings.*;
public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData> { public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, FromXContentBuilder<MetaData>, ToXContent {
public static final MetaData PROTO = builder().build(); public static final MetaData PROTO = builder().build();
@ -635,6 +636,17 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData> {
return new MetaDataDiff(in); return new MetaDataDiff(in);
} }
@Override
public MetaData fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {
return Builder.fromXContent(parser);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Builder.toXContent(this, builder, params);
return builder;
}
private static class MetaDataDiff implements Diff<MetaData> { private static class MetaDataDiff implements Diff<MetaData> {
private long version; private long version;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing; package org.elasticsearch.cluster.routing;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -37,11 +38,14 @@ public class RestoreSource implements Streamable, ToXContent {
private String index; private String index;
private Version version;
RestoreSource() { RestoreSource() {
} }
public RestoreSource(SnapshotId snapshotId, String index) { public RestoreSource(SnapshotId snapshotId, Version version, String index) {
this.snapshotId = snapshotId; this.snapshotId = snapshotId;
this.version = version;
this.index = index; this.index = index;
} }
@ -53,6 +57,10 @@ public class RestoreSource implements Streamable, ToXContent {
return index; return index;
} }
public Version version() {
return version;
}
public static RestoreSource readRestoreSource(StreamInput in) throws IOException { public static RestoreSource readRestoreSource(StreamInput in) throws IOException {
RestoreSource restoreSource = new RestoreSource(); RestoreSource restoreSource = new RestoreSource();
restoreSource.readFrom(in); restoreSource.readFrom(in);
@ -66,12 +74,14 @@ public class RestoreSource implements Streamable, ToXContent {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
snapshotId = SnapshotId.readSnapshotId(in); snapshotId = SnapshotId.readSnapshotId(in);
version = Version.readVersion(in);
index = in.readString(); index = in.readString();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
snapshotId.writeTo(out); snapshotId.writeTo(out);
Version.writeVersion(version, out);
out.writeString(index); out.writeString(index);
} }
@ -80,6 +90,7 @@ public class RestoreSource implements Streamable, ToXContent {
return builder.startObject() return builder.startObject()
.field("repository", snapshotId.getRepository()) .field("repository", snapshotId.getRepository())
.field("snapshot", snapshotId.getSnapshot()) .field("snapshot", snapshotId.getSnapshot())
.field("version", version.toString())
.field("index", index) .field("index", index)
.endObject(); .endObject();
} }

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.store;
import org.apache.lucene.store.IndexInput;
import java.io.EOFException;
import java.io.IOException;
/**
* Wraps array of bytes into IndexInput
*/
public class ByteArrayIndexInput extends IndexInput {
private final byte[] bytes;
private int pos;
private int offset;
private int length;
public ByteArrayIndexInput(String resourceDesc, byte[] bytes) {
this(resourceDesc, bytes, 0, bytes.length);
}
public ByteArrayIndexInput(String resourceDesc, byte[] bytes, int offset, int length) {
super(resourceDesc);
this.bytes = bytes;
this.offset = offset;
this.length = length;
}
@Override
public void close() throws IOException {
}
@Override
public long getFilePointer() {
return pos;
}
@Override
public void seek(long l) throws IOException {
if (l < 0) {
throw new IllegalArgumentException("Seeking to negative position: " + pos);
} else if (l > length) {
throw new EOFException("seek past EOF");
}
pos = (int)l;
}
@Override
public long length() {
return length;
}
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
if (offset >= 0L && length >= 0L && offset + length <= this.length) {
return new ByteArrayIndexInput(sliceDescription, bytes, this.offset + (int)offset, (int)length);
} else {
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength=" + this.length + ": " + this);
}
}
@Override
public byte readByte() throws IOException {
if (pos >= offset + length) {
throw new EOFException("seek past EOF");
}
return bytes[offset + pos++];
}
@Override
public void readBytes(final byte[] b, final int offset, int len) throws IOException {
if (pos + len > this.offset + length) {
throw new EOFException("seek past EOF");
}
System.arraycopy(bytes, this.offset + pos, b, offset, len);
pos += len;
}
}

View File

@ -25,12 +25,13 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
/** /**
* {@link OutputStream} that writes into underlying IndexOutput
*/ */
public class OutputStreamIndexOutput extends OutputStream { public class IndexOutputOutputStream extends OutputStream {
private final IndexOutput out; private final IndexOutput out;
public OutputStreamIndexOutput(IndexOutput out) { public IndexOutputOutputStream(IndexOutput out) {
this.out = out; this.out = out;
} }

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.xcontent;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.StreamableReader;
import java.io.IOException;
/**
* Indicates that the class supports XContent deserialization.
*
* This interface is similar to what {@link StreamableReader} does, only it works with XContent serialization
* instead of binary serialization.
*/
public interface FromXContentBuilder<T> {
/**
* Parses an object with the type T from parser
*/
T fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException;
}

View File

@ -35,6 +35,7 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
@ -114,7 +115,7 @@ public abstract class MetaDataStateFormat<T> {
CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION);
out.writeInt(format.index()); out.writeInt(format.index());
out.writeLong(version); out.writeLong(version);
try (XContentBuilder builder = newXContentBuilder(format, new org.elasticsearch.common.lucene.store.OutputStreamIndexOutput(out) { try (XContentBuilder builder = newXContentBuilder(format, new IndexOutputOutputStream(out) {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close. // this is important since some of the XContentBuilders write bytes on close.

View File

@ -317,7 +317,7 @@ public class StoreRecoveryService extends AbstractIndexShardComponent implements
if (!shardId.getIndex().equals(restoreSource.index())) { if (!shardId.getIndex().equals(restoreSource.index())) {
snapshotShardId = new ShardId(restoreSource.index(), shardId.id()); snapshotShardId = new ShardId(restoreSource.index(), shardId.id());
} }
indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState); indexShardRepository.restore(restoreSource.snapshotId(), restoreSource.version(), shardId, snapshotShardId, recoveryState);
indexShard.skipTranslogRecovery(true); indexShard.skipTranslogRecovery(true);
indexShard.finalizeRecovery(); indexShard.finalizeRecovery();
indexShard.postRecovery("restore done"); indexShard.postRecovery("restore done");

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.snapshots; package org.elasticsearch.index.snapshots;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -35,7 +36,7 @@ public interface IndexShardRepository {
/** /**
* Creates a snapshot of the shard based on the index commit point. * Creates a snapshot of the shard based on the index commit point.
* <p/> * <p/>
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex()} method. * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method.
* IndexShardRepository implementations shouldn't release the snapshot index commit point. It is done by the method caller. * IndexShardRepository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p/> * <p/>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
@ -55,19 +56,21 @@ public interface IndexShardRepository {
* *
* @param snapshotId snapshot id * @param snapshotId snapshot id
* @param shardId shard id (in the current index) * @param shardId shard id (in the current index)
* @param version version of elasticsearch that created this snapshot
* @param snapshotShardId shard id (in the snapshot) * @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state * @param recoveryState recovery state
*/ */
void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState); void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState);
/** /**
* Retrieve shard snapshot status for the stored snapshot * Retrieve shard snapshot status for the stored snapshot
* *
* @param snapshotId snapshot id * @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param shardId shard id * @param shardId shard id
* @return snapshot status * @return snapshot status
*/ */
IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, ShardId shardId); IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
/** /**
* Verifies repository settings on data node * Verifies repository settings on data node

View File

@ -33,6 +33,7 @@ import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -46,7 +47,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -63,6 +63,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.blobstore.*;
import java.io.FilterInputStream; import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
@ -72,7 +73,6 @@ import java.util.*;
import static com.google.common.collect.Lists.newArrayList; import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.toStreamOutput;
/** /**
* Blob store based implementation of IndexShardRepository * Blob store based implementation of IndexShardRepository
@ -104,14 +104,30 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
private final ParseFieldMatcher parseFieldMatcher; private final ParseFieldMatcher parseFieldMatcher;
protected static final String SNAPSHOT_PREFIX = "snapshot-"; protected static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-";
protected static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s";
protected static final String SNAPSHOT_PREFIX = "snap-";
protected static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat";
protected static final String SNAPSHOT_CODEC = "snapshot";
protected static final String SNAPSHOT_INDEX_PREFIX = "index-"; protected static final String SNAPSHOT_INDEX_PREFIX = "index-";
protected static final String SNAPSHOT_TEMP_PREFIX = "pending-"; protected static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s";
protected static final String SNAPSHOT_INDEX_CODEC = "snapshots";
protected static final String DATA_BLOB_PREFIX = "__"; protected static final String DATA_BLOB_PREFIX = "__";
private ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotFormat;
private LegacyBlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotLegacyFormat;
private ChecksumBlobStoreFormat<BlobStoreIndexShardSnapshots> indexShardSnapshotsFormat;
@Inject @Inject
public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) { public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) {
super(settings); super(settings);
@ -144,6 +160,9 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
} }
}; };
this.compress = compress; this.compress = compress;
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress());
indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher);
indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots.PROTO, parseFieldMatcher, isCompress());
} }
/** /**
@ -174,8 +193,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
final RestoreContext snapshotContext = new RestoreContext(snapshotId, shardId, snapshotShardId, recoveryState); final RestoreContext snapshotContext = new RestoreContext(snapshotId, version, shardId, snapshotShardId, recoveryState);
try { try {
snapshotContext.restore(); snapshotContext.restore();
} catch (Throwable e) { } catch (Throwable e) {
@ -187,8 +206,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, ShardId shardId) { public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
Context context = new Context(snapshotId, shardId); Context context = new Context(snapshotId, version, shardId);
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); IndexShardSnapshotStatus status = new IndexShardSnapshotStatus();
status.updateStage(IndexShardSnapshotStatus.Stage.DONE); status.updateStage(IndexShardSnapshotStatus.Stage.DONE);
@ -223,8 +242,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* @param snapshotId snapshot id * @param snapshotId snapshot id
* @param shardId shard id * @param shardId shard id
*/ */
public void delete(SnapshotId snapshotId, ShardId shardId) { public void delete(SnapshotId snapshotId, Version version, ShardId shardId) {
Context context = new Context(snapshotId, shardId, shardId); Context context = new Context(snapshotId, version, shardId, shardId);
context.delete(); context.delete();
} }
@ -236,58 +255,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
']'; ']';
} }
/**
* Returns shard snapshot metadata file name
*
* @param snapshotId snapshot id
* @return shard snapshot metadata file name
*/
private String snapshotBlobName(SnapshotId snapshotId) {
return SNAPSHOT_PREFIX + snapshotId.getSnapshot();
}
/**
* Serializes snapshot to JSON
*
* @param snapshot snapshot
* @param output the stream to output the snapshot JSON representation to
* @throws IOException if an IOException occurs
*/
public void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, StreamOutput output) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, output).prettyPrint();
BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS);
builder.flush();
}
/**
* Parses JSON representation of a snapshot
*
* @param stream JSON
* @return snapshot
* @throws IOException if an IOException occurs
*/
public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream, ParseFieldMatcher parseFieldMatcher) throws IOException {
byte[] data = ByteStreams.toByteArray(stream);
try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) {
parser.nextToken();
return BlobStoreIndexShardSnapshot.fromXContent(parser, parseFieldMatcher);
}
}
/**
* Parses JSON representation of a snapshot
*
* @param stream JSON
* @return snapshot
* @throws IOException if an IOException occurs
* */
public static BlobStoreIndexShardSnapshots readSnapshots(InputStream stream, ParseFieldMatcher parseFieldMatcher) throws IOException {
byte[] data = ByteStreams.toByteArray(stream);
try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) {
parser.nextToken();
return BlobStoreIndexShardSnapshots.fromXContent(parser, parseFieldMatcher);
}
}
/** /**
* Returns true if metadata files should be compressed * Returns true if metadata files should be compressed
* *
@ -297,6 +264,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
return compress; return compress;
} }
BlobStoreFormat<BlobStoreIndexShardSnapshot> indexShardSnapshotFormat(Version version) {
if (BlobStoreRepository.legacyMetaData(version)) {
return indexShardSnapshotLegacyFormat;
} else {
return indexShardSnapshotFormat;
}
}
/** /**
* Context for snapshot/restore operations * Context for snapshot/restore operations
*/ */
@ -308,12 +283,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
protected final BlobContainer blobContainer; protected final BlobContainer blobContainer;
public Context(SnapshotId snapshotId, ShardId shardId) { protected final Version version;
this(snapshotId, shardId, shardId);
public Context(SnapshotId snapshotId, Version version, ShardId shardId) {
this(snapshotId, version, shardId, shardId);
} }
public Context(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId) { public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) {
this.snapshotId = snapshotId; this.snapshotId = snapshotId;
this.version = version;
this.shardId = shardId; this.shardId = shardId;
blobContainer = blobStore.blobContainer(basePath.add("indices").add(snapshotShardId.getIndex()).add(Integer.toString(snapshotShardId.getId()))); blobContainer = blobStore.blobContainer(basePath.add("indices").add(snapshotShardId.getIndex()).add(Integer.toString(snapshotShardId.getId())));
} }
@ -333,10 +311,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
BlobStoreIndexShardSnapshots snapshots = tuple.v1(); BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2(); int fileListGeneration = tuple.v2();
String commitPointName = snapshotBlobName(snapshotId);
try { try {
blobContainer.deleteBlob(commitPointName); indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getSnapshot());
} catch (IOException e) { } catch (IOException e) {
logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId); logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId);
} }
@ -356,13 +332,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* Loads information about shard snapshot * Loads information about shard snapshot
*/ */
public BlobStoreIndexShardSnapshot loadSnapshot() { public BlobStoreIndexShardSnapshot loadSnapshot() {
BlobStoreIndexShardSnapshot snapshot; try {
try (InputStream stream = blobContainer.openInput(snapshotBlobName(snapshotId))) { return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getSnapshot());
snapshot = readSnapshot(stream, parseFieldMatcher);
} catch (IOException ex) { } catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex);
} }
return snapshot;
} }
/** /**
@ -381,7 +355,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
// delete old index files first // delete old index files first
for (String blobName : blobs.keySet()) { for (String blobName : blobs.keySet()) {
// delete old file lists // delete old file lists
if (blobName.startsWith(SNAPSHOT_TEMP_PREFIX) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try { try {
blobContainer.deleteBlob(blobName); blobContainer.deleteBlob(blobName);
} catch (IOException e) { } catch (IOException e) {
@ -408,20 +382,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
// If we deleted all snapshots - we don't need to create the index file // If we deleted all snapshots - we don't need to create the index file
if (snapshots.size() > 0) { if (snapshots.size() > 0) {
String newSnapshotIndexName = SNAPSHOT_INDEX_PREFIX + fileListGeneration; try {
try (OutputStream output = blobContainer.createOutput(SNAPSHOT_TEMP_PREFIX + fileListGeneration)) { indexShardSnapshotsFormat.writeAtomic(newSnapshots, blobContainer, Integer.toString(fileListGeneration));
StreamOutput stream = compressIfNeeded(output);
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
newSnapshots.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.flush();
} catch (IOException e) { } catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e); throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e);
} }
try {
blobContainer.move(SNAPSHOT_TEMP_PREFIX + fileListGeneration, newSnapshotIndexName);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to rename file list", e);
}
} }
} }
@ -481,8 +446,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
} }
} }
if (latest >= 0) { if (latest >= 0) {
try (InputStream stream = blobContainer.openInput(SNAPSHOT_INDEX_PREFIX + latest)) { try {
return new Tuple<>(readSnapshots(stream, parseFieldMatcher), latest); return new Tuple<>(indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest)), latest);
} catch (IOException e) { } catch (IOException e) {
logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest); logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest);
} }
@ -491,22 +456,22 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
// We couldn't load the index file - falling back to loading individual snapshots // We couldn't load the index file - falling back to loading individual snapshots
List<SnapshotFiles> snapshots = Lists.newArrayList(); List<SnapshotFiles> snapshots = Lists.newArrayList();
for (String name : blobs.keySet()) { for (String name : blobs.keySet()) {
if (name.startsWith(SNAPSHOT_PREFIX)) { try {
try (InputStream stream = blobContainer.openInput(name)) { BlobStoreIndexShardSnapshot snapshot = null;
BlobStoreIndexShardSnapshot snapshot = readSnapshot(stream, parseFieldMatcher); if (name.startsWith(SNAPSHOT_PREFIX)) {
snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name);
} catch (IOException e) { } else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) {
logger.warn("failed to read commit point [{}]", e, name); snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name);
} }
if (snapshot != null) {
snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
}
} catch (IOException e) {
logger.warn("failed to read commit point [{}]", e, name);
} }
} }
return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1); return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1);
} }
protected StreamOutput compressIfNeeded(OutputStream output) throws IOException {
return toStreamOutput(output, isCompress());
}
} }
/** /**
@ -526,7 +491,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* @param snapshotStatus snapshot status to report progress * @param snapshotStatus snapshot status to report progress
*/ */
public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) { public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) {
super(snapshotId, shardId); super(snapshotId, Version.CURRENT, shardId);
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
store = indexService.shardInjectorSafe(shardId.id()).getInstance(Store.class); store = indexService.shardInjectorSafe(shardId.id()).getInstance(Store.class);
this.snapshotStatus = snapshotStatus; this.snapshotStatus = snapshotStatus;
@ -627,15 +592,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
// now create and write the commit point // now create and write the commit point
snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE);
String snapshotBlobName = snapshotBlobName(snapshotId);
BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(), BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(),
snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(),
// snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong
System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize);
//TODO: The time stored in snapshot doesn't include cleanup time. //TODO: The time stored in snapshot doesn't include cleanup time.
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try (StreamOutput output = compressIfNeeded(blobContainer.createOutput(snapshotBlobName))) { try {
writeSnapshot(snapshot, output); indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getSnapshot());
} catch (IOException e) { } catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
} }
@ -815,8 +779,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
* @param snapshotShardId shard in the snapshot that data should be restored from * @param snapshotShardId shard in the snapshot that data should be restored from
* @param recoveryState recovery state to report progress * @param recoveryState recovery state to report progress
*/ */
public RestoreContext(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
super(snapshotId, shardId, snapshotShardId); super(snapshotId, version, shardId, snapshotShardId);
store = indicesService.indexServiceSafe(shardId.getIndex()).shardInjectorSafe(shardId.id()).getInstance(Store.class); store = indicesService.indexServiceSafe(shardId.getIndex()).shardInjectorSafe(shardId.id()).getInstance(Store.class);
this.recoveryState = recoveryState; this.recoveryState = recoveryState;
} }

View File

@ -29,10 +29,7 @@ import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.IOException; import java.io.IOException;
@ -43,7 +40,9 @@ import static com.google.common.collect.Lists.newArrayList;
/** /**
* Shard snapshot metadata * Shard snapshot metadata
*/ */
public class BlobStoreIndexShardSnapshot { public class BlobStoreIndexShardSnapshot implements ToXContent, FromXContentBuilder<BlobStoreIndexShardSnapshot> {
public static final BlobStoreIndexShardSnapshot PROTO = new BlobStoreIndexShardSnapshot();
/** /**
* Information about snapshotted file * Information about snapshotted file
@ -350,6 +349,19 @@ public class BlobStoreIndexShardSnapshot {
this.totalSize = totalSize; this.totalSize = totalSize;
} }
/**
* Special constructor for the prototype
*/
private BlobStoreIndexShardSnapshot() {
this.snapshot = "";
this.indexVersion = 0;
this.indexFiles = ImmutableList.of();
this.startTime = 0;
this.time = 0;
this.numberOfFiles = 0;
this.totalSize = 0;
}
/** /**
* Returns index version * Returns index version
* *
@ -429,25 +441,24 @@ public class BlobStoreIndexShardSnapshot {
/** /**
* Serializes shard snapshot metadata info into JSON * Serializes shard snapshot metadata info into JSON
* *
* @param snapshot shard snapshot metadata
* @param builder XContent builder * @param builder XContent builder
* @param params parameters * @param params parameters
* @throws IOException * @throws IOException
*/ */
public static void toXContent(BlobStoreIndexShardSnapshot snapshot, XContentBuilder builder, ToXContent.Params params) throws IOException { @Override
builder.startObject(); public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.NAME, snapshot.snapshot); builder.field(Fields.NAME, snapshot);
builder.field(Fields.INDEX_VERSION, snapshot.indexVersion); builder.field(Fields.INDEX_VERSION, indexVersion);
builder.field(Fields.START_TIME, snapshot.startTime); builder.field(Fields.START_TIME, startTime);
builder.field(Fields.TIME, snapshot.time); builder.field(Fields.TIME, time);
builder.field(Fields.NUMBER_OF_FILES, snapshot.numberOfFiles); builder.field(Fields.NUMBER_OF_FILES, numberOfFiles);
builder.field(Fields.TOTAL_SIZE, snapshot.totalSize); builder.field(Fields.TOTAL_SIZE, totalSize);
builder.startArray(Fields.FILES); builder.startArray(Fields.FILES);
for (FileInfo fileInfo : snapshot.indexFiles) { for (FileInfo fileInfo : indexFiles) {
FileInfo.toXContent(fileInfo, builder, params); FileInfo.toXContent(fileInfo, builder, params);
} }
builder.endArray(); builder.endArray();
builder.endObject(); return builder;
} }
/** /**
@ -457,7 +468,7 @@ public class BlobStoreIndexShardSnapshot {
* @return shard snapshot metadata * @return shard snapshot metadata
* @throws IOException * @throws IOException
*/ */
public static BlobStoreIndexShardSnapshot fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { public BlobStoreIndexShardSnapshot fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {
String snapshot = null; String snapshot = null;
long indexVersion = -1; long indexVersion = -1;
@ -467,7 +478,9 @@ public class BlobStoreIndexShardSnapshot {
long totalSize = 0; long totalSize = 0;
List<FileInfo> indexFiles = newArrayList(); List<FileInfo> indexFiles = newArrayList();
if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
}
XContentParser.Token token = parser.currentToken(); XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.START_OBJECT) { if (token == XContentParser.Token.START_OBJECT) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -510,5 +523,4 @@ public class BlobStoreIndexShardSnapshot {
return new BlobStoreIndexShardSnapshot(snapshot, indexVersion, ImmutableList.copyOf(indexFiles), return new BlobStoreIndexShardSnapshot(snapshot, indexVersion, ImmutableList.copyOf(indexFiles),
startTime, time, numberOfFiles, totalSize); startTime, time, numberOfFiles, totalSize);
} }
} }

View File

@ -24,10 +24,7 @@ import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import java.io.IOException; import java.io.IOException;
@ -44,7 +41,10 @@ import static com.google.common.collect.Maps.newHashMap;
* This class is used to find files that were already snapshoted and clear out files that no longer referenced by any * This class is used to find files that were already snapshoted and clear out files that no longer referenced by any
* snapshots * snapshots
*/ */
public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, ToXContent { public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, ToXContent, FromXContentBuilder<BlobStoreIndexShardSnapshots> {
public static final BlobStoreIndexShardSnapshots PROTO = new BlobStoreIndexShardSnapshots();
private final ImmutableList<SnapshotFiles> shardSnapshots; private final ImmutableList<SnapshotFiles> shardSnapshots;
private final ImmutableMap<String, FileInfo> files; private final ImmutableMap<String, FileInfo> files;
private final ImmutableMap<String, ImmutableList<FileInfo>> physicalFiles; private final ImmutableMap<String, ImmutableList<FileInfo>> physicalFiles;
@ -103,6 +103,12 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, To
this.physicalFiles = mapBuilder.build(); this.physicalFiles = mapBuilder.build();
} }
private BlobStoreIndexShardSnapshots() {
shardSnapshots = ImmutableList.of();
files = ImmutableMap.of();
physicalFiles = ImmutableMap.of();
}
/** /**
* Returns list of snapshots * Returns list of snapshots
@ -201,7 +207,6 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, To
*/ */
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
// First we list all blobs with their file infos: // First we list all blobs with their file infos:
builder.startArray(Fields.FILES); builder.startArray(Fields.FILES);
for (Map.Entry<String, FileInfo> entry : files.entrySet()) { for (Map.Entry<String, FileInfo> entry : files.entrySet()) {
@ -219,14 +224,15 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, To
builder.endArray(); builder.endArray();
builder.endObject(); builder.endObject();
} }
builder.endObject();
builder.endObject(); builder.endObject();
return builder; return builder;
} }
public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { public BlobStoreIndexShardSnapshots fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {
XContentParser.Token token = parser.currentToken(); XContentParser.Token token = parser.currentToken();
if (token == null) { // New parser
token = parser.nextToken();
}
Map<String, List<String>> snapshotsMap = newHashMap(); Map<String, List<String>> snapshotsMap = newHashMap();
ImmutableMap.Builder<String, FileInfo> filesBuilder = ImmutableMap.builder(); ImmutableMap.Builder<String, FileInfo> filesBuilder = ImmutableMap.builder();
if (token == XContentParser.Token.START_OBJECT) { if (token == XContentParser.Token.START_OBJECT) {

View File

@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.blobstore;
import com.google.common.collect.Maps;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.*;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
/**
* Base class that handles serialization of various data structures during snapshot/restore operations.
*/
public abstract class BlobStoreFormat<T extends ToXContent> {
protected final String blobNameFormat;
protected final FromXContentBuilder<T> reader;
protected final ParseFieldMatcher parseFieldMatcher;
// Serialization parameters to specify correct context for metadata serialization
protected static final ToXContent.Params SNAPSHOT_ONLY_FORMAT_PARAMS;
static {
Map<String, String> snapshotOnlyParams = Maps.newHashMap();
// when metadata is serialized certain elements of the metadata shouldn't be included into snapshot
// exclusion of these elements is done by setting MetaData.CONTEXT_MODE_PARAM to MetaData.CONTEXT_MODE_SNAPSHOT
snapshotOnlyParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_SNAPSHOT);
SNAPSHOT_ONLY_FORMAT_PARAMS = new ToXContent.MapParams(snapshotOnlyParams);
}
/**
* @param blobNameFormat format of the blobname in {@link String#format(Locale, String, Object...)} format
* @param reader the prototype object that can deserialize objects with type T
* @param parseFieldMatcher parse field matcher
*/
protected BlobStoreFormat(String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher) {
this.reader = reader;
this.blobNameFormat = blobNameFormat;
this.parseFieldMatcher = parseFieldMatcher;
}
/**
* Reads and parses the blob with given blob name.
*
* @param blobContainer blob container
* @param blobName blob name
* @return parsed blob object
* @throws IOException
*/
public abstract T readBlob(BlobContainer blobContainer, String blobName) throws IOException;
/**
* Reads and parses the blob with given name, applying name translation using the {link #blobName} method
*
* @param blobContainer blob container
* @param name name to be translated into
* @return parsed blob object
* @throws IOException
*/
public T read(BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
return readBlob(blobContainer, blobName);
}
/**
* Deletes obj in the blob container
*/
public void delete(BlobContainer blobContainer, String name) throws IOException {
blobContainer.deleteBlob(blobName(name));
}
/**
* Checks obj in the blob container
*/
public boolean exists(BlobContainer blobContainer, String name) throws IOException {
return blobContainer.blobExists(blobName(name));
}
protected String blobName(String name) {
return String.format(Locale.ROOT, blobNameFormat, name);
}
protected T read(BytesReference bytes) throws IOException {
try (XContentParser parser = XContentHelper.createParser(bytes)) {
T obj = reader.fromXContent(parser, parseFieldMatcher);
return obj;
}
}
}

View File

@ -19,18 +19,15 @@
package org.elasticsearch.repositories.blobstore; package org.elasticsearch.repositories.blobstore;
import com.fasterxml.jackson.core.JsonParseException;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import org.apache.lucene.store.RateLimiter; import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
@ -39,7 +36,6 @@ import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
@ -47,7 +43,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
@ -61,12 +56,7 @@ import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.InvalidSnapshotNameException; import org.elasticsearch.snapshots.*;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@ -93,13 +83,13 @@ import static com.google.common.collect.Lists.newArrayList;
* STORE_ROOT * STORE_ROOT
* |- index - list of all snapshot name as JSON array * |- index - list of all snapshot name as JSON array
* |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010" * |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010"
* |- metadata-20131010 - JSON serialized MetaData for snapshot "20131010" (includes only global metadata) * |- meta-20131010.dat - JSON serialized MetaData for snapshot "20131010" (includes only global metadata)
* |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011" * |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011"
* |- metadata-20131011 - JSON serialized MetaData for snapshot "20131011" * |- meta-20131011.dat - JSON serialized MetaData for snapshot "20131011"
* ..... * .....
* |- indices/ - data for all indices * |- indices/ - data for all indices
* |- foo/ - data for index "foo" * |- foo/ - data for index "foo"
* | |- snapshot-20131010 - JSON Serialized IndexMetaData for index "foo" * | |- meta-20131010.dat - JSON Serialized IndexMetaData for index "foo"
* | |- 0/ - data for shard "0" of index "foo" * | |- 0/ - data for shard "0" of index "foo"
* | | |- __1 \ * | | |- __1 \
* | | |- __2 | * | | |- __2 |
@ -107,8 +97,9 @@ import static com.google.common.collect.Lists.newArrayList;
* | | |- __4 | * | | |- __4 |
* | | |- __5 / * | | |- __5 /
* | | ..... * | | .....
* | | |- snapshot-20131010 - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131010" * | | |- snap-20131010.dat - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131010"
* | | |- snapshot-20131011 - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131011" * | | |- snap-20131011.dat - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131011"
* | | |- list-123 - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131011"
* | | * | |
* | |- 1/ - data for shard "1" of index "foo" * | |- 1/ - data for shard "1" of index "foo"
* | | |- __1 * | | |- __1
@ -128,24 +119,35 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
protected final String repositoryName; protected final String repositoryName;
private static final String SNAPSHOT_PREFIX = "snapshot-"; private static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-";
private static final String TEMP_SNAPSHOT_FILE_PREFIX = "pending-"; private static final String SNAPSHOT_PREFIX = "snap-";
private static final String SNAPSHOT_SUFFIX = ".dat";
private static final String COMMON_SNAPSHOT_PREFIX = "snap";
private static final String SNAPSHOT_CODEC = "snapshot";
private static final String SNAPSHOTS_FILE = "index"; private static final String SNAPSHOTS_FILE = "index";
private static final String TESTS_FILE = "tests-"; private static final String TESTS_FILE = "tests-";
private static final String METADATA_PREFIX = "meta-"; private static final String METADATA_NAME_FORMAT = "meta-%s.dat";
private static final String LEGACY_METADATA_PREFIX = "metadata-"; private static final String LEGACY_METADATA_NAME_FORMAT = "metadata-%s";
private static final String METADATA_CODEC = "metadata";
private static final String INDEX_METADATA_CODEC = "index-metadata";
private static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s" + SNAPSHOT_SUFFIX;
private static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s";
private static final String METADATA_SUFFIX = ".dat";
private final BlobStoreIndexShardRepository indexShardRepository; private final BlobStoreIndexShardRepository indexShardRepository;
private final ToXContent.Params snapshotOnlyFormatParams;
private final RateLimiter snapshotRateLimiter; private final RateLimiter snapshotRateLimiter;
private final RateLimiter restoreRateLimiter; private final RateLimiter restoreRateLimiter;
@ -154,6 +156,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
private final CounterMetric restoreRateLimitingTimeInNanos = new CounterMetric(); private final CounterMetric restoreRateLimitingTimeInNanos = new CounterMetric();
private ChecksumBlobStoreFormat<MetaData> globalMetaDataFormat;
private LegacyBlobStoreFormat<MetaData> globalMetaDataLegacyFormat;
private ChecksumBlobStoreFormat<IndexMetaData> indexMetaDataFormat;
private LegacyBlobStoreFormat<IndexMetaData> indexMetaDataLegacyFormat;
private ChecksumBlobStoreFormat<Snapshot> snapshotFormat;
private LegacyBlobStoreFormat<Snapshot> snapshotLegacyFormat;
/** /**
* Constructs new BlobStoreRepository * Constructs new BlobStoreRepository
@ -166,9 +179,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
super(repositorySettings.globalSettings()); super(repositorySettings.globalSettings());
this.repositoryName = repositoryName; this.repositoryName = repositoryName;
this.indexShardRepository = (BlobStoreIndexShardRepository) indexShardRepository; this.indexShardRepository = (BlobStoreIndexShardRepository) indexShardRepository;
Map<String, String> snpashotOnlyParams = Maps.newHashMap();
snpashotOnlyParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_SNAPSHOT);
snapshotOnlyFormatParams = new ToXContent.MapParams(snpashotOnlyParams);
snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
} }
@ -180,6 +190,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
protected void doStart() { protected void doStart() {
this.snapshotsBlobContainer = blobStore().blobContainer(basePath()); this.snapshotsBlobContainer = blobStore().blobContainer(basePath());
indexShardRepository.initialize(blobStore(), basePath(), chunkSize(), snapshotRateLimiter, restoreRateLimiter, this, isCompress()); indexShardRepository.initialize(blobStore(), basePath(), chunkSize(), snapshotRateLimiter, restoreRateLimiter, this, isCompress());
ParseFieldMatcher parseFieldMatcher = new ParseFieldMatcher(settings);
globalMetaDataFormat = new ChecksumBlobStoreFormat<>(METADATA_CODEC, METADATA_NAME_FORMAT, MetaData.PROTO, parseFieldMatcher, isCompress());
globalMetaDataLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_METADATA_NAME_FORMAT, MetaData.PROTO, parseFieldMatcher);
indexMetaDataFormat = new ChecksumBlobStoreFormat<>(INDEX_METADATA_CODEC, METADATA_NAME_FORMAT, IndexMetaData.PROTO, parseFieldMatcher, isCompress());
indexMetaDataLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, IndexMetaData.PROTO, parseFieldMatcher);
snapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, Snapshot.PROTO, parseFieldMatcher, isCompress());
snapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, Snapshot.PROTO, parseFieldMatcher);
} }
/** /**
@ -241,26 +261,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
@Override @Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) { public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {
try { try {
String snapshotBlobName = snapshotBlobName(snapshotId); if (snapshotFormat.exists(snapshotsBlobContainer, snapshotId.getSnapshot()) ||
if (snapshotsBlobContainer.blobExists(snapshotBlobName)) { snapshotLegacyFormat.exists(snapshotsBlobContainer, snapshotId.getSnapshot())) {
throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists"); throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists");
} }
// Write Global MetaData // Write Global MetaData
// TODO: Check if metadata needs to be written globalMetaDataFormat.write(metaData, snapshotsBlobContainer, snapshotId.getSnapshot());
try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId, false)))) {
writeGlobalMetaData(metaData, output);
}
for (String index : indices) { for (String index : indices) {
final IndexMetaData indexMetaData = metaData.index(index); final IndexMetaData indexMetaData = metaData.index(index);
final BlobPath indexPath = basePath().add("indices").add(index); final BlobPath indexPath = basePath().add("indices").add(index);
final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath); final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
try (StreamOutput output = compressIfNeeded(indexMetaDataBlobContainer.createOutput(snapshotBlobName(snapshotId)))) { indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getSnapshot());
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, output);
builder.startObject();
IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
builder.close();
}
} }
} catch (IOException ex) { } catch (IOException ex) {
throw new SnapshotCreationException(snapshotId, ex); throw new SnapshotCreationException(snapshotId, ex);
@ -279,7 +290,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
indices = snapshot.indices(); indices = snapshot.indices();
} catch (SnapshotMissingException ex) { } catch (SnapshotMissingException ex) {
throw ex; throw ex;
} catch (SnapshotException | ElasticsearchParseException ex) { } catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) {
logger.warn("cannot read snapshot file [{}]", ex, snapshotId); logger.warn("cannot read snapshot file [{}]", ex, snapshotId);
} }
MetaData metaData = null; MetaData metaData = null;
@ -287,25 +298,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
if (snapshot != null) { if (snapshot != null) {
metaData = readSnapshotMetaData(snapshotId, snapshot.version(), indices, true); metaData = readSnapshotMetaData(snapshotId, snapshot.version(), indices, true);
} else { } else {
try { metaData = readSnapshotMetaData(snapshotId, null, indices, true);
metaData = readSnapshotMetaData(snapshotId, false, indices, true);
} catch (IOException ex) {
metaData = readSnapshotMetaData(snapshotId, true, indices, true);
}
} }
} catch (IOException | SnapshotException ex) { } catch (IOException | SnapshotException ex) {
logger.warn("cannot read metadata for snapshot [{}]", ex, snapshotId); logger.warn("cannot read metadata for snapshot [{}]", ex, snapshotId);
} }
try { try {
String blobName = snapshotBlobName(snapshotId);
// Delete snapshot file first so we wouldn't end up with partially deleted snapshot that looks OK // Delete snapshot file first so we wouldn't end up with partially deleted snapshot that looks OK
snapshotsBlobContainer.deleteBlob(blobName);
if (snapshot != null) { if (snapshot != null) {
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId, legacyMetaData(snapshot.version()))); snapshotFormat(snapshot.version()).delete(snapshotsBlobContainer, snapshotId.getSnapshot());
globalMetaDataFormat(snapshot.version()).delete(snapshotsBlobContainer, snapshotId.getSnapshot());
} else { } else {
// We don't know which version was the snapshot created with - try deleting both current and legacy metadata // We don't know which version was the snapshot created with - try deleting both current and legacy formats
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId, true)); snapshotFormat.delete(snapshotsBlobContainer, snapshotId.getSnapshot());
snapshotsBlobContainer.deleteBlob(metaDataBlobName(snapshotId, false)); snapshotLegacyFormat.delete(snapshotsBlobContainer, snapshotId.getSnapshot());
globalMetaDataLegacyFormat.delete(snapshotsBlobContainer, snapshotId.getSnapshot());
globalMetaDataFormat.delete(snapshotsBlobContainer, snapshotId.getSnapshot());
} }
// Delete snapshot from the snapshot list // Delete snapshot from the snapshot list
List<SnapshotId> snapshotIds = snapshots(); List<SnapshotId> snapshotIds = snapshots();
@ -324,7 +332,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
BlobPath indexPath = basePath().add("indices").add(index); BlobPath indexPath = basePath().add("indices").add(index);
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath); BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
try { try {
indexMetaDataBlobContainer.deleteBlob(blobName); indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, snapshotId.getSnapshot());
} catch (IOException ex) { } catch (IOException ex) {
logger.warn("[{}] failed to delete metadata for index [{}]", ex, snapshotId, index); logger.warn("[{}] failed to delete metadata for index [{}]", ex, snapshotId, index);
} }
@ -334,7 +342,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) { for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
ShardId shardId = new ShardId(index, i); ShardId shardId = new ShardId(index, i);
try { try {
indexShardRepository.delete(snapshotId, shardId); indexShardRepository.delete(snapshotId, snapshot.version(), shardId);
} catch (SnapshotException ex) { } catch (SnapshotException ex) {
logger.warn("[{}] failed to delete shard data for shard [{}]", ex, snapshotId, shardId); logger.warn("[{}] failed to delete shard data for shard [{}]", ex, snapshotId, shardId);
} }
@ -347,40 +355,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
} }
} }
private StreamOutput compressIfNeeded(OutputStream output) throws IOException {
return toStreamOutput(output, isCompress());
}
public static StreamOutput toStreamOutput(OutputStream output, boolean compress) throws IOException {
StreamOutput out = null;
boolean success = false;
try {
out = new OutputStreamStreamOutput(output);
if (compress) {
out = CompressorFactory.defaultCompressor().streamOutput(out);
}
success = true;
return out;
} finally {
if (!success) {
IOUtils.closeWhileHandlingException(out, output);
}
}
}
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public Snapshot finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) { public Snapshot finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
try { try {
String tempBlobName = tempSnapshotBlobName(snapshotId);
String blobName = snapshotBlobName(snapshotId);
Snapshot blobStoreSnapshot = new Snapshot(snapshotId.getSnapshot(), indices, startTime, failure, System.currentTimeMillis(), totalShards, shardFailures); Snapshot blobStoreSnapshot = new Snapshot(snapshotId.getSnapshot(), indices, startTime, failure, System.currentTimeMillis(), totalShards, shardFailures);
try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(tempBlobName))) { snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, snapshotId.getSnapshot());
writeSnapshot(blobStoreSnapshot, output);
}
snapshotsBlobContainer.move(tempBlobName, blobName);
List<SnapshotId> snapshotIds = snapshots(); List<SnapshotId> snapshotIds = snapshots();
if (!snapshotIds.contains(snapshotId)) { if (!snapshotIds.contains(snapshotId)) {
snapshotIds = ImmutableList.<SnapshotId>builder().addAll(snapshotIds).add(snapshotId).build(); snapshotIds = ImmutableList.<SnapshotId>builder().addAll(snapshotIds).add(snapshotId).build();
@ -401,14 +383,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
List<SnapshotId> snapshots = newArrayList(); List<SnapshotId> snapshots = newArrayList();
Map<String, BlobMetaData> blobs; Map<String, BlobMetaData> blobs;
try { try {
blobs = snapshotsBlobContainer.listBlobsByPrefix(SNAPSHOT_PREFIX); blobs = snapshotsBlobContainer.listBlobsByPrefix(COMMON_SNAPSHOT_PREFIX);
} catch (UnsupportedOperationException ex) { } catch (UnsupportedOperationException ex) {
// Fall back in case listBlobsByPrefix isn't supported by the blob store // Fall back in case listBlobsByPrefix isn't supported by the blob store
return readSnapshotList(); return readSnapshotList();
} }
int prefixLength = SNAPSHOT_PREFIX.length(); int prefixLength = SNAPSHOT_PREFIX.length();
int suffixLength = SNAPSHOT_SUFFIX.length();
int legacyPrefixLength = LEGACY_SNAPSHOT_PREFIX.length();
for (BlobMetaData md : blobs.values()) { for (BlobMetaData md : blobs.values()) {
String name = md.name().substring(prefixLength); String blobName = md.name();
final String name;
if (blobName.startsWith(SNAPSHOT_PREFIX) && blobName.length() > legacyPrefixLength) {
name = blobName.substring(prefixLength, blobName.length() - suffixLength);
} else if (blobName.startsWith(LEGACY_SNAPSHOT_PREFIX) && blobName.length() > suffixLength + prefixLength) {
name = blobName.substring(legacyPrefixLength);
} else {
// not sure what it was - ignore
continue;
}
snapshots.add(new SnapshotId(repositoryName, name)); snapshots.add(new SnapshotId(repositoryName, name));
} }
return ImmutableList.copyOf(snapshots); return ImmutableList.copyOf(snapshots);
@ -431,24 +424,37 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
@Override @Override
public Snapshot readSnapshot(SnapshotId snapshotId) { public Snapshot readSnapshot(SnapshotId snapshotId) {
try { try {
try (InputStream blob = snapshotsBlobContainer.openInput(snapshotBlobName(snapshotId))) { return snapshotFormat.read(snapshotsBlobContainer, snapshotId.getSnapshot());
return readSnapshot(ByteStreams.toByteArray(blob));
}
} catch (FileNotFoundException | NoSuchFileException ex) { } catch (FileNotFoundException | NoSuchFileException ex) {
throw new SnapshotMissingException(snapshotId, ex); // File is missing - let's try legacy format instead
try {
return snapshotLegacyFormat.read(snapshotsBlobContainer, snapshotId.getSnapshot());
} catch (FileNotFoundException | NoSuchFileException ex1) {
throw new SnapshotMissingException(snapshotId, ex);
} catch (IOException | NotXContentException ex1) {
throw new SnapshotException(snapshotId, "failed to get snapshots", ex1);
}
} catch (IOException | NotXContentException ex) { } catch (IOException | NotXContentException ex) {
throw new SnapshotException(snapshotId, "failed to get snapshots", ex); throw new SnapshotException(snapshotId, "failed to get snapshots", ex);
} }
} }
private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<String> indices, boolean ignoreIndexErrors) throws IOException { private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<String> indices, boolean ignoreIndexErrors) throws IOException {
return readSnapshotMetaData(snapshotId, legacyMetaData(snapshotVersion), indices, ignoreIndexErrors);
}
private MetaData readSnapshotMetaData(SnapshotId snapshotId, boolean legacy, List<String> indices, boolean ignoreIndexErrors) throws IOException {
MetaData metaData; MetaData metaData;
try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId, legacy))) { if (snapshotVersion == null) {
metaData = readMetaData(ByteStreams.toByteArray(blob)); // When we delete corrupted snapshots we might not know which version we are dealing with
// We can try detecting the version based on the metadata file format
assert ignoreIndexErrors;
if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getSnapshot())) {
snapshotVersion = Version.CURRENT;
} else if (globalMetaDataLegacyFormat.exists(snapshotsBlobContainer, snapshotId.getSnapshot())) {
snapshotVersion = Version.V_1_0_0;
} else {
throw new SnapshotMissingException(snapshotId);
}
}
try {
metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getSnapshot());
} catch (FileNotFoundException | NoSuchFileException ex) { } catch (FileNotFoundException | NoSuchFileException ex) {
throw new SnapshotMissingException(snapshotId, ex); throw new SnapshotMissingException(snapshotId, ex);
} catch (IOException ex) { } catch (IOException ex) {
@ -458,28 +464,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
for (String index : indices) { for (String index : indices) {
BlobPath indexPath = basePath().add("indices").add(index); BlobPath indexPath = basePath().add("indices").add(index);
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath); BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
try (InputStream blob = indexMetaDataBlobContainer.openInput(snapshotBlobName(snapshotId))) { try {
byte[] data = ByteStreams.toByteArray(blob); metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, snapshotId.getSnapshot()), false);
try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) { } catch (ElasticsearchParseException | IOException ex) {
XContentParser.Token token; if (ignoreIndexErrors) {
if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) {
IndexMetaData indexMetaData = IndexMetaData.Builder.fromXContent(parser);
if ((token = parser.nextToken()) == XContentParser.Token.END_OBJECT) {
metaDataBuilder.put(indexMetaData, false);
continue;
}
}
if (!ignoreIndexErrors) {
throw new ElasticsearchParseException("unexpected token [{}]", token);
} else {
logger.warn("[{}] [{}] unexpected token while reading snapshot metadata [{}]", snapshotId, index, token);
}
}
} catch (IOException ex) {
if (!ignoreIndexErrors) {
throw new SnapshotException(snapshotId, "failed to read metadata", ex);
} else {
logger.warn("[{}] [{}] failed to read metadata for index", snapshotId, index, ex); logger.warn("[{}] [{}] failed to read metadata for index", snapshotId, index, ex);
} else {
throw ex;
} }
} }
} }
@ -505,85 +496,24 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
} }
/** /**
* Parses JSON containing snapshot description * Returns appropriate global metadata format based on the provided version of the snapshot
*
* @param data snapshot description in JSON format
* @return parsed snapshot description
* @throws IOException parse exceptions
*/ */
public Snapshot readSnapshot(byte[] data) throws IOException { private BlobStoreFormat<MetaData> globalMetaDataFormat(Version version) {
try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) { if(legacyMetaData(version)) {
XContentParser.Token token; return globalMetaDataLegacyFormat;
if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) {
if ((token = parser.nextToken()) == XContentParser.Token.FIELD_NAME) {
parser.nextToken();
Snapshot snapshot = Snapshot.fromXContent(parser);
if ((token = parser.nextToken()) == XContentParser.Token.END_OBJECT) {
return snapshot;
}
}
}
throw new ElasticsearchParseException("unexpected token [{}]", token);
} catch (JsonParseException ex) {
throw new ElasticsearchParseException("failed to read snapshot", ex);
}
}
/**
* Parses JSON containing cluster metadata
*
* @param data cluster metadata in JSON format
* @return parsed metadata
* @throws IOException parse exceptions
*/
private MetaData readMetaData(byte[] data) throws IOException {
try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) {
XContentParser.Token token;
if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) {
if ((token = parser.nextToken()) == XContentParser.Token.FIELD_NAME) {
parser.nextToken();
MetaData metaData = MetaData.Builder.fromXContent(parser);
if ((token = parser.nextToken()) == XContentParser.Token.END_OBJECT) {
return metaData;
}
}
}
throw new ElasticsearchParseException("unexpected token [{}]", token);
}
}
/**
* Returns name of snapshot blob
*
* @param snapshotId snapshot id
* @return name of snapshot blob
*/
private String snapshotBlobName(SnapshotId snapshotId) {
return SNAPSHOT_PREFIX + snapshotId.getSnapshot();
}
/**
* Returns temporary name of snapshot blob
*
* @param snapshotId snapshot id
* @return name of snapshot blob
*/
private String tempSnapshotBlobName(SnapshotId snapshotId) {
return TEMP_SNAPSHOT_FILE_PREFIX + snapshotId.getSnapshot();
}
/**
* Returns name of metadata blob
*
* @param snapshotId snapshot id
* @param legacy true if legacy (pre-2.0.0) format should be used
* @return name of metadata blob
*/
private String metaDataBlobName(SnapshotId snapshotId, boolean legacy) {
if (legacy) {
return LEGACY_METADATA_PREFIX + snapshotId.getSnapshot();
} else { } else {
return METADATA_PREFIX + snapshotId.getSnapshot() + METADATA_SUFFIX; return globalMetaDataFormat;
}
}
/**
* Returns appropriate snapshot format based on the provided version of the snapshot
*/
private BlobStoreFormat<Snapshot> snapshotFormat(Version version) {
if(legacyMetaData(version)) {
return snapshotLegacyFormat;
} else {
return snapshotFormat;
} }
} }
@ -592,38 +522,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
* @param version * @param version
* @return true if legacy version should be used false otherwise * @return true if legacy version should be used false otherwise
*/ */
private boolean legacyMetaData(Version version) { public static boolean legacyMetaData(Version version) {
return version.before(Version.V_2_0_0_beta1); return version.before(Version.V_2_0_0_beta1);
} }
/** /**
* Serializes Snapshot into JSON * Returns appropriate index metadata format based on the provided version of the snapshot
*
* @param snapshot - snapshot description
* @return BytesStreamOutput representing JSON serialized Snapshot
* @throws IOException
*/ */
private void writeSnapshot(Snapshot snapshot, StreamOutput outputStream) throws IOException { private BlobStoreFormat<IndexMetaData> indexMetaDataFormat(Version version) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, outputStream); if(legacyMetaData(version)) {
builder.startObject(); return indexMetaDataLegacyFormat;
snapshot.toXContent(builder, snapshotOnlyFormatParams); } else {
builder.endObject(); return indexMetaDataFormat;
builder.flush(); }
}
/**
* Serializes global MetaData into JSON
*
* @param metaData - metaData
* @return BytesStreamOutput representing JSON serialized global MetaData
* @throws IOException
*/
private void writeGlobalMetaData(MetaData metaData, StreamOutput outputStream) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, outputStream);
builder.startObject();
MetaData.Builder.toXContent(metaData, builder, snapshotOnlyFormatParams);
builder.endObject();
builder.flush();
} }
/** /**
@ -635,18 +546,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent<Rep
* @throws IOException I/O errors * @throws IOException I/O errors
*/ */
protected void writeSnapshotList(List<SnapshotId> snapshots) throws IOException { protected void writeSnapshotList(List<SnapshotId> snapshots) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput(); final BytesReference bRef;
StreamOutput stream = compressIfNeeded(bStream); try(BytesStreamOutput bStream = new BytesStreamOutput()) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); try(StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
builder.startObject(); XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
builder.startArray("snapshots"); builder.startObject();
for (SnapshotId snapshot : snapshots) { builder.startArray("snapshots");
builder.value(snapshot.getSnapshot()); for (SnapshotId snapshot : snapshots) {
builder.value(snapshot.getSnapshot());
}
builder.endArray();
builder.endObject();
builder.close();
}
bRef = bStream.bytes();
} }
builder.endArray(); snapshotsBlobContainer.deleteBlob(SNAPSHOTS_FILE);
builder.endObject();
builder.close();
BytesReference bRef = bStream.bytes();
try (OutputStream output = snapshotsBlobContainer.createOutput(SNAPSHOTS_FILE)) { try (OutputStream output = snapshotsBlobContainer.createOutput(SNAPSHOTS_FILE)) {
bRef.writeTo(output); bRef.writeTo(output);
} }

View File

@ -0,0 +1,218 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.blobstore;
import com.google.common.io.ByteStreams;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.gateway.CorruptStateException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Locale;
/**
* Snapshot metadata file format used in v2.0 and above
*/
public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
private static final String TEMP_FILE_PREFIX = "pending-";
private static final XContentType DEFAULT_X_CONTENT_TYPE = XContentType.SMILE;
// The format version
public static final int VERSION = 1;
private static final int BUFFER_SIZE = 4096;
protected final XContentType xContentType;
protected final boolean compress;
private final String codec;
/**
* @param codec codec name
* @param blobNameFormat format of the blobname in {@link String#format} format
* @param reader prototype object that can deserialize T from XContent
* @param compress true if the content should be compressed
* @param xContentType content type that should be used for write operations
*/
public ChecksumBlobStoreFormat(String codec, String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher, boolean compress, XContentType xContentType) {
super(blobNameFormat, reader, parseFieldMatcher);
this.xContentType = xContentType;
this.compress = compress;
this.codec = codec;
}
/**
* @param codec codec name
* @param blobNameFormat format of the blobname in {@link String#format} format
* @param reader prototype object that can deserialize T from XContent
* @param compress true if the content should be compressed
*/
public ChecksumBlobStoreFormat(String codec, String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher, boolean compress) {
this(codec, blobNameFormat, reader, parseFieldMatcher, compress, DEFAULT_X_CONTENT_TYPE);
}
/**
* Reads blob with specified name without resolving the blobName using using {@link #blobName} method.
*
* @param blobContainer blob container
* @param blobName blob name
* @return
* @throws IOException
*/
public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
try (InputStream inputStream = blobContainer.openInput(blobName)) {
byte[] bytes = ByteStreams.toByteArray(inputStream);
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
try (ByteArrayIndexInput indexInput = new ByteArrayIndexInput(resourceDesc, bytes)) {
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
BytesReference bytesReference = new BytesArray(bytes, (int) filePointer, (int) contentSize);
return read(bytesReference);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we trick this into a dedicated exception with the original stacktrace
throw new CorruptStateException(ex);
}
}
}
/**
* Writes blob in atomic manner with resolving the blob name using {@link #blobName} and {@link #tempBlobName} methods.
* <p/>
* The blob will be compressed and checksum will be written if required.
*
* Atomic move might be very inefficient on some repositories. It also cannot override existing files.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param name blob name
* @throws IOException
*/
public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
String tempBlobName = tempBlobName(name);
writeBlob(obj, blobContainer, tempBlobName);
try {
blobContainer.move(tempBlobName, blobName);
} catch (IOException ex) {
// Move failed - try cleaning up
blobContainer.deleteBlob(tempBlobName);
throw ex;
}
}
/**
* Writes blob with resolving the blob name using {@link #blobName} method.
* <p/>
* The blob will be compressed and checksum will be written if required.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param name blob name
* @throws IOException
*/
public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
writeBlob(obj, blobContainer, blobName);
}
/**
* Writes blob in atomic manner without resolving the blobName using using {@link #blobName} method.
* <p/>
* The blob will be compressed and checksum will be written if required.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param blobName blob name
* @throws IOException
*/
protected void writeBlob(T obj, BlobContainer blobContainer, String blobName) throws IOException {
BytesReference bytes = write(obj);
try (OutputStream outputStream = blobContainer.createOutput(blobName)) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, outputStream, BUFFER_SIZE)) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
} }) {
bytes.writeTo(indexOutputOutputStream);
}
CodecUtil.writeFooter(indexOutput);
}
}
}
/**
* Returns true if the blob is a leftover temporary blob.
*
* The temporary blobs might be left after failed atomic write operation.
*/
public boolean isTempBlobName(String blobName) {
return blobName.startsWith(ChecksumBlobStoreFormat.TEMP_FILE_PREFIX);
}
protected BytesReference write(T obj) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
try (StreamOutput compressedStreamOutput = CompressorFactory.defaultCompressor().streamOutput(bytesStreamOutput)) {
write(obj, compressedStreamOutput);
}
} else {
write(obj, bytesStreamOutput);
}
return bytesStreamOutput.bytes();
}
}
protected void write(T obj, StreamOutput streamOutput) throws IOException {
try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, streamOutput)) {
builder.startObject();
obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
builder.endObject();
}
}
protected String tempBlobName(String name) {
return TEMP_FILE_PREFIX + String.format(Locale.ROOT, blobNameFormat, name);
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.blobstore;
import com.google.common.io.ByteStreams;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.FromXContentBuilder;
import org.elasticsearch.common.xcontent.ToXContent;
import java.io.IOException;
import java.io.InputStream;
/**
* Snapshot metadata file format used before v2.0
*/
public class LegacyBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
/**
* @param blobNameFormat format of the blobname in {@link String#format} format
* @param reader the prototype object that can deserialize objects with type T
*/
public LegacyBlobStoreFormat(String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher) {
super(blobNameFormat, reader, parseFieldMatcher);
}
/**
* Reads and parses the blob with given name.
*
* If required the checksum of the blob will be verified.
*
* @param blobContainer blob container
* @param blobName blob name
* @return parsed blob object
* @throws IOException
*/
public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
try (InputStream inputStream = blobContainer.openInput(blobName)) {
return read(new BytesArray(ByteStreams.toByteArray(inputStream)));
}
}
}

View File

@ -84,7 +84,7 @@ import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX
* {@link StoreRecoveryService#recover(IndexShard, boolean, StoreRecoveryService.RecoveryListener)} * {@link StoreRecoveryService#recover(IndexShard, boolean, StoreRecoveryService.RecoveryListener)}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. If this property is not null * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. If this property is not null
* {@code recover} method uses {@link StoreRecoveryService#restore(org.elasticsearch.indices.recovery.RecoveryState)} * {@code recover} method uses {@link StoreRecoveryService#restore}
* method to start shard restore process. * method to start shard restore process.
* <p/> * <p/>
* At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(SnapshotId, ShardId)}, * At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(SnapshotId, ShardId)},
@ -203,7 +203,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) { for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) {
String index = indexEntry.getValue(); String index = indexEntry.getValue();
boolean partial = checkPartial(index); boolean partial = checkPartial(index);
RestoreSource restoreSource = new RestoreSource(snapshotId, index); RestoreSource restoreSource = new RestoreSource(snapshotId, snapshot.version(), index);
String renamedIndex = indexEntry.getKey(); String renamedIndex = indexEntry.getKey();
IndexMetaData snapshotIndexMetaData = metaData.index(index); IndexMetaData snapshotIndexMetaData = metaData.index(index);
snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, request.indexSettings, request.ignoreIndexSettings); snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, request.indexSettings, request.ignoreIndexSettings);

View File

@ -20,20 +20,22 @@
package org.elasticsearch.snapshots; package org.elasticsearch.snapshots;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import static java.util.Collections.*;
/** /**
* Represent information about snapshot * Represent information about snapshot
*/ */
public class Snapshot implements Comparable<Snapshot>, ToXContent { public class Snapshot implements Comparable<Snapshot>, ToXContent, FromXContentBuilder<Snapshot> {
private final String name; private final String name;
@ -57,6 +59,8 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
private final static List<SnapshotShardFailure> NO_FAILURES = ImmutableList.of(); private final static List<SnapshotShardFailure> NO_FAILURES = ImmutableList.of();
public final static Snapshot PROTO = new Snapshot();
private Snapshot(String name, List<String> indices, SnapshotState state, String reason, Version version, long startTime, long endTime, private Snapshot(String name, List<String> indices, SnapshotState state, String reason, Version version, long startTime, long endTime,
int totalShard, int successfulShards, List<SnapshotShardFailure> shardFailures) { int totalShard, int successfulShards, List<SnapshotShardFailure> shardFailures) {
assert name != null; assert name != null;
@ -86,6 +90,13 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
startTime, endTime, totalShard, totalShard - shardFailures.size(), shardFailures); startTime, endTime, totalShard, totalShard - shardFailures.size(), shardFailures);
} }
/**
* Special constructor for the prototype object
*/
private Snapshot() {
this("", (List<String>) EMPTY_LIST, 0);
}
private static SnapshotState snapshotState(String reason, List<SnapshotShardFailure> shardFailures) { private static SnapshotState snapshotState(String reason, List<SnapshotShardFailure> shardFailures) {
if (reason == null) { if (reason == null) {
if (shardFailures.isEmpty()) { if (shardFailures.isEmpty()) {
@ -221,6 +232,11 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
return result; return result;
} }
@Override
public Snapshot fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {
return fromXContent(parser);
}
static final class Fields { static final class Fields {
static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot"); static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
static final XContentBuilderString NAME = new XContentBuilderString("name"); static final XContentBuilderString NAME = new XContentBuilderString("name");
@ -277,9 +293,14 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
int totalShard = 0; int totalShard = 0;
int successfulShards = 0; int successfulShards = 0;
List<SnapshotShardFailure> shardFailures = NO_FAILURES; List<SnapshotShardFailure> shardFailures = NO_FAILURES;
if (parser.currentToken() == null) { // fresh parser? move to the first token
XContentParser.Token token = parser.currentToken(); parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) { }
if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token
parser.nextToken();
}
XContentParser.Token token;
if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) {
String currentFieldName = parser.currentName(); String currentFieldName = parser.currentName();
if ("snapshot".equals(currentFieldName)) { if ("snapshot".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -328,6 +349,8 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
} }
} }
} }
} else {
throw new ElasticsearchParseException("unexpected token [" + token + "]");
} }
return new Snapshot(name, indices, state, reason, version, startTime, endTime, totalShard, successfulShards, shardFailures); return new Snapshot(name, indices, state, reason, version, startTime, endTime, totalShard, successfulShards, shardFailures);
} }

View File

@ -34,6 +34,10 @@ public class SnapshotMissingException extends SnapshotException {
super(snapshot, "is missing", cause); super(snapshot, "is missing", cause);
} }
public SnapshotMissingException(SnapshotId snapshot) {
super(snapshot, "is missing");
}
public SnapshotMissingException(StreamInput in) throws IOException { public SnapshotMissingException(StreamInput in) throws IOException {
super(in); super(in);
} }

View File

@ -471,7 +471,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
shardSnapshotStatus.failure(shardFailure.reason()); shardSnapshotStatus.failure(shardFailure.reason());
shardStatusBuilder.put(shardId, shardSnapshotStatus); shardStatusBuilder.put(shardId, shardSnapshotStatus);
} else { } else {
IndexShardSnapshotStatus shardSnapshotStatus = indexShardRepository.snapshotStatus(snapshotId, shardId); IndexShardSnapshotStatus shardSnapshotStatus = indexShardRepository.snapshotStatus(snapshotId, snapshot.version(), shardId);
shardStatusBuilder.put(shardId, shardSnapshotStatus); shardStatusBuilder.put(shardId, shardSnapshotStatus);
} }
} }

View File

@ -126,7 +126,7 @@ public class UnassignedInfoTests extends ElasticsearchAllocationTestCase {
.build(); .build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData) .metaData(metaData)
.routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), "test"), new IntHashSet())).build(); .routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), Version.CURRENT, "test"), new IntHashSet())).build();
for (ShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) { for (ShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED)); assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED));
} }
@ -139,7 +139,7 @@ public class UnassignedInfoTests extends ElasticsearchAllocationTestCase {
.build(); .build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData) .metaData(metaData)
.routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), "test"))).build(); .routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), Version.CURRENT, "test"))).build();
for (ShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) { for (ShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED)); assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED));
} }

View File

@ -0,0 +1,138 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.store;
import com.google.common.base.Charsets;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.io.IOException;
import static org.hamcrest.Matchers.containsString;
public class ByteArrayIndexInputTests extends ElasticsearchTestCase {
@Test
public void testRandomReads() throws IOException {
for (int i = 0; i < 100; i++) {
byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(Charsets.UTF_8);
ByteArrayIndexInput indexInput = new ByteArrayIndexInput("test", input);
assertEquals(input.length, indexInput.length());
assertEquals(0, indexInput.getFilePointer());
byte[] output = randomReadAndSlice(indexInput, input.length);
assertArrayEquals(input, output);
}
}
@Test
public void testRandomOverflow() throws IOException {
for (int i = 0; i < 100; i++) {
byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(Charsets.UTF_8);
ByteArrayIndexInput indexInput = new ByteArrayIndexInput("test", input);
int firstReadLen = randomIntBetween(0, input.length - 1);
randomReadAndSlice(indexInput, firstReadLen);
int bytesLeft = input.length - firstReadLen;
try {
// read using int size
int secondReadLen = bytesLeft + randomIntBetween(1, 100);
indexInput.readBytes(new byte[secondReadLen], 0, secondReadLen);
fail();
} catch (IOException ex) {
assertThat(ex.getMessage(), containsString("EOF"));
}
}
}
@Test
public void testSeekOverflow() throws IOException {
for (int i = 0; i < 100; i++) {
byte[] input = randomUnicodeOfLength(randomIntBetween(1, 1000)).getBytes(Charsets.UTF_8);
ByteArrayIndexInput indexInput = new ByteArrayIndexInput("test", input);
int firstReadLen = randomIntBetween(0, input.length - 1);
randomReadAndSlice(indexInput, firstReadLen);
try {
switch (randomIntBetween(0, 2)) {
case 0:
indexInput.seek(Integer.MAX_VALUE + 4L);
break;
case 1:
indexInput.seek(-randomIntBetween(1, 10));
break;
case 2:
int seek = input.length + randomIntBetween(1, 100);
indexInput.seek(seek);
break;
default:
fail();
}
fail();
} catch (IOException ex) {
assertThat(ex.getMessage(), containsString("EOF"));
} catch (IllegalArgumentException ex) {
assertThat(ex.getMessage(), containsString("negative position"));
}
}
}
private byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IOException {
int readPos = (int) indexInput.getFilePointer();
byte[] output = new byte[length];
while (readPos < length) {
switch (randomIntBetween(0, 3)) {
case 0:
// Read by one byte at a time
output[readPos++] = indexInput.readByte();
break;
case 1:
// Read several bytes into target
int len = randomIntBetween(1, length - readPos);
indexInput.readBytes(output, readPos, len);
readPos += len;
break;
case 2:
// Read several bytes into 0-offset target
len = randomIntBetween(1, length - readPos);
byte[] temp = new byte[len];
indexInput.readBytes(temp, 0, len);
System.arraycopy(temp, 0, output, readPos, len);
readPos += len;
break;
case 3:
// Read using slice
len = randomIntBetween(1, length - readPos);
IndexInput slice = indexInput.slice("slice (" + readPos + ", " + len + ") of " + indexInput.toString(), readPos, len);
temp = randomReadAndSlice(slice, len);
// assert that position in the original input didn't change
assertEquals(readPos, indexInput.getFilePointer());
System.arraycopy(temp, 0, output, readPos, len);
readPos += len;
indexInput.seek(readPos);
assertEquals(readPos, indexInput.getFilePointer());
break;
default:
fail();
}
assertEquals((long) readPos, indexInput.getFilePointer());
}
return output;
}
}

View File

@ -0,0 +1,296 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.ElasticsearchCorruptionException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.translog.BufferedChecksumStreamOutput;
import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.elasticsearch.repositories.blobstore.LegacyBlobStoreFormat;
import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.*;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
public class BlobStoreFormatTests extends AbstractSnapshotTests {
private static final ParseFieldMatcher parseFieldMatcher = new ParseFieldMatcher(Settings.EMPTY);
public static final String BLOB_CODEC = "blob";
private static class BlobObj implements ToXContent, FromXContentBuilder<BlobObj> {
public static final BlobObj PROTO = new BlobObj("");
private final String text;
public BlobObj(String text) {
this.text = text;
}
public String getText() {
return text;
}
@Override
public BlobObj fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {
String text = null;
XContentParser.Token token = parser.currentToken();
if (token == null) {
token = parser.nextToken();
}
if (token == XContentParser.Token.START_OBJECT) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token != XContentParser.Token.FIELD_NAME) {
throw new ElasticsearchParseException("unexpected token [{}]", token);
}
String currentFieldName = parser.currentName();
token = parser.nextToken();
if (token.isValue()) {
if ("text" .equals(currentFieldName)) {
text = parser.text();
} else {
throw new ElasticsearchParseException("unexpected field [{}]", currentFieldName);
}
} else {
throw new ElasticsearchParseException("unexpected token [{}]", token);
}
}
}
if (text == null) {
throw new ElasticsearchParseException("missing mandatory parameter text");
}
return new BlobObj(text);
}
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("text", getText());
return builder;
}
}
/**
* Extends legacy format with writing functionality. It's used to simulate legacy file formats in tests.
*/
private static final class LegacyEmulationBlobStoreFormat<T extends ToXContent> extends LegacyBlobStoreFormat<T> {
protected final XContentType xContentType;
protected final boolean compress;
public LegacyEmulationBlobStoreFormat(String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher, boolean compress, XContentType xContentType) {
super(blobNameFormat, reader, parseFieldMatcher);
this.xContentType = xContentType;
this.compress = compress;
}
public void write(T obj, BlobContainer blobContainer, String blobName) throws IOException {
BytesReference bytes = write(obj);
try (OutputStream outputStream = blobContainer.createOutput(blobName)) {
bytes.writeTo(outputStream);
}
}
private BytesReference write(T obj) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
try (StreamOutput compressedStreamOutput = CompressorFactory.defaultCompressor().streamOutput(bytesStreamOutput)) {
write(obj, compressedStreamOutput);
}
} else {
write(obj, bytesStreamOutput);
}
return bytesStreamOutput.bytes();
}
}
private void write(T obj, StreamOutput streamOutput) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(xContentType, streamOutput);
builder.startObject();
obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
builder.endObject();
builder.close();
}
}
@Test
public void testBlobStoreOperations() throws IOException {
BlobStore blobStore = createTestBlobStore();
BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
ChecksumBlobStoreFormat<BlobObj> checksumJSON = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, false, XContentType.JSON);
ChecksumBlobStoreFormat<BlobObj> checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, false, XContentType.SMILE);
ChecksumBlobStoreFormat<BlobObj> checksumSMILECompressed = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, true, XContentType.SMILE);
LegacyEmulationBlobStoreFormat<BlobObj> legacyJSON = new LegacyEmulationBlobStoreFormat<>("%s", BlobObj.PROTO, parseFieldMatcher, false, XContentType.JSON);
LegacyEmulationBlobStoreFormat<BlobObj> legacySMILE = new LegacyEmulationBlobStoreFormat<>("%s", BlobObj.PROTO, parseFieldMatcher, false, XContentType.SMILE);
LegacyEmulationBlobStoreFormat<BlobObj> legacySMILECompressed = new LegacyEmulationBlobStoreFormat<>("%s", BlobObj.PROTO, parseFieldMatcher, true, XContentType.SMILE);
// Write blobs in different formats
checksumJSON.write(new BlobObj("checksum json"), blobContainer, "check-json");
checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile");
checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp");
legacyJSON.write(new BlobObj("legacy json"), blobContainer, "legacy-json");
legacySMILE.write(new BlobObj("legacy smile"), blobContainer, "legacy-smile");
legacySMILECompressed.write(new BlobObj("legacy smile compressed"), blobContainer, "legacy-smile-comp");
// Assert that all checksum blobs can be read by all formats
assertEquals(checksumJSON.read(blobContainer, "check-json").getText(), "checksum json");
assertEquals(checksumSMILE.read(blobContainer, "check-json").getText(), "checksum json");
assertEquals(checksumJSON.read(blobContainer, "check-smile").getText(), "checksum smile");
assertEquals(checksumSMILE.read(blobContainer, "check-smile").getText(), "checksum smile");
assertEquals(checksumJSON.read(blobContainer, "check-smile-comp").getText(), "checksum smile compressed");
assertEquals(checksumSMILE.read(blobContainer, "check-smile-comp").getText(), "checksum smile compressed");
// Assert that all legacy blobs can be read be all formats
assertEquals(legacyJSON.read(blobContainer, "legacy-json").getText(), "legacy json");
assertEquals(legacySMILE.read(blobContainer, "legacy-json").getText(), "legacy json");
assertEquals(legacyJSON.read(blobContainer, "legacy-smile").getText(), "legacy smile");
assertEquals(legacySMILE.read(blobContainer, "legacy-smile").getText(), "legacy smile");
assertEquals(legacyJSON.read(blobContainer, "legacy-smile-comp").getText(), "legacy smile compressed");
assertEquals(legacySMILE.read(blobContainer, "legacy-smile-comp").getText(), "legacy smile compressed");
}
@Test
public void testCompressionIsApplied() throws IOException {
BlobStore blobStore = createTestBlobStore();
BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
StringBuilder veryRedundantText = new StringBuilder();
for (int i = 0; i < randomIntBetween(100, 300); i++) {
veryRedundantText.append("Blah ");
}
ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, false, randomBoolean() ? XContentType.SMILE : XContentType.JSON);
ChecksumBlobStoreFormat<BlobObj> checksumFormatComp = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, true, randomBoolean() ? XContentType.SMILE : XContentType.JSON);
BlobObj blobObj = new BlobObj(veryRedundantText.toString());
checksumFormatComp.write(blobObj, blobContainer, "blob-comp");
checksumFormat.write(blobObj, blobContainer, "blob-not-comp");
Map<String, BlobMetaData> blobs = blobContainer.listBlobsByPrefix("blob-");
assertEquals(blobs.size(), 2);
assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
}
@Test
public void testBlobCorruption() throws IOException {
BlobStore blobStore = createTestBlobStore();
BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
String testString = randomAsciiOfLength(randomInt(10000));
BlobObj blobObj = new BlobObj(testString);
ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, randomBoolean(), randomBoolean() ? XContentType.SMILE : XContentType.JSON);
checksumFormat.write(blobObj, blobContainer, "test-path");
assertEquals(checksumFormat.read(blobContainer, "test-path").getText(), testString);
randomCorruption(blobContainer, "test-path");
try {
checksumFormat.read(blobContainer, "test-path");
fail("Should have failed due to corruption");
} catch (ElasticsearchCorruptionException ex) {
assertThat(ex.getMessage(), containsString("test-path"));
} catch (EOFException ex) {
// This can happen if corrupt the byte length
}
}
public void testAtomicWrite() throws Exception {
final BlobStore blobStore = createTestBlobStore();
final BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
String testString = randomAsciiOfLength(randomInt(10000));
final CountDownLatch block = new CountDownLatch(1);
final CountDownLatch unblock = new CountDownLatch(1);
final BlobObj blobObj = new BlobObj(testString) {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
super.toXContent(builder, params);
// Block before finishing writing
try {
block.countDown();
unblock.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
return builder;
}
};
final ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, randomBoolean(), randomBoolean() ? XContentType.SMILE : XContentType.JSON);
ExecutorService threadPool = Executors.newFixedThreadPool(1);
try {
Future<Void> future = threadPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
checksumFormat.writeAtomic(blobObj, blobContainer, "test-blob");
return null;
}
});
block.await(5, TimeUnit.SECONDS);
assertFalse(blobContainer.blobExists("test-blob"));
unblock.countDown();
future.get();
assertTrue(blobContainer.blobExists("test-blob"));
} finally {
threadPool.shutdown();
}
}
protected BlobStore createTestBlobStore() throws IOException {
Settings settings = Settings.builder().build();
return new FsBlobStore(settings, randomRepoPath());
}
protected void randomCorruption(BlobContainer blobContainer, String blobName) throws IOException {
byte[] buffer = new byte[(int) blobContainer.listBlobsByPrefix(blobName).get(blobName).length()];
long originalChecksum = checksum(buffer);
try (InputStream inputStream = blobContainer.openInput(blobName)) {
Streams.readFully(inputStream, buffer);
}
do {
int location = randomIntBetween(0, buffer.length - 1);
buffer[location] = (byte) (buffer[location] ^ 42);
} while (originalChecksum == checksum(buffer));
try (OutputStream outputStream = blobContainer.createOutput(blobName)) {
Streams.copy(buffer, outputStream);
}
}
private long checksum(byte[] buffer) throws IOException {
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
try (BufferedChecksumStreamOutput checksumOutput = new BufferedChecksumStreamOutput(streamOutput)) {
checksumOutput.write(buffer);
return checksumOutput.getChecksum();
}
}
}
}

View File

@ -906,7 +906,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> truncate snapshot file to make it unreadable"); logger.info("--> truncate snapshot file to make it unreadable");
Path snapshotPath = repo.resolve("snapshot-test-snap-1"); Path snapshotPath = repo.resolve("snap-test-snap-1.dat");
try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) { try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
outChan.truncate(randomInt(10)); outChan.truncate(randomInt(10));
} }