From 690af790b284b2fc20a9376b91e1e808beae84cd Mon Sep 17 00:00:00 2001 From: uboness Date: Wed, 22 Apr 2015 23:11:48 +0200 Subject: [PATCH] Cleanup and Refactoring of the transforms * Split the transform into two constructs: `Transform` and `ExecutableTransform`. The former holds all the transform configuration, the latter can execute the transform based on that configuration (an executable transform holds a transform) - This makes the code clearer to understand and maintain. - This also enabled to pull some common implementation code into the `ExecutableTransform` and by that reduce the implementation details of each executable to the minimum required. * Also, extracted the `Transform.Parser` to its own top level class, and renamed it to - `TransformFactory`. The main thing that the factory does is: 1) delegate to the parsing to the `Transform` class, 2) construct & wire up the `ExecutableTransform`. * With the introduction of `Transform`, we no longer need the `SourceBuilder` for transforms. Instead, we have `Transform.Builder` that help you build a transform. This is much more intuitive from the client perspective. Original commit: elastic/x-pack-elasticsearch@f6ee0d0c75079a6f31095833e20f68973c8a6ef8 --- .../watcher/actions/ActionWrapper.java | 23 +- .../watcher/client/WatchSourceBuilder.java | 30 ++- .../watcher/execution/ExecutionService.java | 5 +- .../execution/WatchExecutionContext.java | 1 + .../support/init/InitializingModule.java | 4 +- .../watcher/transform/ChainTransform.java | 231 ------------------ .../transform/ExecutableTransform.java | 59 +++++ .../watcher/transform/ScriptTransform.java | 162 ------------ .../watcher/transform/SearchTransform.java | 184 -------------- .../watcher/transform/Transform.java | 35 +-- .../watcher/transform/TransformBuilders.java | 28 ++- .../watcher/transform/TransformException.java | 8 +- .../watcher/transform/TransformFactory.java | 48 ++++ .../watcher/transform/TransformModule.java | 30 ++- .../watcher/transform/TransformRegistry.java | 49 ++-- .../transform/chain/ChainTransform.java | 190 ++++++++++++++ .../chain/ChainTransformException.java | 22 ++ .../chain/ChainTransformFactory.java | 78 ++++++ .../chain/ExecutableChainTransform.java | 44 ++++ .../script/ExecutableScriptTransform.java | 49 ++++ .../transform/script/ScriptTransform.java | 112 +++++++++ .../script/ScriptTransformException.java | 22 ++ .../script/ScriptTransformFactory.java | 49 ++++ .../search/ExecutableSearchTransform.java | 73 ++++++ .../transform/search/SearchTransform.java | 115 +++++++++ .../search/SearchTransformException.java | 22 ++ .../search/SearchTransformFactory.java | 52 ++++ .../elasticsearch/watcher/watch/Watch.java | 12 +- .../watcher/watch/WatchExecution.java | 6 +- .../watcher/actions/TransformMocks.java | 60 +++-- .../execution/ExecutionServiceTests.java | 19 +- .../watcher/test/WatcherTestUtils.java | 7 +- .../test/integration/BootStrapTests.java | 7 +- .../integration/TransformSearchTests.java | 4 +- .../transform/ChainTransformTests.java | 107 +++++--- .../transform/ScriptTransformTests.java | 20 +- .../transform/SearchTransformTests.java | 29 ++- .../watcher/watch/WatchTests.java | 44 ++-- 38 files changed, 1257 insertions(+), 783 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/watcher/transform/ChainTransform.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java delete mode 100644 src/main/java/org/elasticsearch/watcher/transform/ScriptTransform.java delete mode 100644 src/main/java/org/elasticsearch/watcher/transform/SearchTransform.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/TransformFactory.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransform.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransformException.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransformFactory.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransformException.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransformFactory.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/search/SearchTransform.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformException.java create mode 100644 src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformFactory.java diff --git a/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java b/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java index ce956479f74..021acf6d4b7 100644 --- a/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java +++ b/src/main/java/org/elasticsearch/watcher/actions/ActionWrapper.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.watcher.execution.WatchExecutionContext; import org.elasticsearch.watcher.execution.Wid; +import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.Transform; import org.elasticsearch.watcher.transform.TransformRegistry; import org.elasticsearch.watcher.watch.Payload; @@ -23,14 +24,14 @@ import java.io.IOException; public class ActionWrapper implements ToXContent { private String id; - private final @Nullable Transform transform; + private final @Nullable ExecutableTransform transform; private final ExecutableAction action; public ActionWrapper(String id, ExecutableAction action) { this(id, null, action); } - public ActionWrapper(String id, @Nullable Transform transform, ExecutableAction action) { + public ActionWrapper(String id, @Nullable ExecutableTransform transform, ExecutableAction action) { this.id = id; this.transform = transform; this.action = action; @@ -40,7 +41,7 @@ public class ActionWrapper implements ToXContent { return id; } - public Transform transform() { + public ExecutableTransform transform() { return transform; } @@ -52,7 +53,7 @@ public class ActionWrapper implements ToXContent { Payload payload = ctx.payload(); Transform.Result transformResult = null; if (transform != null) { - transformResult = transform.apply(ctx, payload); + transformResult = transform.execute(ctx, payload); payload = transformResult.payload(); } @@ -84,7 +85,7 @@ public class ActionWrapper implements ToXContent { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (transform != null) { - builder.startObject(Transform.Parser.TRANSFORM_FIELD.getPreferredName()) + builder.startObject(Transform.Field.TRANSFORM.getPreferredName()) .field(transform.type(), transform) .endObject(); } @@ -95,7 +96,7 @@ public class ActionWrapper implements ToXContent { static ActionWrapper parse(String watchId, String actionId, XContentParser parser, ActionRegistry actionRegistry, TransformRegistry transformRegistry) throws IOException { assert parser.currentToken() == XContentParser.Token.START_OBJECT; - Transform transform = null; + ExecutableTransform transform = null; ExecutableAction action = null; String currentFieldName = null; @@ -104,8 +105,8 @@ public class ActionWrapper implements ToXContent { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else { - if (Transform.Parser.TRANSFORM_FIELD.match(currentFieldName)) { - transform = transformRegistry.parse(parser); + if (Transform.Field.TRANSFORM.match(currentFieldName)) { + transform = transformRegistry.parse(watchId, parser); } else { // it's the type of the action ActionFactory actionFactory = actionRegistry.factory(currentFieldName); @@ -174,7 +175,7 @@ public class ActionWrapper implements ToXContent { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (transform != null) { - builder.startObject(Transform.Parser.TRANSFORM_RESULT_FIELD.getPreferredName()) + builder.startObject(Transform.Field.TRANSFORM_RESULT.getPreferredName()) .field(transform.type(), transform) .endObject(); } @@ -194,8 +195,8 @@ public class ActionWrapper implements ToXContent { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else { - if (Transform.Parser.TRANSFORM_FIELD.match(currentFieldName)) { - transformResult = transformRegistry.parseResult(parser); + if (Transform.Field.TRANSFORM.match(currentFieldName)) { + transformResult = transformRegistry.parseResult(wid.watchId(), parser); } else { // it's the type of the action ActionFactory actionFactory = actionRegistry.factory(currentFieldName); diff --git a/src/main/java/org/elasticsearch/watcher/client/WatchSourceBuilder.java b/src/main/java/org/elasticsearch/watcher/client/WatchSourceBuilder.java index c5430ade319..7e28929bca7 100644 --- a/src/main/java/org/elasticsearch/watcher/client/WatchSourceBuilder.java +++ b/src/main/java/org/elasticsearch/watcher/client/WatchSourceBuilder.java @@ -35,7 +35,7 @@ public class WatchSourceBuilder implements ToXContent { private Trigger.SourceBuilder trigger; private Input input = NoneInput.INSTANCE; private Condition condition = AlwaysCondition.INSTANCE; - private Transform.SourceBuilder transform = null; + private Transform transform = null; private Map actions = new HashMap<>(); private TimeValue throttlePeriod = null; private Map metadata; @@ -63,21 +63,25 @@ public class WatchSourceBuilder implements ToXContent { return this; } - public WatchSourceBuilder transform(Transform.SourceBuilder transform) { + public WatchSourceBuilder transform(Transform transform) { this.transform = transform; return this; } + public WatchSourceBuilder transform(Transform.Builder transform) { + return transform(transform.build()); + } + public WatchSourceBuilder throttlePeriod(TimeValue throttlePeriod) { this.throttlePeriod = throttlePeriod; return this; } - public WatchSourceBuilder addAction(String id, Transform.SourceBuilder transform, Action action) { - actions.put(id, new TransformedAction(id, action, transform)); - return this; + public WatchSourceBuilder addAction(String id, Transform.Builder transform, Action action) { + return addAction(id, transform.build(), action); } + public WatchSourceBuilder addAction(String id, Action action) { actions.put(id, new TransformedAction(id, action)); return this; @@ -87,6 +91,16 @@ public class WatchSourceBuilder implements ToXContent { return addAction(id, action.build()); } + public WatchSourceBuilder addAction(String id, Transform.Builder transform, Action.Builder action) { + actions.put(id, new TransformedAction(id, action.build(), transform.build())); + return this; + } + + public WatchSourceBuilder addAction(String id, Transform transform, Action action) { + actions.put(id, new TransformedAction(id, action, transform)); + return this; + } + public WatchSourceBuilder metadata(Map metadata) { this.metadata = metadata; return this; @@ -148,13 +162,13 @@ public class WatchSourceBuilder implements ToXContent { private final String id; private final Action action; - private final @Nullable Transform.SourceBuilder transform; + private final @Nullable Transform transform; public TransformedAction(String id, Action action) { this(id, action, null); } - public TransformedAction(String id, Action action, @Nullable Transform.SourceBuilder transform) { + public TransformedAction(String id, Action action, @Nullable Transform transform) { this.id = id; this.transform = transform; this.action = action; @@ -164,7 +178,7 @@ public class WatchSourceBuilder implements ToXContent { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (transform != null) { - builder.startObject(Transform.Parser.TRANSFORM_FIELD.getPreferredName()) + builder.startObject(Transform.Field.TRANSFORM.getPreferredName()) .field(transform.type(), transform) .endObject(); } diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index c4717759335..acc18732777 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -26,6 +26,7 @@ import org.elasticsearch.watcher.input.Input; import org.elasticsearch.watcher.support.Callback; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.throttle.Throttler; +import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.Transform; import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.watch.Watch; @@ -267,9 +268,9 @@ public class ExecutionService extends AbstractComponent { } if (!throttleResult.throttle()) { - Transform transform = watch.transform(); + ExecutableTransform transform = watch.transform(); if (transform != null) { - Transform.Result result = watch.transform().apply(ctx, inputResult.payload()); + Transform.Result result = watch.transform().execute(ctx, inputResult.payload()); ctx.onTransformResult(result); } for (ActionWrapper action : watch.actions()) { diff --git a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java index b18c0797d4f..9338195db2e 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java +++ b/src/main/java/org/elasticsearch/watcher/execution/WatchExecutionContext.java @@ -11,6 +11,7 @@ import org.elasticsearch.watcher.actions.ExecutableActions; import org.elasticsearch.watcher.condition.Condition; import org.elasticsearch.watcher.input.Input; import org.elasticsearch.watcher.throttle.Throttler; +import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.Transform; import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.watch.Payload; diff --git a/src/main/java/org/elasticsearch/watcher/support/init/InitializingModule.java b/src/main/java/org/elasticsearch/watcher/support/init/InitializingModule.java index d16f2dc2eaa..65341dfc9f5 100644 --- a/src/main/java/org/elasticsearch/watcher/support/init/InitializingModule.java +++ b/src/main/java/org/elasticsearch/watcher/support/init/InitializingModule.java @@ -7,7 +7,7 @@ package org.elasticsearch.watcher.support.init; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; -import org.elasticsearch.watcher.transform.ChainTransform; +import org.elasticsearch.watcher.transform.chain.ChainTransformFactory; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; @@ -25,7 +25,7 @@ public class InitializingModule extends AbstractModule { Multibinder mbinder = Multibinder.newSetBinder(binder(), InitializingService.Initializable.class); mbinder.addBinding().to(ClientProxy.class); mbinder.addBinding().to(ScriptServiceProxy.class); - mbinder.addBinding().to(ChainTransform.Parser.class); + mbinder.addBinding().to(ChainTransformFactory.class); bind(InitializingService.class).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/watcher/transform/ChainTransform.java b/src/main/java/org/elasticsearch/watcher/transform/ChainTransform.java deleted file mode 100644 index 113e757a6f6..00000000000 --- a/src/main/java/org/elasticsearch/watcher/transform/ChainTransform.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.transform; - -import org.elasticsearch.watcher.WatcherSettingsException; -import org.elasticsearch.watcher.execution.WatchExecutionContext; -import org.elasticsearch.watcher.watch.Payload; -import org.elasticsearch.watcher.support.init.InitializingService; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.collect.ImmutableList; -import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; - -import java.io.IOException; -import java.util.List; - -/** - * - */ -public class ChainTransform extends Transform { - - public static final String TYPE = "chain"; - - private final ImmutableList transforms; - - public ChainTransform(ImmutableList transforms) { - this.transforms = transforms; - } - - @Override - public String type() { - return TYPE; - } - - ImmutableList transforms() { - return transforms; - } - - @Override - public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException { - ImmutableList.Builder results = ImmutableList.builder(); - for (Transform transform : transforms) { - Transform.Result result = transform.apply(ctx, payload); - results.add(result); - payload = result.payload(); - } - return new Result(TYPE, payload, results.build()); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startArray(); - for (Transform transform : transforms) { - builder.startObject() - .field(transform.type(), transform) - .endObject(); - } - return builder.endArray(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ChainTransform transform = (ChainTransform) o; - - if (!transforms.equals(transform.transforms)) return false; - - return true; - } - - @Override - public int hashCode() { - return transforms.hashCode(); - } - - public static class Result extends Transform.Result { - - private final List results; - - public Result(String type, Payload payload, List results) { - super(type, payload); - this.results = results; - } - - @Override - protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException { - builder.startArray(Parser.RESULTS_FIELD.getPreferredName()); - for (Transform.Result result : results) { - builder.startObject() - .field(result.type(), result) - .endObject(); - } - return builder.endArray(); - } - } - - public static class Parser implements Transform.Parser, InitializingService.Initializable { - - public static final ParseField RESULTS_FIELD = new ParseField("results"); - - private TransformRegistry registry; - - // used by guice - public Parser() { - } - - // used for tests - Parser(TransformRegistry registry) { - this.registry = registry; - } - - @Override - public void init(Injector injector) { - init(injector.getInstance(TransformRegistry.class)); - } - - public void init(TransformRegistry registry) { - this.registry = registry; - } - - @Override - public String type() { - return TYPE; - } - - @Override - public ChainTransform parse(XContentParser parser) throws IOException { - XContentParser.Token token = parser.currentToken(); - if (token != XContentParser.Token.START_ARRAY) { - throw new WatcherSettingsException("could not parse [chain] transform. expected an array of objects, but found [" + token + '}'); - } - - ImmutableList.Builder builder = ImmutableList.builder(); - - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { - if (token != XContentParser.Token.START_OBJECT) { - throw new WatcherSettingsException("could not parse [chain] transform. expected a transform object, but found [" + token + "]"); - } - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - builder.add(registry.parse(currentFieldName, parser)); - } else { - throw new WatcherSettingsException("could not parse [chain] transform. expected a transform object, but found [" + token + "]"); - } - } - } - return new ChainTransform(builder.build()); - } - - @Override - public Result parseResult(XContentParser parser) throws IOException { - XContentParser.Token token = parser.currentToken(); - if (token != XContentParser.Token.START_OBJECT) { - throw new TransformException("could not parse [chain] transform result. expected an object, but found [" + token + "]"); - } - - Payload payload = null; - ImmutableList.Builder results = ImmutableList.builder(); - - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else { - if (token == XContentParser.Token.START_OBJECT) { - if (PAYLOAD_FIELD.match(currentFieldName)) { - payload = new Payload.XContent(parser); - } else { - throw new TransformException("could not parse [chain] transform result. unexpected object field [" + currentFieldName + "]"); - } - } else if (token == XContentParser.Token.START_ARRAY) { - if (RESULTS_FIELD.match(currentFieldName)) { - while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { - if (token == XContentParser.Token.START_OBJECT) { - results.add(registry.parseResult(parser)); - } else { - throw new TransformException("could not parse [chain] transform result. expected an object representing a transform result, but found [" + token + "]"); - } - } - } else { - throw new TransformException("could not parse [chain] transform result. unexpected array field [" + currentFieldName + "]"); - } - } else { - throw new TransformException("could not parse [chain] transform result. unexpected token [" + token+ "]"); - } - } - } - return new Result(TYPE, payload, results.build()); - } - } - - public static class SourceBuilder implements Transform.SourceBuilder { - - private final ImmutableList.Builder builders = ImmutableList.builder(); - - @Override - public String type() { - return TYPE; - } - - public SourceBuilder(Transform.SourceBuilder... builders) { - this.builders.add(builders); - } - - public SourceBuilder add(Transform.SourceBuilder builder) { - builders.add(builder); - return this; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startArray(); - for (Transform.SourceBuilder transBuilder : builders.build()) { - builder.startObject() - .field(TYPE, transBuilder) - .endObject(); - } - return builder.endArray(); - } - } - -} diff --git a/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java b/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java new file mode 100644 index 00000000000..a77bbf0efad --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/ExecutableTransform.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.watch.Payload; + +import java.io.IOException; + +/** + * + */ +public abstract class ExecutableTransform implements ToXContent { + + protected final T transform; + protected final ESLogger logger; + + public ExecutableTransform(T transform, ESLogger logger) { + this.transform = transform; + this.logger = logger; + } + + public final String type() { + return transform.type(); + } + + public T transform() { + return transform; + } + + public abstract R execute(WatchExecutionContext ctx, Payload payload) throws IOException; + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return transform.toXContent(builder, params); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ExecutableTransform that = (ExecutableTransform) o; + + return transform.equals(that.transform); + } + + @Override + public int hashCode() { + return transform.hashCode(); + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/ScriptTransform.java b/src/main/java/org/elasticsearch/watcher/transform/ScriptTransform.java deleted file mode 100644 index 2aa7ab34eeb..00000000000 --- a/src/main/java/org/elasticsearch/watcher/transform/ScriptTransform.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.transform; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.script.ExecutableScript; -import org.elasticsearch.watcher.WatcherSettingsException; -import org.elasticsearch.watcher.support.Script; -import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; -import org.elasticsearch.watcher.watch.Payload; -import org.elasticsearch.watcher.execution.WatchExecutionContext; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import static org.elasticsearch.watcher.support.Variables.createCtxModel; - -/** - * - */ -public class ScriptTransform extends Transform { - - public static final String TYPE = "script"; - - private final ScriptServiceProxy scriptService; - private final Script script; - - public ScriptTransform(ScriptServiceProxy scriptService, Script script) { - this.scriptService = scriptService; - this.script = script; - } - - @Override - public String type() { - return TYPE; - } - - Script script() { - return script; - } - - @Override - public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException { - Map model = new HashMap<>(); - model.putAll(script.params()); - model.putAll(createCtxModel(ctx, payload)); - ExecutableScript executable = scriptService.executable(script.lang(), script.script(), script.type(), model); - Object value = executable.run(); - if (value instanceof Map) { - return new Result(TYPE, new Payload.Simple((Map) value)); - } - Map data = new HashMap<>(); - data.put("_value", value); - return new Result(TYPE, new Payload.Simple(data)); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.value(script); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ScriptTransform transform = (ScriptTransform) o; - - if (!script.equals(transform.script)) return false; - - return true; - } - - @Override - public int hashCode() { - return script.hashCode(); - } - - public static class Result extends Transform.Result { - - public Result(String type, Payload payload) { - super(type, payload); - } - - @Override - protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException { - return builder; - } - } - - public static class Parser implements Transform.Parser { - - private final ScriptServiceProxy scriptService; - - @Inject - public Parser(ScriptServiceProxy scriptService) { - this.scriptService = scriptService; - } - - @Override - public String type() { - return TYPE; - } - - @Override - public ScriptTransform parse(XContentParser parser) throws IOException { - Script script = null; - try { - script = Script.parse(parser); - } catch (Script.ParseException pe) { - throw new WatcherSettingsException("could not parse [script] transform", pe); - } - return new ScriptTransform(scriptService, script); - } - - @Override - public Result parseResult(XContentParser parser) throws IOException { - XContentParser.Token token = parser.currentToken(); - if (token != XContentParser.Token.START_OBJECT) { - throw new TransformException("could not parse [script] transform result. expected an object, but found [" + token + "]"); - } - token = parser.nextToken(); - if (token != XContentParser.Token.FIELD_NAME || !PAYLOAD_FIELD.match(parser.currentName())) { - throw new TransformException("could not parse [script] transform result. expected a payload field, but found [" + token + "]"); - } - token = parser.nextToken(); - if (token != XContentParser.Token.START_OBJECT) { - throw new TransformException("could not parse [script] transform result. expected a payload object, but found [" + token + "]"); - } - return new Result(TYPE, new Payload.XContent(parser)); - } - } - - public static class SourceBuilder implements Transform.SourceBuilder { - - private final Script script; - - public SourceBuilder(String script) { - this(new Script(script)); - } - - public SourceBuilder(Script script) { - this.script = script; - } - - @Override - public String type() { - return TYPE; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return script.toXContent(builder, params); - } - } -} diff --git a/src/main/java/org/elasticsearch/watcher/transform/SearchTransform.java b/src/main/java/org/elasticsearch/watcher/transform/SearchTransform.java deleted file mode 100644 index 1798b58473c..00000000000 --- a/src/main/java/org/elasticsearch/watcher/transform/SearchTransform.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.transform; - -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.watcher.execution.WatchExecutionContext; -import org.elasticsearch.watcher.watch.Payload; -import org.elasticsearch.watcher.support.WatcherUtils; -import org.elasticsearch.watcher.support.SearchRequestEquivalence; -import org.elasticsearch.watcher.support.init.proxy.ClientProxy; -import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.script.ExecutableScript; -import org.elasticsearch.script.ScriptService; - -import java.io.IOException; - -import static org.elasticsearch.watcher.support.WatcherUtils.flattenModel; -import static org.elasticsearch.watcher.support.Variables.createCtxModel; - -/** - * - */ -public class SearchTransform extends Transform { - - public static final String TYPE = "search"; - - public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH; - - protected final ESLogger logger; - protected final ScriptServiceProxy scriptService; - protected final ClientProxy client; - - protected final SearchRequest request; - - public SearchTransform(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client, SearchRequest request) { - this.logger = logger; - this.scriptService = scriptService; - this.client = client; - this.request = request; - } - - @Override - public String type() { - return TYPE; - } - - @Override - public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException { - SearchRequest req = createRequest(request, ctx, payload); - SearchResponse resp = client.search(req); - return new Result(TYPE, new Payload.XContent(resp)); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - return WatcherUtils.writeSearchRequest(request, builder, params); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - SearchTransform transform = (SearchTransform) o; - - if (!SearchRequestEquivalence.INSTANCE.equivalent(request, transform.request)) return false; - - return true; - } - - @Override - public int hashCode() { - return request.hashCode(); - } - - SearchRequest createRequest(SearchRequest requestPrototype, WatchExecutionContext ctx, Payload payload) throws IOException { - SearchRequest request = new SearchRequest(requestPrototype) - .indicesOptions(requestPrototype.indicesOptions()) - .indices(requestPrototype.indices()); - if (Strings.hasLength(requestPrototype.source())) { - String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false); - ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, createCtxModel(ctx, payload)); - request.source((BytesReference) script.unwrap(script.run()), false); - } else if (requestPrototype.templateName() != null) { - MapBuilder templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams()) - .putAll(flattenModel(createCtxModel(ctx, payload))); - request.templateParams(templateParams.map()); - request.templateName(requestPrototype.templateName()); - request.templateType(requestPrototype.templateType()); - } else { - throw new TransformException("search requests needs either source or template name"); - } - return request; - } - - public static class Result extends Transform.Result { - - public Result(String type, Payload payload) { - super(type, payload); - } - - @Override - protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException { - return builder; - } - } - - public static class Parser extends AbstractComponent implements Transform.Parser { - - protected final ScriptServiceProxy scriptService; - protected final ClientProxy client; - - @Inject - public Parser(Settings settings, ScriptServiceProxy scriptService, ClientProxy client) { - super(settings); - this.scriptService = scriptService; - this.client = client; - } - - @Override - public String type() { - return TYPE; - } - - @Override - public SearchTransform parse(XContentParser parser) throws IOException { - SearchRequest request = WatcherUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE); - return new SearchTransform(logger, scriptService, client, request); - } - - @Override - public Result parseResult(XContentParser parser) throws IOException { - XContentParser.Token token = parser.currentToken(); - if (token != XContentParser.Token.START_OBJECT) { - throw new TransformException("could not parse [search] transform result. expected an object, but found [" + token + "]"); - } - token = parser.nextToken(); - if (token != XContentParser.Token.FIELD_NAME || !PAYLOAD_FIELD.match(parser.currentName())) { - throw new TransformException("could not parse [search] transform result. expected a payload field, but found [" + token + "]"); - } - token = parser.nextToken(); - if (token != XContentParser.Token.START_OBJECT) { - throw new TransformException("could not parse [search] transform result. expected a payload object, but found [" + token + "]"); - } - return new Result(TYPE, new Payload.XContent(parser)); - } - } - - public static class SourceBuilder implements Transform.SourceBuilder { - - private final SearchRequest request; - - public SourceBuilder(SearchRequest request) { - this.request = request; - } - - @Override - public String type() { - return TYPE; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return WatcherUtils.writeSearchRequest(request, builder, params); - } - } - -} diff --git a/src/main/java/org/elasticsearch/watcher/transform/Transform.java b/src/main/java/org/elasticsearch/watcher/transform/Transform.java index aa7fd12bd5b..19d97d9299e 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/Transform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/Transform.java @@ -5,25 +5,21 @@ */ package org.elasticsearch.watcher.transform; -import org.elasticsearch.watcher.execution.WatchExecutionContext; -import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.watch.Payload; import java.io.IOException; /** * */ -public abstract class Transform implements ToXContent { +public interface Transform extends ToXContent { - public abstract String type(); + String type(); - public abstract Result apply(WatchExecutionContext ctx, Payload payload) throws IOException; - - public static abstract class Result implements ToXContent { + abstract class Result implements ToXContent { protected final String type; protected final Payload payload; @@ -44,7 +40,7 @@ public abstract class Transform implements ToXConten @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(Parser.PAYLOAD_FIELD.getPreferredName(), payload); + builder.field(Field.PAYLOAD.getPreferredName(), payload); xContentBody(builder, params); return builder.endObject(); } @@ -53,23 +49,14 @@ public abstract class Transform implements ToXConten } - public static interface Parser> { - - public static final ParseField PAYLOAD_FIELD = new ParseField("payload"); - public static final ParseField TRANSFORM_FIELD = new ParseField("transform"); - public static final ParseField TRANSFORM_RESULT_FIELD = new ParseField("transform_result"); - - String type(); - - T parse(XContentParser parser) throws IOException; - - R parseResult(XContentParser parser) throws IOException; + interface Builder { + T build(); } - public static interface SourceBuilder extends ToXContent { - - String type(); + interface Field { + ParseField PAYLOAD = new ParseField("payload"); + ParseField TRANSFORM = new ParseField("transform"); + ParseField TRANSFORM_RESULT = new ParseField("transform_result"); } - } diff --git a/src/main/java/org/elasticsearch/watcher/transform/TransformBuilders.java b/src/main/java/org/elasticsearch/watcher/transform/TransformBuilders.java index 0b3a56b3d96..037892a599e 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/TransformBuilders.java +++ b/src/main/java/org/elasticsearch/watcher/transform/TransformBuilders.java @@ -6,7 +6,11 @@ package org.elasticsearch.watcher.transform; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.watcher.support.Script; +import org.elasticsearch.watcher.transform.chain.ChainTransform; +import org.elasticsearch.watcher.transform.script.ScriptTransform; +import org.elasticsearch.watcher.transform.search.SearchTransform; /** * @@ -16,20 +20,28 @@ public final class TransformBuilders { private TransformBuilders() { } - public static SearchTransform.SourceBuilder searchTransform(SearchRequest request) { - return new SearchTransform.SourceBuilder(request); + public static SearchTransform.Builder searchTransform(SearchRequest request) { + return SearchTransform.builder(request); } - public static ScriptTransform.SourceBuilder scriptTransform(String script) { - return new ScriptTransform.SourceBuilder(script); + public static SearchTransform.Builder searchTransform(SearchRequestBuilder request) { + return searchTransform(request.request()); } - public static ScriptTransform.SourceBuilder scriptTransform(Script script) { - return new ScriptTransform.SourceBuilder(script); + public static ScriptTransform.Builder scriptTransform(String script) { + return scriptTransform(new Script(script)); } - public static ChainTransform.SourceBuilder chainTransform(Transform.SourceBuilder... transforms) { - return new ChainTransform.SourceBuilder(transforms); + public static ScriptTransform.Builder scriptTransform(Script script) { + return ScriptTransform.builder(script); + } + + public static ChainTransform.Builder chainTransform(Transform.Builder... transforms) { + return ChainTransform.builder().add(transforms); + } + + public static ChainTransform.Builder chainTransform(Transform... transforms) { + return ChainTransform.builder(transforms); } } diff --git a/src/main/java/org/elasticsearch/watcher/transform/TransformException.java b/src/main/java/org/elasticsearch/watcher/transform/TransformException.java index ffe16e24750..b5b57c2b4b0 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/TransformException.java +++ b/src/main/java/org/elasticsearch/watcher/transform/TransformException.java @@ -12,11 +12,11 @@ import org.elasticsearch.watcher.WatcherException; */ public class TransformException extends WatcherException { - public TransformException(String msg) { - super(msg); + public TransformException(String msg, Object... args) { + super(msg, args); } - public TransformException(String msg, Throwable cause) { - super(msg, cause); + public TransformException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); } } diff --git a/src/main/java/org/elasticsearch/watcher/transform/TransformFactory.java b/src/main/java/org/elasticsearch/watcher/transform/TransformFactory.java new file mode 100644 index 00000000000..600649f01ee --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/TransformFactory.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; + +/** + * + */ +public abstract class TransformFactory> { + + protected final ESLogger transformLogger; + + public TransformFactory(ESLogger transformLogger) { + this.transformLogger = transformLogger; + } + + /** + * @return The type of the transform + */ + public abstract String type(); + + /** + * Parses the given xcontent and creates a concrete transform + */ + public abstract T parseTransform(String watchId, XContentParser parser) throws IOException; + + /** + * Parses the given xcontent and creates a concrete transform result + */ + public abstract R parseResult(String watchId, XContentParser parser) throws IOException; + + /** + * Creates an executable transform out of the given transform. + */ + public abstract E createExecutable(T transform); + + public E parseExecutable(String watchId, XContentParser parser) throws IOException { + T transform = parseTransform(watchId, parser); + return createExecutable(transform); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/TransformModule.java b/src/main/java/org/elasticsearch/watcher/transform/TransformModule.java index 08d08317e54..0f62dca754c 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/TransformModule.java +++ b/src/main/java/org/elasticsearch/watcher/transform/TransformModule.java @@ -7,6 +7,12 @@ package org.elasticsearch.watcher.transform; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.MapBinder; +import org.elasticsearch.watcher.transform.chain.ChainTransform; +import org.elasticsearch.watcher.transform.chain.ChainTransformFactory; +import org.elasticsearch.watcher.transform.script.ScriptTransform; +import org.elasticsearch.watcher.transform.script.ScriptTransformFactory; +import org.elasticsearch.watcher.transform.search.SearchTransform; +import org.elasticsearch.watcher.transform.search.SearchTransformFactory; import java.util.HashMap; import java.util.Map; @@ -16,24 +22,26 @@ import java.util.Map; */ public class TransformModule extends AbstractModule { - private Map> parsers = new HashMap<>(); + private Map> factories = new HashMap<>(); - public void registerPayload(String payloadType, Class parserType) { - parsers.put(payloadType, parserType); + public void registerTransform(String payloadType, Class parserType) { + factories.put(payloadType, parserType); } @Override protected void configure() { + MapBinder mbinder = MapBinder.newMapBinder(binder(), String.class, TransformFactory.class); - MapBinder mbinder = MapBinder.newMapBinder(binder(), String.class, Transform.Parser.class); - bind(SearchTransform.Parser.class).asEagerSingleton(); - mbinder.addBinding(SearchTransform.TYPE).to(SearchTransform.Parser.class); - bind(ScriptTransform.Parser.class).asEagerSingleton(); - mbinder.addBinding(ScriptTransform.TYPE).to(ScriptTransform.Parser.class); - bind(ChainTransform.Parser.class).asEagerSingleton(); - mbinder.addBinding(ChainTransform.TYPE).to(ChainTransform.Parser.class); + bind(SearchTransformFactory.class).asEagerSingleton(); + mbinder.addBinding(SearchTransform.TYPE).to(SearchTransformFactory.class); - for (Map.Entry> entry : parsers.entrySet()) { + bind(ScriptTransformFactory.class).asEagerSingleton(); + mbinder.addBinding(ScriptTransform.TYPE).to(ScriptTransformFactory.class); + + bind(ChainTransformFactory.class).asEagerSingleton(); + mbinder.addBinding(ChainTransform.TYPE).to(ChainTransformFactory.class); + + for (Map.Entry> entry : factories.entrySet()) { bind(entry.getValue()).asEagerSingleton(); mbinder.addBinding(entry.getKey()).to(entry.getValue()); } diff --git a/src/main/java/org/elasticsearch/watcher/transform/TransformRegistry.java b/src/main/java/org/elasticsearch/watcher/transform/TransformRegistry.java index 4f61a46ea13..a21863241e8 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/TransformRegistry.java +++ b/src/main/java/org/elasticsearch/watcher/transform/TransformRegistry.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.watcher.transform; -import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentParser; @@ -18,36 +17,48 @@ import java.util.Map; */ public class TransformRegistry { - private final ImmutableMap parsers; + private final ImmutableMap factories; @Inject - public TransformRegistry(Map parsers) { - this.parsers = ImmutableMap.copyOf(parsers); + public TransformRegistry(Map factories) { + this.factories = ImmutableMap.copyOf(factories); } - public Transform parse(XContentParser parser) throws IOException { + public TransformFactory factory(String type) { + return factories.get(type); + } + + public ExecutableTransform parse(String watchId, XContentParser parser) throws IOException { String type = null; XContentParser.Token token; - Transform transform = null; + ExecutableTransform transform = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { type = parser.currentName(); } else if (type != null) { - transform = parse(type, parser); + transform = parse(watchId, type, parser); } } return transform; } - public Transform parse(String type, XContentParser parser) throws IOException { - Transform.Parser transformParser = parsers.get(type); - if (transformParser == null) { - throw new WatcherSettingsException("unknown transform type [" + type + "]"); + public ExecutableTransform parse(String watchId, String type, XContentParser parser) throws IOException { + TransformFactory factory = factories.get(type); + if (factory == null) { + throw new TransformException("could not parse transform for watch [{}], unknown transform type [{}]", watchId, type); } - return transformParser.parse(parser); + return factory.parseExecutable(watchId, parser); } - public Transform.Result parseResult(XContentParser parser) throws IOException { + public Transform parseTransform(String watchId, String type, XContentParser parser) throws IOException { + TransformFactory factory = factories.get(type); + if (factory == null) { + throw new TransformException("could not parse transform for watch [{}], unknown transform type [{}]", watchId, type); + } + return factory.parseTransform(watchId, parser); + } + + public Transform.Result parseResult(String watchId, XContentParser parser) throws IOException { String type = null; XContentParser.Token token; Transform.Result result = null; @@ -55,17 +66,17 @@ public class TransformRegistry { if (token == XContentParser.Token.FIELD_NAME) { type = parser.currentName(); } else if (type != null) { - result = parseResult(type, parser); + result = parseResult(watchId, type, parser); } } return result; } - public Transform.Result parseResult(String type, XContentParser parser) throws IOException { - Transform.Parser transformParser = parsers.get(type); - if (transformParser == null) { - throw new TransformException("unknown transform type [" + type + "]"); + public Transform.Result parseResult(String watchId, String type, XContentParser parser) throws IOException { + TransformFactory factory = factories.get(type); + if (factory == null) { + throw new TransformException("could not parse transform result for watch [{}]. unknown transform type [{}]", watchId, type); } - return transformParser.parseResult(parser); + return factory.parseResult(watchId, parser); } } diff --git a/src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransform.java b/src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransform.java new file mode 100644 index 00000000000..8513092c3b6 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransform.java @@ -0,0 +1,190 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.chain; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.transform.Transform; +import org.elasticsearch.watcher.transform.TransformRegistry; +import org.elasticsearch.watcher.watch.Payload; + +import java.io.IOException; + +/** + * + */ +public class ChainTransform implements Transform { + + public static final String TYPE = "chain"; + + private final ImmutableList transforms; + + public ChainTransform(ImmutableList transforms) { + this.transforms = transforms; + } + + @Override + public String type() { + return TYPE; + } + + public ImmutableList getTransforms() { + return transforms; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ChainTransform that = (ChainTransform) o; + + return transforms.equals(that.transforms); + } + + @Override + public int hashCode() { + return transforms.hashCode(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(); + for (Transform transform : transforms) { + builder.startObject() + .field(transform.type(), transform) + .endObject(); + } + return builder.endArray(); + } + + public static ChainTransform parse(String watchId, XContentParser parser, TransformRegistry transformRegistry) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token != XContentParser.Token.START_ARRAY) { + throw new ChainTransformException("could not parse [{}] transform for watch [{}]. expected an array of transform objects, but found [{}] instead", TYPE, watchId, token); + } + + ImmutableList.Builder builder = ImmutableList.builder(); + + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token != XContentParser.Token.START_OBJECT) { + throw new ChainTransformException("could not parse [{}] transform for watch [{}]. expected a transform object, but found [{}] instead", TYPE, watchId, token); + } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + builder.add(transformRegistry.parseTransform(watchId, currentFieldName, parser)); + } else { + throw new ChainTransformException("could not parse [{}] transform for watch [{}]. expected a transform object, but found [{}] instead", TYPE, watchId, token); + } + } + } + return new ChainTransform(builder.build()); + } + + public static Builder builder(Transform... transforms) { + return new Builder(transforms); + } + + public static class Result extends Transform.Result { + + private final ImmutableList results; + + public Result(Payload payload, ImmutableList results) { + super(TYPE, payload); + this.results = results; + } + + public ImmutableList results() { + return results; + } + + @Override + protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException { + builder.startArray(Field.RESULTS.getPreferredName()); + for (Transform.Result result : results) { + builder.startObject() + .field(result.type(), result) + .endObject(); + } + return builder.endArray(); + } + + public static Result parse(String watchId, XContentParser parser, TransformRegistry transformRegistry) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. expected an object, but found [{}] instead", TYPE, watchId, token); + } + + Payload payload = null; + ImmutableList.Builder results = ImmutableList.builder(); + + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else { + if (token == XContentParser.Token.START_OBJECT) { + if (Field.PAYLOAD.match(currentFieldName)) { + payload = new Payload.XContent(parser); + } else { + throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. unexpected object field [{}]", TYPE, watchId, currentFieldName); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (Field.RESULTS.match(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.START_OBJECT) { + results.add(transformRegistry.parseResult(watchId, parser)); + } else { + throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. expected an object representing a transform result, but found [{}] instead", TYPE, watchId, token); + } + } + } else { + throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. unexpected array field [{}]", TYPE, watchId, currentFieldName); + } + } else { + throw new ChainTransformException("could not parse [{}] transform result for watch [{}]. unexpected token [{}]", TYPE, watchId, token); + } + } + } + return new ChainTransform.Result(payload, results.build()); + } + } + + public static class Builder implements Transform.Builder { + + private final ImmutableList.Builder transforms = ImmutableList.builder(); + + public Builder(Transform... transforms) { + this.transforms.add(transforms); + } + + public Builder add(Transform... transforms) { + this.transforms.add(transforms); + return this; + } + + public Builder add(Transform.Builder... transforms) { + for (Transform.Builder transform: transforms) { + this.transforms.add(transform.build()); + } + return this; + } + + @Override + public ChainTransform build() { + return new ChainTransform(transforms.build()); + } + } + + interface Field extends Transform.Field { + ParseField RESULTS = new ParseField("results"); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransformException.java b/src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransformException.java new file mode 100644 index 00000000000..5f9112f26c7 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransformException.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.chain; + +import org.elasticsearch.watcher.transform.TransformException; + +/** + * + */ +public class ChainTransformException extends TransformException { + + public ChainTransformException(String msg, Object... args) { + super(msg, args); + } + + public ChainTransformException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransformFactory.java b/src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransformFactory.java new file mode 100644 index 00000000000..efbb4fdee0e --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/chain/ChainTransformFactory.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.chain; + +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.Injector; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.support.init.InitializingService; +import org.elasticsearch.watcher.transform.ExecutableTransform; +import org.elasticsearch.watcher.transform.Transform; +import org.elasticsearch.watcher.transform.TransformFactory; +import org.elasticsearch.watcher.transform.TransformRegistry; + +import java.io.IOException; + +/** + * + */ +public class ChainTransformFactory extends TransformFactory implements InitializingService.Initializable { + + private TransformRegistry registry; + + // used by guice + public ChainTransformFactory(Settings settings) { + super(Loggers.getLogger(ExecutableChainTransform.class, settings)); + } + + // used for tests + public ChainTransformFactory(TransformRegistry registry) { + super(Loggers.getLogger(ExecutableChainTransform.class)); + this.registry = registry; + } + + // used for tests + public ChainTransformFactory() { + super(Loggers.getLogger(ExecutableChainTransform.class)); + } + + @Override + public void init(Injector injector) { + init(injector.getInstance(TransformRegistry.class)); + } + + public void init(TransformRegistry registry) { + this.registry = registry; + } + + @Override + public String type() { + return ChainTransform.TYPE; + } + + @Override + public ChainTransform parseTransform(String watchId, XContentParser parser) throws IOException { + return ChainTransform.parse(watchId, parser, registry); + } + + @Override + public ChainTransform.Result parseResult(String watchId, XContentParser parser) throws IOException { + return ChainTransform.Result.parse(watchId, parser, registry); + } + + @Override + public ExecutableChainTransform createExecutable(ChainTransform chainTransform) { + ImmutableList.Builder executables = ImmutableList.builder(); + for (Transform transform : chainTransform.getTransforms()) { + TransformFactory factory = registry.factory(transform.type()); + executables.add(factory.createExecutable(transform)); + } + return new ExecutableChainTransform(chainTransform, transformLogger, executables.build()); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java b/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java new file mode 100644 index 00000000000..5eca75538df --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/chain/ExecutableChainTransform.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.chain; + +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.transform.ExecutableTransform; +import org.elasticsearch.watcher.transform.Transform; +import org.elasticsearch.watcher.watch.Payload; + +import java.io.IOException; + +/** + * + */ +public class ExecutableChainTransform extends ExecutableTransform { + + private final ImmutableList transforms; + + public ExecutableChainTransform(ChainTransform transform, ESLogger logger, ImmutableList transforms) { + super(transform, logger); + this.transforms = transforms; + } + + public ImmutableList executableTransforms() { + return transforms; + } + + @Override + public ChainTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException { + ImmutableList.Builder results = ImmutableList.builder(); + for (ExecutableTransform transform : transforms) { + Transform.Result result = transform.execute(ctx, payload); + results.add(result); + payload = result.payload(); + } + return new ChainTransform.Result(payload, results.build()); + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java b/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java new file mode 100644 index 00000000000..83ffaacb3a6 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/script/ExecutableScriptTransform.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.script; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.support.Script; +import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; +import org.elasticsearch.watcher.transform.ExecutableTransform; +import org.elasticsearch.watcher.watch.Payload; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.watcher.support.Variables.createCtxModel; + +/** + * + */ +public class ExecutableScriptTransform extends ExecutableTransform { + + private final ScriptServiceProxy scriptService; + + public ExecutableScriptTransform(ScriptTransform transform, ESLogger logger, ScriptServiceProxy scriptService) { + super(transform, logger); + this.scriptService = scriptService; + } + + @Override + public ScriptTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException { + Script script = transform.getScript(); + Map model = new HashMap<>(); + model.putAll(script.params()); + model.putAll(createCtxModel(ctx, payload)); + ExecutableScript executable = scriptService.executable(script.lang(), script.script(), script.type(), model); + Object value = executable.run(); + if (value instanceof Map) { + return new ScriptTransform.Result(new Payload.Simple((Map) value)); + } + Map data = new HashMap<>(); + data.put("_value", value); + return new ScriptTransform.Result(new Payload.Simple(data)); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java new file mode 100644 index 00000000000..ade7b0c871e --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransform.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.script; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.support.Script; +import org.elasticsearch.watcher.transform.Transform; +import org.elasticsearch.watcher.watch.Payload; + +import java.io.IOException; + +/** + * + */ +public class ScriptTransform implements Transform { + + public static final String TYPE = "script"; + + private final Script script; + + public ScriptTransform(Script script) { + this.script = script; + } + + @Override + public String type() { + return TYPE; + } + + public Script getScript() { + return script; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ScriptTransform that = (ScriptTransform) o; + + return script.equals(that.script); + } + + @Override + public int hashCode() { + return script.hashCode(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return script.toXContent(builder, params); + } + + public static ScriptTransform parse(String watchId, XContentParser parser) throws IOException { + try { + Script script = Script.parse(parser); + return new ScriptTransform(script); + } catch (Script.ParseException pe) { + throw new ScriptTransformException("could not parse [{}] transform for watch [{}]. failed to parse script", pe, TYPE, watchId); + } + } + + public static Builder builder(Script script) { + return new Builder(script); + } + + public static class Result extends Transform.Result { + + public Result(Payload payload) { + super(TYPE, payload); + } + + @Override + protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException { + return null; + } + + public static Result parse(String watchId, XContentParser parser) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new ScriptTransformException("could not parse [{}] transform result for watch [{}]. expected an object, but found [{}] instead", TYPE, watchId, token); + } + token = parser.nextToken(); + if (token != XContentParser.Token.FIELD_NAME || !Field.PAYLOAD.match(parser.currentName())) { + throw new ScriptTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] object, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token); + } + token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new ScriptTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] object, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token); + } + return new ScriptTransform.Result(new Payload.XContent(parser)); + } + } + + public static class Builder implements Transform.Builder { + + private final Script script; + + public Builder(Script script) { + this.script = script; + } + + @Override + public ScriptTransform build() { + return new ScriptTransform(script); + } + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransformException.java b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransformException.java new file mode 100644 index 00000000000..c72b81f54b9 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransformException.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.script; + +import org.elasticsearch.watcher.transform.TransformException; + +/** + * + */ +public class ScriptTransformException extends TransformException { + + public ScriptTransformException(String msg, Object... args) { + super(msg, args); + } + + public ScriptTransformException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransformFactory.java b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransformFactory.java new file mode 100644 index 00000000000..60694601a6b --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/script/ScriptTransformFactory.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.script; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; +import org.elasticsearch.watcher.transform.TransformFactory; + +import java.io.IOException; + +/** + * + */ +public class ScriptTransformFactory extends TransformFactory { + + private final ScriptServiceProxy scriptService; + + @Inject + public ScriptTransformFactory(Settings settings, ScriptServiceProxy scriptService) { + super(Loggers.getLogger(ExecutableScriptTransform.class, settings)); + this.scriptService = scriptService; + } + + @Override + public String type() { + return ScriptTransform.TYPE; + } + + @Override + public ScriptTransform parseTransform(String watchId, XContentParser parser) throws IOException { + return ScriptTransform.parse(watchId, parser); + } + + @Override + public ScriptTransform.Result parseResult(String watchId, XContentParser parser) throws IOException { + return ScriptTransform.Result.parse(watchId, parser); + } + + @Override + public ExecutableScriptTransform createExecutable(ScriptTransform transform) { + return new ExecutableScriptTransform(transform, transformLogger, scriptService); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java new file mode 100644 index 00000000000..078945bdae7 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/search/ExecutableSearchTransform.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.search; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; +import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; +import org.elasticsearch.watcher.transform.ExecutableTransform; +import org.elasticsearch.watcher.transform.TransformException; +import org.elasticsearch.watcher.watch.Payload; + +import java.io.IOException; + +import static org.elasticsearch.watcher.support.Variables.createCtxModel; +import static org.elasticsearch.watcher.support.WatcherUtils.flattenModel; + +/** + * + */ +public class ExecutableSearchTransform extends ExecutableTransform { + + public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.QUERY_THEN_FETCH; + + protected final ScriptServiceProxy scriptService; + protected final ClientProxy client; + + public ExecutableSearchTransform(SearchTransform transform, ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client) { + super(transform, logger); + this.scriptService = scriptService; + this.client = client; + } + + @Override + public SearchTransform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException { + SearchRequest req = createRequest(transform.request, ctx, payload); + SearchResponse resp = client.search(req); + return new SearchTransform.Result(new Payload.XContent(resp)); + } + + SearchRequest createRequest(SearchRequest requestPrototype, WatchExecutionContext ctx, Payload payload) throws IOException { + SearchRequest request = new SearchRequest(requestPrototype) + .indicesOptions(requestPrototype.indicesOptions()) + .indices(requestPrototype.indices()); + if (Strings.hasLength(requestPrototype.source())) { + String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false); + ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, createCtxModel(ctx, payload)); + request.source((BytesReference) script.unwrap(script.run()), false); + } else if (requestPrototype.templateName() != null) { + MapBuilder templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams()) + .putAll(flattenModel(createCtxModel(ctx, payload))); + request.templateParams(templateParams.map()); + request.templateName(requestPrototype.templateName()); + request.templateType(requestPrototype.templateType()); + } else { + throw new TransformException("search requests needs either source or template name"); + } + return request; + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransform.java b/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransform.java new file mode 100644 index 00000000000..54cfc7c7700 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransform.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.search; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.support.SearchRequestEquivalence; +import org.elasticsearch.watcher.support.SearchRequestParseException; +import org.elasticsearch.watcher.support.WatcherUtils; +import org.elasticsearch.watcher.transform.Transform; +import org.elasticsearch.watcher.watch.Payload; + +import java.io.IOException; + +/** + * + */ +public class SearchTransform implements Transform { + + public static final String TYPE = "search"; + + protected final SearchRequest request; + + public SearchTransform(SearchRequest request) { + this.request = request; + } + + @Override + public String type() { + return TYPE; + } + + public SearchRequest getRequest() { + return request; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SearchTransform transform = (SearchTransform) o; + + return SearchRequestEquivalence.INSTANCE.equivalent(request, transform.request); + } + + @Override + public int hashCode() { + return request.hashCode(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return WatcherUtils.writeSearchRequest(request, builder, params); + } + + public static SearchTransform parse(String watchId, XContentParser parser) throws IOException { + try { + SearchRequest request = WatcherUtils.readSearchRequest(parser, ExecutableSearchTransform.DEFAULT_SEARCH_TYPE); + return new SearchTransform(request); + } catch (SearchRequestParseException srpe) { + throw new SearchTransformException("could not parse [{}] transform for watch [{}]. failed parsing search request", srpe, TYPE, watchId); + } + } + + public static Builder builder(SearchRequest request) { + return new Builder(request); + } + + public static class Result extends Transform.Result { + + public Result(Payload payload) { + super(TYPE, payload); + } + + @Override + protected XContentBuilder xContentBody(XContentBuilder builder, Params params) throws IOException { + return builder; + } + + public static Result parse(String watchId, XContentParser parser) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. expected an object, but found [{}] instead", TYPE, watchId, token); + } + token = parser.nextToken(); + if (token != XContentParser.Token.FIELD_NAME || !Field.PAYLOAD.match(parser.currentName())) { + throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] field, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token); + } + token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new SearchTransformException("could not parse [{}] transform result for watch [{}]. expected a [{}] field, but found [{}] instead", TYPE, watchId, Field.PAYLOAD.getPreferredName(), token); + } + return new SearchTransform.Result(new Payload.XContent(parser)); + } + } + + public static class Builder implements Transform.Builder { + + private final SearchRequest request; + + public Builder(SearchRequest request) { + this.request = request; + } + + @Override + public SearchTransform build() { + return new SearchTransform(request); + } + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformException.java b/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformException.java new file mode 100644 index 00000000000..e8ac286263e --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformException.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.search; + +import org.elasticsearch.watcher.transform.TransformException; + +/** + * + */ +public class SearchTransformException extends TransformException { + + public SearchTransformException(String msg, Object... args) { + super(msg, args); + } + + public SearchTransformException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformFactory.java b/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformFactory.java new file mode 100644 index 00000000000..84891e100b1 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/transform/search/SearchTransformFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.watcher.transform.search; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; +import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; +import org.elasticsearch.watcher.transform.TransformFactory; + +import java.io.IOException; + +/** + * + */ +public class SearchTransformFactory extends TransformFactory { + + protected final ScriptServiceProxy scriptService; + protected final ClientProxy client; + + @Inject + public SearchTransformFactory(Settings settings, ScriptServiceProxy scriptService, ClientProxy client) { + super(Loggers.getLogger(ExecutableSearchTransform.class, settings)); + this.scriptService = scriptService; + this.client = client; + } + + @Override + public String type() { + return SearchTransform.TYPE; + } + + @Override + public SearchTransform parseTransform(String watchId, XContentParser parser) throws IOException { + return SearchTransform.parse(watchId, parser); + } + + @Override + public SearchTransform.Result parseResult(String watchId, XContentParser parser) throws IOException { + return SearchTransform.Result.parse(watchId, parser); + } + + @Override + public ExecutableSearchTransform createExecutable(SearchTransform transform) { + return new ExecutableSearchTransform(transform, transformLogger, scriptService, client); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/watch/Watch.java b/src/main/java/org/elasticsearch/watcher/watch/Watch.java index 48761428993..c90c14c1310 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/Watch.java +++ b/src/main/java/org/elasticsearch/watcher/watch/Watch.java @@ -35,7 +35,7 @@ import org.elasticsearch.watcher.license.LicenseService; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.throttle.Throttler; import org.elasticsearch.watcher.throttle.WatchThrottler; -import org.elasticsearch.watcher.transform.Transform; +import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.TransformRegistry; import org.elasticsearch.watcher.trigger.Trigger; import org.elasticsearch.watcher.trigger.TriggerEngine; @@ -67,11 +67,11 @@ public class Watch implements TriggerEngine.Job, ToXContent { private final Map metadata; @Nullable - private final Transform transform; + private final ExecutableTransform transform; private final transient AtomicLong nonceCounter = new AtomicLong(); - public Watch(String name, Clock clock, LicenseService licenseService, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable Transform transform, + public Watch(String name, Clock clock, LicenseService licenseService, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable ExecutableTransform transform, ExecutableActions actions, @Nullable Map metadata, @Nullable TimeValue throttlePeriod, @Nullable Status status) { this.name = name; this.trigger = trigger; @@ -99,7 +99,7 @@ public class Watch implements TriggerEngine.Job, ToXContent { return condition; } - public Transform transform() { + public ExecutableTransform transform() { return transform; } @@ -234,7 +234,7 @@ public class Watch implements TriggerEngine.Job, ToXContent { ExecutableInput input = defaultInput; ExecutableCondition condition = defaultCondition; ExecutableActions actions = null; - Transform transform = null; + ExecutableTransform transform = null; Map metatdata = null; Status status = null; TimeValue throttlePeriod = defaultThrottleTimePeriod; @@ -256,7 +256,7 @@ public class Watch implements TriggerEngine.Job, ToXContent { } else if (ACTIONS_FIELD.match(currentFieldName)) { actions = actionRegistry.parseActions(id, parser); } else if (TRANSFORM_FIELD.match(currentFieldName)) { - transform = transformRegistry.parse(parser); + transform = transformRegistry.parse(id, parser); } else if (META_FIELD.match(currentFieldName)) { metatdata = parser.map(); } else if (STATUS_FIELD.match(currentFieldName) && includeStatus) { diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchExecution.java b/src/main/java/org/elasticsearch/watcher/watch/WatchExecution.java index 3ec38c43a29..61b91135e09 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchExecution.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchExecution.java @@ -85,7 +85,7 @@ public class WatchExecution implements ToXContent { } } if (transformResult != null) { - builder.startObject(Transform.Parser.TRANSFORM_RESULT_FIELD.getPreferredName()).field(transformResult.type(), transformResult).endObject(); + builder.startObject(Transform.Field.TRANSFORM_RESULT.getPreferredName()).field(transformResult.type(), transformResult).endObject(); } builder.startObject(Parser.ACTIONS_RESULTS.getPreferredName()); for (ActionWrapper.Result actionResult : actionsResults) { @@ -131,8 +131,8 @@ public class WatchExecution implements ToXContent { inputResult = inputRegistry.parseResult(wid.watchId(), parser); } else if (CONDITION_RESULT_FIELD.match(currentFieldName)) { conditionResult = conditionRegistry.parseResult(wid.watchId(), parser); - } else if (Transform.Parser.TRANSFORM_RESULT_FIELD.match(currentFieldName)) { - transformResult = transformRegistry.parseResult(parser); + } else if (Transform.Field.TRANSFORM_RESULT.match(currentFieldName)) { + transformResult = transformRegistry.parseResult(wid.watchId(), parser); } else if (ACTIONS_RESULTS.match(currentFieldName)) { actionResults = actionRegistry.parseResults(wid, parser); } else { diff --git a/src/test/java/org/elasticsearch/watcher/actions/TransformMocks.java b/src/test/java/org/elasticsearch/watcher/actions/TransformMocks.java index b5d70e19a5b..2fcbbe0d037 100644 --- a/src/test/java/org/elasticsearch/watcher/actions/TransformMocks.java +++ b/src/test/java/org/elasticsearch/watcher/actions/TransformMocks.java @@ -6,9 +6,14 @@ package org.elasticsearch.watcher.actions; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.Transform; +import org.elasticsearch.watcher.transform.TransformFactory; import org.elasticsearch.watcher.transform.TransformRegistry; import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.execution.WatchExecutionContext; @@ -24,16 +29,27 @@ import static org.hamcrest.core.Is.is; */ public class TransformMocks { - public static class TransformMock extends Transform { + public static class ExecutableTransformMock extends ExecutableTransform { - @Override - public String type() { - return "_transform"; + private static final String TYPE = "mock"; + + public ExecutableTransformMock() { + super(new Transform() { + @Override + public String type() { + return TYPE; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().endObject(); + } + }, Loggers.getLogger(ExecutableTransformMock.class)); } @Override - public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException { - return new Result("_transform", new Payload.Simple("_key", "_value")); + public Transform.Result execute(WatchExecutionContext ctx, Payload payload) throws IOException { + return new Result(TYPE, new Payload.Simple("_key", "_value")); } @Override @@ -56,41 +72,51 @@ public class TransformMocks { public static class TransformRegistryMock extends TransformRegistry { - public TransformRegistryMock(final Transform transform) { - super(ImmutableMap.of("_transform", new Transform.Parser() { + public TransformRegistryMock(final ExecutableTransform executable) { + super(ImmutableMap.of("_transform", new TransformFactory(Loggers.getLogger(TransformRegistryMock.class)) { @Override public String type() { - return transform.type(); + return executable.type(); } @Override - public Transform parse(XContentParser parser) throws IOException { + public Transform parseTransform(String watchId, XContentParser parser) throws IOException { parser.nextToken(); assertThat(parser.currentToken(), is(XContentParser.Token.END_OBJECT)); - return transform; + return null; } @Override - public Transform.Result parseResult(XContentParser parser) throws IOException { - return null; // should not be called when this ctor is used + public Transform.Result parseResult(String watchId, XContentParser parser) throws IOException { + return null; + } + + @Override + public ExecutableTransform createExecutable(Transform transform) { + return executable; } })); } public TransformRegistryMock(final Transform.Result result) { - super(ImmutableMap.of("_transform_type", new Transform.Parser() { + super(ImmutableMap.of("_transform_type", new TransformFactory(Loggers.getLogger(TransformRegistryMock.class)) { @Override public String type() { return result.type(); } @Override - public Transform parse(XContentParser parser) throws IOException { - return null; // should not be called when this ctor is used. + public Transform parseTransform(String watchId, XContentParser parser) throws IOException { + return null; } @Override - public Transform.Result parseResult(XContentParser parser) throws IOException { + public ExecutableTransform createExecutable(Transform transform) { + return null; + } + + @Override + public Transform.Result parseResult(String watchId, XContentParser parser) throws IOException { assertThat(parser.currentToken(), is(XContentParser.Token.START_OBJECT)); parser.nextToken(); assertThat(parser.currentToken(), is(XContentParser.Token.FIELD_NAME)); diff --git a/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java b/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java index 29f4d3b58e4..bef4929b0e0 100644 --- a/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java +++ b/src/test/java/org/elasticsearch/watcher/execution/ExecutionServiceTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.watcher.input.Input; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.support.clock.ClockMock; import org.elasticsearch.watcher.throttle.Throttler; +import org.elasticsearch.watcher.transform.ExecutableTransform; import org.elasticsearch.watcher.transform.Transform; import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.watcher.watch.*; @@ -77,8 +78,8 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult); Throttler throttler = mock(Throttler.class); when(throttler.throttle(any(WatchExecutionContext.class))).thenReturn(throttleResult); - Transform transform = mock(Transform.class); - when(transform.apply(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult); + ExecutableTransform transform = mock(ExecutableTransform.class); + when(transform.execute(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult); ActionWrapper action = mock(ActionWrapper.class); when(action.execute(any(WatchExecutionContext.class))).thenReturn(watchActionResult); ExecutableActions actions = new ExecutableActions(Arrays.asList(action)); @@ -104,7 +105,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { verify(condition, times(1)).execute(any(WatchExecutionContext.class)); verify(throttler, times(1)).throttle(any(WatchExecutionContext.class)); - verify(transform, times(1)).apply(any(WatchExecutionContext.class), same(payload)); + verify(transform, times(1)).execute(any(WatchExecutionContext.class), same(payload)); verify(action, times(1)).execute(any(WatchExecutionContext.class)); } @@ -123,8 +124,8 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult); Throttler throttler = mock(Throttler.class); when(throttler.throttle(any(WatchExecutionContext.class))).thenReturn(throttleResult); - Transform transform = mock(Transform.class); - when(transform.apply(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult); + ExecutableTransform transform = mock(ExecutableTransform.class); + when(transform.execute(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult); ActionWrapper action = mock(ActionWrapper.class); when(action.execute(any(WatchExecutionContext.class))).thenReturn(actionResult); ExecutableActions actions = new ExecutableActions(Arrays.asList(action)); @@ -151,7 +152,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { verify(condition, times(1)).execute(any(WatchExecutionContext.class)); verify(throttler, times(1)).throttle(any(WatchExecutionContext.class)); - verify(transform, never()).apply(any(WatchExecutionContext.class), same(payload)); + verify(transform, never()).execute(any(WatchExecutionContext.class), same(payload)); verify(action, never()).execute(any(WatchExecutionContext.class)); } @@ -169,8 +170,8 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { when(condition.execute(any(WatchExecutionContext.class))).thenReturn(conditionResult); Throttler throttler = mock(Throttler.class); when(throttler.throttle(any(WatchExecutionContext.class))).thenReturn(throttleResult); - Transform transform = mock(Transform.class); - when(transform.apply(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult); + ExecutableTransform transform = mock(ExecutableTransform.class); + when(transform.execute(any(WatchExecutionContext.class), same(payload))).thenReturn(transformResult); ActionWrapper action = mock(ActionWrapper.class); when(action.execute(any(WatchExecutionContext.class))).thenReturn(actionResult); ExecutableActions actions = new ExecutableActions(Arrays.asList(action)); @@ -197,7 +198,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase { verify(condition, times(1)).execute(any(WatchExecutionContext.class)); verify(throttler, never()).throttle(any(WatchExecutionContext.class)); - verify(transform, never()).apply(any(WatchExecutionContext.class), same(payload)); + verify(transform, never()).execute(any(WatchExecutionContext.class), same(payload)); verify(action, never()).execute(any(WatchExecutionContext.class)); } diff --git a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java index 4aea05d59f3..132707d4156 100644 --- a/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java +++ b/src/test/java/org/elasticsearch/watcher/test/WatcherTestUtils.java @@ -44,7 +44,8 @@ import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.support.template.xmustache.XMustacheTemplateEngine; import org.elasticsearch.watcher.support.template.Template; import org.elasticsearch.watcher.support.template.TemplateEngine; -import org.elasticsearch.watcher.transform.SearchTransform; +import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform; +import org.elasticsearch.watcher.transform.search.SearchTransform; import org.elasticsearch.watcher.trigger.TriggerEvent; import org.elasticsearch.watcher.trigger.schedule.CronSchedule; import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger; @@ -131,7 +132,7 @@ public final class WatcherTestUtils { SearchRequest conditionRequest = newInputSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery())); SearchRequest transformRequest = newInputSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery())); - transformRequest.searchType(SearchTransform.DEFAULT_SEARCH_TYPE); + transformRequest.searchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE); conditionRequest.searchType(ExecutableSearchInput.DEFAULT_SEARCH_TYPE); List actions = new ArrayList<>(); @@ -181,7 +182,7 @@ public final class WatcherTestUtils { new ScheduleTrigger(new CronSchedule("0/5 * * * * ? *")), new ExecutableSimpleInput(new SimpleInput(new Payload.Simple(inputData)), logger), new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService), - new SearchTransform(logger, scriptService, client, transformRequest), + new ExecutableSearchTransform(new SearchTransform(transformRequest), logger, scriptService, client), new ExecutableActions(actions), metadata, new TimeValue(0), diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java index ceb82ca20c0..408378fa7e7 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/BootStrapTests.java @@ -28,7 +28,8 @@ import org.elasticsearch.watcher.support.clock.SystemClock; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.test.WatcherTestUtils; -import org.elasticsearch.watcher.transform.SearchTransform; +import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform; +import org.elasticsearch.watcher.transform.search.SearchTransform; import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.watcher.trigger.schedule.CronSchedule; @@ -91,7 +92,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set this into the future so we don't get any extra runs new ExecutableSearchInput(new SearchInput(searchRequest, null), logger, scriptService(), ClientProxy.of(client())), new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService()), - new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest), + new ExecutableSearchTransform(new SearchTransform(searchRequest), logger, scriptService(), ClientProxy.of(client())), new ExecutableActions(new ArrayList()), null, // metadata new TimeValue(0), @@ -153,7 +154,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTests { new ScheduleTrigger(new CronSchedule("0/5 * * * * ? 2035")), //Set a cron schedule far into the future so this watch is never scheduled new ExecutableSearchInput(new SearchInput(searchRequest, null), logger, scriptService(), ClientProxy.of(client())), new ExecutableScriptCondition(new ScriptCondition(new Script("return true")), logger, scriptService()), - new SearchTransform(logger, scriptService(), ClientProxy.of(client()), searchRequest), + new ExecutableSearchTransform(new SearchTransform(searchRequest), logger, scriptService(), ClientProxy.of(client())), new ExecutableActions(new ArrayList()), null, // metadata new TimeValue(0), diff --git a/src/test/java/org/elasticsearch/watcher/test/integration/TransformSearchTests.java b/src/test/java/org/elasticsearch/watcher/test/integration/TransformSearchTests.java index 63769a54999..12954866ead 100644 --- a/src/test/java/org/elasticsearch/watcher/test/integration/TransformSearchTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/integration/TransformSearchTests.java @@ -9,7 +9,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; import org.elasticsearch.watcher.test.WatcherTestUtils; -import org.elasticsearch.watcher.transform.SearchTransform; +import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform; import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; @@ -43,7 +43,7 @@ public class TransformSearchTests extends AbstractWatcherIntegrationTests { SearchRequest inputRequest = WatcherTestUtils.newInputSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery())); SearchRequest transformRequest = WatcherTestUtils.newInputSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery())); - transformRequest.searchType(SearchTransform.DEFAULT_SEARCH_TYPE); + transformRequest.searchType(ExecutableSearchTransform.DEFAULT_SEARCH_TYPE); Map metadata = new HashMap<>(); metadata.put("foo", "bar"); diff --git a/src/test/java/org/elasticsearch/watcher/transform/ChainTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/ChainTransformTests.java index b75d245ad55..149c67fd057 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/ChainTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/ChainTransformTests.java @@ -7,12 +7,17 @@ package org.elasticsearch.watcher.transform; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.ElasticsearchTestCase; -import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.transform.chain.ChainTransform; +import org.elasticsearch.watcher.transform.chain.ChainTransformFactory; +import org.elasticsearch.watcher.transform.chain.ExecutableChainTransform; +import org.elasticsearch.watcher.watch.Payload; import org.junit.Test; import java.io.IOException; @@ -33,14 +38,19 @@ public class ChainTransformTests extends ElasticsearchTestCase { @Test public void testApply() throws Exception { ChainTransform transform = new ChainTransform(ImmutableList.of( - new NamedTransform("name1"), - new NamedTransform("name2"), - new NamedTransform("name3"))); + new NamedExecutableTransform.Transform("name1"), + new NamedExecutableTransform.Transform("name2"), + new NamedExecutableTransform.Transform("name3") + )); + ExecutableChainTransform executable = new ExecutableChainTransform(transform, logger, ImmutableList.of( + new NamedExecutableTransform("name1"), + new NamedExecutableTransform("name2"), + new NamedExecutableTransform("name3"))); WatchExecutionContext ctx = mock(WatchExecutionContext.class); Payload payload = new Payload.Simple(new HashMap()); - Transform.Result result = transform.apply(ctx, payload); + Transform.Result result = executable.execute(ctx, payload); Map data = result.payload().data(); assertThat(data, notNullValue()); @@ -53,12 +63,12 @@ public class ChainTransformTests extends ElasticsearchTestCase { @Test public void testParser() throws Exception { - Map parsers = ImmutableMap.builder() - .put("named", new NamedTransform.Parser()) + Map factories = ImmutableMap.builder() + .put("named", new NamedExecutableTransform.Factory(logger)) .build(); - TransformRegistry registry = new TransformRegistry(parsers); + TransformRegistry registry = new TransformRegistry(factories); - ChainTransform.Parser transformParser = new ChainTransform.Parser(registry); + ChainTransformFactory transformParser = new ChainTransformFactory(registry); XContentBuilder builder = jsonBuilder().startArray() .startObject().startObject("named").field("name", "name1").endObject().endObject() @@ -68,45 +78,57 @@ public class ChainTransformTests extends ElasticsearchTestCase { XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); - ChainTransform transform = transformParser.parse(parser); - assertThat(transform, notNullValue()); - assertThat(transform.transforms(), notNullValue()); - assertThat(transform.transforms(), hasSize(3)); - for (int i = 0; i < transform.transforms().size(); i++) { - assertThat(transform.transforms().get(i), instanceOf(NamedTransform.class)); - assertThat(((NamedTransform) transform.transforms().get(i)).name, is("name" + (i + 1))); + ExecutableChainTransform executable = transformParser.parseExecutable("_id", parser); + assertThat(executable, notNullValue()); + assertThat(executable.transform.getTransforms(), notNullValue()); + assertThat(executable.transform.getTransforms(), hasSize(3)); + for (int i = 0; i < executable.transform.getTransforms().size(); i++) { + assertThat(executable.executableTransforms().get(i), instanceOf(NamedExecutableTransform.class)); + assertThat(((NamedExecutableTransform) executable.executableTransforms().get(i)).transform().name, is("name" + (i + 1))); } } - private static class NamedTransform extends Transform { + private static class NamedExecutableTransform extends ExecutableTransform { - private final String name; + private static final String TYPE = "named"; - public NamedTransform(String name) { - this.name = name; + public NamedExecutableTransform(String name) { + this(new Transform(name)); + } + + public NamedExecutableTransform(Transform transform) { + super(transform, Loggers.getLogger(NamedExecutableTransform.class)); } @Override - public String type() { - return "noop"; - } - - @Override - public Result apply(WatchExecutionContext ctx, Payload payload) throws IOException { - + public Result execute(WatchExecutionContext ctx, Payload payload) throws IOException { Map data = new HashMap<>(payload.data()); List names = (List) data.get("names"); if (names == null) { names = new ArrayList<>(); data.put("names", names); } - names.add(name); + names.add(transform.name); return new Result("named", new Payload.Simple(data)); } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject().field("name", name).endObject(); + public static class Transform implements org.elasticsearch.watcher.transform.Transform { + + private final String name; + + public Transform(String name) { + this.name = name; + } + + @Override + public String type() { + return TYPE; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field("name", name).endObject(); + } } public static class Result extends Transform.Result { @@ -121,15 +143,19 @@ public class ChainTransformTests extends ElasticsearchTestCase { } } - public static class Parser implements Transform.Parser { + public static class Factory extends TransformFactory { - @Override - public String type() { - return "named"; + public Factory(ESLogger transformLogger) { + super(transformLogger); } @Override - public NamedTransform parse(XContentParser parser) throws IOException { + public String type() { + return TYPE; + } + + @Override + public Transform parseTransform(String watchId, XContentParser parser) throws IOException { assert parser.currentToken() == XContentParser.Token.START_OBJECT; XContentParser.Token token = parser.nextToken(); assert token == XContentParser.Token.FIELD_NAME; // the "name" field @@ -138,11 +164,11 @@ public class ChainTransformTests extends ElasticsearchTestCase { String name = parser.text(); token = parser.nextToken(); assert token == XContentParser.Token.END_OBJECT; - return new NamedTransform(name); + return new Transform(name); } @Override - public Result parseResult(XContentParser parser) throws IOException { + public Result parseResult(String watchId, XContentParser parser) throws IOException { assert parser.currentToken() == XContentParser.Token.START_OBJECT; XContentParser.Token token = parser.nextToken(); assert token == XContentParser.Token.FIELD_NAME; // the "payload" field @@ -153,6 +179,11 @@ public class ChainTransformTests extends ElasticsearchTestCase { assert token == XContentParser.Token.END_OBJECT; return new Result("named", payload); } + + @Override + public NamedExecutableTransform createExecutable(Transform transform) { + return new NamedExecutableTransform(transform); + } } } diff --git a/src/test/java/org/elasticsearch/watcher/transform/ScriptTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/ScriptTransformTests.java index 713d973eb31..7bd10b73808 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/ScriptTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/ScriptTransformTests.java @@ -7,7 +7,11 @@ package org.elasticsearch.watcher.transform; import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableList; import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableSet; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.watcher.execution.WatchExecutionContext; +import org.elasticsearch.watcher.transform.script.ExecutableScriptTransform; +import org.elasticsearch.watcher.transform.script.ScriptTransform; +import org.elasticsearch.watcher.transform.script.ScriptTransformFactory; import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.support.Script; import org.elasticsearch.watcher.support.Variables; @@ -41,7 +45,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase { ScriptService.ScriptType type = randomFrom(ScriptService.ScriptType.values()); Map params = Collections.emptyMap(); Script script = new Script("_script", type, "_lang", params); - ScriptTransform transform = new ScriptTransform(service, script); + ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service); WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); @@ -57,7 +61,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase { when(executable.run()).thenReturn(transformed); when(service.executable("_lang", "_script", type, model)).thenReturn(executable); - Transform.Result result = transform.apply(ctx, payload); + Transform.Result result = transform.execute(ctx, payload); assertThat(result, notNullValue()); assertThat(result.type(), is(ScriptTransform.TYPE)); assertThat(result.payload().data(), equalTo(transformed)); @@ -69,7 +73,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase { ScriptService.ScriptType type = randomFrom(ScriptService.ScriptType.values()); Map params = Collections.emptyMap(); Script script = new Script("_script", type, "_lang", params); - ScriptTransform transform = new ScriptTransform(service, script); + ExecutableScriptTransform transform = new ExecutableScriptTransform(new ScriptTransform(script), logger, service); WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); @@ -82,7 +86,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase { when(executable.run()).thenReturn(value); when(service.executable("_lang", "_script", type, model)).thenReturn(executable); - Transform.Result result = transform.apply(ctx, payload); + Transform.Result result = transform.execute(ctx, payload); assertThat(result, notNullValue()); assertThat(result.type(), is(ScriptTransform.TYPE)); assertThat(result.payload().data().size(), is(1)); @@ -102,8 +106,8 @@ public class ScriptTransformTests extends ElasticsearchTestCase { XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); - ScriptTransform transform = new ScriptTransform.Parser(service).parse(parser); - assertThat(transform.script(), equalTo(new Script("_script", type, "_lang", ImmutableMap.builder().put("key", "value").build()))); + ExecutableScriptTransform transform = new ScriptTransformFactory(ImmutableSettings.EMPTY, service).parseExecutable("_id", parser); + assertThat(transform.transform().getScript(), equalTo(new Script("_script", type, "_lang", ImmutableMap.builder().put("key", "value").build()))); } @Test @@ -113,7 +117,7 @@ public class ScriptTransformTests extends ElasticsearchTestCase { XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); - ScriptTransform transform = new ScriptTransform.Parser(service).parse(parser); - assertThat(transform.script(), equalTo(new Script("_script", ScriptService.ScriptType.INLINE, ScriptService.DEFAULT_LANG, ImmutableMap.of()))); + ExecutableScriptTransform transform = new ScriptTransformFactory(ImmutableSettings.EMPTY, service).parseExecutable("_id", parser); + assertThat(transform.transform().getScript(), equalTo(new Script("_script", ScriptService.ScriptType.INLINE, ScriptService.DEFAULT_LANG, ImmutableMap.of()))); } } diff --git a/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java b/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java index 32b09165142..c53ce8dcb8c 100644 --- a/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java +++ b/src/test/java/org/elasticsearch/watcher/transform/SearchTransformTests.java @@ -17,6 +17,9 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.script.ScriptService; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests; +import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform; +import org.elasticsearch.watcher.transform.search.SearchTransform; +import org.elasticsearch.watcher.transform.search.SearchTransformFactory; import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent; import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.execution.WatchExecutionContext; @@ -51,11 +54,11 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests { .startObject("match_all").endObject() .endObject() .endObject()); - SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request); + ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client())); WatchExecutionContext ctx = mockExecutionContext("_name", EMPTY_PAYLOAD); - Transform.Result result = transform.apply(ctx, EMPTY_PAYLOAD); + Transform.Result result = transform.execute(ctx, EMPTY_PAYLOAD); assertThat(result, notNullValue()); assertThat(result.type(), is(SearchTransform.TYPE)); @@ -108,14 +111,14 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests { .must(rangeFilter("date").lt("{{ctx.execution_time}}")) .must(termFilter("value", "{{ctx.payload.value}}"))))); - SearchTransform transform = new SearchTransform(logger, scriptService(), ClientProxy.of(client()), request); + ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client())); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00"), parseDate("2015-01-01T00:00:00")); WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00"), event, EMPTY_PAYLOAD); Payload payload = simplePayload("value", "val_3"); - Transform.Result result = transform.apply(ctx, payload); + Transform.Result result = transform.execute(ctx, payload); assertThat(result, notNullValue()); assertThat(result.type(), is(SearchTransform.TYPE)); @@ -175,23 +178,23 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests { builder.endObject(); XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes()); parser.nextToken(); - SearchTransform transform = new SearchTransform.Parser(ImmutableSettings.EMPTY, scriptService(), ClientProxy.of(client())).parse(parser); - assertThat(transform, notNullValue()); - assertThat(transform.type(), is(SearchTransform.TYPE)); - assertThat(transform.request, notNullValue()); + ExecutableSearchTransform executable = new SearchTransformFactory(ImmutableSettings.EMPTY, scriptService(), ClientProxy.of(client())).parseExecutable("_id", parser); + assertThat(executable, notNullValue()); + assertThat(executable.type(), is(SearchTransform.TYPE)); + assertThat(executable.transform().getRequest(), notNullValue()); if (indices != null) { - assertThat(transform.request.indices(), arrayContainingInAnyOrder(indices)); + assertThat(executable.transform().getRequest().indices(), arrayContainingInAnyOrder(indices)); } if (searchType != null) { - assertThat(transform.request.searchType(), is(searchType)); + assertThat(executable.transform().getRequest().searchType(), is(searchType)); } if (templateName != null) { - assertThat(transform.request.templateName(), equalTo(templateName)); + assertThat(executable.transform().getRequest().templateName(), equalTo(templateName)); } if (templateType != null) { - assertThat(transform.request.templateType(), equalTo(templateType)); + assertThat(executable.transform().getRequest().templateType(), equalTo(templateType)); } - assertThat(transform.request.source().toBytes(), equalTo(source.toBytes())); + assertThat(executable.transform().getRequest().source().toBytes(), equalTo(source.toBytes())); } private static Map doc(String date, String value) { diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java index 193b27c2eb0..b465c47b300 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchTests.java @@ -68,7 +68,18 @@ import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.watcher.support.template.Template; import org.elasticsearch.watcher.support.template.TemplateEngine; import org.elasticsearch.watcher.test.WatcherTestUtils; -import org.elasticsearch.watcher.transform.*; +import org.elasticsearch.watcher.transform.ExecutableTransform; +import org.elasticsearch.watcher.transform.TransformFactory; +import org.elasticsearch.watcher.transform.TransformRegistry; +import org.elasticsearch.watcher.transform.chain.ChainTransform; +import org.elasticsearch.watcher.transform.chain.ChainTransformFactory; +import org.elasticsearch.watcher.transform.chain.ExecutableChainTransform; +import org.elasticsearch.watcher.transform.script.ExecutableScriptTransform; +import org.elasticsearch.watcher.transform.script.ScriptTransform; +import org.elasticsearch.watcher.transform.script.ScriptTransformFactory; +import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform; +import org.elasticsearch.watcher.transform.search.SearchTransform; +import org.elasticsearch.watcher.transform.search.SearchTransformFactory; import org.elasticsearch.watcher.trigger.Trigger; import org.elasticsearch.watcher.trigger.TriggerEngine; import org.elasticsearch.watcher.trigger.TriggerService; @@ -124,7 +135,7 @@ public class WatchTests extends ElasticsearchTestCase { ExecutableCondition condition = randomCondition(); ConditionRegistry conditionRegistry = registry(condition); - Transform transform = randomTransform(); + ExecutableTransform transform = randomTransform(); ExecutableActions actions = randomActions(); ActionRegistry actionRegistry = registry(actions, transformRegistry); @@ -252,27 +263,30 @@ public class WatchTests extends ElasticsearchTestCase { } } - private Transform randomTransform() { + private ExecutableTransform randomTransform() { String type = randomFrom(ScriptTransform.TYPE, SearchTransform.TYPE, ChainTransform.TYPE); switch (type) { case ScriptTransform.TYPE: - return new ScriptTransform(scriptService, new Script("_script")); + return new ExecutableScriptTransform(new ScriptTransform(new Script("_script")), logger, scriptService); case SearchTransform.TYPE: - return new SearchTransform(logger, scriptService, client, matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)); + return new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, scriptService, client); default: // chain - return new ChainTransform(ImmutableList.of( - new SearchTransform(logger, scriptService, client, matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), - new ScriptTransform(scriptService, new Script("_script")))); + ChainTransform chainTransform = new ChainTransform(ImmutableList.of( + new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), + new ScriptTransform(new Script("_script")))); + return new ExecutableChainTransform(chainTransform, logger, ImmutableList.of( + new ExecutableSearchTransform(new SearchTransform(matchAllRequest(WatcherUtils.DEFAULT_INDICES_OPTIONS)), logger, scriptService, client), + new ExecutableScriptTransform(new ScriptTransform(new Script("_script")), logger, scriptService))); } } private TransformRegistry transformRegistry() { - ImmutableMap.Builder parsers = ImmutableMap.builder(); - ChainTransform.Parser parser = new ChainTransform.Parser(); - parsers.put(ChainTransform.TYPE, parser); - parsers.put(ScriptTransform.TYPE, new ScriptTransform.Parser(scriptService)); - parsers.put(SearchTransform.TYPE, new SearchTransform.Parser(settings, scriptService, client)); - TransformRegistry registry = new TransformRegistry(parsers.build()); + ImmutableMap.Builder factories = ImmutableMap.builder(); + ChainTransformFactory parser = new ChainTransformFactory(); + factories.put(ChainTransform.TYPE, parser); + factories.put(ScriptTransform.TYPE, new ScriptTransformFactory(settings, scriptService)); + factories.put(SearchTransform.TYPE, new SearchTransformFactory(settings, scriptService, client)); + TransformRegistry registry = new TransformRegistry(factories.build()); parser.init(registry); return registry; } @@ -280,7 +294,7 @@ public class WatchTests extends ElasticsearchTestCase { private ExecutableActions randomActions() { ImmutableList.Builder list = ImmutableList.builder(); if (randomBoolean()) { - Transform transform = randomTransform(); + ExecutableTransform transform = randomTransform(); EmailAction action = new EmailAction(EmailTemplate.builder().build(), null, null, Profile.STANDARD, randomBoolean()); list.add(new ActionWrapper("_email_" + randomAsciiOfLength(8), transform, new ExecutableEmailAction(action, logger, emailService, templateEngine))); }