Fix AutoFollowIT.testPauseAndResumeWithMultipleAutoFollowPatterns (#48289)
The test testPauseAndResumeWithMultipleAutoFollowPatterns failed multiple times, mostly because it creates too many leader indices and the following cluster cannot cope with cluster state updates generated by following indices creation and pause/ resume auto-followers changes. This commit simplifies the test by creating at most 20 leader indices and by waiting for any new leader index to be picked up by the auto-follower before created another leader index. It also pause and resume less auto-followers as previously. closes #47917
This commit is contained in:
parent
7d085ffbd9
commit
0094bd5939
|
@ -13,11 +13,11 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.xpack.CcrIntegTestCase;
|
||||
|
@ -32,20 +32,20 @@ import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
|
|||
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
|
@ -168,7 +168,7 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|||
|
||||
// Delete auto follow pattern and make sure that in the background the auto follower has stopped
|
||||
// then the leader index created after that should never be auto followed:
|
||||
deleteAutoFollowPatternSetting();
|
||||
deleteAutoFollowPattern("my-pattern");
|
||||
try {
|
||||
assertBusy(() -> {
|
||||
metaData[0] = getFollowerCluster().clusterService().state().metaData();
|
||||
|
@ -481,34 +481,41 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|||
.build();
|
||||
|
||||
final String[] prefixes = {"logs-", "users-", "docs-", "monitoring-", "data-", "system-", "events-", "files-"};
|
||||
if (randomBoolean()) {
|
||||
// sometimes create indices in the remote cluster that match the future auto follow patterns
|
||||
Arrays.stream(prefixes).forEach(prefix -> createLeaderIndex(prefix + "ignored", leaderIndexSettings));
|
||||
}
|
||||
|
||||
// create auto follow patterns
|
||||
final List<String> autoFollowPatterns = new ArrayList<>(prefixes.length);
|
||||
for (String prefix : prefixes) {
|
||||
String name = prefix + "pattern";
|
||||
putAutoFollowPatterns(name, new String[]{prefix + "*"});
|
||||
autoFollowPatterns.add(name);
|
||||
// create an auto follow pattern for each prefix
|
||||
final List<String> autoFollowPatterns = Arrays.stream(prefixes)
|
||||
.map(prefix -> {
|
||||
final String pattern = prefix + "pattern";
|
||||
putAutoFollowPatterns(pattern, new String[]{prefix + "*"});
|
||||
return pattern;
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
// pick up some random pattern to pause
|
||||
final List<String> pausedAutoFollowerPatterns = randomSubsetOf(randomIntBetween(1, 3), autoFollowPatterns);
|
||||
|
||||
// all patterns should be active
|
||||
assertBusy(() -> autoFollowPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive())));
|
||||
assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1)));
|
||||
assertTrue(getAutoFollowPattern(name).isActive());
|
||||
}
|
||||
|
||||
// no following indices are created yet
|
||||
assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(0));
|
||||
|
||||
// create random indices in the remote cluster that match the patterns
|
||||
final AtomicBoolean running = new AtomicBoolean(true);
|
||||
final Set<String> leaderIndices = ConcurrentCollections.newConcurrentSet();
|
||||
final AtomicInteger leaderIndices = new AtomicInteger(0);
|
||||
|
||||
// start creating new indices on the remote cluster
|
||||
final Thread createNewLeaderIndicesThread = new Thread(() -> {
|
||||
while (running.get()) {
|
||||
int leaderIndicesCount;
|
||||
while (running.get() && (leaderIndicesCount = leaderIndices.incrementAndGet()) < 20) {
|
||||
final String prefix = randomFrom(prefixes);
|
||||
final String leaderIndex = prefix + leaderIndicesCount;
|
||||
try {
|
||||
String indexName = randomFrom(prefixes) + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
createLeaderIndex(indexName, leaderIndexSettings);
|
||||
leaderIndices.add(indexName);
|
||||
Thread.sleep(randomIntBetween(100, 500));
|
||||
createLeaderIndex(leaderIndex, leaderIndexSettings);
|
||||
ensureLeaderGreen(leaderIndex);
|
||||
if (pausedAutoFollowerPatterns.stream().noneMatch(pattern -> pattern.startsWith(prefix))) {
|
||||
final String followingIndex = "copy-" + leaderIndex;
|
||||
assertBusy(() -> assertTrue(followerClient().admin().indices()
|
||||
.exists(new IndicesExistsRequest(followingIndex)).actionGet().isExists()));
|
||||
} else {
|
||||
Thread.sleep(200L);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
|
@ -516,47 +523,45 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|||
});
|
||||
createNewLeaderIndicesThread.start();
|
||||
|
||||
// wait for some leader indices to be auto-followed
|
||||
assertBusy(() ->
|
||||
assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo((long) prefixes.length)));
|
||||
// wait for 3 leader indices to be created on the remote cluster
|
||||
assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(3)));
|
||||
assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(3L)));
|
||||
|
||||
final int nbLeaderIndices = leaderIndices.size();
|
||||
|
||||
// pause some random patterns
|
||||
final List<String> pausedAutoFollowerPatterns = randomSubsetOf(autoFollowPatterns);
|
||||
// now pause some random patterns
|
||||
pausedAutoFollowerPatterns.forEach(this::pauseAutoFollowPattern);
|
||||
assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertFalse(getAutoFollowPattern(pattern).isActive())));
|
||||
assertBusy(() -> autoFollowPatterns.forEach(pattern ->
|
||||
assertThat(getAutoFollowPattern(pattern).isActive(), equalTo(pausedAutoFollowerPatterns.contains(pattern) == false))));
|
||||
|
||||
assertBusy(() -> {
|
||||
final int expectedAutoFollowedClusters = pausedAutoFollowerPatterns.size() != autoFollowPatterns.size() ? 1 : 0;
|
||||
assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(expectedAutoFollowedClusters));
|
||||
if (expectedAutoFollowedClusters > 0) {
|
||||
// wait for more indices to be created in the remote cluster while some patterns are paused
|
||||
assertThat(leaderIndices.size(), greaterThan(nbLeaderIndices + 3));
|
||||
}
|
||||
});
|
||||
ensureFollowerGreen(true, "copy-*");
|
||||
// wait for more leader indices to be created on the remote cluster
|
||||
assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(6)));
|
||||
assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(6L)));
|
||||
|
||||
// resume auto follow patterns
|
||||
pausedAutoFollowerPatterns.forEach(this::resumeAutoFollowPattern);
|
||||
assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive())));
|
||||
assertBusy(() -> autoFollowPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive())));
|
||||
|
||||
// wait for more leader indices to be created on the remote cluster
|
||||
assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(9)));
|
||||
assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(9L)));
|
||||
|
||||
// stop creating indices in the remote cluster
|
||||
running.set(false);
|
||||
createNewLeaderIndicesThread.join();
|
||||
|
||||
ensureLeaderGreen(leaderIndices.toArray(new String[0]));
|
||||
|
||||
// check that all leader indices have been correctly auto followed
|
||||
assertBusy(() -> {
|
||||
final Client client = followerClient();
|
||||
assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.size()));
|
||||
leaderIndices.stream()
|
||||
.map(leaderIndex -> "copy-" + leaderIndex)
|
||||
.forEach(followerIndex ->
|
||||
assertTrue("following index must exist: " + followerIndex,
|
||||
client.admin().indices().exists(new IndicesExistsRequest(followerIndex)).actionGet().isExists()));
|
||||
});
|
||||
List<String> matchingPrefixes = Arrays.stream(prefixes).map(prefix -> prefix + "*").collect(Collectors.toList());
|
||||
for (IndexMetaData leaderIndexMetaData : leaderClient().admin().cluster().prepareState().get().getState().metaData()) {
|
||||
final String leaderIndex = leaderIndexMetaData.getIndex().getName();
|
||||
if (Regex.simpleMatch(matchingPrefixes, leaderIndex)) {
|
||||
String followingIndex = "copy-" + leaderIndex;
|
||||
assertBusy(() -> assertThat("Following index [" + followingIndex + "] must exists",
|
||||
followerClient().admin().indices().exists(new IndicesExistsRequest(followingIndex)).actionGet().isExists(), is(true)));
|
||||
}
|
||||
}
|
||||
|
||||
autoFollowPatterns.forEach(this::deleteAutoFollowPattern);
|
||||
|
||||
ensureFollowerGreen("copy-*");
|
||||
assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.get()));
|
||||
}
|
||||
|
||||
private void putAutoFollowPatterns(String name, String[] patterns) {
|
||||
|
@ -569,8 +574,8 @@ public class AutoFollowIT extends CcrIntegTestCase {
|
|||
assertTrue(followerClient().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
|
||||
}
|
||||
|
||||
private void deleteAutoFollowPatternSetting() {
|
||||
DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request("my-pattern");
|
||||
private void deleteAutoFollowPattern(final String name) {
|
||||
DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request(name);
|
||||
assertTrue(followerClient().execute(DeleteAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue