diff --git a/src/main/java/org/elasticsearch/alerts/Alert.java b/src/main/java/org/elasticsearch/alerts/Alert.java index 89d5e9a8702..99af1147f01 100644 --- a/src/main/java/org/elasticsearch/alerts/Alert.java +++ b/src/main/java/org/elasticsearch/alerts/Alert.java @@ -16,6 +16,7 @@ import org.elasticsearch.alerts.input.NoneInput; import org.elasticsearch.alerts.scheduler.Scheduler; import org.elasticsearch.alerts.scheduler.schedule.Schedule; import org.elasticsearch.alerts.scheduler.schedule.ScheduleRegistry; +import org.elasticsearch.alerts.support.clock.Clock; import org.elasticsearch.alerts.throttle.AlertThrottler; import org.elasticsearch.alerts.throttle.Throttler; import org.elasticsearch.alerts.transform.Transform; @@ -60,7 +61,7 @@ public class Alert implements Scheduler.Job, ToXContent { @Nullable private final Transform transform; - public Alert(String name, Schedule schedule, Input input, Condition condition, @Nullable Transform transform, Actions actions, @Nullable Map metadata, @Nullable TimeValue throttlePeriod, @Nullable Status status) { + public Alert(String name, Clock clock, Schedule schedule, Input input, Condition condition, @Nullable Transform transform, Actions actions, @Nullable Map metadata, @Nullable TimeValue throttlePeriod, @Nullable Status status) { this.name = name; this.schedule = schedule; this.input = input; @@ -70,7 +71,7 @@ public class Alert implements Scheduler.Job, ToXContent { this.throttlePeriod = throttlePeriod; this.metadata = metadata; this.transform = transform; - throttler = new AlertThrottler(throttlePeriod); + throttler = new AlertThrottler(clock, throttlePeriod); } public String name() { @@ -175,6 +176,7 @@ public class Alert implements Scheduler.Job, ToXContent { private final TransformRegistry transformRegistry; private final ActionRegistry actionRegistry; private final InputRegistry inputRegistry; + private final Clock clock; private final Input defaultInput; private final Condition defaultCondition; @@ -182,7 +184,7 @@ public class Alert implements Scheduler.Job, ToXContent { @Inject public Parser(Settings settings, ConditionRegistry conditionRegistry, ScheduleRegistry scheduleRegistry, TransformRegistry transformRegistry, ActionRegistry actionRegistry, - InputRegistry inputRegistry) { + InputRegistry inputRegistry, Clock clock) { super(settings); this.conditionRegistry = conditionRegistry; @@ -190,6 +192,7 @@ public class Alert implements Scheduler.Job, ToXContent { this.transformRegistry = transformRegistry; this.actionRegistry = actionRegistry; this.inputRegistry = inputRegistry; + this.clock = clock; this.defaultInput = new NoneInput(logger); this.defaultCondition = new AlwaysTrueCondition(logger); @@ -256,7 +259,7 @@ public class Alert implements Scheduler.Job, ToXContent { throw new AlertsSettingsException("could not parse alert [" + name + "]. missing alert actions"); } - return new Alert(name, schedule, input, condition, transform, actions, metatdata, throttlePeriod, status); + return new Alert(name, clock, schedule, input, condition, transform, actions, metatdata, throttlePeriod, status); } } diff --git a/src/main/java/org/elasticsearch/alerts/AlertsModule.java b/src/main/java/org/elasticsearch/alerts/AlertsModule.java index dc1febc7f52..189029001a7 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsModule.java @@ -14,6 +14,7 @@ import org.elasticsearch.alerts.input.InputModule; import org.elasticsearch.alerts.rest.AlertsRestModule; import org.elasticsearch.alerts.scheduler.SchedulerModule; import org.elasticsearch.alerts.support.TemplateUtils; +import org.elasticsearch.alerts.support.clock.ClockModule; import org.elasticsearch.alerts.support.init.InitializingModule; import org.elasticsearch.alerts.support.template.TemplateModule; import org.elasticsearch.alerts.transform.TransformModule; @@ -31,6 +32,7 @@ public class AlertsModule extends AbstractModule implements SpawnModules { return ImmutableList.of( new InitializingModule(), new TemplateModule(), + new ClockModule(), new AlertsClientModule(), new TransformModule(), new AlertsRestModule(), @@ -44,14 +46,12 @@ public class AlertsModule extends AbstractModule implements SpawnModules { @Override protected void configure() { - bind(Alert.Parser.class).asEagerSingleton(); bind(AlertLockService.class).asEagerSingleton(); bind(AlertsLifeCycleService.class).asEagerSingleton(); bind(AlertsService.class).asEagerSingleton(); bind(AlertsStore.class).asEagerSingleton(); bind(TemplateUtils.class).asEagerSingleton(); - } } diff --git a/src/main/java/org/elasticsearch/alerts/ExecutionContext.java b/src/main/java/org/elasticsearch/alerts/ExecutionContext.java index 527fc3cf5fa..e5a32bf99a0 100644 --- a/src/main/java/org/elasticsearch/alerts/ExecutionContext.java +++ b/src/main/java/org/elasticsearch/alerts/ExecutionContext.java @@ -22,6 +22,7 @@ public class ExecutionContext { private final String id; private final Alert alert; + private final DateTime executionTime; private final DateTime fireTime; private final DateTime scheduledTime; @@ -33,9 +34,10 @@ public class ExecutionContext { private Payload payload; - public ExecutionContext(String id, Alert alert, DateTime fireTime, DateTime scheduledTime) { + public ExecutionContext(String id, Alert alert, DateTime executionTime, DateTime fireTime, DateTime scheduledTime) { this.id = id; this.alert = alert; + this.executionTime = executionTime; this.fireTime = fireTime; this.scheduledTime = scheduledTime; } @@ -48,6 +50,10 @@ public class ExecutionContext { return alert; } + public DateTime executionTime() { + return executionTime; + } + public DateTime fireTime() { return fireTime; } @@ -70,7 +76,7 @@ public class ExecutionContext { } public void onConditionResult(Condition.Result conditionResult) { - alert.status().onCheck(conditionResult.met(), fireTime); + alert.status().onCheck(conditionResult.met(), executionTime); this.conditionResult = conditionResult; } @@ -81,9 +87,9 @@ public class ExecutionContext { public void onThrottleResult(Throttler.Result throttleResult) { this.throttleResult = throttleResult; if (throttleResult.throttle()) { - alert.status().onThrottle(fireTime, throttleResult.reason()); + alert.status().onThrottle(executionTime, throttleResult.reason()); } else { - alert.status().onExecution(fireTime); + alert.status().onExecution(executionTime); } } diff --git a/src/main/java/org/elasticsearch/alerts/actions/index/IndexAction.java b/src/main/java/org/elasticsearch/alerts/actions/index/IndexAction.java index 4c6a8fa51c2..e8e4fe8e373 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/index/IndexAction.java +++ b/src/main/java/org/elasticsearch/alerts/actions/index/IndexAction.java @@ -62,7 +62,7 @@ public class IndexAction extends Action { XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint(); resultBuilder.startObject(); resultBuilder.field("data", payload.data()); - resultBuilder.field("timestamp", ctx.fireTime()); + resultBuilder.field("timestamp", ctx.executionTime()); resultBuilder.endObject(); indexRequest.source(resultBuilder); } catch (IOException ioe) { diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java index 3fe75425f48..5df688def4f 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java @@ -12,6 +12,7 @@ import org.elasticsearch.alerts.condition.Condition; import org.elasticsearch.alerts.input.Input; import org.elasticsearch.alerts.scheduler.Scheduler; import org.elasticsearch.alerts.support.Callback; +import org.elasticsearch.alerts.support.clock.Clock; import org.elasticsearch.alerts.throttle.Throttler; import org.elasticsearch.alerts.transform.Transform; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -41,6 +42,7 @@ public class HistoryService extends AbstractComponent { private final AlertsStore alertsStore; private final ClusterService clusterService; private final AlertLockService alertLockService; + private final Clock clock; private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicInteger initializationRetries = new AtomicInteger(); @@ -48,13 +50,14 @@ public class HistoryService extends AbstractComponent { @Inject public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler, - ClusterService clusterService) { + ClusterService clusterService, Clock clock) { super(settings); this.historyStore = historyStore; this.threadPool = threadPool; this.alertsStore = alertsStore; this.alertLockService = alertLockService; this.clusterService = clusterService; + this.clock = clock; scheduler.addListener(new SchedulerListener()); } @@ -227,7 +230,7 @@ public class HistoryService extends AbstractComponent { try { firedAlert.update(FiredAlert.State.CHECKING, null); logger.debug("checking alert [{}]", firedAlert.name()); - ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, firedAlert.fireTime(), firedAlert.scheduledTime()); + ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, clock.now(), firedAlert.fireTime(), firedAlert.scheduledTime()); AlertExecution alertExecution = execute(ctx); firedAlert.update(alertExecution); historyStore.update(firedAlert); diff --git a/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java b/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java index af89103051e..1a0789220ea 100644 --- a/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java +++ b/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java @@ -68,7 +68,7 @@ public class SearchInput extends Input { @Override public Result execute(ExecutionContext ctx) throws IOException { - SearchRequest request = createSearchRequestWithTimes(this.searchRequest, ctx.scheduledTime(), ctx.fireTime(), scriptService); + SearchRequest request = createSearchRequestWithTimes(this.searchRequest, ctx.scheduledTime(), ctx.fireTime(), ctx.executionTime(), scriptService); if (logger.isTraceEnabled()) { logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.alert().name(), XContentHelper.convertToJson(request.source(), false, true)); } @@ -111,7 +111,7 @@ public class SearchInput extends Input { /** * Creates a new search request applying the scheduledFireTime and fireTime to the original request */ - public static SearchRequest createSearchRequestWithTimes(SearchRequest requestPrototype, DateTime scheduledFireTime, DateTime fireTime, ScriptServiceProxy scriptService) throws IOException { + public static SearchRequest createSearchRequestWithTimes(SearchRequest requestPrototype, DateTime scheduledFireTime, DateTime fireTime, DateTime executionTime, ScriptServiceProxy scriptService) throws IOException { SearchRequest request = new SearchRequest(requestPrototype) .indicesOptions(requestPrototype.indicesOptions()) .searchType(requestPrototype.searchType()) @@ -120,6 +120,7 @@ public class SearchInput extends Input { Map templateParams = new HashMap<>(); templateParams.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime)); templateParams.put(Variables.FIRE_TIME, formatDate(fireTime)); + templateParams.put(Variables.EXECUTION_TIME, formatDate(executionTime)); String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false); ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams); request.source((BytesReference) script.unwrap(script.run()), false); diff --git a/src/main/java/org/elasticsearch/alerts/support/Variables.java b/src/main/java/org/elasticsearch/alerts/support/Variables.java index a6a75e6fba0..60418f64ac9 100644 --- a/src/main/java/org/elasticsearch/alerts/support/Variables.java +++ b/src/main/java/org/elasticsearch/alerts/support/Variables.java @@ -19,17 +19,19 @@ public final class Variables { public static final String CTX = "ctx"; public static final String ALERT_NAME = "alert_name"; + public static final String EXECUTION_TIME = "execution_time"; public static final String FIRE_TIME = "fire_time"; public static final String SCHEDULED_FIRE_TIME = "scheduled_fire_time"; public static final String PAYLOAD = "payload"; public static Map createCtxModel(ExecutionContext ctx, Payload payload) { - return createCtxModel(ctx.alert().name(), ctx.fireTime(), ctx.scheduledTime(), payload); + return createCtxModel(ctx.alert().name(), ctx.executionTime(), ctx.fireTime(), ctx.scheduledTime(), payload); } - public static Map createCtxModel(String alertName, DateTime fireTime, DateTime scheduledTime, Payload payload) { + public static Map createCtxModel(String alertName, DateTime executionTime, DateTime fireTime, DateTime scheduledTime, Payload payload) { Map vars = new HashMap<>(); vars.put(ALERT_NAME, alertName); + vars.put(EXECUTION_TIME, executionTime); vars.put(FIRE_TIME, fireTime); vars.put(SCHEDULED_FIRE_TIME, scheduledTime); vars.put(PAYLOAD, payload.data()); diff --git a/src/main/java/org/elasticsearch/alerts/support/clock/Clock.java b/src/main/java/org/elasticsearch/alerts/support/clock/Clock.java new file mode 100644 index 00000000000..ff47001ca91 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/support/clock/Clock.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.support.clock; + +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.unit.TimeValue; + +/** + * + */ +public interface Clock { + + long millis(); + + long nanos(); + + DateTime now(); + + TimeValue timeElapsedSince(DateTime time); + +} diff --git a/src/main/java/org/elasticsearch/alerts/support/clock/ClockModule.java b/src/main/java/org/elasticsearch/alerts/support/clock/ClockModule.java new file mode 100644 index 00000000000..03e35235113 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/support/clock/ClockModule.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.support.clock; + +import org.elasticsearch.common.inject.AbstractModule; + +/** + * + */ +public class ClockModule extends AbstractModule { + + @Override + protected void configure() { + bind(Clock.class).toInstance(SystemClock.INSTANCE); + } + +} diff --git a/src/main/java/org/elasticsearch/alerts/support/clock/SystemClock.java b/src/main/java/org/elasticsearch/alerts/support/clock/SystemClock.java new file mode 100644 index 00000000000..60e5e7bf5f1 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/support/clock/SystemClock.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.support.clock; + +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.unit.TimeValue; + +/** + * + */ +public final class SystemClock implements Clock { + + public static final SystemClock INSTANCE = new SystemClock(); + + private SystemClock() { + } + + @Override + public long millis() { + return System.currentTimeMillis(); + } + + @Override + public long nanos() { + return System.nanoTime(); + } + + @Override + public DateTime now() { + return DateTime.now(); + } + + @Override + public TimeValue timeElapsedSince(DateTime time) { + return TimeValue.timeValueMillis(System.currentTimeMillis() - time.getMillis()); + } + +} diff --git a/src/main/java/org/elasticsearch/alerts/throttle/AlertThrottler.java b/src/main/java/org/elasticsearch/alerts/throttle/AlertThrottler.java index fc5173de7fe..b6d243c757a 100644 --- a/src/main/java/org/elasticsearch/alerts/throttle/AlertThrottler.java +++ b/src/main/java/org/elasticsearch/alerts/throttle/AlertThrottler.java @@ -6,6 +6,7 @@ package org.elasticsearch.alerts.throttle; import org.elasticsearch.alerts.ExecutionContext; +import org.elasticsearch.alerts.support.clock.Clock; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; @@ -19,8 +20,8 @@ public class AlertThrottler implements Throttler { private final PeriodThrottler periodThrottler; private final AckThrottler ackThrottler; - public AlertThrottler(@Nullable TimeValue throttlePeriod) { - this(throttlePeriod != null ? new PeriodThrottler(throttlePeriod) : null, ACK_THROTTLER); + public AlertThrottler(Clock clock, @Nullable TimeValue throttlePeriod) { + this(throttlePeriod != null ? new PeriodThrottler(clock, throttlePeriod) : null, ACK_THROTTLER); } AlertThrottler(PeriodThrottler periodThrottler, AckThrottler ackThrottler) { diff --git a/src/main/java/org/elasticsearch/alerts/throttle/PeriodThrottler.java b/src/main/java/org/elasticsearch/alerts/throttle/PeriodThrottler.java index 5d52100791c..fbfdbe96e24 100644 --- a/src/main/java/org/elasticsearch/alerts/throttle/PeriodThrottler.java +++ b/src/main/java/org/elasticsearch/alerts/throttle/PeriodThrottler.java @@ -7,6 +7,7 @@ package org.elasticsearch.alerts.throttle; import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.ExecutionContext; +import org.elasticsearch.alerts.support.clock.Clock; import org.elasticsearch.common.joda.time.PeriodType; import org.elasticsearch.common.unit.TimeValue; @@ -17,14 +18,16 @@ public class PeriodThrottler implements Throttler { private final TimeValue period; private final PeriodType periodType; + private final Clock clock; - public PeriodThrottler(TimeValue period) { - this(period, PeriodType.minutes()); + public PeriodThrottler(Clock clock, TimeValue period) { + this(clock, period, PeriodType.minutes()); } - public PeriodThrottler(TimeValue period, PeriodType periodType) { + public PeriodThrottler(Clock clock, TimeValue period, PeriodType periodType) { this.period = period; this.periodType = periodType; + this.clock = clock; } public TimeValue interval() { @@ -35,7 +38,7 @@ public class PeriodThrottler implements Throttler { public Result throttle(ExecutionContext ctx) { Alert.Status status = ctx.alert().status(); if (status.lastExecuted() != null) { - TimeValue timeElapsed = new TimeValue(System.currentTimeMillis() - status.lastExecuted().getMillis()); + TimeValue timeElapsed = clock.timeElapsedSince(status.lastExecuted()); if (timeElapsed.getMillis() <= period.getMillis()) { return Result.throttle("throttling interval is set to [" + period.format(periodType) + "] but time elapsed since last execution is [" + timeElapsed.format(periodType) + "]"); diff --git a/src/test/java/org/elasticsearch/alerts/AlertTests.java b/src/test/java/org/elasticsearch/alerts/AlertTests.java index 9d39fb26a60..d66a672d2ea 100644 --- a/src/test/java/org/elasticsearch/alerts/AlertTests.java +++ b/src/test/java/org/elasticsearch/alerts/AlertTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.alerts.scheduler.schedule.*; import org.elasticsearch.alerts.scheduler.schedule.support.*; import org.elasticsearch.alerts.support.AlertUtils; import org.elasticsearch.alerts.support.Script; +import org.elasticsearch.alerts.support.clock.SystemClock; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.alerts.support.template.ScriptTemplate; @@ -99,11 +100,11 @@ public class AlertTests extends ElasticsearchTestCase { TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10)); - Alert alert = new Alert("_name", schedule, input, condition, transform, actions, metadata, throttlePeriod, status); + Alert alert = new Alert("_name", SystemClock.INSTANCE, schedule, input, condition, transform, actions, metadata, throttlePeriod, status); BytesReference bytes = XContentFactory.jsonBuilder().value(alert).bytes(); logger.info(bytes.toUtf8()); - Alert.Parser alertParser = new Alert.Parser(settings, conditionRegistry, scheduleRegistry, transformRegistry, actionRegistry, inputRegistry); + Alert.Parser alertParser = new Alert.Parser(settings, conditionRegistry, scheduleRegistry, transformRegistry, actionRegistry, inputRegistry, SystemClock.INSTANCE); boolean includeStatus = randomBoolean(); Alert parsedAlert = alertParser.parse("_name", includeStatus, bytes); diff --git a/src/test/java/org/elasticsearch/alerts/actions/email/EmailActionTests.java b/src/test/java/org/elasticsearch/alerts/actions/email/EmailActionTests.java index fcdcf17f655..54ade05eb8a 100644 --- a/src/test/java/org/elasticsearch/alerts/actions/email/EmailActionTests.java +++ b/src/test/java/org/elasticsearch/alerts/actions/email/EmailActionTests.java @@ -82,7 +82,7 @@ public class EmailActionTests extends ElasticsearchTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); String ctxId = randomAsciiOfLength(5); - ExecutionContext ctx = mockExecutionContext(now, now, "alert1", payload); + ExecutionContext ctx = mockExecutionContext(now, "alert1", payload); when(ctx.id()).thenReturn(ctxId); if (transform != null) { Transform.Result transformResult = mock(Transform.Result.class); @@ -94,6 +94,7 @@ public class EmailActionTests extends ElasticsearchTestCase { .put("ctx", ImmutableMap.builder() .put("alert_name", "alert1") .put("payload", transform == null ? data : new Payload.Simple("_key", "_value").data()) + .put("execution_time", now) .put("fire_time", now) .put("scheduled_fire_time", now).build()) .build(); diff --git a/src/test/java/org/elasticsearch/alerts/history/FiredAlertTests.java b/src/test/java/org/elasticsearch/alerts/history/FiredAlertTests.java index 5cf093bde1c..41503282898 100644 --- a/src/test/java/org/elasticsearch/alerts/history/FiredAlertTests.java +++ b/src/test/java/org/elasticsearch/alerts/history/FiredAlertTests.java @@ -47,7 +47,7 @@ public class FiredAlertTests extends AbstractAlertsIntegrationTests { public void testParser_WithSealedFiredAlert() throws Exception { Alert alert = AlertsTestUtils.createTestAlert("fired_test", scriptService(), httpClient(), noopEmailService(), logger); FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime()); - ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime()); + ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime(), new DateTime()); ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah")); ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}")); Input.Result inputResult = new SimpleInput.Result(SimpleInput.TYPE, new Payload.Simple()); @@ -69,7 +69,7 @@ public class FiredAlertTests extends AbstractAlertsIntegrationTests { public void testParser_WithSealedFiredAlert_WithScriptSearchCondition() throws Exception { Alert alert = AlertsTestUtils.createTestAlert("fired_test", scriptService(), httpClient(), noopEmailService(), logger); FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime()); - ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime()); + ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime(), new DateTime()); ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah")); ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}")); Input.Result inputResult = new SimpleInput.Result(SimpleInput.TYPE, new Payload.Simple()); diff --git a/src/test/java/org/elasticsearch/alerts/history/HistoryServiceTests.java b/src/test/java/org/elasticsearch/alerts/history/HistoryServiceTests.java index 4d8fc217701..7cf02076729 100644 --- a/src/test/java/org/elasticsearch/alerts/history/HistoryServiceTests.java +++ b/src/test/java/org/elasticsearch/alerts/history/HistoryServiceTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.alerts.condition.simple.AlwaysFalseCondition; import org.elasticsearch.alerts.condition.simple.AlwaysTrueCondition; import org.elasticsearch.alerts.input.Input; import org.elasticsearch.alerts.scheduler.Scheduler; +import org.elasticsearch.alerts.support.clock.SystemClock; import org.elasticsearch.alerts.throttle.Throttler; import org.elasticsearch.alerts.transform.Transform; import org.elasticsearch.cluster.ClusterService; @@ -25,9 +26,7 @@ import org.junit.Test; import java.util.Arrays; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; import static org.mockito.Mockito.*; @@ -56,7 +55,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase { AlertLockService alertLockService = mock(AlertLockService.class); Scheduler scheduler = mock(Scheduler.class); ClusterService clusterService = mock(ClusterService.class); - historyService = new HistoryService(ImmutableSettings.EMPTY, historyStore, threadPool, alertsStore, alertLockService, scheduler, clusterService); + historyService = new HistoryService(ImmutableSettings.EMPTY, historyStore, threadPool, alertsStore, alertLockService, scheduler, clusterService, SystemClock.INSTANCE); } @Test @@ -87,7 +86,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase { when(alert.actions()).thenReturn(actions); when(alert.status()).thenReturn(alertStatus); - ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now()); + ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now(), DateTime.now()); AlertExecution alertExecution = historyService.execute(context); assertThat(alertExecution.conditionResult(), sameInstance(conditionResult)); assertThat(alertExecution.transformResult(), sameInstance(transformResult)); @@ -130,7 +129,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase { when(alert.actions()).thenReturn(actions); when(alert.status()).thenReturn(alertStatus); - ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now()); + ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now(), DateTime.now()); AlertExecution alertExecution = historyService.execute(context); assertThat(alertExecution.inputResult(), sameInstance(inputResult)); assertThat(alertExecution.conditionResult(), sameInstance(conditionResult)); @@ -173,7 +172,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase { when(alert.actions()).thenReturn(actions); when(alert.status()).thenReturn(alertStatus); - ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now()); + ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now(), DateTime.now()); AlertExecution alertExecution = historyService.execute(context); assertThat(alertExecution.inputResult(), sameInstance(inputResult)); assertThat(alertExecution.conditionResult(), sameInstance(conditionResult)); diff --git a/src/test/java/org/elasticsearch/alerts/input/search/SearchInputTests.java b/src/test/java/org/elasticsearch/alerts/input/search/SearchInputTests.java index 708370d8766..9cf9afe7f32 100644 --- a/src/test/java/org/elasticsearch/alerts/input/search/SearchInputTests.java +++ b/src/test/java/org/elasticsearch/alerts/input/search/SearchInputTests.java @@ -60,7 +60,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)), ClientProxy.of(client()), request); ExecutionContext ctx = new ExecutionContext("test-alert", null, - new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); + new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); SearchInput.Result result = searchInput.execute(ctx); assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0)); @@ -86,7 +86,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest { ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)), ClientProxy.of(client()), request); ExecutionContext ctx = new ExecutionContext("test-alert", null, - new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); + new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)); SearchInput.Result result = searchInput.execute(ctx); assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0)); diff --git a/src/test/java/org/elasticsearch/alerts/support/clock/ClockMock.java b/src/test/java/org/elasticsearch/alerts/support/clock/ClockMock.java new file mode 100644 index 00000000000..bccd9ace250 --- /dev/null +++ b/src/test/java/org/elasticsearch/alerts/support/clock/ClockMock.java @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.support.clock; + +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.joda.time.Duration; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.concurrent.TimeUnit; + +/** + * + */ +public class ClockMock implements Clock { + + private DateTime now = DateTime.now(); + + @Override + public long millis() { + return now.getMillis(); + } + + @Override + public long nanos() { + return TimeUnit.MILLISECONDS.toNanos(now.getMillis()); + } + + @Override + public DateTime now() { + return now; + } + + @Override + public TimeValue timeElapsedSince(DateTime time) { + return TimeValue.timeValueMillis(new Duration(time, now).getMillis()); + } + + public void setTime(DateTime now) { + this.now = now; + } + + public void fastForward(TimeValue timeValue) { + setTime(now.plusMillis((int) timeValue.millis())); + } + + public void rewind(TimeValue timeValue) { + setTime(now.minusMillis((int) timeValue.millis())); + } + +} diff --git a/src/test/java/org/elasticsearch/alerts/test/AbstractAlertsIntegrationTests.java b/src/test/java/org/elasticsearch/alerts/test/AbstractAlertsIntegrationTests.java index a32188a94eb..712410284f7 100644 --- a/src/test/java/org/elasticsearch/alerts/test/AbstractAlertsIntegrationTests.java +++ b/src/test/java/org/elasticsearch/alerts/test/AbstractAlertsIntegrationTests.java @@ -6,7 +6,9 @@ package org.elasticsearch.alerts.test; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.alerts.AlertsPlugin; import org.elasticsearch.alerts.AlertsService; @@ -33,6 +35,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.InternalTestCluster; @@ -100,6 +103,15 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr stopAlerting(); } + protected long docCount(String index, String type, SearchSourceBuilder source) { + SearchRequestBuilder builder = client().prepareSearch(index).setSearchType(SearchType.COUNT); + if (type != null) { + builder.setTypes(type); + } + builder.setSource(source.buildAsBytes()); + return builder.get().getHits().getTotalHits(); + } + protected BytesReference createAlertSource(String cron, SearchRequest conditionRequest, String conditionScript) throws IOException { return createAlertSource(cron, conditionRequest, conditionScript, null); } diff --git a/src/test/java/org/elasticsearch/alerts/test/AlertsTestUtils.java b/src/test/java/org/elasticsearch/alerts/test/AlertsTestUtils.java index 28b679666ec..02117f42287 100644 --- a/src/test/java/org/elasticsearch/alerts/test/AlertsTestUtils.java +++ b/src/test/java/org/elasticsearch/alerts/test/AlertsTestUtils.java @@ -24,6 +24,7 @@ import org.elasticsearch.alerts.input.search.SearchInput; import org.elasticsearch.alerts.scheduler.schedule.CronSchedule; import org.elasticsearch.alerts.support.AlertUtils; import org.elasticsearch.alerts.support.Script; +import org.elasticsearch.alerts.support.clock.SystemClock; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.alerts.support.template.ScriptTemplate; @@ -85,14 +86,18 @@ public final class AlertsTestUtils { } public static ExecutionContext mockExecutionContext(String alertName, Payload payload) { - DateTime now = DateTime.now(); - return mockExecutionContext(now, now, alertName, payload); + return mockExecutionContext(DateTime.now(), alertName, payload); } - public static ExecutionContext mockExecutionContext(DateTime scheduledTime, DateTime firedTime, String alertName, Payload payload) { + public static ExecutionContext mockExecutionContext(DateTime time, String alertName, Payload payload) { + return mockExecutionContext(time, time, time, alertName, payload); + } + + public static ExecutionContext mockExecutionContext(DateTime executionTime, DateTime firedTime, DateTime scheduledTime, String alertName, Payload payload) { ExecutionContext ctx = mock(ExecutionContext.class); - when(ctx.scheduledTime()).thenReturn(scheduledTime); + when(ctx.executionTime()).thenReturn(executionTime); when(ctx.fireTime()).thenReturn(firedTime); + when(ctx.scheduledTime()).thenReturn(scheduledTime); Alert alert = mock(Alert.class); when(alert.name()).thenReturn(alertName); when(ctx.alert()).thenReturn(alert); @@ -134,9 +139,9 @@ public final class AlertsTestUtils { return new Alert( alertName, + SystemClock.INSTANCE, new CronSchedule("0/5 * * * * ? *"), - new SearchInput(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), - conditionRequest), + new SearchInput(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), conditionRequest), new ScriptCondition(logger, scriptService, new Script("return true")), new SearchTransform(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), transformRequest), new Actions(actions), diff --git a/src/test/java/org/elasticsearch/alerts/test/integration/BootStrapTests.java b/src/test/java/org/elasticsearch/alerts/test/integration/BootStrapTests.java index 93625fa08dc..fd0bd37458e 100644 --- a/src/test/java/org/elasticsearch/alerts/test/integration/BootStrapTests.java +++ b/src/test/java/org/elasticsearch/alerts/test/integration/BootStrapTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.alerts.history.HistoryStore; import org.elasticsearch.alerts.input.search.SearchInput; import org.elasticsearch.alerts.scheduler.schedule.CronSchedule; import org.elasticsearch.alerts.support.Script; +import org.elasticsearch.alerts.support.clock.SystemClock; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; import org.elasticsearch.alerts.test.AbstractAlertsIntegrationTests; import org.elasticsearch.alerts.test.AlertsTestUtils; @@ -84,6 +85,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests { SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); Alert alert = new Alert( "test-serialization", + SystemClock.INSTANCE, new CronSchedule("0/5 * * * * ? 2035"), //Set this into the future so we don't get any extra runs new SearchInput(logger, scriptService(), ClientProxy.of(client()), searchRequest), new ScriptCondition(logger, scriptService(), new Script("return true")), @@ -143,6 +145,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests { Alert alert = new Alert( "action-test-"+ i + " " + j, + SystemClock.INSTANCE, new CronSchedule("0/5 * * * * ? 2035"), //Set a cron schedule far into the future so this alert is never scheduled new SearchInput(logger, scriptService(), ClientProxy.of(client()), searchRequest), diff --git a/src/test/java/org/elasticsearch/alerts/throttle/PeriodThrottlerTests.java b/src/test/java/org/elasticsearch/alerts/throttle/PeriodThrottlerTests.java index 6c32e7e83e2..d44966df87d 100644 --- a/src/test/java/org/elasticsearch/alerts/throttle/PeriodThrottlerTests.java +++ b/src/test/java/org/elasticsearch/alerts/throttle/PeriodThrottlerTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.alerts.throttle; import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.ExecutionContext; +import org.elasticsearch.alerts.support.clock.SystemClock; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.PeriodType; import org.elasticsearch.common.unit.TimeValue; @@ -30,7 +31,7 @@ public class PeriodThrottlerTests extends ElasticsearchTestCase { public void testBelowPeriod() throws Exception { PeriodType periodType = randomFrom(PeriodType.millis(), PeriodType.seconds(), PeriodType.minutes()); TimeValue period = TimeValue.timeValueSeconds(randomIntBetween(2, 5)); - PeriodThrottler throttler = new PeriodThrottler(period, periodType); + PeriodThrottler throttler = new PeriodThrottler(SystemClock.INSTANCE, period, periodType); ExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); Alert.Status status = mock(Alert.Status.class); @@ -48,7 +49,7 @@ public class PeriodThrottlerTests extends ElasticsearchTestCase { public void testAbovePeriod() throws Exception { PeriodType periodType = randomFrom(PeriodType.millis(), PeriodType.seconds(), PeriodType.minutes()); TimeValue period = TimeValue.timeValueSeconds(randomIntBetween(2, 5)); - PeriodThrottler throttler = new PeriodThrottler(period, periodType); + PeriodThrottler throttler = new PeriodThrottler(SystemClock.INSTANCE, period, periodType); ExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); Alert.Status status = mock(Alert.Status.class); diff --git a/src/test/java/org/elasticsearch/alerts/transform/SearchTransformTests.java b/src/test/java/org/elasticsearch/alerts/transform/SearchTransformTests.java index ca9897273cf..9ef7b32f069 100644 --- a/src/test/java/org/elasticsearch/alerts/transform/SearchTransformTests.java +++ b/src/test/java/org/elasticsearch/alerts/transform/SearchTransformTests.java @@ -81,7 +81,7 @@ public class SearchTransformTests extends AbstractAlertsSingleNodeTests { // - we index 4 documents each one associated with a unique value and each is associated with a day // - we build a search transform such that with a filter that // - the date must be after [scheduled_time] variable - // - the date must be before [fired_time] variable + // - the date must be before [execution_time] variable // - the value must match [payload.value] variable // - the variable are set as such: // - scheduled_time = youngest document's date @@ -105,12 +105,12 @@ public class SearchTransformTests extends AbstractAlertsSingleNodeTests { SearchRequest request = Requests.searchRequest("idx").source(searchSource().query(filteredQuery(matchAllQuery(), boolFilter() .must(rangeFilter("date").gt("{{" + Variables.CTX + "." + Variables.SCHEDULED_FIRE_TIME + "}}")) - .must(rangeFilter("date").lt("{{" + Variables.CTX + "." + Variables.FIRE_TIME + "}}")) + .must(rangeFilter("date").lt("{{" + Variables.CTX + "." + Variables.EXECUTION_TIME + "}}")) .must(termFilter("value", "{{" + Variables.CTX + "." + Variables.PAYLOAD + ".value}}"))))); SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request); - ExecutionContext ctx = mockExecutionContext(parseDate("2015-01-01T00:00:00"), parseDate("2015-01-04T00:00:00"), "_name", EMPTY_PAYLOAD); + ExecutionContext ctx = mockExecutionContext(parseDate("2015-01-04T00:00:00"), parseDate("2015-01-04T00:00:00"), parseDate("2015-01-01T00:00:00"), "_name", EMPTY_PAYLOAD); Payload payload = simplePayload("value", "val_3");