Disabled mapping for transform results

Transform results may product different payloads per watch/execution. It means that if the resulted transformed payload is mapped and indexed, there's a high chance for mapping conflicts and thus failures.

For this reason, this commit disables the mapping of all `transform` results (on both the watch and the action levels).

This commit also changes the field name of the transform result from `transform_result` to just `transform` (aligned with other result field names - `input`, `condition` and `actions`)

 Fixes elastic/elasticsearch#472

Original commit: elastic/x-pack-elasticsearch@2c6d4f5182
This commit is contained in:
uboness 2015-05-13 20:18:31 +02:00
parent aba3f8f34c
commit 926e39d21e
5 changed files with 122 additions and 5 deletions

View File

@ -182,7 +182,7 @@ public class ActionWrapper implements ToXContent {
builder.startObject();
builder.field(Field.ID.getPreferredName(), id);
if (transform != null) {
builder.startObject(Transform.Field.TRANSFORM_RESULT.getPreferredName())
builder.startObject(Transform.Field.TRANSFORM.getPreferredName())
.field(transform.type(), transform, params)
.endObject();
}

View File

@ -97,7 +97,7 @@ public class WatchExecutionResult implements ToXContent {
}
}
if (transformResult != null) {
builder.startObject(Transform.Field.TRANSFORM_RESULT.getPreferredName())
builder.startObject(Transform.Field.TRANSFORM.getPreferredName())
.field(transformResult.type(), transformResult, params)
.endObject();
}
@ -148,7 +148,7 @@ public class WatchExecutionResult implements ToXContent {
inputResult = inputRegistry.parseResult(wid.watchId(), parser);
} else if (Field.CONDITION.match(currentFieldName)) {
conditionResult = conditionRegistry.parseResult(wid.watchId(), parser);
} else if (Transform.Field.TRANSFORM_RESULT.match(currentFieldName)) {
} else if (Transform.Field.TRANSFORM.match(currentFieldName)) {
transformResult = transformRegistry.parseResult(wid.watchId(), parser);
} else {
throw new WatcherException("could not parse watch execution [{}]. unexpected field [{}]", wid, currentFieldName);

View File

@ -57,6 +57,5 @@ public interface Transform extends ToXContent {
interface Field {
ParseField PAYLOAD = new ParseField("payload");
ParseField TRANSFORM = new ParseField("transform");
ParseField TRANSFORM_RESULT = new ParseField("transform_result");
}
}

View File

@ -10,7 +10,7 @@
"dynamic_templates": [
{
"disabled_fields": {
"path_match": "execution_result\\.input\\..+\\.payload",
"path_match": "execution_result\\.((input\\..+)|(actions\\.transform))\\.payload",
"match_pattern": "regex",
"mapping": {
"type": "object",
@ -118,6 +118,10 @@
}
}
},
"transform" : {
"type" : "object",
"enabled" : false
},
"actions": {
"type": "nested",
"include_in_parent": true,
@ -202,6 +206,10 @@
}
}
}
},
"transform" : {
"type" : "object",
"enabled" : false
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.test.integration;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.watcher.actions.ActionBuilders.loggingAction;
import static org.elasticsearch.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.watcher.condition.ConditionBuilders.alwaysCondition;
import static org.elasticsearch.watcher.input.InputBuilders.simpleInput;
import static org.elasticsearch.watcher.transform.TransformBuilders.scriptTransform;
import static org.elasticsearch.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
/**
* This test makes sure that the http host and path fields in the watch_record action result are
* not analyzed so they can be used in aggregations
*/
public class HistoryTemplateTransformMappingsTests extends AbstractWatcherIntegrationTests {
@Override
protected boolean timeWarped() {
return true; // just to have better control over the triggers
}
@Override
protected boolean enableShield() {
return false; // remove shield noise from this test
}
@Test
public void testTransformFields() throws Exception {
String index = "the-index";
String type = "the-type";
createIndex(index);
index(index, type, "{}");
flush();
refresh();
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("_id1").setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(simpleInput())
.condition(alwaysCondition())
.transform(scriptTransform("return [ 'key' : 'value1' ];"))
.addAction("logger", scriptTransform("return [ 'key' : 'value2' ];"), loggingAction("indexed")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
timeWarp().scheduler().trigger("_id1");
// adding another watch which with a transform that should conflict with the preview watch. Since the
// mapping for the transform construct is disabled, there should be nor problems.
putWatchResponse = watcherClient().preparePutWatch("_id2").setSource(watchBuilder()
.trigger(schedule(interval("5s")))
.input(simpleInput())
.condition(alwaysCondition())
.transform(scriptTransform("return [ 'key' : [ 'key1' : 'value1' ] ];"))
.addAction("logger", scriptTransform("return [ 'key' : [ 'key1' : 'value2' ] ];"), loggingAction("indexed")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
timeWarp().scheduler().trigger("_id2");
flush();
refresh();
assertWatchWithMinimumActionsCount("_id1", WatchRecord.State.EXECUTED, 1);
assertWatchWithMinimumActionsCount("_id2", WatchRecord.State.EXECUTED, 1);
refresh();
assertBusy(new Runnable() {
@Override
public void run() {
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings().get();
assertThat(mappingsResponse, notNullValue());
assertThat(mappingsResponse.getMappings().isEmpty(), is(false));
for (ObjectObjectCursor<String, ImmutableOpenMap<String, MappingMetaData>> metadatas : mappingsResponse.getMappings()) {
if (!metadatas.key.startsWith(".watch_history")) {
continue;
}
MappingMetaData metadata = metadatas.value.get("watch_record");
assertThat(metadata, notNullValue());
try {
Map<String, Object> source = metadata.getSourceAsMap();
logger.info("checking index [{}] with metadata:\n[{}]", metadatas.key, metadata.source().toString());
assertThat(extractValue("properties.execution_result.properties.transform.enabled", source), is((Object) false));
assertThat(extractValue("properties.execution_result.properties.actions.properties.transform.enabled", source), is((Object) false));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
});
}
}