diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 63d1a75a6e0..bdd3ab50c90 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.execution; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; @@ -31,7 +32,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.xpack.common.stats.Counters; import org.elasticsearch.xpack.watcher.Watcher; @@ -226,20 +226,18 @@ public class ExecutionService extends AbstractComponent { * @param events The iterable list of trigger events to create the two lists from * @return Two linked lists that contain the triggered watches and contexts */ - private Tuple, List> createTriggeredWatchesAndContext(Iterable events) - throws IOException { + private Tuple, List> createTriggeredWatchesAndContext(Iterable events) { final LinkedList triggeredWatches = new LinkedList<>(); final LinkedList contexts = new LinkedList<>(); DateTime now = new DateTime(clock.millis(), UTC); for (TriggerEvent event : events) { - GetResponse response = client.prepareGet(Watch.INDEX, Watch.DOC_TYPE, event.jobName()).get(TimeValue.timeValueSeconds(10)); + GetResponse response = getWatch(event.jobName()); if (response.isExists() == false) { logger.warn("unable to find watch [{}] in watch index, perhaps it has been deleted", event.jobName()); continue; } - Watch watch = parser.parseWithSecrets(response.getId(), true, response.getSourceAsBytesRef(), now, XContentType.JSON); - TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, now, event, defaultThrottlePeriod); + TriggeredExecutionContext ctx = new TriggeredExecutionContext(event.jobName(), now, event, defaultThrottlePeriod); contexts.add(ctx); triggeredWatches.add(new TriggeredWatch(ctx.id(), event)); } @@ -268,32 +266,39 @@ public class ExecutionService extends AbstractComponent { public WatchRecord execute(WatchExecutionContext ctx) { ctx.setNodeId(clusterService.localNode().getId()); WatchRecord record = null; + final String watchId = ctx.id().watchId(); try { - boolean executionAlreadyExists = currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread())); + boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread())); if (executionAlreadyExists) { - logger.trace("not executing watch [{}] because it is already queued", ctx.watch().id()); + logger.trace("not executing watch [{}] because it is already queued", watchId); record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool"); - } else if (ctx.watch().status().state().isActive() == false) { - logger.debug("not executing watch [{}] because it is marked as inactive", ctx.watch().id()); - record = ctx.abortBeforeExecution(ExecutionState.EXECUTION_NOT_NEEDED, "Watch is not active"); } else { - boolean watchExists = false; try { - GetResponse response = getWatch(ctx.watch().id()); - watchExists = response.isExists(); - } catch (IndexNotFoundException e) {} - - if (ctx.knownWatch() && watchExists == false) { - // fail fast if we are trying to execute a deleted watch - String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring..."; + ctx.ensureWatchExists(() -> { + GetResponse resp = getWatch(watchId); + if (resp.isExists() == false) { + throw new ResourceNotFoundException("watch [{}] does not exist", watchId); + } + return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON); + }); + } catch (ResourceNotFoundException e) { + String message = "unable to find watch for record [" + ctx.id() + "]"; record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_WATCH_MISSING, message); + } catch (Exception e) { + record = ctx.abortFailedExecution(e); + } - } else { - logger.debug("executing watch [{}]", ctx.id().watchId()); + if (ctx.watch() != null) { + if (ctx.watch().status().state().isActive()) { + logger.debug("executing watch [{}]", watchId); - record = executeInner(ctx); - if (ctx.recordExecution()) { - updateWatchStatus(ctx.watch()); + record = executeInner(ctx); + if (ctx.recordExecution()) { + updateWatchStatus(ctx.watch()); + } + } else { + logger.debug("not executing watch [{}] because it is marked as inactive", watchId); + record = ctx.abortBeforeExecution(ExecutionState.EXECUTION_NOT_NEEDED, "Watch is not active"); } } } @@ -320,8 +325,8 @@ public class ExecutionService extends AbstractComponent { logger.error((Supplier) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e); } } - currentExecutions.remove(ctx.watch().id()); - logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id()); + currentExecutions.remove(watchId); + logger.debug("finished [{}]/[{}]", watchId, ctx.id()); } return record; } @@ -373,9 +378,9 @@ public class ExecutionService extends AbstractComponent { private void logWatchRecord(WatchExecutionContext ctx, Exception e) { // failed watches stack traces are only logged in debug, otherwise they should be checked out in the history if (logger.isDebugEnabled()) { - logger.debug((Supplier) () -> new ParameterizedMessage("failed to execute watch [{}]", ctx.watch().id()), e); + logger.debug((Supplier) () -> new ParameterizedMessage("failed to execute watch [{}]", ctx.id().watchId()), e); } else { - logger.warn("failed to execute watch [{}]", ctx.watch().id()); + logger.warn("failed to execute watch [{}]", ctx.id().watchId()); } } @@ -484,15 +489,10 @@ public class ExecutionService extends AbstractComponent { triggeredWatchStore.delete(triggeredWatch.id()); } else { DateTime now = new DateTime(clock.millis(), UTC); - try { - Watch watch = parser.parseWithSecrets(response.getId(), true, response.getSourceAsBytesRef(), now, XContentType.JSON); - TriggeredExecutionContext ctx = - new StartupExecutionContext(watch, now, triggeredWatch.triggerEvent(), defaultThrottlePeriod); - executeAsync(ctx, triggeredWatch); - counter++; - } catch (IOException e) { - logger.error((Supplier) () -> new ParameterizedMessage("Error parsing triggered watch"), e); - } + TriggeredExecutionContext ctx = new TriggeredExecutionContext(triggeredWatch.id().watchId(), now, + triggeredWatch.triggerEvent(), defaultThrottlePeriod, true); + executeAsync(ctx, triggeredWatch); + counter++; } } logger.debug("triggered execution of [{}] watches", counter); @@ -532,18 +532,6 @@ public class ExecutionService extends AbstractComponent { currentExecutions = new CurrentExecutions(); } - private static final class StartupExecutionContext extends TriggeredExecutionContext { - - StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { - super(watch, executionTime, triggerEvent, defaultThrottlePeriod); - } - - @Override - public boolean overrideRecordOnConflict() { - return true; - } - } - // the watch execution task takes another runnable as parameter // the best solution would be to move the whole execute() method, which is handed over as ctor parameter // over into this class, this is the quicker way though diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ManualExecutionContext.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ManualExecutionContext.java index 954624c088a..32749b9d3a3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ManualExecutionContext.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/ManualExecutionContext.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.watcher.actions.Action; import org.elasticsearch.xpack.watcher.actions.ActionWrapper; @@ -25,16 +26,18 @@ public class ManualExecutionContext extends WatchExecutionContext { private final Map actionModes; private final boolean recordExecution; private final boolean knownWatch; + private final Watch watch; ManualExecutionContext(Watch watch, boolean knownWatch, DateTime executionTime, ManualTriggerEvent triggerEvent, TimeValue defaultThrottlePeriod, Input.Result inputResult, Condition.Result conditionResult, Map actionModes, boolean recordExecution) { - super(watch, executionTime, triggerEvent, defaultThrottlePeriod); + super(watch.id(), executionTime, triggerEvent, defaultThrottlePeriod); this.actionModes = actionModes; this.recordExecution = recordExecution; this.knownWatch = knownWatch; + this.watch = watch; if (inputResult != null) { onInputResult(inputResult); @@ -60,6 +63,12 @@ public class ManualExecutionContext extends WatchExecutionContext { } } + // a noop operation, as the watch is already loaded via ctor + @Override + public void ensureWatchExists(CheckedSupplier supplier) throws Exception { + super.ensureWatchExists(() -> watch); + } + @Override public boolean knownWatch() { return knownWatch; @@ -88,6 +97,11 @@ public class ManualExecutionContext extends WatchExecutionContext { return recordExecution; } + @Override + public Watch watch() { + return watch; + } + public static Builder builder(Watch watch, boolean knownWatch, ManualTriggerEvent event, TimeValue defaultThrottlePeriod) { return new Builder(watch, knownWatch, event, defaultThrottlePeriod); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/QueuedWatch.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/QueuedWatch.java index a945bc9a515..c491e4e0bce 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/QueuedWatch.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/QueuedWatch.java @@ -27,7 +27,7 @@ public class QueuedWatch implements Streamable, ToXContentObject { } public QueuedWatch(WatchExecutionContext ctx) { - this.watchId = ctx.watch().id(); + this.watchId = ctx.id().watchId(); this.watchRecordId = ctx.id().value(); this.triggeredTime = ctx.triggerEvent().triggeredTime(); this.executionTime = ctx.executionTime(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredExecutionContext.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredExecutionContext.java index c1c4f0e8ea7..89baba7d44e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredExecutionContext.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredExecutionContext.java @@ -12,8 +12,21 @@ import org.joda.time.DateTime; public class TriggeredExecutionContext extends WatchExecutionContext { - public TriggeredExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { - super(watch, executionTime, triggerEvent, defaultThrottlePeriod); + private final boolean overrideOnConflict; + + public TriggeredExecutionContext(String watchId, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { + this(watchId, executionTime, triggerEvent, defaultThrottlePeriod, false); + } + + TriggeredExecutionContext(String watchId, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod, + boolean overrideOnConflict) { + super(watchId, executionTime, triggerEvent, defaultThrottlePeriod); + this.overrideOnConflict = overrideOnConflict; + } + + @Override + public boolean overrideRecordOnConflict() { + return overrideOnConflict; } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionContext.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionContext.java index 70c8e949ca4..7788e3b9db5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionContext.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionContext.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.xpack.watcher.actions.ActionWrapper; @@ -25,13 +26,13 @@ import java.util.concurrent.ConcurrentMap; public abstract class WatchExecutionContext { private final Wid id; - private final Watch watch; private final DateTime executionTime; private final TriggerEvent triggerEvent; private final TimeValue defaultThrottlePeriod; private ExecutionPhase phase = ExecutionPhase.AWAITS_EXECUTION; private long startTimestamp; + private Watch watch; private Payload payload; private Map vars = new HashMap<>(); @@ -42,9 +43,8 @@ public abstract class WatchExecutionContext { private ConcurrentMap actionsResults = ConcurrentCollections.newConcurrentMap(); private String nodeId; - public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { - this.id = new Wid(watch.id(), executionTime); - this.watch = watch; + public WatchExecutionContext(String watchId, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) { + this.id = new Wid(watchId, executionTime); this.executionTime = executionTime; this.triggerEvent = triggerEvent; this.defaultThrottlePeriod = defaultThrottlePeriod; @@ -72,14 +72,20 @@ public abstract class WatchExecutionContext { */ public abstract boolean recordExecution(); - public Wid id() { - return id; - } - public Watch watch() { return watch; } + public void ensureWatchExists(CheckedSupplier supplier) throws Exception { + if (watch == null) { + watch = supplier.get(); + } + } + + public Wid id() { + return id; + } + public DateTime executionTime() { return executionTime; } @@ -156,7 +162,7 @@ public abstract class WatchExecutionContext { public void onConditionResult(Condition.Result conditionResult) { assert !phase.sealed(); this.conditionResult = conditionResult; - watch.status().onCheck(conditionResult.met(), executionTime); + watch().status().onCheck(conditionResult.met(), executionTime); } public Condition.Result conditionResult() { @@ -188,7 +194,7 @@ public abstract class WatchExecutionContext { public void onActionResult(ActionWrapper.Result result) { assert !phase.sealed(); actionsResults.put(result.id(), result); - watch.status().onActionResult(result.id(), executionTime, result.action()); + watch().status().onActionResult(result.id(), executionTime, result.action()); } public Map actionsResults() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionSnapshot.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionSnapshot.java index 722741434e0..584a66fb46e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionSnapshot.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/execution/WatchExecutionSnapshot.java @@ -31,7 +31,7 @@ public class WatchExecutionSnapshot implements Streamable, ToXContentObject { } WatchExecutionSnapshot(WatchExecutionContext context, StackTraceElement[] executionStackTrace) { - watchId = context.watch().id(); + watchId = context.id().watchId(); watchRecordId = context.id().value(); triggeredTime = context.triggerEvent().triggeredTime(); executionTime = context.executionTime(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/history/WatchRecord.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/history/WatchRecord.java index 3f64f7b3782..7a1553f59f9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/history/WatchRecord.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/history/WatchRecord.java @@ -82,8 +82,12 @@ public abstract class WatchRecord implements ToXContentObject { } private WatchRecord(WatchExecutionContext context, ExecutionState state) { - this(context.id(), context.triggerEvent(), state, context.vars(), context.watch().input(), context.watch().condition(), - context.watch().metadata(), context.watch(), null, context.getNodeId()); + this(context.id(), context.triggerEvent(), state, context.vars(), + context.watch() != null ? context.watch().input() : null, + context.watch() != null ? context.watch().condition() : null, + context.watch() != null ? context.watch().metadata() : null, + context.watch(), + null, context.getNodeId()); } private WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/support/Variables.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/support/Variables.java index 8c3439b0265..27c7b1b4a27 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/support/Variables.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/support/Variables.java @@ -25,7 +25,7 @@ public final class Variables { public static Map createCtxModel(WatchExecutionContext ctx, Payload payload) { Map ctxModel = new HashMap<>(); ctxModel.put(ID, ctx.id().value()); - ctxModel.put(WATCH_ID, ctx.watch().id()); + ctxModel.put(WATCH_ID, ctx.id().watchId()); ctxModel.put(EXECUTION_TIME, ctx.executionTime()); ctxModel.put(TRIGGER, ctx.triggerEvent().data()); if (payload != null) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/email/EmailActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/email/EmailActionTests.java index 2522def9f6d..41d2637bd72 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/email/EmailActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/email/EmailActionTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.watcher.actions.email; import io.netty.handler.codec.http.HttpHeaders; - import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.MapBuilder; @@ -94,12 +93,7 @@ public class EmailActionTests extends ESTestCase { public void testExecute() throws Exception { final String account = "account1"; - EmailService service = new AbstractWatcherIntegrationTestCase.NoopEmailService() { - @Override - public EmailService.EmailSent send(Email email, Authentication auth, Profile profile, String accountName) { - return new EmailService.EmailSent(account, email); - } - }; + EmailService service = new AbstractWatcherIntegrationTestCase.NoopEmailService(); TextTemplateEngine engine = mock(TextTemplateEngine.class); HtmlSanitizer htmlSanitizer = mock(HtmlSanitizer.class); @@ -138,7 +132,7 @@ public class EmailActionTests extends ESTestCase { DateTime now = DateTime.now(DateTimeZone.UTC); - Wid wid = new Wid(randomAlphaOfLength(5), now); + Wid wid = new Wid("watch1", now); WatchExecutionContext ctx = mockExecutionContextBuilder("watch1") .wid(wid) .payload(payload) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java index d7a412a385b..2273b48c419 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookActionTests.java @@ -26,9 +26,6 @@ import org.elasticsearch.xpack.common.http.auth.basic.BasicAuthFactory; import org.elasticsearch.xpack.common.text.TextTemplate; import org.elasticsearch.xpack.common.text.TextTemplateEngine; import org.elasticsearch.xpack.notification.email.Attachment; -import org.elasticsearch.xpack.notification.email.Authentication; -import org.elasticsearch.xpack.notification.email.Email; -import org.elasticsearch.xpack.notification.email.Profile; import org.elasticsearch.xpack.ssl.SSLService; import org.elasticsearch.xpack.watcher.actions.Action; import org.elasticsearch.xpack.watcher.actions.Action.Result.Status; @@ -69,18 +66,16 @@ import static org.mockito.Mockito.when; public class WebhookActionTests extends ESTestCase { - static final String TEST_HOST = "test.com"; - static final int TEST_PORT = 8089; + private static final String TEST_HOST = "test.com"; + private static final int TEST_PORT = 8089; + private static final String TEST_BODY_STRING = "ERROR HAPPENED"; + private static final String TEST_PATH_STRING = "/testPath"; private TextTemplateEngine templateEngine; private HttpAuthRegistry authRegistry; private TextTemplate testBody; private TextTemplate testPath; - static final String TEST_BODY_STRING = "ERROR HAPPENED"; - static final String TEST_PATH_STRING = "/testPath"; - - @Before public void init() throws Exception { templateEngine = new MockTextTemplateEngine(); @@ -235,9 +230,10 @@ public class WebhookActionTests extends ESTestCase { ExecutableWebhookAction executable = new ExecutableWebhookAction(action, logger, httpClient, templateEngine); String watchId = "test_url_encode" + randomAlphaOfLength(10); - Watch watch = createWatch(watchId, "account1"); - WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(UTC), + TriggeredExecutionContext ctx = new TriggeredExecutionContext(watchId, new DateTime(UTC), new ScheduleTriggerEvent(watchId, new DateTime(UTC), new DateTime(UTC)), timeValueSeconds(5)); + Watch watch = createWatch(watchId); + ctx.ensureWatchExists(() -> watch); executable.execute("_id", ctx, new Payload.Simple()); assertThat(proxyServer.requests(), hasSize(1)); @@ -259,27 +255,22 @@ public class WebhookActionTests extends ESTestCase { ExecutableWebhookAction executable = new ExecutableWebhookAction(action, logger, client, templateEngine); - Watch watch = createWatch(watchId, "account1"); - WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(UTC), + TriggeredExecutionContext ctx = new TriggeredExecutionContext(watchId, new DateTime(UTC), new ScheduleTriggerEvent(watchId, new DateTime(UTC), new DateTime(UTC)), timeValueSeconds(5)); + Watch watch = createWatch(watchId); + ctx.ensureWatchExists(() -> watch); Action.Result result = executable.execute("_id", ctx, new Payload.Simple()); assertThat(result, Matchers.instanceOf(WebhookAction.Result.Success.class)); } - private Watch createWatch(String watchId, final String account) throws AddressException, IOException { + private Watch createWatch(String watchId) throws AddressException, IOException { return WatcherTestUtils.createTestWatch(watchId, mock(Client.class), ExecuteScenario.Success.client(), - new AbstractWatcherIntegrationTestCase.NoopEmailService() { - - @Override - public EmailSent send(Email email, Authentication auth, Profile profile, String accountName) { - return new EmailSent(account, email); - } - }, + new AbstractWatcherIntegrationTestCase.NoopEmailService(), mock(WatcherSearchTemplateService.class), logger); - }; + } private enum ExecuteScenario { ErrorCode() { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 1234ec6915f..e0164d45974 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; @@ -68,13 +69,13 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Arrays.asList; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -144,7 +145,8 @@ public class ExecutionServiceTests extends ESTestCase { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE; Condition condition = mock(Condition.class); @@ -237,7 +239,8 @@ public class ExecutionServiceTests extends ESTestCase { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); input = mock(ExecutableInput.class); Input.Result inputResult = mock(Input.Result.class); @@ -305,7 +308,8 @@ public class ExecutionServiceTests extends ESTestCase { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); Condition condition = mock(Condition.class); Condition.Result conditionResult = mock(Condition.Result.class); @@ -369,7 +373,8 @@ public class ExecutionServiceTests extends ESTestCase { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE; Condition condition = mock(Condition.class); @@ -429,10 +434,11 @@ public class ExecutionServiceTests extends ESTestCase { GetResponse getResponse = mock(GetResponse.class); when(getResponse.isExists()).thenReturn(true); mockGetWatchResponse(client, "_id", getResponse); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE; Condition condition = mock(Condition.class); @@ -509,7 +515,8 @@ public class ExecutionServiceTests extends ESTestCase { DateTime now = now(UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); + context.ensureWatchExists(() -> watch); Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE; Condition condition = mock(Condition.class); @@ -584,7 +591,8 @@ public class ExecutionServiceTests extends ESTestCase { DateTime now = now(UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); + context.ensureWatchExists(() -> watch); Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE; Condition condition = mock(Condition.class); @@ -635,7 +643,8 @@ public class ExecutionServiceTests extends ESTestCase { DateTime now = now(UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); + context.ensureWatchExists(() -> watch); Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE; Condition condition = mock(Condition.class); @@ -697,7 +706,8 @@ public class ExecutionServiceTests extends ESTestCase { Watch watch = mock(Watch.class); when(watch.id()).thenReturn(getTestName()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); + context.ensureWatchExists(() -> watch); Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE; Condition condition = mock(Condition.class); @@ -724,7 +734,7 @@ public class ExecutionServiceTests extends ESTestCase { when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); - when(watch.actions()).thenReturn(Arrays.asList(actionWrapper)); + when(watch.actions()).thenReturn(Collections.singletonList(actionWrapper)); when(watch.status()).thenReturn(watchStatus); WatchRecord watchRecord = executionService.executeInner(context); @@ -751,7 +761,8 @@ public class ExecutionServiceTests extends ESTestCase { DateTime now = DateTime.now(UTC); Watch watch = mock(Watch.class); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); - WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5)); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); + context.ensureWatchExists(() -> watch); Condition.Result conditionResult = NeverCondition.RESULT_INSTANCE; Condition condition = mock(Condition.class); @@ -772,7 +783,7 @@ public class ExecutionServiceTests extends ESTestCase { when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(condition); when(watch.transform()).thenReturn(watchTransform); - when(watch.actions()).thenReturn(Arrays.asList(actionWrapper)); + when(watch.actions()).thenReturn(Collections.singletonList(actionWrapper)); when(watch.status()).thenReturn(watchStatus); WatchRecord watchRecord = executionService.executeInner(context); @@ -842,7 +853,7 @@ public class ExecutionServiceTests extends ESTestCase { when(watch.input()).thenReturn(input); when(watch.condition()).thenReturn(AlwaysCondition.INSTANCE); - when(watch.actions()).thenReturn(Arrays.asList(actionWrapper)); + when(watch.actions()).thenReturn(Collections.singletonList(actionWrapper)); when(watch.status()).thenReturn(watchStatus); executionService.execute(context); @@ -854,6 +865,8 @@ public class ExecutionServiceTests extends ESTestCase { Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); when(ctx.watch()).thenReturn(watch); + Wid wid = new Wid(watch.id(), DateTime.now(UTC)); + when(ctx.id()).thenReturn(wid); executionService.getCurrentExecutions().put("_id", new ExecutionService.WatchExecution(ctx, Thread.currentThread())); @@ -865,44 +878,51 @@ public class ExecutionServiceTests extends ESTestCase { public void testExecuteWatchNotFound() throws Exception { Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); - WatchExecutionContext ctx = mock(WatchExecutionContext.class); - when(ctx.knownWatch()).thenReturn(true); - when(watch.status()).thenReturn(new WatchStatus(now(), emptyMap())); - when(ctx.watch()).thenReturn(watch); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), + new DateTime(0, UTC), + new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), + TimeValue.timeValueSeconds(5)); - GetResponse getResponse = mock(GetResponse.class); - when(getResponse.isExists()).thenReturn(false); - boolean exceptionThrown = false; - if (randomBoolean()) { - mockGetWatchResponse(client, "_id", getResponse); - } else { - // this emulates any failure while getting the watch, while index not found is an accepted issue - if (randomBoolean()) { - exceptionThrown = true; - ElasticsearchException e = new ElasticsearchException("something went wrong, i.e. index not found"); - mockGetWatchException(client, "_id", e); - WatchExecutionResult result = new WatchExecutionResult(ctx, randomInt(10)); - WatchRecord wr = new WatchRecord.ExceptionWatchRecord(ctx, result, e); - when(ctx.abortFailedExecution(eq(e))).thenReturn(wr); - } else { - mockGetWatchException(client, "_id", new IndexNotFoundException(".watch")); - } - } + GetResponse notFoundResponse = mock(GetResponse.class); + when(notFoundResponse.isExists()).thenReturn(false); + mockGetWatchResponse(client, "_id", notFoundResponse); - WatchRecord.MessageWatchRecord record = mock(WatchRecord.MessageWatchRecord.class); - when(record.state()).thenReturn(ExecutionState.NOT_EXECUTED_WATCH_MISSING); - when(ctx.abortBeforeExecution(eq(ExecutionState.NOT_EXECUTED_WATCH_MISSING), any())).thenReturn(record); - when(ctx.executionPhase()).thenReturn(ExecutionPhase.AWAITS_EXECUTION); - - WatchRecord watchRecord = executionService.execute(ctx); - if (exceptionThrown) { - assertThat(watchRecord.state(), is(ExecutionState.FAILED)); - } else { - assertThat(watchRecord.state(), is(ExecutionState.NOT_EXECUTED_WATCH_MISSING)); - } + WatchRecord watchRecord = executionService.execute(context); + assertThat(watchRecord, not(nullValue())); + assertThat(watchRecord.state(), is(ExecutionState.NOT_EXECUTED_WATCH_MISSING)); } - public void testWatchInactive() { + public void testExecuteWatchIndexNotFoundException() { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), + new DateTime(0, UTC), + new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), + TimeValue.timeValueSeconds(5)); + + mockGetWatchException(client, "_id", new IndexNotFoundException(".watch")); + WatchRecord watchRecord = executionService.execute(context); + assertThat(watchRecord, not(nullValue())); + assertThat(watchRecord.state(), is(ExecutionState.NOT_EXECUTED_WATCH_MISSING)); + } + + public void testExecuteWatchParseWatchException() { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("_id"); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), + new DateTime(0, UTC), + new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), + TimeValue.timeValueSeconds(5)); + + IOException e = new IOException("something went wrong, i.e. index not found"); + mockGetWatchException(client, "_id", e); + + WatchRecord watchRecord = executionService.execute(context); + assertThat(watchRecord, not(nullValue())); + assertThat(watchRecord.state(), is(ExecutionState.FAILED)); + } + + public void testWatchInactive() throws Exception { Watch watch = mock(Watch.class); when(watch.id()).thenReturn("_id"); WatchExecutionContext ctx = mock(WatchExecutionContext.class); @@ -911,6 +931,13 @@ public class ExecutionServiceTests extends ESTestCase { when(status.state()).thenReturn(new WatchStatus.State(false, now())); when(watch.status()).thenReturn(status); when(ctx.watch()).thenReturn(watch); + Wid wid = new Wid(watch.id(), DateTime.now(UTC)); + when(ctx.id()).thenReturn(wid); + + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + mockGetWatchResponse(client, "_id", getResponse); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); WatchRecord.MessageWatchRecord record = mock(WatchRecord.MessageWatchRecord.class); when(record.state()).thenReturn(ExecutionState.EXECUTION_NOT_NEEDED); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/chain/ChainInputTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/chain/ChainInputTests.java index a79e526c52c..dd3b7f7b027 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/chain/ChainInputTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/chain/ChainInputTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.watcher.input.chain; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -18,29 +17,19 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.SecuritySettingsSource; import org.elasticsearch.xpack.common.http.HttpRequestTemplate; import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth; -import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; import org.elasticsearch.xpack.watcher.condition.ScriptCondition; -import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.input.InputFactory; import org.elasticsearch.xpack.watcher.input.InputRegistry; import org.elasticsearch.xpack.watcher.input.http.HttpInput; -import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput; import org.elasticsearch.xpack.watcher.input.simple.SimpleInput; import org.elasticsearch.xpack.watcher.input.simple.SimpleInputFactory; -import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.elasticsearch.xpack.watcher.test.WatcherTestUtils; import org.elasticsearch.xpack.watcher.watch.Payload; -import org.elasticsearch.xpack.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.watch.WatchStatus; -import org.joda.time.DateTime; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import static java.util.Collections.emptyMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; @@ -55,8 +44,6 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.startsWith; -import static org.joda.time.DateTimeZone.UTC; public class ChainInputTests extends ESTestCase { @@ -95,7 +82,8 @@ public class ChainInputTests extends ESTestCase { // now execute ExecutableChainInput executableChainInput = chainInputFactory.createExecutable(chainInput); - ChainInput.Result result = executableChainInput.execute(createContext(), new Payload.Simple()); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); + ChainInput.Result result = executableChainInput.execute(ctx, new Payload.Simple()); Payload payload = result.payload(); assertThat(payload.data(), hasKey("first")); assertThat(payload.data(), hasKey("second")); @@ -230,23 +218,4 @@ public class ChainInputTests extends ESTestCase { expectThrows(ElasticsearchParseException.class, () -> chainInputFactory.parseInput("test", parser)); assertThat(e.getMessage(), containsString("Expected starting JSON object after [first] in watch [test]")); } - - private WatchExecutionContext createContext() { - Watch watch = new Watch("test-watch", - new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), - new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger), - AlwaysCondition.INSTANCE, - null, - null, - new ArrayList<>(), - null, - new WatchStatus(new DateTime(0, UTC), emptyMap())); - WatchExecutionContext ctx = new TriggeredExecutionContext(watch, - new DateTime(0, UTC), - new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), - TimeValue.timeValueSeconds(5)); - - return ctx; - } - } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/http/HttpInputTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/http/HttpInputTests.java index 41bf9762561..5fb08c88e5c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/http/HttpInputTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/input/http/HttpInputTests.java @@ -9,7 +9,6 @@ import io.netty.handler.codec.http.HttpHeaders; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -30,20 +29,11 @@ import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth; import org.elasticsearch.xpack.common.http.auth.basic.BasicAuthFactory; import org.elasticsearch.xpack.common.text.TextTemplate; import org.elasticsearch.xpack.common.text.TextTemplateEngine; -import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; -import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.input.InputBuilders; -import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput; -import org.elasticsearch.xpack.watcher.input.simple.SimpleInput; import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath; -import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; +import org.elasticsearch.xpack.watcher.test.WatcherTestUtils; import org.elasticsearch.xpack.watcher.watch.Payload; -import org.elasticsearch.xpack.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.watch.WatchStatus; -import org.joda.time.DateTime; import org.junit.Before; import java.io.IOException; @@ -55,7 +45,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; @@ -67,8 +56,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.startsWith; -import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -131,7 +118,7 @@ public class HttpInputTests extends ESTestCase { when(httpClient.execute(any(HttpRequest.class))).thenReturn(response); when(templateEngine.render(eq(new TextTemplate("_body")), any(Map.class))).thenReturn("_body"); - WatchExecutionContext ctx = createWatchExecutionContext(); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); HttpInput.Result result = input.execute(ctx, new Payload.Simple()); assertThat(result.type(), equalTo(HttpInput.TYPE)); assertThat(result.payload().data(), hasEntry("key", "value")); @@ -150,7 +137,7 @@ public class HttpInputTests extends ESTestCase { when(httpClient.execute(any(HttpRequest.class))).thenReturn(response); when(templateEngine.render(eq(new TextTemplate("_body")), any(Map.class))).thenReturn("_body"); - WatchExecutionContext ctx = createWatchExecutionContext(); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); HttpInput.Result result = input.execute(ctx, new Payload.Simple()); assertThat(result.type(), equalTo(HttpInput.TYPE)); assertThat(result.payload().data().get("_value").toString(), equalTo(notJson)); @@ -264,7 +251,7 @@ public class HttpInputTests extends ESTestCase { when(templateEngine.render(eq(new TextTemplate("_body")), any(Map.class))).thenReturn("_body"); - WatchExecutionContext ctx = createWatchExecutionContext(); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); HttpInput.Result result = input.execute(ctx, new Payload.Simple()); assertThat(result.type(), equalTo(HttpInput.TYPE)); @@ -290,7 +277,8 @@ public class HttpInputTests extends ESTestCase { HttpResponse httpResponse = new HttpResponse(200, body, headers); when(httpClient.execute(any())).thenReturn(httpResponse); - HttpInput.Result result = input.execute(createWatchExecutionContext(), Payload.EMPTY); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); + HttpInput.Result result = input.execute(ctx, Payload.EMPTY); assertThat(result.payload().data(), hasEntry("_value", body)); assertThat(result.payload().data(), not(hasKey("foo"))); } @@ -303,7 +291,7 @@ public class HttpInputTests extends ESTestCase { HttpInput httpInput = InputBuilders.httpInput(request.build()).build(); ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine); - WatchExecutionContext ctx = createWatchExecutionContext(); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); HttpInput.Result result = input.execute(ctx, new Payload.Simple()); assertThat(result.statusCode, is(200)); assertThat(result.payload().data(), hasKey("_status_code")); @@ -320,7 +308,7 @@ public class HttpInputTests extends ESTestCase { HttpInput httpInput = InputBuilders.httpInput(request.build()).build(); ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine); - WatchExecutionContext ctx = createWatchExecutionContext(); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); HttpInput.Result result = input.execute(ctx, new Payload.Simple()); assertThat(result.statusCode, is(200)); assertThat(result.payload().data(), not(hasKey("_value"))); @@ -339,7 +327,7 @@ public class HttpInputTests extends ESTestCase { HttpInput httpInput = InputBuilders.httpInput(request.build()).build(); ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine); - WatchExecutionContext ctx = createWatchExecutionContext(); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); HttpInput.Result result = input.execute(ctx, new Payload.Simple()); assertThat(result.getException(), is(notNullValue())); @@ -358,20 +346,4 @@ public class HttpInputTests extends ESTestCase { } } } - - private WatchExecutionContext createWatchExecutionContext() { - Watch watch = new Watch("test-watch", - new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), - new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger), - AlwaysCondition.INSTANCE, - null, - null, - new ArrayList<>(), - null, - new WatchStatus(new DateTime(0, UTC), emptyMap())); - return new TriggeredExecutionContext(watch, - new DateTime(0, UTC), - new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), - TimeValue.timeValueSeconds(5)); - } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java index 39663e58bc0..94ed64f6043 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.watcher.actions.email.ExecutableEmailAction; import org.elasticsearch.xpack.watcher.actions.webhook.ExecutableWebhookAction; import org.elasticsearch.xpack.watcher.actions.webhook.WebhookAction; import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; +import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.execution.Wid; import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput; @@ -44,7 +45,9 @@ import org.elasticsearch.xpack.watcher.transform.search.ExecutableSearchTransfor import org.elasticsearch.xpack.watcher.transform.search.SearchTransform; import org.elasticsearch.xpack.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.watcher.trigger.schedule.CronSchedule; +import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; +import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.xpack.watcher.watch.Payload; import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.WatchStatus; @@ -58,6 +61,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static java.util.Collections.emptyMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; @@ -119,6 +123,25 @@ public final class WatcherTestUtils { .buildMock(); } + public static WatchExecutionContext createWatchExecutionContext(Logger logger) throws Exception { + Watch watch = new Watch("test-watch", + new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), + new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger), + AlwaysCondition.INSTANCE, + null, + null, + new ArrayList<>(), + null, + new WatchStatus(new DateTime(0, UTC), emptyMap())); + TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), + new DateTime(0, UTC), + new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), + TimeValue.timeValueSeconds(5)); + context.ensureWatchExists(() -> watch); + return context; + } + + public static Watch createTestWatch(String watchName, Client client, HttpClient httpClient, EmailService emailService, WatcherSearchTemplateService searchTemplateService, Logger logger) throws AddressException { List actions = new ArrayList<>(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SearchInputTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SearchInputTests.java index d69bc42974e..2ea2f0e8732 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SearchInputTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SearchInputTests.java @@ -29,35 +29,22 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.watcher.Watcher; -import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; -import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.input.Input; import org.elasticsearch.xpack.watcher.input.search.ExecutableSearchInput; import org.elasticsearch.xpack.watcher.input.search.SearchInput; import org.elasticsearch.xpack.watcher.input.search.SearchInputFactory; -import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput; -import org.elasticsearch.xpack.watcher.input.simple.SimpleInput; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService; import org.elasticsearch.xpack.watcher.test.WatcherTestUtils; -import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger; -import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.xpack.watcher.watch.Payload; -import org.elasticsearch.xpack.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.watch.WatchStatus; -import org.joda.time.DateTime; import org.junit.Before; import org.mockito.ArgumentCaptor; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import static java.util.Collections.emptyMap; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; @@ -70,7 +57,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.mock; public class SearchInputTests extends ESTestCase { @@ -104,7 +90,7 @@ public class SearchInputTests extends ESTestCase { WatcherSearchTemplateRequest request = WatcherTestUtils.templateRequest(searchSourceBuilder); ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger, client, watcherSearchTemplateService(), TimeValue.timeValueMinutes(1)); - WatchExecutionContext ctx = createExecutionContext(); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple()); assertThat(result.status(), is(Input.Result.Status.SUCCESS)); @@ -112,23 +98,6 @@ public class SearchInputTests extends ESTestCase { assertThat(searchRequest.searchType(), is(request.getSearchType())); assertThat(searchRequest.indicesOptions(), is(request.getIndicesOptions())); assertThat(searchRequest.indices(), is(arrayContainingInAnyOrder(request.getIndices()))); - - } - - private TriggeredExecutionContext createExecutionContext() { - return new TriggeredExecutionContext( - new Watch("test-watch", - new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))), - new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger), - AlwaysCondition.INSTANCE, - null, - null, - new ArrayList<>(), - null, - new WatchStatus(new DateTime(0, UTC), emptyMap())), - new DateTime(0, UTC), - new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)), - timeValueSeconds(5)); } public void testDifferentSearchType() throws Exception { @@ -145,7 +114,7 @@ public class SearchInputTests extends ESTestCase { ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger, client, watcherSearchTemplateService(), TimeValue.timeValueMinutes(1)); - WatchExecutionContext ctx = createExecutionContext(); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple()); assertThat(result.status(), is(Input.Result.Status.SUCCESS)); @@ -194,7 +163,7 @@ public class SearchInputTests extends ESTestCase { assertThat(input.getRequest().getSearchSource(), is(BytesArray.EMPTY)); ExecutableSearchInput executableSearchInput = factory.createExecutable(input); - WatchExecutionContext ctx = createExecutionContext(); + WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger); SearchInput.Result result = executableSearchInput.execute(ctx, Payload.Simple.EMPTY); assertThat(result.status(), is(Input.Result.Status.SUCCESS)); // no body in the search request