NIFI-6158 Fix conversion of Avro fixed type with logicalType decimal

This closes #3665.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
Authored by Bryan Bende on 2019-08-22 11:00:14 -04:00; committed by Koji Kawamura
parent bb9758be2c
commit cc88dd428f
7 changed files with 131 additions and 36 deletions

File: AvroTypeUtil.java

@@ -1012,6 +1012,11 @@ public class AvroTypeUtil {
return AvroTypeUtil.convertByteArray(bb.array());
case FIXED:
final GenericFixed fixed = (GenericFixed) value;
final LogicalType fixedLogicalType = avroSchema.getLogicalType();
if (fixedLogicalType != null && LOGICAL_TYPE_DECIMAL.equals(fixedLogicalType.getName())) {
final ByteBuffer fixedByteBuffer = ByteBuffer.wrap(fixed.bytes());
return new Conversions.DecimalConversion().fromBytes(fixedByteBuffer, avroSchema, fixedLogicalType);
}
return AvroTypeUtil.convertByteArray(fixed.bytes());
case ENUM:
return value.toString();
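Before this change, the FIXED case fell straight through to convertByteArray(fixed.bytes()), so a fixed field carrying a decimal logical type came back as raw bytes rather than a BigDecimal; the added check mirrors the existing BYTES handling above it. A minimal standalone sketch of the round trip the patched branch performs (class name and example values are illustrative, not part of the commit):

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericFixed;

public class FixedDecimalRoundTrip {
    public static void main(String[] args) {
        // A fixed(16) schema carrying decimal(18,8), the shape the new unit test exercises.
        final Schema fixedSchema = Schema.createFixed("amount", null, null, 16);
        final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
        decimalType.addToSchema(fixedSchema);

        final Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
        final BigDecimal original = new BigDecimal("1234567890.12345678");

        // Writer side: pack the decimal's unscaled value into the 16 fixed bytes.
        final GenericFixed fixed = conversion.toFixed(original, fixedSchema, decimalType);

        // Reader side, as in the patched branch: wrap the bytes and let
        // DecimalConversion reapply the scale recorded on the logical type.
        final LogicalType logicalType = fixedSchema.getLogicalType();
        final BigDecimal decoded = conversion.fromBytes(ByteBuffer.wrap(fixed.bytes()), fixedSchema, logicalType);

        System.out.println(original.equals(decoded)); // true
    }
}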

File: TestAvroTypeUtil.java

@@ -404,6 +404,40 @@ public class TestAvroTypeUtil {
}
@Test
public void testConvertAvroRecordToMapWithFieldTypeOfFixedAndLogicalTypeDecimal() {
// Create a field schema like {"type":"fixed","name":"amount","size":16,"logicalType":"decimal","precision":18,"scale":8}
final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
final Schema fieldSchema = Schema.createFixed("amount", null, null, 16);
decimalType.addToSchema(fieldSchema);
// Create a field named "amount" using the field schema above
final Schema.Field field = new Schema.Field("amount", fieldSchema, null, (Object)null);
// Create an overall record schema with the amount field
final Schema avroSchema = Schema.createRecord(Collections.singletonList(field));
// Create an example Avro record with the amount field of type fixed and a logical type of decimal
final BigDecimal expectedDecimalValue = new BigDecimal("1234567890.12345678");
final GenericRecord genericRecord = new GenericData.Record(avroSchema);
genericRecord.put("amount", new Conversions.DecimalConversion().toFixed(expectedDecimalValue, fieldSchema, decimalType));
// Convert the Avro schema to a Record schema
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
// Convert the Avro record to a Map and verify the object produced is the same BigDecimal that was converted to fixed
final Map<String,Object> convertedMap = AvroTypeUtil.convertAvroRecordToMap(genericRecord, recordSchema, StandardCharsets.UTF_8);
assertNotNull(convertedMap);
assertEquals(1, convertedMap.size());
final Object resultObject = convertedMap.get("amount");
assertNotNull(resultObject);
assertTrue(resultObject instanceof Double);
final Double resultDouble = (Double) resultObject;
assertEquals(Double.valueOf(expectedDecimalValue.doubleValue()), resultDouble);
}
@Test
public void testBytesDecimalConversion(){
final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
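The hunk is truncated here, but this second test covers the BYTES flavor of the same conversion, which the FIXED fix mirrors. For reference, a hedged sketch of that encode/decode pair (a standalone illustration, not the test body itself):

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class BytesDecimalRoundTrip {
    public static void main(String[] args) {
        // A plain bytes schema carrying decimal(18,8).
        final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(18, 8);
        final Schema bytesSchema = decimalType.addToSchema(Schema.create(Schema.Type.BYTES));

        final Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
        final BigDecimal original = new BigDecimal("1234567890.12345678");

        // toBytes writes the two's-complement unscaled value into a ByteBuffer ...
        final ByteBuffer encoded = conversion.toBytes(original, bytesSchema, decimalType);
        // ... and fromBytes recovers the BigDecimal using the scale on the logical type.
        final BigDecimal decoded = conversion.fromBytes(encoded, bytesSchema, decimalType);

        System.out.println(original.equals(decoded)); // true
    }
}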

File: pom.xml

@@ -39,10 +39,6 @@
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

File: pom.xml

@@ -41,6 +41,12 @@
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -92,6 +98,7 @@
<exclude>src/test/resources/avro/user.avsc</exclude>
<exclude>src/test/resources/avro/user-with-array.avsc</exclude>
<exclude>src/test/resources/avro/user-with-nullable-array.avsc</exclude>
<exclude>src/test/resources/avro/user-with-fixed-decimal.avsc</exclude>
<exclude>src/test/resources/avro/all-minus-enum.avsc</exclude>
</excludes>
</configuration>

File: FetchParquetTest.java

@@ -16,18 +16,7 @@
*/
package org.apache.nifi.processors.parquet;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -57,6 +46,20 @@ import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.when;
public class FetchParquetTest {
static final String DIRECTORY = "target";
@@ -66,6 +69,8 @@ public class FetchParquetTest {
private Schema schema;
private Schema schemaWithArray;
private Schema schemaWithNullableArray;
private Schema schemaWithDecimal;
private Configuration testConf;
private FetchParquet proc;
private TestRunner testRunner;
@@ -81,6 +86,9 @@
final String avroSchemaWithNullableArray = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-nullable-array.avsc"), StandardCharsets.UTF_8);
schemaWithNullableArray = new Schema.Parser().parse(avroSchemaWithNullableArray);
final String avroSchemaWithDecimal = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-fixed-decimal.avsc"), StandardCharsets.UTF_8);
schemaWithDecimal = new Schema.Parser().parse(avroSchemaWithDecimal);
testConf = new Configuration();
testConf.addResource(new Path(TEST_CONF_PATH));
@@ -290,6 +298,24 @@
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
}
@Test
public void testFetchParquetWithDecimal() throws InitializationException, IOException {
configure(proc);
final File parquetDir = new File(DIRECTORY);
final File parquetFile = new File(parquetDir,"testFetchParquetWithDecimal.parquet");
final int numUsers = 10;
writeParquetUsersWithDecimal(parquetFile, numUsers);
final Map<String,String> attributes = new HashMap<>();
attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
testRunner.enqueue("TRIGGER", attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
}
protected void verifyCSVRecords(int numUsers, String csvContent) {
final String[] splits = csvContent.split("[\\n]");
Assert.assertEquals(numUsers, splits.length);
@@ -300,17 +326,21 @@
}
}
private AvroParquetWriter.Builder<GenericRecord> createAvroParquetWriter(final File parquetFile, final Schema schema) {
final Path parquetPath = new Path(parquetFile.getPath());
return AvroParquetWriter
.<GenericRecord>builder(parquetPath)
.withSchema(schema)
.withConf(testConf);
}
private void writeParquetUsers(final File parquetFile, int numUsers) throws IOException {
if (parquetFile.exists()) {
Assert.assertTrue(parquetFile.delete());
}
final Path parquetPath = new Path(parquetFile.getPath());
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter
.<GenericRecord>builder(parquetPath)
.withSchema(schema)
.withConf(testConf);
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schema);
try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
for (int i=0; i < numUsers; i++) {
@@ -322,7 +352,6 @@ public class FetchParquetTest {
writer.write(user);
}
}
}
private void writeParquetUsersWithArray(final File parquetFile, int numUsers) throws IOException {
@@ -330,12 +359,7 @@
Assert.assertTrue(parquetFile.delete());
}
final Path parquetPath = new Path(parquetFile.getPath());
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter
.<GenericRecord>builder(parquetPath)
.withSchema(schemaWithArray)
.withConf(testConf);
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schemaWithArray);
final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema();
@@ -354,7 +378,6 @@
writer.write(user);
}
}
}
private void writeParquetUsersWithNullableArray(final File parquetFile, int numUsers) throws IOException {
@@ -362,12 +385,7 @@
Assert.assertTrue(parquetFile.delete());
}
final Path parquetPath = new Path(parquetFile.getPath());
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter
.<GenericRecord>builder(parquetPath)
.withSchema(schemaWithNullableArray)
.withConf(testConf);
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schemaWithNullableArray);
// use the schemaWithArray here just to get the schema for the array part of the favorite_colors fields, the overall
// schemaWithNullableArray has a union of the array schema and null
@@ -385,6 +403,32 @@
user.put("favorite_color", colors);
writer.write(user);
}
}
}
private void writeParquetUsersWithDecimal(final File parquetFile, int numUsers) throws IOException {
if (parquetFile.exists()) {
Assert.assertTrue(parquetFile.delete());
}
final BigDecimal initialAmount = new BigDecimal("1234567.0123456789");
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schemaWithDecimal);
final List<Schema> amountSchemaUnion = schemaWithDecimal.getField("amount").schema().getTypes();
final Schema amountSchema = amountSchemaUnion.stream().filter(s -> s.getType() == Schema.Type.FIXED).findFirst().orElse(null);
Assert.assertNotNull(amountSchema);
final Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();
try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
for (int i=0; i < numUsers; i++) {
final BigDecimal incrementedAmount = initialAmount.add(new BigDecimal(i));
final GenericRecord user = new GenericData.Record(schemaWithDecimal);
user.put("name", "Bob" + i);
user.put("amount", decimalConversion.toFixed(incrementedAmount, amountSchema, amountSchema.getLogicalType()));
writer.write(user);
}
}
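The test above only asserts that the flow file routes to REL_SUCCESS; writeParquetUsersWithDecimal exists to produce the input file. If one wanted to verify the decimal values actually survive the round trip, a reader-side helper along these lines could be added to the test class (the method name is an assumption, and it presumes imports for AvroParquetReader, ParquetReader, and GenericFixed):

// Hypothetical check: read the file back and decode each fixed amount to a BigDecimal.
private void verifyParquetUsersWithDecimal(final File parquetFile) throws IOException {
    final Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();
    try (final ParquetReader<GenericRecord> reader = AvroParquetReader
            .<GenericRecord>builder(new Path(parquetFile.getPath()))
            .withConf(testConf)
            .build()) {
        GenericRecord user;
        while ((user = reader.read()) != null) {
            final GenericFixed fixed = (GenericFixed) user.get("amount");
            final Schema amountSchema = fixed.getSchema();
            final BigDecimal amount = decimalConversion.fromFixed(fixed, amountSchema, amountSchema.getLogicalType());
            Assert.assertNotNull(amount);
        }
    }
}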

File: src/test/resources/avro/user-with-fixed-decimal.avsc

@@ -0,0 +1,8 @@
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "amount", "type": ["null", {"type":"fixed","name":"amount","size":16,"logicalType":"decimal","precision":38,"scale":10} ]}
]
}
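Because amount is a union of null and the fixed decimal, code that needs the logical type must first select the FIXED branch out of the union, as writeParquetUsersWithDecimal does above. A small standalone sketch of that lookup (the file path comes from the test setup; the class name is illustrative):

import java.io.File;
import java.io.IOException;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class InspectFixedDecimal {
    public static void main(String[] args) throws IOException {
        final Schema userSchema = new Schema.Parser()
                .parse(new File("src/test/resources/avro/user-with-fixed-decimal.avsc"));

        // "amount" is ["null", fixed-decimal], so pick the FIXED branch of the union.
        final Schema amountSchema = userSchema.getField("amount").schema().getTypes().stream()
                .filter(s -> s.getType() == Schema.Type.FIXED)
                .findFirst()
                .orElseThrow(IllegalStateException::new);

        final LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) amountSchema.getLogicalType();
        System.out.println(decimal.getPrecision() + "," + decimal.getScale()); // prints 38,10
    }
}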

File: pom.xml

@@ -178,6 +178,7 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>