NIFI-8023: Convert java.sql.Date between UTC/local time zone normalized forms before/after database operations

This closes #4781

Signed-off-by: David Handermann <exceptionfactory@gmail.com>
This commit is contained in:
Peter Turcsanyi 2021-01-25 21:15:26 +00:00 committed by exceptionfactory
parent 0a10557dd5
commit 67d06003b7
9 changed files with 277 additions and 59 deletions

View File

@ -18,6 +18,7 @@
package org.apache.nifi.serialization.record; package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -149,6 +150,12 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return null; return null;
} }
if (value instanceof java.sql.Date) {
// Date objects should be stored in records as UTC normalized dates (UTC 00:00:00)
// but they come from the driver in JVM's local time zone 00:00:00 and need to be converted.
return DataTypeUtils.convertDateToUTC((java.sql.Date) value);
}
if (value instanceof List) { if (value instanceof List) {
return ((List) value).toArray(); return ((List) value).toArray();
} }

View File

@ -49,6 +49,10 @@ import java.sql.Types;
import java.text.DateFormat; import java.text.DateFormat;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -1085,6 +1089,32 @@ public class DataTypeUtils {
throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName); throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName);
} }
/**
* Converts a java.sql.Date object in local time zone (typically coming from a java.sql.ResultSet and having 00:00:00 time part)
* to UTC normalized form (storing the epoch corresponding to the UTC time with the same date/time as the input).
*
* @param dateLocalTZ java.sql.Date in local time zone
* @return java.sql.Date in UTC normalized form
*/
public static Date convertDateToUTC(Date dateLocalTZ) {
ZonedDateTime zdtLocalTZ = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateLocalTZ.getTime()), ZoneId.systemDefault());
ZonedDateTime zdtUTC = zdtLocalTZ.withZoneSameLocal(ZoneOffset.UTC);
return new Date(zdtUTC.toInstant().toEpochMilli());
}
/**
* Converts a java.sql.Date object in UTC normalized form
* to local time zone (storing the epoch corresponding to the local time with the same date/time as the input).
*
* @param dateUTC java.sql.Date in UTC normalized form
* @return java.sql.Date in local time zone
*/
public static Date convertDateToLocalTZ(Date dateUTC) {
ZonedDateTime zdtUTC = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateUTC.getTime()), ZoneOffset.UTC);
ZonedDateTime zdtLocalTZ = zdtUTC.withZoneSameLocal(ZoneId.systemDefault());
return new Date(zdtLocalTZ.toInstant().toEpochMilli());
}
public static boolean isDateTypeCompatible(final Object value, final String format) { public static boolean isDateTypeCompatible(final Object value, final String format) {
if (value == null) { if (value == null) {
return false; return false;

View File

@ -18,7 +18,6 @@ package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.type.DecimalDataType; import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -27,35 +26,61 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.Date;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types; import java.sql.Types;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class ResultSetRecordSetTest { public class ResultSetRecordSetTest {
private static final String COLUMN_NAME_VARCHAR = "varchar";
private static final String COLUMN_NAME_BIGINT = "bigint";
private static final String COLUMN_NAME_ROWID = "rowid";
private static final String COLUMN_NAME_BIT = "bit";
private static final String COLUMN_NAME_BOOLEAN = "boolean";
private static final String COLUMN_NAME_CHAR = "char";
private static final String COLUMN_NAME_DATE = "date";
private static final String COLUMN_NAME_INTEGER = "integer";
private static final String COLUMN_NAME_DOUBLE = "double";
private static final String COLUMN_NAME_REAL = "real";
private static final String COLUMN_NAME_FLOAT = "float";
private static final String COLUMN_NAME_SMALLINT = "smallint";
private static final String COLUMN_NAME_TINYINT = "tinyint";
private static final String COLUMN_NAME_BIG_DECIMAL_1 = "bigDecimal1";
private static final String COLUMN_NAME_BIG_DECIMAL_2 = "bigDecimal2";
private static final String COLUMN_NAME_BIG_DECIMAL_3 = "bigDecimal3";
private static final String COLUMN_NAME_BIG_DECIMAL_4 = "bigDecimal4";
private static final Object[][] COLUMNS = new Object[][] { private static final Object[][] COLUMNS = new Object[][] {
// column number; column label / name / schema field; column type; schema data type; // column number; column label / name / schema field; column type; schema data type;
{1, "varchar", Types.VARCHAR, RecordFieldType.STRING.getDataType()}, {1, COLUMN_NAME_VARCHAR, Types.VARCHAR, RecordFieldType.STRING.getDataType()},
{2, "bigint", Types.BIGINT, RecordFieldType.LONG.getDataType()}, {2, COLUMN_NAME_BIGINT, Types.BIGINT, RecordFieldType.LONG.getDataType()},
{3, "rowid", Types.ROWID, RecordFieldType.LONG.getDataType()}, {3, COLUMN_NAME_ROWID, Types.ROWID, RecordFieldType.LONG.getDataType()},
{4, "bit", Types.BIT, RecordFieldType.BOOLEAN.getDataType()}, {4, COLUMN_NAME_BIT, Types.BIT, RecordFieldType.BOOLEAN.getDataType()},
{5, "boolean", Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()}, {5, COLUMN_NAME_BOOLEAN, Types.BOOLEAN, RecordFieldType.BOOLEAN.getDataType()},
{6, "char", Types.CHAR, RecordFieldType.CHAR.getDataType()}, {6, COLUMN_NAME_CHAR, Types.CHAR, RecordFieldType.CHAR.getDataType()},
{7, "date", Types.DATE, RecordFieldType.DATE.getDataType()}, {7, COLUMN_NAME_DATE, Types.DATE, RecordFieldType.DATE.getDataType()},
{8, "integer", Types.INTEGER, RecordFieldType.INT.getDataType()}, {8, COLUMN_NAME_INTEGER, Types.INTEGER, RecordFieldType.INT.getDataType()},
{9, "double", Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()}, {9, COLUMN_NAME_DOUBLE, Types.DOUBLE, RecordFieldType.DOUBLE.getDataType()},
{10, "real", Types.REAL, RecordFieldType.DOUBLE.getDataType()}, {10, COLUMN_NAME_REAL, Types.REAL, RecordFieldType.DOUBLE.getDataType()},
{11, "float", Types.FLOAT, RecordFieldType.FLOAT.getDataType()}, {11, COLUMN_NAME_FLOAT, Types.FLOAT, RecordFieldType.FLOAT.getDataType()},
{12, "smallint", Types.SMALLINT, RecordFieldType.SHORT.getDataType()}, {12, COLUMN_NAME_SMALLINT, Types.SMALLINT, RecordFieldType.SHORT.getDataType()},
{13, "tinyint", Types.TINYINT, RecordFieldType.BYTE.getDataType()}, {13, COLUMN_NAME_TINYINT, Types.TINYINT, RecordFieldType.BYTE.getDataType()},
{14, "bigDecimal1", Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)}, {14, COLUMN_NAME_BIG_DECIMAL_1, Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
{15, "bigDecimal2", Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)}, {15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
{16, "bigDecimal3", Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)}, {16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
{17, "bigDecimal4", Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)}, {17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)},
}; };
@Mock @Mock
@ -66,26 +91,26 @@ public class ResultSetRecordSetTest {
@Before @Before
public void setUp() throws SQLException { public void setUp() throws SQLException {
Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData); when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length); when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);
for (final Object[] column : COLUMNS) { for (final Object[] column : COLUMNS) {
Mockito.when(resultSetMetaData.getColumnLabel((Integer) column[0])).thenReturn((column[1]) + "Col"); when(resultSetMetaData.getColumnLabel((Integer) column[0])).thenReturn((String) (column[1]));
Mockito.when(resultSetMetaData.getColumnName((Integer) column[0])).thenReturn((String) column[1]); when(resultSetMetaData.getColumnName((Integer) column[0])).thenReturn((String) column[1]);
Mockito.when(resultSetMetaData.getColumnType((Integer) column[0])).thenReturn((Integer) column[2]); when(resultSetMetaData.getColumnType((Integer) column[0])).thenReturn((Integer) column[2]);
if(column[3] instanceof DecimalDataType) { if(column[3] instanceof DecimalDataType) {
DecimalDataType ddt = (DecimalDataType)column[3]; DecimalDataType ddt = (DecimalDataType)column[3];
Mockito.when(resultSetMetaData.getPrecision((Integer) column[0])).thenReturn(ddt.getPrecision()); when(resultSetMetaData.getPrecision((Integer) column[0])).thenReturn(ddt.getPrecision());
Mockito.when(resultSetMetaData.getScale((Integer) column[0])).thenReturn(ddt.getScale()); when(resultSetMetaData.getScale((Integer) column[0])).thenReturn(ddt.getScale());
} }
} }
// Big decimal values are necessary in order to determine precision and scale // Big decimal values are necessary in order to determine precision and scale
Mockito.when(resultSet.getBigDecimal(16)).thenReturn(new BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1")); when(resultSet.getBigDecimal(16)).thenReturn(new BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
// This will be handled by a dedicated branch for Java Objects, needs some further details // This will be handled by a dedicated branch for Java Objects, needs some further details
Mockito.when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName()); when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
} }
@Test @Test
@ -124,7 +149,7 @@ public class ResultSetRecordSetTest {
final RecordSchema resultSchema = testSubject.getSchema(); final RecordSchema resultSchema = testSubject.getSchema();
// then // then
Assert.assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), resultSchema.getField(0).getDataType()); assertEquals(RecordFieldType.DECIMAL.getDecimalDataType(30, 10), resultSchema.getField(0).getDataType());
} }
@Test @Test
@ -137,17 +162,88 @@ public class ResultSetRecordSetTest {
final RecordSchema resultSchema = testSubject.getSchema(); final RecordSchema resultSchema = testSubject.getSchema();
// then // then
Assert.assertEquals(RecordFieldType.CHOICE, resultSchema.getField(0).getDataType().getFieldType()); assertEquals(RecordFieldType.CHOICE, resultSchema.getField(0).getDataType().getFieldType());
}
@Test
public void testCreateRecord() throws SQLException {
// given
final RecordSchema recordSchema = givenRecordSchema();
LocalDate testDate = LocalDate.of(2021, 1, 26);
final String varcharValue = "varchar";
final Long bigintValue = 1234567890123456789L;
final Long rowidValue = 11111111L;
final Boolean bitValue = Boolean.FALSE;
final Boolean booleanValue = Boolean.TRUE;
final Character charValue = 'c';
final Date dateValue = Date.valueOf(testDate);
final Integer integerValue = 1234567890;
final Double doubleValue = 0.12;
final Double realValue = 3.45;
final Float floatValue = 6.78F;
final Short smallintValue = 12345;
final Byte tinyintValue = 123;
final BigDecimal bigDecimal1Value = new BigDecimal("1234.567");
final BigDecimal bigDecimal2Value = new BigDecimal("1234");
final BigDecimal bigDecimal3Value = new BigDecimal("1234567890.1");
final BigDecimal bigDecimal4Value = new BigDecimal("1234567.089");
when(resultSet.getObject(COLUMN_NAME_VARCHAR)).thenReturn(varcharValue);
when(resultSet.getObject(COLUMN_NAME_BIGINT)).thenReturn(bigintValue);
when(resultSet.getObject(COLUMN_NAME_ROWID)).thenReturn(rowidValue);
when(resultSet.getObject(COLUMN_NAME_BIT)).thenReturn(bitValue);
when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue);
when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue);
when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue);
when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue);
when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue);
when(resultSet.getObject(COLUMN_NAME_REAL)).thenReturn(realValue);
when(resultSet.getObject(COLUMN_NAME_FLOAT)).thenReturn(floatValue);
when(resultSet.getObject(COLUMN_NAME_SMALLINT)).thenReturn(smallintValue);
when(resultSet.getObject(COLUMN_NAME_TINYINT)).thenReturn(tinyintValue);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_1)).thenReturn(bigDecimal1Value);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_2)).thenReturn(bigDecimal2Value);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_3)).thenReturn(bigDecimal3Value);
when(resultSet.getObject(COLUMN_NAME_BIG_DECIMAL_4)).thenReturn(bigDecimal4Value);
// when
ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, recordSchema);
Record record = testSubject.createRecord(resultSet);
// then
assertEquals(varcharValue, record.getAsString(COLUMN_NAME_VARCHAR));
assertEquals(bigintValue, record.getAsLong(COLUMN_NAME_BIGINT));
assertEquals(rowidValue, record.getAsLong(COLUMN_NAME_ROWID));
assertEquals(bitValue, record.getAsBoolean(COLUMN_NAME_BIT));
assertEquals(booleanValue, record.getAsBoolean(COLUMN_NAME_BOOLEAN));
assertEquals(charValue, record.getValue(COLUMN_NAME_CHAR));
// Date is expected in UTC normalized form
Date expectedDate = new Date(testDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
assertEquals(expectedDate, record.getAsDate(COLUMN_NAME_DATE, null));
assertEquals(integerValue, record.getAsInt(COLUMN_NAME_INTEGER));
assertEquals(doubleValue, record.getAsDouble(COLUMN_NAME_DOUBLE));
assertEquals(realValue, record.getAsDouble(COLUMN_NAME_REAL));
assertEquals(floatValue, record.getAsFloat(COLUMN_NAME_FLOAT));
assertEquals(smallintValue.shortValue(), record.getAsInt(COLUMN_NAME_SMALLINT).shortValue());
assertEquals(tinyintValue.byteValue(), record.getAsInt(COLUMN_NAME_TINYINT).byteValue());
assertEquals(bigDecimal1Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_1));
assertEquals(bigDecimal2Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_2));
assertEquals(bigDecimal3Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_3));
assertEquals(bigDecimal4Value, record.getValue(COLUMN_NAME_BIG_DECIMAL_4));
} }
private ResultSet givenResultSetForOther() throws SQLException { private ResultSet givenResultSetForOther() throws SQLException {
final ResultSet resultSet = Mockito.mock(ResultSet.class); final ResultSet resultSet = Mockito.mock(ResultSet.class);
final ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class); final ResultSetMetaData resultSetMetaData = Mockito.mock(ResultSetMetaData.class);
Mockito.when(resultSet.getMetaData()).thenReturn(resultSetMetaData); when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
Mockito.when(resultSetMetaData.getColumnCount()).thenReturn(1); when(resultSetMetaData.getColumnCount()).thenReturn(1);
Mockito.when(resultSetMetaData.getColumnLabel(1)).thenReturn("column"); when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
Mockito.when(resultSetMetaData.getColumnName(1)).thenReturn("column"); when(resultSetMetaData.getColumnName(1)).thenReturn("column");
Mockito.when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER); when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
return resultSet; return resultSet;
} }
@ -162,10 +258,10 @@ public class ResultSetRecordSetTest {
} }
private void thenAllColumnDataTypesAreCorrect(final RecordSchema resultSchema) { private void thenAllColumnDataTypesAreCorrect(final RecordSchema resultSchema) {
Assert.assertNotNull(resultSchema); assertNotNull(resultSchema);
for (final Object[] column : COLUMNS) { for (final Object[] column : COLUMNS) {
Assert.assertEquals("For column " + column[0] + " the converted type is not matching", column[3], resultSchema.getField((Integer) column[0] - 1).getDataType()); assertEquals("For column " + column[0] + " the converted type is not matching", column[3], resultSchema.getField((Integer) column[0] - 1).getDataType());
} }
} }
} }

