Merge branch 'master' of github.com:elastic/x-plugins

Original commit: elastic/x-pack-elasticsearch@43a0149126
This commit is contained in:
Robert Muir 2015-08-06 06:13:32 -04:00
commit faec3cffa9
29 changed files with 290 additions and 130 deletions

View File

@ -47,7 +47,7 @@ public class ClusterStatsCollector extends AbstractCollector<ClusterStatsCollect
protected Collection<MarvelDoc> doCollect() throws Exception { protected Collection<MarvelDoc> doCollect() throws Exception {
ImmutableList.Builder<MarvelDoc> results = ImmutableList.builder(); ImmutableList.Builder<MarvelDoc> results = ImmutableList.builder();
ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get(); ClusterStatsResponse clusterStatsResponse = client.admin().cluster().prepareClusterStats().get(marvelSettings.clusterStatsTimeout());
results.add(buildMarvelDoc(clusterName.value(), TYPE, System.currentTimeMillis(), clusterStatsResponse)); results.add(buildMarvelDoc(clusterName.value(), TYPE, System.currentTimeMillis(), clusterStatsResponse));
return results.build(); return results.build();
} }

View File

@ -38,6 +38,10 @@ public class MarvelSettingsService extends AbstractComponent implements NodeSett
final TimeValueSetting clusterStateTimeout = MarvelSetting.timeSetting(CLUSTER_STATE_TIMEOUT, TimeValue.timeValueMinutes(10), final TimeValueSetting clusterStateTimeout = MarvelSetting.timeSetting(CLUSTER_STATE_TIMEOUT, TimeValue.timeValueMinutes(10),
"Timeout value when collecting the cluster state (default to 10m)"); "Timeout value when collecting the cluster state (default to 10m)");
public static final String CLUSTER_STATS_TIMEOUT = PREFIX + "cluster.stats.timeout";
final TimeValueSetting clusterStatsTimeout = MarvelSetting.timeSetting(CLUSTER_STATS_TIMEOUT, TimeValue.timeValueMinutes(10),
"Timeout value when collecting the cluster statistics (default to 10m)");
public static final String INDEX_RECOVERY_TIMEOUT = PREFIX + "index.recovery.timeout"; public static final String INDEX_RECOVERY_TIMEOUT = PREFIX + "index.recovery.timeout";
final TimeValueSetting recoveryTimeout = MarvelSetting.timeSetting(INDEX_RECOVERY_TIMEOUT, TimeValue.timeValueMinutes(10), final TimeValueSetting recoveryTimeout = MarvelSetting.timeSetting(INDEX_RECOVERY_TIMEOUT, TimeValue.timeValueMinutes(10),
"Timeout value when collecting the recovery information (default to 10m)"); "Timeout value when collecting the recovery information (default to 10m)");
@ -54,6 +58,7 @@ public class MarvelSettingsService extends AbstractComponent implements NodeSett
builder.add(indexStatsTimeout); builder.add(indexStatsTimeout);
builder.add(indices); builder.add(indices);
builder.add(clusterStateTimeout); builder.add(clusterStateTimeout);
builder.add(clusterStatsTimeout);
builder.add(recoveryTimeout); builder.add(recoveryTimeout);
builder.add(recoveryActiveOnly); builder.add(recoveryActiveOnly);
this.settings = builder.build(); this.settings = builder.build();
@ -106,6 +111,10 @@ public class MarvelSettingsService extends AbstractComponent implements NodeSett
return clusterStateTimeout.getValue(); return clusterStateTimeout.getValue();
} }
public TimeValue clusterStatsTimeout() {
return clusterStatsTimeout.getValue();
}
public TimeValue recoveryTimeout() { public TimeValue recoveryTimeout() {
return recoveryTimeout.getValue(); return recoveryTimeout.getValue();
} }

View File

