[Cluster] Refactored ClusterStateUpdateTask protection against execution on a non master
Previous implementation used a marker interface and had no explicit failure call back for the case update task was run on a non master (i.e., the master stepped down after it was submitted). That lead to a couple of instance of checks. This approach moves ClusterStateUpdateTask from an interface to an abstract class, which allows adding a flag to indicate whether it should only run on master nodes (defaults to true). It also adds an explicit onNoLongerMaster call back to allow different error handling for that case. This also removed the need for the NoLongerMaster. Closes #7511
This commit is contained in:
parent
596a4a0735
commit
34f4ca763c
|
@ -138,16 +138,17 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onNoLongerMaster(String source) {
|
||||||
//if the reroute fails we only log
|
|
||||||
if (t instanceof ClusterService.NoLongerMasterException) {
|
|
||||||
logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
|
logger.debug("failed to preform reroute after cluster settings were updated - current node is no longer a master");
|
||||||
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
|
listener.onResponse(new ClusterUpdateSettingsResponse(updateSettingsAcked, transientUpdates.build(), persistentUpdates.build()));
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(String source, Throwable t) {
|
||||||
|
//if the reroute fails we only log
|
||||||
logger.debug("failed to perform [{}]", t, source);
|
logger.debug("failed to perform [{}]", t, source);
|
||||||
listener.onFailure(new ElasticsearchException("reroute after update settings failed", t));
|
listener.onFailure(new ElasticsearchException("reroute after update settings failed", t));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(final ClusterState currentState) {
|
public ClusterState execute(final ClusterState currentState) {
|
||||||
|
|
|
@ -86,13 +86,16 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStart() throws ElasticsearchException { }
|
protected void doStart() throws ElasticsearchException {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStop() throws ElasticsearchException { }
|
protected void doStop() throws ElasticsearchException {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() throws ElasticsearchException { }
|
protected void doClose() throws ElasticsearchException {
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lists actively running benchmarks on the cluster
|
* Lists actively running benchmarks on the cluster
|
||||||
|
@ -403,6 +406,7 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract T newInstance();
|
public abstract T newInstance();
|
||||||
|
|
||||||
protected abstract void sendResponse();
|
protected abstract void sendResponse();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -675,7 +679,7 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract class UpdateBenchmarkStateTask implements ProcessedClusterStateUpdateTask {
|
public abstract class UpdateBenchmarkStateTask extends ProcessedClusterStateUpdateTask {
|
||||||
|
|
||||||
private final String reason;
|
private final String reason;
|
||||||
protected final String benchmarkId;
|
protected final String benchmarkId;
|
||||||
|
@ -741,7 +745,7 @@ public class BenchmarkService extends AbstractLifecycleComponent<BenchmarkServic
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> implements TimeoutClusterStateUpdateTask {
|
public abstract class BenchmarkStateChangeAction<R extends MasterNodeOperationRequest> extends TimeoutClusterStateUpdateTask {
|
||||||
protected final R request;
|
protected final R request;
|
||||||
|
|
||||||
public BenchmarkStateChangeAction(R request) {
|
public BenchmarkStateChangeAction(R request) {
|
||||||
|
|
|
@ -28,7 +28,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
|
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
|
||||||
* all the nodes have acknowledged a cluster state update request
|
* all the nodes have acknowledged a cluster state update request
|
||||||
*/
|
*/
|
||||||
public abstract class AckedClusterStateUpdateTask<Response> implements TimeoutClusterStateUpdateTask {
|
public abstract class AckedClusterStateUpdateTask<Response> extends TimeoutClusterStateUpdateTask {
|
||||||
|
|
||||||
private final ActionListener<Response> listener;
|
private final ActionListener<Response> listener;
|
||||||
private final AckedRequest request;
|
private final AckedRequest request;
|
||||||
|
@ -40,6 +40,7 @@ public abstract class AckedClusterStateUpdateTask<Response> implements TimeoutCl
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called to determine which nodes the acknowledgement is expected from
|
* Called to determine which nodes the acknowledgement is expected from
|
||||||
|
*
|
||||||
* @param discoveryNode a node
|
* @param discoveryNode a node
|
||||||
* @return true if the node is expected to send ack back, false otherwise
|
* @return true if the node is expected to send ack back, false otherwise
|
||||||
*/
|
*/
|
||||||
|
@ -50,6 +51,7 @@ public abstract class AckedClusterStateUpdateTask<Response> implements TimeoutCl
|
||||||
/**
|
/**
|
||||||
* Called once all the nodes have acknowledged the cluster state update request. Must be
|
* Called once all the nodes have acknowledged the cluster state update request. Must be
|
||||||
* very lightweight execution, since it gets executed on the cluster service thread.
|
* very lightweight execution, since it gets executed on the cluster service thread.
|
||||||
|
*
|
||||||
* @param t optional error that might have been thrown
|
* @param t optional error that might have been thrown
|
||||||
*/
|
*/
|
||||||
public void onAllNodesAcked(@Nullable Throwable t) {
|
public void onAllNodesAcked(@Nullable Throwable t) {
|
||||||
|
|
|
@ -111,15 +111,4 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
|
||||||
*/
|
*/
|
||||||
List<PendingClusterTask> pendingTasks();
|
List<PendingClusterTask> pendingTasks();
|
||||||
|
|
||||||
/**
|
|
||||||
* an exception to indicate a {@link org.elasticsearch.cluster.ClusterStateUpdateTask} was not executed as
|
|
||||||
* the current node is no longer master
|
|
||||||
*/
|
|
||||||
public static class NoLongerMasterException extends ElasticsearchIllegalStateException {
|
|
||||||
|
|
||||||
public NoLongerMasterException(String msg) {
|
|
||||||
super(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,5 +23,10 @@ package org.elasticsearch.cluster;
|
||||||
* This is a marker interface to indicate that the task should be executed
|
* This is a marker interface to indicate that the task should be executed
|
||||||
* even if the current node is not a master.
|
* even if the current node is not a master.
|
||||||
*/
|
*/
|
||||||
public interface ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {
|
public abstract class ClusterStateNonMasterUpdateTask extends ClusterStateUpdateTask {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean runOnlyOnMaster() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,19 +19,37 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster;
|
package org.elasticsearch.cluster;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A task that can update the cluster state.
|
* A task that can update the cluster state.
|
||||||
*/
|
*/
|
||||||
public interface ClusterStateUpdateTask {
|
abstract public class ClusterStateUpdateTask {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the cluster state based on the current state. Return the *same instance* if no state
|
* Update the cluster state based on the current state. Return the *same instance* if no state
|
||||||
* should be changed.
|
* should be changed.
|
||||||
*/
|
*/
|
||||||
ClusterState execute(ClusterState currentState) throws Exception;
|
abstract public ClusterState execute(ClusterState currentState) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A callback called when execute fails.
|
* A callback called when execute fails.
|
||||||
*/
|
*/
|
||||||
void onFailure(String source, Throwable t);
|
abstract public void onFailure(String source, @Nullable Throwable t);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* indicates whether this task should only run if current node is master
|
||||||
|
*/
|
||||||
|
public boolean runOnlyOnMaster() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* called when the task was rejected because the local node is no longer master
|
||||||
|
*/
|
||||||
|
public void onNoLongerMaster(String source) {
|
||||||
|
onFailure(source, new EsRejectedExecutionException("no longer master. source: [" + source + "]"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,8 +19,13 @@
|
||||||
package org.elasticsearch.cluster;
|
package org.elasticsearch.cluster;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A combination interface between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
|
* A combination between {@link org.elasticsearch.cluster.ProcessedClusterStateUpdateTask} and
|
||||||
* {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes
|
* {@link org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask} to allow easy creation of anonymous classes
|
||||||
*/
|
*/
|
||||||
public interface ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask, ClusterStateNonMasterUpdateTask {
|
abstract public class ProcessedClusterStateNonMasterUpdateTask extends ProcessedClusterStateUpdateTask {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean runOnlyOnMaster() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,11 +23,11 @@ package org.elasticsearch.cluster;
|
||||||
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
|
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
|
||||||
* the cluster state update has been processed.
|
* the cluster state update has been processed.
|
||||||
*/
|
*/
|
||||||
public interface ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
|
public abstract class ProcessedClusterStateUpdateTask extends ClusterStateUpdateTask {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when the result of the {@link #execute(ClusterState)} have been processed
|
* Called when the result of the {@link #execute(ClusterState)} have been processed
|
||||||
* properly by all listeners.
|
* properly by all listeners.
|
||||||
*/
|
*/
|
||||||
void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
|
public abstract void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState);
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,11 +25,11 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||||
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
|
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
|
||||||
* a timeout.
|
* a timeout.
|
||||||
*/
|
*/
|
||||||
public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
|
abstract public class TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the cluster state update task wasn't processed by the provided timeout, call
|
* If the cluster state update task wasn't processed by the provided timeout, call
|
||||||
* {@link #onFailure(String, Throwable)}
|
* {@link #onFailure(String, Throwable)}
|
||||||
*/
|
*/
|
||||||
TimeValue timeout();
|
abstract public TimeValue timeout();
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,13 +149,16 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
||||||
return ClusterState.builder(currentState).routingResult(routingResult).build();
|
return ClusterState.builder(currentState).routingResult(routingResult).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNoLongerMaster(String source) {
|
||||||
|
// no biggie
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onFailure(String source, Throwable t) {
|
||||||
if (!(t instanceof ClusterService.NoLongerMasterException)) {
|
|
||||||
ClusterState state = clusterService.state();
|
ClusterState state = clusterService.state();
|
||||||
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
|
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
});
|
});
|
||||||
routingTableDirty = false;
|
routingTableDirty = false;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -325,9 +325,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
||||||
}
|
}
|
||||||
logger.debug("processing [{}]: execute", source);
|
logger.debug("processing [{}]: execute", source);
|
||||||
ClusterState previousClusterState = clusterState;
|
ClusterState previousClusterState = clusterState;
|
||||||
if (!previousClusterState.nodes().localNodeMaster() && !(updateTask instanceof ClusterStateNonMasterUpdateTask)) {
|
if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) {
|
||||||
logger.debug("failing [{}]: local node is no longer master", source);
|
logger.debug("failing [{}]: local node is no longer master", source);
|
||||||
updateTask.onFailure(source, new NoLongerMasterException("source: " + source));
|
updateTask.onNoLongerMaster(source);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ClusterState newClusterState;
|
ClusterState newClusterState;
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.discovery.DiscoveryService;
|
import org.elasticsearch.discovery.DiscoveryService;
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
|
@ -477,12 +478,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onNoLongerMaster(String source) {
|
||||||
if (t instanceof ClusterService.NoLongerMasterException) {
|
// ignoring (already logged)
|
||||||
logger.debug("not processing {} leave request as we are no longer master", node);
|
|
||||||
} else {
|
|
||||||
logger.error("unexpected failure during [{}]", t, source);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(String source, Throwable t) {
|
||||||
|
logger.error("unexpected failure during [{}]", t, source);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
@ -516,12 +518,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onNoLongerMaster(String source) {
|
||||||
if (t instanceof ClusterService.NoLongerMasterException) {
|
// already logged
|
||||||
logger.debug("not processing [{}] as we are no longer master", source);
|
|
||||||
} else {
|
|
||||||
logger.error("unexpected failure during [{}]", t, source);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(String source, Throwable t) {
|
||||||
|
logger.error("unexpected failure during [{}]", t, source);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -552,14 +555,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
return currentState;
|
return currentState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNoLongerMaster(String source) {
|
||||||
|
// ignoring (already logged)
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onFailure(String source, Throwable t) {
|
||||||
if (t instanceof ClusterService.NoLongerMasterException) {
|
|
||||||
logger.debug("not processing [{}] as we are no longer master", source);
|
|
||||||
} else {
|
|
||||||
logger.error("unexpected failure during [{}]", t, source);
|
logger.error("unexpected failure during [{}]", t, source);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
@ -870,16 +875,26 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onNoLongerMaster(String source) {
|
||||||
if (t instanceof ClusterService.NoLongerMasterException) {
|
Exception e = new EsRejectedExecutionException("no longer master. source: [" + source + "]");
|
||||||
logger.debug("not processing [{}] as we are no longer master", source);
|
innerOnFailure(e);
|
||||||
} else {
|
|
||||||
logger.error("unexpected failure during [{}]", t, source);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void innerOnFailure(Throwable t) {
|
||||||
for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> drainedTask : drainedTasks) {
|
for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> drainedTask : drainedTasks) {
|
||||||
|
try {
|
||||||
drainedTask.v2().onFailure(t);
|
drainedTask.v2().onFailure(t);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("error during task failure", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(String source, Throwable t) {
|
||||||
|
logger.error("unexpected failure during [{}]", t, source);
|
||||||
|
innerOnFailure(t);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
@ -1158,12 +1173,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onNoLongerMaster(String source) {
|
||||||
if (t instanceof ClusterService.NoLongerMasterException) {
|
// already logged
|
||||||
logger.debug("not processing [{}] as we are no longer master", source);
|
|
||||||
} else {
|
|
||||||
logger.error("unexpected failure during [{}]", t, source);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(String source, Throwable t) {
|
||||||
|
logger.error("unexpected failure during [{}]", t, source);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -708,7 +708,7 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class BlockingTask implements ClusterStateUpdateTask {
|
private static class BlockingTask extends ClusterStateUpdateTask {
|
||||||
private final CountDownLatch latch = new CountDownLatch(1);
|
private final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -727,7 +727,7 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class PrioritiezedTask implements ClusterStateUpdateTask {
|
private static class PrioritiezedTask extends ClusterStateUpdateTask {
|
||||||
|
|
||||||
private final Priority priority;
|
private final Priority priority;
|
||||||
private final CountDownLatch latch;
|
private final CountDownLatch latch;
|
||||||
|
|
Loading…
Reference in New Issue