ingest: Introduction of a bytes processor (#31733)
ingest: Introduction of a bytes processor This processor allows for human readable byte values (e.g. 1kb) to be converted to value in bytes (e.g. 1024). Internally this processor re-uses "ByteSizeValue.parseBytesSizeValue" which supports conversions up to Long.MAX_VALUE and the following units: "b", "kb", "mb", "gb", "tb", pb". This change also introduces a generic return type for the AbstractStringProcessor to allow for code reuse while supporting a String -> T conversion. (String -> Long in this case).
This commit is contained in:
parent
396c578066
commit
c0056cddd8
|
@ -773,6 +773,33 @@ Accepts a single value or an array of values.
|
|||
--------------------------------------------------
|
||||
// NOTCONSOLE
|
||||
|
||||
[[bytes-processor]]
|
||||
=== Bytes Processor
|
||||
Converts a human readable byte value (e.g. 1kb) to its value in bytes (e.g. 1024).
|
||||
|
||||
Supported human readable units are "b", "kb", "mb", "gb", "tb", "pb" case insensitive. An error will occur if
|
||||
the field is not a supported format or resultant value exceeds 2^63.
|
||||
|
||||
[[bytes-options]]
|
||||
.Bytes Options
|
||||
[options="header"]
|
||||
|======
|
||||
| Name | Required | Default | Description
|
||||
| `field` | yes | - | The field to convert
|
||||
| `target_field` | no | `field` | The field to assign the converted value to, by default `field` is updated in-place
|
||||
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
|
||||
|======
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"bytes": {
|
||||
"field": "foo"
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// NOTCONSOLE
|
||||
|
||||
[[convert-processor]]
|
||||
=== Convert Processor
|
||||
Converts an existing field's value to a different type, such as converting a string to an integer.
|
||||
|
|
|
@ -27,10 +27,12 @@ import org.elasticsearch.ingest.Processor;
|
|||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Base class for processors that manipulate strings and require a single "fields" array config value, which
|
||||
* Base class for processors that manipulate source strings and require a single "fields" array config value, which
|
||||
* holds a list of field names in string format.
|
||||
*
|
||||
* @param <T> The resultant type for the target field
|
||||
*/
|
||||
abstract class AbstractStringProcessor extends AbstractProcessor {
|
||||
abstract class AbstractStringProcessor<T> extends AbstractProcessor {
|
||||
private final String field;
|
||||
private final boolean ignoreMissing;
|
||||
private final String targetField;
|
||||
|
@ -67,7 +69,7 @@ abstract class AbstractStringProcessor extends AbstractProcessor {
|
|||
document.setFieldValue(targetField, process(val));
|
||||
}
|
||||
|
||||
protected abstract String process(String value);
|
||||
protected abstract T process(String value);
|
||||
|
||||
abstract static class Factory implements Processor.Factory {
|
||||
final String processorType;
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.common;
|
||||
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Processor that converts the content of string fields to the byte value.
|
||||
* Throws exception is the field is not of type string or can not convert to the numeric byte value
|
||||
*/
|
||||
public final class BytesProcessor extends AbstractStringProcessor {
|
||||
|
||||
public static final String TYPE = "bytes";
|
||||
|
||||
BytesProcessor(String processorTag, String field, boolean ignoreMissing, String targetField) {
|
||||
super(processorTag, field, ignoreMissing, targetField);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long process(String value) {
|
||||
return ByteSizeValue.parseBytesSizeValue(value, null, getField()).getBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
public static final class Factory extends AbstractStringProcessor.Factory {
|
||||
|
||||
public Factory() {
|
||||
super(TYPE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BytesProcessor newProcessor(String tag, Map<String, Object> config, String field,
|
||||
boolean ignoreMissing, String targetField) {
|
||||
return new BytesProcessor(tag, field, ignoreMissing, targetField);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -81,6 +81,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
|
|||
processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
|
||||
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
|
||||
processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory());
|
||||
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
|
||||
return Collections.unmodifiableMap(processors);
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocumen
|
|||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public abstract class AbstractStringProcessorTestCase extends ESTestCase {
|
||||
public abstract class AbstractStringProcessorTestCase<T> extends ESTestCase {
|
||||
|
||||
protected abstract AbstractStringProcessor newProcessor(String field, boolean ignoreMissing, String targetField);
|
||||
|
||||
|
@ -39,7 +39,11 @@ public abstract class AbstractStringProcessorTestCase extends ESTestCase {
|
|||
return input;
|
||||
}
|
||||
|
||||
protected abstract String expectedResult(String input);
|
||||
protected abstract T expectedResult(String input);
|
||||
|
||||
protected Class<T> expectedResultType(){
|
||||
return (Class<T>) String.class; // most results types are Strings
|
||||
}
|
||||
|
||||
public void testProcessor() throws Exception {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
|
@ -47,7 +51,7 @@ public abstract class AbstractStringProcessorTestCase extends ESTestCase {
|
|||
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, modifyInput(fieldValue));
|
||||
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(expectedResult(fieldValue)));
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, expectedResultType()), equalTo(expectedResult(fieldValue)));
|
||||
}
|
||||
|
||||
public void testFieldNotFound() throws Exception {
|
||||
|
@ -109,6 +113,6 @@ public abstract class AbstractStringProcessorTestCase extends ESTestCase {
|
|||
String targetFieldName = fieldName + "foo";
|
||||
Processor processor = newProcessor(fieldName, randomBoolean(), targetFieldName);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(targetFieldName, String.class), equalTo(expectedResult(fieldValue)));
|
||||
assertThat(ingestDocument.getFieldValue(targetFieldName, expectedResultType()), equalTo(expectedResult(fieldValue)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.common;
|
||||
|
||||
public class BytesProcessorFactoryTests extends AbstractStringProcessorFactoryTestCase {
|
||||
@Override
|
||||
protected AbstractStringProcessor.Factory newFactory() {
|
||||
return new BytesProcessor.Factory();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest.common;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class BytesProcessorTests extends AbstractStringProcessorTestCase {
|
||||
|
||||
private String modifiedInput;
|
||||
|
||||
@Override
|
||||
protected AbstractStringProcessor newProcessor(String field, boolean ignoreMissing, String targetField) {
|
||||
return new BytesProcessor(randomAlphaOfLength(10), field, ignoreMissing, targetField);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String modifyInput(String input) {
|
||||
//largest value that allows all results < Long.MAX_VALUE bytes
|
||||
long randomNumber = randomLongBetween(1, Long.MAX_VALUE / ByteSizeUnit.PB.toBytes(1));
|
||||
ByteSizeUnit randomUnit = randomFrom(ByteSizeUnit.values());
|
||||
modifiedInput = randomNumber + randomUnit.getSuffix();
|
||||
return modifiedInput;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long expectedResult(String input) {
|
||||
return ByteSizeValue.parseBytesSizeValue(modifiedInput, null, "").getBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<Long> expectedResultType() {
|
||||
return Long.class;
|
||||
}
|
||||
|
||||
public void testTooLarge() {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "8912pb");
|
||||
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> processor.execute(ingestDocument));
|
||||
assertThat(exception.getMessage(),
|
||||
CoreMatchers.equalTo("failed to parse setting [" + fieldName + "] with value [8912pb] as a size in bytes"));
|
||||
assertThat(exception.getCause().getMessage(),
|
||||
CoreMatchers.containsString("Values greater than 9223372036854775807 bytes are not supported"));
|
||||
}
|
||||
|
||||
public void testNotBytes() {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "junk");
|
||||
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> processor.execute(ingestDocument));
|
||||
assertThat(exception.getMessage(),
|
||||
CoreMatchers.equalTo("failed to parse [junk]"));
|
||||
}
|
||||
|
||||
public void testMissingUnits() {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "1");
|
||||
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
|
||||
ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> processor.execute(ingestDocument));
|
||||
assertThat(exception.getMessage(),
|
||||
CoreMatchers.containsString("unit is missing or unrecognized"));
|
||||
}
|
||||
|
||||
public void testFractional() throws Exception {
|
||||
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
|
||||
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "1.1kb");
|
||||
Processor processor = newProcessor(fieldName, randomBoolean(), fieldName);
|
||||
processor.execute(ingestDocument);
|
||||
assertThat(ingestDocument.getFieldValue(fieldName, expectedResultType()), equalTo(1126L));
|
||||
assertWarnings("Fractional bytes values are deprecated. Use non-fractional bytes values instead: [1.1kb] found for setting " +
|
||||
"[" + fieldName + "]");
|
||||
}
|
||||
}
|
|
@ -30,3 +30,4 @@
|
|||
- contains: { nodes.$master.ingest.processors: { type: split } }
|
||||
- contains: { nodes.$master.ingest.processors: { type: trim } }
|
||||
- contains: { nodes.$master.ingest.processors: { type: uppercase } }
|
||||
- contains: { nodes.$master.ingest.processors: { type: bytes } }
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
---
|
||||
teardown:
|
||||
- do:
|
||||
ingest.delete_pipeline:
|
||||
id: "my_pipeline"
|
||||
ignore: 404
|
||||
|
||||
---
|
||||
"Test bytes processor":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"description": "_description",
|
||||
"processors": [
|
||||
{
|
||||
"bytes" : {
|
||||
"field" : "bytes_source_field",
|
||||
"target_field" : "bytes_target_field"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
pipeline: "my_pipeline"
|
||||
body: {bytes_source_field: "1kb"}
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: test
|
||||
type: test
|
||||
id: 1
|
||||
- match: { _source.bytes_source_field: "1kb" }
|
||||
- match: { _source.bytes_target_field: 1024 }
|
||||
|
Loading…
Reference in New Issue