@ -220,12 +220,12 @@ shield.authc.realms:
._esusers_ Realm ._esusers_ Realm
[options="header"] [options="header"]
|====== |======
| Name | Required | Default | Description | Name | Required | Default | Description
| `files.users` | no | `ES_HOME/config/shield/users` | `files.users` | no | `ES_HOME/config/shield/users` | The location of the <<users-file, users>> file.
| `files.users_roles` | no | `ES_HOME/config/shield/users_roles`| The location of <<users_defining-roles, _users_roles_>> file | `files.users_roles` | no | `ES_HOME/config/shield/users_roles`| The location of the <<users_defining-roles, users_roles>> file.
| `cache.ttl` | no | `20m` | Specified the time-to-live for cached user entries (a user and its credentials will be cached for this configured period of time). Defaults to `20m` (use the standard Elasticsearch {ref}/common-options.html#time-units[time units]). | `cache.ttl` | no | `20m` | The time-to-live for cached user entries--user credentials are cached for this configured period of time. Defaults to `20m`. Specify values using the standard Elasticsearch {ref}/common-options.html#time-units[time units].
| `cache.max_users` | no | 100000 | Specified the maximum number of user entries that can live in the cache at a given time. Defaults to 100,000. | `cache.max_users` | no | 100000 | The maximum number of user entries that can live in the cache at a given time. Defaults to 100,000.
| `cache.hash_algo` | no | `ssha256` | (Expert Setting) Specifies the hashing algorithm that will be used for the in-memory cached user credentials (see <<ref-cache-hash-algo,Cache hash algorithms>> table for all possible values). | `cache.hash_algo` | no | `ssha256` | (Expert Setting) The hashing algorithm that is used for the in-memory cached user credentials. See the <<ref-cache-hash-algo,Cache hash algorithms>> table for all possible values.
|====== |======
[[ref-ldap-settings]] [[ref-ldap-settings]]

View File

@ -3,3 +3,4 @@
- do: {watcher.stats: {}} - do: {watcher.stats: {}}
- match: { "watcher_state": "started" } - match: { "watcher_state": "started" }
- match: { "manually_stopped": false }

View File

@ -5,17 +5,20 @@
*/ */
package org.elasticsearch.watcher; package org.elasticsearch.watcher;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.CountDownLatch;
/** /**
*/ */
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener { public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
@ -24,26 +27,25 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
private final WatcherService watcherService; private final WatcherService watcherService;
private final ClusterService clusterService; private final ClusterService clusterService;
// TODO: If Watcher was stopped via api and the master is changed then Watcher will start regardless of the previous private volatile WatcherMetaData watcherMetaData;
// stop command, so at some point this needs to be a cluster setting
private volatile boolean manuallyStopped;
@Inject @Inject
public WatcherLifeCycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool, WatcherService watcherService) { public WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
WatcherService watcherService) {
super(settings); super(settings);
this.clusterService = clusterService;
this.threadPool = threadPool; this.threadPool = threadPool;
this.watcherService = watcherService; this.watcherService = watcherService;
this.clusterService = clusterService;
clusterService.add(this); clusterService.add(this);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will // Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an watch is scheduled. // happen because we're shutting down and an watch is scheduled.
clusterService.addLifecycleListener(new LifecycleListener() { clusterService.addLifecycleListener(new LifecycleListener() {
@Override @Override
public void beforeStop() { public void beforeStop() {
stop(true); stop(false);
} }
}); });
manuallyStopped = !settings.getAsBoolean("watcher.start_immediately", true); watcherMetaData = new WatcherMetaData(!settings.getAsBoolean("watcher.start_immediately", true));
} }
public void start() { public void start() {
@ -55,16 +57,15 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
} }
private synchronized void stop(boolean manual) { private synchronized void stop(boolean manual) {
//This is set here since even if we are not started if we have requested a manual stop we do not want an automatic start to start watcher
manuallyStopped = manual;
WatcherState watcherState = watcherService.state(); WatcherState watcherState = watcherService.state();
if (watcherState != WatcherState.STARTED) { if (watcherState != WatcherState.STARTED) {
logger.debug("not stopping watcher. watcher can only stop if its current state is [{}], but its current state now is [{}]", WatcherState.STARTED, watcherState); logger.debug("not stopping watcher. watcher can only stop if its current state is [{}], but its current state now is [{}]", WatcherState.STARTED, watcherState);
return; } else {
watcherService.stop();
}
if (manual) {
updateManualStopped(true);
} }
watcherService.stop();
} }
private synchronized void start(ClusterState state, boolean manual) { private synchronized void start(ClusterState state, boolean manual) {
@ -76,31 +77,39 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
// If we start from a cluster state update we need to check if previously we stopped manually // If we start from a cluster state update we need to check if previously we stopped manually
// otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run // otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run
if (!manual && manuallyStopped) { if (!manual && watcherMetaData.manuallyStopped()) {
logger.debug("not starting watcher. watcher was stopped manually and therefore cannot be auto-start"); logger.debug("not starting watcher. watcher was stopped manually and therefore cannot be auto-started");
return; return;
} }
//If we are manually starting we should clear the manuallyStopped flag if (watcherService.validate(state)) {
if (manual && manuallyStopped) { logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual);
manuallyStopped = false; try {
} watcherService.start(state);
} catch (Exception e) {
if (!watcherService.validate(state)) { logger.warn("failed to start watcher. please wait for the cluster to become ready or try to start Watcher manually", e);
}
} else {
logger.debug("not starting watcher. because the cluster isn't ready yet to run watcher"); logger.debug("not starting watcher. because the cluster isn't ready yet to run watcher");
return;
} }
if (manual) {
logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual); updateManualStopped(false);
try {
watcherService.start(state);
} catch (Exception e) {
logger.warn("failed to start watcher. please wait for the cluster to become ready or try to start Watcher manually", e);
} }
} }
@Override @Override
public void clusterChanged(final ClusterChangedEvent event) { public void clusterChanged(final ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .watches and
// a .triggered_watches index, but they may not have been restored from the cluster state on disk
return;
}
WatcherMetaData watcherMetaData = event.state().getMetaData().custom(WatcherMetaData.TYPE);
if (watcherMetaData != null) {
this.watcherMetaData = watcherMetaData;
}
if (!event.localNodeMaster()) { if (!event.localNodeMaster()) {
if (watcherService.state() != WatcherState.STARTED) { if (watcherService.state() != WatcherState.STARTED) {
// to avoid unnecessary forking of threads... // to avoid unnecessary forking of threads...
@ -118,11 +127,6 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
} }
}); });
} else { } else {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .watches and
// a .triggered_watches index, but they may not have been restored from the cluster state on disk
return;
}
if (watcherService.state() != WatcherState.STOPPED) { if (watcherService.state() != WatcherState.STOPPED) {
// to avoid unnecessary forking of threads... // to avoid unnecessary forking of threads...
return; return;
@ -137,4 +141,67 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
}); });
} }
} }
public WatcherMetaData watcherMetaData() {
return watcherMetaData;
}
private void updateManualStopped(final boolean stopped) {
watcherMetaData = new WatcherMetaData(stopped);
// We need to make sure that the new WatcherMetaData has arrived on all nodes,
// so in order to do this we need to use AckedClusterStateUpdateTask which
// requires a AckedRequest and ActionListener...
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<Boolean> listener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
latch.countDown();
}
@Override
public void onFailure(Throwable throwable) {
logger.warn("updating manually stopped isn't acked", throwable);
latch.countDown();
}
};
AckedRequest request = new AckedRequest() {
@Override
public TimeValue ackTimeout() {
return TimeValue.timeValueSeconds(30);
}
@Override
public TimeValue masterNodeTimeout() {
return TimeValue.timeValueSeconds(30);
}
};
clusterService.submitStateUpdateTask("update_watcher_manually_stopped", new AckedClusterStateUpdateTask<Boolean>(request, listener) {
@Override
protected Boolean newResponse(boolean result) {
return result;
}
@Override
public ClusterState execute(ClusterState clusterState) throws Exception {
ClusterState.Builder builder = new ClusterState.Builder(clusterState);
builder.metaData(MetaData.builder(clusterState.getMetaData())
.putCustom(WatcherMetaData.TYPE, new WatcherMetaData(stopped)));
return builder.build();
}
@Override
public void onFailure(String source, Throwable throwable) {
latch.countDown();
logger.warn("couldn't update watcher metadata [{}]", throwable, source);
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.interrupted();
}
}
} }

