diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 4e33e15044c..69b10794496 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -16,8 +16,10 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; @@ -33,19 +35,25 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult; import org.elasticsearch.xpack.core.watcher.common.stats.Counters; import org.elasticsearch.xpack.core.watcher.condition.Condition; import org.elasticsearch.xpack.core.watcher.execution.ExecutionState; import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch; +import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; +import org.elasticsearch.xpack.core.watcher.execution.Wid; +import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField; import org.elasticsearch.xpack.core.watcher.history.WatchRecord; import org.elasticsearch.xpack.core.watcher.input.Input; +import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams; import org.elasticsearch.xpack.core.watcher.transform.Transform; import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.core.watcher.watch.Watch; @@ -67,8 +75,10 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -399,24 +409,70 @@ public class ExecutionService { try { executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx))); } catch (EsRejectedExecutionException e) { - String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; - WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message); - try { - if (ctx.overrideRecordOnConflict()) { - historyStore.forcePut(record); - } else { - historyStore.put(record); + //Using the generic pool here since this can happen from a write thread and we don't want to block a write + //thread to kick off these additional write/delete requests. + //Intentionally not using the HistoryStore or TriggerWatchStore to avoid re-using the same synchronous + //BulkProcessor which can cause a deadlock see #41390 + genericExecutor.execute(new WatchExecutionTask(ctx, () -> { + String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; + logger.warn(message); + WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message); + try { + forcePutHistory(record); + } catch (Exception exc) { + logger.error((Supplier) () -> + new ParameterizedMessage( + "Error storing watch history record for watch [{}] after thread pool rejection", + triggeredWatch.id()), exc); } - } catch (Exception exc) { - logger.error((Supplier) () -> - new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection", - triggeredWatch.id()), exc); - } - - triggeredWatchStore.delete(triggeredWatch.id()); + deleteTrigger(triggeredWatch.id()); + })); } } + /** + * Stores the specified watchRecord. + * Any existing watchRecord will be overwritten. + */ + private void forcePutHistory(WatchRecord watchRecord) { + String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + try { + try (XContentBuilder builder = XContentFactory.jsonBuilder(); + ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { + watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); + IndexRequest request = new IndexRequest(index) + .id(watchRecord.id().value()) + .source(builder) + .opType(IndexRequest.OpType.CREATE); + client.index(request).get(30, TimeUnit.SECONDS); + logger.debug("indexed watch history record [{}]", watchRecord.id().value()); + } catch (VersionConflictEngineException vcee) { + watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES, + "watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]"); + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); + ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { + IndexRequest request = new IndexRequest(index) + .id(watchRecord.id().value()) + .source(xContentBuilder.value(watchRecord)); + client.index(request).get(30, TimeUnit.SECONDS); + } + logger.debug("overwrote watch history record [{}]", watchRecord.id().value()); + } + } catch (InterruptedException | ExecutionException | TimeoutException | IOException ioe) { + final WatchRecord wr = watchRecord; + logger.error((Supplier) () -> new ParameterizedMessage("failed to persist watch record [{}]", wr), ioe); + } + } + + private void deleteTrigger(Wid watcherId) { + DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME); + request.id(watcherId.value()); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { + client.delete(request).actionGet(30, TimeUnit.SECONDS); + } + logger.trace("successfully deleted triggered watch with id [{}]", watcherId); + } + WatchRecord executeInner(WatchExecutionContext ctx) { ctx.start(); Watch watch = ctx.watch(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 321cf979bca..80cb657a576 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.watcher.execution; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; @@ -28,8 +30,11 @@ import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.authc.Authentication; @@ -48,6 +53,7 @@ import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition; import org.elasticsearch.xpack.core.watcher.execution.ExecutionPhase; import org.elasticsearch.xpack.core.watcher.execution.ExecutionState; import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch; +import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; import org.elasticsearch.xpack.core.watcher.execution.Wid; @@ -91,6 +97,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; @@ -844,11 +851,15 @@ public class ExecutionServiceTests extends ESTestCase { when(getResponse.isExists()).thenReturn(true); when(getResponse.getId()).thenReturn("foo"); mockGetWatchResponse(client, "foo", getResponse); + ActionFuture actionFuture = mock(ActionFuture.class); + when(actionFuture.get()).thenReturn(""); + when(client.index(any())).thenReturn(actionFuture); + when(client.delete(any())).thenReturn(actionFuture); + when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); - // execute needs to fail as well as storing the history + // execute needs to fail doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); - doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any()); Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC)); @@ -856,10 +867,58 @@ public class ExecutionServiceTests extends ESTestCase { new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC) ,ZonedDateTime.now(ZoneOffset.UTC))); executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); - verify(triggeredWatchStore, times(1)).delete(wid); - ArgumentCaptor captor = ArgumentCaptor.forClass(WatchRecord.class); - verify(historyStore, times(1)).forcePut(captor.capture()); - assertThat(captor.getValue().state(), is(ExecutionState.THREADPOOL_REJECTION)); + ArgumentCaptor deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class); + verify(client).delete(deleteCaptor.capture()); + assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME)); + assertThat(deleteCaptor.getValue().id(), equalTo(wid.value())); + + ArgumentCaptor watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class); + verify(client).index(watchHistoryCaptor.capture()); + + assertThat(watchHistoryCaptor.getValue().source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString())); + assertThat(watchHistoryCaptor.getValue().index(), containsString(".watcher-history")); + } + + public void testForcePutHistoryOnExecutionRejection() throws Exception { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("foo"); + WatchStatus status = new WatchStatus(ZonedDateTime.now(ZoneOffset.UTC), Collections.emptyMap()); + when(watch.status()).thenReturn(status); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + when(getResponse.getId()).thenReturn("foo"); + mockGetWatchResponse(client, "foo", getResponse); + ActionFuture actionFuture = mock(ActionFuture.class); + when(actionFuture.get()).thenReturn(""); + when(client.index(any())) + .thenThrow(new VersionConflictEngineException( + new ShardId(new Index("mockindex", "mockuuid"), 0), "id", "explaination")) + .thenReturn(actionFuture); + when(client.delete(any())).thenReturn(actionFuture); + + when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); + + // execute needs to fail + doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); + + Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC)); + + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, + new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC))); + executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); + + ArgumentCaptor deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class); + verify(client).delete(deleteCaptor.capture()); + assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME)); + assertThat(deleteCaptor.getValue().id(), equalTo(wid.value())); + + ArgumentCaptor watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class); + verify(client, times(2)).index(watchHistoryCaptor.capture()); + List indexRequests = watchHistoryCaptor.getAllValues(); + + assertThat(indexRequests.get(0).id(), equalTo(indexRequests.get(1).id())); + assertThat(indexRequests.get(0).source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString())); + assertThat(indexRequests.get(1).source().utf8ToString(), containsString(ExecutionState.EXECUTED_MULTIPLE_TIMES.toString())); } public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception { @@ -898,7 +957,7 @@ public class ExecutionServiceTests extends ESTestCase { when(watch.status()).thenReturn(watchStatus); executionService.execute(context); - verify(triggeredWatchStore, never()).delete(any()); + verify(client, never()).delete(any()); } public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java new file mode 100644 index 00000000000..a457c1052ca --- /dev/null +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.test.integration; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.LicenseService; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.watcher.client.WatcherClient; +import org.elasticsearch.xpack.watcher.condition.CompareCondition; +import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; +import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; +import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; +import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput; +import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest; +import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class RejectedExecutionTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected boolean timeWarped() { + //need to use the real scheduler + return false; + } + + public void testHistoryAndTriggeredOnRejection() throws Exception { + WatcherClient watcherClient = watcherClient(); + createIndex("idx"); + client().prepareIndex("idx", "_doc").setSource("field", "a").get(); + refresh(); + WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "a")), "idx"); + watcherClient.preparePutWatch(randomAlphaOfLength(5)) + .setSource(watchBuilder() + .trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.SECONDS))) + .input(searchInput(request)) + .condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L)) + .addAction("_logger", loggingAction("_logging") + .setCategory("_category"))) + .get(); + + assertBusy(() -> { + flushAndRefresh(".watcher-history-*"); + SearchResponse searchResponse = client().prepareSearch(".watcher-history-*").get(); + assertThat(searchResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(2L)); + }, 10, TimeUnit.SECONDS); + + flushAndRefresh(".triggered_watches"); + SearchResponse searchResponse = client().prepareSearch(".triggered_watches").get(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L)); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(XPackSettings.MONITORING_ENABLED.getKey(), false) + .put(XPackSettings.SECURITY_ENABLED.getKey(), false) + .put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial") + .put("thread_pool.write.size", 1) + .put("thread_pool.write.queue_size", 1) + .put("xpack.watcher.thread_pool.size", 1) + .put("xpack.watcher.thread_pool.queue_size", 0) + .build(); + } + + +}