Index folder names now use a UUID (not the index UUID but one specific
to snapshot/restore) and the index to UUID mapping is stored in the repository index file.
This commit is contained in:
parent
a0a4d67eae
commit
d9ec959dfc
|
@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -70,12 +71,12 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
|
|||
private final boolean includeGlobalState;
|
||||
private final boolean partial;
|
||||
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
|
||||
private final List<String> indices;
|
||||
private final List<IndexId> indices;
|
||||
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
|
||||
private final long startTime;
|
||||
|
||||
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<String> indices, long startTime,
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
|
||||
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
|
||||
long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
|
||||
this.state = state;
|
||||
this.snapshot = snapshot;
|
||||
this.includeGlobalState = includeGlobalState;
|
||||
|
@ -111,7 +112,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
|
|||
return state;
|
||||
}
|
||||
|
||||
public List<String> indices() {
|
||||
public List<IndexId> indices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
|
@ -377,9 +378,9 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
|
|||
boolean partial = in.readBoolean();
|
||||
State state = State.fromValue(in.readByte());
|
||||
int indices = in.readVInt();
|
||||
List<String> indexBuilder = new ArrayList<>();
|
||||
List<IndexId> indexBuilder = new ArrayList<>();
|
||||
for (int j = 0; j < indices; j++) {
|
||||
indexBuilder.add(in.readString());
|
||||
indexBuilder.add(new IndexId(in.readString(), in.readString()));
|
||||
}
|
||||
long startTime = in.readLong();
|
||||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
|
||||
|
@ -410,8 +411,8 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
|
|||
out.writeBoolean(entry.partial());
|
||||
out.writeByte(entry.state().value());
|
||||
out.writeVInt(entry.indices().size());
|
||||
for (String index : entry.indices()) {
|
||||
out.writeString(index);
|
||||
for (IndexId index : entry.indices()) {
|
||||
index.writeTo(out);
|
||||
}
|
||||
out.writeLong(entry.startTime());
|
||||
out.writeVInt(entry.shards().size());
|
||||
|
@ -458,8 +459,8 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
|
|||
builder.field(STATE, entry.state());
|
||||
builder.startArray(INDICES);
|
||||
{
|
||||
for (String index : entry.indices()) {
|
||||
builder.value(index);
|
||||
for (IndexId index : entry.indices()) {
|
||||
index.toXContent(builder, params);
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.elasticsearch.index.mapper.MapperService;
|
|||
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -394,10 +395,12 @@ final class StoreRecovery {
|
|||
translogState.totalOperationsOnStart(0);
|
||||
indexShard.prepareForIndexRecovery();
|
||||
ShardId snapshotShardId = shardId;
|
||||
if (!shardId.getIndexName().equals(restoreSource.index())) {
|
||||
snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
|
||||
final String indexName = restoreSource.index();
|
||||
if (!shardId.getIndexName().equals(indexName)) {
|
||||
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
|
||||
}
|
||||
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState());
|
||||
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
|
||||
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
|
||||
indexShard.skipTranslogRecovery();
|
||||
indexShard.finalizeRecovery();
|
||||
indexShard.postRecovery("restore done");
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Represents a single snapshotted index in the repository.
|
||||
*/
|
||||
public final class IndexId implements Writeable, ToXContent {
|
||||
protected static final String NAME = "name";
|
||||
protected static final String ID = "id";
|
||||
|
||||
private final String name;
|
||||
private final String id;
|
||||
|
||||
public IndexId(final String name, final String id) {
|
||||
this.name = name;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public IndexId(final StreamInput in) throws IOException {
|
||||
this.name = in.readString();
|
||||
this.id = in.readString();
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the index.
|
||||
*/
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* The unique ID for the index within the repository. This is *not* the same as the
|
||||
* index's UUID, but merely a unique file/URL friendly identifier that a repository can
|
||||
* use to name blobs for the index.
|
||||
*/
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[" + name + "/" + id + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
@SuppressWarnings("unchecked") IndexId that = (IndexId) o;
|
||||
return Objects.equals(name, that.name) && Objects.equals(id, that.id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(final StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
out.writeString(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(NAME, name);
|
||||
builder.field(ID, id);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
|
@ -47,7 +47,7 @@ import java.util.List;
|
|||
* <ul>
|
||||
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
|
||||
* with list of indices that will be included into the snapshot</li>
|
||||
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexCommit, IndexShardSnapshotStatus)}
|
||||
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
|
||||
* for each shard</li>
|
||||
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
|
||||
* </ul>
|
||||
|
@ -88,15 +88,14 @@ public interface Repository extends LifecycleComponent {
|
|||
* @param indices list of indices
|
||||
* @return information about snapshot
|
||||
*/
|
||||
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException;
|
||||
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the list of snapshots currently stored in the repository that match the given predicate on the snapshot name.
|
||||
* To get all snapshots, the predicate filter should return true regardless of the input.
|
||||
*
|
||||
* @return snapshot list
|
||||
* Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
|
||||
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
|
||||
* if there was an error in reading the data.
|
||||
*/
|
||||
List<SnapshotId> getSnapshots();
|
||||
RepositoryData getRepositoryData();
|
||||
|
||||
/**
|
||||
* Starts snapshotting process
|
||||
|
@ -105,7 +104,7 @@ public interface Repository extends LifecycleComponent {
|
|||
* @param indices list of indices to be snapshotted
|
||||
* @param metaData cluster metadata
|
||||
*/
|
||||
void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData);
|
||||
void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData);
|
||||
|
||||
/**
|
||||
* Finalizes snapshotting process
|
||||
|
@ -113,12 +112,14 @@ public interface Repository extends LifecycleComponent {
|
|||
* This method is called on master after all shards are snapshotted.
|
||||
*
|
||||
* @param snapshotId snapshot id
|
||||
* @param indices list of indices in the snapshot
|
||||
* @param startTime start time of the snapshot
|
||||
* @param failure global failure reason or null
|
||||
* @param totalShards total number of shards
|
||||
* @param shardFailures list of shard failures
|
||||
* @return snapshot description
|
||||
*/
|
||||
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
|
||||
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
|
||||
|
||||
/**
|
||||
* Deletes snapshot
|
||||
|
@ -181,10 +182,11 @@ public interface Repository extends LifecycleComponent {
|
|||
*
|
||||
* @param shard shard to be snapshotted
|
||||
* @param snapshotId snapshot id
|
||||
* @param indexId id for the index being snapshotted
|
||||
* @param snapshotIndexCommit commit point
|
||||
* @param snapshotStatus snapshot status
|
||||
*/
|
||||
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
|
||||
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
|
||||
|
||||
/**
|
||||
* Restores snapshot of the shard.
|
||||
|
@ -194,20 +196,22 @@ public interface Repository extends LifecycleComponent {
|
|||
* @param shard the shard to restore the index into
|
||||
* @param snapshotId snapshot id
|
||||
* @param version version of elasticsearch that created this snapshot
|
||||
* @param indexId id of the index in the repository from which the restore is occurring
|
||||
* @param snapshotShardId shard id (in the snapshot)
|
||||
* @param recoveryState recovery state
|
||||
*/
|
||||
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState);
|
||||
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);
|
||||
|
||||
/**
|
||||
* Retrieve shard snapshot status for the stored snapshot
|
||||
*
|
||||
* @param snapshotId snapshot id
|
||||
* @param version version of elasticsearch that created this snapshot
|
||||
* @param indexId the snapshotted index id for the shard to get status for
|
||||
* @param shardId shard id
|
||||
* @return snapshot status
|
||||
*/
|
||||
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
|
||||
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId);
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,361 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories;
|
||||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A class that represents the data in a repository, as captured in the
|
||||
* repository's index blob.
|
||||
*/
|
||||
public final class RepositoryData implements ToXContent {
|
||||
|
||||
public static final RepositoryData EMPTY = new RepositoryData(Collections.emptyList(), Collections.emptyMap());
|
||||
|
||||
/**
|
||||
* The ids of the snapshots in the repository.
|
||||
*/
|
||||
private final List<SnapshotId> snapshotIds;
|
||||
/**
|
||||
* The indices found in the repository across all snapshots.
|
||||
*/
|
||||
private final Map<String, IndexMeta> indices;
|
||||
|
||||
public RepositoryData(List<SnapshotId> snapshotIds, Map<String, IndexMeta> indices) {
|
||||
this.snapshotIds = Collections.unmodifiableList(snapshotIds);
|
||||
this.indices = Collections.unmodifiableMap(indices);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an unmodifiable list of the snapshot ids.
|
||||
*/
|
||||
public List<SnapshotId> getSnapshotIds() {
|
||||
return snapshotIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an unmodifiable map of the index names to index metadata in the repository.
|
||||
*/
|
||||
public Map<String, IndexMeta> getIndices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a snapshot and its indices to the repository; returns a new instance. If the snapshot
|
||||
* already exists in the repository data, this method throws an IllegalArgumentException.
|
||||
*/
|
||||
public RepositoryData addSnapshot(final SnapshotId snapshotId, final List<IndexId> snapshottedIndices) {
|
||||
if (snapshotIds.contains(snapshotId)) {
|
||||
throw new IllegalArgumentException("[" + snapshotId + "] already exists in the repository data");
|
||||
}
|
||||
List<SnapshotId> snapshots = new ArrayList<>(snapshotIds);
|
||||
snapshots.add(snapshotId);
|
||||
Map<String, IndexMeta> indexMetaMap = getIndices();
|
||||
Map<String, IndexMeta> addedIndices = new HashMap<>();
|
||||
for (IndexId indexId : snapshottedIndices) {
|
||||
final String indexName = indexId.getName();
|
||||
IndexMeta newIndexMeta;
|
||||
if (indexMetaMap.containsKey(indexName)) {
|
||||
newIndexMeta = indexMetaMap.get(indexName).addSnapshot(snapshotId);
|
||||
} else {
|
||||
Set<SnapshotId> ids = new LinkedHashSet<>();
|
||||
ids.add(snapshotId);
|
||||
newIndexMeta = new IndexMeta(indexId, ids);
|
||||
}
|
||||
addedIndices.put(indexName, newIndexMeta);
|
||||
}
|
||||
Map<String, IndexMeta> allIndices = new HashMap<>(indices);
|
||||
allIndices.putAll(addedIndices);
|
||||
return new RepositoryData(snapshots, allIndices);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add indices to the repository metadata; returns a new instance.
|
||||
*/
|
||||
public RepositoryData addIndices(final Map<String, IndexMeta> newIndices) {
|
||||
Map<String, IndexMeta> map = new HashMap<>(indices);
|
||||
map.putAll(newIndices);
|
||||
return new RepositoryData(snapshotIds, map);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a snapshot and remove any indices that no longer exist in the repository due to the deletion of the snapshot.
|
||||
*/
|
||||
public RepositoryData removeSnapshot(final SnapshotId snapshotId) {
|
||||
List<SnapshotId> newSnapshotIds = snapshotIds
|
||||
.stream()
|
||||
.filter(id -> snapshotId.equals(id) == false)
|
||||
.collect(Collectors.toList());
|
||||
Map<String, IndexMeta> newIndices = new HashMap<>();
|
||||
for (IndexMeta indexMeta : indices.values()) {
|
||||
Set<SnapshotId> set;
|
||||
if (indexMeta.getSnapshotIds().contains(snapshotId)) {
|
||||
if (indexMeta.getSnapshotIds().size() == 1) {
|
||||
// removing the snapshot will mean no more snapshots have this index, so just skip over it
|
||||
continue;
|
||||
}
|
||||
set = new LinkedHashSet<>(indexMeta.getSnapshotIds());
|
||||
set.remove(snapshotId);
|
||||
} else {
|
||||
set = indexMeta.getSnapshotIds();
|
||||
}
|
||||
newIndices.put(indexMeta.getName(), new IndexMeta(indexMeta.getIndexId(), set));
|
||||
}
|
||||
|
||||
return new RepositoryData(newSnapshotIds, newIndices);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
@SuppressWarnings("unchecked") RepositoryData that = (RepositoryData) obj;
|
||||
return snapshotIds.equals(that.snapshotIds) && indices.equals(that.indices);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(snapshotIds, indices);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the index name to the index id specific to the repository,
|
||||
* throwing an exception if the index could not be resolved.
|
||||
*/
|
||||
public IndexId resolveIndexId(final String indexName) {
|
||||
if (indices.containsKey(indexName)) {
|
||||
return indices.get(indexName).getIndexId();
|
||||
} else {
|
||||
// on repositories created before 5.0, there was no indices information in the index
|
||||
// blob, so if the repository hasn't been updated with new snapshots, no new index blob
|
||||
// would have been written, so we only have old snapshots without the index information.
|
||||
// in this case, the index id is just the index name
|
||||
return new IndexId(indexName, indexName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the given index names to index ids, throwing an exception
|
||||
* if any of the indices could not be resolved.
|
||||
*/
|
||||
public List<IndexId> resolveIndices(final List<String> indices) {
|
||||
List<IndexId> resolvedIndices = new ArrayList<>(indices.size());
|
||||
for (final String indexName : indices) {
|
||||
resolvedIndices.add(resolveIndexId(indexName));
|
||||
}
|
||||
return resolvedIndices;
|
||||
}
|
||||
|
||||
private static final String SNAPSHOTS = "snapshots";
|
||||
private static final String INDICES = "indices";
|
||||
private static final String INDEX_ID = "id";
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
|
||||
builder.startObject();
|
||||
// write the snapshots list
|
||||
builder.startArray(SNAPSHOTS);
|
||||
for (final SnapshotId snapshot : getSnapshotIds()) {
|
||||
snapshot.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
// write the indices map
|
||||
builder.startObject(INDICES);
|
||||
for (final IndexMeta indexMeta : getIndices().values()) {
|
||||
builder.startObject(indexMeta.getName());
|
||||
builder.field(INDEX_ID, indexMeta.getId());
|
||||
builder.startArray(SNAPSHOTS);
|
||||
for (final SnapshotId snapshotId : indexMeta.getSnapshotIds()) {
|
||||
snapshotId.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static RepositoryData fromXContent(final XContentParser parser) throws IOException {
|
||||
List<SnapshotId> snapshots = new ArrayList<>();
|
||||
Map<String, IndexMeta> indices = new HashMap<>();
|
||||
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
|
||||
while (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
|
||||
String currentFieldName = parser.currentName();
|
||||
if (SNAPSHOTS.equals(currentFieldName)) {
|
||||
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
|
||||
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
|
||||
snapshots.add(SnapshotId.fromXContent(parser));
|
||||
}
|
||||
} else {
|
||||
throw new ElasticsearchParseException("expected array for [" + currentFieldName + "]");
|
||||
}
|
||||
} else if (INDICES.equals(currentFieldName)) {
|
||||
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
|
||||
throw new ElasticsearchParseException("start object expected [indices]");
|
||||
}
|
||||
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
|
||||
String indexName = parser.currentName();
|
||||
String indexId = null;
|
||||
Set<SnapshotId> snapshotIds = new LinkedHashSet<>();
|
||||
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
|
||||
throw new ElasticsearchParseException("start object expected index[" + indexName + "]");
|
||||
}
|
||||
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
|
||||
String indexMetaFieldName = parser.currentName();
|
||||
parser.nextToken();
|
||||
if (INDEX_ID.equals(indexMetaFieldName)) {
|
||||
indexId = parser.text();
|
||||
} else if (SNAPSHOTS.equals(indexMetaFieldName)) {
|
||||
if (parser.currentToken() != XContentParser.Token.START_ARRAY) {
|
||||
throw new ElasticsearchParseException("start array expected [snapshots]");
|
||||
}
|
||||
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
|
||||
snapshotIds.add(SnapshotId.fromXContent(parser));
|
||||
}
|
||||
}
|
||||
}
|
||||
assert indexId != null;
|
||||
indices.put(indexName, new IndexMeta(indexName, indexId, snapshotIds));
|
||||
}
|
||||
} else {
|
||||
throw new ElasticsearchParseException("unknown field name [" + currentFieldName + "]");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new ElasticsearchParseException("start object expected");
|
||||
}
|
||||
return new RepositoryData(snapshots, indices);
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents information about a single index snapshotted in a repository.
|
||||
*/
|
||||
public static final class IndexMeta implements Writeable {
|
||||
private final IndexId indexId;
|
||||
private final Set<SnapshotId> snapshotIds;
|
||||
|
||||
public IndexMeta(final String name, final String id, final Set<SnapshotId> snapshotIds) {
|
||||
this(new IndexId(name, id), snapshotIds);
|
||||
}
|
||||
|
||||
public IndexMeta(final IndexId indexId, final Set<SnapshotId> snapshotIds) {
|
||||
this.indexId = indexId;
|
||||
this.snapshotIds = Collections.unmodifiableSet(snapshotIds);
|
||||
}
|
||||
|
||||
public IndexMeta(final StreamInput in) throws IOException {
|
||||
indexId = new IndexId(in);
|
||||
final int size = in.readVInt();
|
||||
Set<SnapshotId> ids = new LinkedHashSet<>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
ids.add(new SnapshotId(in));
|
||||
}
|
||||
snapshotIds = Collections.unmodifiableSet(ids);
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of the index.
|
||||
*/
|
||||
public String getName() {
|
||||
return indexId.getName();
|
||||
}
|
||||
|
||||
/**
|
||||
* The unique ID for the index within the repository. This is *not* the same as the
|
||||
* index's UUID, but merely a unique file/URL friendly identifier that a repository can
|
||||
* use to name blobs for the index.
|
||||
*/
|
||||
public String getId() {
|
||||
return indexId.getId();
|
||||
}
|
||||
|
||||
/**
|
||||
* An unmodifiable set of snapshot ids that contain this index as part of its snapshot.
|
||||
*/
|
||||
public Set<SnapshotId> getSnapshotIds() {
|
||||
return snapshotIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* The snapshotted index id.
|
||||
*/
|
||||
public IndexId getIndexId() {
|
||||
return indexId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a snapshot id to the list of snapshots that contain this index.
|
||||
*/
|
||||
public IndexMeta addSnapshot(final SnapshotId snapshotId) {
|
||||
Set<SnapshotId> withAdded = new LinkedHashSet<>(snapshotIds);
|
||||
withAdded.add(snapshotId);
|
||||
return new IndexMeta(indexId, withAdded);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(final StreamOutput out) throws IOException {
|
||||
indexId.writeTo(out);
|
||||
out.writeVInt(snapshotIds.size());
|
||||
for (SnapshotId snapshotId : snapshotIds) {
|
||||
snapshotId.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
@SuppressWarnings("unchecked") IndexMeta that = (IndexMeta) obj;
|
||||
return indexId.equals(that.indexId) && snapshotIds.equals(that.snapshotIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(indexId, snapshotIds);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -45,6 +45,8 @@ import org.elasticsearch.common.lucene.Lucene;
|
|||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
|
||||
|
@ -58,6 +60,9 @@ import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
|
|||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.store.StoreFileMetaData;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.RepositoryData.IndexMeta;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -103,6 +108,8 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
@ -120,9 +127,9 @@ import static java.util.Collections.unmodifiableMap;
|
|||
* STORE_ROOT
|
||||
* |- index-N - list of all snapshot name as JSON array, N is the generation of the file
|
||||
* |- index.latest - contains the numeric value of the latest generation of the index file (i.e. N from above)
|
||||
* |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010"
|
||||
* |- snap-20131010 - JSON serialized Snapshot for snapshot "20131010"
|
||||
* |- meta-20131010.dat - JSON serialized MetaData for snapshot "20131010" (includes only global metadata)
|
||||
* |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011"
|
||||
* |- snap-20131011 - JSON serialized Snapshot for snapshot "20131011"
|
||||
* |- meta-20131011.dat - JSON serialized MetaData for snapshot "20131011"
|
||||
* .....
|
||||
* |- indices/ - data for all indices
|
||||
|
@ -163,13 +170,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
private static final String SNAPSHOT_PREFIX = "snap-";
|
||||
|
||||
protected static final String SNAPSHOT_CODEC = "snapshot";
|
||||
private static final String SNAPSHOT_CODEC = "snapshot";
|
||||
|
||||
static final String SNAPSHOTS_FILE = "index"; // package private for unit testing
|
||||
|
||||
private static final String SNAPSHOTS_FILE_PREFIX = "index-";
|
||||
private static final String INDEX_FILE_PREFIX = "index-";
|
||||
|
||||
private static final String SNAPSHOTS_INDEX_LATEST_BLOB = "index.latest";
|
||||
private static final String INDEX_LATEST_BLOB = "index.latest";
|
||||
|
||||
private static final String TESTS_FILE = "tests-";
|
||||
|
||||
|
@ -299,13 +306,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
return null;
|
||||
}
|
||||
|
||||
public Map<String, IndexId> getIndices() {
|
||||
try {
|
||||
return readIndexGen()
|
||||
.getIndices()
|
||||
.values()
|
||||
.stream()
|
||||
.map(IndexMeta::getIndexId)
|
||||
.collect(Collectors.toMap(IndexId::getName, Function.identity()));
|
||||
} catch (IOException e) {
|
||||
throw new RepositoryException(metadata.name(), "Could not get the indices in the repository.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositoryMetaData getMetadata() {
|
||||
return metadata;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
|
||||
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetaData) {
|
||||
if (isReadOnly()) {
|
||||
throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository");
|
||||
}
|
||||
|
@ -319,11 +339,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
snapshotLegacyFormat.exists(snapshotsBlobContainer, snapshotName)) {
|
||||
throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with such name already exists");
|
||||
}
|
||||
|
||||
// update the index file if created pre 5.0 to include the indices in the repository
|
||||
updateIndexGenIfNecessary();
|
||||
|
||||
// Write Global MetaData
|
||||
globalMetaDataFormat.write(clusterMetadata, snapshotsBlobContainer, blobId(snapshotId));
|
||||
for (String index : indices) {
|
||||
final IndexMetaData indexMetaData = clusterMetadata.index(index);
|
||||
final BlobPath indexPath = basePath().add("indices").add(index);
|
||||
globalMetaDataFormat.write(clusterMetaData, snapshotsBlobContainer, blobId(snapshotId));
|
||||
|
||||
// write the index metadata for each index in the snapshot
|
||||
for (IndexId index : indices) {
|
||||
final IndexMetaData indexMetaData = clusterMetaData.index(index.getName());
|
||||
final BlobPath indexPath = basePath().add("indices").add(index.getId());
|
||||
final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
|
||||
indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, blobId(snapshotId));
|
||||
}
|
||||
|
@ -332,11 +358,49 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
// Older repository index files (index-N) only contain snapshot info, not indices info,
|
||||
// so if the index file is of the older format, populate it with the indices entries
|
||||
// so we know which indices of snapshots have blob ids in the older format.
|
||||
private void updateIndexGenIfNecessary() {
|
||||
if (isReadOnly()) {
|
||||
// nothing to update on read only repositories
|
||||
return;
|
||||
}
|
||||
if (snapshotsBlobContainer.blobExists(SNAPSHOTS_FILE) == false) {
|
||||
// pre 5.0 repositories have a single index file instead of generational index-N files,
|
||||
// so if the single index file is missing, we already have an up to date repository.
|
||||
return;
|
||||
}
|
||||
final RepositoryData repositoryData = getRepositoryData();
|
||||
final Map<String, Set<SnapshotId>> indexToSnapshots = new HashMap<>();
|
||||
for (final SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
|
||||
for (final String indexName : getSnapshotInfo(snapshotId).indices()) {
|
||||
if (indexToSnapshots.containsKey(indexName)) {
|
||||
indexToSnapshots.get(indexName).add(snapshotId);
|
||||
} else {
|
||||
indexToSnapshots.put(indexName, Sets.newHashSet(snapshotId));
|
||||
}
|
||||
}
|
||||
}
|
||||
final Map<String, IndexMeta> indices = new HashMap<>();
|
||||
for (Map.Entry<String, Set<SnapshotId>> entry : indexToSnapshots.entrySet()) {
|
||||
final String indexName = entry.getKey();
|
||||
indices.put(indexName, new IndexMeta(indexName, indexName, entry.getValue()));
|
||||
}
|
||||
try {
|
||||
// write the new index gen file with the indices included
|
||||
writeIndexGen(repositoryData.addIndices(indices));
|
||||
} catch (IOException e) {
|
||||
throw new RepositoryException(metadata.name(), "failed to update the repository index blob with indices data on startup", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteSnapshot(SnapshotId snapshotId) {
|
||||
if (isReadOnly()) {
|
||||
throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository");
|
||||
}
|
||||
final RepositoryData repositoryData = getRepositoryData();
|
||||
List<String> indices = Collections.emptyList();
|
||||
SnapshotInfo snapshot = null;
|
||||
try {
|
||||
|
@ -350,19 +414,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
MetaData metaData = null;
|
||||
try {
|
||||
if (snapshot != null) {
|
||||
metaData = readSnapshotMetaData(snapshotId, snapshot.version(), indices, true);
|
||||
metaData = readSnapshotMetaData(snapshotId, snapshot.version(), repositoryData.resolveIndices(indices), true);
|
||||
} else {
|
||||
metaData = readSnapshotMetaData(snapshotId, null, indices, true);
|
||||
metaData = readSnapshotMetaData(snapshotId, null, repositoryData.resolveIndices(indices), true);
|
||||
}
|
||||
} catch (IOException | SnapshotException ex) {
|
||||
logger.warn("cannot read metadata for snapshot [{}]", ex, snapshotId);
|
||||
}
|
||||
try {
|
||||
// Delete snapshot from the snapshot list, since it is the maintainer of truth of active snapshots
|
||||
List<SnapshotId> snapshotIds = getSnapshots().stream()
|
||||
.filter(id -> snapshotId.equals(id) == false)
|
||||
.collect(Collectors.toList());
|
||||
writeSnapshotsToIndexGen(snapshotIds);
|
||||
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
|
||||
writeIndexGen(repositoryData.removeSnapshot(snapshotId));
|
||||
|
||||
// delete the snapshot file
|
||||
safeSnapshotBlobDelete(snapshot, blobId(snapshotId));
|
||||
|
@ -371,7 +432,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
// Now delete all indices
|
||||
for (String index : indices) {
|
||||
BlobPath indexPath = basePath().add("indices").add(index);
|
||||
final IndexId indexId = repositoryData.resolveIndexId(index);
|
||||
BlobPath indexPath = basePath().add("indices").add(indexId.getId());
|
||||
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
|
||||
try {
|
||||
indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, blobId(snapshotId));
|
||||
|
@ -383,7 +445,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
if (indexMetaData != null) {
|
||||
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
|
||||
try {
|
||||
delete(snapshotId, snapshot.version(), new ShardId(indexMetaData.getIndex(), shardId));
|
||||
delete(snapshotId, snapshot.version(), indexId, new ShardId(indexMetaData.getIndex(), shardId));
|
||||
} catch (SnapshotException ex) {
|
||||
logger.warn("[{}] failed to delete shard data for shard [{}][{}]", ex, snapshotId, index, shardId);
|
||||
}
|
||||
|
@ -449,26 +511,24 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
*/
|
||||
@Override
|
||||
public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
|
||||
final List<String> indices,
|
||||
final List<IndexId> indices,
|
||||
final long startTime,
|
||||
final String failure,
|
||||
final int totalShards,
|
||||
final List<SnapshotShardFailure> shardFailures) {
|
||||
try {
|
||||
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
|
||||
indices,
|
||||
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
|
||||
startTime,
|
||||
failure,
|
||||
System.currentTimeMillis(),
|
||||
totalShards,
|
||||
shardFailures);
|
||||
snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, blobId(snapshotId));
|
||||
List<SnapshotId> snapshotIds = getSnapshots();
|
||||
final RepositoryData repositoryData = getRepositoryData();
|
||||
List<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
|
||||
if (!snapshotIds.contains(snapshotId)) {
|
||||
snapshotIds = new ArrayList<>(snapshotIds);
|
||||
snapshotIds.add(snapshotId);
|
||||
snapshotIds = Collections.unmodifiableList(snapshotIds);
|
||||
writeSnapshotsToIndexGen(snapshotIds);
|
||||
writeIndexGen(repositoryData.addSnapshot(snapshotId, indices));
|
||||
}
|
||||
return blobStoreSnapshot;
|
||||
} catch (IOException ex) {
|
||||
|
@ -476,20 +536,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SnapshotId> getSnapshots() {
|
||||
try {
|
||||
return Collections.unmodifiableList(readSnapshotsFromIndex());
|
||||
} catch (NoSuchFileException | FileNotFoundException e) {
|
||||
// its a fresh repository, no index file exists, so return an empty list
|
||||
return Collections.emptyList();
|
||||
} catch (IOException ioe) {
|
||||
throw new RepositoryException(metadata.name(), "failed to list snapshots in repository", ioe);
|
||||
}
|
||||
return getRepositoryData().getSnapshotIds();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException {
|
||||
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException {
|
||||
return readSnapshotMetaData(snapshot.snapshotId(), snapshot.version(), indices, false);
|
||||
}
|
||||
|
||||
|
@ -511,7 +563,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<String> indices, boolean ignoreIndexErrors) throws IOException {
|
||||
private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List<IndexId> indices, boolean ignoreIndexErrors) throws IOException {
|
||||
MetaData metaData;
|
||||
if (snapshotVersion == null) {
|
||||
// When we delete corrupted snapshots we might not know which version we are dealing with
|
||||
|
@ -533,14 +585,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
throw new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex);
|
||||
}
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder(metaData);
|
||||
for (String index : indices) {
|
||||
BlobPath indexPath = basePath().add("indices").add(index);
|
||||
for (IndexId index : indices) {
|
||||
BlobPath indexPath = basePath().add("indices").add(index.getId());
|
||||
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
|
||||
try {
|
||||
metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, blobId(snapshotId)), false);
|
||||
} catch (ElasticsearchParseException | IOException ex) {
|
||||
if (ignoreIndexErrors) {
|
||||
logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index);
|
||||
logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index.getName());
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
|
@ -608,10 +660,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
private static final String SNAPSHOTS = "snapshots";
|
||||
private static final String NAME = "name";
|
||||
private static final String UUID = "uuid";
|
||||
|
||||
@Override
|
||||
public long getSnapshotThrottleTimeInNanos() {
|
||||
return snapshotRateLimitingTimeInNanos.count();
|
||||
|
@ -655,6 +703,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositoryData getRepositoryData() {
|
||||
try {
|
||||
return readIndexGen();
|
||||
} catch (NoSuchFileException nsfe) {
|
||||
// repository doesn't have an index blob, its a new blank repo
|
||||
return RepositoryData.EMPTY;
|
||||
} catch (IOException ioe) {
|
||||
throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
public static String testBlobPrefix(String seed) {
|
||||
return TESTS_FILE + seed;
|
||||
}
|
||||
|
@ -669,35 +729,30 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
return snapshotsBlobContainer;
|
||||
}
|
||||
|
||||
protected void writeSnapshotsToIndexGen(final List<SnapshotId> snapshots) throws IOException {
|
||||
protected void writeIndexGen(final RepositoryData repositoryData) throws IOException {
|
||||
assert isReadOnly() == false; // can not write to a read only repository
|
||||
final BytesReference snapshotsBytes;
|
||||
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
|
||||
try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
|
||||
builder.startObject();
|
||||
builder.startArray(SNAPSHOTS);
|
||||
for (SnapshotId snapshot : snapshots) {
|
||||
builder.startObject();
|
||||
builder.field(NAME, snapshot.getName());
|
||||
builder.field(UUID, snapshot.getUUID());
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
builder.close();
|
||||
}
|
||||
snapshotsBytes = bStream.bytes();
|
||||
}
|
||||
final long gen = latestIndexBlobId() + 1;
|
||||
// write the index file
|
||||
writeAtomic(SNAPSHOTS_FILE_PREFIX + Long.toString(gen), snapshotsBytes);
|
||||
writeAtomic(INDEX_FILE_PREFIX + Long.toString(gen), snapshotsBytes);
|
||||
// delete the N-2 index file if it exists, keep the previous one around as a backup
|
||||
if (isReadOnly() == false && gen - 2 >= 0) {
|
||||
final String oldSnapshotIndexFile = SNAPSHOTS_FILE_PREFIX + Long.toString(gen - 2);
|
||||
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(gen - 2);
|
||||
if (snapshotsBlobContainer.blobExists(oldSnapshotIndexFile)) {
|
||||
snapshotsBlobContainer.deleteBlob(oldSnapshotIndexFile);
|
||||
}
|
||||
// delete the old index file (non-generational) if it exists
|
||||
if (snapshotsBlobContainer.blobExists(SNAPSHOTS_FILE)) {
|
||||
snapshotsBlobContainer.deleteBlob(SNAPSHOTS_FILE);
|
||||
}
|
||||
}
|
||||
|
||||
// write the current generation to the index-latest file
|
||||
|
@ -706,13 +761,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
bStream.writeLong(gen);
|
||||
genBytes = bStream.bytes();
|
||||
}
|
||||
if (snapshotsBlobContainer.blobExists(SNAPSHOTS_INDEX_LATEST_BLOB)) {
|
||||
snapshotsBlobContainer.deleteBlob(SNAPSHOTS_INDEX_LATEST_BLOB);
|
||||
if (snapshotsBlobContainer.blobExists(INDEX_LATEST_BLOB)) {
|
||||
snapshotsBlobContainer.deleteBlob(INDEX_LATEST_BLOB);
|
||||
}
|
||||
writeAtomic(SNAPSHOTS_INDEX_LATEST_BLOB, genBytes);
|
||||
writeAtomic(INDEX_LATEST_BLOB, genBytes);
|
||||
}
|
||||
|
||||
protected List<SnapshotId> readSnapshotsFromIndex() throws IOException {
|
||||
RepositoryData readIndexGen() throws IOException {
|
||||
final long indexGen = latestIndexBlobId();
|
||||
final String snapshotsIndexBlobName;
|
||||
if (indexGen == -1) {
|
||||
|
@ -720,47 +775,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
// old format, so look for the older index file before returning an empty list
|
||||
snapshotsIndexBlobName = SNAPSHOTS_FILE;
|
||||
} else {
|
||||
snapshotsIndexBlobName = SNAPSHOTS_FILE_PREFIX + Long.toString(indexGen);
|
||||
snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen);
|
||||
}
|
||||
|
||||
try (InputStream blob = snapshotsBlobContainer.readBlob(snapshotsIndexBlobName)) {
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
Streams.copy(blob, out);
|
||||
ArrayList<SnapshotId> snapshots = new ArrayList<>();
|
||||
try (XContentParser parser = XContentHelper.createParser(out.bytes())) {
|
||||
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
|
||||
if (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
|
||||
String currentFieldName = parser.currentName();
|
||||
if (SNAPSHOTS.equals(currentFieldName)) {
|
||||
if (parser.nextToken() == XContentParser.Token.START_ARRAY) {
|
||||
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
|
||||
// the new format from 5.0 which contains the snapshot name and uuid
|
||||
String name = null;
|
||||
String uuid = null;
|
||||
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
|
||||
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
|
||||
currentFieldName = parser.currentName();
|
||||
parser.nextToken();
|
||||
if (NAME.equals(currentFieldName)) {
|
||||
name = parser.text();
|
||||
} else if (UUID.equals(currentFieldName)) {
|
||||
uuid = parser.text();
|
||||
}
|
||||
}
|
||||
snapshots.add(new SnapshotId(name, uuid));
|
||||
}
|
||||
// the old format pre 5.0 that only contains the snapshot name, use the name as the uuid too
|
||||
else {
|
||||
name = parser.text();
|
||||
snapshots.add(new SnapshotId(name, SnapshotId.UNASSIGNED_UUID));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return RepositoryData.fromXContent(parser);
|
||||
}
|
||||
return Collections.unmodifiableList(snapshots);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -807,7 +830,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
// package private for testing
|
||||
long readSnapshotIndexLatestBlob() throws IOException {
|
||||
try (InputStream blob = snapshotsBlobContainer.readBlob(SNAPSHOTS_INDEX_LATEST_BLOB)) {
|
||||
try (InputStream blob = snapshotsBlobContainer.readBlob(INDEX_LATEST_BLOB)) {
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
Streams.copy(blob, out);
|
||||
return Numbers.bytesToLong(out.bytes().toBytesRef());
|
||||
|
@ -815,7 +838,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
private long listBlobsToGetLatestIndexId() throws IOException {
|
||||
Map<String, BlobMetaData> blobs = snapshotsBlobContainer.listBlobsByPrefix(SNAPSHOTS_FILE_PREFIX);
|
||||
Map<String, BlobMetaData> blobs = snapshotsBlobContainer.listBlobsByPrefix(INDEX_FILE_PREFIX);
|
||||
long latest = -1;
|
||||
if (blobs.isEmpty()) {
|
||||
// no snapshot index blobs have been written yet
|
||||
|
@ -824,7 +847,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
for (final BlobMetaData blobMetaData : blobs.values()) {
|
||||
final String blobName = blobMetaData.name();
|
||||
try {
|
||||
final long curr = Long.parseLong(blobName.substring(SNAPSHOTS_FILE_PREFIX.length()));
|
||||
final long curr = Long.parseLong(blobName.substring(INDEX_FILE_PREFIX.length()));
|
||||
latest = Math.max(latest, curr);
|
||||
} catch (NumberFormatException nfe) {
|
||||
// the index- blob wasn't of the format index-N where N is a number,
|
||||
|
@ -847,9 +870,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
|
||||
SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, snapshotStatus);
|
||||
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
|
||||
SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus);
|
||||
snapshotStatus.startTime(System.currentTimeMillis());
|
||||
|
||||
try {
|
||||
|
@ -869,8 +894,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
@Override
|
||||
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
|
||||
final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, snapshotShardId, recoveryState);
|
||||
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
|
||||
final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
|
||||
try {
|
||||
snapshotContext.restore();
|
||||
} catch (Exception e) {
|
||||
|
@ -879,8 +904,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
@Override
|
||||
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
|
||||
Context context = new Context(snapshotId, version, shardId);
|
||||
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
|
||||
Context context = new Context(snapshotId, version, indexId, shardId);
|
||||
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
|
||||
IndexShardSnapshotStatus status = new IndexShardSnapshotStatus();
|
||||
status.updateStage(IndexShardSnapshotStatus.Stage.DONE);
|
||||
|
@ -914,8 +939,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
* @param snapshotId snapshot id
|
||||
* @param shardId shard id
|
||||
*/
|
||||
public void delete(SnapshotId snapshotId, Version version, ShardId shardId) {
|
||||
Context context = new Context(snapshotId, version, shardId, shardId);
|
||||
private void delete(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
|
||||
Context context = new Context(snapshotId, version, indexId, shardId, shardId);
|
||||
context.delete();
|
||||
}
|
||||
|
||||
|
@ -948,15 +973,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
protected final Version version;
|
||||
|
||||
public Context(SnapshotId snapshotId, Version version, ShardId shardId) {
|
||||
this(snapshotId, version, shardId, shardId);
|
||||
public Context(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
|
||||
this(snapshotId, version, indexId, shardId, shardId);
|
||||
}
|
||||
|
||||
public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) {
|
||||
public Context(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId, ShardId snapshotShardId) {
|
||||
this.snapshotId = snapshotId;
|
||||
this.version = version;
|
||||
this.shardId = shardId;
|
||||
blobContainer = blobStore().blobContainer(basePath().add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId())));
|
||||
blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId())));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1154,10 +1179,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
*
|
||||
* @param shard shard to be snapshotted
|
||||
* @param snapshotId snapshot id
|
||||
* @param indexId the id of the index being snapshotted
|
||||
* @param snapshotStatus snapshot status to report progress
|
||||
*/
|
||||
public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexShardSnapshotStatus snapshotStatus) {
|
||||
super(snapshotId, Version.CURRENT, shard.shardId());
|
||||
public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus) {
|
||||
super(snapshotId, Version.CURRENT, indexId, shard.shardId());
|
||||
this.snapshotStatus = snapshotStatus;
|
||||
this.store = shard.store();
|
||||
}
|
||||
|
@ -1441,11 +1467,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
*
|
||||
* @param shard shard to restore into
|
||||
* @param snapshotId snapshot id
|
||||
* @param indexId id of the index being restored
|
||||
* @param snapshotShardId shard in the snapshot that data should be restored from
|
||||
* @param recoveryState recovery state to report progress
|
||||
*/
|
||||
public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
|
||||
super(snapshotId, version, shard.shardId(), snapshotShardId);
|
||||
public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
|
||||
super(snapshotId, version, indexId, shard.shardId(), snapshotShardId);
|
||||
this.recoveryState = recoveryState;
|
||||
store = shard.store();
|
||||
}
|
||||
|
@ -1619,6 +1646,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -63,8 +63,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -185,7 +187,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
try {
|
||||
// Read snapshot info and metadata from the repository
|
||||
Repository repository = repositoriesService.repository(request.repositoryName);
|
||||
final Optional<SnapshotId> matchingSnapshotId = repository.getSnapshots().stream()
|
||||
final RepositoryData repositoryData = repository.getRepositoryData();
|
||||
final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
|
||||
.filter(s -> request.snapshotName.equals(s.getName())).findFirst();
|
||||
if (matchingSnapshotId.isPresent() == false) {
|
||||
throw new SnapshotRestoreException(request.repositoryName, request.snapshotName, "snapshot does not exist");
|
||||
|
@ -194,7 +197,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
|
|||
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
|
||||
final Snapshot snapshot = new Snapshot(request.repositoryName, snapshotId);
|
||||
List<String> filteredIndices = SnapshotUtils.filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
|
||||
MetaData metaDataIn = repository.getSnapshotMetaData(snapshotInfo, filteredIndices);
|
||||
MetaData metaDataIn = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices));
|
||||
|
||||
final MetaData metaData;
|
||||
if (snapshotInfo.version().before(Version.V_2_0_0_beta1)) {
|
||||
|
|
|
@ -22,6 +22,9 @@ package org.elasticsearch.snapshots;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
@ -29,12 +32,14 @@ import java.util.Objects;
|
|||
/**
|
||||
* SnapshotId - snapshot name + snapshot UUID
|
||||
*/
|
||||
public final class SnapshotId implements Writeable {
|
||||
public final class SnapshotId implements Writeable, ToXContent {
|
||||
|
||||
/**
|
||||
* This value is for older snapshots that don't have a UUID.
|
||||
*/
|
||||
public static final String UNASSIGNED_UUID = "_na_";
|
||||
private static final String NAME = "name";
|
||||
private static final String UUID = "uuid";
|
||||
|
||||
private final String name;
|
||||
private final String uuid;
|
||||
|
@ -115,4 +120,35 @@ public final class SnapshotId implements Writeable {
|
|||
out.writeString(uuid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(NAME, name);
|
||||
builder.field(UUID, uuid);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public static SnapshotId fromXContent(XContentParser parser) throws IOException {
|
||||
// the new format from 5.0 which contains the snapshot name and uuid
|
||||
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
|
||||
String name = null;
|
||||
String uuid = null;
|
||||
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
|
||||
String currentFieldName = parser.currentName();
|
||||
parser.nextToken();
|
||||
if (NAME.equals(currentFieldName)) {
|
||||
name = parser.text();
|
||||
} else if (UUID.equals(currentFieldName)) {
|
||||
uuid = parser.text();
|
||||
}
|
||||
}
|
||||
return new SnapshotId(name, uuid);
|
||||
}
|
||||
// the old format pre 5.0 that only contains the snapshot name, use the name as the uuid too
|
||||
else {
|
||||
return new SnapshotId(parser.text(), SnapshotId.UNASSIGNED_UUID);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.EmptyTransportResponseHandler;
|
||||
|
@ -66,6 +67,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
@ -208,8 +211,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> newSnapshots = new HashMap<>();
|
||||
// Now go through all snapshots and update existing or create missing
|
||||
final String localNodeId = clusterService.localNode().getId();
|
||||
final Map<Snapshot, Map<String, IndexId>> snapshotIndices = new HashMap<>();
|
||||
if (snapshotsInProgress != null) {
|
||||
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
|
||||
snapshotIndices.put(entry.snapshot(),
|
||||
entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())));
|
||||
if (entry.state() == SnapshotsInProgress.State.STARTED) {
|
||||
Map<ShardId, IndexShardSnapshotStatus> startedShards = new HashMap<>();
|
||||
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot());
|
||||
|
@ -289,14 +295,20 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
if (newSnapshots.isEmpty() == false) {
|
||||
Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
|
||||
for (final Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> entry : newSnapshots.entrySet()) {
|
||||
Map<String, IndexId> indicesMap = snapshotIndices.get(entry.getKey());
|
||||
assert indicesMap != null;
|
||||
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().entrySet()) {
|
||||
final ShardId shardId = shardEntry.getKey();
|
||||
try {
|
||||
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
|
||||
final IndexId indexId = indicesMap.get(shardId.getIndexName());
|
||||
if (indexId == null) {
|
||||
throw new IllegalStateException("[" + shardId.getIndexName() + "] being snapshotted, but not found in the snapshots in progress");
|
||||
}
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void doRun() {
|
||||
snapshot(indexShard, entry.getKey(), shardEntry.getValue());
|
||||
snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue());
|
||||
updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS));
|
||||
}
|
||||
|
||||
|
@ -321,7 +333,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
* @param snapshot snapshot
|
||||
* @param snapshotStatus snapshot status
|
||||
*/
|
||||
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexShardSnapshotStatus snapshotStatus) {
|
||||
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) {
|
||||
Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
|
||||
ShardId shardId = indexShard.shardId();
|
||||
if (!indexShard.routingEntry().primary()) {
|
||||
|
@ -340,7 +352,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
// we flush first to make sure we get the latest writes snapshotted
|
||||
IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true);
|
||||
try {
|
||||
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), snapshotIndexCommit, snapshotStatus);
|
||||
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus);
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n");
|
||||
|
|
|
@ -56,8 +56,11 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.RepositoryData.IndexMeta;
|
||||
import org.elasticsearch.repositories.RepositoryMissingException;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -132,7 +135,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
public List<SnapshotId> snapshotIds(final String repositoryName) {
|
||||
Repository repository = repositoriesService.repository(repositoryName);
|
||||
assert repository != null; // should only be called once we've validated the repository exists
|
||||
return repository.getSnapshots();
|
||||
return repository.getRepositoryData().getSnapshotIds();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -218,6 +221,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
final String snapshotName = request.snapshotName;
|
||||
validate(repositoryName, snapshotName);
|
||||
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
final Map<String, IndexMeta> indexIds = repository.getRepositoryData().getIndices();
|
||||
|
||||
clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() {
|
||||
|
||||
|
@ -232,11 +237,21 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
// Store newSnapshot here to be processed in clusterStateProcessed
|
||||
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request.indicesOptions(), request.indices()));
|
||||
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
|
||||
List<IndexId> snapshotIndices = new ArrayList<>();
|
||||
for (String index : indices) {
|
||||
final IndexId indexId;
|
||||
if (indexIds.containsKey(index)) {
|
||||
indexId = indexIds.get(index).getIndexId();
|
||||
} else {
|
||||
indexId = new IndexId(index, UUIDs.randomBase64UUID());
|
||||
}
|
||||
snapshotIndices.add(indexId);
|
||||
}
|
||||
newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId),
|
||||
request.includeGlobalState(),
|
||||
request.partial(),
|
||||
State.INIT,
|
||||
indices,
|
||||
snapshotIndices,
|
||||
System.currentTimeMillis(),
|
||||
null);
|
||||
snapshots = new SnapshotsInProgress(newSnapshot);
|
||||
|
@ -334,8 +349,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
if (!snapshot.includeGlobalState()) {
|
||||
// Remove global state from the cluster state
|
||||
MetaData.Builder builder = MetaData.builder();
|
||||
for (String index : snapshot.indices()) {
|
||||
builder.put(metaData.index(index), false);
|
||||
for (IndexId index : snapshot.indices()) {
|
||||
builder.put(metaData.index(index.getName()), false);
|
||||
}
|
||||
metaData = builder.build();
|
||||
}
|
||||
|
@ -473,7 +488,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
|
||||
private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
|
||||
return new SnapshotInfo(entry.snapshot().getSnapshotId(), entry.indices(), entry.startTime());
|
||||
return new SnapshotInfo(entry.snapshot().getSnapshotId(),
|
||||
entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
|
||||
entry.startTime());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -546,8 +563,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
final SnapshotInfo snapshotInfo) throws IOException {
|
||||
Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
|
||||
Repository repository = repositoriesService.repository(repositoryName);
|
||||
MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, snapshotInfo.indices());
|
||||
RepositoryData repositoryData = repository.getRepositoryData();
|
||||
MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(snapshotInfo.indices()));
|
||||
for (String index : snapshotInfo.indices()) {
|
||||
IndexId indexId = repositoryData.resolveIndexId(index);
|
||||
IndexMetaData indexMetaData = metaData.indices().get(index);
|
||||
if (indexMetaData != null) {
|
||||
int numberOfShards = indexMetaData.getNumberOfShards();
|
||||
|
@ -561,7 +580,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
shardStatus.put(shardId, shardSnapshotStatus);
|
||||
} else {
|
||||
IndexShardSnapshotStatus shardSnapshotStatus =
|
||||
repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId);
|
||||
repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), indexId, shardId);
|
||||
shardStatus.put(shardId, shardSnapshotStatus);
|
||||
}
|
||||
}
|
||||
|
@ -953,7 +972,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener) {
|
||||
// First, look for the snapshot in the repository
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
Optional<SnapshotId> matchedEntry = repository.getSnapshots().stream().filter(s -> s.getName().equals(snapshotName)).findFirst();
|
||||
Optional<SnapshotId> matchedEntry = repository.getRepositoryData().getSnapshotIds()
|
||||
.stream()
|
||||
.filter(s -> s.getName().equals(snapshotName))
|
||||
.findFirst();
|
||||
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
|
||||
if (matchedEntry.isPresent() == false) {
|
||||
matchedEntry = currentSnapshots(repositoryName, Collections.emptyList()).stream()
|
||||
|
@ -1121,21 +1143,22 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* @param indices list of indices to be snapshotted
|
||||
* @return list of shard to be included into current snapshot
|
||||
*/
|
||||
private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, List<String> indices) {
|
||||
private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, List<IndexId> indices) {
|
||||
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
|
||||
MetaData metaData = clusterState.metaData();
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = metaData.index(index);
|
||||
for (IndexId index : indices) {
|
||||
final String indexName = index.getName();
|
||||
IndexMetaData indexMetaData = metaData.index(indexName);
|
||||
if (indexMetaData == null) {
|
||||
// The index was deleted before we managed to start the snapshot - mark it as missing.
|
||||
builder.put(new ShardId(index, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
|
||||
builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
|
||||
} else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
|
||||
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
|
||||
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
|
||||
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "index is closed"));
|
||||
}
|
||||
} else {
|
||||
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(index);
|
||||
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
|
||||
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
|
||||
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
|
||||
if (indexRoutingTable != null) {
|
||||
|
@ -1191,8 +1214,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
|
||||
if (entry.partial() == false) {
|
||||
if (entry.state() == State.INIT) {
|
||||
for (String index : entry.indices()) {
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
for (IndexId index : entry.indices()) {
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index.getName());
|
||||
if (indexMetaData != null && indices.contains(indexMetaData)) {
|
||||
if (indicesToFail == null) {
|
||||
indicesToFail = new HashSet<>();
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.util.Collections;
|
||||
|
@ -659,7 +658,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
|
|||
randomBoolean(),
|
||||
randomBoolean(),
|
||||
SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)),
|
||||
Collections.<String>emptyList(),
|
||||
Collections.emptyList(),
|
||||
Math.abs(randomLong()),
|
||||
ImmutableOpenMap.of()));
|
||||
case 1:
|
||||
|
|
|
@ -100,7 +100,10 @@ import org.elasticsearch.index.translog.Translog;
|
|||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.RepositoryData.IndexMeta;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
|
@ -121,8 +124,10 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -1184,9 +1189,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
test_target_shard.updateRoutingEntry(routing);
|
||||
DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode));
|
||||
assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository() {
|
||||
assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository("test") {
|
||||
@Override
|
||||
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) {
|
||||
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
|
||||
try {
|
||||
cleanLuceneIndex(targetStore.directory());
|
||||
for (String file : sourceStore.directory().listAll()) {
|
||||
|
@ -1645,8 +1650,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
|
||||
/** A dummy repository for testing which just needs restore overridden */
|
||||
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
|
||||
public RestoreOnlyRepository() {
|
||||
private final String indexName;
|
||||
public RestoreOnlyRepository(String indexName) {
|
||||
super(Settings.EMPTY);
|
||||
this.indexName = indexName;
|
||||
}
|
||||
@Override
|
||||
protected void doStart() {}
|
||||
|
@ -1663,17 +1670,19 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
return null;
|
||||
}
|
||||
@Override
|
||||
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException {
|
||||
public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public List<SnapshotId> getSnapshots() {
|
||||
return null;
|
||||
public RepositoryData getRepositoryData() {
|
||||
Map<String, IndexMeta> map = new HashMap<>();
|
||||
map.put(indexName, new IndexMeta(indexName, "blah", Collections.emptySet()));
|
||||
return new RepositoryData(Collections.emptyList(), map);
|
||||
}
|
||||
@Override
|
||||
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData) {}
|
||||
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {}
|
||||
@Override
|
||||
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
|
||||
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
|
@ -1697,9 +1706,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
return false;
|
||||
}
|
||||
@Override
|
||||
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {}
|
||||
public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {}
|
||||
@Override
|
||||
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) {
|
||||
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories;
|
||||
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Tests for the {@link IndexId} class.
|
||||
*/
|
||||
public class IndexIdTests extends ESTestCase {
|
||||
|
||||
public void testEqualsAndHashCode() {
|
||||
// assert equals and hashcode
|
||||
String name = randomAsciiOfLength(8);
|
||||
String id = UUIDs.randomBase64UUID();
|
||||
IndexId indexId1 = new IndexId(name, id);
|
||||
IndexId indexId2 = new IndexId(name, id);
|
||||
assertEquals(indexId1, indexId2);
|
||||
assertEquals(indexId1.hashCode(), indexId2.hashCode());
|
||||
// assert equals when using index name for id
|
||||
id = name;
|
||||
indexId1 = new IndexId(name, id);
|
||||
indexId2 = new IndexId(name, id);
|
||||
assertEquals(indexId1, indexId2);
|
||||
assertEquals(indexId1.hashCode(), indexId2.hashCode());
|
||||
//assert not equals when name or id differ
|
||||
indexId2 = new IndexId(randomAsciiOfLength(8), id);
|
||||
assertNotEquals(indexId1, indexId2);
|
||||
assertNotEquals(indexId1.hashCode(), indexId2.hashCode());
|
||||
indexId2 = new IndexId(name, UUIDs.randomBase64UUID());
|
||||
assertNotEquals(indexId1, indexId2);
|
||||
assertNotEquals(indexId1.hashCode(), indexId2.hashCode());
|
||||
}
|
||||
|
||||
public void testSerialization() throws IOException {
|
||||
IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID());
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
indexId.writeTo(out);
|
||||
assertEquals(indexId, new IndexId(out.bytes().streamInput()));
|
||||
}
|
||||
|
||||
public void testXContent() throws IOException {
|
||||
IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID());
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
indexId.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes());
|
||||
assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken());
|
||||
String name = null;
|
||||
String id = null;
|
||||
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
|
||||
final String currentFieldName = parser.currentName();
|
||||
parser.nextToken();
|
||||
if (currentFieldName.equals(IndexId.NAME)) {
|
||||
name = parser.text();
|
||||
} else if (currentFieldName.equals(IndexId.ID)) {
|
||||
id = parser.text();
|
||||
}
|
||||
}
|
||||
assertNotNull(name);
|
||||
assertNotNull(id);
|
||||
assertEquals(indexId, new IndexId(name, id));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,172 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories;
|
||||
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.repositories.RepositoryData.IndexMeta;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
/**
|
||||
* Tests for the {@link RepositoryData} class.
|
||||
*/
|
||||
public class RepositoryDataTests extends ESTestCase {
|
||||
|
||||
public void testEqualsAndHashCode() {
|
||||
RepositoryData repositoryData1 = generateRandomRepoData();
|
||||
RepositoryData repositoryData2 = new RepositoryData(repositoryData1.getSnapshotIds(), repositoryData1.getIndices());
|
||||
assertEquals(repositoryData1, repositoryData2);
|
||||
assertEquals(repositoryData1.hashCode(), repositoryData2.hashCode());
|
||||
}
|
||||
|
||||
public void testXContent() throws IOException {
|
||||
RepositoryData repositoryData = generateRandomRepoData();
|
||||
XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes());
|
||||
assertEquals(repositoryData, RepositoryData.fromXContent(parser));
|
||||
}
|
||||
|
||||
public void testAddSnapshots() {
|
||||
RepositoryData repositoryData = generateRandomRepoData();
|
||||
// test that adding the same snapshot id to the repository data throws an exception
|
||||
final SnapshotId snapshotId = repositoryData.getSnapshotIds().get(0);
|
||||
Map<String, IndexMeta> indexMetaMap = repositoryData.getIndices();
|
||||
expectThrows(IllegalArgumentException.class,
|
||||
() -> repositoryData.addSnapshot(new SnapshotId(snapshotId.getName(), snapshotId.getUUID()), Collections.emptyList()));
|
||||
// test that adding a snapshot and its indices works
|
||||
SnapshotId newSnapshot = new SnapshotId(randomAsciiOfLength(7), UUIDs.randomBase64UUID());
|
||||
List<IndexId> indices = new ArrayList<>();
|
||||
Set<IndexId> newIndices = new HashSet<>();
|
||||
int numNew = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < numNew; i++) {
|
||||
IndexId indexId = new IndexId(randomAsciiOfLength(7), UUIDs.randomBase64UUID());
|
||||
newIndices.add(indexId);
|
||||
indices.add(indexId);
|
||||
}
|
||||
int numOld = randomIntBetween(1, indexMetaMap.size());
|
||||
List<String> indexNames = new ArrayList<>(indexMetaMap.keySet());
|
||||
for (int i = 0; i < numOld; i++) {
|
||||
IndexId indexId = indexMetaMap.get(indexNames.get(i)).getIndexId();
|
||||
indices.add(indexId);
|
||||
}
|
||||
RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot, indices);
|
||||
// verify that the new repository data has the new snapshot and its indices
|
||||
assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot));
|
||||
indexMetaMap = newRepoData.getIndices();
|
||||
for (IndexId indexId : indices) {
|
||||
Set<SnapshotId> snapshotIds = indexMetaMap.get(indexId.getName()).getSnapshotIds();
|
||||
assertTrue(snapshotIds.contains(newSnapshot));
|
||||
if (newIndices.contains(indexId)) {
|
||||
assertEquals(snapshotIds.size(), 1); // if it was a new index, only the new snapshot should be in its set
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testAddIndices() {
|
||||
final int numSnapshots = randomIntBetween(1, 30);
|
||||
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
|
||||
for (int i = 0; i < numSnapshots; i++) {
|
||||
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
|
||||
}
|
||||
RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap());
|
||||
// test that adding indices works
|
||||
Map<String, IndexMeta> indices = randomIndices(snapshotIds);
|
||||
RepositoryData newRepoData = repositoryData.addIndices(indices);
|
||||
assertEquals(repositoryData.getSnapshotIds(), newRepoData.getSnapshotIds());
|
||||
assertEquals(indices, newRepoData.getIndices());
|
||||
}
|
||||
|
||||
public void testRemoveSnapshot() {
|
||||
RepositoryData repositoryData = generateRandomRepoData();
|
||||
List<SnapshotId> snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds());
|
||||
assertThat(snapshotIds.size(), greaterThan(0));
|
||||
SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1));
|
||||
RepositoryData newRepositoryData = repositoryData.removeSnapshot(removedSnapshotId);
|
||||
// make sure the repository data's indices no longer contain the removed snapshot
|
||||
for (IndexMeta indexMeta : newRepositoryData.getIndices().values()) {
|
||||
assertFalse(indexMeta.getSnapshotIds().contains(removedSnapshotId));
|
||||
}
|
||||
}
|
||||
|
||||
public void testResolveIndexId() {
|
||||
RepositoryData repositoryData = generateRandomRepoData();
|
||||
Map<String, IndexMeta> indices = repositoryData.getIndices();
|
||||
Set<String> indexNames = indices.keySet();
|
||||
assertThat(indexNames.size(), greaterThan(0));
|
||||
String indexName = indexNames.iterator().next();
|
||||
IndexId indexId = indices.get(indexName).getIndexId();
|
||||
assertEquals(indexId, repositoryData.resolveIndexId(indexName));
|
||||
String notInRepoData = randomAsciiOfLength(5);
|
||||
assertFalse(indexName.contains(notInRepoData));
|
||||
assertEquals(new IndexId(notInRepoData, notInRepoData), repositoryData.resolveIndexId(notInRepoData));
|
||||
}
|
||||
|
||||
public static RepositoryData generateRandomRepoData() {
|
||||
return generateRandomRepoData(new ArrayList<>());
|
||||
}
|
||||
|
||||
public static RepositoryData generateRandomRepoData(final List<SnapshotId> origSnapshotIds) {
|
||||
List<SnapshotId> snapshotIds = randomSnapshots(origSnapshotIds);
|
||||
return new RepositoryData(snapshotIds, randomIndices(snapshotIds));
|
||||
}
|
||||
|
||||
private static List<SnapshotId> randomSnapshots(final List<SnapshotId> origSnapshotIds) {
|
||||
final int numSnapshots = randomIntBetween(1, 30);
|
||||
final List<SnapshotId> snapshotIds = new ArrayList<>(origSnapshotIds);
|
||||
for (int i = 0; i < numSnapshots; i++) {
|
||||
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
|
||||
}
|
||||
return snapshotIds;
|
||||
}
|
||||
|
||||
private static Map<String, IndexMeta> randomIndices(final List<SnapshotId> snapshotIds) {
|
||||
final int totalSnapshots = snapshotIds.size();
|
||||
final int numIndices = randomIntBetween(1, 30);
|
||||
final Map<String, IndexMeta> indices = new HashMap<>(numIndices);
|
||||
for (int i = 0; i < numIndices; i++) {
|
||||
final String indexName = randomAsciiOfLength(8);
|
||||
final Set<SnapshotId> indexSnapshots = new LinkedHashSet<>();
|
||||
final int numIndicesForSnapshot = randomIntBetween(1, numIndices);
|
||||
for (int j = 0; j < numIndicesForSnapshot; j++) {
|
||||
indexSnapshots.add(snapshotIds.get(randomIntBetween(0, totalSnapshots - 1)));
|
||||
}
|
||||
indices.put(indexName, new IndexMeta(indexName, UUIDs.randomBase64UUID(), indexSnapshots));
|
||||
}
|
||||
return indices;
|
||||
}
|
||||
}
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
|
@ -44,6 +45,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
|
||||
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.blobId;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
|
@ -109,48 +111,52 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
|||
public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
|
||||
final BlobStoreRepository repository = setupRepo();
|
||||
|
||||
// write to and read from a snapshot file with no entries
|
||||
assertThat(repository.getSnapshots().size(), equalTo(0));
|
||||
repository.writeSnapshotsToIndexGen(Collections.emptyList());
|
||||
// write to and read from a index file with no entries
|
||||
assertThat(repository.getSnapshots().size(), equalTo(0));
|
||||
final RepositoryData emptyData = new RepositoryData(Collections.emptyList(), Collections.emptyMap());
|
||||
repository.writeIndexGen(emptyData);
|
||||
final RepositoryData readData = repository.getRepositoryData();
|
||||
assertEquals(readData, emptyData);
|
||||
assertEquals(readData.getIndices().size(), 0);
|
||||
assertEquals(readData.getSnapshotIds().size(), 0);
|
||||
|
||||
// write to and read from a snapshot file with a random number of entries
|
||||
final int numSnapshots = randomIntBetween(1, 1000);
|
||||
// write to and read from an index file with snapshots but no indices
|
||||
final int numSnapshots = randomIntBetween(1, 20);
|
||||
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
|
||||
for (int i = 0; i < numSnapshots; i++) {
|
||||
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
|
||||
}
|
||||
repository.writeSnapshotsToIndexGen(snapshotIds);
|
||||
assertThat(repository.getSnapshots(), equalTo(snapshotIds));
|
||||
RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap());
|
||||
repository.writeIndexGen(repositoryData);
|
||||
assertEquals(repository.getRepositoryData(), repositoryData);
|
||||
|
||||
// write to and read from a index file with random repository data
|
||||
repositoryData = generateRandomRepoData();
|
||||
repository.writeIndexGen(repositoryData);
|
||||
assertThat(repository.getRepositoryData(), equalTo(repositoryData));
|
||||
}
|
||||
|
||||
public void testIndexGenerationalFiles() throws Exception {
|
||||
final BlobStoreRepository repository = setupRepo();
|
||||
|
||||
// write to index generational file
|
||||
final int numSnapshots = randomIntBetween(1, 1000);
|
||||
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
|
||||
for (int i = 0; i < numSnapshots; i++) {
|
||||
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
|
||||
}
|
||||
repository.writeSnapshotsToIndexGen(snapshotIds);
|
||||
assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds)));
|
||||
RepositoryData repositoryData = generateRandomRepoData();
|
||||
repository.writeIndexGen(repositoryData);
|
||||
assertThat(repository.getRepositoryData(), equalTo(repositoryData));
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(0L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));
|
||||
|
||||
// adding more and writing to a new index generational file
|
||||
for (int i = 0; i < 10; i++) {
|
||||
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
|
||||
}
|
||||
repository.writeSnapshotsToIndexGen(snapshotIds);
|
||||
assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds)));
|
||||
repositoryData = generateRandomRepoData();
|
||||
repository.writeIndexGen(repositoryData);
|
||||
assertEquals(repository.getRepositoryData(), repositoryData);
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(1L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));
|
||||
|
||||
// removing a snapshot adn writing to a new index generational file
|
||||
snapshotIds.remove(0);
|
||||
repository.writeSnapshotsToIndexGen(snapshotIds);
|
||||
assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds)));
|
||||
// removing a snapshot and writing to a new index generational file
|
||||
repositoryData = repositoryData.removeSnapshot(repositoryData.getSnapshotIds().get(0));
|
||||
repository.writeIndexGen(repositoryData);
|
||||
assertEquals(repository.getRepositoryData(), repositoryData);
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(2L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
|
||||
}
|
||||
|
@ -159,7 +165,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
|||
final BlobStoreRepository repository = setupRepo();
|
||||
|
||||
// write old index file format
|
||||
final int numOldSnapshots = randomIntBetween(1, 50);
|
||||
final int numOldSnapshots = randomIntBetween(1, 30);
|
||||
final List<SnapshotId> snapshotIds = new ArrayList<>();
|
||||
for (int i = 0; i < numOldSnapshots; i++) {
|
||||
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), SnapshotId.UNASSIGNED_UUID));
|
||||
|
@ -168,12 +174,9 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
|||
assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds)));
|
||||
|
||||
// write to and read from a snapshot file with a random number of new entries added
|
||||
final int numSnapshots = randomIntBetween(1, 1000);
|
||||
for (int i = 0; i < numSnapshots; i++) {
|
||||
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
|
||||
}
|
||||
repository.writeSnapshotsToIndexGen(snapshotIds);
|
||||
assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds)));
|
||||
final RepositoryData repositoryData = generateRandomRepoData(snapshotIds);
|
||||
repository.writeIndexGen(repositoryData);
|
||||
assertEquals(repository.getRepositoryData(), repositoryData);
|
||||
}
|
||||
|
||||
public void testBlobId() {
|
||||
|
|
|
@ -61,7 +61,9 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.indices.InvalidIndexNameException;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.RepositoryException;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
|
@ -2017,6 +2019,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
logger.info("--> emulate an orphan snapshot");
|
||||
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
|
||||
final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData();
|
||||
final IndexId indexId = repositoryData.resolveIndexId(idxName);
|
||||
|
||||
clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() {
|
||||
|
||||
|
@ -2033,7 +2038,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
true,
|
||||
false,
|
||||
State.ABORTED,
|
||||
Collections.singletonList(idxName),
|
||||
Collections.singletonList(indexId),
|
||||
System.currentTimeMillis(),
|
||||
shards.build()));
|
||||
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))).build();
|
||||
|
@ -2189,7 +2194,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
logger.info("--> truncate snapshot file to make it unreadable");
|
||||
Path snapshotPath = repo.resolve("snap-test-snap-2-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
|
||||
Path snapshotPath = repo.resolve("snap-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat");
|
||||
try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) {
|
||||
outChan.truncate(randomInt(10));
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.plugins.RepositoryPlugin;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.fs.FsRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
||||
|
@ -112,7 +113,7 @@ public class MockRepository extends FsRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
|
||||
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
|
||||
if (blockOnInitialization) {
|
||||
blockExecution();
|
||||
}
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.elasticsearch.cloud.azure.storage.AzureStorageService;
|
|||
import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStore;
|
||||
|
@ -44,7 +46,6 @@ import org.elasticsearch.env.Environment;
|
|||
import org.elasticsearch.repositories.RepositoryVerificationException;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotCreationException;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
||||
import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getEffectiveSetting;
|
||||
import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getValue;
|
||||
|
@ -153,7 +154,7 @@ public class AzureRepository extends BlobStoreRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData clusterMetadata) {
|
||||
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
|
||||
try {
|
||||
if (!blobStore.doesContainerExist(blobStore.container())) {
|
||||
logger.debug("container [{}] does not exist. Creating...", blobStore.container());
|
||||
|
|
Loading…
Reference in New Issue