Merge pull request #19421 from abeyad/snapshot-uuids-in-blob-names

Snapshot UUIDs in blob names
This commit is contained in:
Ali Beyad 2016-07-31 00:20:37 -04:00 committed by GitHub
commit ce8881513d
31 changed files with 1122 additions and 366 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import java.io.IOException;
@ -70,12 +71,12 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
private final boolean includeGlobalState;
private final boolean partial;
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<String> indices;
private final List<IndexId> indices;
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<String> indices, long startTime,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
@ -111,7 +112,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
return state;
}
public List<String> indices() {
public List<IndexId> indices() {
return indices;
}
@ -377,9 +378,9 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
boolean partial = in.readBoolean();
State state = State.fromValue(in.readByte());
int indices = in.readVInt();
List<String> indexBuilder = new ArrayList<>();
List<IndexId> indexBuilder = new ArrayList<>();
for (int j = 0; j < indices; j++) {
indexBuilder.add(in.readString());
indexBuilder.add(new IndexId(in.readString(), in.readString()));
}
long startTime = in.readLong();
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
@ -410,8 +411,8 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
out.writeBoolean(entry.partial());
out.writeByte(entry.state().value());
out.writeVInt(entry.indices().size());
for (String index : entry.indices()) {
out.writeString(index);
for (IndexId index : entry.indices()) {
index.writeTo(out);
}
out.writeLong(entry.startTime());
out.writeVInt(entry.shards().size());
@ -458,8 +459,8 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
builder.field(STATE, entry.state());
builder.startArray(INDICES);
{
for (String index : entry.indices()) {
builder.value(index);
for (IndexId index : entry.indices()) {
index.toXContent(builder, params);
}
}
builder.endArray();

View File

@ -23,7 +23,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.nio.file.NoSuchFileException;
import java.util.Map;
/**
@ -53,7 +53,8 @@ public interface BlobContainer {
* @param blobName
* The name of the blob to get an {@link InputStream} for.
* @return The {@code InputStream} to read the blob.
* @throws IOException if the blob does not exist or can not be read.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob can not be read.
*/
InputStream readBlob(String blobName) throws IOException;
@ -95,7 +96,8 @@ public interface BlobContainer {
*
* @param blobName
* The name of the blob to delete.
* @throws IOException if the blob does not exist, or if the blob exists but could not be deleted.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob exists but could not be deleted.
*/
void deleteBlob(String blobName) throws IOException;

View File

@ -27,13 +27,16 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.io.Streams;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
@ -95,14 +98,18 @@ public class FsBlobContainer extends AbstractBlobContainer {
@Override
public InputStream readBlob(String name) throws IOException {
return new BufferedInputStream(Files.newInputStream(path.resolve(name)), blobStore.bufferSizeInBytes());
final Path resolvedPath = path.resolve(name);
try {
return new BufferedInputStream(Files.newInputStream(resolvedPath), blobStore.bufferSizeInBytes());
} catch (FileNotFoundException fnfe) {
throw new NoSuchFileException("[" + name + "] blob not found");
}
}
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
final Path file = path.resolve(blobName);
// TODO: why is this not specifying CREATE_NEW? Do we really need to be able to truncate existing files?
try (OutputStream outputStream = Files.newOutputStream(file)) {
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
Streams.copy(inputStream, outputStream, new byte[blobStore.bufferSizeInBytes()]);
}
IOUtils.fsync(file, false);

View File

@ -20,14 +20,11 @@
package org.elasticsearch.common.blobstore.support;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesReference;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Map;
/**
* A base abstract blob container that implements higher level container methods.

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import java.io.IOException;
@ -394,10 +395,12 @@ final class StoreRecovery {
translogState.totalOperationsOnStart(0);
indexShard.prepareForIndexRecovery();
ShardId snapshotShardId = shardId;
if (!shardId.getIndexName().equals(restoreSource.index())) {
snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
final String indexName = restoreSource.index();
if (!shardId.getIndexName().equals(indexName)) {
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
}
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState());
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
indexShard.skipTranslogRecovery();
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");

View File

@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import java.io.IOException;
import java.util.Objects;
/**
* Represents a single snapshotted index in the repository.
*/
public final class IndexId implements Writeable, ToXContent {
protected static final String NAME = "name";
protected static final String ID = "id";
private final String name;
private final String id;
public IndexId(final String name, final String id) {
this.name = name;
this.id = id;
}
public IndexId(final StreamInput in) throws IOException {
this.name = in.readString();
this.id = in.readString();
}
/**
* The name of the index.
*/
public String getName() {
return name;
}
/**
* The unique ID for the index within the repository. This is *not* the same as the
* index's UUID, but merely a unique file/URL friendly identifier that a repository can
* use to name blobs for the index.
*
* We could not use the index's actual UUID (See {@link Index#getUUID()}) because in the
* case of snapshot/restore, the index UUID in the snapshotted index will be different
* from the index UUID assigned to it when it is restored. Hence, the actual index UUID
* is not useful in the context of snapshot/restore for tying a snapshotted index to the
* index it was snapshot from, and so we are using a separate UUID here.
*/
public String getId() {
return id;
}
@Override
public String toString() {
return "[" + name + "/" + id + "]";
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
@SuppressWarnings("unchecked") IndexId that = (IndexId) o;
return Objects.equals(name, that.name) && Objects.equals(id, that.id);
}
@Override
public int hashCode() {
return Objects.hash(name, id);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(id);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(NAME, name);
builder.field(ID, id);
builder.endObject();
return builder;
}
}

View File

@ -47,7 +47,7 @@ import java.util.List;
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexCommit, IndexShardSnapshotStatus)}
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
@ -88,15 +88,14 @@ public interface Repository extends LifecycleComponent {
* @param indices list of indices
* @return information about snapshot
*/
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException;
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException;
/**
* Returns the list of snapshots currently stored in the repository that match the given predicate on the snapshot name.
* To get all snapshots, the predicate filter should return true regardless of the input.
*
* @return snapshot list
* Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
* if there was an error in reading the data.
*/
List<SnapshotId> getSnapshots();
RepositoryData getRepositoryData();
/**
* Starts snapshotting process
@ -105,7 +104,7 @@ public interface Repository extends LifecycleComponent {
* @param indices list of indices to be snapshotted
* @param metaData cluster metadata
*/
void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData);
void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData);
/**
* Finalizes snapshotting process
@ -113,12 +112,14 @@ public interface Repository extends LifecycleComponent {
* This method is called on master after all shards are snapshotted.
*
* @param snapshotId snapshot id
* @param indices list of indices in the snapshot
* @param startTime start time of the snapshot
* @param failure global failure reason or null
* @param totalShards total number of shards
* @param shardFailures list of shard failures
* @return snapshot description
*/
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
/**
* Deletes snapshot
@ -181,10 +182,11 @@ public interface Repository extends LifecycleComponent {
*
* @param shard shard to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
/**
* Restores snapshot of the shard.
@ -194,20 +196,22 @@ public interface Repository extends LifecycleComponent {
* @param shard the shard to restore the index into
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param indexId id of the index in the repository from which the restore is occurring
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
*/
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState);
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);
/**
* Retrieve shard snapshot status for the stored snapshot
*
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param indexId the snapshotted index id for the shard to get status for
* @param shardId shard id
* @return snapshot status
*/
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId);
}

View File