View File

@ -27,8 +27,13 @@ import org.junit.Test;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.sql.Types; import java.sql.Types;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -874,4 +879,44 @@ public class TestDataTypeUtils {
assertTrue(DataTypeUtils.isFittingNumberType(9D, RecordFieldType.DOUBLE)); assertTrue(DataTypeUtils.isFittingNumberType(9D, RecordFieldType.DOUBLE));
assertFalse(DataTypeUtils.isFittingNumberType(9, RecordFieldType.DOUBLE)); assertFalse(DataTypeUtils.isFittingNumberType(9, RecordFieldType.DOUBLE));
} }
@Test
public void testConvertDateToUTC() {
int year = 2021;
int month = 1;
int dayOfMonth = 25;
Date dateLocalTZ = new Date(ZonedDateTime.of(LocalDateTime.of(year, month, dayOfMonth,0,0,0), ZoneId.systemDefault()).toInstant().toEpochMilli());
Date dateUTC = DataTypeUtils.convertDateToUTC(dateLocalTZ);
ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateUTC.getTime()), ZoneId.of("UTC"));
assertEquals(year, zdt.getYear());
assertEquals(month, zdt.getMonthValue());
assertEquals(dayOfMonth, zdt.getDayOfMonth());
assertEquals(0, zdt.getHour());
assertEquals(0, zdt.getMinute());
assertEquals(0, zdt.getSecond());
assertEquals(0, zdt.getNano());
}
@Test
public void testConvertDateToLocalTZ() {
int year = 2021;
int month = 1;
int dayOfMonth = 25;
Date dateUTC = new Date(ZonedDateTime.of(LocalDateTime.of(year, month, dayOfMonth,0,0,0), ZoneId.of("UTC")).toInstant().toEpochMilli());
Date dateLocalTZ = DataTypeUtils.convertDateToLocalTZ(dateUTC);
ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateLocalTZ.getTime()), ZoneId.systemDefault());
assertEquals(year, zdt.getYear());
assertEquals(month, zdt.getMonthValue());
assertEquals(dayOfMonth, zdt.getDayOfMonth());
assertEquals(0, zdt.getHour());
assertEquals(0, zdt.getMinute());
assertEquals(0, zdt.getSecond());
assertEquals(0, zdt.getNano());
}
} }

