Simplify SnapshotResiliencyTests (#46961) (#47108)

Simplify `SnapshotResiliencyTests` to more closely
match the structure of `AbstractCoordinatorTestCase` and to allow for
future de-duplication of shared logic between the two classes:

* Make the test cluster node a nested class of the test cluster itself
* Remove the needless custom network disruption implementation and
  simply track disconnected node ids the way `AbstractCoordinatorTestCase`
  does (see the sketch below)
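
The second bullet is the heart of the change: instead of routing connectivity decisions through a custom `NetworkDisruption.DisruptedLinks` subclass, the test cluster now keeps a plain set of disconnected node ids and consults it directly from `DisruptableMockTransport`'s `getConnectionStatus`. Below is a minimal, self-contained sketch of that model; `Node` and `DisruptionSketch` are illustrative stand-ins for the PR's `DiscoveryNode` and `TestClusterNodes`, not code from the change itself:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative stand-in for a cluster node: just an id and a name. */
class Node {
    final String id;
    final String name;

    Node(String id, String name) {
        this.id = id;
        this.name = name;
    }
}

/** Sketch of the simplified disruption model; not the actual test code. */
class DisruptionSketch {
    enum ConnectionStatus { CONNECTED, DISCONNECTED }

    // live nodes by name, mirroring TestClusterNodes.nodes in the diff
    private final Map<String, Node> nodes;
    // node ids currently cut off from all other nodes
    private final Set<String> disconnectedNodes = new HashSet<>();

    DisruptionSketch(Map<String, Node> nodes) {
        this.nodes = nodes;
    }

    void disconnectNode(Node node) {
        disconnectedNodes.add(node.id);
    }

    void clearNetworkDisruptions() {
        disconnectedNodes.clear();
    }

    ConnectionStatus getConnectionStatus(Node local, Node destination) {
        if (local.equals(destination)) {
            return ConnectionStatus.CONNECTED; // a node can always reach itself
        }
        // a node that has been stopped or removed is unreachable from everywhere
        if (nodes.containsKey(local.name) == false || nodes.containsKey(destination.name) == false) {
            return ConnectionStatus.DISCONNECTED;
        }
        // either endpoint being in the set severs the link
        return disconnectedNodes.contains(local.id) || disconnectedNodes.contains(destination.id)
            ? ConnectionStatus.DISCONNECTED : ConnectionStatus.CONNECTED;
    }
}
```
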
commit c4a166fc9a
parent 83365e94ba
Armin Braun, 2019-09-25 14:53:11 +02:00, committed by GitHub
1 changed file with 353 additions and 379 deletions

@@ -165,7 +165,6 @@ import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.snapshots.mockstore.MockEventuallyConsistentRepository;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.disruption.DisruptableMockTransport;
-import org.elasticsearch.test.disruption.NetworkDisruption;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportInterceptor;
@@ -240,7 +239,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
                 Runnable::run);
         } finally {
-            testClusterNodes.nodes.values().forEach(TestClusterNode::stop);
+            testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop);
         }
     }
@@ -253,7 +252,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final int shards = randomIntBetween(1, 10);
         final int documents = randomIntBetween(0, 100);
-        final TestClusterNode masterNode =
+        final TestClusterNodes.TestClusterNode masterNode =
             testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
         final StepListener<CreateSnapshotResponse> createSnapshotResponseListener = new StepListener<>();
@@ -326,7 +325,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final String index = "test";
         final int shards = randomIntBetween(1, 10);
-        TestClusterNode masterNode =
+        TestClusterNodes.TestClusterNode masterNode =
             testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
@@ -363,7 +362,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         clearDisruptionsAndAwaitSync();
-        final TestClusterNode randomMaster = testClusterNodes.randomMasterNode()
+        final TestClusterNodes.TestClusterNode randomMaster = testClusterNodes.randomMasterNode()
             .orElseThrow(() -> new AssertionError("expected to find at least one active master node"));
         SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertThat(finalSnapshotsInProgress.entries(), empty());
@@ -380,7 +379,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final String index = "test";
         final int shards = randomIntBetween(1, 10);
-        TestClusterNode masterNode =
+        TestClusterNodes.TestClusterNode masterNode =
             testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
@@ -431,7 +430,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final int shards = randomIntBetween(1, 10);
-        final TestClusterNode masterNode =
+        final TestClusterNodes.TestClusterNode masterNode =
             testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
         final AtomicBoolean createdSnapshot = new AtomicBoolean();
         final AdminClient masterAdminClient = masterNode.client.admin();
@@ -443,8 +442,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
         continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> {
             final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0);
-            final TestClusterNode currentPrimaryNode = testClusterNodes.nodeById(shardToRelocate.currentNodeId());
-            final TestClusterNode otherNode = testClusterNodes.randomDataNodeSafe(currentPrimaryNode.node.getName());
+            final TestClusterNodes.TestClusterNode currentPrimaryNode = testClusterNodes.nodeById(shardToRelocate.currentNodeId());
+            final TestClusterNodes.TestClusterNode otherNode = testClusterNodes.randomDataNodeSafe(currentPrimaryNode.node.getName());
             scheduleNow(() -> testClusterNodes.stopNode(currentPrimaryNode));
             scheduleNow(new Runnable() {
                 @Override
@@ -504,7 +503,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         final int shards = randomIntBetween(1, 10);
         final int documents = randomIntBetween(2, 100);
-        TestClusterNode masterNode =
+        TestClusterNodes.TestClusterNode masterNode =
             testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
@@ -574,7 +573,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
         assertEquals(0, snapshotInfo.failedShards());
     }
-    private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNode masterNode, String repoName, String index, int shards) {
+    private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNodes.TestClusterNode masterNode, String repoName,
+                                                                 String index, int shards) {
         final AdminClient adminClient = masterNode.client.admin();
         final StepListener<AcknowledgedResponse> createRepositoryListener = new StepListener<>();
@@ -604,7 +604,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         if (randomBoolean()) {
             disconnectRandomDataNode();
         } else {
-            testClusterNodes.randomDataNode().ifPresent(TestClusterNode::restart);
+            testClusterNodes.randomDataNode().ifPresent(TestClusterNodes.TestClusterNode::restart);
         }
     }
@@ -712,7 +712,10 @@ public class SnapshotResiliencyTests extends ESTestCase {
         // LinkedHashMap so we have deterministic ordering when iterating over the map in tests
         private final Map<String, TestClusterNode> nodes = new LinkedHashMap<>();
-        private final DisconnectedNodes disruptedLinks = new DisconnectedNodes();
+        /**
+         * Node ids that are disconnected from all other nodes.
+         */
+        private final Set<String> disconnectedNodes = new HashSet<>();
         TestClusterNodes(int masterNodes, int dataNodes) {
             for (int i = 0; i < masterNodes; ++i) {
@@ -751,7 +754,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         private TestClusterNode newNode(String nodeName, DiscoveryNodeRole role) throws IOException {
             return new TestClusterNode(
                 new DiscoveryNode(nodeName, randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(),
-                    Collections.singleton(role), Version.CURRENT), this::getDisruption);
+                    Collections.singleton(role), Version.CURRENT));
         }
         public TestClusterNode randomMasterNodeSafe() {
@@ -790,16 +793,16 @@ public class SnapshotResiliencyTests extends ESTestCase {
         }
         public void disconnectNode(TestClusterNode node) {
-            if (disruptedLinks.disconnected.contains(node.node.getName())) {
+            if (disconnectedNodes.contains(node.node.getId())) {
                 return;
             }
             testClusterNodes.nodes.values().forEach(n -> n.transportService.getConnectionManager().disconnectFromNode(node.node));
-            disruptedLinks.disconnect(node.node.getName());
+            disconnectedNodes.add(node.node.getId());
         }
         public void clearNetworkDisruptions() {
-            final Set<String> disconnectedNodes = new HashSet<>(disruptedLinks.disconnected);
-            disruptedLinks.clear();
+            final Set<String> disconnectedNodes = new HashSet<>(this.disconnectedNodes);
+            this.disconnectedNodes.clear();
             disconnectedNodes.forEach(nodeName -> {
                 if (testClusterNodes.nodes.containsKey(nodeName)) {
                     final DiscoveryNode node = testClusterNodes.nodes.get(nodeName).node;
@@ -808,10 +811,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
             });
         }
-        private NetworkDisruption.DisruptedLinks getDisruption() {
-            return disruptedLinks;
-        }
         /**
          * Builds a {@link DiscoveryNodes} instance that holds the nodes in this test cluster.
          * @return DiscoveryNodes
@@ -833,7 +832,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
             assertTrue(master.node.isMasterNode());
             return master;
         }
-    }
         private final class TestClusterNode {
@@ -870,12 +868,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
             private final ThreadPool threadPool;
-            private final Supplier<NetworkDisruption.DisruptedLinks> disruption;
             private Coordinator coordinator;
-            TestClusterNode(DiscoveryNode node, Supplier<NetworkDisruption.DisruptedLinks> disruption) throws IOException {
-                this.disruption = disruption;
+            TestClusterNode(DiscoveryNode node) throws IOException {
                 this.node = node;
                 final Environment environment = createEnvironment(node.getName());
                 masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow);
@@ -897,13 +892,20 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 mockTransport = new DisruptableMockTransport(node, logger) {
                     @Override
                     protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
-                        return disruption.get().disrupt(node.getName(), destination.getName())
+                        if (node.equals(destination)) {
+                            return ConnectionStatus.CONNECTED;
+                        }
+                        // Check if both nodes are still part of the cluster
+                        if (nodes.containsKey(node.getName()) == false || nodes.containsKey(destination.getName()) == false) {
+                            return ConnectionStatus.DISCONNECTED;
+                        }
+                        return disconnectedNodes.contains(node.getId()) || disconnectedNodes.contains(destination.getId())
                             ? ConnectionStatus.DISCONNECTED : ConnectionStatus.CONNECTED;
                     }
                     @Override
                     protected Optional<DisruptableMockTransport> getDisruptableMockTransport(TransportAddress address) {
-                        return testClusterNodes.nodes.values().stream().map(cn -> cn.mockTransport)
+                        return nodes.values().stream().map(cn -> cn.mockTransport)
                             .filter(transport -> transport.getLocalNode().getAddress().equals(address))
                             .findAny();
                     }
@@ -1156,13 +1158,13 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 testClusterNodes.disconnectNode(this);
                 final ClusterState oldState = this.clusterService.state();
                 stop();
-                testClusterNodes.nodes.remove(node.getName());
+                nodes.remove(node.getName());
                 scheduleSoon(() -> {
                     try {
                         final TestClusterNode restartedNode = new TestClusterNode(
                             new DiscoveryNode(node.getName(), node.getId(), node.getAddress(), emptyMap(),
-                                node.getRoles(), Version.CURRENT), disruption);
-                        testClusterNodes.nodes.put(node.getName(), restartedNode);
+                                node.getRoles(), Version.CURRENT));
+                        nodes.put(node.getName(), restartedNode);
                         restartedNode.start(oldState);
                     } catch (IOException e) {
                         throw new AssertionError(e);
@@ -1191,7 +1193,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 coordinator = new Coordinator(node.getName(), clusterService.getSettings(),
                     clusterService.getClusterSettings(), transportService, namedWriteableRegistry,
                     allocationService, masterService, () -> persistedState,
-                    hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
+                    hostsResolver -> nodes.values().stream().filter(n -> n.node.isMasterNode())
                         .map(n -> n.node.getAddress()).collect(Collectors.toList()),
                     clusterService.getClusterApplierService(), Collections.emptyList(), random(),
                     new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE);
@@ -1206,33 +1208,5 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 coordinator.startInitialJoin();
             }
         }
-    private final class DisconnectedNodes extends NetworkDisruption.DisruptedLinks {
-        /**
-         * Node names that are disconnected from all other nodes.
-         */
-        private final Set<String> disconnected = new HashSet<>();
-        @Override
-        public boolean disrupt(String node1, String node2) {
-            if (node1.equals(node2)) {
-                return false;
-            }
-            // Check if both nodes are still part of the cluster
-            if (testClusterNodes.nodes.containsKey(node1) == false
-                || testClusterNodes.nodes.containsKey(node2) == false) {
-                return true;
-            }
-            return disconnected.contains(node1) || disconnected.contains(node2);
-        }
-        public void disconnect(String node) {
-            disconnected.add(node);
-        }
-        public void clear() {
-            disconnected.clear();
-        }
-    }
 }
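
Tying this back to the sketch shown before the diff: the disrupt-then-heal cycle that the resiliency tests drive through `disconnectNode`, `clearNetworkDisruptions`, and `clearDisruptionsAndAwaitSync` reduces to the following. All names come from the illustrative `DisruptionSketch` and `Node` classes above, not from the PR itself:

```java
import java.util.HashMap;
import java.util.Map;

// Exercising DisruptionSketch from the earlier example: partition, check, heal.
public class DisruptionSketchDemo {
    public static void main(String[] args) {
        Map<String, Node> cluster = new HashMap<>();
        Node master = new Node("id-0", "master-0");
        Node data = new Node("id-1", "data-0");
        cluster.put(master.name, master);
        cluster.put(data.name, data);

        DisruptionSketch disruption = new DisruptionSketch(cluster);

        // Partition the data node: every link touching it now reports DISCONNECTED.
        disruption.disconnectNode(data);
        System.out.println(disruption.getConnectionStatus(master, data)); // DISCONNECTED

        // Healing is wholesale, mirroring clearNetworkDisruptions() in the diff;
        // the real test then waits for all nodes to converge on one cluster state.
        disruption.clearNetworkDisruptions();
        System.out.println(disruption.getConnectionStatus(master, data)); // CONNECTED
    }
}
```
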