From 244f1a60f92e12e2f382a8f616d5fc3b260bd776 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 7 Oct 2020 14:11:43 +0200 Subject: [PATCH] Selectively Add ClusterState Listeners Depending on Node Roles (#63223) (#63396) We were not consistent in checking for node roles before adding listeners. In some cases we did check the necessity of a CS listener and in others we did not. This commit fixes a number of cases of redundant listeners that don't apply to all node roles. --- .../cleanup/TransportCleanupRepositoryAction.java | 7 +++++++ .../cluster/metadata/TemplateUpgradeService.java | 12 +++++++----- .../cluster/routing/DelayedAllocationService.java | 9 ++++++--- .../org/elasticsearch/gateway/GatewayService.java | 7 +++++-- .../indices/recovery/PeerRecoverySourceService.java | 13 ++++++++++--- .../src/main/java/org/elasticsearch/node/Node.java | 13 ++++++++----- .../persistent/PersistentTasksClusterService.java | 5 ++++- .../org/elasticsearch/snapshots/RestoreService.java | 10 ++++++---- .../routing/DelayedAllocationServiceTests.java | 4 ++++ .../recovery/PeerRecoverySourceServiceTests.java | 5 ++++- 10 files changed, 61 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index 97f568c34ac..6b8f51dc6df 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -96,6 +97,12 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA // We add a state applier that will remove any dangling repository cleanup actions on master failover. // This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent // operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes. + if (DiscoveryNode.isMasterNode(clusterService.getSettings())) { + addClusterStateApplier(clusterService); + } + } + + private static void addClusterStateApplier(ClusterService clusterService) { clusterService.addStateApplier(event -> { if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) { final RepositoryCleanupInProgress repositoryCleanupInProgress = diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java index 9cc938d9ec5..e00140dbb41 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java @@ -32,6 +32,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -89,12 +90,17 @@ public class TemplateUpgradeService implements ClusterStateListener { } return upgradedTemplates; }; - clusterService.addListener(this); + if (DiscoveryNode.isMasterNode(clusterService.getSettings())) { + clusterService.addListener(this); + } } @Override public void clusterChanged(ClusterChangedEvent event) { ClusterState state = event.state(); + if (state.nodes().isLocalNodeElectedMaster() == false) { + return; + } if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { // wait until the gateway has recovered from disk, otherwise we think may not have the index templates, // while they actually do exist @@ -115,10 +121,6 @@ public class TemplateUpgradeService implements ClusterStateListener { return; } - if (state.nodes().isLocalNodeElectedMaster() == false) { - return; - } - lastTemplateMetadata = templates; Optional, Set>> changes = calculateTemplateChanges(templates); if (changes.isPresent()) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java index 82d459f7f3c..40ed5584e19 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; @@ -135,7 +136,9 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme this.threadPool = threadPool; this.clusterService = clusterService; this.allocationService = allocationService; - clusterService.addListener(this); + if (DiscoveryNode.isMasterNode(clusterService.getSettings())) { + clusterService.addListener(this); + } } @Override @@ -159,8 +162,8 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme @Override public void clusterChanged(ClusterChangedEvent event) { - long currentNanoTime = currentNanoTime(); - if (event.state().nodes().isLocalNodeElectedMaster()) { + if (event.localNodeMaster()) { + long currentNanoTime = currentNanoTime(); scheduleIfNeeded(currentNanoTime, event.state()); } } diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index 80c9ad7aa94..2f758781a54 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.coordination.Coordinator; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; @@ -132,8 +133,10 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste @Override protected void doStart() { - // use post applied so that the state will be visible to the background recovery thread we spawn in performStateRecovery - clusterService.addListener(this); + if (DiscoveryNode.isMasterNode(clusterService.getSettings())) { + // use post applied so that the state will be visible to the background recovery thread we spawn in performStateRecovery + clusterService.addListener(this); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index 1ce4dde52a4..a16e9343fc3 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -93,13 +94,19 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem @Override protected void doStart() { - indicesService.clusterService().addListener(this); + final ClusterService clusterService = indicesService.clusterService(); + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + clusterService.addListener(this); + } } @Override protected void doStop() { - ongoingRecoveries.awaitEmpty(); - indicesService.clusterService().removeListener(this); + final ClusterService clusterService = indicesService.clusterService(); + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + ongoingRecoveries.awaitEmpty(); + indicesService.clusterService().removeListener(this); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 3caf468b2fb..fa9493195ec 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -560,7 +560,9 @@ public class Node implements Closeable { final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(indexTemplateMetadataUpgraders); final MetadataIndexUpgradeService metadataIndexUpgradeService = new MetadataIndexUpgradeService(settings, xContentRegistry, indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), systemIndices, scriptService); - clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService)); + if (DiscoveryNode.isMasterNode(settings)) { + clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService)); + } new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); Set taskHeaders = Stream.concat( @@ -1158,10 +1160,11 @@ public class Node implements Closeable { /** Constructs a ClusterInfoService which may be mocked for tests. */ protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { - final InternalClusterInfoService service = - new InternalClusterInfoService(settings, clusterService, threadPool, client); - // listen for state changes (this node starts/stops being the elected master, or new nodes are added) - clusterService.addListener(service); + final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client); + if (DiscoveryNode.isMasterNode(settings)) { + // listen for state changes (this node starts/stops being the elected master, or new nodes are added) + clusterService.addListener(service); + } return service; } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index ee08223ef62..a9b930c522a 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; @@ -69,7 +70,9 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings()); this.threadPool = threadPool; this.periodicRechecker = new PeriodicRechecker(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings)); - clusterService.addListener(this); + if (DiscoveryNode.isMasterNode(settings)) { + clusterService.addListener(this); + } clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setRecheckInterval); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 5d8d41b1ad1..f6575befbdf 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -51,6 +51,7 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.metadata.MetadataIndexStateService; import org.elasticsearch.cluster.metadata.MetadataIndexUpgradeService; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.RoutingChangesObserver; @@ -159,7 +160,7 @@ public class RestoreService implements ClusterStateApplier { private final ClusterSettings clusterSettings; - private final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor; + private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(); public RestoreService(ClusterService clusterService, RepositoriesService repositoriesService, AllocationService allocationService, MetadataCreateIndexService createIndexService, @@ -170,9 +171,10 @@ public class RestoreService implements ClusterStateApplier { this.allocationService = allocationService; this.createIndexService = createIndexService; this.metadataIndexUpgradeService = metadataIndexUpgradeService; - clusterService.addStateApplier(this); - this.clusterSettings = clusterSettings; - this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(); + if (DiscoveryNode.isMasterNode(clusterService.getSettings())) { + clusterService.addStateApplier(this); + } + this.clusterSettings = clusterService.getClusterSettings(); this.shardLimitValidator = shardLimitValidator; } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java index c631878a80e..49e229a710b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.NodeRoles; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; @@ -55,6 +56,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; public class DelayedAllocationServiceTests extends ESAllocationTestCase { @@ -68,8 +70,10 @@ public class DelayedAllocationServiceTests extends ESAllocationTestCase { threadPool = new TestThreadPool(getTestName()); clusterService = mock(ClusterService.class); allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); + when(clusterService.getSettings()).thenReturn(NodeRoles.masterOnlyNode()); delayedAllocationService = new TestDelayAllocationService(threadPool, clusterService, allocationService); verify(clusterService).addListener(delayedAllocationService); + verify(clusterService).getSettings(); } @After diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index 7f7a1d588d0..7a50f7b7c25 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.NodeRoles; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -40,7 +41,9 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase { public void testDuplicateRecoveries() throws IOException { IndexShard primary = newStartedShard(true); final IndicesService indicesService = mock(IndicesService.class); - when(indicesService.clusterService()).thenReturn(mock(ClusterService.class)); + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getSettings()).thenReturn(NodeRoles.dataNode()); + when(indicesService.clusterService()).thenReturn(clusterService); PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService( mock(TransportService.class), indicesService, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));