NIFI-12710 Support microsecond precision for Timestamp Record fields

- PutDatabaseRecordIT supports operating systems with either nanosecond or microsecond precision

This closes #8332

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-01-31 21:33:59 -05:00 committed by exceptionfactory
parent a519585b02
commit 5f534dcc42
No known key found for this signature in database
6 changed files with 448 additions and 16 deletions

View File

@ -30,6 +30,8 @@ import java.util.Optional;
* Convert Object to java.time.LocalDateTime using instanceof evaluation and optional format pattern for DateTimeFormatter
*/
class ObjectLocalDateTimeFieldConverter implements FieldConverter<Object, LocalDateTime> {
private static final long YEAR_TEN_THOUSAND = 253_402_300_800_000L;
/**
* Convert Object field to java.sql.Timestamp using optional format supported in DateTimeFormatter
*
@ -51,10 +53,13 @@ class ObjectLocalDateTimeFieldConverter implements FieldConverter<Object, LocalD
final Instant instant = Instant.ofEpochMilli(date.getTime());
return ofInstant(instant);
}
if (field instanceof Number) {
final Number number = (Number) field;
final Instant instant = Instant.ofEpochMilli(number.longValue());
return ofInstant(instant);
if (field instanceof final Number number) {
// If value is a floating point number, we consider it as seconds since epoch plus a decimal part for fractions of a second.
if (field instanceof Double || field instanceof Float) {
return toLocalDateTime(number.doubleValue());
}
return toLocalDateTime(number.longValue());
}
if (field instanceof String) {
final String string = field.toString().trim();
@ -67,22 +72,60 @@ class ObjectLocalDateTimeFieldConverter implements FieldConverter<Object, LocalD
try {
return LocalDateTime.parse(string, formatter);
} catch (final DateTimeParseException e) {
throw new FieldConversionException(LocalDateTime.class, field, name, e);
return tryParseAsNumber(string, name);
}
} else {
try {
final long number = Long.parseLong(string);
final Instant instant = Instant.ofEpochMilli(number);
return ofInstant(instant);
} catch (final NumberFormatException e) {
throw new FieldConversionException(LocalDateTime.class, field, name, e);
}
return tryParseAsNumber(string, name);
}
}
throw new FieldConversionException(LocalDateTime.class, field, name);
}
private LocalDateTime tryParseAsNumber(final String value, final String fieldName) {
try {
// If decimal, treat as a double and convert to seconds and nanoseconds.
if (value.contains(".")) {
final double number = Double.parseDouble(value);
return toLocalDateTime(number);
}
// attempt to parse as a long value
final long number = Long.parseLong(value);
return toLocalDateTime(number);
} catch (final NumberFormatException e) {
throw new FieldConversionException(LocalDateTime.class, value, fieldName, e);
}
}
private LocalDateTime toLocalDateTime(final double secondsSinceEpoch) {
// Determine the number of micros past the second by subtracting the number of seconds from the decimal value and multiplying by 1 million.
final double micros = 1_000_000 * (secondsSinceEpoch - (long) secondsSinceEpoch);
// Convert micros to nanos. Note that we perform this as a separate operation, rather than multiplying by 1_000,000,000 in order to avoid
// issues that occur with rounding at high precision.
final long nanos = (long) micros * 1000L;
return toLocalDateTime((long) secondsSinceEpoch, nanos);
}
private LocalDateTime toLocalDateTime(final long epochSeconds, final long nanosPastSecond) {
final Instant instant = Instant.ofEpochSecond(epochSeconds).plusNanos(nanosPastSecond);
return ofInstant(instant);
}
private LocalDateTime toLocalDateTime(final long value) {
if (value > YEAR_TEN_THOUSAND) {
// Value is too large. Assume microseconds instead of milliseconds.
final Instant microsInstant = Instant.ofEpochSecond(value / 1_000_000, (value % 1_000_000) * 1_000);
return ofInstant(microsInstant);
}
final Instant instant = Instant.ofEpochMilli(value);
final LocalDateTime localDateTime = ofInstant(instant);
return localDateTime;
}
private LocalDateTime ofInstant(final Instant instant) {
return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.serialization.record.field;
import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestObjectLocalDateTimeFieldConverter {
private static final String FIELD_NAME = "test";
private static final long MILLIS_TIMESTAMP_LONG = 1707238288351L;
private static final long MICROS_TIMESTAMP_LONG = 1707238288351567L;
private static final String MICROS_TIMESTAMP_STRING = Long.toString(MICROS_TIMESTAMP_LONG);
private static final double MICROS_TIMESTAMP_DOUBLE = ((double) MICROS_TIMESTAMP_LONG) / 1000000D;
private static final long NANOS_AFTER_SECOND = 351567000L;
private static final Instant INSTANT_MILLIS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG);
// Create an instant to represent the same time as the microsecond precision timestamp. We add nanoseconds after second but then have to subtract the milliseconds after the second that are already
// present in the MILLIS_TIMESTAMP_LONG value.
private static final Instant INSTANT_MICROS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG).plusNanos(NANOS_AFTER_SECOND).minusMillis(MILLIS_TIMESTAMP_LONG % 1000);
private static final LocalDateTime LOCAL_DATE_TIME_MILLIS_PRECISION = LocalDateTime.ofInstant(INSTANT_MILLIS_PRECISION, ZoneId.systemDefault());
private static final LocalDateTime LOCAL_DATE_TIME_MICROS_PRECISION = LocalDateTime.ofInstant(INSTANT_MICROS_PRECISION, ZoneId.systemDefault());
private final ObjectLocalDateTimeFieldConverter converter = new ObjectLocalDateTimeFieldConverter();
@Test
public void testConvertTimestampMillis() {
final LocalDateTime result = converter.convertField(MILLIS_TIMESTAMP_LONG, Optional.empty(), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MILLIS_PRECISION, result);
}
@Test
public void testConvertTimestampMicros() {
final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_LONG, Optional.empty(), FIELD_NAME);
assertEquals(MILLIS_TIMESTAMP_LONG, result.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
final Instant resultInstant = result.atZone(ZoneId.systemDefault()).toInstant();
assertEquals(NANOS_AFTER_SECOND, resultInstant.getNano());
}
@Test
public void testDoubleAsEpochSeconds() {
final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_DOUBLE, Optional.empty(), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
assertEquals(NANOS_AFTER_SECOND, result.getNano(), 1D);
}
@Test
public void testDoubleAsEpochSecondsAsString() {
final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_STRING, Optional.empty(), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
final double expectedNanos = 351567000L;
assertEquals(expectedNanos, result.getNano(), 1D);
}
@Test
public void testWithDateFormatMillisPrecision() {
final long millis = System.currentTimeMillis();
final LocalDateTime result = converter.convertField(millis, Optional.of("yyyy-MM-dd'T'HH:mm:ss.SSS"), FIELD_NAME);
assertEquals(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.systemDefault()), result);
}
@Test
public void testWithDateFormatMicrosecondPrecision() {
final LocalDateTime result = converter.convertField(MICROS_TIMESTAMP_LONG, Optional.of("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"), FIELD_NAME);
assertEquals(LOCAL_DATE_TIME_MICROS_PRECISION, result);
}
}

View File

@ -359,7 +359,7 @@ public class RegexDateTimeMatcher implements DateTimeMatcher {
addSecondInMinute();
break;
case 'S':
addMillisecond();
addSubsecond();
break;
case 'z':
addGeneralTimeZone();
@ -468,9 +468,9 @@ public class RegexDateTimeMatcher implements DateTimeMatcher {
range = range.plus(1, 2);
}
private void addMillisecond() {
patterns.add("\\d{1,3}");
range = range.plus(1, 3);
private void addSubsecond() {
patterns.add("\\d{1," + charCount + "}");
range = range.plus(1, charCount);
}
private void addGeneralTimeZone() {

View File

@ -488,6 +488,36 @@
<artifactId>nifi-json-schema-shared</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<!-- Test Dependencies for database processors -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.19.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
@ -747,6 +777,7 @@
<exclude>src/test/resources/TestXml/xml-bundle-1</exclude>
<exclude>src/test/resources/xxe_from_report.xml</exclude>
<exclude>src/test/resources/xxe_template.xml</exclude>
<exclude>src/test/resources/PutDatabaseRecordIT/create-person-table.sql</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.dbcp.DBCPConnectionPool;
import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.Month;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("resource")
public class PutDatabaseRecordIT {
private final long MILLIS_TIMESTAMP_LONG = 1707238288351L;
private final long MICROS_TIMESTAMP_LONG = 1707238288351567L;
private final String MICROS_TIMESTAMP_FORMATTED = "2024-02-06 11:51:28.351567";
private final double MICROS_TIMESTAMP_DOUBLE = ((double) MICROS_TIMESTAMP_LONG) / 1000000D;
private final long NANOS_AFTER_SECOND = 351567000L;
private final Instant INSTANT_MICROS_PRECISION = Instant.ofEpochMilli(MILLIS_TIMESTAMP_LONG).plusNanos(NANOS_AFTER_SECOND).minusMillis(MILLIS_TIMESTAMP_LONG % 1000);
private static PostgreSQLContainer<?> postgres;
private TestRunner runner;
@BeforeAll
public static void startPostgres() {
postgres = new PostgreSQLContainer<>("postgres:9.6.12")
.withInitScript("PutDatabaseRecordIT/create-person-table.sql");
postgres.start();
}
@AfterAll
public static void cleanup() {
if (postgres != null) {
postgres.close();
postgres = null;
}
}
@BeforeEach
public void setup() throws InitializationException, SQLException {
truncateTable();
runner = TestRunners.newTestRunner(PutDatabaseRecord.class);
final DBCPConnectionPool connectionPool = new DBCPConnectionPool();
runner.addControllerService("connectionPool", connectionPool);
runner.setProperty(connectionPool, DBCPProperties.DATABASE_URL, postgres.getJdbcUrl());
runner.setProperty(connectionPool, DBCPProperties.DB_USER, postgres.getUsername());
runner.setProperty(connectionPool, DBCPProperties.DB_PASSWORD, postgres.getPassword());
runner.setProperty(connectionPool, DBCPProperties.DB_DRIVERNAME, postgres.getDriverClassName());
runner.enableControllerService(connectionPool);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("json-reader", jsonReader);
runner.setProperty(jsonReader, DateTimeUtils.DATE_FORMAT, "yyyy-MM-dd");
runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSSSSS");
runner.enableControllerService(jsonReader);
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "json-reader");
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, "connectionPool");
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "person");
runner.setProperty(PutDatabaseRecord.DB_TYPE, "PostgreSQL");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, "INSERT");
}
@Test
public void testSimplePut() throws SQLException {
runner.enqueue("""
{
"name": "John Doe",
"age": 50,
"favorite_color": "blue"
}
""");
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
final Map<String, Object> results = getResults();
assertEquals("blue", results.get("favorite_color"));
}
@Test
public void testWithDate() throws SQLException {
runner.enqueue("""
{
"name": "John Doe",
"age": 50,
"dob": "1975-01-01"
}
""");
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
final Map<String, Object> results = getResults();
final Date dob = (Date) results.get("dob");
assertEquals(1975, dob.toLocalDate().getYear());
assertEquals(Month.JANUARY, dob.toLocalDate().getMonth());
assertEquals(1, dob.toLocalDate().getDayOfMonth());
}
@Test
public void testWithTimestampUsingMillis() throws SQLException {
runner.enqueue(createJson(MILLIS_TIMESTAMP_LONG));
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
final Map<String, Object> results = getResults();
assertEquals(new Timestamp(MILLIS_TIMESTAMP_LONG), results.get("lasttransactiontime"));
}
@Test
public void testWithTimestampUsingMillisAsString() throws SQLException {
runner.enqueue(createJson(MILLIS_TIMESTAMP_LONG));
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
final Map<String, Object> results = getResults();
assertEquals(new Timestamp(MILLIS_TIMESTAMP_LONG), results.get("lasttransactiontime"));
}
@Test
public void testWithStringTimestampUsingMicros() throws SQLException {
runner.enqueue(createJson(MICROS_TIMESTAMP_FORMATTED));
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
final Map<String, Object> results = getResults();
final Timestamp lastTransactionTime = (Timestamp) results.get("lasttransactiontime");
final LocalDateTime transactionLocalTime = lastTransactionTime.toLocalDateTime();
assertEquals(2024, transactionLocalTime.getYear());
assertEquals(Month.FEBRUARY, transactionLocalTime.getMonth());
assertEquals(6, transactionLocalTime.getDayOfMonth());
assertEquals(11, transactionLocalTime.getHour());
assertEquals(51, transactionLocalTime.getMinute());
assertEquals(28, transactionLocalTime.getSecond());
assertEquals(351567000, transactionLocalTime.getNano());
}
@Test
public void testWithNumericTimestampUsingMicros() throws SQLException {
runner.enqueue(createJson(MICROS_TIMESTAMP_LONG));
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
final Map<String, Object> results = getResults();
final Timestamp lastTransactionTime = (Timestamp) results.get("lasttransactiontime");
assertEquals(INSTANT_MICROS_PRECISION, lastTransactionTime.toInstant());
}
@Test
public void testWithDecimalTimestampUsingMicros() throws SQLException {
runner.enqueue(createJson(Double.toString(MICROS_TIMESTAMP_DOUBLE)));
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
final Map<String, Object> results = getResults();
final Timestamp lastTransactionTime = (Timestamp) results.get("lasttransactiontime");
assertEquals(INSTANT_MICROS_PRECISION, lastTransactionTime.toInstant());
}
@Test
public void testWithDecimalTimestampUsingMicrosAsString() throws SQLException {
runner.enqueue(createJson(Double.toString(MICROS_TIMESTAMP_DOUBLE)));
runner.run();
runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_SUCCESS, 1);
final Map<String, Object> results = getResults();
final Timestamp lastTransactionTime = (Timestamp) results.get("lasttransactiontime");
assertEquals(INSTANT_MICROS_PRECISION, lastTransactionTime.toInstant());
}
private static void truncateTable() throws SQLException {
try (final Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) {
final String sqlQuery = "TRUNCATE TABLE person";
try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery)) {
preparedStatement.execute();
}
}
}
private Map<String, Object> getResults() throws SQLException {
try (final Connection connection = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) {
final String sqlQuery = "SELECT * FROM person";
final Map<String, Object> resultsMap = new HashMap<>();
try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
final ResultSet resultSet = preparedStatement.executeQuery()) {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for (int i = 1; i <= columnCount; i++) {
final String columnName = metaData.getColumnName(i);
final Object columnValue = resultSet.getObject(i);
resultsMap.put(columnName, columnValue);
}
}
}
assertEquals("John Doe", resultsMap.get("name"));
assertEquals(50, resultsMap.get("age"));
return resultsMap;
}
}
private String createJson(final long lastTransactionTime) {
return createJson(Long.toString(lastTransactionTime));
}
private String createJson(final String lastTransactionTime) {
return """
{
"name": "John Doe",
"age": 50,
"lastTransactionTime": "%s"
}""".formatted(lastTransactionTime);
}
}

View File

@ -0,0 +1,7 @@
CREATE TABLE person (
name VARCHAR(255) NOT NULL,
age INT,
favorite_color VARCHAR(255),
dob DATE,
lastTransactionTime TIMESTAMP WITH TIME ZONE
);