Moved transform errors to the tranform result

Until now, if the transform failed (either on the watch or action level), an exception would be thrown and it would be captured globally on the watch execution and in the watch record message

This commit bring the error to the transform result

- A new `status` field was added to the transform result. Can either have `success` or `failure` values. When set to `failure` a `reason` field will hold the failure message.
- The `ExecutionService` changed to enable this functionality. Mainly, instead of relying on exception, during the execution the transform result status is checked and if failed the watch execution is aborted.

Original commit: elastic/x-pack-elasticsearch@65b7f51f00
This commit is contained in:
uboness 2015-06-15 14:59:11 +02:00
parent 09fcecc069
commit 03b704f79b
16 changed files with 473 additions and 36 deletions

View File

@ -78,9 +78,13 @@ public class ActionWrapper implements ToXContent {
if (transform != null) {
try {
transformResult = transform.execute(ctx, payload);
if (transformResult.status() == Transform.Result.Status.FAILURE) {
action.logger().error("failed to execute action [{}/{}]. failed to transform payload. {}", ctx.watch().id(), id, transformResult.reason());
return new ActionWrapper.Result(id, transformResult, new Action.Result.Failure(action.type(), "Failed to transform payload"));
}
payload = transformResult.payload();
} catch (Exception e) {
action.logger.error("failed to execute action [{}/{}]. failed to transform payload.", e, ctx.watch().id(), id);
action.logger().error("failed to execute action [{}/{}]. failed to transform payload.", e, ctx.watch().id(), id);
return new ActionWrapper.Result(id, new Action.Result.Failure(action.type(), "Failed to transform payload. error: " + ExceptionsHelper.detailedMessage(e)));
}
}
@ -88,7 +92,7 @@ public class ActionWrapper implements ToXContent {
Action.Result actionResult = action.execute(id, ctx, payload);
return new ActionWrapper.Result(id, transformResult, actionResult);
} catch (Exception e) {
action.logger.error("failed to execute action [{}/{}]", e, ctx.watch().id(), id);
action.logger().error("failed to execute action [{}/{}]", e, ctx.watch().id(), id);
return new ActionWrapper.Result(id, new Action.Result.Failure(action.type(), ExceptionsHelper.detailedMessage(e)));
}
}

View File

@ -36,6 +36,13 @@ public abstract class ExecutableAction<A extends Action> implements ToXContent {
return action;
}
/**
* yack... needed to expose that for testing purposes
*/
public ESLogger logger() {
return logger;
}
public abstract Action.Result execute(String actionId, WatchExecutionContext context, Payload payload) throws Exception;
@Override

View File

@ -121,6 +121,9 @@ public abstract class WatchExecutionContext {
}
beforeWatchTransform();
this.transformResult = watch.transform().execute(this, payload);
if (this.transformResult.status() == Transform.Result.Status.FAILURE) {
throw new WatchExecutionException("failed to execute watch level transform for [{}]", id);
}
this.payload = transformResult.payload();
this.transformedPayload = this.payload;
return transformedPayload;

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.execution;
import org.elasticsearch.watcher.WatcherException;
/**
*
*/
public class WatchExecutionException extends WatcherException {
public WatchExecutionException(String msg, Object... args) {
super(msg, args);
}
public WatchExecutionException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -34,7 +34,7 @@ public abstract class ExecutableTransform<T extends Transform, R extends Transfo
return transform;
}
public abstract R execute(WatchExecutionContext ctx, Payload payload) throws IOException;
public abstract R execute(WatchExecutionContext ctx, Payload payload);
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.watcher.transform;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -21,27 +23,64 @@ public interface Transform extends ToXContent {
abstract class Result implements ToXContent {
public enum Status {
SUCCESS, FAILURE
}
protected final String type;
protected final Payload payload;
protected final Status status;
protected final @Nullable Payload payload;
protected final @Nullable String reason;
public Result(String type, Payload payload) {
this.type = type;
this.status = Status.SUCCESS;
this.payload = payload;
this.reason = null;
}
public Result(String type, Exception e) {
this.type = type;
this.status = Status.FAILURE;
this.reason = ExceptionsHelper.detailedMessage(e);
this.payload = null;
}
public String type() {
return type;
}
public Status status() {
return status;
}
public Payload payload() {
assert status == Status.SUCCESS;
return payload;
}
public String reason() {
assert status == Status.FAILURE;
return reason;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Field.TYPE.getPreferredName(), type);
builder.field(Field.PAYLOAD.getPreferredName(), payload, params);
builder.field(Field.STATUS.getPreferredName(), status);
switch (status) {
case SUCCESS:
assert reason == null;
builder.field(Field.PAYLOAD.getPreferredName(), payload, params);
break;
case FAILURE:
assert payload == null;
builder.field(Field.REASON.getPreferredName(), reason);
break;
default:
assert false;
}
typeXContent(builder, params);
return builder.endObject();
}
@ -56,8 +95,12 @@ public interface Transform extends ToXContent {
}
interface Field {
ParseField TYPE = new ParseField("type");
ParseField PAYLOAD = new ParseField("payload");
ParseField TRANSFORM = new ParseField("transform");
ParseField TYPE = new ParseField("type");
ParseField STATUS = new ParseField("status");
ParseField PAYLOAD = new ParseField("payload");
ParseField REASON = new ParseField("reason");
}
}