@ -0,0 +1,311 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.snapshots.SnapshotId;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* A class that represents the data in a repository, as captured in the
* repository's index blob.
*/
public final class RepositoryData implements ToXContent {
public static final RepositoryData EMPTY = new RepositoryData(Collections.emptyList(), Collections.emptyMap());
/**
* The ids of the snapshots in the repository.
*/
private final List<SnapshotId> snapshotIds;
/**
* The indices found in the repository across all snapshots, as a name to {@link IndexId} mapping
*/
private final Map<String, IndexId> indices;
/**
* The snapshots that each index belongs to.
*/
private final Map<IndexId, Set<SnapshotId>> indexSnapshots;
public RepositoryData(List<SnapshotId> snapshotIds, Map<IndexId, Set<SnapshotId>> indexSnapshots) {
this.snapshotIds = Collections.unmodifiableList(snapshotIds);
this.indices = Collections.unmodifiableMap(indexSnapshots.keySet()
.stream()
.collect(Collectors.toMap(IndexId::getName, Function.identity())));
this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
}
protected RepositoryData copy() {
return new RepositoryData(snapshotIds, indexSnapshots);
}
/**
* Returns an unmodifiable list of the snapshot ids.
*/
public List<SnapshotId> getSnapshotIds() {
return snapshotIds;
}
/**
* Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
*/
public Map<String, IndexId> getIndices() {
return indices;
}
/**
* Add a snapshot and its indices to the repository; returns a new instance. If the snapshot
* already exists in the repository data, this method throws an IllegalArgumentException.
*/
public RepositoryData addSnapshot(final SnapshotId snapshotId, final List<IndexId> snapshottedIndices) {
if (snapshotIds.contains(snapshotId)) {
throw new IllegalArgumentException("[" + snapshotId + "] already exists in the repository data");
}
List<SnapshotId> snapshots = new ArrayList<>(snapshotIds);
snapshots.add(snapshotId);
Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
for (final IndexId indexId : snapshottedIndices) {
if (allIndexSnapshots.containsKey(indexId)) {
Set<SnapshotId> ids = allIndexSnapshots.get(indexId);
if (ids == null) {
ids = new LinkedHashSet<>();
allIndexSnapshots.put(indexId, ids);
}
ids.add(snapshotId);
} else {
Set<SnapshotId> ids = new LinkedHashSet<>();
ids.add(snapshotId);
allIndexSnapshots.put(indexId, ids);
}
}
return new RepositoryData(snapshots, allIndexSnapshots);
}
/**
* Initializes the indices in the repository metadata; returns a new instance.
*/
public RepositoryData initIndices(final Map<IndexId, Set<SnapshotId>> indexSnapshots) {
return new RepositoryData(snapshotIds, indexSnapshots);
}
/**
* Remove a snapshot and remove any indices that no longer exist in the repository due to the deletion of the snapshot.
*/
public RepositoryData removeSnapshot(final SnapshotId snapshotId) {
List<SnapshotId> newSnapshotIds = snapshotIds
.stream()
.filter(id -> snapshotId.equals(id) == false)
.collect(Collectors.toList());
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
for (final IndexId indexId : indices.values()) {
Set<SnapshotId> set;
Set<SnapshotId> snapshotIds = this.indexSnapshots.get(indexId);
assert snapshotIds != null;
if (snapshotIds.contains(snapshotId)) {
if (snapshotIds.size() == 1) {
// removing the snapshot will mean no more snapshots have this index, so just skip over it
continue;
}
set = new LinkedHashSet<>(snapshotIds);
set.remove(snapshotId);
} else {
set = snapshotIds;
}
indexSnapshots.put(indexId, set);
}
return new RepositoryData(newSnapshotIds, indexSnapshots);
}
/**
* Returns an immutable collection of the snapshot ids for the snapshots that contain the given index.
*/
public Set<SnapshotId> getSnapshots(final IndexId indexId) {
Set<SnapshotId> snapshotIds = indexSnapshots.get(indexId);
if (snapshotIds == null) {
throw new IllegalArgumentException("unknown snapshot index " + indexId + "");
}
return snapshotIds;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
@SuppressWarnings("unchecked") RepositoryData that = (RepositoryData) obj;
return snapshotIds.equals(that.snapshotIds)
&& indices.equals(that.indices)
&& indexSnapshots.equals(that.indexSnapshots);
}
@Override
public int hashCode() {
return Objects.hash(snapshotIds, indices, indexSnapshots);
}
/**
* Resolve the index name to the index id specific to the repository,
* throwing an exception if the index could not be resolved.
*/
public IndexId resolveIndexId(final String indexName) {
if (indices.containsKey(indexName)) {
return indices.get(indexName);
} else {
// on repositories created before 5.0, there was no indices information in the index
// blob, so if the repository hasn't been updated with new snapshots, no new index blob
// would have been written, so we only have old snapshots without the index information.
// in this case, the index id is just the index name
return new IndexId(indexName, indexName);
}
}
/**
* Resolve the given index names to index ids.
*/
public List<IndexId> resolveIndices(final List<String> indices) {
List<IndexId> resolvedIndices = new ArrayList<>(indices.size());
for (final String indexName : indices) {
resolvedIndices.add(resolveIndexId(indexName));
}
return resolvedIndices;
}
/**
* Resolve the given index names to index ids, creating new index ids for
* new indices in the repository.
*/
public List<IndexId> resolveNewIndices(final List<String> indicesToResolve) {
List<IndexId> snapshotIndices = new ArrayList<>();
for (String index : indicesToResolve) {
final IndexId indexId;
if (indices.containsKey(index)) {
indexId = indices.get(index);
} else {
indexId = new IndexId(index, UUIDs.randomBase64UUID());
}
snapshotIndices.add(indexId);
}
return snapshotIndices;
}
private static final String SNAPSHOTS = "snapshots";
private static final String INDICES = "indices";
private static final String INDEX_ID = "id";
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
// write the snapshots list
builder.startArray(SNAPSHOTS);
for (final SnapshotId snapshot : getSnapshotIds()) {
snapshot.toXContent(builder, params);
}
builder.endArray();
// write the indices map
builder.startObject(INDICES);
for (final IndexId indexId : getIndices().values()) {
builder.startObject(indexId.getName());
builder.field(INDEX_ID, indexId.getId());
builder.startArray(SNAPSHOTS);
Set<SnapshotId> snapshotIds = indexSnapshots.get(indexId);
assert snapshotIds != null;
for (final SnapshotId snapshotId : snapshotIds) {
snapshotId.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
}
builder.endObject();
builder.endObject();
return builder;
}
public static RepositoryData fromXContent(final XContentParser parser) throws IOException {
List<SnapshotId> snapshots = new ArrayList<>();
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
if (SNAPSHOTS.equals(currentFieldName)) {
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
snapshots.add(SnapshotId.fromXContent(parser));
}
} else {
throw new ElasticsearchParseException("expected array for [" + currentFieldName + "]");
}
} else if (INDICES.equals(currentFieldName)) {
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("start object expected [indices]");
}
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String indexName = parser.currentName();
String indexId = null;
Set<SnapshotId> snapshotIds = new LinkedHashSet<>();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("start object expected index[" + indexName + "]");
}
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String indexMetaFieldName = parser.currentName();
parser.nextToken();
if (INDEX_ID.equals(indexMetaFieldName)) {
indexId = parser.text();
} else if (SNAPSHOTS.equals(indexMetaFieldName)) {
if (parser.currentToken() != XContentParser.Token.START_ARRAY) {
throw new ElasticsearchParseException("start array expected [snapshots]");
}
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
snapshotIds.add(SnapshotId.fromXContent(parser));
}
}
}
assert indexId != null;
indexSnapshots.put(new IndexId(indexName, indexId), snapshotIds);
}
} else {
throw new ElasticsearchParseException("unknown field name [" + currentFieldName + "]");
}
}
} else {
throw new ElasticsearchParseException("start object expected");
}
return new RepositoryData(snapshots, indexSnapshots);
}
}

View File

