Snapshot/Restore: Move in-progress snapshot and restore information from custom metadata to custom cluster state part

Information about in-progress snapshot and restore processes is not really metadata and should be represented as part of the cluster state, similar to discovery nodes, the routing table, and cluster blocks. Since the in-progress snapshot and restore information is no longer part of the metadata, this refactoring also lets us handle cluster blocks more consistently and allows taking snapshots of a read-only cluster.
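
A minimal sketch of the lookup change, using only identifiers that appear in the hunks below (the wrapper class and method are illustrative, not part of the commit):

    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.cluster.SnapshotsInProgress;

    final class InProgressLookup {
        // Before this change the information lived in a custom metadata section:
        //   SnapshotMetaData snapshots = state.metaData().custom(SnapshotMetaData.TYPE);
        // After it, the information is a first-class custom part of the cluster state itself:
        static SnapshotsInProgress snapshotsInProgress(ClusterState state) {
            return state.custom(SnapshotsInProgress.TYPE); // null when no snapshot is running
        }
    }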

Closes 
Igor Motov 2015-06-10 12:57:45 -04:00
parent 440580dd55
commit 93beea1f67
21 changed files with 406 additions and 284 deletions
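
The hunks below repeatedly replace the MetaData.Builder#putCustom pattern with a direct ClusterState.Builder#putCustom call. A hedged sketch of the new publish path, with identifiers taken from the diff and the wrapper method assumed purely for illustration:

    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.cluster.SnapshotsInProgress;

    final class PublishSnapshotsInProgress {
        static ClusterState withSnapshots(ClusterState currentState, SnapshotsInProgress snapshots) {
            // Previously: MetaData.builder(currentState.metaData()).putCustom(SnapshotMetaData.TYPE, snapshots)
            return ClusterState.builder(currentState)
                    .putCustom(SnapshotsInProgress.TYPE, snapshots)
                    .build();
        }
    }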

@ -59,8 +59,8 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
@Override
protected ClusterBlockException checkBlock(CreateSnapshotRequest request, ClusterState state) {
// We are writing to the cluster metadata and reading from indices - so we need to check both blocks
ClusterBlockException clusterBlockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
// We are reading the cluster metadata and indices - so we need to check both blocks
ClusterBlockException clusterBlockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
if (clusterBlockException != null) {
return clusterBlockException;
}

@ -58,7 +58,8 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
@Override
protected ClusterBlockException checkBlock(DeleteSnapshotRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
@Override

@ -22,7 +22,7 @@ package org.elasticsearch.action.admin.cluster.snapshots.status;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.metadata.SnapshotMetaData.State;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;

@ -29,7 +29,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.metadata.SnapshotMetaData;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -82,7 +82,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
protected void masterOperation(final SnapshotsStatusRequest request,
final ClusterState state,
final ActionListener<SnapshotsStatusResponse> listener) throws Exception {
List<SnapshotMetaData.Entry> currentSnapshots = snapshotsService.currentSnapshots(request.repository(), request.snapshots());
List<SnapshotsInProgress.Entry> currentSnapshots = snapshotsService.currentSnapshots(request.repository(), request.snapshots());
if (currentSnapshots.isEmpty()) {
listener.onResponse(buildResponse(request, currentSnapshots, null));
@ -90,8 +90,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
}
Set<String> nodesIds = newHashSet();
for (SnapshotMetaData.Entry entry : currentSnapshots) {
for (SnapshotMetaData.ShardSnapshotStatus status : entry.shards().values()) {
for (SnapshotsInProgress.Entry entry : currentSnapshots) {
for (SnapshotsInProgress.ShardSnapshotStatus status : entry.shards().values()) {
if (status.nodeId() != null) {
nodesIds.add(status.nodeId());
}
@ -111,7 +111,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
@Override
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
try {
List<SnapshotMetaData.Entry> currentSnapshots =
List<SnapshotsInProgress.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), request.snapshots());
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
} catch (Throwable e) {
@ -131,7 +131,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
}
private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, List<SnapshotMetaData.Entry> currentSnapshots,
private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgress.Entry> currentSnapshots,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) throws IOException {
// First process snapshot that are currently processed
ImmutableList.Builder<SnapshotStatus> builder = ImmutableList.builder();
@ -144,11 +144,11 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
nodeSnapshotStatusMap = newHashMap();
}
for (SnapshotMetaData.Entry entry : currentSnapshots) {
for (SnapshotsInProgress.Entry entry : currentSnapshots) {
currentSnapshotIds.add(entry.snapshotId());
ImmutableList.Builder<SnapshotIndexShardStatus> shardStatusBuilder = ImmutableList.builder();
for (ImmutableMap.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
SnapshotMetaData.ShardSnapshotStatus status = shardEntry.getValue();
for (ImmutableMap.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.getValue();
if (status.nodeId() != null) {
// We should have information about this shard from the shard:
TransportNodesSnapshotsStatus.NodeSnapshotStatus nodeStatus = nodeSnapshotStatusMap.get(status.nodeId());
@ -204,16 +204,16 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
for (ImmutableMap.Entry<ShardId, IndexShardSnapshotStatus> shardStatus : shardStatues.entrySet()) {
shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), shardStatus.getValue()));
}
final SnapshotMetaData.State state;
final SnapshotsInProgress.State state;
switch (snapshot.state()) {
case FAILED:
state = SnapshotMetaData.State.FAILED;
state = SnapshotsInProgress.State.FAILED;
break;
case SUCCESS:
case PARTIAL:
// Translating both PARTIAL and SUCCESS to SUCCESS for now
// TODO: add the differentiation on the metadata level in the next major release
state = SnapshotMetaData.State.SUCCESS;
state = SnapshotsInProgress.State.SUCCESS;
break;
default:
throw new IllegalArgumentException("Unknown snapshot state " + snapshot.state());

@ -38,6 +38,7 @@ public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateReque
private boolean nodes = true;
private boolean metaData = true;
private boolean blocks = true;
private boolean customs = true;
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
@ -54,6 +55,7 @@ public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateReque
nodes = true;
metaData = true;
blocks = true;
customs = true;
indices = Strings.EMPTY_ARRAY;
return this;
}
@ -63,6 +65,7 @@ public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateReque
nodes = false;
metaData = false;
blocks = false;
customs = false;
indices = Strings.EMPTY_ARRAY;
return this;
}
@ -124,6 +127,15 @@ public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateReque
return this;
}
public ClusterStateRequest customs(boolean customs) {
this.customs = customs;
return this;
}
public boolean customs() {
return customs;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -131,6 +143,7 @@ public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateReque
nodes = in.readBoolean();
metaData = in.readBoolean();
blocks = in.readBoolean();
customs = in.readBoolean();
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}
@ -142,6 +155,7 @@ public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateReque
out.writeBoolean(nodes);
out.writeBoolean(metaData);
out.writeBoolean(blocks);
out.writeBoolean(customs);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
}

@ -123,6 +123,9 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
builder.metaData(mdBuilder);
}
if (request.customs()) {
builder.customs(currentState.customs());
}
listener.onResponse(new ClusterStateResponse(clusterName, builder.build()));
}

@ -117,6 +117,12 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
customPrototypes.put(type, proto);
}
static {
// register non plugin custom parts
registerPrototype(SnapshotsInProgress.TYPE, SnapshotsInProgress.PROTO);
registerPrototype(RestoreInProgress.TYPE, RestoreInProgress.PROTO);
}
@Nullable
public static <T extends Custom> T lookupPrototype(String type) {
//noinspection unchecked
@ -249,6 +255,10 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
return this.customs;
}
public <T extends Custom> T custom(String type) {
return (T) customs.get(type);
}
public ClusterName getClusterName() {
return this.clusterName;
}

@ -17,31 +17,30 @@
* under the License.
*/
package org.elasticsearch.cluster.metadata;
package org.elasticsearch.cluster;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
/**
* Meta data about restore processes that are currently executing
*/
public class RestoreMetaData extends AbstractDiffable<MetaData.Custom> implements MetaData.Custom {
public class RestoreInProgress extends AbstractDiffable<Custom> implements Custom {
public static final String TYPE = "restore";
public static final RestoreMetaData PROTO = new RestoreMetaData();
public static final RestoreInProgress PROTO = new RestoreInProgress();
private final ImmutableList<Entry> entries;
@ -50,7 +49,7 @@ public class RestoreMetaData extends AbstractDiffable<MetaData.Custom> implement
*
* @param entries list of currently running restore processes
*/
public RestoreMetaData(ImmutableList<Entry> entries) {
public RestoreInProgress(ImmutableList<Entry> entries) {
this.entries = entries;
}
@ -59,7 +58,7 @@ public class RestoreMetaData extends AbstractDiffable<MetaData.Custom> implement
*
* @param entries list of currently running restore processes
*/
public RestoreMetaData(Entry... entries) {
public RestoreInProgress(Entry... entries) {
this.entries = ImmutableList.copyOf(entries);
}
@ -93,7 +92,7 @@ public class RestoreMetaData extends AbstractDiffable<MetaData.Custom> implement
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RestoreMetaData that = (RestoreMetaData) o;
RestoreInProgress that = (RestoreInProgress) o;
if (!entries.equals(that.entries)) return false;
@ -408,7 +407,7 @@ public class RestoreMetaData extends AbstractDiffable<MetaData.Custom> implement
* {@inheritDoc}
*/
@Override
public RestoreMetaData readFrom(StreamInput in) throws IOException {
public RestoreInProgress readFrom(StreamInput in) throws IOException {
Entry[] entries = new Entry[in.readVInt()];
for (int i = 0; i < entries.length; i++) {
SnapshotId snapshotId = SnapshotId.readSnapshotId(in);
@ -427,7 +426,7 @@ public class RestoreMetaData extends AbstractDiffable<MetaData.Custom> implement
}
entries[i] = new Entry(snapshotId, state, indexBuilder.build(), builder.build());
}
return new RestoreMetaData(entries);
return new RestoreInProgress(entries);
}
/**
@ -451,19 +450,6 @@ public class RestoreMetaData extends AbstractDiffable<MetaData.Custom> implement
}
}
/**
* {@inheritDoc}
*/
@Override
public RestoreMetaData fromXContent(XContentParser parser) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.API_ONLY;
}
/**
* {@inheritDoc}
*/

@ -17,22 +17,20 @@
* under the License.
*/
package org.elasticsearch.cluster.metadata;
package org.elasticsearch.cluster;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -41,17 +39,17 @@ import static com.google.common.collect.Maps.newHashMap;
/**
* Meta data about snapshots that are currently executing
*/
public class SnapshotMetaData extends AbstractDiffable<Custom> implements MetaData.Custom {
public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Custom {
public static final String TYPE = "snapshots";
public static final SnapshotMetaData PROTO = new SnapshotMetaData();
public static final SnapshotsInProgress PROTO = new SnapshotsInProgress();
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SnapshotMetaData that = (SnapshotMetaData) o;
SnapshotsInProgress that = (SnapshotsInProgress) o;
if (!entries.equals(that.entries)) return false;
@ -312,11 +310,11 @@ public class SnapshotMetaData extends AbstractDiffable<Custom> implements MetaDa
private final ImmutableList<Entry> entries;
public SnapshotMetaData(ImmutableList<Entry> entries) {
public SnapshotsInProgress(ImmutableList<Entry> entries) {
this.entries = entries;
}
public SnapshotMetaData(Entry... entries) {
public SnapshotsInProgress(Entry... entries) {
this.entries = ImmutableList.copyOf(entries);
}
@ -339,7 +337,7 @@ public class SnapshotMetaData extends AbstractDiffable<Custom> implements MetaDa
}
@Override
public SnapshotMetaData readFrom(StreamInput in) throws IOException {
public SnapshotsInProgress readFrom(StreamInput in) throws IOException {
Entry[] entries = new Entry[in.readVInt()];
for (int i = 0; i < entries.length; i++) {
SnapshotId snapshotId = SnapshotId.readSnapshotId(in);
@ -361,7 +359,7 @@ public class SnapshotMetaData extends AbstractDiffable<Custom> implements MetaDa
}
entries[i] = new Entry(snapshotId, includeGlobalState, state, indexBuilder.build(), startTime, builder.build());
}
return new SnapshotMetaData(entries);
return new SnapshotsInProgress(entries);
}
@Override
@ -385,16 +383,6 @@ public class SnapshotMetaData extends AbstractDiffable<Custom> implements MetaDa
}
}
@Override
public SnapshotMetaData fromXContent(XContentParser parser) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.API_ONLY;
}
static final class Fields {
static final XContentBuilderString REPOSITORY = new XContentBuilderString("repository");
static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");

@ -97,8 +97,6 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData> {
static {
// register non plugin custom metadata
registerPrototype(RepositoriesMetaData.TYPE, RepositoriesMetaData.PROTO);
registerPrototype(SnapshotMetaData.TYPE, SnapshotMetaData.PROTO);
registerPrototype(RestoreMetaData.TYPE, RestoreMetaData.PROTO);
}
/**

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.shard.ShardId;
import java.util.*;
@ -56,6 +57,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private final Map<ShardId, List<MutableShardRouting>> assignedShards = newHashMap();
private final ImmutableOpenMap<String, ClusterState.Custom> customs;
private int inactivePrimaryCount = 0;
private int inactiveShardCount = 0;
@ -70,6 +73,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
this.metaData = clusterState.metaData();
this.blocks = clusterState.blocks();
this.routingTable = clusterState.routingTable();
this.customs = clusterState.customs();
Map<String, List<MutableShardRouting>> nodesToShards = newHashMap();
// fill in the nodeToShards with the "live" nodes
@ -157,6 +161,14 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return this.blocks;
}
public ImmutableOpenMap<String, ClusterState.Custom> customs() {
return this.customs;
}
public <T extends ClusterState.Custom> T custom(String type) {
return (T) customs.get(type);
}
public int requiredAverageNumberOfShardsPerNode() {
int totalNumberOfShards = 0;
// we need to recompute to take closed shards into account

@ -19,7 +19,7 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.metadata.SnapshotMetaData;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -99,14 +99,14 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
if (!enableRelocation && shardRouting.primary()) {
// Only primary shards are snapshotted
SnapshotMetaData snapshotMetaData = allocation.metaData().custom(SnapshotMetaData.TYPE);
if (snapshotMetaData == null) {
SnapshotsInProgress snapshotsInProgress = allocation.routingNodes().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null) {
// Snapshots are not running
return allocation.decision(Decision.YES, NAME, "no snapshots are currently running");
}
for (SnapshotMetaData.Entry snapshot : snapshotMetaData.entries()) {
SnapshotMetaData.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId());
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId());
if (shardSnapshotStatus != null && !shardSnapshotStatus.state().completed() && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) {
logger.trace("Preventing snapshotted shard [{}] to be moved from node [{}]", shardRouting.shardId(), shardSnapshotStatus.nodeId());
return allocation.decision(Decision.NO, NAME, "snapshot for shard [%s] is currently running on node [%s]",

@ -75,6 +75,7 @@ public class RestClusterStateAction extends BaseRestHandler {
clusterStateRequest.routingTable(metrics.contains(ClusterState.Metric.ROUTING_TABLE) || metrics.contains(ClusterState.Metric.ROUTING_NODES));
clusterStateRequest.metaData(metrics.contains(ClusterState.Metric.METADATA));
clusterStateRequest.blocks(metrics.contains(ClusterState.Metric.BLOCKS));
clusterStateRequest.customs(metrics.contains(ClusterState.Metric.CUSTOMS));
}
settingsFilter.addFilterSettingParams(request);

@ -32,7 +32,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.metadata.RestoreMetaData.ShardRestoreStatus;
import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -74,7 +74,7 @@ import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX
* <p/>
* First {@link #restoreSnapshot(RestoreRequest, org.elasticsearch.action.ActionListener))}
* method reads information about snapshot and metadata from repository. In update cluster state task it checks restore
* preconditions, restores global state if needed, creates {@link RestoreMetaData} record with list of shards that needs
* preconditions, restores global state if needed, creates {@link RestoreInProgress} record with list of shards that needs
* to be restored and adds this shard to the routing table using {@link RoutingTable.Builder#addAsRestore(IndexMetaData, RestoreSource)}
* method.
* <p/>
@ -86,7 +86,7 @@ import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX
* method to start shard restore process.
* <p/>
* At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(SnapshotId, ShardId)},
* which updates {@link RestoreMetaData} in cluster state or removes it when all shards are completed. In case of
* which updates {@link RestoreInProgress} in cluster state or removes it when all shards are completed. In case of
* restore failure a normal recovery fail-over process kicks in.
*/
public class RestoreService extends AbstractComponent implements ClusterStateListener {
@ -183,20 +183,21 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
public ClusterState execute(ClusterState currentState) {
// Check if another restore process is already running - cannot run two restore processes at the
// same time
RestoreMetaData restoreMetaData = currentState.metaData().custom(RestoreMetaData.TYPE);
if (restoreMetaData != null && !restoreMetaData.entries().isEmpty()) {
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null && !restoreInProgress.entries().isEmpty()) {
throw new ConcurrentSnapshotExecutionException(snapshotId, "Restore process is already running in this cluster");
}
// Updating cluster state
ClusterState.Builder builder = ClusterState.builder(currentState);
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
final ImmutableMap<ShardId, RestoreMetaData.ShardRestoreStatus> shards;
final ImmutableMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
Set<String> aliases = newHashSet();
if (!renamedIndices.isEmpty()) {
// We have some indices to restore
ImmutableMap.Builder<ShardId, RestoreMetaData.ShardRestoreStatus> shardsBuilder = ImmutableMap.builder();
ImmutableMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder = ImmutableMap.builder();
for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) {
String index = indexEntry.getValue();
boolean partial = checkPartial(index);
@ -260,16 +261,16 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
}
for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
if (!ignoreShards.contains(shard)) {
shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreMetaData.ShardRestoreStatus(clusterService.state().nodes().localNodeId()));
shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().localNodeId()));
} else {
shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreMetaData.ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreInProgress.ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreInProgress.State.FAILURE));
}
}
}
shards = shardsBuilder.build();
RestoreMetaData.Entry restoreEntry = new RestoreMetaData.Entry(snapshotId, RestoreMetaData.State.INIT, ImmutableList.copyOf(renamedIndices.keySet()), shards);
mdBuilder.putCustom(RestoreMetaData.TYPE, new RestoreMetaData(restoreEntry));
RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshotId, RestoreInProgress.State.INIT, ImmutableList.copyOf(renamedIndices.keySet()), shards);
builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restoreEntry));
} else {
shards = ImmutableMap.of();
}
@ -285,7 +286,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
shards.size(), shards.size() - failedShards(shards));
}
ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocks).routingTable(rtBuilder).build();
ClusterState updatedState = builder.metaData(mdBuilder).blocks(blocks).routingTable(rtBuilder).build();
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}
@ -457,7 +458,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
public void indexShardRestoreCompleted(SnapshotId snapshotId, ShardId shardId) {
logger.trace("[{}] successfully restored shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.SUCCESS));
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreInProgress.State.SUCCESS));
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
@ -509,12 +510,11 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
return currentState;
}
final MetaData metaData = currentState.metaData();
final RestoreMetaData restore = metaData.custom(RestoreMetaData.TYPE);
final RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
if (restore != null) {
int changedCount = 0;
final List<RestoreMetaData.Entry> entries = newArrayList();
for (RestoreMetaData.Entry entry : restore.entries()) {
final List<RestoreInProgress.Entry> entries = newArrayList();
for (RestoreInProgress.Entry entry : restore.entries()) {
Map<ShardId, ShardRestoreStatus> shards = null;
for (int i = 0; i < batchSize; i++) {
@ -533,7 +533,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
if (shards != null) {
if (!completed(shards)) {
entries.add(new RestoreMetaData.Entry(entry.snapshotId(), RestoreMetaData.State.STARTED, entry.indices(), ImmutableMap.copyOf(shards)));
entries.add(new RestoreInProgress.Entry(entry.snapshotId(), RestoreInProgress.State.STARTED, entry.indices(), ImmutableMap.copyOf(shards)));
} else {
logger.info("restore [{}] is done", entry.snapshotId());
if (batchedRestoreInfo == null) {
@ -553,9 +553,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
if (changedCount > 0) {
logger.trace("changed cluster state triggered by {} snapshot restore state updates", changedCount);
final RestoreMetaData updatedRestore = new RestoreMetaData(entries.toArray(new RestoreMetaData.Entry[entries.size()]));
final MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()).putCustom(RestoreMetaData.TYPE, updatedRestore);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
final RestoreInProgress updatedRestore = new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()]));
return ClusterState.builder(currentState).putCustom(RestoreInProgress.TYPE, updatedRestore).build();
}
}
return currentState;
@ -578,7 +577,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
RoutingTable routingTable = newState.getRoutingTable();
final List<ShardId> waitForStarted = newArrayList();
for (Map.Entry<ShardId, ShardRestoreStatus> shard : shards.entrySet()) {
if (shard.getValue().state() == RestoreMetaData.State.SUCCESS ) {
if (shard.getValue().state() == RestoreInProgress.State.SUCCESS ) {
ShardId shardId = shard.getKey();
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
if (shardRouting != null && !shardRouting.active()) {
@ -639,8 +638,8 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
});
}
private boolean completed(Map<ShardId, RestoreMetaData.ShardRestoreStatus> shards) {
for (RestoreMetaData.ShardRestoreStatus status : shards.values()) {
private boolean completed(Map<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
for (RestoreInProgress.ShardRestoreStatus status : shards.values()) {
if (!status.state().completed()) {
return false;
}
@ -648,10 +647,10 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
return true;
}
private int failedShards(Map<ShardId, RestoreMetaData.ShardRestoreStatus> shards) {
private int failedShards(Map<ShardId, RestoreInProgress.ShardRestoreStatus> shards) {
int failedShards = 0;
for (RestoreMetaData.ShardRestoreStatus status : shards.values()) {
if (status.state() == RestoreMetaData.State.FAILURE) {
for (RestoreInProgress.ShardRestoreStatus status : shards.values()) {
if (status.state() == RestoreInProgress.State.FAILURE) {
failedShards++;
}
}
@ -696,8 +695,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
* @param event cluster changed event
*/
private void processDeletedIndices(ClusterChangedEvent event) {
MetaData metaData = event.state().metaData();
RestoreMetaData restore = metaData.custom(RestoreMetaData.TYPE);
RestoreInProgress restore = event.state().custom(RestoreInProgress.TYPE);
if (restore == null) {
// Not restoring - nothing to do
return;
@ -705,7 +703,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
if (!event.indicesDeleted().isEmpty()) {
// Some indices were deleted, let's make sure all indices that we are restoring still exist
for (RestoreMetaData.Entry entry : restore.entries()) {
for (RestoreInProgress.Entry entry : restore.entries()) {
List<ShardId> shardsToFail = null;
for (ImmutableMap.Entry<ShardId, ShardRestoreStatus> shard : entry.shards().entrySet()) {
if (!shard.getValue().state().completed()) {
@ -720,7 +718,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
if (shardsToFail != null) {
for (ShardId shardId : shardsToFail) {
logger.trace("[{}] failing running shard restore [{}]", entry.snapshotId(), shardId);
updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreMetaData.State.FAILURE, "index was deleted")));
updateRestoreStateOnMaster(new UpdateIndexShardRestoreStatusRequest(entry.snapshotId(), shardId, new ShardRestoreStatus(null, RestoreInProgress.State.FAILURE, "index was deleted")));
}
}
}
@ -733,7 +731,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
public void failRestore(SnapshotId snapshotId, ShardId shardId) {
logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreInProgress.State.FAILURE));
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UPDATE_RESTORE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
@ -789,10 +787,9 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
* @return true if repository is currently in use by one of the running snapshots
*/
public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
MetaData metaData = clusterState.metaData();
RestoreMetaData snapshots = metaData.custom(RestoreMetaData.TYPE);
RestoreInProgress snapshots = clusterState.custom(RestoreInProgress.TYPE);
if (snapshots != null) {
for (RestoreMetaData.Entry snapshot : snapshots.entries()) {
for (RestoreInProgress.Entry snapshot : snapshots.entries()) {
if (repository.equals(snapshot.snapshotId().getRepository())) {
return true;
}

@ -28,8 +28,8 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.metadata.SnapshotMetaData.ShardSnapshotStatus;
import org.elasticsearch.cluster.metadata.SnapshotMetaData.State;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -77,13 +77,13 @@ import static com.google.common.collect.Sets.newHashSet;
* <ul>
* <li>On the master node the {@link #createSnapshot(SnapshotRequest, CreateSnapshotListener)} is called and makes sure that no snapshots is currently running
* and registers the new snapshot in cluster state</li>
* <li>When cluster state is updated the {@link #beginSnapshot(ClusterState, SnapshotMetaData.Entry, boolean, CreateSnapshotListener)} method
* <li>When cluster state is updated the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, CreateSnapshotListener)} method
* kicks in and initializes the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
* start processing them through {@link SnapshotsService#processIndexShardSnapshots(ClusterChangedEvent)} method</li>
* <li>Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link #updateIndexShardSnapshotStatus(UpdateIndexShardSnapshotStatusRequest)} method</li>
* <li>When last shard is completed master node in {@link #innerUpdateSnapshotState} method marks the snapshot as completed</li>
* <li>After cluster state is updated, the {@link #endSnapshot(SnapshotMetaData.Entry)} finalizes snapshot in the repository,
* <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository,
* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(SnapshotId, SnapshotInfo, Throwable)} to remove snapshot from cluster state</li>
* </ul>
*/
@ -135,7 +135,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @throws SnapshotMissingException if snapshot is not found
*/
public Snapshot snapshot(SnapshotId snapshotId) {
List<SnapshotMetaData.Entry> entries = currentSnapshots(snapshotId.getRepository(), new String[]{snapshotId.getSnapshot()});
List<SnapshotsInProgress.Entry> entries = currentSnapshots(snapshotId.getRepository(), new String[]{snapshotId.getSnapshot()});
if (!entries.isEmpty()) {
return inProgressSnapshot(entries.iterator().next());
}
@ -150,8 +150,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
*/
public List<Snapshot> snapshots(String repositoryName) {
Set<Snapshot> snapshotSet = newHashSet();
List<SnapshotMetaData.Entry> entries = currentSnapshots(repositoryName, null);
for (SnapshotMetaData.Entry entry : entries) {
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, null);
for (SnapshotsInProgress.Entry entry : entries) {
snapshotSet.add(inProgressSnapshot(entry));
}
Repository repository = repositoriesService.repository(repositoryName);
@ -172,8 +172,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
*/
public List<Snapshot> currentSnapshots(String repositoryName) {
List<Snapshot> snapshotList = newArrayList();
List<SnapshotMetaData.Entry> entries = currentSnapshots(repositoryName, null);
for (SnapshotMetaData.Entry entry : entries) {
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, null);
for (SnapshotsInProgress.Entry entry : entries) {
snapshotList.add(inProgressSnapshot(entry));
}
CollectionUtil.timSort(snapshotList);
@ -193,27 +193,25 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
final SnapshotId snapshotId = new SnapshotId(request.repository(), request.name());
clusterService.submitStateUpdateTask(request.cause(), new TimeoutClusterStateUpdateTask() {
private SnapshotMetaData.Entry newSnapshot = null;
private SnapshotsInProgress.Entry newSnapshot = null;
@Override
public ClusterState execute(ClusterState currentState) {
validate(request, currentState);
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null || snapshots.entries().isEmpty()) {
// Store newSnapshot here to be processed in clusterStateProcessed
ImmutableList<String> indices = ImmutableList.copyOf(metaData.concreteIndices(request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", request.repository(), request.name(), indices);
newSnapshot = new SnapshotMetaData.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, System.currentTimeMillis(), null);
snapshots = new SnapshotMetaData(newSnapshot);
newSnapshot = new SnapshotsInProgress.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, System.currentTimeMillis(), null);
snapshots = new SnapshotsInProgress(newSnapshot);
} else {
// TODO: What should we do if a snapshot is already running?
throw new ConcurrentSnapshotExecutionException(snapshotId, "a snapshot is already running");
}
mdBuilder.putCustom(SnapshotMetaData.TYPE, snapshots);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}
@Override
@ -288,7 +286,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param partial allow partial snapshots
* @param userCreateSnapshotListener listener
*/
private void beginSnapshot(ClusterState clusterState, final SnapshotMetaData.Entry snapshot, final boolean partial, final CreateSnapshotListener userCreateSnapshotListener) {
private void beginSnapshot(ClusterState clusterState, final SnapshotsInProgress.Entry snapshot, final boolean partial, final CreateSnapshotListener userCreateSnapshotListener) {
boolean snapshotCreated = false;
try {
Repository repository = repositoriesService.repository(snapshot.snapshotId().getRepository());
@ -313,26 +311,25 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshotId().getSnapshot() + "]", new ProcessedClusterStateUpdateTask() {
boolean accepted = false;
SnapshotMetaData.Entry updatedSnapshot;
SnapshotsInProgress.Entry updatedSnapshot;
String failure = null;
@Override
public ClusterState execute(ClusterState currentState) {
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
ImmutableList.Builder<SnapshotMetaData.Entry> entries = ImmutableList.builder();
for (SnapshotMetaData.Entry entry : snapshots.entries()) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
ImmutableList.Builder<SnapshotsInProgress.Entry> entries = ImmutableList.builder();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshotId().equals(snapshot.snapshotId())) {
// Replace the snapshot that was just created
ImmutableMap<ShardId, SnapshotMetaData.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
ImmutableMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
Set<String> missing = indicesWithMissingShards.v1();
Set<String> closed = indicesWithMissingShards.v2();
if (missing.isEmpty() == false || closed.isEmpty() == false) {
StringBuilder failureMessage = new StringBuilder();
updatedSnapshot = new SnapshotMetaData.Entry(entry, State.FAILED, shards);
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
entries.add(updatedSnapshot);
if (missing.isEmpty() == false ) {
failureMessage.append("Indices don't have primary shards ");
@ -349,7 +346,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
continue;
}
}
updatedSnapshot = new SnapshotMetaData.Entry(entry, State.STARTED, shards);
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
entries.add(updatedSnapshot);
if (!completed(shards.values())) {
accepted = true;
@ -358,8 +355,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
entries.add(entry);
}
}
mdBuilder.putCustom(SnapshotMetaData.TYPE, new SnapshotMetaData(entries.build()));
return ClusterState.builder(currentState).metaData(mdBuilder).build();
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entries.build())).build();
}
@Override
@ -407,7 +403,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
}
}
private Snapshot inProgressSnapshot(SnapshotMetaData.Entry entry) {
private Snapshot inProgressSnapshot(SnapshotsInProgress.Entry entry) {
return new Snapshot(entry.snapshotId().getSnapshot(), entry.indices(), entry.startTime());
}
@ -421,35 +417,34 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param snapshots optional list of snapshots that will be used as a filter
* @return list of metadata for currently running snapshots
*/
public List<SnapshotMetaData.Entry> currentSnapshots(String repository, String[] snapshots) {
MetaData metaData = clusterService.state().metaData();
SnapshotMetaData snapshotMetaData = metaData.custom(SnapshotMetaData.TYPE);
if (snapshotMetaData == null || snapshotMetaData.entries().isEmpty()) {
public List<SnapshotsInProgress.Entry> currentSnapshots(String repository, String[] snapshots) {
SnapshotsInProgress snapshotsInProgress = clusterService.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
return ImmutableList.of();
}
if ("_all".equals(repository)) {
return snapshotMetaData.entries();
return snapshotsInProgress.entries();
}
if (snapshotMetaData.entries().size() == 1) {
if (snapshotsInProgress.entries().size() == 1) {
// Most likely scenario - one snapshot is currently running
// Check this snapshot against the query
SnapshotMetaData.Entry entry = snapshotMetaData.entries().get(0);
SnapshotsInProgress.Entry entry = snapshotsInProgress.entries().get(0);
if (!entry.snapshotId().getRepository().equals(repository)) {
return ImmutableList.of();
}
if (snapshots != null && snapshots.length > 0) {
for (String snapshot : snapshots) {
if (entry.snapshotId().getSnapshot().equals(snapshot)) {
return snapshotMetaData.entries();
return snapshotsInProgress.entries();
}
}
return ImmutableList.of();
} else {
return snapshotMetaData.entries();
return snapshotsInProgress.entries();
}
}
ImmutableList.Builder<SnapshotMetaData.Entry> builder = ImmutableList.builder();
for (SnapshotMetaData.Entry entry : snapshotMetaData.entries()) {
ImmutableList.Builder<SnapshotsInProgress.Entry> builder = ImmutableList.builder();
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
if (!entry.snapshotId().getRepository().equals(repository)) {
continue;
}
@ -544,8 +539,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
processStartedShards(event);
}
}
SnapshotMetaData prev = event.previousState().metaData().custom(SnapshotMetaData.TYPE);
SnapshotMetaData curr = event.state().metaData().custom(SnapshotMetaData.TYPE);
SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);
if (prev == null) {
if (curr != null) {
@ -579,16 +574,14 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
DiscoveryNodes nodes = currentState.nodes();
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null) {
return currentState;
}
boolean changed = false;
ArrayList<SnapshotMetaData.Entry> entries = newArrayList();
for (final SnapshotMetaData.Entry snapshot : snapshots.entries()) {
SnapshotMetaData.Entry updatedSnapshot = snapshot;
ArrayList<SnapshotsInProgress.Entry> entries = newArrayList();
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
boolean snapshotChanged = false;
if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) {
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableMap.builder();
@ -609,10 +602,10 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
changed = true;
ImmutableMap<ShardId, ShardSnapshotStatus> shardsMap = shards.build();
if (!snapshot.state().completed() && completed(shardsMap.values())) {
updatedSnapshot = new SnapshotMetaData.Entry(snapshot, State.SUCCESS, shardsMap);
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap);
endSnapshot(updatedSnapshot);
} else {
updatedSnapshot = new SnapshotMetaData.Entry(snapshot, snapshot.state(), shardsMap);
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap);
}
}
entries.add(updatedSnapshot);
@ -635,9 +628,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
}
}
if (changed) {
snapshots = new SnapshotMetaData(entries.toArray(new SnapshotMetaData.Entry[entries.size()]));
mdBuilder.putCustom(SnapshotMetaData.TYPE, snapshots);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}
return currentState;
}
@ -655,33 +647,30 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MetaData metaData = currentState.metaData();
RoutingTable routingTable = currentState.routingTable();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
boolean changed = false;
ArrayList<SnapshotMetaData.Entry> entries = newArrayList();
for (final SnapshotMetaData.Entry snapshot : snapshots.entries()) {
SnapshotMetaData.Entry updatedSnapshot = snapshot;
ArrayList<SnapshotsInProgress.Entry> entries = newArrayList();
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
if (snapshot.state() == State.STARTED) {
ImmutableMap<ShardId, ShardSnapshotStatus> shards = processWaitingShards(snapshot.shards(), routingTable);
if (shards != null) {
changed = true;
if (!snapshot.state().completed() && completed(shards.values())) {
updatedSnapshot = new SnapshotMetaData.Entry(snapshot, State.SUCCESS, shards);
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards);
endSnapshot(updatedSnapshot);
} else {
updatedSnapshot = new SnapshotMetaData.Entry(snapshot, shards);
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards);
}
}
entries.add(updatedSnapshot);
}
}
if (changed) {
snapshots = new SnapshotMetaData(entries.toArray(new SnapshotMetaData.Entry[entries.size()]));
mdBuilder.putCustom(SnapshotMetaData.TYPE, snapshots);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}
}
return currentState;
@ -735,9 +724,9 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
}
private boolean waitingShardsStartedOrUnassigned(ClusterChangedEvent event) {
SnapshotMetaData curr = event.state().metaData().custom(SnapshotMetaData.TYPE);
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);
if (curr != null) {
for (SnapshotMetaData.Entry entry : curr.entries()) {
for (SnapshotsInProgress.Entry entry : curr.entries()) {
if (entry.state() == State.STARTED && !entry.waitingIndices().isEmpty()) {
for (String index : entry.waitingIndices().keySet()) {
if (event.indexRoutingTableChanged(index)) {
@ -759,11 +748,11 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) {
// Check if we just became the master
boolean newMaster = !event.previousState().nodes().localNodeMaster();
SnapshotMetaData snapshotMetaData = event.state().getMetaData().custom(SnapshotMetaData.TYPE);
if (snapshotMetaData == null) {
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null) {
return false;
}
for (SnapshotMetaData.Entry snapshot : snapshotMetaData.entries()) {
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
if (newMaster && (snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) {
// We just replaced old master and snapshots in intermediate states needs to be cleaned
return true;
@ -786,11 +775,11 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param event cluster state changed event
*/
private void processIndexShardSnapshots(ClusterChangedEvent event) {
SnapshotMetaData snapshotMetaData = event.state().metaData().custom(SnapshotMetaData.TYPE);
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
Map<SnapshotId, SnapshotShards> survivors = newHashMap();
// First, remove snapshots that are no longer there
for (Map.Entry<SnapshotId, SnapshotShards> entry : shardSnapshots.entrySet()) {
if (snapshotMetaData != null && snapshotMetaData.snapshot(entry.getKey()) != null) {
if (snapshotsInProgress != null && snapshotsInProgress.snapshot(entry.getKey()) != null) {
survivors.put(entry.getKey(), entry.getValue());
}
}
@ -800,12 +789,12 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
Map<SnapshotId, Map<ShardId, IndexShardSnapshotStatus>> newSnapshots = newHashMap();
// Now go through all snapshots and update existing or create missing
final String localNodeId = clusterService.localNode().id();
if (snapshotMetaData != null) {
for (SnapshotMetaData.Entry entry : snapshotMetaData.entries()) {
if (snapshotsInProgress != null) {
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
if (entry.state() == State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = newHashMap();
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
for (Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
// Add all new shards to start processing on
if (localNodeId.equals(shard.getValue().nodeId())) {
if (shard.getValue().state() == State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.getKey()))) {
@ -833,7 +822,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
// Abort all running shards for this snapshot
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
if (snapshotShards != null) {
for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
for (Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.getKey());
if (snapshotStatus != null) {
switch (snapshotStatus.stage()) {
@ -843,7 +832,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
case DONE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshotId(), shard.getKey());
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.snapshotId(), shard.getKey(),
new ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotMetaData.State.SUCCESS)));
new ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotsInProgress.State.SUCCESS)));
break;
case FAILURE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshotId(), shard.getKey());
@ -883,15 +872,15 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
public void run() {
try {
shardSnapshotService.snapshot(entry.getKey(), shardEntry.getValue());
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.getKey(), shardEntry.getKey(), new ShardSnapshotStatus(localNodeId, SnapshotMetaData.State.SUCCESS)));
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.getKey(), shardEntry.getKey(), new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS)));
} catch (Throwable t) {
logger.warn("[{}] [{}] failed to create snapshot", t, shardEntry.getKey(), entry.getKey());
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.getKey(), shardEntry.getKey(), new ShardSnapshotStatus(localNodeId, SnapshotMetaData.State.FAILED, ExceptionsHelper.detailedMessage(t))));
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.getKey(), shardEntry.getKey(), new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(t))));
}
}
});
} catch (Throwable t) {
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.getKey(), shardEntry.getKey(), new ShardSnapshotStatus(localNodeId, SnapshotMetaData.State.FAILED, ExceptionsHelper.detailedMessage(t))));
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(entry.getKey(), shardEntry.getKey(), new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(t))));
}
}
}
@ -903,11 +892,11 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param event
*/
private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
SnapshotMetaData snapshotMetaData = event.state().getMetaData().custom(SnapshotMetaData.TYPE);
if (snapshotMetaData == null) {
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null) {
return;
}
for (SnapshotMetaData.Entry snapshot : snapshotMetaData.entries()) {
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) {
ImmutableMap<ShardId, IndexShardSnapshotStatus> localShards = currentSnapshotShards(snapshot.snapshotId());
if (localShards != null) {
@ -922,7 +911,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
// but we think the shard is done - we need to make the new master aware that the shard is done
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshotId(), shardId);
updateIndexShardSnapshotStatus(new UpdateIndexShardSnapshotStatusRequest(snapshot.snapshotId(), shardId,
new ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotMetaData.State.SUCCESS)));
new ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotsInProgress.State.SUCCESS)));
} else if (localShard.getValue().stage() == IndexShardSnapshotStatus.Stage.FAILURE) {
// but we think the shard failed - we need to make the new master aware that the shard failed
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshotId(), shardId);
@ -961,7 +950,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param shards list of shard statuses
* @return true if all shards have completed (either successfully or failed), false otherwise
*/
private boolean completed(Collection<SnapshotMetaData.ShardSnapshotStatus> shards) {
private boolean completed(Collection<SnapshotsInProgress.ShardSnapshotStatus> shards) {
for (ShardSnapshotStatus status : shards) {
if (!status.state().completed()) {
return false;
@ -976,10 +965,10 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param shards list of shard statuses
* @return sets of missing and closed indices
*/
private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableMap<ShardId, SnapshotMetaData.ShardSnapshotStatus> shards, MetaData metaData) {
private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards, MetaData metaData) {
Set<String> missing = newHashSet();
Set<String> closed = newHashSet();
for (ImmutableMap.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> entry : shards.entrySet()) {
for (ImmutableMap.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards.entrySet()) {
if (entry.getValue().state() == State.MISSING) {
if (metaData.hasIndex(entry.getKey().getIndex()) && metaData.index(entry.getKey().getIndex()).getState() == IndexMetaData.State.CLOSE) {
closed.add(entry.getKey().getIndex());
@ -1019,12 +1008,11 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
return currentState;
}
final MetaData metaData = currentState.metaData();
final SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
int changedCount = 0;
final List<SnapshotMetaData.Entry> entries = newArrayList();
for (SnapshotMetaData.Entry entry : snapshots.entries()) {
final List<SnapshotsInProgress.Entry> entries = newArrayList();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
HashMap<ShardId, ShardSnapshotStatus> shards = null;
for (int i = 0; i < batchSize; i++) {
@ -1043,11 +1031,11 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
if (shards != null) {
if (!completed(shards.values())) {
entries.add(new SnapshotMetaData.Entry(entry, ImmutableMap.copyOf(shards)));
entries.add(new SnapshotsInProgress.Entry(entry, ImmutableMap.copyOf(shards)));
} else {
// Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status?
SnapshotMetaData.Entry updatedEntry = new SnapshotMetaData.Entry(entry, State.SUCCESS, ImmutableMap.copyOf(shards));
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, ImmutableMap.copyOf(shards));
entries.add(updatedEntry);
// Finalize snapshot in the repository
endSnapshot(updatedEntry);
@ -1060,9 +1048,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
if (changedCount > 0) {
logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
final SnapshotMetaData updatedSnapshots = new SnapshotMetaData(entries.toArray(new SnapshotMetaData.Entry[entries.size()]));
final MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()).putCustom(SnapshotMetaData.TYPE, updatedSnapshots);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build();
}
}
return currentState;
@ -1084,7 +1071,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
*
* @param entry snapshot
*/
private void endSnapshot(SnapshotMetaData.Entry entry) {
private void endSnapshot(SnapshotsInProgress.Entry entry) {
endSnapshot(entry, null);
}
@ -1097,7 +1084,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param entry snapshot
* @param failure failure reason or null if snapshot was successful
*/
private void endSnapshot(final SnapshotMetaData.Entry entry, final String failure) {
private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() {
@Override
public void run() {
@ -1136,13 +1123,11 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
clusterService.submitStateUpdateTask("remove snapshot metadata", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
boolean changed = false;
ArrayList<SnapshotMetaData.Entry> entries = newArrayList();
for (SnapshotMetaData.Entry entry : snapshots.entries()) {
ArrayList<SnapshotsInProgress.Entry> entries = newArrayList();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshotId().equals(snapshotId)) {
changed = true;
} else {
@ -1150,9 +1135,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
}
}
if (changed) {
snapshots = new SnapshotMetaData(entries.toArray(new SnapshotMetaData.Entry[entries.size()]));
mdBuilder.putCustom(SnapshotMetaData.TYPE, snapshots);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}
}
return currentState;
@ -1196,14 +1180,12 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null) {
// No snapshots running - we can continue
return currentState;
}
SnapshotMetaData.Entry snapshot = snapshots.snapshot(snapshotId);
SnapshotsInProgress.Entry snapshot = snapshots.snapshot(snapshotId);
if (snapshot == null) {
// This snapshot is not running - continue
if (!snapshots.entries().isEmpty()) {
@ -1252,10 +1234,9 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
endSnapshot(snapshot);
}
}
SnapshotMetaData.Entry newSnapshot = new SnapshotMetaData.Entry(snapshot, State.ABORTED, shards);
snapshots = new SnapshotMetaData(newSnapshot);
mdBuilder.putCustom(SnapshotMetaData.TYPE, snapshots);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, shards);
snapshots = new SnapshotsInProgress(newSnapshot);
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}
}
@ -1303,10 +1284,9 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @return true if repository is currently in use by one of the running snapshots
*/
public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
MetaData metaData = clusterState.metaData();
SnapshotMetaData snapshots = metaData.custom(SnapshotMetaData.TYPE);
SnapshotsInProgress snapshots = clusterState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
for (SnapshotMetaData.Entry snapshot : snapshots.entries()) {
for (SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
if (repository.equals(snapshot.snapshotId().getRepository())) {
return true;
}
@ -1343,18 +1323,18 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param indices list of indices to be snapshotted
* @return list of shards to be included into the current snapshot
*/
private ImmutableMap<ShardId, SnapshotMetaData.ShardSnapshotStatus> shards(ClusterState clusterState, ImmutableList<String> indices) {
ImmutableMap.Builder<ShardId, SnapshotMetaData.ShardSnapshotStatus> builder = ImmutableMap.builder();
private ImmutableMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, ImmutableList<String> indices) {
ImmutableMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableMap.builder();
MetaData metaData = clusterState.metaData();
for (String index : indices) {
IndexMetaData indexMetaData = metaData.index(index);
if (indexMetaData == null) {
// The index was deleted before we managed to start the snapshot - mark it as missing.
builder.put(new ShardId(index, 0), new SnapshotMetaData.ShardSnapshotStatus(null, State.MISSING, "missing index"));
builder.put(new ShardId(index, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
} else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
for (int i = 0; i < indexMetaData.numberOfShards(); i++) {
ShardId shardId = new ShardId(index, i);
builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(null, State.MISSING, "index is closed"));
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "index is closed"));
}
} else {
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(index);
@ -1363,17 +1343,17 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
if (indexRoutingTable != null) {
ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
if (primary == null || !primary.assignedToNode()) {
builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
} else if (primary.relocating() || primary.initializing()) {
// The WAITING state was introduced in V1.2.0 - don't use it if there are nodes with an older version in the cluster
builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING));
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING));
} else if (!primary.started()) {
builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING, "primary shard hasn't been started yet"));
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING, "primary shard hasn't been started yet"));
} else {
builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(primary.currentNodeId()));
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId()));
}
} else {
builder.put(shardId, new SnapshotMetaData.ShardSnapshotStatus(null, State.MISSING, "missing routing table"));
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing routing table"));
}
}
}
@ -1656,7 +1636,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
private static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest {
private SnapshotId snapshotId;
private ShardId shardId;
private SnapshotMetaData.ShardSnapshotStatus status;
private SnapshotsInProgress.ShardSnapshotStatus status;
volatile boolean processed; // state field, no need to serialize
@ -1664,7 +1644,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
}
private UpdateIndexShardSnapshotStatusRequest(SnapshotId snapshotId, ShardId shardId, SnapshotMetaData.ShardSnapshotStatus status) {
private UpdateIndexShardSnapshotStatusRequest(SnapshotId snapshotId, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) {
this.snapshotId = snapshotId;
this.shardId = shardId;
this.status = status;
@ -1675,7 +1655,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
super.readFrom(in);
snapshotId = SnapshotId.readSnapshotId(in);
shardId = ShardId.readShardId(in);
status = SnapshotMetaData.ShardSnapshotStatus.readShardSnapshotStatus(in);
status = SnapshotsInProgress.ShardSnapshotStatus.readShardSnapshotStatus(in);
}
@Override
@ -1694,7 +1674,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
return shardId;
}
public SnapshotMetaData.ShardSnapshotStatus status() {
public SnapshotsInProgress.ShardSnapshotStatus status() {
return status;
}
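A minimal sketch, not part of this commit, summarizing the access-pattern change that repeats throughout the SnapshotsService hunks above: in-progress snapshot information is now read from, and written to, the top-level cluster state customs instead of the cluster metadata customs. The SnapshotsInProgress and ClusterState calls below come from this diff; the helper class itself is hypothetical.
import java.util.List;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
// Hypothetical helper illustrating the new read and write paths.
class InProgressSnapshotsSketch {
    /** Returns true if the cluster state reports any in-progress snapshots. */
    static boolean hasInProgressSnapshots(ClusterState state) {
        // Before this commit: state.metaData().custom(SnapshotMetaData.TYPE)
        SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE);
        return snapshots != null && !snapshots.entries().isEmpty();
    }
    /** Builds a new cluster state with the given in-progress snapshot entries installed. */
    static ClusterState withSnapshots(ClusterState current, List<SnapshotsInProgress.Entry> entries) {
        SnapshotsInProgress snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
        // Previously this went through MetaData.Builder.putCustom(SnapshotMetaData.TYPE, ...)
        return ClusterState.builder(current).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
    }
}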

@ -88,16 +88,16 @@ public class SnapshotBlocksTests extends ElasticsearchIntegrationTest {
@Test
public void testCreateSnapshotWithBlocks() {
logger.info("--> creating a snapshot is blocked when the cluster is read only");
logger.info("--> creating a snapshot is allowed when the cluster is read only");
try {
setClusterReadOnly(true);
assertBlocked(client().admin().cluster().prepareCreateSnapshot(REPOSITORY_NAME, "snapshot-1"), MetaData.CLUSTER_READ_ONLY_BLOCK);
assertThat(client().admin().cluster().prepareCreateSnapshot(REPOSITORY_NAME, "snapshot-1").setWaitForCompletion(true).get().status(), equalTo(RestStatus.OK));
} finally {
setClusterReadOnly(false);
}
logger.info("--> creating a snapshot is allowed when the cluster is not read only");
CreateSnapshotResponse response = client().admin().cluster().prepareCreateSnapshot(REPOSITORY_NAME, "snapshot-1")
CreateSnapshotResponse response = client().admin().cluster().prepareCreateSnapshot(REPOSITORY_NAME, "snapshot-2")
.setWaitForCompletion(true)
.execute().actionGet();
assertThat(response.status(), equalTo(RestStatus.OK));
@ -126,17 +126,13 @@ public class SnapshotBlocksTests extends ElasticsearchIntegrationTest {
@Test
public void testDeleteSnapshotWithBlocks() {
logger.info("--> deleting a snapshot is blocked when the cluster is read only");
logger.info("--> deleting a snapshot is allowed when the cluster is read only");
try {
setClusterReadOnly(true);
assertBlocked(client().admin().cluster().prepareDeleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME), MetaData.CLUSTER_READ_ONLY_BLOCK);
assertTrue(client().admin().cluster().prepareDeleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME).get().isAcknowledged());
} finally {
setClusterReadOnly(false);
}
logger.info("--> deleting a snapshot is allowed when the cluster is not read only");
DeleteSnapshotResponse response = client().admin().cluster().prepareDeleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME).execute().actionGet();
assertThat(response.isAcknowledged(), equalTo(true));
}
@Test

@ -85,6 +85,8 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
builder = randomBlocks(clusterState);
break;
case 3:
builder = randomClusterStateCustoms(clusterState);
break;
case 4:
builder = randomMetaDataChanges(clusterState);
break;
@ -163,6 +165,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
}
/**
* Randomly updates nodes in the cluster state
*/
private ClusterState.Builder randomNodes(ClusterState clusterState) {
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
List<String> nodeIds = randomSubsetOf(randomInt(clusterState.nodes().nodes().size() - 1), clusterState.nodes().nodes().keys().toArray(String.class));
@ -182,6 +187,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
return ClusterState.builder(clusterState).nodes(nodes);
}
/**
* Randomly updates routing table in the cluster state
*/
private ClusterState.Builder randomRoutingTable(ClusterState clusterState) {
RoutingTable.Builder builder = RoutingTable.builder(clusterState.routingTable());
int numberOfIndices = clusterState.routingTable().indicesRouting().size();
@ -202,6 +210,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
return ClusterState.builder(clusterState).routingTable(builder.build());
}
/**
* Randomly updates index routing table in the cluster state
*/
private IndexRoutingTable randomIndexRoutingTable(String index, String[] nodeIds) {
IndexRoutingTable.Builder builder = IndexRoutingTable.builder(index);
int shardCount = randomInt(10);
@ -218,6 +229,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
return builder.build();
}
/**
* Randomly creates or removes cluster blocks
*/
private ClusterState.Builder randomBlocks(ClusterState clusterState) {
ClusterBlocks.Builder builder = ClusterBlocks.builder().blocks(clusterState.blocks());
int globalBlocksCount = clusterState.blocks().global().size();
@ -234,6 +248,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
return ClusterState.builder(clusterState).blocks(builder);
}
/**
* Returns a random global block
*/
private ClusterBlock randomGlobalBlock() {
switch (randomInt(2)) {
case 0:
@ -245,6 +262,67 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
}
}
/**
* Random cluster state part generator interface. Used by {@link #randomClusterStateParts(ClusterState, String, RandomClusterPart)}
* method to update cluster state with randomly generated parts
*/
private interface RandomClusterPart<T> {
/**
* Returns the list of parts from the cluster state
*/
ImmutableOpenMap<String, T> parts(ClusterState clusterState);
/**
* Puts the part back into the cluster state
*/
ClusterState.Builder put(ClusterState.Builder builder, T part);
/**
* Removes the part from the cluster state
*/
ClusterState.Builder remove(ClusterState.Builder builder, String name);
/**
* Returns a random part with the specified name
*/
T randomCreate(String name);
/**
* Makes random modifications to the part
*/
T randomChange(T part);
}
/**
* Takes an existing cluster state and randomly adds, removes or updates a cluster state part using the randomPart generator.
* If a new part is added, the prefix value is used as a prefix of the randomly generated part name.
*/
private <T> ClusterState randomClusterStateParts(ClusterState clusterState, String prefix, RandomClusterPart<T> randomPart) {
ClusterState.Builder builder = ClusterState.builder(clusterState);
ImmutableOpenMap<String, T> parts = randomPart.parts(clusterState);
int partCount = parts.size();
if (partCount > 0) {
List<String> randomParts = randomSubsetOf(randomInt(partCount - 1), randomPart.parts(clusterState).keys().toArray(String.class));
for (String part : randomParts) {
if (randomBoolean()) {
randomPart.remove(builder, part);
} else {
randomPart.put(builder, randomPart.randomChange(parts.get(part)));
}
}
}
int additionalPartCount = randomIntBetween(1, 20);
for (int i = 0; i < additionalPartCount; i++) {
String name = randomName(prefix);
randomPart.put(builder, randomPart.randomCreate(name));
}
return builder.build();
}
/**
* Makes random metadata changes
*/
private ClusterState.Builder randomMetaDataChanges(ClusterState clusterState) {
MetaData metaData = clusterState.metaData();
int changesCount = randomIntBetween(1, 10);
@ -269,6 +347,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
return ClusterState.builder(clusterState).metaData(MetaData.builder(metaData).version(metaData.version() + 1).build());
}
/**
* Makes random settings changes
*/
private Settings randomSettings(Settings settings) {
Settings.Builder builder = Settings.builder();
if (randomBoolean()) {
@ -282,6 +363,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
}
/**
* Randomly updates persistent or transient settings of the given metadata
*/
private MetaData randomMetaDataSettings(MetaData metaData) {
if (randomBoolean()) {
return MetaData.builder(metaData).persistentSettings(randomSettings(metaData.persistentSettings())).build();
@ -290,6 +374,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
}
}
/**
* Random metadata part generator
*/
private interface RandomPart<T> {
/**
* Returns list of parts from metadata
@ -318,6 +405,10 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
}
/**
* Takes existing metadata and randomly adds, removes or updates a metadata part using the randomPart generator.
* If a new part is added, the prefix value is used as a prefix of the randomly generated part name.
*/
private <T> MetaData randomParts(MetaData metaData, String prefix, RandomPart<T> randomPart) {
MetaData.Builder builder = MetaData.builder(metaData);
ImmutableOpenMap<String, T> parts = randomPart.parts(metaData);
@ -340,6 +431,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
return builder.build();
}
/**
* Randomly adds, deletes or updates indices in the metadata
*/
private MetaData randomIndices(MetaData metaData) {
return randomParts(metaData, "index", new RandomPart<IndexMetaData>() {
@ -404,6 +498,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
});
}
/**
* Generates a random warmer
*/
private IndexWarmersMetaData randomWarmers() {
if (randomBoolean()) {
return new IndexWarmersMetaData(
@ -418,6 +515,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
}
}
/**
* Randomly adds, deletes or updates index templates in the metadata
*/
private MetaData randomTemplates(MetaData metaData) {
return randomParts(metaData, "template", new RandomPart<IndexTemplateMetaData>() {
@Override
@ -460,6 +560,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
});
}
/**
* Generates a random alias
*/
private AliasMetaData randomAlias() {
AliasMetaData.Builder builder = newAliasMetaDataBuilder(randomName("alias"));
if (randomBoolean()) {
@ -471,6 +574,9 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
return builder.build();
}
/**
* Randomly adds, deletes or updates repositories in the metadata
*/
private MetaData randomMetaDataCustoms(final MetaData metaData) {
return randomParts(metaData, "custom", new RandomPart<MetaData.Custom>() {
@ -481,14 +587,7 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
@Override
public MetaData.Builder put(MetaData.Builder builder, MetaData.Custom part) {
if (part instanceof SnapshotMetaData) {
return builder.putCustom(SnapshotMetaData.TYPE, part);
} else if (part instanceof RepositoriesMetaData) {
return builder.putCustom(RepositoriesMetaData.TYPE, part);
} else if (part instanceof RestoreMetaData) {
return builder.putCustom(RestoreMetaData.TYPE, part);
}
throw new IllegalArgumentException("Unknown custom part " + part);
return builder.putCustom(part.type(), part);
}
@Override
@ -498,26 +597,7 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
@Override
public MetaData.Custom randomCreate(String name) {
switch (randomIntBetween(0, 2)) {
case 0:
return new SnapshotMetaData(new SnapshotMetaData.Entry(
new SnapshotId(randomName("repo"), randomName("snap")),
randomBoolean(),
SnapshotMetaData.State.fromValue((byte) randomIntBetween(0, 6)),
ImmutableList.<String>of(),
Math.abs(randomLong()),
ImmutableMap.<ShardId, SnapshotMetaData.ShardSnapshotStatus>of()));
case 1:
return new RepositoriesMetaData();
case 2:
return new RestoreMetaData(new RestoreMetaData.Entry(
new SnapshotId(randomName("repo"), randomName("snap")),
RestoreMetaData.State.fromValue((byte) randomIntBetween(0, 3)),
ImmutableList.<String>of(),
ImmutableMap.<ShardId, RestoreMetaData.ShardRestoreStatus>of()));
default:
throw new IllegalArgumentException("Shouldn't be here");
}
return new RepositoriesMetaData();
}
@Override
@ -527,6 +607,59 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
});
}
/**
* Randomly adds, deletes or updates in-progress snapshot and restore records in the cluster state
*/
private ClusterState.Builder randomClusterStateCustoms(final ClusterState clusterState) {
return ClusterState.builder(randomClusterStateParts(clusterState, "custom", new RandomClusterPart<ClusterState.Custom>() {
@Override
public ImmutableOpenMap<String, ClusterState.Custom> parts(ClusterState clusterState) {
return clusterState.customs();
}
@Override
public ClusterState.Builder put(ClusterState.Builder builder, ClusterState.Custom part) {
return builder.putCustom(part.type(), part);
}
@Override
public ClusterState.Builder remove(ClusterState.Builder builder, String name) {
return builder.removeCustom(name);
}
@Override
public ClusterState.Custom randomCreate(String name) {
switch (randomIntBetween(0, 1)) {
case 0:
return new SnapshotsInProgress(new SnapshotsInProgress.Entry(
new SnapshotId(randomName("repo"), randomName("snap")),
randomBoolean(),
SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)),
ImmutableList.<String>of(),
Math.abs(randomLong()),
ImmutableMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>of()));
case 1:
return new RestoreInProgress(new RestoreInProgress.Entry(
new SnapshotId(randomName("repo"), randomName("snap")),
RestoreInProgress.State.fromValue((byte) randomIntBetween(0, 3)),
ImmutableList.<String>of(),
ImmutableMap.<ShardId, RestoreInProgress.ShardRestoreStatus>of()));
default:
throw new IllegalArgumentException("Shouldn't be here");
}
}
@Override
public ClusterState.Custom randomChange(ClusterState.Custom part) {
return part;
}
}));
}
/**
* Generates a random name that starts with the given prefix
*/
private String randomName(String prefix) {
return prefix + Strings.randomBase64UUID(getRandom());
}

@ -19,13 +19,12 @@
package org.elasticsearch.snapshots;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.metadata.SnapshotMetaData;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
@ -106,8 +105,8 @@ public abstract class AbstractSnapshotTests extends ElasticsearchIntegrationTest
if (snapshotInfos.get(0).state().completed()) {
// Make sure that snapshot clean up operations are finished
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
SnapshotMetaData snapshotMetaData = stateResponse.getState().getMetaData().custom(SnapshotMetaData.TYPE);
if (snapshotMetaData == null || snapshotMetaData.snapshot(snapshotId) == null) {
SnapshotsInProgress snapshotsInProgress = stateResponse.getState().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null || snapshotsInProgress.snapshot(snapshotId) == null) {
return snapshotInfos.get(0);
}
}

@ -68,7 +68,6 @@ import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction;
import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
import org.elasticsearch.snapshots.mockstore.MockRepositoryPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.junit.Ignore;
import org.junit.Test;

@ -33,7 +33,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.*;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
@ -42,11 +41,10 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.metadata.SnapshotMetaData.Entry;
import org.elasticsearch.cluster.metadata.SnapshotMetaData.ShardSnapshotStatus;
import org.elasticsearch.cluster.metadata.SnapshotMetaData.State;
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
@ -1390,7 +1388,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
SnapshotsStatusResponse response = client.admin().cluster().prepareSnapshotStatus("test-repo").execute().actionGet();
assertThat(response.getSnapshots().size(), equalTo(1));
SnapshotStatus snapshotStatus = response.getSnapshots().get(0);
assertThat(snapshotStatus.getState(), equalTo(SnapshotMetaData.State.STARTED));
assertThat(snapshotStatus.getState(), equalTo(SnapshotsInProgress.State.STARTED));
// We blocked the node during a data write operation, so at least one shard snapshot should be in the STARTED stage
assertThat(snapshotStatus.getShardsStats().getStartedShards(), greaterThan(0));
for (SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
@ -1403,7 +1401,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
response = client.admin().cluster().prepareSnapshotStatus().execute().actionGet();
assertThat(response.getSnapshots().size(), equalTo(1));
snapshotStatus = response.getSnapshots().get(0);
assertThat(snapshotStatus.getState(), equalTo(SnapshotMetaData.State.STARTED));
assertThat(snapshotStatus.getState(), equalTo(SnapshotsInProgress.State.STARTED));
// We blocked the node during a data write operation, so at least one shard snapshot should be in the STARTED stage
assertThat(snapshotStatus.getShardsStats().getStartedShards(), greaterThan(0));
for (SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
@ -1769,9 +1767,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
shards.put(new ShardId("test-idx", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED));
ImmutableList.Builder<Entry> entries = ImmutableList.builder();
entries.add(new Entry(new SnapshotId("test-repo", "test-snap"), true, State.ABORTED, ImmutableList.of("test-idx"), System.currentTimeMillis(), shards.build()));
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
mdBuilder.putCustom(SnapshotMetaData.TYPE, new SnapshotMetaData(entries.build()));
return ClusterState.builder(currentState).metaData(mdBuilder).build();
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entries.build())).build();
}
@Override

@ -426,4 +426,13 @@ The restore operation uses the standard shard recovery mechanism. Therefore, any
be canceled by deleting indices that are being restored. Please note that data for all deleted indices will be removed
from the cluster as a result of this operation.
[float]
=== Effect of cluster blocks on snapshot and restore operations
Many snapshot and restore operations are affected by cluster and index blocks. For example, registering and unregistering
repositories require write access to the global metadata. The snapshot operation requires that all indices, their metadata,
and the global metadata be readable. The restore operation requires the global metadata to be writable; however, index level
blocks are ignored during restore because indices are essentially recreated as part of it.
Please note that the content of a repository is not part of the cluster and therefore cluster blocks don't affect internal
repository operations such as listing or deleting snapshots from an already registered repository.
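As a hedged illustration of the block levels described above (not part of this commit), a snapshot-related action that only reads cluster metadata checks the METADATA_READ block, so a cluster-wide read-only block does not prevent it from running. The block-checking call matches the convention used by the transport actions earlier in this commit; the class name is hypothetical.
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
// Hypothetical example: operations that only read metadata return a block exception
// only when METADATA_READ is blocked, so they still run on a read-only cluster.
class ExampleSnapshotBlockCheck {
    static ClusterBlockException checkBlock(ClusterState state) {
        // Returns null when the operation is allowed to proceed.
        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
    }
}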