Merge pull request #14222 from ywelsch/fix/refactor-transport-master-node-action
Refactor retry logic for TransportMasterNodeAction
This commit is contained in:
commit
33ca2ec597
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.action.support.master;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionListenerResponseHandler;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
|
@ -29,15 +30,16 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
|
|||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -48,6 +50,19 @@ import java.util.function.Supplier;
|
|||
* A base class for operations that needs to be performed on the master node.
|
||||
*/
|
||||
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
|
||||
private static final ClusterStateObserver.ChangePredicate masterNodeChangedPredicate = new ClusterStateObserver.ChangePredicate() {
|
||||
@Override
|
||||
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
|
||||
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
|
||||
// The condition !newState.nodes().masterNodeId().equals(previousState.nodes().masterNodeId()) is not sufficient as the same master node might get reelected after a disruption.
|
||||
return newState.nodes().masterNodeId() != null && newState != previousState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(ClusterChangedEvent event) {
|
||||
return event.nodesDelta().masterNodeChanged();
|
||||
}
|
||||
};
|
||||
|
||||
protected final TransportService transportService;
|
||||
protected final ClusterService clusterService;
|
||||
|
@ -75,152 +90,125 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|||
|
||||
protected abstract ClusterBlockException checkBlock(Request request, ClusterState state);
|
||||
|
||||
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(final Request request, ActionListener<Response> listener) {
|
||||
// TODO do we really need to wrap it in a listener? the handlers should be cheap
|
||||
if ((listener instanceof ThreadedActionListener) == false) {
|
||||
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
}
|
||||
innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
|
||||
new AsyncSingleAction(request, listener).start();
|
||||
}
|
||||
|
||||
private void innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying) {
|
||||
final ClusterState clusterState = observer.observedState();
|
||||
final DiscoveryNodes nodes = clusterState.nodes();
|
||||
if (nodes.localNodeMaster() || localExecute(request)) {
|
||||
// check for block, if blocked, retry, else, execute locally
|
||||
final ClusterBlockException blockException = checkBlock(request, clusterState);
|
||||
if (blockException != null) {
|
||||
if (!blockException.retryable()) {
|
||||
listener.onFailure(blockException);
|
||||
return;
|
||||
}
|
||||
logger.trace("can't execute due to a cluster block, retrying", blockException);
|
||||
observer.waitForNextChange(
|
||||
new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
innerExecute(request, listener, observer, false);
|
||||
}
|
||||
class AsyncSingleAction {
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(blockException);
|
||||
}
|
||||
private final ActionListener<Response> listener;
|
||||
private final Request request;
|
||||
private volatile ClusterStateObserver observer;
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(blockException);
|
||||
}
|
||||
}, new ClusterStateObserver.ValidationPredicate() {
|
||||
@Override
|
||||
protected boolean validate(ClusterState newState) {
|
||||
ClusterBlockException blockException = checkBlock(request, newState);
|
||||
return (blockException == null || !blockException.retryable());
|
||||
private final ClusterStateObserver.ChangePredicate retryableOrNoBlockPredicate = new ClusterStateObserver.ValidationPredicate() {
|
||||
@Override
|
||||
protected boolean validate(ClusterState newState) {
|
||||
ClusterBlockException blockException = checkBlock(request, newState);
|
||||
return (blockException == null || !blockException.retryable());
|
||||
}
|
||||
};
|
||||
|
||||
AsyncSingleAction(Request request, ActionListener<Response> listener) {
|
||||
this.request = request;
|
||||
// TODO do we really need to wrap it in a listener? the handlers should be cheap
|
||||
if ((listener instanceof ThreadedActionListener) == false) {
|
||||
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
}
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
this.observer = new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger);
|
||||
doStart();
|
||||
}
|
||||
|
||||
protected void doStart() {
|
||||
final ClusterState clusterState = observer.observedState();
|
||||
final DiscoveryNodes nodes = clusterState.nodes();
|
||||
if (nodes.localNodeMaster() || localExecute(request)) {
|
||||
// check for block, if blocked, retry, else, execute locally
|
||||
final ClusterBlockException blockException = checkBlock(request, clusterState);
|
||||
if (blockException != null) {
|
||||
if (!blockException.retryable()) {
|
||||
listener.onFailure(blockException);
|
||||
} else {
|
||||
logger.trace("can't execute due to a cluster block, retrying", blockException);
|
||||
retry(blockException, retryableOrNoBlockPredicate);
|
||||
}
|
||||
} else {
|
||||
ActionListener<Response> delegate = new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (t instanceof Discovery.FailedToCommitClusterStateException
|
||||
|| (t instanceof NotMasterException)) {
|
||||
logger.debug("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", t, actionName);
|
||||
retry(t, masterNodeChangedPredicate);
|
||||
} else {
|
||||
listener.onFailure(t);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
};
|
||||
threadPool.executor(executor).execute(new ActionRunnable(delegate) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
masterOperation(request, clusterService.state(), delegate);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
threadPool.executor(executor).execute(new ActionRunnable(listener) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
masterOperation(request, clusterService.state(), listener);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (nodes.masterNode() == null) {
|
||||
if (retrying) {
|
||||
listener.onFailure(new MasterNotDiscoveredException());
|
||||
} else {
|
||||
if (nodes.masterNode() == null) {
|
||||
logger.debug("no known master node, scheduling a retry");
|
||||
observer.waitForNextChange(
|
||||
new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
innerExecute(request, listener, observer, true);
|
||||
}
|
||||
retry(new MasterNotDiscoveredException(), masterNodeChangedPredicate);
|
||||
} else {
|
||||
transportService.sendRequest(nodes.masterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener) {
|
||||
@Override
|
||||
public Response newInstance() {
|
||||
return newResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
|
||||
}
|
||||
}, new ClusterStateObserver.ChangePredicate() {
|
||||
@Override
|
||||
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
|
||||
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
|
||||
return newState.nodes().masterNodeId() != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(ClusterChangedEvent event) {
|
||||
return event.nodesDelta().masterNodeChanged();
|
||||
}
|
||||
@Override
|
||||
public void handleException(final TransportException exp) {
|
||||
Throwable cause = exp.unwrapCause();
|
||||
if (cause instanceof ConnectTransportException) {
|
||||
// we want to retry here a bit to see if a new master is elected
|
||||
logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
|
||||
actionName, nodes.masterNode(), exp.getDetailedMessage());
|
||||
retry(cause, masterNodeChangedPredicate);
|
||||
} else {
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
processBeforeDelegationToMaster(request, clusterState);
|
||||
transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler<Response>() {
|
||||
@Override
|
||||
public Response newInstance() {
|
||||
return newResponse();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(Response response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(final TransportException exp) {
|
||||
if (exp.unwrapCause() instanceof ConnectTransportException) {
|
||||
// we want to retry here a bit to see if a new master is elected
|
||||
logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
|
||||
nodes.masterNode(), exp.getDetailedMessage());
|
||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
innerExecute(request, listener, observer, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new MasterNotDiscoveredException());
|
||||
}
|
||||
}, new ClusterStateObserver.EventPredicate() {
|
||||
@Override
|
||||
public boolean apply(ClusterChangedEvent event) {
|
||||
return event.nodesDelta().masterNodeChanged();
|
||||
}
|
||||
}
|
||||
);
|
||||
} else {
|
||||
listener.onFailure(exp);
|
||||
private void retry(final Throwable failure, final ClusterStateObserver.ChangePredicate changePredicate) {
|
||||
observer.waitForNextChange(
|
||||
new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
doStart();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
logger.debug("timed out while retrying [{}] after failure (timeout [{}])", failure, actionName, timeout);
|
||||
listener.onFailure(failure);
|
||||
}
|
||||
}, changePredicate
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ abstract public class ClusterStateUpdateTask {
|
|||
* called when the task was rejected because the local node is no longer master
|
||||
*/
|
||||
public void onNoLongerMaster(String source) {
|
||||
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
|
||||
onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -16,13 +16,14 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.discovery.zen;
|
||||
package org.elasticsearch.cluster;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Thrown when a node join request or a master ping reaches a node which is not
|
||||
* currently acting as a master.
|
||||
* currently acting as a master or when a cluster state update task is to be executed
|
||||
* on a node that is no longer master.
|
||||
*/
|
||||
public class NotMasterException extends IllegalStateException {
|
||||
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
|
|||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.NotMasterException;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
package org.elasticsearch.action.support.master;
|
||||
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
|
||||
import org.elasticsearch.test.disruption.NetworkPartition;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
@ESIntegTestCase.SuppressLocalMode
|
||||
public class IndexingMasterFailoverIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
final HashSet<Class<? extends Plugin>> classes = new HashSet<>(super.nodePlugins());
|
||||
classes.add(MockTransportService.TestPlugin.class);
|
||||
return classes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indexing operations which entail mapping changes require a blocking request to the master node to update the mapping.
|
||||
* If the master node is being disrupted or if it cannot commit cluster state changes, it needs to retry within timeout limits.
|
||||
* This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario.
|
||||
*/
|
||||
public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable {
|
||||
logger.info("--> start 4 nodes, 3 master, 1 data");
|
||||
|
||||
final Settings sharedSettings = Settings.builder()
|
||||
.put(FaultDetection.SETTING_PING_TIMEOUT, "1s") // for hitting simulated network failures quickly
|
||||
.put(FaultDetection.SETTING_PING_RETRIES, "1") // for hitting simulated network failures quickly
|
||||
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
|
||||
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)
|
||||
.put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default
|
||||
.build();
|
||||
|
||||
internalCluster().startMasterOnlyNodesAsync(3, sharedSettings).get();
|
||||
|
||||
String dataNode = internalCluster().startDataOnlyNode(sharedSettings);
|
||||
|
||||
logger.info("--> wait for all nodes to join the cluster");
|
||||
ensureStableCluster(4);
|
||||
|
||||
// We index data with mapping changes into cluster and have master failover at same time
|
||||
client().admin().indices().prepareCreate("myindex")
|
||||
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
|
||||
.get();
|
||||
ensureGreen("myindex");
|
||||
|
||||
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
|
||||
Thread indexingThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
barrier.await();
|
||||
} catch (InterruptedException e) {
|
||||
logger.warn("Barrier interrupted", e);
|
||||
return;
|
||||
} catch (BrokenBarrierException e) {
|
||||
logger.warn("Broken barrier", e);
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
// index data with mapping changes
|
||||
IndexResponse response = client(dataNode).prepareIndex("myindex", "mytype").setSource("field_" + i, "val").get();
|
||||
assertThat(response.isCreated(), equalTo(true));
|
||||
}
|
||||
}
|
||||
});
|
||||
indexingThread.setName("indexingThread");
|
||||
indexingThread.start();
|
||||
|
||||
barrier.await();
|
||||
|
||||
// interrupt communication between master and other nodes in cluster
|
||||
String master = internalCluster().getMasterName();
|
||||
Set<String> otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
|
||||
otherNodes.remove(master);
|
||||
|
||||
NetworkPartition partition = new NetworkDisconnectPartition(Collections.singleton(master), otherNodes, random());
|
||||
internalCluster().setDisruptionScheme(partition);
|
||||
|
||||
logger.info("--> disrupting network");
|
||||
partition.startDisrupting();
|
||||
|
||||
logger.info("--> waiting for new master to be elected");
|
||||
ensureStableCluster(3, dataNode);
|
||||
|
||||
partition.stopDisrupting();
|
||||
logger.info("--> waiting to heal");
|
||||
ensureStableCluster(4);
|
||||
|
||||
indexingThread.join();
|
||||
|
||||
ensureGreen("myindex");
|
||||
refresh();
|
||||
assertThat(client().prepareSearch("myindex").get().getHits().getTotalHits(), equalTo(10L));
|
||||
}
|
||||
}
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ThreadedActionListener;
|
|||
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
|
@ -38,6 +39,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.DummyTransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -256,17 +259,8 @@ public class TransportMasterNodeActionTests extends ESTestCase {
|
|||
clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
|
||||
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
final AtomicBoolean delegationToMaster = new AtomicBoolean();
|
||||
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener);
|
||||
|
||||
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
|
||||
@Override
|
||||
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
|
||||
logger.debug("Delegation to master called");
|
||||
delegationToMaster.set(true);
|
||||
}
|
||||
}.execute(request, listener);
|
||||
|
||||
assertTrue("processBeforeDelegationToMaster not called", delegationToMaster.get());
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
|
||||
assertTrue(capturedRequest.node.isMasterNode());
|
||||
|
@ -285,17 +279,8 @@ public class TransportMasterNodeActionTests extends ESTestCase {
|
|||
clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
|
||||
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
final AtomicBoolean delegationToMaster = new AtomicBoolean();
|
||||
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener);
|
||||
|
||||
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
|
||||
@Override
|
||||
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
|
||||
logger.debug("Delegation to master called");
|
||||
delegationToMaster.set(true);
|
||||
}
|
||||
}.execute(request, listener);
|
||||
|
||||
assertTrue("processBeforeDelegationToMaster not called", delegationToMaster.get());
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
|
||||
assertTrue(capturedRequest.node.isMasterNode());
|
||||
|
@ -320,4 +305,35 @@ public class TransportMasterNodeActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testMasterFailoverAfterStepDown() throws ExecutionException, InterruptedException {
|
||||
Request request = new Request().masterNodeTimeout(TimeValue.timeValueHours(1));
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
|
||||
final Response response = new Response();
|
||||
|
||||
clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes));
|
||||
|
||||
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
|
||||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
|
||||
// The other node has become master, simulate failures of this node while publishing cluster state through ZenDiscovery
|
||||
TransportMasterNodeActionTests.this.clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
|
||||
Throwable failure = randomBoolean()
|
||||
? new Discovery.FailedToCommitClusterStateException("Fake error")
|
||||
: new NotMasterException("Fake error");
|
||||
listener.onFailure(failure);
|
||||
}
|
||||
}.execute(request, listener);
|
||||
|
||||
assertThat(transport.capturedRequests().length, equalTo(1));
|
||||
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
|
||||
assertTrue(capturedRequest.node.isMasterNode());
|
||||
assertThat(capturedRequest.request, equalTo(request));
|
||||
assertThat(capturedRequest.action, equalTo("testAction"));
|
||||
|
||||
transport.handleResponse(capturedRequest.requestId, response);
|
||||
assertTrue(listener.isDone());
|
||||
assertThat(listener.get(), equalTo(response));
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
|
|||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
|
|
Loading…
Reference in New Issue