NIFI-6000 Also catch IllegalArgumentException in the ConvertAvroToORC Hive processor. Added support for Avro null types.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3302
This commit is contained in:
Aleksandr Salatich 2019-02-12 18:28:48 +03:00 committed by Matthew Burgess
parent ae8a794ff0
commit e598b30d6d
4 changed files with 149 additions and 3 deletions

View File

@ -244,6 +244,7 @@ public class NiFiOrcUtils {
case DOUBLE:
case FLOAT:
case STRING:
case NULL:
return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
case UNION:
@ -335,6 +336,7 @@ public class NiFiOrcUtils {
case LONG:
return TypeInfoFactory.getPrimitiveTypeInfo("bigint");
case BOOLEAN:
case NULL: // ORC has no null type, so just pick the smallest. All values are necessarily null.
return TypeInfoFactory.getPrimitiveTypeInfo("boolean");
case BYTES:
return TypeInfoFactory.getPrimitiveTypeInfo("binary");
@ -362,6 +364,7 @@ public class NiFiOrcUtils {
case LONG:
return "BIGINT";
case BOOLEAN:
case NULL: // Hive has no null type, we picked boolean as the ORC type so use it for Hive DDL too. All values are necessarily null.
return "BOOLEAN";
case BYTES:
return "BINARY";

View File

@ -283,8 +283,8 @@ public class ConvertAvroToORC extends AbstractProcessor {
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime);
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, pe});
} catch (ProcessException | IllegalArgumentException e) {
getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
}
}

View File

