[Internal] make no master lock an instance var so it can be configured

This commit is contained in:
Shay Banon 2014-04-11 12:01:44 +02:00 committed by Boaz Leskes
parent 63d0406b67
commit 4824f05369
11 changed files with 61 additions and 30 deletions

View File

@ -108,6 +108,15 @@ public class ClusterBlocks {
return global.contains(block);
}
public boolean hasGlobalBlock(int blockId) {
for (ClusterBlock clusterBlock : global) {
if (clusterBlock.id() == blockId) {
return true;
}
}
return false;
}
/**
* Is there a global block with the provided status?
*/

View File

@ -84,7 +84,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private volatile ClusterState clusterState;
private final ClusterBlocks.Builder initialBlocks = ClusterBlocks.builder().addGlobalBlock(Discovery.NO_MASTER_BLOCK);
private final ClusterBlocks.Builder initialBlocks;
private volatile ScheduledFuture reconnectToNodes;
@ -104,6 +104,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
this.reconnectInterval = componentSettings.getAsTime("reconnect_interval", TimeValue.timeValueSeconds(10));
localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
initialBlocks = ClusterBlocks.builder().addGlobalBlock(discoveryService.getNoMasterBlock());
}
public NodeSettingsService settingsService() {
@ -380,7 +382,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
} else {
if (previousClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK) && !newClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
if (previousClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) && !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
// force an update, its a fresh update from the master as we transition from a start of not having a master to having one
// have a fresh instances of routing and metadata to remove the chance that version might be the same
Builder builder = ClusterState.builder(newClusterState);

View File

@ -36,8 +36,6 @@ import org.elasticsearch.rest.RestStatus;
*/
public interface Discovery extends LifecycleComponent<Discovery> {
final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
DiscoveryNode localNode();
void addListener(InitialStateDiscoveryListener listener);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -60,14 +61,20 @@ public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryServic
private final TimeValue initialStateTimeout;
private final Discovery discovery;
private InitialStateListener initialStateListener;
private final DiscoverySettings discoverySettings;
@Inject
public DiscoveryService(Settings settings, Discovery discovery) {
public DiscoveryService(Settings settings, DiscoverySettings discoverySettings, Discovery discovery) {
super(settings);
this.discoverySettings = discoverySettings;
this.discovery = discovery;
this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30));
}
public ClusterBlock getNoMasterBlock() {
return discoverySettings.getNoMasterBlock();
}
@Override
protected void doStart() throws ElasticsearchException {
initialStateListener = new InitialStateListener();

View File

@ -19,11 +19,14 @@
package org.elasticsearch.discovery;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.rest.RestStatus;
/**
* Exposes common discovery settings that may be supported by all the different discovery implementations
@ -31,15 +34,18 @@ import org.elasticsearch.node.settings.NodeSettingsService;
public class DiscoverySettings extends AbstractComponent {
public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout";
public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);
private volatile TimeValue publishTimeout = DEFAULT_PUBLISH_TIMEOUT;
public final static int NO_MASTER_BLOCK_ID = 2;
private final ClusterBlock noMasterBlock;
@Inject
public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
nodeSettingsService.addListener(new ApplySettings());
this.noMasterBlock = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
}
/**
@ -49,6 +55,10 @@ public class DiscoverySettings extends AbstractComponent {
return publishTimeout;
}
public ClusterBlock getNoMasterBlock() {
return noMasterBlock;
}
private class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {

View File

@ -132,7 +132,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
// remove the NO_MASTER block in this case
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(Discovery.NO_MASTER_BLOCK);
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock());
return ClusterState.builder(currentState).nodes(nodesBuilder).blocks(blocks).build();
}

View File

@ -86,6 +86,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoveryNodeService discoveryNodeService;
private final DiscoverySettings discoverySettings;
private final ZenPingService pingService;
private final MasterFaultDetection masterFD;
private final NodesFaultDetection nodesFD;
@ -132,6 +133,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.discoverySettings = discoverySettings;
this.pingService = pingService;
this.version = version;
@ -321,7 +323,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
.put(localNode);
// update the fact that we are the master...
latestDiscoNodes = builder.build();
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build();
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
return ClusterState.builder(currentState).nodes(latestDiscoNodes).blocks(clusterBlocks).build();
}
@ -845,7 +847,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
master = false;
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
.addGlobalBlock(NO_MASTER_BLOCK)
.addGlobalBlock(discoverySettings.getNoMasterBlock())
.build();
// clean the nodes, we are now not connected to anybody, since we try and reform the cluster

View File

@ -141,7 +141,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
protected void checkStateMeetsSettingsAndMaybeRecover(ClusterState state, boolean asyncRecovery) {
DiscoveryNodes nodes = state.nodes();
if (state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
if (state.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");

View File

@ -44,6 +44,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalNode;
@ -121,7 +122,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
private final List<InternalNode> nodes = Lists.newCopyOnWriteArrayList();
@Inject
public TribeService(Settings settings, ClusterService clusterService) {
public TribeService(Settings settings, ClusterService clusterService, DiscoveryService discoveryService) {
super(settings);
this.clusterService = clusterService;
Map<String, Settings> nodesSettings = Maps.newHashMap(settings.getGroups("tribe", true));
@ -143,7 +144,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
if (!nodes.isEmpty()) {
// remove the initial election / recovery blocks since we are not going to have a
// master elected in this single tribe node local "cluster"
clusterService.removeInitialStateBlock(Discovery.NO_MASTER_BLOCK);
clusterService.removeInitialStateBlock(discoveryService.getNoMasterBlock());
clusterService.removeInitialStateBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
if (settings.getAsBoolean("tribe.blocks.write", false)) {
clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -60,7 +61,7 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
logger.info("--> should be blocked, no master...");
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
assertThat(state.nodes().size(), equalTo(1)); // verify that we still see the local node in the cluster state
logger.info("--> start second node, cluster should be formed");
@ -70,9 +71,9 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(false));
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(false));
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(2));
@ -98,11 +99,11 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
return state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID);
}
});
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
assertThat(state.nodes().size(), equalTo(1)); // verify that we still see the local node in the cluster state
logger.info("--> starting the previous master node again...");
@ -112,9 +113,9 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(false));
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(false));
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(2));
@ -135,7 +136,7 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
assertThat(awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
return state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID);
}
}), equalTo(true));
@ -146,9 +147,9 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(false));
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(false));
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(2));
@ -183,21 +184,21 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
return state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID);
}
});
awaitBusy(new Predicate<Object>() {
public boolean apply(Object obj) {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
return state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID);
}
});
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
logger.info("--> start two more nodes");
internalCluster().startNode(settings);
@ -298,9 +299,9 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
boolean success = true;
for (Client client : internalCluster()) {
ClusterState state = client.admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
success &= state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK);
success &= state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID);
if (logger.isDebugEnabled()) {
logger.debug("Checking for NO_MASTER_BLOCK on client: {} NO_MASTER_BLOCK: [{}]", client, state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK));
logger.debug("Checking for NO_MASTER_BLOCK on client: {} NO_MASTER_BLOCK: [{}]", client, state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID));
}
}
return success;

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.ScriptService;
@ -74,7 +75,7 @@ public class NoMasterNodeTests extends ElasticsearchIntegrationTest {
@Override
public void run() {
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertTrue(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK));
assertTrue(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID));
}
});