Assert no exceptions during state application (#47090)
Today we log and swallow exceptions during cluster state application, but such an exception should not occur. This commit adds assertions of this fact, and updates the Javadocs to explain it.

Relates #47038
commit ac920e8e64 (parent eb86d71edd)
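In outline, the change keeps the existing log-and-swallow behaviour at runtime but adds an assertion so that an unexpected application failure trips immediately wherever assertions are enabled. A minimal sketch of the resulting shape; the class and method names here are simplified illustrations, not the verbatim Elasticsearch code:

```java
// Simplified sketch of the pattern this commit introduces in
// ClusterApplierService.runTask; names are illustrative, not verbatim.
abstract class ApplierLoopSketch {
    void runTask(Runnable applyChanges, String source) {
        try {
            applyChanges.run();
        } catch (Exception e) {
            // Still logged and swallowed at runtime, as before...
            System.err.println("failed to apply cluster state from [" + source + "]: " + e);
            // ...but now also asserted against: with -ea this trips unless a
            // test has explicitly opted in via applicationMayFail().
            assert applicationMayFail() : e;
        }
    }

    // overridden by tests that need to check behaviour on application failure
    protected boolean applicationMayFail() {
        return false;
    }
}
```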
@@ -28,7 +28,11 @@ import org.elasticsearch.cluster.service.ClusterService;
 public interface ClusterStateApplier {
 
     /**
-     * Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied
+     * Called when a new cluster state ({@link ClusterChangedEvent#state()} needs to be applied. The cluster state to be applied is already
+     * committed when this method is called, so an applier must therefore be prepared to deal with any state it receives without throwing
+     * an exception. Throwing an exception from an applier is very bad because it will stop the application of this state before it has
+     * reached all the other appliers, and will likely result in another attempt to apply the same (or very similar) cluster state which
+     * might continue until this node is removed from the cluster.
      */
     void applyClusterState(ClusterChangedEvent event);
 }
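Under the clarified contract an applier has to be defensive about the states it receives. A hypothetical applier honouring it might look like the following; the class and its logic are illustrative and not part of this commit:

```java
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateApplier;

// Hypothetical applier written against the clarified contract: the state it
// receives is already committed, so it copes with anything without throwing.
class IndexCountTracker implements ClusterStateApplier {
    private volatile int indexCount;

    @Override
    public void applyClusterState(ClusterChangedEvent event) {
        try {
            indexCount = event.state().metaData().indices().size();
        } catch (RuntimeException e) {
            // Never rethrow: per the new Javadoc, an escaping exception stops
            // this state from reaching the remaining appliers and likely leads
            // to repeated re-application until the node leaves the cluster.
        }
    }

    int indexCount() {
        return indexCount;
    }
}
```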
@@ -390,7 +390,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         return true;
     }
 
-    protected void runTask(UpdateTask task) {
+    private void runTask(UpdateTask task) {
         if (!lifecycle.started()) {
             logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
             return;
@@ -447,6 +447,9 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
                     "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
                     executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e);
             }
+            // failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
+            // continue we will retry with the same cluster state but that might not help.
+            assert applicationMayFail();
             task.listener.onFailure(task.source, e);
         }
     }
@@ -667,4 +670,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
         return threadPool.relativeTimeInMillis();
     }
 
+    // overridden by tests that need to check behaviour in the event of an application failure
+    protected boolean applicationMayFail() {
+        return false;
+    }
 }
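This hook is what makes the new assertion safe to ship: with assertions disabled (the JVM's production default), `assert applicationMayFail()` evaluates nothing and the exception is still only logged, while under -ea, as in Elasticsearch's test runs, an unexpected failure becomes an AssertionError. A standalone demonstration of that mechanic (illustrative code, not from the commit):

```java
// Run with `java -ea AssertDemo` to see the AssertionError; without -ea the
// exception is merely swallowed, mirroring the production behaviour.
public class AssertDemo {
    static boolean applicationMayFail() {
        return false; // production default; tests override this to true
    }

    public static void main(String[] args) {
        try {
            throw new RuntimeException("applier bug");
        } catch (RuntimeException e) {
            assert applicationMayFail() : e; // trips under -ea only
            System.out.println("swallowed: " + e.getMessage());
        }
    }
}
```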
@@ -193,7 +193,6 @@ public abstract class AbstractScopedSettings {
         } catch (Exception ex) {
             logger.warn("failed to apply settings", ex);
             throw ex;
-        } finally {
         }
         return lastSettingsApplied = newSettings;
     }
@@ -532,8 +532,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
             final IndexMetaData newIndexMetaData = state.metaData().index(index);
             assert newIndexMetaData != null : "index " + index + " should have been removed by deleteIndices";
             if (ClusterChangedEvent.indexMetaDataChanged(currentIndexMetaData, newIndexMetaData)) {
-                indexService.updateMetaData(currentIndexMetaData, newIndexMetaData);
+                String reason = null;
                 try {
+                    reason = "metadata update failed";
+                    try {
+                        indexService.updateMetaData(currentIndexMetaData, newIndexMetaData);
+                    } catch (Exception e) {
+                        assert false : e;
+                        throw e;
+                    }
+
+                    reason = "mapping update failed";
                     if (indexService.updateMapping(currentIndexMetaData, newIndexMetaData) && sendRefreshMapping) {
                         nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(),
                             new NodeMappingRefreshAction.NodeMappingRefreshRequest(newIndexMetaData.getIndex().getName(),
@@ -541,14 +550,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                                 );
                     }
                 } catch (Exception e) {
-                    indicesService.removeIndex(indexService.index(), FAILURE, "removing index (mapping update failed)");
+                    indicesService.removeIndex(indexService.index(), FAILURE, "removing index (" + reason + ")");
 
                     // fail shards that would be created or updated by createOrUpdateShards
                     RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
                     if (localRoutingNode != null) {
                         for (final ShardRouting shardRouting : localRoutingNode) {
                             if (shardRouting.index().equals(index) && failedShardsCache.containsKey(shardRouting.shardId()) == false) {
-                                sendFailShard(shardRouting, "failed to update mapping for index", e, state);
+                                sendFailShard(shardRouting, "failed to update index (" + reason + ")", e, state);
                             }
                         }
                     }
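A note on the two hunks above: `reason` is reassigned before each phase so the single catch block can name the step that failed, both in the index-removal reason and in the shard-failure message. The same idiom in isolation, with all names illustrative:

```java
// Standalone sketch of the reason-threading idiom; names are illustrative.
class StagedUpdateSketch {
    void apply() {
        String reason = null;
        try {
            reason = "metadata update failed";
            updateMetaData();   // phase 1: may throw

            reason = "mapping update failed";
            updateMapping();    // phase 2: may throw
        } catch (Exception e) {
            // `reason` names exactly the phase that threw
            System.err.println("removing index (" + reason + "): " + e);
        }
    }

    void updateMetaData() { }
    void updateMapping() { }
}
```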
@@ -586,6 +586,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
         final ClusterNode follower0 = cluster.getAnyNodeExcept(leader);
         final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
 
+        follower0.allowClusterStateApplicationFailure();
         follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
         AckCollector ackCollector = leader.submitValue(randomLong());
         cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
@@ -605,6 +606,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
         final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0);
         final long startingTerm = leader.coordinator.getCurrentTerm();
 
+        leader.allowClusterStateApplicationFailure();
         leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
         AckCollector ackCollector = leader.submitValue(randomLong());
         cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");
@@ -358,6 +358,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
         clusterApplierService.addStateApplier(event -> {
             throw new RuntimeException("dummy exception");
         });
+        clusterApplierService.allowClusterStateApplicationFailure();
 
         CountDownLatch latch = new CountDownLatch(1);
         clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
@@ -386,6 +387,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
         AtomicReference<Throwable> error = new AtomicReference<>();
         clusterApplierService.clusterSettings.addSettingsUpdateConsumer(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
             v -> {});
+        clusterApplierService.allowClusterStateApplicationFailure();
 
         CountDownLatch latch = new CountDownLatch(1);
         clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state())
@@ -496,6 +498,7 @@ public class ClusterApplierServiceTests extends ESTestCase {
 
         final ClusterSettings clusterSettings;
         volatile Long currentTimeOverride = null;
+        boolean applicationMayFail;
 
         TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
             super("test_node", settings, clusterSettings, threadPool);
@@ -509,6 +512,15 @@ public class ClusterApplierServiceTests extends ESTestCase {
             }
             return super.currentTimeInMillis();
         }
+
+        @Override
+        protected boolean applicationMayFail() {
+            return this.applicationMayFail;
+        }
+
+        void allowClusterStateApplicationFailure() {
+            this.applicationMayFail = true;
+        }
     }
 
 }
@@ -1172,6 +1172,10 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         private boolean isNotUsefullyBootstrapped() {
             return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false;
         }
+
+        void allowClusterStateApplicationFailure() {
+            clusterApplierService.allowClusterStateApplicationFailure();
+        }
     }
 
     private List<TransportAddress> provideSeedHosts(SeedHostsProvider.HostsResolver ignored) {
@@ -1282,6 +1286,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         private final String nodeName;
         private final DeterministicTaskQueue deterministicTaskQueue;
         ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
+        private boolean applicationMayFail;
 
         DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
                                          DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
@@ -1326,6 +1331,15 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         protected void connectToNodesAndWait(ClusterState newClusterState) {
             // don't do anything, and don't block
         }
+
+        @Override
+        protected boolean applicationMayFail() {
+            return this.applicationMayFail;
+        }
+
+        void allowClusterStateApplicationFailure() {
+            this.applicationMayFail = true;
+        }
     }
 
     protected DiscoveryNode createDiscoveryNode(int nodeIndex, boolean masterEligible) {