NIFI-4506 Adding toDate and format functions to record path. This closes #2221.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Bryan Bende 2017-10-20 14:08:56 -04:00 committed by Mark Payne
parent 1959586389
commit ff7283d65d
5 changed files with 416 additions and 0 deletions

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.util.Date;
import java.util.stream.Stream;
/**
 * RecordPath function {@code format(<path>, <pattern>)}.
 *
 * <p>Evaluates the wrapped path and, for every non-null value that is a {@link Date} or a
 * {@link Number}, replaces it with its String rendering under the date-format pattern supplied
 * by the second argument. Any other value, or any value for which no usable date format can be
 * obtained, is passed through unchanged.
 */
public class Format extends RecordPathSegment {
    private final RecordPathSegment recordPath;
    private final RecordPathSegment dateFormat;

    /**
     * @param recordPath path whose selected values are to be formatted
     * @param dateFormat path that yields the date-format pattern String
     * @param absolute   forwarded unchanged to the {@link RecordPathSegment} constructor
     */
    public Format(final RecordPathSegment recordPath, final RecordPathSegment dateFormat, final boolean absolute) {
        super("format", null, absolute);
        this.recordPath = recordPath;
        this.dateFormat = dateFormat;
    }

    /**
     * Formats each selected Date/Number value as a String.
     *
     * @param context the evaluation context handed to the wrapped path and to the format argument
     * @return a stream without null-valued entries; formattable entries carry the formatted String,
     *         all other entries are the original {@link FieldValue} instances
     */
    @Override
    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
        return fieldValues.filter(fv -> fv.getValue() != null)
            .map(fv -> {
                final Object value = fv.getValue();

                // Only Dates and Numbers are formatted; check this first so the format argument
                // is not evaluated for values that would be returned unchanged anyway.
                if (!(value instanceof Date) && !(value instanceof Number)) {
                    return fv;
                }

                final java.text.DateFormat formatter = getDateFormat(this.dateFormat, context);
                if (formatter == null) {
                    return fv;
                }

                // The input is already a Date or a Number, so no parse format is supplied.
                final Date dateValue = DataTypeUtils.toDate(value, null, fv.getField().getFieldName());
                final String formatted = formatter.format(dateValue);
                return new StandardFieldValue(formatted, fv.getField(), fv.getParent().orElse(null));
            });
    }

    /**
     * Resolves the format argument to a {@link java.text.DateFormat}.
     *
     * @return the date format, or {@code null} when the argument is absent, evaluates to a null or
     *         empty String, or cannot be turned into a date format
     */
    private java.text.DateFormat getDateFormat(final RecordPathSegment dateFormatSegment, final RecordPathEvaluationContext context) {
        // Same guard as ToDate#getDateFormat: a missing format argument means "no format".
        if (dateFormatSegment == null) {
            return null;
        }

        final String dateFormatString = RecordPathUtils.getFirstStringValue(dateFormatSegment, context);
        if (dateFormatString == null || dateFormatString.isEmpty()) {
            return null;
        }

        try {
            return DataTypeUtils.getDateFormat(dateFormatString);
        } catch (final Exception e) {
            // Deliberate best-effort: an unusable pattern leaves the value unformatted instead of failing.
            return null;
        }
    }
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.util.Date;
import java.util.stream.Stream;
/**
 * RecordPath function {@code toDate(<path>, <pattern>)}.
 *
 * <p>Evaluates the wrapped path and attempts to turn every non-null String value into a
 * {@link Date}, using the date-format pattern supplied by the second argument. Values that are
 * not Strings, and Strings for which the conversion fails or yields {@code null}, are passed
 * through unchanged.
 */
public class ToDate extends RecordPathSegment {
    private final RecordPathSegment recordPath;
    private final RecordPathSegment dateFormat;

    /**
     * @param recordPath path whose selected values are to be converted
     * @param dateFormat path that yields the date-format pattern String; may be {@code null}
     * @param absolute   forwarded unchanged to the {@link RecordPathSegment} constructor
     */
    public ToDate(final RecordPathSegment recordPath, final RecordPathSegment dateFormat, final boolean absolute) {
        super("toDate", null, absolute);
        this.recordPath = recordPath;
        this.dateFormat = dateFormat;
    }

    /**
     * Converts each selected String value to a Date where possible.
     *
     * @param context the evaluation context handed to the wrapped path and to the format argument
     * @return a stream without null-valued entries; converted entries carry the Date, all other
     *         entries are the original {@link FieldValue} instances
     */
    @Override
    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
        return recordPath.evaluate(context)
            .filter(candidate -> candidate.getValue() != null)
            .map(candidate -> convert(candidate, context));
    }

    /**
     * Converts a single field value, falling back to the untouched input on any failure.
     */
    private FieldValue convert(final FieldValue original, final RecordPathEvaluationContext context) {
        final Object rawValue = original.getValue();
        if (!(rawValue instanceof String)) {
            return original;
        }

        final java.text.DateFormat parser = getDateFormat(this.dateFormat, context);

        final Date parsed;
        try {
            parsed = DataTypeUtils.toDate(rawValue, () -> parser, original.getField().getFieldName());
        } catch (final Exception e) {
            // Conversion failed: hand back the original value.
            return original;
        }

        if (parsed != null) {
            return new StandardFieldValue(parsed, original.getField(), original.getParent().orElse(null));
        }
        return original;
    }

    /**
     * Resolves the format argument to a {@link java.text.DateFormat}.
     *
     * @return the date format, or {@code null} when the argument is absent, evaluates to a null or
     *         empty String, or cannot be turned into a date format
     */
    private java.text.DateFormat getDateFormat(final RecordPathSegment dateFormatSegment, final RecordPathEvaluationContext context) {
        final String pattern = dateFormatSegment == null
            ? null
            : RecordPathUtils.getFirstStringValue(dateFormatSegment, context);

        if (pattern == null || pattern.isEmpty()) {
            return null;
        }

        try {
            return DataTypeUtils.getDateFormat(pattern);
        } catch (final Exception e) {
            return null;
        }
    }
}

View File

@ -65,6 +65,7 @@ import org.apache.nifi.record.path.filter.NotFilter;
import org.apache.nifi.record.path.filter.RecordPathFilter; import org.apache.nifi.record.path.filter.RecordPathFilter;
import org.apache.nifi.record.path.filter.StartsWith; import org.apache.nifi.record.path.filter.StartsWith;
import org.apache.nifi.record.path.functions.Concat; import org.apache.nifi.record.path.functions.Concat;
import org.apache.nifi.record.path.functions.Format;
import org.apache.nifi.record.path.functions.FieldName; import org.apache.nifi.record.path.functions.FieldName;
import org.apache.nifi.record.path.functions.Replace; import org.apache.nifi.record.path.functions.Replace;
import org.apache.nifi.record.path.functions.ReplaceNull; import org.apache.nifi.record.path.functions.ReplaceNull;
@ -74,6 +75,7 @@ import org.apache.nifi.record.path.functions.SubstringAfter;
import org.apache.nifi.record.path.functions.SubstringAfterLast; import org.apache.nifi.record.path.functions.SubstringAfterLast;
import org.apache.nifi.record.path.functions.SubstringBefore; import org.apache.nifi.record.path.functions.SubstringBefore;
import org.apache.nifi.record.path.functions.SubstringBeforeLast; import org.apache.nifi.record.path.functions.SubstringBeforeLast;
import org.apache.nifi.record.path.functions.ToDate;
public class RecordPathCompiler { public class RecordPathCompiler {
@ -244,6 +246,14 @@ public class RecordPathCompiler {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute); final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new FieldName(args[0], absolute); return new FieldName(args[0], absolute);
} }
case "toDate": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new ToDate(args[0], args[1], absolute);
}
case "format": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new Format(args[0], args[1], absolute);
}
default: { default: {
throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only " throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only "
+ "be used within a predicate, not as a standalone function"); + "be used within a predicate, not as a standalone function");
@ -346,4 +356,19 @@ public class RecordPathCompiler {
return argPaths; return argPaths;
} }
/**
 * Builds one path segment per child of the argument-list tree, after checking that the number
 * of arguments lies within {@code [minCount, maxCount]}.
 *
 * @param argumentListTree tree whose children are the function's arguments
 * @param minCount         smallest accepted number of arguments (inclusive)
 * @param maxCount         largest accepted number of arguments (inclusive)
 * @param functionName     function name, used only in the error message
 * @param absolute         forwarded to {@code buildPath} for every argument
 * @return the argument paths, in the order the arguments appear in the tree
 * @throws RecordPathException if the argument count is outside the accepted range
 */
private static RecordPathSegment[] getArgPaths(final Tree argumentListTree, final int minCount, final int maxCount, final String functionName, final boolean absolute) {
    final int numArgs = argumentListTree.getChildCount();
    if (numArgs < minCount || numArgs > maxCount) {
        // Spaces around the counts were missing, producing text such as "at least2 ... 3arguments".
        throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes at least " + minCount
            + " arguments, and at most " + maxCount + " arguments, but got " + numArgs);
    }

    // The count is known up front, so fill the result array directly.
    final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs];
    for (int i = 0; i < numArgs; i++) {
        argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
    }

    return argPaths;
}
} }

View File

@ -20,6 +20,9 @@ package org.apache.nifi.record.path;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.sql.Date;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -36,6 +39,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.junit.Test; import org.junit.Test;
public class TestRecordPath { public class TestRecordPath {
@ -1017,6 +1021,141 @@ public class TestRecordPath {
assertEquals(0L, RecordPath.compile("//name[not(startsWith(fieldName(.), 'n'))]").evaluate(record).getSelectedFields().count()); assertEquals(0L, RecordPath.compile("//name[not(startsWith(fieldName(.), 'n'))]").evaluate(record).getSelectedFields().count());
} }
@Test
public void testToDateFromString() {
    // Schema with an "id" and a "date" field; the record carries the date as text.
    final List<RecordField> schemaFields = new ArrayList<>();
    schemaFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
    schemaFields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(schemaFields);

    final Map<String, Object> recordValues = new HashMap<>();
    recordValues.put("id", 48);
    recordValues.put("date", "2017-10-20T11:00:00Z");
    final Record record = new MapRecord(schema, recordValues);

    // A String that matches the pattern must come back as a Date.
    final Object converted = RecordPath.compile("toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\")")
        .evaluate(record)
        .getSelectedFields()
        .findFirst()
        .get()
        .getValue();
    assertTrue(converted instanceof Date);
}
@Test
public void testToDateFromLong() throws ParseException {
    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
    fields.add(new RecordField("date", RecordFieldType.LONG.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(fields);

    // Build the epoch-millis fixture from text that actually matches the pattern, rather than
    // relying on DateFormat.parse ignoring a trailing time portion.
    final DateFormat dateFormat = DataTypeUtils.getDateFormat("yyyy-MM-dd");
    final long dateValue = dateFormat.parse("2017-10-20").getTime();

    final Map<String, Object> values = new HashMap<>();
    values.put("id", 48);
    values.put("date", dateValue);
    final Record record = new MapRecord(schema, values);

    // toDate only converts String values, so a Long must be returned unchanged.
    final Object result = RecordPath.compile("toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\")")
        .evaluate(record)
        .getSelectedFields()
        .findFirst()
        .get()
        .getValue();
    assertTrue(result instanceof Long);
}
@Test
public void testToDateFromNonDateString() {
    final List<RecordField> schemaFields = new ArrayList<>();
    schemaFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
    schemaFields.add(new RecordField("name", RecordFieldType.DATE.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(schemaFields);

    final Map<String, Object> recordValues = new HashMap<>();
    recordValues.put("id", 48);
    recordValues.put("name", "John Doe");
    final Record record = new MapRecord(schema, recordValues);

    // The value is a String but does not match the date pattern, so the conversion
    // cannot succeed and the original value must be returned unchanged.
    final RecordPath path = RecordPath.compile("toDate(/name, \"yyyy-MM-dd'T'HH:mm:ss'Z'\")");
    final FieldValue unchanged = path.evaluate(record).getSelectedFields().findFirst().get();
    assertEquals("John Doe", unchanged.getValue());
}
@Test
public void testFormatDateFromString() throws ParseException {
    final List<RecordField> schemaFields = new ArrayList<>();
    schemaFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
    schemaFields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(schemaFields);

    final Map<String, Object> recordValues = new HashMap<>();
    recordValues.put("id", 48);
    recordValues.put("date", "2017-10-20T11:00:00Z");
    final Record record = new MapRecord(schema, recordValues);

    // Valid output pattern: the parsed date is re-rendered as a String.
    final RecordPath reformatPath = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'yyyy-MM-dd' )");
    final FieldValue reformatted = reformatPath.evaluate(record).getSelectedFields().findFirst().get();
    assertEquals("2017-10-20", reformatted.getValue());

    // Unusable output pattern: format leaves the Date produced by toDate untouched.
    final RecordPath invalidPatternPath = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'INVALID' )");
    final FieldValue untouched = invalidPatternPath.evaluate(record).getSelectedFields().findFirst().get();
    assertEquals(DataTypeUtils.getDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").parse("2017-10-20T11:00:00Z"), untouched.getValue());
}
@Test
public void testFormatDateFromLong() throws ParseException {
    final List<RecordField> schemaFields = new ArrayList<>();
    schemaFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
    schemaFields.add(new RecordField("date", RecordFieldType.LONG.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(schemaFields);

    // Epoch milliseconds for 2017-10-20, derived with the same helper the functions use.
    final DateFormat dayFormat = DataTypeUtils.getDateFormat("yyyy-MM-dd");
    final long epochMillis = dayFormat.parse("2017-10-20").getTime();

    final Map<String, Object> recordValues = new HashMap<>();
    recordValues.put("id", 48);
    recordValues.put("date", epochMillis);
    final Record record = new MapRecord(schema, recordValues);

    // Valid pattern: the numeric value is rendered as a date String.
    final Object formatted = RecordPath.compile("format(/date, 'yyyy-MM-dd' )")
        .evaluate(record)
        .getSelectedFields()
        .findFirst()
        .get()
        .getValue();
    assertEquals("2017-10-20", formatted);

    // Unusable pattern: the original long is returned unchanged.
    final FieldValue untouched = RecordPath.compile("format(/date, 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get();
    assertEquals(epochMillis, untouched.getValue());
}
@Test
public void testFormatDateFromDate() throws ParseException {
    final List<RecordField> schemaFields = new ArrayList<>();
    schemaFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
    schemaFields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(schemaFields);

    // Parse to a java.util.Date first, then wrap its instant in the Date type imported by this test.
    final DateFormat dayFormat = DataTypeUtils.getDateFormat("yyyy-MM-dd");
    final java.util.Date parsedDay = dayFormat.parse("2017-10-20");
    final Date dateValue = new Date(parsedDay.getTime());

    final Map<String, Object> recordValues = new HashMap<>();
    recordValues.put("id", 48);
    recordValues.put("date", dateValue);
    final Record record = new MapRecord(schema, recordValues);

    // Valid pattern: the Date is rendered as a String.
    final Object formatted = RecordPath.compile("format(/date, 'yyyy-MM-dd')")
        .evaluate(record)
        .getSelectedFields()
        .findFirst()
        .get()
        .getValue();
    assertEquals("2017-10-20", formatted);

    // Unusable pattern: the original Date is returned unchanged.
    final FieldValue untouched = RecordPath.compile("format(/date, 'INVALID')").evaluate(record).getSelectedFields().findFirst().get();
    assertEquals(dateValue, untouched.getValue());
}
@Test
public void testFormatDateWhenNotDate() {
    final List<RecordField> schemaFields = new ArrayList<>();
    schemaFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
    schemaFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(schemaFields);

    final Map<String, Object> recordValues = new HashMap<>();
    recordValues.put("id", 48);
    recordValues.put("name", "John Doe");
    final Record record = new MapRecord(schema, recordValues);

    // format only handles Date and Number values; a plain String must pass through unchanged.
    final Object result = RecordPath.compile("format(/name, 'yyyy-MM')")
        .evaluate(record)
        .getSelectedFields()
        .findFirst()
        .get()
        .getValue();
    assertEquals("John Doe", result);
}
private List<RecordField> getDefaultFields() { private List<RecordField> getDefaultFields() {
final List<RecordField> fields = new ArrayList<>(); final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));

View File

@ -474,6 +474,102 @@ RecordPath finds a "city" field whose parent does not have a name that begins wi
the value of the "city" field whose parent is "homeAddress" but not the value of the "city" field whose parent is "workAddress". the value of the "city" field whose parent is "homeAddress" but not the value of the "city" field whose parent is "workAddress".
=== toDate
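Converts a String to a Date. The first argument is the field to convert, and the second argument is a format String that
follows the Java SimpleDateFormat. A value that is not a String, or a String that cannot be parsed using the given format,
is returned unchanged. For example, given a schema such as: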
----
{
"type": "record",
"name": "events",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "eventDate", "type" : "string"}
]
}
----
and a record such as:
----
{
"name" : "My Event",
"eventDate" : "2017-10-20T11:00:00Z"
}
----
The following record path would parse the eventDate field into a Date:
`toDate( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")`
=== format
Converts a Date to a String in the given format.
The first argument to this function must be a Date or a Number, and the second argument must be a format String that
follows the Java SimpleDateFormat.
For example, given a schema such as:
----
{
"type": "record",
"name": "events",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "eventDate", "type" : { "type" : "long", "logicalType" : "timestamp-millis" } }
]
}
----
and a record such as:
----
{
"name" : "My Event",
"eventDate" : 1508457600000
}
----
The following record path expressions would format the date as a String:
|==========================================================
| RecordPath | Return value
| `format( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")` | 2017-10-20T11:00:00Z
| `format( /eventDate, "yyyy-MM-dd")` | 2017-10-20
|==========================================================
In the case where the field is declared as a String, the toDate function must be called before formatting.
For example, given a schema such as:
----
{
"type": "record",
"name": "events",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "eventDate", "type" : "string"}
]
}
----
and a record such as:
----
{
"name" : "My Event",
"eventDate" : "2017-10-20T11:00:00Z"
}
----
The following record path expression would re-format the date String:
|==========================================================
| RecordPath | Return value
| `format( toDate(/eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'"), 'yyyy-MM-dd')` | 2017-10-20
|==========================================================
[[filter_functions]] [[filter_functions]]
== Filter Functions == Filter Functions