Backport rolling upgrade multi cluster module ()

* Add rolling upgrade multi cluster test module ()

This test starts 2 clusters, each with 3 nodes.
First the leader cluster is started and tests are run against it and
then the follower cluster is started and tests execute against this two cluster.

Then the follower cluster is upgraded, one node at a time.
After that the leader cluster is upgraded, one node at a time.
Every time a node is upgraded tests are ran while both clusters are online.
(and either leader cluster has mixed node versions or the follower cluster)

This commit only tests CCR index following, but could be used for CCS tests as well.
In particular for CCR, unidirectional index following is tested during a rolling upgrade.
During the test several indices are created and followed in the leader cluster before or
while the follower cluster is being upgraded.

This tests also verifies that attempting to follow an index in the upgraded cluster
from the not upgraded cluster fails. After both clusters are upgraded following the
index that previously failed should succeed.

Relates to  and 

* Filter out upgraded version index settings when starting index following ()

The `index.version.upgraded` and `index.version.upgraded_string` are likely
to be different between leader and follower index. In the event that
a follower index gets restored on a upgraded node while the leader index
is still on non-upgraded nodes.

Closes 
This commit is contained in:
Martijn van Groningen 2019-02-14 08:12:14 +01:00 committed by GitHub
parent 60c1dcde88
commit 88489a3f3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 621 additions and 2 deletions
x-pack
plugin/ccr/src
main/java/org/elasticsearch/xpack/ccr/action
test/java/org/elasticsearch/xpack/ccr/action
qa/rolling-upgrade-multi-cluster

@ -243,7 +243,8 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
Settings leaderSettings = filter(leaderIndex.getSettings());
Settings followerSettings = filter(followIndex.getSettings());
if (leaderSettings.equals(followerSettings) == false) {
throw new IllegalArgumentException("the leader and follower index settings must be identical");
throw new IllegalArgumentException("the leader index setting[" + leaderSettings + "] and follower index settings [" +
followerSettings + "] must be identical");
}
// Validates if the current follower mapping is mergable with the leader mapping.
@ -456,6 +457,11 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME);
settings.remove(IndexMetaData.SETTING_CREATION_DATE);
// Follower index may be upgraded, while the leader index hasn't been upgraded, so it is expected
// that these settings are different:
settings.remove(IndexMetaData.SETTING_VERSION_UPGRADED);
settings.remove(IndexMetaData.SETTING_VERSION_UPGRADED_STRING);
Iterator<String> iterator = settings.keys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();