View File

@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.EnumSet;
public class WatcherMetaData extends AbstractDiffable<MetaData.Custom> implements MetaData.Custom {
public final static String TYPE = "watcher";
public final static WatcherMetaData PROTO = new WatcherMetaData(false);
private final boolean manuallyStopped;
public WatcherMetaData(boolean manuallyStopped) {
this.manuallyStopped = manuallyStopped;
}
public boolean manuallyStopped() {
return manuallyStopped;
}
@Override
public String type() {
return TYPE;
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return EnumSet.of(MetaData.XContentContext.GATEWAY);
}
@Override
public MetaData.Custom readFrom(StreamInput streamInput) throws IOException {
return new WatcherMetaData(streamInput.readBoolean());
}
@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
streamOutput.writeBoolean(manuallyStopped);
}
@Override
public MetaData.Custom fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
switch (token) {
case FIELD_NAME:
currentFieldName = parser.currentName();
break;
case VALUE_BOOLEAN:
if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.MANUALLY_STOPPED)) {
return new WatcherMetaData(parser.booleanValue());
}
break;
}
}
return null;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Field.MANUALLY_STOPPED.getPreferredName(), manuallyStopped);
return builder;
}
interface Field {
ParseField MANUALLY_STOPPED = new ParseField("manually_stopped");
}
}

View File