View File

@ -100,19 +100,27 @@ public class ChainTransform implements Transform {
this.results = results;
}
public Result(Exception e, ImmutableList<Transform.Result> results) {
super(TYPE, e);
this.results = results;
}
public ImmutableList<Transform.Result> results() {
return results;
}
@Override
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(type);
builder.startArray(Field.RESULTS.getPreferredName());
for (Transform.Result result : results) {
result.toXContent(builder, params);
if (!results.isEmpty()) {
builder.startObject(type);
builder.startArray(Field.RESULTS.getPreferredName());
for (Transform.Result result : results) {
result.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
}
builder.endArray();
return builder.endObject();
return builder;
}
}

View File

@ -31,11 +31,24 @@ public class ExecutableChainTransform extends ExecutableTransform<ChainTransform
}
@Override
public ChainTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
public ChainTransform.Result execute(WatchExecutionContext ctx, Payload payload) {
ImmutableList.Builder<Transform.Result> results = ImmutableList.builder();
try {
return doExecute(ctx, payload, results);
} catch (Exception e) {
logger.error("failed to execute [{}] transform for [{}]", e, ChainTransform.TYPE, ctx.id());
return new ChainTransform.Result(e, results.build());
}
}
ChainTransform.Result doExecute(WatchExecutionContext ctx, Payload payload, ImmutableList.Builder<Transform.Result> results) throws IOException {
for (ExecutableTransform transform : transforms) {
Transform.Result result = transform.execute(ctx, payload);
results.add(result);
if (result.status() == Transform.Result.Status.FAILURE) {
throw new ChainTransformException("failed to execute [{}] transform. failed to execute sub-transform [{}]", ChainTransform.TYPE, transform.type());
}
payload = result.payload();
}
return new ChainTransform.Result(payload, results.build());

View File

@ -40,7 +40,17 @@ public class ExecutableScriptTransform extends ExecutableTransform<ScriptTransfo
}
@Override
public ScriptTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
public ScriptTransform.Result execute(WatchExecutionContext ctx, Payload payload) {
try {
return doExecute(ctx, payload);
} catch (Exception e) {
logger.error("failed to execute [{}] transform for [{}]", e, ScriptTransform.TYPE, ctx.id());
return new ScriptTransform.Result(e);
}
}
ScriptTransform.Result doExecute(WatchExecutionContext ctx, Payload payload) throws IOException {
Script script = transform.getScript();
Map<String, Object> model = new HashMap<>();
model.putAll(script.params());

View File

@ -74,6 +74,10 @@ public class ScriptTransform implements Transform {
super(TYPE, payload);
}
public Result(Exception e) {
super(TYPE, e);
}
@Override
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
return builder;

View File

@ -15,8 +15,6 @@ import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
/**
*
*/
@ -32,10 +30,16 @@ public class ExecutableSearchTransform extends ExecutableTransform<SearchTransfo
}
@Override
public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
SearchRequest req = WatcherUtils.createSearchRequestFromPrototype(transform.request, ctx, payload);
SearchResponse resp = client.search(req);
return new SearchTransform.Result(req, new Payload.XContent(resp));
public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload) {
SearchRequest request = null;
try {
request = WatcherUtils.createSearchRequestFromPrototype(transform.request, ctx, payload);
SearchResponse resp = client.search(request);
return new SearchTransform.Result(request, new Payload.XContent(resp));
} catch (Exception e) {
logger.error("failed to execute [{}] transform for [{}]", e, SearchTransform.TYPE, ctx.id());
return new SearchTransform.Result(request, e);
}
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -74,23 +75,31 @@ public class SearchTransform implements Transform {
public static class Result extends Transform.Result {
private final SearchRequest request;
private final @Nullable SearchRequest request;
public Result(SearchRequest request, Payload payload) {
super(TYPE, payload);
this.request = request;
}
public Result(SearchRequest request, Exception e) {
super(TYPE, e);
this.request = request;
}
public SearchRequest executedRequest() {
return request;
}
@Override
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(type);
builder.field(Field.REQUEST.getPreferredName());
WatcherUtils.writeSearchRequest(request, builder, params);
return builder.endObject();
if (request != null) {
builder.startObject(type);
builder.field(Field.REQUEST.getPreferredName());
WatcherUtils.writeSearchRequest(request, builder, params);
builder.endObject();
}
return builder;
}
}

View File

@ -100,6 +100,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
// watch level transform
Transform.Result watchTransformResult = mock(Transform.Result.class);
when(watchTransformResult.status()).thenReturn(Transform.Result.Status.SUCCESS);
when(watchTransformResult.payload()).thenReturn(payload);
ExecutableTransform watchTransform = mock(ExecutableTransform.class);
when(watchTransform.execute(context, payload)).thenReturn(watchTransformResult);
@ -292,6 +293,147 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
verify(action, never()).execute("_action", context, payload);
}
@Test
public void testExecute_FailedWatchTransform() throws Exception {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire("_id")).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
when(watchStore.get("_id")).thenReturn(watch);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC());
WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5));
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
// watch level transform
Transform.Result watchTransformResult = mock(Transform.Result.class);
when(watchTransformResult.status()).thenReturn(Transform.Result.Status.FAILURE);
when(watchTransformResult.reason()).thenReturn("_reason");
ExecutableTransform watchTransform = mock(ExecutableTransform.class);
when(watchTransform.execute(context, payload)).thenReturn(watchTransformResult);
// action throttler
Throttler.Result throttleResult = mock(Throttler.Result.class);
when(throttleResult.throttle()).thenReturn(false);
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.payload()).thenReturn(payload);
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult);
// the action
Action.Result actionResult = mock(Action.Result.class);
when(actionResult.type()).thenReturn("_action_type");
when(actionResult.status()).thenReturn(Action.Result.Status.SUCCESS);
ExecutableAction action = mock(ExecutableAction.class);
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC())));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
when(watch.transform()).thenReturn(watchTransform);
when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus);
WatchRecord watchRecord = executionService.execute(context);
assertThat(watchRecord.result().inputResult(), is(inputResult));
assertThat(watchRecord.result().conditionResult(), is(conditionResult));
assertThat(watchRecord.result().transformResult(), is(watchTransformResult));
assertThat(watchRecord.result().actionsResults(), notNullValue());
assertThat(watchRecord.result().actionsResults().count(), is(0));
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(input, times(1)).execute(context);
verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload);
verify(action, never()).execute("_action", context, payload);
}
@Test
public void testExecute_FailedActionTransform() throws Exception {
WatchLockService.Lock lock = mock(WatchLockService.Lock.class);
when(watchLockService.acquire("_id")).thenReturn(lock);
Watch watch = mock(Watch.class);
when(watch.id()).thenReturn("_id");
when(watchStore.get("_id")).thenReturn(watch);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", clock.nowUTC(), clock.nowUTC());
WatchExecutionContext context = new TriggeredExecutionContext(watch, clock.nowUTC(), event, timeValueSeconds(5));
Condition.Result conditionResult = AlwaysCondition.Result.INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
// watch level transform
Transform.Result watchTransformResult = mock(Transform.Result.class);
when(watchTransformResult.status()).thenReturn(Transform.Result.Status.SUCCESS);
when(watchTransformResult.payload()).thenReturn(payload);
ExecutableTransform watchTransform = mock(ExecutableTransform.class);
when(watchTransform.execute(context, payload)).thenReturn(watchTransformResult);
// action throttler
Throttler.Result throttleResult = mock(Throttler.Result.class);
when(throttleResult.throttle()).thenReturn(false);
ActionThrottler throttler = mock(ActionThrottler.class);
when(throttler.throttle("_action", context)).thenReturn(throttleResult);
// action level transform
Transform.Result actionTransformResult = mock(Transform.Result.class);
when(actionTransformResult.status()).thenReturn(Transform.Result.Status.FAILURE);
when(actionTransformResult.reason()).thenReturn("_reason");
ExecutableTransform actionTransform = mock(ExecutableTransform.class);
when(actionTransform.execute(context, payload)).thenReturn(actionTransformResult);
// the action
Action.Result actionResult = mock(Action.Result.class);
when(actionResult.type()).thenReturn("_action_type");
when(actionResult.status()).thenReturn(Action.Result.Status.SUCCESS);
ExecutableAction action = mock(ExecutableAction.class);
when(action.logger()).thenReturn(logger);
when(action.execute("_action", context, payload)).thenReturn(actionResult);
ActionWrapper actionWrapper = new ActionWrapper("_action", throttler, actionTransform, action);
ExecutableActions actions = new ExecutableActions(Arrays.asList(actionWrapper));
WatchStatus watchStatus = new WatchStatus(ImmutableMap.of("_action", new ActionStatus(clock.nowUTC())));
when(watch.input()).thenReturn(input);
when(watch.condition()).thenReturn(condition);
when(watch.transform()).thenReturn(watchTransform);
when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus);
WatchRecord watchRecord = executionService.execute(context);
assertThat(watchRecord.result().inputResult(), is(inputResult));
assertThat(watchRecord.result().conditionResult(), is(conditionResult));
assertThat(watchRecord.result().transformResult(), is(watchTransformResult));
assertThat(watchRecord.result().actionsResults(), notNullValue());
assertThat(watchRecord.result().actionsResults().count(), is(1));
assertThat(watchRecord.result().actionsResults().get("_action").transform(), is(actionTransformResult));
assertThat(watchRecord.result().actionsResults().get("_action").action().status(), is(Action.Result.Status.FAILURE));
verify(historyStore, times(1)).put(watchRecord);
verify(lock, times(1)).release();
verify(input, times(1)).execute(context);
verify(condition, times(1)).execute(context);
verify(watchTransform, times(1)).execute(context, payload);
// the action level transform is executed before the action itself
verify(action, never()).execute("_action", context, payload);
}
@Test
public void testExecuteInner() throws Exception {
DateTime now = DateTime.now(UTC);

View File

@ -37,7 +37,7 @@ import static org.mockito.Mockito.mock;
public class ChainTransformTests extends ElasticsearchTestCase {
@Test
public void testApply() throws Exception {
public void testExecute() throws Exception {
ChainTransform transform = new ChainTransform(ImmutableList.<Transform>of(
new NamedExecutableTransform.Transform("name1"),
new NamedExecutableTransform.Transform("name2"),
@ -51,7 +51,21 @@ public class ChainTransformTests extends ElasticsearchTestCase {
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
Payload payload = new Payload.Simple(new HashMap<String, Object>());
Transform.Result result = executable.execute(ctx, payload);
ChainTransform.Result result = executable.execute(ctx, payload);
assertThat(result.status(), is(Transform.Result.Status.SUCCESS));
assertThat(result.results(), hasSize(3));
assertThat(result.results().get(0), instanceOf(NamedExecutableTransform.Result.class));
assertThat(result.results().get(0).status(), is(Transform.Result.Status.SUCCESS));
assertThat((List<String>) result.results().get(0).payload().data().get("names"), hasSize(1));
assertThat((List<String>) result.results().get(0).payload().data().get("names"), contains("name1"));
assertThat(result.results().get(1), instanceOf(NamedExecutableTransform.Result.class));
assertThat(result.results().get(1).status(), is(Transform.Result.Status.SUCCESS));
assertThat((List<String>) result.results().get(1).payload().data().get("names"), hasSize(2));
assertThat((List<String>) result.results().get(1).payload().data().get("names"), contains("name1", "name2"));
assertThat(result.results().get(2), instanceOf(NamedExecutableTransform.Result.class));
assertThat(result.results().get(2).status(), is(Transform.Result.Status.SUCCESS));
assertThat((List<String>) result.results().get(2).payload().data().get("names"), hasSize(3));
assertThat((List<String>) result.results().get(2).payload().data().get("names"), contains("name1", "name2", "name3"));
Map<String, Object> data = result.payload().data();
assertThat(data, notNullValue());
@ -62,6 +76,39 @@ public class ChainTransformTests extends ElasticsearchTestCase {
assertThat(names, contains("name1", "name2", "name3"));
}
@Test
public void testExecute_Failure() throws Exception {
ChainTransform transform = new ChainTransform(ImmutableList.of(
new NamedExecutableTransform.Transform("name1"),
new NamedExecutableTransform.Transform("name2"),
new FailingExecutableTransform.Transform()
));
ExecutableChainTransform executable = new ExecutableChainTransform(transform, logger, ImmutableList.<ExecutableTransform>of(
new NamedExecutableTransform("name1"),
new NamedExecutableTransform("name2"),
new FailingExecutableTransform(logger)));
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
Payload payload = new Payload.Simple(new HashMap<String, Object>());
ChainTransform.Result result = executable.execute(ctx, payload);
assertThat(result.status(), is(Transform.Result.Status.FAILURE));
assertThat(result.reason(), notNullValue());
assertThat(result.results(), hasSize(3));
assertThat(result.results().get(0), instanceOf(NamedExecutableTransform.Result.class));
assertThat(result.results().get(0).status(), is(Transform.Result.Status.SUCCESS));
assertThat((List<String>) result.results().get(0).payload().data().get("names"), hasSize(1));
assertThat((List<String>) result.results().get(0).payload().data().get("names"), contains("name1"));
assertThat(result.results().get(1), instanceOf(NamedExecutableTransform.Result.class));
assertThat(result.results().get(1).status(), is(Transform.Result.Status.SUCCESS));
assertThat((List<String>) result.results().get(1).payload().data().get("names"), hasSize(2));
assertThat((List<String>) result.results().get(1).payload().data().get("names"), contains("name1", "name2"));
assertThat(result.results().get(2), instanceOf(FailingExecutableTransform.Result.class));
assertThat(result.results().get(2).status(), is(Transform.Result.Status.FAILURE));
assertThat(result.results().get(2).reason(), containsString("_error"));
}
@Test
public void testParser() throws Exception {
Map<String, TransformFactory> factories = ImmutableMap.<String, TransformFactory>builder()
@ -103,14 +150,16 @@ public class ChainTransformTests extends ElasticsearchTestCase {
}
@Override
public Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
Map<String, Object> data = new HashMap<>(payload.data());
List<String> names = (List<String>) data.get("names");
public Result execute(WatchExecutionContext ctx, Payload payload) {
List<String> names = (List<String>) payload.data().get("names");
if (names == null) {
names = new ArrayList<>();
data.put("names", names);
} else {
names = new ArrayList<>(names);
}
names.add(transform.name);
Map<String, Object> data = new HashMap<>();
data.put("names", names);
return new Result("named", new Payload.Simple(data));
}
@ -178,4 +227,68 @@ public class ChainTransformTests extends ElasticsearchTestCase {
}
}
}
private static class FailingExecutableTransform extends ExecutableTransform<FailingExecutableTransform.Transform, FailingExecutableTransform.Result> {
private static final String TYPE = "throwing";
public FailingExecutableTransform(ESLogger logger) {
super(new Transform(), logger);
}
@Override
public Result execute(WatchExecutionContext ctx, Payload payload) {
return new Result(TYPE);
}
public static class Transform implements org.elasticsearch.watcher.transform.Transform {
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endArray();
}
}
public static class Result extends Transform.Result {
public Result(String type) {
super(type, new Exception("_error"));
}
@Override
protected XContentBuilder typeXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
public static class Factory extends TransformFactory<Transform, Result, FailingExecutableTransform> {
public Factory(ESLogger transformLogger) {
super(transformLogger);
}
@Override
public String type() {
return TYPE;
}
@Override
public Transform parseTransform(String watchId, XContentParser parser) throws IOException {
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.END_OBJECT;
return new Transform();
}
@Override
public FailingExecutableTransform createExecutable(Transform transform) {
return new FailingExecutableTransform(transformLogger);
}
}
}
}

View File

@ -58,7 +58,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase {
@Test
public void testApply_MapValue() throws Exception {
public void testExecute_MapValue() throws Exception {
ScriptServiceProxy service = mock(ScriptServiceProxy.class);
ScriptType type = randomFrom(ScriptType.values());
Map<String, Object> params = Collections.emptyMap();
@ -84,11 +84,39 @@ public class ScriptTransformTests extends ElasticsearchTestCase {
Transform.Result result = transform.execute(ctx, payload);
assertThat(result, notNullValue());
assertThat(result.type(), is(ScriptTransform.TYPE));
assertThat(result.status(), is(Transform.Result.Status.SUCCESS));
assertThat(result.payload().data(), equalTo(transformed));
}
@Test
public void testApply_NonMapValue() throws Exception {
public void testExecute_MapValue_Failure() throws Exception {
ScriptServiceProxy service = mock(ScriptServiceProxy.class);
ScriptType type = randomFrom(ScriptType.values());
Map<String, Object> params = Collections.emptyMap();
Script script = scriptBuilder(type, "_script").lang("_lang").params(params).build();
CompiledScript compiledScript = mock(CompiledScript.class);
when(service.compile(script)).thenReturn(compiledScript);
ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
Payload payload = simplePayload("key", "value");
Map<String, Object> model = Variables.createCtxModel(ctx, payload);
ExecutableScript executable = mock(ExecutableScript.class);
when(executable.run()).thenThrow(new RuntimeException("_error"));
when(service.executable(compiledScript, model)).thenReturn(executable);
Transform.Result result = transform.execute(ctx, payload);
assertThat(result, notNullValue());
assertThat(result.type(), is(ScriptTransform.TYPE));
assertThat(result.status(), is(Transform.Result.Status.FAILURE));
assertThat(result.reason(), containsString("_error"));
}
@Test
public void testExecute_NonMapValue() throws Exception {
ScriptServiceProxy service = mock(ScriptServiceProxy.class);
ScriptType type = randomFrom(ScriptType.values());

View File

@ -85,7 +85,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
}
@Test
public void testApply() throws Exception {
public void testExecute() throws Exception {
index("idx", "type", "1");
ensureGreen("idx");
@ -104,6 +104,7 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
Transform.Result result = transform.execute(ctx, EMPTY_PAYLOAD);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
assertThat(result.status(), is(Transform.Result.Status.SUCCESS));
SearchResponse response = client().search(request).get();
Payload expectedPayload = new Payload.XContent(response);
@ -120,7 +121,33 @@ public class SearchTransformTests extends ElasticsearchIntegrationTest {
}
@Test
public void testApply_MustacheTemplate() throws Exception {
public void testExecute_Failure() throws Exception {
index("idx", "type", "1");
ensureGreen("idx");
refresh();
// create a bad request
SearchRequest request = Requests.searchRequest("idx").source(jsonBuilder().startObject()
.startObject("query")
.startObject("_unknown_query_").endObject()
.endObject()
.endObject());
SearchTransform searchTransform = TransformBuilders.searchTransform(request).build();
ExecutableSearchTransform transform = new ExecutableSearchTransform(searchTransform, logger, ClientProxy.of(client()));
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
SearchTransform.Result result = transform.execute(ctx, EMPTY_PAYLOAD);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
assertThat(result.status(), is(Transform.Result.Status.FAILURE));
assertThat(result.reason(), notNullValue());
assertThat(result.executedRequest().templateSource().toUtf8(), containsString("_unknown_query_"));
}
@Test
public void testExecute_MustacheTemplate() throws Exception {
// The rational behind this test:
//