Introduced Clock as an abstraction over the system clock

Use in:

- Determining the execution time of an alert
- The period throttler to determine the time passed since last execution

Original commit: elastic/x-pack-elasticsearch@9197b86b68
This commit is contained in:
uboness 2015-03-04 23:26:56 +01:00
parent f28dee2e44
commit 6eb27e2519
23 changed files with 227 additions and 48 deletions

View File

@ -16,6 +16,7 @@ import org.elasticsearch.alerts.input.NoneInput;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.scheduler.schedule.Schedule;
import org.elasticsearch.alerts.scheduler.schedule.ScheduleRegistry;
import org.elasticsearch.alerts.support.clock.Clock;
import org.elasticsearch.alerts.throttle.AlertThrottler;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
@ -60,7 +61,7 @@ public class Alert implements Scheduler.Job, ToXContent {
@Nullable
private final Transform transform;
public Alert(String name, Schedule schedule, Input input, Condition condition, @Nullable Transform transform, Actions actions, @Nullable Map<String, Object> metadata, @Nullable TimeValue throttlePeriod, @Nullable Status status) {
public Alert(String name, Clock clock, Schedule schedule, Input input, Condition condition, @Nullable Transform transform, Actions actions, @Nullable Map<String, Object> metadata, @Nullable TimeValue throttlePeriod, @Nullable Status status) {
this.name = name;
this.schedule = schedule;
this.input = input;
@ -70,7 +71,7 @@ public class Alert implements Scheduler.Job, ToXContent {
this.throttlePeriod = throttlePeriod;
this.metadata = metadata;
this.transform = transform;
throttler = new AlertThrottler(throttlePeriod);
throttler = new AlertThrottler(clock, throttlePeriod);
}
public String name() {
@ -175,6 +176,7 @@ public class Alert implements Scheduler.Job, ToXContent {
private final TransformRegistry transformRegistry;
private final ActionRegistry actionRegistry;
private final InputRegistry inputRegistry;
private final Clock clock;
private final Input defaultInput;
private final Condition defaultCondition;
@ -182,7 +184,7 @@ public class Alert implements Scheduler.Job, ToXContent {
@Inject
public Parser(Settings settings, ConditionRegistry conditionRegistry, ScheduleRegistry scheduleRegistry,
TransformRegistry transformRegistry, ActionRegistry actionRegistry,
InputRegistry inputRegistry) {
InputRegistry inputRegistry, Clock clock) {
super(settings);
this.conditionRegistry = conditionRegistry;
@ -190,6 +192,7 @@ public class Alert implements Scheduler.Job, ToXContent {
this.transformRegistry = transformRegistry;
this.actionRegistry = actionRegistry;
this.inputRegistry = inputRegistry;
this.clock = clock;
this.defaultInput = new NoneInput(logger);
this.defaultCondition = new AlwaysTrueCondition(logger);
@ -256,7 +259,7 @@ public class Alert implements Scheduler.Job, ToXContent {
throw new AlertsSettingsException("could not parse alert [" + name + "]. missing alert actions");
}
return new Alert(name, schedule, input, condition, transform, actions, metatdata, throttlePeriod, status);
return new Alert(name, clock, schedule, input, condition, transform, actions, metatdata, throttlePeriod, status);
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.alerts.input.InputModule;
import org.elasticsearch.alerts.rest.AlertsRestModule;
import org.elasticsearch.alerts.scheduler.SchedulerModule;
import org.elasticsearch.alerts.support.TemplateUtils;
import org.elasticsearch.alerts.support.clock.ClockModule;
import org.elasticsearch.alerts.support.init.InitializingModule;
import org.elasticsearch.alerts.support.template.TemplateModule;
import org.elasticsearch.alerts.transform.TransformModule;
@ -31,6 +32,7 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
return ImmutableList.of(
new InitializingModule(),
new TemplateModule(),
new ClockModule(),
new AlertsClientModule(),
new TransformModule(),
new AlertsRestModule(),
@ -44,14 +46,12 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
@Override
protected void configure() {
bind(Alert.Parser.class).asEagerSingleton();
bind(AlertLockService.class).asEagerSingleton();
bind(AlertsLifeCycleService.class).asEagerSingleton();
bind(AlertsService.class).asEagerSingleton();
bind(AlertsStore.class).asEagerSingleton();
bind(TemplateUtils.class).asEagerSingleton();
}
}

View File

@ -22,6 +22,7 @@ public class ExecutionContext {
private final String id;
private final Alert alert;
private final DateTime executionTime;
private final DateTime fireTime;
private final DateTime scheduledTime;
@ -33,9 +34,10 @@ public class ExecutionContext {
private Payload payload;
public ExecutionContext(String id, Alert alert, DateTime fireTime, DateTime scheduledTime) {
public ExecutionContext(String id, Alert alert, DateTime executionTime, DateTime fireTime, DateTime scheduledTime) {
this.id = id;
this.alert = alert;
this.executionTime = executionTime;
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
}
@ -48,6 +50,10 @@ public class ExecutionContext {
return alert;
}
public DateTime executionTime() {
return executionTime;
}
public DateTime fireTime() {
return fireTime;
}
@ -70,7 +76,7 @@ public class ExecutionContext {
}
public void onConditionResult(Condition.Result conditionResult) {
alert.status().onCheck(conditionResult.met(), fireTime);
alert.status().onCheck(conditionResult.met(), executionTime);
this.conditionResult = conditionResult;
}
@ -81,9 +87,9 @@ public class ExecutionContext {
public void onThrottleResult(Throttler.Result throttleResult) {
this.throttleResult = throttleResult;
if (throttleResult.throttle()) {
alert.status().onThrottle(fireTime, throttleResult.reason());
alert.status().onThrottle(executionTime, throttleResult.reason());
} else {
alert.status().onExecution(fireTime);
alert.status().onExecution(executionTime);
}
}

View File

@ -62,7 +62,7 @@ public class IndexAction extends Action<IndexAction.Result> {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject();
resultBuilder.field("data", payload.data());
resultBuilder.field("timestamp", ctx.fireTime());
resultBuilder.field("timestamp", ctx.executionTime());
resultBuilder.endObject();
indexRequest.source(resultBuilder);
} catch (IOException ioe) {

View File

@ -12,6 +12,7 @@ import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.alerts.input.Input;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.support.Callback;
import org.elasticsearch.alerts.support.clock.Clock;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -41,6 +42,7 @@ public class HistoryService extends AbstractComponent {
private final AlertsStore alertsStore;
private final ClusterService clusterService;
private final AlertLockService alertLockService;
private final Clock clock;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicInteger initializationRetries = new AtomicInteger();
@ -48,13 +50,14 @@ public class HistoryService extends AbstractComponent {
@Inject
public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool,
AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler,
ClusterService clusterService) {
ClusterService clusterService, Clock clock) {
super(settings);
this.historyStore = historyStore;
this.threadPool = threadPool;
this.alertsStore = alertsStore;
this.alertLockService = alertLockService;
this.clusterService = clusterService;
this.clock = clock;
scheduler.addListener(new SchedulerListener());
}
@ -227,7 +230,7 @@ public class HistoryService extends AbstractComponent {
try {
firedAlert.update(FiredAlert.State.CHECKING, null);
logger.debug("checking alert [{}]", firedAlert.name());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, firedAlert.fireTime(), firedAlert.scheduledTime());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, clock.now(), firedAlert.fireTime(), firedAlert.scheduledTime());
AlertExecution alertExecution = execute(ctx);
firedAlert.update(alertExecution);
historyStore.update(firedAlert);

View File

@ -68,7 +68,7 @@ public class SearchInput extends Input<SearchInput.Result> {
@Override
public Result execute(ExecutionContext ctx) throws IOException {
SearchRequest request = createSearchRequestWithTimes(this.searchRequest, ctx.scheduledTime(), ctx.fireTime(), scriptService);
SearchRequest request = createSearchRequestWithTimes(this.searchRequest, ctx.scheduledTime(), ctx.fireTime(), ctx.executionTime(), scriptService);
if (logger.isTraceEnabled()) {
logger.trace("[{}] running query for [{}] [{}]", ctx.id(), ctx.alert().name(), XContentHelper.convertToJson(request.source(), false, true));
}
@ -111,7 +111,7 @@ public class SearchInput extends Input<SearchInput.Result> {
/**
* Creates a new search request applying the scheduledFireTime and fireTime to the original request
*/
public static SearchRequest createSearchRequestWithTimes(SearchRequest requestPrototype, DateTime scheduledFireTime, DateTime fireTime, ScriptServiceProxy scriptService) throws IOException {
public static SearchRequest createSearchRequestWithTimes(SearchRequest requestPrototype, DateTime scheduledFireTime, DateTime fireTime, DateTime executionTime, ScriptServiceProxy scriptService) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.searchType(requestPrototype.searchType())
@ -120,6 +120,7 @@ public class SearchInput extends Input<SearchInput.Result> {
Map<String, String> templateParams = new HashMap<>();
templateParams.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime));
templateParams.put(Variables.FIRE_TIME, formatDate(fireTime));
templateParams.put(Variables.EXECUTION_TIME, formatDate(executionTime));
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);

View File

@ -19,17 +19,19 @@ public final class Variables {
public static final String CTX = "ctx";
public static final String ALERT_NAME = "alert_name";
public static final String EXECUTION_TIME = "execution_time";
public static final String FIRE_TIME = "fire_time";
public static final String SCHEDULED_FIRE_TIME = "scheduled_fire_time";
public static final String PAYLOAD = "payload";
public static Map<String, Object> createCtxModel(ExecutionContext ctx, Payload payload) {
return createCtxModel(ctx.alert().name(), ctx.fireTime(), ctx.scheduledTime(), payload);
return createCtxModel(ctx.alert().name(), ctx.executionTime(), ctx.fireTime(), ctx.scheduledTime(), payload);
}
public static Map<String, Object> createCtxModel(String alertName, DateTime fireTime, DateTime scheduledTime, Payload payload) {
public static Map<String, Object> createCtxModel(String alertName, DateTime executionTime, DateTime fireTime, DateTime scheduledTime, Payload payload) {
Map<String, Object> vars = new HashMap<>();
vars.put(ALERT_NAME, alertName);
vars.put(EXECUTION_TIME, executionTime);
vars.put(FIRE_TIME, fireTime);
vars.put(SCHEDULED_FIRE_TIME, scheduledTime);
vars.put(PAYLOAD, payload.data());

View File

@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.support.clock;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
/**
*
*/
public interface Clock {
long millis();
long nanos();
DateTime now();
TimeValue timeElapsedSince(DateTime time);
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.support.clock;
import org.elasticsearch.common.inject.AbstractModule;
/**
*
*/
public class ClockModule extends AbstractModule {
@Override
protected void configure() {
bind(Clock.class).toInstance(SystemClock.INSTANCE);
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.support.clock;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
/**
*
*/
public final class SystemClock implements Clock {
public static final SystemClock INSTANCE = new SystemClock();
private SystemClock() {
}
@Override
public long millis() {
return System.currentTimeMillis();
}
@Override
public long nanos() {
return System.nanoTime();
}
@Override
public DateTime now() {
return DateTime.now();
}
@Override
public TimeValue timeElapsedSince(DateTime time) {
return TimeValue.timeValueMillis(System.currentTimeMillis() - time.getMillis());
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.support.clock.Clock;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
@ -19,8 +20,8 @@ public class AlertThrottler implements Throttler {
private final PeriodThrottler periodThrottler;
private final AckThrottler ackThrottler;
public AlertThrottler(@Nullable TimeValue throttlePeriod) {
this(throttlePeriod != null ? new PeriodThrottler(throttlePeriod) : null, ACK_THROTTLER);
public AlertThrottler(Clock clock, @Nullable TimeValue throttlePeriod) {
this(throttlePeriod != null ? new PeriodThrottler(clock, throttlePeriod) : null, ACK_THROTTLER);
}
AlertThrottler(PeriodThrottler periodThrottler, AckThrottler ackThrottler) {

View File

@ -7,6 +7,7 @@ package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.support.clock.Clock;
import org.elasticsearch.common.joda.time.PeriodType;
import org.elasticsearch.common.unit.TimeValue;
@ -17,14 +18,16 @@ public class PeriodThrottler implements Throttler {
private final TimeValue period;
private final PeriodType periodType;
private final Clock clock;
public PeriodThrottler(TimeValue period) {
this(period, PeriodType.minutes());
public PeriodThrottler(Clock clock, TimeValue period) {
this(clock, period, PeriodType.minutes());
}
public PeriodThrottler(TimeValue period, PeriodType periodType) {
public PeriodThrottler(Clock clock, TimeValue period, PeriodType periodType) {
this.period = period;
this.periodType = periodType;
this.clock = clock;
}
public TimeValue interval() {
@ -35,7 +38,7 @@ public class PeriodThrottler implements Throttler {
public Result throttle(ExecutionContext ctx) {
Alert.Status status = ctx.alert().status();
if (status.lastExecuted() != null) {
TimeValue timeElapsed = new TimeValue(System.currentTimeMillis() - status.lastExecuted().getMillis());
TimeValue timeElapsed = clock.timeElapsedSince(status.lastExecuted());
if (timeElapsed.getMillis() <= period.getMillis()) {
return Result.throttle("throttling interval is set to [" + period.format(periodType) +
"] but time elapsed since last execution is [" + timeElapsed.format(periodType) + "]");

View File

@ -28,6 +28,7 @@ import org.elasticsearch.alerts.scheduler.schedule.*;
import org.elasticsearch.alerts.scheduler.schedule.support.*;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Script;
import org.elasticsearch.alerts.support.clock.SystemClock;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.support.template.ScriptTemplate;
@ -99,11 +100,11 @@ public class AlertTests extends ElasticsearchTestCase {
TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10));
Alert alert = new Alert("_name", schedule, input, condition, transform, actions, metadata, throttlePeriod, status);
Alert alert = new Alert("_name", SystemClock.INSTANCE, schedule, input, condition, transform, actions, metadata, throttlePeriod, status);
BytesReference bytes = XContentFactory.jsonBuilder().value(alert).bytes();
logger.info(bytes.toUtf8());
Alert.Parser alertParser = new Alert.Parser(settings, conditionRegistry, scheduleRegistry, transformRegistry, actionRegistry, inputRegistry);
Alert.Parser alertParser = new Alert.Parser(settings, conditionRegistry, scheduleRegistry, transformRegistry, actionRegistry, inputRegistry, SystemClock.INSTANCE);
boolean includeStatus = randomBoolean();
Alert parsedAlert = alertParser.parse("_name", includeStatus, bytes);

View File

@ -82,7 +82,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC);
String ctxId = randomAsciiOfLength(5);
ExecutionContext ctx = mockExecutionContext(now, now, "alert1", payload);
ExecutionContext ctx = mockExecutionContext(now, "alert1", payload);
when(ctx.id()).thenReturn(ctxId);
if (transform != null) {
Transform.Result transformResult = mock(Transform.Result.class);
@ -94,6 +94,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
.put("ctx", ImmutableMap.<String, Object>builder()
.put("alert_name", "alert1")
.put("payload", transform == null ? data : new Payload.Simple("_key", "_value").data())
.put("execution_time", now)
.put("fire_time", now)
.put("scheduled_fire_time", now).build())
.build();

View File

@ -47,7 +47,7 @@ public class FiredAlertTests extends AbstractAlertsIntegrationTests {
public void testParser_WithSealedFiredAlert() throws Exception {
Alert alert = AlertsTestUtils.createTestAlert("fired_test", scriptService(), httpClient(), noopEmailService(), logger);
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime(), new DateTime());
ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah"));
ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}"));
Input.Result inputResult = new SimpleInput.Result(SimpleInput.TYPE, new Payload.Simple());
@ -69,7 +69,7 @@ public class FiredAlertTests extends AbstractAlertsIntegrationTests {
public void testParser_WithSealedFiredAlert_WithScriptSearchCondition() throws Exception {
Alert alert = AlertsTestUtils.createTestAlert("fired_test", scriptService(), httpClient(), noopEmailService(), logger);
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime(), new DateTime());
ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah"));
ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}"));
Input.Result inputResult = new SimpleInput.Result(SimpleInput.TYPE, new Payload.Simple());

View File

@ -13,6 +13,7 @@ import org.elasticsearch.alerts.condition.simple.AlwaysFalseCondition;
import org.elasticsearch.alerts.condition.simple.AlwaysTrueCondition;
import org.elasticsearch.alerts.input.Input;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.support.clock.SystemClock;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.cluster.ClusterService;
@ -25,9 +26,7 @@ import org.junit.Test;
import java.util.Arrays;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.*;
@ -56,7 +55,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
AlertLockService alertLockService = mock(AlertLockService.class);
Scheduler scheduler = mock(Scheduler.class);
ClusterService clusterService = mock(ClusterService.class);
historyService = new HistoryService(ImmutableSettings.EMPTY, historyStore, threadPool, alertsStore, alertLockService, scheduler, clusterService);
historyService = new HistoryService(ImmutableSettings.EMPTY, historyStore, threadPool, alertsStore, alertLockService, scheduler, clusterService, SystemClock.INSTANCE);
}
@Test
@ -87,7 +86,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
when(alert.actions()).thenReturn(actions);
when(alert.status()).thenReturn(alertStatus);
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now());
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now(), DateTime.now());
AlertExecution alertExecution = historyService.execute(context);
assertThat(alertExecution.conditionResult(), sameInstance(conditionResult));
assertThat(alertExecution.transformResult(), sameInstance(transformResult));
@ -130,7 +129,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
when(alert.actions()).thenReturn(actions);
when(alert.status()).thenReturn(alertStatus);
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now());
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now(), DateTime.now());
AlertExecution alertExecution = historyService.execute(context);
assertThat(alertExecution.inputResult(), sameInstance(inputResult));
assertThat(alertExecution.conditionResult(), sameInstance(conditionResult));
@ -173,7 +172,7 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
when(alert.actions()).thenReturn(actions);
when(alert.status()).thenReturn(alertStatus);
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now());
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now(), DateTime.now());
AlertExecution alertExecution = historyService.execute(context);
assertThat(alertExecution.inputResult(), sameInstance(inputResult));
assertThat(alertExecution.conditionResult(), sameInstance(conditionResult));

View File

@ -60,7 +60,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()), request);
ExecutionContext ctx = new ExecutionContext("test-alert", null,
new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
SearchInput.Result result = searchInput.execute(ctx);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
@ -86,7 +86,7 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
ScriptServiceProxy.of(internalCluster().getInstance(ScriptService.class)),
ClientProxy.of(client()), request);
ExecutionContext ctx = new ExecutionContext("test-alert", null,
new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
SearchInput.Result result = searchInput.execute(ctx);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));

View File

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.support.clock;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.Duration;
import org.elasticsearch.common.unit.TimeValue;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class ClockMock implements Clock {
private DateTime now = DateTime.now();
@Override
public long millis() {
return now.getMillis();
}
@Override
public long nanos() {
return TimeUnit.MILLISECONDS.toNanos(now.getMillis());
}
@Override
public DateTime now() {
return now;
}
@Override
public TimeValue timeElapsedSince(DateTime time) {
return TimeValue.timeValueMillis(new Duration(time, now).getMillis());
}
public void setTime(DateTime now) {
this.now = now;
}
public void fastForward(TimeValue timeValue) {
setTime(now.plusMillis((int) timeValue.millis()));
}
public void rewind(TimeValue timeValue) {
setTime(now.minusMillis((int) timeValue.millis()));
}
}

View File

@ -6,7 +6,9 @@
package org.elasticsearch.alerts.test;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.AlertsPlugin;
import org.elasticsearch.alerts.AlertsService;
@ -33,6 +35,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
@ -100,6 +103,15 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr
stopAlerting();
}
protected long docCount(String index, String type, SearchSourceBuilder source) {
SearchRequestBuilder builder = client().prepareSearch(index).setSearchType(SearchType.COUNT);
if (type != null) {
builder.setTypes(type);
}
builder.setSource(source.buildAsBytes());
return builder.get().getHits().getTotalHits();
}
protected BytesReference createAlertSource(String cron, SearchRequest conditionRequest, String conditionScript) throws IOException {
return createAlertSource(cron, conditionRequest, conditionScript, null);
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.alerts.input.search.SearchInput;
import org.elasticsearch.alerts.scheduler.schedule.CronSchedule;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Script;
import org.elasticsearch.alerts.support.clock.SystemClock;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.support.template.ScriptTemplate;
@ -85,14 +86,18 @@ public final class AlertsTestUtils {
}
public static ExecutionContext mockExecutionContext(String alertName, Payload payload) {
DateTime now = DateTime.now();
return mockExecutionContext(now, now, alertName, payload);
return mockExecutionContext(DateTime.now(), alertName, payload);
}
public static ExecutionContext mockExecutionContext(DateTime scheduledTime, DateTime firedTime, String alertName, Payload payload) {
public static ExecutionContext mockExecutionContext(DateTime time, String alertName, Payload payload) {
return mockExecutionContext(time, time, time, alertName, payload);
}
public static ExecutionContext mockExecutionContext(DateTime executionTime, DateTime firedTime, DateTime scheduledTime, String alertName, Payload payload) {
ExecutionContext ctx = mock(ExecutionContext.class);
when(ctx.scheduledTime()).thenReturn(scheduledTime);
when(ctx.executionTime()).thenReturn(executionTime);
when(ctx.fireTime()).thenReturn(firedTime);
when(ctx.scheduledTime()).thenReturn(scheduledTime);
Alert alert = mock(Alert.class);
when(alert.name()).thenReturn(alertName);
when(ctx.alert()).thenReturn(alert);
@ -134,9 +139,9 @@ public final class AlertsTestUtils {
return new Alert(
alertName,
SystemClock.INSTANCE,
new CronSchedule("0/5 * * * * ? *"),
new SearchInput(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()),
conditionRequest),
new SearchInput(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), conditionRequest),
new ScriptCondition(logger, scriptService, new Script("return true")),
new SearchTransform(logger, scriptService, ClientProxy.of(ElasticsearchIntegrationTest.client()), transformRequest),
new Actions(actions),

View File

@ -20,6 +20,7 @@ import org.elasticsearch.alerts.history.HistoryStore;
import org.elasticsearch.alerts.input.search.SearchInput;
import org.elasticsearch.alerts.scheduler.schedule.CronSchedule;
import org.elasticsearch.alerts.support.Script;
import org.elasticsearch.alerts.support.clock.SystemClock;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.test.AbstractAlertsIntegrationTests;
import org.elasticsearch.alerts.test.AlertsTestUtils;
@ -84,6 +85,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
Alert alert = new Alert(
"test-serialization",
SystemClock.INSTANCE,
new CronSchedule("0/5 * * * * ? 2035"), //Set this into the future so we don't get any extra runs
new SearchInput(logger, scriptService(), ClientProxy.of(client()), searchRequest),
new ScriptCondition(logger, scriptService(), new Script("return true")),
@ -143,6 +145,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
Alert alert = new Alert(
"action-test-"+ i + " " + j,
SystemClock.INSTANCE,
new CronSchedule("0/5 * * * * ? 2035"), //Set a cron schedule far into the future so this alert is never scheduled
new SearchInput(logger, scriptService(), ClientProxy.of(client()),
searchRequest),

View File

@ -8,6 +8,7 @@ package org.elasticsearch.alerts.throttle;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.support.clock.SystemClock;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.PeriodType;
import org.elasticsearch.common.unit.TimeValue;
@ -30,7 +31,7 @@ public class PeriodThrottlerTests extends ElasticsearchTestCase {
public void testBelowPeriod() throws Exception {
PeriodType periodType = randomFrom(PeriodType.millis(), PeriodType.seconds(), PeriodType.minutes());
TimeValue period = TimeValue.timeValueSeconds(randomIntBetween(2, 5));
PeriodThrottler throttler = new PeriodThrottler(period, periodType);
PeriodThrottler throttler = new PeriodThrottler(SystemClock.INSTANCE, period, periodType);
ExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
Alert.Status status = mock(Alert.Status.class);
@ -48,7 +49,7 @@ public class PeriodThrottlerTests extends ElasticsearchTestCase {
public void testAbovePeriod() throws Exception {
PeriodType periodType = randomFrom(PeriodType.millis(), PeriodType.seconds(), PeriodType.minutes());
TimeValue period = TimeValue.timeValueSeconds(randomIntBetween(2, 5));
PeriodThrottler throttler = new PeriodThrottler(period, periodType);
PeriodThrottler throttler = new PeriodThrottler(SystemClock.INSTANCE, period, periodType);
ExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
Alert.Status status = mock(Alert.Status.class);

View File

@ -81,7 +81,7 @@ public class SearchTransformTests extends AbstractAlertsSingleNodeTests {
// - we index 4 documents each one associated with a unique value and each is associated with a day
// - we build a search transform such that with a filter that
// - the date must be after [scheduled_time] variable
// - the date must be before [fired_time] variable
// - the date must be before [execution_time] variable
// - the value must match [payload.value] variable
// - the variable are set as such:
// - scheduled_time = youngest document's date
@ -105,12 +105,12 @@ public class SearchTransformTests extends AbstractAlertsSingleNodeTests {
SearchRequest request = Requests.searchRequest("idx").source(searchSource().query(filteredQuery(matchAllQuery(), boolFilter()
.must(rangeFilter("date").gt("{{" + Variables.CTX + "." + Variables.SCHEDULED_FIRE_TIME + "}}"))
.must(rangeFilter("date").lt("{{" + Variables.CTX + "." + Variables.FIRE_TIME + "}}"))
.must(rangeFilter("date").lt("{{" + Variables.CTX + "." + Variables.EXECUTION_TIME + "}}"))
.must(termFilter("value", "{{" + Variables.CTX + "." + Variables.PAYLOAD + ".value}}")))));
SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request);
ExecutionContext ctx = mockExecutionContext(parseDate("2015-01-01T00:00:00"), parseDate("2015-01-04T00:00:00"), "_name", EMPTY_PAYLOAD);
ExecutionContext ctx = mockExecutionContext(parseDate("2015-01-04T00:00:00"), parseDate("2015-01-04T00:00:00"), parseDate("2015-01-01T00:00:00"), "_name", EMPTY_PAYLOAD);
Payload payload = simplePayload("value", "val_3");