diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 0de5dd3c1a4..8d677866e32 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -162,7 +162,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E ccrLicenseChecker, restoreSourceService, new CcrRepositoryManager(settings, clusterService, client), - new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis) + new AutoFollowCoordinator(settings, client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 544a45792e0..d7495dec8c2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.XPackSettings; import java.util.Arrays; @@ -28,6 +29,12 @@ public final class CcrSettings { public static final Setting CCR_FOLLOWING_INDEX_SETTING = Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex); + /** + * Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using. + */ + public static final Setting CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( + "ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); + /** * The settings defined by CCR. * @@ -36,7 +43,8 @@ public final class CcrSettings { static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, - CCR_FOLLOWING_INDEX_SETTING); + CCR_FOLLOWING_INDEX_SETTING, + CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 9043b379b5b..5cc5920cd21 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -27,12 +27,14 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; @@ -73,6 +75,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { private final CcrLicenseChecker ccrLicenseChecker; private final LongSupplier relativeMillisTimeProvider; + private volatile TimeValue waitForMetadataTimeOut; private volatile Map autoFollowers = Collections.emptyMap(); // The following fields are read and updated under a lock: @@ -82,6 +85,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { private final LinkedHashMap recentAutoFollowErrors; public AutoFollowCoordinator( + Settings settings, Client client, ClusterService clusterService, CcrLicenseChecker ccrLicenseChecker, @@ -98,6 +102,15 @@ public class AutoFollowCoordinator implements ClusterStateListener { return size() > MAX_AUTO_FOLLOW_ERRORS; } }; + + Consumer updater = newWaitForTimeOut -> { + if (newWaitForTimeOut.equals(waitForMetadataTimeOut) == false) { + LOGGER.info("changing wait_for_metadata_timeout from [{}] to [{}]", waitForMetadataTimeOut, newWaitForTimeOut); + waitForMetadataTimeOut = newWaitForTimeOut; + } + }; + clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, updater); + waitForMetadataTimeOut = CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.get(settings); } public synchronized AutoFollowStats getStats() { @@ -181,6 +194,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { request.metaData(true); request.routingTable(true); request.waitForMetaDataVersion(metadataVersion); + request.waitForTimeout(waitForMetadataTimeOut); // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( client, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 1d705934cce..01e51ea94f2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -65,6 +65,7 @@ import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.LocalStateCcr; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine; import org.elasticsearch.xpack.core.XPackSettings; @@ -199,6 +200,8 @@ public abstract class CcrIntegTestCase extends ESTestCase { builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); + // Let cluster state api return quickly in order to speed up auto follow tests: + builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); if (leaderSeedAddress != null) { builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 2b1c8a8ef2e..2fb1f868dd7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -15,6 +15,7 @@ import org.elasticsearch.license.LicensesMetaData; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.LocalStateCcr; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -42,6 +43,8 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase { builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); + // Let cluster state api return quickly in order to speed up auto follow tests: + builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); return builder.build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 2e684e2a486..3acdde52a44 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -19,11 +19,13 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; @@ -533,8 +535,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testStats() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, null, - mock(ClusterService.class), + mockClusterService(), new CcrLicenseChecker(() -> true, () -> false), () -> 1L); @@ -589,7 +592,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { } public void testUpdateAutoFollowers() { - ClusterService clusterService = mock(ClusterService.class); + ClusterService clusterService = mockClusterService(); // Return a cluster state with no patterns so that the auto followers never really execute: ClusterState followerState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -597,6 +600,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .build(); when(clusterService.state()).thenReturn(followerState); AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, null, clusterService, new CcrLicenseChecker(() -> true, () -> false), @@ -651,8 +655,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testUpdateAutoFollowersNoPatterns() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, null, - mock(ClusterService.class), + mockClusterService(), new CcrLicenseChecker(() -> true, () -> false), () -> 1L); ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) @@ -665,8 +670,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testUpdateAutoFollowersNoAutoFollowMetadata() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, null, - mock(ClusterService.class), + mockClusterService(), new CcrLicenseChecker(() -> true, () -> false), () -> 1L); ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build(); @@ -918,4 +924,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase { }; } + private ClusterService mockClusterService() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = + new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + return clusterService; + } + }