Watcher: Cleanup - remove Clock interface (elastic/elasticsearch#3985)

The clock interface is merely used to create mock clocks and inject them into tests.
We can do this with the java8 based java.time.Clock class as well, so there is no need
to keep this interface.

Original commit: elastic/x-pack-elasticsearch@ae30dc29ca
This commit is contained in:
Alexander Reelsen 2016-11-07 09:10:25 +01:00 committed by GitHub
parent 4e0457276d
commit 8b6552516e
51 changed files with 264 additions and 310 deletions

View File

@ -30,8 +30,8 @@ import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.support.clock.Clock;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;

View File

@ -83,8 +83,6 @@ import org.elasticsearch.xpack.security.authc.AuthenticationService;
import org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken;
import org.elasticsearch.xpack.ssl.SSLConfigurationReloader;
import org.elasticsearch.xpack.ssl.SSLService;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.WatcherFeatureSet;
@ -92,6 +90,7 @@ import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -196,7 +195,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
// overridable by tests
protected Clock getClock() {
return SystemClock.INSTANCE;
return Clock.systemUTC();
}
@Override

View File

@ -8,8 +8,8 @@ package org.elasticsearch.xpack.scheduler;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.xpack.support.clock.Clock;
import java.time.Clock;
import java.util.Collection;
import java.util.List;
import java.util.Map;

View File

@ -1,24 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.support.clock;
import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
public interface Clock {
long millis();
long nanos();
DateTime nowUTC();
DateTime now(DateTimeZone timeZone);
TimeValue timeElapsedSince(DateTime time);
}

View File

@ -5,11 +5,15 @@
*/
package org.elasticsearch.xpack.support.clock;
import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
public class HaltedClock implements Clock {
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
public class HaltedClock extends Clock {
private final DateTime now;
@ -17,28 +21,27 @@ public class HaltedClock implements Clock {
this.now = now.toDateTime(DateTimeZone.UTC);
}
@Override
public ZoneId getZone() {
return ZoneOffset.UTC;
}
@Override
public Clock withZone(ZoneId zoneId) {
if (zoneId.equals(ZoneOffset.UTC)) {
return this;
}
throw new IllegalArgumentException("Halted clock time zone cannot be changed");
}
@Override
public long millis() {
return now.getMillis();
}
@Override
public long nanos() {
return millis() * 1000000;
}
@Override
public DateTime nowUTC() {
return now;
}
@Override
public DateTime now(DateTimeZone timeZone) {
return now.toDateTime(timeZone);
}
@Override
public TimeValue timeElapsedSince(DateTime time) {
return TimeValue.timeValueMillis(millis() - time.getMillis());
public Instant instant() {
return Instant.ofEpochMilli(now.getMillis());
}
}

View File

@ -1,45 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.support.clock;
import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
public final class SystemClock implements Clock {
public static final SystemClock INSTANCE = new SystemClock();
private SystemClock() {
}
@Override
public long millis() {
return System.currentTimeMillis();
}
@Override
public long nanos() {
return System.nanoTime();
}
@Override
public DateTime nowUTC() {
return now(DateTimeZone.UTC);
}
@Override
public DateTime now(DateTimeZone timeZone) {
return DateTime.now(timeZone);
}
@Override
public TimeValue timeElapsedSince(DateTime time) {
return TimeValue.timeValueMillis(millis() - time.getMillis());
}
}

View File

@ -41,7 +41,6 @@ import org.elasticsearch.xpack.notification.hipchat.HipChatService;
import org.elasticsearch.xpack.notification.pagerduty.PagerDutyService;
import org.elasticsearch.xpack.notification.slack.SlackService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.actions.ActionFactory;
import org.elasticsearch.xpack.watcher.actions.ActionRegistry;
import org.elasticsearch.xpack.watcher.actions.email.EmailAction;
@ -113,6 +112,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

View File

@ -14,9 +14,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
@ -25,15 +23,16 @@ import org.elasticsearch.xpack.watcher.watch.WatchLockService;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.time.Clock;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
import static org.elasticsearch.xpack.watcher.support.Exceptions.ioException;
import static org.joda.time.DateTimeZone.UTC;
public class WatcherService extends AbstractComponent {
@ -118,7 +117,7 @@ public class WatcherService extends AbstractComponent {
public IndexResponse putWatch(String id, BytesReference watchSource, boolean active) throws IOException {
ensureStarted();
DateTime now = clock.nowUTC();
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = watchParser.parseWithSecrets(id, false, watchSource, now);
watch.setState(active, now);
WatchStore.WatchPut result = watchStore.put(watch);
@ -173,7 +172,7 @@ public class WatcherService extends AbstractComponent {
throw illegalArgument("watch [{}] does not exist", id);
}
// we need to create a safe copy of the status
if (watch.ack(clock.now(DateTimeZone.UTC), actionIds)) {
if (watch.ack(new DateTime(clock.millis(), UTC), actionIds)) {
try {
watchStore.updateStatus(watch);
} catch (IOException ioe) {
@ -212,7 +211,7 @@ public class WatcherService extends AbstractComponent {
if (watch == null) {
throw illegalArgument("watch [{}] does not exist", id);
}
if (watch.setState(active, clock.nowUTC())) {
if (watch.setState(active, new DateTime(clock.millis(), UTC))) {
try {
watchStore.updateStatus(watch);
if (active) {

View File

@ -8,12 +8,12 @@ package org.elasticsearch.xpack.watcher.actions;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.condition.ConditionRegistry;
import org.elasticsearch.xpack.watcher.support.validation.Validation;
import org.elasticsearch.xpack.watcher.transform.TransformRegistry;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

View File

@ -17,7 +17,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.xpack.watcher.actions.throttler.Throttler;
import org.elasticsearch.xpack.watcher.condition.Condition;
@ -29,6 +28,7 @@ import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
import java.time.Clock;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

View File

@ -9,7 +9,8 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.support.clock.Clock;
import java.time.Clock;
public class ActionThrottler implements Throttler {

View File

@ -9,9 +9,10 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.support.clock.Clock;
import org.joda.time.PeriodType;
import java.time.Clock;
/**
* This throttler throttles the action based on its last <b>successful</b> execution time. If the time passed since
* the last successful execution is lower than the given period, the aciton will be throttled.
@ -51,7 +52,7 @@ public class PeriodThrottler implements Throttler {
if (status.lastSuccessfulExecution() == null) {
return Result.NO;
}
TimeValue timeElapsed = clock.timeElapsedSince(status.lastSuccessfulExecution().timestamp());
TimeValue timeElapsed = TimeValue.timeValueMillis(clock.millis() - status.lastSuccessfulExecution().timestamp().getMillis());
if (timeElapsed.getMillis() <= period.getMillis()) {
return Result.throttle("throttling interval is set to [{}] but time elapsed since last execution is [{}]",
period.format(periodType), timeElapsed.format(periodType));

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.condition;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Variables;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
@ -13,6 +12,7 @@ import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;

View File

@ -9,10 +9,10 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.common.xcontent.XContentUtils;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -29,11 +29,6 @@ public final class ArrayCompareCondition extends AbstractCompareCondition {
private final Object value;
private final Quantifier quantifier;
public ArrayCompareCondition(String arrayPath, String path, Op op, Object value,
Quantifier quantifier) {
this(arrayPath, path, op, value, quantifier, null);
}
ArrayCompareCondition(String arrayPath, String path, Op op, Object value,
Quantifier quantifier,
Clock clock) {

View File

@ -9,10 +9,10 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.common.xcontent.XContentUtils;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import java.io.IOException;
import java.time.Clock;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

View File

@ -5,11 +5,10 @@
*/
package org.elasticsearch.xpack.watcher.condition;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.support.clock.Clock;
import java.io.IOException;
import java.time.Clock;
/**
* Parses xcontent to a concrete condition of the same type.

View File

@ -7,9 +7,9 @@ package org.elasticsearch.xpack.watcher.condition;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.support.clock.Clock;
import java.io.IOException;
import java.time.Clock;
import java.util.Map;
public class ConditionRegistry {

View File

@ -20,7 +20,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.common.stats.Counters;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.watcher.condition.Condition;
@ -33,8 +32,8 @@ import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchLockService;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -45,6 +44,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.joda.time.DateTimeZone.UTC;
public class ExecutionService extends AbstractComponent {
public static final Setting<TimeValue> DEFAULT_THROTTLE_PERIOD_SETTING =
@ -174,7 +175,7 @@ public class ExecutionService extends AbstractComponent {
final LinkedList<TriggeredWatch> triggeredWatches = new LinkedList<>();
final LinkedList<TriggeredExecutionContext> contexts = new LinkedList<>();
DateTime now = clock.now(DateTimeZone.UTC);
DateTime now = new DateTime(clock.millis(), UTC);
for (TriggerEvent event : events) {
Watch watch = watchStore.get(event.jobName());
if (watch == null) {
@ -220,7 +221,7 @@ public class ExecutionService extends AbstractComponent {
final LinkedList<TriggeredWatch> triggeredWatches = new LinkedList<>();
final LinkedList<TriggeredExecutionContext> contexts = new LinkedList<>();
DateTime now = clock.now(DateTimeZone.UTC);
DateTime now = new DateTime(clock.millis(), UTC);
for (TriggerEvent event : events) {
Watch watch = watchStore.get(event.jobName());
if (watch == null) {
@ -427,7 +428,7 @@ public class ExecutionService extends AbstractComponent {
historyStore.forcePut(record);
triggeredWatchStore.delete(triggeredWatch.id());
} else {
TriggeredExecutionContext ctx = new StartupExecutionContext(watch, clock.now(DateTimeZone.UTC),
TriggeredExecutionContext ctx = new StartupExecutionContext(watch, new DateTime(clock.millis(), UTC),
triggeredWatch.triggerEvent(), defaultThrottlePeriod);
executeAsync(ctx, triggeredWatch);
counter++;

View File

@ -15,11 +15,11 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.support.clock.Clock;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.time.Clock;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

View File

@ -5,20 +5,19 @@
*/
package org.elasticsearch.xpack.watcher.support.xcontent;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.common.secret.Secret;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import java.io.IOException;
import java.time.Clock;
import java.util.List;
import java.util.Map;
/**
* A xcontent parser that is used by watcher. This is a special parser that is
@ -62,7 +61,7 @@ public class WatcherXContentParser implements XContentParser {
if (parser instanceof WatcherXContentParser) {
return ((WatcherXContentParser) parser).clock;
}
return SystemClock.INSTANCE;
return Clock.systemUTC();
}
private final Clock clock;

View File

@ -23,7 +23,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.ActionExecutionMode;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
@ -39,11 +38,12 @@ import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.time.Clock;
import java.util.Map;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument;
import static org.joda.time.DateTimeZone.UTC;
/**
* Performs the watch execution operation.
@ -108,7 +108,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, knownWatch,
new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
DateTime executionTime = clock.now(DateTimeZone.UTC);
DateTime executionTime = new DateTime(clock.millis(), UTC);
ctxBuilder.executionTime(executionTime);
for (Map.Entry<String, ActionExecutionMode> entry : request.getActionModes().entrySet()) {
ctxBuilder.actionMode(entry.getKey(), entry.getValue());

View File

@ -9,15 +9,16 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.trigger.AbstractTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.joda.time.DateTime;
import java.io.IOException;
import java.time.Clock;
import java.util.Map;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument;
import static org.joda.time.DateTimeZone.UTC;
public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine<ScheduleTrigger, ScheduleTriggerEvent> {
@ -39,7 +40,7 @@ public abstract class ScheduleTriggerEngine extends AbstractTriggerEngine<Schedu
@Override
public ScheduleTriggerEvent simulateEvent(String jobId, @Nullable Map<String, Object> data, TriggerService service) {
DateTime now = clock.nowUTC();
DateTime now = new DateTime(clock.millis(), UTC);
if (data == null) {
return new ScheduleTriggerEvent(jobId, now, now);
}

View File

@ -11,12 +11,12 @@ import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.time.Clock;
public class ScheduleTriggerEvent extends TriggerEvent {

View File

@ -8,20 +8,21 @@ package org.elasticsearch.xpack.watcher.trigger.schedule.engine;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.joda.time.DateTimeZone.UTC;
public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
private final SchedulerEngine schedulerEngine;
@ -45,7 +46,7 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
schedulerJobs.add(new SchedulerEngine.Job(job.id(), trigger.getSchedule()));
});
schedulerEngine.start(schedulerJobs);
logger.debug("schedule engine started at [{}]", clock.nowUTC());
logger.debug("schedule engine started at [{}]", new DateTime(clock.millis(), UTC));
}
@Override
@ -68,10 +69,10 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime, DateTimeZone.UTC),
new DateTime(scheduledTime, DateTimeZone.UTC));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime, DateTimeZone.UTC),
new DateTime(scheduledTime, DateTimeZone.UTC));
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime, UTC),
new DateTime(scheduledTime, UTC));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime, UTC),
new DateTime(scheduledTime, UTC));
for (Listener listener : listeners) {
listener.triggered(Collections.<TriggerEvent>singletonList(event));
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.watcher.trigger.schedule.engine;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.Schedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
@ -16,8 +15,8 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -25,6 +24,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import static org.joda.time.DateTimeZone.UTC;
public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
private final TimeValue tickInterval;
@ -76,9 +77,9 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
long scheduledTime = schedule.check(triggeredTime);
if (scheduledTime > 0) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name,
new DateTime(triggeredTime, DateTimeZone.UTC), new DateTime(scheduledTime, DateTimeZone.UTC));
events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime, DateTimeZone.UTC),
new DateTime(scheduledTime, DateTimeZone.UTC)));
new DateTime(triggeredTime, UTC), new DateTime(scheduledTime, UTC));
events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime, UTC),
new DateTime(scheduledTime, UTC)));
if (events.size() >= 1000) {
notifyListeners(events);
events.clear();
@ -140,7 +141,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
@Override
public void run() {
while (active) {
logger.trace("checking jobs [{}]", clock.nowUTC());
logger.trace("checking jobs [{}]", new DateTime(clock.millis(), UTC));
checkJobs();
try {
sleep(tickInterval.millis());

View File

@ -20,14 +20,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.common.secret.Secret;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.HaltedClock;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.actions.ActionRegistry;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.input.ExecutableInput;
import org.elasticsearch.xpack.watcher.input.InputRegistry;
import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
@ -41,6 +40,7 @@ import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.joda.time.DateTime;
import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -52,6 +52,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentHelper.createParser;
import static org.elasticsearch.xpack.watcher.support.Exceptions.ioException;
import static org.joda.time.DateTimeZone.UTC;
public class Watch implements TriggerEngine.Job, ToXContent {
@ -232,11 +233,11 @@ public class Watch implements TriggerEngine.Job, ToXContent {
}
public Watch parse(String name, boolean includeStatus, BytesReference source) throws IOException {
return parse(name, includeStatus, false, source, clock.nowUTC(), false);
return parse(name, includeStatus, false, source, new DateTime(clock.millis(), UTC), false);
}
public Watch parse(String name, boolean includeStatus, BytesReference source, boolean upgradeSource) throws IOException {
return parse(name, includeStatus, false, source, clock.nowUTC(), upgradeSource);
return parse(name, includeStatus, false, source, new DateTime(clock.millis(), UTC), upgradeSource);
}
public Watch parse(String name, boolean includeStatus, BytesReference source, DateTime now) throws IOException {
@ -345,11 +346,11 @@ public class Watch implements TriggerEngine.Job, ToXContent {
} else {
// we need to create the initial statuses for the actions
Map<String, ActionStatus> actionsStatuses = new HashMap<>();
DateTime now = WatcherXContentParser.clock(parser).nowUTC();
DateTime now = new DateTime(WatcherXContentParser.clock(parser).millis(), UTC);
for (ActionWrapper action : actions) {
actionsStatuses.put(action.id(), new ActionStatus(now));
}
status = new WatchStatus(WatcherXContentParser.clock(parser).nowUTC(), unmodifiableMap(actionsStatuses));
status = new WatchStatus(now, unmodifiableMap(actionsStatuses));
}
return new Watch(id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status);

View File

@ -15,15 +15,14 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.actions.throttler.AckThrottler;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherXContentParser;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
@ -34,6 +33,7 @@ import static org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils.readD
import static org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils.readOptionalDate;
import static org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils.writeDate;
import static org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils.writeOptionalDate;
import static org.joda.time.DateTimeZone.UTC;
public class WatchStatus implements ToXContent, Streamable {
@ -213,15 +213,15 @@ public class WatchStatus implements ToXContent, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
lastChecked = readOptionalDate(in, DateTimeZone.UTC);
lastMetCondition = readOptionalDate(in, DateTimeZone.UTC);
lastChecked = readOptionalDate(in, UTC);
lastMetCondition = readOptionalDate(in, UTC);
int count = in.readInt();
Map<String, ActionStatus> actions = new HashMap<>(count);
for (int i = 0; i < count; i++) {
actions.put(in.readString(), ActionStatus.readFrom(in));
}
this.actions = unmodifiableMap(actions);
state = new State(in.readBoolean(), readDate(in, DateTimeZone.UTC));
state = new State(in.readBoolean(), readDate(in, UTC));
}
public static WatchStatus read(StreamInput in) throws IOException {
@ -273,14 +273,14 @@ public class WatchStatus implements ToXContent, Streamable {
}
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.LAST_CHECKED)) {
if (token.isValue()) {
lastChecked = parseDate(currentFieldName, parser, DateTimeZone.UTC);
lastChecked = parseDate(currentFieldName, parser, UTC);
} else {
throw new ElasticsearchParseException("could not parse watch status for [{}]. expecting field [{}] to hold a date " +
"value, found [{}] instead", watchId, currentFieldName, token);
}
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.LAST_MET_CONDITION)) {
if (token.isValue()) {
lastMetCondition = parseDate(currentFieldName, parser, DateTimeZone.UTC);
lastMetCondition = parseDate(currentFieldName, parser, UTC);
} else {
throw new ElasticsearchParseException("could not parse watch status for [{}]. expecting field [{}] to hold a date " +
"value, found [{}] instead", watchId, currentFieldName, token);
@ -307,7 +307,7 @@ public class WatchStatus implements ToXContent, Streamable {
// this is to support old watches that weren't upgraded yet to
// contain the state
if (state == null) {
state = new State(true, WatcherXContentParser.clock(parser).nowUTC());
state = new State(true, new DateTime(WatcherXContentParser.clock(parser).millis(), UTC));
}
actions = actions == null ? emptyMap() : unmodifiableMap(actions);
@ -345,7 +345,7 @@ public class WatchStatus implements ToXContent, Streamable {
throw new ElasticsearchParseException("expected an object but found [{}] instead", parser.currentToken());
}
boolean active = true;
DateTime timestamp = SystemClock.INSTANCE.nowUTC();
DateTime timestamp = new DateTime(Clock.systemUTC().millis(), UTC);
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -354,7 +354,7 @@ public class WatchStatus implements ToXContent, Streamable {
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.ACTIVE)) {
active = parser.booleanValue();
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.TIMESTAMP)) {
timestamp = parseDate(currentFieldName, parser, DateTimeZone.UTC);
timestamp = parseDate(currentFieldName, parser, UTC);
}
}
return new State(active, timestamp);

View File

@ -6,11 +6,11 @@
package org.elasticsearch.xpack;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.test.TimeWarpedWatcher;
import java.io.IOException;
import java.time.Clock;
public class TimeWarpedXPackPlugin extends XPackPlugin {
private final ClockMock clock = new ClockMock();

View File

@ -9,14 +9,42 @@ import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.util.concurrent.TimeUnit;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
/**
* A clock that can be modified for testing.
*/
public class ClockMock implements Clock {
public class ClockMock extends Clock {
private DateTime now = DateTime.now(DateTimeZone.UTC);
private final ZoneId zoneId;
private DateTime now;
public ClockMock() {
zoneId = ZoneOffset.UTC;
now = DateTime.now(DateTimeZone.UTC);
}
private ClockMock(ZoneId zoneId) {
this.zoneId = zoneId;
now = DateTime.now(DateTimeZone.forID(zoneId.getId()));
}
@Override
public ZoneId getZone() {
return ZoneOffset.UTC;
}
@Override
public Clock withZone(ZoneId zoneId) {
if (zoneId.equals(this.zoneId)) {
return this;
}
return new ClockMock(zoneId);
}
@Override
public long millis() {
@ -24,23 +52,8 @@ public class ClockMock implements Clock {
}
@Override
public long nanos() {
return TimeUnit.MILLISECONDS.toNanos(now.getMillis());
}
@Override
public DateTime nowUTC() {
return now(DateTimeZone.UTC);
}
@Override
public DateTime now(DateTimeZone timeZone) {
return now.toDateTime(timeZone);
}
@Override
public TimeValue timeElapsedSince(DateTime time) {
return TimeValue.timeValueMillis(now.getMillis() - time.getMillis());
public Instant instant() {
return Instant.ofEpochMilli(now.getMillis());
}
public ClockMock setTime(DateTime now) {

View File

@ -6,6 +6,9 @@
package org.elasticsearch.xpack.support.clock;
import org.elasticsearch.test.ESTestCase;
import org.joda.time.DateTime;
import java.time.Clock;
import static org.hamcrest.Matchers.equalTo;
import static org.joda.time.DateTimeZone.UTC;
@ -13,7 +16,7 @@ import static org.joda.time.DateTimeZone.UTC;
public class ClockTests extends ESTestCase {
public void testNowUTC() {
Clock clockMock = new ClockMock();
assertThat(clockMock.now(UTC).getZone(), equalTo(UTC));
assertThat(SystemClock.INSTANCE.now(UTC).getZone(), equalTo(UTC));
assertThat(new DateTime(clockMock.millis(), UTC).getZone(), equalTo(UTC));
assertThat(new DateTime(Clock.systemUTC().millis(), UTC).getZone(), equalTo(UTC));
}
}

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.trigger.Trigger;
@ -25,14 +24,15 @@ import org.elasticsearch.xpack.watcher.watch.WatchLockService;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Before;
import java.time.Clock;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
@ -72,7 +72,7 @@ public class WatcherServiceTests extends ESTestCase {
IndexResponse indexResponse = mock(IndexResponse.class);
Watch newWatch = mock(Watch.class);
WatchStatus status = mock(WatchStatus.class);
when(status.state()).thenReturn(new WatchStatus.State(activeByDefault, clock.nowUTC()));
when(status.state()).thenReturn(new WatchStatus.State(activeByDefault, new DateTime(clock.millis(), UTC)));
when(newWatch.status()).thenReturn(status);
WatchStore.WatchPut watchPut = mock(WatchStore.WatchPut.class);
@ -85,7 +85,7 @@ public class WatcherServiceTests extends ESTestCase {
IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), activeByDefault);
assertThat(response, sameInstance(indexResponse));
verify(newWatch, times(1)).setState(activeByDefault, clock.nowUTC());
verify(newWatch, times(1)).setState(activeByDefault, new DateTime(clock.millis(), UTC));
if (activeByDefault) {
verify(triggerService, times(1)).add(any(TriggerEngine.Job.class));
} else {
@ -102,7 +102,8 @@ public class WatcherServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
WatchStatus status = mock(WatchStatus.class);
boolean active = randomBoolean();
when(status.state()).thenReturn(new WatchStatus.State(active, clock.nowUTC()));
DateTime now = new DateTime(clock.millis(), UTC);
when(status.state()).thenReturn(new WatchStatus.State(active, now));
when(watch.status()).thenReturn(status);
when(watch.trigger()).thenReturn(trigger);
WatchStore.WatchPut watchPut = mock(WatchStore.WatchPut.class);
@ -112,12 +113,12 @@ public class WatcherServiceTests extends ESTestCase {
Watch previousWatch = mock(Watch.class);
WatchStatus previousStatus = mock(WatchStatus.class);
boolean prevActive = randomBoolean();
when(previousStatus.state()).thenReturn(new WatchStatus.State(prevActive, clock.nowUTC()));
when(previousStatus.state()).thenReturn(new WatchStatus.State(prevActive, now));
when(previousWatch.status()).thenReturn(previousStatus);
when(previousWatch.trigger()).thenReturn(trigger);
when(watchPut.previous()).thenReturn(previousWatch);
when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), eq(clock.nowUTC()))).thenReturn(watch);
when(watchParser.parseWithSecrets(any(String.class), eq(false), any(BytesReference.class), eq(now))).thenReturn(watch);
when(watchStore.put(watch)).thenReturn(watchPut);
IndexResponse response = watcherService.putWatch("_id", new BytesArray("{}"), active);
@ -160,7 +161,7 @@ public class WatcherServiceTests extends ESTestCase {
}
public void testAckWatch() throws Exception {
DateTime now = new DateTime(DateTimeZone.UTC);
DateTime now = new DateTime(UTC);
clock.setTime(now);
Watch watch = mock(Watch.class);
when(watch.ack(now, "_all")).thenReturn(true);
@ -197,7 +198,7 @@ public class WatcherServiceTests extends ESTestCase {
// - the watch status should not change
// - the watch doesn't need to be updated in the store
// - the watch should not be removed or re-added to the trigger service
DateTime now = new DateTime(DateTimeZone.UTC);
DateTime now = new DateTime(UTC);
clock.setTime(now);
Watch watch = mock(Watch.class);
@ -221,7 +222,7 @@ public class WatcherServiceTests extends ESTestCase {
// - the watch needs to be updated in the store
// - the watch should be re-added to the trigger service (the assumption is that it's not there)
DateTime now = new DateTime(DateTimeZone.UTC);
DateTime now = new DateTime(UTC);
clock.setTime(now);
Watch watch = mock(Watch.class);
@ -243,7 +244,7 @@ public class WatcherServiceTests extends ESTestCase {
// - the watch status should change
// - the watch needs to be updated in the store
// - the watch should be removed from the trigger service
DateTime now = new DateTime(DateTimeZone.UTC);
DateTime now = new DateTime(UTC);
clock.setTime(now);
Watch watch = mock(Watch.class);
@ -266,7 +267,7 @@ public class WatcherServiceTests extends ESTestCase {
// - the watch status should not be updated
// - the watch should not be updated in the store
// - the watch should be re-added or removed to/from the trigger service
DateTime now = new DateTime(DateTimeZone.UTC);
DateTime now = new DateTime(UTC);
clock.setTime(now);
Watch watch = mock(Watch.class);
@ -285,7 +286,7 @@ public class WatcherServiceTests extends ESTestCase {
}
public void testAckWatchNotAck() throws Exception {
DateTime now = SystemClock.INSTANCE.nowUTC();
DateTime now = new DateTime(Clock.systemUTC().millis(), UTC);
Watch watch = mock(Watch.class);
when(watch.ack(now)).thenReturn(false);
WatchStatus status = new WatchStatus(now, emptyMap());
@ -300,14 +301,7 @@ public class WatcherServiceTests extends ESTestCase {
public void testAckWatchNoWatch() throws Exception {
when(watchStore.get("_id")).thenReturn(null);
try {
watcherService.ackWatch("_id", Strings.EMPTY_ARRAY);
fail();
} catch (IllegalArgumentException iae) {
// expected
}
expectThrows(IllegalArgumentException.class, () -> watcherService.ackWatch("_id", Strings.EMPTY_ARRAY));
verify(watchStore, never()).updateStatus(any(Watch.class));
}
}

View File

@ -8,22 +8,24 @@ package org.elasticsearch.xpack.watcher.actions.throttler;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import java.time.Clock;
import static org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils.formatDate;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.EMPTY_PAYLOAD;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.is;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AckThrottlerTests extends ESTestCase {
public void testWhenAcked() throws Exception {
DateTime timestamp = SystemClock.INSTANCE.nowUTC();
DateTime timestamp = new DateTime(Clock.systemUTC().millis(), UTC);
WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD);
Watch watch = ctx.watch();
ActionStatus actionStatus = mock(ActionStatus.class);
@ -38,7 +40,7 @@ public class AckThrottlerTests extends ESTestCase {
}
public void testThrottleWhenAwaitsSuccessfulExecution() throws Exception {
DateTime timestamp = SystemClock.INSTANCE.nowUTC();
DateTime timestamp = new DateTime(Clock.systemUTC().millis(), UTC);
WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD);
Watch watch = ctx.watch();
ActionStatus actionStatus = mock(ActionStatus.class);
@ -54,7 +56,7 @@ public class AckThrottlerTests extends ESTestCase {
}
public void testThrottleWhenAckable() throws Exception {
DateTime timestamp = SystemClock.INSTANCE.nowUTC();
DateTime timestamp = new DateTime(Clock.systemUTC().millis(), UTC);
WatchExecutionContext ctx = mockExecutionContext("_watch", EMPTY_PAYLOAD);
Watch watch = ctx.watch();
ActionStatus actionStatus = mock(ActionStatus.class);

View File

@ -10,7 +10,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.notification.email.EmailTemplate;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.watcher.actions.email.EmailAction;
@ -36,6 +35,7 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.time.Clock;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -361,7 +361,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase {
ManualTriggerEvent triggerEvent = new ManualTriggerEvent("_id",
new ScheduleTriggerEvent(new DateTime(DateTimeZone.UTC), new DateTime(DateTimeZone.UTC)));
return ManualExecutionContext.builder(watchService().getWatch("_id"), true, triggerEvent, throttlePeriod)
.executionTime(timeWarped() ? timeWarp().clock().nowUTC() : SystemClock.INSTANCE.nowUTC())
.executionTime(timeWarped() ? new DateTime(timeWarp().clock().millis()) : new DateTime(Clock.systemUTC().millis()))
.allActionsMode(ActionExecutionMode.SIMULATE)
.recordExecution(true)
.build();

View File

@ -9,10 +9,12 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.joda.time.PeriodType;
import java.time.Clock;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.EMPTY_PAYLOAD;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.hamcrest.CoreMatchers.nullValue;
@ -26,12 +28,13 @@ public class PeriodThrottlerTests extends ESTestCase {
public void testBelowPeriodSuccessful() throws Exception {
PeriodType periodType = randomFrom(PeriodType.millis(), PeriodType.seconds(), PeriodType.minutes());
TimeValue period = TimeValue.timeValueSeconds(randomIntBetween(2, 5));
PeriodThrottler throttler = new PeriodThrottler(SystemClock.INSTANCE, period, periodType);
PeriodThrottler throttler = new PeriodThrottler(Clock.systemUTC(), period, periodType);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
ActionStatus actionStatus = mock(ActionStatus.class);
DateTime now = new DateTime(Clock.systemUTC().millis());
when(actionStatus.lastSuccessfulExecution())
.thenReturn(ActionStatus.Execution.successful(SystemClock.INSTANCE.nowUTC().minusSeconds((int) period.seconds() - 1)));
.thenReturn(ActionStatus.Execution.successful(now.minusSeconds((int) period.seconds() - 1)));
WatchStatus status = mock(WatchStatus.class);
when(status.actionStatus("_action")).thenReturn(actionStatus);
when(ctx.watch().status()).thenReturn(status);
@ -46,12 +49,13 @@ public class PeriodThrottlerTests extends ESTestCase {
public void testAbovePeriod() throws Exception {
PeriodType periodType = randomFrom(PeriodType.millis(), PeriodType.seconds(), PeriodType.minutes());
TimeValue period = TimeValue.timeValueSeconds(randomIntBetween(2, 5));
PeriodThrottler throttler = new PeriodThrottler(SystemClock.INSTANCE, period, periodType);
PeriodThrottler throttler = new PeriodThrottler(Clock.systemUTC(), period, periodType);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
ActionStatus actionStatus = mock(ActionStatus.class);
DateTime now = new DateTime(Clock.systemUTC().millis());
when(actionStatus.lastSuccessfulExecution())
.thenReturn(ActionStatus.Execution.successful(SystemClock.INSTANCE.nowUTC().minusSeconds((int) period.seconds() + 1)));
.thenReturn(ActionStatus.Execution.successful(now.minusSeconds((int) period.seconds() + 1)));
WatchStatus status = mock(WatchStatus.class);
when(status.actionStatus("_action")).thenReturn(actionStatus);
when(ctx.watch().status()).thenReturn(status);

View File

@ -12,7 +12,8 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.support.clock.SystemClock;
import java.time.Clock;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
@ -56,11 +57,11 @@ public class AlwaysConditionTests extends ESTestCase {
return new ScriptCondition(new Script("_script"), scriptService);
case CompareCondition.TYPE:
return new CompareCondition("_path", randomFrom(CompareCondition.Op.values()), randomFrom(5, "3"),
SystemClock.INSTANCE);
Clock.systemUTC());
case ArrayCompareCondition.TYPE:
return new ArrayCompareCondition("_array_path", "_path",
randomFrom(ArrayCompareCondition.Op.values()), randomFrom(5, "3"), ArrayCompareCondition.Quantifier.SOME,
SystemClock.INSTANCE);
Clock.systemUTC());
default:
return AlwaysCondition.INSTANCE;
}

View File

@ -8,14 +8,12 @@ package org.elasticsearch.xpack.watcher.condition;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.xpack.watcher.condition.ArrayCompareCondition;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.watch.Payload;
import java.io.IOException;
import java.time.Clock;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@ -50,8 +48,7 @@ public class ArrayCompareConditionSearchTests extends AbstractWatcherIntegration
ArrayCompareCondition condition = new ArrayCompareCondition("ctx.payload.aggregations.top_tweeters.buckets" , "doc_count", op,
numberOfDocumentsWatchingFor, quantifier, SystemClock.INSTANCE
);
numberOfDocumentsWatchingFor, quantifier, Clock.systemUTC());
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
Condition.Result result = condition.execute(ctx);

View File

@ -10,14 +10,15 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -107,7 +108,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
logger.debug("met [{}]", met);
ArrayCompareCondition condition = new ArrayCompareCondition("ctx.payload.value", "", op, value, quantifier,
SystemClock.INSTANCE);
Clock.systemUTC());
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.Simple("value", values));
assertThat(condition.execute(ctx).met(), is(met));
}
@ -137,7 +138,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
logger.debug("met [{}]", met);
ArrayCompareCondition condition = new ArrayCompareCondition("ctx.payload.value", "doc_count", op, value, quantifier,
SystemClock.INSTANCE);
Clock.systemUTC());
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.Simple("value", values));
assertThat(condition.execute(ctx).met(), is(met));
}
@ -156,7 +157,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
List<Object> values = new ArrayList<>(numberOfValues);
for (int i = 0; i < numberOfValues; i++) {
clock.fastForwardSeconds(1);
values.add(clock.nowUTC());
values.add(new DateTime(clock.millis()));
}
ArrayCompareCondition condition = new ArrayCompareCondition("ctx.payload.value", "", op, value, quantifier, clock);

View File

@ -16,12 +16,11 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.xpack.watcher.condition.CompareCondition;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.watch.Payload;
import java.time.Clock;
import java.util.Map;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
@ -50,7 +49,7 @@ public class CompareConditionSearchTests extends AbstractWatcherIntegrationTestC
.get();
CompareCondition condition = new CompareCondition("ctx.payload.aggregations.rate.buckets.0.doc_count", CompareCondition.Op.GTE, 5,
SystemClock.INSTANCE);
Clock.systemUTC());
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
CompareCondition.Result result = condition.execute(ctx);
assertThat(result.met(), is(false));
@ -78,7 +77,7 @@ public class CompareConditionSearchTests extends AbstractWatcherIntegrationTestC
public void testExecuteAccessHits() throws Exception {
CompareCondition condition = new CompareCondition("ctx.payload.hits.hits.0._score", CompareCondition.Op.EQ, 1,
SystemClock.INSTANCE);
Clock.systemUTC());
InternalSearchHit hit = new InternalSearchHit(0, "1", new Text("type"), null);
hit.score(1f);
hit.shard(new SearchShardTarget("a", new Index("a", "indexUUID"), 0));

View File

@ -10,13 +10,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.condition.CompareCondition.Op;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime;
import java.time.Clock;
import java.util.Arrays;
import java.util.Locale;
@ -128,7 +128,7 @@ public class CompareConditionTests extends ESTestCase {
int payloadValue = randomInt(10);
boolean met = op.eval(payloadValue, value);
CompareCondition condition = new CompareCondition("ctx.payload.value", op, value, SystemClock.INSTANCE);
CompareCondition condition = new CompareCondition("ctx.payload.value", op, value, Clock.systemUTC());
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.Simple("value", payloadValue));
assertThat(condition.execute(ctx).met(), is(met));
}
@ -139,7 +139,7 @@ public class CompareConditionTests extends ESTestCase {
Op op = met ? randomFrom(CompareCondition.Op.GT, CompareCondition.Op.GTE, CompareCondition.Op.NOT_EQ) :
randomFrom(CompareCondition.Op.LT, CompareCondition.Op.LTE, CompareCondition.Op.EQ);
String value = "<{now-1d}>";
DateTime payloadValue = clock.nowUTC();
DateTime payloadValue = new DateTime(clock.millis());
CompareCondition condition = new CompareCondition("ctx.payload.value", op, value, clock);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.Simple("value", payloadValue));

View File

@ -13,7 +13,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
@ -41,6 +40,7 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Before;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -115,8 +115,9 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
when(watchStore.get("_id")).thenReturn(watch);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC());
WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5));
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -167,7 +168,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
@ -207,8 +208,9 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
when(watchStore.get("_id")).thenReturn(watch);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC());
WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5));
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
input = mock(ExecutableInput.class);
Input.Result inputResult = mock(Input.Result.class);
@ -245,7 +247,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
@ -276,8 +278,9 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
when(watchStore.get("_id")).thenReturn(watch);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC());
WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5));
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
Condition condition = mock(Condition.class);
Condition.Result conditionResult = mock(Condition.Result.class);
@ -310,7 +313,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
@ -341,8 +344,9 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
when(watchStore.get("_id")).thenReturn(watch);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC());
WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5));
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -374,7 +378,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
@ -405,8 +409,9 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.id()).thenReturn("_id");
when(watchStore.get("_id")).thenReturn(watch);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC());
WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5));
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -453,7 +458,7 @@ public class ExecutionServiceTests extends ESTestCase {
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(clock.nowUTC())));
WatchStatus watchStatus = new WatchStatus(now, singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
@ -532,7 +537,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
WatchStatus watchStatus = new WatchStatus(new DateTime(clock.millis()), singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
@ -579,7 +584,7 @@ public class ExecutionServiceTests extends ESTestCase {
ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
WatchStatus watchStatus = new WatchStatus(new DateTime(clock.millis()), singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
@ -640,7 +645,7 @@ public class ExecutionServiceTests extends ESTestCase {
ExecutableAction action = mock(ExecutableAction.class);
when(action.type()).thenReturn("_type");
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
WatchStatus watchStatus = new WatchStatus(new DateTime(clock.millis()), singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
@ -695,7 +700,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(action.type()).thenReturn("_type");
when(action.logger()).thenReturn(logger);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
WatchStatus watchStatus = new WatchStatus(new DateTime(clock.millis()), singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
@ -742,7 +747,7 @@ public class ExecutionServiceTests extends ESTestCase {
ExecutableAction action = mock(ExecutableAction.class);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionCondition, actionTransform, action);
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), singletonMap("_action", new ActionStatus(now)));
WatchStatus watchStatus = new WatchStatus(new DateTime(clock.millis()), singletonMap("_action", new ActionStatus(now)));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);

View File

@ -14,7 +14,6 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.WatcherService;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction;
@ -43,6 +42,7 @@ import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -313,16 +313,16 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTestCase {
.addAction("log", loggingAction("foobar"));
watcherClient().putWatch(new PutWatchRequest("_id", watchBuilder)).actionGet();
TriggerEvent triggerEvent = new ScheduleTriggerEvent(SystemClock.INSTANCE.nowUTC(), SystemClock.INSTANCE.nowUTC());
DateTime now = new DateTime(Clock.systemUTC().millis());
TriggerEvent triggerEvent = new ScheduleTriggerEvent(now, now);
Map<String, Object> executeWatchResult = watcherClient().prepareExecuteWatch()
.setId("_id")
.setTriggerEvent(triggerEvent)
.get().getRecordSource().getAsMap();
assertThat(ObjectPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTION_NOT_NEEDED.toString()));
assertThat(ObjectPath.<String>eval("result.input.payload.foo", executeWatchResult), equalTo("bar"));
assertThat(ObjectPath.eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTION_NOT_NEEDED.toString()));
assertThat(ObjectPath.eval("result.input.payload.foo", executeWatchResult), equalTo("bar"));
watchBuilder = watchBuilder()
.trigger(schedule(cron("0 0 0 1 * ? 2099")))
@ -336,15 +336,15 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTestCase {
.setId("_id").setTriggerEvent(triggerEvent).setRecordExecution(true)
.get().getRecordSource().getAsMap();
assertThat(ObjectPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTED.toString()));
assertThat(ObjectPath.<String>eval("result.input.payload.foo", executeWatchResult), equalTo("bar"));
assertThat(ObjectPath.<String>eval("result.actions.0.id", executeWatchResult), equalTo("log"));
assertThat(ObjectPath.eval("state", executeWatchResult), equalTo(ExecutionState.EXECUTED.toString()));
assertThat(ObjectPath.eval("result.input.payload.foo", executeWatchResult), equalTo("bar"));
assertThat(ObjectPath.eval("result.actions.0.id", executeWatchResult), equalTo("log"));
executeWatchResult = watcherClient().prepareExecuteWatch()
.setId("_id").setTriggerEvent(triggerEvent)
.get().getRecordSource().getAsMap();
assertThat(ObjectPath.<String>eval("state", executeWatchResult), equalTo(ExecutionState.THROTTLED.toString()));
assertThat(ObjectPath.eval("state", executeWatchResult), equalTo(ExecutionState.THROTTLED.toString()));
}
public void testWatchExecutionDuration() throws Exception {

View File

@ -12,6 +12,7 @@ import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -56,7 +57,7 @@ public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegration
assertWatchWithMinimumPerformedActionsCount("_id", 1, false);
final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().nowUTC());
final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(new DateTime(timeWarp().clock().millis()));
logger.info("checking index [{}]", indexName);
assertBusy(() -> {
flush();
@ -67,7 +68,7 @@ public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegration
}
public void testDynamicIndexSearchInput() throws Exception {
final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().nowUTC());
final String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(new DateTime(timeWarp().clock().millis()));
createIndex(indexName);
index(indexName, "type", "1", "key", "value");
flush();
@ -93,7 +94,7 @@ public class DynamicIndexNameIntegrationTests extends AbstractWatcherIntegration
}
public void testDynamicIndexSearchTransform() throws Exception {
String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(timeWarp().clock().nowUTC());
String indexName = "idx-" + DateTimeFormat.forPattern("YYYY.MM.dd").print(new DateTime(timeWarp().clock().millis()));
createIndex(indexName);
index(indexName, "type", "1", "key", "value");
flush();

View File

@ -17,13 +17,14 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.joda.time.DateTime;
import java.io.IOException;
import java.time.Clock;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
@ -31,8 +32,6 @@ import java.util.Map;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.elasticsearch.script.ScriptType;
import static org.elasticsearch.xpack.watcher.input.search.ExecutableSearchInput.DEFAULT_SEARCH_TYPE;
import static org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils.formatDate;
import static org.elasticsearch.xpack.watcher.support.WatcherUtils.flattenModel;
@ -45,7 +44,7 @@ import static org.hamcrest.Matchers.nullValue;
public class WatcherUtilsTests extends ESTestCase {
public void testFlattenModel() throws Exception {
DateTime now = SystemClock.INSTANCE.nowUTC();
DateTime now = new DateTime(Clock.systemUTC().millis());
Map<String, Object> map = new HashMap<>();
map.put("a", singletonMap("a1", new int[] { 0, 1, 2 }));
map.put("b", new String[] { "b0", "b1", "b2" });

View File

@ -54,7 +54,6 @@ import org.elasticsearch.xpack.security.authc.file.FileRealm;
import org.elasticsearch.xpack.security.authc.support.Hasher;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.elasticsearch.xpack.security.crypto.CryptoService;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.template.TemplateUtils;
import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
@ -84,6 +83,7 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

View File

@ -9,7 +9,6 @@ import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.trigger.Trigger;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
@ -21,6 +20,7 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.SchedulerScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.engine.TickerScheduleTriggerEngine;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -79,7 +79,7 @@ public class ScheduleEngineTriggerBenchmark {
final ScheduleTriggerEngine scheduler;
switch (impl) {
case "schedule":
scheduler = new SchedulerScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, SystemClock.INSTANCE) {
scheduler = new SchedulerScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, Clock.systemUTC()) {
@Override
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
@ -90,7 +90,7 @@ public class ScheduleEngineTriggerBenchmark {
};
break;
case "ticker":
scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, SystemClock.INSTANCE) {
scheduler = new TickerScheduleTriggerEngine(settings, scheduleRegistry, Clock.systemUTC()) {
@Override
protected void notifyListeners(List<TriggerEvent> events) {

View File

@ -33,11 +33,11 @@ import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.ScriptCondition;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.elasticsearch.xpack.XPackPlugin;
import java.io.IOException;
import java.time.Clock;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;

View File

@ -13,7 +13,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
@ -29,6 +28,9 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.Schedules;
import org.elasticsearch.xpack.watcher.trigger.schedule.support.MonthTimes;
import org.elasticsearch.xpack.watcher.trigger.schedule.support.WeekTimes;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.joda.time.DateTime;
import java.time.Clock;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -368,7 +370,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
private void testConditionSearch(WatcherSearchTemplateRequest request) throws Exception {
// reset, so we don't miss event docs when we filter over the _timestamp field.
timeWarp().clock().setTime(SystemClock.INSTANCE.nowUTC());
timeWarp().clock().setTime(new DateTime(Clock.systemUTC().millis()));
String watchName = "_name";
assertAcked(prepareCreate("events").addMapping("event", "level", "type=text"));
@ -380,7 +382,7 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
.condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.GTE, 3L)))
.get();
logger.info("created watch [{}] at [{}]", watchName, SystemClock.INSTANCE.nowUTC());
logger.info("created watch [{}] at [{}]", watchName, new DateTime(Clock.systemUTC().millis()));
client().prepareIndex("events", "event")
.setSource("level", "a")

View File

@ -11,16 +11,15 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
@ -81,7 +80,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
public void trigger(String jobName, int times, TimeValue interval) {
for (int i = 0; i < times; i++) {
DateTime now = clock.now(DateTimeZone.UTC);
DateTime now = new DateTime(clock.millis());
logger.debug("firing [{}] at [{}]", jobName, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now);
for (Listener listener : listeners) {

View File

@ -11,7 +11,8 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.support.clock.SystemClock;
import java.time.Clock;
import static org.hamcrest.Matchers.is;
@ -28,7 +29,7 @@ public class ScheduleTriggerEventTests extends ESTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(jsonBuilder.bytes());
parser.nextToken();
ScheduleTriggerEvent scheduleTriggerEvent = ScheduleTriggerEvent.parse(parser, "_id", "_context", SystemClock.INSTANCE);
ScheduleTriggerEvent scheduleTriggerEvent = ScheduleTriggerEvent.parse(parser, "_id", "_context", Clock.systemUTC());
assertThat(scheduleTriggerEvent.scheduledTime().isAfter(0), is(true));
assertThat(scheduleTriggerEvent.triggeredTime().isAfter(0), is(true));
}

View File

@ -29,6 +29,7 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly;
import static org.hamcrest.Matchers.is;
import static org.joda.time.DateTimeZone.UTC;
public abstract class BaseTriggerEngineTestCase extends ESTestCase {
@ -78,12 +79,12 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
});
engine.start(jobs);
advanceClockIfNeeded(clock.nowUTC().plusMillis(1100));
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!firstLatch.await(3 * count, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
advanceClockIfNeeded(clock.nowUTC().plusMillis(1100));
advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100));
if (!secondLatch.await(3 * count, TimeUnit.SECONDS)) {
fail("waiting too long for all watches to be triggered");
}
@ -104,7 +105,7 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
});
int randomMinute = randomIntBetween(0, 59);
DateTime testNowTime = clock.nowUTC().withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime testNowTime = new DateTime(clock.millis(), UTC).withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime scheduledTime = testNowTime.plusSeconds(2);
logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime);
@ -125,7 +126,7 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
engine.register(events -> {
for (TriggerEvent event : events) {
assertThat(event.jobName(), is(name));
logger.info("triggered job on [{}]", clock.nowUTC());
logger.info("triggered job on [{}]", new DateTime(clock.millis(), UTC));
latch.countDown();
}
});
@ -133,7 +134,8 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
int randomHour = randomIntBetween(0, 23);
int randomMinute = randomIntBetween(0, 59);
DateTime testNowTime = clock.nowUTC().withHourOfDay(randomHour).withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime testNowTime = new DateTime(clock.millis(), UTC).withHourOfDay(randomHour)
.withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime scheduledTime = testNowTime.plusSeconds(2);
logger.info("Setting current time to [{}], job execution time [{}]", testNowTime, scheduledTime);
@ -162,7 +164,7 @@ public abstract class BaseTriggerEngineTestCase extends ESTestCase {
int randomMinute = randomIntBetween(0, 59);
int randomDay = randomIntBetween(1, 7);
DateTime testNowTime = clock.nowUTC().withDayOfWeek(randomDay).withHourOfDay(randomHour)
DateTime testNowTime = new DateTime(clock.millis(), UTC).withDayOfWeek(randomDay).withHourOfDay(randomHour)
.withMinuteOfHour(randomMinute).withSecondOfMinute(59);
DateTime scheduledTime = testNowTime.plusSeconds(2);

View File

@ -38,9 +38,7 @@ import org.elasticsearch.xpack.notification.email.HtmlSanitizer;
import org.elasticsearch.xpack.notification.email.Profile;
import org.elasticsearch.xpack.notification.email.attachment.EmailAttachments;
import org.elasticsearch.xpack.notification.email.attachment.EmailAttachmentsParser;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.support.clock.SystemClock;
import org.elasticsearch.xpack.watcher.actions.ActionFactory;
import org.elasticsearch.xpack.watcher.actions.ActionRegistry;
import org.elasticsearch.xpack.watcher.actions.ActionStatus;
@ -55,13 +53,13 @@ import org.elasticsearch.xpack.watcher.actions.throttler.ActionThrottler;
import org.elasticsearch.xpack.watcher.actions.webhook.ExecutableWebhookAction;
import org.elasticsearch.xpack.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.xpack.watcher.actions.webhook.WebhookActionFactory;
import org.elasticsearch.xpack.watcher.condition.ConditionFactory;
import org.elasticsearch.xpack.watcher.condition.ConditionRegistry;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.condition.AlwaysConditionTests;
import org.elasticsearch.xpack.watcher.condition.CompareCondition;
import org.elasticsearch.xpack.watcher.condition.ArrayCompareCondition;
import org.elasticsearch.xpack.watcher.condition.CompareCondition;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.condition.ConditionFactory;
import org.elasticsearch.xpack.watcher.condition.ConditionRegistry;
import org.elasticsearch.xpack.watcher.condition.NeverCondition;
import org.elasticsearch.xpack.watcher.condition.ScriptCondition;
import org.elasticsearch.xpack.watcher.input.ExecutableInput;
@ -113,6 +111,7 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Before;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -193,7 +192,7 @@ public class WatchTests extends ESTestCase {
for (ActionWrapper action : actions) {
actionsStatuses.put(action.id(), new ActionStatus(now));
}
WatchStatus watchStatus = new WatchStatus(clock.nowUTC(), unmodifiableMap(actionsStatuses));
WatchStatus watchStatus = new WatchStatus(new DateTime(clock.millis()), unmodifiableMap(actionsStatuses));
TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10000));
@ -249,7 +248,7 @@ public class WatchTests extends ESTestCase {
public void testParserDefaults() throws Exception {
Schedule schedule = randomSchedule();
ScheduleRegistry scheduleRegistry = registry(schedule);
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, SystemClock.INSTANCE);
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, Clock.systemUTC());
TriggerService triggerService = new TriggerService(Settings.EMPTY, singleton(triggerEngine));
ConditionRegistry conditionRegistry = conditionRegistry();
@ -263,7 +262,7 @@ public class WatchTests extends ESTestCase {
.field(ScheduleTrigger.TYPE, schedule(schedule).build())
.endObject();
builder.endObject();
Watch.Parser watchParser = new Watch.Parser(settings, triggerService, actionRegistry, inputRegistry, null, SystemClock.INSTANCE);
Watch.Parser watchParser = new Watch.Parser(settings, triggerService, actionRegistry, inputRegistry, null, Clock.systemUTC());
Watch watch = watchParser.parse("failure", false, builder.bytes());
assertThat(watch, notNullValue());
assertThat(watch.trigger(), instanceOf(ScheduleTrigger.class));
@ -277,14 +276,14 @@ public class WatchTests extends ESTestCase {
public void testParseWatch_verifyScriptLangDefault() throws Exception {
ScheduleRegistry scheduleRegistry = registry(new IntervalSchedule(new IntervalSchedule.Interval(1,
IntervalSchedule.Interval.Unit.SECONDS)));
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, SystemClock.INSTANCE);
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, Clock.systemUTC());
TriggerService triggerService = new TriggerService(Settings.EMPTY, singleton(triggerEngine));
ConditionRegistry conditionRegistry = conditionRegistry();
InputRegistry inputRegistry = registry(SearchInput.TYPE);
TransformRegistry transformRegistry = transformRegistry();
ActionRegistry actionRegistry = registry(Collections.emptyList(), conditionRegistry, transformRegistry);
Watch.Parser watchParser = new Watch.Parser(settings, triggerService, actionRegistry, inputRegistry, null, SystemClock.INSTANCE);
Watch.Parser watchParser = new Watch.Parser(settings, triggerService, actionRegistry, inputRegistry, null, Clock.systemUTC());
IndicesQueriesRegistry queryRegistry = new IndicesQueriesRegistry();
QueryParser<MatchAllQueryBuilder> queryParser1 = MatchAllQueryBuilder::fromXContent;
@ -530,11 +529,11 @@ public class WatchTests extends ESTestCase {
break;
}
}
return new ActionRegistry(unmodifiableMap(parsers), conditionRegistry, transformRegistry, SystemClock.INSTANCE, licenseState);
return new ActionRegistry(unmodifiableMap(parsers), conditionRegistry, transformRegistry, Clock.systemUTC(), licenseState);
}
private ActionThrottler randomThrottler() {
return new ActionThrottler(SystemClock.INSTANCE, randomBoolean() ? null : timeValueSeconds(randomIntBetween(1, 10000)),
return new ActionThrottler(Clock.systemUTC(), randomBoolean() ? null : timeValueSeconds(randomIntBetween(1, 10000)),
licenseState);
}