Watcher adds watches to the trigger service on the postIndex action for the .watches index. This has the (intentional) side effect of also adding the watches to the stats. The tests rely on these stats for their assertions. The tests also start and stop Watcher between each test for a clean slate. When Watcher executes it updates the .watches index and upon this update it will go through the postIndex method and end up added that watch to the trigger service (and stats). Functionally this is not a problem, if Watcher is stopping or stopped since Watcher is also paused and will not execute the watch. However, with specific timing and expectations of a clean slate can cause issues the test assertions against the stats. This commit ensures that the postIndex action only adds to the trigger service if the Watcher state is not stopping or stopped. When started back up it will re-read index .watches. This commit also un-mutes the tests related to #53177 and #56534
This commit is contained in:
parent
a56fb6192e
commit
a010f4f624
|
@ -427,7 +427,7 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
|
||||||
final WatcherLifeCycleService watcherLifeCycleService =
|
final WatcherLifeCycleService watcherLifeCycleService =
|
||||||
new WatcherLifeCycleService(clusterService, watcherService);
|
new WatcherLifeCycleService(clusterService, watcherService);
|
||||||
|
|
||||||
listener = new WatcherIndexingListener(watchParser, getClock(), triggerService);
|
listener = new WatcherIndexingListener(watchParser, getClock(), triggerService, watcherLifeCycleService.getState());
|
||||||
clusterService.addListener(listener);
|
clusterService.addListener(listener);
|
||||||
|
|
||||||
return Arrays.asList(registry, inputRegistry, historyStore, triggerService, triggeredWatchParser,
|
return Arrays.asList(registry, inputRegistry, historyStore, triggerService, triggeredWatchParser,
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.shard.IndexingOperationListener;
|
import org.elasticsearch.index.shard.IndexingOperationListener;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.xpack.core.watcher.WatcherState;
|
||||||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||||
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
|
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
|
||||||
import org.elasticsearch.xpack.watcher.watch.WatchParser;
|
import org.elasticsearch.xpack.watcher.watch.WatchParser;
|
||||||
|
@ -38,11 +39,13 @@ import java.time.ZonedDateTime;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||||
|
@ -66,12 +69,14 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
|
||||||
private final WatchParser parser;
|
private final WatchParser parser;
|
||||||
private final Clock clock;
|
private final Clock clock;
|
||||||
private final TriggerService triggerService;
|
private final TriggerService triggerService;
|
||||||
|
private final Supplier<WatcherState> watcherState;
|
||||||
private volatile Configuration configuration = INACTIVE;
|
private volatile Configuration configuration = INACTIVE;
|
||||||
|
|
||||||
WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService) {
|
WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService, Supplier<WatcherState> watcherState) {
|
||||||
this.parser = parser;
|
this.parser = parser;
|
||||||
this.clock = clock;
|
this.clock = clock;
|
||||||
this.triggerService = triggerService;
|
this.triggerService = triggerService;
|
||||||
|
this.watcherState = watcherState;
|
||||||
}
|
}
|
||||||
|
|
||||||
// package private for testing
|
// package private for testing
|
||||||
|
@ -119,8 +124,9 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
|
boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
|
||||||
if (shouldBeTriggered) {
|
WatcherState currentState = watcherState.get();
|
||||||
if (watch.status().state().isActive()) {
|
if (shouldBeTriggered && EnumSet.of(WatcherState.STOPPING, WatcherState.STOPPED).contains(currentState) == false) {
|
||||||
|
if (watch.status().state().isActive() ) {
|
||||||
logger.debug("adding watch [{}] to trigger service", watch.id());
|
logger.debug("adding watch [{}] to trigger service", watch.id());
|
||||||
triggerService.add(watch);
|
triggerService.add(watch);
|
||||||
} else {
|
} else {
|
||||||
|
@ -128,7 +134,7 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
|
||||||
triggerService.remove(watch.id());
|
triggerService.remove(watch.id());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.debug("watch [{}] should not be triggered", watch.id());
|
logger.debug("watch [{}] should not be triggered. watcher state [{}]", watch.id(), currentState);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());
|
throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());
|
||||||
|
|
|
@ -31,6 +31,7 @@ import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
||||||
|
@ -203,7 +204,7 @@ public class WatcherLifeCycleService implements ClusterStateListener {
|
||||||
return previousShardRoutings.get();
|
return previousShardRoutings.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public WatcherState getState() {
|
public Supplier<WatcherState> getState(){
|
||||||
return state.get();
|
return () -> state.get();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,7 @@ public class TransportWatcherStatsAction extends TransportNodesAction<WatcherSta
|
||||||
@Override
|
@Override
|
||||||
protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node request) {
|
protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node request) {
|
||||||
WatcherStatsResponse.Node statsResponse = new WatcherStatsResponse.Node(clusterService.localNode());
|
WatcherStatsResponse.Node statsResponse = new WatcherStatsResponse.Node(clusterService.localNode());
|
||||||
statsResponse.setWatcherState(lifeCycleService.getState());
|
statsResponse.setWatcherState(lifeCycleService.getState().get());
|
||||||
statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize());
|
statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize());
|
||||||
statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
|
statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
|
||||||
if (request.includeCurrentWatches()) {
|
if (request.includeCurrentWatches()) {
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.xpack.core.watcher.WatcherState;
|
||||||
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
|
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
|
||||||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||||
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
|
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
|
||||||
|
@ -89,7 +90,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
clock.freeze();
|
clock.freeze();
|
||||||
listener = new WatcherIndexingListener(parser, clock, triggerService);
|
listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STARTED);
|
||||||
|
|
||||||
Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
|
Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
|
||||||
map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
|
map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
|
||||||
|
@ -140,6 +141,29 @@ public class WatcherIndexingListenerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPostIndexWhenStopped() throws Exception {
|
||||||
|
listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STOPPED);
|
||||||
|
Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
|
||||||
|
map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
|
||||||
|
listener.setConfiguration(new Configuration(Watch.INDEX, map));
|
||||||
|
when(operation.id()).thenReturn(randomAlphaOfLength(10));
|
||||||
|
when(operation.source()).thenReturn(BytesArray.EMPTY);
|
||||||
|
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
|
||||||
|
List<Engine.Result.Type> types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values()));
|
||||||
|
types.remove(Engine.Result.Type.FAILURE);
|
||||||
|
when(result.getResultType()).thenReturn(randomFrom(types));
|
||||||
|
|
||||||
|
boolean watchActive = randomBoolean();
|
||||||
|
boolean isNewWatch = randomBoolean();
|
||||||
|
Watch watch = mockWatch("_id", watchActive, isNewWatch);
|
||||||
|
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
|
||||||
|
|
||||||
|
listener.postIndex(shardId, operation, result);
|
||||||
|
ZonedDateTime now = DateUtils.nowWithMillisResolution(clock);
|
||||||
|
verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong());
|
||||||
|
verifyZeroInteractions(triggerService);
|
||||||
|
}
|
||||||
|
|
||||||
// this test emulates an index with 10 shards, and ensures that triggering only happens on a
|
// this test emulates an index with 10 shards, and ensures that triggering only happens on a
|
||||||
// single shard
|
// single shard
|
||||||
public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
|
public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
|
||||||
|
|
|
@ -179,9 +179,9 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
||||||
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
|
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
|
||||||
verify(watcherService, times(1))
|
verify(watcherService, times(1))
|
||||||
.stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
|
.stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
|
||||||
assertEquals(WatcherState.STOPPING, lifeCycleService.getState());
|
assertEquals(WatcherState.STOPPING, lifeCycleService.getState().get());
|
||||||
captor.getValue().run();
|
captor.getValue().run();
|
||||||
assertEquals(WatcherState.STOPPED, lifeCycleService.getState());
|
assertEquals(WatcherState.STOPPED, lifeCycleService.getState().get());
|
||||||
|
|
||||||
// Starting via cluster state update, as the watcher metadata block is removed/set to true
|
// Starting via cluster state update, as the watcher metadata block is removed/set to true
|
||||||
reset(watcherService);
|
reset(watcherService);
|
||||||
|
@ -480,7 +480,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
||||||
new HashSet<>(roles), Version.CURRENT))).build();
|
new HashSet<>(roles), Version.CURRENT))).build();
|
||||||
|
|
||||||
lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
|
||||||
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
|
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDataNodeWithoutDataCanStart() {
|
public void testDataNodeWithoutDataCanStart() {
|
||||||
|
@ -494,7 +494,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
|
||||||
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
|
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
|
||||||
}
|
}
|
||||||
|
|
||||||
// this emulates a node outage somewhere in the cluster that carried a watcher shard
|
// this emulates a node outage somewhere in the cluster that carried a watcher shard
|
||||||
|
@ -584,7 +584,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
||||||
when(watcherService.validate(state)).thenReturn(true);
|
when(watcherService.validate(state)).thenReturn(true);
|
||||||
|
|
||||||
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
|
||||||
assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
|
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
|
||||||
verify(watcherService, times(1)).reload(eq(state), anyString());
|
verify(watcherService, times(1)).reload(eq(state), anyString());
|
||||||
assertThat(lifeCycleService.shardRoutings(), hasSize(1));
|
assertThat(lifeCycleService.shardRoutings(), hasSize(1));
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,7 @@ public class TransportWatcherStatsActionTests extends ESTestCase {
|
||||||
when(clusterService.state()).thenReturn(clusterState);
|
when(clusterService.state()).thenReturn(clusterState);
|
||||||
|
|
||||||
WatcherLifeCycleService watcherLifeCycleService = mock(WatcherLifeCycleService.class);
|
WatcherLifeCycleService watcherLifeCycleService = mock(WatcherLifeCycleService.class);
|
||||||
when(watcherLifeCycleService.getState()).thenReturn(WatcherState.STARTED);
|
when(watcherLifeCycleService.getState()).thenReturn(() -> WatcherState.STARTED);
|
||||||
|
|
||||||
ExecutionService executionService = mock(ExecutionService.class);
|
ExecutionService executionService = mock(ExecutionService.class);
|
||||||
when(executionService.executionThreadPoolQueueSize()).thenReturn(100L);
|
when(executionService.executionThreadPoolQueueSize()).thenReturn(100L);
|
||||||
|
|
Loading…
Reference in New Issue