Watcher: Improve cluster state listener behaviour (elastic/x-pack-elasticsearch#3538)

The cluster state listener used by watch now have two additional checks.
First, when no master node exists in the cluster state, watcher will
stop and the indexing listener will not try to trigger any new watch.
Second, when there is a global cluster write level block, it would not
be possible to update the watches index or write into the watcher
history, so the listener can bail at that case as well.

In addition this also changes the log level from debug to info when
watcher is stopped. It turned out that there are zero insights when or
if watcher is stopped when normal logging is activated. This makes it
super hard for support to know when watcher is stopped or started at all
due to shards being moved around.

Original commit: elastic/x-pack-elasticsearch@5e9ce24380
This commit is contained in:
Alexander Reelsen 2018-01-17 14:18:17 +01:00 committed by GitHub
parent 7ca8b72f97
commit ef2d2764a5
5 changed files with 82 additions and 12 deletions

View File

@ -9,12 +9,14 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
@ -99,10 +101,8 @@ final class WatcherIndexingListener extends AbstractComponent implements Indexin
if (isWatchDocument(shardId.getIndexName(), operation.type())) { if (isWatchDocument(shardId.getIndexName(), operation.type())) {
DateTime now = new DateTime(clock.millis(), UTC); DateTime now = new DateTime(clock.millis(), UTC);
try { try {
Watch watch = parser.parseWithSecrets(operation.id(), true, Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON);
operation.source(), now, XContentType.JSON); ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId);
ShardAllocationConfiguration shardAllocationConfiguration =
configuration.localShards.get(shardId);
if (shardAllocationConfiguration == null) { if (shardAllocationConfiguration == null) {
logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}", logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}",
watch.id(), shardId, configuration.localShards.keySet()); watch.id(), shardId, configuration.localShards.keySet());
@ -193,6 +193,14 @@ final class WatcherIndexingListener extends AbstractComponent implements Indexin
*/ */
@Override @Override
public void clusterChanged(ClusterChangedEvent event) { public void clusterChanged(ClusterChangedEvent event) {
// if there is no master node configured in the current state, this node should not try to trigger anything, but consider itself
// inactive. the same applies, if there is a cluster block that does not allow writes
if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId()) ||
event.state().getBlocks().hasGlobalBlock(ClusterBlockLevel.WRITE)) {
configuration = INACTIVE;
return;
}
if (event.state().nodes().getLocalNode().isDataNode() && event.metaDataChanged()) { if (event.state().nodes().getLocalNode().isDataNode() && event.metaDataChanged()) {
try { try {
IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
@ -211,7 +219,7 @@ final class WatcherIndexingListener extends AbstractComponent implements Indexin
private void checkWatchIndexHasChanged(IndexMetaData metaData, ClusterChangedEvent event) { private void checkWatchIndexHasChanged(IndexMetaData metaData, ClusterChangedEvent event) {
String watchIndex = metaData.getIndex().getName(); String watchIndex = metaData.getIndex().getName();
ClusterState state = event.state(); ClusterState state = event.state();
String localNodeId = state.getNodes().getLocalNode().getId(); String localNodeId = state.nodes().getLocalNode().getId();
RoutingNode routingNode = state.getRoutingNodes().node(localNodeId); RoutingNode routingNode = state.getRoutingNodes().node(localNodeId);
// no local shards, exit early // no local shards, exit early

View File

@ -8,12 +8,14 @@ package org.elasticsearch.xpack.watcher;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -52,7 +54,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
clusterService.addLifecycleListener(new LifecycleListener() { clusterService.addLifecycleListener(new LifecycleListener() {
@Override @Override
public void beforeStop() { public void beforeStop() {
stop("stopping before shutting down"); stop("shutdown initiated");
} }
}); });
watcherMetaData = new WatcherMetaData(!settings.getAsBoolean("xpack.watcher.start_immediately", true)); watcherMetaData = new WatcherMetaData(!settings.getAsBoolean("xpack.watcher.start_immediately", true));
@ -111,6 +113,16 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
return; return;
} }
if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) {
executor.execute(() -> this.stop("no master node"));
return;
}
if (event.state().getBlocks().hasGlobalBlock(ClusterBlockLevel.WRITE)) {
executor.execute(() -> this.stop("write level cluster block"));
return;
}
// find out if watcher was stopped or started manually due to this cluster state change // find out if watcher was stopped or started manually due to this cluster state change
WatcherMetaData watcherMetaData = event.state().getMetaData().custom(WatcherMetaData.TYPE); WatcherMetaData watcherMetaData = event.state().getMetaData().custom(WatcherMetaData.TYPE);
@ -120,7 +132,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
boolean currentWatcherStopped = watcherMetaData != null && watcherMetaData.manuallyStopped() == true; boolean currentWatcherStopped = watcherMetaData != null && watcherMetaData.manuallyStopped() == true;
if (currentWatcherStopped) { if (currentWatcherStopped) {
executor.execute(() -> this.stop("watcher manually marked to shutdown in cluster state update, shutting down")); executor.execute(() -> this.stop("watcher manually marked to shutdown by cluster state update"));
} else { } else {
if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) { if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) {
DiscoveryNode localNode = event.state().nodes().getLocalNode(); DiscoveryNode localNode = event.state().nodes().getLocalNode();
@ -156,7 +168,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
if (previousAllocationIds.get().equals(currentAllocationIds) == false) { if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
previousAllocationIds.set(currentAllocationIds); previousAllocationIds.set(currentAllocationIds);
executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids")); executor.execute(() -> watcherService.reload(event.state(), "different shards allocated on this node"));
} }
} else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) { } else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());

View File

@ -151,7 +151,7 @@ public class WatcherService extends AbstractComponent {
} else { } else {
try { try {
if (state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING)) { if (state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING)) {
logger.debug("stopping watch service, reason [{}]", reason); logger.info("stopping watch service, reason [{}]", reason);
triggerService.stop(); triggerService.stop();
executionService.stop(); executionService.stop();
state.set(WatcherState.STOPPED); state.set(WatcherState.STOPPED);
@ -189,7 +189,7 @@ public class WatcherService extends AbstractComponent {
public void pauseExecution(String reason) { public void pauseExecution(String reason) {
int cancelledTaskCount = executionService.pauseExecution(); int cancelledTaskCount = executionService.pauseExecution();
triggerService.pauseExecution(); triggerService.pauseExecution();
logger.debug("paused execution service, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
} }
/** /**

View File

@ -10,6 +10,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -25,18 +26,19 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.elasticsearch.xpack.watcher.watch.clock.ClockMock;
import org.elasticsearch.xpack.watcher.WatcherIndexingListener.Configuration; import org.elasticsearch.xpack.watcher.WatcherIndexingListener.Configuration;
import org.elasticsearch.xpack.watcher.WatcherIndexingListener.ShardAllocationConfiguration; import org.elasticsearch.xpack.watcher.WatcherIndexingListener.ShardAllocationConfiguration;
import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchParser;
import org.elasticsearch.xpack.watcher.watch.WatchStatus; import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.elasticsearch.xpack.watcher.watch.clock.ClockMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.Before; import org.junit.Before;
@ -667,6 +669,30 @@ public class WatcherIndexingListenerTests extends ESTestCase {
assertThat(listener.getConfiguration(), is(INACTIVE)); assertThat(listener.getConfiguration(), is(INACTIVE));
} }
public void testThatIndexingListenerBecomesInactiveWithoutMasterNode() {
ClusterState clusterStateWithMaster = mockClusterState(Watch.INDEX);
ClusterState clusterStateWithoutMaster = mockClusterState(Watch.INDEX);
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node_1").add(newNode("node_1")).build();
when(clusterStateWithoutMaster.nodes()).thenReturn(nodes);
assertThat(listener.getConfiguration(), is(not(INACTIVE)));
listener.clusterChanged(new ClusterChangedEvent("something", clusterStateWithoutMaster, clusterStateWithMaster));
assertThat(listener.getConfiguration(), is(INACTIVE));
}
public void testThatIndexingListenerBecomesInactiveOnClusterBlock() {
ClusterState clusterState = mockClusterState(Watch.INDEX);
ClusterState clusterStateWriteBlock = mockClusterState(Watch.INDEX);
ClusterBlocks clusterBlocks = ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES).build();
when(clusterStateWriteBlock.getBlocks()).thenReturn(clusterBlocks);
assertThat(listener.getConfiguration(), is(not(INACTIVE)));
listener.clusterChanged(new ClusterChangedEvent("something", clusterStateWriteBlock, clusterState));
assertThat(listener.getConfiguration(), is(INACTIVE));
}
// //
// helper methods // helper methods
// //
@ -699,6 +725,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node_1").masterNodeId("node_1").add(newNode("node_1")).build(); DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node_1").masterNodeId("node_1").add(newNode("node_1")).build();
when(clusterState.nodes()).thenReturn(nodes); when(clusterState.nodes()).thenReturn(nodes);
when(clusterState.getBlocks()).thenReturn(ClusterBlocks.EMPTY_CLUSTER_BLOCK);
return clusterState; return clusterState;
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -485,6 +486,28 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
verify(watcherService, times(0)).start(any(ClusterState.class)); verify(watcherService, times(0)).start(any(ClusterState.class));
} }
public void testWatcherStopsWhenMasterNodeIsMissing() {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.localNodeId("node_1")
.add(newNode("node_1"))
.build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
verify(watcherService, times(1)).stop(eq("no master node"));
}
public void testWatcherStopsOnClusterLevelBlock() {
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
.localNodeId("node_1")
.masterNodeId("node_1")
.add(newNode("node_1"))
.build();
ClusterBlocks clusterBlocks = ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_WRITES).build();
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).blocks(clusterBlocks).build();
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
verify(watcherService, times(1)).stop(eq("write level cluster block"));
}
private List<String> randomIndexPatterns() { private List<String> randomIndexPatterns() {
return IntStream.range(0, between(1, 10)) return IntStream.range(0, between(1, 10))
.mapToObj(n -> randomAlphaOfLengthBetween(1, 100)) .mapToObj(n -> randomAlphaOfLengthBetween(1, 100))