NIFI-5491: Fixed PutHive3Streaming handling of Byte, Short, and Struct

This closes #2938.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Matthew Burgess 2018-08-06 13:34:39 -04:00 committed by Mark Payne
parent d1ab17580f
commit 9ee2316ff6
3 changed files with 167 additions and 11 deletions

View File

@ -158,7 +158,7 @@ public class NiFiRecordSerDe extends AbstractSerDe {
stats.setRowCount(stats.getRowCount() + 1);
} catch (Exception e) {
log.warn("Error [{}] parsing Record [{}].", new Object[]{e.getLocalizedMessage(), t}, e);
log.warn("Error [{}] parsing Record [{}].", new Object[]{e.toString(), t}, e);
throw new SerDeException(e);
}
@ -166,14 +166,15 @@ public class NiFiRecordSerDe extends AbstractSerDe {
}
/**
* Utility method to extract current expected field from given JsonParser
* isTokenCurrent is a boolean variable also passed in, which determines
* if the JsonParser is already at the token we expect to read next, or
* needs advancing to the next before we read.
* Utility method to extract current expected field from given record.
*/
private Object extractCurrentField(Record record, RecordField field, TypeInfo fieldTypeInfo) {
@SuppressWarnings("unchecked")
private Object extractCurrentField(Record record, RecordField field, TypeInfo fieldTypeInfo) throws SerDeException {
Object val;
String fieldName = (field != null) ? field.getFieldName() : null;
if (field == null) {
return null;
}
String fieldName = field.getFieldName();
switch (fieldTypeInfo.getCategory()) {
case PRIMITIVE:
@ -182,9 +183,15 @@ public class NiFiRecordSerDe extends AbstractSerDe {
primitiveCategory = ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory();
}
switch (primitiveCategory) {
case INT:
case BYTE:
Integer bIntValue = record.getAsInt(fieldName);
val = bIntValue == null ? null : bIntValue.byteValue();
break;
case SHORT:
Integer sIntValue = record.getAsInt(fieldName);
val = sIntValue == null ? null : sIntValue.shortValue();
break;
case INT:
val = record.getAsInt(fieldName);
break;
case LONG:
@ -205,7 +212,11 @@ public class NiFiRecordSerDe extends AbstractSerDe {
val = record.getAsString(fieldName);
break;
case BINARY:
val = AvroTypeUtil.convertByteArray(record.getAsArray(fieldName)).array();
Object[] array = record.getAsArray(fieldName);
if (array == null) {
return null;
}
val = AvroTypeUtil.convertByteArray(array).array();
break;
case DATE:
val = record.getAsDate(fieldName, field.getDataType().getFormat());
@ -227,8 +238,32 @@ public class NiFiRecordSerDe extends AbstractSerDe {
val = DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), field.getDataType());
break;
case STRUCT:
val = DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), field.getDataType());
break;
// The Hive StandardStructObjectInspector expects the object corresponding to a "struct" to be an array or List rather than a Map.
// Do the conversion here, calling extractCurrentField recursively to traverse any nested structs.
Record nestedRecord = (Record) record.getValue(fieldName);
if (nestedRecord == null) {
return null;
}
try {
RecordSchema recordSchema = nestedRecord.getSchema();
List<RecordField> recordFields = recordSchema.getFields();
if (recordFields == null || recordFields.isEmpty()) {
return Collections.emptyList();
}
// This List will hold the values of the entries in the Map
List<Object> structList = new ArrayList<>(recordFields.size());
StructTypeInfo typeInfo = (StructTypeInfo) schema.getStructFieldTypeInfo(fieldName);
for (RecordField nestedRecordField : recordFields) {
String fName = nestedRecordField.getFieldName();
String normalizedFieldName = fName.toLowerCase();
structList.add(extractCurrentField(nestedRecord, nestedRecordField, typeInfo.getStructFieldTypeInfo(normalizedFieldName)));
}
return structList;
} catch (Exception e) {
log.warn("Error [{}] parsing Record [{}].", new Object[]{e.toString(), nestedRecord}, e);
throw new SerDeException(e);
}
// break unreachable
default:
log.error("Unknown type found: " + fieldTypeInfo + "for field of type: " + field.getDataType().toString());
return null;

View File

@ -57,7 +57,9 @@ import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
@ -82,6 +84,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.function.BiFunction;
import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
@ -648,6 +652,84 @@ public class TestPutHive3Streaming {
assertEquals(1, runner.getQueueSize().getObjectCount());
}
@Test
public void testDataTypeConversions() throws Exception {
final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/datatype_test.avsc"), StandardCharsets.UTF_8);
schema = new Schema.Parser().parse(avroSchema);
processor.setFields(Arrays.asList(
new FieldSchema("uuid", serdeConstants.STRING_TYPE_NAME, "uuid"),
new FieldSchema("stringc", serdeConstants.STRING_TYPE_NAME, "stringc"),
new FieldSchema("intc", serdeConstants.INT_TYPE_NAME, "intc"),
new FieldSchema("tinyintc", serdeConstants.TINYINT_TYPE_NAME, "tinyintc"),
new FieldSchema("smallintc", serdeConstants.SMALLINT_TYPE_NAME, "smallintc"),
new FieldSchema("bigintc", serdeConstants.BIGINT_TYPE_NAME, "bigintc"),
new FieldSchema("booleanc", serdeConstants.BOOLEAN_TYPE_NAME, "booleanc"),
new FieldSchema("floatc", serdeConstants.FLOAT_TYPE_NAME, "floatc"),
new FieldSchema("doublec", serdeConstants.DOUBLE_TYPE_NAME, "doublec"),
new FieldSchema("listc", serdeConstants.LIST_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ">", "listc"),
new FieldSchema("structc", serdeConstants.STRUCT_TYPE_NAME
+ "<sint:" + serdeConstants.INT_TYPE_NAME + ","
+ "sboolean:" + serdeConstants.BOOLEAN_TYPE_NAME + ","
+ "sstring:" + serdeConstants.STRING_TYPE_NAME + ">", "structc"),
new FieldSchema("enumc", serdeConstants.STRING_TYPE_NAME, "enumc")));
runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
MockRecordParser readerFactory = new MockRecordParser();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
for (final RecordField recordField : recordSchema.getFields()) {
readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
}
List<String> enumc = Arrays.asList("SPADES", "HEARTS", "DIAMONDS", "CLUBS");
Random r = new Random();
for (int index = 0; index < 10; index++) {
final int i = index;
Record mapRecord = new MapRecord(AvroTypeUtil.createSchema(schema.getField("structc").schema().getTypes().get(1)), // Get non-null type in union
new HashMap<String, Object>() {
{
put("sint", i + 2); // {"name": "sint", "type": "int"},
if (i % 3 == 2) {
put("sboolean", null);
} else {
put("sboolean", i % 3 == 1); // {"name": "sboolean", "type": ["null","boolean"]},
}
put("sstring", "world"); // {"name": "sstring", "type": "string"}
}
});
readerFactory.addRecord(
UUID.randomUUID(), // {"name": "uuid", "type": "string"},
"hello", // {"name": "stringc", "type": "string"},
i, // {"name": "intc", "type": "int"},
i + 1, // {"name": "tinyintc", "type": ["null", "int"]},
i * 10, // {"name": "smallintc", "type": "int"},
i * Integer.MAX_VALUE, // {"name": "bigintc", "type": "long"},
i % 2 == 0, // {"name": "booleanc", "type": "boolean"},
i * 100.0f, // {"name": "floatc", "type": "floatc"},
i * 100.0, // {"name": "doublec", "type": "double"},
new String[]{"a", "b"}, // {"name": "listc", "type": ["null", {"type": "array", "items": "string"}]},
mapRecord,
enumc.get(r.nextInt(4)) // {"name": "enumc", "type": {"type": "enum", "name": "Suit", "symbols": ["SPADES","HEARTS","DIAMONDS","CLUBS"]}}
);
}
runner.addControllerService("mock-reader-factory", readerFactory);
runner.enableControllerService(readerFactory);
runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHive3Streaming.DB_NAME, "default");
runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
runner.enqueue(new byte[0]);
runner.run();
runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
assertEquals("10", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
}
@Test
public void cleanup() {
processor.cleanup();

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "nifi",
"name": "test",
"type": "record",
"fields": [
{"name": "uuid", "type": "string"},
{"name": "stringc", "type": "string"},
{"name": "intc", "type": "int"},
{"name": "tinyintc", "type": ["null", "int"]},
{"name": "smallintc", "type": "int"},
{"name": "bigintc", "type": "long"},
{"name": "booleanc", "type": "boolean"},
{"name": "floatc", "type": "float"},
{"name": "doublec", "type": "double"},
{"name": "listc", "type": ["null", {"type": "array", "items": "string"}]},
{"name": "structc", "type": ["null", {"name": "structcRecord", "type": "record", "fields": [
{"name": "sint", "type": "int"},
{"name": "sboolean", "type": ["null","boolean"]},
{"name": "sstring", "type": "string"}
]}]},
{"name": "enumc", "type": {"type": "enum", "name": "Suit", "symbols": ["SPADES","HEARTS","DIAMONDS","CLUBS"]}}
]
}