mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-03 09:29:11 +00:00
Introduce inner class AsyncSingleAction to store state used across retry invocations
This commit is contained in:
parent
af3b126a15
commit
c6b41bc704
@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
||||
@ -81,146 +82,163 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
||||
|
||||
@Override
|
||||
protected void doExecute(final Request request, ActionListener<Response> listener) {
|
||||
// TODO do we really need to wrap it in a listener? the handlers should be cheap
|
||||
if ((listener instanceof ThreadedActionListener) == false) {
|
||||
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
}
|
||||
innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
|
||||
new AsyncSingleAction(request, listener).start();
|
||||
}
|
||||
|
||||
private void innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying) {
|
||||
final ClusterState clusterState = observer.observedState();
|
||||
final DiscoveryNodes nodes = clusterState.nodes();
|
||||
if (nodes.localNodeMaster() || localExecute(request)) {
|
||||
// check for block, if blocked, retry, else, execute locally
|
||||
final ClusterBlockException blockException = checkBlock(request, clusterState);
|
||||
if (blockException != null) {
|
||||
if (!blockException.retryable()) {
|
||||
listener.onFailure(blockException);
|
||||
return;
|
||||
}
|
||||
logger.trace("can't execute due to a cluster block, retrying", blockException);
|
||||
observer.waitForNextChange(
|
||||
new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
innerExecute(request, listener, observer, false);
|
||||
}
|
||||
class AsyncSingleAction {
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(blockException);
|
||||
}
|
||||
private final ActionListener<Response> listener;
|
||||
private final Request request;
|
||||
private volatile ClusterStateObserver observer;
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(blockException);
|
||||
}
|
||||
}, new ClusterStateObserver.ValidationPredicate() {
|
||||
@Override
|
||||
protected boolean validate(ClusterState newState) {
|
||||
ClusterBlockException blockException = checkBlock(request, newState);
|
||||
return (blockException == null || !blockException.retryable());
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
} else {
|
||||
threadPool.executor(executor).execute(new ActionRunnable(listener) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
masterOperation(request, clusterService.state(), listener);
|
||||
}
|
||||
});
|
||||
AsyncSingleAction(Request request, ActionListener<Response> listener) {
|
||||
this.request = request;
|
||||
// TODO do we really need to wrap it in a listener? the handlers should be cheap
|
||||
if ((listener instanceof ThreadedActionListener) == false) {
|
||||
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
}
|
||||
} else {
|
||||
if (nodes.masterNode() == null) {
|
||||
if (retrying) {
|
||||
listener.onFailure(new MasterNotDiscoveredException());
|
||||
} else {
|
||||
logger.debug("no known master node, scheduling a retry");
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
this.observer = new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger);
|
||||
doStart(false);
|
||||
}
|
||||
|
||||
protected void doStart(boolean retrying) {
|
||||
final ClusterState clusterState = observer.observedState();
|
||||
final DiscoveryNodes nodes = clusterState.nodes();
|
||||
if (nodes.localNodeMaster() || localExecute(request)) {
|
||||
// check for block, if blocked, retry, else, execute locally
|
||||
final ClusterBlockException blockException = checkBlock(request, clusterState);
|
||||
if (blockException != null) {
|
||||
if (!blockException.retryable()) {
|
||||
listener.onFailure(blockException);
|
||||
return;
|
||||
}
|
||||
logger.trace("can't execute due to a cluster block, retrying", blockException);
|
||||
observer.waitForNextChange(
|
||||
new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
innerExecute(request, listener, observer, true);
|
||||
doStart(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
listener.onFailure(blockException);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
|
||||
listener.onFailure(blockException);
|
||||
}
|
||||
}, new ClusterStateObserver.ChangePredicate() {
|
||||
}, new ClusterStateObserver.ValidationPredicate() {
|
||||
@Override
|
||||
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
|
||||
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
|
||||
return newState.nodes().masterNodeId() != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(ClusterChangedEvent event) {
|
||||
return event.nodesDelta().masterNodeChanged();
|
||||
protected boolean validate(ClusterState newState) {
|
||||
ClusterBlockException blockException = checkBlock(request, newState);
|
||||
return (blockException == null || !blockException.retryable());
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
} else {
|
||||
threadPool.executor(executor).execute(new ActionRunnable(listener) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
masterOperation(request, clusterService.state(), listener);
|
||||
}
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
processBeforeDelegationToMaster(request, clusterState);
|
||||
transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler<Response>() {
|
||||
@Override
|
||||
public Response newInstance() {
|
||||
return newResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(Response response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(final TransportException exp) {
|
||||
if (exp.unwrapCause() instanceof ConnectTransportException) {
|
||||
// we want to retry here a bit to see if a new master is elected
|
||||
logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
|
||||
nodes.masterNode(), exp.getDetailedMessage());
|
||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
innerExecute(request, listener, observer, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new MasterNotDiscoveredException());
|
||||
}
|
||||
}, new ClusterStateObserver.EventPredicate() {
|
||||
@Override
|
||||
public boolean apply(ClusterChangedEvent event) {
|
||||
return event.nodesDelta().masterNodeChanged();
|
||||
}
|
||||
}
|
||||
);
|
||||
} else {
|
||||
if (nodes.masterNode() == null) {
|
||||
if (retrying) {
|
||||
listener.onFailure(new MasterNotDiscoveredException());
|
||||
} else {
|
||||
listener.onFailure(exp);
|
||||
logger.debug("no known master node, scheduling a retry");
|
||||
observer.waitForNextChange(
|
||||
new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
doStart(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
|
||||
}
|
||||
}, new ClusterStateObserver.ChangePredicate() {
|
||||
@Override
|
||||
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
|
||||
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
|
||||
return newState.nodes().masterNodeId() != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean apply(ClusterChangedEvent event) {
|
||||
return event.nodesDelta().masterNodeChanged();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
});
|
||||
processBeforeDelegationToMaster(request, clusterState);
|
||||
transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler<Response>() {
|
||||
@Override
|
||||
public Response newInstance() {
|
||||
return newResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(Response response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(final TransportException exp) {
|
||||
if (exp.unwrapCause() instanceof ConnectTransportException) {
|
||||
// we want to retry here a bit to see if a new master is elected
|
||||
logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
|
||||
nodes.masterNode(), exp.getDetailedMessage());
|
||||
observer.waitForNextChange(new ClusterStateObserver.Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
doStart(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new NodeClosedException(clusterService.localNode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new MasterNotDiscoveredException());
|
||||
}
|
||||
}, new ClusterStateObserver.EventPredicate() {
|
||||
@Override
|
||||
public boolean apply(ClusterChangedEvent event) {
|
||||
return event.nodesDelta().masterNodeChanged();
|
||||
}
|
||||
}
|
||||
);
|
||||
} else {
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user