From aec44fecbc815959a88deb54b2fef17a84b5ae3f Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 9 Jul 2019 18:43:05 +0100 Subject: [PATCH] Decouple DiskThresholdMonitor & ClusterInfoService (#44105) Today the `ClusterInfoService` requires the `DiskThresholdMonitor` at construction time so that it can notify it when nodes report changes in their disk usage, but this is awkward to construct: the `DiskThresholdMonitor` requires a `RerouteService` which requires an `AllocationService` which comees from the `ClusterModule` which requires the `ClusterInfoService`. Today we break the cycle with a `LazilyInitializedRerouteService` which is itself a little ugly. This commit replaces this with a more traditional subject/observer relationship between the `ClusterInfoService` and the `DiskThresholdMonitor`. --- .../cluster/ClusterInfoService.java | 17 ++++- .../cluster/EmptyClusterInfoService.java | 9 ++- .../cluster/InternalClusterInfoService.java | 28 +++++-- .../LazilyInitializedRerouteService.java | 42 ----------- .../java/org/elasticsearch/node/Node.java | 20 ++--- .../decider/DiskThresholdDeciderTests.java | 74 +++++++------------ .../MockInternalClusterInfoService.java | 14 ++-- .../java/org/elasticsearch/node/MockNode.java | 8 +- 8 files changed, 84 insertions(+), 128 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java index ebc14b800cf..a3daa9a1261 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java @@ -19,12 +19,23 @@ package org.elasticsearch.cluster; +import java.util.function.Consumer; + /** - * Interface for a class used to gather information about a cluster at - * regular intervals + * Interface for a class used to gather information about a cluster periodically. */ +@FunctionalInterface public interface ClusterInfoService { - /** The latest cluster information */ + /** + * @return the latest cluster information + */ ClusterInfo getClusterInfo(); + + /** + * Add a listener for new cluster information + */ + default void addListener(Consumer clusterInfoConsumer) { + throw new UnsupportedOperationException(); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java index c6632bd524a..94845ccc952 100644 --- a/server/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java @@ -19,8 +19,10 @@ package org.elasticsearch.cluster; +import java.util.function.Consumer; + /** - * ClusterInfoService that provides empty maps for disk usage and shard sizes + * {@link ClusterInfoService} that provides empty maps for disk usage and shard sizes */ public class EmptyClusterInfoService implements ClusterInfoService { public static final EmptyClusterInfoService INSTANCE = new EmptyClusterInfoService(); @@ -29,4 +31,9 @@ public class EmptyClusterInfoService implements ClusterInfoService { public ClusterInfo getClusterInfo() { return ClusterInfo.EMPTY; } + + @Override + public void addListener(Consumer clusterInfoConsumer) { + // never updated, so we can discard the listener + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 4b893619891..1a5d0e64fcd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; @@ -46,6 +47,8 @@ import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ReceiveTimeoutTransportException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -85,10 +88,9 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode private final ClusterService clusterService; private final ThreadPool threadPool; private final NodeClient client; - private final Consumer listener; + private final List> listeners = Collections.synchronizedList(new ArrayList<>(1)); - public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client, - Consumer listener) { + public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { this.leastAvailableSpaceUsages = ImmutableOpenMap.of(); this.mostAvailableSpaceUsages = ImmutableOpenMap.of(); this.shardRoutingToDataPath = ImmutableOpenMap.of(); @@ -109,7 +111,6 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode this.clusterService.addLocalNodeMasterListener(this); // Add to listen for state changes (when nodes are added) this.clusterService.addListener(this); - this.listener = listener; } private void setEnabled(boolean enabled) { @@ -356,14 +357,25 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode Thread.currentThread().interrupt(); // restore interrupt status } ClusterInfo clusterInfo = getClusterInfo(); - try { - listener.accept(clusterInfo); - } catch (Exception e) { - logger.info("Failed executing ClusterInfoService listener", e); + boolean anyListeners = false; + for (final Consumer listener : listeners) { + anyListeners = true; + try { + logger.trace("notifying [{}] of new cluster info", listener); + listener.accept(clusterInfo); + } catch (Exception e) { + logger.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e); + } } + assert anyListeners : "expected to notify at least one listener"; return clusterInfo; } + @Override + public void addListener(Consumer clusterInfoConsumer) { + listeners.add(clusterInfoConsumer); + } + static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder newShardSizes, ImmutableOpenMap.Builder newShardRoutingToDataPath) { for (ShardStats s : stats) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java deleted file mode 100644 index 672d6dbc3a0..00000000000 --- a/server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.cluster.routing; - -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.action.ActionListener; - -/** - * A {@link RerouteService} that can be initialized lazily. The real reroute service, {@link BatchedRerouteService}, depends on components - * constructed quite late in the construction of the node, but other components constructed earlier eventually need access to the reroute - * service too. - */ -public class LazilyInitializedRerouteService implements RerouteService { - - private final SetOnce delegate = new SetOnce<>(); - - @Override - public void reroute(String reason, ActionListener listener) { - assert delegate.get() != null; - delegate.get().reroute(reason, listener); - } - - public void setRerouteService(RerouteService rerouteService) { - delegate.set(rerouteService); - } -} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 35ddb3758cf..9467af274e3 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -26,8 +26,8 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionModule; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; @@ -38,7 +38,6 @@ import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.bootstrap.BootstrapContext; import org.elasticsearch.client.Client; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; @@ -57,7 +56,6 @@ import org.elasticsearch.cluster.metadata.TemplateUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.routing.BatchedRerouteService; -import org.elasticsearch.cluster.routing.LazilyInitializedRerouteService; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; import org.elasticsearch.cluster.service.ClusterService; @@ -180,7 +178,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -370,11 +367,7 @@ public class Node implements Closeable { .newHashPublisher()); final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); - final LazilyInitializedRerouteService lazilyInitializedRerouteService = new LazilyInitializedRerouteService(); - final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state, - clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, lazilyInitializedRerouteService); - final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client, - diskThresholdMonitor::onNewInfo); + final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); ModulesBuilder modules = new ModulesBuilder(); @@ -511,7 +504,10 @@ public class Node implements Closeable { final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute); - lazilyInitializedRerouteService.setRerouteService(rerouteService); + final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state, + clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService); + clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); + final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(), clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class), @@ -1021,8 +1017,8 @@ public class Node implements Closeable { /** Constructs a ClusterInfoService which may be mocked for tests. */ protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService, - ThreadPool threadPool, NodeClient client, Consumer listeners) { - return new InternalClusterInfoService(settings, clusterService, threadPool, client, listeners); + ThreadPool threadPool, NodeClient client) { + return new InternalClusterInfoService(settings, clusterService, threadPool, client); } /** Constructs a {@link org.elasticsearch.http.HttpServerTransport} which may be mocked for tests. */ diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index cf8abf6ccd6..651ed26e4cf 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -100,12 +100,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), makeDecider(diskSettings)))); - ClusterInfoService cis = new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - logger.info("--> calling fake getClusterInfo"); - return clusterInfo; - } + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; }; AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); @@ -278,12 +275,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), makeDecider(diskSettings)))); - ClusterInfoService cis = new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - logger.info("--> calling fake getClusterInfo"); - return clusterInfo; - } + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; }; AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(), @@ -328,12 +322,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { usagesBuilder.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "/dev/null", 100, 35)); // 65% used usages = usagesBuilder.build(); final ClusterInfo clusterInfo2 = new DevNullClusterInfo(usages, usages, shardSizes); - cis = new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - logger.info("--> calling fake getClusterInfo"); - return clusterInfo2; - } + cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo2; }; strategy = new AllocationService(deciders, new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis); @@ -516,12 +507,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ), makeDecider(diskSettings)))); - ClusterInfoService cis = new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - logger.info("--> calling fake getClusterInfo"); - return clusterInfo; - } + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; }; AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(), @@ -580,12 +568,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ), makeDecider(diskSettings)))); - ClusterInfoService cis = new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - logger.info("--> calling fake getClusterInfo"); - return clusterInfo; - } + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; }; AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(), @@ -677,12 +662,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ), decider))); - ClusterInfoService cis = new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - logger.info("--> calling fake getClusterInfo"); - return clusterInfo; - } + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; }; AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(), @@ -856,12 +838,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { } // Creating AllocationService instance and the services it depends on... - ClusterInfoService cis = new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - logger.info("--> calling fake getClusterInfo"); - return clusterInfo; - } + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; }; AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList( new SameShardAllocationDecider( @@ -948,13 +927,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { // Two shards should start happily assertThat(decision.type(), equalTo(Decision.Type.YES)); - assertThat(((Decision.Single) decision).getExplanation(), containsString("there is only a single data node present")); - ClusterInfoService cis = new ClusterInfoService() { - @Override - public ClusterInfo getClusterInfo() { - logger.info("--> calling fake getClusterInfo"); - return clusterInfo; - } + assertThat(decision.getExplanation(), containsString("there is only a single data node present")); + ClusterInfoService cis = () -> { + logger.info("--> calling fake getClusterInfo"); + return clusterInfo; }; AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList( diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 037bb544919..dfe21ee4294 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -18,11 +18,6 @@ */ package org.elasticsearch.cluster; -import java.util.Arrays; -import java.util.Collections; -import java.util.concurrent.CountDownLatch; -import java.util.function.Consumer; - import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; @@ -40,6 +35,10 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; + import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -71,9 +70,8 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService { null, null, null, null); } - public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client, - Consumer listener) { - super(settings, clusterService, threadPool, client, listener); + public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { + super(settings, clusterService, threadPool, client); this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100)); stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100)); diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 31b8ba01dc4..201abfac573 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -20,7 +20,6 @@ package org.elasticsearch.node; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.MockInternalClusterInfoService; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -54,7 +53,6 @@ import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.Set; -import java.util.function.Consumer; import java.util.function.Function; /** @@ -158,11 +156,11 @@ public class MockNode extends Node { @Override protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService, - ThreadPool threadPool, NodeClient client, Consumer listener) { + ThreadPool threadPool, NodeClient client) { if (getPluginsService().filterPlugins(MockInternalClusterInfoService.TestPlugin.class).isEmpty()) { - return super.newClusterInfoService(settings, clusterService, threadPool, client, listener); + return super.newClusterInfoService(settings, clusterService, threadPool, client); } else { - return new MockInternalClusterInfoService(settings, clusterService, threadPool, client, listener); + return new MockInternalClusterInfoService(settings, clusterService, threadPool, client); } }