diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index dceb0255fbb..ef7f3b9c3ec 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -81,7 +81,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; -import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; +import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_ID; import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -102,6 +102,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final JoinHelper joinHelper; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; private final Supplier persistedStateSupplier; + private final DiscoverySettings discoverySettings; // TODO: the following two fields are package-private as some tests require access to them // These tests can be rewritten to use public methods once Coordinator is more feature-complete final Object mutex = new Object(); @@ -147,6 +148,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm); this.persistedStateSupplier = persistedStateSupplier; + this.discoverySettings = new DiscoverySettings(settings, clusterSettings); this.lastKnownLeader = Optional.empty(); this.lastJoin = Optional.empty(); this.joinAccumulator = new InitialJoinAccumulator(); @@ -528,7 +530,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)) .blocks(ClusterBlocks.builder() .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK) - .addGlobalBlock(NO_MASTER_BLOCK_WRITES)) // TODO: allow dynamically configuring NO_MASTER_BLOCK_ALL + .addGlobalBlock(discoverySettings.getNoMasterBlock())) .nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId())) .build(); applierState = initialState; @@ -568,7 +570,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery assert peerFinder.getCurrentTerm() == getCurrentTerm(); assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); - assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id()); + assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_ID); assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector + " vs " + getPreVoteResponse(); @@ -873,11 +875,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private ClusterState clusterStateWithNoMasterBlock(ClusterState clusterState) { if (clusterState.nodes().getMasterNodeId() != null) { // remove block if it already exists before adding new one - assert clusterState.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID) == false : + assert clusterState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_ID) == false : "NO_MASTER_BLOCK should only be added by Coordinator"; - // TODO: allow dynamically configuring NO_MASTER_BLOCK_ALL final ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks()).addGlobalBlock( - NO_MASTER_BLOCK_WRITES).build(); + discoverySettings.getNoMasterBlock()).build(); final DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build(); return ClusterState.builder(clusterState).blocks(clusterBlocks).nodes(discoveryNodes).build(); } else { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 41faba7f99d..5407cbc54c6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; @@ -44,6 +45,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.PublishClusterStateStats; @@ -95,11 +97,15 @@ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_ import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION; +import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_ALL; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_ID; +import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_SETTING; +import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.endsWith; @@ -914,6 +920,39 @@ public class CoordinatorTests extends ESTestCase { cluster.stabilise(); } + public void testAppliesNoMasterBlockWritesByDefault() { + testAppliesNoMasterBlock(null, NO_MASTER_BLOCK_WRITES); + } + + public void testAppliesNoMasterBlockWritesIfConfigured() { + testAppliesNoMasterBlock("write", NO_MASTER_BLOCK_WRITES); + } + + public void testAppliesNoMasterBlockAllIfConfigured() { + testAppliesNoMasterBlock("all", NO_MASTER_BLOCK_ALL); + } + + private void testAppliesNoMasterBlock(String noMasterBlockSetting, ClusterBlock expectedBlock) { + final Cluster cluster = new Cluster(3); + cluster.runRandomly(); + cluster.stabilise(); + + final ClusterNode leader = cluster.getAnyLeader(); + leader.submitUpdateTask("update NO_MASTER_BLOCK_SETTING", cs -> { + final Builder settingsBuilder = Settings.builder().put(cs.metaData().persistentSettings()); + settingsBuilder.put(NO_MASTER_BLOCK_SETTING.getKey(), noMasterBlockSetting); + return ClusterState.builder(cs).metaData(MetaData.builder(cs.metaData()).persistentSettings(settingsBuilder.build())).build(); + }); + cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing setting update"); + + leader.disconnect(); + cluster.runFor(defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "detecting disconnection"); + + assertThat(leader.clusterApplier.lastAppliedClusterState.blocks().global(), contains(expectedBlock)); + + // TODO reboot the leader and verify that the same block is applied when it restarts + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; }