@ -150,7 +150,10 @@ public class TransportResumeFollowActionTests extends ESTestCase {
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), customMetaData);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("the leader and follower index settings must be identical"));
assertThat(e.getMessage(), equalTo("the leader index setting[{\"index.analysis.analyzer.my_analyzer.tokenizer\"" +
":\"whitespace\",\"index.analysis.analyzer.my_analyzer.type\":\"custom\",\"index.number_of_shards\":\"5\"}] " +
"and follower index settings [{\"index.analysis.analyzer.my_analyzer.tokenizer\":\"standard\"," +
"\"index.analysis.analyzer.my_analyzer.type\":\"custom\",\"index.number_of_shards\":\"5\"}] must be identical"));
}
{
// should fail because the following index does not have the following_index settings
@ -242,6 +245,21 @@ public class TransportResumeFollowActionTests extends ESTestCase {
}
}
public void testFilter() {
Settings.Builder settings = Settings.builder();
settings.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "");
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "");
settings.put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), "");
settings.put(IndexMetaData.SETTING_INDEX_UUID, "");
settings.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, "");
settings.put(IndexMetaData.SETTING_CREATION_DATE, "");
settings.put(IndexMetaData.SETTING_VERSION_UPGRADED, "");
settings.put(IndexMetaData.SETTING_VERSION_UPGRADED_STRING, "");
Settings result = TransportResumeFollowAction.filter(settings.build());
assertThat(result.size(), equalTo(0));
}
private static IndexMetaData createIMD(String index,
int numberOfShards,
Settings settings,

@ -0,0 +1,262 @@
import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.test.RestIntegTestTask
apply plugin: 'elasticsearch.standalone-test'
dependencies {
// "org.elasticsearch.plugin:x-pack-core:${version}" doesn't work with idea because the testArtifacts are also here
testCompile project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') // to be moved in a later commit
}
// This is a top level task which we will add dependencies to below.
// It is a single task that can be used to backcompat tests against all versions.
task bwcTest {
description = 'Runs backwards compatibility tests.'
group = 'verification'
}
for (Version version : bwcVersions.wireCompatible) {
String taskPrefix = "v${version}"
// ============================================================================================
// Create leader cluster
// ============================================================================================
RestIntegTestTask leaderClusterTest = tasks.create(name: "${taskPrefix}#leader#clusterTest", type: RestIntegTestTask) {
mustRunAfter(precommit)
}
configure(extensions.findByName("${taskPrefix}#leader#clusterTestCluster")) {
bwcVersion = version
numBwcNodes = 3
numNodes = 3
clusterName = 'leader'
setting 'xpack.security.enabled', 'false'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.watcher.enabled', 'false'
setting 'xpack.license.self_generated.type', 'trial'
}
Task leaderClusterTestRunner = tasks.getByName("${taskPrefix}#leader#clusterTestRunner")
leaderClusterTestRunner.configure {
systemProperty 'tests.rest.upgrade_state', 'none'
systemProperty 'tests.rest.cluster_name', 'leader'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
}
// ============================================================================================
// Create follower cluster
// ============================================================================================
RestIntegTestTask followerClusterTest = tasks.create(name: "${taskPrefix}#follower#clusterTest", type: RestIntegTestTask) {
mustRunAfter(precommit)
}
configure(extensions.findByName("${taskPrefix}#follower#clusterTestCluster")) {
dependsOn leaderClusterTestRunner
bwcVersion = version
numBwcNodes = 3
numNodes = 3
clusterName = 'follower'
setting 'xpack.security.enabled', 'false'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.watcher.enabled', 'false'
setting 'xpack.license.self_generated.type', 'trial'
}
Task followerClusterTestRunner = tasks.getByName("${taskPrefix}#follower#clusterTestRunner")
followerClusterTestRunner.configure {
systemProperty 'tests.rest.upgrade_state', 'none'
systemProperty 'tests.rest.cluster_name', 'follower'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
systemProperty 'tests.follower_host', "${-> followerClusterTest.nodes.get(0).httpUri()}"
systemProperty 'tests.follower_remote_cluster_seed', "${-> followerClusterTest.nodes.get(0).transportUri()}"
}
// ============================================================================================
// Upgrade follower cluster
// ============================================================================================
Closure configureUpgradeCluster = {String prefix, String cluster, String name, Task lastRunner, int stopNode,
RestIntegTestTask clusterTest, Closure getOtherUnicastHostAddresses ->
configure(extensions.findByName("${prefix}#${cluster}#${name}")) {
dependsOn lastRunner, "${prefix}#${cluster}#clusterTestCluster#node${stopNode}.stop"
clusterName = cluster
otherUnicastHostAddresses = { getOtherUnicastHostAddresses() }
minimumMasterNodes = { 2 }
autoSetInitialMasterNodes = false
/* Override the data directory so the new node always gets the node we
* just stopped's data directory. */
dataDir = { nodeNumber -> clusterTest.nodes[stopNode].dataDir }
setting 'repositories.url.allowed_urls', 'http://snapshot.test*'
setting 'xpack.security.enabled', 'false'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.watcher.enabled', 'false'
setting 'xpack.license.self_generated.type', 'trial'
setting 'node.name', "upgraded-node-${cluster}-${stopNode}"
setting 'node.attr.upgraded', 'true'
}
}
Task followerOneThirdUpgradedTest = tasks.create(name: "${taskPrefix}#follower#oneThirdUpgradedTest", type: RestIntegTestTask)
configureUpgradeCluster(taskPrefix, 'follower', 'oneThirdUpgradedTestCluster', followerClusterTestRunner, 0, followerClusterTest,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ [followerClusterTest.nodes.get(1).transportUri(), followerClusterTest.nodes.get(2).transportUri()] })
Task followerOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#oneThirdUpgradedTestRunner")
followerOneThirdUpgradedTestRunner.configure {
systemProperty 'tests.rest.upgrade_state', 'one_third'
systemProperty 'tests.rest.cluster_name', 'follower'
systemProperty 'tests.follower_host', "${-> followerClusterTest.nodes.get(1).httpUri()}"
systemProperty 'tests.follower_remote_cluster_seed', "${-> followerClusterTest.nodes.get(1).transportUri()}"
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
finalizedBy "${taskPrefix}#follower#clusterTestCluster#node1.stop"
}
Task followerTwoThirdsUpgradedTest = tasks.create(name: "${taskPrefix}#follower#twoThirdsUpgradedTest", type: RestIntegTestTask)
configureUpgradeCluster(taskPrefix, 'follower', 'twoThirdsUpgradedTestCluster', followerOneThirdUpgradedTestRunner, 1, followerClusterTest,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ [followerClusterTest.nodes.get(2).transportUri(), followerOneThirdUpgradedTest.nodes.get(0).transportUri()] })
Task followerTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#twoThirdsUpgradedTestRunner")
followerTwoThirdsUpgradedTestRunner.configure {
systemProperty 'tests.rest.upgrade_state', 'two_third'
systemProperty 'tests.rest.cluster_name', 'follower'
systemProperty 'tests.follower_host', "${-> followerClusterTest.nodes.get(2).httpUri()}"
systemProperty 'tests.follower_remote_cluster_seed', "${-> followerClusterTest.nodes.get(2).transportUri()}"
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
finalizedBy "${taskPrefix}#follower#clusterTestCluster#node2.stop"
}
Task followerUpgradedClusterTest = tasks.create(name: "${taskPrefix}#follower#upgradedClusterTest", type: RestIntegTestTask)
configureUpgradeCluster(taskPrefix, 'follower', 'upgradedClusterTestCluster', followerTwoThirdsUpgradedTestRunner, 2, followerClusterTest,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ [followerOneThirdUpgradedTest.nodes.get(0).transportUri(), followerTwoThirdsUpgradedTest.nodes.get(0).transportUri()] })
Task followerUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#follower#upgradedClusterTestRunner")
followerUpgradedClusterTestRunner.configure {
systemProperty 'tests.rest.upgrade_state', 'all'
systemProperty 'tests.rest.cluster_name', 'follower'
systemProperty 'tests.follower_host', "${-> followerOneThirdUpgradedTest.nodes.get(0).httpUri()}"
systemProperty 'tests.follower_remote_cluster_seed', "${-> followerOneThirdUpgradedTest.nodes.get(0).transportUri()}"
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
// This is needed, otherwise leader node 0 will stop after the leaderClusterTestRunner task has run.
// Here it is ok to stop, because in the next task, the leader node 0 gets upgraded.
finalizedBy "v${version}#leader#clusterTestCluster#node0.stop"
}
// ============================================================================================
// Upgrade leader cluster
// ============================================================================================
Task leaderOneThirdUpgradedTest = tasks.create(name: "${taskPrefix}#leader#oneThirdUpgradedTest", type: RestIntegTestTask)
configureUpgradeCluster(taskPrefix, 'leader', 'oneThirdUpgradedTestCluster', followerUpgradedClusterTestRunner, 0, leaderClusterTest,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ [leaderClusterTest.nodes.get(1).transportUri(), leaderClusterTest.nodes.get(2).transportUri()] })
Task leaderOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#oneThirdUpgradedTestRunner")
leaderOneThirdUpgradedTestRunner.configure {
systemProperty 'tests.rest.upgrade_state', 'one_third'
systemProperty 'tests.rest.cluster_name', 'leader'
systemProperty 'tests.follower_host', "${-> followerUpgradedClusterTest.nodes.get(0).httpUri()}"
systemProperty 'tests.follower_remote_cluster_seed', "${-> followerUpgradedClusterTest.nodes.get(0).transportUri()}"
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(2).httpUri()}"
systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(2).transportUri()}"
finalizedBy "${taskPrefix}#leader#clusterTestCluster#node1.stop"
}
Task leaderTwoThirdsUpgradedTest = tasks.create(name: "${taskPrefix}#leader#twoThirdsUpgradedTest", type: RestIntegTestTask)
configureUpgradeCluster(taskPrefix, 'leader', 'twoThirdsUpgradedTestCluster', leaderOneThirdUpgradedTestRunner, 1, leaderClusterTest,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ [leaderClusterTest.nodes.get(2).transportUri(), leaderOneThirdUpgradedTest.nodes.get(0).transportUri()] })
Task leaderTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#twoThirdsUpgradedTestRunner")
leaderTwoThirdsUpgradedTestRunner.configure {
systemProperty 'tests.rest.upgrade_state', 'two_third'
systemProperty 'tests.rest.cluster_name', 'leader'
systemProperty 'tests.follower_host', "${-> followerUpgradedClusterTest.nodes.get(0).httpUri()}"
systemProperty 'tests.follower_remote_cluster_seed', "${-> followerUpgradedClusterTest.nodes.get(0).transportUri()}"
systemProperty 'tests.leader_host', "${-> leaderOneThirdUpgradedTest.nodes.get(0).httpUri()}"
systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderOneThirdUpgradedTest.nodes.get(0).transportUri()}"
finalizedBy "${taskPrefix}#leader#clusterTestCluster#node2.stop"
}
Task leaderUpgradedClusterTest = tasks.create(name: "${taskPrefix}#leader#upgradedClusterTest", type: RestIntegTestTask)
configureUpgradeCluster(taskPrefix, 'leader', "upgradedClusterTestCluster", leaderTwoThirdsUpgradedTestRunner, 2, leaderClusterTest,
// Use all running nodes as seed nodes so there is no race between pinging and the tests
{ [leaderOneThirdUpgradedTest.nodes.get(0).transportUri(), leaderTwoThirdsUpgradedTest.nodes.get(0).transportUri()] })
Task leaderUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#leader#upgradedClusterTestRunner")
leaderUpgradedClusterTestRunner.configure {
systemProperty 'tests.rest.upgrade_state', 'all'
systemProperty 'tests.rest.cluster_name', 'leader'
systemProperty 'tests.follower_host', "${-> followerUpgradedClusterTest.nodes.get(0).httpUri()}"
systemProperty 'tests.follower_remote_cluster_seed', "${-> followerUpgradedClusterTest.nodes.get(0).transportUri()}"
systemProperty 'tests.leader_host', "${-> leaderTwoThirdsUpgradedTest.nodes.get(0).httpUri()}"
systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderTwoThirdsUpgradedTest.nodes.get(0).transportUri()}"
/*
* Force stopping all the upgraded nodes after the test runner
* so they are alive during the test.
*/
finalizedBy "${taskPrefix}#follower#oneThirdUpgradedTestCluster#stop"
finalizedBy "${taskPrefix}#follower#twoThirdsUpgradedTestCluster#stop"
finalizedBy "${taskPrefix}#follower#upgradedClusterTestCluster#stop"
finalizedBy "${taskPrefix}#leader#oneThirdUpgradedTestCluster#stop"
finalizedBy "${taskPrefix}#leader#twoThirdsUpgradedTestCluster#stop"
}
if (project.bwc_tests_enabled) {
Task versionBwcTest = tasks.create(name: "${taskPrefix}#bwcTest") {
dependsOn = [leaderUpgradedClusterTest]
}
bwcTest.dependsOn(versionBwcTest)
}
}
unitTest.enabled = false // no unit tests for rolling upgrades, only the rest integration test
// basic integ tests includes testing bwc against the most recent version
task integTest {
if (project.bwc_tests_enabled) {
for (final def version : bwcVersions.unreleasedWireCompatible) {
dependsOn "v${version}#bwcTest"
}
}
}
check.dependsOn(integTest)

