diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java index d4b444a5c5..0f150967b9 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java @@ -158,7 +158,7 @@ public class NiFiRecordSerDe extends AbstractSerDe { stats.setRowCount(stats.getRowCount() + 1); } catch (Exception e) { - log.warn("Error [{}] parsing Record [{}].", new Object[]{e.getLocalizedMessage(), t}, e); + log.warn("Error [{}] parsing Record [{}].", new Object[]{e.toString(), t}, e); throw new SerDeException(e); } @@ -166,14 +166,15 @@ public class NiFiRecordSerDe extends AbstractSerDe { } /** - * Utility method to extract current expected field from given JsonParser - * isTokenCurrent is a boolean variable also passed in, which determines - * if the JsonParser is already at the token we expect to read next, or - * needs advancing to the next before we read. + * Utility method to extract current expected field from given record. */ - private Object extractCurrentField(Record record, RecordField field, TypeInfo fieldTypeInfo) { + @SuppressWarnings("unchecked") + private Object extractCurrentField(Record record, RecordField field, TypeInfo fieldTypeInfo) throws SerDeException { Object val; - String fieldName = (field != null) ? 
field.getFieldName() : null; + if (field == null) { + return null; + } + String fieldName = field.getFieldName(); switch (fieldTypeInfo.getCategory()) { case PRIMITIVE: @@ -182,9 +183,15 @@ public class NiFiRecordSerDe extends AbstractSerDe { primitiveCategory = ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory(); } switch (primitiveCategory) { - case INT: case BYTE: + Integer bIntValue = record.getAsInt(fieldName); + val = bIntValue == null ? null : bIntValue.byteValue(); + break; case SHORT: + Integer sIntValue = record.getAsInt(fieldName); + val = sIntValue == null ? null : sIntValue.shortValue(); + break; + case INT: val = record.getAsInt(fieldName); break; case LONG: @@ -205,7 +212,11 @@ public class NiFiRecordSerDe extends AbstractSerDe { val = record.getAsString(fieldName); break; case BINARY: - val = AvroTypeUtil.convertByteArray(record.getAsArray(fieldName)).array(); + Object[] array = record.getAsArray(fieldName); + if (array == null) { + return null; + } + val = AvroTypeUtil.convertByteArray(array).array(); break; case DATE: val = record.getAsDate(fieldName, field.getDataType().getFormat()); @@ -227,8 +238,32 @@ public class NiFiRecordSerDe extends AbstractSerDe { val = DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), field.getDataType()); break; case STRUCT: - val = DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), field.getDataType()); - break; + // The Hive StandardStructObjectInspector expects the object corresponding to a "struct" to be an array or List rather than a Map. + // Do the conversion here, calling extractCurrentField recursively to traverse any nested structs. 
+ Record nestedRecord = (Record) record.getValue(fieldName); + if (nestedRecord == null) { + return null; + } + try { + RecordSchema recordSchema = nestedRecord.getSchema(); + List<RecordField> recordFields = recordSchema.getFields(); + if (recordFields == null || recordFields.isEmpty()) { + return Collections.emptyList(); + } + // This List will hold the values of the entries in the Map + List<Object> structList = new ArrayList<>(recordFields.size()); + StructTypeInfo typeInfo = (StructTypeInfo) schema.getStructFieldTypeInfo(fieldName); + for (RecordField nestedRecordField : recordFields) { + String fName = nestedRecordField.getFieldName(); + String normalizedFieldName = fName.toLowerCase(); + structList.add(extractCurrentField(nestedRecord, nestedRecordField, typeInfo.getStructFieldTypeInfo(normalizedFieldName))); + } + return structList; + } catch (Exception e) { + log.warn("Error [{}] parsing Record [{}].", new Object[]{e.toString(), nestedRecord}, e); + throw new SerDeException(e); + } + // break unreachable default: log.error("Unknown type found: " + fieldTypeInfo + "for field of type: " + field.getDataType().toString()); return null; diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java index cfc6017521..da463e652e 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java @@ -57,7 +57,9 @@ import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.RecordReader; +import
org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; @@ -82,6 +84,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.UUID; import java.util.function.BiFunction; import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES; @@ -648,6 +652,84 @@ public class TestPutHive3Streaming { assertEquals(1, runner.getQueueSize().getObjectCount()); } + @Test + public void testDataTypeConversions() throws Exception { + final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/datatype_test.avsc"), StandardCharsets.UTF_8); + schema = new Schema.Parser().parse(avroSchema); + + processor.setFields(Arrays.asList( + new FieldSchema("uuid", serdeConstants.STRING_TYPE_NAME, "uuid"), + new FieldSchema("stringc", serdeConstants.STRING_TYPE_NAME, "stringc"), + new FieldSchema("intc", serdeConstants.INT_TYPE_NAME, "intc"), + new FieldSchema("tinyintc", serdeConstants.TINYINT_TYPE_NAME, "tinyintc"), + new FieldSchema("smallintc", serdeConstants.SMALLINT_TYPE_NAME, "smallintc"), + new FieldSchema("bigintc", serdeConstants.BIGINT_TYPE_NAME, "bigintc"), + new FieldSchema("booleanc", serdeConstants.BOOLEAN_TYPE_NAME, "booleanc"), + new FieldSchema("floatc", serdeConstants.FLOAT_TYPE_NAME, "floatc"), + new FieldSchema("doublec", serdeConstants.DOUBLE_TYPE_NAME, "doublec"), + new FieldSchema("listc", serdeConstants.LIST_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ">", "listc"), + new FieldSchema("structc", serdeConstants.STRUCT_TYPE_NAME + + "<sint:int,sboolean:boolean,sstring:string>", "structc"), + new FieldSchema("enumc", serdeConstants.STRING_TYPE_NAME, "enumc"))); + + runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + MockRecordParser readerFactory = new MockRecordParser(); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); + for (final RecordField recordField : recordSchema.getFields()) { + readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); + } + + List<String> enumc = Arrays.asList("SPADES", "HEARTS", "DIAMONDS", "CLUBS"); + Random r = new Random(); + for (int index = 0; index < 10; index++) { + final int i = index; + Record mapRecord = new MapRecord(AvroTypeUtil.createSchema(schema.getField("structc").schema().getTypes().get(1)), // Get non-null type in union + new HashMap<String, Object>() { + { + put("sint", i + 2); // {"name": "sint", "type": "int"}, + if (i % 3 == 2) { + put("sboolean", null); + } else { + put("sboolean", i % 3 == 1); // {"name": "sboolean", "type": ["null","boolean"]}, + } + put("sstring", "world"); // {"name": "sstring", "type": "string"} + } + }); + readerFactory.addRecord( + UUID.randomUUID(), // {"name": "uuid", "type": "string"}, + "hello", // {"name": "stringc", "type": "string"}, + i, // {"name": "intc", "type": "int"}, + i + 1, // {"name": "tinyintc", "type": ["null", "int"]}, + i * 10, // {"name": "smallintc", "type": "int"}, + i * Integer.MAX_VALUE, // {"name": "bigintc", "type": "long"}, + i % 2 == 0, // {"name": "booleanc", "type": "boolean"}, + i * 100.0f, // {"name": "floatc", "type": "float"}, + i * 100.0, // {"name": "doublec", "type": "double"}, + new String[]{"a", "b"}, // {"name": "listc", "type": ["null", {"type": "array", "items": "string"}]}, + mapRecord, + enumc.get(r.nextInt(4)) // {"name": "enumc", "type": {"type": "enum", "name": "Suit", "symbols": ["SPADES","HEARTS","DIAMONDS","CLUBS"]}} + ); + } + + runner.addControllerService("mock-reader-factory", readerFactory); + runner.enableControllerService(readerFactory); + runner.setProperty(PutHive3Streaming.RECORD_READER,
"mock-reader-factory"); + + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + assertEquals("10", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + @Test public void cleanup() { processor.cleanup(); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc new file mode 100644 index 0000000000..a232ad0bcc --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/datatype_test.avsc @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + { + "namespace": "nifi", + "name": "test", + "type": "record", + "fields": [ + {"name": "uuid", "type": "string"}, + {"name": "stringc", "type": "string"}, + {"name": "intc", "type": "int"}, + {"name": "tinyintc", "type": ["null", "int"]}, + {"name": "smallintc", "type": "int"}, + {"name": "bigintc", "type": "long"}, + {"name": "booleanc", "type": "boolean"}, + {"name": "floatc", "type": "float"}, + {"name": "doublec", "type": "double"}, + {"name": "listc", "type": ["null", {"type": "array", "items": "string"}]}, + {"name": "structc", "type": ["null", {"name": "structcRecord", "type": "record", "fields": [ + {"name": "sint", "type": "int"}, + {"name": "sboolean", "type": ["null","boolean"]}, + {"name": "sstring", "type": "string"} + ]}]}, + {"name": "enumc", "type": {"type": "enum", "name": "Suit", "symbols": ["SPADES","HEARTS","DIAMONDS","CLUBS"]}} + ] +} \ No newline at end of file