NIFI-9185 Add Avro logical type to SelectHive3QL processor

Modifying unit test to avoid systemdefault timezone usuage

NIFI-9185 Applying review recommendations removing duplicate dependency from pom.xml

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5358
This commit is contained in:
Timea Barna 2021-08-31 13:44:34 +02:00 committed by Matthew Burgess
parent 37c0527a72
commit 8491486181
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 202 additions and 13 deletions

View File

@ -236,6 +236,21 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder()
.name("use-logical-types")
.displayName("Use Avro Logical Types")
.description("Whether to use Avro Logical Types for DECIMAL, DATE and TIMESTAMP columns. "
+ "If disabled, written as string. "
+ "If enabled, Logical types are used and written as its underlying type, specifically, "
+ "DECIMAL as logical 'decimal': written as bytes with additional precision and scale meta data, "
+ "DATE as logical 'date': written as int denoting days since Unix epoch (1970-01-01), "
+ "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. "
+ "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
@ -255,6 +270,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
_propertyDescriptors.add(MAX_FRAGMENTS);
_propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT);
_propertyDescriptors.add(NORMALIZE_NAMES_FOR_AVRO);
_propertyDescriptors.add(USE_AVRO_LOGICAL_TYPES);
_propertyDescriptors.add(HIVEQL_CSV_HEADER);
_propertyDescriptors.add(HIVEQL_CSV_ALT_HEADER);
_propertyDescriptors.add(HIVEQL_CSV_DELIMITER);
@ -344,6 +360,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
final boolean useLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
@ -411,7 +428,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
flowfile = session.write(flowfile, out -> {
try {
if (AVRO.equals(outputFormat)) {
nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro));
nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro, useLogicalTypes));
} else if (CSV.equals(outputFormat)) {
CsvOutputOptions options = new CsvOutputOptions(header, altHeader, delimiter, quote, escape, maxRowsPerFlowFile);
nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out, options));

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.util.hive;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
@ -29,6 +30,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import java.io.IOException;
@ -88,6 +90,10 @@ public class HiveJdbcCommon {
public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
public static final String CSV_MIME_TYPE = "text/csv";
private static final Schema TIMESTAMP_MILLIS_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
private static final Schema DATE_SCHEMA = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
private static final int DEFAULT_PRECISION = 10;
private static final int DEFAULT_SCALE = 0;
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
.name("hive-normalize-avro")
@ -99,14 +105,15 @@ public class HiveJdbcCommon {
.required(true)
.build();
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames) throws SQLException, IOException {
return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null);
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames, final boolean useLogicalTypes) throws SQLException, IOException {
return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null, useLogicalTypes);
}
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames, ResultSetRowCallback callback)
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames,
ResultSetRowCallback callback, final boolean useLogicalTypes)
throws SQLException, IOException {
final Schema schema = createSchema(rs, recordName, convertNames);
final Schema schema = createSchema(rs, recordName, convertNames, useLogicalTypes);
final GenericRecord rec = new GenericData.Record(schema);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@ -149,7 +156,16 @@ public class HiveJdbcCommon {
// org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
rec.put(i - 1, ((Byte) value).intValue());
} else if (value instanceof BigDecimal || value instanceof BigInteger) {
} else if (value instanceof BigDecimal) {
if (useLogicalTypes) {
final int precision = getPrecision(meta.getPrecision(i));
final int scale = getScale(meta.getScale(i));
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES))));
} else {
rec.put(i - 1, value.toString());
}
} else if (value instanceof BigInteger) {
// Avro can't handle BigDecimal and BigInteger as numbers - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
rec.put(i - 1, value.toString());
@ -170,10 +186,14 @@ public class HiveJdbcCommon {
rec.put(i - 1, value);
} else if (value instanceof java.sql.SQLXML) {
rec.put(i - 1, ((java.sql.SQLXML) value).getString());
} else if (useLogicalTypes && javaSqlType == DATE) {
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, DATE_SCHEMA));
} else if (useLogicalTypes && javaSqlType == TIMESTAMP) {
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, TIMESTAMP_MILLIS_SCHEMA));
} else {
// The different types that we support are numbers (int, long, double, float),
// as well as boolean values and Strings. Since Avro doesn't provide
// timestamp types, we want to convert those to Strings. So we will cast anything other
// as well as boolean values, decimal, date, timestamp and Strings. Since Avro doesn't provide
// times type, we want to convert those to Strings. So we will cast anything other
// than numbers or booleans to strings by using the toString() method.
rec.put(i - 1, value.toString());
}
@ -190,7 +210,7 @@ public class HiveJdbcCommon {
}
public static Schema createSchema(final ResultSet rs, boolean convertNames) throws SQLException {
return createSchema(rs, null, false);
return createSchema(rs, null, false, false);
}
/**
@ -203,7 +223,7 @@ public class HiveJdbcCommon {
* @return A Schema object representing the result set converted to an Avro record
* @throws SQLException if any error occurs during conversion
*/
public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames, final boolean useLogicalTypes) throws SQLException {
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
String tableName = StringUtils.isEmpty(recordName) ? "NiFi_SelectHiveQL_Record" : recordName;
@ -298,14 +318,32 @@ public class HiveJdbcCommon {
// Did not find direct suitable type, need to be clarified!!!!
case DECIMAL:
case NUMERIC:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
if (useLogicalTypes) {
final int precision = getPrecision(meta.getPrecision(i));
final int scale = getScale(meta.getScale(i));
builder.name(columnName).type().unionOf().nullBuilder().endNull().and()
.type(LogicalTypes.decimal(precision, scale).addToSchema(Schema.create(Schema.Type.BYTES))).endUnion().noDefault();
} else {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
}
break;
// Did not find direct suitable type, need to be clarified!!!!
// Dates were introduced in Hive 0.12.0
case DATE:
if (useLogicalTypes) {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(DATE_SCHEMA).endUnion().noDefault();
} else {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
}
break;
// Did not find direct suitable type, need to be clarified!!!!
case TIME:
case TIMESTAMP:
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
if (useLogicalTypes) {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().type(TIMESTAMP_MILLIS_SCHEMA).endUnion().noDefault();
} else {
builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
}
break;
case BINARY:
@ -461,4 +499,14 @@ public class HiveJdbcCommon {
}
return hiveConfig;
}
//If data in result set contains invalid precision value use Hive default precision.
private static int getPrecision(int precision) {
return precision > 1 ? precision : DEFAULT_PRECISION;
}
//If data in result set contains invalid scale value use Hive default scale.
private static int getScale(int scale) {
return scale > 0 ? scale : DEFAULT_SCALE;
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.nifi.processors.hive;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@ -44,11 +47,15 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -67,6 +74,11 @@ public class TestSelectHive3QL {
private static final Logger LOGGER;
private final static String MAX_ROWS_KEY = "maxRows";
private final int NUM_OF_ROWS = 100;
private static final int ID = 1;
private static final String NAME = "Joe Smith";
private static final String BIRTH_DATE = "1956-11-22";
private static final String BIG_NUMBER = "12345678.12";
private static final String CREATED_ON = "1962-09-23 03:23:34.234";
static {
@ -690,6 +702,111 @@ public class TestSelectHive3QL {
runner.clearTransferState();
}
@Test
public void testAvroRecordCreatedWithoutLogicalTypesByDefault() throws SQLException, IOException {
final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields()
.name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault()
.name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
.name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
.name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
.name("CREATED_ON").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
.endRecord();
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
final Statement stmt = con.createStatement();
final InputStream in;
final MockFlowFile mff;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), birth_date date, big_number decimal(10,2),created_on timestamp)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, birth_date, big_number, created_on) VALUES (" +
ID + ", '" + NAME + "', '" + BIRTH_DATE + "', " + BIG_NUMBER + ", '" + CREATED_ON + "')");
runner.setIncomingConnection(false);
runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE");
runner.run();
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
in = new ByteArrayInputStream(mff.toByteArray());
final GenericRecord record = getFirstRecordFromStream(in);
assertEquals(expectedSchema, record.getSchema());
assertEquals(ID, record.get("ID"));
assertEquals(NAME, record.get("NAME").toString());
assertEquals(BIRTH_DATE, record.get("BIRTH_DATE").toString());
assertEquals(BIG_NUMBER, record.get("BIG_NUMBER").toString());
assertEquals(CREATED_ON, record.get("CREATED_ON").toString());
runner.clearTransferState();
}
@Test
public void testAvroRecordCreatedWithLogicalTypesWhenSet() throws SQLException, IOException {
final Schema expectedSchema = SchemaBuilder.record("NiFi_SelectHiveQL_Record").namespace("any.data").fields()
.name("ID").type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault()
.name("NAME").type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault()
.name("BIRTH_DATE").type().unionOf().nullBuilder().endNull().and()
.type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))).endUnion().noDefault()
.name("BIG_NUMBER").type().unionOf().nullBuilder().endNull().and()
.type(LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES))).endUnion().noDefault()
.name("CREATED_ON").type().unionOf().nullBuilder().endNull().and()
.type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))).endUnion().noDefault()
.endRecord();
final int expectedBirthDate = (int) LocalDate.parse(BIRTH_DATE).toEpochDay();
final BigDecimal decimal = new BigDecimal(BIG_NUMBER).setScale(2, BigDecimal.ROUND_HALF_EVEN);
final ByteBuffer expectedBigNumber = ByteBuffer.wrap(decimal.unscaledValue().toByteArray());
final Timestamp timestamp = Timestamp.valueOf(CREATED_ON);
final long expectedCreatedOn = timestamp.getTime();
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
final Statement stmt = con.createStatement();
final InputStream in;
final MockFlowFile mff;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), birth_date date, big_number decimal(10,2),created_on timestamp)");
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, birth_date, big_number, created_on) VALUES (" +
ID + ", '" + NAME + "', '" + BIRTH_DATE + "', " + BIG_NUMBER + ", '" + CREATED_ON + "')");
runner.setIncomingConnection(false);
runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE");
runner.setProperty(SelectHive3QL.USE_AVRO_LOGICAL_TYPES, "true");
runner.run();
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
in = new ByteArrayInputStream(mff.toByteArray());
final GenericRecord record = getFirstRecordFromStream(in);
assertEquals(expectedSchema, record.getSchema());
assertEquals(ID, record.get("ID"));
assertEquals(NAME, record.get("NAME").toString());
assertEquals(expectedBirthDate, record.get("BIRTH_DATE"));
assertEquals(expectedBigNumber, record.get("BIG_NUMBER"));
assertEquals(expectedCreatedOn, record.get("CREATED_ON"));
runner.clearTransferState();
}
private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
@ -707,6 +824,13 @@ public class TestSelectHive3QL {
}
}
private GenericRecord getFirstRecordFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
return dataFileReader.next();
}
}
/**
* Simple implementation only for SelectHive3QL processor testing.
*/