From 0575744a39152c695842195d05ed6b213b4e9c16 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 20 Oct 2015 17:24:44 +0200 Subject: [PATCH] Refactor retry logic for TransportMasterNodeAction - Same as for TransportInstanceSingleOperationAction and TransportReplicationAction, onClusterServiceClose consistently throws a NodeClosedException now. - Added retry logic if master could not publish cluster state or stepped down before publishing (ZenDiscovery). The test IndexingMasterFailoverIT shows the issue. - Simplified retry logic by moving bits from different places into shared retry method. - Removed boolean flag retrying that aborted retrying after a single master node change (now we retry until timeout). - Two existing predicates that deal with master node changes unified in a single predicate masterNodeChangedPredicate Closes #14222 --- .../master/TransportMasterNodeAction.java | 214 ++++++++---------- .../cluster/ClusterStateUpdateTask.java | 2 +- .../zen => cluster}/NotMasterException.java | 5 +- .../discovery/zen/NodeJoinController.java | 1 + .../zen/fd/MasterFaultDetection.java | 2 +- .../master/IndexingMasterFailoverIT.java | 116 ++++++++++ .../TransportMasterNodeActionTests.java | 56 +++-- .../zen/NodeJoinControllerTests.java | 1 + 8 files changed, 251 insertions(+), 146 deletions(-) rename core/src/main/java/org/elasticsearch/{discovery/zen => cluster}/NotMasterException.java (87%) create mode 100644 core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 113f1b2912c..eff1e7bd13c 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -20,6 +20,7 @@ package 
org.elasticsearch.action.support.master; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; @@ -29,16 +30,16 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; @@ -49,6 +50,19 @@ import java.util.function.Supplier; * A base class for operations that needs to be performed on the master node. 
*/ public abstract class TransportMasterNodeAction extends HandledTransportAction { + private static final ClusterStateObserver.ChangePredicate masterNodeChangedPredicate = new ClusterStateObserver.ChangePredicate() { + @Override + public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, + ClusterState newState, ClusterState.ClusterStateStatus newStatus) { + // The condition !newState.nodes().masterNodeId().equals(previousState.nodes().masterNodeId()) is not sufficient as the same master node might get reelected after a disruption. + return newState.nodes().masterNodeId() != null && newState != previousState; + } + + @Override + public boolean apply(ClusterChangedEvent event) { + return event.nodesDelta().masterNodeChanged(); + } + }; protected final TransportService transportService; protected final ClusterService clusterService; @@ -76,10 +90,6 @@ public abstract class TransportMasterNodeAction listener) { new AsyncSingleAction(request, listener).start(); @@ -91,6 +101,14 @@ public abstract class TransportMasterNodeAction listener) { this.request = request; // TODO do we really need to wrap it in a listener? 
the handlers should be cheap @@ -102,10 +120,10 @@ public abstract class TransportMasterNodeAction delegate = new ActionListener() { + @Override + public void onResponse(Response response) { + listener.onResponse(response); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof Discovery.FailedToCommitClusterStateException + || (t instanceof NotMasterException)) { + logger.debug("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", t, actionName); + retry(t, masterNodeChangedPredicate); + } else { + listener.onFailure(t); + } + } + }; + threadPool.executor(executor).execute(new ActionRunnable(delegate) { @Override protected void doRun() throws Exception { - masterOperation(request, clusterService.state(), listener); + masterOperation(request, clusterService.state(), delegate); } }); } } else { if (nodes.masterNode() == null) { - if (retrying) { - listener.onFailure(new MasterNotDiscoveredException()); - } else { - logger.debug("no known master node, scheduling a retry"); - observer.waitForNextChange( - new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - doStart(true); - } - - @Override - public void onClusterServiceClose() { - listener.onFailure(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]")); - } - }, new ClusterStateObserver.ChangePredicate() { - @Override - public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, - ClusterState newState, ClusterState.ClusterStateStatus newStatus) { - return newState.nodes().masterNodeId() != null; - } - - @Override - public boolean apply(ClusterChangedEvent event) { - return event.nodesDelta().masterNodeChanged(); - } - } - ); - } - return; - } - processBeforeDelegationToMaster(request, clusterState); - 
transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler() { - @Override - public Response newInstance() { - return newResponse(); - } - - @Override - public void handleResponse(Response response) { - listener.onResponse(response); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void handleException(final TransportException exp) { - if (exp.unwrapCause() instanceof ConnectTransportException) { - // we want to retry here a bit to see if a new master is elected - logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]", - nodes.masterNode(), exp.getDetailedMessage()); - observer.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - doStart(false); - } - - @Override - public void onClusterServiceClose() { - listener.onFailure(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - listener.onFailure(new MasterNotDiscoveredException()); - } - }, new ClusterStateObserver.EventPredicate() { - @Override - public boolean apply(ClusterChangedEvent event) { - return event.nodesDelta().masterNodeChanged(); - } - } - ); - } else { - listener.onFailure(exp); + logger.debug("no known master node, scheduling a retry"); + retry(new MasterNotDiscoveredException(), masterNodeChangedPredicate); + } else { + transportService.sendRequest(nodes.masterNode(), actionName, request, new ActionListenerResponseHandler(listener) { + @Override + public Response newInstance() { + return newResponse(); } - } - }); + + @Override + public void handleException(final TransportException exp) { + Throwable cause = exp.unwrapCause(); + if (cause instanceof ConnectTransportException) { + // we want to retry here a bit to see if a new master is elected + logger.debug("connection exception while trying to 
forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]", + actionName, nodes.masterNode(), exp.getDetailedMessage()); + retry(cause, masterNodeChangedPredicate); + } else { + listener.onFailure(exp); + } + } + }); + } } } + + private void retry(final Throwable failure, final ClusterStateObserver.ChangePredicate changePredicate) { + observer.waitForNextChange( + new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + doStart(); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + logger.debug("timed out while retrying [{}] after failure (timeout [{}])", failure, actionName, timeout); + listener.onFailure(failure); + } + }, changePredicate + ); + } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java index c0f1438d432..7fef94d5c17 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java @@ -51,7 +51,7 @@ abstract public class ClusterStateUpdateTask { * called when the task was rejected because the local node is no longer master */ public void onNoLongerMaster(String source) { - onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]")); + onFailure(source, new NotMasterException("no longer master. 
source: [" + source + "]")); } /** diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NotMasterException.java b/core/src/main/java/org/elasticsearch/cluster/NotMasterException.java similarity index 87% rename from core/src/main/java/org/elasticsearch/discovery/zen/NotMasterException.java rename to core/src/main/java/org/elasticsearch/cluster/NotMasterException.java index d78d22aa983..892510418e4 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NotMasterException.java +++ b/core/src/main/java/org/elasticsearch/cluster/NotMasterException.java @@ -16,13 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.discovery.zen; +package org.elasticsearch.cluster; /** * Thrown when a node join request or a master ping reaches a node which is not - * currently acting as a master. + * currently acting as a master or when a cluster state update task is to be executed + * on a node that is no longer master. */ public class NotMasterException extends IllegalStateException { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index 5f2cbaa9c84..7eb0a0e8bac 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java 
b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 3163e061692..8e337dd90c4 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -31,7 +31,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.zen.NotMasterException; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java new file mode 100644 index 00000000000..fdb482774c6 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -0,0 +1,116 @@ +package org.elasticsearch.action.support.master; + +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.discovery.zen.fd.FaultDetection; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.disruption.NetworkDisconnectPartition; +import org.elasticsearch.test.disruption.NetworkPartition; +import org.elasticsearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + 
+import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +@ESIntegTestCase.SuppressLocalMode +public class IndexingMasterFailoverIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + final HashSet> classes = new HashSet<>(super.nodePlugins()); + classes.add(MockTransportService.TestPlugin.class); + return classes; + } + + /** + * Indexing operations which entail mapping changes require a blocking request to the master node to update the mapping. + * If the master node is being disrupted or if it cannot commit cluster state changes, it needs to retry within timeout limits. + * This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario. + */ + public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable { + logger.info("--> start 4 nodes, 3 master, 1 data"); + + final Settings sharedSettings = Settings.builder() + .put(FaultDetection.SETTING_PING_TIMEOUT, "1s") // for hitting simulated network failures quickly + .put(FaultDetection.SETTING_PING_RETRIES, "1") // for hitting simulated network failures quickly + .put("discovery.zen.join_timeout", "10s") // still long to induce failures but not too long so test won't time out + .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly + .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2) + .put("transport.host", "127.0.0.1") // only bind on one interface; we use IPv4 here by default + .build(); + + internalCluster().startMasterOnlyNodesAsync(3, sharedSettings).get(); + + String dataNode = internalCluster().startDataOnlyNode(sharedSettings); + + logger.info("--> wait for all nodes to join the cluster"); + ensureStableCluster(4); + + // We index data with mapping changes into cluster and have master failover at same time + client().admin().indices().prepareCreate("myindex") + 
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)) + .get(); + ensureGreen("myindex"); + + final CyclicBarrier barrier = new CyclicBarrier(2); + + Thread indexingThread = new Thread(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + } catch (InterruptedException e) { + logger.warn("Barrier interrupted", e); + return; + } catch (BrokenBarrierException e) { + logger.warn("Broken barrier", e); + return; + } + for (int i = 0; i < 10; i++) { + // index data with mapping changes + IndexResponse response = client(dataNode).prepareIndex("myindex", "mytype").setSource("field_" + i, "val").get(); + assertThat(response.isCreated(), equalTo(true)); + } + } + }); + indexingThread.setName("indexingThread"); + indexingThread.start(); + + barrier.await(); + + // interrupt communication between master and other nodes in cluster + String master = internalCluster().getMasterName(); + Set otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames())); + otherNodes.remove(master); + + NetworkPartition partition = new NetworkDisconnectPartition(Collections.singleton(master), otherNodes, random()); + internalCluster().setDisruptionScheme(partition); + + logger.info("--> disrupting network"); + partition.startDisrupting(); + + logger.info("--> waiting for new master to be elected"); + ensureStableCluster(3, dataNode); + + partition.stopDisrupting(); + logger.info("--> waiting to heal"); + ensureStableCluster(4); + + indexingThread.join(); + + ensureGreen("myindex"); + refresh(); + assertThat(client().prepareSearch("myindex").get().getHits().getTotalHits(), equalTo(10L)); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 59a69149bed..74b78840fd4 100644 --- 
a/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -38,6 +39,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -256,17 +259,8 @@ public class TransportMasterNodeActionTests extends ESTestCase { clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); PlainActionFuture listener = new PlainActionFuture<>(); - final AtomicBoolean delegationToMaster = new AtomicBoolean(); + new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener); - new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { - @Override - protected void processBeforeDelegationToMaster(Request request, ClusterState state) { - logger.debug("Delegation to master called"); - delegationToMaster.set(true); - } - }.execute(request, listener); - - assertTrue("processBeforeDelegationToMaster not called", delegationToMaster.get()); 
assertThat(transport.capturedRequests().length, equalTo(1)); CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0]; assertTrue(capturedRequest.node.isMasterNode()); @@ -285,17 +279,8 @@ public class TransportMasterNodeActionTests extends ESTestCase { clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); PlainActionFuture listener = new PlainActionFuture<>(); - final AtomicBoolean delegationToMaster = new AtomicBoolean(); + new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener); - new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { - @Override - protected void processBeforeDelegationToMaster(Request request, ClusterState state) { - logger.debug("Delegation to master called"); - delegationToMaster.set(true); - } - }.execute(request, listener); - - assertTrue("processBeforeDelegationToMaster not called", delegationToMaster.get()); assertThat(transport.capturedRequests().length, equalTo(1)); CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0]; assertTrue(capturedRequest.node.isMasterNode()); @@ -320,4 +305,35 @@ public class TransportMasterNodeActionTests extends ESTestCase { } } } + + public void testMasterFailoverAfterStepDown() throws ExecutionException, InterruptedException { + Request request = new Request().masterNodeTimeout(TimeValue.timeValueHours(1)); + PlainActionFuture listener = new PlainActionFuture<>(); + + final Response response = new Response(); + + clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes)); + + new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { + @Override + protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { + // The other node has become master, simulate failures of this node while publishing cluster state through 
ZenDiscovery + TransportMasterNodeActionTests.this.clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); + Throwable failure = randomBoolean() + ? new Discovery.FailedToCommitClusterStateException("Fake error") + : new NotMasterException("Fake error"); + listener.onFailure(failure); + } + }.execute(request, listener); + + assertThat(transport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0]; + assertTrue(capturedRequest.node.isMasterNode()); + assertThat(capturedRequest.request, equalTo(request)); + assertThat(capturedRequest.action, equalTo("testAction")); + + transport.handleResponse(capturedRequest.requestId, response); + assertTrue(listener.isDone()); + assertThat(listener.get(), equalTo(response)); + } } \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index e4089b37fce..fda82104de8 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingService;