diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 9a684a283bc..deea1cec308 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeFilters; import org.elasticsearch.cluster.routing.HashFunction; import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.compress.CompressedXContent; @@ -40,10 +41,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.loader.SettingsLoader; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.warmer.IndexWarmersMetaData; @@ -64,7 +62,7 @@ import static org.elasticsearch.common.settings.Settings.*; /** * */ -public class IndexMetaData implements Diffable { +public class IndexMetaData implements Diffable, FromXContentBuilder, ToXContent { public static final IndexMetaData PROTO = IndexMetaData.builder("") .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) @@ -515,6 +513,17 @@ public class IndexMetaData implements Diffable { return new IndexMetaDataDiff(in); } + @Override + public IndexMetaData fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { + return Builder.fromXContent(parser); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + Builder.toXContent(this, builder, params); + return builder; + } + private static class IndexMetaDataDiff implements Diff { private final String index; diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index a3d839f0d33..3715e64f7b1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -60,7 +61,7 @@ import java.util.*; import static org.elasticsearch.common.settings.Settings.*; -public class MetaData implements Iterable, Diffable { +public class MetaData implements Iterable, Diffable, FromXContentBuilder, ToXContent { public static final MetaData PROTO = builder().build(); @@ -635,6 +636,17 @@ public class 
MetaData implements Iterable, Diffable { return new MetaDataDiff(in); } + @Override + public MetaData fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { + return Builder.fromXContent(parser); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + Builder.toXContent(this, builder, params); + return builder; + } + private static class MetaDataDiff implements Diff { private long version; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RestoreSource.java b/core/src/main/java/org/elasticsearch/cluster/routing/RestoreSource.java index d36be1be519..01bbfc33558 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RestoreSource.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RestoreSource.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -37,11 +38,14 @@ public class RestoreSource implements Streamable, ToXContent { private String index; + private Version version; + RestoreSource() { } - public RestoreSource(SnapshotId snapshotId, String index) { + public RestoreSource(SnapshotId snapshotId, Version version, String index) { this.snapshotId = snapshotId; + this.version = version; this.index = index; } @@ -53,6 +57,10 @@ public class RestoreSource implements Streamable, ToXContent { return index; } + public Version version() { + return version; + } + public static RestoreSource readRestoreSource(StreamInput in) throws IOException { RestoreSource restoreSource = new RestoreSource(); restoreSource.readFrom(in); @@ -66,12 +74,14 @@ public class RestoreSource implements Streamable, ToXContent { @Override public void readFrom(StreamInput in) throws IOException { snapshotId = SnapshotId.readSnapshotId(in); + version = Version.readVersion(in); index = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { snapshotId.writeTo(out); + Version.writeVersion(version, out); out.writeString(index); } @@ -80,6 +90,7 @@ public class RestoreSource implements Streamable, ToXContent { return builder.startObject() .field("repository", snapshotId.getRepository()) .field("snapshot", snapshotId.getSnapshot()) + .field("version", version.toString()) .field("index", index) .endObject(); } diff --git a/core/src/main/java/org/elasticsearch/common/lucene/store/ByteArrayIndexInput.java b/core/src/main/java/org/elasticsearch/common/lucene/store/ByteArrayIndexInput.java new file mode 100644 index 00000000000..6470f97f3f6 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/lucene/store/ByteArrayIndexInput.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.lucene.store; + +import org.apache.lucene.store.IndexInput; + +import java.io.EOFException; +import java.io.IOException; + +/** + * Wraps array of bytes into IndexInput + */ +public class ByteArrayIndexInput extends IndexInput { + private final byte[] bytes; + + private int pos; + + private int offset; + + private int length; + + public ByteArrayIndexInput(String resourceDesc, byte[] bytes) { + this(resourceDesc, bytes, 0, bytes.length); + } + + public ByteArrayIndexInput(String resourceDesc, byte[] bytes, int offset, int length) { + super(resourceDesc); + this.bytes = bytes; + this.offset = offset; + this.length = length; + } + + @Override + public void close() throws IOException { + } + + @Override + public long getFilePointer() { + return pos; + } + + @Override + public void seek(long l) throws IOException { + if (l < 0) { + throw new IllegalArgumentException("Seeking to negative position: " + pos); + } else if (l > length) { + throw new EOFException("seek past EOF"); + } + pos = (int)l; + } + + @Override + public long length() { + return length; + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + if (offset >= 0L && length >= 0L && offset + length <= this.length) { + return new ByteArrayIndexInput(sliceDescription, bytes, this.offset + (int)offset, (int)length); + } else { + throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength=" + this.length + ": " + this); + } + } + + @Override + public byte readByte() throws IOException { + if (pos >= offset + length) { + throw new EOFException("seek past EOF"); + } + return bytes[offset + pos++]; + } + + @Override + public void readBytes(final byte[] b, final int offset, int len) throws IOException { + if (pos + len > this.offset + length) { + throw new EOFException("seek past EOF"); + } + System.arraycopy(bytes, this.offset + pos, b, offset, len); + pos += len; + } +} diff --git a/core/src/main/java/org/elasticsearch/common/lucene/store/OutputStreamIndexOutput.java b/core/src/main/java/org/elasticsearch/common/lucene/store/IndexOutputOutputStream.java similarity index 89% rename from core/src/main/java/org/elasticsearch/common/lucene/store/OutputStreamIndexOutput.java rename to core/src/main/java/org/elasticsearch/common/lucene/store/IndexOutputOutputStream.java index 156ddb5f3fd..a6617b78438 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/store/OutputStreamIndexOutput.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/store/IndexOutputOutputStream.java @@ -25,12 +25,13 @@ import java.io.IOException; import java.io.OutputStream; /** + * {@link OutputStream} that writes into underlying IndexOutput */ -public class OutputStreamIndexOutput extends OutputStream { +public class IndexOutputOutputStream extends OutputStream { private final IndexOutput out; - public OutputStreamIndexOutput(IndexOutput out) { + public IndexOutputOutputStream(IndexOutput out) { this.out = out; } diff --git a/core/src/main/java/org/elasticsearch/common/xcontent/FromXContentBuilder.java b/core/src/main/java/org/elasticsearch/common/xcontent/FromXContentBuilder.java new file mode 100644 index 00000000000..51511e445c2 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/xcontent/FromXContentBuilder.java @@ -0,0 +1,38 @@ +/* + * Licensed to 
Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.xcontent; + +import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.io.stream.StreamableReader; + +import java.io.IOException; + +/** + * Indicates that the class supports XContent deserialization. + * + * This interface is similar to what {@link StreamableReader} does, only it works with XContent serialization + * instead of binary serialization. + */ +public interface FromXContentBuilder<T> { + /** + * Parses an object of type T from the parser + */ + T fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException; +} diff --git a/core/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/core/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index 9ea7cf5e60b..523e9bc5414 100644 --- a/core/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/core/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -35,6 +35,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -114,7 +115,7 @@ public abstract class MetaDataStateFormat<T> { CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); out.writeInt(format.index()); out.writeLong(version); - try (XContentBuilder builder = newXContentBuilder(format, new org.elasticsearch.common.lucene.store.OutputStreamIndexOutput(out) { + try (XContentBuilder builder = newXContentBuilder(format, new IndexOutputOutputStream(out) { @Override public void close() throws IOException { // this is important since some of the XContentBuilders write bytes on close.
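The `FromXContentBuilder` interface above is the hook for the "prototype" pattern used throughout this change: classes such as `MetaData`, `IndexMetaData`, and `BlobStoreIndexShardSnapshot` expose a static `PROTO` instance whose only job is to have `fromXContent` called on it, so a generic reader can deserialize without reflection or a `Class<T>` token. A minimal self-contained sketch of the idea — `Parser`, `FromParser`, `Doc`, and `FormatReader` are hypothetical stand-ins, not Elasticsearch types:

```java
import java.io.IOException;

// Stand-in for XContentParser, just to keep the sketch self-contained.
interface Parser {
    String readValue() throws IOException;
}

// Mirrors FromXContentBuilder<T>: the instance acts as a factory for more instances.
interface FromParser<T> {
    T fromParser(Parser parser) throws IOException;
}

final class Doc implements FromParser<Doc> {
    // Prototype instance: never used as data, only as a deserialization entry point.
    static final Doc PROTO = new Doc("");

    final String name;

    private Doc(String name) {
        this.name = name;
    }

    @Override
    public Doc fromParser(Parser parser) throws IOException {
        return new Doc(parser.readValue());
    }
}

// A generic reader (in the same spirit as BlobStoreFormat) needs only the prototype.
final class FormatReader<T> {
    private final FromParser<T> prototype;

    FormatReader(FromParser<T> prototype) {
        this.prototype = prototype;
    }

    T read(Parser parser) throws IOException {
        return prototype.fromParser(parser);
    }
}

public class PrototypeDemo {
    public static void main(String[] args) throws IOException {
        Parser parser = () -> "snap-1"; // trivial parser stub
        Doc doc = new FormatReader<>(Doc.PROTO).read(parser);
        System.out.println(doc.name); // prints: snap-1
    }
}
```

This is why the `ChecksumBlobStoreFormat` and `LegacyBlobStoreFormat` constructors later in the patch take `MetaData.PROTO`, `Snapshot.PROTO`, and so on as their `reader` argument.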
diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java index 32275c4c1fd..14b27efc8e9 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecoveryService.java @@ -317,7 +317,7 @@ public class StoreRecoveryService extends AbstractIndexShardComponent implements if (!shardId.getIndex().equals(restoreSource.index())) { snapshotShardId = new ShardId(restoreSource.index(), shardId.id()); } - indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState); + indexShardRepository.restore(restoreSource.snapshotId(), restoreSource.version(), shardId, snapshotShardId, recoveryState); indexShard.skipTranslogRecovery(true); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java index 224019a26bb..7c778846926 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardRepository.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.snapshots; +import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.shard.ShardId; @@ -35,7 +36,7 @@ public interface IndexShardRepository { /** * Creates a snapshot of the shard based on the index commit point. *

- * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex()} method. + * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#snapshotIndex} method. * IndexShardRepository implementations shouldn't release the snapshot index commit point. It is done by the method caller. *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check @@ -55,19 +56,21 @@ public interface IndexShardRepository { * * @param snapshotId snapshot id * @param shardId shard id (in the current index) + * @param version version of elasticsearch that created this snapshot * @param snapshotShardId shard id (in the snapshot) * @param recoveryState recovery state */ - void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState); + void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState); /** * Retrieve shard snapshot status for the stored snapshot * * @param snapshotId snapshot id + * @param version version of elasticsearch that created this snapshot * @param shardId shard id * @return snapshot status */ - IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, ShardId shardId); + IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId); /** * Verifies repository settings on data node diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index b9245afa22f..c870b214500 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -33,6 +33,7 @@ import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -46,7 +47,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; @@ -63,6 +63,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.RepositoryName; import org.elasticsearch.repositories.RepositoryVerificationException; +import org.elasticsearch.repositories.blobstore.*; import java.io.FilterInputStream; import java.io.IOException; @@ -72,7 +73,6 @@ import java.util.*; import static com.google.common.collect.Lists.newArrayList; import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.testBlobPrefix; -import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.toStreamOutput; /** * Blob store based implementation of IndexShardRepository @@ -104,14 +104,30 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements private final ParseFieldMatcher parseFieldMatcher; - protected static final String SNAPSHOT_PREFIX = "snapshot-"; + protected static final String LEGACY_SNAPSHOT_PREFIX = "snapshot-"; + + protected static final String LEGACY_SNAPSHOT_NAME_FORMAT = LEGACY_SNAPSHOT_PREFIX + "%s"; + + protected static final String SNAPSHOT_PREFIX = 
"snap-"; + + protected static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat"; + + protected static final String SNAPSHOT_CODEC = "snapshot"; protected static final String SNAPSHOT_INDEX_PREFIX = "index-"; - protected static final String SNAPSHOT_TEMP_PREFIX = "pending-"; + protected static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s"; + + protected static final String SNAPSHOT_INDEX_CODEC = "snapshots"; protected static final String DATA_BLOB_PREFIX = "__"; + private ChecksumBlobStoreFormat indexShardSnapshotFormat; + + private LegacyBlobStoreFormat indexShardSnapshotLegacyFormat; + + private ChecksumBlobStoreFormat indexShardSnapshotsFormat; + @Inject public BlobStoreIndexShardRepository(Settings settings, RepositoryName repositoryName, IndicesService indicesService, ClusterService clusterService) { super(settings); @@ -144,6 +160,9 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } }; this.compress = compress; + indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher, isCompress()); + indexShardSnapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, BlobStoreIndexShardSnapshot.PROTO, parseFieldMatcher); + indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT, BlobStoreIndexShardSnapshots.PROTO, parseFieldMatcher, isCompress()); } /** @@ -174,8 +193,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * {@inheritDoc} */ @Override - public void restore(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { - final RestoreContext snapshotContext = new RestoreContext(snapshotId, shardId, snapshotShardId, recoveryState); + public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { + final RestoreContext snapshotContext = new RestoreContext(snapshotId, version, shardId, snapshotShardId, recoveryState); try { snapshotContext.restore(); } catch (Throwable e) { @@ -187,8 +206,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * {@inheritDoc} */ @Override - public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, ShardId shardId) { - Context context = new Context(snapshotId, shardId); + public IndexShardSnapshotStatus snapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { + Context context = new Context(snapshotId, version, shardId); BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); status.updateStage(IndexShardSnapshotStatus.Stage.DONE); @@ -223,8 +242,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * @param snapshotId snapshot id * @param shardId shard id */ - public void delete(SnapshotId snapshotId, ShardId shardId) { - Context context = new Context(snapshotId, shardId, shardId); + public void delete(SnapshotId snapshotId, Version version, ShardId shardId) { + Context context = new Context(snapshotId, version, shardId, shardId); context.delete(); } @@ -236,58 +255,6 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements ']'; } - /** - * Returns shard snapshot metadata file name - * - * @param snapshotId snapshot id - * @return shard snapshot metadata file name - */ - private String 
snapshotBlobName(SnapshotId snapshotId) { - return SNAPSHOT_PREFIX + snapshotId.getSnapshot(); - } - - /** - * Serializes snapshot to JSON - * - * @param snapshot snapshot - * @param output the stream to output the snapshot JSON representation to - * @throws IOException if an IOException occurs - */ - public void writeSnapshot(BlobStoreIndexShardSnapshot snapshot, StreamOutput output) throws IOException { - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, output).prettyPrint(); - BlobStoreIndexShardSnapshot.toXContent(snapshot, builder, ToXContent.EMPTY_PARAMS); - builder.flush(); - } - - /** - * Parses JSON representation of a snapshot - * - * @param stream JSON - * @return snapshot - * @throws IOException if an IOException occurs - */ - public static BlobStoreIndexShardSnapshot readSnapshot(InputStream stream, ParseFieldMatcher parseFieldMatcher) throws IOException { - byte[] data = ByteStreams.toByteArray(stream); - try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) { - parser.nextToken(); - return BlobStoreIndexShardSnapshot.fromXContent(parser, parseFieldMatcher); - } - } - - /** - * Parses JSON representation of a snapshot - * - * @param stream JSON - * @return snapshot - * @throws IOException if an IOException occurs - * */ - public static BlobStoreIndexShardSnapshots readSnapshots(InputStream stream, ParseFieldMatcher parseFieldMatcher) throws IOException { - byte[] data = ByteStreams.toByteArray(stream); - try (XContentParser parser = XContentHelper.createParser(new BytesArray(data))) { - parser.nextToken(); - return BlobStoreIndexShardSnapshots.fromXContent(parser, parseFieldMatcher); - } - } /** * Returns true if metadata files should be compressed * @@ -297,6 +264,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements return compress; } + BlobStoreFormat indexShardSnapshotFormat(Version version) { + if (BlobStoreRepository.legacyMetaData(version)) { + return indexShardSnapshotLegacyFormat; + } else { + return indexShardSnapshotFormat; + } + } + /** * Context for snapshot/restore operations */ @@ -308,12 +283,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements protected final BlobContainer blobContainer; - public Context(SnapshotId snapshotId, ShardId shardId) { - this(snapshotId, shardId, shardId); + protected final Version version; + + public Context(SnapshotId snapshotId, Version version, ShardId shardId) { + this(snapshotId, version, shardId, shardId); } - public Context(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId) { + public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) { this.snapshotId = snapshotId; + this.version = version; this.shardId = shardId; blobContainer = blobStore.blobContainer(basePath.add("indices").add(snapshotShardId.getIndex()).add(Integer.toString(snapshotShardId.getId()))); } @@ -333,10 +311,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements BlobStoreIndexShardSnapshots snapshots = tuple.v1(); int fileListGeneration = tuple.v2(); - String commitPointName = snapshotBlobName(snapshotId); - try { - blobContainer.deleteBlob(commitPointName); + indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getSnapshot()); } catch (IOException e) { logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId); } @@ -356,13 +332,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * Loads 
information about shard snapshot */ public BlobStoreIndexShardSnapshot loadSnapshot() { - BlobStoreIndexShardSnapshot snapshot; - try (InputStream stream = blobContainer.openInput(snapshotBlobName(snapshotId))) { - snapshot = readSnapshot(stream, parseFieldMatcher); + try { + return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getSnapshot()); } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); } - return snapshot; } /** @@ -381,7 +355,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements // delete old index files first for (String blobName : blobs.keySet()) { // delete old file lists - if (blobName.startsWith(SNAPSHOT_TEMP_PREFIX) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { + if (indexShardSnapshotsFormat.isTempBlobName(blobName) || blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { try { blobContainer.deleteBlob(blobName); } catch (IOException e) { @@ -408,20 +382,11 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements // If we deleted all snapshots - we don't need to create the index file if (snapshots.size() > 0) { - String newSnapshotIndexName = SNAPSHOT_INDEX_PREFIX + fileListGeneration; - try (OutputStream output = blobContainer.createOutput(SNAPSHOT_TEMP_PREFIX + fileListGeneration)) { - StreamOutput stream = compressIfNeeded(output); - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); - newSnapshots.toXContent(builder, ToXContent.EMPTY_PARAMS); - builder.flush(); + try { + indexShardSnapshotsFormat.writeAtomic(newSnapshots, blobContainer, Integer.toString(fileListGeneration)); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write file list", e); } - try { - blobContainer.move(SNAPSHOT_TEMP_PREFIX + fileListGeneration, newSnapshotIndexName); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to rename file list", e); - } } } @@ -481,8 +446,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } } if (latest >= 0) { - try (InputStream stream = blobContainer.openInput(SNAPSHOT_INDEX_PREFIX + latest)) { - return new Tuple<>(readSnapshots(stream, parseFieldMatcher), latest); + try { + return new Tuple<>(indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest)), latest); } catch (IOException e) { logger.warn("failed to read index file [{}]", e, SNAPSHOT_INDEX_PREFIX + latest); } @@ -491,22 +456,22 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements // We couldn't load the index file - falling back to loading individual snapshots List snapshots = Lists.newArrayList(); for (String name : blobs.keySet()) { - if (name.startsWith(SNAPSHOT_PREFIX)) { - try (InputStream stream = blobContainer.openInput(name)) { - BlobStoreIndexShardSnapshot snapshot = readSnapshot(stream, parseFieldMatcher); - snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); - } catch (IOException e) { - logger.warn("failed to read commit point [{}]", e, name); + try { + BlobStoreIndexShardSnapshot snapshot = null; + if (name.startsWith(SNAPSHOT_PREFIX)) { + snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name); + } else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) { + snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name); } + if (snapshot != null) { + snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + } 
+ } catch (IOException e) { + logger.warn("failed to read commit point [{}]", e, name); } } return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), -1); } - - protected StreamOutput compressIfNeeded(OutputStream output) throws IOException { - return toStreamOutput(output, isCompress()); - } - } /** @@ -526,7 +491,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * @param snapshotStatus snapshot status to report progress */ public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) { - super(snapshotId, shardId); + super(snapshotId, Version.CURRENT, shardId); IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); store = indexService.shardInjectorSafe(shardId.id()).getInstance(Store.class); this.snapshotStatus = snapshotStatus; @@ -627,15 +592,14 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements // now create and write the commit point snapshotStatus.updateStage(IndexShardSnapshotStatus.Stage.FINALIZE); - String snapshotBlobName = snapshotBlobName(snapshotId); BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getSnapshot(), snapshotIndexCommit.getGeneration(), indexCommitPointFiles, snapshotStatus.startTime(), // snapshotStatus.startTime() is assigned on the same machine, so it's safe to use with VLong System.currentTimeMillis() - snapshotStatus.startTime(), indexNumberOfFiles, indexTotalFilesSize); //TODO: The time stored in snapshot doesn't include cleanup time. logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); - try (StreamOutput output = compressIfNeeded(blobContainer.createOutput(snapshotBlobName))) { - writeSnapshot(snapshot, output); + try { + indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getSnapshot()); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); } @@ -815,8 +779,8 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements * @param snapshotShardId shard in the snapshot that data should be restored from * @param recoveryState recovery state to report progress */ - public RestoreContext(SnapshotId snapshotId, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { - super(snapshotId, shardId, snapshotShardId); + public RestoreContext(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { + super(snapshotId, version, shardId, snapshotShardId); store = indicesService.indexServiceSafe(shardId.getIndex()).shardInjectorSafe(shardId.id()).getInstance(Store.class); this.recoveryState = recoveryState; } diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java index 6bab901ed5c..0e997c14ec0 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java @@ -29,10 +29,7 @@ import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.Strings; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import 
org.elasticsearch.common.xcontent.XContentBuilderString; -import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.store.StoreFileMetaData; import java.io.IOException; @@ -43,7 +40,9 @@ import static com.google.common.collect.Lists.newArrayList; /** * Shard snapshot metadata */ -public class BlobStoreIndexShardSnapshot { +public class BlobStoreIndexShardSnapshot implements ToXContent, FromXContentBuilder { + + public static final BlobStoreIndexShardSnapshot PROTO = new BlobStoreIndexShardSnapshot(); /** * Information about snapshotted file @@ -350,6 +349,19 @@ public class BlobStoreIndexShardSnapshot { this.totalSize = totalSize; } + /** + * Special constructor for the prototype + */ + private BlobStoreIndexShardSnapshot() { + this.snapshot = ""; + this.indexVersion = 0; + this.indexFiles = ImmutableList.of(); + this.startTime = 0; + this.time = 0; + this.numberOfFiles = 0; + this.totalSize = 0; + } + /** * Returns index version * @@ -429,25 +441,24 @@ public class BlobStoreIndexShardSnapshot { /** * Serializes shard snapshot metadata info into JSON * - * @param snapshot shard snapshot metadata * @param builder XContent builder * @param params parameters * @throws IOException */ - public static void toXContent(BlobStoreIndexShardSnapshot snapshot, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject(); - builder.field(Fields.NAME, snapshot.snapshot); - builder.field(Fields.INDEX_VERSION, snapshot.indexVersion); - builder.field(Fields.START_TIME, snapshot.startTime); - builder.field(Fields.TIME, snapshot.time); - builder.field(Fields.NUMBER_OF_FILES, snapshot.numberOfFiles); - builder.field(Fields.TOTAL_SIZE, snapshot.totalSize); + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.NAME, snapshot); + builder.field(Fields.INDEX_VERSION, indexVersion); + builder.field(Fields.START_TIME, startTime); + builder.field(Fields.TIME, time); + builder.field(Fields.NUMBER_OF_FILES, numberOfFiles); + builder.field(Fields.TOTAL_SIZE, totalSize); builder.startArray(Fields.FILES); - for (FileInfo fileInfo : snapshot.indexFiles) { + for (FileInfo fileInfo : indexFiles) { FileInfo.toXContent(fileInfo, builder, params); } builder.endArray(); - builder.endObject(); + return builder; } /** @@ -457,7 +468,7 @@ public class BlobStoreIndexShardSnapshot { * @return shard snapshot metadata * @throws IOException */ - public static BlobStoreIndexShardSnapshot fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { + public BlobStoreIndexShardSnapshot fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { String snapshot = null; long indexVersion = -1; @@ -467,7 +478,9 @@ public class BlobStoreIndexShardSnapshot { long totalSize = 0; List indexFiles = newArrayList(); - + if (parser.currentToken() == null) { // fresh parser? 
move to the first token + parser.nextToken(); + } XContentParser.Token token = parser.currentToken(); if (token == XContentParser.Token.START_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { @@ -510,5 +523,4 @@ public class BlobStoreIndexShardSnapshot { return new BlobStoreIndexShardSnapshot(snapshot, indexVersion, ImmutableList.copyOf(indexFiles), startTime, time, numberOfFiles, totalSize); } - } diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java index 03e50e05d78..19bf4ee3932 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java @@ -24,10 +24,7 @@ import com.google.common.collect.ImmutableMap; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseFieldMatcher; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; -import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import java.io.IOException; @@ -44,7 +41,10 @@ import static com.google.common.collect.Maps.newHashMap; * This class is used to find files that were already snapshoted and clear out files that no longer referenced by any * snapshots */ -public class BlobStoreIndexShardSnapshots implements Iterable, ToXContent { +public class BlobStoreIndexShardSnapshots implements Iterable, ToXContent, FromXContentBuilder { + + public static final BlobStoreIndexShardSnapshots PROTO = new BlobStoreIndexShardSnapshots(); + private final ImmutableList shardSnapshots; private final ImmutableMap files; private final ImmutableMap> physicalFiles; @@ -103,6 +103,12 @@ public class BlobStoreIndexShardSnapshots implements Iterable, To this.physicalFiles = mapBuilder.build(); } + private BlobStoreIndexShardSnapshots() { + shardSnapshots = ImmutableList.of(); + files = ImmutableMap.of(); + physicalFiles = ImmutableMap.of(); + } + /** * Returns list of snapshots @@ -201,7 +207,6 @@ public class BlobStoreIndexShardSnapshots implements Iterable, To */ @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); // First we list all blobs with their file infos: builder.startArray(Fields.FILES); for (Map.Entry entry : files.entrySet()) { @@ -219,14 +224,15 @@ public class BlobStoreIndexShardSnapshots implements Iterable, To builder.endArray(); builder.endObject(); } - builder.endObject(); - builder.endObject(); return builder; } - public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { + public BlobStoreIndexShardSnapshots fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException { XContentParser.Token token = parser.currentToken(); + if (token == null) { // New parser + token = parser.nextToken(); + } Map> snapshotsMap = newHashMap(); ImmutableMap.Builder filesBuilder = ImmutableMap.builder(); if (token == XContentParser.Token.START_OBJECT) { diff --git 
a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java new file mode 100644 index 00000000000..0a3e97170aa --- /dev/null +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreFormat.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories.blobstore; + +import com.google.common.collect.Maps; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.ParseFieldMatcher; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.*; + +import java.io.IOException; +import java.util.Locale; +import java.util.Map; + +/** + * Base class that handles serialization of various data structures during snapshot/restore operations. + */ +public abstract class BlobStoreFormat<T extends ToXContent> { + + protected final String blobNameFormat; + + protected final FromXContentBuilder<T> reader; + + protected final ParseFieldMatcher parseFieldMatcher; + + // Serialization parameters to specify correct context for metadata serialization + protected static final ToXContent.Params SNAPSHOT_ONLY_FORMAT_PARAMS; + + static { + Map<String, String> snapshotOnlyParams = Maps.newHashMap(); + // when metadata is serialized certain elements of the metadata shouldn't be included into snapshot + // exclusion of these elements is done by setting MetaData.CONTEXT_MODE_PARAM to MetaData.CONTEXT_MODE_SNAPSHOT + snapshotOnlyParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_SNAPSHOT); + SNAPSHOT_ONLY_FORMAT_PARAMS = new ToXContent.MapParams(snapshotOnlyParams); + } + + /** + * @param blobNameFormat format of the blobname in {@link String#format(Locale, String, Object...)} format + * @param reader the prototype object that can deserialize objects of type T + * @param parseFieldMatcher parse field matcher + */ + protected BlobStoreFormat(String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher) { + this.reader = reader; + this.blobNameFormat = blobNameFormat; + this.parseFieldMatcher = parseFieldMatcher; + } + + /** + * Reads and parses the blob with the given blob name.
+ * + * @param blobContainer blob container + * @param blobName blob name + * @return parsed blob object + * @throws IOException + */ + public abstract T readBlob(BlobContainer blobContainer, String blobName) throws IOException; + + /** + * Reads and parses the blob with the given name, applying name translation using the {@link #blobName} method + * + * @param blobContainer blob container + * @param name name to be translated into the blob name + * @return parsed blob object + * @throws IOException + */ + public T read(BlobContainer blobContainer, String name) throws IOException { + String blobName = blobName(name); + return readBlob(blobContainer, blobName); + } + + + /** + * Deletes the blob associated with the given name from the blob container + */ + public void delete(BlobContainer blobContainer, String name) throws IOException { + blobContainer.deleteBlob(blobName(name)); + } + + /** + * Checks whether a blob associated with the given name exists in the blob container + */ + public boolean exists(BlobContainer blobContainer, String name) throws IOException { + return blobContainer.blobExists(blobName(name)); + } + + protected String blobName(String name) { + return String.format(Locale.ROOT, blobNameFormat, name); + } + + protected T read(BytesReference bytes) throws IOException { + try (XContentParser parser = XContentHelper.createParser(bytes)) { + T obj = reader.fromXContent(parser, parseFieldMatcher); + return obj; + + } + } +} diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 77853371221..4af5b7c00f4 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -19,18 +19,15 @@ package org.elasticsearch.repositories.blobstore; -import com.fasterxml.jackson.core.JsonParseException; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; import com.google.common.io.ByteStreams; - import org.apache.lucene.store.RateLimiter; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -39,7 +36,6 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; @@ -47,7 +43,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; @@ -61,12 +56,7 @@ import
org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositorySettings; import org.elasticsearch.repositories.RepositoryVerificationException; -import org.elasticsearch.snapshots.InvalidSnapshotNameException; -import org.elasticsearch.snapshots.Snapshot; -import org.elasticsearch.snapshots.SnapshotCreationException; -import org.elasticsearch.snapshots.SnapshotException; -import org.elasticsearch.snapshots.SnapshotMissingException; -import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.snapshots.*; import java.io.FileNotFoundException; import java.io.IOException; @@ -93,13 +83,13 @@ import static com.google.common.collect.Lists.newArrayList; * STORE_ROOT * |- index - list of all snapshot name as JSON array * |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010" - * |- metadata-20131010 - JSON serialized MetaData for snapshot "20131010" (includes only global metadata) + * |- meta-20131010.dat - JSON serialized MetaData for snapshot "20131010" (includes only global metadata) * |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011" - * |- metadata-20131011 - JSON serialized MetaData for snapshot "20131011" + * |- meta-20131011.dat - JSON serialized MetaData for snapshot "20131011" * ..... * |- indices/ - data for all indices * |- foo/ - data for index "foo" - * | |- snapshot-20131010 - JSON Serialized IndexMetaData for index "foo" + * | |- meta-20131010.dat - JSON Serialized IndexMetaData for index "foo" * | |- 0/ - data for shard "0" of index "foo" * | | |- __1 \ * | | |- __2 | @@ -107,8 +97,9 @@ import static com.google.common.collect.Lists.newArrayList; * | | |- __4 | * | | |- __5 / * | | ..... - * | | |- snapshot-20131010 - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131010" - * | | |- snapshot-20131011 - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131011" + * | | |- snap-20131010.dat - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131010" + * | | |- snap-20131011.dat - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131011" + * | | |- list-123 - JSON serialized BlobStoreIndexShardSnapshot for snapshot "20131011" * | | * | |- 1/ - data for shard "1" of index "foo" * | | |- __1 @@ -128,24 +119,35 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent globalMetaDataFormat; + + private LegacyBlobStoreFormat globalMetaDataLegacyFormat; + + private ChecksumBlobStoreFormat indexMetaDataFormat; + + private LegacyBlobStoreFormat indexMetaDataLegacyFormat; + + private ChecksumBlobStoreFormat snapshotFormat; + + private LegacyBlobStoreFormat snapshotLegacyFormat; /** * Constructs new BlobStoreRepository @@ -166,9 +179,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent snpashotOnlyParams = Maps.newHashMap(); - snpashotOnlyParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_SNAPSHOT); - snapshotOnlyFormatParams = new ToXContent.MapParams(snpashotOnlyParams); snapshotRateLimiter = getRateLimiter(repositorySettings, "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(repositorySettings, "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); } @@ -180,6 +190,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent(METADATA_CODEC, METADATA_NAME_FORMAT, MetaData.PROTO, parseFieldMatcher, isCompress()); + globalMetaDataLegacyFormat = new 
LegacyBlobStoreFormat<>(LEGACY_METADATA_NAME_FORMAT, MetaData.PROTO, parseFieldMatcher); + + indexMetaDataFormat = new ChecksumBlobStoreFormat<>(INDEX_METADATA_CODEC, METADATA_NAME_FORMAT, IndexMetaData.PROTO, parseFieldMatcher, isCompress()); + indexMetaDataLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, IndexMetaData.PROTO, parseFieldMatcher); + + snapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT, Snapshot.PROTO, parseFieldMatcher, isCompress()); + snapshotLegacyFormat = new LegacyBlobStoreFormat<>(LEGACY_SNAPSHOT_NAME_FORMAT, Snapshot.PROTO, parseFieldMatcher); } /** @@ -241,26 +261,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent indices, MetaData metaData) { try { - String snapshotBlobName = snapshotBlobName(snapshotId); - if (snapshotsBlobContainer.blobExists(snapshotBlobName)) { + if (snapshotFormat.exists(snapshotsBlobContainer, snapshotId.getSnapshot()) || + snapshotLegacyFormat.exists(snapshotsBlobContainer, snapshotId.getSnapshot())) { throw new InvalidSnapshotNameException(snapshotId, "snapshot with such name already exists"); } // Write Global MetaData - // TODO: Check if metadata needs to be written - try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(metaDataBlobName(snapshotId, false)))) { - writeGlobalMetaData(metaData, output); - } + globalMetaDataFormat.write(metaData, snapshotsBlobContainer, snapshotId.getSnapshot()); for (String index : indices) { final IndexMetaData indexMetaData = metaData.index(index); final BlobPath indexPath = basePath().add("indices").add(index); final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath); - try (StreamOutput output = compressIfNeeded(indexMetaDataBlobContainer.createOutput(snapshotBlobName(snapshotId)))) { - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, output); - builder.startObject(); - IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS); - builder.endObject(); - builder.close(); - } + indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getSnapshot()); } } catch (IOException ex) { throw new SnapshotCreationException(snapshotId, ex); @@ -279,7 +290,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent snapshotIds = snapshots(); @@ -324,7 +332,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent indices, long startTime, String failure, int totalShards, List shardFailures) { try { - String tempBlobName = tempSnapshotBlobName(snapshotId); - String blobName = snapshotBlobName(snapshotId); Snapshot blobStoreSnapshot = new Snapshot(snapshotId.getSnapshot(), indices, startTime, failure, System.currentTimeMillis(), totalShards, shardFailures); - try (StreamOutput output = compressIfNeeded(snapshotsBlobContainer.createOutput(tempBlobName))) { - writeSnapshot(blobStoreSnapshot, output); - } - snapshotsBlobContainer.move(tempBlobName, blobName); + snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, snapshotId.getSnapshot()); List snapshotIds = snapshots(); if (!snapshotIds.contains(snapshotId)) { snapshotIds = ImmutableList.builder().addAll(snapshotIds).add(snapshotId).build(); @@ -401,14 +383,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent snapshots = newArrayList(); Map blobs; try { - blobs = snapshotsBlobContainer.listBlobsByPrefix(SNAPSHOT_PREFIX); + blobs = 
snapshotsBlobContainer.listBlobsByPrefix(COMMON_SNAPSHOT_PREFIX); } catch (UnsupportedOperationException ex) { // Fall back in case listBlobsByPrefix isn't supported by the blob store return readSnapshotList(); } int prefixLength = SNAPSHOT_PREFIX.length(); + int suffixLength = SNAPSHOT_SUFFIX.length(); + int legacyPrefixLength = LEGACY_SNAPSHOT_PREFIX.length(); for (BlobMetaData md : blobs.values()) { - String name = md.name().substring(prefixLength); + String blobName = md.name(); + final String name; + if (blobName.startsWith(SNAPSHOT_PREFIX) && blobName.length() > legacyPrefixLength) { + name = blobName.substring(prefixLength, blobName.length() - suffixLength); + } else if (blobName.startsWith(LEGACY_SNAPSHOT_PREFIX) && blobName.length() > suffixLength + prefixLength) { + name = blobName.substring(legacyPrefixLength); + } else { + // not sure what it was - ignore + continue; + } snapshots.add(new SnapshotId(repositoryName, name)); } return ImmutableList.copyOf(snapshots); @@ -431,24 +424,37 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent indices, boolean ignoreIndexErrors) throws IOException { - return readSnapshotMetaData(snapshotId, legacyMetaData(snapshotVersion), indices, ignoreIndexErrors); - } - - private MetaData readSnapshotMetaData(SnapshotId snapshotId, boolean legacy, List indices, boolean ignoreIndexErrors) throws IOException { MetaData metaData; - try (InputStream blob = snapshotsBlobContainer.openInput(metaDataBlobName(snapshotId, legacy))) { - metaData = readMetaData(ByteStreams.toByteArray(blob)); + if (snapshotVersion == null) { + // When we delete corrupted snapshots we might not know which version we are dealing with + // We can try detecting the version based on the metadata file format + assert ignoreIndexErrors; + if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getSnapshot())) { + snapshotVersion = Version.CURRENT; + } else if (globalMetaDataLegacyFormat.exists(snapshotsBlobContainer, snapshotId.getSnapshot())) { + snapshotVersion = Version.V_1_0_0; + } else { + throw new SnapshotMissingException(snapshotId); + } + } + try { + metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getSnapshot()); } catch (FileNotFoundException | NoSuchFileException ex) { throw new SnapshotMissingException(snapshotId, ex); } catch (IOException ex) { @@ -458,28 +464,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent globalMetaDataFormat(Version version) { + if(legacyMetaData(version)) { + return globalMetaDataLegacyFormat; } else { - return METADATA_PREFIX + snapshotId.getSnapshot() + METADATA_SUFFIX; + return globalMetaDataFormat; + } + } + + /** + * Returns appropriate snapshot format based on the provided version of the snapshot + */ + private BlobStoreFormat snapshotFormat(Version version) { + if(legacyMetaData(version)) { + return snapshotLegacyFormat; + } else { + return snapshotFormat; } } @@ -592,38 +522,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent indexMetaDataFormat(Version version) { + if(legacyMetaData(version)) { + return indexMetaDataLegacyFormat; + } else { + return indexMetaDataFormat; + } } /** @@ -635,18 +546,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent snapshots) throws IOException { - BytesStreamOutput bStream = new BytesStreamOutput(); - StreamOutput stream = compressIfNeeded(bStream); - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); - 
builder.startObject(); - builder.startArray("snapshots"); - for (SnapshotId snapshot : snapshots) { - builder.value(snapshot.getSnapshot()); + final BytesReference bRef; + try(BytesStreamOutput bStream = new BytesStreamOutput()) { + try(StreamOutput stream = new OutputStreamStreamOutput(bStream)) { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); + builder.startObject(); + builder.startArray("snapshots"); + for (SnapshotId snapshot : snapshots) { + builder.value(snapshot.getSnapshot()); + } + builder.endArray(); + builder.endObject(); + builder.close(); + } + bRef = bStream.bytes(); } - builder.endArray(); - builder.endObject(); - builder.close(); - BytesReference bRef = bStream.bytes(); + snapshotsBlobContainer.deleteBlob(SNAPSHOTS_FILE); try (OutputStream output = snapshotsBlobContainer.createOutput(SNAPSHOTS_FILE)) { bRef.writeTo(output); } diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java new file mode 100644 index 00000000000..cc1323e9207 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -0,0 +1,218 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
new file mode 100644
index 00000000000..cc1323e9207
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.repositories.blobstore;
+
+import com.google.common.io.ByteStreams;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexFormatTooNewException;
+import org.apache.lucene.index.IndexFormatTooOldException;
+import org.apache.lucene.store.OutputStreamIndexOutput;
+import org.elasticsearch.common.ParseFieldMatcher;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressorFactory;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
+import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
+import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.gateway.CorruptStateException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Locale;
+
+/**
+ * Snapshot metadata file format used in v2.0 and above
+ */
+public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
+
+    private static final String TEMP_FILE_PREFIX = "pending-";
+
+    private static final XContentType DEFAULT_X_CONTENT_TYPE = XContentType.SMILE;
+
+    // The format version
+    public static final int VERSION = 1;
+
+    private static final int BUFFER_SIZE = 4096;
+
+    protected final XContentType xContentType;
+
+    protected final boolean compress;
+
+    private final String codec;
+
+    /**
+     * @param codec          codec name
+     * @param blobNameFormat format of the blobname in {@link String#format} format
+     * @param reader         prototype object that can deserialize T from XContent
+     * @param compress       true if the content should be compressed
+     * @param xContentType   content type that should be used for write operations
+     */
+    public ChecksumBlobStoreFormat(String codec, String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher, boolean compress, XContentType xContentType) {
+        super(blobNameFormat, reader, parseFieldMatcher);
+        this.xContentType = xContentType;
+        this.compress = compress;
+        this.codec = codec;
+    }
+
+    /**
+     * @param codec          codec name
+     * @param blobNameFormat format of the blobname in {@link String#format} format
+     * @param reader         prototype object that can deserialize T from XContent
+     * @param compress       true if the content should be compressed
+     */
+    public ChecksumBlobStoreFormat(String codec, String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher, boolean compress) {
+        this(codec, blobNameFormat, reader, parseFieldMatcher, compress, DEFAULT_X_CONTENT_TYPE);
+    }
+
+    /**
+     * Reads the blob with the specified name without resolving it via the {@link #blobName} method.
+     *
+     * @param blobContainer blob container
+     * @param blobName      blob name
+     * @return parsed blob object
+     * @throws IOException if the blob cannot be read or fails verification
+     */
+    public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
+        try (InputStream inputStream = blobContainer.openInput(blobName)) {
+            byte[] bytes = ByteStreams.toByteArray(inputStream);
+            final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
+            try (ByteArrayIndexInput indexInput = new ByteArrayIndexInput(resourceDesc, bytes)) {
+                CodecUtil.checksumEntireFile(indexInput);
+                CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
+                long filePointer = indexInput.getFilePointer();
+                long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
+                BytesReference bytesReference = new BytesArray(bytes, (int) filePointer, (int) contentSize);
+                return read(bytesReference);
+            } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
+                // we trick this into a dedicated exception with the original stacktrace
+                throw new CorruptStateException(ex);
+            }
+        }
+    }
+
+    /**
+     * Writes the blob in an atomic manner, resolving the blob name using the {@link #blobName} and {@link #tempBlobName} methods.
+     * <p/>
+     * The blob will be compressed and checksum will be written if required.
+     * <p/>
+     * Atomic move might be very inefficient on some repositories. It also cannot overwrite existing files.
+     *
+     * @param obj           object to be serialized
+     * @param blobContainer blob container
+     * @param name          blob name
+     * @throws IOException if the blob cannot be written
+     */
+    public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws IOException {
+        String blobName = blobName(name);
+        String tempBlobName = tempBlobName(name);
+        writeBlob(obj, blobContainer, tempBlobName);
+        try {
+            blobContainer.move(tempBlobName, blobName);
+        } catch (IOException ex) {
+            // Move failed - try cleaning up
+            blobContainer.deleteBlob(tempBlobName);
+            throw ex;
+        }
+    }
+
+    /**
+     * Writes the blob, resolving the blob name using the {@link #blobName} method.
+     * <p/>
+     * The blob will be compressed and checksum will be written if required.
+     *
+     * @param obj           object to be serialized
+     * @param blobContainer blob container
+     * @param name          blob name
+     * @throws IOException if the blob cannot be written
+     */
+    public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
+        String blobName = blobName(name);
+        writeBlob(obj, blobContainer, blobName);
+    }
+
+    /**
+     * Writes the blob with the given name without resolving it via the {@link #blobName} method. This write is not atomic.
+     * <p/>
+     * The blob will be compressed and checksum will be written if required.
+     *
+     * @param obj           object to be serialized
+     * @param blobContainer blob container
+     * @param blobName      blob name
+     * @throws IOException if the blob cannot be written
+     */
+    protected void writeBlob(T obj, BlobContainer blobContainer, String blobName) throws IOException {
+        BytesReference bytes = write(obj);
+        try (OutputStream outputStream = blobContainer.createOutput(blobName)) {
+            final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
+            try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, outputStream, BUFFER_SIZE)) {
+                CodecUtil.writeHeader(indexOutput, codec, VERSION);
+                try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
+                    @Override
+                    public void close() throws IOException {
+                        // this is important since some of the XContentBuilders write bytes on close.
+                        // in order to write the footer we need to prevent closing the actual index output.
+                    }
+                }) {
+                    bytes.writeTo(indexOutputOutputStream);
+                }
+                CodecUtil.writeFooter(indexOutput);
+            }
+        }
+    }
+
+    /**
+     * Returns true if the blob is a leftover temporary blob.
+     *
+     * The temporary blobs might be left after failed atomic write operation.
+     */
+    public boolean isTempBlobName(String blobName) {
+        return blobName.startsWith(ChecksumBlobStoreFormat.TEMP_FILE_PREFIX);
+    }
+
+    protected BytesReference write(T obj) throws IOException {
+        try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
+            if (compress) {
+                try (StreamOutput compressedStreamOutput = CompressorFactory.defaultCompressor().streamOutput(bytesStreamOutput)) {
+                    write(obj, compressedStreamOutput);
+                }
+            } else {
+                write(obj, bytesStreamOutput);
+            }
+            return bytesStreamOutput.bytes();
+        }
+    }
+
+    protected void write(T obj, StreamOutput streamOutput) throws IOException {
+        try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, streamOutput)) {
+            builder.startObject();
+            obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
+            builder.endObject();
+        }
+    }
+
+    protected String tempBlobName(String name) {
+        return TEMP_FILE_PREFIX + String.format(Locale.ROOT, blobNameFormat, name);
+    }
+
+}
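For reference, the on-disk layout the new format produces is a standard Lucene-checksummed file: codec header, raw XContent payload, checksum footer. A self-contained sketch of that round trip, mirroring the CodecUtil calls in readBlob/writeBlob above (the codec name "blob" and the payload are made-up values):

    static BytesReference roundTrip(byte[] payload) throws IOException {
        // write: header | payload | footer(checksum)
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("sketch", out, 4096)) {
            CodecUtil.writeHeader(indexOutput, "blob", 1);
            indexOutput.writeBytes(payload, payload.length);
            CodecUtil.writeFooter(indexOutput);              // appends CRC32 of everything above
        }
        // read: verify the whole file first, then slice the payload out
        byte[] blob = out.toByteArray();
        try (ByteArrayIndexInput indexInput = new ByteArrayIndexInput("sketch", blob)) {
            CodecUtil.checksumEntireFile(indexInput);        // throws CorruptIndexException on bit rot
            CodecUtil.checkHeader(indexInput, "blob", 1, 1); // codec name + supported version range
            int start = (int) indexInput.getFilePointer();
            int size = (int) (indexInput.length() - CodecUtil.footerLength() - start);
            return new BytesArray(blob, start, size);        // exactly the original payload
        }
    }

Verifying the checksum before parsing is the point of the design: a truncated or bit-flipped blob fails fast with a corruption error instead of producing a half-parsed object.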
diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/LegacyBlobStoreFormat.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/LegacyBlobStoreFormat.java
new file mode 100644
index 00000000000..5fcc8ec47a6
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/LegacyBlobStoreFormat.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.repositories.blobstore;
+
+import com.google.common.io.ByteStreams;
+import org.elasticsearch.common.ParseFieldMatcher;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.xcontent.FromXContentBuilder;
+import org.elasticsearch.common.xcontent.ToXContent;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Snapshot metadata file format used before v2.0
+ */
+public class LegacyBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
+
+    /**
+     * @param blobNameFormat format of the blobname in {@link String#format} format
+     * @param reader         the prototype object that can deserialize objects with type T
+     */
+    public LegacyBlobStoreFormat(String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher) {
+        super(blobNameFormat, reader, parseFieldMatcher);
+    }
+
+    /**
+     * Reads and parses the blob with given name. The legacy format carries no checksum, so no verification is performed.
+     *
+     * @param blobContainer blob container
+     * @param blobName      blob name
+     * @return parsed blob object
+     * @throws IOException if the blob cannot be read
+     */
+    public T readBlob(BlobContainer blobContainer, String blobName) throws IOException {
+        try (InputStream inputStream = blobContainer.openInput(blobName)) {
+            return read(new BytesArray(ByteStreams.toByteArray(inputStream)));
+        }
+    }
+}
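Both subclasses funnel into BlobStoreFormat.read(BytesReference), which is not shown in this diff. Assuming the usual Elasticsearch plumbing and the fields used above, that shared helper would look roughly like the sketch below; this is an assumed shape, not the committed implementation:

    // Assumed: undo optional compression, then hand the bytes to the prototype reader.
    protected T read(BytesReference bytes) throws IOException {
        BytesReference data = CompressorFactory.uncompressIfNeeded(bytes);
        try (XContentParser parser = XContentHelper.createParser(data)) {
            return reader.fromXContent(parser, parseFieldMatcher);
        }
    }

This is what lets the tests below read a JSON blob with a SMILE-configured format and vice versa: the XContent type is detected from the bytes, not from the format object.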
diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java
index 6b5ca0c7296..d88d0381b3e 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -84,7 +84,7 @@ import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX
  * {@link StoreRecoveryService#recover(IndexShard, boolean, StoreRecoveryService.RecoveryListener)}
  * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
  * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. If this property is not null
- * {@code recover} method uses {@link StoreRecoveryService#restore(org.elasticsearch.indices.recovery.RecoveryState)}
+ * {@code recover} method uses {@link StoreRecoveryService#restore}
  * method to start shard restore process.
  * <p/>
  * At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(SnapshotId, ShardId)},
@@ -203,7 +203,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
             for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) {
                 String index = indexEntry.getValue();
                 boolean partial = checkPartial(index);
-                RestoreSource restoreSource = new RestoreSource(snapshotId, index);
+                RestoreSource restoreSource = new RestoreSource(snapshotId, snapshot.version(), index);
                 String renamedIndex = indexEntry.getKey();
                 IndexMetaData snapshotIndexMetaData = metaData.index(index);
                 snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, request.indexSettings, request.ignoreIndexSettings);
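With the snapshot's creation version recorded in RestoreSource, a shard-level restore can later select the matching metadata format instead of guessing it from blob names. An illustrative use, with hypothetical local variable names (the version-aware pick would happen inside the repository, where the format helpers shown earlier live):

    RestoreSource restoreSource = shardRouting.restoreSource();
    if (restoreSource != null) {
        // the version travels through cluster state to whichever node performs the restore
        Version snapshotVersion = restoreSource.version(); // accessor added in this change
        // ...the repository can then call e.g. snapshotFormat(snapshotVersion)
    }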
diff --git a/core/src/main/java/org/elasticsearch/snapshots/Snapshot.java b/core/src/main/java/org/elasticsearch/snapshots/Snapshot.java
index 01ef6a5ba0b..05429eab850 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/Snapshot.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/Snapshot.java
@@ -20,20 +20,22 @@ package org.elasticsearch.snapshots;
 
 import com.google.common.collect.ImmutableList;
+import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.Version;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentBuilderString;
-import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.ParseFieldMatcher;
+import org.elasticsearch.common.xcontent.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
+import static java.util.Collections.*;
+
 /**
  * Represent information about snapshot
  */
-public class Snapshot implements Comparable<Snapshot>, ToXContent {
+public class Snapshot implements Comparable<Snapshot>, ToXContent, FromXContentBuilder<Snapshot> {
 
     private final String name;
@@ -57,6 +59,8 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
 
     private final static List<SnapshotShardFailure> NO_FAILURES = ImmutableList.of();
 
+    public final static Snapshot PROTO = new Snapshot();
+
     private Snapshot(String name, List<String> indices, SnapshotState state, String reason, Version version, long startTime, long endTime,
                      int totalShard, int successfulShards, List<SnapshotShardFailure> shardFailures) {
         assert name != null;
@@ -86,6 +90,13 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
                 startTime, endTime, totalShard, totalShard - shardFailures.size(), shardFailures);
     }
 
+    /**
+     * Special constructor for the prototype object
+     */
+    private Snapshot() {
+        this("", (List<String>) EMPTY_LIST, 0);
+    }
+
     private static SnapshotState snapshotState(String reason, List<SnapshotShardFailure> shardFailures) {
         if (reason == null) {
             if (shardFailures.isEmpty()) {
@@ -221,6 +232,11 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
         return result;
     }
 
+    @Override
+    public Snapshot fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {
+        return fromXContent(parser);
+    }
+
     static final class Fields {
         static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
         static final XContentBuilderString NAME = new XContentBuilderString("name");
@@ -277,9 +293,14 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
         int totalShard = 0;
         int successfulShards = 0;
         List<SnapshotShardFailure> shardFailures = NO_FAILURES;
-
-        XContentParser.Token token = parser.currentToken();
-        if (token == XContentParser.Token.START_OBJECT) {
+        if (parser.currentToken() == null) { // fresh parser? move to the first token
+            parser.nextToken();
+        }
+        if (parser.currentToken() == XContentParser.Token.START_OBJECT) { // on a start object move to next token
+            parser.nextToken();
+        }
+        XContentParser.Token token;
+        if ((token = parser.nextToken()) == XContentParser.Token.START_OBJECT) {
             String currentFieldName = parser.currentName();
             if ("snapshot".equals(currentFieldName)) {
                 while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@@ -328,6 +349,8 @@ public class Snapshot implements Comparable<Snapshot>, ToXContent {
                 }
             }
         }
+        } else {
+            throw new ElasticsearchParseException("unexpected token [" + token + "]");
+        }
         return new Snapshot(name, indices, state, reason, version, startTime, endTime, totalShard, successfulShards, shardFailures);
     }
diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java
index 150729f2cbf..27fe3de51e0 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotMissingException.java
@@ -34,6 +34,10 @@ public class SnapshotMissingException extends SnapshotException {
         super(snapshot, "is missing", cause);
     }
 
+    public SnapshotMissingException(SnapshotId snapshot) {
+        super(snapshot, "is missing");
+    }
+
     public SnapshotMissingException(StreamInput in) throws IOException {
         super(in);
     }
diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index afc45c4d5bb..da796d1b502 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -471,7 +471,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsService>
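The parsing change in Snapshot.fromXContent (and the BlobObj fixture in the test file below) follows the lenient-start idiom the new prototype-based readers need: the parser may arrive fresh, or already positioned on the enclosing START_OBJECT. A distilled sketch of the idiom, not the literal method body:

    XContentParser.Token token = parser.currentToken();
    if (token == null) {                              // fresh parser: advance to the first token
        token = parser.nextToken();
    }
    if (token == XContentParser.Token.START_OBJECT) { // skip the wrapper object if present
        token = parser.nextToken();
    }
    if (token != XContentParser.Token.FIELD_NAME && token != XContentParser.Token.START_OBJECT) {
        throw new ElasticsearchParseException("unexpected token [" + token + "]");
    }

This matters because the same fromXContent is now invoked both on freshly created parsers (reading a whole metadata blob) and on parsers already walking a larger document.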
+    public static class BlobObj implements ToXContent, FromXContentBuilder<BlobObj> {
+
+        public static final BlobObj PROTO = new BlobObj("");
+
+        private final String text;
+
+        public BlobObj(String text) {
+            this.text = text;
+        }
+
+        public String getText() {
+            return text;
+        }
+
+        @Override
+        public BlobObj fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {
+            String text = null;
+            XContentParser.Token token = parser.currentToken();
+            if (token == null) {
+                token = parser.nextToken();
+            }
+            if (token == XContentParser.Token.START_OBJECT) {
+                while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+                    if (token != XContentParser.Token.FIELD_NAME) {
+                        throw new ElasticsearchParseException("unexpected token [{}]", token);
+                    }
+                    String currentFieldName = parser.currentName();
+                    token = parser.nextToken();
+                    if (token.isValue()) {
+                        if ("text".equals(currentFieldName)) {
+                            text = parser.text();
+                        } else {
+                            throw new ElasticsearchParseException("unexpected field [{}]", currentFieldName);
+                        }
+                    } else {
+                        throw new ElasticsearchParseException("unexpected token [{}]", token);
+                    }
+                }
+            }
+            if (text == null) {
+                throw new ElasticsearchParseException("missing mandatory parameter text");
+            }
+            return new BlobObj(text);
+        }
+
+        public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+            builder.field("text", getText());
+            return builder;
+        }
+    }
+
+    /**
+     * Extends legacy format with writing functionality. It's used to simulate legacy file formats in tests.
+     */
+    private static final class LegacyEmulationBlobStoreFormat<T extends ToXContent> extends LegacyBlobStoreFormat<T> {
+
+        protected final XContentType xContentType;
+
+        protected final boolean compress;
+
+        public LegacyEmulationBlobStoreFormat(String blobNameFormat, FromXContentBuilder<T> reader, ParseFieldMatcher parseFieldMatcher, boolean compress, XContentType xContentType) {
+            super(blobNameFormat, reader, parseFieldMatcher);
+            this.xContentType = xContentType;
+            this.compress = compress;
+        }
+
+        public void write(T obj, BlobContainer blobContainer, String blobName) throws IOException {
+            BytesReference bytes = write(obj);
+            try (OutputStream outputStream = blobContainer.createOutput(blobName)) {
+                bytes.writeTo(outputStream);
+            }
+        }
+
+        private BytesReference write(T obj) throws IOException {
+            try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
+                if (compress) {
+                    try (StreamOutput compressedStreamOutput = CompressorFactory.defaultCompressor().streamOutput(bytesStreamOutput)) {
+                        write(obj, compressedStreamOutput);
+                    }
+                } else {
+                    write(obj, bytesStreamOutput);
+                }
+                return bytesStreamOutput.bytes();
+            }
+        }
+
+        private void write(T obj, StreamOutput streamOutput) throws IOException {
+            XContentBuilder builder = XContentFactory.contentBuilder(xContentType, streamOutput);
+            builder.startObject();
+            obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS);
+            builder.endObject();
+            builder.close();
+        }
+    }
+
+    @Test
+    public void testBlobStoreOperations() throws IOException {
+        BlobStore blobStore = createTestBlobStore();
+        BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
+        ChecksumBlobStoreFormat<BlobObj> checksumJSON = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, false, XContentType.JSON);
+        ChecksumBlobStoreFormat<BlobObj> checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, false, XContentType.SMILE);
+        ChecksumBlobStoreFormat<BlobObj> checksumSMILECompressed = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, true, XContentType.SMILE);
+        LegacyEmulationBlobStoreFormat<BlobObj> legacyJSON = new LegacyEmulationBlobStoreFormat<>("%s", BlobObj.PROTO, parseFieldMatcher, false, XContentType.JSON);
+        LegacyEmulationBlobStoreFormat<BlobObj> legacySMILE = new LegacyEmulationBlobStoreFormat<>("%s", BlobObj.PROTO, parseFieldMatcher, false, XContentType.SMILE);
+        LegacyEmulationBlobStoreFormat<BlobObj> legacySMILECompressed = new LegacyEmulationBlobStoreFormat<>("%s", BlobObj.PROTO, parseFieldMatcher, true, XContentType.SMILE);
+
+        // Write blobs in different formats
+        checksumJSON.write(new BlobObj("checksum json"), blobContainer, "check-json");
+        checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile");
+        checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp");
+        legacyJSON.write(new BlobObj("legacy json"), blobContainer, "legacy-json");
+        legacySMILE.write(new BlobObj("legacy smile"), blobContainer, "legacy-smile");
+        legacySMILECompressed.write(new BlobObj("legacy smile compressed"), blobContainer, "legacy-smile-comp");
+
+        // Assert that all checksum blobs can be read by all formats
+        assertEquals(checksumJSON.read(blobContainer, "check-json").getText(), "checksum json");
+        assertEquals(checksumSMILE.read(blobContainer, "check-json").getText(), "checksum json");
+        assertEquals(checksumJSON.read(blobContainer, "check-smile").getText(), "checksum smile");
+        assertEquals(checksumSMILE.read(blobContainer, "check-smile").getText(), "checksum smile");
+        assertEquals(checksumJSON.read(blobContainer, "check-smile-comp").getText(), "checksum smile compressed");
+        assertEquals(checksumSMILE.read(blobContainer, "check-smile-comp").getText(), "checksum smile compressed");
+
+        // Assert that all legacy blobs can be read by all formats
+        assertEquals(legacyJSON.read(blobContainer, "legacy-json").getText(), "legacy json");
+        assertEquals(legacySMILE.read(blobContainer, "legacy-json").getText(), "legacy json");
+        assertEquals(legacyJSON.read(blobContainer, "legacy-smile").getText(), "legacy smile");
+        assertEquals(legacySMILE.read(blobContainer, "legacy-smile").getText(), "legacy smile");
+        assertEquals(legacyJSON.read(blobContainer, "legacy-smile-comp").getText(), "legacy smile compressed");
+        assertEquals(legacySMILE.read(blobContainer, "legacy-smile-comp").getText(), "legacy smile compressed");
+    }
+
+    @Test
+    public void testCompressionIsApplied() throws IOException {
+        BlobStore blobStore = createTestBlobStore();
+        BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
+        StringBuilder veryRedundantText = new StringBuilder();
+        for (int i = 0; i < randomIntBetween(100, 300); i++) {
+            veryRedundantText.append("Blah ");
+        }
+        ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, false, randomBoolean() ? XContentType.SMILE : XContentType.JSON);
+        ChecksumBlobStoreFormat<BlobObj> checksumFormatComp = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, true, randomBoolean() ? XContentType.SMILE : XContentType.JSON);
+        BlobObj blobObj = new BlobObj(veryRedundantText.toString());
+        checksumFormatComp.write(blobObj, blobContainer, "blob-comp");
+        checksumFormat.write(blobObj, blobContainer, "blob-not-comp");
+        Map<String, BlobMetaData> blobs = blobContainer.listBlobsByPrefix("blob-");
+        assertEquals(blobs.size(), 2);
+        assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
+    }
+
+    @Test
+    public void testBlobCorruption() throws IOException {
+        BlobStore blobStore = createTestBlobStore();
+        BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
+        String testString = randomAsciiOfLength(randomInt(10000));
+        BlobObj blobObj = new BlobObj(testString);
+        ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, randomBoolean(), randomBoolean() ? XContentType.SMILE : XContentType.JSON);
+        checksumFormat.write(blobObj, blobContainer, "test-path");
+        assertEquals(checksumFormat.read(blobContainer, "test-path").getText(), testString);
+        randomCorruption(blobContainer, "test-path");
+        try {
+            checksumFormat.read(blobContainer, "test-path");
+            fail("Should have failed due to corruption");
+        } catch (ElasticsearchCorruptionException ex) {
+            assertThat(ex.getMessage(), containsString("test-path"));
+        } catch (EOFException ex) {
+            // This can happen if the corruption changed the byte length of the blob
+        }
+    }
+
+    @Test
+    public void testAtomicWrite() throws Exception {
+        final BlobStore blobStore = createTestBlobStore();
+        final BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath());
+        String testString = randomAsciiOfLength(randomInt(10000));
+        final CountDownLatch block = new CountDownLatch(1);
+        final CountDownLatch unblock = new CountDownLatch(1);
+        final BlobObj blobObj = new BlobObj(testString) {
+            @Override
+            public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+                super.toXContent(builder, params);
+                // Block before finishing writing
+                try {
+                    block.countDown();
+                    unblock.await(5, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                }
+                return builder;
+            }
+        };
+        final ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj.PROTO, parseFieldMatcher, randomBoolean(), randomBoolean() ? XContentType.SMILE : XContentType.JSON);
+        ExecutorService threadPool = Executors.newFixedThreadPool(1);
+        try {
+            Future<Void> future = threadPool.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    checksumFormat.writeAtomic(blobObj, blobContainer, "test-blob");
+                    return null;
+                }
+            });
+            block.await(5, TimeUnit.SECONDS);
+            assertFalse(blobContainer.blobExists("test-blob"));
+            unblock.countDown();
+            future.get();
+            assertTrue(blobContainer.blobExists("test-blob"));
+        } finally {
+            threadPool.shutdown();
+        }
+    }
+
+    protected BlobStore createTestBlobStore() throws IOException {
+        Settings settings = Settings.builder().build();
+        return new FsBlobStore(settings, randomRepoPath());
+    }
+
+    protected void randomCorruption(BlobContainer blobContainer, String blobName) throws IOException {
+        byte[] buffer = new byte[(int) blobContainer.listBlobsByPrefix(blobName).get(blobName).length()];
+        try (InputStream inputStream = blobContainer.openInput(blobName)) {
+            Streams.readFully(inputStream, buffer);
+        }
+        long originalChecksum = checksum(buffer);
+        do {
+            int location = randomIntBetween(0, buffer.length - 1);
+            buffer[location] = (byte) (buffer[location] ^ 42);
+        } while (originalChecksum == checksum(buffer));
+        try (OutputStream outputStream = blobContainer.createOutput(blobName)) {
+            Streams.copy(buffer, outputStream);
+        }
+    }
+
+    private long checksum(byte[] buffer) throws IOException {
+        try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
+            try (BufferedChecksumStreamOutput checksumOutput = new BufferedChecksumStreamOutput(streamOutput)) {
+                checksumOutput.write(buffer);
+                return checksumOutput.getChecksum();
+            }
+        }
+    }
+
+}
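testAtomicWrite above verifies the pending-file contract by pausing serialization midway: while the writer is blocked inside toXContent, only the "pending-" temp blob exists, so the final name must not be visible yet. The same latch handshake in miniature, with the surrounding types trimmed away (a sketch of the test's coordination pattern, not reusable production code):

    final CountDownLatch block = new CountDownLatch(1);   // writer -> test: "mid-write now"
    final CountDownLatch unblock = new CountDownLatch(1); // test -> writer: "finish the write"

    Runnable writer = new Runnable() {
        @Override
        public void run() {
            block.countDown();                    // serialization has started
            try {
                unblock.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // ...only after this point is the footer written and the temp blob moved into place
        }
    };

Between block.await() and unblock.countDown(), the test can assert whatever invariant must hold while the write is in flight; here, that the destination blob does not exist.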
diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java
index 8d81a8c67e5..bf2e3aa98f7 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java
@@ -906,7 +906,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
 
         logger.info("--> truncate snapshot file to make it unreadable");
-        Path snapshotPath = repo.resolve("snapshot-test-snap-1");
+        Path snapshotPath = repo.resolve("snap-test-snap-1.dat");
         try (SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
             outChan.truncate(randomInt(10));
         }
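This last one-line fix shows the user-visible effect of the whole change: top-level snapshot blobs move from "snapshot-<name>" to "snap-<name>.dat", while reads still fall back to the legacy names. A tiny sketch of that fallback, mirroring the prefix handling in BlobStoreRepository.snapshots() (the literal prefix and suffix values are inferred from the blob names in this diff, not quoted from the constants):

    // Hypothetical helper: recover a snapshot name from either blob naming scheme.
    static String snapshotNameFromBlob(String blobName) {
        if (blobName.startsWith("snap-") && blobName.endsWith(".dat")) {
            return blobName.substring("snap-".length(), blobName.length() - ".dat".length());
        } else if (blobName.startsWith("snapshot-")) {
            return blobName.substring("snapshot-".length());
        }
        return null; // unknown blob - callers skip it
    }

Writers always produce the new checksummed ".dat" layout; only readers need to understand both, which is exactly the split the ChecksumBlobStoreFormat/LegacyBlobStoreFormat pair encodes.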