Selectively Add ClusterState Listeners Depending on Node Roles (#63223) (#63396)

We were not consistent in checking for node roles before adding listeners.
In some cases we did check the necessity of a CS listener and in others we did not.
This commit fixes a number of cases of redundant listeners that don't apply to all node roles.
This commit is contained in:
Armin Braun 2020-10-07 14:11:43 +02:00 committed by GitHub
parent eac99dd594
commit 244f1a60f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 61 additions and 24 deletions

View File

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
@ -96,6 +97,12 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
// We add a state applier that will remove any dangling repository cleanup actions on master failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
if (DiscoveryNode.isMasterNode(clusterService.getSettings())) {
addClusterStateApplier(clusterService);
}
}
private static void addClusterStateApplier(ClusterService clusterService) {
clusterService.addStateApplier(event -> {
if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) {
final RepositoryCleanupInProgress repositoryCleanupInProgress =

View File

@ -32,6 +32,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -89,12 +90,17 @@ public class TemplateUpgradeService implements ClusterStateListener {
}
return upgradedTemplates;
};
clusterService.addListener(this);
if (DiscoveryNode.isMasterNode(clusterService.getSettings())) {
clusterService.addListener(this);
}
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.nodes().isLocalNodeElectedMaster() == false) {
return;
}
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
// while they actually do exist
@ -115,10 +121,6 @@ public class TemplateUpgradeService implements ClusterStateListener {
return;
}
if (state.nodes().isLocalNodeElectedMaster() == false) {
return;
}
lastTemplateMetadata = templates;
Optional<Tuple<Map<String, BytesReference>, Set<String>>> changes = calculateTemplateChanges(templates);
if (changes.isPresent()) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
@ -135,7 +136,9 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme
this.threadPool = threadPool;
this.clusterService = clusterService;
this.allocationService = allocationService;
clusterService.addListener(this);
if (DiscoveryNode.isMasterNode(clusterService.getSettings())) {
clusterService.addListener(this);
}
}
@Override
@ -159,8 +162,8 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme
@Override
public void clusterChanged(ClusterChangedEvent event) {
long currentNanoTime = currentNanoTime();
if (event.state().nodes().isLocalNodeElectedMaster()) {
if (event.localNodeMaster()) {
long currentNanoTime = currentNanoTime();
scheduleIfNeeded(currentNanoTime, event.state());
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
@ -132,8 +133,10 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
@Override
protected void doStart() {
// use post applied so that the state will be visible to the background recovery thread we spawn in performStateRecovery
clusterService.addListener(this);
if (DiscoveryNode.isMasterNode(clusterService.getSettings())) {
// use post applied so that the state will be visible to the background recovery thread we spawn in performStateRecovery
clusterService.addListener(this);
}
}
@Override

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -93,13 +94,19 @@ public class PeerRecoverySourceService extends AbstractLifecycleComponent implem
@Override
protected void doStart() {
indicesService.clusterService().addListener(this);
final ClusterService clusterService = indicesService.clusterService();
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
clusterService.addListener(this);
}
}
@Override
protected void doStop() {
ongoingRecoveries.awaitEmpty();
indicesService.clusterService().removeListener(this);
final ClusterService clusterService = indicesService.clusterService();
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
ongoingRecoveries.awaitEmpty();
indicesService.clusterService().removeListener(this);
}
}
@Override

View File

@ -560,7 +560,9 @@ public class Node implements Closeable {
final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(indexTemplateMetadataUpgraders);
final MetadataIndexUpgradeService metadataIndexUpgradeService = new MetadataIndexUpgradeService(settings, xContentRegistry,
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), systemIndices, scriptService);
clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService));
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService));
}
new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
@ -1158,10 +1160,11 @@ public class Node implements Closeable {
/** Constructs a ClusterInfoService which may be mocked for tests. */
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
ThreadPool threadPool, NodeClient client) {
final InternalClusterInfoService service =
new InternalClusterInfoService(settings, clusterService, threadPool, client);
// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
clusterService.addListener(service);
final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client);
if (DiscoveryNode.isMasterNode(settings)) {
// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
clusterService.addListener(service);
}
return service;
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
@ -69,7 +70,9 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings());
this.threadPool = threadPool;
this.periodicRechecker = new PeriodicRechecker(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings));
clusterService.addListener(this);
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(this);
}
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
this::setRecheckInterval);
}

View File

@ -51,6 +51,7 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.elasticsearch.cluster.metadata.MetadataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
@ -159,7 +160,7 @@ public class RestoreService implements ClusterStateApplier {
private final ClusterSettings clusterSettings;
private final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor;
private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();
public RestoreService(ClusterService clusterService, RepositoriesService repositoriesService,
AllocationService allocationService, MetadataCreateIndexService createIndexService,
@ -170,9 +171,10 @@ public class RestoreService implements ClusterStateApplier {
this.allocationService = allocationService;
this.createIndexService = createIndexService;
this.metadataIndexUpgradeService = metadataIndexUpgradeService;
clusterService.addStateApplier(this);
this.clusterSettings = clusterSettings;
this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();
if (DiscoveryNode.isMasterNode(clusterService.getSettings())) {
clusterService.addStateApplier(this);
}
this.clusterSettings = clusterService.getClusterSettings();
this.shardLimitValidator = shardLimitValidator;
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.NodeRoles;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
@ -55,6 +56,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class DelayedAllocationServiceTests extends ESAllocationTestCase {
@ -68,8 +70,10 @@ public class DelayedAllocationServiceTests extends ESAllocationTestCase {
threadPool = new TestThreadPool(getTestName());
clusterService = mock(ClusterService.class);
allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
when(clusterService.getSettings()).thenReturn(NodeRoles.masterOnlyNode());
delayedAllocationService = new TestDelayAllocationService(threadPool, clusterService, allocationService);
verify(clusterService).addListener(delayedAllocationService);
verify(clusterService).getSettings();
}
@After

View File

@ -27,6 +27,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.NodeRoles;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -40,7 +41,9 @@ public class PeerRecoverySourceServiceTests extends IndexShardTestCase {
public void testDuplicateRecoveries() throws IOException {
IndexShard primary = newStartedShard(true);
final IndicesService indicesService = mock(IndicesService.class);
when(indicesService.clusterService()).thenReturn(mock(ClusterService.class));
final ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getSettings()).thenReturn(NodeRoles.dataNode());
when(indicesService.clusterService()).thenReturn(clusterService);
PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
mock(TransportService.class), indicesService,
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));