mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-28 02:48:38 +00:00
Switch more tests to zen2 (#36367)
1. CCR tests work without any changes 2. `testDanglingIndices` require changes the source code (added TODO). 3. `testIndexDeletionWhenNodeRejoins` because it's using just two nodes, adding the node to exclusions is needed on restart. 4. `testCorruptTranslogTruncationOfReplica` starts dedicated master one, because otherwise, the cluster does not form, if nodes are stopped and one node is started back. 5. `testResolvePath` needs TEST cluster, because all nodes are stopped at the end of the test and it's not possible to perform checks needed by SUITE cluster. 6. `SnapshotDisruptionIT`. Without changes, the test fails because Zen2 retries snapshot creation as soon as network partition heals. This results into the race between creating snapshot and test cleanup logic (deleting index). Zen1 on the other hand, also schedules retry, but it takes some time after network partition heals, so cleanup logic executes latter and test passes. The check that snapshot is eventually created is added to the end of the test.
This commit is contained in:
parent
13b1f19772
commit
8b821706cc
server/src/test/java/org/elasticsearch
discovery
gateway
index/shard
test/framework/src/main/java/org/elasticsearch/test
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack
@ -68,7 +68,6 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put(AbstractDisruptionTestCase.DEFAULT_SETTINGS)
|
||||
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false)
|
||||
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // requires more work
|
||||
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s")
|
||||
.build();
|
||||
}
|
||||
@ -133,7 +132,7 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
|
||||
|
||||
logger.info("--> wait until the snapshot is done");
|
||||
assertBusy(() -> {
|
||||
SnapshotsInProgress snapshots = dataNodeClient().admin().cluster().prepareState().setLocal(true).get().getState()
|
||||
SnapshotsInProgress snapshots = dataNodeClient().admin().cluster().prepareState().setLocal(false).get().getState()
|
||||
.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots != null && snapshots.entries().size() > 0) {
|
||||
logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state());
|
||||
@ -146,15 +145,9 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
|
||||
logger.info("--> verify that snapshot was successful or no longer exist");
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots("test-repo")
|
||||
.setSnapshots("test-snap-2").get();
|
||||
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
|
||||
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
|
||||
assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
|
||||
assertEquals(0, snapshotInfo.failedShards());
|
||||
logger.info("--> done verifying");
|
||||
assertSnapshotExists("test-repo", "test-snap-2");
|
||||
} catch (SnapshotMissingException exception) {
|
||||
logger.info("--> snapshot doesn't exist");
|
||||
logger.info("--> done verifying, snapshot doesn't exist");
|
||||
}
|
||||
}, 1, TimeUnit.MINUTES);
|
||||
|
||||
@ -172,6 +165,21 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
|
||||
cause = cause.getCause();
|
||||
assertThat(cause, instanceOf(FailedToCommitClusterStateException.class));
|
||||
}
|
||||
|
||||
logger.info("--> verify that snapshot eventually will be created due to retries");
|
||||
assertBusy(() -> {
|
||||
assertSnapshotExists("test-repo", "test-snap-2");
|
||||
}, 1, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
private void assertSnapshotExists(String repository, String snapshot) {
|
||||
GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots(repository)
|
||||
.setSnapshots(snapshot).get();
|
||||
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
|
||||
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
|
||||
assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
|
||||
assertEquals(0, snapshotInfo.failedShards());
|
||||
logger.info("--> done verifying, snapshot exists");
|
||||
}
|
||||
|
||||
private void createRandomIndex(String idxName) throws InterruptedException {
|
||||
|
@ -276,10 +276,14 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
|
||||
}
|
||||
|
||||
public void testDanglingIndices() throws Exception {
|
||||
/*TODO This test test does not work with Zen2, because once master node looses its cluster state during restart
|
||||
it will start with term = 1, which is the same as the term data node has. Data node won't accept cluster state from master
|
||||
after the restart, because the term is the same, but version of the cluster state is greater on the data node.
|
||||
Consider adding term to JoinRequest, so that master node can bump its term if its current term is less than JoinRequest#term.
|
||||
*/
|
||||
logger.info("--> starting two nodes");
|
||||
|
||||
final String node_1 = internalCluster().startNodes(2,
|
||||
//TODO fails wih Zen2
|
||||
Settings.builder().put(TestZenDiscovery.USE_ZEN2.getKey(), false).build()).get(0);
|
||||
|
||||
logger.info("--> indexing a simple document");
|
||||
@ -333,9 +337,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
|
||||
final List<String> nodes;
|
||||
logger.info("--> starting a cluster with " + numNodes + " nodes");
|
||||
nodes = internalCluster().startNodes(numNodes,
|
||||
Settings.builder().put(IndexGraveyard.SETTING_MAX_TOMBSTONES.getKey(), randomIntBetween(10, 100))
|
||||
//TODO fails with Zen2
|
||||
.put(TestZenDiscovery.USE_ZEN2.getKey(), false).build());
|
||||
Settings.builder().put(IndexGraveyard.SETTING_MAX_TOMBSTONES.getKey(), randomIntBetween(10, 100)).build());
|
||||
logger.info("--> create an index");
|
||||
createIndex(indexName);
|
||||
|
||||
@ -355,6 +357,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
|
||||
final Client client = client(otherNode);
|
||||
client.admin().indices().prepareDelete(indexName).execute().actionGet();
|
||||
assertFalse(client.admin().indices().prepareExists(indexName).execute().actionGet().isExists());
|
||||
logger.info("--> index deleted");
|
||||
return super.onNodeStopped(nodeName);
|
||||
}
|
||||
});
|
||||
|
@ -70,7 +70,6 @@ import org.elasticsearch.test.CorruptionUtils;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalSettingsPlugin;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
import org.elasticsearch.test.engine.MockEngineSupport;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
||||
@ -99,16 +98,9 @@ import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // no state persistence yet
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class);
|
||||
@ -260,7 +252,7 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
|
||||
}
|
||||
|
||||
public void testCorruptTranslogTruncation() throws Exception {
|
||||
internalCluster().startNodes(2, Settings.EMPTY);
|
||||
internalCluster().startNodes(2);
|
||||
|
||||
final String node1 = internalCluster().getNodeNames()[0];
|
||||
final String node2 = internalCluster().getNodeNames()[1];
|
||||
@ -436,10 +428,10 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
|
||||
}
|
||||
|
||||
public void testCorruptTranslogTruncationOfReplica() throws Exception {
|
||||
internalCluster().startNodes(2, Settings.EMPTY);
|
||||
internalCluster().startMasterOnlyNode();
|
||||
|
||||
final String node1 = internalCluster().getNodeNames()[0];
|
||||
final String node2 = internalCluster().getNodeNames()[1];
|
||||
final String node1 = internalCluster().startDataOnlyNode();
|
||||
final String node2 = internalCluster().startDataOnlyNode();
|
||||
logger.info("--> nodes name: {}, {}", node1, node2);
|
||||
|
||||
final String indexName = "test";
|
||||
@ -481,12 +473,11 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
|
||||
final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
|
||||
final Set<Path> translogDirs = getDirs(node2, shardId, ShardPath.TRANSLOG_FOLDER_NAME);
|
||||
|
||||
// stop the cluster nodes. we don't use full restart so the node start up order will be the same
|
||||
// and shard roles will be maintained
|
||||
// stop data nodes. After the restart the 1st node will be primary and the 2nd node will be replica
|
||||
internalCluster().stopRandomDataNode();
|
||||
internalCluster().stopRandomDataNode();
|
||||
|
||||
// Corrupt the translog file(s)
|
||||
// Corrupt the translog file(s) on the replica
|
||||
logger.info("--> corrupting translog");
|
||||
TestTranslog.corruptRandomTranslogFile(logger, random(), translogDirs);
|
||||
|
||||
|
@ -1641,35 +1641,7 @@ public final class InternalTestCluster extends TestCluster {
|
||||
}
|
||||
|
||||
private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndClients) throws IOException {
|
||||
final Set<String> excludedNodeIds = new HashSet<>();
|
||||
|
||||
if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {
|
||||
|
||||
final long currentMasters = nodes.values().stream().filter(NodeAndClient::isMasterEligible).count();
|
||||
final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();
|
||||
|
||||
assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters;
|
||||
if (stoppingMasters != currentMasters && stoppingMasters > 0) {
|
||||
// If stopping few enough master-nodes that there's still a majority left, there is no need to withdraw their votes first.
|
||||
// However, we do not yet have a way to be sure there's a majority left, because the voting configuration may not yet have
|
||||
// been updated when the previous nodes shut down, so we must always explicitly withdraw votes.
|
||||
// TODO add cluster health API to check that voting configuration is optimal so this isn't always needed
|
||||
nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(excludedNodeIds::add);
|
||||
assert excludedNodeIds.size() == stoppingMasters;
|
||||
|
||||
logger.info("adding voting config exclusions {} prior to shutdown", excludedNodeIds);
|
||||
try {
|
||||
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
|
||||
new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(new String[0]))).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new AssertionError("unexpected", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (stoppingMasters > 0) {
|
||||
updateMinMasterNodes(getMasterNodesCount() - Math.toIntExact(stoppingMasters));
|
||||
}
|
||||
}
|
||||
final Set<String> excludedNodeIds = excludeMasters(nodeAndClients);
|
||||
|
||||
for (NodeAndClient nodeAndClient: nodeAndClients) {
|
||||
removeDisruptionSchemeFromNode(nodeAndClient);
|
||||
@ -1678,14 +1650,7 @@ public final class InternalTestCluster extends TestCluster {
|
||||
nodeAndClient.close();
|
||||
}
|
||||
|
||||
if (excludedNodeIds.isEmpty() == false) {
|
||||
logger.info("removing voting config exclusions for {} after shutdown", excludedNodeIds);
|
||||
try {
|
||||
client().execute(ClearVotingConfigExclusionsAction.INSTANCE, new ClearVotingConfigExclusionsRequest()).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new AssertionError("unexpected", e);
|
||||
}
|
||||
}
|
||||
removeExclusions(excludedNodeIds);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1751,31 +1716,78 @@ public final class InternalTestCluster extends TestCluster {
|
||||
|
||||
private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception {
|
||||
logger.info("Restarting node [{}] ", nodeAndClient.name);
|
||||
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
|
||||
}
|
||||
final int masterNodesCount = getMasterNodesCount();
|
||||
// special case to allow stopping one node in a two node cluster and keep it functional
|
||||
final boolean updateMinMaster = nodeAndClient.isMasterEligible() && masterNodesCount == 2 && autoManageMinMasterNodes;
|
||||
if (updateMinMaster) {
|
||||
updateMinMasterNodes(masterNodesCount - 1);
|
||||
}
|
||||
|
||||
Set<String> excludedNodeIds = excludeMasters(Collections.singleton(nodeAndClient));
|
||||
|
||||
final Settings newSettings = nodeAndClient.closeForRestart(callback,
|
||||
autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
|
||||
autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1);
|
||||
|
||||
removeExclusions(excludedNodeIds);
|
||||
|
||||
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList()));
|
||||
nodeAndClient.startNode();
|
||||
if (activeDisruptionScheme != null) {
|
||||
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
|
||||
}
|
||||
if (callback.validateClusterForming() || updateMinMaster) {
|
||||
|
||||
if (callback.validateClusterForming() || excludedNodeIds.isEmpty() == false) {
|
||||
// we have to validate cluster size if updateMinMaster == true, because we need the
|
||||
// second node to join in order to increment min_master_nodes back to 2.
|
||||
// we also have to do via the node that was just restarted as it may be that the master didn't yet process
|
||||
// the fact it left
|
||||
validateClusterFormed(nodeAndClient.name);
|
||||
}
|
||||
if (updateMinMaster) {
|
||||
updateMinMasterNodes(masterNodesCount);
|
||||
|
||||
if (excludedNodeIds.isEmpty() == false) {
|
||||
updateMinMasterNodes(getMasterNodesCount());
|
||||
}
|
||||
}
|
||||
|
||||
private Set<String> excludeMasters(Collection<NodeAndClient> nodeAndClients) {
|
||||
final Set<String> excludedNodeIds = new HashSet<>();
|
||||
if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {
|
||||
|
||||
final long currentMasters = nodes.values().stream().filter(NodeAndClient::isMasterEligible).count();
|
||||
final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();
|
||||
|
||||
assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters;
|
||||
if (stoppingMasters != currentMasters && stoppingMasters > 0) {
|
||||
// If stopping few enough master-nodes that there's still a majority left, there is no need to withdraw their votes first.
|
||||
// However, we do not yet have a way to be sure there's a majority left, because the voting configuration may not yet have
|
||||
// been updated when the previous nodes shut down, so we must always explicitly withdraw votes.
|
||||
// TODO add cluster health API to check that voting configuration is optimal so this isn't always needed
|
||||
nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(excludedNodeIds::add);
|
||||
assert excludedNodeIds.size() == stoppingMasters;
|
||||
|
||||
logger.info("adding voting config exclusions {} prior to restart/shutdown", excludedNodeIds);
|
||||
try {
|
||||
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
|
||||
new AddVotingConfigExclusionsRequest(excludedNodeIds.toArray(new String[0]))).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new AssertionError("unexpected", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (stoppingMasters > 0) {
|
||||
updateMinMasterNodes(getMasterNodesCount() - Math.toIntExact(stoppingMasters));
|
||||
}
|
||||
}
|
||||
return excludedNodeIds;
|
||||
}
|
||||
|
||||
private void removeExclusions(Set<String> excludedNodeIds) {
|
||||
if (excludedNodeIds.isEmpty() == false) {
|
||||
logger.info("removing voting config exclusions for {} after restart/shutdown", excludedNodeIds);
|
||||
try {
|
||||
Client client = getRandomNodeAndClient(node -> excludedNodeIds.contains(node.name) == false).client(random);
|
||||
client.execute(ClearVotingConfigExclusionsAction.INSTANCE, new ClearVotingConfigExclusionsRequest()).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new AssertionError("unexpected", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1833,7 +1845,6 @@ public final class InternalTestCluster extends TestCluster {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the name of the current master node in the cluster.
|
||||
*/
|
||||
|
@ -188,7 +188,6 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
||||
builder.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS));
|
||||
builder.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()); // empty list disables a port scan for other nodes
|
||||
builder.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file");
|
||||
builder.put(TestZenDiscovery.USE_ZEN2.getKey(), false); // some tests do full cluster restarts
|
||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
|
||||
builder.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
|
||||
builder.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
|
||||
|
Loading…
x
Reference in New Issue
Block a user