Refactor retry logic for TransportMasterNodeAction

- Same as for TransportInstanceSingleOperationAction and TransportReplicationAction, onClusterServiceClose consistently throws a NodeClosedException now.
- Added retry logic if master could not publish cluster state or stepped down before publishing (ZenDiscovery). The test IndexingMasterFailoverIT shows the issue.
- Simplified retry logic by moving bits from different places into shared retry method.
  - Removed boolean flag retrying that aborted retrying after a single master node change (now we retry until timeout).
  - Two existing predicates that deal with master node changes unified in a single predicate masterNodeChangedPredicate

Closes #14222
This commit is contained in:
Yannick Welsch 2015-10-20 17:24:44 +02:00
parent c6b41bc704
commit 0575744a39
8 changed files with 251 additions and 146 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.support.master;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
@ -29,16 +30,16 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
@ -49,6 +50,19 @@ import java.util.function.Supplier;
* A base class for operations that needs to be performed on the master node.
*/
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
private static final ClusterStateObserver.ChangePredicate masterNodeChangedPredicate = new ClusterStateObserver.ChangePredicate() {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
// The condition !newState.nodes().masterNodeId().equals(previousState.nodes().masterNodeId()) is not sufficient as the same master node might get reelected after a disruption.
return newState.nodes().masterNodeId() != null && newState != previousState;
}
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
};
protected final TransportService transportService;
protected final ClusterService clusterService;
@ -76,10 +90,6 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
protected abstract ClusterBlockException checkBlock(Request request, ClusterState state);
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
}
@Override
protected void doExecute(final Request request, ActionListener<Response> listener) {
new AsyncSingleAction(request, listener).start();
@ -91,6 +101,14 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
private final Request request;
private volatile ClusterStateObserver observer;
private final ClusterStateObserver.ChangePredicate retryableOrNoBlockPredicate = new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
ClusterBlockException blockException = checkBlock(request, newState);
return (blockException == null || !blockException.retryable());
}
};
AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.request = request;
// TODO do we really need to wrap it in a listener? the handlers should be cheap
@ -102,10 +120,10 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
public void start() {
this.observer = new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger);
doStart(false);
doStart();
}
protected void doStart(boolean retrying) {
protected void doStart() {
final ClusterState clusterState = observer.observedState();
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster() || localExecute(request)) {
@ -114,131 +132,83 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
if (blockException != null) {
if (!blockException.retryable()) {
listener.onFailure(blockException);
return;
} else {
logger.trace("can't execute due to a cluster block, retrying", blockException);
retry(blockException, retryableOrNoBlockPredicate);
}
logger.trace("can't execute due to a cluster block, retrying", blockException);
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
doStart(false);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(blockException);
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(blockException);
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
ClusterBlockException blockException = checkBlock(request, newState);
return (blockException == null || !blockException.retryable());
}
}
);
} else {
threadPool.executor(executor).execute(new ActionRunnable(listener) {
ActionListener<Response> delegate = new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
listener.onResponse(response);
}
@Override
public void onFailure(Throwable t) {
if (t instanceof Discovery.FailedToCommitClusterStateException
|| (t instanceof NotMasterException)) {
logger.debug("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", t, actionName);
retry(t, masterNodeChangedPredicate);
} else {
listener.onFailure(t);
}
}
};
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
@Override
protected void doRun() throws Exception {
masterOperation(request, clusterService.state(), listener);
masterOperation(request, clusterService.state(), delegate);
}
});
}
} else {
if (nodes.masterNode() == null) {
if (retrying) {
listener.onFailure(new MasterNotDiscoveredException());
} else {
logger.debug("no known master node, scheduling a retry");
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
doStart(true);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
}
}, new ClusterStateObserver.ChangePredicate() {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
return newState.nodes().masterNodeId() != null;
}
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
);
}
return;
}
processBeforeDelegationToMaster(request, clusterState);
transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponse();
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleException(final TransportException exp) {
if (exp.unwrapCause() instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
nodes.masterNode(), exp.getDetailedMessage());
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
doStart(false);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException());
}
}, new ClusterStateObserver.EventPredicate() {
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
);
} else {
listener.onFailure(exp);
logger.debug("no known master node, scheduling a retry");
retry(new MasterNotDiscoveredException(), masterNodeChangedPredicate);
} else {
transportService.sendRequest(nodes.masterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener) {
@Override
public Response newInstance() {
return newResponse();
}
}
});
@Override
public void handleException(final TransportException exp) {
Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException) {
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
actionName, nodes.masterNode(), exp.getDetailedMessage());
retry(cause, masterNodeChangedPredicate);
} else {
listener.onFailure(exp);
}
}
});
}
}
}
private void retry(final Throwable failure, final ClusterStateObserver.ChangePredicate changePredicate) {
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
doStart();
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
logger.debug("timed out while retrying [{}] after failure (timeout [{}])", failure, actionName, timeout);
listener.onFailure(failure);
}
}, changePredicate
);
}
}
}

View File