@ -17,10 +17,13 @@
package org.apache.nifi.processors.hive;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -43,7 +46,9 @@ import org.apache.nifi.util.orc.TestNiFiOrcUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;
@ -55,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -79,6 +85,95 @@ public class TestConvertAvroToORC {
}
@Test
public void test_onTrigger_routing_to_failure_null_type() throws Exception {
    // NOTE(review): despite "routing_to_failure" in the name, a record containing an Avro
    // NULL-typed field is expected to convert SUCCESSFULLY (NULL maps to BOOLEAN in the
    // ORC/Hive DDL) — the assertions below check REL_SUCCESS. Consider renaming the test.
    String testString = "Hello World";
    GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithNull(testString);

    // Serialize the record into an in-memory Avro container file; try-with-resources
    // guarantees the writer is closed even if append/flush throws.
    DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer)) {
        fileWriter.create(record.getSchema(), out);
        fileWriter.append(record);
        fileWriter.flush();
    }
    out.close();

    // Plain HashMap instead of double-brace initialization (which creates an anonymous
    // class retaining a reference to the enclosing test instance).
    Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.FILENAME.key(), "test.avro");
    runner.enqueue(out.toByteArray(), attributes);
    runner.run();

    runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
    MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
    assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
    assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (string STRING, null BOOLEAN) STORED AS ORC",
            resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
}
@Test
public void test_onTrigger_routing_to_failure_empty_array_type() throws Exception {
    // NOTE(review): despite "routing_to_failure" in the name, a record containing an empty
    // array of NULL items is expected to convert SUCCESSFULLY (items map to BOOLEAN in the
    // Hive DDL) — the assertions below check REL_SUCCESS. Consider renaming the test.
    String testString = "Hello World";
    GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithEmptyArray(testString);

    // Serialize the record into an in-memory Avro container file; try-with-resources
    // guarantees the writer is closed even if append/flush throws.
    DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer)) {
        fileWriter.create(record.getSchema(), out);
        fileWriter.append(record);
        fileWriter.flush();
    }
    out.close();

    // Plain HashMap instead of double-brace initialization (which creates an anonymous
    // class retaining a reference to the enclosing test instance).
    Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.FILENAME.key(), "test.avro");
    runner.enqueue(out.toByteArray(), attributes);
    runner.run();

    runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
    MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
    assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
    assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (string STRING, emptyArray ARRAY<BOOLEAN>) STORED AS ORC",
            resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
}
@Test
public void test_onTrigger_routing_to_failure_fixed_type() throws Exception {
    // A record with an Avro FIXED-typed field is not convertible to ORC; the processor
    // should route the original flow file (content intact) to REL_FAILURE.
    String testString = "Hello!";
    GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithFixed(testString);

    // Serialize the record into an in-memory Avro container file; try-with-resources
    // guarantees the writer is closed even if append/flush throws.
    DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer)) {
        fileWriter.create(record.getSchema(), out);
        fileWriter.append(record);
        fileWriter.flush();
    }
    out.close();

    // Plain HashMap instead of double-brace initialization (which creates an anonymous
    // class retaining a reference to the enclosing test instance).
    Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.FILENAME.key(), "test.avro");
    runner.enqueue(out.toByteArray(), attributes);
    runner.run();

    runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_FAILURE, 1);
    MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_FAILURE).get(0);
    // Filename is unchanged on failure (no .orc rename).
    assertEquals("test.avro", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));

    // The failed flow file must still be a readable Avro file containing the original record.
    final InputStream in = new ByteArrayInputStream(resultFlowFile.toByteArray());
    final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
        assertTrue(dataFileReader.hasNext());
        GenericRecord testedRecord = dataFileReader.next();
        assertNotNull(testedRecord.get("fixed"));
        assertArrayEquals(testString.getBytes(StandardCharsets.UTF_8), ((GenericData.Fixed) testedRecord.get("fixed")).bytes());
    }
}
@Test
public void test_onTrigger_primitive_record() throws Exception {
GenericData.Record record = TestNiFiOrcUtils.buildPrimitiveAvroRecord(10, 20L, true, 30.0f, 40, StandardCharsets.UTF_8.encode("Hello"), "World");

View File

@ -34,7 +34,9 @@ import org.apache.hadoop.io.Text;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -271,7 +273,7 @@ public class TestNiFiOrcUtils {
@Test
public void test_convertToORCObject() {
Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x","y","z");
Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x", "y", "z");
List<Object> objects = Arrays.asList(new Utf8("Hello"), new GenericData.EnumSymbol(schema, "x"));
objects.forEach((avroObject) -> {
Object o = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("uniontype<bigint,string>"), avroObject);
@ -304,6 +306,29 @@ public class TestNiFiOrcUtils {
return builder.endRecord();
}
public static Schema buildAvroSchemaWithNull() {
    // Schema for a test record holding a string field plus a field of the Avro NULL type.
    return SchemaBuilder.record("test.record").namespace("any.data").fields()
            .name("string").type().stringType().stringDefault("default")
            .name("null").type().nullType().noDefault()
            .endRecord();
}
public static Schema buildAvroSchemaWithEmptyArray() {
    // Schema for a test record holding a string field plus an array whose items are NULL-typed.
    return SchemaBuilder.record("test.record").namespace("any.data").fields()
            .name("string").type().stringType().stringDefault("default")
            .name("emptyArray").type().array().items().nullType().noDefault()
            .endRecord();
}
public static Schema buildAvroSchemaWithFixed() {
    // Build a fake Avro record which contains a single FIXED-type field (6 bytes wide)
    final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
    builder.name("fixed").type().fixed("fixedField").size(6).fixedDefault("123456");
    return builder.endRecord();
}
public static GenericData.Record buildPrimitiveAvroRecord(int i, long l, boolean b, float f, double d, ByteBuffer bytes, String string) {
Schema schema = buildPrimitiveAvroSchema();
GenericData.Record row = new GenericData.Record(schema);
@ -351,6 +376,29 @@ public class TestNiFiOrcUtils {
return row;
}
public static GenericData.Record buildAvroRecordWithNull(String string) {
    // Record with a populated string field and an explicitly-null NULL-typed field.
    final GenericData.Record record = new GenericData.Record(buildAvroSchemaWithNull());
    record.put("string", string);
    record.put("null", null);
    return record;
}
public static GenericData.Record buildAvroRecordWithEmptyArray(String string) {
    // Record with a populated string field and an empty list for the array field.
    final GenericData.Record record = new GenericData.Record(buildAvroSchemaWithEmptyArray());
    record.put("string", string);
    record.put("emptyArray", Collections.emptyList());
    return record;
}
public static GenericData.Record buildAvroRecordWithFixed(String string) {
    // Record with a single FIXED-type field populated from the given string's UTF-8 bytes.
    Schema schema = buildAvroSchemaWithFixed();
    GenericData.Record row = new GenericData.Record(schema);
    // Use the "fixed" field's own schema for the Fixed datum. The original passed the
    // enclosing record schema, which mislabels the value; GenericData.Fixed expects the
    // fixed-type schema (the writer resolves the field schema from the record, so the
    // serialized bytes are unchanged).
    Schema fixedSchema = schema.getField("fixed").schema();
    row.put("fixed", new GenericData.Fixed(fixedSchema, string.getBytes(StandardCharsets.UTF_8)));
    return row;
}
public static TypeInfo buildComplexOrcSchema() {
    // Complex ORC struct (int, map, enum-as-string, single-branch union, int list) used by tests.
    final String typeString =
            "struct<myInt:int,myMap:map<string,double>,myEnum:string,myLongOrFloat:uniontype<int>,myIntList:array<int>>";
    return TypeInfoUtils.getTypeInfoFromTypeString(typeString);
}