parent 46f1e30fa9
commit 1425e28639
@@ -207,6 +207,16 @@ didn't exist in the cluster. If cluster state is restored, the restored template
 cluster are added and existing templates with the same name are replaced by the restored templates. The restored
 persistent settings are added to the existing persistent settings.
 
+[float]
+=== Partial restore
+
+added[1.3.0]
+
+By default, the entire restore operation will fail if one or more indices participating in the operation don't have
+snapshots of all shards available. This can occur if some shards failed to snapshot, for example. It is still possible to
+restore such indices by setting `partial` to `true`. Please note that only successfully snapshotted shards will be
+restored in this case, and all missing shards will be recreated empty.
+
 
 [float]
 === Snapshot status
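To make the new option concrete, here is a minimal sketch of requesting a partial restore through the Java API this commit extends. The repository, snapshot, and index names are placeholders; the builder calls mirror the integration tests at the bottom of this diff.

```java
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;

public class PartialRestoreExample {
    // Restores an index even if some of its shards were never snapshotted;
    // the missing shards come back as empty shards (see the docs change above).
    public static RestoreSnapshotResponse restorePartially(Client client) {
        return client.admin().cluster()
                .prepareRestoreSnapshot("my-repo", "my-snap") // placeholder names
                .setIndices("my-index")                       // placeholder index
                .setPartial(true)                             // the new flag
                .setWaitForCompletion(true)
                .get();
    }
}
```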
@@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.snapshots.restore;
 
 import org.elasticsearch.ElasticsearchGenerationException;
 import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
@@ -67,6 +68,8 @@ public class RestoreSnapshotRequest extends MasterNodeOperationRequest<RestoreSn
 
     private boolean includeGlobalState = true;
 
+    private boolean partial = false;
+
     private Settings settings = EMPTY_SETTINGS;
 
     RestoreSnapshotRequest() {
@@ -272,6 +275,26 @@ public class RestoreSnapshotRequest extends MasterNodeOperationRequest<RestoreSn
         return waitForCompletion;
     }
 
+    /**
+     * Returns true if indices with shards that failed to snapshot should be partially restored.
+     *
+     * @return true if indices with shards that failed to snapshot should be partially restored
+     */
+    public boolean partial() {
+        return partial;
+    }
+
+    /**
+     * Set to true to allow indices with shards that failed to snapshot to be partially restored.
+     *
+     * @param partial true if indices with shards that failed to snapshot should be partially restored
+     * @return this request
+     */
+    public RestoreSnapshotRequest partial(boolean partial) {
+        this.partial = partial;
+        return this;
+    }
+
     /**
      * Sets repository-specific restore settings.
      * <p/>
@@ -405,6 +428,8 @@ public class RestoreSnapshotRequest extends MasterNodeOperationRequest<RestoreSn
                 expandWildcardsOpen = nodeBooleanValue(entry.getValue());
             } else if (name.equals("expand_wildcards_closed") || name.equals("expandWildcardsClosed")) {
                 expandWildcardsClosed = nodeBooleanValue(entry.getValue());
+            } else if (name.equals("partial")) {
+                partial(nodeBooleanValue(entry.getValue()));
             } else if (name.equals("settings")) {
                 if (!(entry.getValue() instanceof Map)) {
                     throw new ElasticsearchIllegalArgumentException("malformed settings section, should include an inner object");
@@ -511,6 +536,9 @@ public class RestoreSnapshotRequest extends MasterNodeOperationRequest<RestoreSn
         renameReplacement = in.readOptionalString();
         waitForCompletion = in.readBoolean();
         includeGlobalState = in.readBoolean();
+        if (in.getVersion().onOrAfter(Version.V_1_3_0)) {
+            partial = in.readBoolean();
+        }
         settings = readSettingsFromStream(in);
     }
 
@@ -525,6 +553,9 @@ public class RestoreSnapshotRequest extends MasterNodeOperationRequest<RestoreSn
         out.writeOptionalString(renameReplacement);
         out.writeBoolean(waitForCompletion);
         out.writeBoolean(includeGlobalState);
+        if (out.getVersion().onOrAfter(Version.V_1_3_0)) {
+            out.writeBoolean(partial);
+        }
         writeSettingsToStream(settings, out);
     }
 }
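Both readFrom and writeTo gate the new field on Version.V_1_3_0 so that mixed-version clusters keep a compatible wire format: a pre-1.3.0 node neither sends nor expects the flag, and the field's default (false) applies. A standalone sketch of the same pattern, using plain java.io streams rather than Elasticsearch's StreamInput/StreamOutput, with an illustrative integer version id:

```java
import java.io.*;

// Illustration only: the version-gated field pattern used in readFrom/writeTo above.
public class VersionGatedField {
    static final int V_1_3_0 = 1030099; // illustrative version id, not the real ES constant

    static void write(DataOutputStream out, int receiverVersion, String snapshot, boolean partial) throws IOException {
        out.writeUTF(snapshot);            // fields every version understands
        if (receiverVersion >= V_1_3_0) {  // only newer receivers get the new flag
            out.writeBoolean(partial);
        }
    }

    static boolean read(DataInputStream in, int senderVersion) throws IOException {
        in.readUTF();                      // fields every version sends
        boolean partial = false;           // default when talking to an older node
        if (senderVersion >= V_1_3_0) {
            partial = in.readBoolean();
        }
        return partial;
    }
}
```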
@@ -208,6 +208,17 @@ public class RestoreSnapshotRequestBuilder extends MasterNodeOperationRequestBui
         return this;
     }
 
+    /**
+     * If set to true the restore procedure will restore partially snapshotted indices
+     *
+     * @param partial true if partially snapshotted indices should be restored
+     * @return this request
+     */
+    public RestoreSnapshotRequestBuilder setPartial(boolean partial) {
+        request.partial(partial);
+        return this;
+    }
+
     @Override
     protected void doExecute(ActionListener<RestoreSnapshotResponse> listener) {
         client.restoreSnapshot(request, listener);
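This builder method is what the integration tests below drive. A short sketch, with placeholder names, of the failure/retry flow those tests encode: restoring an incompletely snapshotted index without the flag fails with SnapshotRestoreException, while the same restore with setPartial(true) proceeds and reports the skipped shards as failed:

```java
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.snapshots.SnapshotRestoreException;

public class PartialRestoreFlow {
    public static RestoreSnapshotResponse restoreWithFallback(Client client) {
        try {
            // Without the flag, an incompletely snapshotted index cannot be restored.
            return client.admin().cluster()
                    .prepareRestoreSnapshot("my-repo", "my-snap") // placeholder names
                    .setWaitForCompletion(true)
                    .get();
        } catch (SnapshotRestoreException e) {
            // Retry, accepting that shards which failed to snapshot come back empty.
            return client.admin().cluster()
                    .prepareRestoreSnapshot("my-repo", "my-snap")
                    .setPartial(true)
                    .setWaitForCompletion(true)
                    .get();
        }
    }
}
```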
@@ -75,15 +75,10 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeOperation
 
     @Override
     protected void masterOperation(final RestoreSnapshotRequest request, ClusterState state, final ActionListener<RestoreSnapshotResponse> listener) throws ElasticsearchException {
-        RestoreService.RestoreRequest restoreRequest =
-                new RestoreService.RestoreRequest("restore_snapshot[" + request.snapshot() + "]", request.repository(), request.snapshot())
-                        .indices(request.indices())
-                        .indicesOptions(request.indicesOptions())
-                        .renamePattern(request.renamePattern())
-                        .renameReplacement(request.renameReplacement())
-                        .includeGlobalState(request.includeGlobalState())
-                        .settings(request.settings())
-                        .masterNodeTimeout(request.masterNodeTimeout());
+        RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(
+                "restore_snapshot[" + request.snapshot() + "]", request.repository(), request.snapshot(),
+                request.indices(), request.indicesOptions(), request.renamePattern(), request.renameReplacement(),
+                request.settings(), request.masterNodeTimeout(), request.includeGlobalState(), request.partial());
         restoreService.restoreSnapshot(restoreRequest, new RestoreSnapshotListener() {
             @Override
             public void onResponse(RestoreInfo restoreInfo) {
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster.routing;
 
+import com.carrotsearch.hppc.IntSet;
 import com.carrotsearch.hppc.cursors.IntCursor;
 import com.carrotsearch.hppc.cursors.IntObjectCursor;
 import com.google.common.collect.ImmutableList;
@@ -381,28 +382,33 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
     /**
      * Initializes a new empty index, to be restored from a snapshot
      */
-    public Builder initializeAsNewRestore(IndexMetaData indexMetaData, RestoreSource restoreSource) {
-        return initializeAsRestore(indexMetaData, restoreSource, true);
+    public Builder initializeAsNewRestore(IndexMetaData indexMetaData, RestoreSource restoreSource, IntSet ignoreShards) {
+        return initializeAsRestore(indexMetaData, restoreSource, ignoreShards, true);
     }
 
     /**
      * Initializes an existing index, to be restored from a snapshot
      */
     public Builder initializeAsRestore(IndexMetaData indexMetaData, RestoreSource restoreSource) {
-        return initializeAsRestore(indexMetaData, restoreSource, false);
+        return initializeAsRestore(indexMetaData, restoreSource, null, false);
     }
 
     /**
      * Initializes an index, to be restored from snapshot
      */
-    private Builder initializeAsRestore(IndexMetaData indexMetaData, RestoreSource restoreSource, boolean asNew) {
+    private Builder initializeAsRestore(IndexMetaData indexMetaData, RestoreSource restoreSource, IntSet ignoreShards, boolean asNew) {
         if (!shards.isEmpty()) {
             throw new ElasticsearchIllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
         }
         for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) {
             IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId), asNew ? false : true);
             for (int i = 0; i <= indexMetaData.numberOfReplicas(); i++) {
-                indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, shardId, null, null, i == 0 ? restoreSource : null, i == 0, ShardRoutingState.UNASSIGNED, 0));
+                if (asNew && ignoreShards.contains(shardId)) {
+                    // This shard wasn't completely snapshotted - restore it as a new, empty shard
+                    indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, shardId, null, i == 0, ShardRoutingState.UNASSIGNED, 0));
+                } else {
+                    indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, shardId, null, null, i == 0 ? restoreSource : null, i == 0, ShardRoutingState.UNASSIGNED, 0));
+                }
             }
             shards.put(shardId, indexShardRoutingBuilder.build());
         }
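The key routing change is in that inner loop: for a shard listed in ignoreShards, the routing entry is created without a RestoreSource, so allocation treats it as a brand-new empty shard instead of one recovering from the repository. A stripped-down sketch of that per-shard decision, using plain collections in place of the routing-table types (the names here are illustrative, not ES API):

```java
import java.util.*;

public class ShardRestorePlan {
    // Returns, per shard id, whether the primary recovers from the snapshot
    // or starts empty because the shard was never successfully snapshotted.
    static Map<Integer, String> plan(int numberOfShards, Set<Integer> ignoreShards) {
        Map<Integer, String> decisions = new LinkedHashMap<>();
        for (int shardId = 0; shardId < numberOfShards; shardId++) {
            decisions.put(shardId, ignoreShards.contains(shardId)
                    ? "recreate empty (no RestoreSource)"
                    : "recover from snapshot (RestoreSource set)");
        }
        return decisions;
    }

    public static void main(String[] args) {
        // Shards 2 and 5 failed to snapshot, as populateIgnoredShards would report.
        System.out.println(plan(6, new HashSet<>(Arrays.asList(2, 5))));
    }
}
```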
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster.routing;
 
+import com.carrotsearch.hppc.IntSet;
 import com.google.common.collect.*;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -417,9 +418,9 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
         return this;
     }
 
-    public Builder addAsNewRestore(IndexMetaData indexMetaData, RestoreSource restoreSource) {
+    public Builder addAsNewRestore(IndexMetaData indexMetaData, RestoreSource restoreSource, IntSet ignoreShards) {
         IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index())
-                .initializeAsNewRestore(indexMetaData, restoreSource);
+                .initializeAsNewRestore(indexMetaData, restoreSource, ignoreShards);
         add(indexRoutingBuilder);
         return this;
     }
@@ -18,6 +18,8 @@
  */
 package org.elasticsearch.snapshots;
 
+import com.carrotsearch.hppc.IntOpenHashSet;
+import com.carrotsearch.hppc.IntSet;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import com.google.common.collect.ImmutableList;
@@ -28,8 +30,7 @@ import org.elasticsearch.cluster.*;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.*;
 import org.elasticsearch.cluster.metadata.RestoreMetaData.ShardRestoreStatus;
-import org.elasticsearch.cluster.routing.RestoreSource;
-import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.*;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.common.component.AbstractComponent;
@@ -70,7 +71,7 @@ import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX
  * {@link org.elasticsearch.index.gateway.IndexShardGatewayService#recover(boolean, org.elasticsearch.index.gateway.IndexShardGatewayService.RecoveryListener)}
  * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
  * at the {@link org.elasticsearch.cluster.routing.ShardRouting#restoreSource()} property. If this property is not null
- * {@code recover} method uses {@link org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService#restore(org.elasticsearch.index.gateway.RecoveryState)}
+ * {@code recover} method uses {@link org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService#restore(org.elasticsearch.indices.recovery.RecoveryState)}
  * method to start shard restore process.
 * <p/>
 * At the end of the successful restore process {@code IndexShardSnapshotAndRestoreService} calls {@link #indexShardRestoreCompleted(SnapshotId, ShardId)},
|
|||
final MetaData metaData = repository.readSnapshotMetaData(snapshotId, filteredIndices);
|
||||
|
||||
// Make sure that we can restore from this snapshot
|
||||
if (!snapshot.state().restorable()) {
|
||||
throw new SnapshotRestoreException(snapshotId, "unsupported snapshot state [" + snapshot.state() + "]");
|
||||
}
|
||||
if (Version.CURRENT.before(snapshot.version())) {
|
||||
throw new SnapshotRestoreException(snapshotId, "incompatible snapshot version [" + snapshot.version() + "]");
|
||||
}
|
||||
validateSnapshotRestorable(snapshotId, snapshot);
|
||||
|
||||
// Find list of indices that we need to restore
|
||||
final Map<String, String> renamedIndices = newHashMap();
|
||||
for (String index : filteredIndices) {
|
||||
String renamedIndex = index;
|
||||
if (request.renameReplacement() != null && request.renamePattern() != null) {
|
||||
renamedIndex = index.replaceAll(request.renamePattern(), request.renameReplacement());
|
||||
}
|
||||
String previousIndex = renamedIndices.put(renamedIndex, index);
|
||||
if (previousIndex != null) {
|
||||
throw new SnapshotRestoreException(snapshotId, "indices [" + index + "] and [" + previousIndex + "] are renamed into the same index [" + renamedIndex + "]");
|
||||
}
|
||||
}
|
||||
final Map<String, String> renamedIndices = renamedIndices(request, filteredIndices);
|
||||
|
||||
// Now we can start the actual restore process by adding shards to be recovered in the cluster state
|
||||
// and updating cluster metadata (global and index) as needed
|
||||
|
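The rename logic extracted into the renamedIndices helper further down still applies rename_pattern/rename_replacement via String.replaceAll and rejects collisions where two source indices map onto the same target name. A quick standalone illustration of both behaviors, with made-up patterns:

```java
import java.util.*;

public class RenameCheck {
    public static void main(String[] args) {
        // A capturing pattern renames each index individually.
        System.out.println("index_1".replaceAll("index_(.+)", "restored_index_$1")); // restored_index_1

        // A non-capturing pattern can map several indices onto one target name;
        // the helper detects this and refuses the restore.
        Map<String, String> renamed = new HashMap<>();
        for (String index : Arrays.asList("index_a", "index_b")) {
            String target = index.replaceAll("index_.+", "restored"); // both become "restored"
            String previous = renamed.put(target, index);
            if (previous != null) {
                System.out.println("collision: [" + index + "] and [" + previous
                        + "] are renamed into the same index [" + target + "]");
            }
        }
    }
}
```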
@@ -157,39 +143,32 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
                 MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
                 ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
                 RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
-                if (!metaData.indices().isEmpty()) {
+                final ImmutableMap<ShardId, RestoreMetaData.ShardRestoreStatus> shards;
+                if (!renamedIndices.isEmpty()) {
                     // We have some indices to restore
-                    ImmutableMap.Builder<ShardId, RestoreMetaData.ShardRestoreStatus> shards = ImmutableMap.builder();
+                    ImmutableMap.Builder<ShardId, RestoreMetaData.ShardRestoreStatus> shardsBuilder = ImmutableMap.builder();
                     for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) {
                         String index = indexEntry.getValue();
-                        // Make sure that index was fully snapshotted - don't restore
-                        if (failed(snapshot, index)) {
-                            throw new SnapshotRestoreException(snapshotId, "index [" + index + "] wasn't fully snapshotted - cannot restore");
-                        }
+                        boolean partial = checkPartial(index);
                         RestoreSource restoreSource = new RestoreSource(snapshotId, index);
                         String renamedIndex = indexEntry.getKey();
                         IndexMetaData snapshotIndexMetaData = metaData.index(index);
                         // Check that the index is closed or doesn't exist
                         IndexMetaData currentIndexMetaData = currentState.metaData().index(renamedIndex);
+                        IntSet ignoreShards = new IntOpenHashSet();
                         if (currentIndexMetaData == null) {
                             // Index doesn't exist - create it and start recovery
                             // Make sure that the index we are about to create has a valid name
                             createIndexService.validateIndexName(renamedIndex, currentState);
                             IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN).index(renamedIndex);
                             IndexMetaData updatedIndexMetaData = indexMdBuilder.build();
-                            rtBuilder.addAsNewRestore(updatedIndexMetaData, restoreSource);
+                            if (partial) {
+                                populateIgnoredShards(index, ignoreShards);
+                            }
+                            rtBuilder.addAsNewRestore(updatedIndexMetaData, restoreSource, ignoreShards);
                             mdBuilder.put(updatedIndexMetaData, true);
                         } else {
-                            // Index exist - checking that it's closed
-                            if (currentIndexMetaData.state() != IndexMetaData.State.CLOSE) {
-                                // TODO: Enable restore for open indices
-                                throw new SnapshotRestoreException(snapshotId, "cannot restore index [" + renamedIndex + "] because it's open");
-                            }
-                            // Make sure that the number of shards is the same. That's the only thing that we cannot change
-                            if (currentIndexMetaData.getNumberOfShards() != snapshotIndexMetaData.getNumberOfShards()) {
-                                throw new SnapshotRestoreException(snapshotId, "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards() +
-                                        "] shard from snapshot with [" + snapshotIndexMetaData.getNumberOfShards() + "] shards");
-                            }
+                            validateExistingIndex(currentIndexMetaData, snapshotIndexMetaData, renamedIndex, partial);
                             // Index exists and it's closed - open it in metadata and start recovery
                             IndexMetaData.Builder indexMdBuilder = IndexMetaData.builder(snapshotIndexMetaData).state(IndexMetaData.State.OPEN);
                             indexMdBuilder.version(Math.max(snapshotIndexMetaData.version(), currentIndexMetaData.version() + 1));
@@ -199,15 +178,74 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
                             mdBuilder.put(updatedIndexMetaData, true);
                         }
                         for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
-                            shards.put(new ShardId(renamedIndex, shard), new RestoreMetaData.ShardRestoreStatus(clusterService.state().nodes().localNodeId()));
+                            if (!ignoreShards.contains(shard)) {
+                                shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreMetaData.ShardRestoreStatus(clusterService.state().nodes().localNodeId()));
+                            } else {
+                                shardsBuilder.put(new ShardId(renamedIndex, shard), new RestoreMetaData.ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
+                            }
                         }
                     }
 
-                    RestoreMetaData.Entry restoreEntry = new RestoreMetaData.Entry(snapshotId, RestoreMetaData.State.INIT, ImmutableList.copyOf(renamedIndices.keySet()), shards.build());
+                    shards = shardsBuilder.build();
+                    RestoreMetaData.Entry restoreEntry = new RestoreMetaData.Entry(snapshotId, RestoreMetaData.State.INIT, ImmutableList.copyOf(renamedIndices.keySet()), shards);
                     mdBuilder.putCustom(RestoreMetaData.TYPE, new RestoreMetaData(restoreEntry));
+                } else {
+                    shards = ImmutableMap.of();
                 }
 
                 // Restore global state if needed
+                restoreGlobalStateIfRequested(mdBuilder);
+
+                if (completed(shards)) {
+                    // We don't have any indices to restore - we are done
+                    restoreInfo = new RestoreInfo(request.name(), ImmutableList.<String>copyOf(renamedIndices.keySet()),
+                            shards.size(), shards.size() - failedShards(shards));
+                }
+
+                ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocks).routingTable(rtBuilder).build();
+                RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build());
+                return ClusterState.builder(updatedState).routingResult(routingResult).build();
+            }
+
+            private void populateIgnoredShards(String index, IntSet ignoreShards) {
+                for (SnapshotShardFailure failure : snapshot.shardFailures()) {
+                    if (index.equals(failure.index())) {
+                        ignoreShards.add(failure.shardId());
+                    }
+                }
+            }
+
+            private boolean checkPartial(String index) {
+                // Make sure that index was fully snapshotted
+                if (failed(snapshot, index)) {
+                    if (request.partial()) {
+                        return true;
+                    } else {
+                        throw new SnapshotRestoreException(snapshotId, "index [" + index + "] wasn't fully snapshotted - cannot restore");
+                    }
+                } else {
+                    return false;
+                }
+            }
+
+            private void validateExistingIndex(IndexMetaData currentIndexMetaData, IndexMetaData snapshotIndexMetaData, String renamedIndex, boolean partial) {
+                // Index exists - check that it's closed
+                if (currentIndexMetaData.state() != IndexMetaData.State.CLOSE) {
+                    // TODO: Enable restore for open indices
+                    throw new SnapshotRestoreException(snapshotId, "cannot restore index [" + renamedIndex + "] because it's open");
+                }
+                // Index exists - check whether this is a partial restore
+                if (partial) {
+                    throw new SnapshotRestoreException(snapshotId, "cannot restore partial index [" + renamedIndex + "] because such index already exists");
+                }
+                // Make sure that the number of shards is the same. That's the only thing that we cannot change
+                if (currentIndexMetaData.getNumberOfShards() != snapshotIndexMetaData.getNumberOfShards()) {
+                    throw new SnapshotRestoreException(snapshotId, "cannot restore index [" + renamedIndex + "] with [" + currentIndexMetaData.getNumberOfShards() +
+                            "] shards from a snapshot with [" + snapshotIndexMetaData.getNumberOfShards() + "] shards");
+                }
+            }
+
+            private void restoreGlobalStateIfRequested(MetaData.Builder mdBuilder) {
+                if (request.includeGlobalState()) {
+                    if (metaData.persistentSettings() != null) {
+                        mdBuilder.persistentSettings(metaData.persistentSettings());
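Note how ignored shards enter the restore entry already in State.FAILURE: the generic completed()/failedShards() accounting added further down then needs no partial-restore special case, and an index with no snapshotted shards at all completes immediately with successfulShards == 0. A compact model of that accounting with a plain map (illustrative names, not the ES types):

```java
import java.util.*;

public class RestoreAccounting {
    enum State { INIT, STARTED, SUCCESS, FAILURE }

    static boolean completed(Map<Integer, State> shards) {
        for (State s : shards.values()) {
            if (s != State.SUCCESS && s != State.FAILURE) {
                return false; // something is still restoring
            }
        }
        return true;
    }

    static int failedShards(Map<Integer, State> shards) {
        int failed = 0;
        for (State s : shards.values()) {
            if (s == State.FAILURE) {
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Shards 2 and 5 were pre-marked FAILURE because they were never snapshotted.
        Map<Integer, State> shards = new HashMap<>();
        for (int i = 0; i < 6; i++) {
            shards.put(i, (i == 2 || i == 5) ? State.FAILURE : State.SUCCESS);
        }
        System.out.println("completed=" + completed(shards)
                + " successful=" + (shards.size() - failedShards(shards)));
    }
}
```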
@@ -228,17 +266,9 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
                     }
                 }
             }
 
-                if (metaData.indices().isEmpty()) {
-                    // We don't have any indices to restore - we are done
-                    restoreInfo = new RestoreInfo(request.name(), ImmutableList.<String>of(), 0, 0);
-                }
-
-                ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocks).routingTable(rtBuilder).build();
-                RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build());
-                return ClusterState.builder(updatedState).routingResult(routingResult).build();
-            }
 
             @Override
             public void onFailure(String source, Throwable t) {
                 logger.warn("[{}] failed to restore snapshot", t, snapshotId);
@@ -290,8 +320,6 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
    private void innerUpdateRestoreState(final UpdateIndexShardRestoreStatusRequest request) {
        clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {
 
-            private boolean completed = true;
-
            private RestoreInfo restoreInfo = null;
 
            @Override
@@ -307,23 +335,11 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
                        HashMap<ShardId, ShardRestoreStatus> shards = newHashMap(entry.shards());
                        logger.trace("[{}] Updating shard [{}] with status [{}]", request.snapshotId(), request.shardId(), request.status().state());
                        shards.put(request.shardId(), request.status());
-                        for (RestoreMetaData.ShardRestoreStatus status : shards.values()) {
-                            if (!status.state().completed()) {
-                                completed = false;
-                                break;
-                            }
-                        }
-                        if (!completed) {
+                        if (!completed(shards)) {
                            entries.add(new RestoreMetaData.Entry(entry.snapshotId(), RestoreMetaData.State.STARTED, entry.indices(), ImmutableMap.copyOf(shards)));
                        } else {
                            logger.info("restore [{}] is done", request.snapshotId());
-                            int failedShards = 0;
-                            for (RestoreMetaData.ShardRestoreStatus status : shards.values()) {
-                                if (status.state() == RestoreMetaData.State.FAILURE) {
-                                    failedShards++;
-                                }
-                            }
-                            restoreInfo = new RestoreInfo(entry.snapshotId().getSnapshot(), entry.indices(), shards.size(), shards.size() - failedShards);
+                            restoreInfo = new RestoreInfo(entry.snapshotId().getSnapshot(), entry.indices(), shards.size(), shards.size() - failedShards(shards));
                        }
                        changed = true;
                    } else {
@@ -359,6 +375,57 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
        });
    }
 
+    private boolean completed(Map<ShardId, RestoreMetaData.ShardRestoreStatus> shards) {
+        for (RestoreMetaData.ShardRestoreStatus status : shards.values()) {
+            if (!status.state().completed()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private int failedShards(Map<ShardId, RestoreMetaData.ShardRestoreStatus> shards) {
+        int failedShards = 0;
+        for (RestoreMetaData.ShardRestoreStatus status : shards.values()) {
+            if (status.state() == RestoreMetaData.State.FAILURE) {
+                failedShards++;
+            }
+        }
+        return failedShards;
+    }
+
+    private Map<String, String> renamedIndices(RestoreRequest request, ImmutableList<String> filteredIndices) {
+        Map<String, String> renamedIndices = newHashMap();
+        for (String index : filteredIndices) {
+            String renamedIndex = index;
+            if (request.renameReplacement() != null && request.renamePattern() != null) {
+                renamedIndex = index.replaceAll(request.renamePattern(), request.renameReplacement());
+            }
+            String previousIndex = renamedIndices.put(renamedIndex, index);
+            if (previousIndex != null) {
+                throw new SnapshotRestoreException(new SnapshotId(request.repository(), request.name()),
+                        "indices [" + index + "] and [" + previousIndex + "] are renamed into the same index [" + renamedIndex + "]");
+            }
+        }
+        return renamedIndices;
+    }
+
+    /**
+     * Checks that snapshots can be restored and have compatible version
+     *
+     * @param snapshotId snapshot id
+     * @param snapshot   snapshot metadata
+     */
+    private void validateSnapshotRestorable(SnapshotId snapshotId, Snapshot snapshot) {
+        if (!snapshot.state().restorable()) {
+            throw new SnapshotRestoreException(snapshotId, "unsupported snapshot state [" + snapshot.state() + "]");
+        }
+        if (Version.CURRENT.before(snapshot.version())) {
+            throw new SnapshotRestoreException(snapshotId, "the snapshot was created with Elasticsearch version [" +
+                    snapshot.version() + "] which is higher than the version of this node [" + Version.CURRENT + "]");
+        }
+    }
+
    /**
     * Checks if any of the deleted indices are still recovering and fails recovery on the shards of these indices
     *
@@ -405,15 +472,6 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
        return false;
    }
 
-    private boolean failed(Snapshot snapshot, String index, int shard) {
-        for (SnapshotShardFailure failure : snapshot.shardFailures()) {
-            if (index.equals(failure.index()) && shard == failure.shardId()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
    /**
     * Adds restore completion listener
     * <p/>
@@ -473,117 +531,57 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
     */
    public static class RestoreRequest {
 
-        private String cause;
+        final private String cause;
 
-        private String name;
+        final private String name;
 
-        private String repository;
+        final private String repository;
 
-        private String[] indices;
+        final private String[] indices;
 
-        private String renamePattern;
+        final private String renamePattern;
 
-        private String renameReplacement;
+        final private String renameReplacement;
 
-        private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
+        final private IndicesOptions indicesOptions;
 
-        private Settings settings;
+        final private Settings settings;
 
-        private TimeValue masterNodeTimeout;
+        final private TimeValue masterNodeTimeout;
 
-        private boolean includeGlobalState = false;
+        final private boolean includeGlobalState;
+
+        final private boolean partial;
 
        /**
         * Constructs new restore request
         *
-         * @param cause      cause for restoring the snapshot
-         * @param repository repository name
-         * @param name       snapshot name
+         * @param cause              cause for restoring the snapshot
+         * @param repository         repository name
+         * @param name               snapshot name
+         * @param indices            list of indices to restore
+         * @param indicesOptions     indices options
+         * @param renamePattern      pattern to rename indices
+         * @param renameReplacement  replacement for renamed indices
+         * @param settings           repository specific restore settings
+         * @param masterNodeTimeout  master node timeout
+         * @param includeGlobalState include global state into restore
+         * @param partial            allow partial restore
         */
-        public RestoreRequest(String cause, String repository, String name) {
+        public RestoreRequest(String cause, String repository, String name, String[] indices, IndicesOptions indicesOptions,
+                              String renamePattern, String renameReplacement, Settings settings,
+                              TimeValue masterNodeTimeout, boolean includeGlobalState, boolean partial) {
            this.cause = cause;
            this.name = name;
            this.repository = repository;
-        }
-
-        /**
-         * Sets list of indices to restore
-         *
-         * @param indices list of indices
-         * @return this request
-         */
-        public RestoreRequest indices(String[] indices) {
            this.indices = indices;
-            return this;
-        }
-
-        /**
-         * Sets indices options flags
-         *
-         * @param indicesOptions indices options flags
-         * @return this request
-         */
-        public RestoreRequest indicesOptions(IndicesOptions indicesOptions) {
-            this.indicesOptions = indicesOptions;
-            return this;
-        }
-
-        /**
-         * If true global cluster state will be restored as part of the restore operation
-         *
-         * @param includeGlobalState restore global state flag
-         * @return this request
-         */
-        public RestoreRequest includeGlobalState(boolean includeGlobalState) {
-            this.includeGlobalState = includeGlobalState;
-            return this;
-        }
-
-        /**
-         * Sets repository-specific restore settings
-         *
-         * @param settings restore settings
-         * @return this request
-         */
-        public RestoreRequest settings(Settings settings) {
-            this.settings = settings;
-            return this;
-        }
-
-        /**
-         * Sets master node timeout
-         * <p/>
-         * This timeout will affect only the start of the restore process. Once the restore process has started this timeout
-         * has no effect for the duration of the restore.
-         *
-         * @param masterNodeTimeout master node timeout
-         * @return this request
-         */
-        public RestoreRequest masterNodeTimeout(TimeValue masterNodeTimeout) {
-            this.masterNodeTimeout = masterNodeTimeout;
-            return this;
-        }
-
-        /**
-         * Sets index rename pattern
-         *
-         * @param renamePattern rename pattern
-         * @return this request
-         */
-        public RestoreRequest renamePattern(String renamePattern) {
            this.renamePattern = renamePattern;
-            return this;
-        }
-
-        /**
-         * Sets index rename replacement
-         *
-         * @param renameReplacement rename replacement
-         * @return this request
-         */
-        public RestoreRequest renameReplacement(String renameReplacement) {
            this.renameReplacement = renameReplacement;
-            return this;
+            this.indicesOptions = indicesOptions;
+            this.settings = settings;
+            this.masterNodeTimeout = masterNodeTimeout;
+            this.includeGlobalState = includeGlobalState;
+            this.partial = partial;
        }
 
        /**
@@ -667,6 +665,15 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
            return includeGlobalState;
        }
 
+        /**
+         * Returns true if incomplete indices will be restored
+         *
+         * @return partial indices restore flag
+         */
+        public boolean partial() {
+            return partial;
+        }
+
        /**
         * Returns master node timeout
         *
@@ -210,33 +210,41 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
        internalCluster().startNode(settingsBuilder().put("gateway.type", "local"));
        cluster().wipeIndices("_all");
 
-        assertAcked(prepareCreate("test-idx-1", 2, settingsBuilder().put("number_of_shards", 6)
+        logger.info("--> create an index that will have some unallocated shards");
+        assertAcked(prepareCreate("test-idx-some", 2, settingsBuilder().put("number_of_shards", 6)
                .put("number_of_replicas", 0)
                .put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)));
        ensureGreen();
 
-        logger.info("--> indexing some data into test-idx-1");
+        logger.info("--> indexing some data into test-idx-some");
        for (int i = 0; i < 100; i++) {
-            index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
+            index("test-idx-some", "doc", Integer.toString(i), "foo", "bar" + i);
        }
        refresh();
-        assertThat(client().prepareCount("test-idx-1").get().getCount(), equalTo(100L));
+        assertThat(client().prepareCount("test-idx-some").get().getCount(), equalTo(100L));
 
        logger.info("--> shutdown one of the nodes");
        internalCluster().stopRandomDataNode();
        assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("<2").execute().actionGet().isTimedOut(), equalTo(false));
 
-        assertAcked(prepareCreate("test-idx-2", 1, settingsBuilder().put("number_of_shards", 6)
+        logger.info("--> create an index that will have all allocated shards");
+        assertAcked(prepareCreate("test-idx-all", 1, settingsBuilder().put("number_of_shards", 6)
                .put("number_of_replicas", 0)
                .put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)));
-        ensureGreen("test-idx-2");
+        ensureGreen("test-idx-all");
 
-        logger.info("--> indexing some data into test-idx-2");
+        logger.info("--> indexing some data into test-idx-all");
        for (int i = 0; i < 100; i++) {
-            index("test-idx-2", "doc", Integer.toString(i), "foo", "bar" + i);
+            index("test-idx-all", "doc", Integer.toString(i), "foo", "bar" + i);
        }
        refresh();
-        assertThat(client().prepareCount("test-idx-2").get().getCount(), equalTo(100L));
+        assertThat(client().prepareCount("test-idx-all").get().getCount(), equalTo(100L));
 
+        logger.info("--> create an index that will have no allocated shards");
+        assertAcked(prepareCreate("test-idx-none", 1, settingsBuilder().put("number_of_shards", 6)
+                .put("index.routing.allocation.include.tag", "nowhere")
+                .put("number_of_replicas", 0)
+                .put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)));
 
-        logger.info("--> create repository");
+        logger.info("--> creating repository");
@@ -257,7 +265,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
                public boolean apply(Object o) {
                    SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap-2").get();
                    ImmutableList<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
-                    if(snapshotStatuses.size() == 1) {
+                    if (snapshotStatuses.size() == 1) {
                        logger.trace("current snapshot status [{}]", snapshotStatuses.get(0));
                        return snapshotStatuses.get(0).getState().completed();
                    }
@@ -269,34 +277,58 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
            assertThat(snapshotStatuses.size(), equalTo(1));
            SnapshotStatus snapshotStatus = snapshotStatuses.get(0);
            logger.info("State: [{}], Reason: [{}]", createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().reason());
-            assertThat(snapshotStatus.getShardsStats().getTotalShards(), equalTo(12));
+            assertThat(snapshotStatus.getShardsStats().getTotalShards(), equalTo(18));
-            assertThat(snapshotStatus.getShardsStats().getDoneShards(), lessThan(12));
+            assertThat(snapshotStatus.getShardsStats().getDoneShards(), greaterThan(6));
        } else {
            logger.info("checking snapshot completion using wait_for_completion flag");
            createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setPartial(true).execute().actionGet();
            logger.info("State: [{}], Reason: [{}]", createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().reason());
-            assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(12));
+            assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(18));
-            assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), lessThan(12));
+            assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(6));
        }
        assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").execute().actionGet().getSnapshots().get(0).state(), equalTo(SnapshotState.PARTIAL));
 
-        assertAcked(client().admin().indices().prepareClose("test-idx-1", "test-idx-2").execute().actionGet());
+        assertAcked(client().admin().indices().prepareClose("test-idx-some", "test-idx-all").execute().actionGet());
 
        logger.info("--> restore incomplete snapshot - should fail");
        assertThrows(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false).setWaitForCompletion(true).execute(), SnapshotRestoreException.class);
 
        logger.info("--> restore snapshot for the index that was snapshotted completely");
-        RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false).setIndices("test-idx-2").setWaitForCompletion(true).execute().actionGet();
+        RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false).setIndices("test-idx-all").setWaitForCompletion(true).execute().actionGet();
        assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue());
        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6));
        assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(6));
+        assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
 
-        ensureGreen("test-idx-2");
+        ensureGreen("test-idx-all");
 
-        assertThat(client().prepareCount("test-idx-2").get().getCount(), equalTo(100L));
+        assertThat(client().prepareCount("test-idx-all").get().getCount(), equalTo(100L));
 
+        logger.info("--> restore snapshot for the partial index");
+        cluster().wipeIndices("test-idx-some");
+        restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2")
+                .setRestoreGlobalState(false).setIndices("test-idx-some").setPartial(true).setWaitForCompletion(true).get();
+        assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue());
+        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6));
+        assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), allOf(greaterThan(0), lessThan(6)));
+        assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), greaterThan(0));
+
+        ensureGreen("test-idx-some");
+        assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L)));
+
+        logger.info("--> restore snapshot for the index that didn't have any shards snapshotted successfully");
+        cluster().wipeIndices("test-idx-none");
+        restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2")
+                .setRestoreGlobalState(false).setIndices("test-idx-none").setPartial(true).setWaitForCompletion(true).get();
+        assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue());
+        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6));
+        assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(0));
+        assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(6));
+
+        ensureGreen("test-idx-some");
+        assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L)));
    }
 
    @Test
@@ -387,7 +419,7 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
 
        logger.info("--> update index settings to back to normal");
        assertAcked(client().admin().indices().prepareUpdateSettings("test-*").setSettings(ImmutableSettings.builder()
-            .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
+                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
        ));
 
        // Make sure that snapshot finished - doesn't matter if it failed or succeeded
@@ -434,8 +466,8 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
        }
 
        assertAcked(client().admin().indices().prepareUpdateSettings(name).setSettings(ImmutableSettings.builder()
-            .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
-            .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, between(100, 50000))
+                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
+                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, between(100, 50000))
        ));
    }
 }