Add a Sort ingest processor

Sorts an array of values in ascending or descending order. If all elements are numerics, they will be sorted numerically. If values are strings, or mixtures of strings/numbers, the elements will be sorted lexicographically.
This commit is contained in:
Zachary Tong 2016-05-17 12:06:48 -04:00
parent b12cabd2f5
commit 7c46b57ff2
5 changed files with 480 additions and 0 deletions

View File

@ -0,0 +1,137 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.ingest.core.AbstractProcessor;
import org.elasticsearch.ingest.core.AbstractProcessorFactory;
import org.elasticsearch.ingest.core.ConfigurationUtils;
import org.elasticsearch.ingest.core.IngestDocument;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Processor that sorts an array of items.
 * Throws exception if the specified field is not an array.
 *
 * <p>Elements are ordered by their natural ordering via {@link Collections#sort}, so they
 * must be mutually comparable.
 */
public final class SortProcessor extends AbstractProcessor {

    /** Processor type name. */
    public static final String TYPE = "sort";
    /** Configuration key naming the field to sort. */
    public static final String FIELD = "field";
    /** Configuration key naming the sort direction. */
    public static final String ORDER = "order";
    /** Direction used when {@link #ORDER} is absent from the configuration. */
    public static final String DEFAULT_ORDER = "asc";

    /**
     * Direction in which the values are sorted. Each constant carries the string used
     * for it in the processor configuration.
     */
    public enum SortOrder {
        ASCENDING("asc"), DESCENDING("desc");

        private final String direction;

        SortOrder(String direction) {
            this.direction = direction;
        }

        /**
         * @return the configuration string for this direction ({@code "asc"} or {@code "desc"})
         */
        @Override
        public String toString() {
            return this.direction;
        }

        /**
         * Resolves a configuration string to a {@link SortOrder}. Matching is exact and
         * case-sensitive.
         *
         * @param value the configured direction
         * @return the matching sort order
         * @throws IllegalArgumentException if {@code value} is null or not one of the known directions
         */
        public static SortOrder fromString(String value) {
            if (value == null) {
                throw new IllegalArgumentException("Sort direction cannot be null");
            }

            if (value.equals(ASCENDING.toString())) {
                return ASCENDING;
            } else if (value.equals(DESCENDING.toString())) {
                return DESCENDING;
            }
            throw new IllegalArgumentException("Sort direction [" + value + "] not recognized."
                + " Valid values are: [asc, desc]");
        }
    }

    private final String field;
    private final SortOrder order;

    SortProcessor(String tag, String field, SortOrder order) {
        super(tag);
        this.field = field;
        this.order = order;
    }

    String getField() {
        return field;
    }

    SortOrder getOrder() {
        return order;
    }

    /**
     * Reads the configured field as a list, sorts its elements in the configured direction
     * and writes the sorted list back to the same field. Lists with at most one element
     * are left untouched.
     *
     * @param document the document whose field is sorted
     * @throws IllegalArgumentException if the field value is null
     */
    @Override
    @SuppressWarnings("unchecked")
    public void execute(IngestDocument document) {
        List<? extends Comparable> list = document.getFieldValue(field, List.class);

        if (list == null) {
            throw new IllegalArgumentException("field [" + field + "] is null, cannot sort.");
        }

        if (list.size() <= 1) {
            return;
        }

        // Sort a copy rather than the list held by the document: the source list may be
        // unmodifiable (in-place sorting would throw UnsupportedOperationException) or
        // referenced from elsewhere, in which case mutating it would be a hidden side effect.
        List<? extends Comparable> sorted = new ArrayList<>(list);

        if (order == SortOrder.ASCENDING) {
            Collections.sort(sorted);
        } else {
            Collections.sort(sorted, Collections.reverseOrder());
        }

        document.setFieldValue(field, sorted);
    }

    @Override
    public String getType() {
        return TYPE;
    }

    /**
     * Builds {@link SortProcessor} instances from a processor configuration map.
     */
    public static final class Factory extends AbstractProcessorFactory<SortProcessor> {

        /**
         * Reads {@link #FIELD} and {@link #ORDER} (falling back to {@link #DEFAULT_ORDER})
         * from {@code config} and creates the processor.
         *
         * @param processorTag the tag assigned to the created processor
         * @param config       the processor configuration
         * @return the configured processor
         * @throws Exception if the configuration is invalid; an unrecognized sort direction
         *                   is reported as a configuration exception on the {@code order} property
         */
        @Override
        public SortProcessor doCreate(String processorTag, Map<String, Object> config) throws Exception {
            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, FIELD);
            try {
                SortOrder direction = SortOrder.fromString(
                    ConfigurationUtils.readStringProperty(
                        TYPE,
                        processorTag,
                        config,
                        ORDER,
                        DEFAULT_ORDER));
                return new SortProcessor(processorTag, field, direction);
            } catch (IllegalArgumentException e) {
                // Re-report the parse failure against the offending configuration property.
                throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, ORDER, e.getMessage());
            }
        }
    }
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.ingest.processor.LowercaseProcessor;
import org.elasticsearch.ingest.processor.RemoveProcessor;
import org.elasticsearch.ingest.processor.RenameProcessor;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.processor.SortProcessor;
import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.ingest.processor.TrimProcessor;
import org.elasticsearch.ingest.processor.UppercaseProcessor;
@ -78,6 +79,7 @@ public class NodeModule extends AbstractModule {
registerProcessor(FailProcessor.TYPE, (templateService, registry) -> new FailProcessor.Factory(templateService));
registerProcessor(ForEachProcessor.TYPE, (templateService, registry) -> new ForEachProcessor.Factory(registry));
registerProcessor(DateIndexNameProcessor.TYPE, (templateService, registry) -> new DateIndexNameProcessor.Factory());
registerProcessor(SortProcessor.TYPE, (templateService, registry) -> new SortProcessor.Factory());
}
@Override

