diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 19697d2945..52c55fc8df 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -335,8 +335,12 @@ public class AvroTypeUtil {
             final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
             final BigDecimal decimal;
             if (rawValue instanceof BigDecimal) {
-                decimal = (BigDecimal) rawValue;
+                final BigDecimal rawDecimal = (BigDecimal) rawValue;
+                final int desiredScale = decimalType.getScale();
+                // If the desired scale differs from this value's scale, coerce it.
+                decimal = rawDecimal.scale() == desiredScale ? rawDecimal : rawDecimal.setScale(desiredScale, BigDecimal.ROUND_HALF_UP);
             } else if (rawValue instanceof Double) {
+                // Scale is adjusted based on precision. If the double was 123.456 and the precision is 5, the decimal becomes 123.46.
                 decimal = new BigDecimal((Double) rawValue, new MathContext(decimalType.getPrecision()));
             } else {
                 throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");
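The coercion added above rounds half-up whenever the incoming BigDecimal's scale disagrees with the scale declared by the target decimal logical type. A minimal standalone sketch of that behavior (class and method names are illustrative; RoundingMode.HALF_UP is the non-deprecated equivalent of the BigDecimal.ROUND_HALF_UP constant the patch uses):

import java.math.BigDecimal;
import java.math.RoundingMode;

public class ScaleCoercionSketch {
    // Keep the value as-is when scales already match; otherwise round
    // half-up to the scale declared by the decimal logical type.
    static BigDecimal coerce(final BigDecimal raw, final int desiredScale) {
        return raw.scale() == desiredScale ? raw : raw.setScale(desiredScale, RoundingMode.HALF_UP);
    }

    public static void main(final String[] args) {
        System.out.println(coerce(new BigDecimal("1.53"), 0)); // prints 2
        System.out.println(coerce(new BigDecimal("1.53"), 1)); // prints 1.5
        System.out.println(coerce(new BigDecimal("1.5"), 1));  // prints 1.5, already at scale 1
    }
}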
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 3ef91074fe..3f05766649 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -55,6 +55,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
 
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
 
@@ -125,6 +127,8 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(DEFAULT_PRECISION);
+        pds.add(DEFAULT_SCALE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -168,6 +172,8 @@ public class ExecuteSQL extends AbstractProcessor {
         final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
         final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+        final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
+        final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
         final StopWatch stopWatch = new StopWatch(true);
         final String selectQuery;
         if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
@@ -200,7 +206,10 @@ public class ExecuteSQL extends AbstractProcessor {
                 final ResultSet resultSet = st.executeQuery(selectQuery);
                 final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                         .convertNames(convertNamesForAvro)
-                        .useLogicalTypes(useAvroLogicalTypes).build();
+                        .useLogicalTypes(useAvroLogicalTypes)
+                        .defaultPrecision(defaultPrecision)
+                        .defaultScale(defaultScale)
+                        .build();
                 nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
             } catch (final SQLException e) {
                 throw new ProcessException(e);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index dd3ac7b2b0..f23b228c42 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -68,6 +68,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
 
@@ -160,6 +162,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         pds.add(MAX_FRAGMENTS);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(DEFAULT_PRECISION);
+        pds.add(DEFAULT_SCALE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -212,6 +216,8 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 .maxRows(maxRowsPerFlowFile)
                 .convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
                 .useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
+                .defaultPrecision(context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger())
+                .defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger())
                 .build();
 
         final Map<String, String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
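Both processors above wire the two new properties into the same AvroConversionOptions object defined in JdbcCommon below; a minimal sketch of building that object directly, using the builder methods this patch adds (the property values shown are illustrative):

import org.apache.nifi.processors.standard.util.JdbcCommon;

public class OptionsSketch {
    public static void main(final String[] args) {
        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                .convertNames(true)
                .useLogicalTypes(true)
                .defaultPrecision(10) // consulted only when the driver reports precision 0
                .defaultScale(0)      // consulted only when precision is 0 and the reported scale is not positive
                .build();
    }
}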
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 8771fe9be5..97d5cc1bd7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -73,12 +73,19 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
 
 /**
  * JDBC / SQL common functions.
  */
 public class JdbcCommon {
 
+    private static final int MAX_DIGITS_IN_BIGINT = 19;
+    private static final int MAX_DIGITS_IN_INT = 9;
+    // Derived from MySQL default precision.
+    private static final int DEFAULT_PRECISION_VALUE = 10;
+    private static final int DEFAULT_SCALE_VALUE = 0;
+
     public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
 
     public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
@@ -107,9 +114,36 @@ public class JdbcCommon {
             .required(true)
             .build();
 
+    public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder()
+            .name("dbf-default-precision")
+            .displayName("Default Decimal Precision")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'precision' denoting the number of available digits is required."
+                    + " Generally, precision is defined by the column data type definition or the database engine's default."
+                    + " However, an undefined precision (0) can be returned from some database engines."
+                    + " 'Default Decimal Precision' is used when writing those undefined-precision numbers.")
+            .defaultValue(String.valueOf(DEFAULT_PRECISION_VALUE))
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder()
+            .name("dbf-default-scale")
+            .displayName("Default Decimal Scale")
+            .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type,"
+                    + " a specific 'scale' denoting the number of available decimal digits is required."
+                    + " Generally, scale is defined by the column data type definition or the database engine's default."
+                    + " However, when an undefined precision (0) is returned, the scale can also be uncertain with some database engines."
+                    + " 'Default Decimal Scale' is used when writing those undefined numbers."
+                    + " If a value has more decimal digits than the specified scale, the value is rounded half-up,"
+                    + " e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.")
+            .defaultValue(String.valueOf(DEFAULT_SCALE_VALUE))
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
 
-    private static final int MAX_DIGITS_IN_BIGINT = 19;
-    private static final int MAX_DIGITS_IN_INT = 9;
 
     public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
         return convertToAvroStream(rs, outStream, null, null, convertNames);
     }
@@ -140,12 +174,16 @@ public class JdbcCommon {
         private final int maxRows;
         private final boolean convertNames;
         private final boolean useLogicalTypes;
+        private final int defaultPrecision;
+        private final int defaultScale;
 
-        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes) {
+        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes, int defaultPrecision, int defaultScale) {
             this.recordName = recordName;
             this.maxRows = maxRows;
             this.convertNames = convertNames;
             this.useLogicalTypes = useLogicalTypes;
+            this.defaultPrecision = defaultPrecision;
+            this.defaultScale = defaultScale;
         }
 
         public static Builder builder() {
@@ -157,6 +195,8 @@ public class JdbcCommon {
             private int maxRows = 0;
             private boolean convertNames = false;
             private boolean useLogicalTypes = false;
+            private int defaultPrecision = DEFAULT_PRECISION_VALUE;
+            private int defaultScale = DEFAULT_SCALE_VALUE;
 
             /**
              * Specify a priori record name to use if it cannot be determined from the result set.
@@ -181,8 +221,18 @@ public class JdbcCommon {
                 return this;
             }
 
+            public Builder defaultPrecision(int defaultPrecision) {
+                this.defaultPrecision = defaultPrecision;
+                return this;
+            }
+
+            public Builder defaultScale(int defaultScale) {
+                this.defaultScale = defaultScale;
+                return this;
+            }
+
             public AvroConversionOptions build() {
-                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes);
+                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes, defaultPrecision, defaultScale);
             }
         }
     }
@@ -455,14 +505,28 @@ public class JdbcCommon {
                 // Since Avro 1.8, LogicalType is supported.
                 case DECIMAL:
                 case NUMERIC:
-
-                    final LogicalTypes.Decimal decimal = options.useLogicalTypes
-                            ? LogicalTypes.decimal(meta.getPrecision(i), meta.getScale(i)) : null;
-                    addNullableField(builder, columnName,
-                            u -> options.useLogicalTypes
-                                    ? u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType()))
-                                    : u.stringType());
-
+                    if (options.useLogicalTypes) {
+                        final int decimalPrecision;
+                        final int decimalScale;
+                        if (meta.getPrecision(i) > 0) {
+                            // When the database returns a definite precision, we can rely on it.
+                            decimalPrecision = meta.getPrecision(i);
+                            decimalScale = meta.getScale(i);
+                        } else {
+                            // If not, use the default precision.
+                            decimalPrecision = options.defaultPrecision;
+                            // Oracle returns precision=0, scale=-127 for variable-scale values such as ROWNUM or function results.
+                            // Specifying the 'oracle.jdbc.J2EE13Compliant' system property makes it return scale=0 instead.
+                            // Queries such as 'SELECT 1.23 as v from DUAL' can be problematic because the value can't be mapped to a decimal with scale=0.
+                            // The default scale is used to preserve decimal digits in such cases.
+                            decimalScale = meta.getScale(i) > 0 ? meta.getScale(i) : options.defaultScale;
+                        }
+                        final LogicalTypes.Decimal decimal = LogicalTypes.decimal(decimalPrecision, decimalScale);
+                        addNullableField(builder, columnName,
+                                u -> u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType())));
+                    } else {
+                        addNullableField(builder, columnName, u -> u.stringType());
+                    }
                     break;
 
                 case DATE:
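The DECIMAL/NUMERIC branch above reduces to one decision: trust a positive precision reported by the driver, otherwise fall back to the configured defaults, preferring a positive reported scale over the default scale. A standalone sketch of that decision and of attaching the resulting logical type to a bytes schema, assuming the Avro 1.8 LogicalTypes and SchemaBuilder APIs the patch already uses (class and helper names are illustrative):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class DecimalSchemaSketch {
    // Trust a positive reported precision; otherwise fall back to the defaults,
    // preferring a positive reported scale (Oracle reports 0/-127 for ROWNUM).
    static Schema decimalSchema(final int metaPrecision, final int metaScale,
                                final int defaultPrecision, final int defaultScale) {
        final int precision = metaPrecision > 0 ? metaPrecision : defaultPrecision;
        final int scale = metaPrecision > 0 ? metaScale : (metaScale > 0 ? metaScale : defaultScale);
        return LogicalTypes.decimal(precision, scale).addToSchema(SchemaBuilder.builder().bytesType());
    }

    public static void main(final String[] args) {
        // A driver that reports DECIMAL(10,2): the metadata wins.
        System.out.println(decimalSchema(10, 2, 24, 5));
        // Oracle-style undefined precision: the defaults win, and the default scale preserves decimals.
        System.out.println(decimalSchema(0, -127, 10, 3));
    }
}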
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
index 37660d580a..830567c554 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
@@ -31,6 +31,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.sql.Connection;
@@ -55,6 +57,8 @@ import java.util.function.BiFunction;
 import java.util.stream.IntStream;
 
 import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
@@ -362,15 +366,29 @@ public class TestJdbcCommon {
 
     @Test
     public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {
-        final BigDecimal bigDecimal = new BigDecimal(38D);
+        final BigDecimal bigDecimal = new BigDecimal(12345D);
+        // If the db returns a precision, it should be used.
+        testConvertToAvroStreamForBigDecimal(bigDecimal, 38, 10, 38, 0);
+    }
+
+    @Test
+    public void testConvertToAvroStreamForBigDecimalWithUndefinedPrecision() throws SQLException, IOException {
+        final int expectedScale = 3;
+        final int dbPrecision = 0;
+        final BigDecimal bigDecimal = new BigDecimal(new BigInteger("12345"), expectedScale, new MathContext(dbPrecision));
+        // If the db doesn't return a precision, the default precision should be used.
+        testConvertToAvroStreamForBigDecimal(bigDecimal, dbPrecision, 24, 24, expectedScale);
+    }
+
+    private void testConvertToAvroStreamForBigDecimal(BigDecimal bigDecimal, int dbPrecision, int defaultPrecision, int expectedPrecision, int expectedScale) throws SQLException, IOException {
         final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
         when(metadata.getColumnCount()).thenReturn(1);
         when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
         when(metadata.getColumnName(1)).thenReturn("The.Chairman");
         when(metadata.getTableName(1)).thenReturn("1the::table");
-        when(metadata.getPrecision(1)).thenReturn(bigDecimal.precision());
-        when(metadata.getScale(1)).thenReturn(bigDecimal.scale());
+        when(metadata.getPrecision(1)).thenReturn(dbPrecision);
+        when(metadata.getScale(1)).thenReturn(expectedScale);
 
         final ResultSet rs = mock(ResultSet.class);
         when(rs.getMetaData()).thenReturn(metadata);
 
@@ -388,7 +406,7 @@ public class TestJdbcCommon {
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
-                .builder().convertNames(true).useLogicalTypes(true).build();
+                .builder().convertNames(true).useLogicalTypes(true).defaultPrecision(defaultPrecision).build();
         JdbcCommon.convertToAvroStream(rs, baos, options, null);
 
         final byte[] serializedBytes = baos.toByteArray();
@@ -400,6 +418,16 @@ public class TestJdbcCommon {
 
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, null, genericData);
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
+            final Schema generatedUnion = dataFileReader.getSchema().getField("The_Chairman").schema();
+            // null and decimal.
+            assertEquals(2, generatedUnion.getTypes().size());
+            final LogicalType logicalType = generatedUnion.getTypes().get(1).getLogicalType();
+            assertNotNull(logicalType);
+            assertEquals("decimal", logicalType.getName());
+            LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+            assertEquals(expectedPrecision, decimalType.getPrecision());
+            assertEquals(expectedScale, decimalType.getScale());
+
             GenericRecord record = null;
             while (dataFileReader.hasNext()) {
                 record = dataFileReader.next(record);
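The assertions above read the decimal logical type straight off the generated schema, while the record values themselves travel as bytes and are turned back into BigDecimal by Avro's decimal conversion (the genericData instance the reader is constructed with). A minimal sketch of that conversion in isolation, assuming only the Avro 1.8 Conversions API (class name and sample values are illustrative):

import java.math.BigDecimal;
import java.nio.ByteBuffer;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

public class DecimalConversionSketch {
    public static void main(final String[] args) {
        final Schema schema = LogicalTypes.decimal(10, 2).addToSchema(SchemaBuilder.builder().bytesType());
        final Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();

        // Encode a BigDecimal into the bytes representation of the 'decimal' logical type...
        final ByteBuffer encoded = conversion.toBytes(new BigDecimal("123.45"), schema, schema.getLogicalType());
        // ...and decode it back.
        final BigDecimal decoded = conversion.fromBytes(encoded, schema, schema.getLogicalType());
        System.out.println(decoded); // prints 123.45

        // Registering the conversion is what lets a GenericDatumReader hand back
        // BigDecimal values directly instead of raw ByteBuffers.
        final GenericData genericData = new GenericData();
        genericData.addLogicalTypeConversion(conversion);
    }
}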