Fix Watcher stats class cast exception (#39821)

The watcher stats implementation tries to look at all queued watches
before preparing the result. We want to cast these to a
WatchExecutionTask to extract the context to prepare the stats for
queued watches. The problem is that not all tasks on the watcher queue
were WatchExecutionTask. This is because a manually executed watch was
not even at all wrapped in a WatchExecutionTask. Moreover, we were using
ExecutorService#submit(Runnable) which would wrap the Runnable in a
FutureTask<?>. This commit addresses this by using a WatchExecutionTask,
and also using ExecutorService#execute(Runnable) so that no wrapping
occurs. This will let us continue with the assumption that all queued
tasks are WatchExecutionTasks.
This commit is contained in:
Jason Tedor 2019-03-08 14:51:39 -05:00
parent 3246d3e355
commit 73a672b8dd
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
3 changed files with 182 additions and 42 deletions

View File

@ -534,12 +534,12 @@ public class ExecutionService {
// the watch execution task takes another runnable as parameter
// the best solution would be to move the whole execute() method, which is handed over as ctor parameter
// over into this class, this is the quicker way though
static final class WatchExecutionTask implements Runnable {
public static final class WatchExecutionTask implements Runnable {
private final WatchExecutionContext ctx;
private final Runnable runnable;
WatchExecutionTask(WatchExecutionContext ctx, Runnable runnable) {
public WatchExecutionTask(WatchExecutionContext ctx, Runnable runnable) {
this.ctx = ctx;
this.runnable = runnable;
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.watcher.execution.ActionExecutionMode;
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchAction;
@ -110,48 +111,63 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
}
}
private void executeWatch(ExecuteWatchRequest request, ActionListener<ExecuteWatchResponse> listener,
Watch watch, boolean knownWatch) {
private void executeWatch(
final ExecuteWatchRequest request,
final ActionListener<ExecuteWatchResponse> listener,
final Watch watch,
final boolean knownWatch) {
try {
/*
* Ensure that the headers from the incoming request are used instead those of the stored watch otherwise the watch would run
* as the user who stored the watch, but it needs to run as the user who executes this request.
*/
final Map<String, String> headers = new HashMap<>(threadPool.getThreadContext().getHeaders());
watch.status().setHeaders(headers);
threadPool.executor(XPackField.WATCHER).submit(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
final String triggerType = watch.trigger().type();
final TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData());
final ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(
watch,
knownWatch,
new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
final ZonedDateTime executionTime = clock.instant().atZone(ZoneOffset.UTC);
ctxBuilder.executionTime(executionTime);
for (final Map.Entry<String, ActionExecutionMode> entry : request.getActionModes().entrySet()) {
ctxBuilder.actionMode(entry.getKey(), entry.getValue());
}
@Override
protected void doRun() throws Exception {
// ensure that the headers from the incoming request are used instead those of the stored watch
// otherwise the watch would run as the user who stored the watch, but it needs to be run as the user who
// executes this request
Map<String, String> headers = new HashMap<>(threadPool.getThreadContext().getHeaders());
watch.status().setHeaders(headers);
String triggerType = watch.trigger().type();
TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData());
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, knownWatch,
new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod());
ZonedDateTime executionTime = clock.instant().atZone(ZoneOffset.UTC);
ctxBuilder.executionTime(executionTime);
for (Map.Entry<String, ActionExecutionMode> entry : request.getActionModes().entrySet()) {
ctxBuilder.actionMode(entry.getKey(), entry.getValue());
}
if (request.getAlternativeInput() != null) {
ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput())));
}
if (request.isIgnoreCondition()) {
ctxBuilder.withCondition(InternalAlwaysCondition.RESULT_INSTANCE);
}
ctxBuilder.recordExecution(request.isRecordExecution());
WatchRecord record = executionService.execute(ctxBuilder.build());
XContentBuilder builder = XContentFactory.jsonBuilder();
record.toXContent(builder, WatcherParams.builder().hideSecrets(true).debug(request.isDebug()).build());
listener.onResponse(new ExecuteWatchResponse(record.id().value(), BytesReference.bytes(builder), XContentType.JSON));
if (request.getAlternativeInput() != null) {
ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput())));
}
});
if (request.isIgnoreCondition()) {
ctxBuilder.withCondition(InternalAlwaysCondition.RESULT_INSTANCE);
}
ctxBuilder.recordExecution(request.isRecordExecution());
final WatchExecutionContext ctx = ctxBuilder.build();
// use execute so that the runnable is not wrapped in a RunnableFuture<?>
threadPool.executor(XPackField.WATCHER).execute(new ExecutionService.WatchExecutionTask(ctx, new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
final WatchRecord record = executionService.execute(ctx);
final XContentBuilder builder = XContentFactory.jsonBuilder();
record.toXContent(builder, WatcherParams.builder().hideSecrets(true).debug(request.isDebug()).build());
listener.onResponse(new ExecuteWatchResponse(record.id().value(), BytesReference.bytes(builder), XContentType.JSON));
}
}));
} catch (final Exception e) {
listener.onFailure(e);
}
}
}

