Watcher: Ensure triggered watch is deleted on thread pool rejection (elastic/elasticsearch#3049)

This fixes a bug I found with a customer when he updated from 1.x to 2.x.
Due to an BWC incompatible change in the watch history mapping and a thread
pool rejection during execution a watch was not removed from the triggered
watches and tried to be executed again.

While trying to fix it it turned out that the execution of the failure
test case was still done in the transport thread and thus required some
offloading to another thread pool.

Original commit: elastic/x-pack-elasticsearch@df04ce31f2
This commit is contained in:
Alexander Reelsen 2016-09-07 15:55:33 +02:00 committed by GitHub
parent bb033f1e00
commit a296e31a7c
3 changed files with 70 additions and 21 deletions

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.common.stats.Counters;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.watcher.Watcher;
@ -60,13 +61,14 @@ public class ExecutionService extends AbstractComponent {
private final Clock clock;
private final TimeValue defaultThrottlePeriod;
private final TimeValue maxStopTimeout;
private final ThreadPool threadPool;
private volatile CurrentExecutions currentExecutions = null;
private final AtomicBoolean started = new AtomicBoolean(false);
@Inject
public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor,
WatchStore watchStore, WatchLockService watchLockService, Clock clock) {
WatchStore watchStore, WatchLockService watchLockService, Clock clock, ThreadPool threadPool) {
super(settings);
this.historyStore = historyStore;
this.triggeredWatchStore = triggeredWatchStore;
@ -76,6 +78,7 @@ public class ExecutionService extends AbstractComponent {
this.clock = clock;
this.defaultThrottlePeriod = DEFAULT_THROTTLE_PERIOD_SETTING.get(settings);
this.maxStopTimeout = Watcher.MAX_STOP_TIMEOUT_SETTING.get(settings);
this.threadPool = threadPool;
}
public void start(ClusterState state) throws Exception {
@ -323,20 +326,36 @@ public class ExecutionService extends AbstractComponent {
thread pool that executes the watches is completely busy, we don't lose the fact that the watch was
triggered (it'll have its history record)
*/
private void executeAsync(WatchExecutionContext ctx, TriggeredWatch triggeredWatch) throws Exception {
private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch triggeredWatch) {
try {
executor.execute(new WatchExecutionTask(ctx));
} catch (EsRejectedExecutionException e) {
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
logger.debug("{}", message);
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message);
if (ctx.overrideRecordOnConflict()) {
historyStore.forcePut(record);
} else {
historyStore.put(record);
}
triggeredWatchStore.delete(triggeredWatch.id());
// we are still in the transport thread here most likely, so we cannot run heavy operations
// this means some offloading needs to be done for indexing into the history and delete the triggered watches entry
threadPool.generic().execute(() -> {
String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity";
logger.debug("{}", message);
WatchRecord record = ctx.abortBeforeExecution(ExecutionState.FAILED, message);
try {
if (ctx.overrideRecordOnConflict()) {
historyStore.forcePut(record);
} else {
historyStore.put(record);
}
} catch (Exception exc) {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection",
triggeredWatch.id()), exc);
}
try {
triggeredWatchStore.delete(triggeredWatch.id());
} catch (Exception exc) {
logger.error((Supplier<?>) () ->
new ParameterizedMessage("Error deleting triggered watch store record for watch [{}] after thread pool " +
"rejection", triggeredWatch.id()), exc);
}
});
}
}

View File

@ -222,7 +222,7 @@ public class TriggeredWatchStore extends AbstractComponent {
}
}
public void delete(Wid wid) throws Exception {
public void delete(Wid wid) {
ensureStarted();
accessLock.lock();
try {

View File

@ -5,10 +5,13 @@
*/
package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.watcher.actions.Action;
@ -41,7 +44,9 @@ import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
@ -51,7 +56,9 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.joda.time.DateTime.now;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -68,15 +75,16 @@ public class ExecutionServiceTests extends ESTestCase {
private Input.Result inputResult;
private WatchStore watchStore;
private TriggeredWatchStore triggeredWatchStore;
private WatchExecutor executor;
private HistoryStore historyStore;
private WatchLockService watchLockService;
private ExecutionService executionService;
private Clock clock;
private ThreadPool threadPool;
@Before
public void init() throws Exception {
TriggeredWatchStore triggeredWatchStore;
payload = mock(Payload.class);
input = mock(ExecutableInput.class);
inputResult = mock(Input.Result.class);
@ -88,13 +96,14 @@ public class ExecutionServiceTests extends ESTestCase {
triggeredWatchStore = mock(TriggeredWatchStore.class);
historyStore = mock(HistoryStore.class);
WatchExecutor executor = mock(WatchExecutor.class);
executor = mock(WatchExecutor.class);
when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1));
watchLockService = mock(WatchLockService.class);
clock = new ClockMock();
threadPool = mock(ThreadPool.class);
executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore,
watchLockService, clock);
watchLockService, clock, threadPool);
ClusterState clusterState = mock(ClusterState.class);
when(triggeredWatchStore.loadTriggeredWatches(clusterState)).thenReturn(new ArrayList<>());
@ -483,7 +492,7 @@ public class ExecutionServiceTests extends ESTestCase {
}
public void testExecuteInner() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DateTime now = now(DateTimeZone.UTC);
Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
@ -560,7 +569,7 @@ public class ExecutionServiceTests extends ESTestCase {
}
public void testExecuteInnerThrottled() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DateTime now = now(DateTimeZone.UTC);
Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
@ -613,7 +622,7 @@ public class ExecutionServiceTests extends ESTestCase {
}
public void testExecuteInnerConditionNotMet() throws Exception {
DateTime now = DateTime.now(DateTimeZone.UTC);
DateTime now = now(DateTimeZone.UTC);
Watch watch = mock(Watch.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event, timeValueSeconds(5));
@ -774,6 +783,28 @@ public class ExecutionServiceTests extends ESTestCase {
verify(action, never()).execute("_action", context, payload);
}
public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exception {
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("foo");
when(watch.nonce()).thenReturn(1L);
when(watchStore.get(any())).thenReturn(watch);
// execute needs to fail as well as storing the history
doThrow(new EsRejectedExecutionException()).when(executor).execute(any());
doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any());
Wid wid = new Wid(watch.id(), watch.nonce(), now());
Executor currentThreadExecutor = command -> command.run();
when(threadPool.generic()).thenReturn(currentThreadExecutor);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, new ScheduleTriggerEvent(now() ,now()));
executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch));
verify(triggeredWatchStore, times(1)).delete(wid);
verify(historyStore, times(1)).forcePut(any(WatchRecord.class));
}
private Tuple<ExecutableCondition, Condition.Result> whenCondition(final WatchExecutionContext context) {
Condition.Result conditionResult = mock(Condition.Result.class);
when(conditionResult.met()).thenReturn(true);
@ -791,5 +822,4 @@ public class ExecutionServiceTests extends ESTestCase {
return new Tuple<>(transform, transformResult);
}
}