NIFI-11739 - Add ability to ignore missing fields in PutIceberg

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #7421.
Matt Burgess 2023-06-21 20:39:43 -04:00 committed by Pierre Villard
parent 2c79b5f8e2
commit 3f7b1de6b8
8 changed files with 418 additions and 84 deletions

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.iceberg;
import org.apache.nifi.components.DescribedValue;
public enum UnmatchedColumnBehavior implements DescribedValue {
IGNORE_UNMATCHED_COLUMN("Ignore Unmatched Columns",
"Any column in the database that does not have a field in the document will be assumed to not be required. No notification will be logged"),
WARNING_UNMATCHED_COLUMN("Warn on Unmatched Columns",
"Any column in the database that does not have a field in the document will be assumed to not be required. A warning will be logged"),
FAIL_UNMATCHED_COLUMN("Fail on Unmatched Columns",
"A flow will fail if any column in the database that does not have a field in the document. An error will be logged");
private final String displayName;
private final String description;
UnmatchedColumnBehavior(final String displayName, final String description) {
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}
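To illustrate how these values are meant to be acted on (a minimal sketch, not code from this commit; the handleUnmatchedColumn helper, its parameters, and the messages are hypothetical, and org.apache.nifi.logging.ComponentLog is assumed to be imported):

static void handleUnmatchedColumn(final UnmatchedColumnBehavior behavior, final String columnName, final ComponentLog logger) {
    switch (behavior) {
        case IGNORE_UNMATCHED_COLUMN:
            // Treat the column as optional and let a null value be written; nothing is logged
            break;
        case WARNING_UNMATCHED_COLUMN:
            // Same fallback as IGNORE, but the gap is surfaced in the logs
            logger.warn("No record field found for column '" + columnName + "', a null value will be written");
            break;
        case FAIL_UNMATCHED_COLUMN:
            // Abort processing of the record
            throw new IllegalArgumentException("No record field found for column '" + columnName + "'");
    }
}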

View File

@ -21,9 +21,11 @@ import org.apache.commons.lang3.Validate;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@ -265,9 +267,16 @@ public class GenericDataConverters {
for (DataConverter<?, ?> converter : converters) {
final Optional<RecordField> recordField = recordSchema.getField(converter.getSourceFieldName());
final RecordField field = recordField.get();
// creates a record field accessor for every data converter
getters.put(converter.getTargetFieldName(), createFieldGetter(field.getDataType(), field.getFieldName(), field.isNullable()));
if (recordField.isEmpty()) {
final Types.NestedField missingField = schema.field(converter.getTargetFieldName());
if (missingField != null) {
getters.put(converter.getTargetFieldName(), createFieldGetter(convertSchemaTypeToDataType(missingField.type()), missingField.name(), missingField.isOptional()));
}
} else {
final RecordField field = recordField.get();
// creates a record field accessor for every data converter
getters.put(converter.getTargetFieldName(), createFieldGetter(field.getDataType(), field.getFieldName(), field.isNullable()));
}
}
}
@ -290,4 +299,54 @@ public class GenericDataConverters {
return converter.convert((S) getters.get(converter.getTargetFieldName()).getFieldOrNull(record));
}
}
public static DataType convertSchemaTypeToDataType(Type schemaType) {
switch (schemaType.typeId()) {
case BOOLEAN:
return RecordFieldType.BOOLEAN.getDataType();
case INTEGER:
return RecordFieldType.INT.getDataType();
case LONG:
return RecordFieldType.LONG.getDataType();
case FLOAT:
return RecordFieldType.FLOAT.getDataType();
case DOUBLE:
return RecordFieldType.DOUBLE.getDataType();
case DATE:
return RecordFieldType.DATE.getDataType();
case TIME:
return RecordFieldType.TIME.getDataType();
case TIMESTAMP:
return RecordFieldType.TIMESTAMP.getDataType();
case STRING:
return RecordFieldType.STRING.getDataType();
case UUID:
return RecordFieldType.UUID.getDataType();
case FIXED:
case BINARY:
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
case DECIMAL:
return RecordFieldType.DECIMAL.getDataType();
case STRUCT:
// Build a record type from the struct type
Types.StructType structType = schemaType.asStructType();
List<Types.NestedField> fields = structType.fields();
List<RecordField> recordFields = new ArrayList<>(fields.size());
for (Types.NestedField field : fields) {
DataType dataType = convertSchemaTypeToDataType(field.type());
recordFields.add(new RecordField(field.name(), dataType, field.isOptional()));
}
RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
return RecordFieldType.RECORD.getRecordDataType(recordSchema);
case LIST:
// Build a list type from the elements
Types.ListType listType = schemaType.asListType();
return RecordFieldType.ARRAY.getArrayDataType(convertSchemaTypeToDataType(listType.elementType()), listType.isElementOptional());
case MAP:
// Build a map type from the elements
Types.MapType mapType = schemaType.asMapType();
return RecordFieldType.MAP.getMapDataType(convertSchemaTypeToDataType(mapType.valueType()), mapType.isValueOptional());
}
throw new IllegalArgumentException("Invalid or unsupported type: " + schemaType);
}
}
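For illustration only (not part of this commit), a minimal sketch of using the new convertSchemaTypeToDataType helper; the list type built here is a hypothetical example:

// Hypothetical Iceberg type: an optional list<string> with element id 1
Types.ListType listType = Types.ListType.ofOptional(1, Types.StringType.get());
DataType dataType = GenericDataConverters.convertSchemaTypeToDataType(listType);
// dataType is an ARRAY DataType with a STRING element type; its elements are
// nullable because the Iceberg list element is optional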

View File

@ -25,6 +25,8 @@ import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.iceberg.UnmatchedColumnBehavior;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@ -46,19 +48,26 @@ import java.util.stream.Collectors;
public class IcebergRecordConverter {
private final DataConverter<Record, GenericRecord> converter;
public final UnmatchedColumnBehavior unmatchedColumnBehavior;
public ComponentLog logger;
public GenericRecord convert(Record record) {
return converter.convert(record);
}
@SuppressWarnings("unchecked")
public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat) {
this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat);
public IcebergRecordConverter(Schema schema, RecordSchema recordSchema, FileFormat fileFormat, UnmatchedColumnBehavior unmatchedColumnBehavior, ComponentLog logger) {
this.converter = (DataConverter<Record, GenericRecord>) IcebergSchemaVisitor.visit(schema, new RecordDataType(recordSchema), fileFormat, unmatchedColumnBehavior, logger);
this.unmatchedColumnBehavior = unmatchedColumnBehavior;
this.logger = logger;
}
private static class IcebergSchemaVisitor extends SchemaWithPartnerVisitor<DataType, DataConverter<?, ?>> {
public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat) {
return visit(schema, new RecordTypeWithFieldNameMapper(schema, recordDataType), new IcebergSchemaVisitor(), new IcebergPartnerAccessors(schema, fileFormat));
public static DataConverter<?, ?> visit(Schema schema, RecordDataType recordDataType, FileFormat fileFormat, UnmatchedColumnBehavior unmatchedColumnBehavior, ComponentLog logger) {
return visit(schema, new RecordTypeWithFieldNameMapper(schema, recordDataType), new IcebergSchemaVisitor(),
new IcebergPartnerAccessors(schema, fileFormat, unmatchedColumnBehavior, logger));
}
@Override
@ -123,8 +132,10 @@ public class IcebergRecordConverter {
// set NiFi schema field names (sourceFieldName) in the data converters
for (DataConverter<?, ?> converter : converters) {
final Optional<String> mappedFieldName = recordType.getNameMapping(converter.getTargetFieldName());
final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
converter.setSourceFieldName(recordField.get().getFieldName());
if (mappedFieldName.isPresent()) {
final Optional<RecordField> recordField = recordSchema.getField(mappedFieldName.get());
converter.setSourceFieldName(recordField.get().getFieldName());
}
}
return new GenericDataConverters.RecordConverter(converters, recordSchema, type);
@ -144,10 +155,14 @@ public class IcebergRecordConverter {
public static class IcebergPartnerAccessors implements SchemaWithPartnerVisitor.PartnerAccessors<DataType> {
private final Schema schema;
private final FileFormat fileFormat;
private final UnmatchedColumnBehavior unmatchedColumnBehavior;
private final ComponentLog logger;
IcebergPartnerAccessors(Schema schema, FileFormat fileFormat) {
IcebergPartnerAccessors(Schema schema, FileFormat fileFormat, UnmatchedColumnBehavior unmatchedColumnBehavior, ComponentLog logger) {
this.schema = schema;
this.fileFormat = fileFormat;
this.unmatchedColumnBehavior = unmatchedColumnBehavior;
this.logger = logger;
}
@Override
@ -156,8 +171,25 @@ public class IcebergRecordConverter {
final RecordTypeWithFieldNameMapper recordType = (RecordTypeWithFieldNameMapper) dataType;
final Optional<String> mappedFieldName = recordType.getNameMapping(name);
Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name));
if (UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) {
Validate.isTrue(mappedFieldName.isPresent(), String.format("Cannot find field with name '%s' in the record schema", name));
}
if (mappedFieldName.isEmpty()) {
if (UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.equals(unmatchedColumnBehavior)) {
if (logger != null) {
logger.warn("Cannot find field with name '" + name + "' in the record schema, using the target schema for datatype and a null value");
}
}
// If the field is missing, use the expected type from the schema (converted to a DataType)
final Types.NestedField schemaField = schema.findField(fieldId);
final Type schemaFieldType = schemaField.type();
if (schemaField.isRequired()) {
// Iceberg requires a non-null value for required fields
throw new IllegalArgumentException("Iceberg requires a non-null value for required fields, field: "
+ schemaField.name() + ", type: " + schemaFieldType);
}
return GenericDataConverters.convertSchemaTypeToDataType(schemaFieldType);
}
final Optional<RecordField> recordField = recordType.getChildSchema().getField(mappedFieldName.get());
final RecordField field = recordField.get();

View File

@ -108,6 +108,14 @@ public class PutIceberg extends AbstractIcebergProcessor {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor UNMATCHED_COLUMN_BEHAVIOR = new PropertyDescriptor.Builder()
.name("unmatched-column-behavior")
.displayName("Unmatched Column Behavior")
.description("If an incoming record does not have a field mapping for all of the database table's columns, this property specifies how to handle the situation.")
.allowableValues(UnmatchedColumnBehavior.class)
.defaultValue(UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN.getValue())
.required(true)
.build();
static final PropertyDescriptor FILE_FORMAT = new PropertyDescriptor.Builder()
.name("file-format")
.displayName("File Format")
@ -178,6 +186,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
CATALOG,
CATALOG_NAMESPACE,
TABLE_NAME,
UNMATCHED_COLUMN_BEHAVIOR,
FILE_FORMAT,
MAXIMUM_FILE_SIZE,
KERBEROS_USER_SERVICE,
@ -256,8 +265,10 @@ public class PutIceberg extends AbstractIcebergProcessor {
final FileFormat format = getFileFormat(table.properties(), fileFormat);
final IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, flowFile.getId(), format, maximumFileSize);
taskWriter = taskWriterFactory.create();
final UnmatchedColumnBehavior unmatchedColumnBehavior =
UnmatchedColumnBehavior.valueOf(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
final IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), reader.getSchema(), format);
final IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), reader.getSchema(), format, unmatchedColumnBehavior, getLogger());
Record record;
while ((record = reader.nextRecord()) != null) {
@ -353,5 +364,4 @@ public class PutIceberg extends AbstractIcebergProcessor {
.retry(3)
.run(file -> table.io().deleteFile(file.path().toString()));
}
}
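As a usage illustration (not part of this commit), a test could configure the new property on PutIceberg through a TestRunner; the runner variable is assumed to have been created with TestRunners.newTestRunner(PutIceberg.class):

// Log a warning and write null for any table column that has no matching record field
runner.setProperty(PutIceberg.UNMATCHED_COLUMN_BEHAVIOR, UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN.getValue());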

View File

@ -32,6 +32,8 @@ import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.mock.MockComponentLogger;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
import org.apache.nifi.processors.iceberg.catalog.TestHadoopCatalogService;
@ -79,10 +81,12 @@ public class TestDataFileActions {
);
private PutIceberg icebergProcessor;
private ComponentLog logger;
@BeforeEach
public void setUp() {
icebergProcessor = new PutIceberg();
logger = new MockComponentLogger();
}
@DisabledOnOs(WINDOWS)
@ -103,7 +107,7 @@ public class TestDataFileActions {
IcebergTaskWriterFactory taskWriterFactory = new IcebergTaskWriterFactory(table, new Random().nextLong(), FileFormat.PARQUET, null);
TaskWriter<Record> taskWriter = taskWriterFactory.create();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(table.schema(), abortSchema, FileFormat.PARQUET, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
for (MapRecord record : recordList) {
taskWriter.write(recordConverter.convert(record));

View File

@ -37,6 +37,8 @@ import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.mock.MockComponentLogger;
import org.apache.nifi.processors.iceberg.converter.ArrayElementGetter;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
import org.apache.nifi.processors.iceberg.converter.RecordFieldGetter;
@ -57,6 +59,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
@ -92,9 +95,12 @@ public class TestIcebergRecordConverter {
private OutputFile tempFile;
private ComponentLog logger;
@BeforeEach
public void setUp() throws Exception {
tempFile = Files.localOutput(createTempFile("test", null));
logger = new MockComponentLogger();
}
@AfterEach
@ -145,6 +151,24 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);
private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new Schema(
Types.NestedField.optional(0, "string", Types.StringType.get()),
Types.NestedField.required(1, "integer", Types.IntegerType.get()),
Types.NestedField.required(2, "float", Types.FloatType.get()),
Types.NestedField.required(3, "long", Types.LongType.get()),
Types.NestedField.optional(4, "double", Types.DoubleType.get()),
Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 2)),
Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
Types.NestedField.optional(7, "fixed", Types.FixedType.ofLength(5)),
Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
Types.NestedField.optional(9, "date", Types.DateType.get()),
Types.NestedField.optional(10, "time", Types.TimeType.get()),
Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);
private static final Schema COMPATIBLE_PRIMITIVES_SCHEMA = new Schema(
Types.NestedField.optional(0, "string", Types.StringType.get()),
Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
@ -240,6 +264,24 @@ public class TestIcebergRecordConverter {
return new SimpleRecordSchema(fields);
}
private static RecordSchema getPrimitivesSchemaMissingFields() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
fields.add(new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()));
fields.add(new RecordField("fixed", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
fields.add(new RecordField("binary", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType()));
fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType())));
return new SimpleRecordSchema(fields);
}
private static RecordSchema getPrimitivesAsCompatiblesSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string", RecordFieldType.INT.getDataType()));
@ -370,6 +412,29 @@ public class TestIcebergRecordConverter {
return new MapRecord(getPrimitivesSchema(), values);
}
private static Record setupPrimitivesTestRecordMissingFields() {
LocalDate localDate = LocalDate.of(2017, 4, 4);
LocalTime localTime = LocalTime.of(14, 20, 33);
LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
Map<String, Object> values = new HashMap<>();
values.put("string", "Test String");
values.put("double", 3.14159D);
values.put("decimal", new BigDecimal("12345678.12"));
values.put("boolean", true);
values.put("fixed", "hello".getBytes());
values.put("binary", "hello".getBytes());
values.put("date", localDate);
values.put("time", Time.valueOf(localTime));
values.put("timestamp", Timestamp.from(offsetDateTime.toInstant()));
values.put("timestampTz", Timestamp.valueOf(localDateTime));
values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
values.put("choice", "10");
return new MapRecord(getPrimitivesSchemaMissingFields(), values);
}
private static Record setupCompatiblePrimitivesTestRecord() {
Map<String, Object> values = new HashMap<>();
@ -439,7 +504,7 @@ public class TestIcebergRecordConverter {
RecordSchema nifiSchema = getPrimitivesSchema();
Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
@ -472,6 +537,54 @@ public class TestIcebergRecordConverter {
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
}
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testPrimitivesIgnoreMissingFields(FileFormat format) throws IOException {
RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
Record record = setupPrimitivesTestRecordMissingFields();
MockComponentLogger mockComponentLogger = new MockComponentLogger();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, mockComponentLogger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
assertEquals(results.size(), 1);
GenericRecord resultRecord = results.get(0);
LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
assertEquals("Test String", resultRecord.get(0, String.class));
assertNull(resultRecord.get(1, Integer.class));
assertNull(resultRecord.get(2, Float.class));
assertNull(resultRecord.get(3, Long.class));
assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
if (format.equals(FileFormat.PARQUET)) {
// Parquet reads the UUID back as its raw 16-byte representation rather than a java.util.UUID
UUID uuid = UUID.fromString("0000-00-00-00-000000");
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class));
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
}
// Test null values
for (String fieldName : record.getRawFieldNames()) {
@ -504,11 +617,81 @@ public class TestIcebergRecordConverter {
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testCompatiblePrimitives(FileFormat format) throws IOException {
public void testPrimitivesMissingRequiredFields(FileFormat format) {
RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
MockComponentLogger mockComponentLogger = new MockComponentLogger();
assertThrows(IllegalArgumentException.class,
() -> new IcebergRecordConverter(PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, mockComponentLogger));
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testPrimitivesWarnMissingFields(FileFormat format) throws IOException {
RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
Record record = setupPrimitivesTestRecordMissingFields();
MockComponentLogger mockComponentLogger = new MockComponentLogger();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.WARNING_UNMATCHED_COLUMN, mockComponentLogger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, PRIMITIVES_SCHEMA, genericRecord, tempFile);
List<GenericRecord> results = readFrom(format, PRIMITIVES_SCHEMA, tempFile.toInputFile());
assertEquals(results.size(), 1);
GenericRecord resultRecord = results.get(0);
LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
assertEquals("Test String", resultRecord.get(0, String.class));
assertNull(resultRecord.get(1, Integer.class));
assertNull(resultRecord.get(2, Float.class));
assertNull(resultRecord.get(3, Long.class));
assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), resultRecord.get(12, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
if (format.equals(FileFormat.PARQUET)) {
// Parquet reads the UUID back as its raw 16-byte representation rather than a java.util.UUID
UUID uuid = UUID.fromString("0000-00-00-00-000000");
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class));
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
}
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testPrimitivesFailMissingFields(FileFormat format) throws IOException {
RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
MockComponentLogger mockComponentLogger = new MockComponentLogger();
assertThrows(IllegalArgumentException.class,
() -> new IcebergRecordConverter(PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, mockComponentLogger));
}
@DisabledOnOs(WINDOWS)
@Test
public void testCompatiblePrimitives() throws IOException {
RecordSchema nifiSchema = getPrimitivesAsCompatiblesSchema();
Record record = setupCompatiblePrimitivesTestRecord();
final FileFormat format = PARQUET;
IcebergRecordConverter recordConverter = new IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.FAIL_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, COMPATIBLE_PRIMITIVES_SCHEMA, genericRecord, tempFile);
@ -536,21 +719,17 @@ public class TestIcebergRecordConverter {
assertEquals(expectedLocalDateTimestamp, resultRecord.get(11, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(13, Integer.class));
if (format.equals(PARQUET)) {
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(12, byte[].class));
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(12, UUID.class));
}
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(12, byte[].class));
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testStruct(FileFormat format) throws IOException {
@Test
public void testStruct() throws IOException {
RecordSchema nifiSchema = getStructSchema();
Record record = setupStructTestRecord();
final FileFormat format = FileFormat.ORC;
IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT_SCHEMA, nifiSchema, format);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(STRUCT_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, STRUCT_SCHEMA, genericRecord, tempFile);
@ -574,13 +753,13 @@ public class TestIcebergRecordConverter {
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testList(FileFormat format) throws IOException {
@Test
public void testList() throws IOException {
RecordSchema nifiSchema = getListSchema();
Record record = setupListTestRecord();
final FileFormat format = FileFormat.AVRO;
IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST_SCHEMA, nifiSchema, format);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(LIST_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, LIST_SCHEMA, genericRecord, tempFile);
@ -593,7 +772,7 @@ public class TestIcebergRecordConverter {
assertEquals(1, resultRecord.size());
assertInstanceOf(List.class, resultRecord.get(0));
List nestedList = resultRecord.get(0, List.class);
List<?> nestedList = resultRecord.get(0, List.class);
assertEquals(2, nestedList.size());
assertInstanceOf(List.class, nestedList.get(0));
@ -604,13 +783,13 @@ public class TestIcebergRecordConverter {
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testMap(FileFormat format) throws IOException {
@Test
public void testMap() throws IOException {
RecordSchema nifiSchema = getMapSchema();
Record record = setupMapTestRecord();
final FileFormat format = PARQUET;
IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP_SCHEMA, nifiSchema, format);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(MAP_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, MAP_SCHEMA, genericRecord, tempFile);
@ -636,20 +815,21 @@ public class TestIcebergRecordConverter {
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testSchemaMismatch(FileFormat format) {
RecordSchema nifiSchema = getPrimitivesSchema();
RecordSchema nifiSchema = getPrimitivesSchemaMissingFields();
IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> new IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format));
assertTrue(e.getMessage().contains("Cannot find field with name 'FIELD1' in the record schema"), e.getMessage());
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
() -> new IcebergRecordConverter(PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger));
assertTrue(e.getMessage().contains("Iceberg requires a non-null value for required fields"), e.getMessage());
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testCaseInsensitiveFieldMapping(FileFormat format) throws IOException {
@Test
public void testCaseInsensitiveFieldMapping() throws IOException {
RecordSchema nifiSchema = getCaseInsensitiveSchema();
Record record = setupCaseInsensitiveTestRecord();
final FileFormat format = FileFormat.AVRO;
IcebergRecordConverter recordConverter = new IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(CASE_INSENSITIVE_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, CASE_INSENSITIVE_SCHEMA, genericRecord, tempFile);
@ -667,13 +847,13 @@ public class TestIcebergRecordConverter {
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testUnorderedFieldMapping(FileFormat format) throws IOException {
@Test
public void testUnorderedFieldMapping() throws IOException {
RecordSchema nifiSchema = getUnorderedSchema();
Record record = setupUnorderedTestRecord();
final FileFormat format = PARQUET;
IcebergRecordConverter recordConverter = new IcebergRecordConverter(UNORDERED_SCHEMA, nifiSchema, format);
IcebergRecordConverter recordConverter = new IcebergRecordConverter(UNORDERED_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
GenericRecord genericRecord = recordConverter.convert(record);
writeTo(format, UNORDERED_SCHEMA, genericRecord, tempFile);
@ -698,7 +878,7 @@ public class TestIcebergRecordConverter {
assertEquals("value5", resultRecord.get(2, String.class));
assertInstanceOf(Map.class, resultRecord.get(3));
Map map = resultRecord.get(3, Map.class);
Map<?,?> map = resultRecord.get(3, Map.class);
assertEquals("map value1", map.get("key1"));
assertEquals("map value2", map.get("key2"));
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.iceberg;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@ -38,9 +39,8 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -102,14 +102,14 @@ public class TestPutIcebergWithHadoopCatalog {
runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
}
private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException, IOException {
private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws InitializationException, IOException {
TestHadoopCatalogService catalogService = new TestHadoopCatalogService();
IcebergCatalogFactory catalogFactory = new IcebergCatalogFactory(catalogService);
catalog = catalogFactory.create();
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.FORMAT_VERSION, "2");
tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
catalog.createTable(TABLE_IDENTIFIER, DATE_SCHEMA, spec, tableProperties);
@ -120,16 +120,15 @@ public class TestPutIcebergWithHadoopCatalog {
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@ValueSource(strings = {"avro", "orc", "parquet"})
public void onTriggerYearTransform(String fileFormat) throws Exception {
@Test
public void onTriggerYearTransform() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
.year("date")
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
initCatalog(spec, fileFormat);
initCatalog(spec, FileFormat.PARQUET);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
runner.setProperty(PutIceberg.TABLE_NAME, "date");
runner.setValidateExpressionUsage(false);
@ -148,16 +147,15 @@ public class TestPutIcebergWithHadoopCatalog {
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@ValueSource(strings = {"avro", "orc", "parquet"})
public void onTriggerMonthTransform(String fileFormat) throws Exception {
@Test
public void onTriggerMonthTransform() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
.month("timestampMicros")
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
initCatalog(spec, fileFormat);
initCatalog(spec, FileFormat.ORC);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
runner.setProperty(PutIceberg.TABLE_NAME, "date");
runner.setValidateExpressionUsage(false);
@ -177,16 +175,15 @@ public class TestPutIcebergWithHadoopCatalog {
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@ValueSource(strings = {"avro", "orc", "parquet"})
public void onTriggerDayTransform(String fileFormat) throws Exception {
@Test
public void onTriggerDayTransform() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(DATE_SCHEMA)
.day("timestampMicros")
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
initCatalog(spec, fileFormat);
initCatalog(spec, FileFormat.AVRO);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "default");
runner.setProperty(PutIceberg.TABLE_NAME, "date");
runner.setValidateExpressionUsage(false);

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.iceberg;
import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@ -44,10 +45,9 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@ -122,10 +122,10 @@ public class TestPutIcebergWithHiveCatalog {
runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
}
private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
private void initCatalog(PartitionSpec spec, FileFormat fileFormat) throws InitializationException {
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.FORMAT_VERSION, "2");
tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
.withMetastoreUri(metastore.getThriftConnectionUri())
@ -143,16 +143,15 @@ public class TestPutIcebergWithHiveCatalog {
runner.setProperty(PutIceberg.CATALOG, "catalog-service");
}
@ParameterizedTest
@ValueSource(strings = {"avro"})
public void onTriggerPartitioned(String fileFormat) throws Exception {
@Test
public void onTriggerPartitioned() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
.bucket("department", 3)
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
initCatalog(spec, fileFormat);
initCatalog(spec, FileFormat.AVRO);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
@ -181,16 +180,15 @@ public class TestPutIcebergWithHiveCatalog {
assertProvenanceEvents();
}
@ParameterizedTest
@ValueSource(strings = {"orc"})
public void onTriggerIdentityPartitioned(String fileFormat) throws Exception {
@Test
public void onTriggerIdentityPartitioned() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
.identity("department")
.build();
runner = TestRunners.newTestRunner(processor);
initRecordReader();
initCatalog(spec, fileFormat);
initCatalog(spec, FileFormat.ORC);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
@ -219,9 +217,8 @@ public class TestPutIcebergWithHiveCatalog {
assertProvenanceEvents();
}
@ParameterizedTest
@ValueSource(strings = {"parquet"})
public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) throws Exception {
@Test
public void onTriggerMultiLevelIdentityPartitioned() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
.identity("name")
.identity("department")
@ -229,7 +226,7 @@ public class TestPutIcebergWithHiveCatalog {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
initCatalog(spec, fileFormat);
initCatalog(spec, FileFormat.PARQUET);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
@ -262,12 +259,11 @@ public class TestPutIcebergWithHiveCatalog {
assertProvenanceEvents();
}
@ParameterizedTest
@ValueSource(strings = {"avro"})
public void onTriggerUnPartitioned(String fileFormat) throws Exception {
@Test
public void onTriggerUnPartitioned() throws Exception {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
initCatalog(PartitionSpec.unpartitioned(), fileFormat);
initCatalog(PartitionSpec.unpartitioned(), FileFormat.AVRO);
runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "${catalog.name}");
runner.setProperty(PutIceberg.TABLE_NAME, "${table.name}");
runner.setProperty(PutIceberg.MAXIMUM_FILE_SIZE, "${max.filesize}");