@ -45,6 +45,8 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
@ -58,6 +60,8 @@ import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings;
@ -103,6 +107,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
@ -119,14 +124,14 @@ import static java.util.Collections.unmodifiableMap;
* {@code
* STORE_ROOT
* |- index-N - list of all snapshot name as JSON array, N is the generation of the file
* |- index-latest - contains the numeric value of the latest generation of the index file (i.e. N from above)
* |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010"
* |- index.latest - contains the numeric value of the latest generation of the index file (i.e. N from above)
* |- snap-20131010 - JSON serialized Snapshot for snapshot "20131010"
* |- meta-20131010.dat - JSON serialized MetaData for snapshot "20131010" (includes only global metadata)
* |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011"
* |- snap-20131011 - JSON serialized Snapshot for snapshot "20131011"
* |- meta-20131011.dat - JSON serialized MetaData for snapshot "20131011"
* .....
* |- indices/ - data for all indices
* |- foo/ - data for index "foo"
* |- Ac1342-B_x/ - data for index "foo" which was assigned the unique id of Ac1342-B_x in the repository
* | |- meta-20131010.dat - JSON Serialized IndexMetaData for index "foo"
* | |- 0/ - data for shard "0" of index "foo"
* | | |- __1 \
@ -146,7 +151,7 @@ import static java.util.Collections.unmodifiableMap;
* | |-2/
* | ......
* |
* |- bar/ - data for index bar
* |- 1xB0D8_B3y/ - data for index "bar" which was assigned the unique id of 1xB0D8_B3y in the repository
* ......
* }
* </pre>
@ -163,13 +168,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private static final String SNAPSHOT_PREFIX = "snap-";
protected static final String SNAPSHOT_CODEC = "snapshot";
private static final String SNAPSHOT_CODEC = "snapshot";
static final String SNAPSHOTS_FILE = "index"; // package private for unit testing
private static final String SNAPSHOTS_FILE_PREFIX = "index-";
private static final String INDEX_FILE_PREFIX = "index-";
private static final String SNAPSHOTS_INDEX_LATEST_BLOB = "index.latest";
private static final String INDEX_LATEST_BLOB = "index.latest";
private static final String TESTS_FILE = "tests-";
@ -305,7 +310,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetaData) {
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository");
}
@ -315,28 +320,69 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (getSnapshots().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with the same name already exists");
}
if (snapshotFormat.exists(snapshotsBlobContainer, blobId(snapshotId)) ||
if (snapshotFormat.exists(snapshotsBlobContainer, snapshotId.getUUID()) ||
snapshotLegacyFormat.exists(snapshotsBlobContainer, snapshotName)) {
throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with such name already exists");
}
// Write Global MetaData
globalMetaDataFormat.write(clusterMetadata, snapshotsBlobContainer, snapshotName);
for (String index : indices) {
final IndexMetaData indexMetaData = clusterMetadata.index(index);
final BlobPath indexPath = basePath().add("indices").add(index);
globalMetaDataFormat.write(clusterMetaData, snapshotsBlobContainer, snapshotId.getUUID());
// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
final IndexMetaData indexMetaData = clusterMetaData.index(index.getName());
final BlobPath indexPath = basePath().add("indices").add(index.getId());
final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotName);
indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getUUID());
}
} catch (IOException ex) {
throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
}
}
// Older repository index files (index-N) only contain snapshot info, not indices info,
// so if the repository data is of the older format, populate it with the indices entries
// so we know which indices of snapshots have blob ids in the older format.
private RepositoryData upgradeRepositoryData(final RepositoryData repositoryData) throws IOException {
final Map<IndexId, Set<SnapshotId>> indexToSnapshots = new HashMap<>();
for (final SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
final SnapshotInfo snapshotInfo;
try {
snapshotInfo = getSnapshotInfo(snapshotId);
} catch (SnapshotException e) {
logger.warn("[{}] repository is on a pre-5.0 format with an index file that contains snapshot [{}] but " +
"the corresponding snap-{}.dat file cannot be read. The snapshot will no longer be included in " +
"the repository but its data directories will remain.", e, getMetadata().name(),
snapshotId, snapshotId.getUUID());
continue;
}
for (final String indexName : snapshotInfo.indices()) {
final IndexId indexId = new IndexId(indexName, indexName);
if (indexToSnapshots.containsKey(indexId)) {
indexToSnapshots.get(indexId).add(snapshotId);
} else {
indexToSnapshots.put(indexId, Sets.newHashSet(snapshotId));
}
}
}
try {
final RepositoryData updatedRepoData = repositoryData.initIndices(indexToSnapshots);
if (isReadOnly() == false) {
// write the new index gen file with the indices included
writeIndexGen(updatedRepoData);
}
return updatedRepoData;
} catch (IOException e) {
throw new RepositoryException(metadata.name(), "failed to update the repository index blob with indices data on startup", e);
}
}
@Override
public void deleteSnapshot(SnapshotId snapshotId) {
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository");
}
final RepositoryData repositoryData = getRepositoryData();
List<String> indices = Collections.emptyList();
SnapshotInfo snapshot = null;
try {
@ -350,64 +396,29 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
MetaData metaData = null;
try {
if (snapshot != null) {
metaData = readSnapshotMetaData(snapshotId, snapshot.version(), indices, true);
metaData = readSnapshotMetaData(snapshotId, snapshot.version(), repositoryData.resolveIndices(indices), true);
} else {
metaData = readSnapshotMetaData(snapshotId, null, indices, true);
metaData = readSnapshotMetaData(snapshotId, null, repositoryData.resolveIndices(indices), true);
}
} catch (IOException | SnapshotException ex) {
logger.warn("cannot read metadata for snapshot [{}]", ex, snapshotId);
}
try {
final String snapshotName = snapshotId.getName();
// Delete snapshot file first so we wouldn't end up with partially deleted snapshot that looks OK
if (snapshot != null) {
try {
snapshotFormat(snapshot.version()).delete(snapshotsBlobContainer, blobId(snapshotId));
} catch (IOException e) {
logger.debug("snapshotFormat failed to delete snapshot [{}]", snapshotId);
}
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
writeIndexGen(repositoryData.removeSnapshot(snapshotId));
try {
globalMetaDataFormat(snapshot.version()).delete(snapshotsBlobContainer, snapshotName);
} catch (IOException e) {
logger.debug("gloalMetaDataFormat failed to delete snapshot [{}]");
}
} else {
// We don't know which version was the snapshot created with - try deleting both current and legacy formats
try {
snapshotFormat.delete(snapshotsBlobContainer, blobId(snapshotId));
} catch (IOException e) {
logger.debug("snapshotFormat failed to delete snapshot [{}]");
}
try {
snapshotLegacyFormat.delete(snapshotsBlobContainer, snapshotName);
} catch (IOException e) {
logger.debug("snapshotLegacyFormat failed to delete snapshot [{}]");
}
try {
globalMetaDataLegacyFormat.delete(snapshotsBlobContainer, snapshotName);
} catch (IOException e) {
logger.debug("globalMetaDataLegacyFormat failed to delete snapshot [{}]");
}
try {
globalMetaDataFormat.delete(snapshotsBlobContainer, snapshotName);
} catch (IOException e) {
logger.debug("globalMetaDataFormat failed to delete snapshot [{}]");
}
}
// Delete snapshot from the snapshot list
List<SnapshotId> snapshotIds = getSnapshots().stream().filter(id -> snapshotId.equals(id) == false).collect(Collectors.toList());
writeSnapshotsToIndexGen(snapshotIds);
// delete the snapshot file
safeSnapshotBlobDelete(snapshot, snapshotId.getUUID());
// delete the global metadata file
safeGlobalMetaDataBlobDelete(snapshot, snapshotId.getUUID());
// Now delete all indices
for (String index : indices) {
BlobPath indexPath = basePath().add("indices").add(index);
final IndexId indexId = repositoryData.resolveIndexId(index);
BlobPath indexPath = basePath().add("indices").add(indexId.getId());
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
try {
indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, snapshotId.getName());
indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, snapshotId.getUUID());
} catch (IOException ex) {
logger.warn("[{}] failed to delete metadata for index [{}]", ex, snapshotId, index);
}
@ -416,7 +427,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (indexMetaData != null) {
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
try {
delete(snapshotId, snapshot.version(), new ShardId(indexMetaData.getIndex(), shardId));
delete(snapshotId, snapshot.version(), indexId, new ShardId(indexMetaData.getIndex(), shardId));
} catch (SnapshotException ex) {
logger.warn("[{}] failed to delete shard data for shard [{}][{}]", ex, snapshotId, index, shardId);
}
@ -429,28 +440,77 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
private void safeSnapshotBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) {
if (snapshotInfo != null) {
// we know the version the snapshot was created with
try {
snapshotFormat(snapshotInfo.version()).delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
logger.warn("[{}] Unable to delete snapshot file [{}]", e, snapshotInfo.snapshotId(), blobId);
}
} else {
// we don't know the version, first try the current format, then the legacy format
try {
snapshotFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
// now try legacy format
try {
snapshotLegacyFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e2) {
// neither snapshot file could be deleted, log the error
logger.warn("Unable to delete snapshot file [{}]", e, blobId);
}
}
}
}
private void safeGlobalMetaDataBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) {
if (snapshotInfo != null) {
// we know the version the snapshot was created with
try {
globalMetaDataFormat(snapshotInfo.version()).delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
logger.warn("[{}] Unable to delete global metadata file [{}]", e, snapshotInfo.snapshotId(), blobId);
}
} else {
// we don't know the version, first try the current format, then the legacy format
try {
globalMetaDataFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e) {
// now try legacy format
try {
globalMetaDataLegacyFormat.delete(snapshotsBlobContainer, blobId);
} catch (IOException e2) {
// neither global metadata file could be deleted, log the error
logger.warn("Unable to delete global metadata file [{}]", e, blobId);
}
}
}
}
/**
* {@inheritDoc}
*/
@Override
public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
final List<String> indices,
final List<IndexId> indices,
final long startTime,
final String failure,
final int totalShards,
final List<SnapshotShardFailure> shardFailures) {
try {
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
indices,
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime,
failure,
System.currentTimeMillis(),
totalShards,
shardFailures);
snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, blobId(snapshotId));
List<SnapshotId> snapshotIds = getSnapshots();
snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, snapshotId.getUUID());
final RepositoryData repositoryData = getRepositoryData();
List<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
if (!snapshotIds.contains(snapshotId)) {
snapshotIds = new ArrayList<>(snapshotIds);
snapshotIds.add(snapshotId);
snapshotIds = Collections.unmodifiableList(snapshotIds);
writeSnapshotsToIndexGen(snapshotIds);
writeIndexGen(repositoryData.addSnapshot(snapshotId, indices));
}
return blobStoreSnapshot;
} catch (IOException ex) {
@ -458,27 +518,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
@Override
public List<SnapshotId> getSnapshots() {
try {
return Collections.unmodifiableList(readSnapshotsFromIndex());
} catch (NoSuchFileException | FileNotFoundException e) {
// its a fresh repository, no index file exists, so return an empty list
return Collections.emptyList();
} catch (IOException ioe) {
throw new RepositoryException(metadata.name(), "failed to list snapshots in repository", ioe);
}
return getRepositoryData().getSnapshotIds();
}
@Override
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException {
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException {
return readSnapshotMetaData(snapshot.snapshotId(), snapshot.version(), indices, false);
}
@Override
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
try {
return snapshotFormat.read(snapshotsBlobContainer, blobId(snapshotId));
return snapshotFormat.read(snapshotsBlobContainer, snapshotId.getUUID());
} catch (FileNotFoundException | NoSuchFileException ex) {
// File is missing - let's try legacy format instead
try {
@ -493,13 +545,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<String> indices, boolean ignoreIndexErrors) throws IOException {
private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<IndexId> indices, boolean ignoreIndexErrors) throws IOException {
MetaData metaData;
if (snapshotVersion == null) {
// When we delete corrupted snapshots we might not know which version we are dealing with
// We can try detecting the version based on the metadata file format
assert ignoreIndexErrors;
if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getName())) {
if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getUUID())) {
snapshotVersion = Version.CURRENT;
} else if (globalMetaDataLegacyFormat.exists(snapshotsBlobContainer, snapshotId.getName())) {
throw new SnapshotException(metadata.name(), snapshotId, "snapshot is too old");
@ -508,21 +560,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
try {
metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getName());
metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getUUID());
} catch (FileNotFoundException | NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
throw new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex);
}
MetaData.Builder metaDataBuilder = MetaData.builder(metaData);
for (String index : indices) {
BlobPath indexPath = basePath().add("indices").add(index);
for (IndexId index : indices) {
BlobPath indexPath = basePath().add("indices").add(index.getId());
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
try {
metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, snapshotId.getName()), false);
metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, snapshotId.getUUID()), false);
} catch (ElasticsearchParseException | IOException ex) {
if (ignoreIndexErrors) {
logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index);
logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index.getName());
} else {
throw ex;
}
@ -590,10 +642,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
private static final String SNAPSHOTS = "snapshots";
private static final String NAME = "name";
private static final String UUID = "uuid";
@Override
public long getSnapshotThrottleTimeInNanos() {
return snapshotRateLimitingTimeInNanos.count();
@ -637,6 +685,43 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
@Override
public RepositoryData getRepositoryData() {
try {
final long indexGen = latestIndexBlobId();
final String snapshotsIndexBlobName;
final boolean legacyFormat;
if (indexGen == -1) {
// index-N file doesn't exist, either its a fresh repository, or its in the
// old format, so look for the older index file before returning an empty list
snapshotsIndexBlobName = SNAPSHOTS_FILE;
legacyFormat = true;
} else {
snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen);
legacyFormat = false;
}
RepositoryData repositoryData;
try (InputStream blob = snapshotsBlobContainer.readBlob(snapshotsIndexBlobName)) {
BytesStreamOutput out = new BytesStreamOutput();
Streams.copy(blob, out);
try (XContentParser parser = XContentHelper.createParser(out.bytes())) {
repositoryData = RepositoryData.fromXContent(parser);
}
}
if (legacyFormat) {
// pre 5.0 repository data needs to be updated to include the indices
repositoryData = upgradeRepositoryData(repositoryData);
}
return repositoryData;
} catch (NoSuchFileException nsfe) {
// repository doesn't have an index blob, its a new blank repo
return RepositoryData.EMPTY;
} catch (IOException ioe) {
throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
}
}
public static String testBlobPrefix(String seed) {
return TESTS_FILE + seed;
}
@ -651,35 +736,30 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return snapshotsBlobContainer;
}
protected void writeSnapshotsToIndexGen(final List<SnapshotId> snapshots) throws IOException {
protected void writeIndexGen(final RepositoryData repositoryData) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository
final BytesReference snapshotsBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
builder.startObject();
builder.startArray(SNAPSHOTS);
for (SnapshotId snapshot : snapshots) {
builder.startObject();
builder.field(NAME, snapshot.getName());
builder.field(UUID, snapshot.getUUID());
builder.endObject();
}
builder.endArray();
builder.endObject();
repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.close();
}
snapshotsBytes = bStream.bytes();
}
final long gen = latestIndexBlobId() + 1;
// write the index file
writeAtomic(SNAPSHOTS_FILE_PREFIX + Long.toString(gen), snapshotsBytes);
writeAtomic(INDEX_FILE_PREFIX + Long.toString(gen), snapshotsBytes);
// delete the N-2 index file if it exists, keep the previous one around as a backup
if (isReadOnly() == false && gen - 2 >= 0) {
final String oldSnapshotIndexFile = SNAPSHOTS_FILE_PREFIX + Long.toString(gen - 2);
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(gen - 2);
if (snapshotsBlobContainer.blobExists(oldSnapshotIndexFile)) {
snapshotsBlobContainer.deleteBlob(oldSnapshotIndexFile);
}
// delete the old index file (non-generational) if it exists
if (snapshotsBlobContainer.blobExists(SNAPSHOTS_FILE)) {
snapshotsBlobContainer.deleteBlob(SNAPSHOTS_FILE);
}
}
// write the current generation to the index-latest file
@ -688,72 +768,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
bStream.writeLong(gen);
genBytes = bStream.bytes();
}
if (snapshotsBlobContainer.blobExists(SNAPSHOTS_INDEX_LATEST_BLOB)) {
snapshotsBlobContainer.deleteBlob(SNAPSHOTS_INDEX_LATEST_BLOB);
if (snapshotsBlobContainer.blobExists(INDEX_LATEST_BLOB)) {
snapshotsBlobContainer.deleteBlob(INDEX_LATEST_BLOB);
}
writeAtomic(SNAPSHOTS_INDEX_LATEST_BLOB, genBytes);
}
protected List<SnapshotId> readSnapshotsFromIndex() throws IOException {
final long indexGen = latestIndexBlobId();
final String snapshotsIndexBlobName;
if (indexGen == -1) {
// index-N file doesn't exist, either its a fresh repository, or its in the
// old format, so look for the older index file before returning an empty list
snapshotsIndexBlobName = SNAPSHOTS_FILE;
} else {
snapshotsIndexBlobName = SNAPSHOTS_FILE_PREFIX + Long.toString(indexGen);
}
try (InputStream blob = snapshotsBlobContainer.readBlob(snapshotsIndexBlobName)) {
BytesStreamOutput out = new BytesStreamOutput();
Streams.copy(blob, out);
ArrayList<SnapshotId> snapshots = new ArrayList<>();
try (XContentParser parser = XContentHelper.createParser(out.bytes())) {
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
if (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
if (SNAPSHOTS.equals(currentFieldName)) {
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
// the new format from 5.0 which contains the snapshot name and uuid
String name = null;
String uuid = null;
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
currentFieldName = parser.currentName();
parser.nextToken();
if (NAME.equals(currentFieldName)) {
name = parser.text();
} else if (UUID.equals(currentFieldName)) {
uuid = parser.text();
}
}
snapshots.add(new SnapshotId(name, uuid));
}
// the old format pre 5.0 that only contains the snapshot name, use the name as the uuid too
else {
name = parser.text();
snapshots.add(new SnapshotId(name, SnapshotId.UNASSIGNED_UUID));
}
}
}
}
}
}
}
return Collections.unmodifiableList(snapshots);
}
}
// Package private for testing
static String blobId(final SnapshotId snapshotId) {
final String uuid = snapshotId.getUUID();
if (uuid.equals(SnapshotId.UNASSIGNED_UUID)) {
// the old snapshot blob naming
return snapshotId.getName();
}
return snapshotId.getName() + "-" + uuid;
writeAtomic(INDEX_LATEST_BLOB, genBytes);
}
/**
@ -790,7 +808,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// package private for testing
long readSnapshotIndexLatestBlob() throws IOException {
try (InputStream blob = snapshotsBlobContainer.readBlob(SNAPSHOTS_INDEX_LATEST_BLOB)) {
try (InputStream blob = snapshotsBlobContainer.readBlob(INDEX_LATEST_BLOB)) {
BytesStreamOutput out = new BytesStreamOutput();
Streams.copy(blob, out);
return Numbers.bytesToLong(out.bytes().toBytesRef());
@ -798,7 +816,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
private long listBlobsToGetLatestIndexId() throws IOException {
Map<String, BlobMetaData> blobs = snapshotsBlobContainer.listBlobsByPrefix(SNAPSHOTS_FILE_PREFIX);
Map<String, BlobMetaData> blobs = snapshotsBlobContainer.listBlobsByPrefix(INDEX_FILE_PREFIX);
long latest = -1;
if (blobs.isEmpty()) {
// no snapshot index blobs have been written yet
@ -807,7 +825,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
for (final BlobMetaData blobMetaData : blobs.values()) {
final String blobName = blobMetaData.name();
try {
final long curr = Long.parseLong(blobName.substring(SNAPSHOTS_FILE_PREFIX.length()));
final long curr = Long.parseLong(blobName.substring(INDEX_FILE_PREFIX.length()));
latest = Math.max(latest, curr);
} catch (NumberFormatException nfe) {
// the index- blob wasn't of the format index-N where N is a number,
@ -830,9 +848,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
@Override
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, snapshotStatus);
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus);
snapshotStatus.startTime(System.currentTimeMillis());
try {
@ -852,8 +872,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, snapshotShardId, recoveryState);
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
try {
snapshotContext.restore();
} catch (Exception e) {
@ -862,8 +882,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
Context context = new Context(snapshotId, version, shardId);
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
Context context = new Context(snapshotId, version, indexId, shardId);
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
IndexShardSnapshotStatus status = new IndexShardSnapshotStatus();
status.updateStage(IndexShardSnapshotStatus.Stage.DONE);
@ -897,8 +917,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
* @param snapshotId snapshot id
* @param shardId shard id
*/
public void delete(SnapshotId snapshotId, Version version, ShardId shardId) {
Context context = new Context(snapshotId, version, shardId, shardId);
private void delete(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
Context context = new Context(snapshotId, version, indexId, shardId, shardId);
context.delete();
}
@ -931,15 +951,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
protected final Version version;
public Context(SnapshotId snapshotId, Version version, ShardId shardId) {
this(snapshotId, version, shardId, shardId);
public Context(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
this(snapshotId, version, indexId, shardId, shardId);
}
public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) {
public Context(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId, ShardId snapshotShardId) {
this.snapshotId = snapshotId;
this.version = version;
this.shardId = shardId;
blobContainer = blobStore().blobContainer(basePath().add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId())));
blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId())));
}
/**
@ -958,7 +978,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
int fileListGeneration = tuple.v2();
try {
indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName());
indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getUUID());
} catch (IOException e) {
logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId);
}
@ -979,7 +999,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
public BlobStoreIndexShardSnapshot loadSnapshot() {
try {
return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName());
return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getUUID());
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex);
}
@ -1108,7 +1128,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
try {
BlobStoreIndexShardSnapshot snapshot = null;
if (name.startsWith(SNAPSHOT_PREFIX)) {
snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name);
snapshot = indexShardSnapshotFormat.readBlob(blobContainer, snapshotId.getUUID());
} else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) {
snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name);
}
@ -1137,10 +1157,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*
* @param shard shard to be snapshotted
* @param snapshotId snapshot id
* @param indexId the id of the index being snapshotted
* @param snapshotStatus snapshot status to report progress
*/
public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexShardSnapshotStatus snapshotStatus) {
super(snapshotId, Version.CURRENT, shard.shardId());
public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus) {
super(snapshotId, Version.CURRENT, indexId, shard.shardId());
this.snapshotStatus = snapshotStatus;
this.store = shard.store();
}
@ -1248,7 +1269,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
//TODO: The time stored in snapshot doesn't include cleanup time.
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName());
indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID());
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
@ -1424,11 +1445,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*
* @param shard shard to restore into
* @param snapshotId snapshot id
* @param indexId id of the index being restored
* @param snapshotShardId shard in the snapshot that data should be restored from
* @param recoveryState recovery state to report progress
*/
public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
super(snapshotId, version, shard.shardId(), snapshotShardId);
public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
super(snapshotId, version, indexId, shard.shardId(), snapshotShardId);
this.recoveryState = recoveryState;
store = shard.store();
}
@ -1602,6 +1624,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
}
}
}