View File

@ -0,0 +1,281 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.processor;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.processor.SortProcessor.SortOrder;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
/**
 * Unit tests for {@link SortProcessor}: sorting of lists of the common value types in
 * both directions, plus the error cases (non-list field, missing field, null value).
 */
public class SortProcessorTests extends ESTestCase {

    public void testSortStrings() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        int numItems = randomIntBetween(1, 10);
        List<String> fieldValue = new ArrayList<>(numItems);
        for (int j = 0; j < numItems; j++) {
            fieldValue.add(randomAsciiOfLengthBetween(1, 10));
        }
        assertProcessorSorts(ingestDocument, fieldValue);
    }

    public void testSortIntegersNonRandom() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());

        // Values chosen so that numeric and lexicographic order differ (e.g. 10 vs 2).
        Integer[] expectedResult = new Integer[]{1,2,3,4,5,10,20,21,22,50,100};
        List<Integer> fieldValue = new ArrayList<>(Arrays.asList(expectedResult));
        // Shuffle with the test's seeded random instead of an unseeded one.
        Collections.shuffle(fieldValue, random());

        String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
        Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, SortOrder.ASCENDING);
        processor.execute(ingestDocument);
        assertThat(ingestDocument.getFieldValue(fieldName, List.class).toArray(), equalTo(expectedResult));
    }

    public void testSortIntegers() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        int numItems = randomIntBetween(1, 10);
        List<Integer> fieldValue = new ArrayList<>(numItems);
        for (int j = 0; j < numItems; j++) {
            fieldValue.add(randomIntBetween(1, 100));
        }
        assertProcessorSorts(ingestDocument, fieldValue);
    }

    public void testSortShorts() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        int numItems = randomIntBetween(1, 10);
        List<Short> fieldValue = new ArrayList<>(numItems);
        for (int j = 0; j < numItems; j++) {
            fieldValue.add(randomShort());
        }
        assertProcessorSorts(ingestDocument, fieldValue);
    }

    public void testSortDoubles() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        int numItems = randomIntBetween(1, 10);
        List<Double> fieldValue = new ArrayList<>(numItems);
        for (int j = 0; j < numItems; j++) {
            fieldValue.add(randomDoubleBetween(0.0, 100.0, true));
        }
        assertProcessorSorts(ingestDocument, fieldValue);
    }

    public void testSortFloats() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        int numItems = randomIntBetween(1, 10);
        List<Float> fieldValue = new ArrayList<>(numItems);
        for (int j = 0; j < numItems; j++) {
            fieldValue.add(randomFloat());
        }
        assertProcessorSorts(ingestDocument, fieldValue);
    }

    public void testSortBytes() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        int numItems = randomIntBetween(1, 10);
        List<Byte> fieldValue = new ArrayList<>(numItems);
        for (int j = 0; j < numItems; j++) {
            fieldValue.add(randomByte());
        }
        assertProcessorSorts(ingestDocument, fieldValue);
    }

    public void testSortBooleans() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        int numItems = randomIntBetween(1, 10);
        List<Boolean> fieldValue = new ArrayList<>(numItems);
        for (int j = 0; j < numItems; j++) {
            fieldValue.add(randomBoolean());
        }
        assertProcessorSorts(ingestDocument, fieldValue);
    }

    public void testSortMixedStrings() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
        int numItems = randomIntBetween(1, 10);
        List<String> fieldValue = new ArrayList<>(numItems);
        for (int j = 0; j < numItems; j++) {
            // Mix numeric-looking strings with arbitrary ASCII strings.
            if (randomBoolean()) {
                fieldValue.add(String.valueOf(randomIntBetween(0, 100)));
            } else {
                fieldValue.add(randomAsciiOfLengthBetween(1, 10));
            }
        }
        assertProcessorSorts(ingestDocument, fieldValue);
    }

    public void testSortNonListField() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
        String fieldName = RandomDocumentPicks.randomFieldName(random());
        ingestDocument.setFieldValue(fieldName, randomAsciiOfLengthBetween(1, 10));
        SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING;
        Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order);
        try {
            processor.execute(ingestDocument);
            // Without this the test would silently pass when no exception is thrown.
            fail("sorting a non-list field should have thrown an IllegalArgumentException");
        } catch(IllegalArgumentException e) {
            assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.String] cannot be cast to [java.util.List]"));
        }
    }

    public void testSortNonExistingField() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
        String fieldName = RandomDocumentPicks.randomFieldName(random());
        SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING;
        Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order);
        try {
            processor.execute(ingestDocument);
            // Without this the test would silently pass when no exception is thrown.
            fail("sorting a non-existing field should have thrown an IllegalArgumentException");
        } catch(IllegalArgumentException e) {
            assertThat(e.getMessage(), containsString("not present as part of path [" + fieldName + "]"));
        }
    }

    public void testSortNullValue() throws Exception {
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", null));
        SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING;
        Processor processor = new SortProcessor(randomAsciiOfLength(10), "field", order);
        try {
            processor.execute(ingestDocument);
            // Without this the test would silently pass when no exception is thrown.
            fail("sorting a null value should have thrown an IllegalArgumentException");
        } catch(IllegalArgumentException e) {
            assertThat(e.getMessage(), equalTo("field [field] is null, cannot sort."));
        }
    }

    /**
     * Adds {@code fieldValue} to the document under a random field name, runs a
     * {@link SortProcessor} with a randomly chosen direction over it and asserts that the
     * field then holds the values in the expected order.
     *
     * @param ingestDocument the document to add the field to
     * @param fieldValue     the unsorted values
     */
    private <T extends Comparable<? super T>> void assertProcessorSorts(IngestDocument ingestDocument,
                                                                        List<T> fieldValue) throws Exception {
        // Compute the expectation on an independent copy, before the processor runs.
        List<T> expectedResult = new ArrayList<>(fieldValue);
        Collections.sort(expectedResult);

        SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING;
        if (order.equals(SortOrder.DESCENDING)) {
            Collections.reverse(expectedResult);
        }

        String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
        Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order);
        processor.execute(ingestDocument);
        assertEquals(expectedResult, ingestDocument.getFieldValue(fieldName, List.class));
    }
}

