NIFI-11263: Add case-insensitive and order independent field mapping to PutIceberg record converter

This closes #7026.

Signed-off-by: Nandor Soma Abonyi <nsabonyi@apache.org>
Mark Bathori 2023-03-10 09:54:07 +01:00 committed by Nandor Soma Abonyi
parent a278e8dde2
commit 2873575fce
6 changed files with 451 additions and 466 deletions


@ -61,7 +61,8 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
@Tags({"iceberg", "put", "table", "store", "record", "parse", "orc", "parquet", "avro"})
@CapabilityDescription("This processor uses Iceberg API to parse and load records into Iceberg tables. " +
"The incoming data sets are parsed with Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and provided table information. " +
"It is important that the incoming records and the Iceberg table must have matching schemas and the target Iceberg table should already exist. " +
"The target Iceberg table should already exist and it must have matching schemas with the incoming records, " +
"which means the Record Reader schema must contain all the Iceberg schema fields, every additional field which is not present in the Iceberg schema will be ignored. " +
"To avoid 'small file problem' it is recommended pre-appending a MergeRecord processor.")
@WritesAttributes({
@WritesAttribute(attribute = "iceberg.record.count", description = "The number of records in the FlowFile.")
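
As a rough illustration of the schema-matching rule described above, the following sketch (field names are hypothetical, imports as in the test class further below; the converter usage mirrors the tests added in this commit) converts a NiFi record whose field names differ only in casing from the Iceberg schema, while an extra field that is not part of the Iceberg schema is simply dropped:

// Hypothetical Iceberg table schema
Schema icebergSchema = new Schema(
        Types.NestedField.optional(0, "ID", Types.IntegerType.get()),
        Types.NestedField.optional(1, "Name", Types.StringType.get()));

// Hypothetical NiFi record schema: different casing, plus one extra field
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("extra", RecordFieldType.STRING.getDataType()));
RecordSchema nifiSchema = new SimpleRecordSchema(fields);

Map<String, Object> values = new HashMap<>();
values.put("id", 1);
values.put("name", "foo");
values.put("extra", "ignored");
Record record = new MapRecord(nifiSchema, values);

IcebergRecordConverter converter = new IcebergRecordConverter(icebergSchema, nifiSchema, FileFormat.PARQUET);
GenericRecord icebergRecord = converter.convert(record); // {ID=1, Name=foo}, "extra" is ignored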


@ -41,82 +41,82 @@ public class ArrayElementGetter {
ElementGetter elementGetter;
switch (dataType.getFieldType()) {
case STRING:
elementGetter = (array, pos) -> DataTypeUtils.toString(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toString(element, ARRAY_FIELD_NAME);
break;
case CHAR:
elementGetter = (array, pos) -> DataTypeUtils.toCharacter(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toCharacter(element, ARRAY_FIELD_NAME);
break;
case BOOLEAN:
elementGetter = (array, pos) -> DataTypeUtils.toBoolean(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toBoolean(element, ARRAY_FIELD_NAME);
break;
case DECIMAL:
elementGetter = (array, pos) -> DataTypeUtils.toBigDecimal(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toBigDecimal(element, ARRAY_FIELD_NAME);
break;
case BYTE:
elementGetter = (array, pos) -> DataTypeUtils.toByte(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toByte(element, ARRAY_FIELD_NAME);
break;
case SHORT:
elementGetter = (array, pos) -> DataTypeUtils.toShort(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toShort(element, ARRAY_FIELD_NAME);
break;
case INT:
elementGetter = (array, pos) -> DataTypeUtils.toInteger(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toInteger(element, ARRAY_FIELD_NAME);
break;
case DATE:
elementGetter = (array, pos) -> DataTypeUtils.toLocalDate(array[pos], () -> DataTypeUtils.getDateTimeFormatter(dataType.getFormat(), ZoneId.systemDefault()), ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toLocalDate(element, () -> DataTypeUtils.getDateTimeFormatter(dataType.getFormat(), ZoneId.systemDefault()), ARRAY_FIELD_NAME);
break;
case TIME:
elementGetter = (array, pos) -> DataTypeUtils.toTime(array[pos], () -> DataTypeUtils.getDateFormat(dataType.getFormat()), ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toTime(element, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), ARRAY_FIELD_NAME);
break;
case LONG:
elementGetter = (array, pos) -> DataTypeUtils.toLong(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toLong(element, ARRAY_FIELD_NAME);
break;
case BIGINT:
elementGetter = (array, pos) -> DataTypeUtils.toBigInt(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toBigInt(element, ARRAY_FIELD_NAME);
break;
case FLOAT:
elementGetter = (array, pos) -> DataTypeUtils.toFloat(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toFloat(element, ARRAY_FIELD_NAME);
break;
case DOUBLE:
elementGetter = (array, pos) -> DataTypeUtils.toDouble(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toDouble(element, ARRAY_FIELD_NAME);
break;
case TIMESTAMP:
elementGetter = (array, pos) -> DataTypeUtils.toTimestamp(array[pos], () -> DataTypeUtils.getDateFormat(dataType.getFormat()), ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toTimestamp(element, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), ARRAY_FIELD_NAME);
break;
case UUID:
elementGetter = (array, pos) -> DataTypeUtils.toUUID(array[pos]);
elementGetter = DataTypeUtils::toUUID;
break;
case ARRAY:
elementGetter = (array, pos) -> DataTypeUtils.toArray(array[pos], ARRAY_FIELD_NAME, ((ArrayDataType) dataType).getElementType());
elementGetter = element -> DataTypeUtils.toArray(element, ARRAY_FIELD_NAME, ((ArrayDataType) dataType).getElementType());
break;
case MAP:
elementGetter = (array, pos) -> DataTypeUtils.toMap(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toMap(element, ARRAY_FIELD_NAME);
break;
case RECORD:
elementGetter = (array, pos) -> DataTypeUtils.toRecord(array[pos], ARRAY_FIELD_NAME);
elementGetter = element -> DataTypeUtils.toRecord(element, ARRAY_FIELD_NAME);
break;
case CHOICE:
elementGetter = (array, pos) -> {
elementGetter = element -> {
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final DataType chosenDataType = DataTypeUtils.chooseDataType(array[pos], choiceDataType);
final DataType chosenDataType = DataTypeUtils.chooseDataType(element, choiceDataType);
if (chosenDataType == null) {
throw new IllegalTypeConversionException(String.format(
"Cannot convert value [%s] of type %s for array element to any of the following available Sub-Types for a Choice: %s",
array[pos], array[pos].getClass(), choiceDataType.getPossibleSubTypes()));
element, element.getClass(), choiceDataType.getPossibleSubTypes()));
}
return DataTypeUtils.convertType(array[pos], chosenDataType, ARRAY_FIELD_NAME);
return DataTypeUtils.convertType(element, chosenDataType, ARRAY_FIELD_NAME);
};
break;
default:
throw new IllegalArgumentException("Unsupported field type: " + dataType.getFieldType());
}
return (array, pos) -> {
if (array[pos] == null) {
return element -> {
if (element == null) {
return null;
}
return elementGetter.getElementOrNull(array, pos);
return elementGetter.getElementOrNull(element);
};
}
@ -125,6 +125,6 @@ public class ArrayElementGetter {
*/
public interface ElementGetter extends Serializable {
@Nullable
Object getElementOrNull(Object[] array, int pos);
Object getElementOrNull(Object element);
}
}
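
For context, a minimal usage sketch of the reworked getter (values are hypothetical, classes as imported above): the getter is still created once per element DataType, but it now receives a single element instead of the whole array and a position:

ArrayElementGetter.ElementGetter elementGetter =
        ArrayElementGetter.createElementGetter(RecordFieldType.INT.getDataType());

Object[] source = {"1", "2", null};
List<Object> converted = new ArrayList<>();
for (Object element : source) {
    converted.add(elementGetter.getElementOrNull(element)); // yields 1, 2, null
}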


@ -19,6 +19,26 @@ package org.apache.nifi.processors.iceberg.converter;
/**
* Interface for data conversion between NiFi Record and Iceberg Record.
*/
public interface DataConverter<D, T> {
T convert(D data);
public abstract class DataConverter<S, T> {
private String sourceFieldName;
private String targetFieldName;
public String getSourceFieldName() {
return sourceFieldName;
}
public String getTargetFieldName() {
return targetFieldName;
}
public void setSourceFieldName(String sourceFieldName) {
this.sourceFieldName = sourceFieldName;
}
public void setTargetFieldName(String targetFieldName) {
this.targetFieldName = targetFieldName;
}
abstract T convert(S data);
}
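
A minimal sketch of how the new abstract class is meant to be used (the converter below is hypothetical, not part of this commit, and assumes it lives in the same package): subclasses only implement the value conversion, while the schema visitor wires up the NiFi (source) and Iceberg (target) field names through the setters:

class UpperCaseConverter extends DataConverter<String, String> {
    @Override
    public String convert(String data) {
        return data == null ? null : data.toUpperCase();
    }
}

DataConverter<String, String> converter = new UpperCaseConverter();
converter.setSourceFieldName("name");  // field name in the NiFi record schema
converter.setTargetFieldName("NAME");  // field name in the Iceberg schema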


@ -23,8 +23,8 @@ import org.apache.iceberg.types.Types;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Time;
@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import static org.apache.nifi.processors.iceberg.converter.RecordFieldGetter.createFieldGetter;
@ -46,9 +47,7 @@ import static org.apache.nifi.processors.iceberg.converter.RecordFieldGetter.cre
*/
public class GenericDataConverters {
static class SameTypeConverter implements DataConverter<Object, Object> {
static final SameTypeConverter INSTANCE = new SameTypeConverter();
static class SameTypeConverter extends DataConverter<Object, Object> {
@Override
public Object convert(Object data) {
@ -56,9 +55,7 @@ public class GenericDataConverters {
}
}
static class TimeConverter implements DataConverter<Time, LocalTime> {
static final TimeConverter INSTANCE = new TimeConverter();
static class TimeConverter extends DataConverter<Time, LocalTime> {
@Override
public LocalTime convert(Time data) {
@ -66,9 +63,7 @@ public class GenericDataConverters {
}
}
static class TimestampConverter implements DataConverter<Timestamp, LocalDateTime> {
static final TimestampConverter INSTANCE = new TimestampConverter();
static class TimestampConverter extends DataConverter<Timestamp, LocalDateTime> {
@Override
public LocalDateTime convert(Timestamp data) {
@ -76,9 +71,7 @@ public class GenericDataConverters {
}
}
static class TimestampWithTimezoneConverter implements DataConverter<Timestamp, OffsetDateTime> {
static final TimestampWithTimezoneConverter INSTANCE = new TimestampWithTimezoneConverter();
static class TimestampWithTimezoneConverter extends DataConverter<Timestamp, OffsetDateTime> {
@Override
public OffsetDateTime convert(Timestamp data) {
@ -86,9 +79,7 @@ public class GenericDataConverters {
}
}
static class UUIDtoByteArrayConverter implements DataConverter<UUID, byte[]> {
static final UUIDtoByteArrayConverter INSTANCE = new UUIDtoByteArrayConverter();
static class UUIDtoByteArrayConverter extends DataConverter<UUID, byte[]> {
@Override
public byte[] convert(UUID data) {
@ -99,7 +90,7 @@ public class GenericDataConverters {
}
}
static class FixedConverter implements DataConverter<Byte[], byte[]> {
static class FixedConverter extends DataConverter<Byte[], byte[]> {
private final int length;
@ -114,9 +105,7 @@ public class GenericDataConverters {
}
}
static class BinaryConverter implements DataConverter<Byte[], ByteBuffer> {
static final BinaryConverter INSTANCE = new BinaryConverter();
static class BinaryConverter extends DataConverter<Byte[], ByteBuffer> {
@Override
public ByteBuffer convert(Byte[] data) {
@ -124,8 +113,7 @@ public class GenericDataConverters {
}
}
static class BigDecimalConverter implements DataConverter<BigDecimal, BigDecimal> {
static class BigDecimalConverter extends DataConverter<BigDecimal, BigDecimal> {
private final int precision;
private final int scale;
@ -142,34 +130,34 @@ public class GenericDataConverters {
}
}
static class ArrayConverter<T, S> implements DataConverter<T[], List<S>> {
private final DataConverter<T, S> fieldConverter;
static class ArrayConverter<S, T> extends DataConverter<S[], List<T>> {
private final DataConverter<S, T> fieldConverter;
private final ArrayElementGetter.ElementGetter elementGetter;
ArrayConverter(DataConverter<T, S> elementConverter, DataType dataType) {
ArrayConverter(DataConverter<S, T> elementConverter, DataType dataType) {
this.fieldConverter = elementConverter;
this.elementGetter = ArrayElementGetter.createElementGetter(dataType);
}
@Override
@SuppressWarnings("unchecked")
public List<S> convert(T[] data) {
public List<T> convert(S[] data) {
final int numElements = data.length;
List<S> result = new ArrayList<>(numElements);
final List<T> result = new ArrayList<>(numElements);
for (int i = 0; i < numElements; i += 1) {
result.add(i, fieldConverter.convert((T) elementGetter.getElementOrNull(data, i)));
result.add(i, fieldConverter.convert((S) elementGetter.getElementOrNull(data[i])));
}
return result;
}
}
static class MapConverter<K, V, L, B> implements DataConverter<Map<K, V>, Map<L, B>> {
private final DataConverter<K, L> keyConverter;
private final DataConverter<V, B> valueConverter;
static class MapConverter<SK, SV, TK, TV> extends DataConverter<Map<SK, SV>, Map<TK, TV>> {
private final DataConverter<SK, TK> keyConverter;
private final DataConverter<SV, TV> valueConverter;
private final ArrayElementGetter.ElementGetter keyGetter;
private final ArrayElementGetter.ElementGetter valueGetter;
MapConverter(DataConverter<K, L> keyConverter, DataType keyType, DataConverter<V, B> valueConverter, DataType valueType) {
MapConverter(DataConverter<SK, TK> keyConverter, DataType keyType, DataConverter<SV, TV> valueConverter, DataType valueType) {
this.keyConverter = keyConverter;
this.keyGetter = ArrayElementGetter.createElementGetter(keyType);
this.valueConverter = valueConverter;
@ -178,34 +166,36 @@ public class GenericDataConverters {
@Override
@SuppressWarnings("unchecked")
public Map<L, B> convert(Map<K, V> data) {
public Map<TK, TV> convert(Map<SK, SV> data) {
final int mapSize = data.size();
final Object[] keyArray = data.keySet().toArray();
final Object[] valueArray = data.values().toArray();
Map<L, B> result = new HashMap<>(mapSize);
final Map<TK, TV> result = new HashMap<>(mapSize);
for (int i = 0; i < mapSize; i += 1) {
result.put(keyConverter.convert((K) keyGetter.getElementOrNull(keyArray, i)), valueConverter.convert((V) valueGetter.getElementOrNull(valueArray, i)));
result.put(keyConverter.convert((SK) keyGetter.getElementOrNull(keyArray[i])), valueConverter.convert((SV) valueGetter.getElementOrNull(valueArray[i])));
}
return result;
}
}
static class RecordConverter implements DataConverter<Record, GenericRecord> {
static class RecordConverter extends DataConverter<Record, GenericRecord> {
private final DataConverter<?, ?>[] converters;
private final RecordFieldGetter.FieldGetter[] getters;
private final List<DataConverter<?, ?>> converters;
private final Map<String, RecordFieldGetter.FieldGetter> getters;
private final Types.StructType schema;
RecordConverter(List<DataConverter<?, ?>> converters, List<RecordField> recordFields, Types.StructType schema) {
RecordConverter(List<DataConverter<?, ?>> converters, RecordSchema recordSchema, Types.StructType schema) {
this.schema = schema;
this.converters = (DataConverter<?, ?>[]) Array.newInstance(DataConverter.class, converters.size());
this.getters = new RecordFieldGetter.FieldGetter[converters.size()];
for (int i = 0; i < converters.size(); i += 1) {
final RecordField recordField = recordFields.get(i);
this.converters[i] = converters.get(i);
this.getters[i] = createFieldGetter(recordField.getDataType(), recordField.getFieldName(), recordField.isNullable());
this.converters = converters;
this.getters = new HashMap<>(converters.size());
for (DataConverter<?, ?> converter : converters) {
final Optional<RecordField> recordField = recordSchema.getField(converter.getSourceFieldName());
final RecordField field = recordField.get();
// creates a record field accessor for every data converter
getters.put(converter.getTargetFieldName(), createFieldGetter(field.getDataType(), field.getFieldName(), field.isNullable()));
}
}
@ -213,16 +203,16 @@ public class GenericDataConverters {
public GenericRecord convert(Record data) {
final GenericRecord record = GenericRecord.create(schema);
for (int i = 0; i < converters.length; i += 1) {
record.set(i, convert(data, i, converters[i]));
for (DataConverter<?, ?> converter : converters) {
record.setField(converter.getTargetFieldName(), convert(data, converter));
}
return record;
}
@SuppressWarnings("unchecked")
private <T, S> S convert(Record record, int pos, DataConverter<T, S> converter) {
return converter.convert((T) getters[pos].getFieldOrNull(record));
private <S, T> T convert(Record record, DataConverter<S, T> converter) {
return converter.convert((S) getters.get(converter.getTargetFieldName()).getFieldOrNull(record));
}
}
}
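
The key change in RecordConverter above is that the target Iceberg record is now populated by field name rather than by position, so the NiFi field order no longer has to match the Iceberg schema order. A minimal standalone sketch of that write path (schema and values are hypothetical), using Iceberg's GenericRecord API directly:

Types.StructType structType = Types.StructType.of(
        Types.NestedField.optional(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));

GenericRecord record = GenericRecord.create(structType);
record.setField("name", "foo"); // resolved by name, independent of field order
record.setField("id", 42L);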


@ -34,8 +34,11 @@ import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* This class is responsible for schema traversal and data conversion between NiFi and Iceberg internal record structure.
@ -56,7 +59,7 @@ public class IcebergRecordConverter {
private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat) {
return visit(schema, recordDataType, new IcebergSchemaVisitor(), new IcebergPartnerAccessors(fileFormat));
return visit(schema, new RecordTypeWithFieldNameMapper(schema, recordDataType), new IcebergSchemaVisitor(), new IcebergPartnerAccessors(schema, fileFormat));
}
@Override
@ -66,6 +69,8 @@ public class IcebergRecordConverter {
@Override
public DataConverter<?, ?> field(Types.NestedField field, DataType dataType, DataConverter<?, ?> converter) {
// set Iceberg schema field name (targetFieldName) in the data converter
converter.setTargetFieldName(field.name());
return converter;
}
@ -80,26 +85,26 @@ public class IcebergRecordConverter {
case DOUBLE:
case DATE:
case STRING:
return GenericDataConverters.SameTypeConverter.INSTANCE;
return new GenericDataConverters.SameTypeConverter();
case TIME:
return GenericDataConverters.TimeConverter.INSTANCE;
return new GenericDataConverters.TimeConverter();
case TIMESTAMP:
final Types.TimestampType timestampType = (Types.TimestampType) type;
if (timestampType.shouldAdjustToUTC()) {
return GenericDataConverters.TimestampWithTimezoneConverter.INSTANCE;
return new GenericDataConverters.TimestampWithTimezoneConverter();
}
return GenericDataConverters.TimestampConverter.INSTANCE;
return new GenericDataConverters.TimestampConverter();
case UUID:
final UUIDDataType uuidType = (UUIDDataType) dataType;
if (uuidType.getFileFormat() == FileFormat.PARQUET) {
return GenericDataConverters.UUIDtoByteArrayConverter.INSTANCE;
return new GenericDataConverters.UUIDtoByteArrayConverter();
}
return GenericDataConverters.SameTypeConverter.INSTANCE;
return new GenericDataConverters.SameTypeConverter();
case FIXED:
final Types.FixedType fixedType = (Types.FixedType) type;
return new GenericDataConverters.FixedConverter(fixedType.length());
case BINARY:
return GenericDataConverters.BinaryConverter.INSTANCE;
return new GenericDataConverters.BinaryConverter();
case DECIMAL:
final Types.DecimalType decimalType = (Types.DecimalType) type;
return new GenericDataConverters.BigDecimalConverter(decimalType.precision(), decimalType.scale());
@ -113,8 +118,17 @@ public class IcebergRecordConverter {
@Override
public DataConverter<?, ?> struct(Types.StructType type, DataType dataType, List<DataConverter<?, ?>> converters) {
Validate.notNull(type, "Can not create reader for null type");
final List<RecordField> recordFields = ((RecordDataType) dataType).getChildSchema().getFields();
return new GenericDataConverters.RecordConverter(converters, recordFields, type);
final RecordTypeWithFieldNameMapper recordType = (RecordTypeWithFieldNameMapper) dataType;
final RecordSchema recordSchema = recordType.getChildSchema();
// set NiFi schema field names (sourceFieldName) in the data converters
for (DataConverter<?, ?> converter : converters) {
final Optional<String> mappedFieldName = recordType.getNameMapping(converter.getTargetFieldName());
final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
converter.setSourceFieldName(recordField.get().getFieldName());
}
return new GenericDataConverters.RecordConverter(converters, recordSchema, type);
}
@Override
@ -129,21 +143,30 @@ public class IcebergRecordConverter {
}
public static class IcebergPartnerAccessors implements SchemaWithPartnerVisitor.PartnerAccessors<DataType> {
private final Schema schema;
private final FileFormat fileFormat;
IcebergPartnerAccessors(FileFormat fileFormat) {
IcebergPartnerAccessors(Schema schema, FileFormat fileFormat) {
this.schema = schema;
this.fileFormat = fileFormat;
}
@Override
public DataType fieldPartner(DataType dataType, int fieldId, String name) {
Validate.isTrue(dataType instanceof RecordDataType, String.format("Invalid record: %s is not a record", dataType));
final RecordDataType recordType = (RecordDataType) dataType;
final Optional<RecordField> recordField = recordType.getChildSchema().getField(name);
Validate.isTrue(dataType instanceof RecordTypeWithFieldNameMapper, String.format("Invalid record: %s is not a record", dataType));
final RecordTypeWithFieldNameMapper recordType = (RecordTypeWithFieldNameMapper) dataType;
Validate.isTrue(recordField.isPresent(), String.format("Cannot find record field with name %s", name));
final Optional<String> mappedFieldName = recordType.getNameMapping(name);
Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name));
final Optional<RecordField> recordField = recordType.getChildSchema().getField(mappedFieldName.get());
final RecordField field = recordField.get();
// If the actual record contains a nested record then we need to create a RecordTypeWithFieldNameMapper wrapper object for it.
if (field.getDataType() instanceof RecordDataType) {
return new RecordTypeWithFieldNameMapper(new Schema(schema.findField(fieldId).type().asStructType().fields()), (RecordDataType) field.getDataType());
}
if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)) {
return new UUIDDataType(field.getDataType(), fileFormat);
}
@ -187,4 +210,29 @@ public class IcebergRecordConverter {
return fileFormat;
}
}
/**
* Since the {@link RecordSchema} stores the field name and value pairs in a HashMap, field retrieval is case-sensitive, so we create a name mapper for case-insensitive handling.
*/
private static class RecordTypeWithFieldNameMapper extends RecordDataType {
private final Map<String, String> fieldNameMap;
RecordTypeWithFieldNameMapper(Schema schema, RecordDataType recordType) {
super(recordType.getChildSchema());
// create a lowercase map for the NiFi record schema fields
final Map<String, String> lowerCaseMap = recordType.getChildSchema().getFieldNames().stream()
.collect(Collectors.toMap(String::toLowerCase, s -> s));
// map the Iceberg record schema fields to the NiFi record schema fields
this.fieldNameMap = new HashMap<>();
schema.columns().forEach((s) -> this.fieldNameMap.put(s.name(), lowerCaseMap.get(s.name().toLowerCase())));
}
Optional<String> getNameMapping(String name) {
return Optional.ofNullable(fieldNameMap.get(name));
}
}
}
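
A standalone sketch of the name mapping that RecordTypeWithFieldNameMapper builds (field names are hypothetical): NiFi field names are indexed by their lowercase form, and every Iceberg column name is then resolved through that index, which gives the case-insensitive lookup used by fieldPartner() and struct() above:

List<String> nifiFieldNames = Arrays.asList("CustomerId", "customerName");
Map<String, String> lowerCaseMap = nifiFieldNames.stream()
        .collect(Collectors.toMap(String::toLowerCase, s -> s));

Map<String, String> fieldNameMap = new HashMap<>();
for (String icebergColumn : Arrays.asList("customerid", "CUSTOMERNAME")) {
    fieldNameMap.put(icebergColumn, lowerCaseMap.get(icebergColumn.toLowerCase()));
}
// fieldNameMap: {customerid=CustomerId, CUSTOMERNAME=customerName}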


@ -17,7 +17,6 @@
*/
package org.apache.nifi.processors.iceberg;
import org.apache.avro.file.DataFileWriter;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
@ -52,10 +51,11 @@ import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.io.IOException;
@ -76,6 +76,12 @@ import java.util.Map;
import java.util.UUID;
import static java.io.File.createTempFile;
import static org.apache.iceberg.FileFormat.PARQUET;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.condition.OS.WINDOWS;
@ -95,7 +101,7 @@ public class TestIcebergRecordConverter {
file.deleteOnExit();
}
private static final Schema STRUCT = new Schema(
private static final Schema STRUCT_SCHEMA = new Schema(
Types.NestedField.required(0, "struct", Types.StructType.of(
Types.NestedField.required(1, "nested_struct", Types.StructType.of(
Types.NestedField.required(2, "string", Types.StringType.get()),
@ -104,15 +110,14 @@ public class TestIcebergRecordConverter {
))
);
private static final Schema LIST = new Schema(
private static final Schema LIST_SCHEMA = new Schema(
Types.NestedField.required(0, "list", Types.ListType.ofRequired(
1, Types.ListType.ofRequired(
2, Types.StringType.get())
1, Types.ListType.ofRequired(2, Types.StringType.get())
)
)
);
private static final Schema MAP = new Schema(
private static final Schema MAP_SCHEMA = new Schema(
Types.NestedField.required(0, "map", Types.MapType.ofRequired(
1, 2, Types.StringType.get(), Types.MapType.ofRequired(
3, 4, Types.StringType.get(), Types.LongType.get()
@ -120,7 +125,7 @@ public class TestIcebergRecordConverter {
))
);
private static final Schema PRIMITIVES = new Schema(
private static final Schema PRIMITIVES_SCHEMA = new Schema(
Types.NestedField.optional(0, "string", Types.StringType.get()),
Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
Types.NestedField.optional(2, "float", Types.FloatType.get()),
@ -138,6 +143,25 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);
private static final Schema CASE_INSENSITIVE_SCHEMA = new Schema(
Types.NestedField.optional(0, "FIELD1", Types.StringType.get()),
Types.NestedField.optional(1, "Field2", Types.StringType.get()),
Types.NestedField.optional(2, "fielD3", Types.StringType.get()),
Types.NestedField.optional(3, "field4", Types.StringType.get())
);
private static final Schema UNORDERED_SCHEMA = new Schema(
Types.NestedField.optional(0, "field1", Types.StringType.get()),
Types.NestedField.required(1, "field2", Types.StructType.of(
Types.NestedField.required(2, "field3", Types.StringType.get()),
Types.NestedField.required(3, "field4", Types.ListType.ofRequired(4, Types.StringType.get()))
)),
Types.NestedField.optional(5, "field5", Types.StringType.get()),
Types.NestedField.required(6, "field6", Types.MapType.ofRequired(
7, 8, Types.StringType.get(), Types.StringType.get()
))
);
private static RecordSchema getStructSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("struct", new RecordDataType(getNestedStructSchema())));
@ -197,6 +221,34 @@ public class TestIcebergRecordConverter {
return new SimpleRecordSchema(fields);
}
private static RecordSchema getCaseInsensitiveSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("field1", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("FIELD2", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("Field3", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("fielD4", RecordFieldType.STRING.getDataType()));
return new SimpleRecordSchema(fields);
}
private static RecordSchema getNestedUnorderedSchema() {
List<RecordField> nestedFields = new ArrayList<>();
nestedFields.add(new RecordField("FIELD4", new ArrayDataType(RecordFieldType.STRING.getDataType())));
nestedFields.add(new RecordField("FIELD3", RecordFieldType.STRING.getDataType()));
return new SimpleRecordSchema(nestedFields);
}
private static RecordSchema getUnorderedSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("FIELD6", new MapDataType(RecordFieldType.STRING.getDataType())));
fields.add(new RecordField("FIELD5", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("FIELD1", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("FIELD2", new RecordDataType(getNestedUnorderedSchema())));
return new SimpleRecordSchema(fields);
}
private static RecordSchema getChoiceSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string", RecordFieldType.INT.getDataType()));
@ -222,11 +274,17 @@ public class TestIcebergRecordConverter {
}
private static Record setupListTestRecord() {
List<String> nestedList = new ArrayList<>();
nestedList.add("Test String");
List<String> nestedList1 = new ArrayList<>();
nestedList1.add("Test String1");
nestedList1.add("Test String2");
List<Collection> list = new ArrayList<>();
list.add(nestedList);
List<String> nestedList2 = new ArrayList<>();
nestedList2.add("Test String3");
nestedList2.add("Test String4");
List<Collection<String>> list = new ArrayList<>();
list.add(nestedList1);
list.add(nestedList2);
Map<String, Object> values = new HashMap<>();
values.put("list", list);
@ -238,7 +296,7 @@ public class TestIcebergRecordConverter {
Map<String, Long> nestedMap = new HashMap<>();
nestedMap.put("nested_key", 42L);
Map<String, Map> map = new HashMap<>();
Map<String, Map<String, Long>> map = new HashMap<>();
map.put("key", nestedMap);
Map<String, Object> values = new HashMap<>();
@ -273,6 +331,38 @@ public class TestIcebergRecordConverter {
return new MapRecord(getPrimitivesSchema(), values);
}
private static Record setupCaseInsensitiveTestRecord() {
Map<String, Object> values = new HashMap<>();
values.put("field1", "Text1");
values.put("FIELD2", "Text2");
values.put("Field3", "Text3");
values.put("fielD4", "Text4");
return new MapRecord(getCaseInsensitiveSchema(), values);
}
private static Record setupUnorderedTestRecord() {
List<String> listValues = new ArrayList<>();
listValues.add("list value2");
listValues.add("list value1");
Map<String, String> mapValues = new HashMap<>();
mapValues.put("key2", "map value2");
mapValues.put("key1", "map value1");
Map<String, Object> nestedValues = new HashMap<>();
nestedValues.put("FIELD4", listValues);
nestedValues.put("FIELD3", "value3");
MapRecord nestedRecord = new MapRecord(getNestedUnorderedSchema(), nestedValues);
Map<String, Object> values = new HashMap<>();
values.put("FIELD6", mapValues);
values.put("FIELD5", "value5");
values.put("FIELD1", "value1");
values.put("FIELD2", nestedRecord);
return new MapRecord(getUnorderedSchema(), values);
}
private static Record setupChoiceTestRecord() {
Map<String, Object> values = new HashMap<>();
values.put("choice1", "20");
@ -282,396 +372,206 @@ public class TestIcebergRecordConverter {
return new MapRecord(getChoiceSchema(), values);
}
@Test
public void testPrimitivesAvro() throws IOException {
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testPrimitives(FileFormat format) throws IOException {
RecordSchema nifiSchema = getPrimitivesSchema();
Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.AVRO);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format);
GenericRecord genericRecord = recordConverter.convert(record);
writeToAvro(PRIMITIVES, genericRecord, tempFile);
writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
List<GenericRecord> results = readFromAvro(PRIMITIVES, tempFile.toInputFile());
List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
assertEquals(results.size(), 1);
GenericRecord resultRecord = results.get(0);
LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
Assertions.assertEquals(resultRecord.get(0, String.class), "Test String");
Assertions.assertEquals(resultRecord.get(1, Integer.class), new Integer(8));
Assertions.assertEquals(resultRecord.get(2, Float.class), new Float(1.23456F));
Assertions.assertEquals(resultRecord.get(3, Long.class), new Long(42L));
Assertions.assertEquals(resultRecord.get(4, Double.class), new Double(3.14159D));
Assertions.assertEquals(resultRecord.get(5, BigDecimal.class), new BigDecimal("12345678.12"));
Assertions.assertEquals(resultRecord.get(6, Boolean.class), Boolean.TRUE);
Assertions.assertArrayEquals(resultRecord.get(7, byte[].class), new byte[]{104, 101, 108, 108, 111});
Assertions.assertArrayEquals(resultRecord.get(8, ByteBuffer.class).array(), new byte[]{104, 101, 108, 108, 111});
Assertions.assertEquals(resultRecord.get(9, LocalDate.class), LocalDate.of(2017, 4, 4));
Assertions.assertEquals(resultRecord.get(10, LocalTime.class), LocalTime.of(14, 20, 33));
Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
assertEquals("Test String", resultRecord.get(0, String.class));
assertEquals(Integer.valueOf(8), resultRecord.get(1, Integer.class));
assertEquals(Float.valueOf(1.23456F), resultRecord.get(2, Float.class));
assertEquals(Long.valueOf(42L), resultRecord.get(3, Long.class));
assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
if (format.equals(PARQUET)) {
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
}
}
@DisabledOnOs(WINDOWS)
@Test
public void testPrimitivesOrc() throws IOException {
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testStruct(FileFormat format) throws IOException {
RecordSchema nifiSchema = getStructSchema();
Record record = setupStructTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT_SCHEMA, nifiSchema, format);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, STRUCT_SCHEMA, genericRecord, tempFile);
List<GenericRecord> results = readFrom(format, STRUCT_SCHEMA, tempFile.toInputFile());
assertEquals(1, results.size());
assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
assertEquals(1, resultRecord.size());
assertInstanceOf(GenericRecord.class, resultRecord.get(0));
GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
assertEquals(1, nestedRecord.size());
assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
assertEquals("Test String", baseRecord.get(0, String.class));
assertEquals(Integer.valueOf(10), baseRecord.get(1, Integer.class));
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testList(FileFormat format) throws IOException {
RecordSchema nifiSchema = getListSchema();
Record record = setupListTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST_SCHEMA, nifiSchema, format);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, LIST_SCHEMA, genericRecord, tempFile);
List<GenericRecord> results = readFrom(format, LIST_SCHEMA, tempFile.toInputFile());
assertEquals(1, results.size());
assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
assertEquals(1, resultRecord.size());
assertInstanceOf(List.class, resultRecord.get(0));
List nestedList = resultRecord.get(0, List.class);
assertEquals(2, nestedList.size());
assertInstanceOf(List.class, nestedList.get(0));
assertInstanceOf(List.class, nestedList.get(1));
assertThat((List<String>) nestedList.get(0), hasItems("Test String1", "Test String2"));
assertThat((List<String>) nestedList.get(1), hasItems("Test String3", "Test String4"));
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testMap(FileFormat format) throws IOException {
RecordSchema nifiSchema = getMapSchema();
Record record = setupMapTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP_SCHEMA, nifiSchema, format);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, MAP_SCHEMA, genericRecord, tempFile);
List<GenericRecord> results = readFrom(format, MAP_SCHEMA, tempFile.toInputFile());
assertEquals(1, results.size());
assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
assertEquals(1, resultRecord.size());
assertInstanceOf(Map.class, resultRecord.get(0));
Map nestedMap = resultRecord.get(0, Map.class);
assertEquals(1, nestedMap.size());
assertInstanceOf(Map.class, nestedMap.get("key"));
Map baseMap = (Map) nestedMap.get("key");
assertEquals(42L, baseMap.get("nested_key"));
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testSchemaMismatch(FileFormat format) {
RecordSchema nifiSchema = getPrimitivesSchema();
Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.ORC);
GenericRecord genericRecord = recordConverter.convert(record);
writeToOrc(PRIMITIVES, genericRecord, tempFile);
List<GenericRecord> results = readFromOrc(PRIMITIVES, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
GenericRecord resultRecord = results.get(0);
LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
Assertions.assertEquals(resultRecord.get(0, String.class), "Test String");
Assertions.assertEquals(resultRecord.get(1, Integer.class), new Integer(8));
Assertions.assertEquals(resultRecord.get(2, Float.class), new Float(1.23456F));
Assertions.assertEquals(resultRecord.get(3, Long.class), new Long(42L));
Assertions.assertEquals(resultRecord.get(4, Double.class), new Double(3.14159D));
Assertions.assertEquals(resultRecord.get(5, BigDecimal.class), new BigDecimal("12345678.12"));
Assertions.assertEquals(resultRecord.get(6, Boolean.class), Boolean.TRUE);
Assertions.assertArrayEquals(resultRecord.get(7, byte[].class), new byte[]{104, 101, 108, 108, 111});
Assertions.assertArrayEquals(resultRecord.get(8, ByteBuffer.class).array(), new byte[]{104, 101, 108, 108, 111});
Assertions.assertEquals(resultRecord.get(9, LocalDate.class), LocalDate.of(2017, 4, 4));
Assertions.assertEquals(resultRecord.get(10, LocalTime.class), LocalTime.of(14, 20, 33));
Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
}
@Test
public void testPrimitivesParquet() throws IOException {
RecordSchema nifiSchema = getPrimitivesSchema();
Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.PARQUET);
GenericRecord genericRecord = recordConverter.convert(record);
writeToParquet(PRIMITIVES, genericRecord, tempFile);
List<GenericRecord> results = readFromParquet(PRIMITIVES, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
GenericRecord resultRecord = results.get(0);
LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
Assertions.assertEquals(resultRecord.get(0, String.class), "Test String");
Assertions.assertEquals(resultRecord.get(1, Integer.class), new Integer(8));
Assertions.assertEquals(resultRecord.get(2, Float.class), new Float(1.23456F));
Assertions.assertEquals(resultRecord.get(3, Long.class), new Long(42L));
Assertions.assertEquals(resultRecord.get(4, Double.class), new Double(3.14159D));
Assertions.assertEquals(resultRecord.get(5, BigDecimal.class), new BigDecimal("12345678.12"));
Assertions.assertEquals(resultRecord.get(6, Boolean.class), Boolean.TRUE);
Assertions.assertArrayEquals(resultRecord.get(7, byte[].class), new byte[]{104, 101, 108, 108, 111});
Assertions.assertArrayEquals(resultRecord.get(8, ByteBuffer.class).array(), new byte[]{104, 101, 108, 108, 111});
Assertions.assertEquals(resultRecord.get(9, LocalDate.class), LocalDate.of(2017, 4, 4));
Assertions.assertEquals(resultRecord.get(10, LocalTime.class), LocalTime.of(14, 20, 33));
Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
Assertions.assertArrayEquals(resultRecord.get(13, byte[].class), new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
}
@Test
public void testStructAvro() throws IOException {
RecordSchema nifiSchema = getStructSchema();
Record record = setupStructTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT, nifiSchema, FileFormat.AVRO);
GenericRecord genericRecord = recordConverter.convert(record);
writeToAvro(STRUCT, genericRecord, tempFile);
List<GenericRecord> results = readFromAvro(STRUCT, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
Assertions.assertEquals(resultRecord.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, resultRecord.get(0));
GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
Assertions.assertEquals(nestedRecord.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
Assertions.assertEquals(baseRecord.get(0, String.class), "Test String");
Assertions.assertEquals(baseRecord.get(1, Integer.class), new Integer(10));
IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> new IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format));
assertTrue(e.getMessage().contains("Cannot find field with name 'FIELD1' in the record schema"), e.getMessage());
}
@DisabledOnOs(WINDOWS)
@Test
public void testStructOrc() throws IOException {
RecordSchema nifiSchema = getStructSchema();
Record record = setupStructTestRecord();
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testCaseInsensitiveFieldMapping(FileFormat format) throws IOException {
RecordSchema nifiSchema = getCaseInsensitiveSchema();
Record record = setupCaseInsensitiveTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT, nifiSchema, FileFormat.ORC);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format);
GenericRecord genericRecord = recordConverter.convert(record);
writeToOrc(STRUCT, genericRecord, tempFile);
writeTo(format, CASE_INSENSITIVE_SCHEMA, genericRecord, tempFile);
List<GenericRecord> results = readFromOrc(STRUCT, tempFile.toInputFile());
List<GenericRecord> results = readFrom(format, CASE_INSENSITIVE_SCHEMA, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
assertEquals(1, results.size());
assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
Assertions.assertEquals(resultRecord.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, resultRecord.get(0));
GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
Assertions.assertEquals(nestedRecord.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
Assertions.assertEquals(baseRecord.get(0, String.class), "Test String");
Assertions.assertEquals(baseRecord.get(1, Integer.class), new Integer(10));
}
@Test
public void testStructParquet() throws IOException {
RecordSchema nifiSchema = getStructSchema();
Record record = setupStructTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT, nifiSchema, FileFormat.PARQUET);
GenericRecord genericRecord = recordConverter.convert(record);
writeToParquet(STRUCT, genericRecord, tempFile);
List<GenericRecord> results = readFromParquet(STRUCT, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
Assertions.assertEquals(resultRecord.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, resultRecord.get(0));
GenericRecord nestedRecord = (GenericRecord) resultRecord.get(0);
Assertions.assertEquals(nestedRecord.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, nestedRecord.get(0));
GenericRecord baseRecord = (GenericRecord) nestedRecord.get(0);
Assertions.assertEquals(baseRecord.get(0, String.class), "Test String");
Assertions.assertEquals(baseRecord.get(1, Integer.class), new Integer(10));
}
@Test
public void testListAvro() throws IOException {
RecordSchema nifiSchema = getListSchema();
Record record = setupListTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.AVRO);
GenericRecord genericRecord = recordConverter.convert(record);
writeToAvro(LIST, genericRecord, tempFile);
List<GenericRecord> results = readFromAvro(LIST, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
Assertions.assertEquals(resultRecord.size(), 1);
Assertions.assertInstanceOf(List.class, resultRecord.get(0));
List nestedList = (List) resultRecord.get(0);
Assertions.assertEquals(nestedList.size(), 1);
Assertions.assertInstanceOf(List.class, nestedList.get(0));
List baseList = (List) nestedList.get(0);
Assertions.assertEquals(baseList.get(0), "Test String");
assertEquals("Text1", resultRecord.get(0, String.class));
assertEquals("Text2", resultRecord.get(1, String.class));
assertEquals("Text3", resultRecord.get(2, String.class));
assertEquals("Text4", resultRecord.get(3, String.class));
}
@DisabledOnOs(WINDOWS)
@Test
public void testListOrc() throws IOException {
RecordSchema nifiSchema = getListSchema();
Record record = setupListTestRecord();
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testUnorderedFieldMapping(FileFormat format) throws IOException {
RecordSchema nifiSchema = getUnorderedSchema();
Record record = setupUnorderedTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.ORC);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(UNORDERED_SCHEMA, nifiSchema, format);
GenericRecord genericRecord = recordConverter.convert(record);
writeToOrc(LIST, genericRecord, tempFile);
writeTo(format, UNORDERED_SCHEMA, genericRecord, tempFile);
List<GenericRecord> results = readFromOrc(LIST, tempFile.toInputFile());
List<GenericRecord> results = readFrom(format, UNORDERED_SCHEMA, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
assertEquals(1, results.size());
assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
Assertions.assertEquals(resultRecord.size(), 1);
Assertions.assertInstanceOf(List.class, resultRecord.get(0));
List nestedList = (List) resultRecord.get(0);
assertEquals("value1", resultRecord.get(0, String.class));
Assertions.assertEquals(nestedList.size(), 1);
Assertions.assertInstanceOf(List.class, nestedList.get(0));
List baseList = (List) nestedList.get(0);
assertInstanceOf(GenericRecord.class, resultRecord.get(1));
GenericRecord nestedRecord = (GenericRecord) resultRecord.get(1);
Assertions.assertEquals(baseList.get(0), "Test String");
}
assertEquals("value3", nestedRecord.get(0, String.class));
@Test
public void testListParquet() throws IOException {
RecordSchema nifiSchema = getListSchema();
Record record = setupListTestRecord();
assertInstanceOf(List.class, nestedRecord.get(1));
List<String> nestedList = nestedRecord.get(1, List.class);
assertThat(nestedList, hasItems("list value1", "list value2"));
IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.PARQUET);
GenericRecord genericRecord = recordConverter.convert(record);
assertEquals("value5", resultRecord.get(2, String.class));
writeToParquet(LIST, genericRecord, tempFile);
List<GenericRecord> results = readFromParquet(LIST, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
Assertions.assertEquals(resultRecord.size(), 1);
Assertions.assertInstanceOf(List.class, resultRecord.get(0));
List nestedList = (List) resultRecord.get(0);
Assertions.assertEquals(nestedList.size(), 1);
Assertions.assertInstanceOf(List.class, nestedList.get(0));
List baseList = (List) nestedList.get(0);
Assertions.assertEquals(baseList.get(0), "Test String");
}
@Test
public void testMapAvro() throws IOException {
RecordSchema nifiSchema = getMapSchema();
Record record = setupMapTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP, nifiSchema, FileFormat.AVRO);
GenericRecord genericRecord = recordConverter.convert(record);
writeToAvro(MAP, genericRecord, tempFile);
List<GenericRecord> results = readFromAvro(MAP, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
Assertions.assertEquals(resultRecord.size(), 1);
Assertions.assertInstanceOf(Map.class, resultRecord.get(0));
Map nestedMap = (Map) resultRecord.get(0);
Assertions.assertEquals(nestedMap.size(), 1);
Assertions.assertInstanceOf(Map.class, nestedMap.get("key"));
Map baseMap = (Map) nestedMap.get("key");
Assertions.assertEquals(baseMap.get("nested_key"), 42L);
}
@DisabledOnOs(WINDOWS)
@Test
public void testMapOrc() throws IOException {
RecordSchema nifiSchema = getMapSchema();
Record record = setupMapTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP, nifiSchema, FileFormat.ORC);
GenericRecord genericRecord = recordConverter.convert(record);
writeToOrc(MAP, genericRecord, tempFile);
List<GenericRecord> results = readFromOrc(MAP, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
Assertions.assertEquals(resultRecord.size(), 1);
Assertions.assertInstanceOf(Map.class, resultRecord.get(0));
Map nestedMap = (Map) resultRecord.get(0);
Assertions.assertEquals(nestedMap.size(), 1);
Assertions.assertInstanceOf(Map.class, nestedMap.get("key"));
Map baseMap = (Map) nestedMap.get("key");
Assertions.assertEquals(baseMap.get("nested_key"), 42L);
}
@Test
public void testMapParquet() throws IOException {
RecordSchema nifiSchema = getMapSchema();
Record record = setupMapTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP, nifiSchema, FileFormat.PARQUET);
GenericRecord genericRecord = recordConverter.convert(record);
writeToParquet(MAP, genericRecord, tempFile);
List<GenericRecord> results = readFromParquet(MAP, tempFile.toInputFile());
Assertions.assertEquals(results.size(), 1);
Assertions.assertInstanceOf(GenericRecord.class, results.get(0));
GenericRecord resultRecord = results.get(0);
Assertions.assertEquals(resultRecord.size(), 1);
Assertions.assertInstanceOf(Map.class, resultRecord.get(0));
Map nestedMap = (Map) resultRecord.get(0);
Assertions.assertEquals(nestedMap.size(), 1);
Assertions.assertInstanceOf(Map.class, nestedMap.get("key"));
Map baseMap = (Map) nestedMap.get("key");
Assertions.assertEquals(baseMap.get("nested_key"), 42L);
}
@Test
public void testSchemaMismatchAvro() {
RecordSchema nifiSchema = getListSchema();
Record record = setupListTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.AVRO);
GenericRecord genericRecord = recordConverter.convert(record);
DataFileWriter.AppendWriteException e = assertThrows(DataFileWriter.AppendWriteException.class, () -> writeToAvro(STRUCT, genericRecord, tempFile));
assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"), e.getMessage());
}
@DisabledOnOs(WINDOWS)
@Test
public void testSchemaMismatchOrc() {
RecordSchema nifiSchema = getListSchema();
Record record = setupListTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.ORC);
GenericRecord genericRecord = recordConverter.convert(record);
ClassCastException e = assertThrows(ClassCastException.class, () -> writeToOrc(STRUCT, genericRecord, tempFile));
assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"));
}
@Test
public void testSchemaMismatchParquet() {
RecordSchema nifiSchema = getListSchema();
Record record = setupListTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST, nifiSchema, FileFormat.PARQUET);
GenericRecord genericRecord = recordConverter.convert(record);
ClassCastException e = assertThrows(ClassCastException.class, () -> writeToParquet(STRUCT, genericRecord, tempFile));
assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"));
assertInstanceOf(Map.class, resultRecord.get(3));
Map map = resultRecord.get(3, Map.class);
assertEquals("map value1", map.get("key1"));
assertEquals("map value2", map.get("key2"));
}
@Test
@ -684,9 +584,9 @@ public class TestIcebergRecordConverter {
RecordFieldGetter.FieldGetter fieldGetter2 = RecordFieldGetter.createFieldGetter(dataType, "choice2", true);
RecordFieldGetter.FieldGetter fieldGetter3 = RecordFieldGetter.createFieldGetter(dataType, "choice3", true);
Assertions.assertInstanceOf(Integer.class, fieldGetter1.getFieldOrNull(record));
Assertions.assertInstanceOf(String.class, fieldGetter2.getFieldOrNull(record));
Assertions.assertInstanceOf(Long.class, fieldGetter3.getFieldOrNull(record));
assertInstanceOf(Integer.class, fieldGetter1.getFieldOrNull(record));
assertInstanceOf(String.class, fieldGetter2.getFieldOrNull(record));
assertInstanceOf(Long.class, fieldGetter3.getFieldOrNull(record));
}
@Test
@ -697,12 +597,38 @@ public class TestIcebergRecordConverter {
String[] testArray = {"20", "30a", String.valueOf(Long.MAX_VALUE)};
Assertions.assertInstanceOf(Integer.class, elementGetter.getElementOrNull(testArray, 0));
Assertions.assertInstanceOf(String.class, elementGetter.getElementOrNull(testArray, 1));
Assertions.assertInstanceOf(Long.class, elementGetter.getElementOrNull(testArray, 2));
assertInstanceOf(Integer.class, elementGetter.getElementOrNull(testArray[0]));
assertInstanceOf(String.class, elementGetter.getElementOrNull(testArray[1]));
assertInstanceOf(Long.class, elementGetter.getElementOrNull(testArray[2]));
}
public void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
private void writeTo(FileFormat format, Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
switch (format) {
case AVRO:
writeToAvro(schema, record, outputFile);
break;
case ORC:
writeToOrc(schema, record, outputFile);
break;
case PARQUET:
writeToParquet(schema, record, outputFile);
break;
}
}
private ArrayList<GenericRecord> readFrom(FileFormat format, Schema schema, InputFile inputFile) throws IOException {
switch (format) {
case AVRO:
return readFromAvro(schema, inputFile);
case ORC:
return readFromOrc(schema, inputFile);
case PARQUET:
return readFromParquet(schema, inputFile);
}
throw new IOException("Unknown file format: " + format);
}
private void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
try (FileAppender<GenericRecord> appender = Avro.write(outputFile)
.schema(schema)
.createWriterFunc(DataWriter::create)
@ -712,7 +638,7 @@ public class TestIcebergRecordConverter {
}
}
public ArrayList<GenericRecord> readFromAvro(Schema schema, InputFile inputFile) throws IOException {
private ArrayList<GenericRecord> readFromAvro(Schema schema, InputFile inputFile) throws IOException {
try (AvroIterable<GenericRecord> reader = Avro.read(inputFile)
.project(schema)
.createReaderFunc(DataReader::create)
@ -721,7 +647,7 @@ public class TestIcebergRecordConverter {
}
}
public void writeToOrc(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
private void writeToOrc(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
try (FileAppender<GenericRecord> appender = ORC.write(outputFile)
.schema(schema)
.createWriterFunc(GenericOrcWriter::buildWriter)
@ -731,7 +657,7 @@ public class TestIcebergRecordConverter {
}
}
public ArrayList<GenericRecord> readFromOrc(Schema schema, InputFile inputFile) throws IOException {
private ArrayList<GenericRecord> readFromOrc(Schema schema, InputFile inputFile) throws IOException {
try (CloseableIterable<GenericRecord> reader = ORC.read(inputFile)
.project(schema)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
@ -740,7 +666,7 @@ public class TestIcebergRecordConverter {
}
}
public void writeToParquet(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
private void writeToParquet(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
try (FileAppender<GenericRecord> appender = Parquet.write(outputFile)
.schema(schema)
.createWriterFunc(GenericParquetWriter::buildWriter)
@ -750,7 +676,7 @@ public class TestIcebergRecordConverter {
}
}
public ArrayList<GenericRecord> readFromParquet(Schema schema, InputFile inputFile) throws IOException {
private ArrayList<GenericRecord> readFromParquet(Schema schema, InputFile inputFile) throws IOException {
try (CloseableIterable<GenericRecord> reader = Parquet.read(inputFile)
.project(schema)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))