Check again on-going snapshots/restores of indices before closing (#43873)

Today we prevent any index that is actively snapshotted or restored to be closed. 
This verification is done during the execution of the first phase of index closing 
(ie before blocking the indices).

We should also do this verification again in the last phase of index closing 
(ie after the shard sanity checks and right before actually changing the index 
state and the routing table) because a snapshot/restore could sneak in while
 the shards are verified-before-close.
This commit is contained in:
Tanguy Leroux 2019-07-08 17:06:21 +02:00
parent 0c8294e633
commit 251287f89d
5 changed files with 119 additions and 76 deletions

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.snapshots.SnapshotsService;
import java.util.Arrays; import java.util.Arrays;
@ -90,9 +91,15 @@ public class MetaDataDeleteIndexService {
*/ */
public ClusterState deleteIndices(ClusterState currentState, Set<Index> indices) { public ClusterState deleteIndices(ClusterState currentState, Set<Index> indices) {
final MetaData meta = currentState.metaData(); final MetaData meta = currentState.metaData();
final Set<IndexMetaData> metaDatas = indices.stream().map(i -> meta.getIndexSafe(i)).collect(toSet()); final Set<Index> indicesToDelete = indices.stream().map(i -> meta.getIndexSafe(i).getIndex()).collect(toSet());
// Check if index deletion conflicts with any running snapshots // Check if index deletion conflicts with any running snapshots
SnapshotsService.checkIndexDeletion(currentState, metaDatas); Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToDelete);
if (snapshottingIndices.isEmpty() == false) {
throw new SnapshotInProgressException("Cannot delete indices that are being snapshotted: " + snapshottingIndices +
". Try again after snapshot finishes or cancel the currently running snapshot.");
}
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
MetaData.Builder metaDataBuilder = MetaData.builder(meta); MetaData.Builder metaDataBuilder = MetaData.builder(meta);
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks()); ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());

View File

@ -68,6 +68,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -86,6 +87,7 @@ import java.util.Set;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.Collections.singleton;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
/** /**
@ -230,11 +232,11 @@ public class MetaDataIndexStateService {
final ClusterState currentState) { final ClusterState currentState) {
final MetaData.Builder metadata = MetaData.builder(currentState.metaData()); final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
final Set<IndexMetaData> indicesToClose = new HashSet<>(); final Set<Index> indicesToClose = new HashSet<>();
for (Index index : indices) { for (Index index : indices) {
final IndexMetaData indexMetaData = metadata.getSafe(index); final IndexMetaData indexMetaData = metadata.getSafe(index);
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) { if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
indicesToClose.add(indexMetaData); indicesToClose.add(index);
} else { } else {
logger.debug("index {} is already closed, ignoring", index); logger.debug("index {} is already closed, ignoring", index);
assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
@ -246,16 +248,22 @@ public class MetaDataIndexStateService {
} }
// Check if index closing conflicts with any running restores // Check if index closing conflicts with any running restores
RestoreService.checkIndexClosing(currentState, indicesToClose); Set<Index> restoringIndices = RestoreService.restoringIndices(currentState, indicesToClose);
if (restoringIndices.isEmpty() == false) {
throw new IllegalArgumentException("Cannot close indices that are being restored: " + restoringIndices);
}
// Check if index closing conflicts with any running snapshots // Check if index closing conflicts with any running snapshots
SnapshotsService.checkIndexClosing(currentState, indicesToClose); Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToClose);
if (snapshottingIndices.isEmpty() == false) {
throw new SnapshotInProgressException("Cannot close indices that are being snapshotted: " + snapshottingIndices +
". Try again after snapshot finishes or cancel the currently running snapshot.");
}
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
for (IndexMetaData indexToClose : indicesToClose) { for (Index index : indicesToClose) {
final Index index = indexToClose.getIndex();
ClusterBlock indexBlock = null; ClusterBlock indexBlock = null;
final Set<ClusterBlock> clusterBlocks = currentState.blocks().indices().get(index.getName()); final Set<ClusterBlock> clusterBlocks = currentState.blocks().indices().get(index.getName());
if (clusterBlocks != null) { if (clusterBlocks != null) {
@ -453,6 +461,24 @@ public class MetaDataIndexStateService {
continue; continue;
} }
// Check if index closing conflicts with any running restores
Set<Index> restoringIndices = RestoreService.restoringIndices(currentState, singleton(index));
if (restoringIndices.isEmpty() == false) {
closingResults.put(result.getKey(), new IndexResult(result.getKey(), new IllegalStateException(
"verification of shards before closing " + index + " succeeded but index is being restored in the meantime")));
logger.debug("verification of shards before closing {} succeeded but index is being restored in the meantime", index);
continue;
}
// Check if index closing conflicts with any running snapshots
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, singleton(index));
if (snapshottingIndices.isEmpty() == false) {
closingResults.put(result.getKey(), new IndexResult(result.getKey(), new IllegalStateException(
"verification of shards before closing " + index + " succeeded but index is being snapshot in the meantime")));
logger.debug("verification of shards before closing {} succeeded but index is being snapshot in the meantime", index);
continue;
}
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
final IndexMetaData.Builder updatedMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE); final IndexMetaData.Builder updatedMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE);

View File

@ -83,6 +83,7 @@ import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet; import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
@ -873,30 +874,26 @@ public class RestoreService implements ClusterStateApplier {
} }
/** /**
* Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index * Returns the indices that are currently being restored and that are contained in the indices-to-check set.
* is found as closing an index that is being restored makes the index unusable (it cannot be recovered).
*/ */
public static void checkIndexClosing(ClusterState currentState, Set<IndexMetaData> indices) { public static Set<Index> restoringIndices(final ClusterState currentState, final Set<Index> indicesToCheck) {
RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE); final RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
if (restore != null) { if (restore == null) {
Set<Index> indicesToFail = null; return emptySet();
for (RestoreInProgress.Entry entry : restore) { }
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
if (!shard.value.state().completed()) { final Set<Index> indices = new HashSet<>();
IndexMetaData indexMetaData = currentState.metaData().index(shard.key.getIndex()); for (RestoreInProgress.Entry entry : restore) {
if (indexMetaData != null && indices.contains(indexMetaData)) { for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
if (indicesToFail == null) { Index index = shard.key.getIndex();
indicesToFail = new HashSet<>(); if (indicesToCheck.contains(index)
} && shard.value.state().completed() == false
indicesToFail.add(shard.key.getIndex()); && currentState.getMetaData().index(index) != null) {
} indices.add(index);
}
} }
} }
if (indicesToFail != null) {
throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail);
}
} }
return indices;
} }
@Override @Override