@ -7,16 +7,15 @@ package org.elasticsearch.watcher;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.AbstractPlugin; import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.script.ScriptModes;
import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.watcher.actions.email.service.InternalEmailService; import org.elasticsearch.watcher.actions.email.service.InternalEmailService;
import org.elasticsearch.watcher.history.HistoryModule; import org.elasticsearch.watcher.history.HistoryModule;
import org.elasticsearch.watcher.license.LicenseService; import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.http.HttpClient; import org.elasticsearch.watcher.support.http.HttpClient;
import org.elasticsearch.watcher.support.init.InitializingService; import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
@ -31,6 +30,10 @@ public class WatcherPlugin extends AbstractPlugin {
public static final String NAME = "watcher"; public static final String NAME = "watcher";
public static final String ENABLED_SETTING = NAME + ".enabled"; public static final String ENABLED_SETTING = NAME + ".enabled";
static {
MetaData.registerPrototype(WatcherMetaData.TYPE, WatcherMetaData.PROTO);
}
private final Settings settings; private final Settings settings;
private final boolean transportClient; private final boolean transportClient;
protected final boolean enabled; protected final boolean enabled;

View File

@ -10,7 +10,6 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -197,7 +196,7 @@ public class Email implements ToXContent {
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PRIORITY)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.PRIORITY)) {
email.priority(Email.Priority.resolve(parser.text())); email.priority(Email.Priority.resolve(parser.text()));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.SENT_DATE)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.SENT_DATE)) {
email.sentDate(new DateTime(parser.text())); email.sentDate(new DateTime(parser.text(), DateTimeZone.UTC));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.SUBJECT)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.SUBJECT)) {
email.subject(parser.text()); email.subject(parser.text());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.BODY)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.BODY)) {

View File

@ -94,7 +94,7 @@ public class ExecutionService extends AbstractComponent {
} }
public boolean validate(ClusterState state) { public boolean validate(ClusterState state) {
return historyStore.validate(state) && triggeredWatchStore.validate(state); return triggeredWatchStore.validate(state);
} }
public void stop() { public void stop() {

View File

@ -7,10 +7,6 @@ package org.elasticsearch.watcher.history;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -40,7 +36,6 @@ public class HistoryStore extends AbstractComponent {
private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic"); private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic");
private final ClientProxy client; private final ClientProxy client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock putUpdateLock = readWriteLock.readLock(); private final Lock putUpdateLock = readWriteLock.readLock();
@ -48,36 +43,15 @@ public class HistoryStore extends AbstractComponent {
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
@Inject @Inject
public HistoryStore(Settings settings, ClientProxy client, IndexNameExpressionResolver indexNameExpressionResolver) { public HistoryStore(Settings settings, ClientProxy client) {
super(settings); super(settings);
this.client = client; this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
} }
public void start() { public void start() {
started.set(true); started.set(true);
} }
public boolean validate(ClusterState state) {
String[] indices = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*");
if (indices.length == 0) {
logger.debug("no history indices exist, so we can load");
return true;
}
for (String index : indices) {
IndexMetaData indexMetaData = state.getMetaData().index(index);
if (indexMetaData != null) {
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
logger.debug("not all primary shards of the [{}] index are started, so we cannot load watcher records", index);
return false;
}
}
}
return true;
}
public void stop() { public void stop() {
stopLock.lock(); //This will block while put or update actions are underway stopLock.lock(); //This will block while put or update actions are underway
try { try {

View File

@ -18,8 +18,6 @@ public interface Clock {
long nanos(); long nanos();
DateTime now();
DateTime nowUTC(); DateTime nowUTC();
DateTime now(DateTimeZone timeZone); DateTime now(DateTimeZone timeZone);

View File

@ -29,11 +29,6 @@ public final class SystemClock implements Clock {
return System.nanoTime(); return System.nanoTime();
} }
@Override
public DateTime now() {
return now(DateTimeZone.getDefault());
}
@Override @Override
public DateTime nowUTC() { public DateTime nowUTC() {
return now(DateTimeZone.UTC); return now(DateTimeZone.UTC);

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.*;
import org.elasticsearch.watcher.WatcherBuild; import org.elasticsearch.watcher.WatcherBuild;
import org.elasticsearch.watcher.WatcherService; import org.elasticsearch.watcher.WatcherService;
import org.elasticsearch.watcher.WatcherVersion; import org.elasticsearch.watcher.WatcherVersion;
@ -31,14 +32,16 @@ public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherS
private final WatcherService watcherService; private final WatcherService watcherService;
private final ExecutionService executionService; private final ExecutionService executionService;
private final WatcherLifeCycleService lifeCycleService;
@Inject @Inject
public TransportWatcherStatsAction(Settings settings, TransportService transportService, ClusterService clusterService, public TransportWatcherStatsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, WatcherService watcherService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, WatcherService watcherService,
ExecutionService executionService, LicenseService licenseService) { ExecutionService executionService, LicenseService licenseService, WatcherLifeCycleService lifeCycleService) {
super(settings, WatcherStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, WatcherStatsRequest.class); super(settings, WatcherStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, licenseService, WatcherStatsRequest.class);
this.watcherService = watcherService; this.watcherService = watcherService;
this.executionService = executionService; this.executionService = executionService;
this.lifeCycleService = lifeCycleService;
} }
@Override @Override
@ -61,6 +64,7 @@ public class TransportWatcherStatsAction extends WatcherTransportAction<WatcherS
statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize()); statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
statsResponse.setVersion(WatcherVersion.CURRENT); statsResponse.setVersion(WatcherVersion.CURRENT);
statsResponse.setBuild(WatcherBuild.CURRENT); statsResponse.setBuild(WatcherBuild.CURRENT);
statsResponse.setWatcherMetaData(lifeCycleService.watcherMetaData());
if (request.includeCurrentWatches()) { if (request.includeCurrentWatches()) {
statsResponse.setSnapshots(executionService.currentExecutions()); statsResponse.setSnapshots(executionService.currentExecutions());

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.watcher.WatcherBuild; import org.elasticsearch.watcher.WatcherBuild;
import org.elasticsearch.watcher.WatcherMetaData;
import org.elasticsearch.watcher.WatcherState; import org.elasticsearch.watcher.WatcherState;
import org.elasticsearch.watcher.WatcherVersion; import org.elasticsearch.watcher.WatcherVersion;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -30,6 +31,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
private WatcherState watcherState; private WatcherState watcherState;
private long threadPoolQueueSize; private long threadPoolQueueSize;
private long threadPoolMaxSize; private long threadPoolMaxSize;
private WatcherMetaData watcherMetaData;
private List<WatchExecutionSnapshot> snapshots; private List<WatchExecutionSnapshot> snapshots;
private List<QueuedWatch> queuedWatches; private List<QueuedWatch> queuedWatches;
@ -121,6 +123,14 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
this.queuedWatches = queuedWatches; this.queuedWatches = queuedWatches;
} }
public WatcherMetaData getWatcherMetaData() {
return watcherMetaData;
}
public void setWatcherMetaData(WatcherMetaData watcherMetaData) {
this.watcherMetaData = watcherMetaData;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
@ -145,6 +155,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
queuedWatches.add(new QueuedWatch(in)); queuedWatches.add(new QueuedWatch(in));
} }
} }
watcherMetaData = (WatcherMetaData) WatcherMetaData.PROTO.readFrom(in);
} }
@Override @Override
@ -175,6 +186,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
} else { } else {
out.writeBoolean(false); out.writeBoolean(false);
} }
watcherMetaData.writeTo(out);
} }
@Override @Override
@ -201,6 +213,7 @@ public class WatcherStatsResponse extends ActionResponse implements ToXContent {
} }
builder.endArray(); builder.endArray();
} }
watcherMetaData.toXContent(builder, params);
builder.endObject(); builder.endObject();
return builder; return builder;