View File

@ -63,8 +63,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -185,7 +187,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
try {
// Read snapshot info and metadata from the repository
Repository repository = repositoriesService.repository(request.repositoryName);
final Optional<SnapshotId> matchingSnapshotId = repository.getSnapshots().stream()
final RepositoryData repositoryData = repository.getRepositoryData();
final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
.filter(s -> request.snapshotName.equals(s.getName())).findFirst();
if (matchingSnapshotId.isPresent() == false) {
throw new SnapshotRestoreException(request.repositoryName, request.snapshotName, "snapshot does not exist");
@ -194,7 +197,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
final Snapshot snapshot = new Snapshot(request.repositoryName, snapshotId);
List<String> filteredIndices = SnapshotUtils.filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
MetaData metaDataIn = repository.getSnapshotMetaData(snapshotInfo, filteredIndices);
MetaData metaDataIn = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices));
final MetaData metaData;
if (snapshotInfo.version().before(Version.V_2_0_0_beta1)) {

View File

@ -22,6 +22,9 @@ package org.elasticsearch.snapshots;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
@ -29,12 +32,10 @@ import java.util.Objects;
/**
* SnapshotId - snapshot name + snapshot UUID
*/
public final class SnapshotId implements Writeable {
public final class SnapshotId implements Writeable, ToXContent {
/**
* This value is for older snapshots that don't have a UUID.
*/
public static final String UNASSIGNED_UUID = "_na_";
private static final String NAME = "name";
private static final String UUID = "uuid";
private final String name;
private final String uuid;
@ -115,4 +116,35 @@ public final class SnapshotId implements Writeable {
out.writeString(uuid);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NAME, name);
builder.field(UUID, uuid);
builder.endObject();
return builder;
}
public static SnapshotId fromXContent(XContentParser parser) throws IOException {
// the new format from 5.0 which contains the snapshot name and uuid
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
String name = null;
String uuid = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
if (NAME.equals(currentFieldName)) {
name = parser.text();
} else if (UUID.equals(currentFieldName)) {
uuid = parser.text();
}
}
return new SnapshotId(name, uuid);
} else {
// the old format pre 5.0 that only contains the snapshot name, use the name as the uuid too
final String name = parser.text();
return new SnapshotId(name, name);
}
}
}