View File

@ -1282,6 +1282,31 @@ Splits a field into an array using a separator character. Only works on string f
--------------------------------------------------
<1> Treat all consecutive whitespace characters as a single separator
[[sort-processor]]
=== Sort Processor
Sorts the elements of an array ascending or descending. Homogeneous arrays of numbers will be sorted
numerically, while arrays of strings or heterogeneous arrays of strings + numbers will be sorted lexicographically.
Throws an error when the field is not an array.
[[sort-options]]
.Sort Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to be sorted
| `order` | no | `"asc"` | The sort order to use. Accepts `"asc"` or `"desc"`.
|======
[source,js]
--------------------------------------------------
{
"sort": {
"field": "field_to_sort",
"order": "desc"
}
}
--------------------------------------------------
[[trim-processor]]
=== Trim Processor
Trims whitespace from field.

View File

@ -0,0 +1,35 @@
---
# REST test for the sort ingest processor: indexes a document through a pipeline
# containing a single sort processor and checks the stored array is sorted.
"Test sort Processor":
# Create a pipeline whose sort processor sets only "field"; no "order" is given,
# so the processor's default order applies.
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"sort" : {
"field" : "values"
}
}
]
}
- match: { acknowledged: true }
# Index a document with an unsorted string array through the pipeline.
- do:
index:
index: test
type: test
id: 1
pipeline: "my_pipeline"
body: >
{
"values": ["foo", "bar", "baz"]
}
# Fetch the document back and verify the array was stored in ascending order.
- do:
get:
index: test
type: test
id: 1
- match: { _source.values: ["bar", "baz", "foo"] }