diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/SortProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/SortProcessor.java new file mode 100644 index 00000000000..313a8bed7fc --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/processor/SortProcessor.java @@ -0,0 +1,137 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.ingest.core.AbstractProcessor; +import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.core.ConfigurationUtils; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Processor that sorts an array of items. + * Throws exception is the specified field is not an array. + */ +public final class SortProcessor extends AbstractProcessor { + + public static final String TYPE = "sort"; + public static final String FIELD = "field"; + public static final String ORDER = "order"; + public static final String DEFAULT_ORDER = "asc"; + + public enum SortOrder { + ASCENDING("asc"), DESCENDING("desc"); + + private final String direction; + + SortOrder(String direction) { + this.direction = direction; + } + + public String toString() { + return this.direction; + } + + public static SortOrder fromString(String value) { + if (value == null) { + throw new IllegalArgumentException("Sort direction cannot be null"); + } + + if (value.equals(ASCENDING.toString())) { + return ASCENDING; + } else if (value.equals(DESCENDING.toString())) { + return DESCENDING; + } + throw new IllegalArgumentException("Sort direction [" + value + "] not recognized." + + " Valid values are: [asc, desc]"); + } + } + + private final String field; + private final SortOrder order; + + SortProcessor(String tag, String field, SortOrder order) { + super(tag); + this.field = field; + this.order = order; + } + + String getField() { + return field; + } + + SortOrder getOrder() { + return order; + } + + @Override + @SuppressWarnings("unchecked") + public void execute(IngestDocument document) { + List list = document.getFieldValue(field, List.class); + + if (list == null) { + throw new IllegalArgumentException("field [" + field + "] is null, cannot sort."); + } + + if (list.size() <= 1) { + return; + } + + if (order.equals(SortOrder.ASCENDING)) { + Collections.sort(list); + } else { + Collections.sort(list, Collections.reverseOrder()); + } + + document.setFieldValue(field, list); + } + + @Override + public String getType() { + return TYPE; + } + + public final static class Factory extends AbstractProcessorFactory { + + @Override + public SortProcessor doCreate(String processorTag, Map config) throws Exception { + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, FIELD); + try { + SortOrder direction = SortOrder.fromString( + ConfigurationUtils.readStringProperty( + TYPE, + processorTag, + config, + ORDER, + DEFAULT_ORDER)); + return new SortProcessor(processorTag, field, direction); + } catch (IllegalArgumentException e) { + throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, ORDER, e.getMessage()); + } + } + } +} + diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index 8565b14be8e..8c9b3e362fc 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -37,6 +37,7 @@ import org.elasticsearch.ingest.processor.LowercaseProcessor; import org.elasticsearch.ingest.processor.RemoveProcessor; import org.elasticsearch.ingest.processor.RenameProcessor; import org.elasticsearch.ingest.processor.SetProcessor; +import org.elasticsearch.ingest.processor.SortProcessor; import org.elasticsearch.ingest.processor.SplitProcessor; import org.elasticsearch.ingest.processor.TrimProcessor; import org.elasticsearch.ingest.processor.UppercaseProcessor; @@ -78,6 +79,7 @@ public class NodeModule extends AbstractModule { registerProcessor(FailProcessor.TYPE, (templateService, registry) -> new FailProcessor.Factory(templateService)); registerProcessor(ForEachProcessor.TYPE, (templateService, registry) -> new ForEachProcessor.Factory(registry)); registerProcessor(DateIndexNameProcessor.TYPE, (templateService, registry) -> new DateIndexNameProcessor.Factory()); + registerProcessor(SortProcessor.TYPE, (templateService, registry) -> new SortProcessor.Factory()); } @Override diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/SortProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/SortProcessorTests.java new file mode 100644 index 00000000000..f67ca3b6bee --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/processor/SortProcessorTests.java @@ -0,0 +1,281 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.ingest.processor.SortProcessor.SortOrder; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class SortProcessorTests extends ESTestCase { + + public void testSortStrings() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + String value = randomAsciiOfLengthBetween(1, 10); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortIntegersNonRandom() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + + Integer[] expectedResult = new Integer[]{1,2,3,4,5,10,20,21,22,50,100}; + List fieldValue = new ArrayList<>(expectedResult.length); + fieldValue.addAll(Arrays.asList(expectedResult).subList(0, expectedResult.length)); + Collections.shuffle(fieldValue); + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, SortOrder.ASCENDING); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue(fieldName, List.class).toArray(), equalTo(expectedResult)); + } + + public void testSortIntegers() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Integer value = randomIntBetween(1, 100); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortShorts() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Short value = randomShort(); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortDoubles() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Double value = randomDoubleBetween(0.0, 100.0, true); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortFloats() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Float value = randomFloat(); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortBytes() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Byte value = randomByte(); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortBooleans() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Boolean value = randomBoolean(); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortMixedStrings() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + String value; + for (int j = 0; j < numItems; j++) { + if (randomBoolean()) { + value = String.valueOf(randomIntBetween(0, 100)); + } else { + value = randomAsciiOfLengthBetween(1, 10); + } + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortNonListField() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + String fieldName = RandomDocumentPicks.randomFieldName(random()); + ingestDocument.setFieldValue(fieldName, randomAsciiOfLengthBetween(1, 10)); + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + try { + processor.execute(ingestDocument); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.String] cannot be cast to [java.util.List]")); + } + } + + public void testSortNonExistingField() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + String fieldName = RandomDocumentPicks.randomFieldName(random()); + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + try { + processor.execute(ingestDocument); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("not present as part of path [" + fieldName + "]")); + } + } + + public void testSortNullValue() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", null)); + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + Processor processor = new SortProcessor(randomAsciiOfLength(10), "field", order); + try { + processor.execute(ingestDocument); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("field [field] is null, cannot sort.")); + } + } + + + +} diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index ec6253f2d93..3dcf719f382 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -1282,6 +1282,31 @@ Splits a field into an array using a separator character. Only works on string f -------------------------------------------------- <1> Treat all consecutive whitespace characters as a single separator +[[sort-processor]] +=== Sort Processor +Sorts the elements of an array ascending or descending. Homogeneous arrays of numbers will be sorted +numerically, while arrays of strings or heterogeneous arrays of strings + numbers will be sorted lexicographically. +Throws an error when the field is not an array. + +[[sort-options]] +.Sort Options +[options="header"] +|====== +| Name | Required | Default | Description +| `field` | yes | - | The field to be sorted +| `order` | no | `"asc"` | The sort order to use. Accepts `"asc"` or `"desc"`. +|====== + +[source,js] +-------------------------------------------------- +{ + "sort": { + "field": "field_to_sort", + "order": "desc" + } +} +-------------------------------------------------- + [[trim-processor]] === Trim Processor Trims whitespace from field. diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/90_sort.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/90_sort.yaml new file mode 100644 index 00000000000..096e57fa4e1 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/90_sort.yaml @@ -0,0 +1,35 @@ +--- +"Test sort Processor": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "sort" : { + "field" : "values" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: > + { + "values": ["foo", "bar", "baz"] + } + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.values: ["bar", "baz", "foo"] }