View File

@ -458,7 +458,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
}
if (uuid == null) {
// the old format where there wasn't a UUID
uuid = SnapshotId.UNASSIGNED_UUID;
uuid = name;
}
return new SnapshotInfo(new SnapshotId(name, uuid),
indices,

View File

@ -46,6 +46,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@ -66,6 +67,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@ -208,8 +211,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> newSnapshots = new HashMap<>();
// Now go through all snapshots and update existing or create missing
final String localNodeId = clusterService.localNode().getId();
final Map<Snapshot, Map<String, IndexId>> snapshotIndices = new HashMap<>();
if (snapshotsInProgress != null) {
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
snapshotIndices.put(entry.snapshot(),
entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())));
if (entry.state() == SnapshotsInProgress.State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = new HashMap<>();
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot());
@ -289,14 +295,18 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
if (newSnapshots.isEmpty() == false) {
Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
for (final Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> entry : newSnapshots.entrySet()) {
Map<String, IndexId> indicesMap = snapshotIndices.get(entry.getKey());
assert indicesMap != null;
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().entrySet()) {
final ShardId shardId = shardEntry.getKey();
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
executor.execute(new AbstractRunnable() {
@Override
public void doRun() {
snapshot(indexShard, entry.getKey(), shardEntry.getValue());
snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue());
updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS));
}
@ -321,7 +331,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
* @param snapshot snapshot
* @param snapshotStatus snapshot status
*/
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexShardSnapshotStatus snapshotStatus) {
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) {
Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
ShardId shardId = indexShard.shardId();
if (!indexShard.routingEntry().primary()) {
@ -340,7 +350,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
// we flush first to make sure we get the latest writes snapshotted
IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
try {
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), snapshotIndexCommit, snapshotStatus);
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus);
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");

