diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 07e221a7eac..49c4eed4568 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -129,7 +129,6 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster .getInstance(ClusterInfoService.class, internalTestCluster.getMasterName()); infoService.setUpdateFrequency(TimeValue.timeValueMillis(200)); - infoService.onMaster(); ClusterInfo info = infoService.refresh(); assertNotNull("info should not be null", info); ImmutableOpenMap leastUsages = info.getNodeLeastAvailableDiskUsages(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 5810db59b15..7b663c1e713 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -89,7 +89,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase { final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200)); - clusterInfoService.onMaster(); // prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L); @@ -151,7 +150,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase { final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200)); - clusterInfoService.onMaster(); // prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L); diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index b56ba6dc3d1..5ac3a016904 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -31,7 +32,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -43,6 +44,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.monitor.fs.FsInfo; @@ -55,6 +57,7 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; /** @@ -68,10 +71,12 @@ import java.util.function.Consumer; * Every time the timer runs, gathers information about the disk usage and * shard sizes across the cluster. */ -public class InternalClusterInfoService implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener { +public class InternalClusterInfoService implements ClusterInfoService, ClusterStateListener { private static final Logger logger = LogManager.getLogger(InternalClusterInfoService.class); + private static final String REFRESH_EXECUTOR = ThreadPool.Names.MANAGEMENT; + public static final Setting INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING = Setting.timeSetting("cluster.info.update.interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(10), Property.Dynamic, Property.NodeScope); @@ -84,19 +89,18 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode private volatile ImmutableOpenMap leastAvailableSpaceUsages; private volatile ImmutableOpenMap mostAvailableSpaceUsages; private volatile IndicesStatsSummary indicesStatsSummary; - private volatile boolean isMaster = false; + // null if this node is not currently the master + private final AtomicReference refreshAndRescheduleRunnable = new AtomicReference<>(); private volatile boolean enabled; private volatile TimeValue fetchTimeout; - private final ClusterService clusterService; private final ThreadPool threadPool; - private final NodeClient client; + private final Client client; private final List> listeners = new CopyOnWriteArrayList<>(); - public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { + public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) { this.leastAvailableSpaceUsages = ImmutableOpenMap.of(); this.mostAvailableSpaceUsages = ImmutableOpenMap.of(); this.indicesStatsSummary = IndicesStatsSummary.EMPTY; - this.clusterService = clusterService; this.threadPool = threadPool; this.client = client; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); @@ -108,10 +112,8 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled); - // Add InternalClusterInfoService to listen for Master changes - this.clusterService.addLocalNodeMasterListener(this); - // Add to listen for state changes (when nodes are added) - this.clusterService.addListener(this); + // listen for state changes (this node starts/stops being the elected master, or new nodes are added) + clusterService.addListener(this); } private void setEnabled(boolean enabled) { @@ -126,79 +128,57 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode this.updateFrequency = updateFrequency; } - @Override - public void onMaster() { - this.isMaster = true; - if (logger.isTraceEnabled()) { - logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob"); - } - - // Submit a job that will reschedule itself after running - threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob()); - - try { - if (clusterService.state().getNodes().getDataNodes().size() > 1) { - // Submit an info update job to be run immediately - threadPool.executor(executorName()).execute(this::maybeRefresh); - } - } catch (EsRejectedExecutionException ex) { - logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex); - } - } - - @Override - public void offMaster() { - this.isMaster = false; - } - - @Override - public String executorName() { - return ThreadPool.Names.MANAGEMENT; - } - @Override public void clusterChanged(ClusterChangedEvent event) { - if (!this.enabled) { + if (event.localNodeMaster() && refreshAndRescheduleRunnable.get() == null) { + logger.trace("elected as master, scheduling cluster info update tasks"); + executeRefresh(event.state(), "became master"); + + final RefreshAndRescheduleRunnable newRunnable = new RefreshAndRescheduleRunnable(); + refreshAndRescheduleRunnable.set(newRunnable); + threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, newRunnable); + } else if (event.localNodeMaster() == false) { + refreshAndRescheduleRunnable.set(null); return; } - // Check whether it was a data node that was added - boolean dataNodeAdded = false; + if (enabled == false) { + return; + } + + // Refresh if a data node was added for (DiscoveryNode addedNode : event.nodesDelta().addedNodes()) { if (addedNode.isDataNode()) { - dataNodeAdded = true; + executeRefresh(event.state(), "data node added"); break; } } - if (this.isMaster && dataNodeAdded && event.state().getNodes().getDataNodes().size() > 1) { - if (logger.isDebugEnabled()) { - logger.debug("data node was added, retrieving new cluster info"); - } - threadPool.executor(executorName()).execute(this::maybeRefresh); - } - - if (this.isMaster && event.nodesRemoved()) { - for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) { - if (removedNode.isDataNode()) { - if (logger.isTraceEnabled()) { - logger.trace("Removing node from cluster info: {}", removedNode.getId()); - } - if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) { - ImmutableOpenMap.Builder newMaxUsages = ImmutableOpenMap.builder(leastAvailableSpaceUsages); - newMaxUsages.remove(removedNode.getId()); - leastAvailableSpaceUsages = newMaxUsages.build(); - } - if (mostAvailableSpaceUsages.containsKey(removedNode.getId())) { - ImmutableOpenMap.Builder newMinUsages = ImmutableOpenMap.builder(mostAvailableSpaceUsages); - newMinUsages.remove(removedNode.getId()); - mostAvailableSpaceUsages = newMinUsages.build(); - } + // Clean up info for any removed nodes + for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) { + if (removedNode.isDataNode()) { + logger.trace("Removing node from cluster info: {}", removedNode.getId()); + if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) { + ImmutableOpenMap.Builder newMaxUsages = ImmutableOpenMap.builder(leastAvailableSpaceUsages); + newMaxUsages.remove(removedNode.getId()); + leastAvailableSpaceUsages = newMaxUsages.build(); + } + if (mostAvailableSpaceUsages.containsKey(removedNode.getId())) { + ImmutableOpenMap.Builder newMinUsages = ImmutableOpenMap.builder(mostAvailableSpaceUsages); + newMinUsages.remove(removedNode.getId()); + mostAvailableSpaceUsages = newMinUsages.build(); } } } } + private void executeRefresh(ClusterState clusterState, String reason) { + if (clusterState.nodes().getDataNodes().size() > 1) { + logger.trace("refreshing cluster info in background [{}]", reason); + threadPool.executor(REFRESH_EXECUTOR).execute(new RefreshRunnable(reason)); + } + } + @Override public ClusterInfo getClusterInfo() { final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read @@ -206,39 +186,6 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode indicesStatsSummary.shardSizes, indicesStatsSummary.shardRoutingToDataPath, indicesStatsSummary.reservedSpace); } - /** - * Class used to submit {@link #maybeRefresh()} on the - * {@link InternalClusterInfoService} threadpool, these jobs will - * reschedule themselves by placing a new instance of this class onto the - * scheduled threadpool. - */ - public class SubmitReschedulingClusterInfoUpdatedJob implements Runnable { - @Override - public void run() { - if (logger.isTraceEnabled()) { - logger.trace("Submitting new rescheduling cluster info update job"); - } - try { - threadPool.executor(executorName()).execute(() -> { - try { - maybeRefresh(); - } finally { //schedule again after we refreshed - if (isMaster) { - if (logger.isTraceEnabled()) { - logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString()); - } - threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), this); - } - } - }); - } catch (EsRejectedExecutionException ex) { - if (logger.isDebugEnabled()) { - logger.debug("Couldn't re-schedule cluster info update task - node might be shutting down", ex); - } - } - } - } - /** * Retrieve the latest nodes stats, calling the listener when complete * @return a latch that can be used to wait for the nodes stats to complete if desired @@ -268,17 +215,6 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode return latch; } - private void maybeRefresh() { - // Short-circuit if not enabled - if (enabled) { - refresh(); - } else { - if (logger.isTraceEnabled()) { - logger.trace("Skipping ClusterInfoUpdatedJob since it is disabled"); - } - } - } - // allow tests to adjust the node stats on receipt List adjustNodesStats(List nodeStats) { return nodeStats; @@ -288,9 +224,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode * Refreshes the ClusterInfo in a blocking fashion */ public final ClusterInfo refresh() { - if (logger.isTraceEnabled()) { - logger.trace("Performing ClusterInfoUpdateJob"); - } + logger.trace("refreshing cluster info"); final CountDownLatch nodeLatch = updateNodeStats(new ActionListener() { @Override public void onResponse(NodesStatsResponse nodesStatsResponse) { @@ -483,4 +417,63 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode } } + /** + * Runs {@link InternalClusterInfoService#refresh()}, logging failures/rejections appropriately. + */ + private class RefreshRunnable extends AbstractRunnable { + private final String reason; + + RefreshRunnable(String reason) { + this.reason = reason; + } + + @Override + protected void doRun() { + if (enabled) { + logger.trace("refreshing cluster info [{}]", reason); + refresh(); + } else { + logger.trace("skipping cluster info refresh [{}] since it is disabled", reason); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn(new ParameterizedMessage("refreshing cluster info failed [{}]", reason), e); + } + + + @Override + public void onRejection(Exception e) { + final boolean shutDown = e instanceof EsRejectedExecutionException && ((EsRejectedExecutionException) e).isExecutorShutdown(); + logger.log(shutDown ? Level.DEBUG : Level.WARN, "refreshing cluster info rejected [{}]", reason, e); + } + } + + + /** + * Runs {@link InternalClusterInfoService#refresh()}, logging failures/rejections appropriately, and reschedules itself on completion. + */ + private class RefreshAndRescheduleRunnable extends RefreshRunnable { + RefreshAndRescheduleRunnable() { + super("scheduled"); + } + + @Override + protected void doRun() { + if (this == refreshAndRescheduleRunnable.get()) { + super.doRun(); + } else { + logger.trace("master changed, scheduled refresh job is stale"); + } + } + + @Override + public void onAfter() { + if (this == refreshAndRescheduleRunnable.get()) { + logger.trace("scheduling next cluster info refresh in [{}]", updateFrequency); + threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, this); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java new file mode 100644 index 00000000000..6c80d8c01d8 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -0,0 +1,187 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.cluster.coordination.MockSinglePrioritizingExecutor; +import org.elasticsearch.cluster.coordination.NoMasterBlockService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; +import org.elasticsearch.node.Node; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING; +import static org.hamcrest.Matchers.equalTo; + +public class InternalClusterInfoServiceSchedulingTests extends ESTestCase { + + public void testScheduling() { + final DiscoveryNode discoveryNode = new DiscoveryNode("test", buildNewFakeTransportAddress(), Version.CURRENT); + final DiscoveryNodes noMaster = DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).build(); + final DiscoveryNodes localMaster = DiscoveryNodes.builder(noMaster).masterNodeId(discoveryNode.getId()).build(); + + final Settings.Builder settingsBuilder = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName()); + if (randomBoolean()) { + settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms"); + } + final Settings settings = settingsBuilder.build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); + final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(); + + final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) { + @Override + protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { + return new MockSinglePrioritizingExecutor("mock-executor", deterministicTaskQueue, threadPool); + } + }; + + final MasterService masterService = new FakeThreadPoolMasterService("test", "masterService", threadPool, r -> { + fail("master service should not run any tasks"); + }); + + final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); + + final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); + final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client); + clusterInfoService.addListener(ignored -> {}); + + clusterService.setNodeConnectionsService(ClusterServiceUtils.createNoOpNodeConnectionsService()); + clusterApplierService.setInitialState(ClusterState.builder(new ClusterName("cluster")).nodes(noMaster).build()); + masterService.setClusterStatePublisher((clusterChangedEvent, publishListener, ackListener) -> fail("should not publish")); + masterService.setClusterStateSupplier(clusterApplierService::state); + clusterService.start(); + + final AtomicBoolean becameMaster1 = new AtomicBoolean(); + clusterApplierService.onNewClusterState("become master 1", + () -> ClusterState.builder(new ClusterName("cluster")).nodes(localMaster).build(), setFlagOnSuccess(becameMaster1)); + runUntilFlag(deterministicTaskQueue, becameMaster1); + + final AtomicBoolean failMaster1 = new AtomicBoolean(); + clusterApplierService.onNewClusterState("fail master 1", + () -> ClusterState.builder(new ClusterName("cluster")).nodes(noMaster).build(), setFlagOnSuccess(failMaster1)); + runUntilFlag(deterministicTaskQueue, failMaster1); + + final AtomicBoolean becameMaster2 = new AtomicBoolean(); + clusterApplierService.onNewClusterState("become master 2", + () -> ClusterState.builder(new ClusterName("cluster")).nodes(localMaster).build(), setFlagOnSuccess(becameMaster2)); + runUntilFlag(deterministicTaskQueue, becameMaster2); + + for (int i = 0; i < 3; i++) { + final int initialRequestCount = client.requestCount; + final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); + runFor(deterministicTaskQueue, duration); + deterministicTaskQueue.runAllRunnableTasks(); + assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval + } + + final AtomicBoolean failMaster2 = new AtomicBoolean(); + clusterApplierService.onNewClusterState("fail master 2", + () -> ClusterState.builder(new ClusterName("cluster")).nodes(noMaster).build(), setFlagOnSuccess(failMaster2)); + runUntilFlag(deterministicTaskQueue, failMaster2); + + runFor(deterministicTaskQueue, INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis()); + deterministicTaskQueue.runAllRunnableTasks(); + assertFalse(deterministicTaskQueue.hasRunnableTasks()); + assertFalse(deterministicTaskQueue.hasDeferredTasks()); + } + + private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) { + final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; + while (deterministicTaskQueue.getCurrentTimeMillis() < endTime + && (deterministicTaskQueue.hasRunnableTasks() || deterministicTaskQueue.hasDeferredTasks())) { + if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { + deterministicTaskQueue.advanceTime(); + } else if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } + } + } + + private static void runUntilFlag(DeterministicTaskQueue deterministicTaskQueue, AtomicBoolean flag) { + while (flag.get() == false) { + if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { + deterministicTaskQueue.advanceTime(); + } else if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } + } + } + + private static ClusterApplier.ClusterApplyListener setFlagOnSuccess(AtomicBoolean flag) { + return new ClusterApplier.ClusterApplyListener() { + + @Override + public void onSuccess(String source) { + assertTrue(flag.compareAndSet(false, true)); + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError("unexpected", e); + } + }; + } + + private static class FakeClusterInfoServiceClient extends NoOpClient { + + int requestCount; + + FakeClusterInfoServiceClient(ThreadPool threadPool) { + super(threadPool); + } + + @Override + protected void doExecute(ActionType action, + Request request, + ActionListener listener) { + if (request instanceof NodesStatsRequest || request instanceof IndicesStatsRequest) { + requestCount++; + // ClusterInfoService handles ClusterBlockExceptions quietly, so we invent such an exception to avoid excess logging + listener.onFailure(new ClusterBlockException( + org.elasticsearch.common.collect.Set.of(NoMasterBlockService.NO_MASTER_BLOCK_ALL))); + } else { + fail("unexpected action: " + action.name()); + } + } + } + +}