Merge pull request #16933 from ywelsch/fix/snaprestore-fails-close
Prevent closing index during snapshot restore
This commit is contained in:
commit
20f5255670
|
@ -19,12 +19,14 @@
|
|||
|
||||
package org.elasticsearch.cluster.metadata;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
|
@ -37,11 +39,14 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Service responsible for submitting open/close index requests
|
||||
|
@ -78,7 +83,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
List<String> indicesToClose = new ArrayList<>();
|
||||
Set<String> indicesToClose = new HashSet<>();
|
||||
for (String index : request.indices()) {
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||
if (indexMetaData == null) {
|
||||
|
@ -94,6 +99,28 @@ public class MetaDataIndexStateService extends AbstractComponent {
|
|||
return currentState;
|
||||
}
|
||||
|
||||
// Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index
|
||||
// is found as closing an index that is being restored makes the index unusable (it cannot be recovered).
|
||||
RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
|
||||
if (restore != null) {
|
||||
Set<String> indicesToFail = null;
|
||||
for (RestoreInProgress.Entry entry : restore.entries()) {
|
||||
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
|
||||
if (!shard.value.state().completed()) {
|
||||
if (indicesToClose.contains(shard.key.getIndexName())) {
|
||||
if (indicesToFail == null) {
|
||||
indicesToFail = new HashSet<>();
|
||||
}
|
||||
indicesToFail.add(shard.key.getIndexName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (indicesToFail != null) {
|
||||
throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("closing indices [{}]", indicesAsString);
|
||||
|
||||
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
|
||||
|
|
|
@ -137,6 +137,32 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||
return null;
|
||||
}
|
||||
|
||||
public static void blockAllDataNodes(String repository) {
|
||||
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
|
||||
((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true);
|
||||
}
|
||||
}
|
||||
|
||||
public static void unblockAllDataNodes(String repository) {
|
||||
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
|
||||
((MockRepository)repositoriesService.repository(repository)).unblock();
|
||||
}
|
||||
}
|
||||
|
||||
public void waitForBlockOnAnyDataNode(String repository, TimeValue timeout) throws InterruptedException {
|
||||
if (false == awaitBusy(() -> {
|
||||
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
|
||||
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
|
||||
if (mockRepository.blocked()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}, timeout.millis(), TimeUnit.MILLISECONDS)) {
|
||||
fail("Timeout waiting for repository block on any data node!!!");
|
||||
}
|
||||
}
|
||||
|
||||
public static void unblockNode(String node) {
|
||||
((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository("test-repo")).unblock();
|
||||
}
|
||||
|
|
|
@ -1865,6 +1865,66 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
}
|
||||
}
|
||||
|
||||
public void testCloseIndexDuringRestore() throws Exception {
|
||||
Client client = client();
|
||||
|
||||
logger.info("--> creating repository");
|
||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("mock").setSettings(Settings.settingsBuilder()
|
||||
.put("location", randomRepoPath())
|
||||
.put("compress", randomBoolean())
|
||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
|
||||
));
|
||||
|
||||
createIndex("test-idx-1", "test-idx-2");
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
for (int i = 0; i < 100; i++) {
|
||||
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
|
||||
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
|
||||
}
|
||||
refresh();
|
||||
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().totalHits(), equalTo(100L));
|
||||
assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().totalHits(), equalTo(100L));
|
||||
|
||||
logger.info("--> snapshot");
|
||||
assertThat(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
.setIndices("test-idx-*").setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
|
||||
|
||||
logger.info("--> deleting indices before restoring");
|
||||
assertAcked(client.admin().indices().prepareDelete("test-idx-*").get());
|
||||
|
||||
blockAllDataNodes("test-repo");
|
||||
logger.info("--> execution will be blocked on all data nodes");
|
||||
|
||||
logger.info("--> start restore");
|
||||
ListenableActionFuture<RestoreSnapshotResponse> restoreFut = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(true)
|
||||
.execute();
|
||||
|
||||
logger.info("--> waiting for block to kick in");
|
||||
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueSeconds(60));
|
||||
|
||||
logger.info("--> close index while restore is running");
|
||||
try {
|
||||
client.admin().indices().prepareClose("test-idx-1").get();
|
||||
fail("Expected closing index to fail during restore");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("Cannot close indices that are being restored: [test-idx-1]"));
|
||||
}
|
||||
|
||||
logger.info("--> unblocking all data nodes");
|
||||
unblockAllDataNodes("test-repo");
|
||||
|
||||
logger.info("--> wait for restore to finish");
|
||||
RestoreSnapshotResponse restoreSnapshotResponse = restoreFut.get();
|
||||
logger.info("--> check that all shards were recovered");
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testDeleteOrphanSnapshot() throws Exception {
|
||||
Client client = client();
|
||||
|
||||
|
|
Loading…
Reference in New Issue