diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 8ded9bcbd9..0682b34fae 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -17,7 +17,9 @@
 package org.apache.nifi.avro;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -39,6 +41,8 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.MathContext;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
@@ -61,6 +65,7 @@ public class AvroTypeUtil {
     private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
     private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
     private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
+    private static final String LOGICAL_TYPE_DECIMAL = "decimal";
 
     public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
         if (recordSchema == null) {
@@ -107,6 +112,10 @@ public class AvroTypeUtil {
             case LOGICAL_TYPE_TIMESTAMP_MILLIS:
             case LOGICAL_TYPE_TIMESTAMP_MICROS:
                 return RecordFieldType.TIMESTAMP.getDataType();
+            case LOGICAL_TYPE_DECIMAL:
+                // We convert a logical decimal to a double. Alternatively we could convert it to a string,
+                // but a numeric type is generally preferable for users.
+                return RecordFieldType.DOUBLE.getDataType();
         }
     }
 
@@ -262,6 +271,10 @@ public class AvroTypeUtil {
      * Convert a raw value to an Avro object to serialize in Avro type system.
      * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema)}.
      */
+    public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema) {
+        return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName());
+    }
+
     private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName) {
         if (rawValue == null) {
             return null;
@@ -311,6 +324,19 @@ public class AvroTypeUtil {
             }
             case BYTES:
             case FIXED:
+                final LogicalType logicalType = fieldSchema.getLogicalType();
+                if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
+                    final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+                    final BigDecimal decimal;
+                    if (rawValue instanceof BigDecimal) {
+                        decimal = (BigDecimal) rawValue;
+                    } else if (rawValue instanceof Double) {
+                        decimal = new BigDecimal((Double) rawValue, new MathContext(decimalType.getPrecision()));
+                    } else {
+                        throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");
+                    }
+                    return new Conversions.DecimalConversion().toBytes(decimal, fieldSchema, logicalType);
+                }
                 if (rawValue instanceof byte[]) {
                     return ByteBuffer.wrap((byte[]) rawValue);
                 }
@@ -529,6 +555,10 @@ public class AvroTypeUtil {
                 return new MapRecord(childSchema, values);
             case BYTES:
                 final ByteBuffer bb = (ByteBuffer) value;
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
+                    return new Conversions.DecimalConversion().fromBytes(bb, avroSchema, logicalType);
+                }
                 return AvroTypeUtil.convertByteArray(bb.array());
             case FIXED:
                 final GenericFixed fixed = (GenericFixed) value;
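Reviewer note: the decimal handling above leans on Avro's built-in Conversions.DecimalConversion for both the write and read paths. As a quick sanity check, here is a minimal, self-contained sketch of the same round trip outside NiFi (class and variable names are ours, not part of this patch):

```java
import java.math.BigDecimal;
import java.nio.ByteBuffer;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class DecimalRoundTripSketch {
    public static void main(String[] args) {
        // A 'bytes' schema carrying the 'decimal' logical type, as createSchema builds later in this patch.
        final Schema decimalSchema = LogicalTypes.decimal(10, 2)
                .addToSchema(SchemaBuilder.builder().bytesType());

        // The value's scale must match the logical type's scale.
        final BigDecimal original = new BigDecimal("1234.56");

        // Write path: the same conversion convertToAvroObject delegates to.
        final Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
        final ByteBuffer encoded = conversion.toBytes(original, decimalSchema, decimalSchema.getLogicalType());

        // Read path: normalizeValue recovers the BigDecimal from the raw bytes.
        final BigDecimal decoded = conversion.fromBytes(encoded, decimalSchema, decimalSchema.getLogicalType());
        System.out.println(decoded); // 1234.56
    }
}
```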
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 1372735cd7..a86c836fa0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -174,8 +174,8 @@
             <artifactId>jackson-databind</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index 1d943f4c19..e11a888fe5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -143,16 +143,6 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFactoryProcessor {
             .expressionLanguageSupported(true)
             .build();
 
-    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
-            .name("dbf-normalize")
-            .displayName("Normalize Table/Column Names")
-            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
-                    + "will be changed to underscores in order to build a valid Avro record.")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-
     protected List<PropertyDescriptor> propDescriptors;
 
     // The delimiter to use when referencing qualified names (such as table@!@column in the state map)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 14283c1204..3ef91074fe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -42,6 +42,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -54,6 +55,9 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.util.StopWatch;
 
+import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
 @EventDriven
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"sql", "select", "jdbc", "query", "database"})
@@ -107,16 +111,6 @@ public class ExecuteSQL extends AbstractProcessor {
             .sensitive(false)
             .build();
 
-    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
-            .name("dbf-normalize")
-            .displayName("Normalize Table/Column Names")
-            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
-                    + "will be changed to underscores in order to build a valid Avro record.")
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
-
     private final List<PropertyDescriptor> propDescriptors;
 
     public ExecuteSQL() {
@@ -130,6 +124,7 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(SQL_SELECT_QUERY);
         pds.add(QUERY_TIMEOUT);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
+        pds.add(USE_AVRO_LOGICAL_TYPES);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -172,6 +167,7 @@ public class ExecuteSQL extends AbstractProcessor {
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
         final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
+        final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
         final StopWatch stopWatch = new StopWatch(true);
         final String selectQuery;
         if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
@@ -202,7 +198,10 @@ public class ExecuteSQL extends AbstractProcessor {
                     try {
                         logger.debug("Executing query {}", new Object[]{selectQuery});
                         final ResultSet resultSet = st.executeQuery(selectQuery);
-                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
+                        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                                .convertNames(convertNamesForAvro)
+                                .useLogicalTypes(useAvroLogicalTypes).build();
+                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
                     } catch (final SQLException e) {
                         throw new ProcessException(e);
                     }
@@ -211,6 +210,7 @@ public class ExecuteSQL extends AbstractProcessor {
 
             // set attribute how many rows were selected
             fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
 
             logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{fileToProcess, nrOfRows.get()});
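Reviewer note: with the mime.type attribute now set to application/avro-binary, a downstream consumer can recognize the content and, when Use Avro Logical Types is enabled, read DECIMAL columns back as BigDecimal. A hedged reader-side sketch (class and method names are ours, not part of the patch):

```java
import java.io.InputStream;
import java.math.BigDecimal;

import org.apache.avro.Conversions;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class LogicalTypeReaderSketch {
    // Reads Avro flow-file content that was written with useLogicalTypes(true).
    public static void dumpDecimals(InputStream avroContent, String columnName) throws Exception {
        // Register the decimal conversion so logical 'decimal' bytes surface as BigDecimal.
        final GenericData genericData = new GenericData();
        genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());

        final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, null, genericData);
        try (DataFileStream<GenericRecord> records = new DataFileStream<>(avroContent, datumReader)) {
            GenericRecord record = null;
            while (records.hasNext()) {
                record = records.next(record);
                final BigDecimal value = (BigDecimal) record.get(columnName);
                System.out.println(columnName + " = " + value);
            }
        }
    }
}
```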
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index e25273e036..dd3ac7b2b0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -35,6 +35,7 @@ import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -67,6 +68,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
+import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
 @EventDriven
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -155,6 +159,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(MAX_FRAGMENTS);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
+        pds.add(USE_AVRO_LOGICAL_TYPES);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -202,7 +207,12 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
         final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
                 ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
                 : 0;
-        final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                .recordName(tableName)
+                .maxRows(maxRowsPerFlowFile)
+                .convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
+                .useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
+                .build();
 
         final Map<String, String> maxValueProperties = getDefaultMaxValueProperties(context.getProperties());
 
@@ -284,7 +294,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                     // Max values will be updated in the state property map by the callback
                     final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
                     try {
-                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, tableName, maxValCollector, maxRowsPerFlowFile, convertNamesForAvro));
+                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector));
                     } catch (SQLException | RuntimeException e) {
                         throw new ProcessException("Error during database query or conversion of records to Avro.", e);
                     }
@@ -299,6 +309,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
                 // set attribute how many rows were selected
                 fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
                 fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
+                fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
                 if (maxRowsPerFlowFile > 0) {
                     fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
                     fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 40709d0e63..8771fe9be5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -55,22 +55,59 @@ import java.sql.Clob;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.Date;
+import java.util.function.Function;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.BaseTypeBuilder;
 import org.apache.avro.SchemaBuilder.FieldAssembler;
+import org.apache.avro.SchemaBuilder.NullDefault;
+import org.apache.avro.SchemaBuilder.UnionAccumulator;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.PropertyDescriptor;
 
 /**
  * JDBC / SQL common functions.
  */
 public class JdbcCommon {
 
+    public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
+
+    public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
+            .name("dbf-normalize")
+            .displayName("Normalize Table/Column Names")
+            .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+                    + "will be changed to underscores in order to build a valid Avro record.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
+            .name("dbf-user-logical-types")
+            .displayName("Use Avro Logical Types")
+            .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. "
+                    + "If disabled, these values are written as strings. "
+                    + "If enabled, logical types are used and each value is written as its underlying Avro type, specifically, "
+                    + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale metadata, "
+                    + "DATE as logical 'date': written as an int denoting days since the Unix epoch (1970-01-01), "
+                    + "TIME as logical 'time-millis': written as an int denoting milliseconds since the Unix epoch, "
+                    + "and TIMESTAMP as logical 'timestamp-millis': written as a long denoting milliseconds since the Unix epoch. "
+                    + "If a reader of the written Avro records also knows these logical types, then these values can be deserialized with more context, depending on the reader implementation.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
     private static final int MAX_DIGITS_IN_BIGINT = 19;
     private static final int MAX_DIGITS_IN_INT = 9;
 
@@ -90,7 +127,69 @@ public class JdbcCommon {
 
     public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback, final int maxRows, boolean convertNames)
             throws SQLException, IOException {
-        final Schema schema = createSchema(rs, recordName, convertNames);
+        final AvroConversionOptions options = AvroConversionOptions.builder()
+                .recordName(recordName)
+                .maxRows(maxRows)
+                .convertNames(convertNames)
+                .useLogicalTypes(false).build();
+        return convertToAvroStream(rs, outStream, options, callback);
+    }
+
+    public static class AvroConversionOptions {
+        private final String recordName;
+        private final int maxRows;
+        private final boolean convertNames;
+        private final boolean useLogicalTypes;
+
+        private AvroConversionOptions(String recordName, int maxRows, boolean convertNames, boolean useLogicalTypes) {
+            this.recordName = recordName;
+            this.maxRows = maxRows;
+            this.convertNames = convertNames;
+            this.useLogicalTypes = useLogicalTypes;
+        }
+
+        public static Builder builder() {
+            return new Builder();
+        }
+
+        public static class Builder {
+            private String recordName;
+            private int maxRows = 0;
+            private boolean convertNames = false;
+            private boolean useLogicalTypes = false;
+
+            /**
+             * Specify the record name to use if one cannot be determined from the result set.
+             */
+            public Builder recordName(String recordName) {
+                this.recordName = recordName;
+                return this;
+            }
+
+            public Builder maxRows(int maxRows) {
+                this.maxRows = maxRows;
+                return this;
+            }
+
+            public Builder convertNames(boolean convertNames) {
+                this.convertNames = convertNames;
+                return this;
+            }
+
+            public Builder useLogicalTypes(boolean useLogicalTypes) {
+                this.useLogicalTypes = useLogicalTypes;
+                return this;
+            }
+
+            public AvroConversionOptions build() {
+                return new AvroConversionOptions(recordName, maxRows, convertNames, useLogicalTypes);
+            }
+        }
+    }
+
+    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final AvroConversionOptions options, final ResultSetRowCallback callback)
+            throws SQLException, IOException {
+        final Schema schema = createSchema(rs, options);
         final GenericRecord rec = new GenericData.Record(schema);
 
         final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@@ -106,6 +205,7 @@ public class JdbcCommon {
             }
             for (int i = 1; i <= nrOfColumns; i++) {
                 final int javaSqlType = meta.getColumnType(i);
+                final Schema fieldSchema = schema.getFields().get(i - 1).schema();
 
                 // Need to handle CLOB and BLOB before getObject() is called, due to ResultSet's maximum portability statement
                 if (javaSqlType == CLOB) {
@@ -171,8 +271,13 @@ public class JdbcCommon {
                     //MS SQL returns TINYINT as a Java Short, which Avro doesn't understand.
                     rec.put(i - 1, ((Short) value).intValue());
                 } else if (value instanceof BigDecimal) {
-                    // Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
-                    rec.put(i - 1, value.toString());
+                    if (options.useLogicalTypes) {
+                        // Delegate the mapping to AvroTypeUtil in order to utilize logical types.
+                        rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
+                    } else {
+                        // Write as a string for backward compatibility.
+                        rec.put(i - 1, value.toString());
+                    }
 
                 } else if (value instanceof BigInteger) {
                     // Check the precision of the BIGINT. Some databases allow arbitrary precision (> 19), but Avro won't handle that.
@@ -208,6 +313,15 @@ public class JdbcCommon {
                         rec.put(i - 1, value);
                     }
 
+                } else if (value instanceof Date) {
+                    if (options.useLogicalTypes) {
+                        // Delegate the mapping to AvroTypeUtil in order to utilize logical types.
+                        rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
+                    } else {
+                        // Write as a string for backward compatibility.
+                        rec.put(i - 1, value.toString());
+                    }
+
                 } else {
                     // The different types that we support are numbers (int, long, double, float),
                     // as well as boolean values and Strings. Since Avro doesn't provide
@@ -219,7 +333,7 @@ public class JdbcCommon {
                 dataFileWriter.append(rec);
                 nrOfRows += 1;
 
-                if (maxRows > 0 && nrOfRows == maxRows)
+                if (options.maxRows > 0 && nrOfRows == options.maxRows)
                     break;
             }
 
@@ -231,19 +345,33 @@ public class JdbcCommon {
         return createSchema(rs, null, false);
     }
 
+    public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
+        final AvroConversionOptions options = AvroConversionOptions.builder().recordName(recordName).convertNames(convertNames).build();
+        return createSchema(rs, options);
+    }
+
+    private static void addNullableField(
+            FieldAssembler<Schema> builder,
+            String columnName,
+            Function<BaseTypeBuilder<UnionAccumulator<NullDefault<Schema>>>, UnionAccumulator<NullDefault<Schema>>> func
+    ) {
+        final BaseTypeBuilder<UnionAccumulator<NullDefault<Schema>>> and = builder.name(columnName).type().unionOf().nullBuilder().endNull().and();
+        func.apply(and).endUnion().noDefault();
+    }
+
     /**
      * Creates an Avro schema from a result set. If the table/record name is known a priori and provided, use that as a
      * fallback for the record name if it cannot be retrieved from the result set, and finally fall back to a default value.
      *
-     * @param rs The result set to convert to Avro
-     * @param recordName The a priori record name to use if it cannot be determined from the result set.
+     * @param rs The result set to convert to Avro
+     * @param options Options controlling the conversion, such as the fallback record name and whether to use logical types
      * @return A Schema object representing the result set converted to an Avro record
      * @throws SQLException if any error occurs during conversion
      */
-    public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
+    public static Schema createSchema(final ResultSet rs, AvroConversionOptions options) throws SQLException {
         final ResultSetMetaData meta = rs.getMetaData();
         final int nrOfColumns = meta.getColumnCount();
-        String tableName = StringUtils.isEmpty(recordName) ? "NiFi_ExecuteSQL_Record" : recordName;
+        String tableName = StringUtils.isEmpty(options.recordName) ? "NiFi_ExecuteSQL_Record" : options.recordName;
         if (nrOfColumns > 0) {
             String tableNameFromMeta = meta.getTableName(1);
             if (!StringUtils.isBlank(tableNameFromMeta)) {
@@ -251,7 +379,7 @@ public class JdbcCommon {
             }
         }
 
-        if (convertNames) {
+        if (options.convertNames) {
             tableName = normalizeNameForAvro(tableName);
         }
 
@@ -267,7 +395,7 @@ public class JdbcCommon {
              * check for alias. Postgres is the one that has the null column names for calculated fields.
              */
             String nameOrLabel = StringUtils.isNotEmpty(meta.getColumnLabel(i)) ? meta.getColumnLabel(i) : meta.getColumnName(i);
-            String columnName = convertNames ? normalizeNameForAvro(nameOrLabel) : nameOrLabel;
+            String columnName = options.convertNames ? normalizeNameForAvro(nameOrLabel) : nameOrLabel;
             switch (meta.getColumnType(i)) {
                 case CHAR:
                 case LONGNVARCHAR:
@@ -324,17 +452,39 @@ public class JdbcCommon {
                     builder.name(columnName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
                     break;
 
-                // Did not find direct suitable type, need to be clarified!!!!
+                // Since Avro 1.8, logical types are supported.
                 case DECIMAL:
                 case NUMERIC:
-                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    final LogicalTypes.Decimal decimal = options.useLogicalTypes
+                            ? LogicalTypes.decimal(meta.getPrecision(i), meta.getScale(i)) : null;
+                    addNullableField(builder, columnName,
+                            u -> options.useLogicalTypes
+                                    ? u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType()))
+                                    : u.stringType());
                     break;
 
-                // Did not find direct suitable type, need to be clarified!!!!
                 case DATE:
+                    addNullableField(builder, columnName,
+                            u -> options.useLogicalTypes
+                                    ? u.type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()))
+                                    : u.stringType());
+                    break;
+
                 case TIME:
+                    addNullableField(builder, columnName,
+                            u -> options.useLogicalTypes
+                                    ? u.type(LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()))
+                                    : u.stringType());
+                    break;
+
                 case TIMESTAMP:
-                    builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+                    addNullableField(builder, columnName,
+                            u -> options.useLogicalTypes
+                                    ? u.type(LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType()))
+                                    : u.stringType());
                     break;
 
                 case BINARY:
@@ -371,4 +521,5 @@ public class JdbcCommon {
     public interface ResultSetRowCallback {
         void processRow(ResultSet resultSet) throws IOException;
     }
+
 }
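Reviewer note: for reference, a minimal usage sketch of the new options-based entry point. The JDBC URL, table, and file name are placeholders, not part of the patch:

```java
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.nifi.processors.standard.util.JdbcCommon;

public class JdbcToAvroSketch {
    public static void main(String[] args) throws Exception {
        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                .recordName("products")   // fallback record name if the result set provides none
                .maxRows(0)               // 0 means no per-file row limit
                .convertNames(true)       // normalize table/column names for Avro
                .useLogicalTypes(true)    // DECIMAL/DATE/TIME/TIMESTAMP become Avro logical types
                .build();

        try (Connection con = DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SELECT * FROM products");
             OutputStream out = Files.newOutputStream(Paths.get("products.avro"))) {
            final long rows = JdbcCommon.convertToAvroStream(rs, out, options, null);
            System.out.println("Wrote " + rows + " rows");
        }
    }
}
```

The options object also keeps the old six-argument convertToAvroStream overload working unchanged, since it simply delegates with useLogicalTypes(false).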
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
index f8ce1f3c01..37660d580a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,17 +40,28 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.sql.Types;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.stream.IntStream;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.io.input.ReaderInputStream;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -350,11 +362,15 @@ public class TestJdbcCommon {
 
     @Test
     public void testConvertToAvroStreamForBigDecimal() throws SQLException, IOException {
+        final BigDecimal bigDecimal = new BigDecimal(38D);
+
         final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
         when(metadata.getColumnCount()).thenReturn(1);
         when(metadata.getColumnType(1)).thenReturn(Types.NUMERIC);
         when(metadata.getColumnName(1)).thenReturn("The.Chairman");
         when(metadata.getTableName(1)).thenReturn("1the::table");
+        when(metadata.getPrecision(1)).thenReturn(bigDecimal.precision());
+        when(metadata.getScale(1)).thenReturn(bigDecimal.scale());
 
         final ResultSet rs = mock(ResultSet.class);
         when(rs.getMetaData()).thenReturn(metadata);
@@ -367,24 +383,28 @@ public class TestJdbcCommon {
             }
         }).when(rs).next();
 
-        final BigDecimal bigDecimal = new BigDecimal(38D);
         when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal);
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
-        JdbcCommon.convertToAvroStream(rs, baos, true);
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
+                .builder().convertNames(true).useLogicalTypes(true).build();
+        JdbcCommon.convertToAvroStream(rs, baos, options, null);
 
         final byte[] serializedBytes = baos.toByteArray();
 
         final InputStream instream = new ByteArrayInputStream(serializedBytes);
 
-        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+        final GenericData genericData = new GenericData();
+        genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, null, genericData);
         try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
             GenericRecord record = null;
             while (dataFileReader.hasNext()) {
                 record = dataFileReader.next(record);
                 assertEquals("_1the__table", record.getSchema().getName());
-                assertEquals(bigDecimal.toString(), record.get("The_Chairman").toString());
+                assertEquals(bigDecimal, record.get("The_Chairman"));
             }
         }
     }
@@ -526,6 +546,100 @@ public class TestJdbcCommon {
         }
     }
 
+    @Test
+    public void testConvertToAvroStreamForDateTimeAsString() throws SQLException, IOException, ParseException {
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
+                .builder().convertNames(true).useLogicalTypes(false).build();
+
+        testConvertToAvroStreamForDateTime(options,
+                (record, date) -> assertEquals(new Utf8(date.toString()), record.get("date")),
+                (record, time) -> assertEquals(new Utf8(time.toString()), record.get("time")),
+                (record, timestamp) -> assertEquals(new Utf8(timestamp.toString()), record.get("timestamp"))
+        );
+    }
+
+    @Test
+    public void testConvertToAvroStreamForDateTimeAsLogicalType() throws SQLException, IOException, ParseException {
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions
+                .builder().convertNames(true).useLogicalTypes(true).build();
+
+        testConvertToAvroStreamForDateTime(options,
+                (record, date) -> {
+                    final int daysSinceEpoch = (int) record.get("date");
+                    final long millisSinceEpoch = TimeUnit.MILLISECONDS.convert(daysSinceEpoch, TimeUnit.DAYS);
+                    assertEquals(date, new java.sql.Date(millisSinceEpoch));
+                },
+                (record, time) -> assertEquals(time, new Time((int) record.get("time"))),
+                (record, timestamp) -> assertEquals(timestamp, new Timestamp((long) record.get("timestamp")))
+        );
+    }
+
+    private void testConvertToAvroStreamForDateTime(
+            JdbcCommon.AvroConversionOptions options, BiConsumer<GenericRecord, java.sql.Date> assertDate,
+            BiConsumer<GenericRecord, Time> assertTime, BiConsumer<GenericRecord, Timestamp> assertTimeStamp)
+            throws SQLException, IOException, ParseException {
+
+        final ResultSetMetaData metadata = mock(ResultSetMetaData.class);
+
+        final ResultSet rs = mock(ResultSet.class);
+        when(rs.getMetaData()).thenReturn(metadata);
+
+        BiFunction<String, String, Long> toMillis = (format, dateStr) -> {
+            try {
+                final SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+                dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+                return dateFormat.parse(dateStr).getTime();
+            } catch (ParseException e) {
+                throw new RuntimeException(e);
+            }
+        };
+
+        when(metadata.getColumnCount()).thenReturn(3);
+        when(metadata.getTableName(anyInt())).thenReturn("table");
+
+        when(metadata.getColumnType(1)).thenReturn(Types.DATE);
+        when(metadata.getColumnName(1)).thenReturn("date");
+        final java.sql.Date date = new java.sql.Date(toMillis.apply("yyyy/MM/dd", "2017/05/10"));
+        when(rs.getObject(1)).thenReturn(date);
+
+        when(metadata.getColumnType(2)).thenReturn(Types.TIME);
+        when(metadata.getColumnName(2)).thenReturn("time");
+        final Time time = new Time(toMillis.apply("HH:mm:ss.SSS", "12:34:56.789"));
+        when(rs.getObject(2)).thenReturn(time);
+
+        when(metadata.getColumnType(3)).thenReturn(Types.TIMESTAMP);
+        when(metadata.getColumnName(3)).thenReturn("timestamp");
+        final Timestamp timestamp = new Timestamp(toMillis.apply("yyyy/MM/dd HH:mm:ss.SSS", "2017/05/11 19:59:39.123"));
+        when(rs.getObject(3)).thenReturn(timestamp);
+
+        final AtomicInteger counter = new AtomicInteger(1);
+        Mockito.doAnswer(new Answer<Boolean>() {
+            @Override
+            public Boolean answer(InvocationOnMock invocation) throws Throwable {
+                return counter.getAndDecrement() > 0;
+            }
+        }).when(rs).next();
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        JdbcCommon.convertToAvroStream(rs, baos, options, null);
+
+        final byte[] serializedBytes = baos.toByteArray();
+
+        final InputStream instream = new ByteArrayInputStream(serializedBytes);
+
+        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+        try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) {
+            GenericRecord record = null;
+            while (dataFileReader.hasNext()) {
+                record = dataFileReader.next(record);
+                assertDate.accept(record, date);
+                assertTime.accept(record, time);
+                assertTimeStamp.accept(record, timestamp);
+            }
+        }
+    }
+
     // many test use Derby as database, so ensure driver is available
     @Test
     public void testDriverLoad() throws ClassNotFoundException {
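Reviewer note: the logical-type assertions above follow directly from the underlying encodings: 'date' is whole days since the Unix epoch as an int, 'time-millis' is an int of milliseconds, and 'timestamp-millis' is epoch milliseconds as a long. A rough standalone illustration, using UTC as the tests do (class name is ours):

```java
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

public class EpochEncodingSketch {
    public static void main(String[] args) throws Exception {
        final SimpleDateFormat utc = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
        utc.setTimeZone(TimeZone.getTimeZone("UTC"));

        // 'timestamp-millis' stores epoch milliseconds as a long.
        final long timestampMillis = utc.parse("2017/05/11 19:59:39.123").getTime();

        // 'date' stores whole days since the Unix epoch as an int.
        final int daysSinceEpoch = (int) TimeUnit.MILLISECONDS.toDays(timestampMillis);

        System.out.println(daysSinceEpoch + " days, " + timestampMillis + " ms");
    }
}
```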
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
index bbb62c5b03..6782d33a51 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestAvroReaderWithEmbeddedSchema.java
@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.text.DateFormat;
@@ -82,6 +83,7 @@ public class TestAvroReaderWithEmbeddedSchema {
 
         final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
         final long millisSinceMidnight = secondsSinceMidnight * 1000L;
+        final BigDecimal bigDecimal = new BigDecimal("123.45");
 
         final byte[] serialized;
         final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@@ -94,6 +96,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             record.put("timestampMillis", timeLong);
             record.put("timestampMicros", timeLong * 1000L);
             record.put("date", 17260);
+            record.put("decimal", ByteBuffer.wrap(bigDecimal.unscaledValue().toByteArray()));
 
             writer.append(record);
             writer.flush();
@@ -110,6 +113,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMillis").get().getFieldType());
             assertEquals(RecordFieldType.TIMESTAMP, recordSchema.getDataType("timestampMicros").get().getFieldType());
             assertEquals(RecordFieldType.DATE, recordSchema.getDataType("date").get().getFieldType());
+            assertEquals(RecordFieldType.DOUBLE, recordSchema.getDataType("decimal").get().getFieldType());
 
             final Record record = reader.nextRecord();
             assertEquals(new java.sql.Time(millisSinceMidnight), record.getValue("timeMillis"));
@@ -119,6 +123,7 @@ public class TestAvroReaderWithEmbeddedSchema {
             final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
             noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
             assertEquals(noTimeOfDayDateFormat.format(new java.sql.Date(timeLong)), noTimeOfDayDateFormat.format(record.getValue("date")));
+            assertEquals(bigDecimal.doubleValue(), record.getValue("decimal"));
         }
     }
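Reviewer note on the decimal-to-double mapping exercised above: a double carries only about 17 significant decimal digits, so very wide DECIMAL values can silently lose precision in the NiFi record view. A tiny illustration (class name is ours):

```java
import java.math.BigDecimal;

public class DecimalPrecisionSketch {
    public static void main(String[] args) {
        // 23 significant digits cannot survive the round trip through a double.
        final BigDecimal exact = new BigDecimal("0.12345678901234567890123");
        System.out.println(exact.doubleValue()); // 0.12345678901234568 -- trailing digits are gone
    }
}
```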
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index dc5d943071..c9587f3b25 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData.Array;
 import org.apache.avro.generic.GenericRecord;
@@ -74,9 +76,11 @@ public abstract class TestWriteAvroResult {
         fields.add(new RecordField("timestampMillis", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("timestampMicros", RecordFieldType.TIMESTAMP.getDataType()));
         fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+        // An Avro decimal is represented as a double in the NiFi type system.
+        fields.add(new RecordField("decimal", RecordFieldType.DOUBLE.getDataType()));
         final RecordSchema recordSchema = new SimpleRecordSchema(fields);
 
-        final String expectedTime = "2017-04-04 14:20:33.000";
+        final String expectedTime = "2017-04-04 14:20:33.789";
         final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
         df.setTimeZone(TimeZone.getTimeZone("gmt"));
         final long timeLong = df.parse(expectedTime).getTime();
@@ -87,6 +91,9 @@ public abstract class TestWriteAvroResult {
         values.put("timestampMillis", new Timestamp(timeLong));
         values.put("timestampMicros", new Timestamp(timeLong));
         values.put("date", new Date(timeLong));
+        // An Avro decimal is represented as a double in the NiFi type system.
+        final BigDecimal expectedDecimal = new BigDecimal("123.45");
+        values.put("decimal", expectedDecimal.doubleValue());
         final Record record = new MapRecord(recordSchema, values);
 
         final byte[] data;
@@ -98,17 +105,20 @@ public abstract class TestWriteAvroResult {
         try (final InputStream in = new ByteArrayInputStream(data)) {
             final GenericRecord avroRecord = readRecord(in, schema);
             final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
-            final long millisSinceMidnight = secondsSinceMidnight * 1000L;
+            final long millisSinceMidnight = (secondsSinceMidnight * 1000L) + 789;
 
             assertEquals((int) millisSinceMidnight, avroRecord.get("timeMillis"));
             assertEquals(millisSinceMidnight * 1000L, avroRecord.get("timeMicros"));
             assertEquals(timeLong, avroRecord.get("timestampMillis"));
             assertEquals(timeLong * 1000L, avroRecord.get("timestampMicros"));
             assertEquals(17260, avroRecord.get("date"));
+            // The double value is converted into a logical decimal because the Avro schema declares one.
+            final Schema decimalSchema = schema.getField("decimal").schema();
+            final BigDecimal decimal = new Conversions.DecimalConversion().fromBytes((ByteBuffer) avroRecord.get("decimal"), decimalSchema, decimalSchema.getLogicalType());
+            assertEquals(expectedDecimal, decimal);
         }
     }
 
-
     @Test
     public void testDataTypes() throws IOException {
         final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/datatypes.avsc"));
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
index d8315b2007..ef3335c1c4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
@@ -29,6 +29,13 @@
       "type" : "int",
       "logicalType" : "date"
     }
+  }, {
+    "name" : "decimal", "type": {
+      "type" : "bytes",
+      "logicalType" : "decimal",
+      "precision" : 5,
+      "scale" : 2
+    }
   } ]
 }
\ No newline at end of file
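Reviewer note: even a reader without logical-type support can decode the new 'decimal' field by hand. The bytes hold the unscaled value as big-endian two's complement, and the scale comes from the schema; this is equivalent to what Conversions.DecimalConversion.fromBytes does. A self-contained sketch (class name is ours):

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class ManualDecimalDecodeSketch {
    public static void main(String[] args) {
        // 123.45 with scale 2 has unscaled value 12345 = 0x3039.
        final byte[] bytes = {0x30, 0x39};
        final int scale = 2; // from the schema ("scale" : 2 in logical-types.avsc above)
        System.out.println(new BigDecimal(new BigInteger(bytes), scale)); // 123.45
    }
}
```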