mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
[CCR] Add ccr.auto_follow_coordinator.wait_for_timeout
setting (#36714)
This setting controls the wait for timeout the autofollow coordinator should use when setting cluster state requests to a remote cluster.
This commit is contained in:
parent
7cbf03c001
commit
4ded4717fe
@ -162,7 +162,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||||||
ccrLicenseChecker,
|
ccrLicenseChecker,
|
||||||
restoreSourceService,
|
restoreSourceService,
|
||||||
new CcrRepositoryManager(settings, clusterService, client),
|
new CcrRepositoryManager(settings, clusterService, client),
|
||||||
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
|
new AutoFollowCoordinator(settings, client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr;
|
|||||||
|
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Setting.Property;
|
import org.elasticsearch.common.settings.Setting.Property;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.xpack.core.XPackSettings;
|
import org.elasticsearch.xpack.core.XPackSettings;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -28,6 +29,12 @@ public final class CcrSettings {
|
|||||||
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
|
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
|
||||||
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);
|
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using.
|
||||||
|
*/
|
||||||
|
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
|
||||||
|
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The settings defined by CCR.
|
* The settings defined by CCR.
|
||||||
*
|
*
|
||||||
@ -36,7 +43,8 @@ public final class CcrSettings {
|
|||||||
static List<Setting<?>> getSettings() {
|
static List<Setting<?>> getSettings() {
|
||||||
return Arrays.asList(
|
return Arrays.asList(
|
||||||
XPackSettings.CCR_ENABLED_SETTING,
|
XPackSettings.CCR_ENABLED_SETTING,
|
||||||
CCR_FOLLOWING_INDEX_SETTING);
|
CCR_FOLLOWING_INDEX_SETTING,
|
||||||
|
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -27,12 +27,14 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||||||
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
|
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.license.LicenseUtils;
|
import org.elasticsearch.license.LicenseUtils;
|
||||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||||
|
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||||
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
|
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
|
||||||
@ -73,6 +75,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||||||
private final CcrLicenseChecker ccrLicenseChecker;
|
private final CcrLicenseChecker ccrLicenseChecker;
|
||||||
private final LongSupplier relativeMillisTimeProvider;
|
private final LongSupplier relativeMillisTimeProvider;
|
||||||
|
|
||||||
|
private volatile TimeValue waitForMetadataTimeOut;
|
||||||
private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();
|
private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();
|
||||||
|
|
||||||
// The following fields are read and updated under a lock:
|
// The following fields are read and updated under a lock:
|
||||||
@ -82,6 +85,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||||||
private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;
|
private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;
|
||||||
|
|
||||||
public AutoFollowCoordinator(
|
public AutoFollowCoordinator(
|
||||||
|
Settings settings,
|
||||||
Client client,
|
Client client,
|
||||||
ClusterService clusterService,
|
ClusterService clusterService,
|
||||||
CcrLicenseChecker ccrLicenseChecker,
|
CcrLicenseChecker ccrLicenseChecker,
|
||||||
@ -98,6 +102,15 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||||||
return size() > MAX_AUTO_FOLLOW_ERRORS;
|
return size() > MAX_AUTO_FOLLOW_ERRORS;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Consumer<TimeValue> updater = newWaitForTimeOut -> {
|
||||||
|
if (newWaitForTimeOut.equals(waitForMetadataTimeOut) == false) {
|
||||||
|
LOGGER.info("changing wait_for_metadata_timeout from [{}] to [{}]", waitForMetadataTimeOut, newWaitForTimeOut);
|
||||||
|
waitForMetadataTimeOut = newWaitForTimeOut;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, updater);
|
||||||
|
waitForMetadataTimeOut = CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.get(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized AutoFollowStats getStats() {
|
public synchronized AutoFollowStats getStats() {
|
||||||
@ -181,6 +194,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||||||
request.metaData(true);
|
request.metaData(true);
|
||||||
request.routingTable(true);
|
request.routingTable(true);
|
||||||
request.waitForMetaDataVersion(metadataVersion);
|
request.waitForMetaDataVersion(metadataVersion);
|
||||||
|
request.waitForTimeout(waitForMetadataTimeOut);
|
||||||
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
|
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
|
||||||
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
|
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
|
||||||
client,
|
client,
|
||||||
|
@ -65,6 +65,7 @@ import org.elasticsearch.test.NodeConfigurationSource;
|
|||||||
import org.elasticsearch.test.TestCluster;
|
import org.elasticsearch.test.TestCluster;
|
||||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||||
import org.elasticsearch.xpack.ccr.LocalStateCcr;
|
import org.elasticsearch.xpack.ccr.LocalStateCcr;
|
||||||
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
|
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
|
||||||
import org.elasticsearch.xpack.core.XPackSettings;
|
import org.elasticsearch.xpack.core.XPackSettings;
|
||||||
@ -199,6 +200,8 @@ public abstract class CcrIntegTestCase extends ESTestCase {
|
|||||||
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
|
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
|
||||||
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
|
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
|
||||||
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
|
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
|
||||||
|
// Let cluster state api return quickly in order to speed up auto follow tests:
|
||||||
|
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
|
||||||
if (leaderSeedAddress != null) {
|
if (leaderSeedAddress != null) {
|
||||||
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
|
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,7 @@ import org.elasticsearch.license.LicensesMetaData;
|
|||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||||
import org.elasticsearch.xpack.ccr.LocalStateCcr;
|
import org.elasticsearch.xpack.ccr.LocalStateCcr;
|
||||||
import org.elasticsearch.xpack.core.XPackSettings;
|
import org.elasticsearch.xpack.core.XPackSettings;
|
||||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||||
@ -42,6 +43,8 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
|
|||||||
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
|
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
|
||||||
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
|
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
|
||||||
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
|
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
|
||||||
|
// Let cluster state api return quickly in order to speed up auto follow tests:
|
||||||
|
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,11 +19,13 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|||||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||||
|
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||||
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
|
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
|
||||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
||||||
@ -533,8 +535,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
|
|
||||||
public void testStats() {
|
public void testStats() {
|
||||||
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
|
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
|
||||||
|
Settings.EMPTY,
|
||||||
null,
|
null,
|
||||||
mock(ClusterService.class),
|
mockClusterService(),
|
||||||
new CcrLicenseChecker(() -> true, () -> false),
|
new CcrLicenseChecker(() -> true, () -> false),
|
||||||
() -> 1L);
|
() -> 1L);
|
||||||
|
|
||||||
@ -589,7 +592,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void testUpdateAutoFollowers() {
|
public void testUpdateAutoFollowers() {
|
||||||
ClusterService clusterService = mock(ClusterService.class);
|
ClusterService clusterService = mockClusterService();
|
||||||
// Return a cluster state with no patterns so that the auto followers never really execute:
|
// Return a cluster state with no patterns so that the auto followers never really execute:
|
||||||
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
|
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
|
||||||
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||||
@ -597,6 +600,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
.build();
|
.build();
|
||||||
when(clusterService.state()).thenReturn(followerState);
|
when(clusterService.state()).thenReturn(followerState);
|
||||||
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
|
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
|
||||||
|
Settings.EMPTY,
|
||||||
null,
|
null,
|
||||||
clusterService,
|
clusterService,
|
||||||
new CcrLicenseChecker(() -> true, () -> false),
|
new CcrLicenseChecker(() -> true, () -> false),
|
||||||
@ -651,8 +655,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
|
|
||||||
public void testUpdateAutoFollowersNoPatterns() {
|
public void testUpdateAutoFollowersNoPatterns() {
|
||||||
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
|
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
|
||||||
|
Settings.EMPTY,
|
||||||
null,
|
null,
|
||||||
mock(ClusterService.class),
|
mockClusterService(),
|
||||||
new CcrLicenseChecker(() -> true, () -> false),
|
new CcrLicenseChecker(() -> true, () -> false),
|
||||||
() -> 1L);
|
() -> 1L);
|
||||||
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
|
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
|
||||||
@ -665,8 +670,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
|
|
||||||
public void testUpdateAutoFollowersNoAutoFollowMetadata() {
|
public void testUpdateAutoFollowersNoAutoFollowMetadata() {
|
||||||
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
|
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
|
||||||
|
Settings.EMPTY,
|
||||||
null,
|
null,
|
||||||
mock(ClusterService.class),
|
mockClusterService(),
|
||||||
new CcrLicenseChecker(() -> true, () -> false),
|
new CcrLicenseChecker(() -> true, () -> false),
|
||||||
() -> 1L);
|
() -> 1L);
|
||||||
ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build();
|
ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build();
|
||||||
@ -918,4 +924,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ClusterService mockClusterService() {
|
||||||
|
ClusterService clusterService = mock(ClusterService.class);
|
||||||
|
ClusterSettings clusterSettings =
|
||||||
|
new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT));
|
||||||
|
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
|
||||||
|
return clusterService;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user