View File

@ -51,7 +51,7 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
} }
} }
this.schedules = new Schedules(schedules); this.schedules = new Schedules(schedules);
logger.debug("schedule engine started at [{}]", clock.now()); logger.debug("schedule engine started at [{}]", clock.nowUTC());
} }
@Override @Override

View File

@ -142,7 +142,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
while (clock.millis() % 1000 > 15) { while (clock.millis() % 1000 > 15) {
} }
while (active) { while (active) {
logger.trace("checking jobs [{}]", clock.now()); logger.trace("checking jobs [{}]", clock.nowUTC());
checkJobs(); checkJobs();
try { try {
sleep(tickInterval.millis()); sleep(tickInterval.millis());

View File

@ -83,7 +83,7 @@ public class CronEvalTool extends CliTool {
terminal.println("Valid!"); terminal.println("Valid!");
DateTime date = DateTime.now(DateTimeZone.getDefault()); DateTime date = DateTime.now(DateTimeZone.UTC);
terminal.println("Now is [" + formatter.print(date) + "]"); terminal.println("Now is [" + formatter.print(date) + "]");
terminal.println("Here are the next " + count + " times this cron expression will trigger:"); terminal.println("Here are the next " + count + " times this cron expression will trigger:");

View File

@ -6,10 +6,7 @@
package org.elasticsearch.watcher; package org.elasticsearch.watcher;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -18,6 +15,8 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -34,9 +33,18 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
public void prepareServices() { public void prepareServices() {
ThreadPool threadPool = mock(ThreadPool.class); ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService()); when(threadPool.executor(anyString())).thenReturn(MoreExecutors.newDirectExecutorService());
watcherService = mock(WatcherService.class);
clusterService = mock(ClusterService.class); clusterService = mock(ClusterService.class);
lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, clusterService, threadPool, watcherService); Answer<Object> answer = new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
AckedClusterStateUpdateTask updateTask = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1];
updateTask.onAllNodesAcked(null);
return null;
}
};
doAnswer(answer).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
watcherService = mock(WatcherService.class);
lifeCycleService = new WatcherLifeCycleService(Settings.EMPTY, threadPool, clusterService, watcherService);
} }
@Test @Test

View File

