Introducing TimeWarp mode for tests

The idea behind a time warp mode is that while it's enabled the time related constructs in the alerts module are replaced with a mock test friendly version.. so we'll be able to control time and therefore avoid sleeping the threads.

 In time warp mode:

-  The `SchedulerMock` is used to manually fire jobs
-  The `ClockMock` is used to set and fast forward time
-  The alerts are executed on the same thread as the scheduler mock... so we don't have to deal with async nature at all. This is accomplished by the added `AlertsExecutor` abstraction.

By default, the time warp mode is enabled and tests run in it. If a test must not use the time warp mode, it is possible to add `@TimeWarped(false)` annotation to the test and it will then run with the standard scheduler & clock. It is also possible to disable this mode all together by running the tests with `-Dtests.timewarp=false`.

All the updated tests now work in both modes (whether the time warp mode is dis/enabled). This is important as on the server we would like to run the tests outside of this mode as well, but locally we'd like to run them with time warped enabled (so they'll be faster)

Also, cleaned up the tests.. we now only do `assertThat(...)` calls (no `assertTrue` or `assertEquals`... for consistency sake)

Original commit: elastic/x-pack-elasticsearch@11e09f6dea
This commit is contained in:
uboness 2015-03-05 15:12:02 +01:00
parent ffdf23b411
commit 50f4a1c0e3
24 changed files with 685 additions and 284 deletions

View File

@ -31,6 +31,7 @@
<execution.hint.file>.local-${project.version}-execution-hints.log</execution.hint.file>
<tests.rest>false</tests.rest>
<tests.slow>false</tests.slow>
<tests.timewarp>true</tests.timewarp>
</properties>
<dependencies>
@ -400,6 +401,7 @@
<tests.cluster>${tests.cluster}</tests.cluster>
<tests.heap.size>${tests.heap.size}</tests.heap.size>
<tests.filter>${tests.filter}</tests.filter>
<tests.timewarp>${tests.timewarp}</tests.timewarp>
<es.node.local>${env.ES_TEST_LOCAL}</es.node.local>
<es.node.mode>${es.node.mode}</es.node.mode>
<es.logger.level>${es.logger.level}</es.logger.level>

View File

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.history;
import java.util.concurrent.BlockingQueue;
/**
*
*/
public interface AlertsExecutor {
BlockingQueue queue();
long largestPoolSize();
void execute(Runnable runnable);
}

View File

@ -11,10 +11,22 @@ import org.elasticsearch.common.inject.AbstractModule;
*/
public class HistoryModule extends AbstractModule {
private final Class<? extends AlertsExecutor> executorClass;
public HistoryModule() {
this(InternalAlertsExecutor.class);
}
protected HistoryModule(Class<? extends AlertsExecutor> executorClass) {
this.executorClass = executorClass;
}
@Override
protected void configure() {
bind(FiredAlert.Parser.class).asEagerSingleton();
bind(HistoryStore.class).asEagerSingleton();
bind(HistoryService.class).asEagerSingleton();
bind(executorClass).asEagerSingleton();
bind(AlertsExecutor.class).to(executorClass);
}
}

View File