@ -51,7 +51,7 @@ abstract public class ClusterStateUpdateTask {
* called when the task was rejected because the local node is no longer master
*/
public void onNoLongerMaster(String source) {
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
}
/**

View File

@ -16,13 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
package org.elasticsearch.cluster;
/**
* Thrown when a node join request or a master ping reaches a node which is not
* currently acting as a master.
* currently acting as a master or when a cluster state update task is to be executed
* on a node that is no longer master.
*/
public class NotMasterException extends IllegalStateException {

View File

@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;

View File

@ -31,7 +31,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.NotMasterException;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;

View File

@ -0,0 +1,116 @@
package org.elasticsearch.action.support.master;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
import org.elasticsearch.test.disruption.NetworkPartition;
import org.elasticsearch.test.transport.MockTransportService;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class IndexingMasterFailoverIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
final HashSet<Class<? extends Plugin>> classes = new HashSet<>(super.nodePlugins());
classes.add(MockTransportService.TestPlugin.class);
return classes;
}
/**
* Indexing operations which entail mapping changes require a blocking request to the master node to update the mapping.
* If the master node is being disrupted or if it cannot commit cluster state changes, it needs to retry within timeout limits.
* This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario.
*/
public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable {
logger.info("--> start 4 nodes, 3 master, 1 data");
final Settings sharedSettings = Settings.builder()
.put(FaultDetection.SETTING_PING_TIMEOUT, "1s") // for hitting simulated network failures quickly
.put(FaultDetection.SETTING_PING_RETRIES, "1") // for hitting simulated network failures quickly
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)
.put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default
.build();
internalCluster().startMasterOnlyNodesAsync(3, sharedSettings).get();
String dataNode = internalCluster().startDataOnlyNode(sharedSettings);
logger.info("--> wait for all nodes to join the cluster");
ensureStableCluster(4);
// We index data with mapping changes into cluster and have master failover at same time
client().admin().indices().prepareCreate("myindex")
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.get();
ensureGreen("myindex");
final CyclicBarrier barrier = new CyclicBarrier(2);
Thread indexingThread = new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.await();
} catch (InterruptedException e) {
logger.warn("Barrier interrupted", e);
return;
} catch (BrokenBarrierException e) {
logger.warn("Broken barrier", e);
return;
}
for (int i = 0; i < 10; i++) {
// index data with mapping changes
IndexResponse response = client(dataNode).prepareIndex("myindex", "mytype").setSource("field_" + i, "val").get();
assertThat(response.isCreated(), equalTo(true));
}
}
});
indexingThread.setName("indexingThread");
indexingThread.start();
barrier.await();
// interrupt communication between master and other nodes in cluster
String master = internalCluster().getMasterName();
Set<String> otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
otherNodes.remove(master);
NetworkPartition partition = new NetworkDisconnectPartition(Collections.singleton(master), otherNodes, random());
internalCluster().setDisruptionScheme(partition);
logger.info("--> disrupting network");
partition.startDisrupting();
logger.info("--> waiting for new master to be elected");
ensureStableCluster(3, dataNode);
partition.stopDisrupting();
logger.info("--> waiting to heal");
ensureStableCluster(4);
indexingThread.join();
ensureGreen("myindex");
refresh();
assertThat(client().prepareSearch("myindex").get().getHits().getTotalHits(), equalTo(10L));
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -38,6 +39,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
@ -256,17 +259,8 @@ public class TransportMasterNodeActionTests extends ESTestCase {
clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final AtomicBoolean delegationToMaster = new AtomicBoolean();
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener);
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
logger.debug("Delegation to master called");
delegationToMaster.set(true);
}
}.execute(request, listener);
assertTrue("processBeforeDelegationToMaster not called", delegationToMaster.get());
assertThat(transport.capturedRequests().length, equalTo(1));
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
assertTrue(capturedRequest.node.isMasterNode());
@ -285,17 +279,8 @@ public class TransportMasterNodeActionTests extends ESTestCase {
clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final AtomicBoolean delegationToMaster = new AtomicBoolean();
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener);
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
logger.debug("Delegation to master called");
delegationToMaster.set(true);
}
}.execute(request, listener);
assertTrue("processBeforeDelegationToMaster not called", delegationToMaster.get());
assertThat(transport.capturedRequests().length, equalTo(1));
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
assertTrue(capturedRequest.node.isMasterNode());
@ -320,4 +305,35 @@ public class TransportMasterNodeActionTests extends ESTestCase {
}
}
}
public void testMasterFailoverAfterStepDown() throws ExecutionException, InterruptedException {
Request request = new Request().masterNodeTimeout(TimeValue.timeValueHours(1));
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final Response response = new Response();
clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes));
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
// The other node has become master, simulate failures of this node while publishing cluster state through ZenDiscovery
TransportMasterNodeActionTests.this.clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
Throwable failure = randomBoolean()
? new Discovery.FailedToCommitClusterStateException("Fake error")
: new NotMasterException("Fake error");
listener.onFailure(failure);
}
}.execute(request, listener);
assertThat(transport.capturedRequests().length, equalTo(1));
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
assertTrue(capturedRequest.node.isMasterNode());
assertThat(capturedRequest.request, equalTo(request));
assertThat(capturedRequest.action, equalTo("testAction"));
transport.handleResponse(capturedRequest.requestId, response);
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;