From ff7283d65df551e647025e3f8277949ce9e5dfd8 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Fri, 20 Oct 2017 14:08:56 -0400 Subject: [PATCH] NIFI-4506 Adding toDate and format functions to record path. This closes #2221. Signed-off-by: Mark Payne --- .../nifi/record/path/functions/Format.java | 72 +++++++++ .../nifi/record/path/functions/ToDate.java | 84 +++++++++++ .../record/path/paths/RecordPathCompiler.java | 25 ++++ .../nifi/record/path/TestRecordPath.java | 139 ++++++++++++++++++ .../src/main/asciidoc/record-path-guide.adoc | 96 ++++++++++++ 5 files changed, 416 insertions(+) create mode 100644 nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java create mode 100644 nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java new file mode 100644 index 0000000000..2951402b6f --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Format.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.util.Date; +import java.util.stream.Stream; + +public class Format extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment dateFormat; + + public Format(final RecordPathSegment recordPath, final RecordPathSegment dateFormat, final boolean absolute) { + super("format", null, absolute); + this.recordPath = recordPath; + this.dateFormat = dateFormat; + } + + @Override + public Stream evaluate(final RecordPathEvaluationContext context) { + final Stream fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + final java.text.DateFormat dateFormat = getDateFormat(this.dateFormat, context); + if (dateFormat == null) { + return fv; + } + + if (!(fv.getValue() instanceof Date) && !(fv.getValue() instanceof Number)) { + return fv; + } + + final Date dateValue = DataTypeUtils.toDate(fv.getValue(), null, fv.getField().getFieldName()); + final String formatted = dateFormat.format(dateValue); + return new StandardFieldValue(formatted, fv.getField(), fv.getParent().orElse(null)); + }); + } + + private java.text.DateFormat getDateFormat(final RecordPathSegment dateFormatSegment, final RecordPathEvaluationContext context) { + final String dateFormatString = RecordPathUtils.getFirstStringValue(dateFormatSegment, context); + if (dateFormatString == null || dateFormatString.isEmpty()) { + return null; + } + + try { + return DataTypeUtils.getDateFormat(dateFormatString); + } catch (final Exception e) { + return null; + } + } +} diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java new file mode 100644 index 0000000000..3196d1a007 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ToDate.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.path.functions; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.paths.RecordPathSegment; +import org.apache.nifi.record.path.util.RecordPathUtils; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.util.Date; +import java.util.stream.Stream; + +public class ToDate extends RecordPathSegment { + + private final RecordPathSegment recordPath; + private final RecordPathSegment dateFormat; + + public ToDate(final RecordPathSegment recordPath, final RecordPathSegment dateFormat, final boolean absolute) { + super("toDate", null, absolute); + this.recordPath = recordPath; + this.dateFormat = dateFormat; + } + + @Override + public Stream evaluate(RecordPathEvaluationContext context) { + final Stream fieldValues = recordPath.evaluate(context); + return fieldValues.filter(fv -> fv.getValue() != null) + .map(fv -> { + + if (!(fv.getValue() instanceof String)) { + return fv; + } + + final java.text.DateFormat dateFormat = getDateFormat(this.dateFormat, context); + + final Date dateValue; + try { + dateValue = DataTypeUtils.toDate(fv.getValue(), () -> dateFormat, fv.getField().getFieldName()); + } catch (final Exception e) { + return fv; + } + + if (dateValue == null) { + return fv; + } + + return new StandardFieldValue(dateValue, fv.getField(), fv.getParent().orElse(null)); + }); + } + + private java.text.DateFormat getDateFormat(final RecordPathSegment dateFormatSegment, final RecordPathEvaluationContext context) { + if (dateFormatSegment == null) { + return null; + } + + final String dateFormatString = RecordPathUtils.getFirstStringValue(dateFormatSegment, context); + if (dateFormatString == null || dateFormatString.isEmpty()) { + return null; + } + + try { + return DataTypeUtils.getDateFormat(dateFormatString); + } catch (final Exception e) { + return null; + } + } + +} diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java index 89fec05c2c..cfb5c064af 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -65,6 +65,7 @@ import org.apache.nifi.record.path.filter.NotFilter; import org.apache.nifi.record.path.filter.RecordPathFilter; import org.apache.nifi.record.path.filter.StartsWith; import org.apache.nifi.record.path.functions.Concat; +import org.apache.nifi.record.path.functions.Format; import org.apache.nifi.record.path.functions.FieldName; import org.apache.nifi.record.path.functions.Replace; import org.apache.nifi.record.path.functions.ReplaceNull; @@ -74,6 +75,7 @@ import org.apache.nifi.record.path.functions.SubstringAfter; import org.apache.nifi.record.path.functions.SubstringAfterLast; import org.apache.nifi.record.path.functions.SubstringBefore; import org.apache.nifi.record.path.functions.SubstringBeforeLast; +import org.apache.nifi.record.path.functions.ToDate; public class RecordPathCompiler { @@ -244,6 +246,14 @@ public class RecordPathCompiler { final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); return new FieldName(args[0], absolute); } + case "toDate": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new ToDate(args[0], args[1], absolute); + } + case "format": { + final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute); + return new Format(args[0], args[1], absolute); + } default: { throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only " + "be used within a predicate, not as a standalone function"); @@ -346,4 +356,19 @@ public class RecordPathCompiler { return argPaths; } + + private static RecordPathSegment[] getArgPaths(final Tree argumentListTree, final int minCount, final int maxCount, final String functionName, final boolean absolute) { + final int numArgs = argumentListTree.getChildCount(); + if (numArgs < minCount || numArgs > maxCount) { + throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes at least" + minCount + + " arguments, and at most " + maxCount + "arguments, but got " + numArgs); + } + + final List argPaths = new ArrayList<>(); + for (int i=0; i < argumentListTree.getChildCount(); i++) { + argPaths.add(buildPath(argumentListTree.getChild(i), null, absolute)); + } + + return argPaths.toArray(new RecordPathSegment[argPaths.size()]); + } } diff --git a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java index 430d48c55e..5d6def877d 100644 --- a/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java +++ b/nifi-commons/nifi-record-path/src/test/java/org/apache/nifi/record/path/TestRecordPath.java @@ -20,6 +20,9 @@ package org.apache.nifi.record.path; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.sql.Date; +import java.text.DateFormat; +import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -36,6 +39,7 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.junit.Test; public class TestRecordPath { @@ -1017,6 +1021,141 @@ public class TestRecordPath { assertEquals(0L, RecordPath.compile("//name[not(startsWith(fieldName(.), 'n'))]").evaluate(record).getSelectedFields().count()); } + @Test + public void testToDateFromString() { + final List fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("date", RecordFieldType.DATE.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map values = new HashMap<>(); + values.put("id", 48); + values.put("date", "2017-10-20T11:00:00Z"); + final Record record = new MapRecord(schema, values); + + assertTrue(RecordPath.compile("toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\")").evaluate(record).getSelectedFields().findFirst().get().getValue() instanceof Date); + } + + @Test + public void testToDateFromLong() throws ParseException { + final List fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("date", RecordFieldType.LONG.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final DateFormat dateFormat = DataTypeUtils.getDateFormat("yyyy-MM-dd"); + final long dateValue = dateFormat.parse("2017-10-20T11:00:00Z").getTime(); + + final Map values = new HashMap<>(); + values.put("id", 48); + values.put("date", dateValue); + final Record record = new MapRecord(schema, values); + + // since the field is a long it shouldn't do the conversion and should return the value unchanged + assertTrue(RecordPath.compile("toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\")").evaluate(record).getSelectedFields().findFirst().get().getValue() instanceof Long); + } + + @Test + public void testToDateFromNonDateString() { + final List fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("name", RecordFieldType.DATE.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map values = new HashMap<>(); + values.put("id", 48); + values.put("name", "John Doe"); + final Record record = new MapRecord(schema, values); + + // since the field is a string it shouldn't do the conversion and should return the value unchanged + final FieldValue fieldValue = RecordPath.compile("toDate(/name, \"yyyy-MM-dd'T'HH:mm:ss'Z'\")").evaluate(record).getSelectedFields().findFirst().get(); + assertEquals("John Doe", fieldValue.getValue()); + } + + @Test + public void testFormatDateFromString() throws ParseException { + final List fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("date", RecordFieldType.DATE.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map values = new HashMap<>(); + values.put("id", 48); + values.put("date", "2017-10-20T11:00:00Z"); + final Record record = new MapRecord(schema, values); + + final FieldValue fieldValue = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'yyyy-MM-dd' )").evaluate(record).getSelectedFields().findFirst().get(); + assertEquals("2017-10-20", fieldValue.getValue()); + + final FieldValue fieldValueUnchanged = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get(); + assertEquals(DataTypeUtils.getDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").parse("2017-10-20T11:00:00Z"), fieldValueUnchanged.getValue()); + } + + @Test + public void testFormatDateFromLong() throws ParseException { + final List fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("date", RecordFieldType.LONG.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final DateFormat dateFormat = DataTypeUtils.getDateFormat("yyyy-MM-dd"); + final long dateValue = dateFormat.parse("2017-10-20").getTime(); + + final Map values = new HashMap<>(); + values.put("id", 48); + values.put("date", dateValue); + final Record record = new MapRecord(schema, values); + + assertEquals("2017-10-20", RecordPath.compile("format(/date, 'yyyy-MM-dd' )").evaluate(record).getSelectedFields().findFirst().get().getValue()); + + final FieldValue fieldValueUnchanged = RecordPath.compile("format(/date, 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get(); + assertEquals(dateValue, fieldValueUnchanged.getValue()); + } + + @Test + public void testFormatDateFromDate() throws ParseException { + final List fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("date", RecordFieldType.DATE.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final DateFormat dateFormat = DataTypeUtils.getDateFormat("yyyy-MM-dd"); + final java.util.Date utilDate = dateFormat.parse("2017-10-20"); + final Date dateValue = new Date(utilDate.getTime()); + + final Map values = new HashMap<>(); + values.put("id", 48); + values.put("date", dateValue); + final Record record = new MapRecord(schema, values); + + assertEquals("2017-10-20", RecordPath.compile("format(/date, 'yyyy-MM-dd')").evaluate(record).getSelectedFields().findFirst().get().getValue()); + + final FieldValue fieldValueUnchanged = RecordPath.compile("format(/date, 'INVALID')").evaluate(record).getSelectedFields().findFirst().get(); + assertEquals(dateValue, fieldValueUnchanged.getValue()); + } + + @Test + public void testFormatDateWhenNotDate() { + final List fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map values = new HashMap<>(); + values.put("id", 48); + values.put("name", "John Doe"); + final Record record = new MapRecord(schema, values); + + assertEquals("John Doe", RecordPath.compile("format(/name, 'yyyy-MM')").evaluate(record).getSelectedFields().findFirst().get().getValue()); + } + private List getDefaultFields() { final List fields = new ArrayList<>(); fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); diff --git a/nifi-docs/src/main/asciidoc/record-path-guide.adoc b/nifi-docs/src/main/asciidoc/record-path-guide.adoc index 625fa55d85..f011c5212d 100644 --- a/nifi-docs/src/main/asciidoc/record-path-guide.adoc +++ b/nifi-docs/src/main/asciidoc/record-path-guide.adoc @@ -474,6 +474,102 @@ RecordPath finds a "city" field whose parent does not have a name that begins wi the value of the "city" field whose parent is "homeAddress" but not the value of the "city" field whose parent is "workAddress". +=== toDate + +Converts a String to a date. For example, given a schema such as: + +---- +{ + "type": "record", + "name": "events", + "fields": [ + { "name": "name", "type": "string" }, + { "name": "eventDate", "type" : "string"} + ] +} +---- + +and a record such as: + +---- +{ + "name" : "My Event", + "eventDate" : "2017-10-20'T'11:00:00'Z'" +} +---- + +The following record path would parse the eventDate field into a Date: + +`toDate( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")` + +=== format + +Converts a Date to a String in the given format. + +The first argument to this function must be a Date or a Number, and the second argument must be a format String that +follows the Java SimpleDateFormat. + +For example, given a schema such as: + +---- +{ + "type": "record", + "name": "events", + "fields": [ + { "name": "name", "type": "string" }, + { "name": "eventDate", "type" : { "type" : "long", "logicalType" : "timestamp-millis" } } + ] +} +---- + +and a record such as: + +---- +{ + "name" : "My Event", + "eventDate" : 1508457600000 +} +---- + +The following record path expressions would format the date as a String: + +|========================================================== +| RecordPath | Return value +| `format( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")` | 2017-10-20'T'11:00:00'Z' +| `format( /eventDate, "yyyy-MM-dd")` | 2017-10-20 +|========================================================== + +In the case where the field is declared as a String, the toDate function must be called before formatting. + +For example, given a schema such as: + +---- +{ + "type": "record", + "name": "events", + "fields": [ + { "name": "name", "type": "string" }, + { "name": "eventDate", "type" : "string"} + ] +} +---- + +and a record such as: + +---- +{ + "name" : "My Event", + "eventDate" : "2017-10-20'T'11:00:00'Z'" +} +---- + +The following record path expression would re-format the date String: + +|========================================================== +| RecordPath | Return value +| `format( toDate(/eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'"), 'yyyy-MM-dd')` | 2017-10-20 +|========================================================== + [[filter_functions]] == Filter Functions