There is no point in writing out snapshots that contain no data that can be restored whatsoever. It may have made sense to do so in the past when there was an `INIT` snapshot step that wrote data to the repository that would've other become unreferenced, but in the current day state machine without the `INIT` step there is no point in doing so.
This commit is contained in:
parent
e1014038e9
commit
06d94cbb2a
|
@ -44,7 +44,6 @@ import org.elasticsearch.cluster.ClusterStateListener;
|
|||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.NamedDiff;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -510,11 +509,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
createRepository("test-repo", "fs");
|
||||
|
||||
logger.info("--> start snapshot with default settings without a closed index - should fail");
|
||||
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
|
||||
final SnapshotException sne = expectThrows(SnapshotException.class,
|
||||
() -> client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
|
||||
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
|
||||
.setWaitForCompletion(true).execute().actionGet();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.FAILED));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().reason(), containsString("Indices don't have primary shards"));
|
||||
.setWaitForCompletion(true).execute().actionGet());
|
||||
assertThat(sne.getMessage(), containsString("Indices don't have primary shards"));
|
||||
|
||||
if (randomBoolean()) {
|
||||
logger.info("checking snapshot completion using status");
|
||||
|
@ -534,8 +533,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
|
||||
assertThat(snapshotStatuses.size(), equalTo(1));
|
||||
SnapshotStatus snapshotStatus = snapshotStatuses.get(0);
|
||||
logger.info("State: [{}], Reason: [{}]",
|
||||
createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().reason());
|
||||
|
||||
assertThat(snapshotStatus.getShardsStats().getTotalShards(), equalTo(22));
|
||||
assertThat(snapshotStatus.getShardsStats().getDoneShards(), lessThan(16));
|
||||
assertThat(snapshotStatus.getShardsStats().getDoneShards(), greaterThan(10));
|
||||
|
@ -552,7 +550,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
}, 1, TimeUnit.MINUTES);
|
||||
} else {
|
||||
logger.info("checking snapshot completion using wait_for_completion flag");
|
||||
createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
|
||||
final CreateSnapshotResponse createSnapshotResponse =
|
||||
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
|
||||
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
|
||||
.setWaitForCompletion(true).setPartial(true).execute().actionGet();
|
||||
logger.info("State: [{}], Reason: [{}]",
|
||||
|
@ -833,56 +832,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
}
|
||||
}
|
||||
|
||||
public void testMasterShutdownDuringFailedSnapshot() throws Exception {
|
||||
logger.info("--> starting two master nodes and two data nodes");
|
||||
internalCluster().startMasterOnlyNodes(2);
|
||||
internalCluster().startDataOnlyNodes(2);
|
||||
|
||||
final Path repoPath = randomRepoPath();
|
||||
createRepository("test-repo", "mock", repoPath);
|
||||
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
|
||||
|
||||
assertAcked(prepareCreate("test-idx", 0, indexSettingsNoReplicas(6)));
|
||||
ensureGreen();
|
||||
indexRandomDocs("test-idx", randomIntBetween(50, 100));
|
||||
|
||||
logger.info("--> stopping random data node, which should cause shards to go missing");
|
||||
internalCluster().stopRandomDataNode();
|
||||
assertBusy(() ->
|
||||
assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()),
|
||||
30, TimeUnit.SECONDS);
|
||||
|
||||
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile("test-repo");
|
||||
|
||||
logger.info("--> snapshot");
|
||||
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(false).setIndices("test-idx").get();
|
||||
|
||||
logger.info("--> waiting for block to kick in on " + masterNode);
|
||||
waitForBlock(masterNode, "test-repo", TimeValue.timeValueSeconds(60));
|
||||
|
||||
logger.info("--> stopping master node");
|
||||
internalCluster().stopCurrentMasterNode();
|
||||
|
||||
logger.info("--> wait until the snapshot is done");
|
||||
assertBusy(() -> {
|
||||
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
|
||||
.prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
|
||||
assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
|
||||
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
|
||||
assertTrue(snapshotInfo.state().completed());
|
||||
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
SnapshotsInProgress snapshotsInProgress = clusterState.custom(SnapshotsInProgress.TYPE);
|
||||
assertEquals(0, snapshotsInProgress.entries().size());
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
|
||||
logger.info("--> verify that snapshot failed");
|
||||
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
|
||||
.prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
|
||||
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
|
||||
assertEquals(SnapshotState.FAILED, snapshotInfo.state());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that a shrunken index (created via the shrink APIs) and subsequently snapshotted
|
||||
* can be restored when the node the shrunken index was created on is no longer part of
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.elasticsearch.action.support.IndicesOptions;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.RestoreInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress.State;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
|
@ -58,7 +57,6 @@ import org.elasticsearch.cluster.routing.RecoverySource;
|
|||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
|
@ -142,7 +140,6 @@ import static org.hamcrest.Matchers.lessThan;
|
|||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
// The tests in here do a lot of state updates and other writes to disk and are slowed down too much by WindowsFS
|
||||
@LuceneTestCase.SuppressFileSystems(value = "WindowsFS")
|
||||
|
@ -1111,6 +1108,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
}
|
||||
|
||||
public void testUnallocatedShards() {
|
||||
disableRepoConsistencyCheck("This test intentionally leaves an empty repository");
|
||||
createRepository("test-repo", "fs");
|
||||
|
||||
logger.info("--> creating index that cannot be allocated");
|
||||
|
@ -1118,12 +1116,11 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
.put("index.number_of_shards", 3)).setWaitForActiveShards(ActiveShardCount.NONE).get();
|
||||
|
||||
logger.info("--> snapshot");
|
||||
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(true).setIndices("test-idx").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.FAILED));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(3));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().reason(), startsWith("Indices don't have primary shards"));
|
||||
final SnapshotException sne = expectThrows(SnapshotException.class,
|
||||
() -> client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(true).setIndices("test-idx").get());
|
||||
assertThat(sne.getMessage(), containsString("Indices don't have primary shards"));
|
||||
assertThat(getRepositoryData("test-repo"), is(RepositoryData.EMPTY));
|
||||
}
|
||||
|
||||
public void testDeleteSnapshot() throws Exception {
|
||||
|
@ -1825,7 +1822,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
SnapshotsStatusResponse response = client.admin().cluster().prepareSnapshotStatus("test-repo").execute().actionGet();
|
||||
assertThat(response.getSnapshots().size(), equalTo(1));
|
||||
SnapshotStatus snapshotStatus = response.getSnapshots().get(0);
|
||||
assertThat(snapshotStatus.getState(), equalTo(SnapshotsInProgress.State.STARTED));
|
||||
assertThat(snapshotStatus.getState(), equalTo(State.STARTED));
|
||||
assertThat(snapshotStatus.includeGlobalState(), equalTo(false));
|
||||
|
||||
// We blocked the node during data write operation, so at least one shard snapshot should be in STARTED stage
|
||||
|
@ -1840,7 +1837,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
response = client.admin().cluster().prepareSnapshotStatus().execute().actionGet();
|
||||
assertThat(response.getSnapshots().size(), equalTo(1));
|
||||
snapshotStatus = response.getSnapshots().get(0);
|
||||
assertThat(snapshotStatus.getState(), equalTo(SnapshotsInProgress.State.STARTED));
|
||||
assertThat(snapshotStatus.getState(), equalTo(State.STARTED));
|
||||
assertThat(snapshotStatus.includeGlobalState(), equalTo(false));
|
||||
|
||||
// We blocked the node during data write operation, so at least one shard snapshot should be in STARTED stage
|
||||
|
@ -2951,64 +2948,21 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertEquals(SnapshotState.SUCCESS, getSnapshotsResponse.getSnapshots().get(0).state());
|
||||
}
|
||||
|
||||
public void testSnapshotStatusOnFailedIndex() throws Exception {
|
||||
logger.info("--> creating repository");
|
||||
final Path repoPath = randomRepoPath();
|
||||
final Client client = client();
|
||||
assertAcked(client.admin().cluster()
|
||||
.preparePutRepository("test-repo")
|
||||
.setType("fs")
|
||||
.setVerify(false)
|
||||
.setSettings(Settings.builder().put("location", repoPath)));
|
||||
public void testSnapshotStatusOnFailedSnapshot() throws Exception {
|
||||
String repoName = "test-repo";
|
||||
createRepository(repoName, "fs");
|
||||
final String snapshot = "test-snap-1";
|
||||
addBwCFailedSnapshot(repoName, snapshot, Collections.emptyMap());
|
||||
|
||||
logger.info("--> creating good index");
|
||||
assertAcked(prepareCreate("test-idx-good").setSettings(indexSettingsNoReplicas(1)));
|
||||
ensureGreen();
|
||||
indexRandomDocs("test-idx-good", randomIntBetween(1, 5));
|
||||
logger.info("--> creating bad index");
|
||||
assertAcked(prepareCreate("test-idx-bad")
|
||||
.setWaitForActiveShards(ActiveShardCount.NONE)
|
||||
.setSettings(indexSettingsNoReplicas(1)
|
||||
// set shard allocation to none so the primary cannot be
|
||||
// allocated - simulates a "bad" index that fails to snapshot
|
||||
.put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")));
|
||||
|
||||
logger.info("--> snapshot bad index and get status");
|
||||
client.admin().cluster()
|
||||
.prepareCreateSnapshot("test-repo", "test-snap1")
|
||||
.setWaitForCompletion(true)
|
||||
.setIndices("test-idx-bad")
|
||||
.get();
|
||||
SnapshotsStatusResponse snapshotsStatusResponse = client.admin().cluster()
|
||||
.prepareSnapshotStatus("test-repo")
|
||||
.setSnapshots("test-snap1")
|
||||
.get();
|
||||
final SnapshotsStatusResponse snapshotsStatusResponse =
|
||||
client().admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshot).get();
|
||||
assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
|
||||
assertEquals(State.FAILED, snapshotsStatusResponse.getSnapshots().get(0).getState());
|
||||
|
||||
logger.info("--> snapshot both good and bad index and get status");
|
||||
client.admin().cluster()
|
||||
.prepareCreateSnapshot("test-repo", "test-snap2")
|
||||
.setWaitForCompletion(true)
|
||||
.setIndices("test-idx-good", "test-idx-bad")
|
||||
.get();
|
||||
snapshotsStatusResponse = client.admin().cluster()
|
||||
.prepareSnapshotStatus("test-repo")
|
||||
.setSnapshots("test-snap2")
|
||||
.get();
|
||||
assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
|
||||
// verify a FAILED status is returned instead of a 500 status code
|
||||
// see https://github.com/elastic/elasticsearch/issues/23716
|
||||
SnapshotStatus snapshotStatus = snapshotsStatusResponse.getSnapshots().get(0);
|
||||
assertEquals(State.FAILED, snapshotStatus.getState());
|
||||
for (SnapshotIndexShardStatus shardStatus : snapshotStatus.getShards()) {
|
||||
assertEquals(SnapshotIndexShardStage.FAILURE, shardStatus.getStage());
|
||||
if (shardStatus.getIndex().equals("test-idx-good")) {
|
||||
assertEquals("skipped", shardStatus.getFailure());
|
||||
} else {
|
||||
assertEquals("primary shard is not allocated", shardStatus.getFailure());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testGetSnapshotsFromIndexBlobOnly() throws Exception {
|
||||
|
|
|
@ -281,7 +281,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
|
|||
startTime, endTime, totalShards, totalShards - shardFailures.size(), shardFailures, includeGlobalState, userMetadata);
|
||||
}
|
||||
|
||||
private SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, SnapshotState state, String reason,
|
||||
SnapshotInfo(SnapshotId snapshotId, List<String> indices, List<String> dataStreams, SnapshotState state, String reason,
|
||||
Version version, long startTime, long endTime, int totalShards, int successfulShards,
|
||||
List<SnapshotShardFailure> shardFailures, Boolean includeGlobalState, Map<String, Object> userMetadata) {
|
||||
this.snapshotId = Objects.requireNonNull(snapshotId);
|
||||
|
|
|
@ -373,19 +373,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
}
|
||||
if (missing.isEmpty() == false) {
|
||||
// TODO: We should just throw here instead of creating a FAILED and hence useless snapshot in the repository
|
||||
newEntry = new SnapshotsInProgress.Entry(
|
||||
new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), false,
|
||||
State.FAILED, indexIds, dataStreams, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards,
|
||||
"Indices don't have primary shards " + missing, userMeta, version);
|
||||
throw new SnapshotException(
|
||||
new Snapshot(repositoryName, snapshotId), "Indices don't have primary shards " + missing);
|
||||
}
|
||||
}
|
||||
if (newEntry == null) {
|
||||
newEntry = new SnapshotsInProgress.Entry(
|
||||
new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(),
|
||||
State.STARTED, indexIds, dataStreams, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards,
|
||||
null, userMeta, version);
|
||||
}
|
||||
newEntry = new SnapshotsInProgress.Entry(
|
||||
new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(),
|
||||
State.STARTED, indexIds, dataStreams, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards,
|
||||
null, userMeta, version);
|
||||
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
|
||||
SnapshotsInProgress.of(Collections.singletonList(newEntry))).build();
|
||||
}
|
||||
|
|
|
@ -224,6 +224,7 @@ import static org.hamcrest.Matchers.contains;
|
|||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.either;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
@ -266,11 +267,11 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
// Create another snapshot and then clean up the repository to verify that the repository works correctly no matter the
|
||||
// failures seen during the previous test.
|
||||
client().admin().cluster().prepareCreateSnapshot("repo", "last-snapshot")
|
||||
.setWaitForCompletion(true).execute(createSnapshotResponse);
|
||||
.setWaitForCompletion(true).setPartial(true).execute(createSnapshotResponse);
|
||||
continueOrDie(createSnapshotResponse, r -> {
|
||||
final SnapshotInfo snapshotInfo = r.getSnapshotInfo();
|
||||
// Snapshot can fail because some tests leave indices in a red state because data nodes were stopped
|
||||
assertThat(snapshotInfo.state(), either(is(SnapshotState.SUCCESS)).or(is(SnapshotState.FAILED)));
|
||||
// Snapshot can be partial because some tests leave indices in a red state because data nodes were stopped
|
||||
assertThat(snapshotInfo.state(), either(is(SnapshotState.SUCCESS)).or(is(SnapshotState.PARTIAL)));
|
||||
assertThat(snapshotInfo.shardFailures(), iterableWithSize(snapshotInfo.failedShards()));
|
||||
assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards() - snapshotInfo.failedShards()));
|
||||
client().admin().cluster().cleanupRepository(new CleanupRepositoryRequest("repo"), cleanupResponse);
|
||||
|
@ -376,6 +377,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
|
||||
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
|
||||
|
||||
final boolean partial = randomBoolean();
|
||||
continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
|
||||
for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
|
||||
scheduleNow(this::disconnectRandomDataNode);
|
||||
|
@ -384,10 +386,12 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
|
||||
}
|
||||
testClusterNodes.randomMasterNodeSafe().client.admin().cluster()
|
||||
.prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener);
|
||||
.prepareCreateSnapshot(repoName, snapshotName).setPartial(partial).execute(createSnapshotResponseStepListener);
|
||||
});
|
||||
|
||||
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
|
||||
final AtomicBoolean snapshotNeverStarted = new AtomicBoolean(false);
|
||||
|
||||
createSnapshotResponseStepListener.whenComplete(createSnapshotResponse -> {
|
||||
for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
|
||||
scheduleNow(this::disconnectOrRestartDataNode);
|
||||
}
|
||||
|
@ -401,9 +405,21 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
} else if (randomBoolean()) {
|
||||
scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
|
||||
}
|
||||
}, e -> {
|
||||
if (partial == false) {
|
||||
final SnapshotException unwrapped = (SnapshotException) ExceptionsHelper.unwrap(e, SnapshotException.class);
|
||||
assertNotNull(unwrapped);
|
||||
assertThat(unwrapped.getMessage(), endsWith("Indices don't have primary shards [test]"));
|
||||
snapshotNeverStarted.set(true);
|
||||
} else {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
|
||||
runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
|
||||
if (snapshotNeverStarted.get()) {
|
||||
return true;
|
||||
}
|
||||
final SnapshotsInProgress snapshotsInProgress = master.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
return snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty();
|
||||
}).orElse(false), TimeUnit.MINUTES.toMillis(1L));
|
||||
|
@ -412,11 +428,16 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
|
||||
final TestClusterNodes.TestClusterNode randomMaster = testClusterNodes.randomMasterNode()
|
||||
.orElseThrow(() -> new AssertionError("expected to find at least one active master node"));
|
||||
SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
SnapshotsInProgress finalSnapshotsInProgress =
|
||||
randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
||||
assertThat(finalSnapshotsInProgress.entries(), empty());
|
||||
final Repository repository = randomMaster.repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
|
||||
assertThat(snapshotIds, hasSize(1));
|
||||
if (snapshotNeverStarted.get()) {
|
||||
assertThat(snapshotIds, empty());
|
||||
} else {
|
||||
assertThat(snapshotIds, hasSize(1));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSnapshotDeleteWithMasterFailover() {
|
||||
|
|
|
@ -29,9 +29,12 @@ import org.elasticsearch.cluster.ClusterStateObserver;
|
|||
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
|
@ -46,6 +49,7 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.ShardGenerations;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
@ -65,10 +69,13 @@ import java.nio.file.attribute.BasicFileAttributes;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
|
@ -375,6 +382,34 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||
assertEquals(getCountForIndex(index), count);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a snapshot in state {@link SnapshotState#FAILED} to the given repository.
|
||||
*
|
||||
* @param repoName repository to add snapshot to
|
||||
* @param snapshotName name for the new failed snapshot
|
||||
* @param metadata snapshot metadata to write (as returned by {@link SnapshotInfo#userMetadata()})
|
||||
*/
|
||||
protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map<String, Object> metadata) throws Exception {
|
||||
final ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||
final RepositoriesMetadata repositoriesMetadata = state.metadata().custom(RepositoriesMetadata.TYPE);
|
||||
assertNotNull(repositoriesMetadata);
|
||||
final RepositoryMetadata initialRepoMetadata = repositoriesMetadata.repository(repoName);
|
||||
assertNotNull(initialRepoMetadata);
|
||||
assertThat("We can only manually insert a snapshot into a repository that does not have a generation tracked in the CS",
|
||||
initialRepoMetadata.generation(), is(RepositoryData.UNKNOWN_REPO_GEN));
|
||||
final Repository repo = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
|
||||
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID(random()));
|
||||
logger.info("--> adding old version FAILED snapshot [{}] to repository [{}]", snapshotId, repoName);
|
||||
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
|
||||
Collections.emptyList(), Collections.emptyList(),
|
||||
SnapshotState.FAILED, "failed on purpose",
|
||||
SnapshotsService.OLD_SNAPSHOT_FORMAT, 0L,0L, 0, 0, Collections.emptyList(),
|
||||
randomBoolean(), metadata);
|
||||
PlainActionFuture.<RepositoryData, Exception>get(f -> repo.finalizeSnapshot(
|
||||
ShardGenerations.EMPTY, getRepositoryData(repoName).getGenId(), state.metadata(), snapshotInfo,
|
||||
SnapshotsService.OLD_SNAPSHOT_FORMAT, Function.identity(), f));
|
||||
}
|
||||
|
||||
protected void awaitNoMoreRunningOperations(String viaNode) throws Exception {
|
||||
logger.info("--> verify no more operations in the cluster state");
|
||||
awaitClusterState(viaNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty() &&
|
||||
|
|
|
@ -297,27 +297,33 @@ public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase
|
|||
// Create a failed snapshot
|
||||
AtomicReference<String> failedSnapshotName = new AtomicReference<>();
|
||||
{
|
||||
logger.info("--> stopping random data node, which should cause shards to go missing");
|
||||
internalCluster().stopRandomDataNode();
|
||||
assertBusy(() ->
|
||||
assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()),
|
||||
30, TimeUnit.SECONDS);
|
||||
if (partialSuccess) {
|
||||
logger.info("--> stopping random data node, which should cause shards to go missing");
|
||||
internalCluster().stopRandomDataNode();
|
||||
assertBusy(() -> assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()),
|
||||
30, TimeUnit.SECONDS);
|
||||
|
||||
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(REPO);
|
||||
final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(REPO);
|
||||
|
||||
logger.info("--> start snapshot");
|
||||
ActionFuture<ExecuteSnapshotLifecycleAction.Response> snapshotFuture = client()
|
||||
.execute(ExecuteSnapshotLifecycleAction.INSTANCE, new ExecuteSnapshotLifecycleAction.Request(policyId));
|
||||
logger.info("--> start snapshot");
|
||||
ActionFuture<ExecuteSnapshotLifecycleAction.Response> snapshotFuture = client()
|
||||
.execute(ExecuteSnapshotLifecycleAction.INSTANCE, new ExecuteSnapshotLifecycleAction.Request(policyId));
|
||||
|
||||
logger.info("--> waiting for block to kick in on " + masterNode);
|
||||
waitForBlock(masterNode, REPO, TimeValue.timeValueSeconds(60));
|
||||
logger.info("--> waiting for block to kick in on " + masterNode);
|
||||
waitForBlock(masterNode, REPO, TimeValue.timeValueSeconds(60));
|
||||
|
||||
logger.info("--> stopping master node");
|
||||
internalCluster().stopCurrentMasterNode();
|
||||
logger.info("--> stopping master node");
|
||||
internalCluster().stopCurrentMasterNode();
|
||||
|
||||
logger.info("--> wait until the snapshot is done");
|
||||
failedSnapshotName.set(snapshotFuture.get().getSnapshotName());
|
||||
assertNotNull(failedSnapshotName.get());
|
||||
logger.info("--> wait until the snapshot is done");
|
||||
failedSnapshotName.set(snapshotFuture.get().getSnapshotName());
|
||||
assertNotNull(failedSnapshotName.get());
|
||||
} else {
|
||||
final String snapshotName = "failed-snapshot-1";
|
||||
addBwCFailedSnapshot(REPO, snapshotName,
|
||||
Collections.singletonMap(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD, policyId));
|
||||
failedSnapshotName.set(snapshotName);
|
||||
}
|
||||
|
||||
logger.info("--> verify that snapshot [{}] is {}", failedSnapshotName.get(), expectedUnsuccessfulState);
|
||||
assertBusy(() -> {
|
||||
|
|
Loading…
Reference in New Issue