mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-02 00:49:11 +00:00
Added tests for HistoryService#execute(...)
Original commit: elastic/x-pack-elasticsearch@b5c5fcf7f3
This commit is contained in:
parent
a2b71a94b7
commit
803fa4c4df
@ -111,10 +111,21 @@ public class HistoryService extends AbstractComponent {
|
||||
FiredAlert firedAlert = new FiredAlert(alert, scheduledFireTime, fireTime);
|
||||
logger.debug("adding fired alert [{}]", alert.name());
|
||||
historyStore.put(firedAlert);
|
||||
execute(firedAlert, alert);
|
||||
executeAsync(firedAlert, alert);
|
||||
}
|
||||
|
||||
void execute(FiredAlert firedAlert, Alert alert) {
|
||||
/*
|
||||
The execution of an alert is split into operations, a schedule part which just makes sure that store the fact an alert
|
||||
has fired and an execute part which actually executed the alert.
|
||||
|
||||
The reason this is split into two operations is that we don't want to lose the fact an alert has fired. If we
|
||||
would not split the execution of an alert and many alerts fire in a small window of time then it can happen that
|
||||
thread pool that receives fired jobs from the quartz scheduler is going to reject jobs and then we would never
|
||||
know about jobs that have fired. By splitting the execution of fired jobs into two operations we lower the chance
|
||||
we lose fired jobs signficantly.
|
||||
*/
|
||||
|
||||
void executeAsync(FiredAlert firedAlert, Alert alert) {
|
||||
try {
|
||||
alertsThreadPool().execute(new AlertExecutionTask(firedAlert, alert));
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
@ -124,6 +135,29 @@ public class HistoryService extends AbstractComponent {
|
||||
}
|
||||
}
|
||||
|
||||
AlertExecution execute(ExecutionContext ctx) throws IOException {
|
||||
Alert alert = ctx.alert();
|
||||
Input.Result inputResult = alert.input().execute(ctx);
|
||||
ctx.onInputResult(inputResult);
|
||||
Condition.Result conditionResult = alert.condition().execute(ctx);
|
||||
ctx.onConditionResult(conditionResult);
|
||||
|
||||
if (conditionResult.met()) {
|
||||
Throttler.Result throttleResult = alert.throttler().throttle(ctx);
|
||||
ctx.onThrottleResult(throttleResult);
|
||||
|
||||
if (!throttleResult.throttle()) {
|
||||
Transform.Result result = alert.transform().apply(ctx, inputResult.payload());
|
||||
ctx.onTransformResult(result);
|
||||
for (Action action : alert.actions()) {
|
||||
Action.Result actionResult = action.execute(ctx, result.payload());
|
||||
ctx.onActionResult(actionResult);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx.finish();
|
||||
}
|
||||
|
||||
void executePreviouslyFiredAlerts(HistoryStore.LoadResult loadResult) {
|
||||
if (loadResult != null) {
|
||||
int counter = 0;
|
||||
@ -133,7 +167,7 @@ public class HistoryService extends AbstractComponent {
|
||||
logger.warn("unable to find alert [{}] in alert store, perhaps it has been deleted. skipping...", firedAlert.name());
|
||||
continue;
|
||||
}
|
||||
execute(firedAlert, alert);
|
||||
executeAsync(firedAlert, alert);
|
||||
counter++;
|
||||
}
|
||||
logger.debug("executed [{}] not executed previous fired alerts from the alert history index ", counter);
|
||||
@ -212,39 +246,6 @@ public class HistoryService extends AbstractComponent {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
The execution of an alert is split into operations, a schedule part which just makes sure that store the fact an alert
|
||||
has fired and an execute part which actually executed the alert.
|
||||
|
||||
The reason this is split into two operations is that we don't want to lose the fact an alert has fired. If we
|
||||
would not split the execution of an alert and many alerts fire in a small window of time then it can happen that
|
||||
thread pool that receives fired jobs from the quartz scheduler is going to reject jobs and then we would never
|
||||
know about jobs that have fired. By splitting the execution of fired jobs into two operations we lower the chance
|
||||
we lose fired jobs signficantly.
|
||||
*/
|
||||
|
||||
AlertExecution execute(ExecutionContext ctx) throws IOException {
|
||||
|
||||
Input.Result inputResult = alert.input().execute(ctx);
|
||||
ctx.onInputResult(inputResult);
|
||||
Condition.Result conditionResult = alert.condition().execute(ctx);
|
||||
ctx.onConditionResult(conditionResult);
|
||||
|
||||
if (conditionResult.met()) {
|
||||
Throttler.Result throttleResult = alert.throttler().throttle(ctx);
|
||||
ctx.onThrottleResult(throttleResult);
|
||||
|
||||
if (!throttleResult.throttle()) {
|
||||
Transform.Result result = alert.transform().apply(ctx, inputResult.payload());
|
||||
ctx.onTransformResult(result);
|
||||
for (Action action : alert.actions()) {
|
||||
Action.Result actionResult = action.execute(ctx, result.payload());
|
||||
ctx.onActionResult(actionResult);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx.finish();
|
||||
}
|
||||
}
|
||||
|
||||
private class SchedulerListener implements Scheduler.Listener {
|
||||
|
@ -0,0 +1,187 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.alerts.history;
|
||||
|
||||
import org.elasticsearch.alerts.*;
|
||||
import org.elasticsearch.alerts.actions.Action;
|
||||
import org.elasticsearch.alerts.actions.Actions;
|
||||
import org.elasticsearch.alerts.condition.Condition;
|
||||
import org.elasticsearch.alerts.condition.simple.AlwaysFalseCondition;
|
||||
import org.elasticsearch.alerts.condition.simple.AlwaysTrueCondition;
|
||||
import org.elasticsearch.alerts.input.Input;
|
||||
import org.elasticsearch.alerts.scheduler.Scheduler;
|
||||
import org.elasticsearch.alerts.throttle.Throttler;
|
||||
import org.elasticsearch.alerts.transform.Transform;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HistoryServiceTests extends ElasticsearchTestCase {
|
||||
|
||||
private Payload payload;
|
||||
private Input input;
|
||||
|
||||
private HistoryService historyService;
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
payload = mock(Payload.class);
|
||||
input = mock(Input.class);
|
||||
Input.Result inputResult = mock(Input.Result.class);
|
||||
when(inputResult.payload()).thenReturn(payload);
|
||||
when(input.execute(any(ExecutionContext.class))).thenReturn(inputResult);
|
||||
|
||||
HistoryStore historyStore = mock(HistoryStore.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
AlertsStore alertsStore = mock(AlertsStore.class);
|
||||
AlertLockService alertLockService = mock(AlertLockService.class);
|
||||
Scheduler scheduler = mock(Scheduler.class);
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
historyService = new HistoryService(ImmutableSettings.EMPTY, historyStore, threadPool, alertsStore, alertLockService, scheduler, clusterService);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecute() throws Exception {
|
||||
Condition.Result conditionResult = AlwaysTrueCondition.RESULT;
|
||||
Throttler.Result throttleResult = Throttler.Result.NO;
|
||||
Transform.Result transformResult = mock(Transform.Result.class);
|
||||
when(transformResult.payload()).thenReturn(payload);
|
||||
Action.Result actionResult = mock(Action.Result.class);
|
||||
when(actionResult.type()).thenReturn("actionResult");
|
||||
|
||||
Condition condition = mock(Condition.class);
|
||||
when(condition.execute(any(ExecutionContext.class))).thenReturn(conditionResult);
|
||||
Throttler throttler = mock(Throttler.class);
|
||||
when(throttler.throttle(any(ExecutionContext.class))).thenReturn(throttleResult);
|
||||
Transform transform = mock(Transform.class);
|
||||
when(transform.apply(any(ExecutionContext.class), same(payload))).thenReturn(transformResult);
|
||||
Action action = mock(Action.class);
|
||||
when(action.execute(any(ExecutionContext.class), same(payload))).thenReturn(actionResult);
|
||||
Actions actions = new Actions(Arrays.asList(action));
|
||||
|
||||
Alert.Status alertStatus = new Alert.Status();
|
||||
Alert alert = mock(Alert.class);
|
||||
when(alert.input()).thenReturn(input);
|
||||
when(alert.condition()).thenReturn(condition);
|
||||
when(alert.throttler()).thenReturn(throttler);
|
||||
when(alert.transform()).thenReturn(transform);
|
||||
when(alert.actions()).thenReturn(actions);
|
||||
when(alert.status()).thenReturn(alertStatus);
|
||||
|
||||
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now());
|
||||
AlertExecution alertExecution = historyService.execute(context);
|
||||
assertThat(alertExecution.conditionResult(), sameInstance(conditionResult));
|
||||
assertThat(alertExecution.payload(), sameInstance(payload));
|
||||
assertThat(alertExecution.throttleResult(), sameInstance(throttleResult));
|
||||
assertThat(alertExecution.actionsResults().get("actionResult"), sameInstance(actionResult));
|
||||
|
||||
verify(condition, times(1)).execute(any(ExecutionContext.class));
|
||||
verify(throttler, times(1)).throttle(any(ExecutionContext.class));
|
||||
verify(transform, times(1)).apply(any(ExecutionContext.class), same(payload));
|
||||
verify(action, times(1)).execute(any(ExecutionContext.class), same(payload));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecute_throttled() throws Exception {
|
||||
Condition.Result conditionResult = AlwaysTrueCondition.RESULT;
|
||||
Throttler.Result throttleResult = mock(Throttler.Result.class);
|
||||
when(throttleResult.throttle()).thenReturn(true);
|
||||
|
||||
Transform.Result transformResult = mock(Transform.Result.class);
|
||||
when(transformResult.payload()).thenReturn(payload);
|
||||
Action.Result actionResult = mock(Action.Result.class);
|
||||
when(actionResult.type()).thenReturn("actionResult");
|
||||
|
||||
Condition condition = mock(Condition.class);
|
||||
when(condition.execute(any(ExecutionContext.class))).thenReturn(conditionResult);
|
||||
Throttler throttler = mock(Throttler.class);
|
||||
when(throttler.throttle(any(ExecutionContext.class))).thenReturn(throttleResult);
|
||||
Transform transform = mock(Transform.class);
|
||||
when(transform.apply(any(ExecutionContext.class), same(payload))).thenReturn(transformResult);
|
||||
Action action = mock(Action.class);
|
||||
when(action.execute(any(ExecutionContext.class), same(payload))).thenReturn(actionResult);
|
||||
Actions actions = new Actions(Arrays.asList(action));
|
||||
|
||||
Alert.Status alertStatus = new Alert.Status();
|
||||
Alert alert = mock(Alert.class);
|
||||
when(alert.input()).thenReturn(input);
|
||||
when(alert.condition()).thenReturn(condition);
|
||||
when(alert.throttler()).thenReturn(throttler);
|
||||
when(alert.transform()).thenReturn(transform);
|
||||
when(alert.actions()).thenReturn(actions);
|
||||
when(alert.status()).thenReturn(alertStatus);
|
||||
|
||||
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now());
|
||||
AlertExecution alertExecution = historyService.execute(context);
|
||||
assertThat(alertExecution.conditionResult(), sameInstance(conditionResult));
|
||||
assertThat(alertExecution.payload(), sameInstance(payload));
|
||||
assertThat(alertExecution.throttleResult(), sameInstance(throttleResult));
|
||||
assertThat(alertExecution.actionsResults().isEmpty(), is(true));
|
||||
|
||||
verify(condition, times(1)).execute(any(ExecutionContext.class));
|
||||
verify(throttler, times(1)).throttle(any(ExecutionContext.class));
|
||||
verify(transform, never()).apply(any(ExecutionContext.class), same(payload));
|
||||
verify(action, never()).execute(any(ExecutionContext.class), same(payload));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecute_conditionNotMet() throws Exception {
|
||||
Condition.Result conditionResult = AlwaysFalseCondition.RESULT;
|
||||
Throttler.Result throttleResult = mock(Throttler.Result.class);
|
||||
when(throttleResult.throttle()).thenReturn(true);
|
||||
|
||||
Transform.Result transformResult = mock(Transform.Result.class);
|
||||
Action.Result actionResult = mock(Action.Result.class);
|
||||
when(actionResult.type()).thenReturn("actionResult");
|
||||
|
||||
Condition condition = mock(Condition.class);
|
||||
when(condition.execute(any(ExecutionContext.class))).thenReturn(conditionResult);
|
||||
Throttler throttler = mock(Throttler.class);
|
||||
when(throttler.throttle(any(ExecutionContext.class))).thenReturn(throttleResult);
|
||||
Transform transform = mock(Transform.class);
|
||||
when(transform.apply(any(ExecutionContext.class), same(payload))).thenReturn(transformResult);
|
||||
Action action = mock(Action.class);
|
||||
when(action.execute(any(ExecutionContext.class), same(payload))).thenReturn(actionResult);
|
||||
Actions actions = new Actions(Arrays.asList(action));
|
||||
|
||||
Alert.Status alertStatus = new Alert.Status();
|
||||
Alert alert = mock(Alert.class);
|
||||
when(alert.input()).thenReturn(input);
|
||||
when(alert.condition()).thenReturn(condition);
|
||||
when(alert.throttler()).thenReturn(throttler);
|
||||
when(alert.transform()).thenReturn(transform);
|
||||
when(alert.actions()).thenReturn(actions);
|
||||
when(alert.status()).thenReturn(alertStatus);
|
||||
|
||||
ExecutionContext context = new ExecutionContext("1", alert, DateTime.now(), DateTime.now());
|
||||
AlertExecution alertExecution = historyService.execute(context);
|
||||
assertThat(alertExecution.conditionResult(), sameInstance(conditionResult));
|
||||
assertThat(alertExecution.payload(), sameInstance(payload));
|
||||
assertThat(alertExecution.throttleResult(), nullValue());
|
||||
assertThat(alertExecution.actionsResults().isEmpty(), is(true));
|
||||
|
||||
verify(condition, times(1)).execute(any(ExecutionContext.class));
|
||||
verify(throttler, never()).throttle(any(ExecutionContext.class));
|
||||
verify(transform, never()).apply(any(ExecutionContext.class), same(payload));
|
||||
verify(action, never()).execute(any(ExecutionContext.class), same(payload));
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user