NIFI-12885 Added Record Methods for Local and Offset Dates (#8502)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
knguyen1 2024-04-25 00:23:54 -04:00 committed by GitHub
parent bb44ffe357
commit b6d044853c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 166 additions and 32 deletions

View File

@ -32,11 +32,12 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@ -279,10 +280,23 @@ public class MapRecord implements Record {
}
@Override
public Date getAsDate(final String fieldName, final String format) {
final FieldConverter<Object, LocalDate> converter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(LocalDate.class);
final LocalDate localDate = converter.convertField(getValue(fieldName), Optional.ofNullable(format), fieldName);
return localDate == null ? null : java.sql.Date.valueOf(localDate);
public LocalDate getAsLocalDate(final String fieldName, final String format) {
return convertFieldToDateTime(LocalDate.class, fieldName, format);
}
@Override
public LocalDateTime getAsLocalDateTime(String fieldName, String format) {
return convertFieldToDateTime(LocalDateTime.class, fieldName, format);
}
@Override
public OffsetDateTime getAsOffsetDateTime(final String fieldName, final String format) {
return convertFieldToDateTime(OffsetDateTime.class, fieldName, format);
}
private <T> T convertFieldToDateTime(Class<T> clazz, String fieldName, String format) {
final FieldConverter<Object, T> converter = StandardFieldConverterRegistry.getRegistry().getFieldConverter(clazz);
return converter.convertField(getValue(fieldName), Optional.ofNullable(format), fieldName);
}
@Override

View File

@ -17,10 +17,13 @@
package org.apache.nifi.serialization.record;
import java.util.Date;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
public interface Record {
@ -107,7 +110,11 @@ public interface Record {
Boolean getAsBoolean(String fieldName);
Date getAsDate(String fieldName, String format);
LocalDate getAsLocalDate(String fieldName, String format);
LocalDateTime getAsLocalDateTime(String fieldName, String format);
OffsetDateTime getAsOffsetDateTime(String fieldName, String format);
Object[] getAsArray(String fieldName);

View File

@ -275,7 +275,6 @@ public class ResultSetRecordSetTest {
final Boolean bitValue = Boolean.FALSE;
final Boolean booleanValue = Boolean.TRUE;
final Character charValue = 'c';
final Date dateValue = Date.valueOf(testDate);
final Timestamp timestampValue = Timestamp.valueOf(testDateTime);
final Integer integerValue = 1234567890;
final Double doubleValue = 0.12;
@ -295,7 +294,7 @@ public class ResultSetRecordSetTest {
when(resultSet.getObject(COLUMN_NAME_BIT)).thenReturn(bitValue);
when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue);
when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue);
when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue);
when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(testDate);
when(resultSet.getTimestamp(COLUMN_NAME_TIMESTAMP)).thenReturn(timestampValue);
when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue);
when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue);
@ -319,7 +318,7 @@ public class ResultSetRecordSetTest {
assertEquals(booleanValue, record.getAsBoolean(COLUMN_NAME_BOOLEAN));
assertEquals(charValue, record.getValue(COLUMN_NAME_CHAR));
assertEquals(dateValue, record.getAsDate(COLUMN_NAME_DATE, null));
assertEquals(testDate, record.getAsLocalDate(COLUMN_NAME_DATE, null));
final Object timestampObject = record.getValue(COLUMN_NAME_TIMESTAMP);
assertEquals(timestampValue, timestampObject);

View File

@ -21,8 +21,16 @@ import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.TriFunction;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -30,6 +38,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -38,7 +47,11 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestMapRecord {
class TestMapRecord {
private static final String ISO_LOCAL_DATE = "yyyy-MM-dd";
private static final String ISO_LOCAL_DATE_TIME = "yyyy-MM-dd'T'HH:mm:ss.SSS";
private static final String ISO_OFFSET_DATE_TIME = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private static final List<RecordField> STRING_NUMBER_FIELDS = List.of(
new RecordField("string", RecordFieldType.STRING.getDataType()),
@ -47,7 +60,7 @@ public class TestMapRecord {
@Test
public void testRenameClearsSerializedForm() {
void testRenameClearsSerializedForm() {
final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8));
final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test"));
@ -58,7 +71,7 @@ public class TestMapRecord {
}
@Test
public void testRemoveClearsSerializedForm() {
void testRemoveClearsSerializedForm() {
final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8));
final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test"));
@ -69,7 +82,7 @@ public class TestMapRecord {
}
@Test
public void testRenameRemoveInvalidFieldsToNotClearSerializedForm() {
void testRenameRemoveInvalidFieldsToNotClearSerializedForm() {
final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8));
final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test"));
@ -85,7 +98,7 @@ public class TestMapRecord {
}
@Test
public void testIncorporateInactiveFieldsWithUpdate() {
void testIncorporateInactiveFieldsWithUpdate() {
final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8));
final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test"));
@ -105,7 +118,7 @@ public class TestMapRecord {
}
@Test
public void testIncorporateInactiveFieldsWithConflict() {
void testIncorporateInactiveFieldsWithConflict() {
final Map<String, Object> values = new HashMap<>(Map.of("string", "hello", "number", 8));
final RecordSchema schema = new SimpleRecordSchema(STRING_NUMBER_FIELDS);
final Record record = new MapRecord(schema, values, SerializedForm.of("Hello there", "text/unit-test"));
@ -127,7 +140,7 @@ public class TestMapRecord {
}
@Test
public void testDefaultValue() {
void testDefaultValue() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello"));
@ -141,7 +154,7 @@ public class TestMapRecord {
}
@Test
public void testDefaultValueInGivenField() {
void testDefaultValueInGivenField() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello"));
@ -158,7 +171,7 @@ public class TestMapRecord {
}
@Test
public void testIllegalDefaultValue() {
void testIllegalDefaultValue() {
new RecordField("hello", RecordFieldType.STRING.getDataType(), 84);
new RecordField("hello", RecordFieldType.STRING.getDataType(), (Object) null);
new RecordField("hello", RecordFieldType.INT.getDataType(), 84);
@ -168,15 +181,11 @@ public class TestMapRecord {
}
private Set<String> set(final String... values) {
final Set<String> set = new LinkedHashSet<>();
for (final String value : values) {
set.add(value);
}
return set;
return new LinkedHashSet<>(Arrays.asList(values));
}
@Test
public void testAliasOneValue() {
void testAliasOneValue() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
@ -191,7 +200,7 @@ public class TestMapRecord {
}
@Test
public void testAliasConflictingValues() {
void testAliasConflictingValues() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
@ -207,7 +216,7 @@ public class TestMapRecord {
}
@Test
public void testAliasConflictingAliasValues() {
void testAliasConflictingAliasValues() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
@ -223,7 +232,7 @@ public class TestMapRecord {
}
@Test
public void testAliasInGivenField() {
void testAliasInGivenField() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
@ -246,7 +255,7 @@ public class TestMapRecord {
@Test
public void testDefaultValueWithAliasValue() {
void testDefaultValueWithAliasValue() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
@ -262,7 +271,7 @@ public class TestMapRecord {
}
@Test
public void testDefaultValueWithAliasesDefined() {
void testDefaultValueWithAliasesDefined() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
@ -275,7 +284,7 @@ public class TestMapRecord {
}
@Test
public void testNestedSchema() {
void testNestedSchema() {
final String FOO_TEST_VAL = "test!";
final String NESTED_RECORD_VALUE = "Hello, world!";
@ -323,4 +332,77 @@ public class TestMapRecord {
}
}
@ParameterizedTest
@MethodSource("provideLocalDates")
void testGettingLocalDate(final String input, final String format, LocalDate expectedDate) {
executeDateTimeTest(input, format, expectedDate, MapRecord::getAsLocalDate);
}
@ParameterizedTest
@MethodSource("provideLocalDateTimes")
void testGettingLocalDateTime(final String input, final String format, LocalDateTime expectedDateTime) {
executeDateTimeTest(input, format, expectedDateTime, MapRecord::getAsLocalDateTime);
}
@ParameterizedTest
@MethodSource("provideOffsetDateTimes")
void testGettingOffsetDateTime(final String input, final String format, OffsetDateTime expectedOffsetDateTime) {
executeDateTimeTest(input, format, expectedOffsetDateTime, MapRecord::getAsOffsetDateTime);
}
private <T> void executeDateTimeTest(final String input,
final String format,
final Object expectedDateTime,
TriFunction<MapRecord, String, String, T> dateTimeFunction) {
// create a `MapRecord` from the input
final List<RecordField> fields = new ArrayList<>();
final String timestampFieldName = "timestamp";
fields.add(new RecordField(timestampFieldName, RecordFieldType.TIMESTAMP.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final HashMap<String, Object> item = new HashMap<>();
item.put(timestampFieldName, input);
final MapRecord testRecord = new MapRecord(schema, item);
// apply the datetime function to the record and compare
final T actualDateTime = dateTimeFunction.apply(testRecord, timestampFieldName, format);
assertEquals(expectedDateTime, actualDateTime);
}
private static Stream<Arguments> provideLocalDates() {
return Stream.of(
Arguments.of("2022-01-01", ISO_LOCAL_DATE, LocalDate.parse("2022-01-01")),
Arguments.of("2022-01-01T12:34:56.789", ISO_LOCAL_DATE_TIME, LocalDate.parse("2022-01-01")),
Arguments.of("2017-06-23T01:02:03.456", ISO_LOCAL_DATE_TIME, LocalDate.parse("2017-06-23")),
Arguments.of("2020-02-29T23:59:59.999", ISO_LOCAL_DATE_TIME, LocalDate.parse("2020-02-29")), // leap year
Arguments.of("2024-03-10T02:00:00.000", ISO_LOCAL_DATE_TIME, LocalDate.parse("2024-03-10")), // DST transition
// test minimum and maximum values
Arguments.of("0001-01-01T00:00:00.000", ISO_LOCAL_DATE_TIME, LocalDate.parse("0001-01-01")),
Arguments.of("9999-12-31T23:59:59.999", ISO_LOCAL_DATE_TIME, LocalDate.parse("9999-12-31"))
);
}
private static Stream<Arguments> provideLocalDateTimes() {
return Stream.of(
Arguments.of("2022-01-01T12:34:56.789", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("2022-01-01T12:34:56.789")),
Arguments.of("2017-06-23T01:02:03.456", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("2017-06-23T01:02:03.456")),
Arguments.of("2020-02-29T23:59:59.999", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("2020-02-29T23:59:59.999")), // leap year
Arguments.of("2024-03-10T02:00:00.000", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("2024-03-10T02:00:00.000")), // DST transition
// test minimum and maximum values
Arguments.of("0001-01-01T00:00:00.000", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("0001-01-01T00:00:00.000")),
Arguments.of("9999-12-31T23:59:59.999", ISO_LOCAL_DATE_TIME, LocalDateTime.parse("9999-12-31T23:59:59.999"))
);
}
private static Stream<Arguments> provideOffsetDateTimes() {
return Stream.of(
Arguments.of("2022-01-01T12:34:56.789+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2022-01-01T12:34:56.789+00:00")),
Arguments.of("2017-06-23T01:02:03.456+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2017-06-23T01:02:03.456+00:00")),
Arguments.of("2020-02-29T23:59:59.999+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2020-02-29T23:59:59.999+00:00")), // leap year
Arguments.of("2024-03-10T02:00:00.000+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("2024-03-10T02:00:00.000+00:00")), // DST transition
// test minimum and maximum values
Arguments.of("0001-01-01T00:00:00.000+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("0001-01-01T00:00:00.000+00:00")),
Arguments.of("9999-12-31T23:59:59.999+00:00", ISO_OFFSET_DATE_TIME, OffsetDateTime.parse("9999-12-31T23:59:59.999+00:00"))
);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.util;
import java.util.Objects;
import java.util.function.Function;
@FunctionalInterface
public interface TriFunction<S, T, U, R> {
R apply(S s, T t, U u);
default <V> TriFunction<S, T, U, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (S s, T t, U u) -> after.apply(apply(s, t, u));
}
}

View File

@ -135,7 +135,7 @@ public class TestQueryRecord {
assertArrayEquals(new String[] { "red", "green"}, (Object[]) output.getValue("colors"));
assertArrayEquals(new String[] { "John Doe", "Jane Doe"}, (Object[]) output.getValue("names"));
assertEquals(java.sql.Date.valueOf(ISO_DATE), output.getAsDate("joinTime", ISO_DATE_FORMAT));
assertEquals(java.time.LocalDate.parse(ISO_DATE), output.getAsLocalDate("joinTime", ISO_DATE_FORMAT));
assertEquals(Double.valueOf(180.8D), output.getAsDouble("weight"));
}