From a85cafe7718aee5a4080ccd35432b6517d90dab4 Mon Sep 17 00:00:00 2001 From: zhangcheng Date: Thu, 19 Aug 2021 21:09:31 +0800 Subject: [PATCH] NIFI-9064:Support Oracle timestamp when `Use Avro Logical Types` is true for ExecuteSQLRecord and QueryDatabaseTableRecord Signed-off-by: Joe Gresock This closes #5807. --- .../record/ResultSetRecordSet.java | 33 ++++++++----------- .../record/ResultSetRecordSetTest.java | 10 +++++- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index 5229dd5ef0..cfb1e4b4d3 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -44,6 +44,8 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.nifi.serialization.record.RecordFieldType.TIMESTAMP; + public class ResultSetRecordSet implements RecordSet, Closeable { private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class); private static final int JDBC_DEFAULT_PRECISION_VALUE = 10; @@ -81,7 +83,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { try { tempSchema = createSchema(rs, readerSchema, useLogicalTypes); moreRows = rs.next(); - } catch(SQLException se) { + } catch (SQLException se) { // Tried to create the schema with a ResultSet without calling next() first (probably for DB2), now try the other way around moreRows = rs.next(); tempSchema = createSchema(rs, readerSchema, useLogicalTypes); @@ -136,13 +138,12 @@ public class ResultSetRecordSet implements RecordSet, Closeable { for (final RecordField field : schema.getFields()) { final String fieldName = field.getFieldName(); - + RecordFieldType fieldType = field.getDataType().getFieldType(); final Object value; - if (rsColumnNames.contains(fieldName)) { - value = normalizeValue(rs.getObject(fieldName)); - } else { - value = null; - } + + value = rsColumnNames.contains(fieldName) + ? normalizeValue((fieldType == TIMESTAMP) ? rs.getTimestamp(fieldName) : rs.getObject(fieldName)) + : null; values.put(fieldName, value); } @@ -186,12 +187,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final String fieldName = metadata.getColumnLabel(column); final int nullableFlag = metadata.isNullable(column); - final boolean nullable; - if (nullableFlag == ResultSetMetaData.columnNoNulls) { - nullable = false; - } else { - nullable = true; - } + final boolean nullable = nullableFlag != ResultSetMetaData.columnNoNulls; final RecordField field = new RecordField(fieldName, dataType, nullable); fields.add(field); @@ -240,7 +236,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { if (!(obj instanceof Record)) { final List dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE, RecordFieldType.DECIMAL, RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, - RecordFieldType.TIME, RecordFieldType.TIMESTAMP) + RecordFieldType.TIME, TIMESTAMP) .map(RecordFieldType::getDataType) .collect(Collectors.toList()); @@ -279,7 +275,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { && (fieldType == RecordFieldType.DECIMAL || fieldType == RecordFieldType.DATE || fieldType == RecordFieldType.TIME - || fieldType == RecordFieldType.TIMESTAMP)) { + || fieldType == TIMESTAMP)) { return RecordFieldType.STRING.getDataType(); } else { return dataType; @@ -424,9 +420,6 @@ public class ResultSetRecordSet implements RecordSet, Closeable { if (valueToLookAt instanceof BigInteger) { return RecordFieldType.BIGINT.getDataType(); } - if (valueToLookAt instanceof Integer) { - return RecordFieldType.INT.getDataType(); - } if (valueToLookAt instanceof java.sql.Time) { return getDataType(RecordFieldType.TIME, useLogicalTypes); } @@ -434,7 +427,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { return getDataType(RecordFieldType.DATE, useLogicalTypes); } if (valueToLookAt instanceof java.sql.Timestamp) { - return getDataType(RecordFieldType.TIMESTAMP, useLogicalTypes); + return getDataType(TIMESTAMP, useLogicalTypes); } if (valueToLookAt instanceof Record) { final Record record = (Record) valueToLookAt; @@ -518,7 +511,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { case Types.TIMESTAMP_WITH_TIMEZONE: case -101: // Oracle's TIMESTAMP WITH TIME ZONE case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE - return getRecordFieldType(RecordFieldType.TIMESTAMP, useLogicalTypes); + return getRecordFieldType(TIMESTAMP, useLogicalTypes); } return RecordFieldType.STRING; diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java index ab28dab797..edc36da899 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java @@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.DecimalDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -40,6 +41,7 @@ import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; @@ -67,6 +69,7 @@ public class ResultSetRecordSetTest { private static final String COLUMN_NAME_BOOLEAN = "boolean"; private static final String COLUMN_NAME_CHAR = "char"; private static final String COLUMN_NAME_DATE = "date"; + private static final String COLUMN_NAME_TIMESTAMP = "timestamp"; private static final String COLUMN_NAME_INTEGER = "integer"; private static final String COLUMN_NAME_DOUBLE = "double"; private static final String COLUMN_NAME_REAL = "real"; @@ -99,7 +102,8 @@ public class ResultSetRecordSetTest { new TestColumn(15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)), new TestColumn(16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)), new TestColumn(17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)), - new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(3, 10)) + new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(3, 10)), + new TestColumn(19, COLUMN_NAME_TIMESTAMP, Types.TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType()) }; @Mock @@ -252,6 +256,7 @@ public class ResultSetRecordSetTest { final RecordSchema recordSchema = givenRecordSchema(COLUMNS); LocalDate testDate = LocalDate.of(2021, 1, 26); + LocalDateTime testDateTime = LocalDateTime.of(2021, 9, 10, 11, 11, 11); final String varcharValue = "varchar"; final Long bigintValue = 1234567890123456789L; @@ -260,6 +265,7 @@ public class ResultSetRecordSetTest { final Boolean booleanValue = Boolean.TRUE; final Character charValue = 'c'; final Date dateValue = Date.valueOf(testDate); + final Timestamp timestampValue = Timestamp.valueOf(testDateTime); final Integer integerValue = 1234567890; final Double doubleValue = 0.12; final Double realValue = 3.45; @@ -279,6 +285,7 @@ public class ResultSetRecordSetTest { when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue); when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue); when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue); + when(resultSet.getTimestamp(COLUMN_NAME_TIMESTAMP)).thenReturn(timestampValue); when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue); when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue); when(resultSet.getObject(COLUMN_NAME_REAL)).thenReturn(realValue); @@ -306,6 +313,7 @@ public class ResultSetRecordSetTest { // Date is expected in UTC normalized form Date expectedDate = new Date(testDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); assertEquals(expectedDate, record.getAsDate(COLUMN_NAME_DATE, null)); + assertEquals(timestampValue, DataTypeUtils.toTimestamp(record.getValue(COLUMN_NAME_TIMESTAMP), null, COLUMN_NAME_TIMESTAMP)); assertEquals(integerValue, record.getAsInt(COLUMN_NAME_INTEGER)); assertEquals(doubleValue, record.getAsDouble(COLUMN_NAME_DOUBLE));