Discovery: make sure NodeJoinController.ElectionCallback is always called from the update cluster state thread

This is important for correct handling of the joining thread. This causes assertions to trip in our test runs. See http://build-us-00.elastic.co/job/es_g1gc_master_metal/11653/ as an example

Closes #12372
This commit is contained in:
Boaz Leskes 2015-07-21 11:37:50 +02:00
parent 7adf9929ba
commit 5def4193e9
6 changed files with 65 additions and 18 deletions

View File

@ -337,6 +337,13 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return updateTasksExecutor.getMaxTaskWaitTime();
}
/** asserts that the current thread is the cluster state update thread */
public boolean assertClusterStateThread() {
assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME) : "not called from the cluster state update thread";
return true;
}
static abstract class SourcePrioritizedRunnable extends PrioritizedRunnable {
protected final String source;

View File

@ -21,12 +21,14 @@ package org.elasticsearch.discovery.zen;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
@ -77,11 +79,11 @@ public class NodeJoinController extends AbstractComponent {
* @param callback the result of the election (success or failure) will be communicated by calling methods on this
* object
**/
public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final Callback callback) {
public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
assert accumulateJoins.get() : "waitToBeElectedAsMaster is called we are not accumulating joins";
final CountDownLatch done = new CountDownLatch(1);
final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins) {
final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins, clusterService) {
@Override
void onClose() {
if (electionContext.compareAndSet(this, null)) {
@ -95,7 +97,7 @@ public class NodeJoinController extends AbstractComponent {
if (electionContext.compareAndSet(null, newContext) == false) {
// should never happen, but be conservative
callback.onFailure(new IllegalStateException("double waiting for election"));
failContext(newContext, new IllegalStateException("double waiting for election"));
return;
}
try {
@ -118,13 +120,35 @@ public class NodeJoinController extends AbstractComponent {
logger.trace("timed out waiting to be elected. waited [{}]. pending node joins [{}]", timeValue, pendingNodes);
}
// callback will clear the context, if it's active
newContext.onFailure(new ElasticsearchTimeoutException("timed out waiting to be elected"));
failContext(newContext, new ElasticsearchTimeoutException("timed out waiting to be elected"));
} catch (Throwable t) {
logger.error("unexpected failure while waiting for incoming joins", t);
newContext.onFailure(t);
failContext(newContext, "unexpected failure while waiting for pending joins", t);
}
}
private void failContext(final ElectionContext context, final Throwable throwable) {
failContext(context, throwable.getMessage(), throwable);
}
/** utility method to fail the given election context under the cluster state thread */
private void failContext(final ElectionContext context, final String reason, final Throwable throwable) {
clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
context.onFailure(throwable);
return currentState;
}
@Override
public void onFailure(String source, Throwable updateFailure) {
logger.warn("unexpected error while trying to fail election context due to [{}]. original exception [{}]", updateFailure, reason, throwable);
context.onFailure(updateFailure);
}
});
}
/**
* Accumulates any future incoming join request. Pending join requests will be processed in the final steps of becoming a
* master or when {@link #stopAccumulatingJoins()} is called.
@ -252,23 +276,33 @@ public class NodeJoinController extends AbstractComponent {
}
public interface Callback {
public interface ElectionCallback {
/**
* called when the local node is successfully elected as master
* Guaranteed to be called on the cluster state update thread
**/
void onElectedAsMaster(ClusterState state);
/**
* called when the local node failed to be elected as master
* Guaranteed to be called on the cluster state update thread
**/
void onFailure(Throwable t);
}
static abstract class ElectionContext implements Callback {
private final Callback callback;
static abstract class ElectionContext implements ElectionCallback {
private final ElectionCallback callback;
private final int requiredMasterJoins;
private final ClusterService clusterService;
/** set to true after enough joins have been seen and a cluster update task is submitted to become master */
final AtomicBoolean pendingSetAsMasterTask = new AtomicBoolean();
final AtomicBoolean closed = new AtomicBoolean();
ElectionContext(Callback callback, int requiredMasterJoins) {
ElectionContext(ElectionCallback callback, int requiredMasterJoins, ClusterService clusterService) {
this.callback = callback;
this.requiredMasterJoins = requiredMasterJoins;
this.clusterService = clusterService;
}
abstract void onClose();
@ -276,6 +310,8 @@ public class NodeJoinController extends AbstractComponent {
@Override
public void onElectedAsMaster(ClusterState state) {
assert pendingSetAsMasterTask.get() : "onElectedAsMaster called but pendingSetAsMasterTask is not set";
assertClusterStateThread();
assert state.nodes().localNodeMaster() : "onElectedAsMaster called but local node is not master";
if (closed.compareAndSet(false, true)) {
try {
onClose();
@ -287,6 +323,7 @@ public class NodeJoinController extends AbstractComponent {
@Override
public void onFailure(Throwable t) {
assertClusterStateThread();
if (closed.compareAndSet(false, true)) {
try {
onClose();
@ -295,6 +332,10 @@ public class NodeJoinController extends AbstractComponent {
}
}
}
private void assertClusterStateThread() {
assert clusterService instanceof InternalClusterService == false || ((InternalClusterService) clusterService).assertClusterStateThread();
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
@ -372,7 +371,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.Callback() {
new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
joinThreadControl.markThreadAsDone(currentThread);
@ -1307,7 +1306,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
private void assertClusterStateThread() {
assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME) : "not called from the cluster state update thread";
assert clusterService instanceof InternalClusterService == false || ((InternalClusterService) clusterService).assertClusterStateThread();
}
}

View File

@ -45,7 +45,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@LuceneTestCase.AwaitsFix(bugUrl = "boaz is looking at failures in this test class. Example failure: http://build-us-00.elastic.co/job/es_g1gc_master_metal/11653/")
public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {
@Test

View File

@ -157,7 +157,7 @@ public class NodeJoinControllerTests extends ElasticsearchTestCase {
@Override
protected void doRun() throws Exception {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.Callback() {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
@ -205,7 +205,7 @@ public class NodeJoinControllerTests extends ElasticsearchTestCase {
@Override
protected void doRun() throws Exception {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.Callback() {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
@ -323,7 +323,7 @@ public class NodeJoinControllerTests extends ElasticsearchTestCase {
final AtomicReference<Throwable> failure = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.Callback() {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
@ -440,7 +440,7 @@ public class NodeJoinControllerTests extends ElasticsearchTestCase {
logger.info("--> waiting to be elected as master (required joins [{}])", requiredJoins);
final AtomicReference<Throwable> failure = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.Callback() {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));

View File

@ -194,7 +194,8 @@ public class TestClusterService implements ClusterService {
try {
newState = updateTask.execute(previousClusterState);
} catch (Exception e) {
throw new ElasticsearchException("failed to process cluster state update task [" + source + "]", e);
updateTask.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", e));
return;
}
setStateAndNotifyListeners(newState);
if (updateTask instanceof ProcessedClusterStateUpdateTask) {