@ -36,7 +36,7 @@ public class EmailTest extends ESTestCase {
Email.AddressList possibleList = new Email.AddressList(addresses); Email.AddressList possibleList = new Email.AddressList(addresses);
Email.AddressList replyTo = randomFrom(possibleList, null); Email.AddressList replyTo = randomFrom(possibleList, null);
Email.Priority priority = randomFrom(Email.Priority.values()); Email.Priority priority = randomFrom(Email.Priority.values());
DateTime sentDate = new DateTime(randomInt(), DateTimeZone.getDefault()); DateTime sentDate = new DateTime(randomInt(), DateTimeZone.UTC);
Email.AddressList to = randomFrom(possibleList, null); Email.AddressList to = randomFrom(possibleList, null);
Email.AddressList cc = randomFrom(possibleList, null); Email.AddressList cc = randomFrom(possibleList, null);
Email.AddressList bcc = randomFrom(possibleList, null); Email.AddressList bcc = randomFrom(possibleList, null);

View File

@ -29,7 +29,7 @@ public class AckThrottlerTests extends ESTestCase {
@Test @Test
public void testWhenAcked() throws Exception { public void testWhenAcked() throws Exception {
DateTime timestamp = SystemClock.INSTANCE.now(); DateTime timestamp = SystemClock.INSTANCE.nowUTC();
WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD); WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD);
Watch watch = ctx.watch(); Watch watch = ctx.watch();
ActionStatus actionStatus = mock(ActionStatus.class); ActionStatus actionStatus = mock(ActionStatus.class);
@ -45,7 +45,7 @@ public class AckThrottlerTests extends ESTestCase {
@Test @Test
public void testThrottle_When_AwaitsSuccessfulExecution() throws Exception { public void testThrottle_When_AwaitsSuccessfulExecution() throws Exception {
DateTime timestamp = SystemClock.INSTANCE.now(); DateTime timestamp = SystemClock.INSTANCE.nowUTC();
WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD); WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD);
Watch watch = ctx.watch(); Watch watch = ctx.watch();
ActionStatus actionStatus = mock(ActionStatus.class); ActionStatus actionStatus = mock(ActionStatus.class);
@ -61,7 +61,7 @@ public class AckThrottlerTests extends ESTestCase {
@Test @Test
public void testThrottle_When_Ackable() throws Exception { public void testThrottle_When_Ackable() throws Exception {
DateTime timestamp = SystemClock.INSTANCE.now(); DateTime timestamp = SystemClock.INSTANCE.nowUTC();
WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD); WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD);
Watch watch = ctx.watch(); Watch watch = ctx.watch();
ActionStatus actionStatus = mock(ActionStatus.class); ActionStatus actionStatus = mock(ActionStatus.class);

View File

@ -35,7 +35,7 @@ public class PeriodThrottlerTests extends ESTestCase {
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
ActionStatus actionStatus = mock(ActionStatus.class); ActionStatus actionStatus = mock(ActionStatus.class);
when(actionStatus.lastSuccessfulExecution()).thenReturn(ActionStatus.Execution.successful(SystemClock.INSTANCE.now().minusSeconds((int) period.seconds() - 1))); when(actionStatus.lastSuccessfulExecution()).thenReturn(ActionStatus.Execution.successful(SystemClock.INSTANCE.nowUTC().minusSeconds((int) period.seconds() - 1)));
WatchStatus status = mock(WatchStatus.class); WatchStatus status = mock(WatchStatus.class);
when(status.actionStatus("_action")).thenReturn(actionStatus); when(status.actionStatus("_action")).thenReturn(actionStatus);
when(ctx.watch().status()).thenReturn(status); when(ctx.watch().status()).thenReturn(status);
@ -55,7 +55,7 @@ public class PeriodThrottlerTests extends ESTestCase {
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
ActionStatus actionStatus = mock(ActionStatus.class); ActionStatus actionStatus = mock(ActionStatus.class);
when(actionStatus.lastSuccessfulExecution()).thenReturn(ActionStatus.Execution.successful(SystemClock.INSTANCE.now().minusSeconds((int) period.seconds() + 1))); when(actionStatus.lastSuccessfulExecution()).thenReturn(ActionStatus.Execution.successful(SystemClock.INSTANCE.nowUTC().minusSeconds((int) period.seconds() + 1)));
WatchStatus status = mock(WatchStatus.class); WatchStatus status = mock(WatchStatus.class);
when(status.actionStatus("_action")).thenReturn(actionStatus); when(status.actionStatus("_action")).thenReturn(actionStatus);
when(ctx.watch().status()).thenReturn(status); when(ctx.watch().status()).thenReturn(status);

View File

@ -150,7 +150,7 @@ public class CompareConditionTests extends ESTestCase {
boolean met = randomBoolean(); boolean met = randomBoolean();
Op op = met ? randomFrom(Op.GT, Op.GTE, Op.NOT_EQ) : randomFrom(Op.LT, Op.LTE, Op.EQ); Op op = met ? randomFrom(Op.GT, Op.GTE, Op.NOT_EQ) : randomFrom(Op.LT, Op.LTE, Op.EQ);
String value = "<{now-1d}>"; String value = "<{now-1d}>";
DateTime payloadValue = clock.now(); DateTime payloadValue = clock.nowUTC();
ExecutableCompareCondition condition = new ExecutableCompareCondition(new CompareCondition("ctx.payload.value", op, value), logger, clock); ExecutableCompareCondition condition = new ExecutableCompareCondition(new CompareCondition("ctx.payload.value", op, value), logger, clock);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.Simple("value", payloadValue)); WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.Simple("value", payloadValue));

View File

@ -7,7 +7,6 @@ package org.elasticsearch.watcher.history;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -31,13 +30,11 @@ public class HistoryStoreTests extends ESTestCase {
private HistoryStore historyStore; private HistoryStore historyStore;
private ClientProxy clientProxy; private ClientProxy clientProxy;
private IndexNameExpressionResolver indexNameExpressionResolver;
@Before @Before
public void init() { public void init() {
clientProxy = mock(ClientProxy.class); clientProxy = mock(ClientProxy.class);
indexNameExpressionResolver = mock(IndexNameExpressionResolver.class); historyStore = new HistoryStore(Settings.EMPTY, clientProxy);
historyStore = new HistoryStore(Settings.EMPTY, clientProxy, indexNameExpressionResolver);
historyStore.start(); historyStore.start();
} }

View File

@ -44,7 +44,7 @@ public class WatcherUtilsTests extends ESTestCase {
@Test @Test
public void testFlattenModel() throws Exception { public void testFlattenModel() throws Exception {
DateTime now = SystemClock.INSTANCE.now(); DateTime now = SystemClock.INSTANCE.nowUTC();
Map<String, Object> map = ImmutableMap.<String, Object>builder() Map<String, Object> map = ImmutableMap.<String, Object>builder()
.put("a", ImmutableMap.builder().put("a1", new int[] { 0, 1, 2 }).build()) .put("a", ImmutableMap.builder().put("a1", new int[] { 0, 1, 2 }).build())
.put("b", new String[] { "b0", "b1", "b2" }) .put("b", new String[] { "b0", "b1", "b2" })

View File

@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit;
*/ */
public class ClockMock implements Clock { public class ClockMock implements Clock {
private DateTime now = DateTime.now(DateTimeZone.getDefault()); private DateTime now = DateTime.now(DateTimeZone.UTC);
@Override @Override
public long millis() { public long millis() {
@ -28,11 +28,6 @@ public class ClockMock implements Clock {
return TimeUnit.MILLISECONDS.toNanos(now.getMillis()); return TimeUnit.MILLISECONDS.toNanos(now.getMillis());
} }
@Override
public DateTime now() {
return now;
}
@Override @Override
public DateTime nowUTC() { public DateTime nowUTC() {
return now(DateTimeZone.UTC); return now(DateTimeZone.UTC);

View File

@ -437,7 +437,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
private void testConditionSearch(SearchRequest request) throws Exception { private void testConditionSearch(SearchRequest request) throws Exception {
if (timeWarped()) { if (timeWarped()) {
// reset, so we don't miss event docs when we filter over the _timestamp field. // reset, so we don't miss event docs when we filter over the _timestamp field.
timeWarp().clock().setTime(SystemClock.INSTANCE.now()); timeWarp().clock().setTime(SystemClock.INSTANCE.nowUTC());
} }
String watchName = "_name"; String watchName = "_name";
@ -450,7 +450,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTests {
.condition(ConditionBuilders.scriptCondition("return ctx.payload.hits.total >= 3"))) .condition(ConditionBuilders.scriptCondition("return ctx.payload.hits.total >= 3")))
.get(); .get();
logger.info("created watch [{}] at [{}]", watchName, SystemClock.INSTANCE.now()); logger.info("created watch [{}] at [{}]", watchName, SystemClock.INSTANCE.nowUTC());
client().prepareIndex("events", "event") client().prepareIndex("events", "event")
.setCreate(true) .setCreate(true)

View File

@ -30,6 +30,7 @@ import org.junit.Test;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.is;
import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@ -130,13 +131,13 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value()) client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject() .setSource(jsonBuilder().startObject()
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName()) .startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
.field(event.type(), event) .field(event.type(), event)
.endObject() .endObject()
.startObject(Watch.Field.CONDITION.getPreferredName()) .startObject(Watch.Field.CONDITION.getPreferredName())
.field(condition.type(), condition) .field(condition.type(), condition)
.endObject() .endObject()
.startObject(Watch.Field.INPUT.getPreferredName()) .startObject(Watch.Field.INPUT.getPreferredName())
.startObject("none").endObject() .startObject("none").endObject()
.endObject() .endObject()
.endObject()) .endObject())
.setConsistencyLevel(WriteConsistencyLevel.ALL) .setConsistencyLevel(WriteConsistencyLevel.ALL)
@ -148,13 +149,13 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value()) client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject() .setSource(jsonBuilder().startObject()
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName()) .startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
.field(event.type(), event) .field(event.type(), event)
.endObject() .endObject()
.startObject(Watch.Field.CONDITION.getPreferredName()) .startObject(Watch.Field.CONDITION.getPreferredName())
.startObject("unknown").endObject() .startObject("unknown").endObject()
.endObject() .endObject()
.startObject(Watch.Field.INPUT.getPreferredName()) .startObject(Watch.Field.INPUT.getPreferredName())
.startObject("none").endObject() .startObject("none").endObject()
.endObject() .endObject()
.endObject()) .endObject())
.setConsistencyLevel(WriteConsistencyLevel.ALL) .setConsistencyLevel(WriteConsistencyLevel.ALL)
@ -349,5 +350,17 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
}); });
} }
@Test
public void testManuallyStopped() throws Exception {
WatcherStatsResponse response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatcherMetaData().manuallyStopped(), is(false));
stopWatcher();
response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatcherMetaData().manuallyStopped(), is(true));
startWatcher();
response = watcherClient().prepareWatcherStats().get();
assertThat(response.getWatcherMetaData().manuallyStopped(), is(false));
}
} }

