Watcher: Refactoring of TriggeredWatchStore (elastic/x-pack-elasticsearch#1848)

* The TriggeredWatchStore now only has one method to put triggered
  watches
* All code is async in TriggeredWatchStore, locking has been removed
* The dedicated WatchRecord.Fields interface has been removed
* TriggeredWatchTests integration test has been moved to a unit test

Original commit: elastic/x-pack-elasticsearch@bc4b5820fb
This commit is contained in:
Alexander Reelsen 2017-06-27 17:47:00 +02:00 committed by GitHub
parent 84ec242636
commit 403cf8eba3
14 changed files with 179 additions and 217 deletions

View File

@ -7,7 +7,10 @@ package org.elasticsearch.xpack.watcher.execution;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.PlainActionFuture;
@ -16,6 +19,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Setting;
@ -44,11 +48,11 @@ import org.joda.time.DateTime;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@ -194,54 +198,46 @@ public class ExecutionService extends AbstractComponent {
if (!started.get()) {
throw new IllegalStateException("not started");
}
final List<TriggeredWatch> triggeredWatches = new ArrayList<>();
final List<TriggeredExecutionContext> contexts = new ArrayList<>();
DateTime now = new DateTime(clock.millis(), UTC);
threadPool.generic().execute(() -> {
for (TriggerEvent event : events) {
GetResponse response = getWatch(event.jobName());
if (response.isExists() == false) {
logger.warn("unable to find watch [{}] in watch index, perhaps it has been deleted", event.jobName());
} else {
try {
Watch watch =
parser.parseWithSecrets(response.getId(), true, response.getSourceAsBytesRef(), now, XContentType.JSON);
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, now, event, defaultThrottlePeriod);
contexts.add(ctx);
triggeredWatches.add(new TriggeredWatch(ctx.id(), event));
} catch (IOException e) {
logger.warn("unable to parse watch [{}]", event.jobName());
Tuple<List<TriggeredWatch>, List<TriggeredExecutionContext>> watchesAndContext = createTriggeredWatchesAndContext(events);
List<TriggeredWatch> triggeredWatches = watchesAndContext.v1();
triggeredWatchStore.putAll(triggeredWatches, ActionListener.wrap(
response -> executeTriggeredWatches(response, watchesAndContext),
e -> {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof EsRejectedExecutionException) {
logger.debug("failed to store watch records due to overloaded threadpool [{}]",
ExceptionsHelper.detailedMessage(e));
} else {
logger.warn("failed to store watch records", e);
}
}
}
if (triggeredWatches.isEmpty() == false) {
logger.trace("saving triggered [{}] watches", triggeredWatches.size());
triggeredWatchStore.putAll(triggeredWatches, ActionListener.wrap(
(slots) -> {
int slot = 0;
while ((slot = slots.nextSetBit(slot)) != -1) {
executeAsync(contexts.get(slot), triggeredWatches.get(slot));
slot++;
}
},
(e) -> logger.warn("failed to store watch [] records", e)));
}
});
}));
}
void processEventsSync(Iterable<TriggerEvent> events) throws Exception {
void processEventsSync(Iterable<TriggerEvent> events) throws IOException {
if (!started.get()) {
throw new IllegalStateException("not started");
}
final List<TriggeredWatch> triggeredWatches = new ArrayList<>();
final List<TriggeredExecutionContext> contexts = new ArrayList<>();
Tuple<List<TriggeredWatch>, List<TriggeredExecutionContext>> watchesAndContext = createTriggeredWatchesAndContext(events);
List<TriggeredWatch> triggeredWatches = watchesAndContext.v1();
logger.debug("saving watch records [{}]", triggeredWatches.size());
BulkResponse bulkResponse = triggeredWatchStore.putAll(triggeredWatches);
executeTriggeredWatches(bulkResponse, watchesAndContext);
}
/**
* Create a tuple of triggered watches and their corresponding contexts, usable for sync and async processing
*
* @param events The iterable list of trigger events to create the two lists from
* @return Two linked lists that contain the triggered watches and contexts
*/
private Tuple<List<TriggeredWatch>, List<TriggeredExecutionContext>> createTriggeredWatchesAndContext(Iterable<TriggerEvent> events)
throws IOException {
final LinkedList<TriggeredWatch> triggeredWatches = new LinkedList<>();
final LinkedList<TriggeredExecutionContext> contexts = new LinkedList<>();
DateTime now = new DateTime(clock.millis(), UTC);
for (TriggerEvent event : events) {
GetResponse response = getWatch(event.jobName());
GetResponse response = client.prepareGet(Watch.INDEX, Watch.DOC_TYPE, event.jobName()).get(TimeValue.timeValueSeconds(10));
if (response.isExists() == false) {
logger.warn("unable to find watch [{}] in watch index, perhaps it has been deleted", event.jobName());
continue;
@ -252,13 +248,23 @@ public class ExecutionService extends AbstractComponent {
triggeredWatches.add(new TriggeredWatch(ctx.id(), event));
}
if (triggeredWatches.isEmpty() == false) {
logger.debug("saving triggered [{}] watches", triggeredWatches.size());
BitSet slots = triggeredWatchStore.putAll(triggeredWatches);
int slot = 0;
while ((slot = slots.nextSetBit(slot)) != -1) {
executeAsync(contexts.get(slot), triggeredWatches.get(slot));
slot++;
return Tuple.tuple(triggeredWatches, contexts);
}
/**
* Execute triggered watches, which have been successfully indexed into the triggered watches index
*
* @param response The bulk response containing the response of indexing triggered watches
* @param watchesAndContext The triggered watches and context objects needed for execution
*/
private void executeTriggeredWatches(final BulkResponse response,
final Tuple<List<TriggeredWatch>, List<TriggeredExecutionContext>> watchesAndContext) {
for (int i = 0; i < response.getItems().length; i++) {
BulkItemResponse itemResponse = response.getItems()[i];
if (itemResponse.isFailed()) {
logger.error("could not store triggered watch with id [{}]: [{}]", itemResponse.getId(), itemResponse.getFailureMessage());
} else {
executeAsync(watchesAndContext.v2().get(i), watchesAndContext.v1().get(i));
}
}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -25,6 +26,7 @@ import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
@ -36,20 +38,14 @@ import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.watcher.support.Exceptions.illegalState;
import static org.elasticsearch.xpack.watcher.support.Exceptions.ioException;
public class TriggeredWatchStore extends AbstractComponent {
@ -61,9 +57,6 @@ public class TriggeredWatchStore extends AbstractComponent {
private final TimeValue scrollTimeout;
private final TriggeredWatch.Parser triggeredWatchParser;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock accessLock = readWriteLock.readLock();
private final Lock stopLock = readWriteLock.writeLock();
private final AtomicBoolean started = new AtomicBoolean(false);
private final TimeValue defaultBulkTimeout;
private final TimeValue defaultSearchTimeout;
@ -104,92 +97,51 @@ public class TriggeredWatchStore extends AbstractComponent {
}
public void stop() {
stopLock.lock(); // This will block while put or update actions are underway
try {
started.set(false);
} finally {
stopLock.unlock();
}
started.set(false);
}
public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<BitSet> listener) {
public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionListener<BulkResponse> listener) throws IOException {
if (triggeredWatches.isEmpty()) {
listener.onResponse(new BitSet(0));
listener.onResponse(new BulkResponse(new BulkItemResponse[]{}, 0));
return;
}
ensureStarted();
client.bulk(createBulkRequest(triggeredWatches, DOC_TYPE), listener);
}
public BulkResponse putAll(final List<TriggeredWatch> triggeredWatches) throws IOException {
PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
putAll(triggeredWatches, future);
return future.actionGet(defaultBulkTimeout);
}
/**
* Create a bulk request from the triggered watches with a specified document type
* @param triggeredWatches The list of triggered watches
* @param docType The document type to use, either the current one or legacy
* @return The bulk request for the triggered watches
* @throws IOException If a triggered watch could not be parsed to JSON, this exception is thrown
*/
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches, String docType) throws IOException {
BulkRequest request = new BulkRequest();
for (TriggeredWatch triggeredWatch : triggeredWatches) {
try {
IndexRequest indexRequest = new IndexRequest(INDEX_NAME, DOC_TYPE, triggeredWatch.id().value());
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
indexRequest.source(xContentBuilder.value(triggeredWatch));
}
indexRequest.opType(IndexRequest.OpType.CREATE);
request.add(indexRequest);
} catch (IOException e) {
logger.warn("could not create JSON to store triggered watch [{}]", triggeredWatch.id().value());
IndexRequest indexRequest = new IndexRequest(INDEX_NAME, docType, triggeredWatch.id().value());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
triggeredWatch.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
}
indexRequest.opType(IndexRequest.OpType.CREATE);
request.add(indexRequest);
}
client.bulk(request, ActionListener.wrap(response -> {
BitSet successFullSlots = new BitSet(triggeredWatches.size());
for (int i = 0; i < response.getItems().length; i++) {
BulkItemResponse itemResponse = response.getItems()[i];
if (itemResponse.isFailed()) {
logger.error("could not store triggered watch with id [{}], failed [{}]", itemResponse.getId(),
itemResponse.getFailureMessage());
} else {
successFullSlots.set(i);
}
}
listener.onResponse(successFullSlots);
}, listener::onFailure));
}
public void put(TriggeredWatch triggeredWatch) throws Exception {
putAll(Collections.singletonList(triggeredWatch));
}
public BitSet putAll(final List<TriggeredWatch> triggeredWatches) throws Exception {
ensureStarted();
try {
BulkRequest request = new BulkRequest();
for (TriggeredWatch triggeredWatch : triggeredWatches) {
IndexRequest indexRequest = new IndexRequest(INDEX_NAME, DOC_TYPE, triggeredWatch.id().value());
try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
indexRequest.source(xContentBuilder.value(triggeredWatch));
}
indexRequest.opType(IndexRequest.OpType.CREATE);
request.add(indexRequest);
}
BulkResponse response = client.bulk(request).get(defaultBulkTimeout.millis(), TimeUnit.MILLISECONDS);
BitSet successFullSlots = new BitSet(triggeredWatches.size());
for (int i = 0; i < response.getItems().length; i++) {
BulkItemResponse itemResponse = response.getItems()[i];
if (itemResponse.isFailed()) {
logger.error("could store triggered watch with id [{}], because failed [{}]", itemResponse.getId(),
itemResponse.getFailureMessage());
} else {
successFullSlots.set(i);
}
}
return successFullSlots;
} catch (IOException e) {
throw ioException("failed to persist triggered watches", e);
}
return request;
}
public void delete(Wid wid) {
ensureStarted();
accessLock.lock();
try {
DeleteRequest request = new DeleteRequest(INDEX_NAME, DOC_TYPE, wid.value());
client.delete(request);
logger.trace("successfully deleted triggered watch with id [{}]", wid);
} finally {
accessLock.unlock();
}
DeleteRequest request = new DeleteRequest(INDEX_NAME, DOC_TYPE, wid.value());
client.delete(request);
logger.trace("successfully deleted triggered watch with id [{}]", wid);
}
private void ensureStarted() {
@ -258,5 +210,4 @@ public class TriggeredWatchStore extends AbstractComponent {
return triggeredWatches;
}
}

View File

@ -32,6 +32,17 @@ import java.util.Objects;
public abstract class WatchRecord implements ToXContentObject {
public static final ParseField WATCH_ID = new ParseField("watch_id");
public static final ParseField STATE = new ParseField("state");
public static final ParseField TRIGGER_EVENT = new ParseField("trigger_event");
public static final ParseField NODE = new ParseField("node");
private static final ParseField MESSAGES = new ParseField("messages");
private static final ParseField STATUS = new ParseField("status");
private static final ParseField VARS = new ParseField("vars");
private static final ParseField METADATA = new ParseField("metadata");
private static final ParseField EXECUTION_RESULT = new ParseField("result");
private static final ParseField EXCEPTION = new ParseField("exception");
protected final Wid id;
protected final Watch watch;
private final String nodeId;
@ -132,19 +143,19 @@ public abstract class WatchRecord implements ToXContentObject {
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Field.WATCH_ID.getPreferredName(), id.watchId());
builder.field(Field.NODE.getPreferredName(), nodeId);
builder.field(Field.STATE.getPreferredName(), state.id());
builder.field(WATCH_ID.getPreferredName(), id.watchId());
builder.field(NODE.getPreferredName(), nodeId);
builder.field(STATE.getPreferredName(), state.id());
if (watch != null && watch.status() != null) {
builder.field(Field.STATUS.getPreferredName(), watch.status(), params);
builder.field(STATUS.getPreferredName(), watch.status(), params);
}
builder.field(Field.TRIGGER_EVENT.getPreferredName());
builder.field(TRIGGER_EVENT.getPreferredName());
triggerEvent.recordXContent(builder, params);
if (!vars.isEmpty() && WatcherParams.debug(params)) {
builder.field(Field.VARS.getPreferredName(), vars);
builder.field(VARS.getPreferredName(), vars);
}
if (input != null) {
@ -158,10 +169,10 @@ public abstract class WatchRecord implements ToXContentObject {
.endObject();
}
if (metadata != null) {
builder.field(Field.METADATA.getPreferredName(), metadata);
builder.field(METADATA.getPreferredName(), metadata);
}
if (executionResult != null) {
builder.field(Field.EXECUTION_RESULT.getPreferredName(), executionResult, params);
builder.field(EXECUTION_RESULT.getPreferredName(), executionResult, params);
}
innerToXContent(builder, params);
builder.endObject();
@ -189,19 +200,6 @@ public abstract class WatchRecord implements ToXContentObject {
return id.toString();
}
public interface Field {
ParseField WATCH_ID = new ParseField("watch_id");
ParseField NODE = new ParseField("node");
ParseField TRIGGER_EVENT = new ParseField("trigger_event");
ParseField MESSAGES = new ParseField("messages");
ParseField STATE = new ParseField("state");
ParseField STATUS = new ParseField("status");
ParseField VARS = new ParseField("vars");
ParseField METADATA = new ParseField("metadata");
ParseField EXECUTION_RESULT = new ParseField("result");
ParseField EXCEPTION = new ParseField("exception");
}
public static class MessageWatchRecord extends WatchRecord {
@Nullable private final String[] messages;
@ -254,7 +252,7 @@ public abstract class WatchRecord implements ToXContentObject {
@Override
void innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (messages != null) {
builder.array(Field.MESSAGES.getPreferredName(), messages);
builder.array(MESSAGES.getPreferredName(), messages);
}
}
}
@ -291,12 +289,12 @@ public abstract class WatchRecord implements ToXContentObject {
if (exception != null) {
if (exception instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) exception;
builder.startObject(Field.EXCEPTION.getPreferredName());
builder.startObject(EXCEPTION.getPreferredName());
Params delegatingParams = new DelegatingMapParams(STACK_TRACE_ENABLED_PARAMS, params);
elasticsearchException.toXContent(builder, delegatingParams);
builder.endObject();
} else {
builder.startObject(Field.EXCEPTION.getPreferredName())
builder.startObject(EXCEPTION.getPreferredName())
.field("type", ElasticsearchException.getExceptionName(exception))
.field("reason", exception.getMessage())
.endObject();

View File

@ -31,6 +31,12 @@ import java.util.regex.Pattern;
public class WatcherIndexTemplateRegistry extends AbstractComponent implements ClusterStateListener {
// history (please add a comment why you increased the version here)
// version 1: initial
// version 2: added mappings for jira action
// version 3: include watch status in history
// version 6: upgrade to ES 6, removal of _status field
// Note: if you change this, also inform the kibana team around the watcher-ui
public static final String INDEX_TEMPLATE_VERSION = "6";
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;

View File

@ -14,20 +14,19 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction;
import org.elasticsearch.xpack.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import java.time.Clock;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.joda.time.DateTimeZone.UTC;
@ -50,16 +49,16 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
}
@Override
protected void doExecute(PutWatchRequest request, ActionListener<PutWatchResponse> listener) {
protected void doExecute(final PutWatchRequest request, final ActionListener<PutWatchResponse> listener) {
try {
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType());
watch.setState(request.isActive(), now);
try (XContentBuilder builder = jsonBuilder()) {
Payload.XContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(Watch.INCLUDE_STATUS_KEY, "true"));
Payload.XContent.Params params = WatcherParams.builder().hideSecrets(false).put(Watch.INCLUDE_STATUS_KEY, "true").build();
watch.toXContent(builder, params);
BytesReference bytesReference = builder.bytes();
final BytesReference bytesReference = builder.bytes();
IndexRequest indexRequest = new IndexRequest(Watch.INDEX).type(Watch.DOC_TYPE).id(request.getId());
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

View File

@ -56,6 +56,8 @@ public class MockWebServer implements Closeable {
private final SSLContext sslContext;
private final boolean needClientAuth;
private final Set<CountDownLatch> latches = ConcurrentCollections.newConcurrentSet();
private String hostname;
private int port;
/**
* Instantiates a webserver without https
@ -92,6 +94,10 @@ public class MockWebServer implements Closeable {
}
server.start();
// Uses #InetSocketAddress.getHostString() to prevent reverse dns lookups, eager binding, so we can find out host/port regardless
// if the webserver was already shut down
this.hostname = server.getAddress().getHostString();
this.port = server.getAddress().getPort();
server.createContext("/", s -> {
try {
MockResponse response = responses.poll();
@ -182,17 +188,17 @@ public class MockWebServer implements Closeable {
}
/**
* @return The hostname the server is bound to. Uses #InetSocketAddress.getHostString() to prevent reverse dns lookups
* @return The hostname the server is bound to.
*/
public String getHostName() {
return server.getAddress().getHostString();
return hostname;
}
/**
* @return The tcp port that the server is bound to
*/
public int getPort() {
return server.getAddress().getPort();
return port;
}
/**

View File

@ -101,7 +101,7 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest
SearchResponse response = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*")
.setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery()
.must(matchQuery(WatchRecord.Field.STATE.getPreferredName(), ExecutionState.THROTTLED.id()))
.must(matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.THROTTLED.id()))
.must(termQuery("watch_id", id))))
.get();
List<Map<String, Object>> hits = Arrays.stream(response.getHits().getHits())
@ -152,7 +152,7 @@ public class TimeThrottleIntegrationTests extends AbstractWatcherIntegrationTest
SearchResponse searchResponse = client().prepareSearch(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*")
.setSource(new SearchSourceBuilder().query(QueryBuilders.boolQuery()
.must(matchQuery(WatchRecord.Field.STATE.getPreferredName(), ExecutionState.THROTTLED.id()))
.must(matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.THROTTLED.id()))
.must(termQuery("watch_id", id))))
.get();
assertHitCount(searchResponse, 1);

View File

@ -92,7 +92,7 @@ public class WebhookHttpsIntegrationTests extends AbstractWatcherIntegrationTest
assertThat(webServer.requests().get(0).getBody(), equalTo("{key=value}"));
SearchResponse response =
searchWatchRecords(b -> b.setQuery(QueryBuilders.termQuery(WatchRecord.Field.STATE.getPreferredName(), "executed")));
searchWatchRecords(b -> b.setQuery(QueryBuilders.termQuery(WatchRecord.STATE.getPreferredName(), "executed")));
assertNoFailures(response);
XContentSource source = xContentSource(response.getHits().getAt(0).getSourceRef());

View File

@ -41,7 +41,7 @@ import static org.hamcrest.Matchers.notNullValue;
public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase {
private MockWebServer webServer = new MockWebServer();;
private MockWebServer webServer = new MockWebServer();
@Override
protected Settings nodeSettings(int nodeOrdinal) {
@ -92,7 +92,7 @@ public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase
assertThat(webServer.requests().get(0).getBody(), is("_body"));
SearchResponse response = searchWatchRecords(b -> QueryBuilders.termQuery(WatchRecord.Field.STATE.getPreferredName(), "executed"));
SearchResponse response = searchWatchRecords(b -> QueryBuilders.termQuery(WatchRecord.STATE.getPreferredName(), "executed"));
assertNoFailures(response);
XContentSource source = xContentSource(response.getHits().getAt(0).getSourceRef());

View File

@ -31,6 +31,9 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
@ -39,15 +42,28 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.common.http.HttpClient;
import org.elasticsearch.xpack.notification.email.EmailService;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.trigger.schedule.CronSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchTests;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import static java.util.Collections.singleton;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -330,6 +346,31 @@ public class TriggeredWatchStoreTests extends ESTestCase {
assertThat(triggeredWatches, hasSize(0));
}
public void testTriggeredWatchParser() throws Exception {
EmailService emailService = mock(EmailService.class);
HttpClient httpClient = mock(HttpClient.class);
WatcherSearchTemplateService searchTemplateService = mock(WatcherSearchTemplateService.class);
Watch watch = WatcherTestUtils.createTestWatch("fired_test", client, httpClient, emailService, searchTemplateService, logger);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), DateTime.now(DateTimeZone.UTC), DateTime.now(DateTimeZone.UTC));
Wid wid = new Wid("_record", DateTime.now(DateTimeZone.UTC));
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
triggeredWatch.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
ScheduleRegistry scheduleRegistry = new ScheduleRegistry(Collections.singleton(new CronSchedule.Parser()));
TriggerEngine triggerEngine = new WatchTests.ParseOnlyScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, new ClockMock());
TriggerService triggerService = new TriggerService(Settings.EMPTY, singleton(triggerEngine));
TriggeredWatch.Parser parser = new TriggeredWatch.Parser(Settings.EMPTY, triggerService);
TriggeredWatch parsedTriggeredWatch = parser.parse(triggeredWatch.id().value(), 0, jsonBuilder.bytes());
XContentBuilder jsonBuilder2 = XContentFactory.jsonBuilder();
parsedTriggeredWatch.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS);
assertThat(jsonBuilder.bytes().utf8ToString(), equalTo(jsonBuilder2.bytes().utf8ToString()));
}
private RefreshResponse mockRefreshResponse(int total, int successful) {
RefreshResponse refreshResponse = mock(RefreshResponse.class);
when(refreshResponse.getTotalShards()).thenReturn(total);

View File

@ -1,45 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.xpack.watcher.watch.Watch;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import static org.hamcrest.Matchers.equalTo;
public class TriggeredWatchTests extends AbstractWatcherIntegrationTestCase {
public void testParser() throws Exception {
Watch watch = WatcherTestUtils.createTestWatch("fired_test", watcherHttpClient(), noopEmailService(),
watcherSearchTemplateService(), logger);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), DateTime.now(DateTimeZone.UTC), DateTime.now(DateTimeZone.UTC));
Wid wid = new Wid("_record", DateTime.now(DateTimeZone.UTC));
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
triggeredWatch.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
TriggeredWatch parsedTriggeredWatch = triggeredWatchParser().parse(triggeredWatch.id().value(), 0, jsonBuilder.bytes());
XContentBuilder jsonBuilder2 = XContentFactory.jsonBuilder();
parsedTriggeredWatch.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS);
assertThat(jsonBuilder.bytes().utf8ToString(), equalTo(jsonBuilder2.bytes().utf8ToString()));
}
private TriggeredWatch.Parser triggeredWatchParser() {
return internalCluster().getInstance(TriggeredWatch.Parser.class);
}
protected WatcherSearchTemplateService watcherSearchTemplateService() {
return internalCluster().getInstance(WatcherSearchTemplateService.class);
}
}

View File

@ -94,7 +94,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
String index = HistoryStore.getHistoryIndexNameForTime(now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject()
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
.startObject(WatchRecord.TRIGGER_EVENT.getPreferredName())
.field(event.type(), event)
.endObject()
.startObject(Watch.Field.CONDITION.getPreferredName())
@ -112,7 +112,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
wid = new Wid("_id", now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject()
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
.startObject(WatchRecord.TRIGGER_EVENT.getPreferredName())
.field(event.type(), event)
.endObject()
.startObject(Watch.Field.CONDITION.getPreferredName())
@ -130,7 +130,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
wid = new Wid("_id", now);
client().prepareIndex(index, HistoryStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject()
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
.startObject(WatchRecord.TRIGGER_EVENT.getPreferredName())
.startObject("unknown").endObject()
.endObject()
.startObject(Watch.Field.CONDITION.getPreferredName())

View File

@ -131,7 +131,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
long throttledCount = docCount(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*", null,
matchQuery(WatchRecord.Field.STATE.getPreferredName(), ExecutionState.ACKNOWLEDGED.id()));
matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.ACKNOWLEDGED.id()));
assertThat(throttledCount, greaterThan(0L));
}
@ -199,7 +199,7 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
long throttledCount = docCount(HistoryStore.INDEX_PREFIX_WITH_TEMPLATE + "*", null,
matchQuery(WatchRecord.Field.STATE.getPreferredName(), ExecutionState.ACKNOWLEDGED.id()));
matchQuery(WatchRecord.STATE.getPreferredName(), ExecutionState.ACKNOWLEDGED.id()));
assertThat(throttledCount, greaterThan(0L));
}

View File

@ -562,9 +562,9 @@ public class WatchTests extends ESTestCase {
));
}
static class ParseOnlyScheduleTriggerEngine extends ScheduleTriggerEngine {
public static class ParseOnlyScheduleTriggerEngine extends ScheduleTriggerEngine {
ParseOnlyScheduleTriggerEngine(Settings settings, ScheduleRegistry registry, Clock clock) {
public ParseOnlyScheduleTriggerEngine(Settings settings, ScheduleRegistry registry, Clock clock) {
super(settings, registry, clock);
}