INGEST: Make a few Processors callable by Painless (#32170)
* INGEST: Make a few Processors callable by Painless * Extracted a few stateless String processors as well as the json processor to static methods and whitelisted them in Painless * provide whitelist from processors plugin
This commit is contained in:
parent
ac63408655
commit
e21692e387
|
@ -20,11 +20,17 @@
|
|||
esplugin {
|
||||
description 'Module for ingest processors that do not require additional security permissions or have large dependencies and resources'
|
||||
classname 'org.elasticsearch.ingest.common.IngestCommonPlugin'
|
||||
extendedPlugins = ['lang-painless']
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compileOnly project(':modules:lang-painless')
|
||||
compile project(':libs:grok')
|
||||
}
|
||||
|
||||
compileJava.options.compilerArgs << "-Xlint:-unchecked,-rawtypes"
|
||||
compileTestJava.options.compilerArgs << "-Xlint:-unchecked,-rawtypes"
|
||||
|
||||
integTestCluster {
|
||||
module project(':modules:lang-painless')
|
||||
}
|
|
@ -35,9 +35,13 @@ public final class BytesProcessor extends AbstractStringProcessor {
|
|||
super(processorTag, field, ignoreMissing, targetField);
|
||||
}
|
||||
|
||||
public static long apply(String value) {
|
||||
return ByteSizeValue.parseBytesSizeValue(value, null, "Ingest Field").getBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long process(String value) {
|
||||
return ByteSizeValue.parseBytesSizeValue(value, null, getField()).getBytes();
|
||||
return apply(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -67,10 +67,8 @@ public final class JsonProcessor extends AbstractProcessor {
|
|||
return addToRoot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(IngestDocument document) throws Exception {
|
||||
Object fieldValue = document.getFieldValue(field, Object.class);
|
||||
BytesReference bytesRef = (fieldValue == null) ? new BytesArray("null") : new BytesArray(fieldValue.toString());
|
||||
public static Object apply(Object fieldValue) {
|
||||
BytesReference bytesRef = fieldValue == null ? new BytesArray("null") : new BytesArray(fieldValue.toString());
|
||||
try (InputStream stream = bytesRef.streamInput();
|
||||
XContentParser parser = JsonXContent.jsonXContent
|
||||
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
|
||||
|
@ -91,20 +89,32 @@ public final class JsonProcessor extends AbstractProcessor {
|
|||
} else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
|
||||
throw new IllegalArgumentException("cannot read binary value");
|
||||
}
|
||||
if (addToRoot && (value instanceof Map)) {
|
||||
for (Map.Entry<String, Object> entry : ((Map<String, Object>) value).entrySet()) {
|
||||
document.setFieldValue(entry.getKey(), entry.getValue());
|
||||
}
|
||||
} else if (addToRoot) {
|
||||
throw new IllegalArgumentException("cannot add non-map fields to root of document");
|
||||
} else {
|
||||
document.setFieldValue(targetField, value);
|
||||
}
|
||||
return value;
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void apply(Map<String, Object> ctx, String fieldName) {
|
||||
Object value = apply(ctx.get(fieldName));
|
||||
if (value instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> map = (Map<String, Object>) value;
|
||||
ctx.putAll(map);
|
||||
} else {
|
||||
throw new IllegalArgumentException("cannot add non-map fields to root of document");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(IngestDocument document) throws Exception {
|
||||
if (addToRoot) {
|
||||
apply(document.getSourceAndMetadata(), field);
|
||||
} else {
|
||||
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return TYPE;
|
||||
|
|
|
@ -35,9 +35,13 @@ public final class LowercaseProcessor extends AbstractStringProcessor {
|
|||
super(processorTag, field, ignoreMissing, targetField);
|
||||
}
|
||||
|
||||
public static String apply(String value) {
|
||||
return value.toLowerCase(Locale.ROOT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String process(String value) {
|
||||
return value.toLowerCase(Locale.ROOT);
|
||||
return apply(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.common;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public final class Processors {
|
||||
|
||||
public static long bytes(String value) {
|
||||
return BytesProcessor.apply(value);
|
||||
}
|
||||
|
||||
public static String lowercase(String value) {
|
||||
return LowercaseProcessor.apply(value);
|
||||
}
|
||||
|
||||
public static String uppercase(String value) {
|
||||
return UppercaseProcessor.apply(value);
|
||||
}
|
||||
|
||||
public static Object json(Object fieldValue) {
|
||||
return JsonProcessor.apply(fieldValue);
|
||||
}
|
||||
|
||||
public static void json(Map<String, Object> ctx, String field) {
|
||||
JsonProcessor.apply(ctx, field);
|
||||
}
|
||||
|
||||
public static String urlDecode(String value) {
|
||||
return URLDecodeProcessor.apply(value);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.common;
|
||||
|
||||
import org.elasticsearch.painless.spi.PainlessExtension;
|
||||
import org.elasticsearch.painless.spi.Whitelist;
|
||||
import org.elasticsearch.painless.spi.WhitelistLoader;
|
||||
import org.elasticsearch.script.IngestScript;
|
||||
import org.elasticsearch.script.ScriptContext;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ProcessorsWhitelistExtension implements PainlessExtension {
|
||||
|
||||
private static final Whitelist WHITELIST =
|
||||
WhitelistLoader.loadFromResourceFiles(ProcessorsWhitelistExtension.class, "processors_whitelist.txt");
|
||||
|
||||
@Override
|
||||
public Map<ScriptContext<?>, List<Whitelist>> getContextWhitelists() {
|
||||
return Collections.singletonMap(IngestScript.CONTEXT, Collections.singletonList(WHITELIST));
|
||||
}
|
||||
}
|
|
@ -34,15 +34,19 @@ public final class URLDecodeProcessor extends AbstractStringProcessor {
|
|||
super(processorTag, field, ignoreMissing, targetField);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String process(String value) {
|
||||
public static String apply(String value) {
|
||||
try {
|
||||
return URLDecoder.decode(value, "UTF-8");
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
throw new IllegalArgumentException("could not URL-decode field[" + getField() + "]", e);
|
||||
throw new IllegalArgumentException("Could not URL-decode value.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String process(String value) {
|
||||
return apply(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return TYPE;
|
||||
|
|
|
@ -34,9 +34,13 @@ public final class UppercaseProcessor extends AbstractStringProcessor {
|
|||
super(processorTag, field, ignoreMissing, targetField);
|
||||
}
|
||||
|
||||
public static String apply(String value) {
|
||||
return value.toUpperCase(Locale.ROOT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String process(String value) {
|
||||
return value.toUpperCase(Locale.ROOT);
|
||||
return apply(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
org.elasticsearch.ingest.common.ProcessorsWhitelistExtension
|
|
@ -0,0 +1,29 @@
|
|||
#
|
||||
# Licensed to Elasticsearch under one or more contributor
|
||||
# license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright
|
||||
# ownership. Elasticsearch licenses this file to you under
|
||||
# the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
# This file contains a whitelist of static processor methods that can be accessed from painless
|
||||
|
||||
class org.elasticsearch.ingest.common.Processors {
|
||||
long bytes(String)
|
||||
String lowercase(String)
|
||||
String uppercase(String)
|
||||
Object json(Object)
|
||||
void json(Map, String)
|
||||
String urlDecode(String)
|
||||
}
|
|
@ -63,7 +63,7 @@ public class BytesProcessorTests extends AbstractStringProcessorTestCase {
|
|||
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> processor.execute(ingestDocument));
|
||||
assertThat(exception.getMessage(),
|
||||
CoreMatchers.equalTo("failed to parse setting [" + fieldName + "] with value [8912pb] as a size in bytes"));
|
||||
CoreMatchers.equalTo("failed to parse setting [Ingest Field] with value [8912pb] as a size in bytes"));
|
||||
assertThat(exception.getCause().getMessage(),
|
||||
CoreMatchers.containsString("Values greater than 9223372036854775807 bytes are not supported"));
|
||||
}
|
||||
|
@ -93,6 +93,6 @@ public class BytesProcessorTests extends AbstractStringProcessorTestCase {
|
|||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, expectedResultType()), equalTo(1126L));
|
||||
assertWarnings("Fractional bytes values are deprecated. Use non-fractional bytes values instead: [1.1kb] found for setting " +
|
||||
"[" + fieldName + "]");
|
||||
"[Ingest Field]");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,216 @@
|
|||
---
|
||||
teardown:
|
||||
- do:
|
||||
ingest.delete_pipeline:
|
||||
id: "my_pipeline"
|
||||
ignore: 404
|
||||
|
||||
---
|
||||
"Test invoke bytes processor":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"script" : {
|
||||
"lang": "painless",
|
||||
"source" : "ctx.target_field = Processors.bytes(ctx.source_field)"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline: "my_pipeline"
|
||||
body: {source_field: "1kb"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- match: { _source.source_field: "1kb" }
|
||||
- match: { _source.target_field: 1024 }
|
||||
|
||||
---
|
||||
"Test invoke lowercase processor":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"script" : {
|
||||
"lang": "painless",
|
||||
"source" : "ctx.target_field = Processors.lowercase(ctx.source_field)"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline: "my_pipeline"
|
||||
body: {source_field: "FooBar"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- match: { _source.source_field: "FooBar" }
|
||||
- match: { _source.target_field: "foobar" }
|
||||
|
||||
---
|
||||
"Test invoke uppercase processor":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"script" : {
|
||||
"lang": "painless",
|
||||
"source" : "ctx.target_field = Processors.uppercase(ctx.source_field)"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline: "my_pipeline"
|
||||
body: {source_field: "FooBar"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- match: { _source.source_field: "FooBar" }
|
||||
- match: { _source.target_field: "FOOBAR" }
|
||||
|
||||
---
|
||||
"Test invoke json processor, assign to field":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"script" : {
|
||||
"lang": "painless",
|
||||
"source" : "ctx.target_field = Processors.json(ctx.source_field)"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline: "my_pipeline"
|
||||
body: {source_field: "{\"foo\":\"bar\"}"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- match: { _source.source_field: "{\"foo\":\"bar\"}" }
|
||||
- match: { _source.target_field.foo: "bar" }
|
||||
|
||||
---
|
||||
"Test invoke json processor, assign to root":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"script" : {
|
||||
"lang": "painless",
|
||||
"source" : "Processors.json(ctx, 'source_field')"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline: "my_pipeline"
|
||||
body: {source_field: "{\"foo\":\"bar\"}"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- match: { _source.source_field: "{\"foo\":\"bar\"}" }
|
||||
- match: { _source.foo: "bar" }
|
||||
|
||||
---
|
||||
"Test invoke urlDecode processor":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"script" : {
|
||||
"lang": "painless",
|
||||
"source" : "ctx.target_field = Processors.urlDecode(ctx.source_field)"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline: "my_pipeline"
|
||||
body: {source_field: "foo%20bar"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- match: { _source.source_field: "foo%20bar" }
|
||||
- match: { _source.target_field: "foo bar" }
|
Loading…
Reference in New Issue