mirror of
https://github.com/apache/nifi.git
synced 2025-02-06 10:08:42 +00:00
NIFI-6035: 1. Add formatWithTimeZone() and toDateWithTimeZone(); 2. Their test code and docs.
This closes #3481 Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
parent
4b509aa5a5
commit
4d18eaa481
@ -66,6 +66,11 @@
|
||||
<artifactId>nifi-record</artifactId>
|
||||
<version>1.10.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-properties</artifactId>
|
||||
<version>1.10.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.antlr</groupId>
|
||||
<artifactId>antlr-runtime</artifactId>
|
||||
|
@ -22,6 +22,7 @@ import org.apache.nifi.record.path.StandardFieldValue;
|
||||
import org.apache.nifi.record.path.paths.RecordPathSegment;
|
||||
import org.apache.nifi.record.path.util.RecordPathUtils;
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.stream.Stream;
|
||||
@ -30,11 +31,20 @@ public class Format extends RecordPathSegment {
|
||||
|
||||
private final RecordPathSegment recordPath;
|
||||
private final RecordPathSegment dateFormat;
|
||||
private final RecordPathSegment timeZoneID;
|
||||
|
||||
public Format(final RecordPathSegment recordPath, final RecordPathSegment dateFormat, final boolean absolute) {
|
||||
super("format", null, absolute);
|
||||
this.recordPath = recordPath;
|
||||
this.dateFormat = dateFormat;
|
||||
this.timeZoneID = null;
|
||||
}
|
||||
|
||||
public Format(final RecordPathSegment recordPath, final RecordPathSegment dateFormat, final RecordPathSegment timeZoneID, final boolean absolute) {
|
||||
super("format", null, absolute);
|
||||
this.recordPath = recordPath;
|
||||
this.dateFormat = dateFormat;
|
||||
this.timeZoneID = timeZoneID;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -42,7 +52,7 @@ public class Format extends RecordPathSegment {
|
||||
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
|
||||
return fieldValues.filter(fv -> fv.getValue() != null)
|
||||
.map(fv -> {
|
||||
final java.text.DateFormat dateFormat = getDateFormat(this.dateFormat, context);
|
||||
final java.text.DateFormat dateFormat = getDateFormat(this.dateFormat, this.timeZoneID, context);
|
||||
if (dateFormat == null) {
|
||||
return fv;
|
||||
}
|
||||
@ -57,14 +67,22 @@ public class Format extends RecordPathSegment {
|
||||
});
|
||||
}
|
||||
|
||||
private java.text.DateFormat getDateFormat(final RecordPathSegment dateFormatSegment, final RecordPathEvaluationContext context) {
|
||||
private java.text.DateFormat getDateFormat(final RecordPathSegment dateFormatSegment, final RecordPathSegment timeZoneID, final RecordPathEvaluationContext context) {
|
||||
final String dateFormatString = RecordPathUtils.getFirstStringValue(dateFormatSegment, context);
|
||||
if (dateFormatString == null || dateFormatString.isEmpty()) {
|
||||
if (StringUtils.isEmpty(dateFormatString)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (timeZoneID == null) {
|
||||
return DataTypeUtils.getDateFormat(dateFormatString);
|
||||
} else {
|
||||
final String timeZoneStr = RecordPathUtils.getFirstStringValue(timeZoneID, context);
|
||||
if (StringUtils.isEmpty(timeZoneStr)) {
|
||||
return null;
|
||||
}
|
||||
return DataTypeUtils.getDateFormat(dateFormatString, timeZoneStr);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
return null;
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import org.apache.nifi.record.path.StandardFieldValue;
|
||||
import org.apache.nifi.record.path.paths.RecordPathSegment;
|
||||
import org.apache.nifi.record.path.util.RecordPathUtils;
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.stream.Stream;
|
||||
@ -30,11 +31,20 @@ public class ToDate extends RecordPathSegment {
|
||||
|
||||
private final RecordPathSegment recordPath;
|
||||
private final RecordPathSegment dateFormat;
|
||||
private final RecordPathSegment timeZoneID;
|
||||
|
||||
public ToDate(final RecordPathSegment recordPath, final RecordPathSegment dateFormat, final boolean absolute) {
|
||||
super("toDate", null, absolute);
|
||||
this.recordPath = recordPath;
|
||||
this.dateFormat = dateFormat;
|
||||
this.timeZoneID = null;
|
||||
}
|
||||
|
||||
public ToDate(final RecordPathSegment recordPath, final RecordPathSegment dateFormat, final RecordPathSegment timeZoneID, final boolean absolute) {
|
||||
super("toDate", null, absolute);
|
||||
this.recordPath = recordPath;
|
||||
this.dateFormat = dateFormat;
|
||||
this.timeZoneID = timeZoneID;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -47,7 +57,7 @@ public class ToDate extends RecordPathSegment {
|
||||
return fv;
|
||||
}
|
||||
|
||||
final java.text.DateFormat dateFormat = getDateFormat(this.dateFormat, context);
|
||||
final java.text.DateFormat dateFormat = getDateFormat(this.dateFormat, this.timeZoneID, context);
|
||||
|
||||
final Date dateValue;
|
||||
try {
|
||||
@ -64,18 +74,26 @@ public class ToDate extends RecordPathSegment {
|
||||
});
|
||||
}
|
||||
|
||||
private java.text.DateFormat getDateFormat(final RecordPathSegment dateFormatSegment, final RecordPathEvaluationContext context) {
|
||||
private java.text.DateFormat getDateFormat(final RecordPathSegment dateFormatSegment, final RecordPathSegment timeZoneID, final RecordPathEvaluationContext context) {
|
||||
if (dateFormatSegment == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final String dateFormatString = RecordPathUtils.getFirstStringValue(dateFormatSegment, context);
|
||||
if (dateFormatString == null || dateFormatString.isEmpty()) {
|
||||
if (StringUtils.isEmpty(dateFormatString)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (timeZoneID == null) {
|
||||
return DataTypeUtils.getDateFormat(dateFormatString);
|
||||
} else {
|
||||
final String timeZoneStr = RecordPathUtils.getFirstStringValue(timeZoneID, context);
|
||||
if (StringUtils.isEmpty(timeZoneStr)) {
|
||||
return null;
|
||||
}
|
||||
return DataTypeUtils.getDateFormat(dateFormatString, timeZoneStr);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
return null;
|
||||
}
|
||||
|
@ -251,8 +251,15 @@ public class RecordPathCompiler {
|
||||
return new FieldName(args[0], absolute);
|
||||
}
|
||||
case "toDate": {
|
||||
final int numArgs = argumentListTree.getChildCount();
|
||||
|
||||
if (numArgs == 2) {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
|
||||
return new ToDate(args[0], args[1], absolute);
|
||||
} else {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
|
||||
return new ToDate(args[0], args[1], args[2], absolute);
|
||||
}
|
||||
}
|
||||
case "toString": {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
|
||||
@ -263,8 +270,15 @@ public class RecordPathCompiler {
|
||||
return new ToBytes(args[0], args[1], absolute);
|
||||
}
|
||||
case "format": {
|
||||
final int numArgs = argumentListTree.getChildCount();
|
||||
|
||||
if (numArgs == 2) {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
|
||||
return new Format(args[0], args[1], absolute);
|
||||
} else {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
|
||||
return new Format(args[0], args[1], args[2], absolute);
|
||||
}
|
||||
}
|
||||
case "base64Encode": {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
|
||||
|
@ -1243,6 +1243,7 @@ public class TestRecordPath {
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
assertTrue(RecordPath.compile("toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\")").evaluate(record).getSelectedFields().findFirst().get().getValue() instanceof Date);
|
||||
assertTrue(RecordPath.compile("toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\", \"GMT+8:00\")").evaluate(record).getSelectedFields().findFirst().get().getValue() instanceof Date);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1263,6 +1264,7 @@ public class TestRecordPath {
|
||||
|
||||
// since the field is a long it shouldn't do the conversion and should return the value unchanged
|
||||
assertTrue(RecordPath.compile("toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\")").evaluate(record).getSelectedFields().findFirst().get().getValue() instanceof Long);
|
||||
assertTrue(RecordPath.compile("toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\", \"GMT+8:00\")").evaluate(record).getSelectedFields().findFirst().get().getValue() instanceof Long);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1281,6 +1283,8 @@ public class TestRecordPath {
|
||||
// since the field is a string it shouldn't do the conversion and should return the value unchanged
|
||||
final FieldValue fieldValue = RecordPath.compile("toDate(/name, \"yyyy-MM-dd'T'HH:mm:ss'Z'\")").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals("John Doe", fieldValue.getValue());
|
||||
final FieldValue fieldValue2 = RecordPath.compile("toDate(/name, \"yyyy-MM-dd'T'HH:mm:ss'Z'\", \"GMT+8:00\")").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals("John Doe", fieldValue2.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1298,9 +1302,18 @@ public class TestRecordPath {
|
||||
|
||||
final FieldValue fieldValue = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'yyyy-MM-dd' )").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals("2017-10-20", fieldValue.getValue());
|
||||
final FieldValue fieldValue2 = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'yyyy-MM-dd' , 'GMT+8:00')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals("2017-10-20", fieldValue2.getValue());
|
||||
|
||||
final FieldValue fieldValue3 = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'yyyy-MM-dd HH:mm', 'GMT+8:00')").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals("2017-10-20 19:00", fieldValue3.getValue());
|
||||
|
||||
final FieldValue fieldValueUnchanged = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals(DataTypeUtils.getDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").parse("2017-10-20T11:00:00Z"), fieldValueUnchanged.getValue());
|
||||
final FieldValue fieldValueUnchanged2 = RecordPath.compile("format( toDate(/date, \"yyyy-MM-dd'T'HH:mm:ss'Z'\"), 'INVALID' , 'INVALID')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals(DataTypeUtils.getDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").parse("2017-10-20T11:00:00Z"), fieldValueUnchanged2.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1320,9 +1333,12 @@ public class TestRecordPath {
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
assertEquals("2017-10-20", RecordPath.compile("format(/date, 'yyyy-MM-dd' )").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertEquals("2017-10-20 08:00:00", RecordPath.compile("format(/date, 'yyyy-MM-dd HH:mm:ss', 'GMT+8:00' )").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
|
||||
final FieldValue fieldValueUnchanged = RecordPath.compile("format(/date, 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals(dateValue, fieldValueUnchanged.getValue());
|
||||
final FieldValue fieldValueUnchanged2 = RecordPath.compile("format(/date, 'INVALID', 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals(dateValue, fieldValueUnchanged2.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1343,9 +1359,12 @@ public class TestRecordPath {
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
assertEquals("2017-10-20", RecordPath.compile("format(/date, 'yyyy-MM-dd')").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertEquals("2017-10-20 08:00:00", RecordPath.compile("format(/date, 'yyyy-MM-dd HH:mm:ss', 'GMT+8:00')").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
|
||||
final FieldValue fieldValueUnchanged = RecordPath.compile("format(/date, 'INVALID')").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals(dateValue, fieldValueUnchanged.getValue());
|
||||
final FieldValue fieldValueUnchanged2 = RecordPath.compile("format(/date, 'INVALID', 'INVALID' )").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertEquals(dateValue, fieldValueUnchanged2.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1362,6 +1381,7 @@ public class TestRecordPath {
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
assertEquals("John Doe", RecordPath.compile("format(/name, 'yyyy-MM')").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertEquals("John Doe", RecordPath.compile("format(/name, 'yyyy-MM', 'GMT+8:00')").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1019,6 +1019,15 @@ public class DataTypeUtils {
|
||||
return df;
|
||||
}
|
||||
|
||||
public static DateFormat getDateFormat(final String format, final String timezoneID) {
|
||||
if (format == null || timezoneID == null) {
|
||||
return null;
|
||||
}
|
||||
final DateFormat df = new SimpleDateFormat(format);
|
||||
df.setTimeZone(TimeZone.getTimeZone(timezoneID));
|
||||
return df;
|
||||
}
|
||||
|
||||
public static boolean isTimeTypeCompatible(final Object value, final String format) {
|
||||
return isDateTypeCompatible(value, format);
|
||||
}
|
||||
|
@ -494,7 +494,7 @@ and a record such as:
|
||||
----
|
||||
{
|
||||
"name" : "My Event",
|
||||
"eventDate" : "2017-10-20'T'11:00:00'Z'"
|
||||
"eventDate" : "2017-10-20T00:00:00Z"
|
||||
}
|
||||
----
|
||||
|
||||
@ -502,6 +502,8 @@ The following record path would parse the eventDate field into a Date:
|
||||
|
||||
`toDate( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")`
|
||||
|
||||
`toDate( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'", "GMT+8:00")`
|
||||
|
||||
=== toString
|
||||
|
||||
Converts a value to a String, using the given character set if the input type is "bytes". For example,
|
||||
@ -561,10 +563,11 @@ The following record path would convert the String field into a byte array using
|
||||
|
||||
=== format
|
||||
|
||||
Converts a Date to a String in the given format.
|
||||
Converts a Date to a String in the given format with the given time zone(optional, default time zone is GMT).
|
||||
|
||||
The first argument to this function must be a Date or a Number, and the second argument must be a format String that
|
||||
follows the Java SimpleDateFormat.
|
||||
follows the Java SimpleDateFormat, and the third argument, optional, must be a format String that
|
||||
either an abbreviation such as "PST", a full name such as "America/Los_Angeles", or a custom ID such as "GMT-8:00"
|
||||
|
||||
For example, given a schema such as:
|
||||
|
||||
@ -592,8 +595,10 @@ The following record path expressions would format the date as a String:
|
||||
|
||||
|==========================================================
|
||||
| RecordPath | Return value
|
||||
| `format( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")` | 2017-10-20'T'11:00:00'Z'
|
||||
| `format( /eventDate, "yyyy-MM-dd'T'HH:mm:ss'Z'")` | 2017-10-20T00:00:00Z
|
||||
| `format( /eventDate, "yyyy-MM-dd")` | 2017-10-20
|
||||
| `format( /eventDate, "yyyy-MM-dd HH:mm:ss Z", "GMT+8:00")` | 2017-10-20 08:00:00 +0800
|
||||
| `format( /eventDate, "yyyy-MM-dd", "GMT+8:00")` | 2017-10-20
|
||||
|==========================================================
|
||||
|
||||
In the case where the field is declared as a String, the toDate function must be called before formatting.
|
||||
@ -616,7 +621,7 @@ and a record such as:
|
||||
----
|
||||
{
|
||||
"name" : "My Event",
|
||||
"eventDate" : "2017-10-20'T'11:00:00'Z'"
|
||||
"eventDate" : "2017-10-20T00:00:00Z"
|
||||
}
|
||||
----
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user