NIFI-5517 - Add support for remaining Hive types to PutHive3Streaming

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2950.
Authored by Matthew Burgess on 2018-08-14 14:06:46 -04:00; committed by Pierre Villard
parent aac2c6a60e
commit cfc858c901
3 changed files with 47 additions and 7 deletions

NiFiRecordSerDe.java

@@ -17,6 +17,7 @@
 package org.apache.hive.streaming;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -45,9 +46,11 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -219,23 +222,31 @@ public class NiFiRecordSerDe extends AbstractSerDe {
                         val = AvroTypeUtil.convertByteArray(array).array();
                         break;
                     case DATE:
-                        val = record.getAsDate(fieldName, field.getDataType().getFormat());
+                        Date d = record.getAsDate(fieldName, field.getDataType().getFormat());
+                        org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
+                        hiveDate.setTimeInMillis(d.getTime());
+                        val = hiveDate;
                         break;
+                    // ORC doesn't currently handle TIMESTAMPLOCALTZ
                     case TIMESTAMP:
-                        val = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
+                        Timestamp ts = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
+                        // Convert to Hive's Timestamp type
+                        org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
+                        hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
+                        val = hivetimestamp;
                         break;
                     case DECIMAL:
-                        val = record.getAsDouble(fieldName);
+                        val = HiveDecimal.create(record.getAsDouble(fieldName));
                         break;
                     default:
-                        throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to unknown type: " + primitiveCategory.name());
+                        throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to type: " + primitiveCategory.name());
                 }
                 break;
             case LIST:
                 val = Arrays.asList(record.getAsArray(fieldName));
                 break;
             case MAP:
-                val = DataTypeUtils.convertRecordFieldtoObject(record.getValue(fieldName), field.getDataType());
+                val = record.getValue(fieldName);
                 break;
             case STRUCT:
                 // The Hive StandardStructObjectInspector expects the object corresponding to a "struct" to be an array or List rather than a Map.
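In short, the serde now hands Hive 3 its own common types instead of raw java.util/java.sql values. A minimal standalone sketch of the three conversions above, assuming only hive-storage-api on the classpath (class and variable names here are illustrative, not part of the commit):

```java
import java.sql.Timestamp;

import org.apache.hadoop.hive.common.type.HiveDecimal;

public class HiveTypeConversions {
    public static void main(String[] args) {
        // DATE: Hive 3 expects org.apache.hadoop.hive.common.type.Date, not java.util.Date.
        java.util.Date d = new java.util.Date();
        org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
        hiveDate.setTimeInMillis(d.getTime());

        // TIMESTAMP: carry over both the millisecond time and the nanosecond part.
        Timestamp ts = new Timestamp(System.currentTimeMillis());
        org.apache.hadoop.hive.common.type.Timestamp hiveTs = new org.apache.hadoop.hive.common.type.Timestamp();
        hiveTs.setTimeInMillis(ts.getTime(), ts.getNanos());

        // DECIMAL: wrap the double; Hive enforces the column's precision/scale downstream.
        HiveDecimal dec = HiveDecimal.create(0.99);

        System.out.println(hiveDate + " / " + hiveTs + " / " + dec);
    }
}
```

Fully qualifying org.apache.hadoop.hive.common.type.Date and Timestamp, as the commit does, avoids import clashes with java.util.Date and java.sql.Timestamp.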

TestPutHive3Streaming.java

@@ -77,7 +77,10 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.time.Instant;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -660,6 +663,8 @@ public class TestPutHive3Streaming {
         processor.setFields(Arrays.asList(
                 new FieldSchema("uuid", serdeConstants.STRING_TYPE_NAME, "uuid"),
                 new FieldSchema("stringc", serdeConstants.STRING_TYPE_NAME, "stringc"),
+                new FieldSchema("charc", serdeConstants.CHAR_TYPE_NAME + "(1)", "charc"),
+                new FieldSchema("varcharc", serdeConstants.VARCHAR_TYPE_NAME + "(100)", "varcharc"),
                 new FieldSchema("intc", serdeConstants.INT_TYPE_NAME, "intc"),
                 new FieldSchema("tinyintc", serdeConstants.TINYINT_TYPE_NAME, "tinyintc"),
                 new FieldSchema("smallintc", serdeConstants.SMALLINT_TYPE_NAME, "smallintc"),
@@ -667,11 +672,16 @@ public class TestPutHive3Streaming {
                 new FieldSchema("booleanc", serdeConstants.BOOLEAN_TYPE_NAME, "booleanc"),
                 new FieldSchema("floatc", serdeConstants.FLOAT_TYPE_NAME, "floatc"),
                 new FieldSchema("doublec", serdeConstants.DOUBLE_TYPE_NAME, "doublec"),
+                new FieldSchema("bytesc", serdeConstants.BINARY_TYPE_NAME, "bytesc"),
                 new FieldSchema("listc", serdeConstants.LIST_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + ">", "listc"),
                 new FieldSchema("structc", serdeConstants.STRUCT_TYPE_NAME
                         + "<sint:" + serdeConstants.INT_TYPE_NAME + ","
                         + "sboolean:" + serdeConstants.BOOLEAN_TYPE_NAME + ","
                         + "sstring:" + serdeConstants.STRING_TYPE_NAME + ">", "structc"),
+                new FieldSchema("mapc", serdeConstants.MAP_TYPE_NAME + "<" + serdeConstants.STRING_TYPE_NAME + "," + serdeConstants.INT_TYPE_NAME + ">", "mapc"),
+                new FieldSchema("datec", serdeConstants.DATE_TYPE_NAME, "datec"),
+                new FieldSchema("timestampc", serdeConstants.TIMESTAMP_TYPE_NAME, "timestampc"),
+                new FieldSchema("decimalc", serdeConstants.DECIMAL_TYPE_NAME + "(4,2)", "decimalc"),
                 new FieldSchema("enumc", serdeConstants.STRING_TYPE_NAME, "enumc")));
         runner = TestRunners.newTestRunner(processor);
@@ -686,7 +696,7 @@ public class TestPutHive3Streaming {
         Random r = new Random();
         for (int index = 0; index < 10; index++) {
             final int i = index;
-            Record mapRecord = new MapRecord(AvroTypeUtil.createSchema(schema.getField("structc").schema().getTypes().get(1)), // Get non-null type in union
+            Record structRecord = new MapRecord(AvroTypeUtil.createSchema(schema.getField("structc").schema().getTypes().get(1)), // Get non-null type in union
                     new HashMap<String, Object>() {
                         {
                             put("sint", i + 2); // {"name": "sint", "type": "int"},
@@ -701,6 +711,8 @@ public class TestPutHive3Streaming {
             readerFactory.addRecord(
                     UUID.randomUUID(), // {"name": "uuid", "type": "string"},
                     "hello", // {"name": "stringc", "type": "string"},
+                    'a', // {"name": "charc", "type": "string"}, written to the Hive char(1) column
+                    "world", // {"name": "varcharc", "type": "string"}, written to the Hive varchar(100) column
                     i, // {"name": "intc", "type": "int"},
                     i + 1, // {"name": "tinyintc", "type": ["null", "int"]},
                     i * 10, // {"name": "smallintc", "type": "int"},
@@ -708,8 +720,18 @@ public class TestPutHive3Streaming {
                     i % 2 == 0, // {"name": "booleanc", "type": "boolean"},
                     i * 100.0f, // {"name": "floatc", "type": "float"},
                     i * 100.0, // {"name": "doublec", "type": "double"},
+                    "Hello".getBytes(), // {"name": "bytesc", "type": "bytes"},
                     new String[]{"a", "b"}, // {"name": "listc", "type": ["null", {"type": "array", "items": "string"}]},
-                    mapRecord,
+                    structRecord, // {"name": "structc", ...}, the MapRecord built above
+                    new HashMap<String, Integer>() { // {"name": "mapc", "type": ["null", {"name": "mapType", "type": "map", ...}]},
+                        {
+                            put("sint1", i + 2);
+                            put("sint2", i);
+                        }
+                    },
+                    new java.sql.Date(Calendar.getInstance().getTimeInMillis()), // {"name": "datec", "logicalType": "date"},
+                    Timestamp.from(Instant.now()), // {"name": "timestampc", "logicalType": "timestamp-millis"},
+                    i * 99.0 / 100, // {"name": "decimalc", "logicalType": "decimal"}, written to the Hive decimal(4,2) column
                     enumc.get(r.nextInt(4)) // {"name": "enumc", "type": {"type": "enum", "name": "Suit", "symbols": ["SPADES","HEARTS","DIAMONDS","CLUBS"]}}
             );
         }
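Note that decimalc is declared as Hive decimal(4,2) while the reader supplies plain doubles (i * 99.0 / 100). A small sketch of how such values come out of HiveDecimal.create; enforcePrecisionScale is the storage-api call that clamps a value to a declared column type (an assumed API here, it does not appear in this diff):

```java
import org.apache.hadoop.hive.common.type.HiveDecimal;

public class DecimalColumnSketch {
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            // Same expression the test feeds the decimal(4,2) column.
            HiveDecimal dec = HiveDecimal.create(i * 99.0 / 100);
            // Clamp to precision 4, scale 2, matching the column declaration.
            System.out.println(HiveDecimal.enforcePrecisionScale(dec, 4, 2));
        }
    }
}
```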

Avro test schema (.avsc)

@@ -21,6 +21,8 @@
     "fields": [
         {"name": "uuid", "type": "string"},
         {"name": "stringc", "type": "string"},
+        {"name": "charc", "type": "string"},
+        {"name": "varcharc", "type": "string"},
         {"name": "intc", "type": "int"},
         {"name": "tinyintc", "type": ["null", "int"]},
         {"name": "smallintc", "type": "int"},
@@ -28,12 +30,17 @@
         {"name": "booleanc", "type": "boolean"},
         {"name": "floatc", "type": "float"},
         {"name": "doublec", "type": "double"},
+        {"name": "bytesc", "type": "bytes"},
         {"name": "listc", "type": ["null", {"type": "array", "items": "string"}]},
         {"name": "structc", "type": ["null", {"name": "structcRecord", "type": "record", "fields": [
             {"name": "sint", "type": "int"},
             {"name": "sboolean", "type": ["null","boolean"]},
             {"name": "sstring", "type": "string"}
         ]}]},
+        {"name": "mapc", "type": ["null", {"name": "mapType", "type": "map", "values": "string"}]},
+        {"name": "datec", "type": {"type": "int", "logicalType": "date"}},
+        {"name": "timestampc", "type": {"type": "long", "logicalType": "timestamp-millis"}},
+        {"name": "decimalc", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}},
         {"name": "enumc", "type": {"type": "enum", "name": "Suit", "symbols": ["SPADES","HEARTS","DIAMONDS","CLUBS"]}}
     ]
 }
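The new columns lean on Avro logical types (date, timestamp-millis, decimal). A minimal sketch of loading this schema and converting it to a NiFi RecordSchema via AvroTypeUtil, the same utility the test uses for the struct field; the file path is hypothetical:

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.RecordSchema;

public class SchemaLoadSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical path; the real test loads its schema from the module's test resources.
        Schema avroSchema = new Schema.Parser().parse(new File("src/test/resources/all-types.avsc"));
        RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
        // datec and timestampc surface as NiFi DATE/TIMESTAMP fields, which
        // NiFiRecordSerDe then converts to Hive's own Date/Timestamp classes,
        // as shown in the first file of this commit.
        recordSchema.getFields().forEach(f ->
                System.out.println(f.getFieldName() + " -> " + f.getDataType()));
    }
}
```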