NIFI-9064:Support Oracle timestamp when Use Avro Logical Types is true for ExecuteSQLRecord and QueryDatabaseTableRecord

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5807.
This commit is contained in:
zhangcheng 2021-08-19 21:09:31 +08:00 committed by Joe Gresock
parent 4cc20e6c06
commit a85cafe771
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
2 changed files with 22 additions and 21 deletions

View File

@ -44,6 +44,8 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.nifi.serialization.record.RecordFieldType.TIMESTAMP;
public class ResultSetRecordSet implements RecordSet, Closeable { public class ResultSetRecordSet implements RecordSet, Closeable {
private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class); private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class);
private static final int JDBC_DEFAULT_PRECISION_VALUE = 10; private static final int JDBC_DEFAULT_PRECISION_VALUE = 10;
@ -81,7 +83,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
try { try {
tempSchema = createSchema(rs, readerSchema, useLogicalTypes); tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
moreRows = rs.next(); moreRows = rs.next();
} catch(SQLException se) { } catch (SQLException se) {
// Tried to create the schema with a ResultSet without calling next() first (probably for DB2), now try the other way around // Tried to create the schema with a ResultSet without calling next() first (probably for DB2), now try the other way around
moreRows = rs.next(); moreRows = rs.next();
tempSchema = createSchema(rs, readerSchema, useLogicalTypes); tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
@ -136,13 +138,12 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
for (final RecordField field : schema.getFields()) { for (final RecordField field : schema.getFields()) {
final String fieldName = field.getFieldName(); final String fieldName = field.getFieldName();
RecordFieldType fieldType = field.getDataType().getFieldType();
final Object value; final Object value;
if (rsColumnNames.contains(fieldName)) {
value = normalizeValue(rs.getObject(fieldName)); value = rsColumnNames.contains(fieldName)
} else { ? normalizeValue((fieldType == TIMESTAMP) ? rs.getTimestamp(fieldName) : rs.getObject(fieldName))
value = null; : null;
}
values.put(fieldName, value); values.put(fieldName, value);
} }
@ -186,12 +187,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
final String fieldName = metadata.getColumnLabel(column); final String fieldName = metadata.getColumnLabel(column);
final int nullableFlag = metadata.isNullable(column); final int nullableFlag = metadata.isNullable(column);
final boolean nullable; final boolean nullable = nullableFlag != ResultSetMetaData.columnNoNulls;
if (nullableFlag == ResultSetMetaData.columnNoNulls) {
nullable = false;
} else {
nullable = true;
}
final RecordField field = new RecordField(fieldName, dataType, nullable); final RecordField field = new RecordField(fieldName, dataType, nullable);
fields.add(field); fields.add(field);
@ -240,7 +236,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
if (!(obj instanceof Record)) { if (!(obj instanceof Record)) {
final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE, final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
RecordFieldType.DECIMAL, RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, RecordFieldType.DECIMAL, RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING,
RecordFieldType.TIME, RecordFieldType.TIMESTAMP) RecordFieldType.TIME, TIMESTAMP)
.map(RecordFieldType::getDataType) .map(RecordFieldType::getDataType)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -279,7 +275,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
&& (fieldType == RecordFieldType.DECIMAL && (fieldType == RecordFieldType.DECIMAL
|| fieldType == RecordFieldType.DATE || fieldType == RecordFieldType.DATE
|| fieldType == RecordFieldType.TIME || fieldType == RecordFieldType.TIME
|| fieldType == RecordFieldType.TIMESTAMP)) { || fieldType == TIMESTAMP)) {
return RecordFieldType.STRING.getDataType(); return RecordFieldType.STRING.getDataType();
} else { } else {
return dataType; return dataType;
@ -424,9 +420,6 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
if (valueToLookAt instanceof BigInteger) { if (valueToLookAt instanceof BigInteger) {
return RecordFieldType.BIGINT.getDataType(); return RecordFieldType.BIGINT.getDataType();
} }
if (valueToLookAt instanceof Integer) {
return RecordFieldType.INT.getDataType();
}
if (valueToLookAt instanceof java.sql.Time) { if (valueToLookAt instanceof java.sql.Time) {
return getDataType(RecordFieldType.TIME, useLogicalTypes); return getDataType(RecordFieldType.TIME, useLogicalTypes);
} }
@ -434,7 +427,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
return getDataType(RecordFieldType.DATE, useLogicalTypes); return getDataType(RecordFieldType.DATE, useLogicalTypes);
} }
if (valueToLookAt instanceof java.sql.Timestamp) { if (valueToLookAt instanceof java.sql.Timestamp) {
return getDataType(RecordFieldType.TIMESTAMP, useLogicalTypes); return getDataType(TIMESTAMP, useLogicalTypes);
} }
if (valueToLookAt instanceof Record) { if (valueToLookAt instanceof Record) {
final Record record = (Record) valueToLookAt; final Record record = (Record) valueToLookAt;
@ -518,7 +511,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
case Types.TIMESTAMP_WITH_TIMEZONE: case Types.TIMESTAMP_WITH_TIMEZONE:
case -101: // Oracle's TIMESTAMP WITH TIME ZONE case -101: // Oracle's TIMESTAMP WITH TIME ZONE
case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
return getRecordFieldType(RecordFieldType.TIMESTAMP, useLogicalTypes); return getRecordFieldType(TIMESTAMP, useLogicalTypes);
} }
return RecordFieldType.STRING; return RecordFieldType.STRING;