View File

@ -100,6 +100,7 @@ import org.apache.avro.io.DatumWriter;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import javax.xml.bind.DatatypeConverter; import javax.xml.bind.DatatypeConverter;
@ -393,6 +394,18 @@ public class JdbcCommon {
rec.put(i-1, value); rec.put(i-1, value);
} }
} else if (value instanceof java.sql.Date) {
if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types.
// AvroTypeUtil.convertToAvroObject() expects java.sql.Date object as a UTC normalized date (UTC 00:00:00)
// but it comes from the driver in JVM's local time zone 00:00:00 and needs to be converted.
java.sql.Date normalizedDate = DataTypeUtils.convertDateToUTC((java.sql.Date) value);
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(normalizedDate, fieldSchema));
} else {
// As string for backward compatibility.
rec.put(i - 1, value.toString());
}
} else if (value instanceof Date) { } else if (value instanceof Date) {
if (options.useLogicalTypes) { if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types. // Delegate mapping to AvroTypeUtil in order to utilize logical types.

View File

@ -59,6 +59,7 @@ import java.sql.Time;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.sql.Types; import java.sql.Types;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.time.ZoneId; import java.time.ZoneId;
@ -67,10 +68,10 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder; import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField; import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalAccessor;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -679,11 +680,10 @@ public class TestJdbcCommon {
testConvertToAvroStreamForDateTime(options, testConvertToAvroStreamForDateTime(options,
(record, date) -> { (record, date) -> {
final int daysSinceEpoch = (int) record.get("date"); final int expectedDaysSinceEpoch = (int) ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), date.toLocalDate());
final long millisSinceEpoch = TimeUnit.MILLISECONDS.convert(daysSinceEpoch, TimeUnit.DAYS); final int actualDaysSinceEpoch = (int) record.get("date");
java.sql.Date actual = java.sql.Date.valueOf(Instant.ofEpochMilli(millisSinceEpoch).atZone(ZoneOffset.UTC).toLocalDate()); LOGGER.debug("comparing days since epoch, expecting '{}', actual '{}'", expectedDaysSinceEpoch, actualDaysSinceEpoch);
LOGGER.debug("comparing dates, expecting '{}', actual '{}'", date, actual); assertEquals(expectedDaysSinceEpoch, actualDaysSinceEpoch);
assertEquals(date, actual);
}, },
(record, time) -> { (record, time) -> {
int millisSinceMidnight = (int) record.get("time"); int millisSinceMidnight = (int) record.get("time");

View File

@ -56,6 +56,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -483,8 +484,9 @@ public class TestAvroTypeUtil {
@Test @Test
public void testDateConversion() { public void testDateConversion() {
final Calendar c = Calendar.getInstance(); final Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
c.set(2019, Calendar.JANUARY, 1, 0, 0, 0); c.set(2019, Calendar.JANUARY, 1, 0, 0, 0);
c.set(Calendar.MILLISECOND, 0);
final long epochMillis = c.getTimeInMillis(); final long epochMillis = c.getTimeInMillis();
final LogicalTypes.Date dateType = LogicalTypes.date(); final LogicalTypes.Date dateType = LogicalTypes.date();
@ -492,7 +494,7 @@ public class TestAvroTypeUtil {
dateType.addToSchema(fieldSchema); dateType.addToSchema(fieldSchema);
final Object convertedValue = AvroTypeUtil.convertToAvroObject(new Date(epochMillis), fieldSchema); final Object convertedValue = AvroTypeUtil.convertToAvroObject(new Date(epochMillis), fieldSchema);
assertTrue(convertedValue instanceof Integer); assertTrue(convertedValue instanceof Integer);
assertEquals((int) convertedValue, LocalDate.of(2019, 1, 1).toEpochDay()); assertEquals(LocalDate.of(2019, 1, 1).toEpochDay(), (int) convertedValue);
} }
@Test @Test

View File

@ -63,6 +63,7 @@ import java.io.InputStream;
import java.sql.BatchUpdateException; import java.sql.BatchUpdateException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DatabaseMetaData; import java.sql.DatabaseMetaData;
import java.sql.Date;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
@ -71,6 +72,7 @@ import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException; import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTransientException; import java.sql.SQLTransientException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -685,10 +687,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
for (int i = 0; i < fieldIndexes.size(); i++) { for (int i = 0; i < fieldIndexes.size(); i++) {
final int currentFieldIndex = fieldIndexes.get(i); final int currentFieldIndex = fieldIndexes.get(i);
final Object currentValue = values[currentFieldIndex]; Object currentValue = values[currentFieldIndex];
final DataType dataType = dataTypes.get(currentFieldIndex); final DataType dataType = dataTypes.get(currentFieldIndex);
final int sqlType = DataTypeUtils.getSQLTypeValue(dataType); final int sqlType = DataTypeUtils.getSQLTypeValue(dataType);
if (sqlType == Types.DATE && currentValue instanceof Date) {
// convert Date from the internal UTC normalized form to local time zone needed by database drivers
currentValue = DataTypeUtils.convertDateToLocalTZ((Date) currentValue);
}
// If DELETE type, insert the object twice because of the null check (see generateDelete for details) // If DELETE type, insert the object twice because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) { if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
ps.setObject(i * 2 + 1, currentValue, sqlType); ps.setObject(i * 2 + 1, currentValue, sqlType);

View File

@ -39,6 +39,7 @@ import org.junit.runner.RunWith
import org.junit.runners.JUnit4 import org.junit.runners.JUnit4
import java.sql.Connection import java.sql.Connection
import java.sql.Date
import java.sql.DriverManager import java.sql.DriverManager
import java.sql.PreparedStatement import java.sql.PreparedStatement
import java.sql.ResultSet import java.sql.ResultSet
@ -46,6 +47,9 @@ import java.sql.SQLDataException
import java.sql.SQLException import java.sql.SQLException
import java.sql.SQLNonTransientConnectionException import java.sql.SQLNonTransientConnectionException
import java.sql.Statement import java.sql.Statement
import java.time.LocalDate
import java.time.ZoneId
import java.time.ZoneOffset
import java.util.function.Supplier import java.util.function.Supplier
import static org.junit.Assert.assertEquals import static org.junit.Assert.assertEquals
@ -68,7 +72,8 @@ import static org.mockito.Mockito.verify
class TestPutDatabaseRecord { class TestPutDatabaseRecord {
private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," + private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100)," +
" code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000))" " code integer CONSTRAINT CODE_RANGE CHECK (code >= 0 AND code < 1000)," +
" dt date)"
private final static String DB_LOCATION = "target/db_pdr" private final static String DB_LOCATION = "target/db_pdr"
TestRunner runner TestRunner runner
@ -238,12 +243,20 @@ class TestPutDatabaseRecord {
parser.addSchemaField("id", RecordFieldType.INT) parser.addSchemaField("id", RecordFieldType.INT)
parser.addSchemaField("name", RecordFieldType.STRING) parser.addSchemaField("name", RecordFieldType.STRING)
parser.addSchemaField("code", RecordFieldType.INT) parser.addSchemaField("code", RecordFieldType.INT)
parser.addSchemaField("dt", RecordFieldType.DATE)
parser.addRecord(1, 'rec1', 101) LocalDate testDate1 = LocalDate.of(2021, 1, 26)
parser.addRecord(2, 'rec2', 102) Date nifiDate1 = new Date(testDate1.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in UTC
parser.addRecord(3, 'rec3', 103) Date jdbcDate1 = Date.valueOf(testDate1) // in local TZ
parser.addRecord(4, 'rec4', 104) LocalDate testDate2 = LocalDate.of(2021, 7, 26)
parser.addRecord(5, null, 105) Date nifiDate2 = new Date(testDate2.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()) // in URC
Date jdbcDate2 = Date.valueOf(testDate2) // in local TZ
parser.addRecord(1, 'rec1', 101, nifiDate1)
parser.addRecord(2, 'rec2', 102, nifiDate2)
parser.addRecord(3, 'rec3', 103, null)
parser.addRecord(4, 'rec4', 104, null)
parser.addRecord(5, null, 105, null)
runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser') runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE) runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE)
@ -260,22 +273,27 @@ class TestPutDatabaseRecord {
assertEquals(1, rs.getInt(1)) assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2)) assertEquals('rec1', rs.getString(2))
assertEquals(101, rs.getInt(3)) assertEquals(101, rs.getInt(3))
assertEquals(jdbcDate1, rs.getDate(4))
assertTrue(rs.next()) assertTrue(rs.next())
assertEquals(2, rs.getInt(1)) assertEquals(2, rs.getInt(1))
assertEquals('rec2', rs.getString(2)) assertEquals('rec2', rs.getString(2))
assertEquals(102, rs.getInt(3)) assertEquals(102, rs.getInt(3))
assertEquals(jdbcDate2, rs.getDate(4))
assertTrue(rs.next()) assertTrue(rs.next())
assertEquals(3, rs.getInt(1)) assertEquals(3, rs.getInt(1))
assertEquals('rec3', rs.getString(2)) assertEquals('rec3', rs.getString(2))
assertEquals(103, rs.getInt(3)) assertEquals(103, rs.getInt(3))
assertNull(rs.getDate(4))
assertTrue(rs.next()) assertTrue(rs.next())
assertEquals(4, rs.getInt(1)) assertEquals(4, rs.getInt(1))
assertEquals('rec4', rs.getString(2)) assertEquals('rec4', rs.getString(2))
assertEquals(104, rs.getInt(3)) assertEquals(104, rs.getInt(3))
assertNull(rs.getDate(4))
assertTrue(rs.next()) assertTrue(rs.next())
assertEquals(5, rs.getInt(1)) assertEquals(5, rs.getInt(1))
assertNull(rs.getString(2)) assertNull(rs.getString(2))
assertEquals(105, rs.getInt(3)) assertEquals(105, rs.getInt(3))
assertNull(rs.getDate(4))
assertFalse(rs.next()) assertFalse(rs.next())
stmt.close() stmt.close()
@ -633,8 +651,8 @@ class TestPutDatabaseRecord {
// Set some existing records with different values for name and code // Set some existing records with different values for name and code
final Connection conn = dbcp.getConnection() final Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement() Statement stmt = conn.createStatement()
stmt.execute('''INSERT INTO PERSONS VALUES (1,'x1',101)''') stmt.execute('''INSERT INTO PERSONS VALUES (1,'x1',101, null)''')
stmt.execute('''INSERT INTO PERSONS VALUES (2,'x2',102)''') stmt.execute('''INSERT INTO PERSONS VALUES (2,'x2',102, null)''')
stmt.close() stmt.close()
runner.enqueue(new byte[0]) runner.enqueue(new byte[0])
@ -789,9 +807,9 @@ class TestPutDatabaseRecord {
recreateTable("PERSONS", createPersons) recreateTable("PERSONS", createPersons)
Connection conn = dbcp.getConnection() Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement() Statement stmt = conn.createStatement()
stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101)") stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)")
stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', 102)") stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', 102, null)")
stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103)") stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103, null)")
stmt.close() stmt.close()
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()
@ -833,9 +851,9 @@ class TestPutDatabaseRecord {
recreateTable("PERSONS", createPersons) recreateTable("PERSONS", createPersons)
Connection conn = dbcp.getConnection() Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement() Statement stmt = conn.createStatement()
stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101)") stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101, null)")
stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', null)") stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', null, null)")
stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103)") stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103, null)")
stmt.close() stmt.close()
final MockRecordParser parser = new MockRecordParser() final MockRecordParser parser = new MockRecordParser()