diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java index 687073eee7..ce06f82890 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java @@ -244,6 +244,7 @@ public class NiFiOrcUtils { case DOUBLE: case FLOAT: case STRING: + case NULL: return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType); case UNION: @@ -335,6 +336,7 @@ public class NiFiOrcUtils { case LONG: return TypeInfoFactory.getPrimitiveTypeInfo("bigint"); case BOOLEAN: + case NULL: // ORC has no null type, so just pick the smallest. All values are necessarily null. return TypeInfoFactory.getPrimitiveTypeInfo("boolean"); case BYTES: return TypeInfoFactory.getPrimitiveTypeInfo("binary"); @@ -362,6 +364,7 @@ public class NiFiOrcUtils { case LONG: return "BIGINT"; case BOOLEAN: + case NULL: // Hive has no null type, we picked boolean as the ORC type so use it for Hive DDL too. All values are necessarily null. 
return "BOOLEAN"; case BYTES: return "BINARY"; diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java index f211ac5a28..e8ee2a2261 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java @@ -283,8 +283,8 @@ public class ConvertAvroToORC extends AbstractProcessor { session.transfer(flowFile, REL_SUCCESS); session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime); - } catch (final ProcessException pe) { - getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, pe}); + } catch (ProcessException | IllegalArgumentException e) { + getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java index 282b42d495..f34a64716e 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java @@ -17,10 +17,13 @@ package org.apache.nifi.processors.hive; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; import 
org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -43,7 +46,9 @@ import org.apache.nifi.util.orc.TestNiFiOrcUtils; import org.junit.Before; import org.junit.Test; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.InputStream; import java.io.File; import java.io.FileOutputStream; import java.nio.charset.StandardCharsets; @@ -55,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -79,6 +85,95 @@ public class TestConvertAvroToORC { } + @Test + public void test_onTrigger_routing_to_failure_null_type() throws Exception { + String testString = "Hello World"; + GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithNull(testString); + + DatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + DataFileWriter fileWriter = new DataFileWriter<>(writer); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + fileWriter.create(record.getSchema(), out); + fileWriter.append(record); + fileWriter.flush(); + fileWriter.close(); + out.close(); + + Map attributes = new HashMap() {{ + put(CoreAttributes.FILENAME.key(), "test.avro"); + }}; + runner.enqueue(out.toByteArray(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1); /* NOTE(review): name says routing_to_failure, yet success is asserted for the NULL field */ + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0); + assertEquals("test.orc", 
resultFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (string STRING, null BOOLEAN) STORED AS ORC", + resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE)); + } + + @Test + public void test_onTrigger_routing_to_failure_empty_array_type() throws Exception { + String testString = "Hello World"; + GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithEmptyArray(testString); + + DatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + DataFileWriter fileWriter = new DataFileWriter<>(writer); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + fileWriter.create(record.getSchema(), out); + fileWriter.append(record); + fileWriter.flush(); + fileWriter.close(); + out.close(); + + Map attributes = new HashMap() {{ + put(CoreAttributes.FILENAME.key(), "test.avro"); + }}; + runner.enqueue(out.toByteArray(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1); /* NOTE(review): name says routing_to_failure, yet success is asserted for the empty array */ + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0); + assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (string STRING, emptyArray ARRAY) STORED AS ORC", + resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE)); + } + + @Test + public void test_onTrigger_routing_to_failure_fixed_type() throws Exception { + String testString = "Hello!"; + GenericData.Record record = TestNiFiOrcUtils.buildAvroRecordWithFixed(testString); + + DatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + DataFileWriter fileWriter = new DataFileWriter<>(writer); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + fileWriter.create(record.getSchema(), out); + fileWriter.append(record); + fileWriter.flush(); + fileWriter.close(); + out.close(); + + Map attributes = new HashMap() {{ + 
put(CoreAttributes.FILENAME.key(), "test.avro"); + }}; + runner.enqueue(out.toByteArray(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_FAILURE, 1); + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_FAILURE).get(0); + assertEquals("test.avro", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key())); /* failure keeps the .avro name; content is re-read as Avro below */ + + final InputStream in = new ByteArrayInputStream(resultFlowFile.toByteArray()); + final DatumReader datumReader = new GenericDatumReader<>(); + try (DataFileStream dataFileReader = new DataFileStream<>(in, datumReader)) { + assertTrue(dataFileReader.hasNext()); + GenericRecord testedRecord = dataFileReader.next(); + + assertNotNull(testedRecord.get("fixed")); + assertArrayEquals(testString.getBytes(StandardCharsets.UTF_8), ((GenericData.Fixed) testedRecord.get("fixed")).bytes()); + } + } + @Test public void test_onTrigger_primitive_record() throws Exception { GenericData.Record record = TestNiFiOrcUtils.buildPrimitiveAvroRecord(10, 20L, true, 30.0f, 40, StandardCharsets.UTF_8.encode("Hello"), "World"); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java index 47ee3a573b..cd7847f2f5 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/util/orc/TestNiFiOrcUtils.java @@ -34,7 +34,9 @@ import org.apache.hadoop.io.Text; import org.junit.Test; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -271,7 +273,7 @@ public class TestNiFiOrcUtils { @Test public void test_convertToORCObject() 
{ - Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x","y","z"); + Schema schema = SchemaBuilder.enumeration("myEnum").symbols("x", "y", "z"); List objects = Arrays.asList(new Utf8("Hello"), new GenericData.EnumSymbol(schema, "x")); objects.forEach((avroObject) -> { Object o = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("uniontype"), avroObject); @@ -304,6 +306,29 @@ public class TestNiFiOrcUtils { return builder.endRecord(); } + public static Schema buildAvroSchemaWithNull() { + // Build a fake Avro record which contains null + final SchemaBuilder.FieldAssembler builder = SchemaBuilder.record("test.record").namespace("any.data").fields(); + builder.name("string").type().stringType().stringDefault("default"); + builder.name("null").type().nullType().noDefault(); + return builder.endRecord(); + } + + public static Schema buildAvroSchemaWithEmptyArray() { + // Build a fake Avro record which contains an empty array + final SchemaBuilder.FieldAssembler builder = SchemaBuilder.record("test.record").namespace("any.data").fields(); + builder.name("string").type().stringType().stringDefault("default"); + builder.name("emptyArray").type().array().items().nullType().noDefault(); + return builder.endRecord(); + } + + public static Schema buildAvroSchemaWithFixed() { + // Build a fake Avro record which contains a fixed-size field + final SchemaBuilder.FieldAssembler builder = SchemaBuilder.record("test.record").namespace("any.data").fields(); + builder.name("fixed").type().fixed("fixedField").size(6).fixedDefault("123456"); + return builder.endRecord(); + } + public static GenericData.Record buildPrimitiveAvroRecord(int i, long l, boolean b, float f, double d, ByteBuffer bytes, String string) { Schema schema = buildPrimitiveAvroSchema(); GenericData.Record row = new GenericData.Record(schema); @@ -351,6 +376,29 @@ public class TestNiFiOrcUtils { return row; } + public static GenericData.Record buildAvroRecordWithNull(String string) { + Schema schema = 
buildAvroSchemaWithNull(); + GenericData.Record row = new GenericData.Record(schema); + row.put("string", string); + row.put("null", null); + return row; + } + + public static GenericData.Record buildAvroRecordWithEmptyArray(String string) { + Schema schema = buildAvroSchemaWithEmptyArray(); + GenericData.Record row = new GenericData.Record(schema); + row.put("string", string); + row.put("emptyArray", Collections.emptyList()); + return row; + } + + public static GenericData.Record buildAvroRecordWithFixed(String string) { + Schema schema = buildAvroSchemaWithFixed(); + GenericData.Record row = new GenericData.Record(schema); + row.put("fixed", new GenericData.Fixed(schema, string.getBytes(StandardCharsets.UTF_8))); + return row; + } + public static TypeInfo buildComplexOrcSchema() { return TypeInfoUtils.getTypeInfoFromTypeString("struct,myEnum:string,myLongOrFloat:uniontype,myIntList:array>"); }