Fail closing or deleting indices during a full snapshot

Closes #16321
Yannick Welsch 2016-03-08 21:20:15 +01:00
parent 0bbb84c19a
commit 266394c3ab
8 changed files with 198 additions and 49 deletions

View File: SnapshotsInProgress.java

@ -69,15 +69,17 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
private final State state;
private final SnapshotId snapshotId;
private final boolean includeGlobalState;
private final boolean partial;
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<String> indices;
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, List<String> indices, long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
public Entry(SnapshotId snapshotId, boolean includeGlobalState, boolean partial, State state, List<String> indices, long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this.state = state;
this.snapshotId = snapshotId;
this.includeGlobalState = includeGlobalState;
this.partial = partial;
this.indices = indices;
this.startTime = startTime;
if (shards == null) {
@ -90,7 +92,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
}
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshotId, entry.includeGlobalState, state, entry.indices, entry.startTime, shards);
this(entry.snapshotId, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, shards);
}
public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@ -121,6 +123,10 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
return includeGlobalState;
}
public boolean partial() {
return partial;
}
public long startTime() {
return startTime;
}
@ -133,6 +139,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
Entry entry = (Entry) o;
if (includeGlobalState != entry.includeGlobalState) return false;
if (partial != entry.partial) return false;
if (startTime != entry.startTime) return false;
if (!indices.equals(entry.indices)) return false;
if (!shards.equals(entry.shards)) return false;
@ -148,6 +155,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
int result = state.hashCode();
result = 31 * result + snapshotId.hashCode();
result = 31 * result + (includeGlobalState ? 1 : 0);
result = 31 * result + (partial ? 1 : 0);
result = 31 * result + shards.hashCode();
result = 31 * result + indices.hashCode();
result = 31 * result + waitingIndices.hashCode();
@ -360,6 +368,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
for (int i = 0; i < entries.length; i++) {
SnapshotId snapshotId = SnapshotId.readSnapshotId(in);
boolean includeGlobalState = in.readBoolean();
boolean partial = in.readBoolean();
State state = State.fromValue(in.readByte());
int indices = in.readVInt();
List<String> indexBuilder = new ArrayList<>();
@ -375,7 +384,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
State shardState = State.fromValue(in.readByte());
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
}
entries[i] = new Entry(snapshotId, includeGlobalState, state, Collections.unmodifiableList(indexBuilder), startTime, builder.build());
entries[i] = new Entry(snapshotId, includeGlobalState, partial, state, Collections.unmodifiableList(indexBuilder), startTime, builder.build());
}
return new SnapshotsInProgress(entries);
}
@ -386,6 +395,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
for (Entry entry : entries) {
entry.snapshotId().writeTo(out);
out.writeBoolean(entry.includeGlobalState());
out.writeBoolean(entry.partial());
out.writeByte(entry.state().value());
out.writeVInt(entry.indices().size());
for (String index : entry.indices()) {
@ -406,6 +416,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");
static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
static final XContentBuilderString INCLUDE_GLOBAL_STATE = new XContentBuilderString("include_global_state");
static final XContentBuilderString PARTIAL = new XContentBuilderString("partial");
static final XContentBuilderString STATE = new XContentBuilderString("state");
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString START_TIME_MILLIS = new XContentBuilderString("start_time_millis");
@ -431,6 +442,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
builder.field(Fields.REPOSITORY, entry.snapshotId().getRepository());
builder.field(Fields.SNAPSHOT, entry.snapshotId().getSnapshot());
builder.field(Fields.INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
builder.field(Fields.PARTIAL, entry.partial());
builder.field(Fields.STATE, entry.state());
builder.startArray(Fields.INDICES);
{

View File: MetaDataDeleteIndexService.java

@ -34,11 +34,12 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -67,7 +68,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
}
public void deleteIndices(final Request request, final Listener userListener) {
Collection<String> indices = Arrays.asList(request.indices);
Set<String> indices = Sets.newHashSet(request.indices);
final DeleteIndexListener listener = new DeleteIndexListener(userListener);
clusterService.submitStateUpdateTask("delete-index " + indices, new ClusterStateUpdateTask(Priority.URGENT) {
@ -84,6 +85,9 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
@Override
public ClusterState execute(final ClusterState currentState) {
// Check if index deletion conflicts with any running snapshots
SnapshotsService.checkIndexDeletion(currentState, indices);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());

View File: MetaDataIndexStateService.java

@ -19,14 +19,12 @@
package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -39,8 +37,9 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import java.util.ArrayList;
import java.util.Arrays;
@ -99,27 +98,10 @@ public class MetaDataIndexStateService extends AbstractComponent {
return currentState;
}
// Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index
// is found as closing an index that is being restored makes the index unusable (it cannot be recovered).
RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
if (restore != null) {
Set<String> indicesToFail = null;
for (RestoreInProgress.Entry entry : restore.entries()) {
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
if (!shard.value.state().completed()) {
if (indicesToClose.contains(shard.key.getIndexName())) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();
}
indicesToFail.add(shard.key.getIndexName());
}
}
}
}
if (indicesToFail != null) {
throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail);
}
}
// Check if index closing conflicts with any running restores
RestoreService.checkIndexClosing(currentState, indicesToClose);
// Check if index closing conflicts with any running snapshots
SnapshotsService.checkIndexClosing(currentState, indicesToClose);
logger.info("closing indices [{}]", indicesAsString);

View File: RestoreService.java

@ -774,6 +774,32 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
return false;
}
/**
* Check if any of the indices to be closed are currently being restored from a snapshot and fail the close operation if such
* an index is found, as closing an index that is being restored makes the index unusable (it cannot be recovered).
*/
public static void checkIndexClosing(ClusterState currentState, Set<String> indices) {
RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
if (restore != null) {
Set<String> indicesToFail = null;
for (RestoreInProgress.Entry entry : restore.entries()) {
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
if (!shard.value.state().completed()) {
if (indices.contains(shard.key.getIndexName())) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();
}
indicesToFail.add(shard.key.getIndexName());
}
}
}
}
if (indicesToFail != null) {
throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail);
}
}
}
/**
* Adds restore completion listener
* <p>

View File: SnapshotsService.java

@ -206,7 +206,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndices(currentState, request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", request.repository(), request.name(), indices);
newSnapshot = new SnapshotsInProgress.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, System.currentTimeMillis(), null);
newSnapshot = new SnapshotsInProgress.Entry(snapshotId, request.includeGlobalState(), request.partial(), State.INIT, indices, System.currentTimeMillis(), null);
snapshots = new SnapshotsInProgress(newSnapshot);
} else {
// TODO: What should we do if a snapshot is already running?
@ -228,7 +228,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() {
@Override
public void run() {
beginSnapshot(newState, newSnapshot, request.partial, listener);
beginSnapshot(newState, newSnapshot, request.partial(), listener);
}
});
}
@ -1061,6 +1061,63 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
return builder.build();
}
/**
* Check if any of the indices to be deleted are currently being snapshotted. Fail as deleting an index that is being
* snapshotted (with partial == false) makes the snapshot fail.
*/
public static void checkIndexDeletion(ClusterState currentState, Set<String> indices) {
Set<String> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices);
if (indicesToFail != null) {
throw new IllegalArgumentException("Cannot delete indices that are being snapshotted: " + indicesToFail +
". Try again after snapshot finishes or cancel the currently running snapshot.");
}
}
/**
* Check if any of the indices to be closed are currently being snapshotted. Fail as closing an index that is being
* snapshotted (with partial == false) makes the snapshot fail.
*/
public static void checkIndexClosing(ClusterState currentState, Set<String> indices) {
Set<String> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices);
if (indicesToFail != null) {
throw new IllegalArgumentException("Cannot close indices that are being snapshotted: " + indicesToFail +
". Try again after snapshot finishes or cancel the currently running snapshot.");
}
}
private static Set<String> indicesToFailForCloseOrDeletion(ClusterState currentState, Set<String> indices) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
Set<String> indicesToFail = null;
if (snapshots != null) {
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.partial() == false) {
if (entry.state() == State.INIT) {
for (String index : entry.indices()) {
if (indices.contains(index)) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();
}
indicesToFail.add(index);
}
}
} else {
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
if (!shard.value.state().completed()) {
if (indices.contains(shard.key.getIndexName())) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();
}
indicesToFail.add(shard.key.getIndexName());
}
}
}
}
}
}
}
return indicesToFail;
}
/**
* Adds snapshot completion listener
*
@ -1302,6 +1359,15 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
return includeGlobalState;
}
/**
* Returns true if partial snapshot should be allowed
*
* @return true if partial snapshot should be allowed
*/
public boolean partial() {
return partial;
}
/**
* Returns master node timeout
*

View File: ClusterStateDiffIT.java

@ -639,6 +639,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
return new SnapshotsInProgress(new SnapshotsInProgress.Entry(
new SnapshotId(randomName("repo"), randomName("snap")),
randomBoolean(),
randomBoolean(),
SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)),
Collections.<String>emptyList(),
Math.abs(randomLong()),

View File: SharedClusterSnapshotRestoreIT.java

@ -1813,12 +1813,15 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
}
public void testDeleteIndexDuringSnapshot() throws Exception {
public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
Client client = client();
boolean allowPartial = randomBoolean();
logger.info("--> creating repository");
// if the snapshot is partial we have to block on repo init (not on the data nodes), or we run into a deadlock when acquiring shard locks for the index deletion/closing
boolean initBlocking = allowPartial || randomBoolean();
if (initBlocking) {
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.settingsBuilder()
.put("location", randomRepoPath())
@ -1826,6 +1829,15 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_init", true)
));
} else {
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.settingsBuilder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_data", true)
));
}
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
@ -1845,23 +1857,59 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
ListenableActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute();
logger.info("--> wait for block to kick in");
if (initBlocking) {
waitForBlock(internalCluster().getMasterName(), "test-repo", TimeValue.timeValueMinutes(1));
logger.info("--> delete some indices while snapshot is running");
client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get();
} else {
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
}
if (allowPartial) {
// partial snapshots allow close / delete operations
if (randomBoolean()) {
logger.info("--> delete index while partial snapshot is running");
client.admin().indices().prepareDelete("test-idx-1").get();
} else {
logger.info("--> close index while partial snapshot is running");
client.admin().indices().prepareClose("test-idx-1").get();
}
} else {
// non-partial snapshots do not allow close/delete operations on indices whose snapshot has not been completed
if (randomBoolean()) {
try {
logger.info("--> delete index while non-partial snapshot is running");
client.admin().indices().prepareDelete("test-idx-1").get();
fail("Expected deleting index to fail during snapshot");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Cannot delete indices that are being snapshotted: [test-idx-1]"));
}
} else {
try {
logger.info("--> close index while non-partial snapshot is running");
client.admin().indices().prepareClose("test-idx-1").get();
fail("Expected closing index to fail during snapshot");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [test-idx-1]"));
}
}
}
if (initBlocking) {
logger.info("--> unblock running master node");
unblockNode(internalCluster().getMasterName());
} else {
logger.info("--> unblock all data nodes");
unblockAllDataNodes("test-repo");
}
logger.info("--> waiting for snapshot to finish");
CreateSnapshotResponse createSnapshotResponse = future.get();
if (allowPartial) {
logger.info("Deleted index during snapshot, but allow partial");
logger.info("Deleted/Closed index during snapshot, but allow partial");
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.PARTIAL)));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().failedShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), lessThan(createSnapshotResponse.getSnapshotInfo().totalShards()));
} else {
logger.info("Deleted index during snapshot and doesn't allow partial");
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.FAILED)));
logger.info("Snapshot successfully completed");
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS)));
}
}
@ -1960,7 +2008,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
shards.put(new ShardId("test-idx", "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED));
shards.put(new ShardId("test-idx", "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED));
List<Entry> entries = new ArrayList<>();
entries.add(new Entry(new SnapshotId("test-repo", "test-snap"), true, State.ABORTED, Collections.singletonList("test-idx"), System.currentTimeMillis(), shards.build()));
entries.add(new Entry(new SnapshotId("test-repo", "test-snap"), true, false, State.ABORTED, Collections.singletonList("test-idx"), System.currentTimeMillis(), shards.build()));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))).build();
}

View File: Elasticsearch 5.0 migration documentation (asciidoc)

@ -21,6 +21,7 @@ your application to Elasticsearch 5.0.
* <<breaking_50_scripting>>
* <<breaking_50_term_vectors>>
* <<breaking_50_security>>
* <<breaking_50_snapshot_restore>>
[[breaking_50_search_changes]]
=== Warmers
@ -844,3 +845,12 @@ distributed document frequencies anymore.
The option to disable the security manager `--security.manager.enabled` has been removed. In order to grant special
permissions to elasticsearch users must tweak the local Java Security Policy.
[[breaking_50_snapshot_restore]]
=== Snapshot/Restore
==== Closing / deleting indices while running snapshot
In previous versions of Elasticsearch, closing or deleting an index during a full snapshot would make the snapshot fail. Now the close/delete
index request itself fails instead. The behavior for partial snapshots remains unchanged: closing or deleting an index during a partial
snapshot is still possible, and the snapshot result is then marked as partial.
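
For illustration, here is a minimal sketch of how the new behavior surfaces through the Java client. It mirrors the integration test added in this commit (SharedClusterSnapshotRestoreIT); the repository, snapshot, and index names are placeholders.

import org.elasticsearch.client.Client;

public class DeleteDuringSnapshotExample {

    // Sketch only: uses the same client calls as the integration test in this commit;
    // "my-repo", "my-snap" and "my-index" are placeholder names.
    static void deleteWhileSnapshotting(Client client) {
        // Start a non-partial snapshot that includes the index.
        client.admin().cluster().prepareCreateSnapshot("my-repo", "my-snap")
                .setIndices("my-index")
                .setPartial(false)              // full snapshot: close/delete is now rejected
                .setWaitForCompletion(false)
                .get();

        try {
            // Assuming the snapshot is still running, this request now fails
            // instead of making the snapshot fail.
            client.admin().indices().prepareDelete("my-index").get();
        } catch (IllegalArgumentException e) {
            // Expected message, as asserted in the test:
            // "Cannot delete indices that are being snapshotted: [my-index].
            //  Try again after snapshot finishes or cancel the currently running snapshot."
        }

        // With setPartial(true), the delete (or close) succeeds and the snapshot
        // completes with state PARTIAL instead.
    }
}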