From 353fcdda9cd9e4bc42e63a7ddad993ba75bab9b1 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Thu, 21 Dec 2017 16:28:48 +0100
Subject: [PATCH] NIFI-3660: This closes #2356. Support schema containing a
 map with an array value in ConvertAvroToORC

Signed-off-by: joewitt
---
 .../hadoop/hive/ql/io/orc/NiFiOrcUtils.java   | 18 ++--
 .../processors/hive/TestConvertAvroToORC.java | 99 ++++++++++++++++++-
 .../nifi/util/orc/TestNiFiOrcUtils.java       | 27 ++++-
 3 files changed, 126 insertions(+), 18 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
index b8c9dab93c..c9624b6f73 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
@@ -40,15 +40,14 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -147,23 +146,20 @@ public class NiFiOrcUtils {
             return o;
         }
         if (o instanceof Map) {
-            MapWritable mapWritable = new MapWritable();
+            Map map = new HashMap();
             TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
-            TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
+            TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
             // Unions are not allowed as key/value types, so if we convert the key and value objects,
             // they should return Writable objects
             ((Map) o).forEach((key, value) -> {
                 Object keyObject = convertToORCObject(keyInfo, key);
                 Object valueObject = convertToORCObject(valueInfo, value);
-                if (keyObject == null
-                        || !(keyObject instanceof Writable)
-                        || !(valueObject instanceof Writable)
-                        ) {
-                    throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null");
+                if (keyObject == null) {
+                    throw new IllegalArgumentException("Maps' key cannot be null");
                 }
-                mapWritable.put((Writable) keyObject, (Writable) valueObject);
+                map.put(keyObject, valueObject);
             });
-            return mapWritable;
+            return map;
         }
         if (o instanceof GenericData.Record) {
             GenericData.Record record = (GenericData.Record) o;
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
index 1c269fac85..282b42d495 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java
@@ -202,8 +202,6 @@ public class TestConvertAvroToORC {
         assertTrue(intFieldObject instanceof IntWritable);
         assertEquals(10, ((IntWritable) intFieldObject).get());
 
-        // This is pretty awkward and messy. The map object is a Map (not a MapWritable) but the keys are writables (in this case Text)
-        // and so are the values (DoubleWritables in this case).
         Object mapFieldObject = inspector.getStructFieldData(o, inspector.getStructFieldRef("myMap"));
         assertTrue(mapFieldObject instanceof Map);
         Map map = (Map) mapFieldObject;
@@ -308,4 +306,101 @@ public class TestConvertAvroToORC {
         assertTrue(ageObject instanceof IntWritable);
         assertEquals(28, ((IntWritable) ageObject).get());
     }
+
+    @Test
+    public void test_onTrigger_nested_complex_record() throws Exception {
+
+        Map<String, List<Double>> mapData1 = new TreeMap<String, List<Double>>() {{
+            put("key1", Arrays.asList(1.0, 2.0));
+            put("key2", Arrays.asList(3.0, 4.0));
+        }};
+
+        Map<String, String> arrayMap11 = new TreeMap<String, String>() {{
+            put("key1", "v1");
+            put("key2", "v2");
+        }};
+        Map<String, String> arrayMap12 = new TreeMap<String, String>() {{
+            put("key3", "v3");
+            put("key4", "v4");
+        }};
+
+        GenericData.Record record = TestNiFiOrcUtils.buildNestedComplexAvroRecord(mapData1, Arrays.asList(arrayMap11, arrayMap12));
+
+        DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
+        DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        fileWriter.create(record.getSchema(), out);
+        fileWriter.append(record);
+
+        // Put another record in
+        Map<String, List<Double>> mapData2 = new TreeMap<String, List<Double>>() {{
+            put("key1", Arrays.asList(-1.0, -2.0));
+            put("key2", Arrays.asList(-3.0, -4.0));
+        }};
+
+        Map<String, String> arrayMap21 = new TreeMap<String, String>() {{
+            put("key1", "v-1");
+            put("key2", "v-2");
+        }};
+        Map<String, String> arrayMap22 = new TreeMap<String, String>() {{
+            put("key3", "v-3");
+            put("key4", "v-4");
+        }};
+
+        record = TestNiFiOrcUtils.buildNestedComplexAvroRecord(mapData2, Arrays.asList(arrayMap21, arrayMap22));
+        fileWriter.append(record);
+
+        fileWriter.flush();
+        fileWriter.close();
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
+
+        // Write the flow file out to disk, since the ORC Reader needs a path
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
+        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS nested_complex_record " +
+                "(myMapOfArray MAP<STRING, ARRAY<DOUBLE>>, myArrayOfMap ARRAY<MAP<STRING, STRING>>)" +
+                " STORED AS ORC", resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
+        assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
+        assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
+        FileOutputStream fos = new FileOutputStream("target/test1.orc");
+        fos.write(resultContents);
+        fos.flush();
+        fos.close();
+
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.getLocal(conf);
+        Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
+        RecordReader rows = reader.rows();
+        Object o = rows.next(null);
+        assertNotNull(o);
+        assertTrue(o instanceof OrcStruct);
+        TypeInfo resultSchema = TestNiFiOrcUtils.buildNestedComplexOrcSchema();
+        StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(resultSchema);
+
+        // check values
+        Object myMapOfArray = inspector.getStructFieldData(o, inspector.getStructFieldRef("myMapOfArray"));
+        assertTrue(myMapOfArray instanceof Map);
+        Map map = (Map) myMapOfArray;
+        Object mapValue = map.get(new Text("key1"));
+        assertNotNull(mapValue);
+        assertTrue(mapValue instanceof List);
+        assertEquals(Arrays.asList(new DoubleWritable(1.0), new DoubleWritable(2.0)), mapValue);
+
+        Object myArrayOfMap = inspector.getStructFieldData(o, inspector.getStructFieldRef("myArrayOfMap"));
+        assertTrue(myArrayOfMap instanceof List);
+        List list = (List) myArrayOfMap;
+        Object el0 = list.get(0);
+        assertNotNull(el0);
+        assertTrue(el0 instanceof Map);
+        assertEquals(new Text("v1"), ((Map) el0).get(new Text("key1")));
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
index 556723c13e..342aed9ba2 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
@@ -202,10 +201,9 @@ public class TestNiFiOrcUtils {
         map.put("Hello", 1.0f);
         map.put("World", 2.0f);
 
-        Object writable = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("map<string,float>"), map);
-        assertTrue(writable instanceof MapWritable);
-        MapWritable mapWritable = (MapWritable) writable;
-        mapWritable.forEach((key, value) -> {
+        Object convMap = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("map<string,float>"), map);
+        assertTrue(convMap instanceof Map);
+        ((Map) convMap).forEach((key, value) -> {
             assertTrue(key instanceof Text);
             assertTrue(value instanceof FloatWritable);
         });
@@ -338,6 +336,25 @@ public class TestNiFiOrcUtils {
         return TypeInfoUtils.getTypeInfoFromTypeString("struct<myInt:int,myMap:map<string,double>,myEnum:string,myLongOrFloat:uniontype<bigint,float>,myIntList:array<int>>");
     }
 
+    public static Schema buildNestedComplexAvroSchema() {
+        // Build a fake Avro record with nested complex types
+        final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("nested.complex.record").namespace("any.data").fields();
+        builder.name("myMapOfArray").type().map().values().array().items().doubleType().noDefault();
+        builder.name("myArrayOfMap").type().array().items().map().values().stringType().noDefault();
+        return builder.endRecord();
+    }
+
+    public static GenericData.Record buildNestedComplexAvroRecord(Map<String, List<Double>> m, List<Map<String, String>> a) {
+        Schema schema = buildNestedComplexAvroSchema();
+        GenericData.Record row = new GenericData.Record(schema);
+        row.put("myMapOfArray", m);
+        row.put("myArrayOfMap", a);
+        return row;
+    }
+
+    public static TypeInfo buildNestedComplexOrcSchema() {
+        return TypeInfoUtils.getTypeInfoFromTypeString("struct<myMapOfArray:map<string,array<double>>,myArrayOfMap:array<map<string,string>>>");
+    }
 
     private static class TypeInfoCreator {
 
         static TypeInfo createInt() {
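
Reviewer note (illustrative, not part of the patch): a minimal sketch of the behavior change, assuming the nifi-hive-processors classes changed above are on the classpath. Before this change, convertToORCObject wrapped converted maps in a MapWritable, which can only hold Writable entries; a map value that converts to a List (the representation of an ORC array) therefore failed with "Maps may only contain Writable types, and the key cannot be null". After the change the method returns a plain java.util.Map, so nested values pass through. The class name MapOfArrayConversionSketch and the sample values are invented for illustration; NiFiOrcUtils.convertToORCObject and TypeInfoUtils.getTypeInfoFromTypeString are the same APIs exercised by the tests in this patch.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class MapOfArrayConversionSketch {
        public static void main(String[] args) {
            // ORC type for a map whose values are arrays of doubles
            TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString("map<string,array<double>>");

            // The kind of value an Avro map<string, array<double>> field deserializes to
            Map<String, Object> avroMap = new HashMap<>();
            avroMap.put("key1", Arrays.asList(1.0, 2.0));

            // After this patch: a java.util.Map with Text keys and List-of-DoubleWritable values.
            // Before this patch: IllegalArgumentException, because the converted List is not a Writable.
            Object converted = NiFiOrcUtils.convertToORCObject(typeInfo, avroMap);
            System.out.println(converted);
        }
    }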