Use custom metadata to remember that Watcher stopped via an explicit stop api call.
Also expose WatcherMetaData in stats api Original commit: elastic/x-pack-elasticsearch@5581615f9c
This commit is contained in:
parent
e87b3d0681
commit
3f0509923a
|
@ -3,3 +3,4 @@
|
|||
|
||||
- do: {watcher.stats: {}}
|
||||
- match: { "watcher_state": "started" }
|
||||
- match: { "manually_stopped": false }
|
||||
|
|
|
@ -5,17 +5,20 @@
|
|||
*/
|
||||
package org.elasticsearch.watcher;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.*;
|
||||
import org.elasticsearch.cluster.ack.AckedRequest;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.component.LifecycleListener;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
|
||||
|
@ -24,26 +27,25 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
|
|||
private final WatcherService watcherService;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
// TODO: If Watcher was stopped via api and the master is changed then Watcher will start regardless of the previous
|
||||
// stop command, so at some point this needs to be a cluster setting
|
||||
private volatile boolean manuallyStopped;
|
||||
private volatile WatcherMetaData watcherMetaData;
|
||||
|
||||
@Inject
|
||||
public WatcherLifeCycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool, WatcherService watcherService) {
|
||||
public WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
WatcherService watcherService) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.threadPool = threadPool;
|
||||
this.watcherService = watcherService;
|
||||
this.clusterService = clusterService;
|
||||
clusterService.add(this);
|
||||
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
|
||||
// happen because we're shutting down and an watch is scheduled.
|
||||
clusterService.addLifecycleListener(new LifecycleListener() {
|
||||
@Override
|
||||
public void beforeStop() {
|
||||
stop(true);
|
||||
stop(false);
|
||||
}
|
||||
});
|
||||
manuallyStopped = !settings.getAsBoolean("watcher.start_immediately", true);
|
||||
watcherMetaData = new WatcherMetaData(!settings.getAsBoolean("watcher.start_immediately", true));
|
||||
}
|
||||
|
||||
public void start() {
|
||||
|
@ -55,16 +57,15 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
|
|||
}
|
||||
|
||||
private synchronized void stop(boolean manual) {
|
||||
//This is set here since even if we are not started if we have requested a manual stop we do not want an automatic start to start watcher
|
||||
manuallyStopped = manual;
|
||||
|
||||
WatcherState watcherState = watcherService.state();
|
||||
if (watcherState != WatcherState.STARTED) {
|
||||
logger.debug("not stopping watcher. watcher can only stop if its current state is [{}], but its current state now is [{}]", WatcherState.STARTED, watcherState);
|
||||
return;
|
||||
} else {
|
||||
watcherService.stop();
|
||||
}
|
||||
if (manual) {
|
||||
updateManualStopped(true);
|
||||
}
|
||||
|
||||
watcherService.stop();
|
||||
}
|
||||
|
||||
private synchronized void start(ClusterState state, boolean manual) {
|
||||
|
@ -76,31 +77,39 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
|
|||
|
||||
// If we start from a cluster state update we need to check if previously we stopped manually
|
||||
// otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run
|
||||
if (!manual && manuallyStopped) {
|
||||
logger.debug("not starting watcher. watcher was stopped manually and therefore cannot be auto-start");
|
||||
if (!manual && watcherMetaData.manuallyStopped()) {
|
||||
logger.debug("not starting watcher. watcher was stopped manually and therefore cannot be auto-started");
|
||||
return;
|
||||
}
|
||||
|
||||
//If we are manually starting we should clear the manuallyStopped flag
|
||||
if (manual && manuallyStopped) {
|
||||
manuallyStopped = false;
|
||||
}
|
||||
|
||||
if (!watcherService.validate(state)) {
|
||||
if (watcherService.validate(state)) {
|
||||
logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual);
|
||||
try {
|
||||
watcherService.start(state);
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to start watcher. please wait for the cluster to become ready or try to start Watcher manually", e);
|
||||
}
|
||||
} else {
|
||||
logger.debug("not starting watcher. because the cluster isn't ready yet to run watcher");
|
||||
return;
|
||||
}
|
||||
|
||||
logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual);
|
||||
try {
|
||||
watcherService.start(state);
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to start watcher. please wait for the cluster to become ready or try to start Watcher manually", e);
|
||||
if (manual) {
|
||||
updateManualStopped(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(final ClusterChangedEvent event) {
|
||||
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
// wait until the gateway has recovered from disk, otherwise we think may not have .watches and
|
||||
// a .triggered_watches index, but they may not have been restored from the cluster state on disk
|
||||
return;
|
||||
}
|
||||
|
||||
WatcherMetaData watcherMetaData = event.state().getMetaData().custom(WatcherMetaData.TYPE);
|
||||
if (watcherMetaData != null) {
|
||||
this.watcherMetaData = watcherMetaData;
|
||||
}
|
||||
|
||||
if (!event.localNodeMaster()) {
|
||||
if (watcherService.state() != WatcherState.STARTED) {
|
||||
// to avoid unnecessary forking of threads...
|
||||
|
@ -118,11 +127,6 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
|
|||
}
|
||||
});
|
||||
} else {
|
||||
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
// wait until the gateway has recovered from disk, otherwise we think may not have .watches and
|
||||
// a .triggered_watches index, but they may not have been restored from the cluster state on disk
|
||||
return;
|
||||
}
|
||||
if (watcherService.state() != WatcherState.STOPPED) {
|
||||
// to avoid unnecessary forking of threads...
|
||||
return;
|
||||
|
@ -137,4 +141,67 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
public WatcherMetaData watcherMetaData() {
|
||||
return watcherMetaData;
|
||||
}
|
||||
|
||||
private void updateManualStopped(final boolean stopped) {
|
||||
watcherMetaData = new WatcherMetaData(stopped);
|
||||
|
||||
// We need to make sure that the new WatcherMetaData has arrived on all nodes,
|
||||
// so in order to do this we need to use AckedClusterStateUpdateTask which
|
||||
// requires a AckedRequest and ActionListener...
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
ActionListener<Boolean> listener = new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean aBoolean) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable throwable) {
|
||||
logger.warn("updating manually stopped isn't acked", throwable);
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
AckedRequest request = new AckedRequest() {
|
||||
@Override
|
||||
public TimeValue ackTimeout() {
|
||||
return TimeValue.timeValueSeconds(30);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue masterNodeTimeout() {
|
||||
return TimeValue.timeValueSeconds(30);
|
||||
}
|
||||
};
|
||||
clusterService.submitStateUpdateTask("update_watcher_manually_stopped", new AckedClusterStateUpdateTask<Boolean>(request, listener) {
|
||||
|
||||
@Override
|
||||
protected Boolean newResponse(boolean result) {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState clusterState) throws Exception {
|
||||
ClusterState.Builder builder = new ClusterState.Builder(clusterState);
|
||||
builder.metaData(MetaData.builder(clusterState.getMetaData())
|
||||
.putCustom(WatcherMetaData.TYPE, new WatcherMetaData(stopped)));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Throwable throwable) {
|
||||
latch.countDown();
|
||||
logger.warn("couldn't update watcher metadata [{}]", throwable, source);
|
||||
}
|
||||
});
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.watcher;
|
||||
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
public class WatcherMetaData extends AbstractDiffable<MetaData.Custom> implements MetaData.Custom {
|
||||
|
||||
public final static String TYPE = "watcher";
|
||||
public final static WatcherMetaData PROTO = new WatcherMetaData(false);
|
||||
|
||||
private final boolean manuallyStopped;
|
||||
|
||||
public WatcherMetaData(boolean manuallyStopped) {
|
||||
this.manuallyStopped = manuallyStopped;
|
||||
}
|
||||
|
||||
public boolean manuallyStopped() {
|
||||
return manuallyStopped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EnumSet<MetaData.XContentContext> context() {
|
||||
return EnumSet.of(MetaData.XContentContext.GATEWAY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaData.Custom readFrom(StreamInput streamInput) throws IOException {
|
||||
return new WatcherMetaData(streamInput.readBoolean());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput streamOutput) throws IOException {
|
||||
streamOutput.writeBoolean(manuallyStopped);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetaData.Custom fromXContent(XContentParser parser) throws IOException {
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
switch (token) {
|
||||
case FIELD_NAME:
|
||||
currentFieldName = parser.currentName();
|
||||
break;
|
||||
case VALUE_BOOLEAN:
|
||||
if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.MANUALLY_STOPPED)) {
|
||||
return new WatcherMetaData(parser.booleanValue());
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field(Field.MANUALLY_STOPPED.getPreferredName(), manuallyStopped);
|
||||
return builder;
|
||||
}
|
||||
|
||||
interface Field {
|
||||
|
||||
ParseField MANUALLY_STOPPED = new ParseField("manually_stopped");
|
||||
|
||||
}
|
||||
}
|
|
@ -7,16 +7,15 @@ package org.elasticsearch.watcher;
|
|||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.AbstractPlugin;
|
||||
import org.elasticsearch.script.ScriptModes;
|
||||
import org.elasticsearch.script.ScriptModule;
|
||||
import org.elasticsearch.watcher.actions.email.service.InternalEmailService;
|
||||
import org.elasticsearch.watcher.history.HistoryModule;
|
||||
import org.elasticsearch.watcher.license.LicenseService;
|
||||
import org.elasticsearch.watcher.support.Script;
|
||||
import org.elasticsearch.watcher.support.http.HttpClient;
|
||||
import org.elasticsearch.watcher.support.init.InitializingService;
|
||||
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
|
||||
|
@ -31,6 +30,10 @@ public class WatcherPlugin extends AbstractPlugin {
|
|||
public static final String NAME = "watcher";
|
||||
public static final String ENABLED_SETTING = NAME + ".enabled";
|
||||
|
||||
static {
|
||||
MetaData.registerPrototype(WatcherMetaData.TYPE, WatcherMetaData.PROTO);
|
||||
}
|
||||
|
||||
private final Settings settings;
|
||||
private final boolean transportClient;
|
||||
protected final boolean enabled;
|
||||
|
|
|
@ -17,6 +17,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.watcher.*;
|
||||
import org.elasticsearch.watcher.WatcherBuild;
|
||||
import org.elasticsearch.watcher.WatcherService;
|
||||
import org.elasticsearch.watcher.WatcherVersion;
|
||||
|
@ -31,14 +32,16 @@ public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherS
|
|||
|
||||
private final WatcherService watcherService;
|
||||
private final ExecutionService executionService;
|
||||
private final WatcherLifeCycleService lifeCycleService;
|
||||
|
||||
@Inject
|
||||
public TransportWatcherStatsAction(Settings settings, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, WatcherService watcherService,
|
||||
ExecutionService executionService, LicenseService licenseService) {
|
||||
ExecutionService executionService, LicenseService licenseService, WatcherLifeCycleService lifeCycleService) {
|
||||
super(settings, WatcherStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, WatcherStatsRequest.class);
|
||||
this.watcherService = watcherService;
|
||||
this.executionService = executionService;
|
||||
this.lifeCycleService = lifeCycleService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -61,6 +64,7 @@ public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherS
|
|||
statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
|
||||
statsResponse.setVersion(WatcherVersion.CURRENT);
|
||||
statsResponse.setBuild(WatcherBuild.CURRENT);
|
||||
statsResponse.setWatcherMetaData(lifeCycleService.watcherMetaData());
|
||||
|
||||
if (request.includeCurrentWatches()) {
|
||||
statsResponse.setSnapshots(executionService.currentExecutions());
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.common.Nullable;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.watcher.WatcherBuild;
|
||||
import org.elasticsearch.watcher.WatcherMetaData;
|
||||
import org.elasticsearch.watcher.WatcherState;
|
||||
import org.elasticsearch.watcher.WatcherVersion;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -30,6 +31,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
|
|||
private WatcherState watcherState;
|
||||
private long threadPoolQueueSize;
|
||||
private long threadPoolMaxSize;
|
||||
private WatcherMetaData watcherMetaData;
|
||||
|
||||
private List<WatchExecutionSnapshot> snapshots;
|
||||
private List<QueuedWatch> queuedWatches;
|
||||
|
@ -121,6 +123,14 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
|
|||
this.queuedWatches = queuedWatches;
|
||||
}
|
||||
|
||||
public WatcherMetaData getWatcherMetaData() {
|
||||
return watcherMetaData;
|
||||
}
|
||||
|
||||
public void setWatcherMetaData(WatcherMetaData watcherMetaData) {
|
||||
this.watcherMetaData = watcherMetaData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
|
@ -145,6 +155,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
|
|||
queuedWatches.add(new QueuedWatch(in));
|
||||
}
|
||||
}
|
||||
watcherMetaData = (WatcherMetaData) WatcherMetaData.PROTO.readFrom(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -175,6 +186,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
|
|||
} else {
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
watcherMetaData.writeTo(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -201,6 +213,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
|
|||
}
|
||||
builder.endArray();
|
||||
}
|
||||
watcherMetaData.toXContent(builder, params);
|
||||
|
||||
builder.endObject();
|
||||
return builder;
|
||||
|
|
|
@ -6,10 +6,7 @@
|
|||
package org.elasticsearch.watcher;
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.*;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -18,6 +15,8 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
@ -34,9 +33,18 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
|||
public void prepareServices() {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService());
|
||||
watcherService = mock(WatcherService.class);
|
||||
clusterService = mock(ClusterService.class);
|
||||
lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, clusterService, threadPool, watcherService);
|
||||
Answer<Object> answer = new Answer<Object>() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
AckedClusterStateUpdateTask updateTask = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1];
|
||||
updateTask.onAllNodesAcked(null);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
doAnswer(answer).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
|
||||
watcherService = mock(WatcherService.class);
|
||||
lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, threadPool, clusterService, watcherService);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.junit.Test;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.joda.time.DateTimeZone.UTC;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
|
||||
|
@ -130,13 +131,13 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
|
|||
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
|
||||
.setSource(jsonBuilder().startObject()
|
||||
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
|
||||
.field(event.type(), event)
|
||||
.field(event.type(), event)
|
||||
.endObject()
|
||||
.startObject(Watch.Field.CONDITION.getPreferredName())
|
||||
.field(condition.type(), condition)
|
||||
.field(condition.type(), condition)
|
||||
.endObject()
|
||||
.startObject(Watch.Field.INPUT.getPreferredName())
|
||||
.startObject("none").endObject()
|
||||
.startObject("none").endObject()
|
||||
.endObject()
|
||||
.endObject())
|
||||
.setConsistencyLevel(WriteConsistencyLevel.ALL)
|
||||
|
@ -148,13 +149,13 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
|
|||
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
|
||||
.setSource(jsonBuilder().startObject()
|
||||
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
|
||||
.field(event.type(), event)
|
||||
.field(event.type(), event)
|
||||
.endObject()
|
||||
.startObject(Watch.Field.CONDITION.getPreferredName())
|
||||
.startObject("unknown").endObject()
|
||||
.startObject("unknown").endObject()
|
||||
.endObject()
|
||||
.startObject(Watch.Field.INPUT.getPreferredName())
|
||||
.startObject("none").endObject()
|
||||
.startObject("none").endObject()
|
||||
.endObject()
|
||||
.endObject())
|
||||
.setConsistencyLevel(WriteConsistencyLevel.ALL)
|
||||
|
@ -349,5 +350,17 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testManuallyStopped() throws Exception {
|
||||
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
|
||||
assertThat(response.getWatcherMetaData().manuallyStopped(), is(false));
|
||||
stopWatcher();
|
||||
response = watcherClient().prepareWatcherStats().get();
|
||||
assertThat(response.getWatcherMetaData().manuallyStopped(), is(true));
|
||||
startWatcher();
|
||||
response = watcherClient().prepareWatcherStats().get();
|
||||
assertThat(response.getWatcherMetaData().manuallyStopped(), is(false));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue