Migrate muted auto follow rolling upgrade test and unmute this test (#38900)
The rest of `CCRIT` is now no longer relevant, because the remaining test tests the same of the index following test in the rolling upgrade multi cluster module. Added `tests.upgrade_from_version` version to test. It is not needed in this branch, but is in 6.7 branch. Closes #37231
This commit is contained in:
parent
578514e892
commit
60cc04ed13
|
@ -41,6 +41,7 @@ for (Version version : bwcVersions.wireCompatible) {
|
|||
|
||||
Task leaderClusterTestRunner = tasks.getByName("${taskPrefix}#leader#clusterTestRunner")
|
||||
leaderClusterTestRunner.configure {
|
||||
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
|
||||
systemProperty 'tests.rest.upgrade_state', 'none'
|
||||
systemProperty 'tests.rest.cluster_name', 'leader'
|
||||
|
||||
|
@ -71,6 +72,7 @@ for (Version version : bwcVersions.wireCompatible) {
|
|||
|
||||
Task followerClusterTestRunner = tasks.getByName("${taskPrefix}#follower#clusterTestRunner")
|
||||
followerClusterTestRunner.configure {
|
||||
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
|
||||
systemProperty 'tests.rest.upgrade_state', 'none'
|
||||
systemProperty 'tests.rest.cluster_name', 'follower'
|
||||
|
||||
|
@ -115,6 +117,7 @@ for (Version version : bwcVersions.wireCompatible) {
|
|||
|
||||
Task followerOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#oneThirdUpgradedTestRunner")
|
||||
followerOneThirdUpgradedTestRunner.configure {
|
||||
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
|
||||
systemProperty 'tests.rest.upgrade_state', 'one_third'
|
||||
systemProperty 'tests.rest.cluster_name', 'follower'
|
||||
|
||||
|
@ -135,6 +138,7 @@ for (Version version : bwcVersions.wireCompatible) {
|
|||
|
||||
Task followerTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#follower#twoThirdsUpgradedTestRunner")
|
||||
followerTwoThirdsUpgradedTestRunner.configure {
|
||||
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
|
||||
systemProperty 'tests.rest.upgrade_state', 'two_third'
|
||||
systemProperty 'tests.rest.cluster_name', 'follower'
|
||||
|
||||
|
@ -155,6 +159,7 @@ for (Version version : bwcVersions.wireCompatible) {
|
|||
|
||||
Task followerUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#follower#upgradedClusterTestRunner")
|
||||
followerUpgradedClusterTestRunner.configure {
|
||||
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
|
||||
systemProperty 'tests.rest.upgrade_state', 'all'
|
||||
systemProperty 'tests.rest.cluster_name', 'follower'
|
||||
|
||||
|
@ -181,6 +186,7 @@ for (Version version : bwcVersions.wireCompatible) {
|
|||
|
||||
Task leaderOneThirdUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#oneThirdUpgradedTestRunner")
|
||||
leaderOneThirdUpgradedTestRunner.configure {
|
||||
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
|
||||
systemProperty 'tests.rest.upgrade_state', 'one_third'
|
||||
systemProperty 'tests.rest.cluster_name', 'leader'
|
||||
|
||||
|
@ -201,6 +207,7 @@ for (Version version : bwcVersions.wireCompatible) {
|
|||
|
||||
Task leaderTwoThirdsUpgradedTestRunner = tasks.getByName("${taskPrefix}#leader#twoThirdsUpgradedTestRunner")
|
||||
leaderTwoThirdsUpgradedTestRunner.configure {
|
||||
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
|
||||
systemProperty 'tests.rest.upgrade_state', 'two_third'
|
||||
systemProperty 'tests.rest.cluster_name', 'leader'
|
||||
|
||||
|
@ -221,6 +228,7 @@ for (Version version : bwcVersions.wireCompatible) {
|
|||
|
||||
Task leaderUpgradedClusterTestRunner = tasks.getByName("${taskPrefix}#leader#upgradedClusterTestRunner")
|
||||
leaderUpgradedClusterTestRunner.configure {
|
||||
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
|
||||
systemProperty 'tests.rest.upgrade_state', 'all'
|
||||
systemProperty 'tests.rest.cluster_name', 'leader'
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.upgrades;
|
|||
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
|
@ -71,6 +72,9 @@ public abstract class AbstractMultiClusterUpgradeTestCase extends ESRestTestCase
|
|||
|
||||
protected final ClusterName clusterName = ClusterName.parse(System.getProperty("tests.rest.cluster_name"));
|
||||
|
||||
protected static final Version UPGRADE_FROM_VERSION =
|
||||
Version.fromString(System.getProperty("tests.upgrade_from_version"));
|
||||
|
||||
private static RestClient leaderClient;
|
||||
private static RestClient followerClient;
|
||||
private static boolean initialized = false;
|
||||
|
|
|
@ -5,11 +5,13 @@
|
|||
*/
|
||||
package org.elasticsearch.upgrades;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ObjectPath;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -88,6 +90,123 @@ public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testAutoFollowing() throws Exception {
|
||||
String leaderIndex1 = "logs-20200101";
|
||||
String leaderIndex2 = "logs-20200102";
|
||||
String leaderIndex3 = "logs-20200103";
|
||||
|
||||
if (clusterName == ClusterName.LEADER) {
|
||||
switch (upgradeState) {
|
||||
case NONE:
|
||||
case ONE_THIRD:
|
||||
case TWO_THIRD:
|
||||
break;
|
||||
case ALL:
|
||||
index(leaderClient(), leaderIndex1, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex1;
|
||||
assertTotalHitCount(followerIndex, 320, followerClient());
|
||||
});
|
||||
index(leaderClient(), leaderIndex2, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex2;
|
||||
assertTotalHitCount(followerIndex, 256, followerClient());
|
||||
});
|
||||
index(leaderClient(), leaderIndex3, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex3;
|
||||
assertTotalHitCount(followerIndex, 192, followerClient());
|
||||
});
|
||||
|
||||
deleteAutoFollowPattern(followerClient(), "test_pattern");
|
||||
stopIndexFollowing(followerClient(), "copy-" + leaderIndex1);
|
||||
stopIndexFollowing(followerClient(), "copy-" + leaderIndex2);
|
||||
stopIndexFollowing(followerClient(), "copy-" + leaderIndex3);
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("unexpected upgrade_state [" + upgradeState + "]");
|
||||
}
|
||||
} else if (clusterName == ClusterName.FOLLOWER) {
|
||||
switch (upgradeState) {
|
||||
case NONE:
|
||||
putAutoFollowPattern(followerClient(), "test_pattern", "leader", "logs-*");
|
||||
createLeaderIndex(leaderClient(), leaderIndex1);
|
||||
index(leaderClient(), leaderIndex1, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex1;
|
||||
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(1));
|
||||
assertTotalHitCount(followerIndex, 64, followerClient());
|
||||
});
|
||||
break;
|
||||
case ONE_THIRD:
|
||||
index(leaderClient(), leaderIndex1, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex1;
|
||||
assertTotalHitCount(followerIndex, 128, followerClient());
|
||||
});
|
||||
// Auto follow stats are kept in-memory on master elected node
|
||||
// and if this node get updated then auto follow stats are reset
|
||||
{
|
||||
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
|
||||
createLeaderIndex(leaderClient(), leaderIndex2);
|
||||
index(leaderClient(), leaderIndex2, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex2;
|
||||
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
|
||||
assertTotalHitCount(followerIndex, 64, followerClient());
|
||||
});
|
||||
}
|
||||
break;
|
||||
case TWO_THIRD:
|
||||
index(leaderClient(), leaderIndex1, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex1;
|
||||
assertTotalHitCount(followerIndex, 192, followerClient());
|
||||
});
|
||||
index(leaderClient(), leaderIndex2, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex2;
|
||||
assertTotalHitCount(followerIndex, 128, followerClient());
|
||||
});
|
||||
|
||||
// Auto follow stats are kept in-memory on master elected node
|
||||
// and if this node get updated then auto follow stats are reset
|
||||
{
|
||||
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
|
||||
createLeaderIndex(leaderClient(), leaderIndex3);
|
||||
index(leaderClient(), leaderIndex3, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex3;
|
||||
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
|
||||
assertTotalHitCount(followerIndex, 64, followerClient());
|
||||
});
|
||||
}
|
||||
break;
|
||||
case ALL:
|
||||
index(leaderClient(), leaderIndex1, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex1;
|
||||
assertTotalHitCount(followerIndex, 256, followerClient());
|
||||
});
|
||||
index(leaderClient(), leaderIndex2, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex2;
|
||||
assertTotalHitCount(followerIndex, 192, followerClient());
|
||||
});
|
||||
index(leaderClient(), leaderIndex3, 64);
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex3;
|
||||
assertTotalHitCount(followerIndex, 128, followerClient());
|
||||
});
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unexpected upgrade state [" + upgradeState + "]");
|
||||
}
|
||||
} else {
|
||||
throw new AssertionError("unexpected cluster_name [" + clusterName + "]");
|
||||
}
|
||||
}
|
||||
|
||||
public void testCannotFollowLeaderInUpgradedCluster() throws Exception {
|
||||
assumeTrue("Tests only runs with upgrade_state [all]", upgradeState == UpgradeState.ALL);
|
||||
|
||||
|
@ -113,12 +232,13 @@ public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase {
|
|||
}
|
||||
|
||||
private static void createLeaderIndex(RestClient client, String indexName) throws IOException {
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
Settings.Builder indexSettings = Settings.builder()
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.build();
|
||||
createIndex(client, indexName, indexSettings);
|
||||
.put("index.number_of_replicas", 0);
|
||||
if (UPGRADE_FROM_VERSION.before(Version.V_7_0_0) || randomBoolean()) {
|
||||
indexSettings.put("index.soft_deletes.enabled", true);
|
||||
}
|
||||
createIndex(client, indexName, indexSettings.build());
|
||||
}
|
||||
|
||||
private static void createIndex(RestClient client, String name, Settings settings) throws IOException {
|
||||
|
@ -134,6 +254,29 @@ public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase {
|
|||
assertOK(client.performRequest(request));
|
||||
}
|
||||
|
||||
private static void putAutoFollowPattern(RestClient client, String name, String remoteCluster, String pattern) throws IOException {
|
||||
Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
|
||||
request.setJsonEntity("{\"leader_index_patterns\": [\"" + pattern + "\"], \"remote_cluster\": \"" + remoteCluster + "\"," +
|
||||
"\"follow_index_pattern\": \"copy-{{leader_index}}\", \"read_poll_timeout\": \"10ms\"}");
|
||||
assertOK(client.performRequest(request));
|
||||
}
|
||||
|
||||
private static void deleteAutoFollowPattern(RestClient client, String patternName) throws IOException {
|
||||
Request request = new Request("DELETE", "/_ccr/auto_follow/" + patternName);
|
||||
assertOK(client.performRequest(request));
|
||||
}
|
||||
|
||||
private int getNumberOfSuccessfulFollowedIndices() throws IOException {
|
||||
Request statsRequest = new Request("GET", "/_ccr/stats");
|
||||
Map<?, ?> response = toMap(client().performRequest(statsRequest));
|
||||
Integer actualSuccessfulFollowedIndices = ObjectPath.eval("auto_follow_stats.number_of_successful_follow_indices", response);
|
||||
if (actualSuccessfulFollowedIndices != null) {
|
||||
return actualSuccessfulFollowedIndices;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
private static void index(RestClient client, String index, int numDocs) throws IOException {
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
final Request request = new Request("POST", "/" + index + "/_doc/");
|
||||
|
@ -162,4 +305,10 @@ public class CcrRollingUpgradeIT extends AbstractMultiClusterUpgradeTestCase {
|
|||
assertThat(totalHits, equalTo(expectedTotalHits));
|
||||
}
|
||||
|
||||
private static void stopIndexFollowing(RestClient client, String followerIndex) throws IOException {
|
||||
assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/pause_follow")));
|
||||
assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_close")));
|
||||
assertOK(client.performRequest(new Request("POST", "/" + followerIndex + "/_ccr/unfollow")));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,283 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.upgrades;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ObjectPath;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class CCRIT extends AbstractUpgradeTestCase {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(CCRIT.class);
|
||||
|
||||
private static final Version UPGRADE_FROM_VERSION =
|
||||
Version.fromString(System.getProperty("tests.upgrade_from_version"));
|
||||
|
||||
private static final boolean SECOND_ROUND = "false".equals(System.getProperty("tests.first_round"));
|
||||
|
||||
@Override
|
||||
protected boolean preserveClusterSettings() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public void testIndexFollowing() throws Exception {
|
||||
assumeTrue("CCR became available in 6.5, but test relies on a fix that was shipped with 6.6.0",
|
||||
UPGRADE_FROM_VERSION.onOrAfter(Version.V_6_6_0));
|
||||
setupRemoteCluster();
|
||||
|
||||
final String leaderIndex = "my-leader-index";
|
||||
final String followerIndex = "my-follower-index";
|
||||
|
||||
switch (CLUSTER_TYPE) {
|
||||
case OLD:
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.put("index.number_of_shards", 1)
|
||||
.build();
|
||||
createIndex(leaderIndex, indexSettings);
|
||||
followIndex(leaderIndex, followerIndex);
|
||||
index(leaderIndex, "1");
|
||||
assertDocumentExists(leaderIndex, "1");
|
||||
assertBusy(() -> {
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 0);
|
||||
assertDocumentExists(followerIndex, "1");
|
||||
});
|
||||
break;
|
||||
case MIXED:
|
||||
if (SECOND_ROUND == false) {
|
||||
index(leaderIndex, "2");
|
||||
assertDocumentExists(leaderIndex, "1", "2");
|
||||
assertBusy(() -> {
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 1);
|
||||
assertDocumentExists(followerIndex, "1", "2");
|
||||
});
|
||||
} else {
|
||||
index(leaderIndex, "3");
|
||||
assertDocumentExists(leaderIndex, "1", "2", "3");
|
||||
assertBusy(() -> {
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 2);
|
||||
assertDocumentExists(followerIndex, "1", "2", "3");
|
||||
});
|
||||
}
|
||||
break;
|
||||
case UPGRADED:
|
||||
index(leaderIndex, "4");
|
||||
assertDocumentExists(leaderIndex, "1", "2", "3", "4");
|
||||
assertBusy(() -> {
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 3);
|
||||
assertDocumentExists(followerIndex, "1", "2", "3", "4");
|
||||
});
|
||||
stopIndexFollowing(followerIndex);
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37231")
|
||||
public void testAutoFollowing() throws Exception {
|
||||
assumeTrue("CCR became available in 6.5, but test relies on a fix that was shipped with 6.6.0",
|
||||
UPGRADE_FROM_VERSION.onOrAfter(Version.V_6_6_0));
|
||||
setupRemoteCluster();
|
||||
|
||||
final Settings indexSettings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.put("index.number_of_shards", 1)
|
||||
.build();
|
||||
|
||||
String leaderIndex1 = "logs-20200101";
|
||||
String leaderIndex2 = "logs-20200102";
|
||||
String leaderIndex3 = "logs-20200103";
|
||||
|
||||
switch (CLUSTER_TYPE) {
|
||||
case OLD:
|
||||
putAutoFollowPattern("test_pattern", "logs-*");
|
||||
createIndex(leaderIndex1, indexSettings);
|
||||
index(leaderIndex1, "1");
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex1;
|
||||
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(1));
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 0);
|
||||
assertDocumentExists(followerIndex, "1");
|
||||
});
|
||||
break;
|
||||
case MIXED:
|
||||
if (SECOND_ROUND == false) {
|
||||
index(leaderIndex1, "2");
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex1;
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 1);
|
||||
assertDocumentExists(followerIndex, "2");
|
||||
});
|
||||
// Auto follow stats are kept in-memory on master elected node
|
||||
// and if this node get updated then auto follow stats are reset
|
||||
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
|
||||
createIndex(leaderIndex2, indexSettings);
|
||||
index(leaderIndex2, "1");
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex2;
|
||||
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 0);
|
||||
assertDocumentExists(followerIndex, "1");
|
||||
});
|
||||
} else {
|
||||
index(leaderIndex1, "3");
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex1;
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 2);
|
||||
assertDocumentExists(followerIndex, "3");
|
||||
});
|
||||
index(leaderIndex2, "2");
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex2;
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 1);
|
||||
assertDocumentExists(followerIndex, "2");
|
||||
});
|
||||
|
||||
// Auto follow stats are kept in-memory on master elected node
|
||||
// and if this node get updated then auto follow stats are reset
|
||||
int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
|
||||
createIndex(leaderIndex3, indexSettings);
|
||||
index(leaderIndex3, "1");
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex3;
|
||||
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(previousNumberOfSuccessfulFollowedIndices + 1));
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 0);
|
||||
assertDocumentExists(followerIndex, "1");
|
||||
});
|
||||
}
|
||||
break;
|
||||
case UPGRADED:
|
||||
index(leaderIndex1, "4");
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex1;
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 3);
|
||||
assertDocumentExists(followerIndex, "4");
|
||||
});
|
||||
index(leaderIndex2, "3");
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex2;
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 2);
|
||||
assertDocumentExists(followerIndex, "3");
|
||||
});
|
||||
index(leaderIndex3, "2");
|
||||
assertBusy(() -> {
|
||||
String followerIndex = "copy-" + leaderIndex3;
|
||||
assertFollowerGlobalCheckpoint(followerIndex, 1);
|
||||
assertDocumentExists(followerIndex, "2");
|
||||
});
|
||||
|
||||
deleteAutoFollowPattern("test_pattern");
|
||||
|
||||
stopIndexFollowing("copy-" + leaderIndex1);
|
||||
stopIndexFollowing("copy-" + leaderIndex2);
|
||||
stopIndexFollowing("copy-" + leaderIndex3);
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
|
||||
}
|
||||
}
|
||||
|
||||
private static void stopIndexFollowing(String followerIndex) throws IOException {
|
||||
pauseFollow(followerIndex);
|
||||
closeIndex(followerIndex);
|
||||
unfollow(followerIndex);
|
||||
}
|
||||
|
||||
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
|
||||
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
|
||||
request.setJsonEntity("{\"remote_cluster\": \"local\", \"leader_index\": \"" + leaderIndex +
|
||||
"\", \"read_poll_timeout\": \"10ms\"}");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private static void pauseFollow(String followIndex) throws IOException {
|
||||
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
|
||||
}
|
||||
|
||||
private static void unfollow(String followIndex) throws IOException {
|
||||
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
|
||||
}
|
||||
|
||||
private static void putAutoFollowPattern(String name, String pattern) throws IOException {
|
||||
Request request = new Request("PUT", "/_ccr/auto_follow/" + name);
|
||||
request.setJsonEntity("{\"leader_index_patterns\": [\"" + pattern + "\"], \"remote_cluster\": \"local\"," +
|
||||
"\"follow_index_pattern\": \"copy-{{leader_index}}\", \"read_poll_timeout\": \"10ms\"}");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private static void deleteAutoFollowPattern(String patternName) throws IOException {
|
||||
Request request = new Request("DELETE", "/_ccr/auto_follow/" + patternName);
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private static void index(String index, String id) throws IOException {
|
||||
Request request = new Request("POST", "/" + index + "/_doc/" + id);
|
||||
request.setJsonEntity("{}");
|
||||
assertOK(client().performRequest(request));
|
||||
}
|
||||
|
||||
private static void assertDocumentExists(String index, String... ids) throws IOException {
|
||||
for (String id : ids) {
|
||||
Request request = new Request("HEAD", "/" + index + "/_doc/" + id);
|
||||
Response response = client().performRequest(request);
|
||||
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
|
||||
}
|
||||
}
|
||||
|
||||
private static void setupRemoteCluster() throws IOException {
|
||||
Request request = new Request("GET", "/_nodes");
|
||||
Map<?, ?> nodesResponse = (Map<?, ?>) toMap(client().performRequest(request)).get("nodes");
|
||||
// Select node info of first node (we don't know the node id):
|
||||
nodesResponse = (Map<?, ?>) nodesResponse.get(nodesResponse.keySet().iterator().next());
|
||||
String transportAddress = (String) nodesResponse.get("transport_address");
|
||||
|
||||
LOGGER.info("Configuring local remote cluster [{}]", transportAddress);
|
||||
request = new Request("PUT", "/_cluster/settings");
|
||||
request.setJsonEntity("{\"persistent\": {\"cluster.remote.local.seeds\": \"" + transportAddress + "\"}}");
|
||||
assertThat(client().performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
|
||||
}
|
||||
|
||||
private int getNumberOfSuccessfulFollowedIndices() throws IOException {
|
||||
Request statsRequest = new Request("GET", "/_ccr/stats");
|
||||
Map<?, ?> response = toMap(client().performRequest(statsRequest));
|
||||
Integer actualSuccessfulFollowedIndices = ObjectPath.eval("auto_follow_stats.number_of_successful_follow_indices", response);
|
||||
if (actualSuccessfulFollowedIndices != null) {
|
||||
return actualSuccessfulFollowedIndices;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
private void assertFollowerGlobalCheckpoint(String followerIndex, int expectedFollowerCheckpoint) throws IOException {
|
||||
Request statsRequest = new Request("GET", "/" + followerIndex + "/_stats");
|
||||
statsRequest.addParameter("level", "shards");
|
||||
// Just docs metric is sufficient here:
|
||||
statsRequest.addParameter("metric", "docs");
|
||||
Map<?, ?> response = toMap(client().performRequest(statsRequest));
|
||||
LOGGER.info("INDEX STATS={}", response);
|
||||
assertThat(((Map) response.get("indices")).size(), equalTo(1));
|
||||
Integer actualFollowerCheckpoint = ObjectPath.eval("indices." + followerIndex + ".shards.0.0.seq_no.global_checkpoint", response);
|
||||
assertThat(actualFollowerCheckpoint, equalTo(expectedFollowerCheckpoint));
|
||||
}
|
||||
|
||||
private static Map<String, Object> toMap(Response response) throws IOException {
|
||||
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue