NIFI-5891 fix handling of null logical types in Hive3Streaming processor

NIFI-5891: Fixed Checkstyle issues
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3216
Author: gkkorir (2018-12-13 17:25:37 +03:00), committed by Matthew Burgess
Parent: 9a1ab4c504
Commit: c51512f5e3
2 changed files with 159 additions and 9 deletions
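
The fix wraps each logical-type conversion in NiFiRecordSerDe in a null check, so that a record whose Avro schema declares a field as a union of "null" and a logical type (date, timestamp, decimal, array) serializes a NULL cell instead of throwing. A minimal standalone sketch of the guarded DATE conversion, assuming hive-storage-api is on the classpath; the hard-coded null is hypothetical and stands in for record.getAsDate(fieldName, format) returning null:

public class NullLogicalTypeSketch {
    public static void main(String[] args) {
        // Hypothetical: a nullable Avro date field read back as a null java.util.Date
        java.util.Date d = null;

        Object val;
        if (d != null) {
            // Same conversion the DATE branch in the diff below performs
            org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
            hiveDate.setTimeInMillis(d.getTime());
            val = hiveDate;
        } else {
            // Pre-fix code ran the conversion unconditionally, so d.getTime()
            // threw a NullPointerException whenever the field was null
            val = null;
        }
        System.out.println("serialized value: " + val); // prints: serialized value: null
    }
}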

NiFiRecordSerDe.java

@@ -223,27 +223,37 @@ public class NiFiRecordSerDe extends AbstractSerDe {
                     break;
                 case DATE:
                     Date d = record.getAsDate(fieldName, field.getDataType().getFormat());
-                    org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
-                    hiveDate.setTimeInMillis(d.getTime());
-                    val = hiveDate;
+                    if(d != null) {
+                        org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
+                        hiveDate.setTimeInMillis(d.getTime());
+                        val = hiveDate;
+                    } else {
+                        val = null;
+                    }
                     break;
                 // ORC doesn't currently handle TIMESTAMPLOCALTZ
                 case TIMESTAMP:
                     Timestamp ts = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
-                    // Convert to Hive's Timestamp type
-                    org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
-                    hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
-                    val = hivetimestamp;
+                    if(ts != null) {
+                        // Convert to Hive's Timestamp type
+                        org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
+                        hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
+                        val = hivetimestamp;
+                    } else {
+                        val = null;
+                    }
                     break;
                 case DECIMAL:
-                    val = HiveDecimal.create(record.getAsDouble(fieldName));
+                    Double value = record.getAsDouble(fieldName);
+                    val = value == null ? null : HiveDecimal.create(value);
                     break;
                 default:
                     throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to type: " + primitiveCategory.name());
             }
             break;
         case LIST:
-            val = Arrays.asList(record.getAsArray(fieldName));
+            Object[] value = record.getAsArray(fieldName);
+            val = value == null ? null : Arrays.asList(value);
             break;
         case MAP:
             val = record.getValue(fieldName);
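
Each test added to TestPutHive3Streaming below feeds a single record whose only field is null through the processor and asserts the flow file still transfers to success. The null values are legal because every test schema is an Avro union whose first branch is "null"; a small sketch of how such a schema parses, reusing the schema text from testNullDateHandling (Avro assumed on the classpath):

import org.apache.avro.Schema;

public class NullableDateSchemaSketch {
    public static void main(String[] args) {
        String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ "
                + "{ \"name\":\"dob\", \"type\": [ \"null\", { \"type\":\"int\", \"logicalType\":\"date\" } ] } ] }";
        Schema schema = new Schema.Parser().parse(schemaText);

        Schema dob = schema.getField("dob").schema();
        System.out.println(dob.getType());  // UNION: the field accepts null
        System.out.println(dob.getTypes()); // branches: "null" and the int/date logical type
    }
}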

TestPutHive3Streaming.java

@@ -56,6 +56,7 @@ import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -752,6 +753,145 @@ public class TestPutHive3Streaming {
         assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
     }
 
+    //logical types
+    @Test
+    public void testNullDateHandling() throws IOException, MalformedRecordException, InitializationException {
+        String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"dob\", \"type\": [ \"null\", { \"type\":\"int\", \"logicalType\":\"date\" } ] } ] }";
+        schema = new Schema.Parser().parse(schemaText);
+        processor.setFields(Arrays.asList(
+                new FieldSchema("dob", serdeConstants.DATE_TYPE_NAME, "null dob")
+        ));
+
+        //setup runner
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+        readerFactory.addRecord(new Object[] { null });
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "dobs");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.dobs", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void testNullTimestampHandling() throws IOException, MalformedRecordException, InitializationException {
+        String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"dob\", \"type\": [ \"null\", { \"type\":\"long\", \"logicalType\":\"timestamp-millis\" } ] } ] }";
+        schema = new Schema.Parser().parse(schemaText);
+        processor.setFields(Arrays.asList(
+                new FieldSchema("dob", serdeConstants.TIMESTAMP_TYPE_NAME, "null dob")
+        ));
+
+        //setup runner
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+        readerFactory.addRecord(new Object[] { null });
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "ts");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.ts", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void testNullDecimalHandling() throws IOException, MalformedRecordException, InitializationException {
+        String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"amount\", \"type\": [ \"null\", { \"type\":\"bytes\", "
+                + "\"logicalType\":\"decimal\", \"precision\":18, \"scale\":2 } ] } ] }";
+        schema = new Schema.Parser().parse(schemaText);
+        processor.setFields(Arrays.asList(
+                new FieldSchema("amount", serdeConstants.DECIMAL_TYPE_NAME, "null amount")
+        ));
+
+        //setup runner
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+        readerFactory.addRecord(new Object[] { null });
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "transactions");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.transactions", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
+    @Test
+    public void testNullArrayHandling() throws IOException, MalformedRecordException, InitializationException {
+        String schemaText = "{ \"name\":\"test\", \"type\":\"record\", \"fields\":[ { \"name\":\"groups\", \"type\": [ \"null\", { \"type\":\"array\", \"items\":\"string\" } ] } ] }";
+        schema = new Schema.Parser().parse(schemaText);
+        processor.setFields(Arrays.asList(
+                new FieldSchema("groups", "array<string>", "null groups")
+        ));
+
+        //setup runner
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+
+        MockRecordParser readerFactory = new MockRecordParser();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+        readerFactory.addRecord(new Object[] { null });
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+        runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "groups");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.groups", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
+    }
+
     @Test
     public void cleanup() {
         processor.cleanup();