From 8491486181df6bd346b5953b1665d77113d72f5e Mon Sep 17 00:00:00 2001 From: Timea Barna Date: Tue, 31 Aug 2021 13:44:34 +0200 Subject: [PATCH] NIFI-9185 Add Avro logical type to SelectHive3QL processor Modifying unit test to avoid systemdefault timezone usage NIFI-9185 Applying review recommendations removing duplicate dependency from pom.xml Signed-off-by: Matthew Burgess This closes #5358 --- .../nifi/processors/hive/SelectHive3QL.java | 19 ++- .../apache/nifi/util/hive/HiveJdbcCommon.java | 72 ++++++++-- .../processors/hive/TestSelectHive3QL.java | 124 ++++++++++++++++++ 3 files changed, 202 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java index f124c73637..af87bd1f9f 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/SelectHive3QL.java @@ -236,6 +236,21 @@ public class SelectHive3QL extends AbstractHive3QLProcessor { .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); + public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder() + .name("use-logical-types") + .displayName("Use Avro Logical Types") + .description("Whether to use Avro Logical Types for DECIMAL, DATE and TIMESTAMP columns. " + + "If disabled, written as string. 
" + + "If enabled, Logical types are used and written as its underlying type, specifically, " + + "DECIMAL as logical 'decimal': written as bytes with additional precision and scale meta data, " + + "DATE as logical 'date': written as int denoting days since Unix epoch (1970-01-01), " + + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. " + + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + private final static List propertyDescriptors; private final static Set relationships; @@ -255,6 +270,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor { _propertyDescriptors.add(MAX_FRAGMENTS); _propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT); _propertyDescriptors.add(NORMALIZE_NAMES_FOR_AVRO); + _propertyDescriptors.add(USE_AVRO_LOGICAL_TYPES); _propertyDescriptors.add(HIVEQL_CSV_HEADER); _propertyDescriptors.add(HIVEQL_CSV_ALT_HEADER); _propertyDescriptors.add(HIVEQL_CSV_DELIMITER); @@ -344,6 +360,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor { final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue(); final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean(); final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean(); + final boolean useLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean(); final String fragmentIdentifier = UUID.randomUUID().toString(); try (final Connection con = dbcpService.getConnection(fileToProcess == null ? 
Collections.emptyMap() : fileToProcess.getAttributes()); @@ -411,7 +428,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor { flowfile = session.write(flowfile, out -> { try { if (AVRO.equals(outputFormat)) { - nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro)); + nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro, useLogicalTypes)); } else if (CSV.equals(outputFormat)) { CsvOutputOptions options = new CsvOutputOptions(header, altHeader, delimiter, quote, escape, maxRowsPerFlowFile); nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out, options)); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java index 09eeccead8..2a704c07ec 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.util.hive; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.SchemaBuilder.FieldAssembler; @@ -29,6 +30,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import java.io.IOException; @@ -88,6 +90,10 @@ public class HiveJdbcCommon { public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary"; public static final String CSV_MIME_TYPE = "text/csv"; + private static final Schema TIMESTAMP_MILLIS_SCHEMA = 
LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); + private static final Schema DATE_SCHEMA = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + private static final int DEFAULT_PRECISION = 10; + private static final int DEFAULT_SCALE = 0; public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() .name("hive-normalize-avro") @@ -99,14 +105,15 @@ public class HiveJdbcCommon { .required(true) .build(); - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames) throws SQLException, IOException { - return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null); + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames, final boolean useLogicalTypes) throws SQLException, IOException { + return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null, useLogicalTypes); } - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames, ResultSetRowCallback callback) + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames, + ResultSetRowCallback callback, final boolean useLogicalTypes) throws SQLException, IOException { - final Schema schema = createSchema(rs, recordName, convertNames); + final Schema schema = createSchema(rs, recordName, convertNames, useLogicalTypes); final GenericRecord rec = new GenericData.Record(schema); final DatumWriter datumWriter = new GenericDatumWriter<>(schema); @@ -149,7 +156,16 @@ public class HiveJdbcCommon { // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte rec.put(i - 1, ((Byte) value).intValue()); - } else if (value instanceof BigDecimal || value instanceof BigInteger) { + } else if 
(value instanceof BigDecimal) { + if (useLogicalTypes) { + final int precision = getPrecision(meta.getPrecision(i)); + final int scale = getScale(meta.getScale(i)); + rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES)))); + } else { + rec.put(i - 1, value.toString()); + } + + } else if (value instanceof BigInteger) { // Avro can't handle BigDecimal and BigInteger as numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38" rec.put(i - 1, value.toString()); @@ -170,10 +186,14 @@ public class HiveJdbcCommon { rec.put(i - 1, value); } else if (value instanceof java.sql.SQLXML) { rec.put(i - 1, ((java.sql.SQLXML) value).getString()); + } else if (useLogicalTypes && javaSqlType == DATE) { + rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, DATE_SCHEMA)); + } else if (useLogicalTypes && javaSqlType == TIMESTAMP) { + rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, TIMESTAMP_MILLIS_SCHEMA)); } else { // The different types that we support are numbers (int, long, double, float), - // as well as boolean values and Strings. Since Avro doesn't provide - // timestamp types, we want to convert those to Strings. So we will cast anything other + // as well as boolean values, decimal, date, timestamp and Strings. Since Avro doesn't provide + // times type, we want to convert those to Strings. So we will cast anything other // than numbers or booleans to strings by using the toString() method. 
rec.put(i - 1, value.toString()); } @@ -190,7 +210,7 @@ public class HiveJdbcCommon { } public static Schema createSchema(final ResultSet rs, boolean convertNames) throws SQLException { - return createSchema(rs, null, false); + return createSchema(rs, null, false, false); } /** @@ -203,7 +223,7 @@ public class HiveJdbcCommon { * @return A Schema object representing the result set converted to an Avro record * @throws SQLException if any error occurs during conversion */ - public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException { + public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames, final boolean useLogicalTypes) throws SQLException { final ResultSetMetaData meta = rs.getMetaData(); final int nrOfColumns = meta.getColumnCount(); String tableName = StringUtils.isEmpty(recordName) ? "NiFi_SelectHiveQL_Record" : recordName; @@ -298,14 +318,32 @@ public class HiveJdbcCommon { // Did not find direct suitable type, need to be clarified!!!! case DECIMAL: case NUMERIC: - builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + if (useLogicalTypes) { + final int precision = getPrecision(meta.getPrecision(i)); + final int scale = getScale(meta.getScale(i)); + builder.name(columnName).type().unionOf().nullBuilder().endNull().and() + .type(LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES))).endUnion().noDefault(); + } else { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + } break; - // Did not find direct suitable type, need to be clarified!!!! 
+ // Dates were introduced in Hive 0.12.0 case DATE: + if (useLogicalTypes) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(DATE_SCHEMA).endUnion().noDefault(); + } else { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + } + break; + // Did not find direct suitable type, need to be clarified!!!! case TIME: case TIMESTAMP: - builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + if (useLogicalTypes) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(TIMESTAMP_MILLIS_SCHEMA).endUnion().noDefault(); + } else { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault(); + } break; case BINARY: @@ -461,4 +499,14 @@ public class HiveJdbcCommon { } return hiveConfig; } + + //If data in result set contains invalid precision value use Hive default precision. + private static int getPrecision(int precision) { + return precision > 1 ? precision : DEFAULT_PRECISION; + } + + //If data in result set contains invalid scale value use Hive default scale. + private static int getScale(int scale) { + return scale > 0 ? 
scale : DEFAULT_SCALE; + } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java index 356106fcaa..d319742d22 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java @@ -16,6 +16,9 @@ */ package org.apache.nifi.processors.hive; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -44,11 +47,15 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; import java.sql.Types; +import java.time.LocalDate; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -67,6 +74,11 @@ public class TestSelectHive3QL { private static final Logger LOGGER; private final static String MAX_ROWS_KEY = "maxRows"; private final int NUM_OF_ROWS = 100; + private static final int ID = 1; + private static final String NAME = "Joe Smith"; + private static final String BIRTH_DATE = "1956-11-22"; + private static final String BIG_NUMBER = "12345678.12"; + private static final String CREATED_ON = "1962-09-23 03:23:34.234"; static { @@ -690,6 +702,111 @@ public class TestSelectHive3QL { runner.clearTransferState(); } + @Test + public void testAvroRecordCreatedWithoutLogicalTypesByDefault() throws SQLException, 
IOException { + final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields() + .name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault() + .name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .name("CREATED_ON").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .endRecord(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + final Statement stmt = con.createStatement(); + final InputStream in; + final MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), birth_date date, big_number decimal(10,2),created_on timestamp)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, birth_date, big_number, created_on) VALUES (" + + ID + ", '" + NAME + "', '" + BIRTH_DATE + "', " + BIG_NUMBER + ", '" + CREATED_ON + "')"); + + runner.setIncomingConnection(false); + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1); + mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0); + in = new ByteArrayInputStream(mff.toByteArray()); + + final GenericRecord record = getFirstRecordFromStream(in); + + assertEquals(expectedSchema, record.getSchema()); + assertEquals(ID, record.get("ID")); + assertEquals(NAME, 
record.get("NAME").toString()); + assertEquals(BIRTH_DATE, record.get("BIRTH_DATE").toString()); + assertEquals(BIG_NUMBER, record.get("BIG_NUMBER").toString()); + assertEquals(CREATED_ON, record.get("CREATED_ON").toString()); + + runner.clearTransferState(); + } + + @Test + public void testAvroRecordCreatedWithLogicalTypesWhenSet() throws SQLException, IOException { + final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields() + .name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault() + .name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault() + .name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and() + .type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).endUnion().noDefault() + .name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and() + .type(LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES))).endUnion().noDefault() + .name("CREATED_ON").type().unionOf().nullBuilder().endNull().and() + .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))).endUnion().noDefault() + .endRecord(); + + final int expectedBirthDate = (int) LocalDate.parse(BIRTH_DATE).toEpochDay(); + final BigDecimal decimal = new BigDecimal(BIG_NUMBER).setScale(2, BigDecimal.ROUND_HALF_EVEN); + final ByteBuffer expectedBigNumber = ByteBuffer.wrap(decimal.unscaledValue().toByteArray()); + final Timestamp timestamp = Timestamp.valueOf(CREATED_ON); + final long expectedCreatedOn = timestamp.getTime(); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + final Statement stmt = con.createStatement(); + final InputStream in; + final MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't 
yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), birth_date date, big_number decimal(10,2),created_on timestamp)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, birth_date, big_number, created_on) VALUES (" + + ID + ", '" + NAME + "', '" + BIRTH_DATE + "', " + BIG_NUMBER + ", '" + CREATED_ON + "')"); + + runner.setIncomingConnection(false); + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE"); + runner.setProperty(SelectHive3QL.USE_AVRO_LOGICAL_TYPES, "true"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1); + mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0); + in = new ByteArrayInputStream(mff.toByteArray()); + + final GenericRecord record = getFirstRecordFromStream(in); + + assertEquals(expectedSchema, record.getSchema()); + assertEquals(ID, record.get("ID")); + assertEquals(NAME, record.get("NAME").toString()); + assertEquals(expectedBirthDate, record.get("BIRTH_DATE")); + assertEquals(expectedBigNumber, record.get("BIG_NUMBER")); + assertEquals(expectedCreatedOn, record.get("CREATED_ON")); + + + runner.clearTransferState(); + } + private long getNumberOfRecordsFromStream(InputStream in) throws IOException { final DatumReader datumReader = new GenericDatumReader<>(); try (DataFileStream dataFileReader = new DataFileStream<>(in, datumReader)) { @@ -707,6 +824,13 @@ public class TestSelectHive3QL { } } + private GenericRecord getFirstRecordFromStream(InputStream in) throws IOException { + final DatumReader datumReader = new GenericDatumReader<>(); + try (DataFileStream dataFileReader = new DataFileStream<>(in, datumReader)) { + return dataFileReader.next(); + } + } + /** * Simple implementation only for SelectHive3QL processor testing. */