NIFI-3958: Decimal logical type with undefined precision and scale.

- Oracle NUMBER can report precision 0 with scale -127 or 0 for variable-scale NUMBER values such as ROWNUM or function results
- Added 'Default Decimal Precision' and 'Default Decimal Scale' properties to ExecuteSQL and QueryDatabaseTable to apply a default precision and scale when they are unknown
- Coerce a BigDecimal's scale to the field schema's logical type, so that BigDecimals with differing scales can be written (sketched below)
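
The scale coercion amounts to BigDecimal.setScale with half-up rounding; a minimal sketch (the values are illustrative, not from the commit):

    import java.math.BigDecimal;

    // Coerce a value to the scale declared by the Avro 'decimal' logical type.
    // ROUND_HALF_UP matches the rounding mode used in AvroTypeUtil below.
    BigDecimal value = new BigDecimal("1.53");
    BigDecimal atScale0 = value.setScale(0, BigDecimal.ROUND_HALF_UP); // 2
    BigDecimal atScale1 = value.setScale(1, BigDecimal.ROUND_HALF_UP); // 1.5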

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1851
Koji Kawamura 2017-05-25 00:53:09 +09:00 committed by Matt Burgess
parent 36911957dc
commit 13b59b5621
5 changed files with 129 additions and 18 deletions

AvroTypeUtil.java

@@ -335,8 +335,12 @@ public class AvroTypeUtil {
             final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
             final BigDecimal decimal;
             if (rawValue instanceof BigDecimal) {
-                decimal = (BigDecimal) rawValue;
+                final BigDecimal rawDecimal = (BigDecimal) rawValue;
+                final int desiredScale = decimalType.getScale();
+                // If the desired scale differs from this value's scale, coerce it.
+                decimal = rawDecimal.scale() == desiredScale ? rawDecimal : rawDecimal.setScale(desiredScale, BigDecimal.ROUND_HALF_UP);
             } else if (rawValue instanceof Double) {
                 // Scale is adjusted based on precision. If the double was 123.456 and precision is 5, the decimal becomes 123.46.
                 decimal = new BigDecimal((Double) rawValue, new MathContext(decimalType.getPrecision()));
             } else {
                 throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");
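
For reference, the Double branch above derives the scale from the precision via MathContext; the behavior the comment describes can be checked directly (a standalone illustration, not part of the diff):

    import java.math.BigDecimal;
    import java.math.MathContext;

    // Rounding 123.456 to 5 significant digits yields 123.46.
    System.out.println(new BigDecimal(123.456d, new MathContext(5))); // 123.46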

ExecuteSQL.java

@@ -55,6 +55,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
@@ -125,6 +127,8 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(DEFAULT_PRECISION);
+        pds.add(DEFAULT_SCALE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -168,6 +172,8 @@ public class ExecuteSQL extends AbstractProcessor {
         final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
         final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+        final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
+        final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
         final StopWatch stopWatch = new StopWatch(true);
         final String selectQuery;
         if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
@@ -200,7 +206,10 @@ public class ExecuteSQL extends AbstractProcessor {
             final ResultSet resultSet = st.executeQuery(selectQuery);
             final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                     .convertNames(convertNamesForAvro)
-                    .useLogicalTypes(useAvroLogicalTypes).build();
+                    .useLogicalTypes(useAvroLogicalTypes)
+                    .defaultPrecision(defaultPrecision)
+                    .defaultScale(defaultScale)
+                    .build();
             nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
         } catch (final SQLException e) {
             throw new ProcessException(e);
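
Both new properties support NiFi Expression Language. A hypothetical unit-test configuration (the TestRunner usage is illustrative and not part of this commit; the property names come from the JdbcCommon descriptors below):

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
    // 'dbf-default-precision' / 'dbf-default-scale' are the descriptor names added below.
    runner.setProperty("dbf-default-precision", "10");
    runner.setProperty("dbf-default-scale", "2");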

QueryDatabaseTable.java

@@ -68,6 +68,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
@@ -160,6 +162,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         pds.add(MAX_FRAGMENTS);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(DEFAULT_PRECISION);
+        pds.add(DEFAULT_SCALE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -212,6 +216,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 .maxRows(maxRowsPerFlowFile)
                 .convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
                 .useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
+                .defaultPrecision(context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger())
+                .defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger())
                 .build();
         final Map<String,String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
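
To observe the undefined metadata this change guards against, one can inspect ResultSetMetaData directly; a rough sketch (the Oracle connection URL and the ROWNUM query are assumptions for illustration):

    import java.sql.*;

    try (Connection conn = DriverManager.getConnection(oracleJdbcUrl); // hypothetical URL
         Statement st = conn.createStatement();
         ResultSet rs = st.executeQuery("SELECT ROWNUM FROM DUAL")) {
        ResultSetMetaData meta = rs.getMetaData();
        // Oracle reports precision=0 and scale=-127 here unless the
        // 'oracle.jdbc.J2EE13Compliant' system property is set.
        System.out.println(meta.getPrecision(1) + ", " + meta.getScale(1));
    }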

JdbcCommon.java

@@ -73,12 +73,19 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;

 /**
  * JDBC / SQL common functions.
  */
 public class JdbcCommon {

-    private static final int MAX_DIGITS_IN_BIGINT = 19;
-    private static final int MAX_DIGITS_IN_INT = 9;
+    // Derived from MySQL's default precision.
+    private static final int DEFAULT_PRECISION_VALUE = 10;
+    private static final int DEFAULT_SCALE_VALUE = 0;

     public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";

     public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
@@ -107,9 +114,36 @@ public class JdbcCommon {
             .required(true)
             .build();

+    public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder()
+            .name("dbf-default-precision")
+            .displayName("Default Decimal Precision")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'precision' denoting the number of available digits is required."
+                    + " Generally, precision is defined by the column data type definition or the database engine's default."
+                    + " However, some database engines can return an undefined precision (0)."
+                    + " 'Default Decimal Precision' is used when writing those undefined-precision numbers.")
+            .defaultValue(String.valueOf(DEFAULT_PRECISION_VALUE))
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder()
+            .name("dbf-default-scale")
+            .displayName("Default Decimal Scale")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'scale' denoting the number of available decimal digits is required."
+                    + " Generally, scale is defined by the column data type definition or the database engine's default."
+                    + " However, when an undefined precision (0) is returned, the scale can also be uncertain with some database engines."
+                    + " 'Default Decimal Scale' is used when writing those undefined numbers."
+                    + " If a value has more decimal digits than the specified scale, it is rounded half-up:"
+                    + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.")
+            .defaultValue(String.valueOf(DEFAULT_SCALE_VALUE))
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+
+    private static final int MAX_DIGITS_IN_BIGINT = 19;
+    private static final int MAX_DIGITS_IN_INT = 9;

     public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
         return convertToAvroStream(rs, outStream, null, null, convertNames);
@@ -140,12 +174,16 @@ public class JdbcCommon {
         private final int maxRows;
         private final boolean convertNames;
         private final boolean useLogicalTypes;
+        private final int defaultPrecision;
+        private final int defaultScale;

-        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes) {
+        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes, int defaultPrecision, int defaultScale) {
             this.recordName = recordName;
             this.maxRows = maxRows;
             this.convertNames = convertNames;
             this.useLogicalTypes = useLogicalTypes;
+            this.defaultPrecision = defaultPrecision;
+            this.defaultScale = defaultScale;
         }

         public static Builder builder() {
@@ -157,6 +195,8 @@ public class JdbcCommon {
             private int maxRows = 0;
             private boolean convertNames = false;
             private boolean useLogicalTypes = false;
+            private int defaultPrecision = DEFAULT_PRECISION_VALUE;
+            private int defaultScale = DEFAULT_SCALE_VALUE;

             /**
              * Specify a priori record name to use if it cannot be determined from the result set.
@@ -181,8 +221,18 @@ public class JdbcCommon {
                 return this;
             }

+            public Builder defaultPrecision(int defaultPrecision) {
+                this.defaultPrecision = defaultPrecision;
+                return this;
+            }
+
+            public Builder defaultScale(int defaultScale) {
+                this.defaultScale = defaultScale;
+                return this;
+            }
+
             public AvroConversionOptions build() {
-                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes);
+                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes, defaultPrecision, defaultScale);
             }
         }
     }
@@ -455,14 +505,28 @@ public class JdbcCommon {
                 // Since Avro 1.8, LogicalType is supported.
                 case DECIMAL:
                 case NUMERIC:
-                    final LogicalTypes.Decimal decimal = options.useLogicalTypes
-                            ? LogicalTypes.decimal(meta.getPrecision(i), meta.getScale(i)) : null;
-                    addNullableField(builder, columnName,
-                            u -> options.useLogicalTypes
-                                    ? u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType()))
-                                    : u.stringType());
+                    if (options.useLogicalTypes) {
+                        final int decimalPrecision;
+                        final int decimalScale;
+                        if (meta.getPrecision(i) > 0) {
+                            // When the database returns a definite precision, rely on it.
+                            decimalPrecision = meta.getPrecision(i);
+                            decimalScale = meta.getScale(i);
+                        } else {
+                            // If it doesn't, use the default precision.
+                            decimalPrecision = options.defaultPrecision;
+                            // Oracle returns precision=0, scale=-127 for variable-scale values such as ROWNUM or function results.
+                            // Specifying the 'oracle.jdbc.J2EE13Compliant' system property makes it return scale=0 instead.
+                            // Queries like 'SELECT 1.23 AS v FROM DUAL' are then problematic because the value cannot be mapped to a decimal with scale=0.
+                            // The default scale is used to preserve the decimal digits in such cases.
+                            decimalScale = meta.getScale(i) > 0 ? meta.getScale(i) : options.defaultScale;
+                        }
+                        final LogicalTypes.Decimal decimal = LogicalTypes.decimal(decimalPrecision, decimalScale);
+                        addNullableField(builder, columnName,
+                                u -> u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType())));
+                    } else {
+                        addNullableField(builder, columnName, u -> u.stringType());
+                    }
                     break;
                 case DATE:
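
The resulting Avro field type is the standard decimal annotation on bytes; for example, with precision 10 and scale 0 (a quick check using the Avro API seen above):

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    Schema decimalSchema = LogicalTypes.decimal(10, 0).addToSchema(SchemaBuilder.builder().bytesType());
    // Prints: {"type":"bytes","logicalType":"decimal","precision":10,"scale":0}
    System.out.println(decimalSchema);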

TestJdbcCommon.java

@@ -31,6 +31,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.sql.Connection;
@@ -55,6 +57,8 @@ import java.util.function.BiFunction;
 import java.util.stream.IntStream;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
@@ -362,15 +366,29 @@ public class TestJdbcCommon {
     @Test
     public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {
-        final BigDecimal bigDecimal = new BigDecimal(38D);
+        final BigDecimal bigDecimal = new BigDecimal(12345D);
+        // If the database returns a precision, it should be used.
+        testConvertToAvroStreamForBigDecimal(bigDecimal, 38, 10, 38, 0);
+    }
+
+    @Test
+    public void testConvertToAvroStreamForBigDecimalWithUndefinedPrecision() throws SQLException, IOException {
+        final int expectedScale = 3;
+        final int dbPrecision = 0;
+        final BigDecimal bigDecimal = new BigDecimal(new BigInteger("12345"), expectedScale, new MathContext(dbPrecision));
+        // If the database doesn't return a precision, the default precision should be used.
+        testConvertToAvroStreamForBigDecimal(bigDecimal, dbPrecision, 24, 24, expectedScale);
+    }
+
+    private void testConvertToAvroStreamForBigDecimal(BigDecimal bigDecimal, int dbPrecision, int defaultPrecision, int expectedPrecision, int expectedScale) throws SQLException, IOException {
         final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
         when(metadata.getColumnCount()).thenReturn(1);
         when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
         when(metadata.getColumnName(1)).thenReturn("The.Chairman");
         when(metadata.getTableName(1)).thenReturn("1the::table");
-        when(metadata.getPrecision(1)).thenReturn(bigDecimal.precision());
-        when(metadata.getScale(1)).thenReturn(bigDecimal.scale());
+        when(metadata.getPrecision(1)).thenReturn(dbPrecision);
+        when(metadata.getScale(1)).thenReturn(expectedScale);

         final ResultSet rs = mock(ResultSet.class);
         when(rs.getMetaData()).thenReturn(metadata);
@@ -388,7 +406,7 @@ public class TestJdbcCommon {
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
-                .builder().convertNames(true).useLogicalTypes(true).build();
+                .builder().convertNames(true).useLogicalTypes(true).defaultPrecision(defaultPrecision).build();
         JdbcCommon.convertToAvroStream(rs, baos, options, null);

         final byte[] serializedBytes = baos.toByteArray();
@@ -400,6 +418,16 @@ public class TestJdbcCommon {
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, null, genericData);
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
             final Schema generatedUnion = dataFileReader.getSchema().getField("The_Chairman").schema();
+            // The union should contain exactly null and decimal.
+            assertEquals(2, generatedUnion.getTypes().size());
+            final LogicalType logicalType = generatedUnion.getTypes().get(1).getLogicalType();
+            assertNotNull(logicalType);
+            assertEquals("decimal", logicalType.getName());
+            final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+            assertEquals(expectedPrecision, decimalType.getPrecision());
+            assertEquals(expectedScale, decimalType.getScale());

             GenericRecord record = null;
             while (dataFileReader.hasNext()) {
                 record = dataFileReader.next(record);
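
For the assertions above to yield BigDecimal values when records are read back, the genericData instance used by the reader needs a decimal conversion registered; a minimal reader-side sketch (assuming Avro 1.8+; how the test builds genericData is not shown in this diff):

    import org.apache.avro.Conversions;
    import org.apache.avro.generic.GenericData;

    GenericData genericData = new GenericData();
    // Decodes 'decimal'-annotated bytes back into BigDecimal values.
    genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());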