Decouple DiskThresholdMonitor & ClusterInfoService (#44105)

Today the `ClusterInfoService` requires the `DiskThresholdMonitor` at
construction time so that it can notify it when nodes report changes in their
disk usage, but this is awkward to construct: the `DiskThresholdMonitor`
requires a `RerouteService` which requires an `AllocationService` which comees
from the `ClusterModule` which requires the `ClusterInfoService`.

Today we break the cycle with a `LazilyInitializedRerouteService` which is
itself a little ugly. This commit replaces this with a more traditional
subject/observer relationship between the `ClusterInfoService` and the
`DiskThresholdMonitor`.
This commit is contained in:
David Turner 2019-07-09 18:43:05 +01:00
parent e70cad4c52
commit aec44fecbc
8 changed files with 84 additions and 128 deletions

View File

@ -19,12 +19,23 @@
package org.elasticsearch.cluster;
import java.util.function.Consumer;
/**
* Interface for a class used to gather information about a cluster at
* regular intervals
* Interface for a class used to gather information about a cluster periodically.
*/
@FunctionalInterface
public interface ClusterInfoService {
/** The latest cluster information */
/**
* @return the latest cluster information
*/
ClusterInfo getClusterInfo();
/**
* Add a listener for new cluster information
*/
default void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
throw new UnsupportedOperationException();
}
}

View File

@ -19,8 +19,10 @@
package org.elasticsearch.cluster;
import java.util.function.Consumer;
/**
* ClusterInfoService that provides empty maps for disk usage and shard sizes
* {@link ClusterInfoService} that provides empty maps for disk usage and shard sizes
*/
public class EmptyClusterInfoService implements ClusterInfoService {
public static final EmptyClusterInfoService INSTANCE = new EmptyClusterInfoService();
@ -29,4 +31,9 @@ public class EmptyClusterInfoService implements ClusterInfoService {
public ClusterInfo getClusterInfo() {
return ClusterInfo.EMPTY;
}
@Override
public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
// never updated, so we can discard the listener
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -46,6 +47,8 @@ import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -85,10 +88,9 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
private final Consumer<ClusterInfo> listener;
private final List<Consumer<ClusterInfo>> listeners = Collections.synchronizedList(new ArrayList<>(1));
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
Consumer<ClusterInfo> listener) {
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
this.shardRoutingToDataPath = ImmutableOpenMap.of();
@ -109,7 +111,6 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
this.clusterService.addLocalNodeMasterListener(this);
// Add to listen for state changes (when nodes are added)
this.clusterService.addListener(this);
this.listener = listener;
}
private void setEnabled(boolean enabled) {
@ -356,14 +357,25 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
Thread.currentThread().interrupt(); // restore interrupt status
}
ClusterInfo clusterInfo = getClusterInfo();
boolean anyListeners = false;
for (final Consumer<ClusterInfo> listener : listeners) {
anyListeners = true;
try {
logger.trace("notifying [{}] of new cluster info", listener);
listener.accept(clusterInfo);
} catch (Exception e) {
logger.info("Failed executing ClusterInfoService listener", e);
logger.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e);
}
}
assert anyListeners : "expected to notify at least one listener";
return clusterInfo;
}
@Override
public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
listeners.add(clusterInfoConsumer);
}
static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
for (ShardStats s : stats) {

View File

@ -1,42 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
/**
* A {@link RerouteService} that can be initialized lazily. The real reroute service, {@link BatchedRerouteService}, depends on components
* constructed quite late in the construction of the node, but other components constructed earlier eventually need access to the reroute
* service too.
*/
public class LazilyInitializedRerouteService implements RerouteService {
private final SetOnce<RerouteService> delegate = new SetOnce<>();
@Override
public void reroute(String reason, ActionListener<Void> listener) {
assert delegate.get() != null;
delegate.get().reroute(reason, listener);
}
public void setRerouteService(RerouteService rerouteService) {
delegate.set(rerouteService);
}
}

View File

@ -26,8 +26,8 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
@ -38,7 +38,6 @@ import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.bootstrap.BootstrapContext;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
@ -57,7 +56,6 @@ import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.BatchedRerouteService;
import org.elasticsearch.cluster.routing.LazilyInitializedRerouteService;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.cluster.service.ClusterService;
@ -180,7 +178,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@ -370,11 +367,7 @@ public class Node implements Closeable {
.newHashPublisher());
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final LazilyInitializedRerouteService lazilyInitializedRerouteService = new LazilyInitializedRerouteService();
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, lazilyInitializedRerouteService);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
diskThresholdMonitor::onNewInfo);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();
ModulesBuilder modules = new ModulesBuilder();
@ -511,7 +504,10 @@ public class Node implements Closeable {
final RerouteService rerouteService
= new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
lazilyInitializedRerouteService.setRerouteService(rerouteService);
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
@ -1021,8 +1017,8 @@ public class Node implements Closeable {
/** Constructs a ClusterInfoService which may be mocked for tests. */
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listeners) {
return new InternalClusterInfoService(settings, clusterService, threadPool, client, listeners);
ThreadPool threadPool, NodeClient client) {
return new InternalClusterInfoService(settings, clusterService, threadPool, client);
}
/** Constructs a {@link org.elasticsearch.http.HttpServerTransport} which may be mocked for tests. */

View File

@ -100,12 +100,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(deciders,
new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
@ -278,12 +275,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
makeDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@ -328,12 +322,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
usagesBuilder.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "/dev/null", 100, 35)); // 65% used
usages = usagesBuilder.build();
final ClusterInfo clusterInfo2 = new DevNullClusterInfo(usages, usages, shardSizes);
cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo2;
}
};
strategy = new AllocationService(deciders, new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY), cis);
@ -516,12 +507,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
),
makeDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@ -580,12 +568,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
),
makeDecider(diskSettings))));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@ -677,12 +662,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
), decider)));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@ -856,12 +838,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
}
// Creating AllocationService instance and the services it depends on...
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(
@ -948,13 +927,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
// Two shards should start happily
assertThat(decision.type(), equalTo(Decision.Type.YES));
assertThat(((Decision.Single) decision).getExplanation(), containsString("there is only a single data node present"));
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
assertThat(decision.getExplanation(), containsString("there is only a single data node present"));
ClusterInfoService cis = () -> {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}
};
AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList(

View File

@ -18,11 +18,6 @@
*/
package org.elasticsearch.cluster;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -40,6 +35,10 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -71,9 +70,8 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
null, null, null, null);
}
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
Consumer<ClusterInfo> listener) {
super(settings, clusterService, threadPool, client, listener);
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
super(settings, clusterService, threadPool, client);
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100));
stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100));

View File

@ -20,7 +20,6 @@
package org.elasticsearch.node;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.MockInternalClusterInfoService;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -54,7 +53,6 @@ import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
/**
@ -158,11 +156,11 @@ public class MockNode extends Node {
@Override
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listener) {
ThreadPool threadPool, NodeClient client) {
if (getPluginsService().filterPlugins(MockInternalClusterInfoService.TestPlugin.class).isEmpty()) {
return super.newClusterInfoService(settings, clusterService, threadPool, client, listener);
return super.newClusterInfoService(settings, clusterService, threadPool, client);
} else {
return new MockInternalClusterInfoService(settings, clusterService, threadPool, client, listener);
return new MockInternalClusterInfoService(settings, clusterService, threadPool, client);
}
}