Watcher: Load for watch for execution as late as possible (elastic/x-pack-elasticsearch#2151)

When a watch is executed currently, it gets passed an in-memory
watch object, that was loaded, before the execution started.

This means there is a window of time, where an old watch could still
be executing, then a watch gets loaded for execution, then the old watch
execution finishes and updates the watch status and thus reindexes the
watch.

Now the watch, that got loaded for execution, executes and tries to
store its watch status, but fails, because the version of the watch
has changed.

This commit changes the point in time where the watch is loaded. Now
this only happens, while a watch is in its protected execution block,
and thus we can be sure, that there is no other execution of the watch
happening.

This will primarily impact watches, that execute often, but their
runtime is longer than the configured interval between executions.

Side fix: Removed some duplicate testing method and moved into 
WatcherTestUtils, fixed a tests with a ton of if's with random booleans
into separate tests.

relates elastic/x-pack-elasticsearch#395

Original commit: elastic/x-pack-elasticsearch@bf393023d7
This commit is contained in:
Alexander Reelsen 2017-08-21 10:43:41 +02:00 committed by GitHub
parent 44c3d5b3d9
commit 1b6d9d430c
15 changed files with 220 additions and 250 deletions

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.execution;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
@ -31,7 +32,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.xpack.common.stats.Counters;
import org.elasticsearch.xpack.watcher.Watcher;
@ -226,20 +226,18 @@ public class ExecutionService extends AbstractComponent {
* @param events The iterable list of trigger events to create the two lists from
* @return Two linked lists that contain the triggered watches and contexts
*/
private Tuple<List<TriggeredWatch>, List<TriggeredExecutionContext>> createTriggeredWatchesAndContext(Iterable<TriggerEvent> events)
throws IOException {
private Tuple<List<TriggeredWatch>, List<TriggeredExecutionContext>> createTriggeredWatchesAndContext(Iterable<TriggerEvent> events) {
final LinkedList<TriggeredWatch> triggeredWatches = new LinkedList<>();
final LinkedList<TriggeredExecutionContext> contexts = new LinkedList<>();
DateTime now = new DateTime(clock.millis(), UTC);
for (TriggerEvent event : events) {
GetResponse response = client.prepareGet(Watch.INDEX, Watch.DOC_TYPE, event.jobName()).get(TimeValue.timeValueSeconds(10));
GetResponse response = getWatch(event.jobName());
if (response.isExists() == false) {
logger.warn("unable to find watch [{}] in watch index, perhaps it has been deleted", event.jobName());
continue;
}
Watch watch = parser.parseWithSecrets(response.getId(), true, response.getSourceAsBytesRef(), now, XContentType.JSON);
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, now, event, defaultThrottlePeriod);
TriggeredExecutionContext ctx = new TriggeredExecutionContext(event.jobName(), now, event, defaultThrottlePeriod);
contexts.add(ctx);
triggeredWatches.add(new TriggeredWatch(ctx.id(), event));
}
@ -268,32 +266,39 @@ public class ExecutionService extends AbstractComponent {
public WatchRecord execute(WatchExecutionContext ctx) {
ctx.setNodeId(clusterService.localNode().getId());
WatchRecord record = null;
final String watchId = ctx.id().watchId();
try {
boolean executionAlreadyExists = currentExecutions.put(ctx.watch().id(), new WatchExecution(ctx, Thread.currentThread()));
boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread()));
if (executionAlreadyExists) {
logger.trace("not executing watch [{}] because it is already queued", ctx.watch().id());
logger.trace("not executing watch [{}] because it is already queued", watchId);
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
} else if (ctx.watch().status().state().isActive() == false) {
logger.debug("not executing watch [{}] because it is marked as inactive", ctx.watch().id());
record = ctx.abortBeforeExecution(ExecutionState.EXECUTION_NOT_NEEDED, "Watch is not active");
} else {
boolean watchExists = false;
try {
GetResponse response = getWatch(ctx.watch().id());
watchExists = response.isExists();
} catch (IndexNotFoundException e) {}
if (ctx.knownWatch() && watchExists == false) {
// fail fast if we are trying to execute a deleted watch
String message = "unable to find watch for record [" + ctx.id() + "], perhaps it has been deleted, ignoring...";
ctx.ensureWatchExists(() -> {
GetResponse resp = getWatch(watchId);
if (resp.isExists() == false) {
throw new ResourceNotFoundException("watch [{}] does not exist", watchId);
}
return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON);
});
} catch (ResourceNotFoundException e) {
String message = "unable to find watch for record [" + ctx.id() + "]";
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_WATCH_MISSING, message);
} catch (Exception e) {
record = ctx.abortFailedExecution(e);
}
} else {
logger.debug("executing watch [{}]", ctx.id().watchId());
if (ctx.watch() != null) {
if (ctx.watch().status().state().isActive()) {
logger.debug("executing watch [{}]", watchId);
record = executeInner(ctx);
if (ctx.recordExecution()) {
updateWatchStatus(ctx.watch());
record = executeInner(ctx);
if (ctx.recordExecution()) {
updateWatchStatus(ctx.watch());
}
} else {
logger.debug("not executing watch [{}] because it is marked as inactive", watchId);
record = ctx.abortBeforeExecution(ExecutionState.EXECUTION_NOT_NEEDED, "Watch is not active");
}
}
}
@ -320,8 +325,8 @@ public class ExecutionService extends AbstractComponent {
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to delete triggered watch [{}]", ctx.id()), e);
}
}
currentExecutions.remove(ctx.watch().id());
logger.trace("finished [{}]/[{}]", ctx.watch().id(), ctx.id());
currentExecutions.remove(watchId);
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
}
return record;
}
@ -373,9 +378,9 @@ public class ExecutionService extends AbstractComponent {
private void logWatchRecord(WatchExecutionContext ctx, Exception e) {
// failed watches stack traces are only logged in debug, otherwise they should be checked out in the history
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("failed to execute watch [{}]", ctx.watch().id()), e);
logger.debug((Supplier<?>) () -> new ParameterizedMessage("failed to execute watch [{}]", ctx.id().watchId()), e);
} else {
logger.warn("failed to execute watch [{}]", ctx.watch().id());
logger.warn("failed to execute watch [{}]", ctx.id().watchId());
}
}
@ -484,15 +489,10 @@ public class ExecutionService extends AbstractComponent {
triggeredWatchStore.delete(triggeredWatch.id());
} else {
DateTime now = new DateTime(clock.millis(), UTC);
try {
Watch watch = parser.parseWithSecrets(response.getId(), true, response.getSourceAsBytesRef(), now, XContentType.JSON);
TriggeredExecutionContext ctx =
new StartupExecutionContext(watch, now, triggeredWatch.triggerEvent(), defaultThrottlePeriod);
executeAsync(ctx, triggeredWatch);
counter++;
} catch (IOException e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("Error parsing triggered watch"), e);
}
TriggeredExecutionContext ctx = new TriggeredExecutionContext(triggeredWatch.id().watchId(), now,
triggeredWatch.triggerEvent(), defaultThrottlePeriod, true);
executeAsync(ctx, triggeredWatch);
counter++;
}
}
logger.debug("triggered execution of [{}] watches", counter);
@ -532,18 +532,6 @@ public class ExecutionService extends AbstractComponent {
currentExecutions = new CurrentExecutions();
}
private static final class StartupExecutionContext extends TriggeredExecutionContext {
StartupExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
super(watch, executionTime, triggerEvent, defaultThrottlePeriod);
}
@Override
public boolean overrideRecordOnConflict() {
return true;
}
}
// the watch execution task takes another runnable as parameter
// the best solution would be to move the whole execute() method, which is handed over as ctor parameter
// over into this class, this is the quicker way though

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
@ -25,16 +26,18 @@ public class ManualExecutionContext extends WatchExecutionContext {
private final Map<String, ActionExecutionMode> actionModes;
private final boolean recordExecution;
private final boolean knownWatch;
private final Watch watch;
ManualExecutionContext(Watch watch, boolean knownWatch, DateTime executionTime, ManualTriggerEvent triggerEvent,
TimeValue defaultThrottlePeriod, Input.Result inputResult, Condition.Result conditionResult,
Map<String, ActionExecutionMode> actionModes, boolean recordExecution) {
super(watch, executionTime, triggerEvent, defaultThrottlePeriod);
super(watch.id(), executionTime, triggerEvent, defaultThrottlePeriod);
this.actionModes = actionModes;
this.recordExecution = recordExecution;
this.knownWatch = knownWatch;
this.watch = watch;
if (inputResult != null) {
onInputResult(inputResult);
@ -60,6 +63,12 @@ public class ManualExecutionContext extends WatchExecutionContext {
}
}
// a noop operation, as the watch is already loaded via ctor
@Override
public void ensureWatchExists(CheckedSupplier<Watch, Exception> supplier) throws Exception {
super.ensureWatchExists(() -> watch);
}
@Override
public boolean knownWatch() {
return knownWatch;
@ -88,6 +97,11 @@ public class ManualExecutionContext extends WatchExecutionContext {
return recordExecution;
}
@Override
public Watch watch() {
return watch;
}
public static Builder builder(Watch watch, boolean knownWatch, ManualTriggerEvent event, TimeValue defaultThrottlePeriod) {
return new Builder(watch, knownWatch, event, defaultThrottlePeriod);
}

View File

@ -27,7 +27,7 @@ public class QueuedWatch implements Streamable, ToXContentObject {
}
public QueuedWatch(WatchExecutionContext ctx) {
this.watchId = ctx.watch().id();
this.watchId = ctx.id().watchId();
this.watchRecordId = ctx.id().value();
this.triggeredTime = ctx.triggerEvent().triggeredTime();
this.executionTime = ctx.executionTime();

View File

@ -12,8 +12,21 @@ import org.joda.time.DateTime;
public class TriggeredExecutionContext extends WatchExecutionContext {
public TriggeredExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
super(watch, executionTime, triggerEvent, defaultThrottlePeriod);
private final boolean overrideOnConflict;
public TriggeredExecutionContext(String watchId, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
this(watchId, executionTime, triggerEvent, defaultThrottlePeriod, false);
}
TriggeredExecutionContext(String watchId, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod,
boolean overrideOnConflict) {
super(watchId, executionTime, triggerEvent, defaultThrottlePeriod);
this.overrideOnConflict = overrideOnConflict;
}
@Override
public boolean overrideRecordOnConflict() {
return overrideOnConflict;
}
@Override

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.xpack.watcher.actions.ActionWrapper;
@ -25,13 +26,13 @@ import java.util.concurrent.ConcurrentMap;
public abstract class WatchExecutionContext {
private final Wid id;
private final Watch watch;
private final DateTime executionTime;
private final TriggerEvent triggerEvent;
private final TimeValue defaultThrottlePeriod;
private ExecutionPhase phase = ExecutionPhase.AWAITS_EXECUTION;
private long startTimestamp;
private Watch watch;
private Payload payload;
private Map<String, Object> vars = new HashMap<>();
@ -42,9 +43,8 @@ public abstract class WatchExecutionContext {
private ConcurrentMap<String, ActionWrapper.Result> actionsResults = ConcurrentCollections.newConcurrentMap();
private String nodeId;
public WatchExecutionContext(Watch watch, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
this.id = new Wid(watch.id(), executionTime);
this.watch = watch;
public WatchExecutionContext(String watchId, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
this.id = new Wid(watchId, executionTime);
this.executionTime = executionTime;
this.triggerEvent = triggerEvent;
this.defaultThrottlePeriod = defaultThrottlePeriod;
@ -72,14 +72,20 @@ public abstract class WatchExecutionContext {
*/
public abstract boolean recordExecution();
public Wid id() {
return id;
}
public Watch watch() {
return watch;
}
public void ensureWatchExists(CheckedSupplier<Watch, Exception> supplier) throws Exception {
if (watch == null) {
watch = supplier.get();
}
}
public Wid id() {
return id;
}
public DateTime executionTime() {
return executionTime;
}
@ -156,7 +162,7 @@ public abstract class WatchExecutionContext {
public void onConditionResult(Condition.Result conditionResult) {
assert !phase.sealed();
this.conditionResult = conditionResult;
watch.status().onCheck(conditionResult.met(), executionTime);
watch().status().onCheck(conditionResult.met(), executionTime);
}
public Condition.Result conditionResult() {
@ -188,7 +194,7 @@ public abstract class WatchExecutionContext {
public void onActionResult(ActionWrapper.Result result) {
assert !phase.sealed();
actionsResults.put(result.id(), result);
watch.status().onActionResult(result.id(), executionTime, result.action());
watch().status().onActionResult(result.id(), executionTime, result.action());
}
public Map<String, ActionWrapper.Result> actionsResults() {

View File

@ -31,7 +31,7 @@ public class WatchExecutionSnapshot implements Streamable, ToXContentObject {
}
WatchExecutionSnapshot(WatchExecutionContext context, StackTraceElement[] executionStackTrace) {
watchId = context.watch().id();
watchId = context.id().watchId();
watchRecordId = context.id().value();
triggeredTime = context.triggerEvent().triggeredTime();
executionTime = context.executionTime();

View File

@ -82,8 +82,12 @@ public abstract class WatchRecord implements ToXContentObject {
}
private WatchRecord(WatchExecutionContext context, ExecutionState state) {
this(context.id(), context.triggerEvent(), state, context.vars(), context.watch().input(), context.watch().condition(),
context.watch().metadata(), context.watch(), null, context.getNodeId());
this(context.id(), context.triggerEvent(), state, context.vars(),
context.watch() != null ? context.watch().input() : null,
context.watch() != null ? context.watch().condition() : null,
context.watch() != null ? context.watch().metadata() : null,
context.watch(),
null, context.getNodeId());
}
private WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) {

View File

@ -25,7 +25,7 @@ public final class Variables {
public static Map<String, Object> createCtxModel(WatchExecutionContext ctx, Payload payload) {
Map<String, Object> ctxModel = new HashMap<>();
ctxModel.put(ID, ctx.id().value());
ctxModel.put(WATCH_ID, ctx.watch().id());
ctxModel.put(WATCH_ID, ctx.id().watchId());
ctxModel.put(EXECUTION_TIME, ctx.executionTime());
ctxModel.put(TRIGGER, ctx.triggerEvent().data());
if (payload != null) {

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.watcher.actions.email;
import io.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
@ -94,12 +93,7 @@ public class EmailActionTests extends ESTestCase {
public void testExecute() throws Exception {
final String account = "account1";
EmailService service = new AbstractWatcherIntegrationTestCase.NoopEmailService() {
@Override
public EmailService.EmailSent send(Email email, Authentication auth, Profile profile, String accountName) {
return new EmailService.EmailSent(account, email);
}
};
EmailService service = new AbstractWatcherIntegrationTestCase.NoopEmailService();
TextTemplateEngine engine = mock(TextTemplateEngine.class);
HtmlSanitizer htmlSanitizer = mock(HtmlSanitizer.class);
@ -138,7 +132,7 @@ public class EmailActionTests extends ESTestCase {
DateTime now = DateTime.now(DateTimeZone.UTC);
Wid wid = new Wid(randomAlphaOfLength(5), now);
Wid wid = new Wid("watch1", now);
WatchExecutionContext ctx = mockExecutionContextBuilder("watch1")
.wid(wid)
.payload(payload)

View File

@ -26,9 +26,6 @@ import org.elasticsearch.xpack.common.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.notification.email.Attachment;
import org.elasticsearch.xpack.notification.email.Authentication;
import org.elasticsearch.xpack.notification.email.Email;
import org.elasticsearch.xpack.notification.email.Profile;
import org.elasticsearch.xpack.ssl.SSLService;
import org.elasticsearch.xpack.watcher.actions.Action;
import org.elasticsearch.xpack.watcher.actions.Action.Result.Status;
@ -69,18 +66,16 @@ import static org.mockito.Mockito.when;
public class WebhookActionTests extends ESTestCase {
static final String TEST_HOST = "test.com";
static final int TEST_PORT = 8089;
private static final String TEST_HOST = "test.com";
private static final int TEST_PORT = 8089;
private static final String TEST_BODY_STRING = "ERROR HAPPENED";
private static final String TEST_PATH_STRING = "/testPath";
private TextTemplateEngine templateEngine;
private HttpAuthRegistry authRegistry;
private TextTemplate testBody;
private TextTemplate testPath;
static final String TEST_BODY_STRING = "ERROR HAPPENED";
static final String TEST_PATH_STRING = "/testPath";
@Before
public void init() throws Exception {
templateEngine = new MockTextTemplateEngine();
@ -235,9 +230,10 @@ public class WebhookActionTests extends ESTestCase {
ExecutableWebhookAction executable = new ExecutableWebhookAction(action, logger, httpClient, templateEngine);
String watchId = "test_url_encode" + randomAlphaOfLength(10);
Watch watch = createWatch(watchId, "account1");
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(UTC),
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watchId, new DateTime(UTC),
new ScheduleTriggerEvent(watchId, new DateTime(UTC), new DateTime(UTC)), timeValueSeconds(5));
Watch watch = createWatch(watchId);
ctx.ensureWatchExists(() -> watch);
executable.execute("_id", ctx, new Payload.Simple());
assertThat(proxyServer.requests(), hasSize(1));
@ -259,27 +255,22 @@ public class WebhookActionTests extends ESTestCase {
ExecutableWebhookAction executable = new ExecutableWebhookAction(action, logger, client, templateEngine);
Watch watch = createWatch(watchId, "account1");
WatchExecutionContext ctx = new TriggeredExecutionContext(watch, new DateTime(UTC),
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watchId, new DateTime(UTC),
new ScheduleTriggerEvent(watchId, new DateTime(UTC), new DateTime(UTC)), timeValueSeconds(5));
Watch watch = createWatch(watchId);
ctx.ensureWatchExists(() -> watch);
Action.Result result = executable.execute("_id", ctx, new Payload.Simple());
assertThat(result, Matchers.instanceOf(WebhookAction.Result.Success.class));
}
private Watch createWatch(String watchId, final String account) throws AddressException, IOException {
private Watch createWatch(String watchId) throws AddressException, IOException {
return WatcherTestUtils.createTestWatch(watchId,
mock(Client.class),
ExecuteScenario.Success.client(),
new AbstractWatcherIntegrationTestCase.NoopEmailService() {
@Override
public EmailSent send(Email email, Authentication auth, Profile profile, String accountName) {
return new EmailSent(account, email);
}
},
new AbstractWatcherIntegrationTestCase.NoopEmailService(),
mock(WatcherSearchTemplateService.class),
logger);
};
}
private enum ExecuteScenario {
ErrorCode() {

View File

@ -18,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -68,13 +69,13 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@ -144,7 +145,8 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -237,7 +239,8 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
input = mock(ExecutableInput.class);
Input.Result inputResult = mock(Input.Result.class);
@ -305,7 +308,8 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
Condition condition = mock(Condition.class);
Condition.Result conditionResult = mock(Condition.Result.class);
@ -369,7 +373,8 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -429,10 +434,11 @@ public class ExecutionServiceTests extends ESTestCase {
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
mockGetWatchResponse(client, "_id", getResponse);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -509,7 +515,8 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = now(UTC);
Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -584,7 +591,8 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = now(UTC);
Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -635,7 +643,8 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = now(UTC);
Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -697,7 +706,8 @@ public class ExecutionServiceTests extends ESTestCase {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn(getTestName());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
Condition.Result conditionResult = AlwaysCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -724,7 +734,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
when(watch.actions()).thenReturn(Arrays.asList(actionWrapper));
when(watch.actions()).thenReturn(Collections.singletonList(actionWrapper));
when(watch.status()).thenReturn(watchStatus);
WatchRecord watchRecord = executionService.executeInner(context);
@ -751,7 +761,8 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = DateTime.now(UTC);
Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
Condition.Result conditionResult = NeverCondition.RESULT_INSTANCE;
Condition condition = mock(Condition.class);
@ -772,7 +783,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
when(watch.transform()).thenReturn(watchTransform);
when(watch.actions()).thenReturn(Arrays.asList(actionWrapper));
when(watch.actions()).thenReturn(Collections.singletonList(actionWrapper));
when(watch.status()).thenReturn(watchStatus);
WatchRecord watchRecord = executionService.executeInner(context);
@ -842,7 +853,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(AlwaysCondition.INSTANCE);
when(watch.actions()).thenReturn(Arrays.asList(actionWrapper));
when(watch.actions()).thenReturn(Collections.singletonList(actionWrapper));
when(watch.status()).thenReturn(watchStatus);
executionService.execute(context);
@ -854,6 +865,8 @@ public class ExecutionServiceTests extends ESTestCase {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
when(ctx.watch()).thenReturn(watch);
Wid wid = new Wid(watch.id(), DateTime.now(UTC));
when(ctx.id()).thenReturn(wid);
executionService.getCurrentExecutions().put("_id", new ExecutionService.WatchExecution(ctx, Thread.currentThread()));
@ -865,44 +878,51 @@ public class ExecutionServiceTests extends ESTestCase {
public void testExecuteWatchNotFound() throws Exception {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
when(ctx.knownWatch()).thenReturn(true);
when(watch.status()).thenReturn(new WatchStatus(now(), emptyMap()));
when(ctx.watch()).thenReturn(watch);
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(),
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
TimeValue.timeValueSeconds(5));
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(false);
boolean exceptionThrown = false;
if (randomBoolean()) {
mockGetWatchResponse(client, "_id", getResponse);
} else {
// this emulates any failure while getting the watch, while index not found is an accepted issue
if (randomBoolean()) {
exceptionThrown = true;
ElasticsearchException e = new ElasticsearchException("something went wrong, i.e. index not found");
mockGetWatchException(client, "_id", e);
WatchExecutionResult result = new WatchExecutionResult(ctx, randomInt(10));
WatchRecord wr = new WatchRecord.ExceptionWatchRecord(ctx, result, e);
when(ctx.abortFailedExecution(eq(e))).thenReturn(wr);
} else {
mockGetWatchException(client, "_id", new IndexNotFoundException(".watch"));
}
}
GetResponse notFoundResponse = mock(GetResponse.class);
when(notFoundResponse.isExists()).thenReturn(false);
mockGetWatchResponse(client, "_id", notFoundResponse);
WatchRecord.MessageWatchRecord record = mock(WatchRecord.MessageWatchRecord.class);
when(record.state()).thenReturn(ExecutionState.NOT_EXECUTED_WATCH_MISSING);
when(ctx.abortBeforeExecution(eq(ExecutionState.NOT_EXECUTED_WATCH_MISSING), any())).thenReturn(record);
when(ctx.executionPhase()).thenReturn(ExecutionPhase.AWAITS_EXECUTION);
WatchRecord watchRecord = executionService.execute(ctx);
if (exceptionThrown) {
assertThat(watchRecord.state(), is(ExecutionState.FAILED));
} else {
assertThat(watchRecord.state(), is(ExecutionState.NOT_EXECUTED_WATCH_MISSING));
}
WatchRecord watchRecord = executionService.execute(context);
assertThat(watchRecord, not(nullValue()));
assertThat(watchRecord.state(), is(ExecutionState.NOT_EXECUTED_WATCH_MISSING));
}
public void testWatchInactive() {
public void testExecuteWatchIndexNotFoundException() {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(),
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
TimeValue.timeValueSeconds(5));
mockGetWatchException(client, "_id", new IndexNotFoundException(".watch"));
WatchRecord watchRecord = executionService.execute(context);
assertThat(watchRecord, not(nullValue()));
assertThat(watchRecord.state(), is(ExecutionState.NOT_EXECUTED_WATCH_MISSING));
}
public void testExecuteWatchParseWatchException() {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(),
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
TimeValue.timeValueSeconds(5));
IOException e = new IOException("something went wrong, i.e. index not found");
mockGetWatchException(client, "_id", e);
WatchRecord watchRecord = executionService.execute(context);
assertThat(watchRecord, not(nullValue()));
assertThat(watchRecord.state(), is(ExecutionState.FAILED));
}
public void testWatchInactive() throws Exception {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
@ -911,6 +931,13 @@ public class ExecutionServiceTests extends ESTestCase {
when(status.state()).thenReturn(new WatchStatus.State(false, now()));
when(watch.status()).thenReturn(status);
when(ctx.watch()).thenReturn(watch);
Wid wid = new Wid(watch.id(), DateTime.now(UTC));
when(ctx.id()).thenReturn(wid);
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
mockGetWatchResponse(client, "_id", getResponse);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
WatchRecord.MessageWatchRecord record = mock(WatchRecord.MessageWatchRecord.class);
when(record.state()).thenReturn(ExecutionState.EXECUTION_NOT_NEEDED);

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.watcher.input.chain;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -18,29 +17,19 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.condition.ScriptCondition;
import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.InputFactory;
import org.elasticsearch.xpack.watcher.input.InputRegistry;
import org.elasticsearch.xpack.watcher.input.http.HttpInput;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInputFactory;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
@ -55,8 +44,6 @@ import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.joda.time.DateTimeZone.UTC;
public class ChainInputTests extends ESTestCase {
@ -95,7 +82,8 @@ public class ChainInputTests extends ESTestCase {
// now execute
ExecutableChainInput executableChainInput = chainInputFactory.createExecutable(chainInput);
ChainInput.Result result = executableChainInput.execute(createContext(), new Payload.Simple());
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
ChainInput.Result result = executableChainInput.execute(ctx, new Payload.Simple());
Payload payload = result.payload();
assertThat(payload.data(), hasKey("first"));
assertThat(payload.data(), hasKey("second"));
@ -230,23 +218,4 @@ public class ChainInputTests extends ESTestCase {
expectThrows(ElasticsearchParseException.class, () -> chainInputFactory.parseInput("test", parser));
assertThat(e.getMessage(), containsString("Expected starting JSON object after [first] in watch [test]"));
}
private WatchExecutionContext createContext() {
Watch watch = new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
AlwaysCondition.INSTANCE,
null,
null,
new ArrayList<>(),
null,
new WatchStatus(new DateTime(0, UTC), emptyMap()));
WatchExecutionContext ctx = new TriggeredExecutionContext(watch,
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
TimeValue.timeValueSeconds(5));
return ctx;
}
}

View File

@ -9,7 +9,6 @@ import io.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -30,20 +29,11 @@ import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuthFactory;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.InputBuilders;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.junit.Before;
import java.io.IOException;
@ -55,7 +45,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
@ -67,8 +56,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@ -131,7 +118,7 @@ public class HttpInputTests extends ESTestCase {
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
when(templateEngine.render(eq(new TextTemplate("_body")), any(Map.class))).thenReturn("_body");
WatchExecutionContext ctx = createWatchExecutionContext();
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.payload().data(), hasEntry("key", "value"));
@ -150,7 +137,7 @@ public class HttpInputTests extends ESTestCase {
when(httpClient.execute(any(HttpRequest.class))).thenReturn(response);
when(templateEngine.render(eq(new TextTemplate("_body")), any(Map.class))).thenReturn("_body");
WatchExecutionContext ctx = createWatchExecutionContext();
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.payload().data().get("_value").toString(), equalTo(notJson));
@ -264,7 +251,7 @@ public class HttpInputTests extends ESTestCase {
when(templateEngine.render(eq(new TextTemplate("_body")), any(Map.class))).thenReturn("_body");
WatchExecutionContext ctx = createWatchExecutionContext();
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.type(), equalTo(HttpInput.TYPE));
@ -290,7 +277,8 @@ public class HttpInputTests extends ESTestCase {
HttpResponse httpResponse = new HttpResponse(200, body, headers);
when(httpClient.execute(any())).thenReturn(httpResponse);
HttpInput.Result result = input.execute(createWatchExecutionContext(), Payload.EMPTY);
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
HttpInput.Result result = input.execute(ctx, Payload.EMPTY);
assertThat(result.payload().data(), hasEntry("_value", body));
assertThat(result.payload().data(), not(hasKey("foo")));
}
@ -303,7 +291,7 @@ public class HttpInputTests extends ESTestCase {
HttpInput httpInput = InputBuilders.httpInput(request.build()).build();
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
WatchExecutionContext ctx = createWatchExecutionContext();
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.statusCode, is(200));
assertThat(result.payload().data(), hasKey("_status_code"));
@ -320,7 +308,7 @@ public class HttpInputTests extends ESTestCase {
HttpInput httpInput = InputBuilders.httpInput(request.build()).build();
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
WatchExecutionContext ctx = createWatchExecutionContext();
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.statusCode, is(200));
assertThat(result.payload().data(), not(hasKey("_value")));
@ -339,7 +327,7 @@ public class HttpInputTests extends ESTestCase {
HttpInput httpInput = InputBuilders.httpInput(request.build()).build();
ExecutableHttpInput input = new ExecutableHttpInput(httpInput, logger, httpClient, templateEngine);
WatchExecutionContext ctx = createWatchExecutionContext();
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
HttpInput.Result result = input.execute(ctx, new Payload.Simple());
assertThat(result.getException(), is(notNullValue()));
@ -358,20 +346,4 @@ public class HttpInputTests extends ESTestCase {
}
}
}
private WatchExecutionContext createWatchExecutionContext() {
Watch watch = new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
AlwaysCondition.INSTANCE,
null,
null,
new ArrayList<>(),
null,
new WatchStatus(new DateTime(0, UTC), emptyMap()));
return new TriggeredExecutionContext(watch,
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
TimeValue.timeValueSeconds(5));
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.xpack.watcher.actions.email.ExecutableEmailAction;
import org.elasticsearch.xpack.watcher.actions.webhook.ExecutableWebhookAction;
import org.elasticsearch.xpack.watcher.actions.webhook.WebhookAction;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.execution.Wid;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
@ -44,7 +45,9 @@ import org.elasticsearch.xpack.watcher.transform.search.ExecutableSearchTransfor
import org.elasticsearch.xpack.watcher.transform.search.SearchTransform;
import org.elasticsearch.xpack.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.CronSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
@ -58,6 +61,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
@ -119,6 +123,25 @@ public final class WatcherTestUtils {
.buildMock();
}
public static WatchExecutionContext createWatchExecutionContext(Logger logger) throws Exception {
Watch watch = new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
AlwaysCondition.INSTANCE,
null,
null,
new ArrayList<>(),
null,
new WatchStatus(new DateTime(0, UTC), emptyMap()));
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(),
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
TimeValue.timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
return context;
}
public static Watch createTestWatch(String watchName, Client client, HttpClient httpClient, EmailService emailService,
WatcherSearchTemplateService searchTemplateService, Logger logger) throws AddressException {
List<ActionWrapper> actions = new ArrayList<>();

View File

@ -29,35 +29,22 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.input.Input;
import org.elasticsearch.xpack.watcher.input.search.ExecutableSearchInput;
import org.elasticsearch.xpack.watcher.input.search.SearchInput;
import org.elasticsearch.xpack.watcher.input.search.SearchInputFactory;
import org.elasticsearch.xpack.watcher.input.simple.ExecutableSimpleInput;
import org.elasticsearch.xpack.watcher.input.simple.SimpleInput;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
@ -70,7 +57,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Mockito.mock;
public class SearchInputTests extends ESTestCase {
@ -104,7 +90,7 @@ public class SearchInputTests extends ESTestCase {
WatcherSearchTemplateRequest request = WatcherTestUtils.templateRequest(searchSourceBuilder);
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger,
client, watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
WatchExecutionContext ctx = createExecutionContext();
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple());
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
@ -112,23 +98,6 @@ public class SearchInputTests extends ESTestCase {
assertThat(searchRequest.searchType(), is(request.getSearchType()));
assertThat(searchRequest.indicesOptions(), is(request.getIndicesOptions()));
assertThat(searchRequest.indices(), is(arrayContainingInAnyOrder(request.getIndices())));
}
private TriggeredExecutionContext createExecutionContext() {
return new TriggeredExecutionContext(
new Watch("test-watch",
new ScheduleTrigger(new IntervalSchedule(new IntervalSchedule.Interval(1, IntervalSchedule.Interval.Unit.MINUTES))),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple()), logger),
AlwaysCondition.INSTANCE,
null,
null,
new ArrayList<>(),
null,
new WatchStatus(new DateTime(0, UTC), emptyMap())),
new DateTime(0, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)),
timeValueSeconds(5));
}
public void testDifferentSearchType() throws Exception {
@ -145,7 +114,7 @@ public class SearchInputTests extends ESTestCase {
ExecutableSearchInput searchInput = new ExecutableSearchInput(new SearchInput(request, null, null, null), logger,
client, watcherSearchTemplateService(), TimeValue.timeValueMinutes(1));
WatchExecutionContext ctx = createExecutionContext();
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
SearchInput.Result result = searchInput.execute(ctx, new Payload.Simple());
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
@ -194,7 +163,7 @@ public class SearchInputTests extends ESTestCase {
assertThat(input.getRequest().getSearchSource(), is(BytesArray.EMPTY));
ExecutableSearchInput executableSearchInput = factory.createExecutable(input);
WatchExecutionContext ctx = createExecutionContext();
WatchExecutionContext ctx = WatcherTestUtils.createWatchExecutionContext(logger);
SearchInput.Result result = executableSearchInput.execute(ctx, Payload.Simple.EMPTY);
assertThat(result.status(), is(Input.Result.Status.SUCCESS));
// no body in the search request