View File

@ -56,8 +56,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.threadpool.ThreadPool;
@ -132,7 +134,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public List<SnapshotId> snapshotIds(final String repositoryName) {
Repository repository = repositoriesService.repository(repositoryName);
assert repository != null; // should only be called once we've validated the repository exists
return repository.getSnapshots();
return repository.getRepositoryData().getSnapshotIds();
}
/**
@ -218,6 +220,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final String snapshotName = request.snapshotName;
validate(repositoryName, snapshotName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() {
@ -232,11 +235,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId),
request.includeGlobalState(),
request.partial(),
State.INIT,
indices,
snapshotIndices,
System.currentTimeMillis(),
null);
snapshots = new SnapshotsInProgress(newSnapshot);
@ -334,8 +338,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (!snapshot.includeGlobalState()) {
// Remove global state from the cluster state
MetaData.Builder builder = MetaData.builder();
for (String index : snapshot.indices()) {
builder.put(metaData.index(index), false);
for (IndexId index : snapshot.indices()) {
builder.put(metaData.index(index.getName()), false);
}
metaData = builder.build();
}
@ -473,7 +477,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
return new SnapshotInfo(entry.snapshot().getSnapshotId(), entry.indices(), entry.startTime());
return new SnapshotInfo(entry.snapshot().getSnapshotId(),
entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
entry.startTime());
}
/**
@ -546,8 +552,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final SnapshotInfo snapshotInfo) throws IOException {
Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
Repository repository = repositoriesService.repository(repositoryName);
MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, snapshotInfo.indices());
RepositoryData repositoryData = repository.getRepositoryData();
MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(snapshotInfo.indices()));
for (String index : snapshotInfo.indices()) {
IndexId indexId = repositoryData.resolveIndexId(index);
IndexMetaData indexMetaData = metaData.indices().get(index);
if (indexMetaData != null) {
int numberOfShards = indexMetaData.getNumberOfShards();
@ -561,7 +569,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
shardStatus.put(shardId, shardSnapshotStatus);
} else {
IndexShardSnapshotStatus shardSnapshotStatus =
repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId);
repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), indexId, shardId);
shardStatus.put(shardId, shardSnapshotStatus);
}
}
@ -953,7 +961,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener) {
// First, look for the snapshot in the repository
final Repository repository = repositoriesService.repository(repositoryName);
Optional<SnapshotId> matchedEntry = repository.getSnapshots().stream().filter(s -> s.getName().equals(snapshotName)).findFirst();
Optional<SnapshotId> matchedEntry = repository.getRepositoryData().getSnapshotIds()
.stream()
.filter(s -> s.getName().equals(snapshotName))
.findFirst();
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
if (matchedEntry.isPresent() == false) {
matchedEntry = currentSnapshots(repositoryName, Collections.emptyList()).stream()
@ -1121,21 +1132,22 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param indices list of indices to be snapshotted
* @return list of shard to be included into current snapshot
*/
private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, List<String> indices) {
private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, List<IndexId> indices) {
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
MetaData metaData = clusterState.metaData();
for (String index : indices) {
IndexMetaData indexMetaData = metaData.index(index);
for (IndexId index : indices) {
final String indexName = index.getName();
IndexMetaData indexMetaData = metaData.index(indexName);
if (indexMetaData == null) {
// The index was deleted before we managed to start the snapshot - mark it as missing.
builder.put(new ShardId(index, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
} else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "index is closed"));
}
} else {
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(index);
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
if (indexRoutingTable != null) {
@ -1191,8 +1203,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.partial() == false) {
if (entry.state() == State.INIT) {
for (String index : entry.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(index);
for (IndexId index : entry.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(index.getName());
if (indexMetaData != null && indices.contains(indexMetaData)) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();

View File

@ -70,7 +70,7 @@ public class RepositoryUpgradabilityIT extends AbstractSnapshotIntegTestCase {
final Set<SnapshotInfo> snapshotInfos = Sets.newHashSet(getSnapshots(repoName));
assertThat(snapshotInfos.size(), equalTo(1));
SnapshotInfo originalSnapshot = snapshotInfos.iterator().next();
assertThat(originalSnapshot.snapshotId(), equalTo(new SnapshotId("test_1", SnapshotId.UNASSIGNED_UUID)));
assertThat(originalSnapshot.snapshotId(), equalTo(new SnapshotId("test_1", "test_1")));
assertThat(Sets.newHashSet(originalSnapshot.indices()), equalTo(indices));
logger.info("--> restore the original snapshot");

View File

@ -53,7 +53,6 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Collections;
@ -659,7 +658,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
randomBoolean(),
randomBoolean(),
SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)),
Collections.<String>emptyList(),
Collections.emptyList(),
Math.abs(randomLong()),
ImmutableOpenMap.of()));
case 1:

View File

@ -100,7 +100,9 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
@ -121,8 +123,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
@ -1184,9 +1188,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
test_target_shard.updateRoutingEntry(routing);
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode));
assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository() {
assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository("test") {
@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
try {
cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
@ -1645,8 +1649,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
public RestoreOnlyRepository() {
private final String indexName;
public RestoreOnlyRepository(String indexName) {
super(Settings.EMPTY);
this.indexName = indexName;
}
@Override
protected void doStart() {}
@ -1663,17 +1669,19 @@ public class IndexShardTests extends ESSingleNodeTestCase {
return null;
}
@Override
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException {
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException {
return null;
}
@Override
public List<SnapshotId> getSnapshots() {
return null;
public RepositoryData getRepositoryData() {
Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
map.put(new IndexId(indexName, "blah"), Collections.emptySet());
return new RepositoryData(Collections.emptyList(), map);
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {}
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {}
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
return null;
}
@Override
@ -1697,9 +1705,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
return false;
}
@Override
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {}
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {}
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
return null;
}
@Override

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
/**
* Tests for the {@link IndexId} class.
*/
public class IndexIdTests extends ESTestCase {
public void testEqualsAndHashCode() {
// assert equals and hashcode
String name = randomAsciiOfLength(8);
String id = UUIDs.randomBase64UUID();
IndexId indexId1 = new IndexId(name, id);
IndexId indexId2 = new IndexId(name, id);
assertEquals(indexId1, indexId2);
assertEquals(indexId1.hashCode(), indexId2.hashCode());
// assert equals when using index name for id
id = name;
indexId1 = new IndexId(name, id);
indexId2 = new IndexId(name, id);
assertEquals(indexId1, indexId2);
assertEquals(indexId1.hashCode(), indexId2.hashCode());
//assert not equals when name or id differ
indexId2 = new IndexId(randomAsciiOfLength(8), id);
assertNotEquals(indexId1, indexId2);
assertNotEquals(indexId1.hashCode(), indexId2.hashCode());
indexId2 = new IndexId(name, UUIDs.randomBase64UUID());
assertNotEquals(indexId1, indexId2);
assertNotEquals(indexId1.hashCode(), indexId2.hashCode());
}
public void testSerialization() throws IOException {
IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID());
BytesStreamOutput out = new BytesStreamOutput();
indexId.writeTo(out);
assertEquals(indexId, new IndexId(out.bytes().streamInput()));
}
public void testXContent() throws IOException {
IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID());
XContentBuilder builder = JsonXContent.contentBuilder();
indexId.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes());
assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
String name = null;
String id = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
final String currentFieldName = parser.currentName();
parser.nextToken();
if (currentFieldName.equals(IndexId.NAME)) {
name = parser.text();
} else if (currentFieldName.equals(IndexId.ID)) {
id = parser.text();
}
}
assertNotNull(name);
assertNotNull(id);
assertEquals(indexId, new IndexId(name, id));
}
}

View File

@ -0,0 +1,171 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.Matchers.greaterThan;
/**
* Tests for the {@link RepositoryData} class.
*/
public class RepositoryDataTests extends ESTestCase {
public void testEqualsAndHashCode() {
RepositoryData repositoryData1 = generateRandomRepoData();
RepositoryData repositoryData2 = repositoryData1.copy();
assertEquals(repositoryData1, repositoryData2);
assertEquals(repositoryData1.hashCode(), repositoryData2.hashCode());
}
public void testXContent() throws IOException {
RepositoryData repositoryData = generateRandomRepoData();
XContentBuilder builder = JsonXContent.contentBuilder();
repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes());
assertEquals(repositoryData, RepositoryData.fromXContent(parser));
}
public void testAddSnapshots() {
RepositoryData repositoryData = generateRandomRepoData();
// test that adding the same snapshot id to the repository data throws an exception
final SnapshotId snapshotId = repositoryData.getSnapshotIds().get(0);
Map<String, IndexId> indexIdMap = repositoryData.getIndices();
expectThrows(IllegalArgumentException.class,
() -> repositoryData.addSnapshot(new SnapshotId(snapshotId.getName(), snapshotId.getUUID()), Collections.emptyList()));
// test that adding a snapshot and its indices works
SnapshotId newSnapshot = new SnapshotId(randomAsciiOfLength(7), UUIDs.randomBase64UUID());
List<IndexId> indices = new ArrayList<>();
Set<IndexId> newIndices = new HashSet<>();
int numNew = randomIntBetween(1, 10);
for (int i = 0; i < numNew; i++) {
IndexId indexId = new IndexId(randomAsciiOfLength(7), UUIDs.randomBase64UUID());
newIndices.add(indexId);
indices.add(indexId);
}
int numOld = randomIntBetween(1, indexIdMap.size());
List<String> indexNames = new ArrayList<>(indexIdMap.keySet());
for (int i = 0; i < numOld; i++) {
indices.add(indexIdMap.get(indexNames.get(i)));
}
RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot, indices);
// verify that the new repository data has the new snapshot and its indices
assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot));
for (IndexId indexId : indices) {
Set<SnapshotId> snapshotIds = newRepoData.getSnapshots(indexId);
assertTrue(snapshotIds.contains(newSnapshot));
if (newIndices.contains(indexId)) {
assertEquals(snapshotIds.size(), 1); // if it was a new index, only the new snapshot should be in its set
}
}
}
public void testInitIndices() {
final int numSnapshots = randomIntBetween(1, 30);
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap());
// test that initializing indices works
Map<IndexId, Set<SnapshotId>> indices = randomIndices(snapshotIds);
RepositoryData newRepoData = repositoryData.initIndices(indices);
assertEquals(repositoryData.getSnapshotIds(), newRepoData.getSnapshotIds());
for (IndexId indexId : indices.keySet()) {
assertEquals(indices.get(indexId), newRepoData.getSnapshots(indexId));
}
}
public void testRemoveSnapshot() {
RepositoryData repositoryData = generateRandomRepoData();
List<SnapshotId> snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds());
assertThat(snapshotIds.size(), greaterThan(0));
SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1));
RepositoryData newRepositoryData = repositoryData.removeSnapshot(removedSnapshotId);
// make sure the repository data's indices no longer contain the removed snapshot
for (final IndexId indexId : newRepositoryData.getIndices().values()) {
assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId));
}
}
public void testResolveIndexId() {
RepositoryData repositoryData = generateRandomRepoData();
Map<String, IndexId> indices = repositoryData.getIndices();
Set<String> indexNames = indices.keySet();
assertThat(indexNames.size(), greaterThan(0));
String indexName = indexNames.iterator().next();
IndexId indexId = indices.get(indexName);
assertEquals(indexId, repositoryData.resolveIndexId(indexName));
String notInRepoData = randomAsciiOfLength(5);
assertFalse(indexName.contains(notInRepoData));
assertEquals(new IndexId(notInRepoData, notInRepoData), repositoryData.resolveIndexId(notInRepoData));
}
public static RepositoryData generateRandomRepoData() {
return generateRandomRepoData(new ArrayList<>());
}
public static RepositoryData generateRandomRepoData(final List<SnapshotId> origSnapshotIds) {
List<SnapshotId> snapshotIds = randomSnapshots(origSnapshotIds);
return new RepositoryData(snapshotIds, randomIndices(snapshotIds));
}
private static List<SnapshotId> randomSnapshots(final List<SnapshotId> origSnapshotIds) {
final int numSnapshots = randomIntBetween(1, 30);
final List<SnapshotId> snapshotIds = new ArrayList<>(origSnapshotIds);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
return snapshotIds;
}
private static Map<IndexId, Set<SnapshotId>> randomIndices(final List<SnapshotId> snapshotIds) {
final int totalSnapshots = snapshotIds.size();
final int numIndices = randomIntBetween(1, 30);
final Map<IndexId, Set<SnapshotId>> indices = new HashMap<>(numIndices);
for (int i = 0; i < numIndices; i++) {
final IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID());
final Set<SnapshotId> indexSnapshots = new LinkedHashSet<>();
final int numIndicesForSnapshot = randomIntBetween(1, numIndices);
for (int j = 0; j < numIndicesForSnapshot; j++) {
indexSnapshots.add(snapshotIds.get(randomIntBetween(0, totalSnapshots - 1)));
}
indices.put(indexId, indexSnapshots);
}
return indices;
}
}