View File

@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.DecimalDataType; import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -40,6 +41,7 @@ import java.sql.Time;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.sql.Types; import java.sql.Types;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -67,6 +69,7 @@ public class ResultSetRecordSetTest {
private static final String COLUMN_NAME_BOOLEAN = "boolean"; private static final String COLUMN_NAME_BOOLEAN = "boolean";
private static final String COLUMN_NAME_CHAR = "char"; private static final String COLUMN_NAME_CHAR = "char";
private static final String COLUMN_NAME_DATE = "date"; private static final String COLUMN_NAME_DATE = "date";
private static final String COLUMN_NAME_TIMESTAMP = "timestamp";
private static final String COLUMN_NAME_INTEGER = "integer"; private static final String COLUMN_NAME_INTEGER = "integer";
private static final String COLUMN_NAME_DOUBLE = "double"; private static final String COLUMN_NAME_DOUBLE = "double";
private static final String COLUMN_NAME_REAL = "real"; private static final String COLUMN_NAME_REAL = "real";
@ -99,7 +102,8 @@ public class ResultSetRecordSetTest {
new TestColumn(15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)), new TestColumn(15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, RecordFieldType.DECIMAL.getDecimalDataType(4, 0)),
new TestColumn(16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)), new TestColumn(16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)),
new TestColumn(17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)), new TestColumn(17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(10, 3)),
new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(3, 10)) new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, RecordFieldType.DECIMAL.getDecimalDataType(3, 10)),
new TestColumn(19, COLUMN_NAME_TIMESTAMP, Types.TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType())
}; };
@Mock @Mock
@ -252,6 +256,7 @@ public class ResultSetRecordSetTest {
final RecordSchema recordSchema = givenRecordSchema(COLUMNS); final RecordSchema recordSchema = givenRecordSchema(COLUMNS);
LocalDate testDate = LocalDate.of(2021, 1, 26); LocalDate testDate = LocalDate.of(2021, 1, 26);
LocalDateTime testDateTime = LocalDateTime.of(2021, 9, 10, 11, 11, 11);
final String varcharValue = "varchar"; final String varcharValue = "varchar";
final Long bigintValue = 1234567890123456789L; final Long bigintValue = 1234567890123456789L;
@ -260,6 +265,7 @@ public class ResultSetRecordSetTest {
final Boolean booleanValue = Boolean.TRUE; final Boolean booleanValue = Boolean.TRUE;
final Character charValue = 'c'; final Character charValue = 'c';
final Date dateValue = Date.valueOf(testDate); final Date dateValue = Date.valueOf(testDate);
final Timestamp timestampValue = Timestamp.valueOf(testDateTime);
final Integer integerValue = 1234567890; final Integer integerValue = 1234567890;
final Double doubleValue = 0.12; final Double doubleValue = 0.12;
final Double realValue = 3.45; final Double realValue = 3.45;
@ -279,6 +285,7 @@ public class ResultSetRecordSetTest {
when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue); when(resultSet.getObject(COLUMN_NAME_BOOLEAN)).thenReturn(booleanValue);
when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue); when(resultSet.getObject(COLUMN_NAME_CHAR)).thenReturn(charValue);
when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue); when(resultSet.getObject(COLUMN_NAME_DATE)).thenReturn(dateValue);
when(resultSet.getTimestamp(COLUMN_NAME_TIMESTAMP)).thenReturn(timestampValue);
when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue); when(resultSet.getObject(COLUMN_NAME_INTEGER)).thenReturn(integerValue);
when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue); when(resultSet.getObject(COLUMN_NAME_DOUBLE)).thenReturn(doubleValue);
when(resultSet.getObject(COLUMN_NAME_REAL)).thenReturn(realValue); when(resultSet.getObject(COLUMN_NAME_REAL)).thenReturn(realValue);
@ -306,6 +313,7 @@ public class ResultSetRecordSetTest {
// Date is expected in UTC normalized form // Date is expected in UTC normalized form
Date expectedDate = new Date(testDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli()); Date expectedDate = new Date(testDate.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli());
assertEquals(expectedDate, record.getAsDate(COLUMN_NAME_DATE, null)); assertEquals(expectedDate, record.getAsDate(COLUMN_NAME_DATE, null));
assertEquals(timestampValue, DataTypeUtils.toTimestamp(record.getValue(COLUMN_NAME_TIMESTAMP), null, COLUMN_NAME_TIMESTAMP));
assertEquals(integerValue, record.getAsInt(COLUMN_NAME_INTEGER)); assertEquals(integerValue, record.getAsInt(COLUMN_NAME_INTEGER));
assertEquals(doubleValue, record.getAsDouble(COLUMN_NAME_DOUBLE)); assertEquals(doubleValue, record.getAsDouble(COLUMN_NAME_DOUBLE));