[NIFI-8610] Do not reuse generic records in convertAvroToORC

[NIFI-8610] Simplify decimal test for convertAvroToORC

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #1081
This commit is contained in:
Dominik Przybysz 2021-05-18 10:26:58 +02:00 committed by Arpad Boda
parent c9dee30294
commit f07e17ba74
No known key found for this signature in database
GPG Key ID: 390C1B5ADE978835
2 changed files with 66 additions and 4 deletions

View File

@ -234,9 +234,8 @@ public class ConvertAvroToORC extends AbstractProcessor {
try { try {
int recordCount = 0; int recordCount = 0;
GenericRecord currRecord = null;
while (reader.hasNext()) { while (reader.hasNext()) {
currRecord = reader.next(currRecord); GenericRecord currRecord = reader.next();
List<Schema.Field> fields = currRecord.getSchema().getFields(); List<Schema.Field> fields = currRecord.getSchema().getFields();
if (fields != null) { if (fields != null) {
Object[] row = new Object[fields.size()]; Object[] row = new Object[fields.size()];
@ -284,7 +283,7 @@ public class ConvertAvroToORC extends AbstractProcessor {
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), ORC_MIME_TYPE); flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), ORC_MIME_TYPE);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename.toString()); flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename.toString());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime); session.getProvenanceReporter().modifyContent(flowFile, "Converted " + totalRecordCount.get() + " records", System.currentTimeMillis() - startTime);
} catch (ProcessException | IllegalArgumentException e) { } catch (ProcessException | IllegalArgumentException e) {
getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, e}); getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, e});

View File

@ -49,9 +49,9 @@ import org.junit.Test;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.InputStream;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -320,6 +320,69 @@ public class TestConvertAvroToORC {
assertEquals(sampleBigDecimal, ((HiveDecimalWritable) decimalFieldObject).getHiveDecimal().bigDecimalValue()); assertEquals(sampleBigDecimal, ((HiveDecimalWritable) decimalFieldObject).getHiveDecimal().bigDecimalValue());
} }
/**
 * Regression test for NIFI-8610: converts an Avro file holding two complex records that
 * differ only in their decimal field and checks that each resulting ORC row carries its
 * own decimal value.
 *
 * @throws Exception if writing the Avro input, running the processor, or reading the ORC output fails
 */
@Test
public void test_onTrigger_complex_records_with_bigdecimals() throws Exception {
    // Plain map instead of double-brace initialisation, which would create an anonymous subclass.
    final Map<String, Double> mapData1 = new TreeMap<>();
    mapData1.put("key1", 1.0);
    mapData1.put("key2", 2.0);

    final BigDecimal sampleBigDecimal1 = new BigDecimal("3500.12");
    final BigDecimal sampleBigDecimal2 = new BigDecimal("0.01");

    // Two records that are identical except for the decimal field.
    final GenericData.Record record1 = TestNiFiOrcUtils.buildComplexAvroRecord(null, mapData1, "XYZ", 4L, Arrays.asList(100, 200), toByteBuffer(sampleBigDecimal1));
    final GenericData.Record record2 = TestNiFiOrcUtils.buildComplexAvroRecord(null, mapData1, "XYZ", 4L, Arrays.asList(100, 200), toByteBuffer(sampleBigDecimal2));

    // Serialise both records into an in-memory Avro data file; closing the writer flushes it.
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    final DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record1.getSchema());
    try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer)) {
        fileWriter.create(record1.getSchema(), out);
        fileWriter.append(record1);
        fileWriter.append(record2);
    }

    final Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.FILENAME.key(), "test");

    runner.enqueue(out.toByteArray(), attributes);
    runner.run();

    runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);

    final MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
    assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
    assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));

    // Write the flow file out to disk, since the ORC Reader needs a path
    final byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
    try (FileOutputStream fos = new FileOutputStream("target/test1.orc")) {
        fos.write(resultContents);
    }

    final Configuration conf = new Configuration();
    final FileSystem fs = FileSystem.getLocal(conf);
    final Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
    final TypeInfo resultSchema = TestNiFiOrcUtils.buildComplexOrcSchema();
    final StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(resultSchema);

    final RecordReader rows = reader.rows();
    try {
        // First row must carry the first record's decimal.
        final Object result1 = rows.next(null);
        assertNotNull(result1);
        final Object decimalFieldObject1 = inspector.getStructFieldData(result1, inspector.getStructFieldRef("myDecimal"));
        assertEquals(sampleBigDecimal1, ((HiveDecimalWritable) decimalFieldObject1).getHiveDecimal().bigDecimalValue());

        // Second row must carry the second record's decimal, not a value left over from the first.
        final Object result2 = rows.next(null);
        assertNotNull(result2);
        final Object decimalFieldObject2 = inspector.getStructFieldData(result2, inspector.getStructFieldRef("myDecimal"));
        assertEquals(sampleBigDecimal2, ((HiveDecimalWritable) decimalFieldObject2).getHiveDecimal().bigDecimalValue());
    } finally {
        // Release the reader's handle on the ORC file even when an assertion fails.
        rows.close();
    }
}
/**
 * Wraps the byte representation of a decimal's unscaled value in a {@link ByteBuffer}.
 * <p>
 * Only the unscaled value is encoded; the scale of {@code sampleBigDecimal} is not part
 * of the returned buffer.
 *
 * @param sampleBigDecimal the decimal whose unscaled value should be encoded
 * @return a buffer backed by the bytes of {@code sampleBigDecimal.unscaledValue()}
 */
private ByteBuffer toByteBuffer(BigDecimal sampleBigDecimal) {
    final byte[] unscaledBytes = sampleBigDecimal.unscaledValue().toByteArray();
    return ByteBuffer.wrap(unscaledBytes);
}
@Test @Test
public void test_onTrigger_array_of_records() throws Exception { public void test_onTrigger_array_of_records() throws Exception {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array_of_records.avsc")); final Schema schema = new Schema.Parser().parse(new File("src/test/resources/array_of_records.avsc"));