View File

@ -28,11 +28,11 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;
@ -44,7 +44,7 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.blobId;
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
import static org.hamcrest.Matchers.equalTo;
/**
@ -109,86 +109,56 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
final BlobStoreRepository repository = setupRepo();
// write to and read from a snapshot file with no entries
assertThat(repository.getSnapshots().size(), equalTo(0));
repository.writeSnapshotsToIndexGen(Collections.emptyList());
// write to and read from a index file with no entries
assertThat(repository.getSnapshots().size(), equalTo(0));
final RepositoryData emptyData = RepositoryData.EMPTY;
repository.writeIndexGen(emptyData);
final RepositoryData readData = repository.getRepositoryData();
assertEquals(readData, emptyData);
assertEquals(readData.getIndices().size(), 0);
assertEquals(readData.getSnapshotIds().size(), 0);
// write to and read from a snapshot file with a random number of entries
final int numSnapshots = randomIntBetween(1, 1000);
// write to and read from an index file with snapshots but no indices
final int numSnapshots = randomIntBetween(1, 20);
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(repository.getSnapshots(), equalTo(snapshotIds));
RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap());
repository.writeIndexGen(repositoryData);
assertEquals(repository.getRepositoryData(), repositoryData);
// write to and read from a index file with random repository data
repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData);
assertThat(repository.getRepositoryData(), equalTo(repositoryData));
}
public void testIndexGenerationalFiles() throws Exception {
final BlobStoreRepository repository = setupRepo();
// write to index generational file
final int numSnapshots = randomIntBetween(1, 1000);
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds)));
RepositoryData repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData);
assertThat(repository.getRepositoryData(), equalTo(repositoryData));
assertThat(repository.latestIndexBlobId(), equalTo(0L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));
// adding more and writing to a new index generational file
for (int i = 0; i < 10; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds)));
repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData);
assertEquals(repository.getRepositoryData(), repositoryData);
assertThat(repository.latestIndexBlobId(), equalTo(1L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));
// removing a snapshot adn writing to a new index generational file
snapshotIds.remove(0);
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds)));
// removing a snapshot and writing to a new index generational file
repositoryData = repositoryData.removeSnapshot(repositoryData.getSnapshotIds().get(0));
repository.writeIndexGen(repositoryData);
assertEquals(repository.getRepositoryData(), repositoryData);
assertThat(repository.latestIndexBlobId(), equalTo(2L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
}
public void testOldIndexFileFormat() throws Exception {
final BlobStoreRepository repository = setupRepo();
// write old index file format
final int numOldSnapshots = randomIntBetween(1, 50);
final List<SnapshotId> snapshotIds = new ArrayList<>();
for (int i = 0; i < numOldSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), SnapshotId.UNASSIGNED_UUID));
}
writeOldFormat(repository, snapshotIds.stream().map(SnapshotId::getName).collect(Collectors.toList()));
assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds)));
// write to and read from a snapshot file with a random number of new entries added
final int numSnapshots = randomIntBetween(1, 1000);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds)));
}
public void testBlobId() {
SnapshotId snapshotId = new SnapshotId("abc123", SnapshotId.UNASSIGNED_UUID);
assertThat(blobId(snapshotId), equalTo("abc123")); // just the snapshot name
snapshotId = new SnapshotId("abc-123", SnapshotId.UNASSIGNED_UUID);
assertThat(blobId(snapshotId), equalTo("abc-123")); // just the snapshot name
String uuid = UUIDs.randomBase64UUID();
snapshotId = new SnapshotId("abc123", uuid);
assertThat(blobId(snapshotId), equalTo("abc123-" + uuid)); // snapshot name + '-' + uuid
uuid = UUIDs.randomBase64UUID();
snapshotId = new SnapshotId("abc-123", uuid);
assertThat(blobId(snapshotId), equalTo("abc-123-" + uuid)); // snapshot name + '-' + uuid
}
private BlobStoreRepository setupRepo() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());

View File

@ -61,7 +61,9 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -884,7 +886,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> delete index metadata and shard metadata");
Path metadata = repo.resolve("meta-test-snap-1.dat");
Path metadata = repo.resolve("meta-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
Files.delete(metadata);
logger.info("--> delete snapshot");
@ -917,7 +919,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> truncate snapshot file to make it unreadable");
Path snapshotPath = repo.resolve("snap-test-snap-1-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
Path snapshotPath = repo.resolve("snap-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
outChan.truncate(randomInt(10));
}
@ -2017,6 +2019,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> emulate an orphan snapshot");
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
final IndexId indexId = repositoryData.resolveIndexId(idxName);
clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() {
@ -2033,7 +2038,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
true,
false,
State.ABORTED,
Collections.singletonList(idxName),
Collections.singletonList(indexId),
System.currentTimeMillis(),
shards.build()));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))).build();
@ -2189,7 +2194,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> truncate snapshot file to make it unreadable");
Path snapshotPath = repo.resolve("snap-test-snap-2-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
Path snapshotPath = repo.resolve("snap-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
outChan.truncate(randomInt(10));
}

View File

@ -48,6 +48,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
@ -112,8 +113,8 @@ public class MockRepository extends FsRepository {
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
if (blockOnInitialization ) {
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
if (blockOnInitialization) {
blockExecution();
}
super.initializeSnapshot(snapshotId, indices, clusterMetadata);

View File

@ -30,12 +30,12 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.repositories.RepositoryException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.Map;
/**
@ -71,15 +71,11 @@ public class AzureBlobContainer extends AbstractBlobContainer {
public InputStream readBlob(String blobName) throws IOException {
logger.trace("readBlob({})", blobName);
if (!blobExists(blobName)) {
throw new IOException("Blob [" + blobName + "] does not exist");
}
try {
return blobStore.getInputStream(blobStore.container(), buildKey(blobName));
} catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage());
throw new NoSuchFileException(e.getMessage());
}
throw new IOException(e);
} catch (URISyntaxException e) {
@ -108,7 +104,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
return new AzureOutputStream(blobStore.getOutputStream(blobStore.container(), buildKey(blobName)));
} catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage());
throw new NoSuchFileException(e.getMessage());
}
throw new IOException(e);
} catch (URISyntaxException e) {

View File

@ -32,6 +32,8 @@ import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@ -44,7 +46,6 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotId;
import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getEffectiveSetting;
import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getValue;
@ -153,7 +154,7 @@ public class AzureRepository extends BlobStoreRepository {
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
try {
if (!blobStore.doesContainerExist(blobStore.container())) {
logger.debug("container [{}] does not exist. Creating...", blobStore.container());

View File

@ -21,22 +21,19 @@ package org.elasticsearch.cloud.azure.storage;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -82,7 +79,7 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
@Override
public InputStream getInputStream(String account, LocationMode mode, String container, String blob) throws IOException {
if (!blobExists(account, mode, container, blob)) {
throw new FileNotFoundException("missing blob [" + blob + "]");
throw new NoSuchFileException("missing blob [" + blob + "]");
}
return new ByteArrayInputStream(blobs.get(blob).toByteArray());
}

View File

@ -22,11 +22,10 @@ package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore;
import org.elasticsearch.cloud.azure.storage.AzureStorageServiceMock;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import java.io.IOException;
import java.net.URISyntaxException;
@ -35,11 +34,9 @@ public class AzureBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
@Override
protected BlobStore newBlobStore() throws IOException {
try {
RepositoryName repositoryName = new RepositoryName("azure", "ittest");
RepositorySettings repositorySettings = new RepositorySettings(
Settings.EMPTY, Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock(Settings.EMPTY);
return new AzureBlobStore(repositoryName, Settings.EMPTY, repositorySettings, client);
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, Settings.EMPTY, client);
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}

View File

@ -41,9 +41,9 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
@ -196,7 +196,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
} catch (GoogleJsonResponseException e) {
GoogleJsonError error = e.getDetails();
if ((e.getStatusCode() == HTTP_NOT_FOUND) || ((error != null) && (error.getCode() == HTTP_NOT_FOUND))) {
throw new FileNotFoundException(e.getMessage());
throw new NoSuchFileException(e.getMessage());
}
throw e;
}
@ -227,6 +227,9 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
* @param blobName name of the blob
*/
void deleteBlob(String blobName) throws IOException {
if (!blobExists(blobName)) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
doPrivileged(() -> client.objects().delete(bucket, blobName).execute());
}