View File

@ -5,9 +5,8 @@
*/ */
package org.elasticsearch.watcher.trigger.schedule.engine; package org.elasticsearch.watcher.trigger.schedule.engine;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.joda.time.DateTime;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.trigger.Trigger; import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine; import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.TriggerEvent;
@ -15,9 +14,9 @@ import org.elasticsearch.watcher.trigger.schedule.Schedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.support.DayOfWeek; import org.elasticsearch.watcher.trigger.schedule.support.DayOfWeek;
import org.elasticsearch.watcher.trigger.schedule.support.WeekTimes; import org.elasticsearch.watcher.trigger.schedule.support.WeekTimes;
import org.joda.time.DateTime;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
@ -27,9 +26,9 @@ import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.*; import static org.elasticsearch.watcher.trigger.schedule.Schedules.*;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.joda.time.DateTimeZone.UTC;
public abstract class BaseTriggerEngineTestCase extends ESTestCase { public abstract class BaseTriggerEngineTestCase extends ESTestCase {
@ -63,11 +62,11 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
for (TriggerEvent event : events) { for (TriggerEvent event : events) {
int index = Integer.parseInt(event.jobName()); int index = Integer.parseInt(event.jobName());
if (!bits.get(index)) { if (!bits.get(index)) {
logger.info("job [{}] first fire: {}", index, SystemClock.INSTANCE.now()); logger.info("job [{}] first fire", index);
bits.set(index); bits.set(index);
} else { } else {
latch.countDown(); latch.countDown();
logger.info("job [{}] second fire: {}", index, SystemClock.INSTANCE.now()); logger.info("job [{}] second fire", index);
} }
} }
} }
@ -91,7 +90,7 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
public void triggered(Iterable<TriggerEvent> events) { public void triggered(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) { for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name)); assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", SystemClock.INSTANCE.now()); logger.info("triggered job on [{}]", SystemClock.INSTANCE.nowUTC());
} }
latch.countDown(); latch.countDown();
} }
@ -125,7 +124,7 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
public void triggered(Iterable<TriggerEvent> events) { public void triggered(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) { for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name)); assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", SystemClock.INSTANCE.now()); logger.info("triggered job on [{}]", SystemClock.INSTANCE.nowUTC());
latch.countDown(); latch.countDown();
} }
} }
@ -161,7 +160,7 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
public void triggered(Iterable<TriggerEvent> events) { public void triggered(Iterable<TriggerEvent> events) {
for (TriggerEvent event : events) { for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name)); assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", SystemClock.INSTANCE.now()); logger.info("triggered job");
} }
latch.countDown(); latch.countDown();
} }
@ -195,7 +194,7 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
@Override @Override
public void triggered(Iterable<TriggerEvent> events) { public void triggered(Iterable<TriggerEvent> events) {
logger.info("triggered job on [{}]", SystemClock.INSTANCE.now()); logger.info("triggered job");
} }
}); });

View File

@ -205,7 +205,7 @@ public class WatchServiceTests extends ESTestCase {
@Test @Test
public void testAckWatch_NotAck() throws Exception { public void testAckWatch_NotAck() throws Exception {
DateTime now = SystemClock.INSTANCE.now(); DateTime now = SystemClock.INSTANCE.nowUTC();
TimeValue timeout = TimeValue.timeValueSeconds(5); TimeValue timeout = TimeValue.timeValueSeconds(5);
WatchLockService.Lock lock = mock(WatchLockService.Lock.class); WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock); when(watchLockService.tryAcquire("_id", timeout)).thenReturn(lock);