Watcher: Add dedicated acknowledged state (elastic/elasticsearch#4588)
In order to display a better UI page for the watch history, the state of a throttled watch should be visualized. However, right now there is no way to differentiate between a time throttled watch and a user acknowledged watch (or action). This commit introduces a new type in a throttled result, which in turn is used to set the execution state of a watch. Closes elastic/elasticsearch#4531 Original commit: elastic/x-pack-elasticsearch@b86e666e54
This commit is contained in:
parent
5e8dd26d93
commit
984b1b0dd1
|
@ -24,6 +24,7 @@ public interface Action extends ToXContentObject {
|
|||
SUCCESS,
|
||||
FAILURE,
|
||||
PARTIAL_FAILURE,
|
||||
ACKNOWLEDGED,
|
||||
THROTTLED,
|
||||
CONDITION_FAILED,
|
||||
SIMULATED;
|
||||
|
@ -98,6 +99,17 @@ public interface Action extends ToXContentObject {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* {@code Acknowledged} is a {@link StoppedResult} with a status of {@link Status#ACKNOWLEDGED} for actions that
|
||||
* have been throttled.
|
||||
*/
|
||||
public static class Acknowledged extends StoppedResult {
|
||||
|
||||
public Acknowledged(String type, String reason) {
|
||||
super(type, Status.ACKNOWLEDGED, reason);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@code ConditionFailed} is a {@link StoppedResult} with a status of {@link Status#FAILURE} for actions that have been skipped
|
||||
* because the action's condition failed (either expected or unexpected).
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.xpack.watcher.watch.Watch;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.time.Clock;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
||||
|
||||
|
@ -99,7 +100,11 @@ public class ActionWrapper implements ToXContentObject {
|
|||
if (!ctx.skipThrottling(id)) {
|
||||
Throttler.Result throttleResult = throttler.throttle(id, ctx);
|
||||
if (throttleResult.throttle()) {
|
||||
return new ActionWrapper.Result(id, new Action.Result.Throttled(action.type(), throttleResult.reason()));
|
||||
if (throttleResult.type() == Throttler.Type.ACK) {
|
||||
return new ActionWrapper.Result(id, new Action.Result.Acknowledged(action.type(), throttleResult.reason()));
|
||||
} else {
|
||||
return new ActionWrapper.Result(id, new Action.Result.Throttled(action.type(), throttleResult.reason()));
|
||||
}
|
||||
}
|
||||
}
|
||||
Condition.Result conditionResult = null;
|
||||
|
@ -284,19 +289,15 @@ public class ActionWrapper implements ToXContentObject {
|
|||
|
||||
Result result = (Result) o;
|
||||
|
||||
if (!id.equals(result.id)) return false;
|
||||
if (condition != null ? !condition.equals(result.condition) : result.condition != null) return false;
|
||||
if (transform != null ? !transform.equals(result.transform) : result.transform != null) return false;
|
||||
return action.equals(result.action);
|
||||
return Objects.equals(id, result.id) &&
|
||||
Objects.equals(condition, result.condition) &&
|
||||
Objects.equals(transform, result.transform) &&
|
||||
Objects.equals(action, result.action);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = id.hashCode();
|
||||
result = 31 * result + (condition != null ? condition.hashCode() : 0);
|
||||
result = 31 * result + (transform != null ? transform.hashCode() : 0);
|
||||
result = 31 * result + action.hashCode();
|
||||
return result;
|
||||
return Objects.hash(id, condition, transform, action);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.xpack.watcher.actions.ActionStatus;
|
|||
import org.elasticsearch.xpack.watcher.actions.ActionStatus.AckStatus;
|
||||
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.actions.throttler.Throttler.Type.ACK;
|
||||
import static org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils.formatDate;
|
||||
|
||||
public class AckThrottler implements Throttler {
|
||||
|
@ -18,7 +19,7 @@ public class AckThrottler implements Throttler {
|
|||
ActionStatus actionStatus = ctx.watch().status().actionStatus(actionId);
|
||||
AckStatus ackStatus = actionStatus.ackStatus();
|
||||
if (ackStatus.state() == AckStatus.State.ACKED) {
|
||||
return Result.throttle("action [{}] was acked at [{}]", actionId, formatDate(ackStatus.timestamp()));
|
||||
return Result.throttle(ACK, "action [{}] was acked at [{}]", actionId, formatDate(ackStatus.timestamp()));
|
||||
}
|
||||
return Result.NO;
|
||||
}
|
||||
|
|
|
@ -12,6 +12,8 @@ import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
|
|||
|
||||
import java.time.Clock;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.actions.throttler.Throttler.Type.LICENSE;
|
||||
|
||||
public class ActionThrottler implements Throttler {
|
||||
|
||||
private static final AckThrottler ACK_THROTTLER = new AckThrottler();
|
||||
|
@ -37,7 +39,7 @@ public class ActionThrottler implements Throttler {
|
|||
@Override
|
||||
public Result throttle(String actionId, WatchExecutionContext ctx) {
|
||||
if (licenseState.isWatcherAllowed() == false) {
|
||||
return Result.throttle("watcher license does not allow action execution");
|
||||
return Result.throttle(LICENSE, "watcher license does not allow action execution");
|
||||
}
|
||||
if (periodThrottler != null) {
|
||||
Result throttleResult = periodThrottler.throttle(actionId, ctx);
|
||||
|
|
|
@ -13,6 +13,8 @@ import org.joda.time.PeriodType;
|
|||
|
||||
import java.time.Clock;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.actions.throttler.Throttler.Type.PERIOD;
|
||||
|
||||
/**
|
||||
* This throttler throttles the action based on its last <b>successful</b> execution time. If the time passed since
|
||||
* the last successful execution is lower than the given period, the aciton will be throttled.
|
||||
|
@ -54,7 +56,7 @@ public class PeriodThrottler implements Throttler {
|
|||
}
|
||||
TimeValue timeElapsed = TimeValue.timeValueMillis(clock.millis() - status.lastSuccessfulExecution().timestamp().getMillis());
|
||||
if (timeElapsed.getMillis() <= period.getMillis()) {
|
||||
return Result.throttle("throttling interval is set to [{}] but time elapsed since last execution is [{}]",
|
||||
return Result.throttle(PERIOD, "throttling interval is set to [{}] but time elapsed since last execution is [{}]",
|
||||
period.format(periodType), timeElapsed.format(periodType));
|
||||
}
|
||||
return Result.NO;
|
||||
|
|
|
@ -9,24 +9,44 @@ import org.elasticsearch.common.ParseField;
|
|||
import org.elasticsearch.common.logging.LoggerMessageFormat;
|
||||
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.actions.throttler.Throttler.Type.NONE;
|
||||
|
||||
public interface Throttler {
|
||||
|
||||
Result throttle(String actionId, WatchExecutionContext ctx);
|
||||
|
||||
enum Type {
|
||||
// throttling happened because a user actively acknowledged the action, which means it is muted until the condition becomes false
|
||||
// the current implementation uses an implementation of a throttler to decide that an action should not be executed because
|
||||
// it has been acked/muted before
|
||||
ACK,
|
||||
|
||||
// throttling happened because of license reasons
|
||||
LICENSE,
|
||||
|
||||
// time based throttling for a certain period of time
|
||||
PERIOD,
|
||||
|
||||
// no throttling, used to indicate a not throttledresult
|
||||
NONE;
|
||||
}
|
||||
|
||||
class Result {
|
||||
|
||||
public static final Result NO = new Result(false, null);
|
||||
|
||||
public static final Result NO = new Result(NONE, false, null);
|
||||
|
||||
private Type type;
|
||||
private final boolean throttle;
|
||||
private final String reason;
|
||||
|
||||
private Result(boolean throttle, String reason) {
|
||||
private Result(Type type, boolean throttle, String reason) {
|
||||
this.type = type;
|
||||
this.throttle = throttle;
|
||||
this.reason = reason;
|
||||
}
|
||||
|
||||
public static Result throttle(String reason, Object... args) {
|
||||
return new Result(true, LoggerMessageFormat.format(reason, args));
|
||||
public static Result throttle(Type type, String reason, Object... args) {
|
||||
return new Result(type, true, LoggerMessageFormat.format(reason, args));
|
||||
}
|
||||
|
||||
public boolean throttle() {
|
||||
|
@ -37,6 +57,9 @@ public interface Throttler {
|
|||
return reason;
|
||||
}
|
||||
|
||||
public Type type() {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
|
||||
interface Field {
|
||||
|
|
|
@ -12,9 +12,12 @@ public enum ExecutionState {
|
|||
// the condition of the watch was not met
|
||||
EXECUTION_NOT_NEEDED,
|
||||
|
||||
// Execution has been throttled due to ack/time-based throttling
|
||||
// Execution has been throttled due to time-based throttling - this might only affect a single action though
|
||||
THROTTLED,
|
||||
|
||||
// Execution has been throttled due to ack-based throttling/muting of an action - this might only affect a single action though
|
||||
ACKNOWLEDGED,
|
||||
|
||||
// regular execution
|
||||
EXECUTED,
|
||||
|
||||
|
|
|
@ -80,7 +80,10 @@ public abstract class WatchRecord implements ToXContentObject {
|
|||
}
|
||||
if (executionResult.conditionResult().met()) {
|
||||
final Collection<ActionWrapper.Result> values = executionResult.actionsResults().values();
|
||||
if (values.stream().anyMatch((r) -> r.action().status() == Action.Result.Status.THROTTLED)) {
|
||||
// acknowledged as state wins because the user had explicitely set this, where as throttled may happen due to execution
|
||||
if (values.stream().anyMatch((r) -> r.action().status() == Action.Result.Status.ACKNOWLEDGED)) {
|
||||
return ExecutionState.ACKNOWLEDGED;
|
||||
} else if (values.stream().anyMatch((r) -> r.action().status() == Action.Result.Status.THROTTLED)) {
|
||||
return ExecutionState.THROTTLED;
|
||||
} else {
|
||||
return ExecutionState.EXECUTED;
|
||||
|
|
|
@ -37,6 +37,7 @@ public class AckThrottlerTests extends ESTestCase {
|
|||
Throttler.Result result = throttler.throttle("_action", ctx);
|
||||
assertThat(result.throttle(), is(true));
|
||||
assertThat(result.reason(), is("action [_action] was acked at [" + formatDate(timestamp) + "]"));
|
||||
assertThat(result.type(), is(Throttler.Type.ACK));
|
||||
}
|
||||
|
||||
public void testThrottleWhenAwaitsSuccessfulExecution() throws Exception {
|
||||
|
|
|
@ -87,7 +87,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase {
|
|||
responseMap = executeWatchRequestBuilder.get().getRecordSource().getAsMap();
|
||||
status = ObjectPath.eval("result.actions.0.status", responseMap);
|
||||
if (ack) {
|
||||
assertThat(status, equalTo(Action.Result.Status.THROTTLED.toString().toLowerCase(Locale.ROOT)));
|
||||
assertThat(status, equalTo(Action.Result.Status.ACKNOWLEDGED.toString().toLowerCase(Locale.ROOT)));
|
||||
} else {
|
||||
assertThat(status, equalTo(Action.Result.Status.SIMULATED.toString().toLowerCase(Locale.ROOT)));
|
||||
}
|
||||
|
@ -121,7 +121,7 @@ public class ActionThrottleTests extends AbstractWatcherIntegrationTestCase {
|
|||
List<Map<String, String>> actions = ObjectPath.eval("result.actions", responseMap);
|
||||
for (Map<String, String> result : actions) {
|
||||
if (ackingActions.contains(result.get("id"))) {
|
||||
assertThat(result.get("status"), equalTo(Action.Result.Status.THROTTLED.toString().toLowerCase(Locale.ROOT)));
|
||||
assertThat(result.get("status"), equalTo(Action.Result.Status.ACKNOWLEDGED.toString().toLowerCase(Locale.ROOT)));
|
||||
} else {
|
||||
assertThat(result.get("status"), equalTo(Action.Result.Status.SIMULATED.toString().toLowerCase(Locale.ROOT)));
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ public class PeriodThrottlerTests extends ESTestCase {
|
|||
assertThat(result.throttle(), is(true));
|
||||
assertThat(result.reason(), notNullValue());
|
||||
assertThat(result.reason(), startsWith("throttling interval is set to [" + period.format(periodType) + "]"));
|
||||
assertThat(result.type(), is(Throttler.Type.PERIOD));
|
||||
}
|
||||
|
||||
public void testAbovePeriod() throws Exception {
|
||||
|
|
|
@ -21,7 +21,7 @@ public class WatchThrottlerTests extends ESTestCase {
|
|||
AckThrottler ackThrottler = mock(AckThrottler.class);
|
||||
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
|
||||
when(periodThrottler.throttle("_action", ctx)).thenReturn(Throttler.Result.NO);
|
||||
Throttler.Result expectedResult = Throttler.Result.throttle("_reason");
|
||||
Throttler.Result expectedResult = Throttler.Result.throttle(Throttler.Type.ACK, "_reason");
|
||||
when(ackThrottler.throttle("_action", ctx)).thenReturn(expectedResult);
|
||||
XPackLicenseState licenseState = mock(XPackLicenseState.class);
|
||||
when(licenseState.isWatcherAllowed()).thenReturn(true);
|
||||
|
@ -35,7 +35,7 @@ public class WatchThrottlerTests extends ESTestCase {
|
|||
PeriodThrottler periodThrottler = mock(PeriodThrottler.class);
|
||||
AckThrottler ackThrottler = mock(AckThrottler.class);
|
||||
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
|
||||
Throttler.Result expectedResult = Throttler.Result.throttle("_reason");
|
||||
Throttler.Result expectedResult = Throttler.Result.throttle(Throttler.Type.PERIOD, "_reason");
|
||||
when(periodThrottler.throttle("_action", ctx)).thenReturn(expectedResult);
|
||||
when(ackThrottler.throttle("_action", ctx)).thenReturn(Throttler.Result.NO);
|
||||
XPackLicenseState licenseState = mock(XPackLicenseState.class);
|
||||
|
@ -50,9 +50,9 @@ public class WatchThrottlerTests extends ESTestCase {
|
|||
PeriodThrottler periodThrottler = mock(PeriodThrottler.class);
|
||||
AckThrottler ackThrottler = mock(AckThrottler.class);
|
||||
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
|
||||
Throttler.Result periodResult = Throttler.Result.throttle("_reason_period");
|
||||
Throttler.Result periodResult = Throttler.Result.throttle(Throttler.Type.PERIOD, "_reason_period");
|
||||
when(periodThrottler.throttle("_action", ctx)).thenReturn(periodResult);
|
||||
Throttler.Result ackResult = Throttler.Result.throttle("_reason_ack");
|
||||
Throttler.Result ackResult = Throttler.Result.throttle(Throttler.Type.ACK, "_reason_ack");
|
||||
when(ackThrottler.throttle("_action", ctx)).thenReturn(ackResult);
|
||||
XPackLicenseState licenseState = mock(XPackLicenseState.class);
|
||||
when(licenseState.isWatcherAllowed()).thenReturn(true);
|
||||
|
@ -101,5 +101,6 @@ public class WatchThrottlerTests extends ESTestCase {
|
|||
Throttler.Result result = throttler.throttle("_action", ctx);
|
||||
assertThat(result, notNullValue());
|
||||
assertThat(result.reason(), is("watcher license does not allow action execution"));
|
||||
assertThat(result.type(), is(Throttler.Type.LICENSE));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,7 +127,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
|
|||
is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), ExecutionState.THROTTLED.id()));
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), ExecutionState.ACKNOWLEDGED.id()));
|
||||
assertThat(throttledCount, greaterThan(0L));
|
||||
}
|
||||
|
||||
|
@ -194,7 +194,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
|
|||
is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
|
||||
|
||||
long throttledCount = docCount(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*", null,
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), ExecutionState.THROTTLED.id()));
|
||||
matchQuery(WatchRecord.Field.STATE.getPreferredName(), ExecutionState.ACKNOWLEDGED.id()));
|
||||
assertThat(throttledCount, greaterThan(0L));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue