NIFI-13579 Improved Timestamp Zone Offset Formatting and Parsing

- Improved Timestamp to String formatting to support patterns with zone offsets
- Improved String to Timestamp parsing to support adjusted hours and minutes when zone offset is included
This commit is contained in:
exceptionfactory 2024-10-09 01:46:14 -05:00 committed by dan-s1
parent 1b37d78403
commit cb2117f8e6
6 changed files with 285 additions and 9 deletions

View File

@ -21,8 +21,10 @@ import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
import java.util.Date;
import java.util.Optional;
@ -70,7 +72,7 @@ class ObjectLocalDateTimeFieldConverter implements FieldConverter<Object, LocalD
if (pattern.isPresent()) {
final DateTimeFormatter formatter = DateTimeFormatterRegistry.getDateTimeFormatter(pattern.get());
try {
return LocalDateTime.parse(string, formatter);
return parseLocalDateTime(field, name, string, formatter);
} catch (final DateTimeParseException e) {
return tryParseAsNumber(string, name);
}
@ -82,6 +84,24 @@ class ObjectLocalDateTimeFieldConverter implements FieldConverter<Object, LocalD
throw new FieldConversionException(LocalDateTime.class, field, name);
}
/**
 * Parse the provided string with the formatter, accepting either a zoned or a local date and time
 *
 * @param field Original field value, reported when conversion is not possible
 * @param name Field name, reported when conversion is not possible
 * @param string String representation of the date and time to be parsed
 * @param formatter Date Time Formatter applied to the string
 * @return Parsed LocalDateTime, derived from the parsed instant when the input resolved to a ZonedDateTime
 * @throws FieldConversionException Thrown when the resolved temporal is neither ZonedDateTime nor LocalDateTime
 */
private LocalDateTime parseLocalDateTime(final Object field, final String name, final String string, final DateTimeFormatter formatter) {
    // ZonedDateTime is requested ahead of LocalDateTime so that zone offsets in the input are honored when present
    final TemporalAccessor temporal = formatter.parseBest(string, ZonedDateTime::from, LocalDateTime::from);

    if (temporal instanceof LocalDateTime localDateTime) {
        // No zone information resolved: use the parsed value as provided
        return localDateTime;
    }

    if (temporal instanceof ZonedDateTime zonedDateTime) {
        // Go through Instant so that hours and minutes are adjusted for the parsed offset
        // NOTE(review): ofInstant is defined elsewhere in this class and is expected to apply the system default zone
        return ofInstant(zonedDateTime.toInstant());
    }

    throw new FieldConversionException(LocalDateTime.class, field, name);
}
private LocalDateTime tryParseAsNumber(final String value, final String fieldName) {
try {
// If decimal, treat as a double and convert to seconds and nanoseconds.

View File

@ -26,16 +26,12 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* Convert Object to String using instanceof evaluation and optional format pattern for DateTimeFormatter
*/
class ObjectStringFieldConverter implements FieldConverter<Object, String> {
private static final Map<String, DateTimeFormatter> FORMATTERS = new ConcurrentHashMap<>();
/**
* Convert Object field to String using optional format supported in DateTimeFormatter
*
@ -59,7 +55,10 @@ class ObjectStringFieldConverter implements FieldConverter<Object, String> {
}
final DateTimeFormatter formatter = DateTimeFormatterRegistry.getDateTimeFormatter(pattern.get());
final LocalDateTime localDateTime = timestamp.toLocalDateTime();
return formatter.format(localDateTime);
// Convert LocalDateTime to ZonedDateTime using system default zone to support offsets in Date Time Formatter
final ZonedDateTime dateTime = ZonedDateTime.of(localDateTime, ZoneId.systemDefault());
return formatter.format(dateTime);
}
if (field instanceof java.util.Date date) {
if (pattern.isEmpty()) {

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.field;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.zone.ZoneRules;
import java.util.Date;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
/**
 * Tests for ObjectStringFieldConverter covering null input along with java.sql.Timestamp and java.util.Date
 * values formatted with default, nanosecond, zone offset, and empty patterns
 */
class ObjectStringFieldConverterTest {
    private static final ObjectStringFieldConverter CONVERTER = new ObjectStringFieldConverter();

    private static final String DEFAULT_PATTERN = RecordFieldType.TIMESTAMP.getDefaultFormat();

    private static final String FIELD_NAME = Timestamp.class.getSimpleName();

    private static final String DATE_TIME_DEFAULT = "2000-01-01 12:00:00";

    private static final String DATE_TIME_NANOSECONDS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSSSSS";

    private static final String DATE_TIME_NANOSECONDS = "2000-01-01 12:00:00.123456789";

    private static final String DATE_TIME_ZONE_OFFSET_PATTERN = "yyyy-MM-dd HH:mm:ssZZZZZ";

    /** Null field values are expected to produce a null String regardless of the pattern */
    @Test
    void testConvertFieldNull() {
        final String string = CONVERTER.convertField(null, Optional.of(DEFAULT_PATTERN), FIELD_NAME);
        assertNull(string);
    }

    /** Timestamp formatted with the default Timestamp pattern matches the input date and time */
    @Test
    void testConvertFieldTimestampDefaultPattern() {
        final Timestamp timestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
        final String string = CONVERTER.convertField(timestamp, Optional.of(DEFAULT_PATTERN), FIELD_NAME);
        assertEquals(DATE_TIME_DEFAULT, string);
    }

    /** Timestamp formatted with nine fractional digits retains nanosecond precision */
    @Test
    void testConvertFieldTimestampNanoseconds() {
        final Timestamp timestamp = Timestamp.valueOf(DATE_TIME_NANOSECONDS);
        final String string = CONVERTER.convertField(timestamp, Optional.of(DATE_TIME_NANOSECONDS_PATTERN), FIELD_NAME);
        assertEquals(DATE_TIME_NANOSECONDS, string);
    }

    /** Timestamp without a pattern is expected as the String form of Timestamp.getTime() */
    @Test
    void testConvertFieldTimestampEmptyPattern() {
        final Timestamp timestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
        final String string = CONVERTER.convertField(timestamp, Optional.empty(), FIELD_NAME);
        final String expected = Long.toString(timestamp.getTime());
        assertEquals(expected, string);
    }

    /** Timestamp formatted with a zone offset pattern includes the system default offset */
    @Test
    void testConvertFieldTimestampZoneOffsetPattern() {
        final Timestamp timestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
        final String string = CONVERTER.convertField(timestamp, Optional.of(DATE_TIME_ZONE_OFFSET_PATTERN), FIELD_NAME);
        final String dateTimeZoneOffsetExpected = getDateTimeZoneOffset();
        assertEquals(dateTimeZoneOffsetExpected, string);
    }

    /** Date formatted with the default Timestamp pattern matches the input date and time */
    @Test
    void testConvertFieldDateDefaultPattern() {
        final Date date = new Date(Timestamp.valueOf(DATE_TIME_DEFAULT).getTime());
        final String string = CONVERTER.convertField(date, Optional.of(DEFAULT_PATTERN), FIELD_NAME);
        assertEquals(DATE_TIME_DEFAULT, string);
    }

    /** Date without a pattern is expected as the String form of Date.getTime() */
    @Test
    void testConvertFieldDateEmptyPattern() {
        final Date date = new Date(Timestamp.valueOf(DATE_TIME_DEFAULT).getTime());
        final String string = CONVERTER.convertField(date, Optional.empty(), FIELD_NAME);
        final String expected = Long.toString(date.getTime());
        assertEquals(expected, string);
    }

    /** Date formatted with a zone offset pattern includes the system default offset */
    @Test
    void testConvertFieldDateZoneOffsetPattern() {
        final Timestamp inputTimestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
        final long inputTime = inputTimestamp.getTime();
        final Date date = new Date(inputTime);
        final String string = CONVERTER.convertField(date, Optional.of(DATE_TIME_ZONE_OFFSET_PATTERN), FIELD_NAME);
        final String dateTimeZoneOffsetExpected = getDateTimeZoneOffset();
        assertEquals(dateTimeZoneOffsetExpected, string);
    }

    /**
     * Build the expected date and time followed by the offset identifier that the system default zone
     * applies at that local date and time, keeping assertions independent of the machine time zone
     *
     * @return Default date and time with the system zone offset identifier appended
     */
    private String getDateTimeZoneOffset() {
        final Timestamp inputTimestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
        final LocalDateTime inputLocalDateTime = inputTimestamp.toLocalDateTime();

        // systemDefault() is declared on ZoneId and returns a ZoneId: call it on the declaring class
        final ZoneId systemDefaultZoneId = ZoneId.systemDefault();
        final ZoneRules zoneRules = systemDefaultZoneId.getRules();
        final ZoneOffset inputZoneOffset = zoneRules.getOffset(inputLocalDateTime);
        final String inputZoneOffsetId = inputZoneOffset.getId();

        // Get Date Time with Zone Offset from current system configuration
        return DATE_TIME_DEFAULT + inputZoneOffsetId;
    }
}

View File

@ -20,6 +20,12 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.zone.ZoneRules;
import java.util.Date;
import java.util.Optional;
@ -39,6 +45,10 @@ public class ObjectTimestampFieldConverterTest {
private static final String DATE_TIME_DEFAULT = "2000-01-01 12:00:00";
private static final String DATE_TIME_ZONE_OFFSET_PATTERN = "yyyy-MM-dd HH:mm:ssZZZZZ";
private static final String DATE_TIME_UTC_OFFSET = "2000-01-01 12:00:00+00:00";
private static final Optional<String> DATE_TIME_NANOSECONDS_PATTERN = Optional.of("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
private static final String DATE_TIME_NANOSECONDS = "2000-01-01 12:00:00.123456789";
@ -110,4 +120,42 @@ public class ObjectTimestampFieldConverterTest {
final FieldConversionException exception = assertThrows(FieldConversionException.class, () -> CONVERTER.convertField(DATE_TIME_DEFAULT, DATE_TIME_NANOSECONDS_PATTERN, FIELD_NAME));
assertTrue(exception.getMessage().contains(DATE_TIME_DEFAULT));
}
/** String carrying the system default zone offset converts to the same local date and time */
@Test
public void testConvertFieldStringFormatCustomZoneOffsetSystemDefault() {
    final Optional<String> zoneOffsetPattern = Optional.of(DATE_TIME_ZONE_OFFSET_PATTERN);
    final String input = getDateTimeZoneOffset();

    final Timestamp converted = CONVERTER.convertField(input, zoneOffsetPattern, FIELD_NAME);

    // Offset equals the system default offset, so no adjustment of hours or minutes is expected
    assertEquals(Timestamp.valueOf(DATE_TIME_DEFAULT), converted);
}
/** String carrying a +00:00 offset converts to the equivalent date and time in the system default zone */
@Test
public void testConvertFieldStringFormatCustomZoneOffsetCoordinatedUniversalTime() {
    final Optional<String> zoneOffsetPattern = Optional.of(DATE_TIME_ZONE_OFFSET_PATTERN);

    final Timestamp converted = CONVERTER.convertField(DATE_TIME_UTC_OFFSET, zoneOffsetPattern, FIELD_NAME);

    // Expected value is the UTC date and time shifted into the system default zone
    assertEquals(getDateTimeCoordinatedUniversalTime(), converted);
}
/**
 * Interpret the default date and time as UTC and express the same instant in the system default zone
 *
 * @return Timestamp for the default date and time at UTC, adjusted to the system default zone
 */
private Timestamp getDateTimeCoordinatedUniversalTime() {
    final LocalDateTime utcDateTime = Timestamp.valueOf(DATE_TIME_DEFAULT).toLocalDateTime();
    final Instant utcInstant = ZonedDateTime.of(utcDateTime, ZoneOffset.UTC).toInstant();

    // Shift the instant into the system default zone before building the Timestamp
    final LocalDateTime systemDateTime = LocalDateTime.ofInstant(utcInstant, ZoneId.systemDefault());
    return Timestamp.valueOf(systemDateTime);
}
/**
 * Build the default date and time followed by the offset identifier that the system default zone
 * applies at that local date and time
 *
 * @return Default date and time with the system zone offset identifier appended
 */
private String getDateTimeZoneOffset() {
    final Timestamp inputTimestamp = Timestamp.valueOf(DATE_TIME_DEFAULT);
    final LocalDateTime inputLocalDateTime = inputTimestamp.toLocalDateTime();

    // systemDefault() is declared on ZoneId and returns a ZoneId: call it on the declaring class
    final ZoneId systemDefaultZoneId = ZoneId.systemDefault();
    final ZoneRules zoneRules = systemDefaultZoneId.getRules();
    final ZoneOffset inputZoneOffset = zoneRules.getOffset(inputLocalDateTime);
    final String inputZoneOffsetId = inputZoneOffset.getId();

    // Get Date Time with Zone Offset from current system configuration
    return DATE_TIME_DEFAULT + inputZoneOffsetId;
}
}

View File

@ -69,7 +69,9 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -512,8 +514,8 @@ public class TestIcebergRecordConverter {
values.put("binary", "hello".getBytes());
values.put("date", "2017-04-04");
values.put("time", "14:20:33.000");
values.put("timestamp", "2017-04-04 14:20:33.789-0500");
values.put("timestampTz", "2017-04-04 14:20:33.789-0500");
values.put("timestamp", Timestamp.valueOf(LOCAL_DATE_TIME));
values.put("timestampTz", Timestamp.valueOf(LOCAL_DATE_TIME));
values.put("uuid", "0000-00-00-00-000000");
values.put("choice", "10");
@ -764,7 +766,8 @@ public class TestIcebergRecordConverter {
assertEquals(results.size(), 1);
GenericRecord resultRecord = results.getFirst();
OffsetDateTime offsetDateTime = OffsetDateTime.of(LOCAL_DATE_TIME, ZoneOffset.ofHours(-5));
final ZonedDateTime zonedDateTime = ZonedDateTime.of(LOCAL_DATE_TIME, ZoneId.systemDefault());
OffsetDateTime offsetDateTime = zonedDateTime.toOffsetDateTime();
assertEquals("123", resultRecord.get(0, String.class));
assertEquals(Integer.valueOf(8), resultRecord.get(1, Integer.class));

View File

@ -53,6 +53,11 @@ import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.zone.ZoneRules;
import java.util.Map;
import java.util.Optional;
@ -577,6 +582,64 @@ public class TestValidateRecord {
validFlowFileInferredSchema.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json"));
}
/**
 * Round trip of a CSV timestamp that carries a zone offset: the record is read and written with the same
 * zone offset timestamp format and the valid output is expected to equal the input
 */
@Test
public void testValidateCsvTimestampZoneOffset() throws InitializationException {
// Schema with a single "created" field declared as long with the timestamp-millis logical type
final String recordSchema = """
{
"name": "ts",
"namespace": "nifi",
"type": "record",
"fields": [{
"name": "created",
"type": {
"type": "long", "logicalType": "timestamp-millis"
}
}]
}
""";
// Input is a local date and time followed by the system zone offset at that date and time
final String inputDateTime = "2020-01-01 12:00:00";
final Timestamp inputTimestamp = Timestamp.valueOf(inputDateTime);
final LocalDateTime inputLocalDateTime = inputTimestamp.toLocalDateTime();
final String systemZoneOffsetId = getSystemZoneOffsetId(inputLocalDateTime);
final String serializedRecord = inputDateTime + systemZoneOffsetId;
// Same timestamp format is applied to both the reader and the writer below
final String timestampFormat = "yyyy-MM-dd HH:mm:ssZZZZZ";
final String readerServiceId = "reader";
final String writerServiceId = "writer";
// CSV reader using the schema text and the zone offset timestamp format
final CSVReader recordReader = new CSVReader();
runner.addControllerService(readerServiceId, recordReader);
runner.setProperty(recordReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(recordReader, SCHEMA_TEXT, recordSchema);
runner.setProperty(recordReader, DateTimeUtils.TIMESTAMP_FORMAT, timestampFormat);
runner.enableControllerService(recordReader);
// CSV writer using the same timestamp format, with the header line disabled
final CSVRecordSetWriter recordSetWriter = new CSVRecordSetWriter();
runner.addControllerService(writerServiceId, recordSetWriter);
runner.setProperty(recordSetWriter, "Schema Write Strategy", JsonRecordSetWriter.AVRO_SCHEMA_ATTRIBUTE.getValue());
runner.setProperty(recordSetWriter, DateTimeUtils.TIMESTAMP_FORMAT, timestampFormat);
runner.setProperty(recordSetWriter, CSVUtils.INCLUDE_HEADER_LINE, Boolean.FALSE.toString());
runner.enableControllerService(recordSetWriter);
// Processor configuration: the same writer service handles valid and invalid records
runner.setProperty(ValidateRecord.RECORD_READER, readerServiceId);
runner.setProperty(ValidateRecord.RECORD_WRITER, writerServiceId);
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(SCHEMA_TEXT, recordSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, writerServiceId);
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, Boolean.TRUE.toString());
runner.enqueue(serializedRecord);
runner.run();
// Exactly one FlowFile is expected on the valid relationship
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst();
// Content is trimmed before comparison with the input record
final String flowFileContent = validFlowFile.getContent().trim();
assertEquals(serializedRecord, flowFileContent);
}
@Test
public void testValidateMaps() throws IOException, InitializationException, MalformedRecordException {
final String validateSchema = Files.readString(Paths.get("src/test/resources/TestValidateRecord/int-maps-schema.avsc"));
@ -918,4 +981,12 @@ public class TestValidateRecord {
invalidFlowFile.assertAttributeEquals("valDetails", "Records in this FlowFile were invalid for the following reasons: ; "
+ "The following 1 fields had values whose type did not match the schema: [/id]");
}
/**
 * Get the identifier of the offset that the system default zone applies at the provided local date and time
 *
 * @param inputLocalDateTime Local date and time used to look up the offset in the system default zone rules
 * @return Zone offset identifier from the system default zone rules
 */
private String getSystemZoneOffsetId(final LocalDateTime inputLocalDateTime) {
    // systemDefault() is declared on ZoneId and returns a ZoneId: call it on the declaring class
    final ZoneId systemDefaultZoneId = ZoneId.systemDefault();
    final ZoneRules zoneRules = systemDefaultZoneId.getRules();
    final ZoneOffset systemZoneOffset = zoneRules.getOffset(inputLocalDateTime);
    return systemZoneOffset.getId();
}
}