[transform] added a new chain transform
Enables chaining multiple transforms Original commit: elastic/x-pack-elasticsearch@312b7330df
This commit is contained in:
parent
46f6572756
commit
df491d036f
|
@ -7,6 +7,7 @@ package org.elasticsearch.alerts.support.init;
|
||||||
|
|
||||||
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
|
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
|
||||||
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
|
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
|
||||||
|
import org.elasticsearch.alerts.transform.ChainTransform;
|
||||||
import org.elasticsearch.common.inject.AbstractModule;
|
import org.elasticsearch.common.inject.AbstractModule;
|
||||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||||
|
|
||||||
|
@ -24,6 +25,7 @@ public class InitializingModule extends AbstractModule {
|
||||||
Multibinder<InitializingService.Initializable> mbinder = Multibinder.newSetBinder(binder(), InitializingService.Initializable.class);
|
Multibinder<InitializingService.Initializable> mbinder = Multibinder.newSetBinder(binder(), InitializingService.Initializable.class);
|
||||||
mbinder.addBinding().to(ClientProxy.class);
|
mbinder.addBinding().to(ClientProxy.class);
|
||||||
mbinder.addBinding().to(ScriptServiceProxy.class);
|
mbinder.addBinding().to(ScriptServiceProxy.class);
|
||||||
|
mbinder.addBinding().to(ChainTransform.Parser.class);
|
||||||
bind(InitializingService.class).asEagerSingleton();
|
bind(InitializingService.class).asEagerSingleton();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,113 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.alerts.transform;
|
||||||
|
|
||||||
|
import org.elasticsearch.alerts.AlertsSettingsException;
|
||||||
|
import org.elasticsearch.alerts.ExecutionContext;
|
||||||
|
import org.elasticsearch.alerts.Payload;
|
||||||
|
import org.elasticsearch.alerts.support.init.InitializingService;
|
||||||
|
import org.elasticsearch.common.collect.ImmutableList;
|
||||||
|
import org.elasticsearch.common.inject.Injector;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class ChainTransform extends Transform {
|
||||||
|
|
||||||
|
public static final String TYPE = "chain";
|
||||||
|
|
||||||
|
private final ImmutableList<Transform> transforms;
|
||||||
|
|
||||||
|
public ChainTransform(ImmutableList<Transform> transforms) {
|
||||||
|
this.transforms = transforms;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type() {
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
ImmutableList<Transform> transforms() {
|
||||||
|
return transforms;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result apply(ExecutionContext ctx, Payload payload) throws IOException {
|
||||||
|
for (Transform transform : transforms) {
|
||||||
|
payload = transform.apply(ctx, payload).payload();
|
||||||
|
}
|
||||||
|
return new Result(TYPE, payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
builder.startArray();
|
||||||
|
for (Transform transform : transforms) {
|
||||||
|
builder.startObject()
|
||||||
|
.field(transform.type(), transform)
|
||||||
|
.endObject();
|
||||||
|
}
|
||||||
|
return builder.endArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Parser implements Transform.Parser<ChainTransform>, InitializingService.Initializable {
|
||||||
|
|
||||||
|
private TransformRegistry registry;
|
||||||
|
|
||||||
|
// used by guice
|
||||||
|
public Parser() {
|
||||||
|
}
|
||||||
|
|
||||||
|
// used for tests
|
||||||
|
Parser(TransformRegistry registry) {
|
||||||
|
this.registry = registry;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Injector injector) {
|
||||||
|
this.registry = injector.getInstance(TransformRegistry.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type() {
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChainTransform parse(XContentParser parser) throws IOException {
|
||||||
|
XContentParser.Token token = parser.currentToken();
|
||||||
|
if (token != XContentParser.Token.START_ARRAY) {
|
||||||
|
throw new AlertsSettingsException("could not parse [chain] transform. expected an array of objects, but found [" + token + '}');
|
||||||
|
}
|
||||||
|
|
||||||
|
ImmutableList.Builder<Transform> builder = ImmutableList.builder();
|
||||||
|
|
||||||
|
String currentFieldName = null;
|
||||||
|
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
|
||||||
|
if (token != XContentParser.Token.START_OBJECT) {
|
||||||
|
throw new AlertsSettingsException("could not parse [chain] transform. expected a transform object, but found [" + token + "]");
|
||||||
|
}
|
||||||
|
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||||
|
if (token == XContentParser.Token.FIELD_NAME) {
|
||||||
|
currentFieldName = parser.currentName();
|
||||||
|
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||||
|
builder.add(registry.parse(currentFieldName, parser));
|
||||||
|
} else {
|
||||||
|
throw new AlertsSettingsException("could not parse [chain] transform. expected a transform object, but found [" + token + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new ChainTransform(builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,134 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.alerts.transform;
|
||||||
|
|
||||||
|
import org.elasticsearch.alerts.ExecutionContext;
|
||||||
|
import org.elasticsearch.alerts.Payload;
|
||||||
|
import org.elasticsearch.common.collect.ImmutableList;
|
||||||
|
import org.elasticsearch.common.collect.ImmutableMap;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||||
|
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
|
import static org.hamcrest.Matchers.*;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class ChainTransformTests extends ElasticsearchTestCase {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApply() throws Exception {
|
||||||
|
ChainTransform transform = new ChainTransform(ImmutableList.<Transform>of(
|
||||||
|
new NamedTransform("name1"),
|
||||||
|
new NamedTransform("name2"),
|
||||||
|
new NamedTransform("name3")));
|
||||||
|
|
||||||
|
ExecutionContext ctx = mock(ExecutionContext.class);
|
||||||
|
Payload payload = new Payload.Simple(new HashMap<String, Object>());
|
||||||
|
|
||||||
|
Transform.Result result = transform.apply(ctx, payload);
|
||||||
|
|
||||||
|
Map<String, Object> data = result.payload().data();
|
||||||
|
assertThat(data, notNullValue());
|
||||||
|
assertThat(data, hasKey("names"));
|
||||||
|
assertThat(data.get("names"), instanceOf(List.class));
|
||||||
|
List<String> names = (List<String>) data.get("names");
|
||||||
|
assertThat(names, hasSize(3));
|
||||||
|
assertThat(names, contains("name1", "name2", "name3"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParser() throws Exception {
|
||||||
|
Map<String, Transform.Parser> parsers = ImmutableMap.<String, Transform.Parser>builder()
|
||||||
|
.put("named", new NamedTransform.Parser())
|
||||||
|
.build();
|
||||||
|
TransformRegistry registry = new TransformRegistry(parsers);
|
||||||
|
|
||||||
|
ChainTransform.Parser transformParser = new ChainTransform.Parser(registry);
|
||||||
|
|
||||||
|
XContentBuilder builder = jsonBuilder().startArray()
|
||||||
|
.startObject().startObject("named").field("name", "name1").endObject().endObject()
|
||||||
|
.startObject().startObject("named").field("name", "name2").endObject().endObject()
|
||||||
|
.startObject().startObject("named").field("name", "name3").endObject().endObject()
|
||||||
|
.endArray();
|
||||||
|
|
||||||
|
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
|
||||||
|
parser.nextToken();
|
||||||
|
ChainTransform transform = transformParser.parse(parser);
|
||||||
|
assertThat(transform, notNullValue());
|
||||||
|
assertThat(transform.transforms(), notNullValue());
|
||||||
|
assertThat(transform.transforms(), hasSize(3));
|
||||||
|
for (int i = 0; i < transform.transforms().size(); i++) {
|
||||||
|
assertThat(transform.transforms().get(i), instanceOf(NamedTransform.class));
|
||||||
|
assertThat(((NamedTransform) transform.transforms().get(i)).name, is("name" + (i + 1)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class NamedTransform extends Transform {
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
|
||||||
|
public NamedTransform(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type() {
|
||||||
|
return "noop";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Result apply(ExecutionContext ctx, Payload payload) throws IOException {
|
||||||
|
|
||||||
|
Map<String, Object> data = new HashMap<>(payload.data());
|
||||||
|
List<String> names = (List<String>) data.get("names");
|
||||||
|
if (names == null) {
|
||||||
|
names = new ArrayList<>();
|
||||||
|
data.put("names", names);
|
||||||
|
}
|
||||||
|
names.add(name);
|
||||||
|
return new Result("named", new Payload.Simple(data));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
return builder.startObject().field("name", name).endObject();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Parser implements Transform.Parser<NamedTransform> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String type() {
|
||||||
|
return "named";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NamedTransform parse(XContentParser parser) throws IOException {
|
||||||
|
assert parser.currentToken() == XContentParser.Token.START_OBJECT;
|
||||||
|
XContentParser.Token token = parser.nextToken();
|
||||||
|
assert token == XContentParser.Token.FIELD_NAME; // the "name" field
|
||||||
|
token = parser.nextToken();
|
||||||
|
assert token == XContentParser.Token.VALUE_STRING;
|
||||||
|
String name = parser.text();
|
||||||
|
token = parser.nextToken();
|
||||||
|
assert token == XContentParser.Token.END_OBJECT;
|
||||||
|
return new NamedTransform(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue