diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index f030b99d0a0..999428a22ce 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -13,11 +13,11 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.xpack.CcrIntegTestCase; @@ -32,20 +32,20 @@ import org.elasticsearch.xpack.core.ccr.action.FollowParameters; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Locale; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -168,7 +168,7 @@ public class AutoFollowIT extends CcrIntegTestCase { // Delete auto follow pattern and make sure that in the background the auto follower has stopped // then the leader index created after that should never be auto followed: - deleteAutoFollowPatternSetting(); + deleteAutoFollowPattern("my-pattern"); try { assertBusy(() -> { metaData[0] = getFollowerCluster().clusterService().state().metaData(); @@ -481,34 +481,41 @@ public class AutoFollowIT extends CcrIntegTestCase { .build(); final String[] prefixes = {"logs-", "users-", "docs-", "monitoring-", "data-", "system-", "events-", "files-"}; - if (randomBoolean()) { - // sometimes create indices in the remote cluster that match the future auto follow patterns - Arrays.stream(prefixes).forEach(prefix -> createLeaderIndex(prefix + "ignored", leaderIndexSettings)); - } - // create auto follow patterns - final List autoFollowPatterns = new ArrayList<>(prefixes.length); - for (String prefix : prefixes) { - String name = prefix + "pattern"; - putAutoFollowPatterns(name, new String[]{prefix + "*"}); - autoFollowPatterns.add(name); - assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1))); - assertTrue(getAutoFollowPattern(name).isActive()); - } + // create an auto follow pattern for each prefix + final List autoFollowPatterns = Arrays.stream(prefixes) + .map(prefix -> { + final String pattern = prefix + "pattern"; + putAutoFollowPatterns(pattern, new String[]{prefix + "*"}); + return pattern; + }).collect(Collectors.toList()); - // no following indices are created yet - assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(0)); + // pick up some random pattern to pause + final List pausedAutoFollowerPatterns = randomSubsetOf(randomIntBetween(1, 3), autoFollowPatterns); + + // all patterns should be active + assertBusy(() -> autoFollowPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive()))); + assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1))); - // create random indices in the remote cluster that match the patterns final AtomicBoolean running = new AtomicBoolean(true); - final Set leaderIndices = ConcurrentCollections.newConcurrentSet(); + final AtomicInteger leaderIndices = new AtomicInteger(0); + + // start creating new indices on the remote cluster final Thread createNewLeaderIndicesThread = new Thread(() -> { - while (running.get()) { + int leaderIndicesCount; + while (running.get() && (leaderIndicesCount = leaderIndices.incrementAndGet()) < 20) { + final String prefix = randomFrom(prefixes); + final String leaderIndex = prefix + leaderIndicesCount; try { - String indexName = randomFrom(prefixes) + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - createLeaderIndex(indexName, leaderIndexSettings); - leaderIndices.add(indexName); - Thread.sleep(randomIntBetween(100, 500)); + createLeaderIndex(leaderIndex, leaderIndexSettings); + ensureLeaderGreen(leaderIndex); + if (pausedAutoFollowerPatterns.stream().noneMatch(pattern -> pattern.startsWith(prefix))) { + final String followingIndex = "copy-" + leaderIndex; + assertBusy(() -> assertTrue(followerClient().admin().indices() + .exists(new IndicesExistsRequest(followingIndex)).actionGet().isExists())); + } else { + Thread.sleep(200L); + } } catch (Exception e) { throw new AssertionError(e); } @@ -516,47 +523,45 @@ public class AutoFollowIT extends CcrIntegTestCase { }); createNewLeaderIndicesThread.start(); - // wait for some leader indices to be auto-followed - assertBusy(() -> - assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo((long) prefixes.length))); + // wait for 3 leader indices to be created on the remote cluster + assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(3))); + assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(3L))); - final int nbLeaderIndices = leaderIndices.size(); - - // pause some random patterns - final List pausedAutoFollowerPatterns = randomSubsetOf(autoFollowPatterns); + // now pause some random patterns pausedAutoFollowerPatterns.forEach(this::pauseAutoFollowPattern); - assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertFalse(getAutoFollowPattern(pattern).isActive()))); + assertBusy(() -> autoFollowPatterns.forEach(pattern -> + assertThat(getAutoFollowPattern(pattern).isActive(), equalTo(pausedAutoFollowerPatterns.contains(pattern) == false)))); - assertBusy(() -> { - final int expectedAutoFollowedClusters = pausedAutoFollowerPatterns.size() != autoFollowPatterns.size() ? 1 : 0; - assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(expectedAutoFollowedClusters)); - if (expectedAutoFollowedClusters > 0) { - // wait for more indices to be created in the remote cluster while some patterns are paused - assertThat(leaderIndices.size(), greaterThan(nbLeaderIndices + 3)); - } - }); - ensureFollowerGreen(true, "copy-*"); + // wait for more leader indices to be created on the remote cluster + assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(6))); + assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(6L))); // resume auto follow patterns pausedAutoFollowerPatterns.forEach(this::resumeAutoFollowPattern); - assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive()))); + assertBusy(() -> autoFollowPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive()))); + + // wait for more leader indices to be created on the remote cluster + assertBusy(() -> assertThat(leaderIndices.get(), greaterThanOrEqualTo(9))); + assertBusy(() -> assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo(9L))); - // stop creating indices in the remote cluster running.set(false); createNewLeaderIndicesThread.join(); - ensureLeaderGreen(leaderIndices.toArray(new String[0])); - // check that all leader indices have been correctly auto followed - assertBusy(() -> { - final Client client = followerClient(); - assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.size())); - leaderIndices.stream() - .map(leaderIndex -> "copy-" + leaderIndex) - .forEach(followerIndex -> - assertTrue("following index must exist: " + followerIndex, - client.admin().indices().exists(new IndicesExistsRequest(followerIndex)).actionGet().isExists())); - }); + List matchingPrefixes = Arrays.stream(prefixes).map(prefix -> prefix + "*").collect(Collectors.toList()); + for (IndexMetaData leaderIndexMetaData : leaderClient().admin().cluster().prepareState().get().getState().metaData()) { + final String leaderIndex = leaderIndexMetaData.getIndex().getName(); + if (Regex.simpleMatch(matchingPrefixes, leaderIndex)) { + String followingIndex = "copy-" + leaderIndex; + assertBusy(() -> assertThat("Following index [" + followingIndex + "] must exists", + followerClient().admin().indices().exists(new IndicesExistsRequest(followingIndex)).actionGet().isExists(), is(true))); + } + } + + autoFollowPatterns.forEach(this::deleteAutoFollowPattern); + + ensureFollowerGreen("copy-*"); + assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.get())); } private void putAutoFollowPatterns(String name, String[] patterns) { @@ -569,8 +574,8 @@ public class AutoFollowIT extends CcrIntegTestCase { assertTrue(followerClient().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); } - private void deleteAutoFollowPatternSetting() { - DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request("my-pattern"); + private void deleteAutoFollowPattern(final String name) { + DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request(name); assertTrue(followerClient().execute(DeleteAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged()); }