NIFI-4951: Update convertToAvroObject to use the DataTypeUtils conversion function

The feature allows users to convert from non-integral types to the correct underlying type.  The
original behavior is maintained; however, now simple conversions take place automatically for some
logical types (date, time, and timestamp).

This closes #2526.

Signed-off-by: Derek Straka <derek@asterius.io>
Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Derek Straka 2018-03-08 16:10:02 -05:00 committed by Mark Payne
parent a1c917656e
commit c056ede6cc
1 changed files with 19 additions and 8 deletions

View File

@ -20,6 +20,8 @@ package org.apache.nifi.avro;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@ -517,6 +519,12 @@ public class AvroTypeUtil {
}
}
private static Long getLongFromTimestamp(final Object rawValue, final Schema fieldSchema, final String fieldName) {
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
Timestamp t = DataTypeUtils.toTimestamp(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
return t.getTime();
}
@SuppressWarnings("unchecked")
private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) {
if (rawValue == null) {
@ -531,14 +539,15 @@ public class AvroTypeUtil {
}
if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
final Date date = new Date(longValue);
final Duration duration = Duration.between(new Date(0L).toInstant(), date.toInstant());
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
final Date date = DataTypeUtils.toDate(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
final Duration duration = Duration.between(new Date(0L).toInstant(), new Date(date.getTime()).toInstant());
final long days = duration.toDays();
return (int) days;
} else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
final Date date = new Date(longValue);
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
final Time time = DataTypeUtils.toTime(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
final Date date = new Date(time.getTime());
final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
final long millisSinceMidnight = duration.toMillis();
return (int) millisSinceMidnight;
@ -553,14 +562,16 @@ public class AvroTypeUtil {
}
if (LOGICAL_TYPE_TIME_MICROS.equals(logicalType.getName())) {
final long longValue = DataTypeUtils.toLong(rawValue, fieldName);
final long longValue = getLongFromTimestamp(rawValue, fieldSchema, fieldName);
final Date date = new Date(longValue);
final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
return duration.toMillis() * 1000L;
} else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) {
return DataTypeUtils.toLong(rawValue, fieldName);
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
Timestamp t = DataTypeUtils.toTimestamp(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
return getLongFromTimestamp(rawValue, fieldSchema, fieldName);
} else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) {
return DataTypeUtils.toLong(rawValue, fieldName) * 1000L;
return getLongFromTimestamp(rawValue, fieldSchema, fieldName) * 1000L;
}
return DataTypeUtils.toLong(rawValue, fieldName);