NIFI-3660: This closes #2356. Support schema containing a map with an array value in ConvertAvroToORC

Signed-off-by: joewitt <joewitt@apache.org>
Author: Marco Gaido (2017-12-21 16:28:48 +01:00), committed by joewitt
parent 49285d325f
commit 353fcdda9c
3 changed files with 126 additions and 18 deletions
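
To illustrate the behavior this commit fixes, here is a minimal sketch. It is not part of the commit, and it assumes it sits in the same package as NiFiOrcUtils so the class resolves without an import; the two-argument convertToORCObject(TypeInfo, Object) call matches the usage in the tests below. Before this change, converting a map whose values are Avro arrays threw, because each converted array value is a java.util.List rather than a Writable.

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MapOfArrayConversionSketch {
    public static void main(String[] args) {
        // An Avro-style map whose values are arrays: map<string, array<double>>
        Map<String, Object> mapOfArray = new HashMap<>();
        mapOfArray.put("key1", Arrays.asList(1.0, 2.0));

        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString("map<string,array<double>>");

        // Before this commit: IllegalArgumentException, because the converted array
        // value is a java.util.List, which failed the old "instanceof Writable" guard.
        // After this commit: a plain java.util.Map with Text keys and
        // List<DoubleWritable> values, matching the assertions in the new unit test.
        Object result = NiFiOrcUtils.convertToORCObject(typeInfo, mapOfArray);
        System.out.println(result);
    }
}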

NiFiOrcUtils.java

@@ -40,15 +40,14 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;

 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -147,23 +146,20 @@ public class NiFiOrcUtils {
             return o;
         }
         if (o instanceof Map) {
-            MapWritable mapWritable = new MapWritable();
+            Map map = new HashMap();
             TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
-            TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
+            TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
             // Unions are not allowed as key/value types, so if we convert the key and value objects,
             // they should return Writable objects
             ((Map) o).forEach((key, value) -> {
                 Object keyObject = convertToORCObject(keyInfo, key);
                 Object valueObject = convertToORCObject(valueInfo, value);
-                if (keyObject == null
-                        || !(keyObject instanceof Writable)
-                        || !(valueObject instanceof Writable)
-                ) {
-                    throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null");
+                if (keyObject == null) {
+                    throw new IllegalArgumentException("Maps' key cannot be null");
                 }
-                mapWritable.put((Writable) keyObject, (Writable) valueObject);
+                map.put(keyObject, valueObject);
             });
-            return mapWritable;
+            return map;
         }
         if (o instanceof GenericData.Record) {
             GenericData.Record record = (GenericData.Record) o;
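
Why this works: the old block required every converted key and value to be a Writable so it could populate a MapWritable, but convertToORCObject returns a java.util.List for Avro array values, so any map with array values always failed the guard. Returning a plain java.util.Map lifts that restriction (Hive's standard map object inspector operates on java.util.Map) while keeping the null-key check. The change also fixes valueInfo, which was previously built from getMapKeyTypeInfo() instead of getMapValueTypeInfo().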

TestConvertAvroToORC.java

@@ -202,8 +202,6 @@ public class TestConvertAvroToORC {
         assertTrue(intFieldObject instanceof IntWritable);
         assertEquals(10, ((IntWritable) intFieldObject).get());

-        // This is pretty awkward and messy. The map object is a Map (not a MapWritable) but the keys are writables (in this case Text)
-        // and so are the values (DoubleWritables in this case).
         Object mapFieldObject = inspector.getStructFieldData(o, inspector.getStructFieldRef("myMap"));
         assertTrue(mapFieldObject instanceof Map);
         Map map = (Map) mapFieldObject;
@@ -308,4 +306,101 @@ public class TestConvertAvroToORC {
         assertTrue(ageObject instanceof IntWritable);
         assertEquals(28, ((IntWritable) ageObject).get());
     }
+
+    @Test
+    public void test_onTrigger_nested_complex_record() throws Exception {
+
+        Map<String, List<Double>> mapData1 = new TreeMap<String, List<Double>>() {{
+            put("key1", Arrays.asList(1.0, 2.0));
+            put("key2", Arrays.asList(3.0, 4.0));
+        }};
+
+        Map<String, String> arrayMap11 = new TreeMap<String, String>() {{
+            put("key1", "v1");
+            put("key2", "v2");
+        }};
+        Map<String, String> arrayMap12 = new TreeMap<String, String>() {{
+            put("key3", "v3");
+            put("key4", "v4");
+        }};
+
+        GenericData.Record record = TestNiFiOrcUtils.buildNestedComplexAvroRecord(mapData1, Arrays.asList(arrayMap11, arrayMap12));
+
+        DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
+        DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        fileWriter.create(record.getSchema(), out);
+        fileWriter.append(record);
+
+        // Put another record in
+        Map<String, List<Double>> mapData2 = new TreeMap<String, List<Double>>() {{
+            put("key1", Arrays.asList(-1.0, -2.0));
+            put("key2", Arrays.asList(-3.0, -4.0));
+        }};
+
+        Map<String, String> arrayMap21 = new TreeMap<String, String>() {{
+            put("key1", "v-1");
+            put("key2", "v-2");
+        }};
+        Map<String, String> arrayMap22 = new TreeMap<String, String>() {{
+            put("key3", "v-3");
+            put("key4", "v-4");
+        }};
+
+        record = TestNiFiOrcUtils.buildNestedComplexAvroRecord(mapData2, Arrays.asList(arrayMap21, arrayMap22));
+
+        fileWriter.append(record);
+        fileWriter.flush();
+        fileWriter.close();
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
+
+        // Write the flow file out to disk, since the ORC Reader needs a path
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
+        assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS nested_complex_record " +
+                "(myMapOfArray MAP<STRING, ARRAY<DOUBLE>>, myArrayOfMap ARRAY<MAP<STRING, STRING>>)"
+                + " STORED AS ORC", resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
+        assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
+        assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
+        FileOutputStream fos = new FileOutputStream("target/test1.orc");
+        fos.write(resultContents);
+        fos.flush();
+        fos.close();
+
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.getLocal(conf);
+        Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
+        RecordReader rows = reader.rows();
+        Object o = rows.next(null);
+        assertNotNull(o);
+        assertTrue(o instanceof OrcStruct);
+        TypeInfo resultSchema = TestNiFiOrcUtils.buildNestedComplexOrcSchema();
+        StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(resultSchema);
+
+        // check values
+        Object myMapOfArray = inspector.getStructFieldData(o, inspector.getStructFieldRef("myMapOfArray"));
+        assertTrue(myMapOfArray instanceof Map);
+        Map map = (Map) myMapOfArray;
+        Object mapValue = map.get(new Text("key1"));
+        assertNotNull(mapValue);
+        assertTrue(mapValue instanceof List);
+        assertEquals(Arrays.asList(new DoubleWritable(1.0), new DoubleWritable(2.0)), mapValue);
+
+        Object myArrayOfMap = inspector.getStructFieldData(o, inspector.getStructFieldRef("myArrayOfMap"));
+        assertTrue(myArrayOfMap instanceof List);
+        List list = (List) myArrayOfMap;
+        Object el0 = list.get(0);
+        assertNotNull(el0);
+        assertTrue(el0 instanceof Map);
+        assertEquals(new Text("v1"), ((Map) el0).get(new Text("key1")));
+    }
+
 }

TestNiFiOrcUtils.java

@@ -28,7 +28,6 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;

 import org.junit.Test;
@@ -202,10 +201,9 @@ public class TestNiFiOrcUtils {
         map.put("Hello", 1.0f);
         map.put("World", 2.0f);

-        Object writable = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("map<string,float>"), map);
-        assertTrue(writable instanceof MapWritable);
-        MapWritable mapWritable = (MapWritable) writable;
-        mapWritable.forEach((key, value) -> {
+        Object convMap = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("map<string,float>"), map);
+        assertTrue(convMap instanceof Map);
+        ((Map) convMap).forEach((key, value) -> {
             assertTrue(key instanceof Text);
             assertTrue(value instanceof FloatWritable);
         });
@@ -338,6 +336,25 @@ public class TestNiFiOrcUtils {
         return TypeInfoUtils.getTypeInfoFromTypeString("struct<myInt:int,myMap:map<string,double>,myEnum:string,myLongOrFloat:uniontype<int>,myIntList:array<int>>");
     }

+    public static Schema buildNestedComplexAvroSchema() {
+        // Build a fake Avro record with nested complex types
+        final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("nested.complex.record").namespace("any.data").fields();
+        builder.name("myMapOfArray").type().map().values().array().items().doubleType().noDefault();
+        builder.name("myArrayOfMap").type().array().items().map().values().stringType().noDefault();
+        return builder.endRecord();
+    }
+
+    public static GenericData.Record buildNestedComplexAvroRecord(Map<String, List<Double>> m, List<Map<String, String>> a) {
+        Schema schema = buildNestedComplexAvroSchema();
+        GenericData.Record row = new GenericData.Record(schema);
+        row.put("myMapOfArray", m);
+        row.put("myArrayOfMap", a);
+        return row;
+    }
+
+    public static TypeInfo buildNestedComplexOrcSchema() {
+        return TypeInfoUtils.getTypeInfoFromTypeString("struct<myMapOfArray:map<string,array<double>>,myArrayOfMap:array<map<string,string>>>");
+    }
+
     private static class TypeInfoCreator {
         static TypeInfo createInt() {