diff --git a/pom.xml b/pom.xml
index 3f1d74ab3a6..d07a71cb471 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,7 +184,7 @@
0.8.13
true
-
+
@@ -485,7 +485,8 @@
${tests.failfast}
false
- .
+ .
+
${tests.bwc}
${tests.bwc.path}
@@ -539,15 +540,15 @@
1.7
- validate
-
- run
-
-
-
- Using ${java.runtime.name} ${java.runtime.version} ${java.vendor}
-
-
+ validate
+
+ run
+
+
+
+ Using ${java.runtime.name} ${java.runtime.version} ${java.vendor}
+
+
invalid-patterns
@@ -575,7 +576,9 @@
- The following files contain tabs or nocommits:${line.separator}${validate.patternsFound}
+ The following files contain tabs or
+ nocommits:${line.separator}${validate.patternsFound}
+
@@ -583,7 +586,8 @@
tests
test
- ${skipTests}
+ ${skipTests}
+
false
@@ -597,7 +601,7 @@
-
+
@@ -710,7 +714,7 @@
org.elasticsearch.common.compress
- com.github.mustachejava
+ com.github.mustachejava
org.elasticsearch.common.mustache
@@ -1221,6 +1225,11 @@
jdk-unsafe
jdk-deprecated
+
+
+ org/elasticsearch/test/disruption/LongGCDisruption.class
+
+
test-signatures.txt
all-signatures.txt
@@ -1345,219 +1354,220 @@
-
-
- default
-
- true
-
-
-
-
- com.carrotsearch.randomizedtesting
- junit4-maven-plugin
-
- ${tests.jvm.argline}
-
-
-
- com.mycila
- license-maven-plugin
- 2.5
-
- dev-tools/elasticsearch_license_header.txt
-
- dev-tools/license_header_definition.xml
-
-
- src/main/java/org/elasticsearch/**/*.java
- src/test/java/org/elasticsearch/**/*.java
-
-
- src/main/java/org/elasticsearch/common/inject/**
-
- src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java
- src/main/java/org/elasticsearch/common/lucene/search/XBooleanFilter.java
- src/main/java/org/elasticsearch/common/lucene/search/XFilteredQuery.java
- src/main/java/org/apache/lucene/queryparser/XSimpleQueryParser.java
- src/main/java/org/apache/lucene/**/X*.java
-
- src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java
- src/test/java/org/elasticsearch/search/aggregations/metrics/GroupTree.java
-
-
-
-
- compile
-
- check
-
-
-
-
-
-
-
-
-
- dev
-
- true
-
-
-
-
- de.thetaphi
- forbiddenapis
- 1.5.1
-
-
- check-forbidden-apis
- none
-
-
- check-forbidden-test-apis
- none
-
-
-
-
-
-
-
-
- license
-
-
- license.generation
- true
-
-
-
-
-
-
- coverage
-
-
- tests.coverage
- true
-
-
-
-
-
- org.jacoco
- org.jacoco.agent
- runtime
- 0.6.4.201312101107
- test
-
-
-
-
-
- org.jacoco
- jacoco-maven-plugin
- 0.6.4.201312101107
-
-
- default-prepare-agent
-
- prepare-agent
-
-
-
- default-report
- prepare-package
-
- report
-
-
-
- default-check
-
- check
-
-
-
-
-
- jsr166e/**
- org/apache/lucene/**
-
-
-
-
-
-
-
- static
-
-
- tests.static
- true
-
-
-
-
-
- org.codehaus.mojo
- findbugs-maven-plugin
- 2.5.3
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jxr-plugin
- 2.3
-
-
- org.apache.maven.plugins
- maven-pmd-plugin
- 3.0.1
-
-
- ${basedir}/dev-tools/pmd/custom.xml
-
- 1.7
-
- **/jsr166e/**
- **/org/apache/lucene/**
- **/org/apache/elasticsearch/common/Base64.java
-
-
-
-
- org.codehaus.mojo
- findbugs-maven-plugin
- 2.5.3
-
- true
- target/site
- true
- 2048
- 1800000
- org.elasticsearch.-
-
-
-
- org.apache.maven.plugins
- maven-project-info-reports-plugin
- 2.7
-
-
-
- index
-
-
-
-
-
-
-
+
+
+ default
+
+ true
+
+
+
+
+ com.carrotsearch.randomizedtesting
+ junit4-maven-plugin
+
+ ${tests.jvm.argline}
+
+
+
+ com.mycila
+ license-maven-plugin
+ 2.5
+
+ dev-tools/elasticsearch_license_header.txt
+
+ dev-tools/license_header_definition.xml
+
+
+ src/main/java/org/elasticsearch/**/*.java
+ src/test/java/org/elasticsearch/**/*.java
+
+
+ src/main/java/org/elasticsearch/common/inject/**
+
+ src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java
+ src/main/java/org/elasticsearch/common/lucene/search/XBooleanFilter.java
+ src/main/java/org/elasticsearch/common/lucene/search/XFilteredQuery.java
+ src/main/java/org/apache/lucene/queryparser/XSimpleQueryParser.java
+ src/main/java/org/apache/lucene/**/X*.java
+
+ src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java
+
+ src/test/java/org/elasticsearch/search/aggregations/metrics/GroupTree.java
+
+
+
+
+ compile
+
+ check
+
+
+
+
+
+
+
+
+
+ dev
+
+ true
+
+
+
+
+ de.thetaphi
+ forbiddenapis
+ 1.5.1
+
+
+ check-forbidden-apis
+ none
+
+
+ check-forbidden-test-apis
+ none
+
+
+
+
+
+
+
+
+ license
+
+
+ license.generation
+ true
+
+
+
+
+
+
+ coverage
+
+
+ tests.coverage
+ true
+
+
+
+
+
+ org.jacoco
+ org.jacoco.agent
+ runtime
+ 0.6.4.201312101107
+ test
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.6.4.201312101107
+
+
+ default-prepare-agent
+
+ prepare-agent
+
+
+
+ default-report
+ prepare-package
+
+ report
+
+
+
+ default-check
+
+ check
+
+
+
+
+
+ jsr166e/**
+ org/apache/lucene/**
+
+
+
+
+
+
+
+ static
+
+
+ tests.static
+ true
+
+
+
+
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+ 2.5.3
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jxr-plugin
+ 2.3
+
+
+ org.apache.maven.plugins
+ maven-pmd-plugin
+ 3.0.1
+
+
+ ${basedir}/dev-tools/pmd/custom.xml
+
+ 1.7
+
+ **/jsr166e/**
+ **/org/apache/lucene/**
+ **/org/apache/elasticsearch/common/Base64.java
+
+
+
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+ 2.5.3
+
+ true
+ target/site
+ true
+ 2048
+ 1800000
+ org.elasticsearch.-
+
+
+
+ org.apache.maven.plugins
+ maven-project-info-reports-plugin
+ 2.7
+
+
+
+ index
+
+
+
+
+
+
+
diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
index fa77ae88478..edcf8334640 100644
--- a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
+++ b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
@@ -137,6 +137,12 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
return new ClusterUpdateSettingsResponse(updateSettingsAcked && acknowledged, transientUpdates.build(), persistentUpdates.build());
}
+ @Override
+ public void onNoLongerMaster(String source) {
+ logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
+ listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
+ }
+
@Override
public void onFailure(String source, Throwable t) {
//if the reroute fails we only log
diff --git a/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java
index c0e9a65de34..2e54d5cf181 100644
--- a/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java
+++ b/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java
@@ -173,12 +173,12 @@ public class TransportRecoveryAction extends
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, RecoveryRequest request) {
- return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
+ return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
- return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, concreteIndices);
+ return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices);
}
static class ShardRecoveryRequest extends BroadcastShardOperationRequest {
diff --git a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java
index 0ebfd47593e..5868aa12b5a 100644
--- a/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java
+++ b/src/main/java/org/elasticsearch/action/bench/BenchmarkService.java
@@ -66,11 +66,11 @@ public class BenchmarkService extends AbstractLifecycleComponent listener) {
@@ -171,8 +174,8 @@ public class BenchmarkService extends AbstractLifecycleComponent listener) {
@@ -228,7 +231,7 @@ public class BenchmarkService extends AbstractLifecycleComponent builder = new ImmutableList.Builder();
for (BenchmarkMetaData.Entry e : bmd.entries()) {
if (benchmarkId == null || match(e)) {
- e = process(e) ;
+ e = process(e);
instances.add(e);
}
// Don't keep finished benchmarks around in cluster state
@@ -741,7 +745,7 @@ public class BenchmarkService extends AbstractLifecycleComponent implements TimeoutClusterStateUpdateTask {
+ public abstract class BenchmarkStateChangeAction extends TimeoutClusterStateUpdateTask {
protected final R request;
public BenchmarkStateChangeAction(R request) {
diff --git a/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java
index 7cdee753873..087bd1c6ad6 100644
--- a/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java
+++ b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java
@@ -28,7 +28,7 @@ import org.elasticsearch.common.unit.TimeValue;
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* all the nodes have acknowledged a cluster state update request
*/
-public abstract class AckedClusterStateUpdateTask implements TimeoutClusterStateUpdateTask {
+public abstract class AckedClusterStateUpdateTask extends TimeoutClusterStateUpdateTask {
private final ActionListener listener;
private final AckedRequest request;
@@ -40,6 +40,7 @@ public abstract class AckedClusterStateUpdateTask implements TimeoutCl
/**
* Called to determine which nodes the acknowledgement is expected from
+ *
* @param discoveryNode a node
* @return true if the node is expected to send ack back, false otherwise
*/
@@ -50,6 +51,7 @@ public abstract class AckedClusterStateUpdateTask implements TimeoutCl
/**
* Called once all the nodes have acknowledged the cluster state update request. Must be
* very lightweight execution, since it gets executed on the cluster service thread.
+ *
* @param t optional error that might have been thrown
*/
public void onAllNodesAcked(@Nullable Throwable t) {
diff --git a/src/main/java/org/elasticsearch/cluster/ClusterService.java b/src/main/java/org/elasticsearch/cluster/ClusterService.java
index 6204599f57d..080fce84a36 100644
--- a/src/main/java/org/elasticsearch/cluster/ClusterService.java
+++ b/src/main/java/org/elasticsearch/cluster/ClusterService.java
@@ -110,4 +110,5 @@ public interface ClusterService extends LifecycleComponent {
* Returns the tasks that are pending.
*/
List pendingTasks();
+
}
diff --git a/src/main/java/org/elasticsearch/cluster/ClusterState.java b/src/main/java/org/elasticsearch/cluster/ClusterState.java
index ecb041a233e..d208d6a20a7 100644
--- a/src/main/java/org/elasticsearch/cluster/ClusterState.java
+++ b/src/main/java/org/elasticsearch/cluster/ClusterState.java
@@ -115,6 +115,8 @@ public class ClusterState implements ToXContent {
}
+ public static final long UNKNOWN_VERSION = -1;
+
private final long version;
private final RoutingTable routingTable;
diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java
new file mode 100644
index 00000000000..48afbb8f1fe
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cluster/ClusterStateNonMasterUpdateTask.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+/**
+ * This is a marker interface to indicate that the task should be executed
+ * even if the current node is not a master.
+ */
+public abstract class ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {
+
+ @Override
+ public boolean runOnlyOnMaster() {
+ return false;
+ }
+}
diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java
index 490a556ab12..921b6d149ee 100644
--- a/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java
+++ b/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java
@@ -19,19 +19,37 @@
package org.elasticsearch.cluster;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+
/**
* A task that can update the cluster state.
*/
-public interface ClusterStateUpdateTask {
+abstract public class ClusterStateUpdateTask {
/**
* Update the cluster state based on the current state. Return the *same instance* if no state
* should be changed.
*/
- ClusterState execute(ClusterState currentState) throws Exception;
+ abstract public ClusterState execute(ClusterState currentState) throws Exception;
/**
* A callback called when execute fails.
*/
- void onFailure(String source, Throwable t);
+ abstract public void onFailure(String source, @Nullable Throwable t);
+
+
+ /**
+ * indicates whether this task should only run if current node is master
+ */
+ public boolean runOnlyOnMaster() {
+ return true;
+ }
+
+ /**
+ * called when the task was rejected because the local node is no longer master
+ */
+ public void onNoLongerMaster(String source) {
+ onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
+ }
}
diff --git a/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java
new file mode 100644
index 00000000000..4af05b43581
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateNonMasterUpdateTask.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster;
+
+/**
+ * A combination between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
+ * {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes
+ */
+abstract public class ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask {
+
+ @Override
+ public boolean runOnlyOnMaster() {
+ return false;
+ }
+}
diff --git a/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java
index 72074965f95..2d703ed2621 100644
--- a/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java
+++ b/src/main/java/org/elasticsearch/cluster/ProcessedClusterStateUpdateTask.java
@@ -23,11 +23,11 @@ package org.elasticsearch.cluster;
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* the cluster state update has been processed.
*/
-public interface ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
+public abstract class ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
/**
* Called when the result of the {@link #execute(ClusterState)} have been processed
* properly by all listeners.
*/
- void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
+ public abstract void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
}
diff --git a/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java
index 1083e1ddcbe..1ae767c6560 100644
--- a/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java
+++ b/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java
@@ -25,11 +25,11 @@ import org.elasticsearch.common.unit.TimeValue;
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
* a timeout.
*/
-public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
+abstract public class TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onFailure(String, Throwable)}
*/
- TimeValue timeout();
+ abstract public TimeValue timeout();
}
diff --git a/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
index 957bd406263..bb7d332de4f 100644
--- a/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
+++ b/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
@@ -108,6 +108,19 @@ public class ClusterBlocks {
return global.contains(block);
}
+ public boolean hasGlobalBlock(int blockId) {
+ for (ClusterBlock clusterBlock : global) {
+ if (clusterBlock.id() == blockId) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasGlobalBlock(ClusterBlockLevel level) {
+ return global(level).size() > 0;
+ }
+
/**
* Is there a global block with the provided status?
*/
diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
index b33804de564..555b8b3ef1b 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
@@ -149,10 +149,15 @@ public class RoutingService extends AbstractLifecycleComponent i
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
+ @Override
+ public void onNoLongerMaster(String source) {
+ // no biggie
+ }
+
@Override
public void onFailure(String source, Throwable t) {
- ClusterState state = clusterService.state();
- logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
+ ClusterState state = clusterService.state();
+ logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
}
});
routingTableDirty = false;
diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
index fad94ba1944..c5fe004f8b9 100644
--- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
+++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
@@ -84,7 +84,7 @@ public class InternalClusterService extends AbstractLifecycleComponent {
- final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
-
DiscoveryNode localNode();
void addListener(InitialStateDiscoveryListener listener);
diff --git a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java
index 0108db12a19..f73f2bbb593 100644
--- a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java
+++ b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java
@@ -22,6 +22,7 @@ package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@@ -38,6 +39,8 @@ import java.util.concurrent.TimeUnit;
*/
public class DiscoveryService extends AbstractLifecycleComponent {
+ public static final String SETTING_INITIAL_STATE_TIMEOUT = "discovery.initial_state_timeout";
+
private static class InitialStateListener implements InitialStateDiscoveryListener {
private final CountDownLatch latch = new CountDownLatch(1);
@@ -60,12 +63,18 @@ public class DiscoveryService extends AbstractLifecycleComponent implem
private final TransportService transportService;
private final ClusterService clusterService;
+ private final DiscoveryService discoveryService;
private final DiscoveryNodeService discoveryNodeService;
private AllocationService allocationService;
private final ClusterName clusterName;
@@ -77,7 +78,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem
@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
- DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
+ DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings, DiscoveryService discoveryService) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
@@ -85,6 +86,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem
this.discoveryNodeService = discoveryNodeService;
this.version = version;
this.discoverySettings = discoverySettings;
+ this.discoveryService = discoveryService;
}
@Override
@@ -123,7 +125,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem
// we are the first master (and the master)
master = true;
final LocalDiscovery master = firstMaster;
- clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
+ clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
@@ -132,7 +134,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
// remove the NO_MASTER block in this case
- ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(Discovery.NO_MASTER_BLOCK);
+ ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock());
return ClusterState.builder(currentState).nodes(nodesBuilder).blocks(blocks).build();
}
@@ -149,7 +151,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem
} else if (firstMaster != null) {
// update as fast as we can the local node state with the new metadata (so we create indices for example)
final ClusterState masterState = firstMaster.clusterService.state();
- clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateUpdateTask() {
+ clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// make sure we have the local node id set, we might need it as a result of the new metadata
@@ -165,7 +167,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem
// tell the master to send the fact that we are here
final LocalDiscovery master = firstMaster;
- firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateUpdateTask() {
+ firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
@@ -225,7 +227,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem
}
final LocalDiscovery master = firstMaster;
- master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateUpdateTask() {
+ master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id());
@@ -305,13 +307,22 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
- discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() {
+ assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master";
+ assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";
+
+ discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
return currentState;
}
+ if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
+ // its a fresh update from the master as we transition from a start of not having a master to having one
+ logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId());
+ return nodeSpecificClusterState;
+ }
+
ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState);
// if the routing table did not change, use the original one
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {
diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
index b1149cbbf55..d7c8c0ccafc 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -22,9 +22,7 @@ package org.elasticsearch.discovery.zen;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ElasticsearchIllegalStateException;
-import org.elasticsearch.Version;
+import org.elasticsearch.*;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -32,10 +30,10 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
@@ -45,6 +43,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
@@ -56,19 +55,20 @@ import org.elasticsearch.discovery.zen.membership.MembershipAction;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
-import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
@@ -78,6 +78,16 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
*/
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, DiscoveryNodesProvider {
+ public final static String SETTING_REJOIN_ON_MASTER_GONE = "discovery.zen.rejoin_on_master_gone";
+ public final static String SETTING_PING_TIMEOUT = "discovery.zen.ping.timeout";
+ public final static String SETTING_JOIN_TIMEOUT = "discovery.zen.join_timeout";
+ public final static String SETTING_JOIN_RETRY_ATTEMPTS = "discovery.zen.join_retry_attempts";
+ public final static String SETTING_JOIN_RETRY_DELAY = "discovery.zen.join_retry_delay";
+ public final static String SETTING_MAX_PINGS_FROM_ANOTHER_MASTER = "discovery.zen.max_pings_from_another_master";
+ public final static String SETTING_SEND_LEAVE_REQUEST = "discovery.zen.send_leave_request";
+ public final static String SETTING_MASTER_ELECTION_FILTER_CLIENT = "discovery.zen.master_election.filter_client";
+ public final static String SETTING_MASTER_ELECTION_FILTER_DATA = "discovery.zen.master_election.filter_data";
+
public static final String DISCOVERY_REJOIN_ACTION_NAME = "internal:discovery/zen/rejoin";
private final ThreadPool threadPool;
@@ -86,6 +96,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoveryNodeService discoveryNodeService;
+ private final DiscoverySettings discoverySettings;
private final ZenPingService pingService;
private final MasterFaultDetection masterFD;
private final NodesFaultDetection nodesFD;
@@ -97,6 +108,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
private final TimeValue pingTimeout;
private final TimeValue joinTimeout;
+ /** how many retry attempts to perform if join request failed with an retriable error */
+ private final int joinRetryAttempts;
+ /** how long to wait before performing another join attempt after a join request failed with an retriable error */
+ private final TimeValue joinRetryDelay;
+
+ /** how many pings from *another* master to tolerate before forcing a rejoin on other or local master */
+ private final int maxPingsFromAnotherMaster;
+
// a flag that should be used only for testing
private final boolean sendLeaveRequest;
@@ -118,41 +137,61 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
private final AtomicBoolean initialStateSent = new AtomicBoolean();
+ private volatile boolean rejoinOnMasterGone;
@Nullable
private NodeService nodeService;
+ private final BlockingQueue> processJoinRequests = ConcurrentCollections.newBlockingQueue();
+
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
- DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) {
+ DiscoveryNodeService discoveryNodeService, ZenPingService pingService, ElectMasterService electMasterService, Version version,
+ DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
+ this.discoverySettings = discoverySettings;
this.pingService = pingService;
this.version = version;
+ this.electMaster = electMasterService;
- // also support direct discovery.zen settings, for cases when it gets extended
- this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
- this.joinTimeout = settings.getAsTime("discovery.zen.join_timeout", TimeValue.timeValueMillis(pingTimeout.millis() * 20));
- this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);
+ // keep using componentSettings for BWC, in case this class gets extended.
+ TimeValue pingTimeout = componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3));
+ pingTimeout = componentSettings.getAsTime("ping_timeout", pingTimeout);
+ pingTimeout = settings.getAsTime("discovery.zen.ping_timeout", pingTimeout);
+ this.pingTimeout = settings.getAsTime(SETTING_PING_TIMEOUT, pingTimeout);
- this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
- this.masterElectionFilterDataNodes = settings.getAsBoolean("discovery.zen.master_election.filter_data", false);
+ this.joinTimeout = settings.getAsTime(SETTING_JOIN_TIMEOUT, TimeValue.timeValueMillis(pingTimeout.millis() * 20));
+ this.joinRetryAttempts = settings.getAsInt(SETTING_JOIN_RETRY_ATTEMPTS, 3);
+ this.joinRetryDelay = settings.getAsTime(SETTING_JOIN_RETRY_DELAY, TimeValue.timeValueMillis(100));
+ this.maxPingsFromAnotherMaster = settings.getAsInt(SETTING_MAX_PINGS_FROM_ANOTHER_MASTER, 3);
+ this.sendLeaveRequest = settings.getAsBoolean(SETTING_SEND_LEAVE_REQUEST, true);
+
+ this.masterElectionFilterClientNodes = settings.getAsBoolean(SETTING_MASTER_ELECTION_FILTER_CLIENT, true);
+ this.masterElectionFilterDataNodes = settings.getAsBoolean(SETTING_MASTER_ELECTION_FILTER_DATA, false);
+ this.rejoinOnMasterGone = settings.getAsBoolean(SETTING_REJOIN_ON_MASTER_GONE, true);
+
+ if (this.joinRetryAttempts < 1) {
+ throw new ElasticsearchIllegalArgumentException("'" + SETTING_JOIN_RETRY_ATTEMPTS + "' must be a positive number. got [" + this.SETTING_JOIN_RETRY_ATTEMPTS + "]");
+ }
+ if (this.maxPingsFromAnotherMaster < 1) {
+ throw new ElasticsearchIllegalArgumentException("'" + SETTING_MAX_PINGS_FROM_ANOTHER_MASTER + "' must be a positive number. got [" + this.maxPingsFromAnotherMaster + "]");
+ }
logger.debug("using ping.timeout [{}], join.timeout [{}], master_election.filter_client [{}], master_election.filter_data [{}]", pingTimeout, joinTimeout, masterElectionFilterClientNodes, masterElectionFilterDataNodes);
- this.electMaster = new ElectMasterService(settings);
nodeSettingsService.addListener(new ApplySettings());
- this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
+ this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this, clusterName);
this.masterFD.addListener(new MasterNodeFailureListener());
- this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
- this.nodesFD.addListener(new NodeFailureListener());
+ this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
+ this.nodesFD.addListener(new NodeFaultDetectionListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName);
this.pingService.setNodesProvider(this);
@@ -178,7 +217,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
final String nodeId = DiscoveryService.generateNodeId(settings);
localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version);
latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
- nodesFD.updateNodes(latestDiscoNodes);
+ nodesFD.updateNodes(latestDiscoNodes, ClusterState.UNKNOWN_VERSION);
pingService.start();
// do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
@@ -272,7 +311,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
throw new ElasticsearchIllegalStateException("Shouldn't publish state when not master");
}
latestDiscoNodes = clusterState.nodes();
- nodesFD.updateNodes(clusterState.nodes());
+ nodesFD.updateNodes(clusterState.nodes(), clusterState.version());
publishClusterState.publish(clusterState, ackListener);
}
@@ -295,6 +334,15 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
});
}
+
+ /**
+ * returns true if there is a currently a background thread active for (re)joining the cluster
+ * used for testing.
+ */
+ public boolean joiningCluster() {
+ return currentJoinThread != null;
+ }
+
private void innerJoinCluster() {
boolean retry = true;
while (retry) {
@@ -311,18 +359,24 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
if (localNode.equals(masterNode)) {
this.master = true;
nodesFD.start(); // start the nodes FD
- clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
+ clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
- DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
+ // Take into account the previous known nodes, if they happen not to be available
+ // then fault detection will remove these nodes.
+ DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(latestDiscoNodes)
.localNodeId(localNode.id())
.masterNodeId(localNode.id())
// put our local node
.put(localNode);
// update the fact that we are the master...
latestDiscoNodes = builder.build();
- ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build();
- return ClusterState.builder(currentState).nodes(latestDiscoNodes).blocks(clusterBlocks).build();
+ ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
+ currentState = ClusterState.builder(currentState).nodes(latestDiscoNodes).blocks(clusterBlocks).build();
+
+ // eagerly run reroute to remove dead nodes from routing table
+ RoutingAllocation.Result result = allocationService.reroute(currentState);
+ return ClusterState.builder(currentState).routingResult(result).build();
}
@Override
@@ -337,30 +391,18 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
});
} else {
this.master = false;
- try {
- // first, make sure we can connect to the master
- transportService.connectToNode(masterNode);
- } catch (Exception e) {
- logger.warn("failed to connect to master [{}], retrying...", e, masterNode);
- retry = true;
- continue;
- }
// send join request
- try {
- membership.sendJoinRequestBlocking(masterNode, localNode, joinTimeout);
- } catch (Exception e) {
- if (e instanceof ElasticsearchException) {
- logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticsearchException) e).getDetailedMessage());
- } else {
- logger.info("failed to send join request to master [{}], reason [{}]", masterNode, e.getMessage());
- }
- if (logger.isTraceEnabled()) {
- logger.trace("detailed failed reason", e);
- }
- // failed to send the join request, retry
+ retry = !joinElectedMaster(masterNode);
+ if (retry) {
+ continue;
+ }
+
+ if (latestDiscoNodes.masterNode() == null) {
+ logger.debug("no master node is set, despite of join request completing. retrying pings");
retry = true;
continue;
}
+
masterFD.start(masterNode, "initial_join");
// no need to submit the received cluster state, we will get it from the master when it publishes
// the fact that we joined
@@ -368,6 +410,52 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
}
}
+ /**
+ * Join a newly elected master.
+ *
+ * @return true if successful
+ */
+ private boolean joinElectedMaster(DiscoveryNode masterNode) {
+ try {
+ // first, make sure we can connect to the master
+ transportService.connectToNode(masterNode);
+ } catch (Exception e) {
+ logger.warn("failed to connect to master [{}], retrying...", e, masterNode);
+ return false;
+ }
+ int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
+ while (true) {
+ try {
+ logger.trace("joining master {}", masterNode);
+ membership.sendJoinRequestBlocking(masterNode, localNode, joinTimeout);
+ return true;
+ } catch (Throwable t) {
+ Throwable unwrap = ExceptionsHelper.unwrapCause(t);
+ if (unwrap instanceof ElasticsearchIllegalStateException) {
+ if (++joinAttempt == this.joinRetryAttempts) {
+ logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(t), joinAttempt);
+ return false;
+ } else {
+ logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(t), joinAttempt);
+ }
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("failed to send join request to master [{}]", t, masterNode);
+ } else {
+ logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(t));
+ }
+ return false;
+ }
+ }
+
+ try {
+ Thread.sleep(this.joinRetryDelay.millis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
private void handleLeaveRequest(final DiscoveryNode node) {
if (lifecycleState() != Lifecycle.State.STARTED) {
// not started, ignore a node failure
@@ -389,6 +477,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
+ @Override
+ public void onNoLongerMaster(String source) {
+ // ignoring (already logged)
+ }
+
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
@@ -424,6 +517,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
+ @Override
+ public void onNoLongerMaster(String source) {
+ // already logged
+ }
+
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
@@ -457,6 +555,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
return currentState;
}
+
+ @Override
+ public void onNoLongerMaster(String source) {
+ // ignoring (already logged)
+ }
+
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
@@ -481,7 +585,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
logger.info("master_left [{}], reason [{}]", masterNode, reason);
- clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
+ clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
@@ -493,6 +597,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
// make sure the old master node, which has failed, is not part of the nodes we publish
.remove(masterNode.id())
.masterNodeId(null).build();
+ latestDiscoNodes = discoveryNodes;
+
+ // flush any pending cluster states from old master, so it will not be set as master again
+ ArrayList pendingNewClusterStates = new ArrayList<>();
+ processNewClusterStates.drainTo(pendingNewClusterStates);
+ logger.trace("removed [{}] pending cluster states", pendingNewClusterStates.size());
+
+ if (rejoinOnMasterGone) {
+ return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master left (reason = " + reason + ")");
+ }
if (!electMaster.hasEnoughMasterNodes(discoveryNodes)) {
return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "not enough master nodes after master left (reason = " + reason + ")");
@@ -561,29 +675,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
- if (newState.version() > currentState.version()) {
- logger.warn("received cluster state from [{}] which is also master but with a newer cluster_state, rejoining to cluster...", newState.nodes().masterNode());
- return rejoin(currentState, "zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]");
- } else {
- logger.warn("received cluster state from [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster", newState.nodes().masterNode(), newState.nodes().masterNode());
-
- try {
- // make sure we're connected to this node (connect to node does nothing if we're already connected)
- // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node
- // in the past (after a master failure, for example)
- transportService.connectToNode(newState.nodes().masterNode());
- transportService.sendRequest(newState.nodes().masterNode(), DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(currentState.nodes().localNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
- @Override
- public void handleException(TransportException exp) {
- logger.warn("failed to send rejoin request to [{}]", exp, newState.nodes().masterNode());
- }
- });
- } catch (Exception e) {
- logger.warn("failed to send rejoin request to [{}]", e, newState.nodes().masterNode());
- }
-
- return currentState;
- }
+ return handleAnotherMaster(currentState, newState.nodes().masterNode(), newState.version(), "via a new cluster state");
}
@Override
@@ -610,7 +702,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
processNewClusterStates.add(processClusterState);
- clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
+
+ assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
+ assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";
+
+ clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// we already processed it in a previous event
@@ -642,6 +738,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
// we are going to use it for sure, poll (remove) it
potentialState = processNewClusterStates.poll();
+ if (potentialState == null) {
+ // might happen if the queue is drained
+ break;
+ }
+
potentialState.processed = true;
if (potentialState.clusterState.version() > stateToProcess.clusterState.version()) {
@@ -670,7 +771,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
}
+ if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
+ // its a fresh update from the master as we transition from a start of not having a master to having one
+ logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId());
+ return updatedState;
+ }
+
+
+ // some optimizations to make sure we keep old objects where possible
ClusterState.Builder builder = ClusterState.builder(updatedState);
+
// if the routing table did not change, use the original one
if (updatedState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
@@ -726,37 +836,75 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
// validate the join request, will throw a failure if it fails, which will get back to the
// node calling the join request
membership.sendValidateJoinRequestBlocking(node, joinTimeout);
-
+ processJoinRequests.add(new Tuple<>(node, callback));
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
+
+ private final List> drainedTasks = new ArrayList<>();
+
@Override
public ClusterState execute(ClusterState currentState) {
- if (currentState.nodes().nodeExists(node.id())) {
- // the node already exists in the cluster
- logger.info("received a join request for an existing node [{}]", node);
- // still send a new cluster state, so it will be re published and possibly update the other node
- return ClusterState.builder(currentState).build();
+ processJoinRequests.drainTo(drainedTasks);
+ if (drainedTasks.isEmpty()) {
+ return currentState;
}
- DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes());
- for (DiscoveryNode existingNode : currentState.nodes()) {
- if (node.address().equals(existingNode.address())) {
- builder.remove(existingNode.id());
- logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
+
+ boolean modified = false;
+ DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes());
+ for (Tuple task : drainedTasks) {
+ DiscoveryNode node = task.v1();
+ if (currentState.nodes().nodeExists(node.id())) {
+ logger.debug("received a join request for an existing node [{}]", node);
+ } else {
+ modified = true;
+ nodesBuilder.put(node);
+ for (DiscoveryNode existingNode : currentState.nodes()) {
+ if (node.address().equals(existingNode.address())) {
+ nodesBuilder.remove(existingNode.id());
+ logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
+ }
+ }
+ }
+ }
+
+ ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
+ if (modified) {
+ latestDiscoNodes = nodesBuilder.build();
+ stateBuilder.nodes(latestDiscoNodes);
+ }
+ return stateBuilder.build();
+ }
+
+ @Override
+ public void onNoLongerMaster(String source) {
+ Exception e = new EsRejectedExecutionException("no longer master. source: [" + source + "]");
+ innerOnFailure(e);
+ }
+
+ void innerOnFailure(Throwable t) {
+ for (Tuple drainedTask : drainedTasks) {
+ try {
+ drainedTask.v2().onFailure(t);
+ } catch (Exception e) {
+ logger.error("error during task failure", e);
}
}
- latestDiscoNodes = builder.build();
- // add the new node now (will update latestDiscoNodes on publish)
- return ClusterState.builder(currentState).nodes(latestDiscoNodes.newNode(node)).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
- callback.onFailure(t);
+ innerOnFailure(t);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
- callback.onSuccess();
+ for (Tuple drainedTask : drainedTasks) {
+ try {
+ drainedTask.v2().onSuccess();
+ } catch (Exception e) {
+ logger.error("unexpected error during [{}]", e, source);
+ }
+ }
}
});
}
@@ -807,35 +955,36 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
List pingMasters = newArrayList();
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.master() != null) {
- pingMasters.add(pingResponse.master());
+ // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
+ // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
+ if (!localNode.equals(pingResponse.master())) {
+ pingMasters.add(pingResponse.master());
+ }
}
}
Set possibleMasterNodes = Sets.newHashSet();
- possibleMasterNodes.add(localNode);
+ if (localNode.masterNode()) {
+ possibleMasterNodes.add(localNode);
+ }
for (ZenPing.PingResponse pingResponse : pingResponses) {
possibleMasterNodes.add(pingResponse.target());
}
- // if we don't have enough master nodes, we bail, even if we get a response that indicates
- // there is a master by other node, we don't see enough...
- if (!electMaster.hasEnoughMasterNodes(possibleMasterNodes)) {
- logger.trace("not enough master nodes [{}]", possibleMasterNodes);
- return null;
- }
if (pingMasters.isEmpty()) {
- // lets tie break between discovered nodes
- DiscoveryNode electedMaster = electMaster.electMaster(possibleMasterNodes);
- if (localNode.equals(electedMaster)) {
- return localNode;
+ // if we don't have enough master nodes, we bail, because there are not enough master to elect from
+ if (electMaster.hasEnoughMasterNodes(possibleMasterNodes)) {
+ return electMaster.electMaster(possibleMasterNodes);
+ } else {
+ logger.trace("not enough master nodes [{}]", possibleMasterNodes);
+ return null;
}
} else {
- DiscoveryNode electedMaster = electMaster.electMaster(pingMasters);
- if (electedMaster != null) {
- return electedMaster;
- }
+
+ assert !pingMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
+ // lets tie break between discovered nodes
+ return electMaster.electMaster(pingMasters);
}
- return null;
}
private ClusterState rejoin(ClusterState clusterState, String reason) {
@@ -845,28 +994,45 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
master = false;
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
- .addGlobalBlock(NO_MASTER_BLOCK)
- .addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)
+ .addGlobalBlock(discoverySettings.getNoMasterBlock())
.build();
- // clear the routing table, we have no master, so we need to recreate the routing when we reform the cluster
- RoutingTable routingTable = RoutingTable.builder().build();
- // we also clean the metadata, since we are going to recover it if we become master
- MetaData metaData = MetaData.builder().build();
-
// clean the nodes, we are now not connected to anybody, since we try and reform the cluster
- latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
+ latestDiscoNodes = new DiscoveryNodes.Builder(latestDiscoNodes).masterNodeId(null).build();
asyncJoinCluster();
return ClusterState.builder(clusterState)
.blocks(clusterBlocks)
.nodes(latestDiscoNodes)
- .routingTable(routingTable)
- .metaData(metaData)
.build();
}
+ private ClusterState handleAnotherMaster(ClusterState localClusterState, final DiscoveryNode otherMaster, long otherClusterStateVersion, String reason) {
+ assert master : "handleAnotherMaster called but current node is not a master";
+ if (otherClusterStateVersion > localClusterState.version()) {
+ return rejoin(localClusterState, "zen-disco-discovered another master with a new cluster_state [" + otherMaster + "][" + reason + "]");
+ } else {
+ logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", otherMaster, otherMaster, reason);
+ try {
+ // make sure we're connected to this node (connect to node does nothing if we're already connected)
+ // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node
+ // in the past (after a master failure, for example)
+ transportService.connectToNode(otherMaster);
+ transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().localNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
+
+ @Override
+ public void handleException(TransportException exp) {
+ logger.warn("failed to send rejoin request to [{}]", exp, otherMaster);
+ }
+ });
+ } catch (Exception e) {
+ logger.warn("failed to send rejoin request to [{}]", e, otherMaster);
+ }
+ return localClusterState;
+ }
+ }
+
private void sendInitialStateEventIfNeeded() {
if (initialStateSent.compareAndSet(false, true)) {
for (InitialStateDiscoveryListener listener : initialStateListeners) {
@@ -895,12 +1061,48 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
}
}
- private class NodeFailureListener implements NodesFaultDetection.Listener {
+ private class NodeFaultDetectionListener extends NodesFaultDetection.Listener {
+
+ private final AtomicInteger pingsWhileMaster = new AtomicInteger(0);
@Override
public void onNodeFailure(DiscoveryNode node, String reason) {
handleNodeFailure(node, reason);
}
+
+ @Override
+ public void onPingReceived(final NodesFaultDetection.PingRequest pingRequest) {
+ // if we are master, we don't expect any fault detection from another node. If we get it
+ // means we potentially have two masters in the cluster.
+ if (!master) {
+ pingsWhileMaster.set(0);
+ return;
+ }
+
+ // nodes pre 1.4.0 do not send this information
+ if (pingRequest.masterNode() == null) {
+ return;
+ }
+
+ if (pingsWhileMaster.incrementAndGet() < maxPingsFromAnotherMaster) {
+ logger.trace("got a ping from another master {}. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
+ return;
+ }
+ logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
+ clusterService.submitStateUpdateTask("ping from another master", Priority.URGENT, new ClusterStateUpdateTask() {
+
+ @Override
+ public ClusterState execute(ClusterState currentState) throws Exception {
+ pingsWhileMaster.set(0);
+ return handleAnotherMaster(currentState, pingRequest.masterNode(), pingRequest.clusterStateVersion(), "node fd ping");
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ logger.debug("unexpected error during cluster state update task after pings from another master", t);
+ }
+ });
+ }
}
private class MasterNodeFailureListener implements MasterFaultDetection.Listener {
@@ -922,6 +1124,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
}
}
+ boolean isRejoinOnMasterGone() {
+ return rejoinOnMasterGone;
+ }
+
static class RejoinClusterRequest extends TransportRequest {
private String fromNodeId;
@@ -955,7 +1161,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
@Override
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
- clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateUpdateTask() {
+ clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
@@ -966,6 +1172,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
return rejoin(currentState, "received a request to rejoin the cluster from [" + request.fromNodeId + "]");
}
+ @Override
+ public void onNoLongerMaster(String source) {
+ // already logged
+ }
+
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
@@ -989,6 +1200,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
ZenDiscovery.this.electMaster.minimumMasterNodes(), minimumMasterNodes);
handleMinimumMasterNodesChanged(minimumMasterNodes);
}
+
+ boolean rejoinOnMasterGone = settings.getAsBoolean(SETTING_REJOIN_ON_MASTER_GONE, ZenDiscovery.this.rejoinOnMasterGone);
+ if (rejoinOnMasterGone != ZenDiscovery.this.rejoinOnMasterGone) {
+ logger.info("updating {} from [{}] to [{}]", SETTING_REJOIN_ON_MASTER_GONE, ZenDiscovery.this.rejoinOnMasterGone, rejoinOnMasterGone);
+ ZenDiscovery.this.rejoinOnMasterGone = rejoinOnMasterGone;
+ }
}
}
}
diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java
index e67c4e2af39..33987662bfa 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
@@ -44,6 +45,7 @@ public class ZenDiscoveryModule extends AbstractModule {
@Override
protected void configure() {
+ bind(ElectMasterService.class).asEagerSingleton();
bind(ZenPingService.class).asEagerSingleton();
Multibinder unicastHostsProviderMultibinder = Multibinder.newSetBinder(binder(), UnicastHostsProvider.class);
for (Class extends UnicastHostsProvider> unicastHostProvider : unicastHostProviders) {
diff --git a/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java b/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java
index bcfa1dc2f02..9ba26387ec5 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/elect/ElectMasterService.java
@@ -24,12 +24,10 @@ import com.google.common.collect.Lists;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
/**
*
@@ -42,6 +40,7 @@ public class ElectMasterService extends AbstractComponent {
private volatile int minimumMasterNodes;
+ @Inject
public ElectMasterService(Settings settings) {
super(settings);
this.minimumMasterNodes = settings.getAsInt(DISCOVERY_ZEN_MINIMUM_MASTER_NODES, -1);
@@ -69,6 +68,18 @@ public class ElectMasterService extends AbstractComponent {
return count >= minimumMasterNodes;
}
+ /**
+ * Returns the given nodes sorted by likelyhood of being elected as master, most likely first.
+ * Non-master nodes are not removed but are rather put in the end
+ * @param nodes
+ * @return
+ */
+ public List sortByMasterLikelihood(Iterable nodes) {
+ ArrayList sortedNodes = Lists.newArrayList(nodes);
+ CollectionUtil.introSort(sortedNodes, nodeComparator);
+ return sortedNodes;
+ }
+
/**
* Returns a list of the next possible masters.
*/
@@ -120,6 +131,12 @@ public class ElectMasterService extends AbstractComponent {
@Override
public int compare(DiscoveryNode o1, DiscoveryNode o2) {
+ if (o1.masterNode() && !o2.masterNode()) {
+ return -1;
+ }
+ if (!o1.masterNode() && o2.masterNode()) {
+ return 1;
+ }
return o1.id().compareTo(o2.id());
}
}
diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/FaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/FaultDetection.java
new file mode 100644
index 00000000000..d3e644f2166
--- /dev/null
+++ b/src/main/java/org/elasticsearch/discovery/zen/fd/FaultDetection.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.discovery.zen.fd;
+
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportConnectionListener;
+import org.elasticsearch.transport.TransportService;
+
+import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
+
+/**
+ * A base class for {@link org.elasticsearch.discovery.zen.fd.MasterFaultDetection} & {@link org.elasticsearch.discovery.zen.fd.NodesFaultDetection},
+ * making sure both use the same setting.
+ */
+public abstract class FaultDetection extends AbstractComponent {
+
+ public static final String SETTING_CONNECT_ON_NETWORK_DISCONNECT = "discovery.zen.fd.connect_on_network_disconnect";
+ public static final String SETTING_PING_INTERVAL = "discovery.zen.fd.ping_interval";
+ public static final String SETTING_PING_TIMEOUT = "discovery.zen.fd.ping_timeout";
+ public static final String SETTING_PING_RETRIES = "discovery.zen.fd.ping_retries";
+ public static final String SETTING_REGISTER_CONNECTION_LISTENER = "discovery.zen.fd.register_connection_listener";
+
+ protected final ThreadPool threadPool;
+ protected final ClusterName clusterName;
+ protected final TransportService transportService;
+
+ // used mainly for testing, should always be true
+ protected final boolean registerConnectionListener;
+ protected final FDConnectionListener connectionListener;
+ protected final boolean connectOnNetworkDisconnect;
+
+ protected final TimeValue pingInterval;
+ protected final TimeValue pingRetryTimeout;
+ protected final int pingRetryCount;
+
+ public FaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
+ super(settings);
+ this.threadPool = threadPool;
+ this.transportService = transportService;
+ this.clusterName = clusterName;
+
+ this.connectOnNetworkDisconnect = settings.getAsBoolean(SETTING_CONNECT_ON_NETWORK_DISCONNECT, false);
+ this.pingInterval = settings.getAsTime(SETTING_PING_INTERVAL, timeValueSeconds(1));
+ this.pingRetryTimeout = settings.getAsTime(SETTING_PING_TIMEOUT, timeValueSeconds(30));
+ this.pingRetryCount = settings.getAsInt(SETTING_PING_RETRIES, 3);
+ this.registerConnectionListener = settings.getAsBoolean(SETTING_REGISTER_CONNECTION_LISTENER, true);
+
+ this.connectionListener = new FDConnectionListener();
+ if (registerConnectionListener) {
+ transportService.addConnectionListener(connectionListener);
+ }
+ }
+
+ public void close() {
+ transportService.removeConnectionListener(connectionListener);
+ }
+
+ /**
+ * This method will be called when the {@link org.elasticsearch.transport.TransportService} raised a node disconnected event
+ */
+ abstract void handleTransportDisconnect(DiscoveryNode node);
+
+ private class FDConnectionListener implements TransportConnectionListener {
+ @Override
+ public void onNodeConnected(DiscoveryNode node) {
+ }
+
+ @Override
+ public void onNodeDisconnected(DiscoveryNode node) {
+ handleTransportDisconnect(node);
+ }
+ }
+
+}
diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java
index 26fd2b00e94..49709b7905b 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java
@@ -20,9 +20,10 @@
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@@ -35,13 +36,12 @@ import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.transport.TransportRequestOptions.options;
/**
* A fault detection that pings the master periodically to see if its alive.
*/
-public class MasterFaultDetection extends AbstractComponent {
+public class MasterFaultDetection extends FaultDetection {
public static final String MASTER_PING_ACTION_NAME = "internal:discovery/zen/fd/master_ping";
@@ -52,29 +52,10 @@ public class MasterFaultDetection extends AbstractComponent {
void onDisconnectedFromMaster();
}
- private final ThreadPool threadPool;
-
- private final TransportService transportService;
-
private final DiscoveryNodesProvider nodesProvider;
private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>();
-
- private final boolean connectOnNetworkDisconnect;
-
- private final TimeValue pingInterval;
-
- private final TimeValue pingRetryTimeout;
-
- private final int pingRetryCount;
-
- // used mainly for testing, should always be true
- private final boolean registerConnectionListener;
-
-
- private final FDConnectionListener connectionListener;
-
private volatile MasterPinger masterPinger;
private final Object masterNodeMutex = new Object();
@@ -85,25 +66,13 @@ public class MasterFaultDetection extends AbstractComponent {
private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();
- public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, DiscoveryNodesProvider nodesProvider) {
- super(settings);
- this.threadPool = threadPool;
- this.transportService = transportService;
+ public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
+ DiscoveryNodesProvider nodesProvider, ClusterName clusterName) {
+ super(settings, threadPool, transportService, clusterName);
this.nodesProvider = nodesProvider;
- this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
- this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
- this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
- this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
- this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);
-
logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
- this.connectionListener = new FDConnectionListener();
- if (registerConnectionListener) {
- transportService.addConnectionListener(connectionListener);
- }
-
transportService.registerHandler(MASTER_PING_ACTION_NAME, new MasterPingRequestHandler());
}
@@ -155,7 +124,8 @@ public class MasterFaultDetection extends AbstractComponent {
masterPinger.stop();
}
this.masterPinger = new MasterPinger();
- // start the ping process
+
+ // we start pinging slightly later to allow the chosen master to complete it's own master election
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
}
@@ -181,13 +151,14 @@ public class MasterFaultDetection extends AbstractComponent {
}
public void close() {
+ super.close();
stop("closing");
this.listeners.clear();
- transportService.removeConnectionListener(connectionListener);
transportService.removeHandler(MASTER_PING_ACTION_NAME);
}
- private void handleTransportDisconnect(DiscoveryNode node) {
+ @Override
+ protected void handleTransportDisconnect(DiscoveryNode node) {
synchronized (masterNodeMutex) {
if (!node.equals(this.masterNode)) {
return;
@@ -200,7 +171,8 @@ public class MasterFaultDetection extends AbstractComponent {
masterPinger.stop();
}
this.masterPinger = new MasterPinger();
- threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
+ // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
+ threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
} catch (Exception e) {
logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
@@ -237,17 +209,6 @@ public class MasterFaultDetection extends AbstractComponent {
}
}
- private class FDConnectionListener implements TransportConnectionListener {
- @Override
- public void onNodeConnected(DiscoveryNode node) {
- }
-
- @Override
- public void onNodeDisconnected(DiscoveryNode node) {
- handleTransportDisconnect(node);
- }
- }
-
private class MasterPinger implements Runnable {
private volatile boolean running = true;
@@ -268,8 +229,10 @@ public class MasterFaultDetection extends AbstractComponent {
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
return;
}
- transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout),
- new BaseTransportResponseHandler() {
+ final MasterPingRequest request = new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id(), clusterName);
+ final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
+ transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler() {
+
@Override
public MasterPingResponseResponse newInstance() {
return new MasterPingResponseResponse();
@@ -326,7 +289,7 @@ public class MasterFaultDetection extends AbstractComponent {
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else {
// resend the request, not reschedule, rely on send timeout
- transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this);
+ transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
}
}
}
@@ -349,6 +312,14 @@ public class MasterFaultDetection extends AbstractComponent {
}
static class NotMasterException extends ElasticsearchIllegalStateException {
+
+ NotMasterException(String msg) {
+ super(msg);
+ }
+
+ NotMasterException() {
+ }
+
@Override
public Throwable fillInStackTrace() {
return null;
@@ -377,6 +348,13 @@ public class MasterFaultDetection extends AbstractComponent {
if (!request.masterNodeId.equals(nodes.localNodeId())) {
throw new NotMasterException();
}
+
+ // ping from nodes of version < 1.4.0 will have the clustername set to null
+ if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
+ logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]", request.clusterName, clusterName);
+ throw new NotMasterException("master fault detection ping request is targeted for a different [" + request.clusterName + "] cluster then us [" + clusterName + "]");
+ }
+
// if we are no longer master, fail...
if (!nodes.localNodeMaster()) {
throw new NoLongerMasterException();
@@ -400,13 +378,15 @@ public class MasterFaultDetection extends AbstractComponent {
private String nodeId;
private String masterNodeId;
+ private ClusterName clusterName;
private MasterPingRequest() {
}
- private MasterPingRequest(String nodeId, String masterNodeId) {
+ private MasterPingRequest(String nodeId, String masterNodeId, ClusterName clusterName) {
this.nodeId = nodeId;
this.masterNodeId = masterNodeId;
+ this.clusterName = clusterName;
}
@Override
@@ -414,6 +394,9 @@ public class MasterFaultDetection extends AbstractComponent {
super.readFrom(in);
nodeId = in.readString();
masterNodeId = in.readString();
+ if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
+ clusterName = ClusterName.readClusterName(in);
+ }
}
@Override
@@ -421,6 +404,9 @@ public class MasterFaultDetection extends AbstractComponent {
super.writeTo(out);
out.writeString(nodeId);
out.writeString(masterNodeId);
+ if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
+ clusterName.writeTo(out);
+ }
}
}
diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
index 6f4e403610c..90012099116 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java
@@ -20,9 +20,11 @@
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@@ -35,68 +37,40 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.node.DiscoveryNodes.EMPTY_NODES;
-import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.transport.TransportRequestOptions.options;
/**
* A fault detection of multiple nodes.
*/
-public class NodesFaultDetection extends AbstractComponent {
+public class NodesFaultDetection extends FaultDetection {
public static final String PING_ACTION_NAME = "internal:discovery/zen/fd/ping";
+
+ public abstract static class Listener {
- public static interface Listener {
+ public void onNodeFailure(DiscoveryNode node, String reason) {}
+
+ public void onPingReceived(PingRequest pingRequest) {}
- void onNodeFailure(DiscoveryNode node, String reason);
}
- private final ThreadPool threadPool;
-
- private final TransportService transportService;
-
-
- private final boolean connectOnNetworkDisconnect;
-
- private final TimeValue pingInterval;
-
- private final TimeValue pingRetryTimeout;
-
- private final int pingRetryCount;
-
- // used mainly for testing, should always be true
- private final boolean registerConnectionListener;
-
-
private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap nodesFD = newConcurrentMap();
- private final FDConnectionListener connectionListener;
-
private volatile DiscoveryNodes latestNodes = EMPTY_NODES;
+ private volatile long clusterStateVersion = ClusterState.UNKNOWN_VERSION;
+
private volatile boolean running = false;
- public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
- super(settings);
- this.threadPool = threadPool;
- this.transportService = transportService;
-
- this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", true);
- this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
- this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(30));
- this.pingRetryCount = componentSettings.getAsInt("ping_retries", 3);
- this.registerConnectionListener = componentSettings.getAsBoolean("register_connection_listener", true);
+ public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
+ super(settings, threadPool, transportService, clusterName);
logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
transportService.registerHandler(PING_ACTION_NAME, new PingRequestHandler());
-
- this.connectionListener = new FDConnectionListener();
- if (registerConnectionListener) {
- transportService.addConnectionListener(connectionListener);
- }
}
public void addListener(Listener listener) {
@@ -107,9 +81,10 @@ public class NodesFaultDetection extends AbstractComponent {
listeners.remove(listener);
}
- public void updateNodes(DiscoveryNodes nodes) {
+ public void updateNodes(DiscoveryNodes nodes, long clusterStateVersion) {
DiscoveryNodes prevNodes = latestNodes;
this.latestNodes = nodes;
+ this.clusterStateVersion = clusterStateVersion;
if (!running) {
return;
}
@@ -121,7 +96,8 @@ public class NodesFaultDetection extends AbstractComponent {
}
if (!nodesFD.containsKey(newNode)) {
nodesFD.put(newNode, new NodeFD());
- threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
+ // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
+ threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, new SendPingRequest(newNode));
}
}
for (DiscoveryNode removedNode : delta.removedNodes()) {
@@ -146,12 +122,13 @@ public class NodesFaultDetection extends AbstractComponent {
}
public void close() {
+ super.close();
stop();
transportService.removeHandler(PING_ACTION_NAME);
- transportService.removeConnectionListener(connectionListener);
}
- private void handleTransportDisconnect(DiscoveryNode node) {
+ @Override
+ protected void handleTransportDisconnect(DiscoveryNode node) {
if (!latestNodes.nodeExists(node.id())) {
return;
}
@@ -167,7 +144,8 @@ public class NodesFaultDetection extends AbstractComponent {
try {
transportService.connectToNode(node);
nodesFD.put(node, new NodeFD());
- threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(node));
+ // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
+ threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, new SendPingRequest(node));
} catch (Exception e) {
logger.trace("[node ] [{}] transport disconnected (with verified connect)", node);
notifyNodeFailure(node, "transport disconnected (with verified connect)");
@@ -189,6 +167,19 @@ public class NodesFaultDetection extends AbstractComponent {
});
}
+ private void notifyPingReceived(final PingRequest pingRequest) {
+ threadPool.generic().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ for (Listener listener : listeners) {
+ listener.onPingReceived(pingRequest);
+ }
+ }
+
+ });
+ }
+
private class SendPingRequest implements Runnable {
private final DiscoveryNode node;
@@ -202,8 +193,9 @@ public class NodesFaultDetection extends AbstractComponent {
if (!running) {
return;
}
- transportService.sendRequest(node, PING_ACTION_NAME, new PingRequest(node.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout),
- new BaseTransportResponseHandler() {
+ final PingRequest pingRequest = new PingRequest(node.id(), clusterName, latestNodes.localNode(), clusterStateVersion);
+ final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
+ transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new BaseTransportResponseHandler() {
@Override
public PingResponse newInstance() {
return new PingResponse();
@@ -250,8 +242,7 @@ public class NodesFaultDetection extends AbstractComponent {
}
} else {
// resend the request, not reschedule, rely on send timeout
- transportService.sendRequest(node, PING_ACTION_NAME, new PingRequest(node.id()),
- options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this);
+ transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, this);
}
}
}
@@ -270,18 +261,6 @@ public class NodesFaultDetection extends AbstractComponent {
volatile boolean running = true;
}
- private class FDConnectionListener implements TransportConnectionListener {
- @Override
- public void onNodeConnected(DiscoveryNode node) {
- }
-
- @Override
- public void onNodeDisconnected(DiscoveryNode node) {
- handleTransportDisconnect(node);
- }
- }
-
-
class PingRequestHandler extends BaseTransportRequestHandler {
@Override
@@ -296,6 +275,15 @@ public class NodesFaultDetection extends AbstractComponent {
if (!latestNodes.localNodeId().equals(request.nodeId)) {
throw new ElasticsearchIllegalStateException("Got pinged as node [" + request.nodeId + "], but I am node [" + latestNodes.localNodeId() + "]");
}
+
+ // PingRequest will have clusterName set to null if it came from a node of version <1.4.0
+ if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
+ // Don't introduce new exception for bwc reasons
+ throw new ElasticsearchIllegalStateException("Got pinged with cluster name [" + request.clusterName + "], but I'm part of cluster [" + clusterName + "]");
+ }
+
+ notifyPingReceived(request);
+
channel.sendResponse(new PingResponse());
}
@@ -306,28 +294,63 @@ public class NodesFaultDetection extends AbstractComponent {
}
- static class PingRequest extends TransportRequest {
+ public static class PingRequest extends TransportRequest {
// the (assumed) node id we are pinging
private String nodeId;
+ private ClusterName clusterName;
+
+ private DiscoveryNode masterNode;
+
+ private long clusterStateVersion = ClusterState.UNKNOWN_VERSION;
+
PingRequest() {
}
- PingRequest(String nodeId) {
+ PingRequest(String nodeId, ClusterName clusterName, DiscoveryNode masterNode, long clusterStateVersion) {
this.nodeId = nodeId;
+ this.clusterName = clusterName;
+ this.masterNode = masterNode;
+ this.clusterStateVersion = clusterStateVersion;
+ }
+
+ public String nodeId() {
+ return nodeId;
+ }
+
+ public ClusterName clusterName() {
+ return clusterName;
+ }
+
+ public DiscoveryNode masterNode() {
+ return masterNode;
+ }
+
+ public long clusterStateVersion() {
+ return clusterStateVersion;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
+ if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
+ clusterName = ClusterName.readClusterName(in);
+ masterNode = DiscoveryNode.readNode(in);
+ clusterStateVersion = in.readLong();
+ }
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
+ if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
+ clusterName.writeTo(out);
+ masterNode.writeTo(out);
+ out.writeLong(clusterStateVersion);
+ }
}
}
diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java
index 53ee9248eac..39f710f7acd 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java
@@ -34,6 +34,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
+import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
@@ -55,20 +56,20 @@ public class ZenPingService extends AbstractLifecycleComponent implemen
// here for backward comp. with discovery plugins
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
- @Nullable Set unicastHostsProviders) {
- this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, unicastHostsProviders);
+ ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) {
+ this(settings, threadPool, transportService, clusterName, networkService, Version.CURRENT, electMasterService, unicastHostsProviders);
}
@Inject
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
- Version version, @Nullable Set unicastHostsProviders) {
+ Version version, ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) {
super(settings);
ImmutableList.Builder zenPingsBuilder = ImmutableList.builder();
if (componentSettings.getAsBoolean("multicast.enabled", true)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version));
}
// always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast
- zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, unicastHostsProviders));
+ zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, electMasterService, unicastHostsProviders));
this.zenPings = zenPingsBuilder.build();
}
diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java
index 25a43ead8ef..123f2d7fc7f 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java
@@ -19,8 +19,12 @@
package org.elasticsearch.discovery.zen.ping.unicast;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
-import org.elasticsearch.*;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -35,6 +39,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
+import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@@ -62,10 +67,11 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
+ private final ElectMasterService electMasterService;
private final int concurrentConnects;
- private final DiscoveryNode[] nodes;
+ private final DiscoveryNode[] configuredTargetNodes;
private volatile DiscoveryNodesProvider nodesProvider;
@@ -73,16 +79,18 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen
private final Map> receivedResponses = newConcurrentMap();
- // a list of temporal responses a node will return for a request (holds requests from other nodes)
+ // a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes)
private final Queue temporalResponses = ConcurrentCollections.newQueue();
private final CopyOnWriteArrayList hostsProviders = new CopyOnWriteArrayList<>();
- public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version, @Nullable Set unicastHostsProviders) {
+ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName,
+ Version version, ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
+ this.electMasterService = electMasterService;
if (unicastHostsProviders != null) {
for (UnicastHostsProvider unicastHostsProvider : unicastHostsProviders) {
@@ -99,20 +107,20 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen
List hosts = Lists.newArrayList(hostArr);
logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects);
- List nodes = Lists.newArrayList();
+ List configuredTargetNodes = Lists.newArrayList();
int idCounter = 0;
for (String host : hosts) {
try {
TransportAddress[] addresses = transportService.addressesFromString(host);
// we only limit to 1 addresses, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < LIMIT_PORTS_COUNT); i++) {
- nodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", addresses[i], version.minimumCompatibilityVersion()));
+ configuredTargetNodes.add(new DiscoveryNode("#zen_unicast_" + (++idCounter) + "#", addresses[i], version.minimumCompatibilityVersion()));
}
} catch (Exception e) {
throw new ElasticsearchIllegalArgumentException("Failed to resolve address for [" + host + "]", e);
}
}
- this.nodes = nodes.toArray(new DiscoveryNode[nodes.size()]);
+ this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]);
transportService.registerHandler(ACTION_NAME, new UnicastPingRequestHandler());
}
@@ -143,6 +151,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen
this.nodesProvider = nodesProvider;
}
+ /**
+ * Clears the list of cached ping responses.
+ */
+ public void clearTemporalReponses() {
+ temporalResponses.clear();
+ }
+
public PingResponse[] pingAndWait(TimeValue timeout) {
final AtomicReference response = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
@@ -237,18 +252,30 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen
DiscoveryNodes discoNodes = nodesProvider.nodes();
pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName);
- HashSet nodesToPing = new HashSet<>(Arrays.asList(nodes));
+ HashSet nodesToPingSet = new HashSet<>();
for (PingResponse temporalResponse : temporalResponses) {
// Only send pings to nodes that have the same cluster name.
if (clusterName.equals(temporalResponse.clusterName())) {
- nodesToPing.add(temporalResponse.target());
+ nodesToPingSet.add(temporalResponse.target());
}
}
for (UnicastHostsProvider provider : hostsProviders) {
- nodesToPing.addAll(provider.buildDynamicNodes());
+ nodesToPingSet.addAll(provider.buildDynamicNodes());
}
+ // add all possible master nodes that were active in the last known cluster configuration
+ for (ObjectCursor masterNode : discoNodes.getMasterNodes().values()) {
+ nodesToPingSet.add(masterNode.value);
+ }
+
+ // sort the nodes by likelihood of being an active master
+ List sortedNodesToPing = electMasterService.sortByMasterLikelihood(nodesToPingSet);
+
+ // new add the the unicast targets first
+ ArrayList nodesToPing = Lists.newArrayList(configuredTargetNodes);
+ nodesToPing.addAll(sortedNodesToPing);
+
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
for (final DiscoveryNode node : nodesToPing) {
// make sure we are connected
diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java
index d716a336a05..1e46bbb0171 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java
@@ -40,6 +40,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
*
@@ -85,12 +86,15 @@ public class PublishClusterStateAction extends AbstractComponent {
publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size() - 1, ackListener));
}
- private void publish(ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) {
+ private void publish(final ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) {
DiscoveryNode localNode = nodesProvider.nodes().localNode();
Map serializedStates = Maps.newHashMap();
+ final AtomicBoolean timedOutWaitingForNodes = new AtomicBoolean(false);
+ final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
+
for (final DiscoveryNode node : clusterState.nodes()) {
if (node.equals(localNode)) {
continue;
@@ -125,28 +129,30 @@ public class PublishClusterStateAction extends AbstractComponent {
@Override
public void handleResponse(TransportResponse.Empty response) {
+ if (timedOutWaitingForNodes.get()) {
+ logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout);
+ }
publishResponseHandler.onResponse(node);
}
@Override
public void handleException(TransportException exp) {
- logger.debug("failed to send cluster state to [{}]", exp, node);
+ logger.debug("failed to send cluster state to {}", exp, node);
publishResponseHandler.onFailure(node, exp);
}
});
} catch (Throwable t) {
- logger.debug("error sending cluster state to [{}]", t, node);
+ logger.debug("error sending cluster state to {}", t, node);
publishResponseHandler.onFailure(node, t);
}
}
- TimeValue publishTimeout = discoverySettings.getPublishTimeout();
if (publishTimeout.millis() > 0) {
// only wait if the publish timeout is configured...
try {
- boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout);
- if (!awaited) {
- logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout);
+ timedOutWaitingForNodes.set(!publishResponseHandler.awaitAllNodes(publishTimeout));
+ if (timedOutWaitingForNodes.get()) {
+ logger.debug("timed out waiting for all nodes to process published state [{}] (timeout [{}])", clusterState.version(), publishTimeout);
}
} catch (InterruptedException e) {
// ignore & restore interrupt
diff --git a/src/main/java/org/elasticsearch/gateway/GatewayService.java b/src/main/java/org/elasticsearch/gateway/GatewayService.java
index 5f5eaa8e3e5..827a6559bf9 100644
--- a/src/main/java/org/elasticsearch/gateway/GatewayService.java
+++ b/src/main/java/org/elasticsearch/gateway/GatewayService.java
@@ -35,7 +35,6 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
@@ -134,12 +133,6 @@ public class GatewayService extends AbstractLifecycleComponent i
if (lifecycle.stoppedOrClosed()) {
return;
}
- if (event.state().blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
- // we need to clear those flags, since we might need to recover again in case we disconnect
- // from the cluster and then reconnect
- recovered.set(false);
- scheduledRecovery.set(false);
- }
if (event.localNodeMaster() && event.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
checkStateMeetsSettingsAndMaybeRecover(event.state(), true);
}
@@ -147,7 +140,7 @@ public class GatewayService extends AbstractLifecycleComponent i
protected void checkStateMeetsSettingsAndMaybeRecover(ClusterState state, boolean asyncRecovery) {
DiscoveryNodes nodes = state.nodes();
- if (state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
+ if (state.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
logger.debug("not recovering from gateway, no master elected yet");
} else if (recoverAfterNodes != -1 && (nodes.masterAndDataNodes().size()) < recoverAfterNodes) {
logger.debug("not recovering from gateway, nodes_size (data+master) [" + nodes.masterAndDataNodes().size() + "] < recover_after_nodes [" + recoverAfterNodes + "]");
diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
index 02420d0e3d5..ecf5e6b6b22 100644
--- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
+++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
@@ -307,7 +307,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
return;
}
- clusterService.submitStateUpdateTask("indices_store", new ClusterStateUpdateTask() {
+ clusterService.submitStateUpdateTask("indices_store", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (clusterState.getVersion() != currentState.getVersion()) {
diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java
index e922f1b4932..e2e6f502e89 100644
--- a/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -245,6 +245,10 @@ public class TransportService extends AbstractLifecycleComponent implements Transport {
private final ThreadPool threadPool;
+ private final ThreadPoolExecutor workers;
private final Version version;
private volatile TransportServiceAdapter transportServiceAdapter;
private volatile BoundTransportAddress boundAddress;
@@ -58,13 +62,20 @@ public class LocalTransport extends AbstractLifecycleComponent implem
private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
private final ConcurrentMap connectedNodes = newConcurrentMap();
- public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local_address";
+ public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address";
+ public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers";
+ public static final String TRANSPORT_LOCAL_QUEUE = "transport.local.queue";
@Inject
public LocalTransport(Settings settings, ThreadPool threadPool, Version version) {
super(settings);
this.threadPool = threadPool;
this.version = version;
+
+ int workerCount = this.settings.getAsInt(TRANSPORT_LOCAL_WORKERS, EsExecutors.boundedNumberOfProcessors(settings));
+ int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1);
+ logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
+ this.workers = EsExecutors.newFixed(workerCount, queueSize, EsExecutors.daemonThreadFactory(this.settings, "local_transport"));
}
@Override
@@ -106,6 +117,13 @@ public class LocalTransport extends AbstractLifecycleComponent implem
@Override
protected void doClose() throws ElasticsearchException {
+ workers.shutdown();
+ try {
+ workers.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ workers.shutdownNow();
}
@Override
@@ -185,7 +203,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem
transportServiceAdapter.sent(data.length);
- threadPool.generic().execute(new Runnable() {
+ targetTransport.workers().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, version, requestId);
@@ -193,8 +211,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem
});
}
- ThreadPool threadPool() {
- return this.threadPool;
+ ThreadPoolExecutor workers() {
+ return this.workers;
}
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {
diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java
index f4d5e83053a..f316e9ba69d 100644
--- a/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java
+++ b/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java
@@ -72,7 +72,7 @@ public class LocalTransportChannel implements TransportChannel {
response.writeTo(stream);
stream.close();
final byte[] data = bStream.bytes().toBytes();
- targetTransport.threadPool().generic().execute(new Runnable() {
+ targetTransport.workers().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, version, null);
@@ -98,7 +98,7 @@ public class LocalTransportChannel implements TransportChannel {
too.close();
}
final byte[] data = stream.bytes().toBytes();
- targetTransport.threadPool().generic().execute(new Runnable() {
+ targetTransport.workers().execute(new Runnable() {
@Override
public void run() {
targetTransport.messageReceived(data, action, sourceTransport, version, null);
diff --git a/src/main/java/org/elasticsearch/tribe/TribeService.java b/src/main/java/org/elasticsearch/tribe/TribeService.java
index e706e400658..a335f47b53c 100644
--- a/src/main/java/org/elasticsearch/tribe/TribeService.java
+++ b/src/main/java/org/elasticsearch/tribe/TribeService.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
@@ -43,7 +42,7 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalNode;
@@ -53,7 +52,6 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
/**
* The tribe service holds a list of node clients connected to a list of tribe members, and uses their
@@ -121,7 +119,7 @@ public class TribeService extends AbstractLifecycleComponent {
private final List nodes = Lists.newCopyOnWriteArrayList();
@Inject
- public TribeService(Settings settings, ClusterService clusterService) {
+ public TribeService(Settings settings, ClusterService clusterService, DiscoveryService discoveryService) {
super(settings);
this.clusterService = clusterService;
Map nodesSettings = Maps.newHashMap(settings.getGroups("tribe", true));
@@ -143,7 +141,7 @@ public class TribeService extends AbstractLifecycleComponent {
if (!nodes.isEmpty()) {
// remove the initial election / recovery blocks since we are not going to have a
// master elected in this single tribe node local "cluster"
- clusterService.removeInitialStateBlock(Discovery.NO_MASTER_BLOCK);
+ clusterService.removeInitialStateBlock(discoveryService.getNoMasterBlock());
clusterService.removeInitialStateBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
if (settings.getAsBoolean("tribe.blocks.write", false)) {
clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK);
@@ -222,7 +220,7 @@ public class TribeService extends AbstractLifecycleComponent {
@Override
public void clusterChanged(final ClusterChangedEvent event) {
logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
- clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() {
+ clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState tribeState = event.state();
diff --git a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
index dde9eedc4e1..1d0a2038615 100644
--- a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
+++ b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
@@ -19,6 +19,7 @@
package org.elasticsearch.cluster;
import com.google.common.base.Predicate;
+import com.google.common.util.concurrent.ListenableFuture;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
@@ -256,6 +257,58 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
assertThat(processedLatch.await(1, TimeUnit.SECONDS), equalTo(true));
}
+ @Test
+ public void testMasterAwareExecution() throws Exception {
+ Settings settings = settingsBuilder()
+ .put("discovery.type", "local")
+ .build();
+
+ ListenableFuture master = internalCluster().startNodeAsync(settings);
+ ListenableFuture nonMaster = internalCluster().startNodeAsync(settingsBuilder().put(settings).put("node.master", false).build());
+ master.get();
+ ensureGreen(); // make sure we have a cluster
+
+ ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nonMaster.get());
+
+ final boolean[] taskFailed = {false};
+ final CountDownLatch latch1 = new CountDownLatch(1);
+ clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) throws Exception {
+ latch1.countDown();
+ return currentState;
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ taskFailed[0] = true;
+ latch1.countDown();
+ }
+ });
+
+ latch1.await();
+ assertTrue("cluster state update task was executed on a non-master", taskFailed[0]);
+
+ taskFailed[0] = true;
+ final CountDownLatch latch2 = new CountDownLatch(1);
+ clusterService.submitStateUpdateTask("test", new ClusterStateNonMasterUpdateTask() {
+ @Override
+ public ClusterState execute(ClusterState currentState) throws Exception {
+ taskFailed[0] = false;
+ latch2.countDown();
+ return currentState;
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ taskFailed[0] = true;
+ latch2.countDown();
+ }
+ });
+ latch2.await();
+ assertFalse("non-master cluster state update task was not executed", taskFailed[0]);
+ }
+
@Test
public void testAckedUpdateTaskNoAckExpected() throws Exception {
Settings settings = settingsBuilder()
@@ -655,7 +708,7 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
}
}
- private static class BlockingTask implements ClusterStateUpdateTask {
+ private static class BlockingTask extends ClusterStateUpdateTask {
private final CountDownLatch latch = new CountDownLatch(1);
@Override
@@ -674,7 +727,7 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
}
- private static class PrioritiezedTask implements ClusterStateUpdateTask {
+ private static class PrioritiezedTask extends ClusterStateUpdateTask {
private final Priority priority;
private final CountDownLatch latch;
diff --git a/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java b/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java
index 3fe477cc989..5e63990fe04 100644
--- a/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java
+++ b/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java
@@ -25,7 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@@ -60,7 +60,7 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
logger.info("--> should be blocked, no master...");
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
- assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
+ assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true));
assertThat(state.nodes().size(), equalTo(1)); // verify that we still see the local node in the cluster state
logger.info("--> start second node, cluster should be formed");
@@ -70,9 +70,9 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
- assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
+ assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(false));
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
- assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(false));
+ assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(false));
state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(2));
@@ -98,11 +98,11 @@ public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
awaitBusy(new Predicate