From a264cb6ddb809b8d7ef9375a2caea5a6f2e628a4 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 5 Dec 2018 13:39:14 +0100 Subject: [PATCH] Refactor AutoFollowCoordinator to track leader indices per remote cluster (#36031) and replaced poll interval setting with a hardcoded poll interval. The hard coded interval will be removed in a follow up change to make use of cluster state API's wait_for_metatdata_version. Before the auto following was bootstrapped from thread pool scheduler, but now auto followers for new remote clusters are bootstrapped when a new cluster state is published. Originates from #35895 Relates to #33007 --- x-pack/plugin/ccr/qa/chain/build.gradle | 1 + .../elasticsearch/xpack/ccr/AutoFollowIT.java | 69 +++++ .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../elasticsearch/xpack/ccr/CcrSettings.java | 10 +- .../ccr/action/AutoFollowCoordinator.java | 275 ++++++++++-------- .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 90 +++--- .../action/AutoFollowCoordinatorTests.java | 54 +++- 7 files changed, 319 insertions(+), 182 deletions(-) create mode 100644 x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java diff --git a/x-pack/plugin/ccr/qa/chain/build.gradle b/x-pack/plugin/ccr/qa/chain/build.gradle index f93feb4a66a..b9bf933d10f 100644 --- a/x-pack/plugin/ccr/qa/chain/build.gradle +++ b/x-pack/plugin/ccr/qa/chain/build.gradle @@ -46,6 +46,7 @@ followClusterTestCluster { numNodes = 1 clusterName = 'follow-cluster' setting 'xpack.license.self_generated.type', 'trial' + setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" setting 'cluster.remote.middle_cluster.seeds', "\"${-> middleClusterTest.nodes.get(0).transportUri()}\"" setting 'node.name', 'follow' } diff --git a/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java new file mode 100644 index 00000000000..4096202c92a --- /dev/null +++ b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; + +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class AutoFollowIT extends ESCCRRestTestCase { + + public void testAutoFollowPatterns() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern"); + putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}"); + assertOK(client().performRequest(putPatternRequest)); + putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern"); + putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}"); + assertOK(client().performRequest(putPatternRequest)); + try (RestClient leaderClient = buildLeaderClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + Request request = new Request("PUT", "/logs-20190101"); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(leaderClient.performRequest(request)); + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true"); + } + } + try (RestClient middleClient = buildMiddleClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + Request request = new Request("PUT", "/logs-20200101"); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(middleClient.performRequest(request)); + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true"); + } + } + assertBusy(() -> { + Request statsRequest = new Request("GET", "/_ccr/stats"); + Map response = toMap(client().performRequest(statsRequest)); + Map autoFollowStats = (Map) response.get("auto_follow_stats"); + assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2)); + + ensureYellow("logs-20190101"); + ensureYellow("logs-20200101"); + verifyDocuments("logs-20190101", 5, "filtered_field:true"); + verifyDocuments("logs-20200101", 5, "filtered_field:true"); + }); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 7cceecbd399..6e09d29ba4f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -155,7 +155,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E return Arrays.asList( ccrLicenseChecker, - new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker) + new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index d3f5c85b4f8..544a45792e0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.XPackSettings; import java.util.Arrays; @@ -29,12 +28,6 @@ public final class CcrSettings { public static final Setting CCR_FOLLOWING_INDEX_SETTING = Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex); - /** - * Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow - */ - public static final Setting CCR_AUTO_FOLLOW_POLL_INTERVAL = - Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope); - /** * The settings defined by CCR. * @@ -43,8 +36,7 @@ public final class CcrSettings { static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, - CCR_FOLLOWING_INDEX_SETTING, - CCR_AUTO_FOLLOW_POLL_INTERVAL); + CCR_FOLLOWING_INDEX_SETTING); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 6bddedc0104..707cd1abe5e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -15,14 +15,14 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; @@ -31,7 +31,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; -import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; @@ -45,28 +44,29 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.TreeMap; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; /** * A component that runs only on the elected master node and follows leader indices automatically * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}. */ -public class AutoFollowCoordinator implements ClusterStateApplier { +public class AutoFollowCoordinator implements ClusterStateListener { private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); private static final int MAX_AUTO_FOLLOW_ERRORS = 256; private final Client client; - private final TimeValue pollInterval; private final ThreadPool threadPool; private final ClusterService clusterService; private final CcrLicenseChecker ccrLicenseChecker; - private volatile boolean localNodeMaster = false; + private volatile Map autoFollowers = Collections.emptyMap(); // The following fields are read and updated under a lock: private long numberOfSuccessfulIndicesAutoFollowed = 0; @@ -75,7 +75,6 @@ public class AutoFollowCoordinator implements ClusterStateApplier { private final LinkedHashMap recentAutoFollowErrors; public AutoFollowCoordinator( - Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, @@ -84,10 +83,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { this.threadPool = threadPool; this.clusterService = clusterService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); - - this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings); - clusterService.addStateApplier(this); - + clusterService.addListener(this); this.recentAutoFollowErrors = new LinkedHashMap() { @Override protected boolean removeEldestEntry(final Map.Entry eldest) { @@ -130,151 +126,189 @@ public class AutoFollowCoordinator implements ClusterStateApplier { } } - private void doAutoFollow() { - if (localNodeMaster == false) { - return; - } - ClusterState followerClusterState = clusterService.state(); + void updateAutoFollowers(ClusterState followerClusterState) { AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); if (autoFollowMetadata == null) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); - return; - } - - if (autoFollowMetadata.getPatterns().isEmpty()) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); return; } if (ccrLicenseChecker.isCcrAllowed() == false) { // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API LOGGER.warn("skipping auto-follower coordination", LicenseUtils.newComplianceException("ccr")); - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); return; } - Consumer> handler = results -> { - updateStats(results); - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); - }; - AutoFollower operation = new AutoFollower(handler, followerClusterState) { + final CopyOnWriteHashMap autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers); + Set newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream() + .map(entry -> entry.getValue().getRemoteCluster()) + .filter(remoteCluster -> autoFollowers.containsKey(remoteCluster) == false) + .collect(Collectors.toSet()); - @Override - void getLeaderClusterState(final String remoteCluster, - final BiConsumer handler) { - final ClusterStateRequest request = new ClusterStateRequest(); - request.clear(); - request.metaData(true); - request.routingTable(true); - // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( - client, - remoteCluster, - request, - e -> handler.accept(null, e), - leaderClusterState -> handler.accept(leaderClusterState, null)); + Map newAutoFollowers = new HashMap<>(newRemoteClusters.size()); + for (String remoteCluster : newRemoteClusters) { + AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) { + + @Override + void getLeaderClusterState(final String remoteCluster, + final BiConsumer handler) { + final ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.metaData(true); + request.routingTable(true); + // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( + client, + remoteCluster, + request, + e -> handler.accept(null, e), + leaderClusterState -> handler.accept(leaderClusterState, null)); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request request, + Runnable successHandler, + Consumer failureHandler) { + Client followerClient = CcrLicenseChecker.wrapClient(client, headers); + followerClient.execute( + PutFollowAction.INSTANCE, + request, + ActionListener.wrap(r -> successHandler.run(), failureHandler) + ); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return updateFunction.apply(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + handler.accept(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + handler.accept(null); + } + }); + } + + }; + newAutoFollowers.put(remoteCluster, autoFollower); + autoFollower.autoFollowIndices(); + } + + List removedRemoteClusters = new ArrayList<>(); + for (String remoteCluster : autoFollowers.keySet()) { + boolean exist = autoFollowMetadata.getPatterns().values().stream() + .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); + if (exist == false) { + removedRemoteClusters.add(remoteCluster); } - - @Override - void createAndFollow(Map headers, - PutFollowAction.Request request, - Runnable successHandler, - Consumer failureHandler) { - Client followerClient = CcrLicenseChecker.wrapClient(client, headers); - followerClient.execute( - PutFollowAction.INSTANCE, - request, - ActionListener.wrap(r -> successHandler.run(), failureHandler) - ); - } - - @Override - void updateAutoFollowMetadata(Function updateFunction, - Consumer handler) { - clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return updateFunction.apply(currentState); - } - - @Override - public void onFailure(String source, Exception e) { - handler.accept(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - handler.accept(null); - } - }); - } - - }; - operation.autoFollowIndices(); + } + this.autoFollowers = autoFollowers + .copyAndPutAll(newAutoFollowers) + .copyAndRemoveAll(removedRemoteClusters); } @Override - public void applyClusterState(ClusterChangedEvent event) { - final boolean beforeLocalMasterNode = localNodeMaster; - localNodeMaster = event.localNodeMaster(); - if (beforeLocalMasterNode == false && localNodeMaster) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + public void clusterChanged(ClusterChangedEvent event) { + if (event.localNodeMaster()) { + updateAutoFollowers(event.state()); } } + /** + * Each auto follower independently monitors a remote cluster for new leader indices that should be auto followed. + * The reason that this should happen independently, is that when auto followers start to make use of cluster state + * API's 'wait_for_metadata_version' feature, it may take sometime before a remote cluster responds with a new + * cluster state or times out. Other auto follow patterns for different remote clusters are then forced to wait, + * which can cause new follower indices to unnecessarily start with a large backlog of operations that need to be + * replicated. + */ abstract static class AutoFollower { - private final Consumer> handler; - private final ClusterState followerClusterState; - private final AutoFollowMetadata autoFollowMetadata; + private final String remoteCluster; + private final ThreadPool threadPool; + private final Consumer> statsUpdater; + private final Supplier followerClusterStateSupplier; - private final CountDown autoFollowPatternsCountDown; - private final AtomicArray autoFollowResults; + private volatile CountDown autoFollowPatternsCountDown; + private volatile AtomicArray autoFollowResults; - AutoFollower(final Consumer> handler, final ClusterState followerClusterState) { - this.handler = handler; - this.followerClusterState = followerClusterState; - this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); - this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size()); - this.autoFollowResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size()); + AutoFollower(final String remoteCluster, + final ThreadPool threadPool, + final Consumer> statsUpdater, + final Supplier followerClusterStateSupplier) { + this.remoteCluster = remoteCluster; + this.threadPool = threadPool; + this.statsUpdater = statsUpdater; + this.followerClusterStateSupplier = followerClusterStateSupplier; } void autoFollowIndices() { - int i = 0; - for (Map.Entry entry : autoFollowMetadata.getPatterns().entrySet()) { - final int slot = i; - final String autoFollowPattenName = entry.getKey(); - final AutoFollowPattern autoFollowPattern = entry.getValue(); - final String remoteCluster = autoFollowPattern.getRemoteCluster(); + final ClusterState followerClusterState = followerClusterStateSupplier.get(); + final AutoFollowMetadata autoFollowMetadata = followerClusterState.metaData().custom(AutoFollowMetadata.TYPE); + if (autoFollowMetadata == null) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata", remoteCluster); + return; + } - Map headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName); - getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { - if (leaderClusterState != null) { - assert e == null; - final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName); - final List leaderIndicesToFollow = getLeaderIndicesToFollow(remoteCluster, autoFollowPattern, - leaderClusterState, followerClusterState, followedIndices); + final List patterns = autoFollowMetadata.getPatterns().entrySet().stream() + .filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + if (patterns.isEmpty()) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster); + return; + } + + this.autoFollowPatternsCountDown = new CountDown(patterns.size()); + this.autoFollowResults = new AtomicArray<>(patterns.size()); + + getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { + if (leaderClusterState != null) { + assert e == null; + + int i = 0; + for (String autoFollowPatternName : patterns) { + final int slot = i; + AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName); + Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); + List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); + + final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, + followerClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { - finalise(slot, new AutoFollowResult(autoFollowPattenName)); + finalise(slot, new AutoFollowResult(autoFollowPatternName)); } else { List> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns() .entrySet().stream() - .filter(item -> autoFollowPattenName.equals(item.getKey()) == false) + .filter(item -> autoFollowPatternName.equals(item.getKey()) == false) .filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster())) .map(item -> new Tuple<>(item.getKey(), item.getValue())) .collect(Collectors.toList()); Consumer resultHandler = result -> finalise(slot, result); - checkAutoFollowPattern(autoFollowPattenName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, + checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, patternsForTheSameLeaderCluster, resultHandler); } - } else { - finalise(slot, new AutoFollowResult(autoFollowPattenName, e)); + i++; } - }); - i++; - } + } else { + List results = new ArrayList<>(patterns.size()); + for (String autoFollowPatternName : patterns) { + results.add(new AutoFollowResult(autoFollowPatternName, e)); + } + statsUpdater.accept(results); + } + }); } private void checkAutoFollowPattern(String autoFollowPattenName, @@ -357,12 +391,13 @@ public class AutoFollowCoordinator implements ClusterStateApplier { assert autoFollowResults.get(slot) == null; autoFollowResults.set(slot, result); if (autoFollowPatternsCountDown.countDown()) { - handler.accept(autoFollowResults.asList()); + statsUpdater.accept(autoFollowResults.asList()); + // TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion: + threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::autoFollowIndices); } } - static List getLeaderIndicesToFollow(String remoteCluster, - AutoFollowPattern autoFollowPattern, + static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, ClusterState leaderClusterState, ClusterState followerClusterState, List followedIndexUUIDs) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index 967c3a7e8c7..928584316c5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -140,60 +140,64 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase { } public void testAutoFollowCoordinatorLogsSkippingAutoFollowCoordinationWithNonCompliantLicense() throws Exception { - // Update the cluster state so that we have auto follow patterns and verify that we log a warning in case of incompatible license: - CountDownLatch latch = new CountDownLatch(1); - ClusterService clusterService = getInstanceFromNode(ClusterService.class); - clusterService.submitStateUpdateTask("test-add-auto-follow-pattern", new ClusterStateUpdateTask() { - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); - AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( - Collections.singletonMap("test_alias", autoFollowPattern), - Collections.emptyMap(), - Collections.emptyMap()); - - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()) - .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata) - .build()); - return newState.build(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Exception e) { - latch.countDown(); - fail("unexpected error [" + e.getMessage() + "]"); - } - }); - latch.await(); - final Logger logger = LogManager.getLogger(AutoFollowCoordinator.class); final MockLogAppender appender = new MockLogAppender(); appender.start(); appender.addExpectation( - new MockLogAppender.ExceptionSeenEventExpectation( - getTestName(), - logger.getName(), - Level.WARN, - "skipping auto-follower coordination", - ElasticsearchSecurityException.class, - "current license is non-compliant for [ccr]")); - Loggers.addAppender(logger, appender); + new MockLogAppender.ExceptionSeenEventExpectation( + getTestName(), + logger.getName(), + Level.WARN, + "skipping auto-follower coordination", + ElasticsearchSecurityException.class, + "current license is non-compliant for [ccr]")); + try { - assertBusy(appender::assertAllExpectationsMatched); + // Need to add mock log appender before submitting CS update, otherwise we miss the expected log: + // (Auto followers for new remote clusters are bootstrapped when a new cluster state is published) + Loggers.addAppender(logger, appender); + // Update the cluster state so that we have auto follow patterns and verify that we log a warning + // in case of incompatible license: + CountDownLatch latch = new CountDownLatch(1); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + clusterService.submitStateUpdateTask("test-add-auto-follow-pattern", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"), + null, null, null, null, null, null, null, null, null, null, null); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( + Collections.singletonMap("test_alias", autoFollowPattern), + Collections.emptyMap(), + Collections.emptyMap()); + + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata) + .build()); + return newState.build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + latch.countDown(); + } + + @Override + public void onFailure(String source, Exception e) { + latch.countDown(); + fail("unexpected error [" + e.getMessage() + "]"); + } + }); + latch.await(); + appender.assertAllExpectationsMatched(); } finally { Loggers.removeAppender(logger, appender); appender.stop(); } } + private void assertNonCompliantLicense(final Exception e) { assertThat(e, instanceOf(ElasticsearchSecurityException.class)); assertThat(e.getMessage(), equalTo("current license is non-compliant for [ccr]")); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 2b7fee13502..7b296524676 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -34,11 +35,13 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.hamcrest.Matchers.equalTo; @@ -46,7 +49,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -54,6 +59,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollower() { Client client = mock(Client.class); + ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState leaderState = createRemoteClusterState("logs-20190101"); @@ -83,7 +89,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), nullValue()); }; - AutoFollower autoFollower = new AutoFollower(handler, currentState) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(currentState)) { @Override void getLeaderClusterState(String remoteCluster, BiConsumer handler) { @@ -119,6 +125,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollowerClusterStateApiFailure() { Client client = mock(Client.class); + ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), @@ -142,7 +149,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, BiConsumer handler) { @@ -169,6 +176,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollowerUpdateClusterStateFailure() { Client client = mock(Client.class); + ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState leaderState = createRemoteClusterState("logs-20190101"); @@ -196,7 +204,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, BiConsumer handler) { @@ -225,6 +233,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollowerCreateAndFollowApiCallFailure() { Client client = mock(Client.class); + ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState leaderState = createRemoteClusterState("logs-20190101"); @@ -252,7 +261,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, BiConsumer handler) { @@ -324,7 +333,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .routingTable(routingTableBuilder.build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(3)); @@ -333,7 +342,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(result.get(2).getName(), equalTo("metrics-4")); List followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID()); - result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, followedIndexUUIDs); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(2)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -365,7 +374,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); assertThat(result.size(), equalTo(1)); assertThat(result.get(0).getName(), equalTo("index1")); @@ -379,7 +388,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build()) .build(); - result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, Collections.emptyList()); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); assertThat(result.size(), equalTo(2)); result.sort(Comparator.comparing(Index::getName)); assertThat(result.get(0).getName(), equalTo("index1")); @@ -429,7 +438,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testStats() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( - Settings.EMPTY, null, null, mock(ClusterService.class), @@ -503,4 +511,32 @@ public class AutoFollowCoordinatorTests extends ESTestCase { return csBuilder.build(); } + private static Supplier followerClusterStateSupplier(ClusterState... states) { + final AutoFollowMetadata emptyAutoFollowMetadata = + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + final ClusterState lastState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, emptyAutoFollowMetadata)) + .build(); + final LinkedList queue = new LinkedList<>(Arrays.asList(states)); + return () -> { + final ClusterState current = queue.poll(); + if (current != null) { + return current; + } else { + return lastState; + } + }; + } + + private static ThreadPool mockThreadPool() { + ThreadPool threadPool = mock(ThreadPool.class); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[2]; + task.run(); + return null; + }).when(threadPool).schedule(any(), anyString(), any()); + return threadPool; + } + }