From 50f4a1c0e3578d5879bb131bd54d4f84fb4531ed Mon Sep 17 00:00:00 2001 From: uboness Date: Thu, 5 Mar 2015 15:12:02 +0100 Subject: [PATCH] Introducing TimeWarp mode for tests The idea behind a time warp mode is that while it's enabled the time related constructs in the alerts module are replaced with a mock test friendly version.. so we'll be able to control time and therefore avoid sleeping the threads. In time warp mode: - The `SchedulerMock` is used to manually fire jobs - The `ClockMock` is used to set and fast forward time - The alerts are executed on the same thread as the scheduler mock... so we don't have to deal with async nature at all. This is accomplished by the added `AlertsExecutor` abstraction. By default, the time warp mode is enabled and tests run in it. If a test must not use the time warp mode, it is possible to add `@TimeWarped(false)` annotation to the test and it will then run with the standard scheduler & clock. It is also possible to disable this mode all together by running the tests with `-Dtests.timewarp=false`. All the updated tests now work in both modes (whether the time warp mode is dis/enabled). This is important as on the server we would like to run the tests outside of this mode as well, but locally we'd like to run them with time warped enabled (so they'll be faster) Also, cleaned up the tests.. we now only do `assertThat(...)` calls (no `assertTrue` or `assertEquals`... for consistency sake) Original commit: elastic/x-pack-elasticsearch@11e09f6deaa06a45359d48e3d4bd37ff5c098edb --- pom.xml | 2 + .../alerts/history/AlertsExecutor.java | 21 ++ .../alerts/history/HistoryModule.java | 12 + .../alerts/history/HistoryService.java | 28 +-- .../history/InternalAlertsExecutor.java | 45 ++++ .../alerts/input/search/SearchInput.java | 3 +- .../alerts/scheduler/InternalScheduler.java | 25 +- .../alerts/scheduler/SchedulerModule.java | 13 +- .../stats/TransportAlertsStatsAction.java | 4 +- .../script/ScriptConditionSearchTests.java | 7 +- .../alerts/history/FiredAlertTests.java | 20 +- .../alerts/history/HistoryServiceTests.java | 5 +- .../scheduler/InternalSchedulerTests.java | 3 +- .../alerts/scheduler/SchedulerMock.java | 44 +++- .../alerts/support/clock/ClockMock.java | 18 +- .../test/AbstractAlertsIntegrationTests.java | 81 ++++++- .../alerts/test/TimeWarpedAlertsPlugin.java | 116 +++++++++ .../test/integration/AlertMetadataTests.java | 11 +- .../test/integration/AlertStatsTests.java | 25 +- .../test/integration/AlertThrottleTests.java | 204 ++++++++-------- .../test/integration/BasicAlertsTests.java | 225 +++++++++++------- .../test/integration/BootStrapTests.java | 34 ++- .../test/integration/NoMasterNodeTests.java | 8 +- .../integration/TransformSearchTests.java | 15 +- 24 files changed, 685 insertions(+), 284 deletions(-) create mode 100644 src/main/java/org/elasticsearch/alerts/history/AlertsExecutor.java create mode 100644 src/main/java/org/elasticsearch/alerts/history/InternalAlertsExecutor.java create mode 100644 src/test/java/org/elasticsearch/alerts/test/TimeWarpedAlertsPlugin.java diff --git a/pom.xml b/pom.xml index 4d324b77cc6..4cbef88bd8b 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,7 @@ .local-${project.version}-execution-hints.log false false + true @@ -400,6 +401,7 @@ ${tests.cluster} ${tests.heap.size} ${tests.filter} + ${tests.timewarp} ${env.ES_TEST_LOCAL} ${es.node.mode} ${es.logger.level} diff --git a/src/main/java/org/elasticsearch/alerts/history/AlertsExecutor.java b/src/main/java/org/elasticsearch/alerts/history/AlertsExecutor.java new file mode 100644 index 00000000000..6cf806503d7 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/history/AlertsExecutor.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.history; + +import java.util.concurrent.BlockingQueue; + +/** + * + */ +public interface AlertsExecutor { + + BlockingQueue queue(); + + long largestPoolSize(); + + void execute(Runnable runnable); + +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryModule.java b/src/main/java/org/elasticsearch/alerts/history/HistoryModule.java index 6b015239c7d..c24b0cfbb23 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryModule.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryModule.java @@ -11,10 +11,22 @@ import org.elasticsearch.common.inject.AbstractModule; */ public class HistoryModule extends AbstractModule { + private final Class executorClass; + + public HistoryModule() { + this(InternalAlertsExecutor.class); + } + + protected HistoryModule(Class executorClass) { + this.executorClass = executorClass; + } + @Override protected void configure() { bind(FiredAlert.Parser.class).asEagerSingleton(); bind(HistoryStore.class).asEagerSingleton(); bind(HistoryService.class).asEagerSingleton(); + bind(executorClass).asEagerSingleton(); + bind(AlertsExecutor.class).to(executorClass); } } diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java index 5df688def4f..a0f062b645a 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java @@ -24,8 +24,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; @@ -38,7 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class HistoryService extends AbstractComponent { private final HistoryStore historyStore; - private final ThreadPool threadPool; + private final AlertsExecutor executor; private final AlertsStore alertsStore; private final ClusterService clusterService; private final AlertLockService alertLockService; @@ -48,12 +46,12 @@ public class HistoryService extends AbstractComponent { private final AtomicInteger initializationRetries = new AtomicInteger(); @Inject - public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, + public HistoryService(Settings settings, HistoryStore historyStore, AlertsExecutor executor, AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler, ClusterService clusterService, Clock clock) { super(settings); this.historyStore = historyStore; - this.threadPool = threadPool; + this.executor = executor; this.alertsStore = alertsStore; this.alertLockService = alertLockService; this.clusterService = clusterService; @@ -67,7 +65,7 @@ public class HistoryService extends AbstractComponent { return; } - assert alertsThreadPool().getQueue().isEmpty() : "queue should be empty, but contains " + alertsThreadPool().getQueue().size() + " elements."; + assert executor.queue().isEmpty() : "queue should be empty, but contains " + executor.queue().size() + " elements."; HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state, FiredAlert.State.AWAITS_EXECUTION); if (!loadResult.succeeded()) { retry(callback); @@ -87,7 +85,7 @@ public class HistoryService extends AbstractComponent { // We could also rely on the shutdown in #updateSettings call, but // this is a forceful shutdown that also interrupts the worker threads in the threadpool List cancelledTasks = new ArrayList<>(); - alertsThreadPool().getQueue().drainTo(cancelledTasks); + executor.queue().drainTo(cancelledTasks); logger.debug("cancelled [{}] queued tasks", cancelledTasks.size()); logger.debug("stopped history service"); } @@ -98,13 +96,13 @@ public class HistoryService extends AbstractComponent { } // TODO: should be removed from the stats api? This is already visible in the thread pool cat api. - public long getQueueSize() { - return alertsThreadPool().getQueue().size(); + public long queueSize() { + return executor.queue().size(); } // TODO: should be removed from the stats api? This is already visible in the thread pool cat api. - public long getLargestQueueSize() { - return alertsThreadPool().getLargestPoolSize(); + public long largestQueueSize() { + return executor.largestPoolSize(); } void fire(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException { @@ -130,7 +128,7 @@ public class HistoryService extends AbstractComponent { void executeAsync(FiredAlert firedAlert, Alert alert) { try { - alertsThreadPool().execute(new AlertExecutionTask(firedAlert, alert)); + executor.execute(new AlertExecutionTask(firedAlert, alert)); } catch (EsRejectedExecutionException e) { logger.debug("[{}] failed to execute fired alert", firedAlert.name()); firedAlert.update(FiredAlert.State.FAILED, "failed to run fired alert due to thread pool capacity"); @@ -180,10 +178,6 @@ public class HistoryService extends AbstractComponent { } } - private EsThreadPoolExecutor alertsThreadPool() { - return (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.NAME); - } - private void retry(final Callback callback) { ClusterStateListener clusterStateListener = new ClusterStateListener() { @@ -193,7 +187,7 @@ public class HistoryService extends AbstractComponent { assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time"; clusterService.remove(this); // We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread. - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { diff --git a/src/main/java/org/elasticsearch/alerts/history/InternalAlertsExecutor.java b/src/main/java/org/elasticsearch/alerts/history/InternalAlertsExecutor.java new file mode 100644 index 00000000000..fbe48ddee4d --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/history/InternalAlertsExecutor.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.history; + +import org.elasticsearch.alerts.AlertsPlugin; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.BlockingQueue; + +/** + * + */ +public class InternalAlertsExecutor implements AlertsExecutor { + + private final ThreadPool threadPool; + + @Inject + public InternalAlertsExecutor(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + @Override + public BlockingQueue queue() { + return executor().getQueue(); + } + + @Override + public long largestPoolSize() { + return executor().getLargestPoolSize(); + } + + @Override + public void execute(Runnable runnable) { + executor().execute(runnable); + } + + private EsThreadPoolExecutor executor() { + return (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.NAME); + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java b/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java index 1a0789220ea..21ef858e5e3 100644 --- a/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java +++ b/src/main/java/org/elasticsearch/alerts/input/search/SearchInput.java @@ -127,7 +127,8 @@ public class SearchInput extends Input { } else if (requestPrototype.templateName() != null) { MapBuilder templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams()) .put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime)) - .put(Variables.FIRE_TIME, formatDate(fireTime)); + .put(Variables.FIRE_TIME, formatDate(fireTime)) + .put(Variables.EXECUTION_TIME, formatDate(executionTime)); request.templateParams(templateParams.map()); request.templateName(requestPrototype.templateName()); request.templateType(requestPrototype.templateType()); diff --git a/src/main/java/org/elasticsearch/alerts/scheduler/InternalScheduler.java b/src/main/java/org/elasticsearch/alerts/scheduler/InternalScheduler.java index e591aa65565..7767ad71c1e 100644 --- a/src/main/java/org/elasticsearch/alerts/scheduler/InternalScheduler.java +++ b/src/main/java/org/elasticsearch/alerts/scheduler/InternalScheduler.java @@ -10,6 +10,7 @@ import org.elasticsearch.alerts.AlertsSettingsException; import org.elasticsearch.alerts.scheduler.schedule.CronnableSchedule; import org.elasticsearch.alerts.scheduler.schedule.IntervalSchedule; import org.elasticsearch.alerts.scheduler.schedule.Schedule; +import org.elasticsearch.alerts.support.clock.Clock; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; @@ -31,17 +32,19 @@ public class InternalScheduler extends AbstractComponent implements Scheduler { // Not happy about it, but otherwise we're stuck with Quartz's SimpleThreadPool private volatile static ThreadPool threadPool; - private volatile org.quartz.Scheduler scheduler; - - private List listeners; - + private final Clock clock; private final DateTimeZone defaultTimeZone; + private volatile org.quartz.Scheduler scheduler; + private List listeners; + + @Inject - public InternalScheduler(Settings settings, ThreadPool threadPool) { + public InternalScheduler(Settings settings, ThreadPool threadPool, Clock clock) { super(settings); - this.listeners = new CopyOnWriteArrayList<>(); InternalScheduler.threadPool = threadPool; + this.clock = clock; + this.listeners = new CopyOnWriteArrayList<>(); String timeZoneStr = componentSettings.get("time_zone", "UTC"); try { this.defaultTimeZone = DateTimeZone.forID(timeZoneStr); @@ -65,7 +68,7 @@ public class InternalScheduler extends AbstractComponent implements Scheduler { scheduler.setJobFactory(new SimpleJobFactory()); Map> quartzJobs = new HashMap<>(); for (Job alert : jobs) { - quartzJobs.put(jobDetail(alert.name(), this), createTrigger(alert.schedule(), defaultTimeZone)); + quartzJobs.put(jobDetail(alert.name(), this), createTrigger(alert.schedule(), defaultTimeZone, clock)); } scheduler.scheduleJobs(quartzJobs, false); scheduler.start(); @@ -107,7 +110,7 @@ public class InternalScheduler extends AbstractComponent implements Scheduler { public void add(Job job) { try { logger.trace("scheduling [{}] with schedule [{}]", job.name(), job.schedule()); - scheduler.scheduleJob(jobDetail(job.name(), this), createTrigger(job.schedule(), defaultTimeZone), true); + scheduler.scheduleJob(jobDetail(job.name(), this), createTrigger(job.schedule(), defaultTimeZone, clock), true); } catch (org.quartz.SchedulerException se) { logger.error("Failed to schedule job",se); throw new SchedulerException("Failed to schedule job", se); @@ -122,13 +125,13 @@ public class InternalScheduler extends AbstractComponent implements Scheduler { } } - static Set createTrigger(Schedule schedule, DateTimeZone timeZone) { + static Set createTrigger(Schedule schedule, DateTimeZone timeZone, Clock clock) { HashSet triggers = new HashSet<>(); if (schedule instanceof CronnableSchedule) { for (String cron : ((CronnableSchedule) schedule).crons()) { triggers.add(TriggerBuilder.newTrigger() .withSchedule(CronScheduleBuilder.cronSchedule(cron).inTimeZone(timeZone.toTimeZone())) - .startNow() + .startAt(clock.now().toDate()) .build()); } } else { @@ -137,7 +140,7 @@ public class InternalScheduler extends AbstractComponent implements Scheduler { triggers.add(TriggerBuilder.newTrigger().withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds((int) interval.seconds()) .repeatForever()) - .startNow() + .startAt(clock.now().toDate()) .build()); } return triggers; diff --git a/src/main/java/org/elasticsearch/alerts/scheduler/SchedulerModule.java b/src/main/java/org/elasticsearch/alerts/scheduler/SchedulerModule.java index d69a198b023..d56183dd2b4 100644 --- a/src/main/java/org/elasticsearch/alerts/scheduler/SchedulerModule.java +++ b/src/main/java/org/elasticsearch/alerts/scheduler/SchedulerModule.java @@ -17,10 +17,16 @@ import java.util.Map; */ public class SchedulerModule extends AbstractModule { + private final Class schedulerClass; + private final Map> parsers = new HashMap<>(); - public void registerSchedule(String type, Class parser) { - parsers.put(type, parser); + public SchedulerModule() { + this(InternalScheduler.class); + } + + protected SchedulerModule(Class schedulerClass) { + this.schedulerClass = schedulerClass; } @Override @@ -48,6 +54,7 @@ public class SchedulerModule extends AbstractModule { } bind(ScheduleRegistry.class).asEagerSingleton(); - bind(Scheduler.class).to(InternalScheduler.class).asEagerSingleton(); + bind(schedulerClass).asEagerSingleton(); + bind(Scheduler.class).to(schedulerClass); } } diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/stats/TransportAlertsStatsAction.java b/src/main/java/org/elasticsearch/alerts/transport/actions/stats/TransportAlertsStatsAction.java index 972444f705d..5be22cbfaac 100644 --- a/src/main/java/org/elasticsearch/alerts/transport/actions/stats/TransportAlertsStatsAction.java +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/stats/TransportAlertsStatsAction.java @@ -59,9 +59,9 @@ public class TransportAlertsStatsAction extends TransportMasterNodeOperationActi AlertsStatsResponse statsResponse = new AlertsStatsResponse(); statsResponse.setAlertManagerState(alertsService.state()); statsResponse.setAlertActionManagerStarted(historyService.started()); - statsResponse.setAlertActionManagerQueueSize(historyService.getQueueSize()); + statsResponse.setAlertActionManagerQueueSize(historyService.queueSize()); statsResponse.setNumberOfRegisteredAlerts(alertsService.getNumberOfAlerts()); - statsResponse.setAlertActionManagerLargestQueueSize(historyService.getLargestQueueSize()); + statsResponse.setAlertActionManagerLargestQueueSize(historyService.largestQueueSize()); statsResponse.setVersion(AlertsVersion.CURRENT); statsResponse.setBuild(AlertsBuild.CURRENT); listener.onResponse(statsResponse); diff --git a/src/test/java/org/elasticsearch/alerts/condition/script/ScriptConditionSearchTests.java b/src/test/java/org/elasticsearch/alerts/condition/script/ScriptConditionSearchTests.java index af247efcec8..e04cb5eb6b3 100644 --- a/src/test/java/org/elasticsearch/alerts/condition/script/ScriptConditionSearchTests.java +++ b/src/test/java/org/elasticsearch/alerts/condition/script/ScriptConditionSearchTests.java @@ -36,6 +36,7 @@ import java.util.HashSet; import java.util.Set; import static org.elasticsearch.alerts.test.AlertsTestUtils.mockExecutionContext; +import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.when; /** @@ -88,7 +89,7 @@ public class ScriptConditionSearchTests extends AbstractAlertsSingleNodeTests { .get(); ctx = mockExecutionContext("_name", new Payload.ActionResponse(response)); - assertTrue(condition.execute(ctx).met()); + assertThat(condition.execute(ctx).met(), is(true)); } @Test @@ -102,10 +103,10 @@ public class ScriptConditionSearchTests extends AbstractAlertsSingleNodeTests { SearchResponse response = new SearchResponse(internalSearchResponse, "", 3, 3, 500l, new ShardSearchFailure[0]); ExecutionContext ctx = mockExecutionContext("_alert_name", new Payload.ActionResponse(response)); - assertTrue(condition.execute(ctx).met()); + assertThat(condition.execute(ctx).met(), is(true)); hit.score(2f); when(ctx.payload()).thenReturn(new Payload.ActionResponse(response)); - assertFalse(condition.execute(ctx).met()); + assertThat(condition.execute(ctx).met(), is(false)); } } diff --git a/src/test/java/org/elasticsearch/alerts/history/FiredAlertTests.java b/src/test/java/org/elasticsearch/alerts/history/FiredAlertTests.java index 41503282898..057d0e30cfc 100644 --- a/src/test/java/org/elasticsearch/alerts/history/FiredAlertTests.java +++ b/src/test/java/org/elasticsearch/alerts/history/FiredAlertTests.java @@ -5,7 +5,10 @@ */ package org.elasticsearch.alerts.history; -import org.elasticsearch.alerts.*; +import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.AlertExecution; +import org.elasticsearch.alerts.ExecutionContext; +import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.actions.email.EmailAction; import org.elasticsearch.alerts.actions.webhook.WebhookAction; import org.elasticsearch.alerts.condition.Condition; @@ -22,25 +25,24 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.junit.Test; +import static org.hamcrest.Matchers.equalTo; + /** */ public class FiredAlertTests extends AbstractAlertsIntegrationTests { @Test public void testParser() throws Exception { - Alert alert = AlertsTestUtils.createTestAlert("fired_test", scriptService(), httpClient(), noopEmailService(), logger); FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime()); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0); - XContentBuilder jsonBuilder2 = XContentFactory.jsonBuilder(); parsedFiredAlert.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS); - assertEquals(jsonBuilder.bytes().toUtf8(), jsonBuilder2.bytes().toUtf8()); - + assertThat(jsonBuilder.bytes().toUtf8(), equalTo(jsonBuilder2.bytes().toUtf8())); } @Test @@ -60,9 +62,11 @@ public class FiredAlertTests extends AbstractAlertsIntegrationTests { XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0); + XContentBuilder jsonBuilder2 = XContentFactory.jsonBuilder(); parsedFiredAlert.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS); - assertEquals(jsonBuilder.bytes().toUtf8(), jsonBuilder2.bytes().toUtf8()); + + assertThat(jsonBuilder.bytes().toUtf8(), equalTo(jsonBuilder2.bytes().toUtf8())); } @Test @@ -82,9 +86,11 @@ public class FiredAlertTests extends AbstractAlertsIntegrationTests { XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0); + XContentBuilder jsonBuilder2 = XContentFactory.jsonBuilder(); parsedFiredAlert.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS); - assertEquals(jsonBuilder.bytes().toUtf8(), jsonBuilder2.bytes().toUtf8()); + + assertThat(jsonBuilder.bytes().toUtf8(), equalTo(jsonBuilder2.bytes().toUtf8())); } diff --git a/src/test/java/org/elasticsearch/alerts/history/HistoryServiceTests.java b/src/test/java/org/elasticsearch/alerts/history/HistoryServiceTests.java index 7cf02076729..7f3e520a4ca 100644 --- a/src/test/java/org/elasticsearch/alerts/history/HistoryServiceTests.java +++ b/src/test/java/org/elasticsearch/alerts/history/HistoryServiceTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.test.ElasticsearchTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import org.junit.Test; @@ -50,12 +49,12 @@ public class HistoryServiceTests extends ElasticsearchTestCase { when(input.execute(any(ExecutionContext.class))).thenReturn(inputResult); HistoryStore historyStore = mock(HistoryStore.class); - ThreadPool threadPool = mock(ThreadPool.class); + AlertsExecutor executor = mock(AlertsExecutor.class); AlertsStore alertsStore = mock(AlertsStore.class); AlertLockService alertLockService = mock(AlertLockService.class); Scheduler scheduler = mock(Scheduler.class); ClusterService clusterService = mock(ClusterService.class); - historyService = new HistoryService(ImmutableSettings.EMPTY, historyStore, threadPool, alertsStore, alertLockService, scheduler, clusterService, SystemClock.INSTANCE); + historyService = new HistoryService(ImmutableSettings.EMPTY, historyStore, executor, alertsStore, alertLockService, scheduler, clusterService, SystemClock.INSTANCE); } @Test diff --git a/src/test/java/org/elasticsearch/alerts/scheduler/InternalSchedulerTests.java b/src/test/java/org/elasticsearch/alerts/scheduler/InternalSchedulerTests.java index a7161feff37..cfbcaaab98c 100644 --- a/src/test/java/org/elasticsearch/alerts/scheduler/InternalSchedulerTests.java +++ b/src/test/java/org/elasticsearch/alerts/scheduler/InternalSchedulerTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.alerts.AlertsPlugin; import org.elasticsearch.alerts.scheduler.schedule.Schedule; import org.elasticsearch.alerts.scheduler.schedule.support.DayOfWeek; import org.elasticsearch.alerts.scheduler.schedule.support.WeekTimes; +import org.elasticsearch.alerts.support.clock.SystemClock; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTimeZone; import org.elasticsearch.common.settings.ImmutableSettings; @@ -47,7 +48,7 @@ public class InternalSchedulerTests extends ElasticsearchTestCase { .put("name", "test") .build(); threadPool = new ThreadPool(settings, null); - scheduler = new InternalScheduler(ImmutableSettings.EMPTY, threadPool); + scheduler = new InternalScheduler(ImmutableSettings.EMPTY, threadPool, SystemClock.INSTANCE); } @After diff --git a/src/test/java/org/elasticsearch/alerts/scheduler/SchedulerMock.java b/src/test/java/org/elasticsearch/alerts/scheduler/SchedulerMock.java index 340a5368daa..6e0faef639b 100644 --- a/src/test/java/org/elasticsearch/alerts/scheduler/SchedulerMock.java +++ b/src/test/java/org/elasticsearch/alerts/scheduler/SchedulerMock.java @@ -5,7 +5,14 @@ */ package org.elasticsearch.alerts.scheduler; +import org.elasticsearch.alerts.support.clock.Clock; +import org.elasticsearch.alerts.support.clock.ClockMock; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import java.util.Collection; import java.util.List; @@ -19,9 +26,16 @@ import java.util.concurrent.CopyOnWriteArrayList; */ public class SchedulerMock implements Scheduler { + private final ESLogger logger; private final List listeners = new CopyOnWriteArrayList<>(); - private final ConcurrentMap jobs = new ConcurrentHashMap<>(); + private final Clock clock; + + @Inject + public SchedulerMock(Settings settings, Clock clock) { + this.logger = Loggers.getLogger(SchedulerMock.class, settings); + this.clock = clock; + } @Override public void start(Collection jobs) { @@ -47,9 +61,31 @@ public class SchedulerMock implements Scheduler { } public void fire(String jobName) { - DateTime now = new DateTime(); - for (Listener listener : listeners) { - listener.fire(jobName, now ,now); + fire(jobName, 1, null); + } + + public void fire(String jobName, int times) { + fire(jobName, times, null); + } + + public void fire(String jobName, int times, TimeValue interval) { + for (int i = 0; i < times; i++) { + DateTime now = clock.now(); + logger.debug("firing [" + jobName + "] at [" + now + "]"); + for (Listener listener : listeners) { + listener.fire(jobName, now, now); + } + if (clock instanceof ClockMock) { + ((ClockMock) clock).fastForward(interval == null ? TimeValue.timeValueMillis(10) : interval); + } else { + if (interval != null) { + try { + Thread.sleep(interval.millis()); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } } } } diff --git a/src/test/java/org/elasticsearch/alerts/support/clock/ClockMock.java b/src/test/java/org/elasticsearch/alerts/support/clock/ClockMock.java index bccd9ace250..30369ff47b4 100644 --- a/src/test/java/org/elasticsearch/alerts/support/clock/ClockMock.java +++ b/src/test/java/org/elasticsearch/alerts/support/clock/ClockMock.java @@ -38,16 +38,24 @@ public class ClockMock implements Clock { return TimeValue.timeValueMillis(new Duration(time, now).getMillis()); } - public void setTime(DateTime now) { + public ClockMock setTime(DateTime now) { this.now = now; + return this; } - public void fastForward(TimeValue timeValue) { - setTime(now.plusMillis((int) timeValue.millis())); + public ClockMock fastForward(TimeValue timeValue) { + return setTime(now.plusMillis((int) timeValue.millis())); } - public void rewind(TimeValue timeValue) { - setTime(now.minusMillis((int) timeValue.millis())); + public ClockMock fastForwardSeconds(int seconds) { + return fastForward(TimeValue.timeValueSeconds(seconds)); } + public ClockMock rewind(TimeValue timeValue) { + return setTime(now.minusMillis((int) timeValue.millis())); + } + + public ClockMock rewindSeconds(int seconds) { + return rewind(TimeValue.timeValueSeconds(seconds)); + } } diff --git a/src/test/java/org/elasticsearch/alerts/test/AbstractAlertsIntegrationTests.java b/src/test/java/org/elasticsearch/alerts/test/AbstractAlertsIntegrationTests.java index 712410284f7..30e6ce68ceb 100644 --- a/src/test/java/org/elasticsearch/alerts/test/AbstractAlertsIntegrationTests.java +++ b/src/test/java/org/elasticsearch/alerts/test/AbstractAlertsIntegrationTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.AlertsPlugin; import org.elasticsearch.alerts.AlertsService; import org.elasticsearch.alerts.actions.email.service.Authentication; @@ -20,7 +21,12 @@ import org.elasticsearch.alerts.actions.webhook.HttpClient; import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.history.FiredAlert; import org.elasticsearch.alerts.history.HistoryStore; +import org.elasticsearch.alerts.scheduler.Scheduler; +import org.elasticsearch.alerts.scheduler.SchedulerMock; +import org.elasticsearch.alerts.scheduler.schedule.Schedule; +import org.elasticsearch.alerts.scheduler.schedule.Schedules; import org.elasticsearch.alerts.support.AlertUtils; +import org.elasticsearch.alerts.support.clock.ClockMock; import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.alerts.support.template.Template; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; @@ -30,11 +36,13 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.common.netty.util.internal.SystemPropertyUtil; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; @@ -60,15 +68,43 @@ import static org.hamcrest.core.IsNot.not; @ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false) public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegrationTest { + private static final boolean timeWarpEnabled = SystemPropertyUtil.getBoolean("tests.timewarp", true); + + private TimeWarp timeWarp; + @Override protected Settings nodeSettings(int nodeOrdinal) { return ImmutableSettings.builder() .put(super.nodeSettings(nodeOrdinal)) .put("scroll.size", randomIntBetween(1, 100)) - .put("plugin.types", AlertsPlugin.class.getName()) + .put("plugin.types", timeWarped() ? TimeWarpedAlertsPlugin.class.getName() : AlertsPlugin.class.getName()) .build(); } + /** + * @return whether the test suite should run in time warp mode. By default this will be determined globally + * to all test suites based on {@code -Dtests.timewarp} system property (when missing, defaults to + * {@code true}). If a test suite requires to force the mode or force not running under this mode + * this method can be overridden. + */ + protected boolean timeWarped() { + return timeWarpEnabled; + } + + @Before + public void setupTimeWarp() throws Exception { + if (timeWarped()) { + timeWarp = new TimeWarp( + internalTestCluster().getInstance(SchedulerMock.class, internalTestCluster().getMasterName()), + internalTestCluster().getInstance(ClockMock.class, internalTestCluster().getMasterName())); + } + } + + protected TimeWarp timeWarp() { + assert timeWarped() : "cannot access TimeWarp when test context is not time warped"; + return timeWarp; + } + public boolean randomizeNumberOfShardsAndReplicas() { return false; } @@ -103,6 +139,10 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr stopAlerting(); } + protected long docCount(String index, String type, QueryBuilder query) { + return docCount(index, type, SearchSourceBuilder.searchSource().query(query)); + } + protected long docCount(String index, String type, SearchSourceBuilder source) { SearchRequestBuilder builder = client().prepareSearch(index).setSearchType(SearchType.COUNT); if (type != null) { @@ -116,12 +156,20 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr return createAlertSource(cron, conditionRequest, conditionScript, null); } + protected BytesReference createAlertSource(Schedule schedule, SearchRequest conditionRequest, String conditionScript) throws IOException { + return createAlertSource(schedule, conditionRequest, conditionScript, null); + } + protected BytesReference createAlertSource(String cron, SearchRequest conditionRequest, String conditionScript, Map metadata) throws IOException { + return createAlertSource(Schedules.cron(cron), conditionRequest, conditionScript, metadata); + } + + protected BytesReference createAlertSource(Schedule schedule, SearchRequest conditionRequest, String conditionScript, Map metadata) throws IOException { XContentBuilder builder = jsonBuilder(); builder.startObject(); { builder.startObject("schedule") - .field("cron", cron) + .field(schedule.type(), schedule) .endObject(); if (metadata != null) { @@ -162,6 +210,14 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr return builder.bytes(); } + protected Alert.Parser alertParser() { + return internalTestCluster().getInstance(Alert.Parser.class, internalTestCluster().getMasterName()); + } + + protected Scheduler scheduler() { + return internalTestCluster().getInstance(Scheduler.class, internalTestCluster().getMasterName()); + } + protected AlertsClient alertClient() { return internalTestCluster().getInstance(AlertsClient.class); } @@ -225,7 +281,7 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.EXECUTED.id()))) .get(); - assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed)); + assertThat("could not find executed fired alert", searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed)); if (assertConditionMet) { assertThat((Integer) XContentMapValues.extractValue("alert_execution.input_result.search.payload.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1)); } @@ -444,4 +500,23 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr } } + protected static class TimeWarp { + + protected final SchedulerMock scheduler; + protected final ClockMock clock; + + public TimeWarp(SchedulerMock scheduler, ClockMock clock) { + this.scheduler = scheduler; + this.clock = clock; + } + + public SchedulerMock scheduler() { + return scheduler; + } + + public ClockMock clock() { + return clock; + } + } + } diff --git a/src/test/java/org/elasticsearch/alerts/test/TimeWarpedAlertsPlugin.java b/src/test/java/org/elasticsearch/alerts/test/TimeWarpedAlertsPlugin.java new file mode 100644 index 00000000000..73726b591db --- /dev/null +++ b/src/test/java/org/elasticsearch/alerts/test/TimeWarpedAlertsPlugin.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.test; + +import org.elasticsearch.alerts.AlertsPlugin; +import org.elasticsearch.alerts.history.AlertsExecutor; +import org.elasticsearch.alerts.history.HistoryModule; +import org.elasticsearch.alerts.scheduler.SchedulerMock; +import org.elasticsearch.alerts.scheduler.SchedulerModule; +import org.elasticsearch.alerts.support.clock.Clock; +import org.elasticsearch.alerts.support.clock.ClockMock; +import org.elasticsearch.alerts.support.clock.ClockModule; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +/** + * + */ +public class TimeWarpedAlertsPlugin extends AlertsPlugin { + + public TimeWarpedAlertsPlugin(Settings settings) { + super(settings); + Loggers.getLogger(TimeWarpedAlertsPlugin.class, settings).info("using time warped alerts plugin"); + + } + + @Override + public Collection> modules() { + return ImmutableList.>of(AlertsModule.class); + } + + /** + * + */ + public static class AlertsModule extends org.elasticsearch.alerts.AlertsModule { + + @Override + public Iterable spawnModules() { + List modules = new ArrayList<>(); + for (Module module : super.spawnModules()) { + + if (module instanceof SchedulerModule) { + // replacing scheduler module so we'll + // have control on when it fires a job + modules.add(new MockSchedulerModule()); + + } else if (module instanceof ClockModule) { + // replacing the clock module so we'll be able + // to control time in tests + modules.add(new MockClockModule()); + + } else if (module instanceof HistoryModule) { + // replacing the history module so all the alerts will be + // executed on the same thread as the schedule fire + modules.add(new MockHistoryModule()); + + } else { + modules.add(module); + } + } + return modules; + } + + public static class MockSchedulerModule extends SchedulerModule { + + public MockSchedulerModule() { + super(SchedulerMock.class); + } + + } + + public static class MockClockModule extends ClockModule { + @Override + protected void configure() { + bind(ClockMock.class).asEagerSingleton(); + bind(Clock.class).to(ClockMock.class); + } + } + + public static class MockHistoryModule extends HistoryModule { + + public MockHistoryModule() { + super(SameThreadExecutor.class); + } + + public static class SameThreadExecutor implements AlertsExecutor { + + @Override + public BlockingQueue queue() { + return new ArrayBlockingQueue(1); + } + + @Override + public long largestPoolSize() { + return 1; + } + + @Override + public void execute(Runnable runnable) { + runnable.run(); + } + } + } + } +} diff --git a/src/test/java/org/elasticsearch/alerts/test/integration/AlertMetadataTests.java b/src/test/java/org/elasticsearch/alerts/test/integration/AlertMetadataTests.java index e8e1e9e62ea..d029e03de08 100644 --- a/src/test/java/org/elasticsearch/alerts/test/integration/AlertMetadataTests.java +++ b/src/test/java/org/elasticsearch/alerts/test/integration/AlertMetadataTests.java @@ -41,15 +41,20 @@ public class AlertMetadataTests extends AbstractAlertsIntegrationTests { metaList.add("test"); metadata.put("baz", metaList); - alertClient().preparePutAlert("1") + alertClient().preparePutAlert("_name") .source(alertSourceBuilder() .schedule(cron("0/5 * * * * ? *")) .input(searchInput(AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(matchAllQuery())))) .condition(scriptCondition("ctx.payload.hits.total == 1")) .metadata(metadata)) .get(); - // Wait for a no action entry to be added. (the condition search request will not match, because there are no docs in my-index) - assertAlertWithNoActionNeeded("1", 1); + + if (timeWarped()) { + timeWarp().scheduler().fire("_name"); + } else { + // Wait for a no action entry to be added. (the condition search request will not match, because there are no docs in my-index) + assertAlertWithNoActionNeeded("_name", 1); + } refresh(); SearchResponse searchResponse = client().prepareSearch(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*") diff --git a/src/test/java/org/elasticsearch/alerts/test/integration/AlertStatsTests.java b/src/test/java/org/elasticsearch/alerts/test/integration/AlertStatsTests.java index 45a6bd6cad2..7441a6f4168 100644 --- a/src/test/java/org/elasticsearch/alerts/test/integration/AlertStatsTests.java +++ b/src/test/java/org/elasticsearch/alerts/test/integration/AlertStatsTests.java @@ -6,7 +6,9 @@ package org.elasticsearch.alerts.test.integration; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.alerts.*; +import org.elasticsearch.alerts.AlertsBuild; +import org.elasticsearch.alerts.AlertsService; +import org.elasticsearch.alerts.AlertsVersion; import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.test.AbstractAlertsIntegrationTests; import org.elasticsearch.alerts.test.AlertsTestUtils; @@ -14,7 +16,6 @@ import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequest; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.junit.Test; @@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; @@ -41,7 +43,7 @@ public class AlertStatsTests extends AbstractAlertsIntegrationTests { assertThat(response.getAlertManagerStarted(), is(AlertsService.State.STARTED)); assertThat(response.getAlertActionManagerQueueSize(), is(0L)); assertThat(response.getNumberOfRegisteredAlerts(), is(0L)); - assertThat(response.getAlertActionManagerLargestQueueSize(), is(0L)); + assertThat(response.getAlertActionManagerLargestQueueSize(), is(timeWarped() ? 1L : 0L)); assertThat(response.getVersion(), is(AlertsVersion.CURRENT)); assertThat(response.getBuild(), is(AlertsBuild.CURRENT)); } @@ -56,21 +58,24 @@ public class AlertStatsTests extends AbstractAlertsIntegrationTests { assertThat(response.isAlertActionManagerStarted(), is(true)); assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED)); - SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); + SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value"))); BytesReference alertSource = createAlertSource("* * * * * ? *", searchRequest, "ctx.payload.hits.total == 1"); - alertClient().preparePutAlert("testAlert") + alertClient().preparePutAlert("_name") .source(alertSource) .get(); - response = alertClient().alertsStats(alertsStatsRequest).actionGet(); + if (timeWarped()) { + timeWarp().scheduler().fire("_name", 30, TimeValue.timeValueSeconds(1)); + } else { + //Wait a little until we should have queued an action + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + } - //Wait a little until we should have queued an action - TimeValue waitTime = new TimeValue(30, TimeUnit.SECONDS); - Thread.sleep(waitTime.getMillis()); + response = alertClient().alertsStats(alertsStatsRequest).actionGet(); assertThat(response.isAlertActionManagerStarted(), is(true)); assertThat(response.getAlertManagerStarted(), is(AlertsService.State.STARTED)); assertThat(response.getNumberOfRegisteredAlerts(), is(1L)); - //assertThat(response.getAlertActionManagerLargestQueueSize(), greaterThan(0L)); + assertThat(response.getAlertActionManagerLargestQueueSize(), greaterThan(0L)); } } diff --git a/src/test/java/org/elasticsearch/alerts/test/integration/AlertThrottleTests.java b/src/test/java/org/elasticsearch/alerts/test/integration/AlertThrottleTests.java index 71018dec7af..8370b22b719 100644 --- a/src/test/java/org/elasticsearch/alerts/test/integration/AlertThrottleTests.java +++ b/src/test/java/org/elasticsearch/alerts/test/integration/AlertThrottleTests.java @@ -5,11 +5,8 @@ */ package org.elasticsearch.alerts.test.integration; -import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.actions.ActionBuilders; import org.elasticsearch.alerts.client.AlertsClient; @@ -19,11 +16,8 @@ import org.elasticsearch.alerts.test.AbstractAlertsIntegrationTests; import org.elasticsearch.alerts.transport.actions.ack.AckAlertResponse; import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse; import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse; +import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.junit.Assert; import org.junit.Test; import java.util.concurrent.TimeUnit; @@ -31,11 +25,12 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.alerts.client.AlertSourceBuilder.alertSourceBuilder; import static org.elasticsearch.alerts.condition.ConditionBuilders.scriptCondition; import static org.elasticsearch.alerts.input.InputBuilders.searchInput; -import static org.elasticsearch.alerts.transform.TransformBuilders.searchTransform; import static org.elasticsearch.alerts.scheduler.schedule.Schedules.cron; +import static org.elasticsearch.alerts.scheduler.schedule.Schedules.interval; import static org.elasticsearch.alerts.test.AlertsTestUtils.matchAllRequest; +import static org.elasticsearch.alerts.transform.TransformBuilders.searchTransform; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.hamcrest.Matchers.*; import static org.hamcrest.core.IsEqual.equalTo; @@ -44,145 +39,148 @@ import static org.hamcrest.core.IsEqual.equalTo; public class AlertThrottleTests extends AbstractAlertsIntegrationTests { @Test - public void testAckThrottle() throws Exception{ + public void testAckThrottle() throws Exception { AlertsClient alertsClient = alertClient(); - createIndex("action-index", "test-index"); - ensureGreen("action-index", "test-index"); + createIndex("actions", "events"); + ensureGreen("actions", "events"); - IndexResponse dummyEventIndexResponse = client().prepareIndex("test-index", "test-type").setSource( XContentFactory.jsonBuilder().startObject().field("test_field", "error").endObject()).get(); - assertTrue(dummyEventIndexResponse.isCreated()); + IndexResponse eventIndexResponse = client().prepareIndex("events", "event") + .setSource("level", "error") + .get(); + assertThat(eventIndexResponse.isCreated(), is(true)); refresh(); PutAlertResponse putAlertResponse = alertsClient.preparePutAlert() - .alertName("throttled-alert") + .alertName("_name") .source(alertSourceBuilder() .schedule(cron("0/5 * * * * ? *")) - .input(searchInput(matchAllRequest().indices("test-index"))) + .input(searchInput(matchAllRequest().indices("events"))) .condition(scriptCondition("ctx.payload.hits.total > 0")) - .transform(searchTransform(matchAllRequest().indices("test-index"))) - .addAction(ActionBuilders.indexAction("action-index", "action-type")) - .throttlePeriod(TimeValue.timeValueMillis(0))) + .transform(searchTransform(matchAllRequest().indices("events"))) + .addAction(ActionBuilders.indexAction("actions", "action"))) .get(); - assertTrue(putAlertResponse.indexResponse().isCreated()); + assertThat(putAlertResponse.indexResponse().isCreated(), is(true)); - Thread.sleep(20000); - AckAlertResponse ackResponse = alertsClient.prepareAckAlert("throttled-alert").get(); - Assert.assertEquals(Alert.Status.AckStatus.State.ACKED, ackResponse.getStatus().ackStatus().state()); + if (timeWarped()) { + timeWarp().scheduler().fire("_name", 4, TimeValue.timeValueSeconds(5)); + } else { + Thread.sleep(20000); + } + AckAlertResponse ackResponse = alertsClient.prepareAckAlert("_name").get(); + assertThat(ackResponse.getStatus().ackStatus().state(), is(Alert.Status.AckStatus.State.ACKED)); refresh(); - SearchResponse searchResponse = client() - .prepareSearch("action-index") - .setTypes("action-type") - .setSearchType(SearchType.COUNT) - .setSource(searchSource().query(matchAllQuery()).buildAsBytes()) - .get(); - long countAfterAck = searchResponse.getHits().getTotalHits(); + long countAfterAck = docCount("actions", "action", matchAllQuery()); assertThat(countAfterAck, greaterThanOrEqualTo((long) 1)); - Thread.sleep(20000); + if (timeWarped()) { + timeWarp().scheduler().fire("_name", 4, TimeValue.timeValueSeconds(5)); + } else { + Thread.sleep(20000); + } refresh(); - searchResponse = client() - .prepareSearch("action-index") - .setTypes("action-type") - .setSearchType(SearchType.COUNT) - .setSource(searchSource().query(matchAllQuery()).buildAsBytes()) - .get(); - long countAfterSleep = searchResponse.getHits().getTotalHits(); - assertThat("There shouldn't be more entries in the index after we ack the alert", countAfterAck, equalTo(countAfterSleep)); + + // There shouldn't be more actions in the index after we ack the alert, even though the alert was fired + long countAfterPostAckFires = docCount("actions", "action", matchAllQuery()); + assertThat(countAfterPostAckFires, equalTo(countAfterAck)); //Now delete the event and the ack state should change to AWAITS_EXECUTION - DeleteResponse response = client().prepareDelete("test-index", "test-type", dummyEventIndexResponse.getId()).get(); - assertTrue(response.isFound()); + DeleteResponse response = client().prepareDelete("events", "event", eventIndexResponse.getId()).get(); + assertThat(response.isFound(), is(true)); + refresh(); - Thread.sleep(20000); - GetAlertResponse getAlertResponse = alertsClient.prepareGetAlert("throttled-alert").get(); - assertTrue(getAlertResponse.getResponse().isExists()); + if (timeWarped()) { + timeWarp().scheduler().fire("_name", 4, TimeValue.timeValueSeconds(5)); + } else { + Thread.sleep(20000); + } - final Alert.Parser alertParser = - internalTestCluster().getInstance(Alert.Parser.class, internalTestCluster().getMasterName()); + GetAlertResponse getAlertResponse = alertsClient.prepareGetAlert("_name").get(); + assertThat(getAlertResponse.getResponse().isExists(), is(true)); - Alert parsedAlert = alertParser.parse(getAlertResponse.getResponse().getId(), true, + Alert parsedAlert = alertParser().parse(getAlertResponse.getResponse().getId(), true, getAlertResponse.getResponse().getSourceAsBytesRef()); - assertThat(parsedAlert.status().ackStatus().state(), equalTo(Alert.Status.AckStatus.State.AWAITS_EXECUTION)); + assertThat(parsedAlert.status().ackStatus().state(), is(Alert.Status.AckStatus.State.AWAITS_EXECUTION)); - CountResponse countOfThrottledActions = client() - .prepareCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*") - .setQuery(QueryBuilders.matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id())) - .get(); - assertThat(countOfThrottledActions.getCount(), greaterThan(0L)); + long throttledCount = docCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*", null, + matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id())); + assertThat(throttledCount, greaterThan(0L)); } @Test public void testTimeThrottle() throws Exception { AlertsClient alertsClient = alertClient(); - createIndex("action-index", "test-index"); - ensureGreen("action-index", "test-index"); + createIndex("actions", "events"); + ensureGreen("actions", "events"); - IndexResponse dummyEventIndexResponse = client().prepareIndex("test-index", "test-type").setSource(XContentFactory.jsonBuilder().startObject().field("test_field", "error").endObject()).get(); - assertTrue(dummyEventIndexResponse.isCreated()); + IndexResponse eventIndexResponse = client().prepareIndex("events", "event") + .setSource("level", "error") + .get(); + assertTrue(eventIndexResponse.isCreated()); refresh(); PutAlertResponse putAlertResponse = alertsClient.preparePutAlert() - .alertName("throttled-alert") + .alertName("_name") .source(alertSourceBuilder() - .schedule(cron("0/5 * * * * ? *")) - .input(searchInput(matchAllRequest().indices("test-index"))) + .schedule(interval("5s")) + .input(searchInput(matchAllRequest().indices("events"))) .condition(scriptCondition("ctx.payload.hits.total > 0")) - .transform(searchTransform(matchAllRequest().indices("test-index"))) - .addAction(ActionBuilders.indexAction("action-index", "action-type")) + .transform(searchTransform(matchAllRequest().indices("events"))) + .addAction(ActionBuilders.indexAction("actions", "action")) .throttlePeriod(TimeValue.timeValueSeconds(10))) .get(); - assertTrue(putAlertResponse.indexResponse().isCreated()); + assertThat(putAlertResponse.indexResponse().isCreated(), is(true)); - forceFullSleepTime(new TimeValue(5, TimeUnit.SECONDS)); - refresh(); - CountResponse countResponse = client() - .prepareCount("action-index") - .setTypes("action-type") - .setSource(searchSource().query(matchAllQuery()).buildAsBytes()) - .get(); + if (timeWarped()) { + timeWarp().clock().setTime(DateTime.now()); - if (countResponse.getCount() != 1){ - SearchResponse actionResponse = client().prepareSearch(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*") - .setQuery(matchAllQuery()) - .get(); - for (SearchHit hit : actionResponse.getHits()) { - logger.info("Got action hit [{}]", hit.getSourceRef().toUtf8()); - } - } + timeWarp().scheduler().fire("_name"); + refresh(); - assertThat(countResponse.getCount(), greaterThanOrEqualTo(1L)); - assertThat(countResponse.getCount(), lessThanOrEqualTo(3L)); + // the first fire should work + long actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); - forceFullSleepTime(new TimeValue(20, TimeUnit.SECONDS)); + timeWarp().clock().fastForwardSeconds(5); + timeWarp().scheduler().fire("_name"); + refresh(); - refresh(); - countResponse = client() - .prepareCount("action-index") - .setTypes("action-type") - .setSource(searchSource().query(matchAllQuery()).buildAsBytes()) - .get(); - assertThat(countResponse.getCount(), greaterThanOrEqualTo(2L)); - assertThat(countResponse.getCount(), lessThanOrEqualTo(4L)); + // the last fire should have been throttled, so number of actions shouldn't change + actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + timeWarp().clock().fastForwardSeconds(10); + timeWarp().scheduler().fire("_name"); + refresh(); - CountResponse countOfThrottledActions = client() - .prepareCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*") - .setQuery(QueryBuilders.matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id())) - .get(); - assertThat(countOfThrottledActions.getCount(), greaterThan(0L)); - } + // the last fire occurred passed the throttle period, so a new action should have been added + actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(2L)); - private void forceFullSleepTime(TimeValue value){ - long start = System.currentTimeMillis(); - while(System.currentTimeMillis() < start + value.getMillis()){ - try{ - Thread.sleep(value.getMillis() - (System.currentTimeMillis() - start)); - } catch (InterruptedException ie) { - logger.error("interrupted", ie); - } + long throttledCount = docCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*", null, + matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id())); + assertThat(throttledCount, is(1L)); + + } else { + + Thread.sleep(TimeUnit.SECONDS.toMillis(2)); + refresh(); + + // the first fire should work so we should have a single action in the actions index + long actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + + // we should still be within the throttling period... so the number of actions shouldn't change + actionsCount = docCount("actions", "action", matchAllQuery()); + assertThat(actionsCount, is(1L)); + + long throttledCount = docCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*", null, + matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id())); + assertThat(throttledCount, greaterThanOrEqualTo(1L)); } } diff --git a/src/test/java/org/elasticsearch/alerts/test/integration/BasicAlertsTests.java b/src/test/java/org/elasticsearch/alerts/test/integration/BasicAlertsTests.java index 6379aefbc9c..fa3eca68077 100644 --- a/src/test/java/org/elasticsearch/alerts/test/integration/BasicAlertsTests.java +++ b/src/test/java/org/elasticsearch/alerts/test/integration/BasicAlertsTests.java @@ -12,10 +12,7 @@ import org.elasticsearch.alerts.client.AlertSourceBuilder; import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.scheduler.schedule.IntervalSchedule; import org.elasticsearch.alerts.support.AlertUtils; -import org.elasticsearch.alerts.support.Variables; import org.elasticsearch.alerts.test.AbstractAlertsIntegrationTests; -import org.elasticsearch.alerts.test.AlertsTestUtils; -import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse; import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse; import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse; @@ -25,22 +22,21 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.Test; -import java.util.Locale; - import static org.elasticsearch.alerts.actions.ActionBuilders.indexAction; import static org.elasticsearch.alerts.client.AlertSourceBuilder.alertSourceBuilder; import static org.elasticsearch.alerts.condition.ConditionBuilders.scriptCondition; import static org.elasticsearch.alerts.input.InputBuilders.searchInput; import static org.elasticsearch.alerts.scheduler.schedule.Schedules.cron; import static org.elasticsearch.alerts.scheduler.schedule.Schedules.interval; +import static org.elasticsearch.alerts.support.Variables.*; +import static org.elasticsearch.alerts.test.AlertsTestUtils.newInputSearchRequest; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.FilterBuilders.rangeFilter; import static org.elasticsearch.index.query.QueryBuilders.*; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.*; /** */ @@ -49,19 +45,26 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests { @Test public void testIndexAlert() throws Exception { AlertsClient alertsClient = alertClient(); - createIndex("my-index"); + createIndex("idx"); // Have a sample document in the index, the alert is going to evaluate - client().prepareIndex("my-index", "my-type").setSource("field", "value").get(); - SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); - alertsClient.preparePutAlert("my-first-alert") + client().prepareIndex("idx", "type").setSource("field", "value").get(); + refresh(); + SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value"))); + alertsClient.preparePutAlert("_name") .source(alertSourceBuilder() .schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)) .input(searchInput(searchRequest)) .condition(scriptCondition("ctx.payload.hits.total == 1"))) .get(); - assertAlertWithMinimumPerformedActionsCount("my-first-alert", 1); - GetAlertResponse getAlertResponse = alertClient().prepareGetAlert().setAlertName("my-first-alert").get(); + if (timeWarped()) { + timeWarp().scheduler().fire("_name"); + refresh(); + } + + assertAlertWithMinimumPerformedActionsCount("_name", 1); + + GetAlertResponse getAlertResponse = alertClient().prepareGetAlert().setAlertName("_name").get(); assertThat(getAlertResponse.getResponse().isExists(), is(true)); assertThat(getAlertResponse.getResponse().isSourceEmpty(), is(false)); } @@ -69,27 +72,39 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests { @Test public void testIndexAlert_registerAlertBeforeTargetIndex() throws Exception { AlertsClient alertsClient = alertClient(); - SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); - alertsClient.preparePutAlert("my-first-alert") + SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value"))); + alertsClient.preparePutAlert("_name") .source(alertSourceBuilder() .schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)) .input(searchInput(searchRequest)) .condition(scriptCondition("ctx.payload.hits.total == 1"))) .get(); + if (timeWarped()) { + timeWarp().scheduler().fire("_name"); + refresh(); + } + // The alert's condition won't meet because there is no data that matches with the query - assertAlertWithNoActionNeeded("my-first-alert", 1); + assertAlertWithNoActionNeeded("_name", 1); // Index sample doc after we register the alert and the alert's condition should meet - client().prepareIndex("my-index", "my-type").setSource("field", "value").get(); - assertAlertWithMinimumPerformedActionsCount("my-first-alert", 1); + client().prepareIndex("idx", "type").setSource("field", "value").get(); + refresh(); + + if (timeWarped()) { + timeWarp().scheduler().fire("_name"); + refresh(); + } + + assertAlertWithMinimumPerformedActionsCount("_name", 1); } @Test public void testDeleteAlert() throws Exception { AlertsClient alertsClient = alertClient(); - SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(matchAllQuery())); - PutAlertResponse indexResponse = alertsClient.preparePutAlert("my-first-alert") + SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(matchAllQuery())); + PutAlertResponse indexResponse = alertsClient.preparePutAlert("_name") .source(alertSourceBuilder() .schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS)) .input(searchInput(searchRequest)) @@ -97,47 +112,46 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests { .get(); assertThat(indexResponse.indexResponse().isCreated(), is(true)); - // TODO: when MockScheduler can be used this workaround can be removed: - // Although there is no added benefit in this test for waiting for the alert to fire, however - // we need to wait here because of a test timing issue. When we tear down a test we delete the alert and delete all - // indices, but there may still be inflight fired alerts, which may trigger the alert history to be created again, before - // we finished the tear down phase. - assertAlertWithNoActionNeeded("my-first-alert", 1); + if (!timeWarped()) { + // Although there is no added benefit in this test for waiting for the alert to fire, however + // we need to wait here because of a test timing issue. When we tear down a test we delete the alert and delete all + // indices, but there may still be inflight fired alerts, which may trigger the alert history to be created again, before + // we finished the tear down phase. + assertAlertWithNoActionNeeded("_name", 1); + } - DeleteAlertRequest deleteAlertRequest = new DeleteAlertRequest("my-first-alert"); - DeleteAlertResponse deleteAlertResponse = alertsClient.deleteAlert(deleteAlertRequest).actionGet(); - assertNotNull(deleteAlertResponse.deleteResponse()); - assertTrue(deleteAlertResponse.deleteResponse().isFound()); + DeleteAlertResponse deleteAlertResponse = alertsClient.prepareDeleteAlert("_name").get(); + assertThat(deleteAlertResponse.deleteResponse(), notNullValue()); + assertThat(deleteAlertResponse.deleteResponse().isFound(), is(true)); refresh(); assertHitCount(client().prepareCount(AlertsStore.ALERT_INDEX).get(), 0l); // Deleting the same alert for the second time - deleteAlertRequest = new DeleteAlertRequest("my-first-alert"); - deleteAlertResponse = alertsClient.deleteAlert(deleteAlertRequest).actionGet(); - assertNotNull(deleteAlertResponse.deleteResponse()); - assertFalse(deleteAlertResponse.deleteResponse().isFound()); + deleteAlertResponse = alertsClient.prepareDeleteAlert("_name").get(); + assertThat(deleteAlertResponse.deleteResponse(), notNullValue()); + assertThat(deleteAlertResponse.deleteResponse().isFound(), is(false)); } @Test public void testMalformedAlert() throws Exception { AlertsClient alertsClient = alertClient(); - createIndex("my-index"); + createIndex("idx"); // Have a sample document in the index, the alert is going to evaluate - client().prepareIndex("my-index", "my-type").setSource("field", "value").get(); + client().prepareIndex("idx", "type").setSource("field", "value").get(); XContentBuilder alertSource = jsonBuilder(); alertSource.startObject(); - alertSource.field("malformed_field", "x"); + alertSource.field("unknown_field", "x"); alertSource.startObject("schedule").field("cron", "0/5 * * * * ? *").endObject(); alertSource.startObject("condition").startObject("script").field("script", "return true").field("request"); - AlertUtils.writeSearchRequest(AlertsTestUtils.newInputSearchRequest(), alertSource, ToXContent.EMPTY_PARAMS); + AlertUtils.writeSearchRequest(newInputSearchRequest(), alertSource, ToXContent.EMPTY_PARAMS); alertSource.endObject(); alertSource.endObject(); try { - alertsClient.preparePutAlert("my-first-alert") + alertsClient.preparePutAlert("_name") .source(alertSource.bytes()) .get(); fail(); @@ -145,7 +159,7 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests { // In AlertStore we fail parsing if an alert contains undefined fields. } try { - client().prepareIndex(AlertsStore.ALERT_INDEX, AlertsStore.ALERT_TYPE, "my-first-alert") + client().prepareIndex(AlertsStore.ALERT_INDEX, AlertsStore.ALERT_TYPE, "_name") .setSource(alertSource) .get(); fail(); @@ -156,98 +170,143 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests { @Test public void testModifyAlerts() throws Exception { - SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index") + SearchRequest searchRequest = newInputSearchRequest("idx") .source(searchSource().query(matchAllQuery())); AlertSourceBuilder source = alertSourceBuilder() - .schedule(cron("0/5 * * * * ? *")) + .schedule(interval("5s")) .input(searchInput(searchRequest)) - .addAction(indexAction("my-index", "trail")); + .addAction(indexAction("idx", "action")); - - alertClient().preparePutAlert("1") + alertClient().preparePutAlert("_name") .source(source.condition(scriptCondition("ctx.payload.hits.total == 1"))) .get(); - assertAlertWithMinimumPerformedActionsCount("1", 0, false); - alertClient().preparePutAlert("1") + if (timeWarped()) { + timeWarp().scheduler().fire("_name"); + refresh(); + } + assertAlertWithMinimumPerformedActionsCount("_name", 0, false); + + alertClient().preparePutAlert("_name") .source(source.condition(scriptCondition("ctx.payload.hits.total == 0"))) .get(); - assertAlertWithMinimumPerformedActionsCount("1", 1, false); - alertClient().preparePutAlert("1") - .source(source.schedule(cron("0/5 * * * * ? 2020")).condition(scriptCondition("ctx.payload.hits.total == 0"))) + if (timeWarped()) { + timeWarp().scheduler().fire("_name"); + refresh(); + } + assertAlertWithMinimumPerformedActionsCount("_name", 1, false); + + alertClient().preparePutAlert("_name") + .source(source.schedule(cron("0/1 * * * * ? 2020")).condition(scriptCondition("ctx.payload.hits.total == 0"))) .get(); - Thread.sleep(5000); - long count = findNumberOfPerformedActions("1"); - Thread.sleep(5000); - assertThat(count, equalTo(findNumberOfPerformedActions("1"))); - } + if (timeWarped()) { + timeWarp().scheduler().fire("_name"); + refresh(); + } else { + Thread.sleep(1000); + } - private final SearchSourceBuilder searchSourceBuilder = searchSource().query( - filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{" + Variables.SCHEDULED_FIRE_TIME + "}}||-30s").to("{{" + Variables.SCHEDULED_FIRE_TIME + "}}")) - ); + long count = findNumberOfPerformedActions("_name"); + + if (timeWarped()) { + timeWarp().scheduler().fire("_name"); + refresh(); + } else { + Thread.sleep(1000); + } + + assertThat(count, equalTo(findNumberOfPerformedActions("_name"))); + } @Test public void testConditionSearchWithSource() throws Exception { - testConditionSearch( - AlertsTestUtils.newInputSearchRequest("my-index").source(searchSourceBuilder) - ); + String variable = randomFrom(EXECUTION_TIME, SCHEDULED_FIRE_TIME, FIRE_TIME); + SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery( + matchQuery("level", "a"), + rangeFilter("_timestamp") + .from("{{" + variable + "}}||-30s") + .to("{{" + variable + "}}"))); + + testConditionSearch(newInputSearchRequest("events").source(searchSourceBuilder)); } @Test public void testConditionSearchWithIndexedTemplate() throws Exception { + String variable = randomFrom(EXECUTION_TIME, SCHEDULED_FIRE_TIME, FIRE_TIME); + SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery( + matchQuery("level", "a"), + rangeFilter("_timestamp") + .from("{{" + variable + "}}||-30s") + .to("{{" + variable + "}}"))); + client().preparePutIndexedScript() .setScriptLang("mustache") .setId("my-template") .setSource(jsonBuilder().startObject().field("template").value(searchSourceBuilder).endObject()) .get(); - SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index"); + refresh(); + SearchRequest searchRequest = newInputSearchRequest("events"); searchRequest.templateName("my-template"); searchRequest.templateType(ScriptService.ScriptType.INDEXED); testConditionSearch(searchRequest); } private void testConditionSearch(SearchRequest request) throws Exception { - long scheduleTimeInMs = 5000; - String alertName = "red-alert"; - assertAcked(prepareCreate("my-index").addMapping("my-type", "_timestamp", "enabled=true", "event_type", "type=string")); + String alertName = "_name"; + assertAcked(prepareCreate("events").addMapping("event", "_timestamp", "enabled=true", "level", "type=string")); alertClient().prepareDeleteAlert(alertName).get(); alertClient().preparePutAlert(alertName) - .source(createAlertSource(String.format(Locale.ROOT, "0/%s * * * * ? *", (scheduleTimeInMs / 1000)), request, "return ctx.payload.hits.total >= 3")) + .source(createAlertSource(interval("5s"), request, "return ctx.payload.hits.total >= 3")) .get(); - long time1 = System.currentTimeMillis(); - client().prepareIndex("my-index", "my-type") + client().prepareIndex("events", "event") .setCreate(true) - .setSource("event_type", "a") + .setSource("level", "a") .get(); - client().prepareIndex("my-index", "my-type") + client().prepareIndex("events", "event") .setCreate(true) - .setSource("event_type", "a") + .setSource("level", "a") .get(); - long timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1); - Thread.sleep(timeLeft); + refresh(); + if (timeWarped()) { + timeWarp().clock().fastForwardSeconds(5); + timeWarp().scheduler().fire(alertName); + refresh(); + } else { + Thread.sleep(5000); + } assertAlertWithNoActionNeeded(alertName, 1); - time1 = System.currentTimeMillis(); - client().prepareIndex("my-index", "my-type") + client().prepareIndex("events", "event") .setCreate(true) - .setSource("event_type", "b") + .setSource("level", "b") .get(); - timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1); - Thread.sleep(timeLeft); + refresh(); + if (timeWarped()) { + timeWarp().clock().fastForwardSeconds(5); + timeWarp().scheduler().fire(alertName); + refresh(); + } else { + Thread.sleep(5000); + } assertAlertWithNoActionNeeded(alertName, 2); - time1 = System.currentTimeMillis(); - client().prepareIndex("my-index", "my-type") + client().prepareIndex("events", "event") .setCreate(true) - .setSource("event_type", "a") + .setSource("level", "a") .get(); - timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1); - Thread.sleep(timeLeft); + refresh(); + if (timeWarped()) { + timeWarp().clock().fastForwardSeconds(5); + timeWarp().scheduler().fire(alertName); + refresh(); + } else { + Thread.sleep(5000); + } assertAlertWithMinimumPerformedActionsCount(alertName, 1); } } diff --git a/src/test/java/org/elasticsearch/alerts/test/integration/BootStrapTests.java b/src/test/java/org/elasticsearch/alerts/test/integration/BootStrapTests.java index fd0bd37458e..229a4233b0d 100644 --- a/src/test/java/org/elasticsearch/alerts/test/integration/BootStrapTests.java +++ b/src/test/java/org/elasticsearch/alerts/test/integration/BootStrapTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.alerts.test.integration; import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.alerts.Alert; @@ -33,7 +32,6 @@ import org.elasticsearch.common.joda.time.DateTimeZone; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.junit.annotations.TestLogging; import org.junit.Test; @@ -67,7 +65,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests { startAlerting(); AlertsStatsResponse response = alertClient().prepareAlertsStats().get(); - assertTrue(response.isAlertActionManagerStarted()); + assertThat(response.isAlertActionManagerStarted(), is(true)); assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED)); assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L)); } @@ -78,7 +76,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests { ensureAlertingStarted(); AlertsStatsResponse response = alertClient().prepareAlertsStats().get(); - assertTrue(response.isAlertActionManagerStarted()); + assertThat(response.isAlertActionManagerStarted(), is(true)); assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED)); assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L)); @@ -114,13 +112,13 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests { .setConsistencyLevel(WriteConsistencyLevel.ALL) .setSource(jsonBuilder().value(firedAlert)) .get(); - assertTrue(indexResponse.isCreated()); + assertThat(indexResponse.isCreated(), is(true)); stopAlerting(); startAlerting(); response = alertClient().prepareAlertsStats().get(); - assertTrue(response.isAlertActionManagerStarted()); + assertThat(response.isAlertActionManagerStarted(), is(true)); assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED)); assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L)); assertThat(response.getAlertActionManagerLargestQueueSize(), greaterThanOrEqualTo(1l)); @@ -130,8 +128,8 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests { @TestLogging("alerts.actions:DEBUG") public void testBootStrapManyHistoryIndices() throws Exception { DateTime now = new DateTime(DateTimeZone.UTC); - long numberOfAlertHistoryIndices = randomIntBetween(2,8); - long numberOfAlertHistoryEntriesPerIndex = randomIntBetween(5,10); + long numberOfAlertHistoryIndices = randomIntBetween(2, 8); + long numberOfAlertHistoryEntriesPerIndex = randomIntBetween(5, 10); SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value"))); for (int i = 0; i < numberOfAlertHistoryIndices; i++) { @@ -141,10 +139,10 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests { ensureGreen(actionHistoryIndex); logger.info("Created index {}", actionHistoryIndex); - for (int j=0; j