View File

@ -86,6 +86,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.cluster.SnapshotsInProgress.completed; import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
@ -1417,62 +1418,37 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
} }
/** /**
* Check if any of the indices to be deleted are currently being snapshotted. Fail as deleting an index that is being * Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set.
* snapshotted (with partial == false) makes the snapshot fail.
*/ */
public static void checkIndexDeletion(ClusterState currentState, Set<IndexMetaData> indices) { public static Set<Index> snapshottingIndices(final ClusterState currentState, final Set<Index> indicesToCheck) {
Set<Index> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices); final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (indicesToFail != null) { if (snapshots == null) {
throw new SnapshotInProgressException("Cannot delete indices that are being snapshotted: " + indicesToFail + return emptySet();
". Try again after snapshot finishes or cancel the currently running snapshot.");
} }
}
/** final Set<Index> indices = new HashSet<>();
* Check if any of the indices to be closed are currently being snapshotted. Fail as closing an index that is being for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
* snapshotted (with partial == false) makes the snapshot fail. if (entry.partial() == false) {
*/ if (entry.state() == State.INIT) {
public static void checkIndexClosing(ClusterState currentState, Set<IndexMetaData> indices) { for (IndexId index : entry.indices()) {
Set<Index> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices); IndexMetaData indexMetaData = currentState.metaData().index(index.getName());
if (indicesToFail != null) { if (indexMetaData != null && indicesToCheck.contains(indexMetaData.getIndex())) {
throw new SnapshotInProgressException("Cannot close indices that are being snapshotted: " + indicesToFail + indices.add(indexMetaData.getIndex());
". Try again after snapshot finishes or cancel the currently running snapshot.");
}
}
private static Set<Index> indicesToFailForCloseOrDeletion(ClusterState currentState, Set<IndexMetaData> indices) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
Set<Index> indicesToFail = null;
if (snapshots != null) {
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.partial() == false) {
if (entry.state() == State.INIT) {
for (IndexId index : entry.indices()) {
IndexMetaData indexMetaData = currentState.metaData().index(index.getName());
if (indexMetaData != null && indices.contains(indexMetaData)) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();
}
indicesToFail.add(indexMetaData.getIndex());
}
} }
} else { }
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) { } else {
if (!shard.value.state().completed()) { for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
IndexMetaData indexMetaData = currentState.metaData().index(shard.key.getIndex()); Index index = shard.key.getIndex();
if (indexMetaData != null && indices.contains(indexMetaData)) { if (indicesToCheck.contains(index)
if (indicesToFail == null) { && shard.value.state().completed() == false
indicesToFail = new HashSet<>(); && currentState.getMetaData().index(index) != null) {
} indices.add(index);
indicesToFail.add(shard.key.getIndex());
}
}
} }
} }
} }
} }
} }
return indicesToFail; return indices;
} }
/** /**

View File

@ -64,6 +64,7 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@ -125,6 +126,42 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
} }
} }
public void testCloseRoutingTableWithRestoredIndex() {
ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTableWithRestoredIndex")).build();
String indexName = "restored-index";
ClusterBlock block = MetaDataIndexStateService.createIndexClosingBlock();
state = addRestoredIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
state = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().blocks(state.blocks()).addIndexBlock(indexName, block))
.build();
final Index index = state.metaData().index(indexName).getIndex();
final ClusterState updatedState =
MetaDataIndexStateService.closeRoutingTable(state, singletonMap(index, block), singletonMap(index, new IndexResult(index)))
.v1();
assertIsOpened(index.getName(), updatedState);
assertThat(updatedState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID), is(true));
}
public void testCloseRoutingTableWithSnapshottedIndex() {
ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTableWithSnapshottedIndex")).build();
String indexName = "snapshotted-index";
ClusterBlock block = MetaDataIndexStateService.createIndexClosingBlock();
state = addSnapshotIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
state = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().blocks(state.blocks()).addIndexBlock(indexName, block))
.build();
final Index index = state.metaData().index(indexName).getIndex();
final ClusterState updatedState =
MetaDataIndexStateService.closeRoutingTable(state, singletonMap(index, block), singletonMap(index, new IndexResult(index)))
.v1();
assertIsOpened(index.getName(), updatedState);
assertThat(updatedState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID), is(true));
}
public void testCloseRoutingTableRemovesRoutingTable() { public void testCloseRoutingTableRemovesRoutingTable() {
final Set<Index> nonBlockedIndices = new HashSet<>(); final Set<Index> nonBlockedIndices = new HashSet<>();
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>(); final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();