Cleanup and Refactoring of the transforms

* Split the transform into two constructs: `Transform` and `ExecutableTransform`. The former holds all the transform configuration, the latter can execute the transform based on that configuration (an executable transform holds a transform)
 - This makes the code clearer to understand and maintain.
 - This also enabled to pull some common implementation code into the `ExecutableTransform` and by that reduce the implementation details of each executable to the minimum required.

* Also, extracted the `Transform.Parser` to its own top level class, and renamed it to - `TransformFactory`. The main thing that the factory does is: 1) delegate to the parsing to the `Transform` class, 2) construct & wire up the `ExecutableTransform`.

* With the introduction of `Transform`, we no longer need the `SourceBuilder` for transforms. Instead, we have `Transform.Builder` that help you build a transform. This is much more intuitive from the client perspective.

Original commit: elastic/x-pack-elasticsearch@f6ee0d0c75
This commit is contained in:
uboness 2015-04-22 23:11:48 +02:00
parent 34c9d6af62
commit 690af790b2
38 changed files with 1257 additions and 783 deletions

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.execution.Wid;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.watcher.watch.Payload;
@ -23,14 +24,14 @@ import java.io.IOException;
public class ActionWrapper implements ToXContent {
private String id;
private final @Nullable Transform transform;
private final @Nullable ExecutableTransform transform;
private final ExecutableAction action;
public ActionWrapper(String id, ExecutableAction action) {
this(id, null, action);
}
public ActionWrapper(String id, @Nullable Transform transform, ExecutableAction action) {
public ActionWrapper(String id, @Nullable ExecutableTransform transform, ExecutableAction action) {
this.id = id;
this.transform = transform;
this.action = action;
@ -40,7 +41,7 @@ public class ActionWrapper implements ToXContent {
return id;
}
public Transform transform() {
public ExecutableTransform transform() {
return transform;
}
@ -52,7 +53,7 @@ public class ActionWrapper implements ToXContent {
Payload payload = ctx.payload();
Transform.Result transformResult = null;
if (transform != null) {
transformResult = transform.apply(ctx, payload);
transformResult = transform.execute(ctx, payload);
payload = transformResult.payload();
}
@ -84,7 +85,7 @@ public class ActionWrapper implements ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (transform != null) {
builder.startObject(Transform.Parser.TRANSFORM_FIELD.getPreferredName())
builder.startObject(Transform.Field.TRANSFORM.getPreferredName())
.field(transform.type(), transform)
.endObject();
}
@ -95,7 +96,7 @@ public class ActionWrapper implements ToXContent {
static ActionWrapper parse(String watchId, String actionId, XContentParser parser, ActionRegistry actionRegistry, TransformRegistry transformRegistry) throws IOException {
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
Transform transform = null;
ExecutableTransform transform = null;
ExecutableAction action = null;
String currentFieldName = null;
@ -104,8 +105,8 @@ public class ActionWrapper implements ToXContent {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else {
if (Transform.Parser.TRANSFORM_FIELD.match(currentFieldName)) {
transform = transformRegistry.parse(parser);
if (Transform.Field.TRANSFORM.match(currentFieldName)) {
transform = transformRegistry.parse(watchId, parser);
} else {
// it's the type of the action
ActionFactory actionFactory = actionRegistry.factory(currentFieldName);
@ -174,7 +175,7 @@ public class ActionWrapper implements ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (transform != null) {
builder.startObject(Transform.Parser.TRANSFORM_RESULT_FIELD.getPreferredName())
builder.startObject(Transform.Field.TRANSFORM_RESULT.getPreferredName())
.field(transform.type(), transform)
.endObject();
}
@ -194,8 +195,8 @@ public class ActionWrapper implements ToXContent {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else {
if (Transform.Parser.TRANSFORM_FIELD.match(currentFieldName)) {
transformResult = transformRegistry.parseResult(parser);
if (Transform.Field.TRANSFORM.match(currentFieldName)) {
transformResult = transformRegistry.parseResult(wid.watchId(), parser);
} else {
// it's the type of the action
ActionFactory actionFactory = actionRegistry.factory(currentFieldName);

View File

@ -35,7 +35,7 @@ public class WatchSourceBuilder implements ToXContent {
private Trigger.SourceBuilder trigger;
private Input input = NoneInput.INSTANCE;
private Condition condition = AlwaysCondition.INSTANCE;
private Transform.SourceBuilder transform = null;
private Transform transform = null;
private Map<String, TransformedAction> actions = new HashMap<>();
private TimeValue throttlePeriod = null;
private Map<String, Object> metadata;
@ -63,21 +63,25 @@ public class WatchSourceBuilder implements ToXContent {
return this;
}
public WatchSourceBuilder transform(Transform.SourceBuilder transform) {
public WatchSourceBuilder transform(Transform transform) {
this.transform = transform;
return this;
}
public WatchSourceBuilder transform(Transform.Builder transform) {
return transform(transform.build());
}
public WatchSourceBuilder throttlePeriod(TimeValue throttlePeriod) {
this.throttlePeriod = throttlePeriod;
return this;
}
public WatchSourceBuilder addAction(String id, Transform.SourceBuilder transform, Action action) {
actions.put(id, new TransformedAction(id, action, transform));
return this;
public WatchSourceBuilder addAction(String id, Transform.Builder transform, Action action) {
return addAction(id, transform.build(), action);
}
public WatchSourceBuilder addAction(String id, Action action) {
actions.put(id, new TransformedAction(id, action));
return this;
@ -87,6 +91,16 @@ public class WatchSourceBuilder implements ToXContent {
return addAction(id, action.build());
}
public WatchSourceBuilder addAction(String id, Transform.Builder transform, Action.Builder action) {
actions.put(id, new TransformedAction(id, action.build(), transform.build()));
return this;
}
public WatchSourceBuilder addAction(String id, Transform transform, Action action) {
actions.put(id, new TransformedAction(id, action, transform));
return this;
}
public WatchSourceBuilder metadata(Map<String, Object> metadata) {
this.metadata = metadata;
return this;
@ -148,13 +162,13 @@ public class WatchSourceBuilder implements ToXContent {
private final String id;
private final Action action;
private final @Nullable Transform.SourceBuilder transform;
private final @Nullable Transform transform;
public TransformedAction(String id, Action action) {
this(id, action, null);
}
public TransformedAction(String id, Action action, @Nullable Transform.SourceBuilder transform) {
public TransformedAction(String id, Action action, @Nullable Transform transform) {
this.id = id;
this.transform = transform;
this.action = action;
@ -164,7 +178,7 @@ public class WatchSourceBuilder implements ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (transform != null) {
builder.startObject(Transform.Parser.TRANSFORM_FIELD.getPreferredName())
builder.startObject(Transform.Field.TRANSFORM.getPreferredName())
.field(transform.type(), transform)
.endObject();
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.Callback;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.watch.Watch;
@ -267,9 +268,9 @@ public class ExecutionService extends AbstractComponent {
}
if (!throttleResult.throttle()) {
Transform transform = watch.transform();
ExecutableTransform transform = watch.transform();
if (transform != null) {
Transform.Result result = watch.transform().apply(ctx, inputResult.payload());
Transform.Result result = watch.transform().execute(ctx, inputResult.payload());
ctx.onTransformResult(result);
}
for (ActionWrapper action : watch.actions()) {

View File

@ -11,6 +11,7 @@ import org.elasticsearch.watcher.actions.ExecutableActions;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.watch.Payload;

View File

@ -7,7 +7,7 @@ package org.elasticsearch.watcher.support.init;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.transform.ChainTransform;
import org.elasticsearch.watcher.transform.chain.ChainTransformFactory;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
@ -25,7 +25,7 @@ public class InitializingModule extends AbstractModule {
Multibinder<InitializingService.Initializable> mbinder = Multibinder.newSetBinder(binder(), InitializingService.Initializable.class);
mbinder.addBinding().to(ClientProxy.class);
mbinder.addBinding().to(ScriptServiceProxy.class);
mbinder.addBinding().to(ChainTransform.Parser.class);
mbinder.addBinding().to(ChainTransformFactory.class);
bind(InitializingService.class).asEagerSingleton();
}
}

View File

@ -1,231 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.List;
/**
*
*/
public class ChainTransform extends Transform<ChainTransform.Result> {
public static final String TYPE = "chain";
private final ImmutableList<Transform> transforms;
public ChainTransform(ImmutableList<Transform> transforms) {
this.transforms = transforms;
}
@Override
public String type() {
return TYPE;
}
ImmutableList<Transform> transforms() {
return transforms;
}
@Override
public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException {
ImmutableList.Builder<Transform.Result> results = ImmutableList.builder();
for (Transform transform : transforms) {
Transform.Result result = transform.apply(ctx, payload);
results.add(result);
payload = result.payload();
}
return new Result(TYPE, payload, results.build());
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray();
for (Transform transform : transforms) {
builder.startObject()
.field(transform.type(), transform)
.endObject();
}
return builder.endArray();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChainTransform transform = (ChainTransform) o;
if (!transforms.equals(transform.transforms)) return false;
return true;
}
@Override
public int hashCode() {
return transforms.hashCode();
}
public static class Result extends Transform.Result {
private final List<Transform.Result> results;
public Result(String type, Payload payload, List<Transform.Result> results) {
super(type, payload);
this.results = results;
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(Parser.RESULTS_FIELD.getPreferredName());
for (Transform.Result result : results) {
builder.startObject()
.field(result.type(), result)
.endObject();
}
return builder.endArray();
}
}
public static class Parser implements Transform.Parser<Result, ChainTransform>, InitializingService.Initializable {
public static final ParseField RESULTS_FIELD = new ParseField("results");
private TransformRegistry registry;
// used by guice
public Parser() {
}
// used for tests
Parser(TransformRegistry registry) {
this.registry = registry;
}
@Override
public void init(Injector injector) {
init(injector.getInstance(TransformRegistry.class));
}
public void init(TransformRegistry registry) {
this.registry = registry;
}
@Override
public String type() {
return TYPE;
}
@Override
public ChainTransform parse(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_ARRAY) {
throw new WatcherSettingsException("could not parse [chain] transform. expected an array of objects, but found [" + token + '}');
}
ImmutableList.Builder<Transform> builder = ImmutableList.builder();
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token != XContentParser.Token.START_OBJECT) {
throw new WatcherSettingsException("could not parse [chain] transform. expected a transform object, but found [" + token + "]");
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
builder.add(registry.parse(currentFieldName, parser));
} else {
throw new WatcherSettingsException("could not parse [chain] transform. expected a transform object, but found [" + token + "]");
}
}
}
return new ChainTransform(builder.build());
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [chain] transform result. expected an object, but found [" + token + "]");
}
Payload payload = null;
ImmutableList.Builder<Transform.Result> results = ImmutableList.builder();
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else {
if (token == XContentParser.Token.START_OBJECT) {
if (PAYLOAD_FIELD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else {
throw new TransformException("could not parse [chain] transform result. unexpected object field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (RESULTS_FIELD.match(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.START_OBJECT) {
results.add(registry.parseResult(parser));
} else {
throw new TransformException("could not parse [chain] transform result. expected an object representing a transform result, but found [" + token + "]");
}
}
} else {
throw new TransformException("could not parse [chain] transform result. unexpected array field [" + currentFieldName + "]");
}
} else {
throw new TransformException("could not parse [chain] transform result. unexpected token [" + token+ "]");
}
}
}
return new Result(TYPE, payload, results.build());
}
}
public static class SourceBuilder implements Transform.SourceBuilder {
private final ImmutableList.Builder<Transform.SourceBuilder> builders = ImmutableList.builder();
@Override
public String type() {
return TYPE;
}
public SourceBuilder(Transform.SourceBuilder... builders) {
this.builders.add(builders);
}
public SourceBuilder add(Transform.SourceBuilder builder) {
builders.add(builder);
return this;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray();
for (Transform.SourceBuilder transBuilder : builders.build()) {
builder.startObject()
.field(TYPE, transBuilder)
.endObject();
}
return builder.endArray();
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
/**
*
*/
public abstract class ExecutableTransform<T extends Transform, R extends Transform.Result> implements ToXContent {
protected final T transform;
protected final ESLogger logger;
public ExecutableTransform(T transform, ESLogger logger) {
this.transform = transform;
this.logger = logger;
}
public final String type() {
return transform.type();
}
public T transform() {
return transform;
}
public abstract R execute(WatchExecutionContext ctx, Payload payload) throws IOException;
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return transform.toXContent(builder, params);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExecutableTransform<?, ?> that = (ExecutableTransform<?, ?>) o;
return transform.equals(that.transform);
}
@Override
public int hashCode() {
return transform.hashCode();
}
}

View File

@ -1,162 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.watcher.support.Variables.createCtxModel;
/**
*
*/
public class ScriptTransform extends Transform<ScriptTransform.Result> {
public static final String TYPE = "script";
private final ScriptServiceProxy scriptService;
private final Script script;
public ScriptTransform(ScriptServiceProxy scriptService, Script script) {
this.scriptService = scriptService;
this.script = script;
}
@Override
public String type() {
return TYPE;
}
Script script() {
return script;
}
@Override
public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException {
Map<String, Object> model = new HashMap<>();
model.putAll(script.params());
model.putAll(createCtxModel(ctx, payload));
ExecutableScript executable = scriptService.executable(script.lang(), script.script(), script.type(), model);
Object value = executable.run();
if (value instanceof Map) {
return new Result(TYPE, new Payload.Simple((Map<String, Object>) value));
}
Map<String, Object> data = new HashMap<>();
data.put("_value", value);
return new Result(TYPE, new Payload.Simple(data));
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.value(script);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ScriptTransform transform = (ScriptTransform) o;
if (!script.equals(transform.script)) return false;
return true;
}
@Override
public int hashCode() {
return script.hashCode();
}
public static class Result extends Transform.Result {
public Result(String type, Payload payload) {
super(type, payload);
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
public static class Parser implements Transform.Parser<Result, ScriptTransform> {
private final ScriptServiceProxy scriptService;
@Inject
public Parser(ScriptServiceProxy scriptService) {
this.scriptService = scriptService;
}
@Override
public String type() {
return TYPE;
}
@Override
public ScriptTransform parse(XContentParser parser) throws IOException {
Script script = null;
try {
script = Script.parse(parser);
} catch (Script.ParseException pe) {
throw new WatcherSettingsException("could not parse [script] transform", pe);
}
return new ScriptTransform(scriptService, script);
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [script] transform result. expected an object, but found [" + token + "]");
}
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME || !PAYLOAD_FIELD.match(parser.currentName())) {
throw new TransformException("could not parse [script] transform result. expected a payload field, but found [" + token + "]");
}
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [script] transform result. expected a payload object, but found [" + token + "]");
}
return new Result(TYPE, new Payload.XContent(parser));
}
}
public static class SourceBuilder implements Transform.SourceBuilder {
private final Script script;
public SourceBuilder(String script) {
this(new Script(script));
}
public SourceBuilder(Script script) {
this.script = script;
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return script.toXContent(builder, params);
}
}
}

View File

@ -1,184 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.SearchRequestEquivalence;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import static org.elasticsearch.watcher.support.WatcherUtils.flattenModel;
import static org.elasticsearch.watcher.support.Variables.createCtxModel;
/**
*
*/
public class SearchTransform extends Transform<SearchTransform.Result> {
public static final String TYPE = "search";
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
protected final ESLogger logger;
protected final ScriptServiceProxy scriptService;
protected final ClientProxy client;
protected final SearchRequest request;
public SearchTransform(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client, SearchRequest request) {
this.logger = logger;
this.scriptService = scriptService;
this.client = client;
this.request = request;
}
@Override
public String type() {
return TYPE;
}
@Override
public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException {
SearchRequest req = createRequest(request, ctx, payload);
SearchResponse resp = client.search(req);
return new Result(TYPE, new Payload.XContent(resp));
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
return WatcherUtils.writeSearchRequest(request, builder, params);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchTransform transform = (SearchTransform) o;
if (!SearchRequestEquivalence.INSTANCE.equivalent(request, transform.request)) return false;
return true;
}
@Override
public int hashCode() {
return request.hashCode();
}
SearchRequest createRequest(SearchRequest requestPrototype, WatchExecutionContext ctx, Payload payload) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.indices(requestPrototype.indices());
if (Strings.hasLength(requestPrototype.source())) {
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, createCtxModel(ctx, payload));
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
MapBuilder<String, Object> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
.putAll(flattenModel(createCtxModel(ctx, payload)));
request.templateParams(templateParams.map());
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
} else {
throw new TransformException("search requests needs either source or template name");
}
return request;
}
public static class Result extends Transform.Result {
public Result(String type, Payload payload) {
super(type, payload);
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
public static class Parser extends AbstractComponent implements Transform.Parser<Result, SearchTransform> {
protected final ScriptServiceProxy scriptService;
protected final ClientProxy client;
@Inject
public Parser(Settings settings, ScriptServiceProxy scriptService, ClientProxy client) {
super(settings);
this.scriptService = scriptService;
this.client = client;
}
@Override
public String type() {
return TYPE;
}
@Override
public SearchTransform parse(XContentParser parser) throws IOException {
SearchRequest request = WatcherUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE);
return new SearchTransform(logger, scriptService, client, request);
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [search] transform result. expected an object, but found [" + token + "]");
}
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME || !PAYLOAD_FIELD.match(parser.currentName())) {
throw new TransformException("could not parse [search] transform result. expected a payload field, but found [" + token + "]");
}
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new TransformException("could not parse [search] transform result. expected a payload object, but found [" + token + "]");
}
return new Result(TYPE, new Payload.XContent(parser));
}
}
public static class SourceBuilder implements Transform.SourceBuilder {
private final SearchRequest request;
public SourceBuilder(SearchRequest request) {
this.request = request;
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return WatcherUtils.writeSearchRequest(request, builder, params);
}
}
}

View File

@ -5,25 +5,21 @@
*/
package org.elasticsearch.watcher.transform;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
/**
*
*/
public abstract class Transform<R extends Transform.Result> implements ToXContent {
public interface Transform extends ToXContent {
public abstract String type();
String type();
public abstract Result apply(WatchExecutionContext ctx, Payload payload) throws IOException;
public static abstract class Result implements ToXContent {
abstract class Result implements ToXContent {
protected final String type;
protected final Payload payload;
@ -44,7 +40,7 @@ public abstract class Transform<R extends Transform.Result> implements ToXConten
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Parser.PAYLOAD_FIELD.getPreferredName(), payload);
builder.field(Field.PAYLOAD.getPreferredName(), payload);
xContentBody(builder, params);
return builder.endObject();
}
@ -53,23 +49,14 @@ public abstract class Transform<R extends Transform.Result> implements ToXConten
}
public static interface Parser<R extends Transform.Result, T extends Transform<R>> {
public static final ParseField PAYLOAD_FIELD = new ParseField("payload");
public static final ParseField TRANSFORM_FIELD = new ParseField("transform");
public static final ParseField TRANSFORM_RESULT_FIELD = new ParseField("transform_result");
String type();
T parse(XContentParser parser) throws IOException;
R parseResult(XContentParser parser) throws IOException;
interface Builder<T extends Transform> {
T build();
}
public static interface SourceBuilder extends ToXContent {
String type();
interface Field {
ParseField PAYLOAD = new ParseField("payload");
ParseField TRANSFORM = new ParseField("transform");
ParseField TRANSFORM_RESULT = new ParseField("transform_result");
}
}

View File

@ -6,7 +6,11 @@
package org.elasticsearch.watcher.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.transform.chain.ChainTransform;
import org.elasticsearch.watcher.transform.script.ScriptTransform;
import org.elasticsearch.watcher.transform.search.SearchTransform;
/**
*
@ -16,20 +20,28 @@ public final class TransformBuilders {
private TransformBuilders() {
}
public static SearchTransform.SourceBuilder searchTransform(SearchRequest request) {
return new SearchTransform.SourceBuilder(request);
public static SearchTransform.Builder searchTransform(SearchRequest request) {
return SearchTransform.builder(request);
}
public static ScriptTransform.SourceBuilder scriptTransform(String script) {
return new ScriptTransform.SourceBuilder(script);
public static SearchTransform.Builder searchTransform(SearchRequestBuilder request) {
return searchTransform(request.request());
}
public static ScriptTransform.SourceBuilder scriptTransform(Script script) {
return new ScriptTransform.SourceBuilder(script);
public static ScriptTransform.Builder scriptTransform(String script) {
return scriptTransform(new Script(script));
}
public static ChainTransform.SourceBuilder chainTransform(Transform.SourceBuilder... transforms) {
return new ChainTransform.SourceBuilder(transforms);
public static ScriptTransform.Builder scriptTransform(Script script) {
return ScriptTransform.builder(script);
}
public static ChainTransform.Builder chainTransform(Transform.Builder... transforms) {
return ChainTransform.builder().add(transforms);
}
public static ChainTransform.Builder chainTransform(Transform... transforms) {
return ChainTransform.builder(transforms);
}
}

View File

@ -12,11 +12,11 @@ import org.elasticsearch.watcher.WatcherException;
*/
public class TransformException extends WatcherException {
public TransformException(String msg) {
super(msg);
public TransformException(String msg, Object... args) {
super(msg, args);
}
public TransformException(String msg, Throwable cause) {
super(msg, cause);
public TransformException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
/**
*
*/
public abstract class TransformFactory<T extends Transform, R extends Transform.Result, E extends ExecutableTransform<T, R>> {
protected final ESLogger transformLogger;
public TransformFactory(ESLogger transformLogger) {
this.transformLogger = transformLogger;
}
/**
* @return The type of the transform
*/
public abstract String type();
/**
* Parses the given xcontent and creates a concrete transform
*/
public abstract T parseTransform(String watchId, XContentParser parser) throws IOException;
/**
* Parses the given xcontent and creates a concrete transform result
*/
public abstract R parseResult(String watchId, XContentParser parser) throws IOException;
/**
* Creates an executable transform out of the given transform.
*/
public abstract E createExecutable(T transform);
public E parseExecutable(String watchId, XContentParser parser) throws IOException {
T transform = parseTransform(watchId, parser);
return createExecutable(transform);
}
}

View File

@ -7,6 +7,12 @@ package org.elasticsearch.watcher.transform;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.watcher.transform.chain.ChainTransform;
import org.elasticsearch.watcher.transform.chain.ChainTransformFactory;
import org.elasticsearch.watcher.transform.script.ScriptTransform;
import org.elasticsearch.watcher.transform.script.ScriptTransformFactory;
import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransformFactory;
import java.util.HashMap;
import java.util.Map;
@ -16,24 +22,26 @@ import java.util.Map;
*/
public class TransformModule extends AbstractModule {
private Map<String, Class<? extends Transform.Parser>> parsers = new HashMap<>();
private Map<String, Class<? extends TransformFactory>> factories = new HashMap<>();
public void registerPayload(String payloadType, Class<? extends Transform.Parser> parserType) {
parsers.put(payloadType, parserType);
public void registerTransform(String payloadType, Class<? extends TransformFactory> parserType) {
factories.put(payloadType, parserType);
}
@Override
protected void configure() {
MapBinder<String, TransformFactory> mbinder = MapBinder.newMapBinder(binder(), String.class, TransformFactory.class);
MapBinder<String, Transform.Parser> mbinder = MapBinder.newMapBinder(binder(), String.class, Transform.Parser.class);
bind(SearchTransform.Parser.class).asEagerSingleton();
mbinder.addBinding(SearchTransform.TYPE).to(SearchTransform.Parser.class);
bind(ScriptTransform.Parser.class).asEagerSingleton();
mbinder.addBinding(ScriptTransform.TYPE).to(ScriptTransform.Parser.class);
bind(ChainTransform.Parser.class).asEagerSingleton();
mbinder.addBinding(ChainTransform.TYPE).to(ChainTransform.Parser.class);
bind(SearchTransformFactory.class).asEagerSingleton();
mbinder.addBinding(SearchTransform.TYPE).to(SearchTransformFactory.class);
for (Map.Entry<String, Class<? extends Transform.Parser>> entry : parsers.entrySet()) {
bind(ScriptTransformFactory.class).asEagerSingleton();
mbinder.addBinding(ScriptTransform.TYPE).to(ScriptTransformFactory.class);
bind(ChainTransformFactory.class).asEagerSingleton();
mbinder.addBinding(ChainTransform.TYPE).to(ChainTransformFactory.class);
for (Map.Entry<String, Class<? extends TransformFactory>> entry : factories.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
mbinder.addBinding(entry.getKey()).to(entry.getValue());
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.watcher.transform;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
@ -18,36 +17,48 @@ import java.util.Map;
*/
public class TransformRegistry {
private final ImmutableMap<String, Transform.Parser> parsers;
private final ImmutableMap<String, TransformFactory> factories;
@Inject
public TransformRegistry(Map<String, Transform.Parser> parsers) {
this.parsers = ImmutableMap.copyOf(parsers);
public TransformRegistry(Map<String, TransformFactory> factories) {
this.factories = ImmutableMap.copyOf(factories);
}
public Transform parse(XContentParser parser) throws IOException {
public TransformFactory factory(String type) {
return factories.get(type);
}
public ExecutableTransform parse(String watchId, XContentParser parser) throws IOException {
String type = null;
XContentParser.Token token;
Transform transform = null;
ExecutableTransform transform = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (type != null) {
transform = parse(type, parser);
transform = parse(watchId, type, parser);
}
}
return transform;
}
public Transform parse(String type, XContentParser parser) throws IOException {
Transform.Parser transformParser = parsers.get(type);
if (transformParser == null) {
throw new WatcherSettingsException("unknown transform type [" + type + "]");
public ExecutableTransform parse(String watchId, String type, XContentParser parser) throws IOException {
TransformFactory factory = factories.get(type);
if (factory == null) {
throw new TransformException("could not parse transform for watch [{}], unknown transform type [{}]", watchId, type);
}
return transformParser.parse(parser);
return factory.parseExecutable(watchId, parser);
}
public Transform.Result parseResult(XContentParser parser) throws IOException {
public Transform parseTransform(String watchId, String type, XContentParser parser) throws IOException {
TransformFactory factory = factories.get(type);
if (factory == null) {
throw new TransformException("could not parse transform for watch [{}], unknown transform type [{}]", watchId, type);
}
return factory.parseTransform(watchId, parser);
}
public Transform.Result parseResult(String watchId, XContentParser parser) throws IOException {
String type = null;
XContentParser.Token token;
Transform.Result result = null;
@ -55,17 +66,17 @@ public class TransformRegistry {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (type != null) {
result = parseResult(type, parser);
result = parseResult(watchId, type, parser);
}
}
return result;
}
public Transform.Result parseResult(String type, XContentParser parser) throws IOException {
Transform.Parser transformParser = parsers.get(type);
if (transformParser == null) {
throw new TransformException("unknown transform type [" + type + "]");
public Transform.Result parseResult(String watchId, String type, XContentParser parser) throws IOException {
TransformFactory factory = factories.get(type);
if (factory == null) {
throw new TransformException("could not parse transform result for watch [{}]. unknown transform type [{}]", watchId, type);
}
return transformParser.parseResult(parser);
return factory.parseResult(watchId, parser);
}
}

View File

@ -0,0 +1,190 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.chain;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
/**
*
*/
public class ChainTransform implements Transform {
public static final String TYPE = "chain";
private final ImmutableList<Transform> transforms;
public ChainTransform(ImmutableList<Transform> transforms) {
this.transforms = transforms;
}
@Override
public String type() {
return TYPE;
}
public ImmutableList<Transform> getTransforms() {
return transforms;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChainTransform that = (ChainTransform) o;
return transforms.equals(that.transforms);
}
@Override
public int hashCode() {
return transforms.hashCode();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray();
for (Transform transform : transforms) {
builder.startObject()
.field(transform.type(), transform)
.endObject();
}
return builder.endArray();
}
public static ChainTransform parse(String watchId, XContentParser parser, TransformRegistry transformRegistry) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_ARRAY) {
throw new ChainTransformException("could not parse [{}] transform for watch [{}]. expected an array of transform objects, but found [{}] instead", TYPE, watchId, token);
}
ImmutableList.Builder<Transform> builder = ImmutableList.builder();
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token != XContentParser.Token.START_OBJECT) {
throw new ChainTransformException("could not parse [{}] transform for watch [{}]. expected a transform object, but found [{}] instead", TYPE, watchId, token);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
builder.add(transformRegistry.parseTransform(watchId, currentFieldName, parser));
} else {
throw new ChainTransformException("could not parse [{}] transform for watch [{}]. expected a transform object, but found [{}] instead", TYPE, watchId, token);
}
}
}
return new ChainTransform(builder.build());
}
public static Builder builder(Transform... transforms) {
return new Builder(transforms);
}
public static class Result extends Transform.Result {
private final ImmutableList<Transform.Result> results;
public Result(Payload payload, ImmutableList<Transform.Result> results) {
super(TYPE, payload);
this.results = results;
}
public ImmutableList<Transform.Result> results() {
return results;
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(Field.RESULTS.getPreferredName());
for (Transform.Result result : results) {
builder.startObject()
.field(result.type(), result)
.endObject();
}
return builder.endArray();
}
public static Result parse(String watchId, XContentParser parser, TransformRegistry transformRegistry) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. expected an object, but found [{}] instead", TYPE, watchId, token);
}
Payload payload = null;
ImmutableList.Builder<Transform.Result> results = ImmutableList.builder();
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else {
if (token == XContentParser.Token.START_OBJECT) {
if (Field.PAYLOAD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else {
throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. unexpected object field [{}]", TYPE, watchId, currentFieldName);
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (Field.RESULTS.match(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.START_OBJECT) {
results.add(transformRegistry.parseResult(watchId, parser));
} else {
throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. expected an object representing a transform result, but found [{}] instead", TYPE, watchId, token);
}
}
} else {
throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. unexpected array field [{}]", TYPE, watchId, currentFieldName);
}
} else {
throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. unexpected token [{}]", TYPE, watchId, token);
}
}
}
return new ChainTransform.Result(payload, results.build());
}
}
public static class Builder implements Transform.Builder<ChainTransform> {
private final ImmutableList.Builder<Transform> transforms = ImmutableList.builder();
public Builder(Transform... transforms) {
this.transforms.add(transforms);
}
public Builder add(Transform... transforms) {
this.transforms.add(transforms);
return this;
}
public Builder add(Transform.Builder... transforms) {
for (Transform.Builder transform: transforms) {
this.transforms.add(transform.build());
}
return this;
}
@Override
public ChainTransform build() {
return new ChainTransform(transforms.build());
}
}
interface Field extends Transform.Field {
ParseField RESULTS = new ParseField("results");
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.chain;
import org.elasticsearch.watcher.transform.TransformException;
/**
*
*/
public class ChainTransformException extends TransformException {
public ChainTransformException(String msg, Object... args) {
super(msg, args);
}
public ChainTransformException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.chain;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformFactory;
import org.elasticsearch.watcher.transform.TransformRegistry;
import java.io.IOException;
/**
*
*/
public class ChainTransformFactory extends TransformFactory<ChainTransform, ChainTransform.Result, ExecutableChainTransform> implements InitializingService.Initializable {
private TransformRegistry registry;
// used by guice
public ChainTransformFactory(Settings settings) {
super(Loggers.getLogger(ExecutableChainTransform.class, settings));
}
// used for tests
public ChainTransformFactory(TransformRegistry registry) {
super(Loggers.getLogger(ExecutableChainTransform.class));
this.registry = registry;
}
// used for tests
public ChainTransformFactory() {
super(Loggers.getLogger(ExecutableChainTransform.class));
}
@Override
public void init(Injector injector) {
init(injector.getInstance(TransformRegistry.class));
}
public void init(TransformRegistry registry) {
this.registry = registry;
}
@Override
public String type() {
return ChainTransform.TYPE;
}
@Override
public ChainTransform parseTransform(String watchId, XContentParser parser) throws IOException {
return ChainTransform.parse(watchId, parser, registry);
}
@Override
public ChainTransform.Result parseResult(String watchId, XContentParser parser) throws IOException {
return ChainTransform.Result.parse(watchId, parser, registry);
}
@Override
public ExecutableChainTransform createExecutable(ChainTransform chainTransform) {
ImmutableList.Builder<ExecutableTransform> executables = ImmutableList.builder();
for (Transform transform : chainTransform.getTransforms()) {
TransformFactory factory = registry.factory(transform.type());
executables.add(factory.createExecutable(transform));
}
return new ExecutableChainTransform(chainTransform, transformLogger, executables.build());
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.chain;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
/**
*
*/
public class ExecutableChainTransform extends ExecutableTransform<ChainTransform, ChainTransform.Result> {
private final ImmutableList<ExecutableTransform> transforms;
public ExecutableChainTransform(ChainTransform transform, ESLogger logger, ImmutableList<ExecutableTransform> transforms) {
super(transform, logger);
this.transforms = transforms;
}
public ImmutableList<ExecutableTransform> executableTransforms() {
return transforms;
}
@Override
public ChainTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
ImmutableList.Builder<Transform.Result> results = ImmutableList.builder();
for (ExecutableTransform transform : transforms) {
Transform.Result result = transform.execute(ctx, payload);
results.add(result);
payload = result.payload();
}
return new ChainTransform.Result(payload, results.build());
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.script;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.watcher.support.Variables.createCtxModel;
/**
*
*/
public class ExecutableScriptTransform extends ExecutableTransform<ScriptTransform, ScriptTransform.Result> {
private final ScriptServiceProxy scriptService;
public ExecutableScriptTransform(ScriptTransform transform, ESLogger logger, ScriptServiceProxy scriptService) {
super(transform, logger);
this.scriptService = scriptService;
}
@Override
public ScriptTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
Script script = transform.getScript();
Map<String, Object> model = new HashMap<>();
model.putAll(script.params());
model.putAll(createCtxModel(ctx, payload));
ExecutableScript executable = scriptService.executable(script.lang(), script.script(), script.type(), model);
Object value = executable.run();
if (value instanceof Map) {
return new ScriptTransform.Result(new Payload.Simple((Map<String, Object>) value));
}
Map<String, Object> data = new HashMap<>();
data.put("_value", value);
return new ScriptTransform.Result(new Payload.Simple(data));
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.script;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
/**
*
*/
public class ScriptTransform implements Transform {
public static final String TYPE = "script";
private final Script script;
public ScriptTransform(Script script) {
this.script = script;
}
@Override
public String type() {
return TYPE;
}
public Script getScript() {
return script;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ScriptTransform that = (ScriptTransform) o;
return script.equals(that.script);
}
@Override
public int hashCode() {
return script.hashCode();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return script.toXContent(builder, params);
}
public static ScriptTransform parse(String watchId, XContentParser parser) throws IOException {
try {
Script script = Script.parse(parser);
return new ScriptTransform(script);
} catch (Script.ParseException pe) {
throw new ScriptTransformException("could not parse [{}] transform for watch [{}]. failed to parse script", pe, TYPE, watchId);
}
}
public static Builder builder(Script script) {
return new Builder(script);
}
public static class Result extends Transform.Result {
public Result(Payload payload) {
super(TYPE, payload);
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return null;
}
public static Result parse(String watchId, XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new ScriptTransformException("could not parse [{}] transform result for watch [{}]. expected an object, but found [{}] instead", TYPE, watchId, token);
}
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME || !Field.PAYLOAD.match(parser.currentName())) {
throw new ScriptTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] object, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token);
}
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new ScriptTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] object, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token);
}
return new ScriptTransform.Result(new Payload.XContent(parser));
}
}
public static class Builder implements Transform.Builder<ScriptTransform> {
private final Script script;
public Builder(Script script) {
this.script = script;
}
@Override
public ScriptTransform build() {
return new ScriptTransform(script);
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.script;
import org.elasticsearch.watcher.transform.TransformException;
/**
*
*/
public class ScriptTransformException extends TransformException {
public ScriptTransformException(String msg, Object... args) {
super(msg, args);
}
public ScriptTransformException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.script;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.transform.TransformFactory;
import java.io.IOException;
/**
*
*/
public class ScriptTransformFactory extends TransformFactory<ScriptTransform, ScriptTransform.Result, ExecutableScriptTransform> {
private final ScriptServiceProxy scriptService;
@Inject
public ScriptTransformFactory(Settings settings, ScriptServiceProxy scriptService) {
super(Loggers.getLogger(ExecutableScriptTransform.class, settings));
this.scriptService = scriptService;
}
@Override
public String type() {
return ScriptTransform.TYPE;
}
@Override
public ScriptTransform parseTransform(String watchId, XContentParser parser) throws IOException {
return ScriptTransform.parse(watchId, parser);
}
@Override
public ScriptTransform.Result parseResult(String watchId, XContentParser parser) throws IOException {
return ScriptTransform.Result.parse(watchId, parser);
}
@Override
public ExecutableScriptTransform createExecutable(ScriptTransform transform) {
return new ExecutableScriptTransform(transform, transformLogger, scriptService);
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.TransformException;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
import static org.elasticsearch.watcher.support.Variables.createCtxModel;
import static org.elasticsearch.watcher.support.WatcherUtils.flattenModel;
/**
*
*/
public class ExecutableSearchTransform extends ExecutableTransform<SearchTransform, SearchTransform.Result> {
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH;
protected final ScriptServiceProxy scriptService;
protected final ClientProxy client;
public ExecutableSearchTransform(SearchTransform transform, ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client) {
super(transform, logger);
this.scriptService = scriptService;
this.client = client;
}
@Override
public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
SearchRequest req = createRequest(transform.request, ctx, payload);
SearchResponse resp = client.search(req);
return new SearchTransform.Result(new Payload.XContent(resp));
}
SearchRequest createRequest(SearchRequest requestPrototype, WatchExecutionContext ctx, Payload payload) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.indices(requestPrototype.indices());
if (Strings.hasLength(requestPrototype.source())) {
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, createCtxModel(ctx, payload));
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
MapBuilder<String, Object> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
.putAll(flattenModel(createCtxModel(ctx, payload)));
request.templateParams(templateParams.map());
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
} else {
throw new TransformException("search requests needs either source or template name");
}
return request;
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.SearchRequestEquivalence;
import org.elasticsearch.watcher.support.SearchRequestParseException;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.watch.Payload;
import java.io.IOException;
/**
*
*/
public class SearchTransform implements Transform {
public static final String TYPE = "search";
protected final SearchRequest request;
public SearchTransform(SearchRequest request) {
this.request = request;
}
@Override
public String type() {
return TYPE;
}
public SearchRequest getRequest() {
return request;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchTransform transform = (SearchTransform) o;
return SearchRequestEquivalence.INSTANCE.equivalent(request, transform.request);
}
@Override
public int hashCode() {
return request.hashCode();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return WatcherUtils.writeSearchRequest(request, builder, params);
}
public static SearchTransform parse(String watchId, XContentParser parser) throws IOException {
try {
SearchRequest request = WatcherUtils.readSearchRequest(parser, ExecutableSearchTransform.DEFAULT_SEARCH_TYPE);
return new SearchTransform(request);
} catch (SearchRequestParseException srpe) {
throw new SearchTransformException("could not parse [{}] transform for watch [{}]. failed parsing search request", srpe, TYPE, watchId);
}
}
public static Builder builder(SearchRequest request) {
return new Builder(request);
}
public static class Result extends Transform.Result {
public Result(Payload payload) {
super(TYPE, payload);
}
@Override
protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException {
return builder;
}
public static Result parse(String watchId, XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. expected an object, but found [{}] instead", TYPE, watchId, token);
}
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME || !Field.PAYLOAD.match(parser.currentName())) {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] field, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token);
}
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] field, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token);
}
return new SearchTransform.Result(new Payload.XContent(parser));
}
}
public static class Builder implements Transform.Builder<SearchTransform> {
private final SearchRequest request;
public Builder(SearchRequest request) {
this.request = request;
}
@Override
public SearchTransform build() {
return new SearchTransform(request);
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.watcher.transform.TransformException;
/**
*
*/
public class SearchTransformException extends TransformException {
public SearchTransformException(String msg, Object... args) {
super(msg, args);
}
public SearchTransformException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.transform.search;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.transform.TransformFactory;
import java.io.IOException;
/**
*
*/
public class SearchTransformFactory extends TransformFactory<SearchTransform, SearchTransform.Result, ExecutableSearchTransform> {
protected final ScriptServiceProxy scriptService;
protected final ClientProxy client;
@Inject
public SearchTransformFactory(Settings settings, ScriptServiceProxy scriptService, ClientProxy client) {
super(Loggers.getLogger(ExecutableSearchTransform.class, settings));
this.scriptService = scriptService;
this.client = client;
}
@Override
public String type() {
return SearchTransform.TYPE;
}
@Override
public SearchTransform parseTransform(String watchId, XContentParser parser) throws IOException {
return SearchTransform.parse(watchId, parser);
}
@Override
public SearchTransform.Result parseResult(String watchId, XContentParser parser) throws IOException {
return SearchTransform.Result.parse(watchId, parser);
}
@Override
public ExecutableSearchTransform createExecutable(SearchTransform transform) {
return new ExecutableSearchTransform(transform, transformLogger, scriptService, client);
}
}

View File

@ -35,7 +35,7 @@ import org.elasticsearch.watcher.license.LicenseService;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.throttle.WatchThrottler;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine;
@ -67,11 +67,11 @@ public class Watch implements TriggerEngine.Job, ToXContent {
private final Map<String, Object> metadata;
@Nullable
private final Transform transform;
private final ExecutableTransform transform;
private final transient AtomicLong nonceCounter = new AtomicLong();
public Watch(String name, Clock clock, LicenseService licenseService, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable Transform transform,
public Watch(String name, Clock clock, LicenseService licenseService, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable ExecutableTransform transform,
ExecutableActions actions, @Nullable Map<String, Object> metadata, @Nullable TimeValue throttlePeriod, @Nullable Status status) {
this.name = name;
this.trigger = trigger;
@ -99,7 +99,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
return condition;
}
public Transform transform() {
public ExecutableTransform transform() {
return transform;
}
@ -234,7 +234,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
ExecutableInput input = defaultInput;
ExecutableCondition condition = defaultCondition;
ExecutableActions actions = null;
Transform transform = null;
ExecutableTransform transform = null;
Map<String, Object> metatdata = null;
Status status = null;
TimeValue throttlePeriod = defaultThrottleTimePeriod;
@ -256,7 +256,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
} else if (ACTIONS_FIELD.match(currentFieldName)) {
actions = actionRegistry.parseActions(id, parser);
} else if (TRANSFORM_FIELD.match(currentFieldName)) {
transform = transformRegistry.parse(parser);
transform = transformRegistry.parse(id, parser);
} else if (META_FIELD.match(currentFieldName)) {
metatdata = parser.map();
} else if (STATUS_FIELD.match(currentFieldName) && includeStatus) {

View File

@ -85,7 +85,7 @@ public class WatchExecution implements ToXContent {
}
}
if (transformResult != null) {
builder.startObject(Transform.Parser.TRANSFORM_RESULT_FIELD.getPreferredName()).field(transformResult.type(), transformResult).endObject();
builder.startObject(Transform.Field.TRANSFORM_RESULT.getPreferredName()).field(transformResult.type(), transformResult).endObject();
}
builder.startObject(Parser.ACTIONS_RESULTS.getPreferredName());
for (ActionWrapper.Result actionResult : actionsResults) {
@ -131,8 +131,8 @@ public class WatchExecution implements ToXContent {
inputResult = inputRegistry.parseResult(wid.watchId(), parser);
} else if (CONDITION_RESULT_FIELD.match(currentFieldName)) {
conditionResult = conditionRegistry.parseResult(wid.watchId(), parser);
} else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) {
transformResult = transformRegistry.parseResult(parser);
} else if (Transform.Field.TRANSFORM_RESULT.match(currentFieldName)) {
transformResult = transformRegistry.parseResult(wid.watchId(), parser);
} else if (ACTIONS_RESULTS.match(currentFieldName)) {
actionResults = actionRegistry.parseResults(wid, parser);
} else {

View File

@ -6,9 +6,14 @@
package org.elasticsearch.watcher.actions;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.transform.TransformFactory;
import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
@ -24,16 +29,27 @@ import static org.hamcrest.core.Is.is;
*/
public class TransformMocks {
public static class TransformMock extends Transform<TransformMock.Result> {
public static class ExecutableTransformMock extends ExecutableTransform {
private static final String TYPE = "mock";
public ExecutableTransformMock() {
super(new Transform() {
@Override
public String type() {
return "_transform";
return TYPE;
}
@Override
public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException {
return new Result("_transform", new Payload.Simple("_key", "_value"));
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().endObject();
}
}, Loggers.getLogger(ExecutableTransformMock.class));
}
@Override
public Transform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
return new Result(TYPE, new Payload.Simple("_key", "_value"));
}
@Override
@ -56,41 +72,51 @@ public class TransformMocks {
public static class TransformRegistryMock extends TransformRegistry {
public TransformRegistryMock(final Transform transform) {
super(ImmutableMap.<String, Transform.Parser>of("_transform", new Transform.Parser() {
public TransformRegistryMock(final ExecutableTransform executable) {
super(ImmutableMap.<String, TransformFactory>of("_transform", new TransformFactory(Loggers.getLogger(TransformRegistryMock.class)) {
@Override
public String type() {
return transform.type();
return executable.type();
}
@Override
public Transform parse(XContentParser parser) throws IOException {
public Transform parseTransform(String watchId, XContentParser parser) throws IOException {
parser.nextToken();
assertThat(parser.currentToken(), is(XContentParser.Token.END_OBJECT));
return transform;
return null;
}
@Override
public Transform.Result parseResult(XContentParser parser) throws IOException {
return null; // should not be called when this ctor is used
public Transform.Result parseResult(String watchId, XContentParser parser) throws IOException {
return null;
}
@Override
public ExecutableTransform createExecutable(Transform transform) {
return executable;
}
}));
}
public TransformRegistryMock(final Transform.Result result) {
super(ImmutableMap.<String, Transform.Parser>of("_transform_type", new Transform.Parser() {
super(ImmutableMap.<String, TransformFactory>of("_transform_type", new TransformFactory(Loggers.getLogger(TransformRegistryMock.class)) {
@Override
public String type() {
return result.type();
}
@Override
public Transform parse(XContentParser parser) throws IOException {
return null; // should not be called when this ctor is used.
public Transform parseTransform(String watchId, XContentParser parser) throws IOException {
return null;
}
@Override
public Transform.Result parseResult(XContentParser parser) throws IOException {
public ExecutableTransform createExecutable(Transform transform) {
return null;
}
@Override
public Transform.Result parseResult(String watchId, XContentParser parser) throws IOException {
assertThat(parser.currentToken(), is(XContentParser.Token.START_OBJECT));
parser.nextToken();
assertThat(parser.currentToken(), is(XContentParser.Token.FIELD_NAME));

View File

@ -23,6 +23,7 @@ import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.Transform;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.*;
@ -77,8 +78,8 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
Throttler throttler = mock(Throttler.class);
when(throttler.throttle(any(WatchExecutionContext.class))).thenReturn(throttleResult);
Transform transform = mock(Transform.class);
when(transform.apply(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult);
ExecutableTransform transform = mock(ExecutableTransform.class);
when(transform.execute(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult);
ActionWrapper action = mock(ActionWrapper.class);
when(action.execute(any(WatchExecutionContext.class))).thenReturn(watchActionResult);
ExecutableActions actions = new ExecutableActions(Arrays.asList(action));
@ -104,7 +105,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
verify(condition, times(1)).execute(any(WatchExecutionContext.class));
verify(throttler, times(1)).throttle(any(WatchExecutionContext.class));
verify(transform, times(1)).apply(any(WatchExecutionContext.class), same(payload));
verify(transform, times(1)).execute(any(WatchExecutionContext.class), same(payload));
verify(action, times(1)).execute(any(WatchExecutionContext.class));
}
@ -123,8 +124,8 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
Throttler throttler = mock(Throttler.class);
when(throttler.throttle(any(WatchExecutionContext.class))).thenReturn(throttleResult);
Transform transform = mock(Transform.class);
when(transform.apply(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult);
ExecutableTransform transform = mock(ExecutableTransform.class);
when(transform.execute(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult);
ActionWrapper action = mock(ActionWrapper.class);
when(action.execute(any(WatchExecutionContext.class))).thenReturn(actionResult);
ExecutableActions actions = new ExecutableActions(Arrays.asList(action));
@ -151,7 +152,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
verify(condition, times(1)).execute(any(WatchExecutionContext.class));
verify(throttler, times(1)).throttle(any(WatchExecutionContext.class));
verify(transform, never()).apply(any(WatchExecutionContext.class), same(payload));
verify(transform, never()).execute(any(WatchExecutionContext.class), same(payload));
verify(action, never()).execute(any(WatchExecutionContext.class));
}
@ -169,8 +170,8 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult);
Throttler throttler = mock(Throttler.class);
when(throttler.throttle(any(WatchExecutionContext.class))).thenReturn(throttleResult);
Transform transform = mock(Transform.class);
when(transform.apply(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult);
ExecutableTransform transform = mock(ExecutableTransform.class);
when(transform.execute(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult);
ActionWrapper action = mock(ActionWrapper.class);
when(action.execute(any(WatchExecutionContext.class))).thenReturn(actionResult);
ExecutableActions actions = new ExecutableActions(Arrays.asList(action));
@ -197,7 +198,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
verify(condition, times(1)).execute(any(WatchExecutionContext.class));
verify(throttler, never()).throttle(any(WatchExecutionContext.class));
verify(transform, never()).apply(any(WatchExecutionContext.class), same(payload));
verify(transform, never()).execute(any(WatchExecutionContext.class), same(payload));
verify(action, never()).execute(any(WatchExecutionContext.class));
}

View File

@ -44,7 +44,8 @@ import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.xmustache.XMustacheTemplateEngine;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.support.template.TemplateEngine;
import org.elasticsearch.watcher.transform.SearchTransform;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.trigger.TriggerEvent;
import org.elasticsearch.watcher.trigger.schedule.CronSchedule;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
@ -131,7 +132,7 @@ public final class WatcherTestUtils {
SearchRequest conditionRequest = newInputSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = newInputSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
transformRequest.searchType(SearchTransform.DEFAULT_SEARCH_TYPE);
transformRequest.searchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE);
conditionRequest.searchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE);
List<ActionWrapper> actions = new ArrayList<>();
@ -181,7 +182,7 @@ public final class WatcherTestUtils {
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")),
new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger),
new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService),
new SearchTransform(logger, scriptService, client, transformRequest),
new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, scriptService, client),
new ExecutableActions(actions),
metadata,
new TimeValue(0),

View File

@ -28,7 +28,8 @@ import org.elasticsearch.watcher.support.clock.SystemClock;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.transform.SearchTransform;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.watcher.trigger.schedule.CronSchedule;
@ -91,7 +92,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set this into the future so we don't get any extra runs
new ExecutableSearchInput(new SearchInput(searchRequest, null), logger, scriptService(), ClientProxy.of(client())),
new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService()),
new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest),
new ExecutableSearchTransform(new SearchTransform(searchRequest), logger, scriptService(), ClientProxy.of(client())),
new ExecutableActions(new ArrayList<ActionWrapper>()),
null, // metadata
new TimeValue(0),
@ -153,7 +154,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests {
new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set a cron schedule far into the future so this watch is never scheduled
new ExecutableSearchInput(new SearchInput(searchRequest, null), logger, scriptService(), ClientProxy.of(client())),
new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService()),
new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest),
new ExecutableSearchTransform(new SearchTransform(searchRequest), logger, scriptService(), ClientProxy.of(client())),
new ExecutableActions(new ArrayList<ActionWrapper>()),
null, // metadata
new TimeValue(0),

View File

@ -9,7 +9,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.transform.SearchTransform;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
@ -43,7 +43,7 @@ public class TransformSearchTests extends AbstractWatcherIntegrationTests {
SearchRequest inputRequest = WatcherTestUtils.newInputSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = WatcherTestUtils.newInputSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
transformRequest.searchType(SearchTransform.DEFAULT_SEARCH_TYPE);
transformRequest.searchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE);
Map<String, Object> metadata = new HashMap<>();
metadata.put("foo", "bar");

View File

@ -7,12 +7,17 @@ package org.elasticsearch.watcher.transform;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.transform.chain.ChainTransform;
import org.elasticsearch.watcher.transform.chain.ChainTransformFactory;
import org.elasticsearch.watcher.transform.chain.ExecutableChainTransform;
import org.elasticsearch.watcher.watch.Payload;
import org.junit.Test;
import java.io.IOException;
@ -33,14 +38,19 @@ public class ChainTransformTests extends ElasticsearchTestCase {
@Test
public void testApply() throws Exception {
ChainTransform transform = new ChainTransform(ImmutableList.<Transform>of(
new NamedTransform("name1"),
new NamedTransform("name2"),
new NamedTransform("name3")));
new NamedExecutableTransform.Transform("name1"),
new NamedExecutableTransform.Transform("name2"),
new NamedExecutableTransform.Transform("name3")
));
ExecutableChainTransform executable = new ExecutableChainTransform(transform, logger, ImmutableList.<ExecutableTransform>of(
new NamedExecutableTransform("name1"),
new NamedExecutableTransform("name2"),
new NamedExecutableTransform("name3")));
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
Payload payload = new Payload.Simple(new HashMap<String, Object>());
Transform.Result result = transform.apply(ctx, payload);
Transform.Result result = executable.execute(ctx, payload);
Map<String, Object> data = result.payload().data();
assertThat(data, notNullValue());
@ -53,12 +63,12 @@ public class ChainTransformTests extends ElasticsearchTestCase {
@Test
public void testParser() throws Exception {
Map<String, Transform.Parser> parsers = ImmutableMap.<String, Transform.Parser>builder()
.put("named", new NamedTransform.Parser())
Map<String, TransformFactory> factories = ImmutableMap.<String, TransformFactory>builder()
.put("named", new NamedExecutableTransform.Factory(logger))
.build();
TransformRegistry registry = new TransformRegistry(parsers);
TransformRegistry registry = new TransformRegistry(factories);
ChainTransform.Parser transformParser = new ChainTransform.Parser(registry);
ChainTransformFactory transformParser = new ChainTransformFactory(registry);
XContentBuilder builder = jsonBuilder().startArray()
.startObject().startObject("named").field("name", "name1").endObject().endObject()
@ -68,46 +78,58 @@ public class ChainTransformTests extends ElasticsearchTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
ChainTransform transform = transformParser.parse(parser);
assertThat(transform, notNullValue());
assertThat(transform.transforms(), notNullValue());
assertThat(transform.transforms(), hasSize(3));
for (int i = 0; i < transform.transforms().size(); i++) {
assertThat(transform.transforms().get(i), instanceOf(NamedTransform.class));
assertThat(((NamedTransform) transform.transforms().get(i)).name, is("name" + (i + 1)));
ExecutableChainTransform executable = transformParser.parseExecutable("_id", parser);
assertThat(executable, notNullValue());
assertThat(executable.transform.getTransforms(), notNullValue());
assertThat(executable.transform.getTransforms(), hasSize(3));
for (int i = 0; i < executable.transform.getTransforms().size(); i++) {
assertThat(executable.executableTransforms().get(i), instanceOf(NamedExecutableTransform.class));
assertThat(((NamedExecutableTransform) executable.executableTransforms().get(i)).transform().name, is("name" + (i + 1)));
}
}
private static class NamedTransform extends Transform<NamedTransform.Result> {
private static class NamedExecutableTransform extends ExecutableTransform<NamedExecutableTransform.Transform, NamedExecutableTransform.Result> {
private final String name;
private static final String TYPE = "named";
public NamedTransform(String name) {
this.name = name;
public NamedExecutableTransform(String name) {
this(new Transform(name));
}
public NamedExecutableTransform(Transform transform) {
super(transform, Loggers.getLogger(NamedExecutableTransform.class));
}
@Override
public String type() {
return "noop";
}
@Override
public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException {
public Result execute(WatchExecutionContext ctx, Payload payload) throws IOException {
Map<String, Object> data = new HashMap<>(payload.data());
List<String> names = (List<String>) data.get("names");
if (names == null) {
names = new ArrayList<>();
data.put("names", names);
}
names.add(name);
names.add(transform.name);
return new Result("named", new Payload.Simple(data));
}
public static class Transform implements org.elasticsearch.watcher.transform.Transform {
private final String name;
public Transform(String name) {
this.name = name;
}
@Override
public String type() {
return TYPE;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("name", name).endObject();
}
}
public static class Result extends Transform.Result {
@ -121,15 +143,19 @@ public class ChainTransformTests extends ElasticsearchTestCase {
}
}
public static class Parser implements Transform.Parser<Result, NamedTransform> {
public static class Factory extends TransformFactory<Transform, Result, NamedExecutableTransform> {
@Override
public String type() {
return "named";
public Factory(ESLogger transformLogger) {
super(transformLogger);
}
@Override
public NamedTransform parse(XContentParser parser) throws IOException {
public String type() {
return TYPE;
}
@Override
public Transform parseTransform(String watchId, XContentParser parser) throws IOException {
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.FIELD_NAME; // the "name" field
@ -138,11 +164,11 @@ public class ChainTransformTests extends ElasticsearchTestCase {
String name = parser.text();
token = parser.nextToken();
assert token == XContentParser.Token.END_OBJECT;
return new NamedTransform(name);
return new Transform(name);
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
public Result parseResult(String watchId, XContentParser parser) throws IOException {
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.FIELD_NAME; // the "payload" field
@ -153,6 +179,11 @@ public class ChainTransformTests extends ElasticsearchTestCase {
assert token == XContentParser.Token.END_OBJECT;
return new Result("named", payload);
}
@Override
public NamedExecutableTransform createExecutable(Transform transform) {
return new NamedExecutableTransform(transform);
}
}
}

View File

@ -7,7 +7,11 @@ package org.elasticsearch.watcher.transform;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableList;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableSet;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.transform.script.ExecutableScriptTransform;
import org.elasticsearch.watcher.transform.script.ScriptTransform;
import org.elasticsearch.watcher.transform.script.ScriptTransformFactory;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.support.Script;
import org.elasticsearch.watcher.support.Variables;
@ -41,7 +45,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase {
ScriptService.ScriptType type = randomFrom(ScriptService.ScriptType.values());
Map<String, Object> params = Collections.emptyMap();
Script script = new Script("_script", type, "_lang", params);
ScriptTransform transform = new ScriptTransform(service, script);
ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
@ -57,7 +61,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase {
when(executable.run()).thenReturn(transformed);
when(service.executable("_lang", "_script", type, model)).thenReturn(executable);
Transform.Result result = transform.apply(ctx, payload);
Transform.Result result = transform.execute(ctx, payload);
assertThat(result, notNullValue());
assertThat(result.type(), is(ScriptTransform.TYPE));
assertThat(result.payload().data(), equalTo(transformed));
@ -69,7 +73,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase {
ScriptService.ScriptType type = randomFrom(ScriptService.ScriptType.values());
Map<String, Object> params = Collections.emptyMap();
Script script = new Script("_script", type, "_lang", params);
ScriptTransform transform = new ScriptTransform(service, script);
ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service);
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
@ -82,7 +86,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase {
when(executable.run()).thenReturn(value);
when(service.executable("_lang", "_script", type, model)).thenReturn(executable);
Transform.Result result = transform.apply(ctx, payload);
Transform.Result result = transform.execute(ctx, payload);
assertThat(result, notNullValue());
assertThat(result.type(), is(ScriptTransform.TYPE));
assertThat(result.payload().data().size(), is(1));
@ -102,8 +106,8 @@ public class ScriptTransformTests extends ElasticsearchTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
ScriptTransform transform = new ScriptTransform.Parser(service).parse(parser);
assertThat(transform.script(), equalTo(new Script("_script", type, "_lang", ImmutableMap.<String, Object>builder().put("key", "value").build())));
ExecutableScriptTransform transform = new ScriptTransformFactory(ImmutableSettings.EMPTY, service).parseExecutable("_id", parser);
assertThat(transform.transform().getScript(), equalTo(new Script("_script", type, "_lang", ImmutableMap.<String, Object>builder().put("key", "value").build())));
}
@Test
@ -113,7 +117,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
ScriptTransform transform = new ScriptTransform.Parser(service).parse(parser);
assertThat(transform.script(), equalTo(new Script("_script", ScriptService.ScriptType.INLINE, ScriptService.DEFAULT_LANG, ImmutableMap.<String, Object>of())));
ExecutableScriptTransform transform = new ScriptTransformFactory(ImmutableSettings.EMPTY, service).parseExecutable("_id", parser);
assertThat(transform.transform().getScript(), equalTo(new Script("_script", ScriptService.ScriptType.INLINE, ScriptService.DEFAULT_LANG, ImmutableMap.<String, Object>of())));
}
}

View File

@ -17,6 +17,9 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransformFactory;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
@ -51,11 +54,11 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
.startObject("match_all").endObject()
.endObject()
.endObject());
SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request);
ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client()));
WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD);
Transform.Result result = transform.apply(ctx, EMPTY_PAYLOAD);
Transform.Result result = transform.execute(ctx, EMPTY_PAYLOAD);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
@ -108,14 +111,14 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
.must(rangeFilter("date").lt("{{ctx.execution_time}}"))
.must(termFilter("value", "{{ctx.payload.value}}")))));
SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request);
ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client()));
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00"), parseDate("2015-01-01T00:00:00"));
WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00"), event, EMPTY_PAYLOAD);
Payload payload = simplePayload("value", "val_3");
Transform.Result result = transform.apply(ctx, payload);
Transform.Result result = transform.execute(ctx, payload);
assertThat(result, notNullValue());
assertThat(result.type(), is(SearchTransform.TYPE));
@ -175,23 +178,23 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
builder.endObject();
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
SearchTransform transform = new SearchTransform.Parser(ImmutableSettings.EMPTY, scriptService(), ClientProxy.of(client())).parse(parser);
assertThat(transform, notNullValue());
assertThat(transform.type(), is(SearchTransform.TYPE));
assertThat(transform.request, notNullValue());
ExecutableSearchTransform executable = new SearchTransformFactory(ImmutableSettings.EMPTY, scriptService(), ClientProxy.of(client())).parseExecutable("_id", parser);
assertThat(executable, notNullValue());
assertThat(executable.type(), is(SearchTransform.TYPE));
assertThat(executable.transform().getRequest(), notNullValue());
if (indices != null) {
assertThat(transform.request.indices(), arrayContainingInAnyOrder(indices));
assertThat(executable.transform().getRequest().indices(), arrayContainingInAnyOrder(indices));
}
if (searchType != null) {
assertThat(transform.request.searchType(), is(searchType));
assertThat(executable.transform().getRequest().searchType(), is(searchType));
}
if (templateName != null) {
assertThat(transform.request.templateName(), equalTo(templateName));
assertThat(executable.transform().getRequest().templateName(), equalTo(templateName));
}
if (templateType != null) {
assertThat(transform.request.templateType(), equalTo(templateType));
assertThat(executable.transform().getRequest().templateType(), equalTo(templateType));
}
assertThat(transform.request.source().toBytes(), equalTo(source.toBytes()));
assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes()));
}
private static Map<String, Object> doc(String date, String value) {

View File

@ -68,7 +68,18 @@ import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.support.template.TemplateEngine;
import org.elasticsearch.watcher.test.WatcherTestUtils;
import org.elasticsearch.watcher.transform.*;
import org.elasticsearch.watcher.transform.ExecutableTransform;
import org.elasticsearch.watcher.transform.TransformFactory;
import org.elasticsearch.watcher.transform.TransformRegistry;
import org.elasticsearch.watcher.transform.chain.ChainTransform;
import org.elasticsearch.watcher.transform.chain.ChainTransformFactory;
import org.elasticsearch.watcher.transform.chain.ExecutableChainTransform;
import org.elasticsearch.watcher.transform.script.ExecutableScriptTransform;
import org.elasticsearch.watcher.transform.script.ScriptTransform;
import org.elasticsearch.watcher.transform.script.ScriptTransformFactory;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransformFactory;
import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine;
import org.elasticsearch.watcher.trigger.TriggerService;
@ -124,7 +135,7 @@ public class WatchTests extends ElasticsearchTestCase {
ExecutableCondition condition = randomCondition();
ConditionRegistry conditionRegistry = registry(condition);
Transform transform = randomTransform();
ExecutableTransform transform = randomTransform();
ExecutableActions actions = randomActions();
ActionRegistry actionRegistry = registry(actions, transformRegistry);
@ -252,27 +263,30 @@ public class WatchTests extends ElasticsearchTestCase {
}
}
private Transform randomTransform() {
private ExecutableTransform randomTransform() {
String type = randomFrom(ScriptTransform.TYPE, SearchTransform.TYPE, ChainTransform.TYPE);
switch (type) {
case ScriptTransform.TYPE:
return new ScriptTransform(scriptService, new Script("_script"));
return new ExecutableScriptTransform(new ScriptTransform(new Script("_script")), logger, scriptService);
case SearchTransform.TYPE:
return new SearchTransform(logger, scriptService, client, matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS));
return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, scriptService, client);
default: // chain
return new ChainTransform(ImmutableList.<Transform>of(
new SearchTransform(logger, scriptService, client, matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)),
new ScriptTransform(scriptService, new Script("_script"))));
ChainTransform chainTransform = new ChainTransform(ImmutableList.of(
new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)),
new ScriptTransform(new Script("_script"))));
return new ExecutableChainTransform(chainTransform, logger, ImmutableList.<ExecutableTransform>of(
new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, scriptService, client),
new ExecutableScriptTransform(new ScriptTransform(new Script("_script")), logger, scriptService)));
}
}
private TransformRegistry transformRegistry() {
ImmutableMap.Builder<String, Transform.Parser> parsers = ImmutableMap.builder();
ChainTransform.Parser parser = new ChainTransform.Parser();
parsers.put(ChainTransform.TYPE, parser);
parsers.put(ScriptTransform.TYPE, new ScriptTransform.Parser(scriptService));
parsers.put(SearchTransform.TYPE, new SearchTransform.Parser(settings, scriptService, client));
TransformRegistry registry = new TransformRegistry(parsers.build());
ImmutableMap.Builder<String, TransformFactory> factories = ImmutableMap.builder();
ChainTransformFactory parser = new ChainTransformFactory();
factories.put(ChainTransform.TYPE, parser);
factories.put(ScriptTransform.TYPE, new ScriptTransformFactory(settings, scriptService));
factories.put(SearchTransform.TYPE, new SearchTransformFactory(settings, scriptService, client));
TransformRegistry registry = new TransformRegistry(factories.build());
parser.init(registry);
return registry;
}
@ -280,7 +294,7 @@ public class WatchTests extends ElasticsearchTestCase {
private ExecutableActions randomActions() {
ImmutableList.Builder<ActionWrapper> list = ImmutableList.builder();
if (randomBoolean()) {
Transform transform = randomTransform();
ExecutableTransform transform = randomTransform();
EmailAction action = new EmailAction(EmailTemplate.builder().build(), null, null, Profile.STANDARD, randomBoolean());
list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine)));
}