View File

@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.core.watcher.client.WatcherClient;
import org.elasticsearch.xpack.core.watcher.execution.ActionExecutionMode;
import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchResponse;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.xpack.watcher.actions.index.IndexAction;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import java.io.IOException;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.empty;
public class ExecuteWatchQueuedStatsTests extends AbstractWatcherIntegrationTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
// we use a small thread pool to force executions to be queued
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("xpack.watcher.thread_pool.size", 1).build();
}
@Override
protected boolean timeWarped() {
return false;
}
/*
* This test is effectively forcing a manually executed watch to end up queued while we simultaneously try to get stats, including
* queued watches. The reason that we do this is because previously a manually executed watch would be queued as a FutureTask<?> while
* we try to cast queued watches to WatchExecutionTask. This would previously result in a ClassCastException. This test fails when that
* happens yet succeeds with the production code change that accompanies this test.
*/
public void testQueuedStats() throws ExecutionException, InterruptedException {
final WatcherClient client = new WatcherClient(client());
client.preparePutWatch("id")
.setActive(true)
.setSource(
new WatchSourceBuilder()
.input(simpleInput("payload", "yes"))
.trigger(schedule(interval("1s")))
.addAction(
"action",
TimeValue.timeValueSeconds(1),
IndexAction.builder("test_index", "acknowledgement").setDocId("id")))
.get();
final int numberOfIterations = 128 - scaledRandomIntBetween(0, 128);
final CyclicBarrier barrier = new CyclicBarrier(2);
final List<ActionFuture<ExecuteWatchResponse>> futures = new ArrayList<>(numberOfIterations);
final Thread executeWatchThread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
fail(e.toString());
}
for (int i = 0; i < numberOfIterations; i++) {
final ExecuteWatchRequest request = new ExecuteWatchRequest("id");
try {
request.setTriggerEvent(new ManualTriggerEvent(
"id-" + i,
new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC))));
} catch (final IOException e) {
fail(e.toString());
}
request.setActionMode("_all", ActionExecutionMode.EXECUTE);
request.setRecordExecution(true);
futures.add(client.executeWatch(request));
}
});
executeWatchThread.start();
final List<FailedNodeException> failures = new ArrayList<>();
final Thread watcherStatsThread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
fail(e.toString());
}
for (int i = 0; i < numberOfIterations; i++) {
final WatcherStatsResponse response = client.prepareWatcherStats().setIncludeQueuedWatches(true).get();
failures.addAll(response.failures());
}
});
watcherStatsThread.start();
executeWatchThread.join();
watcherStatsThread.join();
for (final ActionFuture<ExecuteWatchResponse> future : futures) {
future.get();
}
assertThat(failures, empty());
client.prepareDeleteWatch("id").get();
}
}