Watcher: Dont hide exceptions during watch execution

When a painless exception is raised in the script condition, it was not bubbled up due to
catching exceptions on during execution. This removes the different catching of exceptions
and allows the watch record construct to contain an exception that is also serialized correctly
so that it can be stored in the watch history but also returned in the execute watch API.

This also updates the watch history template, so that exceptions are not indexed, but logged.

Relates elastic/elasticsearch#2587

Original commit: elastic/x-pack-elasticsearch@4dffb672bf
This commit is contained in:
Alexander Reelsen 2016-07-05 09:33:44 +02:00
parent f1670a3845
commit e8c1e7f9d8
18 changed files with 229 additions and 436 deletions

View File

@ -14,18 +14,18 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.GeneralScriptException;
import org.elasticsearch.script.ScriptException;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition;
import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition;
import org.elasticsearch.xpack.watcher.condition.script.ScriptConditionFactory;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Script;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -41,12 +41,10 @@ import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
*/
public class ScriptConditionTests extends ESTestCase {
ThreadPool tp = null;
private ThreadPool tp = null;
@Before
public void init() {
@ -54,8 +52,8 @@ public class ScriptConditionTests extends ESTestCase {
}
@After
public void cleanup() {
tp.shutdownNow();
public void cleanup() throws InterruptedException {
terminate(tp);
}
public void testExecute() throws Exception {
@ -136,13 +134,8 @@ public class ScriptConditionTests extends ESTestCase {
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
parser.nextToken();
ScriptCondition scriptCondition = conditionParser.parseCondition("_watch", parser);
try {
conditionParser.createExecutable(scriptCondition);
fail("expected a condition validation exception trying to create an executable with a bad or missing script");
} catch (GeneralScriptException e) {
// TODO add these when the test if fixed
// assertThat(e.getMessage(), is("ASDF"));
}
GeneralScriptException exception = expectThrows(GeneralScriptException.class,
() -> conditionParser.createExecutable(scriptCondition));
}
public void testScriptConditionParser_badLang() throws Exception {
@ -153,13 +146,9 @@ public class ScriptConditionTests extends ESTestCase {
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
parser.nextToken();
ScriptCondition scriptCondition = conditionParser.parseCondition("_watch", parser);
try {
conditionParser.createExecutable(scriptCondition);
fail("expected a condition validation exception trying to create an executable with an invalid language");
} catch (GeneralScriptException e) {
// TODO add these when the test if fixed
// assertThat(e.getMessage(), is("ASDF"));
}
GeneralScriptException exception = expectThrows(GeneralScriptException.class,
() -> conditionParser.createExecutable(scriptCondition));
assertThat(exception.getMessage(), containsString("script_lang not supported [not_a_valid_lang]]"));
}
public void testScriptConditionThrowException() throws Exception {
@ -168,25 +157,19 @@ public class ScriptConditionTests extends ESTestCase {
new ScriptCondition(Script.inline("null.foo").build()), logger, scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
ScriptCondition.Result result = condition.execute(ctx);
assertThat(result, notNullValue());
assertThat(result.status(), is(Condition.Result.Status.FAILURE));
assertThat(result.reason(), notNullValue());
assertThat(result.reason(), containsString("NullPointerException"));
assertThat(result.reason(), containsString("Cannot get property 'foo' on null object"));
ScriptException exception = expectThrows(ScriptException.class, () -> condition.execute(ctx));
assertThat(exception.getMessage(), containsString("Error evaluating null.foo"));
}
public void testScriptConditionReturnObject() throws Exception {
public void testScriptConditionReturnObjectThrowsException() throws Exception {
ScriptServiceProxy scriptService = getScriptServiceProxy(tp);
ExecutableScriptCondition condition = new ExecutableScriptCondition(
new ScriptCondition(Script.inline("return new Object()").build()), logger, scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
ScriptCondition.Result result = condition.execute(ctx);
assertThat(result, notNullValue());
assertThat(result.status(), is(Condition.Result.Status.FAILURE));
assertThat(result.reason(), notNullValue());
assertThat(result.reason(), containsString("ScriptException"));
Exception exception = expectThrows(GeneralScriptException.class, () -> condition.execute(ctx));
assertThat(exception.getMessage(),
containsString("condition [script] must return a boolean value (true|false) but instead returned [_name]"));
}
public void testScriptConditionAccessCtx() throws Exception {

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.condition;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -38,13 +37,6 @@ public interface Condition extends ToXContent {
this.reason = null;
}
protected Result(String type, Exception e) {
this.status = Status.FAILURE;
this.type = type;
this.met = false;
this.reason = ExceptionsHelper.detailedMessage(e);
}
public String type() {
return type;
}

View File

@ -36,16 +36,8 @@ public abstract class AbstractExecutableCompareCondition<C extends Condition, R
@Override
public R execute(WatchExecutionContext ctx) {
Map<String, Object> resolvedValues = new HashMap<>();
try {
Map<String, Object> model = Variables.createCtxModel(ctx, ctx.payload());
return doExecute(model, resolvedValues);
} catch (Exception e) {
logger.error("failed to execute [{}] condition for [{}]", e, type(), ctx.id());
if (resolvedValues.isEmpty()) {
resolvedValues = null;
}
return doFailure(resolvedValues, e);
}
Map<String, Object> model = Variables.createCtxModel(ctx, ctx.payload());
return doExecute(model, resolvedValues);
}
protected Object resolveConfiguredValue(Map<String, Object> resolvedValues, Map<String, Object> model, Object configuredValue) {
@ -70,7 +62,5 @@ public abstract class AbstractExecutableCompareCondition<C extends Condition, R
return configuredValue;
}
protected abstract R doExecute(Map<String, Object> model, Map<String, Object> resolvedValues) throws Exception;
protected abstract R doFailure(Map<String, Object> resolvedValues, Exception e);
protected abstract R doExecute(Map<String, Object> model, Map<String, Object> resolvedValues);
}

View File

@ -138,11 +138,6 @@ public class CompareCondition implements Condition {
this.resolveValues = resolveValues;
}
Result(@Nullable Map<String, Object> resolvedValues, Exception e) {
super(TYPE, e);
this.resolveValues = resolvedValues;
}
public Map<String, Object> getResolveValues() {
return resolveValues;
}

View File

@ -21,7 +21,7 @@ public class ExecutableCompareCondition extends AbstractExecutableCompareConditi
}
@Override
protected CompareCondition.Result doExecute(Map<String, Object> model, Map<String, Object> resolvedValues) throws Exception {
protected CompareCondition.Result doExecute(Map<String, Object> model, Map<String, Object> resolvedValues) {
Object configuredValue = resolveConfiguredValue(resolvedValues, model, condition.getValue());
Object resolvedValue = ObjectPath.eval(condition.getPath(), model);
@ -29,9 +29,4 @@ public class ExecutableCompareCondition extends AbstractExecutableCompareConditi
return new CompareCondition.Result(resolvedValues, condition.getOp().eval(resolvedValue, configuredValue));
}
@Override
protected CompareCondition.Result doFailure(Map<String, Object> resolvedValues, Exception e) {
return new CompareCondition.Result(resolvedValues, e);
}
}

View File

@ -11,9 +11,9 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.common.xcontent.XContentUtils;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.condition.compare.LenientCompare;
import org.elasticsearch.xpack.common.xcontent.XContentUtils;
import java.io.IOException;
import java.util.List;
@ -209,11 +209,6 @@ public class ArrayCompareCondition implements Condition {
this.resolvedValues = resolvedValues;
}
Result(@Nullable Map<String, Object> resolvedValues, Exception e) {
super(TYPE, e);
this.resolvedValues = resolvedValues;
}
public Map<String, Object> getResolvedValues() {
return resolvedValues;
}

View File

@ -6,8 +6,8 @@
package org.elasticsearch.xpack.watcher.condition.compare.array;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.xpack.watcher.condition.compare.AbstractExecutableCompareCondition;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.condition.compare.AbstractExecutableCompareCondition;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import java.util.ArrayList;
@ -22,8 +22,7 @@ public class ExecutableArrayCompareCondition extends AbstractExecutableCompareCo
super(condition, logger, clock);
}
@SuppressWarnings("unchecked")
public ArrayCompareCondition.Result doExecute(Map<String, Object> model, Map<String, Object> resolvedValues) throws Exception {
public ArrayCompareCondition.Result doExecute(Map<String, Object> model, Map<String, Object> resolvedValues) {
Object configuredValue = resolveConfiguredValue(resolvedValues, model, condition.getValue());
Object object = ObjectPath.eval(condition.getArrayPath(), model);
@ -31,6 +30,7 @@ public class ExecutableArrayCompareCondition extends AbstractExecutableCompareCo
throw new IllegalStateException("array path " + condition.getArrayPath() + " did not evaluate to array, was " + object);
}
@SuppressWarnings("unchecked")
List<Object> resolvedArray = object != null ? (List<Object>) object : Collections.emptyList();
List<Object> resolvedValue = new ArrayList<>(resolvedArray.size());
@ -42,9 +42,4 @@ public class ExecutableArrayCompareCondition extends AbstractExecutableCompareCo
return new ArrayCompareCondition.Result(resolvedValues, condition.getQuantifier().eval(resolvedValue, configuredValue,
condition.getOp()));
}
@Override
protected ArrayCompareCondition.Result doFailure(Map<String, Object> resolvedValues, Exception e) {
return new ArrayCompareCondition.Result(resolvedValues, e);
}
}

View File

@ -8,10 +8,10 @@ package org.elasticsearch.xpack.watcher.condition.script;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import org.elasticsearch.xpack.watcher.condition.ExecutableCondition;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Variables;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import java.util.Map;
@ -38,15 +38,10 @@ public class ExecutableScriptCondition extends ExecutableCondition<ScriptConditi
@Override
public ScriptCondition.Result execute(WatchExecutionContext ctx) {
try {
return doExecute(ctx);
} catch (Exception e) {
logger.error("failed to execute [{}] condition for [{}]", e, ScriptCondition.TYPE, ctx.id());
return new ScriptCondition.Result(e);
}
return doExecute(ctx);
}
public ScriptCondition.Result doExecute(WatchExecutionContext ctx) throws Exception {
public ScriptCondition.Result doExecute(WatchExecutionContext ctx) {
Map<String, Object> parameters = Variables.createCtxModel(ctx, ctx.payload());
if (condition.script.params() != null && !condition.script.params().isEmpty()) {
parameters.putAll(condition.script.params());

View File

@ -78,10 +78,6 @@ public class ScriptCondition implements Condition {
super(TYPE, met);
}
Result(Exception e) {
super(TYPE, e);
}
@Override
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
return builder;

View File

@ -13,12 +13,12 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.history.WatchRecord;
import org.elasticsearch.xpack.watcher.input.Input;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.support.validation.WatcherSettingsValidation;
import org.elasticsearch.xpack.watcher.transform.Transform;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
@ -38,8 +38,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
/**
*/
public class ExecutionService extends AbstractComponent {
@ -264,32 +262,14 @@ public class ExecutionService extends AbstractComponent {
} else {
logger.debug("executing watch [{}]", ctx.id().watchId());
try {
record = executeInner(ctx);
} catch (Exception e) {
logger.warn("failed to execute watch [{}]", e, ctx.id());
record = ctx.abortFailedExecution(ExceptionsHelper.detailedMessage(e));
}
if (record != null && ctx.recordExecution()) {
try {
watchStore.updateStatus(ctx.watch());
} catch (Exception e) {
logger.warn("failed to update watch status [{}]", e, ctx.id());
record = new WatchRecord(record, ExecutionState.FAILED, format("failed to update watch status [{}]...{}", ctx.id(),
ExceptionsHelper.detailedMessage(e)));
}
record = executeInner(ctx);
if (ctx.recordExecution()) {
watchStore.updateStatus(ctx.watch());
}
}
} catch (Exception e) {
logger.warn("failed to execute watch [{}]", e, ctx.id());
if (record != null) {
record = new WatchRecord(record, ExecutionState.FAILED, format("failed to execute watch. {}",
ExceptionsHelper.detailedMessage(e)));
} else {
record = ctx.abortFailedExecution(ExceptionsHelper.detailedMessage(e));
}
record = createWatchRecord(record, ctx, e);
logWatchRecord(ctx, e);
} finally {
if (ctx.knownWatch() && record != null && ctx.recordExecution()) {
try {
@ -300,6 +280,7 @@ public class ExecutionService extends AbstractComponent {
}
} catch (Exception e) {
logger.error("failed to update watch record [{}]", e, ctx.id());
// TODO log watch record in logger, when saving in history store failed, otherwise the info is gone!
}
}
try {
@ -317,6 +298,28 @@ public class ExecutionService extends AbstractComponent {
return record;
}
private WatchRecord createWatchRecord(WatchRecord existingRecord, WatchExecutionContext ctx, Exception e) {
// it is possible that the watch store update failed, the execution phase is finished
if (ctx.executionPhase().sealed()) {
if (existingRecord == null) {
return new WatchRecord.ExceptionWatchRecord(ctx, e);
} else {
return new WatchRecord.ExceptionWatchRecord(existingRecord, e);
}
} else {
return ctx.abortFailedExecution(e);
}
}
private void logWatchRecord(WatchExecutionContext ctx, Exception e) {
// failed watches stack traces are only logged in debug, otherwise they should be checked out in the history
if (logger.isDebugEnabled()) {
logger.debug("failed to execute watch [{}]", e, ctx.id());
} else {
logger.warn("Failed to execute watch [{}]", ctx.id());
}
}
/*
The execution of an watch is split into two phases:
1. the trigger part which just makes sure to store the associated watch record in the history
@ -398,7 +401,7 @@ public class ExecutionService extends AbstractComponent {
if (watch == null) {
String message = "unable to find watch for record [" + triggeredWatch.id().watchId() + "]/[" + triggeredWatch.id() +
"], perhaps it has been deleted, ignoring...";
WatchRecord record = new WatchRecord(triggeredWatch.id(), triggeredWatch.triggerEvent(),
WatchRecord record = new WatchRecord.MessageWatchRecord(triggeredWatch.id(), triggeredWatch.triggerEvent(),
ExecutionState.NOT_EXECUTED_WATCH_MISSING, message);
historyStore.forcePut(record);
triggeredWatchStore.delete(triggeredWatch.id());

View File

@ -190,7 +190,7 @@ public abstract class WatchExecutionContext {
public WatchRecord abortBeforeExecution(ExecutionState state, String message) {
assert !phase.sealed();
phase = ExecutionPhase.ABORTED;
return new WatchRecord(id, triggerEvent, state, message);
return new WatchRecord.MessageWatchRecord(id, triggerEvent, state, message);
}
public WatchRecord abortFailedExecution(String message) {
@ -198,7 +198,15 @@ public abstract class WatchExecutionContext {
phase = ExecutionPhase.ABORTED;
long executionFinishMs = System.currentTimeMillis();
WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - startTimestamp);
return new WatchRecord(this, result, message);
return new WatchRecord.MessageWatchRecord(this, result, message);
}
public WatchRecord abortFailedExecution(Exception e) {
assert !phase.sealed();
phase = ExecutionPhase.ABORTED;
long executionFinishMs = System.currentTimeMillis();
WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - startTimestamp);
return new WatchRecord.ExceptionWatchRecord(this, result, e);
}
public WatchRecord finish() {
@ -206,7 +214,7 @@ public abstract class WatchExecutionContext {
phase = ExecutionPhase.FINISHED;
long executionFinishMs = System.currentTimeMillis();
WatchExecutionResult result = new WatchExecutionResult(this, executionFinishMs - startTimestamp);
return new WatchRecord(this, result);
return new WatchRecord.MessageWatchRecord(this, result);
}
public WatchExecutionSnapshot createSnapshot(Thread executionThread) {

View File

@ -96,7 +96,6 @@ public class WatchExecutionResult implements ToXContent {
ParseField INPUT = new ParseField("input");
ParseField CONDITION = new ParseField("condition");
ParseField ACTIONS = new ParseField("actions");
ParseField TYPE = new ParseField("type");
}
}

View File

@ -107,7 +107,7 @@ public class HistoryStore extends AbstractComponent {
client.index(request, (TimeValue) null);
} catch (VersionConflictEngineException vcee) {
logger.warn("watch record [{}] has executed multiple times, this can happen during watcher restarts", watchRecord);
watchRecord = new WatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES,
watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES,
"watch record has been stored before, previous state [" + watchRecord.state() + "]");
IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value())
.source(XContentFactory.jsonBuilder().value(watchRecord));

View File

@ -5,9 +5,11 @@
*/
package org.elasticsearch.xpack.watcher.history;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.watcher.condition.Condition;
@ -23,95 +25,65 @@ import org.elasticsearch.xpack.watcher.watch.Watch;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
public class WatchRecord implements ToXContent {
public abstract class WatchRecord implements ToXContent {
private final Wid id;
private final TriggerEvent triggerEvent;
private final ExecutionState state;
protected final Wid id;
protected final TriggerEvent triggerEvent;
protected final ExecutionState state;
// only emitted to xcontent in "debug" mode
private final Map<String, Object> vars;
protected final Map<String, Object> vars;
@Nullable private final ExecutableInput input;
@Nullable private final Condition condition;
@Nullable private final Map<String,Object> metadata;
@Nullable protected final ExecutableInput input;
@Nullable protected final Condition condition;
@Nullable protected final Map<String,Object> metadata;
@Nullable protected final WatchExecutionResult executionResult;
@Nullable private final String[] messages;
@Nullable private final WatchExecutionResult executionResult;
/**
* Called when the execution was aborted before it started
*/
public WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, String message) {
public WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, Map<String, Object> vars, ExecutableInput input,
Condition condition, Map<String, Object> metadata, WatchExecutionResult executionResult) {
this.id = id;
this.triggerEvent = triggerEvent;
this.state = state;
this.messages = new String[] { message };
this.vars = Collections.emptyMap();
this.executionResult = null;
this.condition = null;
this.input = null;
this.metadata = null;
}
/**
* Called when the execution was aborted due to an error during execution (the given result should reflect
* were exactly the execution failed)
*/
public WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult, String message) {
this.id = context.id();
this.triggerEvent = context.triggerEvent();
this.state = ExecutionState.FAILED;
this.messages = new String[] { message };
this.vars = context.vars();
this.vars = vars;
this.input = input;
this.condition = condition;
this.metadata = metadata;
this.executionResult = executionResult;
this.condition = context.watch().condition().condition();
this.input = context.watch().input();
this.metadata = context.watch().metadata();
}
/**
* Called when the execution finished.
*/
public WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state) {
this(id, triggerEvent, state, Collections.emptyMap(), null, null, null, null);
}
public WatchRecord(WatchRecord record, ExecutionState state) {
this(record.id, record.triggerEvent, state, record.vars, record.input, record.condition(), record.metadata, record.executionResult);
}
public WatchRecord(WatchExecutionContext context, ExecutionState state) {
this(context.id(), context.triggerEvent(), state, context.vars(), context.watch().input(), context.watch().condition().condition(),
context.watch().metadata(), null);
}
public WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) {
this.id = context.id();
this.triggerEvent = context.triggerEvent();
this.messages = Strings.EMPTY_ARRAY;
this.vars = context.vars();
this.executionResult = executionResult;
this.condition = context.watch().condition().condition();
this.input = context.watch().input();
this.metadata = context.watch().metadata();
if (!this.executionResult.conditionResult().met()) {
state = ExecutionState.EXECUTION_NOT_NEEDED;
} else {
if (this.executionResult.actionsResults().throttled()) {
state = ExecutionState.THROTTLED;
} else {
state = ExecutionState.EXECUTED;
}
}
this(context.id(), context.triggerEvent(), getState(executionResult), context.vars(), context.watch().input(),
context.watch().condition().condition(), context.watch().metadata(), executionResult);
}
public WatchRecord(WatchRecord record, ExecutionState state, String message) {
this.id = record.id;
this.triggerEvent = record.triggerEvent;
this.vars = record.vars;
this.executionResult = record.executionResult;
this.condition = record.condition;
this.input = record.input;
this.metadata = record.metadata;
this.state = state;
private static ExecutionState getState(WatchExecutionResult executionResult) {
if (executionResult == null || executionResult.conditionResult() == null) {
return ExecutionState.FAILED;
}
if (record.messages.length == 0) {
this.messages = new String[] { message };
if (executionResult.conditionResult().met()) {
if (executionResult.actionsResults().throttled()) {
return ExecutionState.THROTTLED;
} else {
return ExecutionState.EXECUTED;
}
} else {
String[] newMessages = new String[record.messages.length + 1];
System.arraycopy(record.messages, 0, newMessages, 0, record.messages.length);
newMessages[record.messages.length] = message;
this.messages = newMessages;
return ExecutionState.EXECUTION_NOT_NEEDED;
}
}
@ -137,10 +109,6 @@ public class WatchRecord implements ToXContent {
return state;
}
public String[] messages(){
return messages;
}
public Map<String, Object> metadata() {
return metadata;
}
@ -172,29 +140,26 @@ public class WatchRecord implements ToXContent {
.field(condition.type(), condition, params)
.endObject();
}
if (messages != null) {
builder.field(Field.MESSAGES.getPreferredName(), messages);
}
if (metadata != null) {
builder.field(Field.METADATA.getPreferredName(), metadata);
}
if (executionResult != null) {
builder.field(Field.EXECUTION_RESULT.getPreferredName(), executionResult, params);
}
innerToXContent(builder, params);
builder.endObject();
return builder;
}
abstract void innerToXContent(XContentBuilder builder, Params params) throws IOException;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WatchRecord entry = (WatchRecord) o;
if (!id.equals(entry.id)) return false;
return true;
return Objects.equals(id, entry.id);
}
@Override
@ -215,5 +180,109 @@ public class WatchRecord implements ToXContent {
ParseField VARS = new ParseField("vars");
ParseField METADATA = new ParseField("metadata");
ParseField EXECUTION_RESULT = new ParseField("result");
ParseField EXCEPTION = new ParseField("exception");
}
public static class MessageWatchRecord extends WatchRecord {
@Nullable private final String[] messages;
/**
* Called when the execution was aborted before it started
*/
public MessageWatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, String message) {
super(id, triggerEvent, state);
this.messages = new String[] { message };
}
/**
* Called when the execution was aborted due to an error during execution (the given result should reflect
* were exactly the execution failed)
*/
public MessageWatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult, String message) {
super(context, executionResult);
this.messages = new String[] { message };
}
/**
* Called when the execution finished.
*/
public MessageWatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) {
super(context, executionResult);
this.messages = Strings.EMPTY_ARRAY;
}
public MessageWatchRecord(WatchRecord record, ExecutionState state, String message) {
super(record, state);
if (record instanceof MessageWatchRecord) {
MessageWatchRecord messageWatchRecord = (MessageWatchRecord) record;
if (messageWatchRecord.messages.length == 0) {
this.messages = new String[] { message };
} else {
String[] newMessages = new String[messageWatchRecord.messages.length + 1];
System.arraycopy(messageWatchRecord.messages, 0, newMessages, 0, messageWatchRecord.messages.length);
newMessages[messageWatchRecord.messages.length] = message;
this.messages = newMessages;
}
} else {
messages = new String []{ message };
}
}
public String[] messages(){
return messages;
}
@Override
void innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (messages != null) {
builder.field(Field.MESSAGES.getPreferredName(), messages);
}
}
}
public static class ExceptionWatchRecord extends WatchRecord {
private static final Map<String, String> STACK_TRACE_ENABLED_PARAMS = MapBuilder.<String, String>newMapBuilder()
.put(ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE, "false")
.immutableMap();
@Nullable private final Exception exception;
public ExceptionWatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult, Exception exception) {
super(context, executionResult);
this.exception = exception;
}
public ExceptionWatchRecord(WatchRecord record, Exception exception) {
super(record, ExecutionState.FAILED);
this.exception = exception;
}
public ExceptionWatchRecord(WatchExecutionContext context, Exception exception) {
super(context, ExecutionState.FAILED);
this.exception = exception;
}
public Exception getException() {
return exception;
}
@Override
void innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (exception != null) {
if (exception instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) exception;
builder.startObject(Field.EXCEPTION.getPreferredName());
Params delegatingParams = new DelegatingMapParams(STACK_TRACE_ENABLED_PARAMS, params);
elasticsearchException.toXContent(builder, delegatingParams);
builder.endObject();
} else {
builder.startObject(Field.EXCEPTION.getPreferredName())
.field("type", ElasticsearchException.getExceptionName(exception))
.field("reason", exception.getMessage())
.endObject();
}
}
}
}
}

View File

@ -93,6 +93,10 @@
"messages": {
"type": "text"
},
"exception" : {
"type" : "object",
"enabled" : false
},
"result": {
"type": "object",
"dynamic": true,

View File

@ -1,226 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.condition.script;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.GeneralScriptException;
import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Script;
import org.elasticsearch.xpack.common.ScriptServiceProxy;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalArgument;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.getScriptServiceProxy;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.mockExecutionContext;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/724")
public class ScriptConditionTests extends ESTestCase {
ThreadPool tp = null;
@Before
public void init() {
tp = new TestThreadPool(ThreadPool.Names.SAME);
}
@After
public void cleanup() {
tp.shutdownNow();
}
public void testExecute() throws Exception {
ScriptServiceProxy scriptService = getScriptServiceProxy(tp);
ExecutableScriptCondition condition = new ExecutableScriptCondition(
new ScriptCondition(Script.inline("ctx.payload.hits.total > 1").build()), logger, scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
assertFalse(condition.execute(ctx).met());
}
public void testExecuteMergedParams() throws Exception {
ScriptServiceProxy scriptService = getScriptServiceProxy(tp);
Script script = Script.inline("ctx.payload.hits.total > threshold")
.lang(Script.DEFAULT_LANG).params(singletonMap("threshold", 1)).build();
ExecutableScriptCondition executable = new ExecutableScriptCondition(
new ScriptCondition(script), logger, scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
assertFalse(executable.execute(ctx).met());
}
public void testParserValid() throws Exception {
ScriptConditionFactory factory = new ScriptConditionFactory(Settings.builder().build(), getScriptServiceProxy(tp));
XContentBuilder builder = createConditionContent("ctx.payload.hits.total > 1", null, ScriptType.INLINE);
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
parser.nextToken();
ScriptCondition condition = factory.parseCondition("_watch", parser);
ExecutableScriptCondition executable = factory.createExecutable(condition);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
assertFalse(executable.execute(ctx).met());
builder = createConditionContent("return true", null, ScriptType.INLINE);
parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
parser.nextToken();
condition = factory.parseCondition("_watch", parser);
executable = factory.createExecutable(condition);
ctx = mockExecutionContext("_name", new Payload.XContent(response));
assertTrue(executable.execute(ctx).met());
}
public void testParserInvalid() throws Exception {
ScriptConditionFactory factory = new ScriptConditionFactory(Settings.builder().build(), getScriptServiceProxy(tp));
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject().endObject();
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
parser.nextToken();
try {
factory.parseCondition("_id", parser);
fail("expected a condition exception trying to parse an invalid condition XContent");
} catch (ElasticsearchParseException e) {
// TODO add these when the test if fixed
// assertThat(e.getMessage(), is("ASDF"));
}
}
public void testScriptConditionParserBadScript() throws Exception {
ScriptConditionFactory conditionParser = new ScriptConditionFactory(Settings.builder().build(), getScriptServiceProxy(tp));
ScriptType scriptType = randomFrom(ScriptType.values());
String script;
switch (scriptType) {
case STORED:
case FILE:
script = "nonExisting_script";
break;
case INLINE:
default:
script = "foo = = 1";
}
XContentBuilder builder = createConditionContent(script, "groovy", scriptType);
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
parser.nextToken();
ScriptCondition scriptCondition = conditionParser.parseCondition("_watch", parser);
try {
conditionParser.createExecutable(scriptCondition);
fail("expected a condition validation exception trying to create an executable with a bad or missing script");
} catch (GeneralScriptException e) {
// TODO add these when the test if fixed
// assertThat(e.getMessage(), is("ASDF"));
}
}
public void testScriptConditionParser_badLang() throws Exception {
ScriptConditionFactory conditionParser = new ScriptConditionFactory(Settings.builder().build(), getScriptServiceProxy(tp));
ScriptType scriptType = ScriptType.INLINE;
String script = "return true";
XContentBuilder builder = createConditionContent(script, "not_a_valid_lang", scriptType);
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
parser.nextToken();
ScriptCondition scriptCondition = conditionParser.parseCondition("_watch", parser);
try {
conditionParser.createExecutable(scriptCondition);
fail("expected a condition validation exception trying to create an executable with an invalid language");
} catch (GeneralScriptException e) {
// TODO add these when the test if fixed
// assertThat(e.getMessage(), is("ASDF"));
}
}
public void testScriptConditionThrowException() throws Exception {
ScriptServiceProxy scriptService = getScriptServiceProxy(tp);
ExecutableScriptCondition condition =
new ExecutableScriptCondition(new ScriptCondition(Script.inline("assert false").build()), logger, scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
ScriptCondition.Result result = condition.execute(ctx);
assertThat(result, notNullValue());
assertThat(result.status(), is(Condition.Result.Status.FAILURE));
assertThat(result.reason(), notNullValue());
assertThat(result.reason(), containsString("Assertion"));
}
public void testScriptConditionReturnObject() throws Exception {
ScriptServiceProxy scriptService = getScriptServiceProxy(tp);
ExecutableScriptCondition condition =
new ExecutableScriptCondition(new ScriptCondition(Script.inline("return new Object()").build()), logger, scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
WatchExecutionContext ctx = mockExecutionContext("_name", new Payload.XContent(response));
ScriptCondition.Result result = condition.execute(ctx);
assertThat(result, notNullValue());
assertThat(result.status(), is(Condition.Result.Status.FAILURE));
assertThat(result.reason(), notNullValue());
assertThat(result.reason(), containsString("ScriptException"));
}
public void testScriptConditionAccessCtx() throws Exception {
ScriptServiceProxy scriptService = getScriptServiceProxy(tp);
ExecutableScriptCondition condition = new ExecutableScriptCondition(
new ScriptCondition(Script.inline("ctx.trigger.scheduled_time.getMillis() < System.currentTimeMillis() ").build()),
logger, scriptService);
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500L, new ShardSearchFailure[0]);
WatchExecutionContext ctx = mockExecutionContext("_name", new DateTime(DateTimeZone.UTC), new Payload.XContent(response));
Thread.sleep(10);
assertThat(condition.execute(ctx).met(), is(true));
}
private static XContentBuilder createConditionContent(String script, String scriptLang, ScriptType scriptType) throws IOException {
XContentBuilder builder = jsonBuilder();
if (scriptType == null) {
return builder.value(script);
}
builder.startObject();
switch (scriptType) {
case INLINE:
builder.field("inline", script);
break;
case FILE:
builder.field("file", script);
break;
case STORED:
builder.field("id", script);
break;
default:
throw illegalArgument("unsupported script type [{}]", scriptType);
}
if (scriptLang != null) {
builder.field("lang", scriptLang);
}
return builder.endObject();
}
}

View File

@ -44,7 +44,7 @@ public class HistoryStoreTests extends ESTestCase {
public void testPut() throws Exception {
Wid wid = new Wid("_name", 0, new DateTime(0, UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC));
WatchRecord watchRecord = new WatchRecord(wid, event, ExecutionState.EXECUTED, null);
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null);
IndexResponse indexResponse = mock(IndexResponse.class);
IndexRequest indexRequest = indexRequest(".watcher-history-1970.01.01", HistoryStore.DOC_TYPE, wid.value()
@ -57,7 +57,7 @@ public class HistoryStoreTests extends ESTestCase {
public void testPutStopped() throws Exception {
Wid wid = new Wid("_name", 0, new DateTime(0, UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), new DateTime(0, UTC), new DateTime(0, UTC));
WatchRecord watchRecord = new WatchRecord(wid, event, ExecutionState.EXECUTED, null);
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null);
historyStore.stop();
try {

View File

@ -375,7 +375,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
.setSource(jsonBuilder().value(triggeredWatch))
.get();
WatchRecord watchRecord = new WatchRecord(wid, event, ExecutionState.EXECUTED, "executed");
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, "executed");
client().prepareIndex(watchRecordIndex, HistoryStore.DOC_TYPE, watchRecord.id().value())
.setSource(jsonBuilder().value(watchRecord))
.get();