mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Fix scheduling of ClusterInfoService#refresh (#59880)
Today the `InternalClusterInfoService` uses the `LocalNodeMasterListener` interface to start/stop its operations. Since the `onMaster` and `offMaster` methods are called on the `MANAGEMENT` threadpool, there's no guarantee that they run in the correct sequence, which could result in an elected master failing to regularly update the cluster info. Since this service is also a `ClusterStateListener` we may as well drop the usage of the `LocalNodeMasterListener` interface and simply update the status of the local node on the applier thread in `clusterChanged` to ensure consistency. Additionally, today the `InternalClusterInfoService` uses a simple flag to track whether the local node is the elected master or not. If the node stops being the master and then starts again within a few seconds then the scheduled updates from the old mastership might carry on running in addition to the ones for the new mastership. This commit addresses that by tracking the identity of the scheduled update job and creating a new job for each mastership.
This commit is contained in:
parent
fb40ccf8a4
commit
dde568caf7
@ -129,7 +129,6 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
|
||||
final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster
|
||||
.getInstance(ClusterInfoService.class, internalTestCluster.getMasterName());
|
||||
infoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
|
||||
infoService.onMaster();
|
||||
ClusterInfo info = infoService.refresh();
|
||||
assertNotNull("info should not be null", info);
|
||||
ImmutableOpenMap<String, DiskUsage> leastUsages = info.getNodeLeastAvailableDiskUsages();
|
||||
|
@ -89,7 +89,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
||||
|
||||
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
|
||||
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
|
||||
clusterInfoService.onMaster();
|
||||
|
||||
// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
|
||||
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);
|
||||
@ -151,7 +150,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
|
||||
|
||||
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
|
||||
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
|
||||
clusterInfoService.onMaster();
|
||||
|
||||
// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
|
||||
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
@ -31,7 +32,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
@ -43,6 +44,7 @@ import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.index.store.StoreStats;
|
||||
import org.elasticsearch.monitor.fs.FsInfo;
|
||||
@ -55,6 +57,7 @@ import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
@ -68,10 +71,12 @@ import java.util.function.Consumer;
|
||||
* Every time the timer runs, gathers information about the disk usage and
|
||||
* shard sizes across the cluster.
|
||||
*/
|
||||
public class InternalClusterInfoService implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {
|
||||
public class InternalClusterInfoService implements ClusterInfoService, ClusterStateListener {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(InternalClusterInfoService.class);
|
||||
|
||||
private static final String REFRESH_EXECUTOR = ThreadPool.Names.MANAGEMENT;
|
||||
|
||||
public static final Setting<TimeValue> INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING =
|
||||
Setting.timeSetting("cluster.info.update.interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(10),
|
||||
Property.Dynamic, Property.NodeScope);
|
||||
@ -84,19 +89,18 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
|
||||
private volatile ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsages;
|
||||
private volatile ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsages;
|
||||
private volatile IndicesStatsSummary indicesStatsSummary;
|
||||
private volatile boolean isMaster = false;
|
||||
// null if this node is not currently the master
|
||||
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
|
||||
private volatile boolean enabled;
|
||||
private volatile TimeValue fetchTimeout;
|
||||
private final ClusterService clusterService;
|
||||
private final ThreadPool threadPool;
|
||||
private final NodeClient client;
|
||||
private final Client client;
|
||||
private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
|
||||
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
|
||||
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
|
||||
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
|
||||
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
|
||||
this.clusterService = clusterService;
|
||||
this.threadPool = threadPool;
|
||||
this.client = client;
|
||||
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
|
||||
@ -108,10 +112,8 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
|
||||
clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
|
||||
this::setEnabled);
|
||||
|
||||
// Add InternalClusterInfoService to listen for Master changes
|
||||
this.clusterService.addLocalNodeMasterListener(this);
|
||||
// Add to listen for state changes (when nodes are added)
|
||||
this.clusterService.addListener(this);
|
||||
// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
|
||||
clusterService.addListener(this);
|
||||
}
|
||||
|
||||
private void setEnabled(boolean enabled) {
|
||||
@ -126,79 +128,57 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
|
||||
this.updateFrequency = updateFrequency;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMaster() {
|
||||
this.isMaster = true;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
|
||||
}
|
||||
|
||||
// Submit a job that will reschedule itself after running
|
||||
threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
|
||||
|
||||
try {
|
||||
if (clusterService.state().getNodes().getDataNodes().size() > 1) {
|
||||
// Submit an info update job to be run immediately
|
||||
threadPool.executor(executorName()).execute(this::maybeRefresh);
|
||||
}
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void offMaster() {
|
||||
this.isMaster = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executorName() {
|
||||
return ThreadPool.Names.MANAGEMENT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (!this.enabled) {
|
||||
if (event.localNodeMaster() && refreshAndRescheduleRunnable.get() == null) {
|
||||
logger.trace("elected as master, scheduling cluster info update tasks");
|
||||
executeRefresh(event.state(), "became master");
|
||||
|
||||
final RefreshAndRescheduleRunnable newRunnable = new RefreshAndRescheduleRunnable();
|
||||
refreshAndRescheduleRunnable.set(newRunnable);
|
||||
threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, newRunnable);
|
||||
} else if (event.localNodeMaster() == false) {
|
||||
refreshAndRescheduleRunnable.set(null);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check whether it was a data node that was added
|
||||
boolean dataNodeAdded = false;
|
||||
if (enabled == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Refresh if a data node was added
|
||||
for (DiscoveryNode addedNode : event.nodesDelta().addedNodes()) {
|
||||
if (addedNode.isDataNode()) {
|
||||
dataNodeAdded = true;
|
||||
executeRefresh(event.state(), "data node added");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.isMaster && dataNodeAdded && event.state().getNodes().getDataNodes().size() > 1) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("data node was added, retrieving new cluster info");
|
||||
}
|
||||
threadPool.executor(executorName()).execute(this::maybeRefresh);
|
||||
}
|
||||
|
||||
if (this.isMaster && event.nodesRemoved()) {
|
||||
for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
|
||||
if (removedNode.isDataNode()) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Removing node from cluster info: {}", removedNode.getId());
|
||||
}
|
||||
if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) {
|
||||
ImmutableOpenMap.Builder<String, DiskUsage> newMaxUsages = ImmutableOpenMap.builder(leastAvailableSpaceUsages);
|
||||
newMaxUsages.remove(removedNode.getId());
|
||||
leastAvailableSpaceUsages = newMaxUsages.build();
|
||||
}
|
||||
if (mostAvailableSpaceUsages.containsKey(removedNode.getId())) {
|
||||
ImmutableOpenMap.Builder<String, DiskUsage> newMinUsages = ImmutableOpenMap.builder(mostAvailableSpaceUsages);
|
||||
newMinUsages.remove(removedNode.getId());
|
||||
mostAvailableSpaceUsages = newMinUsages.build();
|
||||
}
|
||||
// Clean up info for any removed nodes
|
||||
for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
|
||||
if (removedNode.isDataNode()) {
|
||||
logger.trace("Removing node from cluster info: {}", removedNode.getId());
|
||||
if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) {
|
||||
ImmutableOpenMap.Builder<String, DiskUsage> newMaxUsages = ImmutableOpenMap.builder(leastAvailableSpaceUsages);
|
||||
newMaxUsages.remove(removedNode.getId());
|
||||
leastAvailableSpaceUsages = newMaxUsages.build();
|
||||
}
|
||||
if (mostAvailableSpaceUsages.containsKey(removedNode.getId())) {
|
||||
ImmutableOpenMap.Builder<String, DiskUsage> newMinUsages = ImmutableOpenMap.builder(mostAvailableSpaceUsages);
|
||||
newMinUsages.remove(removedNode.getId());
|
||||
mostAvailableSpaceUsages = newMinUsages.build();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void executeRefresh(ClusterState clusterState, String reason) {
|
||||
if (clusterState.nodes().getDataNodes().size() > 1) {
|
||||
logger.trace("refreshing cluster info in background [{}]", reason);
|
||||
threadPool.executor(REFRESH_EXECUTOR).execute(new RefreshRunnable(reason));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterInfo getClusterInfo() {
|
||||
final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
|
||||
@ -206,39 +186,6 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
|
||||
indicesStatsSummary.shardSizes, indicesStatsSummary.shardRoutingToDataPath, indicesStatsSummary.reservedSpace);
|
||||
}
|
||||
|
||||
/**
|
||||
* Class used to submit {@link #maybeRefresh()} on the
|
||||
* {@link InternalClusterInfoService} threadpool, these jobs will
|
||||
* reschedule themselves by placing a new instance of this class onto the
|
||||
* scheduled threadpool.
|
||||
*/
|
||||
public class SubmitReschedulingClusterInfoUpdatedJob implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Submitting new rescheduling cluster info update job");
|
||||
}
|
||||
try {
|
||||
threadPool.executor(executorName()).execute(() -> {
|
||||
try {
|
||||
maybeRefresh();
|
||||
} finally { //schedule again after we refreshed
|
||||
if (isMaster) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString());
|
||||
}
|
||||
threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), this);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (EsRejectedExecutionException ex) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Couldn't re-schedule cluster info update task - node might be shutting down", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the latest nodes stats, calling the listener when complete
|
||||
* @return a latch that can be used to wait for the nodes stats to complete if desired
|
||||
@ -268,17 +215,6 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
|
||||
return latch;
|
||||
}
|
||||
|
||||
private void maybeRefresh() {
|
||||
// Short-circuit if not enabled
|
||||
if (enabled) {
|
||||
refresh();
|
||||
} else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Skipping ClusterInfoUpdatedJob since it is disabled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// allow tests to adjust the node stats on receipt
|
||||
List<NodeStats> adjustNodesStats(List<NodeStats> nodeStats) {
|
||||
return nodeStats;
|
||||
@ -288,9 +224,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
|
||||
* Refreshes the ClusterInfo in a blocking fashion
|
||||
*/
|
||||
public final ClusterInfo refresh() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Performing ClusterInfoUpdateJob");
|
||||
}
|
||||
logger.trace("refreshing cluster info");
|
||||
final CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
|
||||
@Override
|
||||
public void onResponse(NodesStatsResponse nodesStatsResponse) {
|
||||
@ -483,4 +417,63 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs {@link InternalClusterInfoService#refresh()}, logging failures/rejections appropriately.
|
||||
*/
|
||||
private class RefreshRunnable extends AbstractRunnable {
|
||||
private final String reason;
|
||||
|
||||
RefreshRunnable(String reason) {
|
||||
this.reason = reason;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
if (enabled) {
|
||||
logger.trace("refreshing cluster info [{}]", reason);
|
||||
refresh();
|
||||
} else {
|
||||
logger.trace("skipping cluster info refresh [{}] since it is disabled", reason);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn(new ParameterizedMessage("refreshing cluster info failed [{}]", reason), e);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onRejection(Exception e) {
|
||||
final boolean shutDown = e instanceof EsRejectedExecutionException && ((EsRejectedExecutionException) e).isExecutorShutdown();
|
||||
logger.log(shutDown ? Level.DEBUG : Level.WARN, "refreshing cluster info rejected [{}]", reason, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Runs {@link InternalClusterInfoService#refresh()}, logging failures/rejections appropriately, and reschedules itself on completion.
|
||||
*/
|
||||
private class RefreshAndRescheduleRunnable extends RefreshRunnable {
|
||||
RefreshAndRescheduleRunnable() {
|
||||
super("scheduled");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
if (this == refreshAndRescheduleRunnable.get()) {
|
||||
super.doRun();
|
||||
} else {
|
||||
logger.trace("master changed, scheduled refresh job is stale");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAfter() {
|
||||
if (this == refreshAndRescheduleRunnable.get()) {
|
||||
logger.trace("scheduling next cluster info refresh in [{}]", updateFrequency);
|
||||
threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,187 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
|
||||
import org.elasticsearch.cluster.coordination.MockSinglePrioritizingExecutor;
|
||||
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterApplier;
|
||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
|
||||
import org.elasticsearch.cluster.service.MasterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ClusterServiceUtils;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.client.NoOpClient;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class InternalClusterInfoServiceSchedulingTests extends ESTestCase {
|
||||
|
||||
public void testScheduling() {
|
||||
final DiscoveryNode discoveryNode = new DiscoveryNode("test", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
final DiscoveryNodes noMaster = DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).build();
|
||||
final DiscoveryNodes localMaster = DiscoveryNodes.builder(noMaster).masterNodeId(discoveryNode.getId()).build();
|
||||
|
||||
final Settings.Builder settingsBuilder = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName());
|
||||
if (randomBoolean()) {
|
||||
settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms");
|
||||
}
|
||||
final Settings settings = settingsBuilder.build();
|
||||
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
|
||||
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool();
|
||||
|
||||
final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) {
|
||||
@Override
|
||||
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
|
||||
return new MockSinglePrioritizingExecutor("mock-executor", deterministicTaskQueue, threadPool);
|
||||
}
|
||||
};
|
||||
|
||||
final MasterService masterService = new FakeThreadPoolMasterService("test", "masterService", threadPool, r -> {
|
||||
fail("master service should not run any tasks");
|
||||
});
|
||||
|
||||
final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
|
||||
|
||||
final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool);
|
||||
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client);
|
||||
clusterInfoService.addListener(ignored -> {});
|
||||
|
||||
clusterService.setNodeConnectionsService(ClusterServiceUtils.createNoOpNodeConnectionsService());
|
||||
clusterApplierService.setInitialState(ClusterState.builder(new ClusterName("cluster")).nodes(noMaster).build());
|
||||
masterService.setClusterStatePublisher((clusterChangedEvent, publishListener, ackListener) -> fail("should not publish"));
|
||||
masterService.setClusterStateSupplier(clusterApplierService::state);
|
||||
clusterService.start();
|
||||
|
||||
final AtomicBoolean becameMaster1 = new AtomicBoolean();
|
||||
clusterApplierService.onNewClusterState("become master 1",
|
||||
() -> ClusterState.builder(new ClusterName("cluster")).nodes(localMaster).build(), setFlagOnSuccess(becameMaster1));
|
||||
runUntilFlag(deterministicTaskQueue, becameMaster1);
|
||||
|
||||
final AtomicBoolean failMaster1 = new AtomicBoolean();
|
||||
clusterApplierService.onNewClusterState("fail master 1",
|
||||
() -> ClusterState.builder(new ClusterName("cluster")).nodes(noMaster).build(), setFlagOnSuccess(failMaster1));
|
||||
runUntilFlag(deterministicTaskQueue, failMaster1);
|
||||
|
||||
final AtomicBoolean becameMaster2 = new AtomicBoolean();
|
||||
clusterApplierService.onNewClusterState("become master 2",
|
||||
() -> ClusterState.builder(new ClusterName("cluster")).nodes(localMaster).build(), setFlagOnSuccess(becameMaster2));
|
||||
runUntilFlag(deterministicTaskQueue, becameMaster2);
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
final int initialRequestCount = client.requestCount;
|
||||
final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis();
|
||||
runFor(deterministicTaskQueue, duration);
|
||||
deterministicTaskQueue.runAllRunnableTasks();
|
||||
assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval
|
||||
}
|
||||
|
||||
final AtomicBoolean failMaster2 = new AtomicBoolean();
|
||||
clusterApplierService.onNewClusterState("fail master 2",
|
||||
() -> ClusterState.builder(new ClusterName("cluster")).nodes(noMaster).build(), setFlagOnSuccess(failMaster2));
|
||||
runUntilFlag(deterministicTaskQueue, failMaster2);
|
||||
|
||||
runFor(deterministicTaskQueue, INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis());
|
||||
deterministicTaskQueue.runAllRunnableTasks();
|
||||
assertFalse(deterministicTaskQueue.hasRunnableTasks());
|
||||
assertFalse(deterministicTaskQueue.hasDeferredTasks());
|
||||
}
|
||||
|
||||
private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) {
|
||||
final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration;
|
||||
while (deterministicTaskQueue.getCurrentTimeMillis() < endTime
|
||||
&& (deterministicTaskQueue.hasRunnableTasks() || deterministicTaskQueue.hasDeferredTasks())) {
|
||||
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) {
|
||||
deterministicTaskQueue.advanceTime();
|
||||
} else if (deterministicTaskQueue.hasRunnableTasks()) {
|
||||
deterministicTaskQueue.runRandomTask();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void runUntilFlag(DeterministicTaskQueue deterministicTaskQueue, AtomicBoolean flag) {
|
||||
while (flag.get() == false) {
|
||||
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) {
|
||||
deterministicTaskQueue.advanceTime();
|
||||
} else if (deterministicTaskQueue.hasRunnableTasks()) {
|
||||
deterministicTaskQueue.runRandomTask();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static ClusterApplier.ClusterApplyListener setFlagOnSuccess(AtomicBoolean flag) {
|
||||
return new ClusterApplier.ClusterApplyListener() {
|
||||
|
||||
@Override
|
||||
public void onSuccess(String source) {
|
||||
assertTrue(flag.compareAndSet(false, true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new AssertionError("unexpected", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static class FakeClusterInfoServiceClient extends NoOpClient {
|
||||
|
||||
int requestCount;
|
||||
|
||||
FakeClusterInfoServiceClient(ThreadPool threadPool) {
|
||||
super(threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,
|
||||
Request request,
|
||||
ActionListener<Response> listener) {
|
||||
if (request instanceof NodesStatsRequest || request instanceof IndicesStatsRequest) {
|
||||
requestCount++;
|
||||
// ClusterInfoService handles ClusterBlockExceptions quietly, so we invent such an exception to avoid excess logging
|
||||
listener.onFailure(new ClusterBlockException(
|
||||
org.elasticsearch.common.collect.Set.of(NoMasterBlockService.NO_MASTER_BLOCK_ALL)));
|
||||
} else {
|
||||
fail("unexpected action: " + action.name());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user