Merge pull request #19706 from elastic/enhancement/snapshot-blob-handling

More resilient blob handling in snapshot repositories
Ali Beyad 2016-08-01 12:03:53 -04:00 committed by GitHub
commit 9f88a8194a
33 changed files with 1539 additions and 335 deletions

File: SnapshotsInProgress.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.snapshots.Snapshot;
 
 import java.io.IOException;
@@ -70,12 +71,12 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
         private final boolean includeGlobalState;
         private final boolean partial;
         private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
-        private final List<String> indices;
+        private final List<IndexId> indices;
         private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
         private final long startTime;
 
-        public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<String> indices, long startTime,
-                     ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
+        public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
+                     long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
             this.state = state;
             this.snapshot = snapshot;
             this.includeGlobalState = includeGlobalState;
@@ -111,7 +112,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
             return state;
         }
 
-        public List<String> indices() {
+        public List<IndexId> indices() {
             return indices;
         }
 
@@ -377,9 +378,9 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
             boolean partial = in.readBoolean();
             State state = State.fromValue(in.readByte());
             int indices = in.readVInt();
-            List<String> indexBuilder = new ArrayList<>();
+            List<IndexId> indexBuilder = new ArrayList<>();
             for (int j = 0; j < indices; j++) {
-                indexBuilder.add(in.readString());
+                indexBuilder.add(new IndexId(in.readString(), in.readString()));
             }
             long startTime = in.readLong();
             ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
@@ -410,8 +411,8 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
             out.writeBoolean(entry.partial());
             out.writeByte(entry.state().value());
             out.writeVInt(entry.indices().size());
-            for (String index : entry.indices()) {
-                out.writeString(index);
+            for (IndexId index : entry.indices()) {
+                index.writeTo(out);
             }
             out.writeLong(entry.startTime());
             out.writeVInt(entry.shards().size());
@@ -458,8 +459,8 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
             builder.field(STATE, entry.state());
             builder.startArray(INDICES);
             {
-                for (String index : entry.indices()) {
-                    builder.value(index);
+                for (IndexId index : entry.indices()) {
+                    index.toXContent(builder, params);
                 }
             }
             builder.endArray();
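
Note: the wire-format change in the hunks above is the subtle part. Each entry of the in-progress indices list now serializes as two strings (the index name, then the repository-internal id) instead of one. A minimal round-trip sketch, assuming the era's stream APIs (BytesStreamOutput and a streamInput() accessor on the resulting bytes):

    import org.elasticsearch.common.io.stream.BytesStreamOutput;
    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.repositories.IndexId;

    import java.util.ArrayList;
    import java.util.List;

    class IndexIdWireRoundTrip {
        public static void main(String[] args) throws Exception {
            List<IndexId> indices = new ArrayList<>();
            indices.add(new IndexId("foo", "Ac1342-B_x")); // illustrative id

            BytesStreamOutput out = new BytesStreamOutput();
            out.writeVInt(indices.size());
            for (IndexId index : indices) {
                index.writeTo(out); // writes name, then id
            }

            StreamInput in = out.bytes().streamInput(); // assumed accessor
            int count = in.readVInt();
            List<IndexId> read = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                read.add(new IndexId(in.readString(), in.readString()));
            }
            assert read.equals(indices);
        }
    }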

File: BlobContainer.java

@@ -23,7 +23,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collection;
+import java.nio.file.NoSuchFileException;
 import java.util.Map;
 
 /**
@@ -53,7 +53,8 @@ public interface BlobContainer {
      * @param   blobName
      *          The name of the blob to get an {@link InputStream} for.
      * @return  The {@code InputStream} to read the blob.
-     * @throws  IOException if the blob does not exist or can not be read.
+     * @throws  NoSuchFileException if the blob does not exist
+     * @throws  IOException if the blob can not be read.
      */
     InputStream readBlob(String blobName) throws IOException;
 
@@ -95,7 +96,8 @@ public interface BlobContainer {
      *
      * @param   blobName
      *          The name of the blob to delete.
-     * @throws  IOException if the blob does not exist, or if the blob exists but could not be deleted.
+     * @throws  NoSuchFileException if the blob does not exist
+     * @throws  IOException if the blob exists but could not be deleted.
     */
     void deleteBlob(String blobName) throws IOException;
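
Note: splitting the @throws clauses is a real contract change, not just a doc fix. Callers can now treat a missing blob as a distinct, recoverable condition. A minimal caller sketch (the helper below is hypothetical, not part of this change); NoSuchFileException extends IOException, so it must be caught first:

    import org.elasticsearch.common.blobstore.BlobContainer;

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.NoSuchFileException;

    class BlobReads {
        /** Returns the blob's stream, or null if the blob does not exist. */
        static InputStream readIfPresent(BlobContainer container, String blobName) throws IOException {
            try {
                return container.readBlob(blobName);
            } catch (NoSuchFileException e) {
                return null; // an absent blob is an expected case, not a failure
            }
        }
    }

Any other IOException still propagates, meaning the blob exists but could not be read.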

File: FsBlobContainer.java

@@ -27,13 +27,16 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
 import org.elasticsearch.common.io.Streams;
 
 import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.HashMap;
 import java.util.Map;
@@ -85,7 +88,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
     @Override
     public void deleteBlob(String blobName) throws IOException {
         Path blobPath = path.resolve(blobName);
-        Files.deleteIfExists(blobPath);
+        Files.delete(blobPath);
     }
 
     @Override
@@ -95,14 +98,18 @@ public class FsBlobContainer extends AbstractBlobContainer {
     @Override
     public InputStream readBlob(String name) throws IOException {
-        return new BufferedInputStream(Files.newInputStream(path.resolve(name)), blobStore.bufferSizeInBytes());
+        final Path resolvedPath = path.resolve(name);
+        try {
+            return new BufferedInputStream(Files.newInputStream(resolvedPath), blobStore.bufferSizeInBytes());
+        } catch (FileNotFoundException fnfe) {
+            throw new NoSuchFileException("[" + name + "] blob not found");
+        }
     }
 
     @Override
     public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
         final Path file = path.resolve(blobName);
-        // TODO: why is this not specifying CREATE_NEW? Do we really need to be able to truncate existing files?
-        try (OutputStream outputStream = Files.newOutputStream(file)) {
+        try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
             Streams.copy(inputStream, outputStream, new byte[blobStore.bufferSizeInBytes()]);
         }
         IOUtils.fsync(file, false);
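
Note: switching writeBlob to CREATE_NEW resolves the TODO it deletes. A blob write now fails fast instead of silently truncating an existing blob, so overwrite bugs in repository code surface as errors. A standalone sketch of the JDK semantics being relied on (paths are illustrative):

    import java.io.OutputStream;
    import java.nio.file.FileAlreadyExistsException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    class CreateNewDemo {
        public static void main(String[] args) throws Exception {
            Path blob = Files.createTempDirectory("blobs").resolve("snap-xyz.dat");
            try (OutputStream os = Files.newOutputStream(blob, StandardOpenOption.CREATE_NEW)) {
                os.write(new byte[] {1, 2, 3}); // first write succeeds
            }
            try (OutputStream os = Files.newOutputStream(blob, StandardOpenOption.CREATE_NEW)) {
                os.write(new byte[] {4, 5, 6}); // never reached
            } catch (FileAlreadyExistsException e) {
                System.out.println("second write refused: " + e.getFile());
            }
        }
    }

The companion change from Files.deleteIfExists to Files.delete makes deleting a missing blob throw NoSuchFileException, matching the sharpened BlobContainer contract above.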

File: AbstractBlobContainer.java

@@ -20,14 +20,11 @@
 package org.elasticsearch.common.blobstore.support;
 
 import org.elasticsearch.common.blobstore.BlobContainer;
-import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.bytes.BytesReference;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collection;
-import java.util.Map;
 
 /**
  * A base abstract blob container that implements higher level container methods.
File: StoreRecovery.java

@@ -43,6 +43,7 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 
 import java.io.IOException;
@@ -394,10 +395,12 @@ final class StoreRecovery {
             translogState.totalOperationsOnStart(0);
             indexShard.prepareForIndexRecovery();
             ShardId snapshotShardId = shardId;
-            if (!shardId.getIndexName().equals(restoreSource.index())) {
-                snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
+            final String indexName = restoreSource.index();
+            if (!shardId.getIndexName().equals(indexName)) {
+                snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
             }
-            repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState());
+            final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
+            repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
             indexShard.skipTranslogRecovery();
             indexShard.finalizeRecovery();
             indexShard.postRecovery("restore done");

File: IndexId.java (new file)

@@ -0,0 +1,110 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.Index;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Represents a single snapshotted index in the repository.
+ */
+public final class IndexId implements Writeable, ToXContent {
+    protected static final String NAME = "name";
+    protected static final String ID = "id";
+
+    private final String name;
+    private final String id;
+
+    public IndexId(final String name, final String id) {
+        this.name = name;
+        this.id = id;
+    }
+
+    public IndexId(final StreamInput in) throws IOException {
+        this.name = in.readString();
+        this.id = in.readString();
+    }
+
+    /**
+     * The name of the index.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * The unique ID for the index within the repository. This is *not* the same as the
+     * index's UUID, but merely a unique file/URL friendly identifier that a repository can
+     * use to name blobs for the index.
+     *
+     * We could not use the index's actual UUID (See {@link Index#getUUID()}) because in the
+     * case of snapshot/restore, the index UUID in the snapshotted index will be different
+     * from the index UUID assigned to it when it is restored. Hence, the actual index UUID
+     * is not useful in the context of snapshot/restore for tying a snapshotted index to the
+     * index it was snapshot from, and so we are using a separate UUID here.
+     */
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + name + "/" + id + "]";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        @SuppressWarnings("unchecked") IndexId that = (IndexId) o;
+        return Objects.equals(name, that.name) && Objects.equals(id, that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, id);
+    }
+
+    @Override
+    public void writeTo(final StreamOutput out) throws IOException {
+        out.writeString(name);
+        out.writeString(id);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        builder.startObject();
+        builder.field(NAME, name);
+        builder.field(ID, id);
+        builder.endObject();
+        return builder;
+    }
+}
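
Note: a small sketch of the identity rules this class defines. Equality and hashing are over the (name, id) pair, and toString renders as [name/id]; the ids below are illustrative:

    import org.elasticsearch.repositories.IndexId;

    class IndexIdExample {
        public static void main(String[] args) {
            IndexId a = new IndexId("foo", "Ac1342-B_x");
            IndexId b = new IndexId("foo", "Ac1342-B_x");
            IndexId other = new IndexId("foo", "1xB0D8_B3y");
            assert a.equals(b) && a.hashCode() == b.hashCode();
            assert a.equals(other) == false; // same name, different repository id
            System.out.println(a);           // prints [foo/Ac1342-B_x]
        }
    }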

File: Repository.java

@@ -47,7 +47,7 @@ import java.util.List;
  * <ul>
  * <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
  * with list of indices that will be included into the snapshot</li>
- * <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexCommit, IndexShardSnapshotStatus)}
+ * <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
  * for each shard</li>
  * <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
  * </ul>
@@ -88,15 +88,14 @@ public interface Repository extends LifecycleComponent {
      * @param indices list of indices
      * @return information about snapshot
      */
-    MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException;
+    MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException;
 
     /**
-     * Returns the list of snapshots currently stored in the repository that match the given predicate on the snapshot name.
-     * To get all snapshots, the predicate filter should return true regardless of the input.
-     *
-     * @return snapshot list
+     * Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
+     * and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
+     * if there was an error in reading the data.
      */
-    List<SnapshotId> getSnapshots();
+    RepositoryData getRepositoryData();
 
     /**
      * Starts snapshotting process
@@ -105,7 +104,7 @@ public interface Repository extends LifecycleComponent {
      * @param indices list of indices to be snapshotted
      * @param metaData cluster metadata
      */
-    void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData);
+    void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData);
 
     /**
      * Finalizes snapshotting process
@@ -113,12 +112,14 @@ public interface Repository extends LifecycleComponent {
      * This method is called on master after all shards are snapshotted.
      *
      * @param snapshotId snapshot id
+     * @param indices list of indices in the snapshot
+     * @param startTime start time of the snapshot
      * @param failure global failure reason or null
      * @param totalShards total number of shards
      * @param shardFailures list of shard failures
      * @return snapshot description
      */
-    SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
+    SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
 
     /**
      * Deletes snapshot
@@ -181,10 +182,11 @@ public interface Repository extends LifecycleComponent {
      *
      * @param shard shard to be snapshotted
      * @param snapshotId snapshot id
+     * @param indexId id for the index being snapshotted
      * @param snapshotIndexCommit commit point
      * @param snapshotStatus snapshot status
      */
-    void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
+    void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
 
     /**
      * Restores snapshot of the shard.
@@ -194,20 +196,22 @@ public interface Repository extends LifecycleComponent {
      * @param shard the shard to restore the index into
      * @param snapshotId snapshot id
      * @param version version of elasticsearch that created this snapshot
+     * @param indexId id of the index in the repository from which the restore is occurring
      * @param snapshotShardId shard id (in the snapshot)
      * @param recoveryState recovery state
      */
-    void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState);
+    void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);
 
     /**
     * Retrieve shard snapshot status for the stored snapshot
     *
     * @param snapshotId snapshot id
    * @param version version of elasticsearch that created this snapshot
+     * @param indexId the snapshotted index id for the shard to get status for
     * @param shardId shard id
     * @return snapshot status
     */
-    IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
+    IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId);
 }
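
Note: putting the new signatures together, the lifecycle from the interface javadoc looks roughly like the sketch below. The repository, metadata, shard, commit point, and status objects are assumed to be supplied by the caller; this is not how SnapshotsService actually wires it, just the call shape:

    import org.apache.lucene.index.IndexCommit;
    import org.elasticsearch.cluster.metadata.MetaData;
    import org.elasticsearch.index.shard.IndexShard;
    import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
    import org.elasticsearch.repositories.IndexId;
    import org.elasticsearch.repositories.Repository;
    import org.elasticsearch.snapshots.SnapshotId;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class SnapshotFlowSketch {
        static void snapshotOneIndex(Repository repo, SnapshotId snapshotId, MetaData metaData,
                                     IndexShard shard, IndexCommit commit, IndexShardSnapshotStatus status) {
            // resolve repository-scoped ids (reuses existing ids, mints new ones as needed)
            List<IndexId> indices = repo.getRepositoryData().resolveNewIndices(Arrays.asList("foo"));
            long startTime = System.currentTimeMillis();
            repo.initializeSnapshot(snapshotId, indices, metaData);                // master
            repo.snapshotShard(shard, snapshotId, indices.get(0), commit, status); // each data node
            repo.finalizeSnapshot(snapshotId, indices, startTime,                  // master again
                                  null, 1, Collections.emptyList());
        }
    }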

File: RepositoryData.java (new file)

@@ -0,0 +1,311 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.snapshots.SnapshotId;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A class that represents the data in a repository, as captured in the
+ * repository's index blob.
+ */
+public final class RepositoryData implements ToXContent {
+
+    public static final RepositoryData EMPTY = new RepositoryData(Collections.emptyList(), Collections.emptyMap());
+
+    /**
+     * The ids of the snapshots in the repository.
+     */
+    private final List<SnapshotId> snapshotIds;
+    /**
+     * The indices found in the repository across all snapshots, as a name to {@link IndexId} mapping
+     */
+    private final Map<String, IndexId> indices;
+    /**
+     * The snapshots that each index belongs to.
+     */
+    private final Map<IndexId, Set<SnapshotId>> indexSnapshots;
+
+    public RepositoryData(List<SnapshotId> snapshotIds, Map<IndexId, Set<SnapshotId>> indexSnapshots) {
+        this.snapshotIds = Collections.unmodifiableList(snapshotIds);
+        this.indices = Collections.unmodifiableMap(indexSnapshots.keySet()
+                                                       .stream()
+                                                       .collect(Collectors.toMap(IndexId::getName, Function.identity())));
+        this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
+    }
+
+    protected RepositoryData copy() {
+        return new RepositoryData(snapshotIds, indexSnapshots);
+    }
+
+    /**
+     * Returns an unmodifiable list of the snapshot ids.
+     */
+    public List<SnapshotId> getSnapshotIds() {
+        return snapshotIds;
+    }
+
+    /**
+     * Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
+     */
+    public Map<String, IndexId> getIndices() {
+        return indices;
+    }
+
+    /**
+     * Add a snapshot and its indices to the repository; returns a new instance. If the snapshot
+     * already exists in the repository data, this method throws an IllegalArgumentException.
+     */
+    public RepositoryData addSnapshot(final SnapshotId snapshotId, final List<IndexId> snapshottedIndices) {
+        if (snapshotIds.contains(snapshotId)) {
+            throw new IllegalArgumentException("[" + snapshotId + "] already exists in the repository data");
+        }
+        List<SnapshotId> snapshots = new ArrayList<>(snapshotIds);
+        snapshots.add(snapshotId);
+        Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
+        for (final IndexId indexId : snapshottedIndices) {
+            if (allIndexSnapshots.containsKey(indexId)) {
+                Set<SnapshotId> ids = allIndexSnapshots.get(indexId);
+                if (ids == null) {
+                    ids = new LinkedHashSet<>();
+                    allIndexSnapshots.put(indexId, ids);
+                }
+                ids.add(snapshotId);
+            } else {
+                Set<SnapshotId> ids = new LinkedHashSet<>();
+                ids.add(snapshotId);
+                allIndexSnapshots.put(indexId, ids);
+            }
+        }
+        return new RepositoryData(snapshots, allIndexSnapshots);
+    }
+
+    /**
+     * Initializes the indices in the repository metadata; returns a new instance.
+     */
+    public RepositoryData initIndices(final Map<IndexId, Set<SnapshotId>> indexSnapshots) {
+        return new RepositoryData(snapshotIds, indexSnapshots);
+    }
+
+    /**
+     * Remove a snapshot and remove any indices that no longer exist in the repository due to the deletion of the snapshot.
+     */
+    public RepositoryData removeSnapshot(final SnapshotId snapshotId) {
+        List<SnapshotId> newSnapshotIds = snapshotIds
+                                              .stream()
+                                              .filter(id -> snapshotId.equals(id) == false)
+                                              .collect(Collectors.toList());
+        Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
+        for (final IndexId indexId : indices.values()) {
+            Set<SnapshotId> set;
+            Set<SnapshotId> snapshotIds = this.indexSnapshots.get(indexId);
+            assert snapshotIds != null;
+            if (snapshotIds.contains(snapshotId)) {
+                if (snapshotIds.size() == 1) {
+                    // removing the snapshot will mean no more snapshots have this index, so just skip over it
+                    continue;
+                }
+                set = new LinkedHashSet<>(snapshotIds);
+                set.remove(snapshotId);
+            } else {
+                set = snapshotIds;
+            }
+            indexSnapshots.put(indexId, set);
+        }
+        return new RepositoryData(newSnapshotIds, indexSnapshots);
+    }
+
+    /**
+     * Returns an immutable collection of the snapshot ids for the snapshots that contain the given index.
+     */
+    public Set<SnapshotId> getSnapshots(final IndexId indexId) {
+        Set<SnapshotId> snapshotIds = indexSnapshots.get(indexId);
+        if (snapshotIds == null) {
+            throw new IllegalArgumentException("unknown snapshot index " + indexId + "");
+        }
+        return snapshotIds;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        @SuppressWarnings("unchecked") RepositoryData that = (RepositoryData) obj;
+        return snapshotIds.equals(that.snapshotIds)
+                   && indices.equals(that.indices)
+                   && indexSnapshots.equals(that.indexSnapshots);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(snapshotIds, indices, indexSnapshots);
+    }
+
+    /**
+     * Resolve the index name to the index id specific to the repository,
+     * throwing an exception if the index could not be resolved.
+     */
+    public IndexId resolveIndexId(final String indexName) {
+        if (indices.containsKey(indexName)) {
+            return indices.get(indexName);
+        } else {
+            // on repositories created before 5.0, there was no indices information in the index
+            // blob, so if the repository hasn't been updated with new snapshots, no new index blob
+            // would have been written, so we only have old snapshots without the index information.
+            // in this case, the index id is just the index name
+            return new IndexId(indexName, indexName);
+        }
+    }
+
+    /**
+     * Resolve the given index names to index ids.
+     */
+    public List<IndexId> resolveIndices(final List<String> indices) {
+        List<IndexId> resolvedIndices = new ArrayList<>(indices.size());
+        for (final String indexName : indices) {
+            resolvedIndices.add(resolveIndexId(indexName));
+        }
+        return resolvedIndices;
+    }
+
+    /**
+     * Resolve the given index names to index ids, creating new index ids for
+     * new indices in the repository.
+     */
+    public List<IndexId> resolveNewIndices(final List<String> indicesToResolve) {
+        List<IndexId> snapshotIndices = new ArrayList<>();
+        for (String index : indicesToResolve) {
+            final IndexId indexId;
+            if (indices.containsKey(index)) {
+                indexId = indices.get(index);
+            } else {
+                indexId = new IndexId(index, UUIDs.randomBase64UUID());
+            }
+            snapshotIndices.add(indexId);
+        }
+        return snapshotIndices;
+    }
+
+    private static final String SNAPSHOTS = "snapshots";
+    private static final String INDICES = "indices";
+    private static final String INDEX_ID = "id";
+
+    @Override
+    public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
+        builder.startObject();
+        // write the snapshots list
+        builder.startArray(SNAPSHOTS);
+        for (final SnapshotId snapshot : getSnapshotIds()) {
+            snapshot.toXContent(builder, params);
+        }
+        builder.endArray();
+        // write the indices map
+        builder.startObject(INDICES);
+        for (final IndexId indexId : getIndices().values()) {
+            builder.startObject(indexId.getName());
+            builder.field(INDEX_ID, indexId.getId());
+            builder.startArray(SNAPSHOTS);
+            Set<SnapshotId> snapshotIds = indexSnapshots.get(indexId);
+            assert snapshotIds != null;
+            for (final SnapshotId snapshotId : snapshotIds) {
+                snapshotId.toXContent(builder, params);
+            }
+            builder.endArray();
+            builder.endObject();
+        }
+        builder.endObject();
+        builder.endObject();
+        return builder;
+    }
+
+    public static RepositoryData fromXContent(final XContentParser parser) throws IOException {
+        List<SnapshotId> snapshots = new ArrayList<>();
+        Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
+        if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
+            while (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
+                String currentFieldName = parser.currentName();
+                if (SNAPSHOTS.equals(currentFieldName)) {
+                    if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
+                        while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
+                            snapshots.add(SnapshotId.fromXContent(parser));
+                        }
+                    } else {
+                        throw new ElasticsearchParseException("expected array for [" + currentFieldName + "]");
+                    }
+                } else if (INDICES.equals(currentFieldName)) {
+                    if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
+                        throw new ElasticsearchParseException("start object expected [indices]");
+                    }
+                    while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
+                        String indexName = parser.currentName();
+                        String indexId = null;
+                        Set<SnapshotId> snapshotIds = new LinkedHashSet<>();
+                        if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
+                            throw new ElasticsearchParseException("start object expected index[" + indexName + "]");
+                        }
+                        while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
+                            String indexMetaFieldName = parser.currentName();
+                            parser.nextToken();
+                            if (INDEX_ID.equals(indexMetaFieldName)) {
+                                indexId = parser.text();
+                            } else if (SNAPSHOTS.equals(indexMetaFieldName)) {
+                                if (parser.currentToken() != XContentParser.Token.START_ARRAY) {
+                                    throw new ElasticsearchParseException("start array expected [snapshots]");
+                                }
+                                while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
+                                    snapshotIds.add(SnapshotId.fromXContent(parser));
+                                }
+                            }
+                        }
+                        assert indexId != null;
+                        indexSnapshots.put(new IndexId(indexName, indexId), snapshotIds);
+                    }
+                } else {
+                    throw new ElasticsearchParseException("unknown field name [" + currentFieldName + "]");
+                }
+            }
+        } else {
+            throw new ElasticsearchParseException("start object expected");
+        }
+        return new RepositoryData(snapshots, indexSnapshots);
+    }
+}
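
Note: RepositoryData is deliberately immutable; addSnapshot, removeSnapshot, and initIndices each return a new instance, which BlobStoreRepository then persists as the next index-N generation. A short usage sketch (names illustrative, ids generated):

    import org.elasticsearch.repositories.IndexId;
    import org.elasticsearch.repositories.RepositoryData;
    import org.elasticsearch.snapshots.SnapshotId;

    import java.util.Arrays;
    import java.util.List;

    class RepositoryDataExample {
        public static void main(String[] args) {
            SnapshotId snap = new SnapshotId("snap-1", "some-uuid");
            List<IndexId> indices = RepositoryData.EMPTY.resolveNewIndices(Arrays.asList("foo", "bar"));
            RepositoryData withSnap = RepositoryData.EMPTY.addSnapshot(snap, indices);
            assert withSnap.getSnapshots(indices.get(0)).contains(snap);
            assert withSnap.resolveIndexId("foo").equals(indices.get(0));
            // removing the only snapshot drops the index entries along with it
            RepositoryData removed = withSnap.removeSnapshot(snap);
            assert removed.getSnapshotIds().isEmpty();
        }
    }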

File: BlobStoreRepository.java

@@ -45,6 +45,8 @@ import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
@@ -58,6 +60,8 @@ import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.common.ParseFieldMatcher;
 import org.elasticsearch.common.Strings;
@@ -103,6 +107,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
@@ -119,14 +124,14 @@ import static java.util.Collections.unmodifiableMap;
 * {@code
 *   STORE_ROOT
 *   |- index-N           - list of all snapshot name as JSON array, N is the generation of the file
- *   |- index-latest      - contains the numeric value of the latest generation of the index file (i.e. N from above)
- *   |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010"
+ *   |- index.latest      - contains the numeric value of the latest generation of the index file (i.e. N from above)
+ *   |- snap-20131010     - JSON serialized Snapshot for snapshot "20131010"
 *   |- meta-20131010.dat - JSON serialized MetaData for snapshot "20131010" (includes only global metadata)
- *   |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011"
+ *   |- snap-20131011     - JSON serialized Snapshot for snapshot "20131011"
 *   |- meta-20131011.dat - JSON serialized MetaData for snapshot "20131011"
 *   .....
 *   |- indices/ - data for all indices
- *      |- foo/ - data for index "foo"
+ *      |- Ac1342-B_x/ - data for index "foo" which was assigned the unique id of Ac1342-B_x in the repository
 *         |- meta-20131010.dat - JSON Serialized IndexMetaData for index "foo"
 *         |- 0/ - data for shard "0" of index "foo"
 *         |  |- __1 \
@@ -146,7 +151,7 @@ import static java.util.Collections.unmodifiableMap;
 *         |-2/
 *         ......
 *
- *      |- bar/ - data for index bar
+ *      |- 1xB0D8_B3y/ - data for index "bar" which was assigned the unique id of 1xB0D8_B3y in the repository
 *      ......
 * }
 * </pre>
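
Note: based on RepositoryData.toXContent above, and assuming SnapshotId serializes as the name/uuid object that the removed readSnapshotsFromIndex parser further below reads, an index-N blob written by this change looks roughly like the following (values illustrative):

    {
      "snapshots": [
        { "name": "snap-1", "uuid": "k9Zt3K1cR0yF3Zq0jzM5yA" }
      ],
      "indices": {
        "foo": {
          "id": "Ac1342-B_x",
          "snapshots": [
            { "name": "snap-1", "uuid": "k9Zt3K1cR0yF3Zq0jzM5yA" }
          ]
        }
      }
    }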
@@ -163,13 +168,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     private static final String SNAPSHOT_PREFIX = "snap-";
 
-    protected static final String SNAPSHOT_CODEC = "snapshot";
+    private static final String SNAPSHOT_CODEC = "snapshot";
 
     static final String SNAPSHOTS_FILE = "index"; // package private for unit testing
 
-    private static final String SNAPSHOTS_FILE_PREFIX = "index-";
+    private static final String INDEX_FILE_PREFIX = "index-";
 
-    private static final String SNAPSHOTS_INDEX_LATEST_BLOB = "index.latest";
+    private static final String INDEX_LATEST_BLOB = "index.latest";
 
     private static final String TESTS_FILE = "tests-";
@@ -305,7 +310,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     @Override
-    public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
+    public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetaData) {
         if (isReadOnly()) {
             throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository");
         }
@@ -315,28 +320,69 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             if (getSnapshots().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
                 throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with the same name already exists");
             }
-            if (snapshotFormat.exists(snapshotsBlobContainer, blobId(snapshotId)) ||
+            if (snapshotFormat.exists(snapshotsBlobContainer, snapshotId.getUUID()) ||
                     snapshotLegacyFormat.exists(snapshotsBlobContainer, snapshotName)) {
                 throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with such name already exists");
             }
             // Write Global MetaData
-            globalMetaDataFormat.write(clusterMetadata, snapshotsBlobContainer, snapshotName);
-            for (String index : indices) {
-                final IndexMetaData indexMetaData = clusterMetadata.index(index);
-                final BlobPath indexPath = basePath().add("indices").add(index);
+            globalMetaDataFormat.write(clusterMetaData, snapshotsBlobContainer, snapshotId.getUUID());
+
+            // write the index metadata for each index in the snapshot
+            for (IndexId index : indices) {
+                final IndexMetaData indexMetaData = clusterMetaData.index(index.getName());
+                final BlobPath indexPath = basePath().add("indices").add(index.getId());
                 final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
-                indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotName);
+                indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getUUID());
             }
         } catch (IOException ex) {
             throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
         }
     }
 
+    // Older repository index files (index-N) only contain snapshot info, not indices info,
+    // so if the repository data is of the older format, populate it with the indices entries
+    // so we know which indices of snapshots have blob ids in the older format.
+    private RepositoryData upgradeRepositoryData(final RepositoryData repositoryData) throws IOException {
+        final Map<IndexId, Set<SnapshotId>> indexToSnapshots = new HashMap<>();
+        for (final SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
+            final SnapshotInfo snapshotInfo;
+            try {
+                snapshotInfo = getSnapshotInfo(snapshotId);
+            } catch (SnapshotException e) {
+                logger.warn("[{}] repository is on a pre-5.0 format with an index file that contains snapshot [{}] but " +
+                            "the corresponding snap-{}.dat file cannot be read. The snapshot will no longer be included in " +
+                            "the repository but its data directories will remain.", e, getMetadata().name(),
+                            snapshotId, snapshotId.getUUID());
+                continue;
+            }
+            for (final String indexName : snapshotInfo.indices()) {
+                final IndexId indexId = new IndexId(indexName, indexName);
+                if (indexToSnapshots.containsKey(indexId)) {
+                    indexToSnapshots.get(indexId).add(snapshotId);
+                } else {
+                    indexToSnapshots.put(indexId, Sets.newHashSet(snapshotId));
+                }
+            }
+        }
+        try {
+            final RepositoryData updatedRepoData = repositoryData.initIndices(indexToSnapshots);
+            if (isReadOnly() == false) {
+                // write the new index gen file with the indices included
+                writeIndexGen(updatedRepoData);
+            }
+            return updatedRepoData;
+        } catch (IOException e) {
+            throw new RepositoryException(metadata.name(), "failed to update the repository index blob with indices data on startup", e);
+        }
+    }
+
     @Override
     public void deleteSnapshot(SnapshotId snapshotId) {
         if (isReadOnly()) {
             throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository");
         }
+        final RepositoryData repositoryData = getRepositoryData();
         List<String> indices = Collections.emptyList();
         SnapshotInfo snapshot = null;
         try {
@@ -350,36 +396,29 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         MetaData metaData = null;
         try {
             if (snapshot != null) {
-                metaData = readSnapshotMetaData(snapshotId, snapshot.version(), indices, true);
+                metaData = readSnapshotMetaData(snapshotId, snapshot.version(), repositoryData.resolveIndices(indices), true);
             } else {
-                metaData = readSnapshotMetaData(snapshotId, null, indices, true);
+                metaData = readSnapshotMetaData(snapshotId, null, repositoryData.resolveIndices(indices), true);
             }
         } catch (IOException | SnapshotException ex) {
             logger.warn("cannot read metadata for snapshot [{}]", ex, snapshotId);
         }
         try {
-            final String snapshotName = snapshotId.getName();
-            // Delete snapshot file first so we wouldn't end up with partially deleted snapshot that looks OK
-            if (snapshot != null) {
-                snapshotFormat(snapshot.version()).delete(snapshotsBlobContainer, blobId(snapshotId));
-                globalMetaDataFormat(snapshot.version()).delete(snapshotsBlobContainer, snapshotName);
-            } else {
-                // We don't know which version was the snapshot created with - try deleting both current and legacy formats
-                snapshotFormat.delete(snapshotsBlobContainer, blobId(snapshotId));
-                snapshotLegacyFormat.delete(snapshotsBlobContainer, snapshotName);
-                globalMetaDataLegacyFormat.delete(snapshotsBlobContainer, snapshotName);
-                globalMetaDataFormat.delete(snapshotsBlobContainer, snapshotName);
-            }
-            // Delete snapshot from the snapshot list
-            List<SnapshotId> snapshotIds = getSnapshots().stream().filter(id -> snapshotId.equals(id) == false).collect(Collectors.toList());
-            writeSnapshotsToIndexGen(snapshotIds);
+            // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
+            writeIndexGen(repositoryData.removeSnapshot(snapshotId));
+            // delete the snapshot file
+            safeSnapshotBlobDelete(snapshot, snapshotId.getUUID());
+            // delete the global metadata file
+            safeGlobalMetaDataBlobDelete(snapshot, snapshotId.getUUID());
             // Now delete all indices
             for (String index : indices) {
-                BlobPath indexPath = basePath().add("indices").add(index);
+                final IndexId indexId = repositoryData.resolveIndexId(index);
+                BlobPath indexPath = basePath().add("indices").add(indexId.getId());
                 BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
                 try {
-                    indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, snapshotId.getName());
+                    indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, snapshotId.getUUID());
                 } catch (IOException ex) {
                     logger.warn("[{}] failed to delete metadata for index [{}]", ex, snapshotId, index);
                 }
@@ -388,7 +427,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 if (indexMetaData != null) {
                     for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
                         try {
-                            delete(snapshotId, snapshot.version(), new ShardId(indexMetaData.getIndex(), shardId));
+                            delete(snapshotId, snapshot.version(), indexId, new ShardId(indexMetaData.getIndex(), shardId));
                         } catch (SnapshotException ex) {
                             logger.warn("[{}] failed to delete shard data for shard [{}][{}]", ex, snapshotId, index, shardId);
                         }
@@ -401,28 +440,77 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    private void safeSnapshotBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) {
+        if (snapshotInfo != null) {
+            // we know the version the snapshot was created with
+            try {
+                snapshotFormat(snapshotInfo.version()).delete(snapshotsBlobContainer, blobId);
+            } catch (IOException e) {
+                logger.warn("[{}] Unable to delete snapshot file [{}]", e, snapshotInfo.snapshotId(), blobId);
+            }
+        } else {
+            // we don't know the version, first try the current format, then the legacy format
+            try {
+                snapshotFormat.delete(snapshotsBlobContainer, blobId);
+            } catch (IOException e) {
+                // now try legacy format
+                try {
+                    snapshotLegacyFormat.delete(snapshotsBlobContainer, blobId);
+                } catch (IOException e2) {
+                    // neither snapshot file could be deleted, log the error
+                    logger.warn("Unable to delete snapshot file [{}]", e, blobId);
+                }
+            }
+        }
+    }
+
+    private void safeGlobalMetaDataBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) {
+        if (snapshotInfo != null) {
+            // we know the version the snapshot was created with
+            try {
+                globalMetaDataFormat(snapshotInfo.version()).delete(snapshotsBlobContainer, blobId);
+            } catch (IOException e) {
+                logger.warn("[{}] Unable to delete global metadata file [{}]", e, snapshotInfo.snapshotId(), blobId);
+            }
+        } else {
+            // we don't know the version, first try the current format, then the legacy format
+            try {
+                globalMetaDataFormat.delete(snapshotsBlobContainer, blobId);
+            } catch (IOException e) {
+                // now try legacy format
+                try {
+                    globalMetaDataLegacyFormat.delete(snapshotsBlobContainer, blobId);
+                } catch (IOException e2) {
+                    // neither global metadata file could be deleted, log the error
+                    logger.warn("Unable to delete global metadata file [{}]", e, blobId);
+                }
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
-                                         final List<String> indices,
+                                         final List<IndexId> indices,
                                          final long startTime,
                                          final String failure,
                                          final int totalShards,
                                         final List<SnapshotShardFailure> shardFailures) {
         try {
             SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
-                indices,
+                indices.stream().map(IndexId::getName).collect(Collectors.toList()),
                 startTime,
                 failure,
                 System.currentTimeMillis(),
                 totalShards,
                 shardFailures);
-            snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, blobId(snapshotId));
-            List<SnapshotId> snapshotIds = getSnapshots();
+            snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, snapshotId.getUUID());
+            final RepositoryData repositoryData = getRepositoryData();
+            List<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
             if (!snapshotIds.contains(snapshotId)) {
-                snapshotIds = new ArrayList<>(snapshotIds);
-                snapshotIds.add(snapshotId);
-                snapshotIds = Collections.unmodifiableList(snapshotIds);
-                writeSnapshotsToIndexGen(snapshotIds);
+                writeIndexGen(repositoryData.addSnapshot(snapshotId, indices));
             }
             return blobStoreSnapshot;
         } catch (IOException ex) {
@@ -430,27 +518,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
-    @Override
     public List<SnapshotId> getSnapshots() {
-        try {
-            return Collections.unmodifiableList(readSnapshotsFromIndex());
-        } catch (NoSuchFileException | FileNotFoundException e) {
-            // its a fresh repository, no index file exists, so return an empty list
-            return Collections.emptyList();
-        } catch (IOException ioe) {
-            throw new RepositoryException(metadata.name(), "failed to list snapshots in repository", ioe);
-        }
+        return getRepositoryData().getSnapshotIds();
     }
 
     @Override
-    public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException {
+    public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException {
         return readSnapshotMetaData(snapshot.snapshotId(), snapshot.version(), indices, false);
     }
 
     @Override
     public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
         try {
-            return snapshotFormat.read(snapshotsBlobContainer, blobId(snapshotId));
+            return snapshotFormat.read(snapshotsBlobContainer, snapshotId.getUUID());
         } catch (FileNotFoundException | NoSuchFileException ex) {
             // File is missing - let's try legacy format instead
             try {
@@ -465,13 +545,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
-    private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<String> indices, boolean ignoreIndexErrors) throws IOException {
+    private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<IndexId> indices, boolean ignoreIndexErrors) throws IOException {
         MetaData metaData;
         if (snapshotVersion == null) {
             // When we delete corrupted snapshots we might not know which version we are dealing with
             // We can try detecting the version based on the metadata file format
             assert ignoreIndexErrors;
-            if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getName())) {
+            if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getUUID())) {
                 snapshotVersion = Version.CURRENT;
             } else if (globalMetaDataLegacyFormat.exists(snapshotsBlobContainer, snapshotId.getName())) {
                 throw new SnapshotException(metadata.name(), snapshotId, "snapshot is too old");
@@ -480,21 +560,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
         }
         try {
-            metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getName());
+            metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getUUID());
         } catch (FileNotFoundException | NoSuchFileException ex) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
         } catch (IOException ex) {
             throw new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex);
         }
         MetaData.Builder metaDataBuilder = MetaData.builder(metaData);
-        for (String index : indices) {
-            BlobPath indexPath = basePath().add("indices").add(index);
+        for (IndexId index : indices) {
+            BlobPath indexPath = basePath().add("indices").add(index.getId());
             BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
             try {
-                metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, snapshotId.getName()), false);
+                metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, snapshotId.getUUID()), false);
             } catch (ElasticsearchParseException | IOException ex) {
                 if (ignoreIndexErrors) {
-                    logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index);
+                    logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index.getName());
                 } else {
                     throw ex;
                 }
@@ -562,10 +642,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
-    private static final String SNAPSHOTS = "snapshots";
-    private static final String NAME = "name";
-    private static final String UUID = "uuid";
-
     @Override
     public long getSnapshotThrottleTimeInNanos() {
         return snapshotRateLimitingTimeInNanos.count();
@@ -609,6 +685,43 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    @Override
+    public RepositoryData getRepositoryData() {
+        try {
+            final long indexGen = latestIndexBlobId();
+            final String snapshotsIndexBlobName;
+            final boolean legacyFormat;
+            if (indexGen == -1) {
+                // index-N file doesn't exist, either its a fresh repository, or its in the
+                // old format, so look for the older index file before returning an empty list
+                snapshotsIndexBlobName = SNAPSHOTS_FILE;
+                legacyFormat = true;
+            } else {
+                snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen);
+                legacyFormat = false;
+            }
+
+            RepositoryData repositoryData;
+            try (InputStream blob = snapshotsBlobContainer.readBlob(snapshotsIndexBlobName)) {
+                BytesStreamOutput out = new BytesStreamOutput();
+                Streams.copy(blob, out);
+                try (XContentParser parser = XContentHelper.createParser(out.bytes())) {
+                    repositoryData = RepositoryData.fromXContent(parser);
+                }
+            }
+            if (legacyFormat) {
+                // pre 5.0 repository data needs to be updated to include the indices
+                repositoryData = upgradeRepositoryData(repositoryData);
+            }
+            return repositoryData;
+        } catch (NoSuchFileException nsfe) {
+            // repository doesn't have an index blob, its a new blank repo
+            return RepositoryData.EMPTY;
+        } catch (IOException ioe) {
+            throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
+        }
+    }
+
     public static String testBlobPrefix(String seed) {
         return TESTS_FILE + seed;
     }
@ -623,35 +736,30 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return snapshotsBlobContainer; return snapshotsBlobContainer;
} }
-protected void writeSnapshotsToIndexGen(final List<SnapshotId> snapshots) throws IOException {
+protected void writeIndexGen(final RepositoryData repositoryData) throws IOException {
    assert isReadOnly() == false; // can not write to a read only repository
    final BytesReference snapshotsBytes;
    try (BytesStreamOutput bStream = new BytesStreamOutput()) {
        try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
            XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
-            builder.startObject();
-            builder.startArray(SNAPSHOTS);
-            for (SnapshotId snapshot : snapshots) {
-                builder.startObject();
-                builder.field(NAME, snapshot.getName());
-                builder.field(UUID, snapshot.getUUID());
-                builder.endObject();
-            }
-            builder.endArray();
-            builder.endObject();
+            repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS);
            builder.close();
        }
        snapshotsBytes = bStream.bytes();
    }
    final long gen = latestIndexBlobId() + 1;
    // write the index file
-    writeAtomic(SNAPSHOTS_FILE_PREFIX + Long.toString(gen), snapshotsBytes);
+    writeAtomic(INDEX_FILE_PREFIX + Long.toString(gen), snapshotsBytes);
    // delete the N-2 index file if it exists, keep the previous one around as a backup
    if (isReadOnly() == false && gen - 2 >= 0) {
-        final String oldSnapshotIndexFile = SNAPSHOTS_FILE_PREFIX + Long.toString(gen - 2);
+        final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(gen - 2);
        if (snapshotsBlobContainer.blobExists(oldSnapshotIndexFile)) {
            snapshotsBlobContainer.deleteBlob(oldSnapshotIndexFile);
        }
+        // delete the old index file (non-generational) if it exists
+        if (snapshotsBlobContainer.blobExists(SNAPSHOTS_FILE)) {
+            snapshotsBlobContainer.deleteBlob(SNAPSHOTS_FILE);
+        }
    }
    // write the current generation to the index-latest file
@ -660,72 +768,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
        bStream.writeLong(gen);
        genBytes = bStream.bytes();
    }
-    if (snapshotsBlobContainer.blobExists(SNAPSHOTS_INDEX_LATEST_BLOB)) {
-        snapshotsBlobContainer.deleteBlob(SNAPSHOTS_INDEX_LATEST_BLOB);
+    if (snapshotsBlobContainer.blobExists(INDEX_LATEST_BLOB)) {
+        snapshotsBlobContainer.deleteBlob(INDEX_LATEST_BLOB);
    }
-    writeAtomic(SNAPSHOTS_INDEX_LATEST_BLOB, genBytes);
+    writeAtomic(INDEX_LATEST_BLOB, genBytes);
}
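Taken together, writeIndexGen amounts to the following rollover, sketched here with a hypothetical deleteIfExists wrapper around blobExists/deleteBlob; the blob names mirror the constants in this commit:

    // Write generation N, keep N-1 as a backup, prune N-2 and the legacy blob,
    // then repoint the "index.latest" marker at the new generation.
    private void writeGeneration(final long gen, final BytesReference indexBytes,
                                 final BytesReference genBytes) throws IOException {
        writeAtomic("index-" + gen, indexBytes);
        if (gen - 2 >= 0) {
            deleteIfExists("index-" + (gen - 2));
        }
        deleteIfExists("index"); // non-generational pre-5.0 blob
        writeAtomic("index.latest", genBytes);
    }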
protected List<SnapshotId> readSnapshotsFromIndex() throws IOException {
final long indexGen = latestIndexBlobId();
final String snapshotsIndexBlobName;
if (indexGen == -1) {
// index-N file doesn't exist, either its a fresh repository, or its in the
// old format, so look for the older index file before returning an empty list
snapshotsIndexBlobName = SNAPSHOTS_FILE;
} else {
snapshotsIndexBlobName = SNAPSHOTS_FILE_PREFIX + Long.toString(indexGen);
}
try (InputStream blob = snapshotsBlobContainer.readBlob(snapshotsIndexBlobName)) {
BytesStreamOutput out = new BytesStreamOutput();
Streams.copy(blob, out);
ArrayList<SnapshotId> snapshots = new ArrayList<>();
try (XContentParser parser = XContentHelper.createParser(out.bytes())) {
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
if (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
if (SNAPSHOTS.equals(currentFieldName)) {
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
// the new format from 5.0 which contains the snapshot name and uuid
String name = null;
String uuid = null;
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
currentFieldName = parser.currentName();
parser.nextToken();
if (NAME.equals(currentFieldName)) {
name = parser.text();
} else if (UUID.equals(currentFieldName)) {
uuid = parser.text();
}
}
snapshots.add(new SnapshotId(name, uuid));
}
// the old format pre 5.0 that only contains the snapshot name, use the name as the uuid too
else {
name = parser.text();
snapshots.add(new SnapshotId(name, SnapshotId.UNASSIGNED_UUID));
}
}
}
}
}
}
}
return Collections.unmodifiableList(snapshots);
}
}
// Package private for testing
static String blobId(final SnapshotId snapshotId) {
final String uuid = snapshotId.getUUID();
if (uuid.equals(SnapshotId.UNASSIGNED_UUID)) {
// the old snapshot blob naming
return snapshotId.getName();
}
return snapshotId.getName() + "-" + uuid;
}
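The removal of the blobId helper goes hand in hand with the getUUID() changes further down: shard snapshot blobs are now keyed by the snapshot's UUID rather than its name, so reusing a snapshot name can no longer collide with a stale blob. Purely as an illustration (the "snap-" prefix here is an assumption about the blob name format, not taken from this diff):

    // pre-5.0: blob name derived from the snapshot name
    String legacyBlob = "snap-" + snapshotId.getName();
    // 5.0+: blob name derived from the snapshot UUID, unique across name reuse
    String uuidBlob = "snap-" + snapshotId.getUUID();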
/**
@ -762,7 +808,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// package private for testing
long readSnapshotIndexLatestBlob() throws IOException {
-    try (InputStream blob = snapshotsBlobContainer.readBlob(SNAPSHOTS_INDEX_LATEST_BLOB)) {
+    try (InputStream blob = snapshotsBlobContainer.readBlob(INDEX_LATEST_BLOB)) {
        BytesStreamOutput out = new BytesStreamOutput();
        Streams.copy(blob, out);
        return Numbers.bytesToLong(out.bytes().toBytesRef());
@ -770,7 +816,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
private long listBlobsToGetLatestIndexId() throws IOException {
-    Map<String, BlobMetaData> blobs = snapshotsBlobContainer.listBlobsByPrefix(SNAPSHOTS_FILE_PREFIX);
+    Map<String, BlobMetaData> blobs = snapshotsBlobContainer.listBlobsByPrefix(INDEX_FILE_PREFIX);
    long latest = -1;
    if (blobs.isEmpty()) {
        // no snapshot index blobs have been written yet
@ -779,7 +825,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
    for (final BlobMetaData blobMetaData : blobs.values()) {
        final String blobName = blobMetaData.name();
        try {
-            final long curr = Long.parseLong(blobName.substring(SNAPSHOTS_FILE_PREFIX.length()));
+            final long curr = Long.parseLong(blobName.substring(INDEX_FILE_PREFIX.length()));
            latest = Math.max(latest, curr);
        } catch (NumberFormatException nfe) {
            // the index- blob wasn't of the format index-N where N is a number,
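A tiny self-contained example of the generation scan above (assuming java.util.Arrays is imported); malformed names are skipped exactly as in the catch block:

    long latest = -1;
    for (String blobName : Arrays.asList("index-0", "index-2", "index-1", "index-foo")) {
        try {
            latest = Math.max(latest, Long.parseLong(blobName.substring("index-".length())));
        } catch (NumberFormatException nfe) {
            // "index-foo" is not of the form index-N; ignore it
        }
    }
    // latest == 2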
@ -802,9 +848,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
@Override
-public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
-    SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, snapshotStatus);
+public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
+    SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus);
    snapshotStatus.startTime(System.currentTimeMillis());
    try {
@ -824,8 +872,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
-public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
-    final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, snapshotShardId, recoveryState);
+public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
+    final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
    try {
        snapshotContext.restore();
    } catch (Exception e) {
@ -834,8 +882,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
-public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
-    Context context = new Context(snapshotId, version, shardId);
+public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
+    Context context = new Context(snapshotId, version, indexId, shardId);
    BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
    IndexShardSnapshotStatus status = new IndexShardSnapshotStatus();
    status.updateStage(IndexShardSnapshotStatus.Stage.DONE);
@ -869,8 +917,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 * @param snapshotId snapshot id
 * @param shardId    shard id
 */
-public void delete(SnapshotId snapshotId, Version version, ShardId shardId) {
-    Context context = new Context(snapshotId, version, shardId, shardId);
+private void delete(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
+    Context context = new Context(snapshotId, version, indexId, shardId, shardId);
    context.delete();
}
@ -903,15 +951,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
protected final Version version;
-public Context(SnapshotId snapshotId, Version version, ShardId shardId) {
-    this(snapshotId, version, shardId, shardId);
+public Context(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
+    this(snapshotId, version, indexId, shardId, shardId);
}
-public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) {
+public Context(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId, ShardId snapshotShardId) {
    this.snapshotId = snapshotId;
    this.version = version;
    this.shardId = shardId;
-    blobContainer = blobStore().blobContainer(basePath().add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId())));
+    blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId())));
}
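This constructor is where the new on-disk layout takes effect: shard blobs move from a path keyed by index name to one keyed by the index's repository-unique id, roughly:

    // old layout: <base>/indices/<index name>/<shard id>/...
    // new layout: <base>/indices/<IndexId.getId()>/<shard id>/...
    BlobPath shardPath = basePath().add("indices").add(indexId.getId())
                                   .add(Integer.toString(snapshotShardId.getId()));

Because the id stays stable even if an index is deleted and re-created under the same name, snapshots of the two incarnations can no longer overwrite each other's blobs.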
/**
@ -930,7 +978,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
    int fileListGeneration = tuple.v2();
    try {
-        indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName());
+        indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getUUID());
    } catch (IOException e) {
        logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId);
    }
@ -951,7 +999,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 */
public BlobStoreIndexShardSnapshot loadSnapshot() {
    try {
-        return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName());
+        return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getUUID());
    } catch (IOException ex) {
        throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex);
    }
@ -1080,7 +1128,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
    try {
        BlobStoreIndexShardSnapshot snapshot = null;
        if (name.startsWith(SNAPSHOT_PREFIX)) {
-            snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name);
+            snapshot = indexShardSnapshotFormat.readBlob(blobContainer, snapshotId.getUUID());
        } else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) {
            snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name);
        }
@ -1109,10 +1157,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 *
 * @param shard          shard to be snapshotted
 * @param snapshotId     snapshot id
+* @param indexId        the id of the index being snapshotted
 * @param snapshotStatus snapshot status to report progress
 */
-public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexShardSnapshotStatus snapshotStatus) {
-    super(snapshotId, Version.CURRENT, shard.shardId());
+public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus) {
+    super(snapshotId, Version.CURRENT, indexId, shard.shardId());
    this.snapshotStatus = snapshotStatus;
    this.store = shard.store();
}
@ -1220,7 +1269,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
    //TODO: The time stored in snapshot doesn't include cleanup time.
    logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
    try {
-        indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName());
+        indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID());
    } catch (IOException e) {
        throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
    }
@ -1396,11 +1445,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 *
 * @param shard           shard to restore into
 * @param snapshotId      snapshot id
+* @param indexId         id of the index being restored
 * @param snapshotShardId shard in the snapshot that data should be restored from
 * @param recoveryState   recovery state to report progress
 */
-public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
-    super(snapshotId, version, shard.shardId(), snapshotShardId);
+public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
+    super(snapshotId, version, indexId, shard.shardId(), snapshotShardId);
    this.recoveryState = recoveryState;
    store = shard.store();
}
@ -1574,6 +1624,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
}
}
}

View File

@ -63,8 +63,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -185,7 +187,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
try {
    // Read snapshot info and metadata from the repository
    Repository repository = repositoriesService.repository(request.repositoryName);
-    final Optional<SnapshotId> matchingSnapshotId = repository.getSnapshots().stream()
+    final RepositoryData repositoryData = repository.getRepositoryData();
+    final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
        .filter(s -> request.snapshotName.equals(s.getName())).findFirst();
    if (matchingSnapshotId.isPresent() == false) {
        throw new SnapshotRestoreException(request.repositoryName, request.snapshotName, "snapshot does not exist");
@ -194,7 +197,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
final Snapshot snapshot = new Snapshot(request.repositoryName, snapshotId);
List<String> filteredIndices = SnapshotUtils.filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
-MetaData metaDataIn = repository.getSnapshotMetaData(snapshotInfo, filteredIndices);
+MetaData metaDataIn = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices));
final MetaData metaData;
if (snapshotInfo.version().before(Version.V_2_0_0_beta1)) {

View File

@ -22,6 +22,9 @@ package org.elasticsearch.snapshots;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Objects;
@ -29,12 +32,10 @@ import java.util.Objects;
/**
 * SnapshotId - snapshot name + snapshot UUID
 */
-public final class SnapshotId implements Writeable {
+public final class SnapshotId implements Writeable, ToXContent {
-    /**
-     * This value is for older snapshots that don't have a UUID.
-     */
-    public static final String UNASSIGNED_UUID = "_na_";
+    private static final String NAME = "name";
+    private static final String UUID = "uuid";
    private final String name;
    private final String uuid;
@ -115,4 +116,35 @@ public final class SnapshotId implements Writeable {
    out.writeString(uuid);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(NAME, name);
builder.field(UUID, uuid);
builder.endObject();
return builder;
}
public static SnapshotId fromXContent(XContentParser parser) throws IOException {
// the new format from 5.0 which contains the snapshot name and uuid
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
String name = null;
String uuid = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
if (NAME.equals(currentFieldName)) {
name = parser.text();
} else if (UUID.equals(currentFieldName)) {
uuid = parser.text();
}
}
return new SnapshotId(name, uuid);
} else {
// the old format pre 5.0 that only contains the snapshot name, use the name as the uuid too
final String name = parser.text();
return new SnapshotId(name, name);
}
}
}
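The new methods give SnapshotId a JSON round trip; a short usage sketch, following the same XContent helpers the tests in this commit use:

    SnapshotId original = new SnapshotId("nightly-1", UUIDs.randomBase64UUID());
    XContentBuilder builder = JsonXContent.contentBuilder();
    original.toXContent(builder, ToXContent.EMPTY_PARAMS);   // {"name":"nightly-1","uuid":"..."}
    XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes());
    parser.nextToken();                                      // position on START_OBJECT
    assertEquals(original, SnapshotId.fromXContent(parser));

Note the else branch of fromXContent: a bare string (the pre-5.0 format) yields new SnapshotId(name, name), which is why name-as-uuid shows up in the upgraded repository tests below.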

View File

@ -458,7 +458,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
}
if (uuid == null) {
    // the old format where there wasn't a UUID
-    uuid = SnapshotId.UNASSIGNED_UUID;
+    uuid = name;
}
return new SnapshotInfo(new SnapshotId(name, uuid),
    indices,

View File

@ -46,6 +46,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@ -66,6 +67,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@ -208,8 +211,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> newSnapshots = new HashMap<>();
// Now go through all snapshots and update existing or create missing
final String localNodeId = clusterService.localNode().getId();
+final Map<Snapshot, Map<String, IndexId>> snapshotIndices = new HashMap<>();
if (snapshotsInProgress != null) {
    for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
+        snapshotIndices.put(entry.snapshot(),
+            entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())));
        if (entry.state() == SnapshotsInProgress.State.STARTED) {
            Map<ShardId, IndexShardSnapshotStatus> startedShards = new HashMap<>();
            SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot());
@ -289,14 +295,18 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
if (newSnapshots.isEmpty() == false) {
    Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
    for (final Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> entry : newSnapshots.entrySet()) {
+        Map<String, IndexId> indicesMap = snapshotIndices.get(entry.getKey());
+        assert indicesMap != null;
        for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().entrySet()) {
            final ShardId shardId = shardEntry.getKey();
            try {
                final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
+                final IndexId indexId = indicesMap.get(shardId.getIndexName());
+                assert indexId != null;
                executor.execute(new AbstractRunnable() {
                    @Override
                    public void doRun() {
-                        snapshot(indexShard, entry.getKey(), shardEntry.getValue());
+                        snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue());
                        updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS));
                    }
@ -321,7 +331,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
 * @param snapshot       snapshot
 * @param snapshotStatus snapshot status
 */
-private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexShardSnapshotStatus snapshotStatus) {
+private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) {
    Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
    ShardId shardId = indexShard.shardId();
    if (!indexShard.routingEntry().primary()) {
@ -340,7 +350,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
    // we flush first to make sure we get the latest writes snapshotted
    IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
    try {
-        repository.snapshotShard(indexShard, snapshot.getSnapshotId(), snapshotIndexCommit, snapshotStatus);
+        repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus);
        if (logger.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder();
            sb.append("    index    : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");

View File

@ -56,8 +56,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.threadpool.ThreadPool;
@ -132,7 +134,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public List<SnapshotId> snapshotIds(final String repositoryName) {
    Repository repository = repositoriesService.repository(repositoryName);
    assert repository != null; // should only be called once we've validated the repository exists
-    return repository.getSnapshots();
+    return repository.getRepositoryData().getSnapshotIds();
}
/**
@ -218,6 +220,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final String snapshotName = request.snapshotName;
validate(repositoryName, snapshotName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
+final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() {
@ -232,11 +235,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
+List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId),
    request.includeGlobalState(),
    request.partial(),
    State.INIT,
-    indices,
+    snapshotIndices,
    System.currentTimeMillis(),
    null);
snapshots = new SnapshotsInProgress(newSnapshot);
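resolveNewIndices is what ties the cluster-state snapshot entry to stable repository ids. A sketch of its likely contract, assuming names already known to the repository keep their IndexId and unknown names are assigned a fresh UUID (an illustration, not the exact implementation):

    public List<IndexId> resolveNewIndices(final List<String> indexNames) {
        final List<IndexId> resolved = new ArrayList<>(indexNames.size());
        for (String name : indexNames) {
            final IndexId existing = getIndices().get(name); // name -> IndexId lookup
            resolved.add(existing != null ? existing : new IndexId(name, UUIDs.randomBase64UUID()));
        }
        return resolved;
    }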
@ -334,8 +338,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (!snapshot.includeGlobalState()) {
    // Remove global state from the cluster state
    MetaData.Builder builder = MetaData.builder();
-    for (String index : snapshot.indices()) {
-        builder.put(metaData.index(index), false);
+    for (IndexId index : snapshot.indices()) {
+        builder.put(metaData.index(index.getName()), false);
    }
    metaData = builder.build();
}
@ -473,7 +477,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
-    return new SnapshotInfo(entry.snapshot().getSnapshotId(), entry.indices(), entry.startTime());
+    return new SnapshotInfo(entry.snapshot().getSnapshotId(),
+        entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
+        entry.startTime());
}
/**
@ -546,8 +552,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                                final SnapshotInfo snapshotInfo) throws IOException {
    Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
    Repository repository = repositoriesService.repository(repositoryName);
-    MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, snapshotInfo.indices());
+    RepositoryData repositoryData = repository.getRepositoryData();
+    MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(snapshotInfo.indices()));
    for (String index : snapshotInfo.indices()) {
+        IndexId indexId = repositoryData.resolveIndexId(index);
        IndexMetaData indexMetaData = metaData.indices().get(index);
        if (indexMetaData != null) {
            int numberOfShards = indexMetaData.getNumberOfShards();
@ -561,7 +569,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
            shardStatus.put(shardId, shardSnapshotStatus);
        } else {
            IndexShardSnapshotStatus shardSnapshotStatus =
-                repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId);
+                repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), indexId, shardId);
            shardStatus.put(shardId, shardSnapshotStatus);
        }
    }
@ -953,7 +961,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener) {
    // First, look for the snapshot in the repository
    final Repository repository = repositoriesService.repository(repositoryName);
-    Optional<SnapshotId> matchedEntry = repository.getSnapshots().stream().filter(s -> s.getName().equals(snapshotName)).findFirst();
+    Optional<SnapshotId> matchedEntry = repository.getRepositoryData().getSnapshotIds()
+        .stream()
+        .filter(s -> s.getName().equals(snapshotName))
+        .findFirst();
    // if nothing found by the same name, then look in the cluster state for current in progress snapshots
    if (matchedEntry.isPresent() == false) {
        matchedEntry = currentSnapshots(repositoryName, Collections.emptyList()).stream()
@ -1121,21 +1132,22 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 * @param indices      list of indices to be snapshotted
 * @return list of shard to be included into current snapshot
 */
-private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, List<String> indices) {
+private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, List<IndexId> indices) {
    ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
    MetaData metaData = clusterState.metaData();
-    for (String index : indices) {
-        IndexMetaData indexMetaData = metaData.index(index);
+    for (IndexId index : indices) {
+        final String indexName = index.getName();
+        IndexMetaData indexMetaData = metaData.index(indexName);
        if (indexMetaData == null) {
            // The index was deleted before we managed to start the snapshot - mark it as missing.
-            builder.put(new ShardId(index, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
+            builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
        } else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
            for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
                ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
                builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "index is closed"));
            }
        } else {
-            IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(index);
+            IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
            for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
                ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
                if (indexRoutingTable != null) {
@ -1191,8 +1203,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
    if (entry.partial() == false) {
        if (entry.state() == State.INIT) {
-            for (String index : entry.indices()) {
-                IndexMetaData indexMetaData = currentState.metaData().index(index);
+            for (IndexId index : entry.indices()) {
+                IndexMetaData indexMetaData = currentState.metaData().index(index.getName());
                if (indexMetaData != null && indices.contains(indexMetaData)) {
                    if (indicesToFail == null) {
                        indicesToFail = new HashSet<>();

View File

@ -46,7 +46,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 * as blob names and repository blob formats have changed between the snapshot versions.
 */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
// this test sometimes fails in recovery when the recovery is reset, increasing the logging level to help debug
@TestLogging("indices.recovery:DEBUG")
public class RepositoryUpgradabilityIT extends AbstractSnapshotIntegTestCase {
@ -70,7 +70,7 @@ public class RepositoryUpgradabilityIT extends AbstractSnapshotIntegTestCase {
final Set<SnapshotInfo> snapshotInfos = Sets.newHashSet(getSnapshots(repoName));
assertThat(snapshotInfos.size(), equalTo(1));
SnapshotInfo originalSnapshot = snapshotInfos.iterator().next();
-assertThat(originalSnapshot.snapshotId(), equalTo(new SnapshotId("test_1", SnapshotId.UNASSIGNED_UUID)));
+assertThat(originalSnapshot.snapshotId(), equalTo(new SnapshotId("test_1", "test_1")));
assertThat(Sets.newHashSet(originalSnapshot.indices()), equalTo(indices));
logger.info("--> restore the original snapshot");

View File

@ -53,7 +53,6 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;
-import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Collections;
@ -659,7 +658,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
randomBoolean(),
randomBoolean(),
SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)),
-Collections.<String>emptyList(),
+Collections.emptyList(),
Math.abs(randomLong()),
ImmutableOpenMap.of()));
case 1:

View File

@ -100,7 +100,9 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
@ -121,8 +123,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
@ -1184,9 +1188,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
test_target_shard.updateRoutingEntry(routing);
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode));
-assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository() {
+assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository("test") {
    @Override
-    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
+    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
        try {
            cleanLuceneIndex(targetStore.directory());
            for (String file : sourceStore.directory().listAll()) {
@ -1645,8 +1649,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
-    public RestoreOnlyRepository() {
+    private final String indexName;
+    public RestoreOnlyRepository(String indexName) {
        super(Settings.EMPTY);
+        this.indexName = indexName;
    }
    @Override
    protected void doStart() {}
@ -1663,17 +1669,19 @@ public class IndexShardTests extends ESSingleNodeTestCase {
    return null;
}
@Override
-public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException {
+public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException {
    return null;
}
@Override
-public List<SnapshotId> getSnapshots() {
-    return null;
+public RepositoryData getRepositoryData() {
+    Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
+    map.put(new IndexId(indexName, "blah"), Collections.emptySet());
+    return new RepositoryData(Collections.emptyList(), map);
}
@Override
-public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {}
+public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {}
@Override
-public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
+public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
    return null;
}
@Override
@ -1697,9 +1705,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
    return false;
}
@Override
-public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {}
+public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {}
@Override
-public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
+public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
    return null;
}
@Override

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
/**
* Tests for the {@link IndexId} class.
*/
public class IndexIdTests extends ESTestCase {
public void testEqualsAndHashCode() {
// assert equals and hashcode
String name = randomAsciiOfLength(8);
String id = UUIDs.randomBase64UUID();
IndexId indexId1 = new IndexId(name, id);
IndexId indexId2 = new IndexId(name, id);
assertEquals(indexId1, indexId2);
assertEquals(indexId1.hashCode(), indexId2.hashCode());
// assert equals when using index name for id
id = name;
indexId1 = new IndexId(name, id);
indexId2 = new IndexId(name, id);
assertEquals(indexId1, indexId2);
assertEquals(indexId1.hashCode(), indexId2.hashCode());
//assert not equals when name or id differ
indexId2 = new IndexId(randomAsciiOfLength(8), id);
assertNotEquals(indexId1, indexId2);
assertNotEquals(indexId1.hashCode(), indexId2.hashCode());
indexId2 = new IndexId(name, UUIDs.randomBase64UUID());
assertNotEquals(indexId1, indexId2);
assertNotEquals(indexId1.hashCode(), indexId2.hashCode());
}
public void testSerialization() throws IOException {
IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID());
BytesStreamOutput out = new BytesStreamOutput();
indexId.writeTo(out);
assertEquals(indexId, new IndexId(out.bytes().streamInput()));
}
public void testXContent() throws IOException {
IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID());
XContentBuilder builder = JsonXContent.contentBuilder();
indexId.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes());
assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
String name = null;
String id = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
final String currentFieldName = parser.currentName();
parser.nextToken();
if (currentFieldName.equals(IndexId.NAME)) {
name = parser.text();
} else if (currentFieldName.equals(IndexId.ID)) {
id = parser.text();
}
}
assertNotNull(name);
assertNotNull(id);
assertEquals(indexId, new IndexId(name, id));
}
}

View File

@ -0,0 +1,171 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.Matchers.greaterThan;
/**
* Tests for the {@link RepositoryData} class.
*/
public class RepositoryDataTests extends ESTestCase {
public void testEqualsAndHashCode() {
RepositoryData repositoryData1 = generateRandomRepoData();
RepositoryData repositoryData2 = repositoryData1.copy();
assertEquals(repositoryData1, repositoryData2);
assertEquals(repositoryData1.hashCode(), repositoryData2.hashCode());
}
public void testXContent() throws IOException {
RepositoryData repositoryData = generateRandomRepoData();
XContentBuilder builder = JsonXContent.contentBuilder();
repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes());
assertEquals(repositoryData, RepositoryData.fromXContent(parser));
}
public void testAddSnapshots() {
RepositoryData repositoryData = generateRandomRepoData();
// test that adding the same snapshot id to the repository data throws an exception
final SnapshotId snapshotId = repositoryData.getSnapshotIds().get(0);
Map<String, IndexId> indexIdMap = repositoryData.getIndices();
expectThrows(IllegalArgumentException.class,
() -> repositoryData.addSnapshot(new SnapshotId(snapshotId.getName(), snapshotId.getUUID()), Collections.emptyList()));
// test that adding a snapshot and its indices works
SnapshotId newSnapshot = new SnapshotId(randomAsciiOfLength(7), UUIDs.randomBase64UUID());
List<IndexId> indices = new ArrayList<>();
Set<IndexId> newIndices = new HashSet<>();
int numNew = randomIntBetween(1, 10);
for (int i = 0; i < numNew; i++) {
IndexId indexId = new IndexId(randomAsciiOfLength(7), UUIDs.randomBase64UUID());
newIndices.add(indexId);
indices.add(indexId);
}
int numOld = randomIntBetween(1, indexIdMap.size());
List<String> indexNames = new ArrayList<>(indexIdMap.keySet());
for (int i = 0; i < numOld; i++) {
indices.add(indexIdMap.get(indexNames.get(i)));
}
RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot, indices);
// verify that the new repository data has the new snapshot and its indices
assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot));
for (IndexId indexId : indices) {
Set<SnapshotId> snapshotIds = newRepoData.getSnapshots(indexId);
assertTrue(snapshotIds.contains(newSnapshot));
if (newIndices.contains(indexId)) {
assertEquals(snapshotIds.size(), 1); // if it was a new index, only the new snapshot should be in its set
}
}
}
public void testInitIndices() {
final int numSnapshots = randomIntBetween(1, 30);
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap());
// test that initializing indices works
Map<IndexId, Set<SnapshotId>> indices = randomIndices(snapshotIds);
RepositoryData newRepoData = repositoryData.initIndices(indices);
assertEquals(repositoryData.getSnapshotIds(), newRepoData.getSnapshotIds());
for (IndexId indexId : indices.keySet()) {
assertEquals(indices.get(indexId), newRepoData.getSnapshots(indexId));
}
}
public void testRemoveSnapshot() {
RepositoryData repositoryData = generateRandomRepoData();
List<SnapshotId> snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds());
assertThat(snapshotIds.size(), greaterThan(0));
SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1));
RepositoryData newRepositoryData = repositoryData.removeSnapshot(removedSnapshotId);
// make sure the repository data's indices no longer contain the removed snapshot
for (final IndexId indexId : newRepositoryData.getIndices().values()) {
assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId));
}
}
public void testResolveIndexId() {
RepositoryData repositoryData = generateRandomRepoData();
Map<String, IndexId> indices = repositoryData.getIndices();
Set<String> indexNames = indices.keySet();
assertThat(indexNames.size(), greaterThan(0));
String indexName = indexNames.iterator().next();
IndexId indexId = indices.get(indexName);
assertEquals(indexId, repositoryData.resolveIndexId(indexName));
String notInRepoData = randomAsciiOfLength(5);
assertFalse(indexName.contains(notInRepoData));
assertEquals(new IndexId(notInRepoData, notInRepoData), repositoryData.resolveIndexId(notInRepoData));
}
public static RepositoryData generateRandomRepoData() {
return generateRandomRepoData(new ArrayList<>());
}
public static RepositoryData generateRandomRepoData(final List<SnapshotId> origSnapshotIds) {
List<SnapshotId> snapshotIds = randomSnapshots(origSnapshotIds);
return new RepositoryData(snapshotIds, randomIndices(snapshotIds));
}
private static List<SnapshotId> randomSnapshots(final List<SnapshotId> origSnapshotIds) {
final int numSnapshots = randomIntBetween(1, 30);
final List<SnapshotId> snapshotIds = new ArrayList<>(origSnapshotIds);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
return snapshotIds;
}
private static Map<IndexId, Set<SnapshotId>> randomIndices(final List<SnapshotId> snapshotIds) {
final int totalSnapshots = snapshotIds.size();
final int numIndices = randomIntBetween(1, 30);
final Map<IndexId, Set<SnapshotId>> indices = new HashMap<>(numIndices);
for (int i = 0; i < numIndices; i++) {
final IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID());
final Set<SnapshotId> indexSnapshots = new LinkedHashSet<>();
final int numSnapshotsForIndex = randomIntBetween(1, totalSnapshots);
for (int j = 0; j < numSnapshotsForIndex; j++) {
indexSnapshots.add(snapshotIds.get(randomIntBetween(0, totalSnapshots - 1)));
}
indices.put(indexId, indexSnapshots);
}
return indices;
}
}
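
Taken together, these tests pin down the public surface of RepositoryData. Below is a minimal usage sketch of that surface, using only the constructors and methods exercised above; it is illustrative and not part of the change itself.

    List<SnapshotId> ids = new ArrayList<>();
    ids.add(new SnapshotId("snap-1", UUIDs.randomBase64UUID()));
    RepositoryData data = new RepositoryData(ids, Collections.emptyMap());

    // addSnapshot returns a new instance; the original is left untouched
    IndexId index = new IndexId("my-index", UUIDs.randomBase64UUID());
    SnapshotId snap2 = new SnapshotId("snap-2", UUIDs.randomBase64UUID());
    data = data.addSnapshot(snap2, Collections.singletonList(index));
    assert data.getSnapshots(index).contains(snap2);

    // known names resolve to their recorded IndexId; unknown names fall
    // back to an IndexId whose id equals the name (see testResolveIndexId)
    IndexId resolved = data.resolveIndexId("my-index");

    // removing a snapshot also drops it from every index's snapshot set
    data = data.removeSnapshot(snap2);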

View File

@ -28,11 +28,11 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.ESSingleNodeTestCase;
@ -44,7 +44,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.blobId; import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
/** /**
@ -109,86 +109,56 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
final BlobStoreRepository repository = setupRepo(); final BlobStoreRepository repository = setupRepo();
// write to and read from a snapshot file with no entries // write to and read from an index file with no entries
assertThat(repository.getSnapshots().size(), equalTo(0));
repository.writeSnapshotsToIndexGen(Collections.emptyList());
assertThat(repository.getSnapshots().size(), equalTo(0)); assertThat(repository.getSnapshots().size(), equalTo(0));
final RepositoryData emptyData = RepositoryData.EMPTY;
repository.writeIndexGen(emptyData);
final RepositoryData readData = repository.getRepositoryData();
assertEquals(readData, emptyData);
assertEquals(readData.getIndices().size(), 0);
assertEquals(readData.getSnapshotIds().size(), 0);
// write to and read from a snapshot file with a random number of entries // write to and read from an index file with snapshots but no indices
final int numSnapshots = randomIntBetween(1, 1000); final int numSnapshots = randomIntBetween(1, 20);
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots); final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
for (int i = 0; i < numSnapshots; i++) { for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
} }
repository.writeSnapshotsToIndexGen(snapshotIds); RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap());
assertThat(repository.getSnapshots(), equalTo(snapshotIds)); repository.writeIndexGen(repositoryData);
assertEquals(repository.getRepositoryData(), repositoryData);
// write to and read from an index file with random repository data
repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData);
assertThat(repository.getRepositoryData(), equalTo(repositoryData));
} }
public void testIndexGenerationalFiles() throws Exception { public void testIndexGenerationalFiles() throws Exception {
final BlobStoreRepository repository = setupRepo(); final BlobStoreRepository repository = setupRepo();
// write to index generational file // write to index generational file
final int numSnapshots = randomIntBetween(1, 1000); RepositoryData repositoryData = generateRandomRepoData();
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots); repository.writeIndexGen(repositoryData);
for (int i = 0; i < numSnapshots; i++) { assertThat(repository.getRepositoryData(), equalTo(repositoryData));
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds)));
assertThat(repository.latestIndexBlobId(), equalTo(0L)); assertThat(repository.latestIndexBlobId(), equalTo(0L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));
// adding more and writing to a new index generational file // adding more and writing to a new index generational file
for (int i = 0; i < 10; i++) { repositoryData = generateRandomRepoData();
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); repository.writeIndexGen(repositoryData);
} assertEquals(repository.getRepositoryData(), repositoryData);
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds)));
assertThat(repository.latestIndexBlobId(), equalTo(1L)); assertThat(repository.latestIndexBlobId(), equalTo(1L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));
// removing a snapshot adn writing to a new index generational file // removing a snapshot and writing to a new index generational file
snapshotIds.remove(0); repositoryData = repositoryData.removeSnapshot(repositoryData.getSnapshotIds().get(0));
repository.writeSnapshotsToIndexGen(snapshotIds); repository.writeIndexGen(repositoryData);
assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds))); assertEquals(repository.getRepositoryData(), repositoryData);
assertThat(repository.latestIndexBlobId(), equalTo(2L)); assertThat(repository.latestIndexBlobId(), equalTo(2L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
} }
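
A hedged sketch of the generational write these assertions imply: each writeIndexGen produces a fresh index blob and advances a pointer that readSnapshotIndexLatestBlob reads back. The blob names ("index-" + N, "index.latest") and the helper shape below are assumptions for illustration; only the 0, 1, 2 generation sequence is established by the test.

    // sketch only: generational index-file writes, under assumed blob names
    void writeIndexGenSketch(BlobContainer container, long previousGen, BytesArray repoDataBytes) throws IOException {
        long gen = previousGen + 1; // -1 before the first write, so the first blob is index-0
        container.writeBlob("index-" + gen, repoDataBytes); // immutable per-generation blob
        container.writeBlob("index.latest", new BytesArray(Long.toString(gen))); // pointer for readers
    }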
public void testOldIndexFileFormat() throws Exception {
final BlobStoreRepository repository = setupRepo();
// write old index file format
final int numOldSnapshots = randomIntBetween(1, 50);
final List<SnapshotId> snapshotIds = new ArrayList<>();
for (int i = 0; i < numOldSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), SnapshotId.UNASSIGNED_UUID));
}
writeOldFormat(repository, snapshotIds.stream().map(SnapshotId::getName).collect(Collectors.toList()));
assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds)));
// write to and read from a snapshot file with a random number of new entries added
final int numSnapshots = randomIntBetween(1, 1000);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
repository.writeSnapshotsToIndexGen(snapshotIds);
assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds)));
}
public void testBlobId() {
SnapshotId snapshotId = new SnapshotId("abc123", SnapshotId.UNASSIGNED_UUID);
assertThat(blobId(snapshotId), equalTo("abc123")); // just the snapshot name
snapshotId = new SnapshotId("abc-123", SnapshotId.UNASSIGNED_UUID);
assertThat(blobId(snapshotId), equalTo("abc-123")); // just the snapshot name
String uuid = UUIDs.randomBase64UUID();
snapshotId = new SnapshotId("abc123", uuid);
assertThat(blobId(snapshotId), equalTo("abc123-" + uuid)); // snapshot name + '-' + uuid
uuid = UUIDs.randomBase64UUID();
snapshotId = new SnapshotId("abc-123", uuid);
assertThat(blobId(snapshotId), equalTo("abc-123-" + uuid)); // snapshot name + '-' + uuid
}
private BlobStoreRepository setupRepo() { private BlobStoreRepository setupRepo() {
final Client client = client(); final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings()); final Path location = ESIntegTestCase.randomRepoPath(node().settings());

View File

@ -61,7 +61,9 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
@ -884,7 +886,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> delete index metadata and shard metadata"); logger.info("--> delete index metadata and shard metadata");
Path metadata = repo.resolve("meta-test-snap-1.dat"); Path metadata = repo.resolve("meta-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
Files.delete(metadata); Files.delete(metadata);
logger.info("--> delete snapshot"); logger.info("--> delete snapshot");
@ -917,7 +919,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> truncate snapshot file to make it unreadable"); logger.info("--> truncate snapshot file to make it unreadable");
Path snapshotPath = repo.resolve("snap-test-snap-1-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat"); Path snapshotPath = repo.resolve("snap-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) { try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
outChan.truncate(randomInt(10)); outChan.truncate(randomInt(10));
} }
@ -2017,6 +2019,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> emulate an orphan snapshot"); logger.info("--> emulate an orphan snapshot");
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
final IndexId indexId = repositoryData.resolveIndexId(idxName);
clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() {
@ -2033,7 +2038,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
true, true,
false, false,
State.ABORTED, State.ABORTED,
Collections.singletonList(idxName), Collections.singletonList(indexId),
System.currentTimeMillis(), System.currentTimeMillis(),
shards.build())); shards.build()));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))).build(); return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))).build();
@ -2189,7 +2194,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> truncate snapshot file to make it unreadable"); logger.info("--> truncate snapshot file to make it unreadable");
Path snapshotPath = repo.resolve("snap-test-snap-2-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat"); Path snapshotPath = repo.resolve("snap-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) { try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
outChan.truncate(randomInt(10)); outChan.truncate(randomInt(10));
} }
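
The repaired paths in these hunks document the blob-naming migration this change tracks: snapshot and metadata blobs are now keyed by the snapshot UUID alone. In sketch form, using the same calls as the tests above:

    String uuid = createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID();
    Path metadata = repo.resolve("meta-" + uuid + ".dat"); // previously meta-<snapshot name>.dat
    Path snapshot = repo.resolve("snap-" + uuid + ".dat"); // previously snap-<snapshot name>-<uuid>.dat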

View File

@ -48,6 +48,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotId;
@ -112,8 +113,8 @@ public class MockRepository extends FsRepository {
} }
@Override @Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) { public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
if (blockOnInitialization ) { if (blockOnInitialization) {
blockExecution(); blockExecution();
} }
super.initializeSnapshot(snapshotId, indices, clusterMetadata); super.initializeSnapshot(snapshotId, indices, clusterMetadata);

View File

@ -30,12 +30,12 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryException;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.Map; import java.util.Map;
/** /**
@ -70,11 +70,12 @@ public class AzureBlobContainer extends AbstractBlobContainer {
@Override @Override
public InputStream readBlob(String blobName) throws IOException { public InputStream readBlob(String blobName) throws IOException {
logger.trace("readBlob({})", blobName); logger.trace("readBlob({})", blobName);
try { try {
return blobStore.getInputStream(blobStore.container(), buildKey(blobName)); return blobStore.getInputStream(blobStore.container(), buildKey(blobName));
} catch (StorageException e) { } catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage()); throw new NoSuchFileException(e.getMessage());
} }
throw new IOException(e); throw new IOException(e);
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
@ -103,7 +104,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
return new AzureOutputStream(blobStore.getOutputStream(blobStore.container(), buildKey(blobName))); return new AzureOutputStream(blobStore.getOutputStream(blobStore.container(), buildKey(blobName)));
} catch (StorageException e) { } catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage()); throw new NoSuchFileException(e.getMessage());
} }
throw new IOException(e); throw new IOException(e);
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
@ -116,6 +117,11 @@ public class AzureBlobContainer extends AbstractBlobContainer {
@Override @Override
public void deleteBlob(String blobName) throws IOException { public void deleteBlob(String blobName) throws IOException {
logger.trace("deleteBlob({})", blobName); logger.trace("deleteBlob({})", blobName);
if (!blobExists(blobName)) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
try { try {
blobStore.deleteBlob(blobStore.container(), buildKey(blobName)); blobStore.deleteBlob(blobStore.container(), buildKey(blobName));
} catch (URISyntaxException | StorageException e) { } catch (URISyntaxException | StorageException e) {
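
This guard is the first instance of a contract applied uniformly to the GCS, HDFS, and S3 containers below: deleting a missing blob fails with NoSuchFileException instead of silently succeeding. A caller-side sketch of what that buys; the repository name and error handling here are illustrative, not taken from this change:

    try {
        container.deleteBlob(blobName);
    } catch (NoSuchFileException e) {
        // the blob was already gone; a cleanup pass can treat this as success
    } catch (IOException e) {
        // anything else is a real storage failure and should surface
        throw new RepositoryException(repositoryName, "failed to delete blob [" + blobName + "]", e);
    }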

View File

@ -32,6 +32,8 @@ import org.elasticsearch.cloud.azure.storage.AzureStorageService;
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage; import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
@ -44,7 +46,6 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotId;
import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getEffectiveSetting; import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getEffectiveSetting;
import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getValue; import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getValue;
@ -153,7 +154,7 @@ public class AzureRepository extends BlobStoreRepository {
} }
@Override @Override
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) { public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
try { try {
if (!blobStore.doesContainerExist(blobStore.container())) { if (!blobStore.doesContainerExist(blobStore.container())) {
logger.debug("container [{}] does not exist. Creating...", blobStore.container()); logger.debug("container [{}] does not exist. Creating...", blobStore.container());

View File

@ -21,22 +21,19 @@ package org.elasticsearch.cloud.azure.storage;
import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -82,7 +79,7 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
@Override @Override
public InputStream getInputStream(String account, LocationMode mode, String container, String blob) throws IOException { public InputStream getInputStream(String account, LocationMode mode, String container, String blob) throws IOException {
if (!blobExists(account, mode, container, blob)) { if (!blobExists(account, mode, container, blob)) {
throw new FileNotFoundException("missing blob [" + blob + "]"); throw new NoSuchFileException("missing blob [" + blob + "]");
} }
return new ByteArrayInputStream(blobs.get(blob).toByteArray()); return new ByteArrayInputStream(blobs.get(blob).toByteArray());
} }
@ -99,13 +96,13 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder(); MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
for (String blobName : blobs.keySet()) { for (String blobName : blobs.keySet()) {
final String checkBlob; final String checkBlob;
if (keyPath != null) { if (keyPath != null && !keyPath.isEmpty()) {
// strip off key path from the beginning of the blob name // strip off key path from the beginning of the blob name
checkBlob = blobName.replace(keyPath, ""); checkBlob = blobName.replace(keyPath, "");
} else { } else {
checkBlob = blobName; checkBlob = blobName;
} }
if (startsWithIgnoreCase(checkBlob, prefix)) { if (prefix == null || startsWithIgnoreCase(checkBlob, prefix)) {
blobsBuilder.put(blobName, new PlainBlobMetaData(checkBlob, blobs.get(blobName).size())); blobsBuilder.put(blobName, new PlainBlobMetaData(checkBlob, blobs.get(blobName).size()));
} }
} }

View File

@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore;
import org.elasticsearch.cloud.azure.storage.AzureStorageServiceMock;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import java.io.IOException;
import java.net.URISyntaxException;
public class AzureBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
@Override
protected BlobStore newBlobStore() throws IOException {
try {
RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY);
AzureStorageServiceMock client = new AzureStorageServiceMock();
return new AzureBlobStore(repositoryMetaData, Settings.EMPTY, client);
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}
}
}

View File

@ -41,9 +41,9 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.CountDown;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.security.AccessController; import java.security.AccessController;
import java.security.PrivilegedActionException; import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
@ -196,7 +196,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
} catch (GoogleJsonResponseException e) { } catch (GoogleJsonResponseException e) {
GoogleJsonError error = e.getDetails(); GoogleJsonError error = e.getDetails();
if ((e.getStatusCode() == HTTP_NOT_FOUND) || ((error != null) && (error.getCode() == HTTP_NOT_FOUND))) { if ((e.getStatusCode() == HTTP_NOT_FOUND) || ((error != null) && (error.getCode() == HTTP_NOT_FOUND))) {
throw new FileNotFoundException(e.getMessage()); throw new NoSuchFileException(e.getMessage());
} }
throw e; throw e;
} }
@ -227,6 +227,9 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
* @param blobName name of the blob * @param blobName name of the blob
*/ */
void deleteBlob(String blobName) throws IOException { void deleteBlob(String blobName) throws IOException {
if (!blobExists(blobName)) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
doPrivileged(() -> client.objects().delete(bucket, blobName).execute()); doPrivileged(() -> client.objects().delete(bucket, blobName).execute());
} }

View File

@ -32,9 +32,9 @@ import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation; import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -68,16 +68,16 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override @Override
public void deleteBlob(String blobName) throws IOException { public void deleteBlob(String blobName) throws IOException {
try { if (!blobExists(blobName)) {
store.execute(new Operation<Boolean>() { throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.delete(new Path(path, blobName), true);
}
});
} catch (FileNotFoundException ok) {
// behaves like Files.deleteIfExists
} }
store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.delete(new Path(path, blobName), true);
}
});
} }
@Override @Override
@ -93,6 +93,9 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override @Override
public InputStream readBlob(String blobName) throws IOException { public InputStream readBlob(String blobName) throws IOException {
if (!blobExists(blobName)) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
// FSDataInputStream does buffering internally // FSDataInputStream does buffering internally
return store.execute(new Operation<InputStream>() { return store.execute(new Operation<InputStream>() {
@Override @Override

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import javax.security.auth.Subject;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.Collections;
public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
@Override
protected BlobStore newBlobStore() throws IOException {
return AccessController.doPrivileged(
new PrivilegedAction<HdfsBlobStore>() {
@Override
public HdfsBlobStore run() {
try {
FileContext fileContext = createContext(new URI("hdfs:///"));
return new HdfsBlobStore(fileContext, "temp", 1024);
} catch (IOException | URISyntaxException e) {
throw new RuntimeException(e);
}
}
});
}
@SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
private FileContext createContext(URI uri) {
// mirrors HdfsRepository.java behaviour
Configuration cfg = new Configuration(true);
cfg.setClassLoader(HdfsRepository.class.getClassLoader());
cfg.reloadConfiguration();
Constructor<?> ctor;
Subject subject;
try {
Class<?> clazz = Class.forName("org.apache.hadoop.security.User");
ctor = clazz.getConstructor(String.class);
ctor.setAccessible(true);
} catch (ClassNotFoundException | NoSuchMethodException e) {
throw new RuntimeException(e);
}
try {
Principal principal = (Principal) ctor.newInstance(System.getProperty("user.name"));
subject = new Subject(false, Collections.singleton(principal),
Collections.emptySet(), Collections.emptySet());
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
// disable file system cache
cfg.setBoolean("fs.hdfs.impl.disable.cache", true);
// set file system to TestingFs to avoid a bunch of security
// checks, similar to what is done in HdfsTests.java
cfg.set("fs.AbstractFileSystem." + uri.getScheme() + ".impl", TestingFs.class.getName());
// create the FileContext with our user
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
try {
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
return FileContext.getFileContext(fs, cfg);
} catch (UnsupportedFileSystemException e) {
throw new RuntimeException(e);
}
}
});
}
}

View File

@ -37,10 +37,10 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.file.NoSuchFileException;
import java.security.AccessController; import java.security.AccessController;
import java.security.PrivilegedActionException; import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
@ -89,7 +89,7 @@ public class S3BlobContainer extends AbstractBlobContainer {
} else { } else {
if (e instanceof AmazonS3Exception) { if (e instanceof AmazonS3Exception) {
if (404 == ((AmazonS3Exception) e).getStatusCode()) { if (404 == ((AmazonS3Exception) e).getStatusCode()) {
throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage()); throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage());
} }
} }
throw e; throw e;
@ -115,6 +115,10 @@ public class S3BlobContainer extends AbstractBlobContainer {
@Override @Override
public void deleteBlob(String blobName) throws IOException { public void deleteBlob(String blobName) throws IOException {
if (!blobExists(blobName)) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
try { try {
blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName)); blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName));
} catch (AmazonClientException e) { } catch (AmazonClientException e) {

View File

@ -0,0 +1,200 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AbstractAmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectResult;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.util.Base64;
import java.io.IOException;
import java.io.InputStream;
import java.security.DigestInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
class MockAmazonS3 extends AbstractAmazonS3 {
private Map<String, InputStream> blobs = new ConcurrentHashMap<>();
// in ESBlobStoreContainerTestCase.java, the maximum
// length of the input data is 100 bytes
private byte[] byteCounter = new byte[100];
@Override
public boolean doesBucketExist(String bucket) {
return true;
}
@Override
public ObjectMetadata getObjectMetadata(
GetObjectMetadataRequest getObjectMetadataRequest)
throws AmazonClientException, AmazonServiceException {
String blobName = getObjectMetadataRequest.getKey();
if (!blobs.containsKey(blobName)) {
throw new AmazonS3Exception("[" + blobName + "] does not exist.");
}
return new ObjectMetadata(); // nothing is done with it
}
@Override
public PutObjectResult putObject(PutObjectRequest putObjectRequest)
throws AmazonClientException, AmazonServiceException {
String blobName = putObjectRequest.getKey();
DigestInputStream stream = (DigestInputStream) putObjectRequest.getInputStream();
if (blobs.containsKey(blobName)) {
throw new AmazonS3Exception("[" + blobName + "] already exists.");
}
blobs.put(blobName, stream);
// input and output md5 hashes need to match to avoid an exception
String md5 = Base64.encodeAsString(stream.getMessageDigest().digest());
PutObjectResult result = new PutObjectResult();
result.setContentMd5(md5);
return result;
}
@Override
public S3Object getObject(GetObjectRequest getObjectRequest)
throws AmazonClientException, AmazonServiceException {
// in ESBlobStoreContainerTestCase.java, the prefix is empty,
// so the key and blobName are equivalent to each other
String blobName = getObjectRequest.getKey();
if (!blobs.containsKey(blobName)) {
throw new AmazonS3Exception("[" + blobName + "] does not exist.");
}
// the HTTP request attribute is irrelevant for reading
S3ObjectInputStream stream = new S3ObjectInputStream(
blobs.get(blobName), null, false);
S3Object s3Object = new S3Object();
s3Object.setObjectContent(stream);
return s3Object;
}
@Override
public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
throws AmazonClientException, AmazonServiceException {
MockObjectListing list = new MockObjectListing();
list.setTruncated(false);
String blobName;
String prefix = listObjectsRequest.getPrefix();
ArrayList<S3ObjectSummary> mockObjectSummaries = new ArrayList<>();
for (Map.Entry<String, InputStream> blob : blobs.entrySet()) {
blobName = blob.getKey();
S3ObjectSummary objectSummary = new S3ObjectSummary();
if (prefix.isEmpty() || blobName.startsWith(prefix)) {
objectSummary.setKey(blobName);
try {
objectSummary.setSize(getSize(blob.getValue()));
} catch (IOException e) {
throw new AmazonS3Exception("Object listing " +
"failed for blob [" + blob.getKey() + "]");
}
mockObjectSummaries.add(objectSummary);
}
}
list.setObjectSummaries(mockObjectSummaries);
return list;
}
@Override
public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest)
throws AmazonClientException, AmazonServiceException {
String sourceBlobName = copyObjectRequest.getSourceKey();
String targetBlobName = copyObjectRequest.getDestinationKey();
if (!blobs.containsKey(sourceBlobName)) {
throw new AmazonS3Exception("Source blob [" +
sourceBlobName + "] does not exist.");
}
if (blobs.containsKey(targetBlobName)) {
throw new AmazonS3Exception("Target blob [" +
targetBlobName + "] already exists.");
}
blobs.put(targetBlobName, blobs.get(sourceBlobName));
return new CopyObjectResult(); // nothing is done with it
}
@Override
public void deleteObject(DeleteObjectRequest deleteObjectRequest)
throws AmazonClientException, AmazonServiceException {
String blobName = deleteObjectRequest.getKey();
if (!blobs.containsKey(blobName)) {
throw new AmazonS3Exception("[" + blobName + "] does not exist.");
}
blobs.remove(blobName);
}
private int getSize(InputStream stream) throws IOException {
int size = stream.read(byteCounter);
stream.reset(); // in case we ever need the size again
return size;
}
private class MockObjectListing extends ObjectListing {
// the objectSummaries attribute in ObjectListing.java
// is read-only, but we need to be able to write to it,
// so we create a mock of it to work around this
private List<S3ObjectSummary> mockObjectSummaries;
@Override
public List<S3ObjectSummary> getObjectSummaries() {
return mockObjectSummaries;
}
private void setObjectSummaries(List<S3ObjectSummary> objectSummaries) {
mockObjectSummaries = objectSummaries;
}
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.blobstore;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import java.io.IOException;
import java.util.Locale;
public class S3BlobStoreContainerTests extends ESBlobStoreContainerTestCase {
@Override
protected BlobStore newBlobStore() throws IOException {
MockAmazonS3 client = new MockAmazonS3();
String bucket = randomAsciiOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
return new S3BlobStore(Settings.EMPTY, client, bucket, null, false,
new ByteSizeValue(10, ByteSizeUnit.MB), 5, "public-read-write", "standard");
}
}

View File

@ -112,17 +112,34 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
} }
} }
public void testDeleteBlob() throws IOException {
try (final BlobStore store = newBlobStore()) {
final String blobName = "foobar";
final BlobContainer container = store.blobContainer(new BlobPath());
expectThrows(IOException.class, () -> container.deleteBlob(blobName));
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
final BytesArray bytesArray = new BytesArray(data);
container.writeBlob(blobName, bytesArray);
container.deleteBlob(blobName); // should not throw
// blob deleted, so deleting again should throw
expectThrows(IOException.class, () -> container.deleteBlob(blobName));
}
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/15579") @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/15579")
public void testOverwriteFails() throws IOException { public void testVerifyOverwriteFails() throws IOException {
try (final BlobStore store = newBlobStore()) { try (final BlobStore store = newBlobStore()) {
final String blobName = "foobar"; final String blobName = "foobar";
final BlobContainer container = store.blobContainer(new BlobPath()); final BlobContainer container = store.blobContainer(new BlobPath());
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
final BytesArray bytesArray = new BytesArray(data); final BytesArray bytesArray = new BytesArray(data);
container.writeBlob(blobName, bytesArray); container.writeBlob(blobName, bytesArray);
// should not be able to overwrite existing blob
expectThrows(IOException.class, () -> container.writeBlob(blobName, bytesArray)); expectThrows(IOException.class, () -> container.writeBlob(blobName, bytesArray));
container.deleteBlob(blobName); container.deleteBlob(blobName);
container.writeBlob(blobName, bytesArray); // deleted it, so should be able to write it again container.writeBlob(blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again
} }
} }
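
Every BlobContainer implementation inherits testDeleteBlob and the overwrite test above through this base class. Wiring a new store into the suite only requires newBlobStore(), mirroring the Azure, HDFS, and S3 test classes added in this change; the class below is a hypothetical sketch, with MyBlobStore standing in for a real implementation:

    public class MyBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
        @Override
        protected BlobStore newBlobStore() throws IOException {
            return new MyBlobStore(Settings.EMPTY); // placeholder store
        }
    }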