From 7c46b57ff29e85e2e222020d3ec1aae6d9689816 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Tue, 17 May 2016 12:06:48 -0400 Subject: [PATCH] Add a Sort ingest processor Sorts an array of values in ascending or descending order. If all elements are numerics, they will be sorted numerically. If values are strings, or mixtures of strings/numbers, the elements will be sorted lexicographically. --- .../ingest/processor/SortProcessor.java | 137 +++++++++ .../org/elasticsearch/node/NodeModule.java | 2 + .../ingest/processor/SortProcessorTests.java | 281 ++++++++++++++++++ docs/reference/ingest/ingest-node.asciidoc | 25 ++ .../rest-api-spec/test/ingest/90_sort.yaml | 35 +++ 5 files changed, 480 insertions(+) create mode 100644 core/src/main/java/org/elasticsearch/ingest/processor/SortProcessor.java create mode 100644 core/src/test/java/org/elasticsearch/ingest/processor/SortProcessorTests.java create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/ingest/90_sort.yaml diff --git a/core/src/main/java/org/elasticsearch/ingest/processor/SortProcessor.java b/core/src/main/java/org/elasticsearch/ingest/processor/SortProcessor.java new file mode 100644 index 00000000000..313a8bed7fc --- /dev/null +++ b/core/src/main/java/org/elasticsearch/ingest/processor/SortProcessor.java @@ -0,0 +1,137 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.ingest.core.AbstractProcessor; +import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.core.ConfigurationUtils; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Processor that sorts an array of items. + * Throws an exception if the specified field is not an array. + */ +public final class SortProcessor extends AbstractProcessor { + + public static final String TYPE = "sort"; + public static final String FIELD = "field"; + public static final String ORDER = "order"; + public static final String DEFAULT_ORDER = "asc"; + + public enum SortOrder { + ASCENDING("asc"), DESCENDING("desc"); + + private final String direction; + + SortOrder(String direction) { + this.direction = direction; + } + + public String toString() { + return this.direction; + } + + public static SortOrder fromString(String value) { + if (value == null) { + throw new IllegalArgumentException("Sort direction cannot be null"); + } + + if (value.equals(ASCENDING.toString())) { + return ASCENDING; + } else if (value.equals(DESCENDING.toString())) { + return DESCENDING; + } + throw new IllegalArgumentException("Sort direction [" + value + "] not recognized." 
+ + " Valid values are: [asc, desc]"); + } + } + + private final String field; + private final SortOrder order; + + SortProcessor(String tag, String field, SortOrder order) { + super(tag); + this.field = field; + this.order = order; + } + + String getField() { + return field; + } + + SortOrder getOrder() { + return order; + } + + @Override + @SuppressWarnings("unchecked") + public void execute(IngestDocument document) { + List list = document.getFieldValue(field, List.class); + + if (list == null) { + throw new IllegalArgumentException("field [" + field + "] is null, cannot sort."); + } + + if (list.size() <= 1) { + return; + } + + if (order.equals(SortOrder.ASCENDING)) { + Collections.sort(list); + } else { + Collections.sort(list, Collections.reverseOrder()); + } + + document.setFieldValue(field, list); + } + + @Override + public String getType() { + return TYPE; + } + + public final static class Factory extends AbstractProcessorFactory { + + @Override + public SortProcessor doCreate(String processorTag, Map config) throws Exception { + String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, FIELD); + try { + SortOrder direction = SortOrder.fromString( + ConfigurationUtils.readStringProperty( + TYPE, + processorTag, + config, + ORDER, + DEFAULT_ORDER)); + return new SortProcessor(processorTag, field, direction); + } catch (IllegalArgumentException e) { + throw ConfigurationUtils.newConfigurationException(TYPE, processorTag, ORDER, e.getMessage()); + } + } + } +} + diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index 8565b14be8e..8c9b3e362fc 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -37,6 +37,7 @@ import org.elasticsearch.ingest.processor.LowercaseProcessor; import org.elasticsearch.ingest.processor.RemoveProcessor; import 
org.elasticsearch.ingest.processor.RenameProcessor; import org.elasticsearch.ingest.processor.SetProcessor; +import org.elasticsearch.ingest.processor.SortProcessor; import org.elasticsearch.ingest.processor.SplitProcessor; import org.elasticsearch.ingest.processor.TrimProcessor; import org.elasticsearch.ingest.processor.UppercaseProcessor; @@ -78,6 +79,7 @@ public class NodeModule extends AbstractModule { registerProcessor(FailProcessor.TYPE, (templateService, registry) -> new FailProcessor.Factory(templateService)); registerProcessor(ForEachProcessor.TYPE, (templateService, registry) -> new ForEachProcessor.Factory(registry)); registerProcessor(DateIndexNameProcessor.TYPE, (templateService, registry) -> new DateIndexNameProcessor.Factory()); + registerProcessor(SortProcessor.TYPE, (templateService, registry) -> new SortProcessor.Factory()); } @Override diff --git a/core/src/test/java/org/elasticsearch/ingest/processor/SortProcessorTests.java b/core/src/test/java/org/elasticsearch/ingest/processor/SortProcessorTests.java new file mode 100644 index 00000000000..f67ca3b6bee --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/processor/SortProcessorTests.java @@ -0,0 +1,281 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.processor; + +import org.elasticsearch.ingest.core.IngestDocument; +import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.ingest.core.Processor; +import org.elasticsearch.ingest.processor.SortProcessor.SortOrder; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class SortProcessorTests extends ESTestCase { + + public void testSortStrings() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + String value = randomAsciiOfLengthBetween(1, 10); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? 
SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortIntegersNonRandom() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + + Integer[] expectedResult = new Integer[]{1,2,3,4,5,10,20,21,22,50,100}; + List fieldValue = new ArrayList<>(expectedResult.length); + fieldValue.addAll(Arrays.asList(expectedResult).subList(0, expectedResult.length)); + Collections.shuffle(fieldValue); + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, SortOrder.ASCENDING); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue(fieldName, List.class).toArray(), equalTo(expectedResult)); + } + + public void testSortIntegers() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Integer value = randomIntBetween(1, 100); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? 
SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortShorts() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Short value = randomShort(); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortDoubles() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Double value = randomDoubleBetween(0.0, 100.0, true); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? 
SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortFloats() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Float value = randomFloat(); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortBytes() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Byte value = randomByte(); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? 
SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortBooleans() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + for (int j = 0; j < numItems; j++) { + Boolean value = randomBoolean(); + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortMixedStrings() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + int numItems = randomIntBetween(1, 10); + List fieldValue = new ArrayList<>(numItems); + List expectedResult = new ArrayList<>(numItems); + String value; + for (int j = 0; j < numItems; j++) { + if (randomBoolean()) { + value = String.valueOf(randomIntBetween(0, 100)); + } else { + value = randomAsciiOfLengthBetween(1, 10); + } + fieldValue.add(value); + expectedResult.add(value); + } + Collections.sort(expectedResult); + + SortOrder order = randomBoolean() ? 
SortOrder.ASCENDING : SortOrder.DESCENDING; + if (order.equals(SortOrder.DESCENDING)) { + Collections.reverse(expectedResult); + } + + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + processor.execute(ingestDocument); + assertEquals(ingestDocument.getFieldValue(fieldName, List.class), expectedResult); + } + + public void testSortNonListField() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + String fieldName = RandomDocumentPicks.randomFieldName(random()); + ingestDocument.setFieldValue(fieldName, randomAsciiOfLengthBetween(1, 10)); + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + try { + processor.execute(ingestDocument); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.String] cannot be cast to [java.util.List]")); + } + } + + public void testSortNonExistingField() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + String fieldName = RandomDocumentPicks.randomFieldName(random()); + SortOrder order = randomBoolean() ? SortOrder.ASCENDING : SortOrder.DESCENDING; + Processor processor = new SortProcessor(randomAsciiOfLength(10), fieldName, order); + try { + processor.execute(ingestDocument); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("not present as part of path [" + fieldName + "]")); + } + } + + public void testSortNullValue() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("field", null)); + SortOrder order = randomBoolean() ? 
SortOrder.ASCENDING : SortOrder.DESCENDING; + Processor processor = new SortProcessor(randomAsciiOfLength(10), "field", order); + try { + processor.execute(ingestDocument); + } catch(IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("field [field] is null, cannot sort.")); + } + } + + + +} diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index ec6253f2d93..3dcf719f382 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -1282,6 +1282,31 @@ Splits a field into an array using a separator character. Only works on string f -------------------------------------------------- <1> Treat all consecutive whitespace characters as a single separator +[[sort-processor]] +=== Sort Processor +Sorts the elements of an array in ascending or descending order. Homogeneous arrays of numbers will be sorted +numerically, while arrays of strings or heterogeneous arrays of strings and numbers will be sorted lexicographically. +Throws an error when the field is not an array. + +[[sort-options]] +.Sort Options +[options="header"] +|====== +| Name | Required | Default | Description +| `field` | yes | - | The field to be sorted +| `order` | no | `"asc"` | The sort order to use. Accepts `"asc"` or `"desc"`. +|====== + +[source,js] +-------------------------------------------------- +{ + "sort": { + "field": "field_to_sort", + "order": "desc" + } +} +-------------------------------------------------- + [[trim-processor]] === Trim Processor Trims whitespace from field. 
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/90_sort.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/90_sort.yaml new file mode 100644 index 00000000000..096e57fa4e1 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/ingest/90_sort.yaml @@ -0,0 +1,35 @@ +--- +"Test sort Processor": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "sort" : { + "field" : "values" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + type: test + id: 1 + pipeline: "my_pipeline" + body: > + { + "values": ["foo", "bar", "baz"] + } + + - do: + get: + index: test + type: test + id: 1 + - match: { _source.values: ["bar", "baz", "foo"] }