diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java index b323633649..d918365a37 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/ConvertAvroToORC.java @@ -234,9 +234,8 @@ public class ConvertAvroToORC extends AbstractProcessor { try { int recordCount = 0; - GenericRecord currRecord = null; while (reader.hasNext()) { - currRecord = reader.next(currRecord); + GenericRecord currRecord = reader.next(); List fields = currRecord.getSchema().getFields(); if (fields != null) { Object[] row = new Object[fields.size()]; @@ -284,7 +283,7 @@ public class ConvertAvroToORC extends AbstractProcessor { flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), ORC_MIME_TYPE); flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename.toString()); session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime); + session.getProvenanceReporter().modifyContent(flowFile, "Converted " + totalRecordCount.get() + " records", System.currentTimeMillis() - startTime); } catch (ProcessException | IllegalArgumentException e) { getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, e}); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java index 9248b273bc..5fe6752487 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestConvertAvroToORC.java @@ -49,9 +49,9 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.InputStream; import java.io.File; import java.io.FileOutputStream; +import java.io.InputStream; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -320,6 +320,69 @@ public class TestConvertAvroToORC { assertEquals(sampleBigDecimal, ((HiveDecimalWritable) decimalFieldObject).getHiveDecimal().bigDecimalValue()); } + @Test + public void test_onTrigger_complex_records_with_bigdecimals() throws Exception { + + Map mapData1 = new TreeMap() {{ + put("key1", 1.0); + put("key2", 2.0); + }}; + + + BigDecimal sampleBigDecimal1 = new BigDecimal("3500.12"); + BigDecimal sampleBigDecimal2 = new BigDecimal("0.01"); + + GenericData.Record record1 = TestNiFiOrcUtils.buildComplexAvroRecord(null, mapData1, "XYZ", 4L, Arrays.asList(100, 200), toByteBuffer(sampleBigDecimal1)); + DatumWriter writer = new GenericDatumWriter<>(record1.getSchema()); + DataFileWriter fileWriter = new DataFileWriter<>(writer); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + fileWriter.create(record1.getSchema(), out); + fileWriter.append(record1); + fileWriter.append(TestNiFiOrcUtils.buildComplexAvroRecord(null, mapData1, "XYZ", 4L, Arrays.asList(100, 200), toByteBuffer(sampleBigDecimal2))); + fileWriter.flush(); + fileWriter.close(); + out.close(); + + Map attributes = new HashMap() {{ + put(CoreAttributes.FILENAME.key(), "test"); + }}; + runner.enqueue(out.toByteArray(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1); + + // Write the flow file out to disk, since the ORC Reader needs a path + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0); + assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE)); + assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + byte[] resultContents = runner.getContentAsByteArray(resultFlowFile); + FileOutputStream fos = new FileOutputStream("target/test1.orc"); + fos.write(resultContents); + fos.flush(); + fos.close(); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs)); + RecordReader rows = reader.rows(); + TypeInfo resultSchema = TestNiFiOrcUtils.buildComplexOrcSchema(); + StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(resultSchema); + + Object result1 = rows.next(null); + assertNotNull(result1); + Object decimalFieldObject1 = inspector.getStructFieldData(result1, inspector.getStructFieldRef("myDecimal")); + assertEquals(sampleBigDecimal1, ((HiveDecimalWritable) decimalFieldObject1).getHiveDecimal().bigDecimalValue()); + + Object result2 = rows.next(null); + assertNotNull(result2); + Object decimalFieldObject2 = inspector.getStructFieldData(result2, inspector.getStructFieldRef("myDecimal")); + assertEquals(sampleBigDecimal2, ((HiveDecimalWritable) decimalFieldObject2).getHiveDecimal().bigDecimalValue()); + } + + private ByteBuffer toByteBuffer(BigDecimal sampleBigDecimal) { + return ByteBuffer.wrap(sampleBigDecimal.unscaledValue().toByteArray()); + } + @Test public void test_onTrigger_array_of_records() throws Exception { final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array_of_records.avsc"));