@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.upgrades;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.IOException;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public abstract class AbstractMultiClusterUpgradeTestCase extends ESRestTestCase {
@Override
protected boolean preserveClusterUponCompletion() {
return true;
}
enum UpgradeState {
NONE,
ONE_THIRD,
TWO_THIRD,
ALL;
public static UpgradeState parse(String value) {
switch (value) {
case "none":
return NONE;
case "one_third":
return ONE_THIRD;
case "two_third":
return TWO_THIRD;
case "all":
return ALL;
default:
throw new AssertionError("unknown cluster type: " + value);
}
}
}
protected final UpgradeState upgradeState = UpgradeState.parse(System.getProperty("tests.rest.upgrade_state"));
enum ClusterName {
LEADER,
FOLLOWER;
public static ClusterName parse(String value) {
switch (value) {
case "leader":
return LEADER;
case "follower":
return FOLLOWER;
default:
throw new AssertionError("unknown cluster type: " + value);
}
}
}
protected final ClusterName clusterName = ClusterName.parse(System.getProperty("tests.rest.cluster_name"));
private static RestClient leaderClient;
private static RestClient followerClient;
private static boolean initialized = false;
@Before
public void initClientsAndConfigureClusters() throws IOException {
String leaderHost = System.getProperty("tests.leader_host");
if (leaderHost == null) {
throw new AssertionError("leader host is missing");
}
if (initialized) {
return;
}
String followerHost = System.getProperty("tests.follower_host");
if (clusterName == ClusterName.LEADER) {
leaderClient = buildClient(leaderHost);
if (followerHost != null) {
followerClient = buildClient(followerHost);
}
} else if (clusterName == ClusterName.FOLLOWER) {
if (followerHost == null) {
throw new AssertionError("follower host is missing");
}
leaderClient = buildClient(leaderHost);
followerClient = buildClient(followerHost);
} else {
throw new AssertionError("unknown cluster name: " + clusterName);
}
configureLeaderRemoteClusters();
configureFollowerRemoteClusters();
initialized = true;
}
private void configureLeaderRemoteClusters() throws IOException {
String leaderRemoteClusterSeed = System.getProperty("tests.leader_remote_cluster_seed");
if (leaderRemoteClusterSeed != null) {
logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed);
Request request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity("{\"persistent\": {\"cluster.remote.leader.seeds\": \"" + leaderRemoteClusterSeed + "\"}}");
assertThat(leaderClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
if (followerClient != null) {
assertThat(followerClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
}
} else {
logger.info("No leader remote cluster seed found.");
}
}
private void configureFollowerRemoteClusters() throws IOException {
String followerRemoteClusterSeed = System.getProperty("tests.follower_remote_cluster_seed");
if (followerRemoteClusterSeed != null) {
logger.info("Configuring follower remote cluster [{}]", followerRemoteClusterSeed);
Request request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity("{\"persistent\": {\"cluster.remote.follower.seeds\": \"" + followerRemoteClusterSeed + "\"}}");
assertThat(leaderClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
assertThat(followerClient.performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
} else {
logger.info("No follower remote cluster seed found.");
}
}
@AfterClass
public static void destroyClients() throws IOException {
try {
IOUtils.close(leaderClient, followerClient);
} finally {
leaderClient = null;
followerClient = null;
}
}
protected static RestClient leaderClient() {
return leaderClient;
}
protected static RestClient followerClient() {
return followerClient;
}
private RestClient buildClient(final String url) throws IOException {
int portSeparator = url.lastIndexOf(':');
HttpHost httpHost = new HttpHost(url.substring(0, portSeparator),
Integer.parseInt(url.substring(portSeparator + 1)), getProtocol());
return buildClient(restAdminSettings(), new HttpHost[]{httpHost});
}
protected static Map<?, ?> toMap(Response response) throws IOException {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
}
}

@ -0,0 +1,165 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.upgrades;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase {
public void testIndexFollowing() throws Exception {
logger.info("clusterName={}, upgradeState={}", clusterName, upgradeState);
if (clusterName == ClusterName.LEADER) {
switch (upgradeState) {
case NONE:
createLeaderIndex(leaderClient(), "leader_index1");
index(leaderClient(), "leader_index1", 64);
createLeaderIndex(leaderClient(), "leader_index2");
index(leaderClient(), "leader_index2", 64);
break;
case ONE_THIRD:
break;
case TWO_THIRD:
break;
case ALL:
createLeaderIndex(leaderClient(), "leader_index4");
followIndex(followerClient(), "leader", "leader_index4", "follower_index4");
index(leaderClient(), "leader_index4", 64);
assertTotalHitCount("follower_index4", 64, followerClient());
break;
default:
throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]");
}
} else if (clusterName == ClusterName.FOLLOWER) {
switch (upgradeState) {
case NONE:
followIndex(followerClient(), "leader", "leader_index1", "follower_index1");
assertTotalHitCount("follower_index1", 64, followerClient());
break;
case ONE_THIRD:
index(leaderClient(), "leader_index1", 64);
assertTotalHitCount("follower_index1", 128, followerClient());
followIndex(followerClient(), "leader", "leader_index2", "follower_index2");
assertTotalHitCount("follower_index2", 64, followerClient());
break;
case TWO_THIRD:
index(leaderClient(), "leader_index1", 64);
assertTotalHitCount("follower_index1", 192, followerClient());
index(leaderClient(), "leader_index2", 64);
assertTotalHitCount("follower_index2", 128, followerClient());
createLeaderIndex(leaderClient(), "leader_index3");
index(leaderClient(), "leader_index3", 64);
followIndex(followerClient(), "leader", "leader_index3", "follower_index3");
assertTotalHitCount("follower_index3", 64, followerClient());
break;
case ALL:
index(leaderClient(), "leader_index1", 64);
assertTotalHitCount("follower_index1", 256, followerClient());
index(leaderClient(), "leader_index2", 64);
assertTotalHitCount("follower_index2", 192, followerClient());
index(leaderClient(), "leader_index3", 64);
assertTotalHitCount("follower_index3", 128, followerClient());
break;
default:
throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]");
}
} else {
throw new AssertionError("unexpected cluster_name [" + clusterName + "]");
}
}
public void testCannotFollowLeaderInUpgradedCluster() throws Exception {
assumeTrue("Tests only runs with upgrade_state [all]", upgradeState == UpgradeState.ALL);
if (clusterName == ClusterName.FOLLOWER) {
// At this point the leader cluster has not been upgraded, but follower cluster has been upgrade.
// Create a leader index in the follow cluster and try to follow it in the leader cluster.
// This should fail, because the leader cluster at this point in time can't do file based recovery from follower.
createLeaderIndex(followerClient(), "not_supported");
index(followerClient(), "not_supported", 64);
ResponseException e = expectThrows(ResponseException.class,
() -> followIndex(leaderClient(), "follower", "not_supported", "not_supported"));
assertThat(e.getMessage(), containsString("the snapshot was created with Elasticsearch version ["));
assertThat(e.getMessage(), containsString("] which is higher than the version of this node ["));
} else if (clusterName == ClusterName.LEADER) {
// At this point all nodes in both clusters have been updated and
// the leader cluster can now follow leader_index4 in the follower cluster:
followIndex(leaderClient(), "follower", "not_supported", "not_supported");
assertTotalHitCount("not_supported", 64, leaderClient());
} else {
throw new AssertionError("unexpected cluster_name [" + clusterName + "]");
}
}
private static void createLeaderIndex(RestClient client, String indexName) throws IOException {
Settings indexSettings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.build();
createIndex(client, indexName, indexSettings);
}
private static void createIndex(RestClient client, String name, Settings settings) throws IOException {
Request request = new Request("PUT", "/" + name);
request.setJsonEntity("{\n \"settings\": " + Strings.toString(settings) + "}");
client.performRequest(request);
}
private static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow?wait_for_active_shards=1");
request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex +
"\", \"read_poll_timeout\": \"10ms\"}");
assertOK(client.performRequest(request));
}
private static void index(RestClient client, String index, int numDocs) throws IOException {
for (int i = 0; i < numDocs; i++) {
final Request request = new Request("POST", "/" + index + "/_doc/");
request.setJsonEntity("{}");
assertOK(client.performRequest(request));
if (randomIntBetween(0, 5) == 3) {
assertOK(client.performRequest(new Request("POST", "/" + index + "/_refresh")));
}
}
}
private static void assertTotalHitCount(final String index,
final int expectedTotalHits,
final RestClient client) throws Exception {
assertOK(client.performRequest(new Request("POST", "/" + index + "/_refresh")));
assertBusy(() -> verifyTotalHitCount(index, expectedTotalHits, client));
}
private static void verifyTotalHitCount(final String index,
final int expectedTotalHits,
final RestClient client) throws IOException {
final Request request = new Request("GET", "/" + index + "/_search");
request.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
Map<?, ?> response = toMap(client.performRequest(request));
final int totalHits = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(totalHits, equalTo(expectedTotalHits));
}
}