View File

@ -32,9 +32,9 @@ import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
@ -69,7 +69,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public void deleteBlob(String blobName) throws IOException {
if (!blobExists(blobName)) {
throw new IOException("Blob [" + blobName + "] does not exist");
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
store.execute(new Operation<Boolean>() {
@ -93,6 +93,9 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public InputStream readBlob(String blobName) throws IOException {
if (!blobExists(blobName)) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
// FSDataInputStream does buffering internally
return store.execute(new Operation<InputStream>() {
@Override

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
@ -55,7 +56,8 @@ public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
});
}
public FileContext createContext(URI uri) {
@SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
private FileContext createContext(URI uri) {
// mirrors HdfsRepository.java behaviour
Configuration cfg = new Configuration(true);
cfg.setClassLoader(HdfsRepository.class.getClassLoader());
@ -85,8 +87,7 @@ public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
// set file system to TestingFs to avoid a bunch of security
// checks, similar to what is done in HdfsTests.java
cfg.set(String.format("fs.AbstractFileSystem.%s.impl", uri.getScheme()),
TestingFs.class.getName());
cfg.set("fs.AbstractFileSystem." + uri.getScheme() + ".impl", TestingFs.class.getName());
// create the FileContext with our user
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {

View File

@ -37,10 +37,10 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.Streams;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
@ -89,7 +89,7 @@ public class S3BlobContainer extends AbstractBlobContainer {
} else {
if (e instanceof AmazonS3Exception) {
if (404 == ((AmazonS3Exception) e).getStatusCode()) {
throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage());
throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage());
}
}
throw e;
@ -116,7 +116,7 @@ public class S3BlobContainer extends AbstractBlobContainer {
@Override
public void deleteBlob(String blobName) throws IOException {
if (!blobExists(blobName)) {
throw new IOException("Blob [" + blobName + "] does not exist");
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
try {

View File

@ -129,16 +129,17 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/15579")
public void testOverwriteFails() throws IOException {
public void testVerifyOverwriteFails() throws IOException {
try (final BlobStore store = newBlobStore()) {
final String blobName = "foobar";
final BlobContainer container = store.blobContainer(new BlobPath());
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
final BytesArray bytesArray = new BytesArray(data);
container.writeBlob(blobName, bytesArray);
// should not be able to overwrite existing blob
expectThrows(IOException.class, () -> container.writeBlob(blobName, bytesArray));
container.deleteBlob(blobName);
container.writeBlob(blobName, bytesArray); // deleted it, so should be able to write it again
container.writeBlob(blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again
}
}