Use transport blocking to make the relocation take forever, instead of relying on the relocation happening to take long enough to clash with the snapshot.

Closes #61069
commit 3143b5ea47
parent 843122ccce
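The mechanism, in brief: the test installs a send behavior on each data node's MockTransportService that silently drops peer-recovery FILE_CHUNK requests, so the relocation can never complete while the snapshot runs. A minimal sketch of that hook, using the same identifiers as the test added in the diff below (nodeName stands for one of the two data node names):

    ((MockTransportService) internalCluster().getInstance(TransportService.class, nodeName)).addSendBehavior(
        (connection, requestId, action, request, options) -> {
            if (PeerRecoveryTargetService.Actions.FILE_CHUNK.equals(action)) {
                return; // swallow recovery file chunks so the relocation stalls indefinitely
            }
            connection.sendRequest(requestId, action, request, options); // pass everything else through
        });

With the relocation pinned in this state, the partial snapshot of the relocating index reliably overlaps with it, rather than the test hoping the relocation happens to be slow enough.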
DedicatedClusterSnapshotRestoreIT.java

@@ -69,6 +69,7 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.seqno.RetentionLeaseActions;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.plugins.Plugin;
@@ -88,6 +89,7 @@ import org.elasticsearch.test.TestCustomMetadata;
 import org.elasticsearch.test.disruption.BusyMasterServiceDisruption;
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.rest.FakeRestRequest;
+import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.TransportMessageListener;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
@@ -176,7 +178,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(MockRepository.Plugin.class, TestCustomMetadataPlugin.class, BrokenSettingPlugin.class);
+        return Arrays.asList(MockRepository.Plugin.class, TestCustomMetadataPlugin.class, BrokenSettingPlugin.class,
+            MockTransportService.TestPlugin.class);
     }
 
     public static class BrokenSettingPlugin extends Plugin {
@@ -1310,6 +1313,51 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase
         assertThat(snapshot3.state(), is(SnapshotState.SUCCESS));
     }
 
+    public void testSnapshotDeleteRelocatingPrimaryIndex() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        final String repoName = "test-repo";
+        createRepository(repoName, "fs");
+
+        // Create index on two nodes and make sure each node has a primary by setting no replicas
+        final String indexName = "test-idx";
+        assertAcked(prepareCreate(indexName, 2, indexSettingsNoReplicas(between(2, 10))));
+        ensureGreen(indexName);
+        indexRandomDocs(indexName, 100);
+
+        // Drop all file chunk requests so that the relocation below takes forever and we're guaranteed to run the snapshot in parallel to it
+        for (String nodeName : dataNodes) {
+            ((MockTransportService) internalCluster().getInstance(TransportService.class, nodeName)).addSendBehavior(
+                (connection, requestId, action, request, options) -> {
+                    if (PeerRecoveryTargetService.Actions.FILE_CHUNK.equals(action)) {
+                        return;
+                    }
+                    connection.sendRequest(requestId, action, request, options);
+                });
+        }
+
+        logger.info("--> start relocations");
+        allowNodes(indexName, 1);
+
+        logger.info("--> wait for relocations to start");
+
+        assertBusy(() -> assertThat(
+            client().admin().cluster().prepareHealth(indexName).execute().actionGet().getRelocatingShards(), greaterThan(0)),
+            1L, TimeUnit.MINUTES);
+
+        logger.info("--> snapshot");
+        client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
+            .setWaitForCompletion(false).setPartial(true).setIndices(indexName).get();
+
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        logger.info("--> wait for snapshot to complete");
+        SnapshotInfo snapshotInfo = waitForCompletion(repoName, "test-snap", TimeValue.timeValueSeconds(600));
+        assertThat(snapshotInfo.state(), equalTo(SnapshotState.PARTIAL));
+        assertThat(snapshotInfo.shardFailures().size(), greaterThan(0));
+        logger.info("--> done");
+    }
+
     private long calculateTotalFilesSize(List<Path> files) {
         return files.stream().mapToLong(f -> {
             try {
SharedClusterSnapshotRestoreIT.java

@@ -1937,38 +1937,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase
         logger.info("--> done");
     }
 
-    public void testSnapshotDeleteRelocatingPrimaryIndex() throws Exception {
-        final String repoName = "test-repo";
-        createRepository(repoName, "fs");
-
-        // Create index on two nodes and make sure each node has a primary by setting no replicas
-        final String indexName = "test-idx";
-        assertAcked(prepareCreate(indexName, 2, indexSettingsNoReplicas(between(2, 10))));
-        ensureGreen(indexName);
-        indexRandomDocs(indexName, 100);
-
-        logger.info("--> start relocations");
-        allowNodes(indexName, 1);
-
-        logger.info("--> wait for relocations to start");
-
-        assertBusy(() -> assertThat(
-            client().admin().cluster().prepareHealth(indexName).execute().actionGet().getRelocatingShards(), greaterThan(0)),
-            1L, TimeUnit.MINUTES);
-
-        logger.info("--> snapshot");
-        client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
-            .setWaitForCompletion(false).setPartial(true).setIndices(indexName).get();
-
-        assertAcked(client().admin().indices().prepareDelete(indexName));
-
-        logger.info("--> wait for snapshot to complete");
-        SnapshotInfo snapshotInfo = waitForCompletion(repoName, "test-snap", TimeValue.timeValueSeconds(600));
-        assertThat(snapshotInfo.state(), equalTo(SnapshotState.PARTIAL));
-        assertThat(snapshotInfo.shardFailures().size(), greaterThan(0));
-        logger.info("--> done");
-    }
-
     public void testSnapshotMoreThanOnce() throws InterruptedException {
         Client client = client();
 