@ -24,8 +24,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
@ -38,7 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class HistoryService extends AbstractComponent {
private final HistoryStore historyStore;
private final ThreadPool threadPool;
private final AlertsExecutor executor;
private final AlertsStore alertsStore;
private final ClusterService clusterService;
private final AlertLockService alertLockService;
@ -48,12 +46,12 @@ public class HistoryService extends AbstractComponent {
private final AtomicInteger initializationRetries = new AtomicInteger();
@Inject
public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool,
public HistoryService(Settings settings, HistoryStore historyStore, AlertsExecutor executor,
AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler,
ClusterService clusterService, Clock clock) {
super(settings);
this.historyStore = historyStore;
this.threadPool = threadPool;
this.executor = executor;
this.alertsStore = alertsStore;
this.alertLockService = alertLockService;
this.clusterService = clusterService;
@ -67,7 +65,7 @@ public class HistoryService extends AbstractComponent {
return;
}
assert alertsThreadPool().getQueue().isEmpty() : "queue should be empty, but contains " + alertsThreadPool().getQueue().size() + " elements.";
assert executor.queue().isEmpty() : "queue should be empty, but contains " + executor.queue().size() + " elements.";
HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state, FiredAlert.State.AWAITS_EXECUTION);
if (!loadResult.succeeded()) {
retry(callback);
@ -87,7 +85,7 @@ public class HistoryService extends AbstractComponent {
// We could also rely on the shutdown in #updateSettings call, but
// this is a forceful shutdown that also interrupts the worker threads in the threadpool
List<Runnable> cancelledTasks = new ArrayList<>();
alertsThreadPool().getQueue().drainTo(cancelledTasks);
executor.queue().drainTo(cancelledTasks);
logger.debug("cancelled [{}] queued tasks", cancelledTasks.size());
logger.debug("stopped history service");
}
@ -98,13 +96,13 @@ public class HistoryService extends AbstractComponent {
}
// TODO: should be removed from the stats api? This is already visible in the thread pool cat api.
public long getQueueSize() {
return alertsThreadPool().getQueue().size();
public long queueSize() {
return executor.queue().size();
}
// TODO: should be removed from the stats api? This is already visible in the thread pool cat api.
public long getLargestQueueSize() {
return alertsThreadPool().getLargestPoolSize();
public long largestQueueSize() {
return executor.largestPoolSize();
}
void fire(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException {
@ -130,7 +128,7 @@ public class HistoryService extends AbstractComponent {
void executeAsync(FiredAlert firedAlert, Alert alert) {
try {
alertsThreadPool().execute(new AlertExecutionTask(firedAlert, alert));
executor.execute(new AlertExecutionTask(firedAlert, alert));
} catch (EsRejectedExecutionException e) {
logger.debug("[{}] failed to execute fired alert", firedAlert.name());
firedAlert.update(FiredAlert.State.FAILED, "failed to run fired alert due to thread pool capacity");
@ -180,10 +178,6 @@ public class HistoryService extends AbstractComponent {
}
}
private EsThreadPoolExecutor alertsThreadPool() {
return (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.NAME);
}
private void retry(final Callback<ClusterState> callback) {
ClusterStateListener clusterStateListener = new ClusterStateListener() {
@ -193,7 +187,7 @@ public class HistoryService extends AbstractComponent {
assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time";
clusterService.remove(this);
// We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread.
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
executor.execute(new Runnable() {
@Override
public void run() {

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.history;
import org.elasticsearch.alerts.AlertsPlugin;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.BlockingQueue;
/**
*
*/
public class InternalAlertsExecutor implements AlertsExecutor {
private final ThreadPool threadPool;
@Inject
public InternalAlertsExecutor(ThreadPool threadPool) {
this.threadPool = threadPool;
}
@Override
public BlockingQueue queue() {
return executor().getQueue();
}
@Override
public long largestPoolSize() {
return executor().getLargestPoolSize();
}
@Override
public void execute(Runnable runnable) {
executor().execute(runnable);
}
private EsThreadPoolExecutor executor() {
return (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.NAME);
}
}

View File

@ -127,7 +127,8 @@ public class SearchInput extends Input<SearchInput.Result> {
} else if (requestPrototype.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime))
.put(Variables.FIRE_TIME, formatDate(fireTime));
.put(Variables.FIRE_TIME, formatDate(fireTime))
.put(Variables.EXECUTION_TIME, formatDate(executionTime));
request.templateParams(templateParams.map());
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());

View File

@ -10,6 +10,7 @@ import org.elasticsearch.alerts.AlertsSettingsException;
import org.elasticsearch.alerts.scheduler.schedule.CronnableSchedule;
import org.elasticsearch.alerts.scheduler.schedule.IntervalSchedule;
import org.elasticsearch.alerts.scheduler.schedule.Schedule;
import org.elasticsearch.alerts.support.clock.Clock;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
@ -31,17 +32,19 @@ public class InternalScheduler extends AbstractComponent implements Scheduler {
// Not happy about it, but otherwise we're stuck with Quartz's SimpleThreadPool
private volatile static ThreadPool threadPool;
private volatile org.quartz.Scheduler scheduler;
private List<Listener> listeners;
private final Clock clock;
private final DateTimeZone defaultTimeZone;
private volatile org.quartz.Scheduler scheduler;
private List<Listener> listeners;
@Inject
public InternalScheduler(Settings settings, ThreadPool threadPool) {
public InternalScheduler(Settings settings, ThreadPool threadPool, Clock clock) {
super(settings);
this.listeners = new CopyOnWriteArrayList<>();
InternalScheduler.threadPool = threadPool;
this.clock = clock;
this.listeners = new CopyOnWriteArrayList<>();
String timeZoneStr = componentSettings.get("time_zone", "UTC");
try {
this.defaultTimeZone = DateTimeZone.forID(timeZoneStr);
@ -65,7 +68,7 @@ public class InternalScheduler extends AbstractComponent implements Scheduler {
scheduler.setJobFactory(new SimpleJobFactory());
Map<JobDetail, Set<? extends Trigger>> quartzJobs = new HashMap<>();
for (Job alert : jobs) {
quartzJobs.put(jobDetail(alert.name(), this), createTrigger(alert.schedule(), defaultTimeZone));
quartzJobs.put(jobDetail(alert.name(), this), createTrigger(alert.schedule(), defaultTimeZone, clock));
}
scheduler.scheduleJobs(quartzJobs, false);
scheduler.start();
@ -107,7 +110,7 @@ public class InternalScheduler extends AbstractComponent implements Scheduler {
public void add(Job job) {
try {
logger.trace("scheduling [{}] with schedule [{}]", job.name(), job.schedule());
scheduler.scheduleJob(jobDetail(job.name(), this), createTrigger(job.schedule(), defaultTimeZone), true);
scheduler.scheduleJob(jobDetail(job.name(), this), createTrigger(job.schedule(), defaultTimeZone, clock), true);
} catch (org.quartz.SchedulerException se) {
logger.error("Failed to schedule job",se);
throw new SchedulerException("Failed to schedule job", se);
@ -122,13 +125,13 @@ public class InternalScheduler extends AbstractComponent implements Scheduler {
}
}
static Set<Trigger> createTrigger(Schedule schedule, DateTimeZone timeZone) {
static Set<Trigger> createTrigger(Schedule schedule, DateTimeZone timeZone, Clock clock) {
HashSet<Trigger> triggers = new HashSet<>();
if (schedule instanceof CronnableSchedule) {
for (String cron : ((CronnableSchedule) schedule).crons()) {
triggers.add(TriggerBuilder.newTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule(cron).inTimeZone(timeZone.toTimeZone()))
.startNow()
.startAt(clock.now().toDate())
.build());
}
} else {
@ -137,7 +140,7 @@ public class InternalScheduler extends AbstractComponent implements Scheduler {
triggers.add(TriggerBuilder.newTrigger().withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds((int) interval.seconds())
.repeatForever())
.startNow()
.startAt(clock.now().toDate())
.build());
}
return triggers;

View File

@ -17,10 +17,16 @@ import java.util.Map;
*/
public class SchedulerModule extends AbstractModule {
private final Class<? extends Scheduler> schedulerClass;
private final Map<String, Class<? extends Schedule.Parser>> parsers = new HashMap<>();
public void registerSchedule(String type, Class<? extends Schedule.Parser> parser) {
parsers.put(type, parser);
public SchedulerModule() {
this(InternalScheduler.class);
}
protected SchedulerModule(Class<? extends Scheduler> schedulerClass) {
this.schedulerClass = schedulerClass;
}
@Override
@ -48,6 +54,7 @@ public class SchedulerModule extends AbstractModule {
}
bind(ScheduleRegistry.class).asEagerSingleton();
bind(Scheduler.class).to(InternalScheduler.class).asEagerSingleton();
bind(schedulerClass).asEagerSingleton();
bind(Scheduler.class).to(schedulerClass);
}
}

View File

@ -59,9 +59,9 @@ public class TransportAlertsStatsAction extends TransportMasterNodeOperationActi
AlertsStatsResponse statsResponse = new AlertsStatsResponse();
statsResponse.setAlertManagerState(alertsService.state());
statsResponse.setAlertActionManagerStarted(historyService.started());
statsResponse.setAlertActionManagerQueueSize(historyService.getQueueSize());
statsResponse.setAlertActionManagerQueueSize(historyService.queueSize());
statsResponse.setNumberOfRegisteredAlerts(alertsService.getNumberOfAlerts());
statsResponse.setAlertActionManagerLargestQueueSize(historyService.getLargestQueueSize());
statsResponse.setAlertActionManagerLargestQueueSize(historyService.largestQueueSize());
statsResponse.setVersion(AlertsVersion.CURRENT);
statsResponse.setBuild(AlertsBuild.CURRENT);
listener.onResponse(statsResponse);

View File

@ -36,6 +36,7 @@ import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.alerts.test.AlertsTestUtils.mockExecutionContext;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;
/**
@ -88,7 +89,7 @@ public class ScriptConditionSearchTests extends AbstractAlertsSingleNodeTests {
.get();
ctx = mockExecutionContext("_name", new Payload.ActionResponse(response));
assertTrue(condition.execute(ctx).met());
assertThat(condition.execute(ctx).met(), is(true));
}
@Test
@ -102,10 +103,10 @@ public class ScriptConditionSearchTests extends AbstractAlertsSingleNodeTests {
SearchResponse response = new SearchResponse(internalSearchResponse, "", 3, 3, 500l, new ShardSearchFailure[0]);
ExecutionContext ctx = mockExecutionContext("_alert_name", new Payload.ActionResponse(response));
assertTrue(condition.execute(ctx).met());
assertThat(condition.execute(ctx).met(), is(true));
hit.score(2f);
when(ctx.payload()).thenReturn(new Payload.ActionResponse(response));
assertFalse(condition.execute(ctx).met());
assertThat(condition.execute(ctx).met(), is(false));
}
}

View File

@ -5,7 +5,10 @@
*/
package org.elasticsearch.alerts.history;
import org.elasticsearch.alerts.*;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertExecution;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.actions.email.EmailAction;
import org.elasticsearch.alerts.actions.webhook.WebhookAction;
import org.elasticsearch.alerts.condition.Condition;
@ -22,25 +25,24 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class FiredAlertTests extends AbstractAlertsIntegrationTests {
@Test
public void testParser() throws Exception {
Alert alert = AlertsTestUtils.createTestAlert("fired_test", scriptService(), httpClient(), noopEmailService(), logger);
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime());
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0);
XContentBuilder jsonBuilder2 = XContentFactory.jsonBuilder();
parsedFiredAlert.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS);
assertEquals(jsonBuilder.bytes().toUtf8(), jsonBuilder2.bytes().toUtf8());
assertThat(jsonBuilder.bytes().toUtf8(), equalTo(jsonBuilder2.bytes().toUtf8()));
}
@Test
@ -60,9 +62,11 @@ public class FiredAlertTests extends AbstractAlertsIntegrationTests {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0);
XContentBuilder jsonBuilder2 = XContentFactory.jsonBuilder();
parsedFiredAlert.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS);
assertEquals(jsonBuilder.bytes().toUtf8(), jsonBuilder2.bytes().toUtf8());
assertThat(jsonBuilder.bytes().toUtf8(), equalTo(jsonBuilder2.bytes().toUtf8()));
}
@Test
@ -82,9 +86,11 @@ public class FiredAlertTests extends AbstractAlertsIntegrationTests {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0);
XContentBuilder jsonBuilder2 = XContentFactory.jsonBuilder();
parsedFiredAlert.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS);
assertEquals(jsonBuilder.bytes().toUtf8(), jsonBuilder2.bytes().toUtf8());
assertThat(jsonBuilder.bytes().toUtf8(), equalTo(jsonBuilder2.bytes().toUtf8()));
}

View File

@ -20,7 +20,6 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import org.junit.Test;
@ -50,12 +49,12 @@ public class HistoryServiceTests extends ElasticsearchTestCase {
when(input.execute(any(ExecutionContext.class))).thenReturn(inputResult);
HistoryStore historyStore = mock(HistoryStore.class);
ThreadPool threadPool = mock(ThreadPool.class);
AlertsExecutor executor = mock(AlertsExecutor.class);
AlertsStore alertsStore = mock(AlertsStore.class);
AlertLockService alertLockService = mock(AlertLockService.class);
Scheduler scheduler = mock(Scheduler.class);
ClusterService clusterService = mock(ClusterService.class);
historyService = new HistoryService(ImmutableSettings.EMPTY, historyStore, threadPool, alertsStore, alertLockService, scheduler, clusterService, SystemClock.INSTANCE);
historyService = new HistoryService(ImmutableSettings.EMPTY, historyStore, executor, alertsStore, alertLockService, scheduler, clusterService, SystemClock.INSTANCE);
}
@Test

View File

@ -10,6 +10,7 @@ import org.elasticsearch.alerts.AlertsPlugin;
import org.elasticsearch.alerts.scheduler.schedule.Schedule;
import org.elasticsearch.alerts.scheduler.schedule.support.DayOfWeek;
import org.elasticsearch.alerts.scheduler.schedule.support.WeekTimes;
import org.elasticsearch.alerts.support.clock.SystemClock;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -47,7 +48,7 @@ public class InternalSchedulerTests extends ElasticsearchTestCase {
.put("name", "test")
.build();
threadPool = new ThreadPool(settings, null);
scheduler = new InternalScheduler(ImmutableSettings.EMPTY, threadPool);
scheduler = new InternalScheduler(ImmutableSettings.EMPTY, threadPool, SystemClock.INSTANCE);
}
@After

View File

@ -5,7 +5,14 @@
*/
package org.elasticsearch.alerts.scheduler;
import org.elasticsearch.alerts.support.clock.Clock;
import org.elasticsearch.alerts.support.clock.ClockMock;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import java.util.Collection;
import java.util.List;
@ -19,9 +26,16 @@ import java.util.concurrent.CopyOnWriteArrayList;
*/
public class SchedulerMock implements Scheduler {
private final ESLogger logger;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<String, Job> jobs = new ConcurrentHashMap<>();
private final Clock clock;
@Inject
public SchedulerMock(Settings settings, Clock clock) {
this.logger = Loggers.getLogger(SchedulerMock.class, settings);
this.clock = clock;
}
@Override
public void start(Collection<? extends Job> jobs) {
@ -47,9 +61,31 @@ public class SchedulerMock implements Scheduler {
}
public void fire(String jobName) {
DateTime now = new DateTime();
for (Listener listener : listeners) {
listener.fire(jobName, now ,now);
fire(jobName, 1, null);
}
public void fire(String jobName, int times) {
fire(jobName, times, null);
}
public void fire(String jobName, int times, TimeValue interval) {
for (int i = 0; i < times; i++) {
DateTime now = clock.now();
logger.debug("firing [" + jobName + "] at [" + now + "]");
for (Listener listener : listeners) {
listener.fire(jobName, now, now);
}
if (clock instanceof ClockMock) {
((ClockMock) clock).fastForward(interval == null ? TimeValue.timeValueMillis(10) : interval);
} else {
if (interval != null) {
try {
Thread.sleep(interval.millis());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
}
}

View File

@ -38,16 +38,24 @@ public class ClockMock implements Clock {
return TimeValue.timeValueMillis(new Duration(time, now).getMillis());
}
public void setTime(DateTime now) {
public ClockMock setTime(DateTime now) {
this.now = now;
return this;
}
public void fastForward(TimeValue timeValue) {
setTime(now.plusMillis((int) timeValue.millis()));
public ClockMock fastForward(TimeValue timeValue) {
return setTime(now.plusMillis((int) timeValue.millis()));
}
public void rewind(TimeValue timeValue) {
setTime(now.minusMillis((int) timeValue.millis()));
public ClockMock fastForwardSeconds(int seconds) {
return fastForward(TimeValue.timeValueSeconds(seconds));
}
public ClockMock rewind(TimeValue timeValue) {
return setTime(now.minusMillis((int) timeValue.millis()));
}
public ClockMock rewindSeconds(int seconds) {
return rewind(TimeValue.timeValueSeconds(seconds));
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsPlugin;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.actions.email.service.Authentication;
@ -20,7 +21,12 @@ import org.elasticsearch.alerts.actions.webhook.HttpClient;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.history.HistoryStore;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.scheduler.SchedulerMock;
import org.elasticsearch.alerts.scheduler.schedule.Schedule;
import org.elasticsearch.alerts.scheduler.schedule.Schedules;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.clock.ClockMock;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.support.template.Template;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
@ -30,11 +36,13 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.common.netty.util.internal.SystemPropertyUtil;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
@ -60,15 +68,43 @@ import static org.hamcrest.core.IsNot.not;
@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false)
public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegrationTest {
private static final boolean timeWarpEnabled = SystemPropertyUtil.getBoolean("tests.timewarp", true);
private TimeWarp timeWarp;
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("scroll.size", randomIntBetween(1, 100))
.put("plugin.types", AlertsPlugin.class.getName())
.put("plugin.types", timeWarped() ? TimeWarpedAlertsPlugin.class.getName() : AlertsPlugin.class.getName())
.build();
}
/**
* @return whether the test suite should run in time warp mode. By default this will be determined globally
* to all test suites based on {@code -Dtests.timewarp} system property (when missing, defaults to
* {@code true}). If a test suite requires to force the mode or force not running under this mode
* this method can be overridden.
*/
protected boolean timeWarped() {
return timeWarpEnabled;
}
@Before
public void setupTimeWarp() throws Exception {
if (timeWarped()) {
timeWarp = new TimeWarp(
internalTestCluster().getInstance(SchedulerMock.class, internalTestCluster().getMasterName()),
internalTestCluster().getInstance(ClockMock.class, internalTestCluster().getMasterName()));
}
}
protected TimeWarp timeWarp() {
assert timeWarped() : "cannot access TimeWarp when test context is not time warped";
return timeWarp;
}
public boolean randomizeNumberOfShardsAndReplicas() {
return false;
}
@ -103,6 +139,10 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr
stopAlerting();
}
protected long docCount(String index, String type, QueryBuilder query) {
return docCount(index, type, SearchSourceBuilder.searchSource().query(query));
}
protected long docCount(String index, String type, SearchSourceBuilder source) {
SearchRequestBuilder builder = client().prepareSearch(index).setSearchType(SearchType.COUNT);
if (type != null) {
@ -116,12 +156,20 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr
return createAlertSource(cron, conditionRequest, conditionScript, null);
}
protected BytesReference createAlertSource(Schedule schedule, SearchRequest conditionRequest, String conditionScript) throws IOException {
return createAlertSource(schedule, conditionRequest, conditionScript, null);
}
protected BytesReference createAlertSource(String cron, SearchRequest conditionRequest, String conditionScript, Map<String,Object> metadata) throws IOException {
return createAlertSource(Schedules.cron(cron), conditionRequest, conditionScript, metadata);
}
protected BytesReference createAlertSource(Schedule schedule, SearchRequest conditionRequest, String conditionScript, Map<String,Object> metadata) throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();
{
builder.startObject("schedule")
.field("cron", cron)
.field(schedule.type(), schedule)
.endObject();
if (metadata != null) {
@ -162,6 +210,14 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr
return builder.bytes();
}
protected Alert.Parser alertParser() {
return internalTestCluster().getInstance(Alert.Parser.class, internalTestCluster().getMasterName());
}
protected Scheduler scheduler() {
return internalTestCluster().getInstance(Scheduler.class, internalTestCluster().getMasterName());
}
protected AlertsClient alertClient() {
return internalTestCluster().getInstance(AlertsClient.class);
}
@ -225,7 +281,7 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.EXECUTED.id())))
.get();
assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed));
assertThat("could not find executed fired alert", searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed));
if (assertConditionMet) {
assertThat((Integer) XContentMapValues.extractValue("alert_execution.input_result.search.payload.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1));
}
@ -444,4 +500,23 @@ public abstract class AbstractAlertsIntegrationTests extends ElasticsearchIntegr
}
}
protected static class TimeWarp {
protected final SchedulerMock scheduler;
protected final ClockMock clock;
public TimeWarp(SchedulerMock scheduler, ClockMock clock) {
this.scheduler = scheduler;
this.clock = clock;
}
public SchedulerMock scheduler() {
return scheduler;
}
public ClockMock clock() {
return clock;
}
}
}

View File

@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.test;
import org.elasticsearch.alerts.AlertsPlugin;
import org.elasticsearch.alerts.history.AlertsExecutor;
import org.elasticsearch.alerts.history.HistoryModule;
import org.elasticsearch.alerts.scheduler.SchedulerMock;
import org.elasticsearch.alerts.scheduler.SchedulerModule;
import org.elasticsearch.alerts.support.clock.Clock;
import org.elasticsearch.alerts.support.clock.ClockMock;
import org.elasticsearch.alerts.support.clock.ClockModule;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
*
*/
public class TimeWarpedAlertsPlugin extends AlertsPlugin {
public TimeWarpedAlertsPlugin(Settings settings) {
super(settings);
Loggers.getLogger(TimeWarpedAlertsPlugin.class, settings).info("using time warped alerts plugin");
}
@Override
public Collection<Class<? extends Module>> modules() {
return ImmutableList.<Class<? extends Module>>of(AlertsModule.class);
}
/**
*
*/
public static class AlertsModule extends org.elasticsearch.alerts.AlertsModule {
@Override
public Iterable<? extends Module> spawnModules() {
List<Module> modules = new ArrayList<>();
for (Module module : super.spawnModules()) {
if (module instanceof SchedulerModule) {
// replacing scheduler module so we'll
// have control on when it fires a job
modules.add(new MockSchedulerModule());
} else if (module instanceof ClockModule) {
// replacing the clock module so we'll be able
// to control time in tests
modules.add(new MockClockModule());
} else if (module instanceof HistoryModule) {
// replacing the history module so all the alerts will be
// executed on the same thread as the schedule fire
modules.add(new MockHistoryModule());
} else {
modules.add(module);
}
}
return modules;
}
public static class MockSchedulerModule extends SchedulerModule {
public MockSchedulerModule() {
super(SchedulerMock.class);
}
}
public static class MockClockModule extends ClockModule {
@Override
protected void configure() {
bind(ClockMock.class).asEagerSingleton();
bind(Clock.class).to(ClockMock.class);
}
}
public static class MockHistoryModule extends HistoryModule {
public MockHistoryModule() {
super(SameThreadExecutor.class);
}
public static class SameThreadExecutor implements AlertsExecutor {
@Override
public BlockingQueue queue() {
return new ArrayBlockingQueue(1);
}
@Override
public long largestPoolSize() {
return 1;
}
@Override
public void execute(Runnable runnable) {
runnable.run();
}
}
}
}
}

View File

@ -41,15 +41,20 @@ public class AlertMetadataTests extends AbstractAlertsIntegrationTests {
metaList.add("test");
metadata.put("baz", metaList);
alertClient().preparePutAlert("1")
alertClient().preparePutAlert("_name")
.source(alertSourceBuilder()
.schedule(cron("0/5 * * * * ? *"))
.input(searchInput(AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(matchAllQuery()))))
.condition(scriptCondition("ctx.payload.hits.total == 1"))
.metadata(metadata))
.get();
// Wait for a no action entry to be added. (the condition search request will not match, because there are no docs in my-index)
assertAlertWithNoActionNeeded("1", 1);
if (timeWarped()) {
timeWarp().scheduler().fire("_name");
} else {
// Wait for a no action entry to be added. (the condition search request will not match, because there are no docs in my-index)
assertAlertWithNoActionNeeded("_name", 1);
}
refresh();
SearchResponse searchResponse = client().prepareSearch(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")

View File

@ -6,7 +6,9 @@
package org.elasticsearch.alerts.test.integration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.*;
import org.elasticsearch.alerts.AlertsBuild;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.AlertsVersion;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.test.AbstractAlertsIntegrationTests;
import org.elasticsearch.alerts.test.AlertsTestUtils;
@ -14,7 +16,6 @@ import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsRequest;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;
@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
@ -41,7 +43,7 @@ public class AlertStatsTests extends AbstractAlertsIntegrationTests {
assertThat(response.getAlertManagerStarted(), is(AlertsService.State.STARTED));
assertThat(response.getAlertActionManagerQueueSize(), is(0L));
assertThat(response.getNumberOfRegisteredAlerts(), is(0L));
assertThat(response.getAlertActionManagerLargestQueueSize(), is(0L));
assertThat(response.getAlertActionManagerLargestQueueSize(), is(timeWarped() ? 1L : 0L));
assertThat(response.getVersion(), is(AlertsVersion.CURRENT));
assertThat(response.getBuild(), is(AlertsBuild.CURRENT));
}
@ -56,21 +58,24 @@ public class AlertStatsTests extends AbstractAlertsIntegrationTests {
assertThat(response.isAlertActionManagerStarted(), is(true));
assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED));
SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("* * * * * ? *", searchRequest, "ctx.payload.hits.total == 1");
alertClient().preparePutAlert("testAlert")
alertClient().preparePutAlert("_name")
.source(alertSource)
.get();
response = alertClient().alertsStats(alertsStatsRequest).actionGet();
if (timeWarped()) {
timeWarp().scheduler().fire("_name", 30, TimeValue.timeValueSeconds(1));
} else {
//Wait a little until we should have queued an action
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
}
//Wait a little until we should have queued an action
TimeValue waitTime = new TimeValue(30, TimeUnit.SECONDS);
Thread.sleep(waitTime.getMillis());
response = alertClient().alertsStats(alertsStatsRequest).actionGet();
assertThat(response.isAlertActionManagerStarted(), is(true));
assertThat(response.getAlertManagerStarted(), is(AlertsService.State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), is(1L));
//assertThat(response.getAlertActionManagerLargestQueueSize(), greaterThan(0L));
assertThat(response.getAlertActionManagerLargestQueueSize(), greaterThan(0L));
}
}

View File

@ -5,11 +5,8 @@
*/
package org.elasticsearch.alerts.test.integration;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.actions.ActionBuilders;
import org.elasticsearch.alerts.client.AlertsClient;
@ -19,11 +16,8 @@ import org.elasticsearch.alerts.test.AbstractAlertsIntegrationTests;
import org.elasticsearch.alerts.transport.actions.ack.AckAlertResponse;
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
@ -31,11 +25,12 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.alerts.client.AlertSourceBuilder.alertSourceBuilder;
import static org.elasticsearch.alerts.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.alerts.input.InputBuilders.searchInput;
import static org.elasticsearch.alerts.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.alerts.scheduler.schedule.Schedules.cron;
import static org.elasticsearch.alerts.scheduler.schedule.Schedules.interval;
import static org.elasticsearch.alerts.test.AlertsTestUtils.matchAllRequest;
import static org.elasticsearch.alerts.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.IsEqual.equalTo;
@ -44,145 +39,148 @@ import static org.hamcrest.core.IsEqual.equalTo;
public class AlertThrottleTests extends AbstractAlertsIntegrationTests {
@Test
public void testAckThrottle() throws Exception{
public void testAckThrottle() throws Exception {
AlertsClient alertsClient = alertClient();
createIndex("action-index", "test-index");
ensureGreen("action-index", "test-index");
createIndex("actions", "events");
ensureGreen("actions", "events");
IndexResponse dummyEventIndexResponse = client().prepareIndex("test-index", "test-type").setSource( XContentFactory.jsonBuilder().startObject().field("test_field", "error").endObject()).get();
assertTrue(dummyEventIndexResponse.isCreated());
IndexResponse eventIndexResponse = client().prepareIndex("events", "event")
.setSource("level", "error")
.get();
assertThat(eventIndexResponse.isCreated(), is(true));
refresh();
PutAlertResponse putAlertResponse = alertsClient.preparePutAlert()
.alertName("throttled-alert")
.alertName("_name")
.source(alertSourceBuilder()
.schedule(cron("0/5 * * * * ? *"))
.input(searchInput(matchAllRequest().indices("test-index")))
.input(searchInput(matchAllRequest().indices("events")))
.condition(scriptCondition("ctx.payload.hits.total > 0"))
.transform(searchTransform(matchAllRequest().indices("test-index")))
.addAction(ActionBuilders.indexAction("action-index", "action-type"))
.throttlePeriod(TimeValue.timeValueMillis(0)))
.transform(searchTransform(matchAllRequest().indices("events")))
.addAction(ActionBuilders.indexAction("actions", "action")))
.get();
assertTrue(putAlertResponse.indexResponse().isCreated());
assertThat(putAlertResponse.indexResponse().isCreated(), is(true));
Thread.sleep(20000);
AckAlertResponse ackResponse = alertsClient.prepareAckAlert("throttled-alert").get();
Assert.assertEquals(Alert.Status.AckStatus.State.ACKED, ackResponse.getStatus().ackStatus().state());
if (timeWarped()) {
timeWarp().scheduler().fire("_name", 4, TimeValue.timeValueSeconds(5));
} else {
Thread.sleep(20000);
}
AckAlertResponse ackResponse = alertsClient.prepareAckAlert("_name").get();
assertThat(ackResponse.getStatus().ackStatus().state(), is(Alert.Status.AckStatus.State.ACKED));
refresh();
SearchResponse searchResponse = client()
.prepareSearch("action-index")
.setTypes("action-type")
.setSearchType(SearchType.COUNT)
.setSource(searchSource().query(matchAllQuery()).buildAsBytes())
.get();
long countAfterAck = searchResponse.getHits().getTotalHits();
long countAfterAck = docCount("actions", "action", matchAllQuery());
assertThat(countAfterAck, greaterThanOrEqualTo((long) 1));
Thread.sleep(20000);
if (timeWarped()) {
timeWarp().scheduler().fire("_name", 4, TimeValue.timeValueSeconds(5));
} else {
Thread.sleep(20000);
}
refresh();
searchResponse = client()
.prepareSearch("action-index")
.setTypes("action-type")
.setSearchType(SearchType.COUNT)
.setSource(searchSource().query(matchAllQuery()).buildAsBytes())
.get();
long countAfterSleep = searchResponse.getHits().getTotalHits();
assertThat("There shouldn't be more entries in the index after we ack the alert", countAfterAck, equalTo(countAfterSleep));
// There shouldn't be more actions in the index after we ack the alert, even though the alert was fired
long countAfterPostAckFires = docCount("actions", "action", matchAllQuery());
assertThat(countAfterPostAckFires, equalTo(countAfterAck));
//Now delete the event and the ack state should change to AWAITS_EXECUTION
DeleteResponse response = client().prepareDelete("test-index", "test-type", dummyEventIndexResponse.getId()).get();
assertTrue(response.isFound());
DeleteResponse response = client().prepareDelete("events", "event", eventIndexResponse.getId()).get();
assertThat(response.isFound(), is(true));
refresh();
Thread.sleep(20000);
GetAlertResponse getAlertResponse = alertsClient.prepareGetAlert("throttled-alert").get();
assertTrue(getAlertResponse.getResponse().isExists());
if (timeWarped()) {
timeWarp().scheduler().fire("_name", 4, TimeValue.timeValueSeconds(5));
} else {
Thread.sleep(20000);
}
final Alert.Parser alertParser =
internalTestCluster().getInstance(Alert.Parser.class, internalTestCluster().getMasterName());
GetAlertResponse getAlertResponse = alertsClient.prepareGetAlert("_name").get();
assertThat(getAlertResponse.getResponse().isExists(), is(true));
Alert parsedAlert = alertParser.parse(getAlertResponse.getResponse().getId(), true,
Alert parsedAlert = alertParser().parse(getAlertResponse.getResponse().getId(), true,
getAlertResponse.getResponse().getSourceAsBytesRef());
assertThat(parsedAlert.status().ackStatus().state(), equalTo(Alert.Status.AckStatus.State.AWAITS_EXECUTION));
assertThat(parsedAlert.status().ackStatus().state(), is(Alert.Status.AckStatus.State.AWAITS_EXECUTION));
CountResponse countOfThrottledActions = client()
.prepareCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id()))
.get();
assertThat(countOfThrottledActions.getCount(), greaterThan(0L));
long throttledCount = docCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*", null,
matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id()));
assertThat(throttledCount, greaterThan(0L));
}
@Test
public void testTimeThrottle() throws Exception {
AlertsClient alertsClient = alertClient();
createIndex("action-index", "test-index");
ensureGreen("action-index", "test-index");
createIndex("actions", "events");
ensureGreen("actions", "events");
IndexResponse dummyEventIndexResponse = client().prepareIndex("test-index", "test-type").setSource(XContentFactory.jsonBuilder().startObject().field("test_field", "error").endObject()).get();
assertTrue(dummyEventIndexResponse.isCreated());
IndexResponse eventIndexResponse = client().prepareIndex("events", "event")
.setSource("level", "error")
.get();
assertTrue(eventIndexResponse.isCreated());
refresh();
PutAlertResponse putAlertResponse = alertsClient.preparePutAlert()
.alertName("throttled-alert")
.alertName("_name")
.source(alertSourceBuilder()
.schedule(cron("0/5 * * * * ? *"))
.input(searchInput(matchAllRequest().indices("test-index")))
.schedule(interval("5s"))
.input(searchInput(matchAllRequest().indices("events")))
.condition(scriptCondition("ctx.payload.hits.total > 0"))
.transform(searchTransform(matchAllRequest().indices("test-index")))
.addAction(ActionBuilders.indexAction("action-index", "action-type"))
.transform(searchTransform(matchAllRequest().indices("events")))
.addAction(ActionBuilders.indexAction("actions", "action"))
.throttlePeriod(TimeValue.timeValueSeconds(10)))
.get();
assertTrue(putAlertResponse.indexResponse().isCreated());
assertThat(putAlertResponse.indexResponse().isCreated(), is(true));
forceFullSleepTime(new TimeValue(5, TimeUnit.SECONDS));
refresh();
CountResponse countResponse = client()
.prepareCount("action-index")
.setTypes("action-type")
.setSource(searchSource().query(matchAllQuery()).buildAsBytes())
.get();
if (timeWarped()) {
timeWarp().clock().setTime(DateTime.now());
if (countResponse.getCount() != 1){
SearchResponse actionResponse = client().prepareSearch(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(matchAllQuery())
.get();
for (SearchHit hit : actionResponse.getHits()) {
logger.info("Got action hit [{}]", hit.getSourceRef().toUtf8());
}
}
timeWarp().scheduler().fire("_name");
refresh();
assertThat(countResponse.getCount(), greaterThanOrEqualTo(1L));
assertThat(countResponse.getCount(), lessThanOrEqualTo(3L));
// the first fire should work
long actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
forceFullSleepTime(new TimeValue(20, TimeUnit.SECONDS));
timeWarp().clock().fastForwardSeconds(5);
timeWarp().scheduler().fire("_name");
refresh();
refresh();
countResponse = client()
.prepareCount("action-index")
.setTypes("action-type")
.setSource(searchSource().query(matchAllQuery()).buildAsBytes())
.get();
assertThat(countResponse.getCount(), greaterThanOrEqualTo(2L));
assertThat(countResponse.getCount(), lessThanOrEqualTo(4L));
// the last fire should have been throttled, so number of actions shouldn't change
actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
timeWarp().clock().fastForwardSeconds(10);
timeWarp().scheduler().fire("_name");
refresh();
CountResponse countOfThrottledActions = client()
.prepareCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id()))
.get();
assertThat(countOfThrottledActions.getCount(), greaterThan(0L));
}
// the last fire occurred passed the throttle period, so a new action should have been added
actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(2L));
private void forceFullSleepTime(TimeValue value){
long start = System.currentTimeMillis();
while(System.currentTimeMillis() < start + value.getMillis()){
try{
Thread.sleep(value.getMillis() - (System.currentTimeMillis() - start));
} catch (InterruptedException ie) {
logger.error("interrupted", ie);
}
long throttledCount = docCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*", null,
matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id()));
assertThat(throttledCount, is(1L));
} else {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
refresh();
// the first fire should work so we should have a single action in the actions index
long actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
// we should still be within the throttling period... so the number of actions shouldn't change
actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
long throttledCount = docCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*", null,
matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.id()));
assertThat(throttledCount, greaterThanOrEqualTo(1L));
}
}

View File

@ -12,10 +12,7 @@ import org.elasticsearch.alerts.client.AlertSourceBuilder;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.scheduler.schedule.IntervalSchedule;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Variables;
import org.elasticsearch.alerts.test.AbstractAlertsIntegrationTests;
import org.elasticsearch.alerts.test.AlertsTestUtils;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
@ -25,22 +22,21 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import java.util.Locale;
import static org.elasticsearch.alerts.actions.ActionBuilders.indexAction;
import static org.elasticsearch.alerts.client.AlertSourceBuilder.alertSourceBuilder;
import static org.elasticsearch.alerts.condition.ConditionBuilders.scriptCondition;
import static org.elasticsearch.alerts.input.InputBuilders.searchInput;
import static org.elasticsearch.alerts.scheduler.schedule.Schedules.cron;
import static org.elasticsearch.alerts.scheduler.schedule.Schedules.interval;
import static org.elasticsearch.alerts.support.Variables.*;
import static org.elasticsearch.alerts.test.AlertsTestUtils.newInputSearchRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
/**
*/
@ -49,19 +45,26 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests {
@Test
public void testIndexAlert() throws Exception {
AlertsClient alertsClient = alertClient();
createIndex("my-index");
createIndex("idx");
// Have a sample document in the index, the alert is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
alertsClient.preparePutAlert("my-first-alert")
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value")));
alertsClient.preparePutAlert("_name")
.source(alertSourceBuilder()
.schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))
.input(searchInput(searchRequest))
.condition(scriptCondition("ctx.payload.hits.total == 1")))
.get();
assertAlertWithMinimumPerformedActionsCount("my-first-alert", 1);
GetAlertResponse getAlertResponse = alertClient().prepareGetAlert().setAlertName("my-first-alert").get();
if (timeWarped()) {
timeWarp().scheduler().fire("_name");
refresh();
}
assertAlertWithMinimumPerformedActionsCount("_name", 1);
GetAlertResponse getAlertResponse = alertClient().prepareGetAlert().setAlertName("_name").get();
assertThat(getAlertResponse.getResponse().isExists(), is(true));
assertThat(getAlertResponse.getResponse().isSourceEmpty(), is(false));
}
@ -69,27 +72,39 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests {
@Test
public void testIndexAlert_registerAlertBeforeTargetIndex() throws Exception {
AlertsClient alertsClient = alertClient();
SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
alertsClient.preparePutAlert("my-first-alert")
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(termQuery("field", "value")));
alertsClient.preparePutAlert("_name")
.source(alertSourceBuilder()
.schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))
.input(searchInput(searchRequest))
.condition(scriptCondition("ctx.payload.hits.total == 1")))
.get();
if (timeWarped()) {
timeWarp().scheduler().fire("_name");
refresh();
}
// The alert's condition won't meet because there is no data that matches with the query
assertAlertWithNoActionNeeded("my-first-alert", 1);
assertAlertWithNoActionNeeded("_name", 1);
// Index sample doc after we register the alert and the alert's condition should meet
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
assertAlertWithMinimumPerformedActionsCount("my-first-alert", 1);
client().prepareIndex("idx", "type").setSource("field", "value").get();
refresh();
if (timeWarped()) {
timeWarp().scheduler().fire("_name");
refresh();
}
assertAlertWithMinimumPerformedActionsCount("_name", 1);
}
@Test
public void testDeleteAlert() throws Exception {
AlertsClient alertsClient = alertClient();
SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
PutAlertResponse indexResponse = alertsClient.preparePutAlert("my-first-alert")
SearchRequest searchRequest = newInputSearchRequest("idx").source(searchSource().query(matchAllQuery()));
PutAlertResponse indexResponse = alertsClient.preparePutAlert("_name")
.source(alertSourceBuilder()
.schedule(interval(5, IntervalSchedule.Interval.Unit.SECONDS))
.input(searchInput(searchRequest))
@ -97,47 +112,46 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests {
.get();
assertThat(indexResponse.indexResponse().isCreated(), is(true));
// TODO: when MockScheduler can be used this workaround can be removed:
// Although there is no added benefit in this test for waiting for the alert to fire, however
// we need to wait here because of a test timing issue. When we tear down a test we delete the alert and delete all
// indices, but there may still be inflight fired alerts, which may trigger the alert history to be created again, before
// we finished the tear down phase.
assertAlertWithNoActionNeeded("my-first-alert", 1);
if (!timeWarped()) {
// Although there is no added benefit in this test for waiting for the alert to fire, however
// we need to wait here because of a test timing issue. When we tear down a test we delete the alert and delete all
// indices, but there may still be inflight fired alerts, which may trigger the alert history to be created again, before
// we finished the tear down phase.
assertAlertWithNoActionNeeded("_name", 1);
}
DeleteAlertRequest deleteAlertRequest = new DeleteAlertRequest("my-first-alert");
DeleteAlertResponse deleteAlertResponse = alertsClient.deleteAlert(deleteAlertRequest).actionGet();
assertNotNull(deleteAlertResponse.deleteResponse());
assertTrue(deleteAlertResponse.deleteResponse().isFound());
DeleteAlertResponse deleteAlertResponse = alertsClient.prepareDeleteAlert("_name").get();
assertThat(deleteAlertResponse.deleteResponse(), notNullValue());
assertThat(deleteAlertResponse.deleteResponse().isFound(), is(true));
refresh();
assertHitCount(client().prepareCount(AlertsStore.ALERT_INDEX).get(), 0l);
// Deleting the same alert for the second time
deleteAlertRequest = new DeleteAlertRequest("my-first-alert");
deleteAlertResponse = alertsClient.deleteAlert(deleteAlertRequest).actionGet();
assertNotNull(deleteAlertResponse.deleteResponse());
assertFalse(deleteAlertResponse.deleteResponse().isFound());
deleteAlertResponse = alertsClient.prepareDeleteAlert("_name").get();
assertThat(deleteAlertResponse.deleteResponse(), notNullValue());
assertThat(deleteAlertResponse.deleteResponse().isFound(), is(false));
}
@Test
public void testMalformedAlert() throws Exception {
AlertsClient alertsClient = alertClient();
createIndex("my-index");
createIndex("idx");
// Have a sample document in the index, the alert is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
client().prepareIndex("idx", "type").setSource("field", "value").get();
XContentBuilder alertSource = jsonBuilder();
alertSource.startObject();
alertSource.field("malformed_field", "x");
alertSource.field("unknown_field", "x");
alertSource.startObject("schedule").field("cron", "0/5 * * * * ? *").endObject();
alertSource.startObject("condition").startObject("script").field("script", "return true").field("request");
AlertUtils.writeSearchRequest(AlertsTestUtils.newInputSearchRequest(), alertSource, ToXContent.EMPTY_PARAMS);
AlertUtils.writeSearchRequest(newInputSearchRequest(), alertSource, ToXContent.EMPTY_PARAMS);
alertSource.endObject();
alertSource.endObject();
try {
alertsClient.preparePutAlert("my-first-alert")
alertsClient.preparePutAlert("_name")
.source(alertSource.bytes())
.get();
fail();
@ -145,7 +159,7 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests {
// In AlertStore we fail parsing if an alert contains undefined fields.
}
try {
client().prepareIndex(AlertsStore.ALERT_INDEX, AlertsStore.ALERT_TYPE, "my-first-alert")
client().prepareIndex(AlertsStore.ALERT_INDEX, AlertsStore.ALERT_TYPE, "_name")
.setSource(alertSource)
.get();
fail();
@ -156,98 +170,143 @@ public class BasicAlertsTests extends AbstractAlertsIntegrationTests {
@Test
public void testModifyAlerts() throws Exception {
SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index")
SearchRequest searchRequest = newInputSearchRequest("idx")
.source(searchSource().query(matchAllQuery()));
AlertSourceBuilder source = alertSourceBuilder()
.schedule(cron("0/5 * * * * ? *"))
.schedule(interval("5s"))
.input(searchInput(searchRequest))
.addAction(indexAction("my-index", "trail"));
.addAction(indexAction("idx", "action"));
alertClient().preparePutAlert("1")
alertClient().preparePutAlert("_name")
.source(source.condition(scriptCondition("ctx.payload.hits.total == 1")))
.get();
assertAlertWithMinimumPerformedActionsCount("1", 0, false);
alertClient().preparePutAlert("1")
if (timeWarped()) {
timeWarp().scheduler().fire("_name");
refresh();
}
assertAlertWithMinimumPerformedActionsCount("_name", 0, false);
alertClient().preparePutAlert("_name")
.source(source.condition(scriptCondition("ctx.payload.hits.total == 0")))
.get();
assertAlertWithMinimumPerformedActionsCount("1", 1, false);
alertClient().preparePutAlert("1")
.source(source.schedule(cron("0/5 * * * * ? 2020")).condition(scriptCondition("ctx.payload.hits.total == 0")))
if (timeWarped()) {
timeWarp().scheduler().fire("_name");
refresh();
}
assertAlertWithMinimumPerformedActionsCount("_name", 1, false);
alertClient().preparePutAlert("_name")
.source(source.schedule(cron("0/1 * * * * ? 2020")).condition(scriptCondition("ctx.payload.hits.total == 0")))
.get();
Thread.sleep(5000);
long count = findNumberOfPerformedActions("1");
Thread.sleep(5000);
assertThat(count, equalTo(findNumberOfPerformedActions("1")));
}
if (timeWarped()) {
timeWarp().scheduler().fire("_name");
refresh();
} else {
Thread.sleep(1000);
}
private final SearchSourceBuilder searchSourceBuilder = searchSource().query(
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{" + Variables.SCHEDULED_FIRE_TIME + "}}||-30s").to("{{" + Variables.SCHEDULED_FIRE_TIME + "}}"))
);
long count = findNumberOfPerformedActions("_name");
if (timeWarped()) {
timeWarp().scheduler().fire("_name");
refresh();
} else {
Thread.sleep(1000);
}
assertThat(count, equalTo(findNumberOfPerformedActions("_name")));
}
@Test
public void testConditionSearchWithSource() throws Exception {
testConditionSearch(
AlertsTestUtils.newInputSearchRequest("my-index").source(searchSourceBuilder)
);
String variable = randomFrom(EXECUTION_TIME, SCHEDULED_FIRE_TIME, FIRE_TIME);
SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery(
matchQuery("level", "a"),
rangeFilter("_timestamp")
.from("{{" + variable + "}}||-30s")
.to("{{" + variable + "}}")));
testConditionSearch(newInputSearchRequest("events").source(searchSourceBuilder));
}
@Test
public void testConditionSearchWithIndexedTemplate() throws Exception {
String variable = randomFrom(EXECUTION_TIME, SCHEDULED_FIRE_TIME, FIRE_TIME);
SearchSourceBuilder searchSourceBuilder = searchSource().query(filteredQuery(
matchQuery("level", "a"),
rangeFilter("_timestamp")
.from("{{" + variable + "}}||-30s")
.to("{{" + variable + "}}")));
client().preparePutIndexedScript()
.setScriptLang("mustache")
.setId("my-template")
.setSource(jsonBuilder().startObject().field("template").value(searchSourceBuilder).endObject())
.get();
SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index");
refresh();
SearchRequest searchRequest = newInputSearchRequest("events");
searchRequest.templateName("my-template");
searchRequest.templateType(ScriptService.ScriptType.INDEXED);
testConditionSearch(searchRequest);
}
private void testConditionSearch(SearchRequest request) throws Exception {
long scheduleTimeInMs = 5000;
String alertName = "red-alert";
assertAcked(prepareCreate("my-index").addMapping("my-type", "_timestamp", "enabled=true", "event_type", "type=string"));
String alertName = "_name";
assertAcked(prepareCreate("events").addMapping("event", "_timestamp", "enabled=true", "level", "type=string"));
alertClient().prepareDeleteAlert(alertName).get();
alertClient().preparePutAlert(alertName)
.source(createAlertSource(String.format(Locale.ROOT, "0/%s * * * * ? *", (scheduleTimeInMs / 1000)), request, "return ctx.payload.hits.total >= 3"))
.source(createAlertSource(interval("5s"), request, "return ctx.payload.hits.total >= 3"))
.get();
long time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
client().prepareIndex("events", "event")
.setCreate(true)
.setSource("event_type", "a")
.setSource("level", "a")
.get();
client().prepareIndex("my-index", "my-type")
client().prepareIndex("events", "event")
.setCreate(true)
.setSource("event_type", "a")
.setSource("level", "a")
.get();
long timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
refresh();
if (timeWarped()) {
timeWarp().clock().fastForwardSeconds(5);
timeWarp().scheduler().fire(alertName);
refresh();
} else {
Thread.sleep(5000);
}
assertAlertWithNoActionNeeded(alertName, 1);
time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
client().prepareIndex("events", "event")
.setCreate(true)
.setSource("event_type", "b")
.setSource("level", "b")
.get();
timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
refresh();
if (timeWarped()) {
timeWarp().clock().fastForwardSeconds(5);
timeWarp().scheduler().fire(alertName);
refresh();
} else {
Thread.sleep(5000);
}
assertAlertWithNoActionNeeded(alertName, 2);
time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
client().prepareIndex("events", "event")
.setCreate(true)
.setSource("event_type", "a")
.setSource("level", "a")
.get();
timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
refresh();
if (timeWarped()) {
timeWarp().clock().fastForwardSeconds(5);
timeWarp().scheduler().fire(alertName);
refresh();
} else {
Thread.sleep(5000);
}
assertAlertWithMinimumPerformedActionsCount(alertName, 1);
}
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.alerts.test.integration;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.Alert;
@ -33,7 +32,6 @@ import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
@ -67,7 +65,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
startAlerting();
AlertsStatsResponse response = alertClient().prepareAlertsStats().get();
assertTrue(response.isAlertActionManagerStarted());
assertThat(response.isAlertActionManagerStarted(), is(true));
assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L));
}
@ -78,7 +76,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
ensureAlertingStarted();
AlertsStatsResponse response = alertClient().prepareAlertsStats().get();
assertTrue(response.isAlertActionManagerStarted());
assertThat(response.isAlertActionManagerStarted(), is(true));
assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L));
@ -114,13 +112,13 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setSource(jsonBuilder().value(firedAlert))
.get();
assertTrue(indexResponse.isCreated());
assertThat(indexResponse.isCreated(), is(true));
stopAlerting();
startAlerting();
response = alertClient().prepareAlertsStats().get();
assertTrue(response.isAlertActionManagerStarted());
assertThat(response.isAlertActionManagerStarted(), is(true));
assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L));
assertThat(response.getAlertActionManagerLargestQueueSize(), greaterThanOrEqualTo(1l));
@ -130,8 +128,8 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
@TestLogging("alerts.actions:DEBUG")
public void testBootStrapManyHistoryIndices() throws Exception {
DateTime now = new DateTime(DateTimeZone.UTC);
long numberOfAlertHistoryIndices = randomIntBetween(2,8);
long numberOfAlertHistoryEntriesPerIndex = randomIntBetween(5,10);
long numberOfAlertHistoryIndices = randomIntBetween(2, 8);
long numberOfAlertHistoryEntriesPerIndex = randomIntBetween(5, 10);
SearchRequest searchRequest = AlertsTestUtils.newInputSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
for (int i = 0; i < numberOfAlertHistoryIndices; i++) {
@ -141,10 +139,10 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
ensureGreen(actionHistoryIndex);
logger.info("Created index {}", actionHistoryIndex);
for (int j=0; j<numberOfAlertHistoryEntriesPerIndex; ++j){
for (int j = 0; j < numberOfAlertHistoryEntriesPerIndex; j++) {
Alert alert = new Alert(
"action-test-"+ i + " " + j,
"action-test-" + i + " " + j,
SystemClock.INSTANCE,
new CronSchedule("0/5 * * * * ? 2035"), //Set a cron schedule far into the future so this alert is never scheduled
new SearchInput(logger, scriptService(), ClientProxy.of(client()),
@ -159,7 +157,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
alert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
PutAlertResponse putAlertResponse = alertClient().preparePutAlert(alert.name()).source(jsonBuilder.bytes()).get();
assertTrue(putAlertResponse.indexResponse().isCreated());
assertThat(putAlertResponse.indexResponse().isCreated(), is(true));
FiredAlert firedAlert = new FiredAlert(alert, historyIndexDate, historyIndexDate);
@ -170,7 +168,7 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setSource(jsonBuilder2.bytes())
.get();
assertTrue(indexResponse.isCreated());
assertThat(indexResponse.isCreated(), is(true));
}
client().admin().indices().prepareRefresh(actionHistoryIndex).get();
}
@ -179,18 +177,16 @@ public class BootStrapTests extends AbstractAlertsIntegrationTests {
startAlerting();
AlertsStatsResponse response = alertClient().prepareAlertsStats().get();
assertTrue(response.isAlertActionManagerStarted());
assertThat(response.isAlertActionManagerStarted(), is(true));
assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED));
final long totalHistoryEntries = numberOfAlertHistoryEntriesPerIndex * numberOfAlertHistoryIndices ;
final long totalHistoryEntries = numberOfAlertHistoryEntriesPerIndex * numberOfAlertHistoryIndices;
assertBusy(new Runnable() {
@Override
public void run() {
CountResponse countResponse = client().prepareCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")
.setTypes(HistoryStore.ALERT_HISTORY_TYPE)
.setQuery(QueryBuilders.termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.EXECUTED.id())).get();
assertEquals(totalHistoryEntries, countResponse.getCount());
long count = docCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*", HistoryStore.ALERT_HISTORY_TYPE,
termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.EXECUTED.id()));
assertThat(count, is(totalHistoryEntries));
}
}, 30, TimeUnit.SECONDS);

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.alerts.test.integration;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.AlertsService;
@ -22,7 +23,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -39,11 +39,17 @@ import static org.hamcrest.core.Is.is;
/**
*/
@Slow
@ClusterScope(scope = TEST, numClientNodes = 0, transportClientRatio = 0, randomDynamicTemplates = false, numDataNodes = 0)
public class NoMasterNodeTests extends AbstractAlertsIntegrationTests {
private ClusterDiscoveryConfiguration.UnicastZen config;
@Override
protected boolean timeWarped() {
return false;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings settings = super.nodeSettings(nodeOrdinal);

View File

@ -22,11 +22,11 @@ import java.util.Map;
import static org.elasticsearch.alerts.actions.ActionBuilders.indexAction;
import static org.elasticsearch.alerts.client.AlertSourceBuilder.alertSourceBuilder;
import static org.elasticsearch.alerts.input.InputBuilders.searchInput;
import static org.elasticsearch.alerts.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.alerts.scheduler.schedule.Schedules.interval;
import static org.elasticsearch.alerts.transform.TransformBuilders.searchTransform;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.*;
/**
*/
@ -37,7 +37,7 @@ public class TransformSearchTests extends AbstractAlertsIntegrationTests {
createIndex("my-condition-index", "my-payload-index", "my-payload-output");
ensureGreen("my-condition-index", "my-payload-index", "my-payload-output");
index("my-payload-index","payload", "mytestresult");
index("my-payload-index", "payload", "mytestresult");
refresh();
SearchRequest inputRequest = AlertsTestUtils.newInputSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery()));
@ -57,7 +57,12 @@ public class TransformSearchTests extends AbstractAlertsIntegrationTests {
.metadata(metadata)
.throttlePeriod(TimeValue.timeValueSeconds(0)))
.get();
assertTrue(putAlertResponse.indexResponse().isCreated());
assertThat(putAlertResponse.indexResponse().isCreated(), is(true));
if (timeWarped()) {
timeWarp().scheduler().fire("test-payload");
refresh();
}
assertAlertWithMinimumPerformedActionsCount("test-payload", 1, false);
refresh();
@ -69,6 +74,6 @@ public class TransformSearchTests extends AbstractAlertsIntegrationTests {
SearchHit hit = searchResponse.getHits().getHits()[0];
String source = hit.getSourceRef().toUtf8();
assertTrue(source.contains("mytestresult"));
assertThat(source, containsString("mytestresult"));
}
}